# File Connector

## 拷贝流程

![文件传输总览](/files/-LtOGQCO5LWmvdC8gI9K)

首先有个源文件，文件中保存有一些数据。启动Runtime，通过RESTful接口启动Source和Sink Connector，启动完成以后Source Connector会读取文件内容发送到RocketMQ指定的Topic中，Sink Connector会从这个Topic拉去数据写入指定文件中，这样就完成从源数据存储到目标存储之间的数据同步。

这里只是个简单的文件拷贝，其它数据源流程也是类似，通过RocketMQ Connect几乎可以实现任意数据源之间数据同步，包括异构数据源之间数据同步。

## 项目安装使用

### 环境依赖

1. **64bit JDK 1.8+;**
2. **Maven 3.2.x或以上版本;**
3. **两套RocketMQ集群环境;**&#x20;
4. **RocketMQ Runtime环境**

### **项目构建**

`file-connector` 是`runtime`的样例Connector，所以在构建`runtime`时已经构建过，具体参照前一节`runtime`构建过程。

#### [runtime项目构建](/rocketmq-connector/quick-start/runtime-qs.md#xiang-mu-gou-jian)

### 项目安装

{% hint style="warning" %}
建议将connector插件放于一个公共的目录下，推荐为

**/usr/local/connector-plugins/**
{% endhint %}

* 将jar包拷贝到connector插件目录下

```bash
# 进入到rocketmq-connect-sample所在的目录
$ cd {rocketmq-external目录}/rocketmq-connect/rocketmq-connect-sample/
# 进入jar包所在的目录
$ cd target
# 将jar包拷贝到connector插件目录下
$ cp rocketmq-connect-sample-0.0.1-SNAPSHOT.jar /usr/local/connector-plugins/
```

* 启动runtime，具体参照前一节的项目启动

&#x20;    [runtime项目运行](/rocketmq-connector/quick-start/runtime-qs.md#rocketmq-runtime-pei-zhi-wen-dang)

## 启动Source Connector

### 启动前准备

* 在`/opt/`文件下创建`source-file`文件夹(当然也可以自主选择合适的位置)

```bash
$ cd /opt/
$ mkdir source-file
$ cd source-file
```

* 创建一个`source-file.txt`用于测试

```bash
$ vim source-file.txt
# 在其中写入一些文字，并保存
```

* &#x20;创建Topic

```bash
# 创建Topic
# 注意这里的ip地址要换成自己服务器ip
$ sh mqadmin updateTopic -b 4xx.1xx.2xx.2xx:10911 -t fileTopic
....
create topic to 4xx.1xx.2xx.2xx:10911 success.

```

更多创建Topic内容参照 [创建Topic](/rocketmq-connector/quick-start/runtime-qs.md#chuang-jian-topic)

### Source Connector启动

#### 启动模板

```typescript
# GET请求  
http://(worker ip):(port)/connectors/(connector name)?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}   

```

#### 参数说明

| 参数             | 含义                                                                                | 能否为空  | 示例             |
| -------------- | --------------------------------------------------------------------------------- | ----- | -------------- |
| worker ip      | runtime上wokerde启动ip                                                               | false | 127.0.0.1      |
| port           | 端口号，可以在配置项修改，默认为8081                                                              | false | 8081           |
| connector name | <p>connector配置的唯一key，</p><p>除保留字`stopAll`外的任一字符串，</p><p>但是connector示例之间不可以重名 </p> | false | file-connector |

#### 配置说明

| key                     | description                                           | 能否为空  |
| ----------------------- | ----------------------------------------------------- | ----- |
| connector-class         | 实现Connector接口的类名称（包含包名）                               | false |
| filename                | 数据源文件名称                                               | false |
| task-class              | 实现SourceTask类名称（包含包名）                                 | false |
| topic                   | 同步文件数据所需topic                                         | false |
| update-timestamp        | 配置更新时间戳                                               | true  |
| source-record-converter | <p>用于将SourceDataEntry转换为byte\[]的转换器</p><p>实现的完整类名</p> | false |

#### 启动示例

```javascript
http://localhost:8081/connectors/fileConnectorSource?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
```

在浏览器上使用上面请求（或者使用其他工具发送GET请求），可以看到浏览器返回`success`，并且控制台输出以下信息即表示成功

```bash
2019-11-05 13:34:49 INFO qtp726181440-29 - config: {
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}

```

## 启动Sink Connector

### 启动前准备

{% hint style="info" %}
注意：启动**Source Connector**之前已经在rocketmq集群上创建了Topic即**fileTopic，**&#x6240;以Sink Connector 不需要再创建Topic
{% endhint %}

### Sink Connector 启动

#### 启动模板

```typescript
 # GET请求  
 http://(your worker ip):(port)/connectors/(connector name)?config={
    "connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
    "topicNames":"fileTopic",
    "filename":"/opt/sink-files/sink-file.txt",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }
```

#### 参数说明

参照Source Connector [参数说明](/rocketmq-connector/quick-start/file-connector.md#can-shu-shuo-ming)

#### 配置说明

| key                     | description                                           | 能否为空  |
| ----------------------- | ----------------------------------------------------- | ----- |
| connector-class         | 实现Connector接口的类名称（包含包名）                               | false |
| topicNames              | sink需要处理数据消息topics                                    | false |
| task-class              | 实现SourceTask类名称（包含包名）                                 | false |
| filename                | sink拉去的数据保存到文件                                        | false |
| update-timestamp        | 配置更新时间戳                                               | true  |
| source-record-converter | <p>用于将SourceDataEntry转换为byte\[]的转换器</p><p>实现的完整类名</p> | false |

#### 启动示例

```typescript
 http://localhost:8081/connectors/fileConnectorSink?config={
    "connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
    "topicNames":"fileTopic",
    "filename":"/opt/sink-files/sink-file.txt",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }
```

在浏览器上使用上面请求（或者使用其他工具发送GET请求），可以看到浏览器返回`success`，并且控制台输出以下信息即表示成功。

```bash
2019-11-05 18:48:51 INFO qtp726181440-26 - fileConnectorSink?config={
    "connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
    "topicNames":"fileTopic",
    "filename":"/opt/sink-files/sink-file.txt",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }

```

如果rocketmq集群没有过多的持久化消息，可以看到相同内容的文件`/opt/sink-files/sink-file.txt`


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://rocketmq-1.gitbook.io/rocketmq-connector/quick-start/file-connector.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
