# File Connector

## 拷贝流程

![文件传输总览](https://1190388593-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-Lm4-doAUYYZgDcb_Jnz%2F-LtODl3VKnSvkyZAjFyL%2F-LtOGQCO5LWmvdC8gI9K%2FFile%20Connect%20Overview%20\(2\).png?alt=media\&token=528d3900-0e15-42d1-94ed-941c35d91ed5)

首先有个源文件，文件中保存有一些数据。启动Runtime，通过RESTful接口启动Source和Sink Connector，启动完成以后Source Connector会读取文件内容发送到RocketMQ指定的Topic中，Sink Connector会从这个Topic拉去数据写入指定文件中，这样就完成从源数据存储到目标存储之间的数据同步。

这里只是个简单的文件拷贝，其它数据源流程也是类似，通过RocketMQ Connect几乎可以实现任意数据源之间数据同步，包括异构数据源之间数据同步。

## 项目安装使用

### 环境依赖

1. **64bit JDK 1.8+;**
2. **Maven 3.2.x或以上版本;**
3. **两套RocketMQ集群环境;**&#x20;
4. **RocketMQ Runtime环境**

### **项目构建**

`file-connector` 是`runtime`的样例Connector，所以在构建`runtime`时已经构建过，具体参照前一节`runtime`构建过程。

#### [runtime项目构建](https://rocketmq-1.gitbook.io/rocketmq-connector/runtime-qs#xiang-mu-gou-jian)

### 项目安装

{% hint style="warning" %}
建议将connector插件放于一个公共的目录下，推荐为

**/usr/local/connector-plugins/**
{% endhint %}

* 将jar包拷贝到connector插件目录下

```bash
# 进入到rocketmq-connect-sample所在的目录
$ cd {rocketmq-external目录}/rocketmq-connect/rocketmq-connect-sample/
# 进入jar包所在的目录
$ cd target
# 将jar包拷贝到connector插件目录下
$ cp rocketmq-connect-sample-0.0.1-SNAPSHOT.jar /usr/local/connector-plugins/
```

* 启动runtime，具体参照前一节的项目启动

&#x20;    [runtime项目运行](https://rocketmq-1.gitbook.io/rocketmq-connector/runtime-qs#rocketmq-runtime-pei-zhi-wen-dang)

## 启动Source Connector

### 启动前准备

* 在`/opt/`文件下创建`source-file`文件夹(当然也可以自主选择合适的位置)

```bash
$ cd /opt/
$ mkdir source-file
$ cd source-file
```

* 创建一个`source-file.txt`用于测试

```bash
$ vim source-file.txt
# 在其中写入一些文字，并保存
```

* &#x20;创建Topic

```bash
# 创建Topic
# 注意这里的ip地址要换成自己服务器ip
$ sh mqadmin updateTopic -b 4xx.1xx.2xx.2xx:10911 -t fileTopic
....
create topic to 4xx.1xx.2xx.2xx:10911 success.

```

更多创建Topic内容参照 [创建Topic](https://rocketmq-1.gitbook.io/rocketmq-connector/runtime-qs#chuang-jian-topic)

### Source Connector启动

#### 启动模板

```typescript
# GET请求  
http://(worker ip):(port)/connectors/(connector name)?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}   

```

#### 参数说明

| 参数             | 含义                                                                                | 能否为空  | 示例             |
| -------------- | --------------------------------------------------------------------------------- | ----- | -------------- |
| worker ip      | runtime上wokerde启动ip                                                               | false | 127.0.0.1      |
| port           | 端口号，可以在配置项修改，默认为8081                                                              | false | 8081           |
| connector name | <p>connector配置的唯一key，</p><p>除保留字`stopAll`外的任一字符串，</p><p>但是connector示例之间不可以重名 </p> | false | file-connector |

#### 配置说明

| key                     | description                                           | 能否为空  |
| ----------------------- | ----------------------------------------------------- | ----- |
| connector-class         | 实现Connector接口的类名称（包含包名）                               | false |
| filename                | 数据源文件名称                                               | false |
| task-class              | 实现SourceTask类名称（包含包名）                                 | false |
| topic                   | 同步文件数据所需topic                                         | false |
| update-timestamp        | 配置更新时间戳                                               | true  |
| source-record-converter | <p>用于将SourceDataEntry转换为byte\[]的转换器</p><p>实现的完整类名</p> | false |

#### 启动示例

```javascript
http://localhost:8081/connectors/fileConnectorSource?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
```

在浏览器上使用上面请求（或者使用其他工具发送GET请求），可以看到浏览器返回`success`，并且控制台输出以下信息即表示成功

```bash
2019-11-05 13:34:49 INFO qtp726181440-29 - config: {
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}

```

## 启动Sink Connector

### 启动前准备

{% hint style="info" %}
注意：启动**Source Connector**之前已经在rocketmq集群上创建了Topic即**fileTopic，**&#x6240;以Sink Connector 不需要再创建Topic
{% endhint %}

### Sink Connector 启动

#### 启动模板

```typescript
 # GET请求  
 http://(your worker ip):(port)/connectors/(connector name)?config={
    "connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
    "topicNames":"fileTopic",
    "filename":"/opt/sink-files/sink-file.txt",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }
```

#### 参数说明

参照Source Connector [参数说明](#can-shu-shuo-ming)

#### 配置说明

| key                     | description                                           | 能否为空  |
| ----------------------- | ----------------------------------------------------- | ----- |
| connector-class         | 实现Connector接口的类名称（包含包名）                               | false |
| topicNames              | sink需要处理数据消息topics                                    | false |
| task-class              | 实现SourceTask类名称（包含包名）                                 | false |
| filename                | sink拉去的数据保存到文件                                        | false |
| update-timestamp        | 配置更新时间戳                                               | true  |
| source-record-converter | <p>用于将SourceDataEntry转换为byte\[]的转换器</p><p>实现的完整类名</p> | false |

#### 启动示例

```typescript
 http://localhost:8081/connectors/fileConnectorSink?config={
    "connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
    "topicNames":"fileTopic",
    "filename":"/opt/sink-files/sink-file.txt",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }
```

在浏览器上使用上面请求（或者使用其他工具发送GET请求），可以看到浏览器返回`success`，并且控制台输出以下信息即表示成功。

```bash
2019-11-05 18:48:51 INFO qtp726181440-26 - fileConnectorSink?config={
    "connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
    "topicNames":"fileTopic",
    "filename":"/opt/sink-files/sink-file.txt",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }

```

如果rocketmq集群没有过多的持久化消息，可以看到相同内容的文件`/opt/sink-files/sink-file.txt`
