File Connector
一个简单的文件同步connector
拷贝流程

首先有个源文件,文件中保存有一些数据。启动Runtime,通过RESTful接口启动Source和Sink Connector,启动完成以后Source Connector会读取文件内容发送到RocketMQ指定的Topic中,Sink Connector会从这个Topic拉去数据写入指定文件中,这样就完成从源数据存储到目标存储之间的数据同步。
这里只是个简单的文件拷贝,其它数据源流程也是类似,通过RocketMQ Connect几乎可以实现任意数据源之间数据同步,包括异构数据源之间数据同步。
项目安装使用
环境依赖
64bit JDK 1.8+;
Maven 3.2.x或以上版本;
两套RocketMQ集群环境;
RocketMQ Runtime环境
项目构建
file-connector
是runtime
的样例Connector,所以在构建runtime
时已经构建过,具体参照前一节runtime
构建过程。
项目安装
建议将connector插件放于一个公共的目录下,推荐为
/usr/local/connector-plugins/
将jar包拷贝到connector插件目录下
# 进入到rocketmq-connect-sample所在的目录
$ cd {rocketmq-external目录}/rocketmq-connect/rocketmq-connect-sample/
# 进入jar包所在的目录
$ cd target
# 将jar包拷贝到connector插件目录下
$ cp rocketmq-connect-sample-0.0.1-SNAPSHOT.jar /usr/local/connector-plugins/
启动runtime,具体参照前一节的项目启动
启动Source Connector
启动前准备
在
/opt/
文件下创建source-file
文件夹(当然也可以自主选择合适的位置)
$ cd /opt/
$ mkdir source-file
$ cd source-file
创建一个
source-file.txt
用于测试
$ vim source-file.txt
# 在其中写入一些文字,并保存
创建Topic
# 创建Topic
# 注意这里的ip地址要换成自己服务器ip
$ sh mqadmin updateTopic -b 4xx.1xx.2xx.2xx:10911 -t fileTopic
....
create topic to 4xx.1xx.2xx.2xx:10911 success.
更多创建Topic内容参照 创建Topic
Source Connector启动
启动模板
# GET请求
http://(worker ip):(port)/connectors/(connector name)?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
参数说明
参数
含义
能否为空
示例
worker ip
runtime上wokerde启动ip
false
127.0.0.1
port
端口号,可以在配置项修改,默认为8081
false
8081
connector name
connector配置的唯一key,
除保留字`stopAll`外的任一字符串,
但是connector示例之间不可以重名
false
file-connector
配置说明
key
description
能否为空
connector-class
实现Connector接口的类名称(包含包名)
false
filename
数据源文件名称
false
task-class
实现SourceTask类名称(包含包名)
false
topic
同步文件数据所需topic
false
update-timestamp
配置更新时间戳
true
source-record-converter
用于将SourceDataEntry转换为byte[]的转换器
实现的完整类名
false
启动示例
http://localhost:8081/connectors/fileConnectorSource?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
在浏览器上使用上面请求(或者使用其他工具发送GET请求),可以看到浏览器返回success
,并且控制台输出以下信息即表示成功
2019-11-05 13:34:49 INFO qtp726181440-29 - config: {
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
启动Sink Connector
启动前准备
Sink Connector 启动
启动模板
# GET请求
http://(your worker ip):(port)/connectors/(connector name)?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
"topicNames":"fileTopic",
"filename":"/opt/sink-files/sink-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
参数说明
参照Source Connector 参数说明
配置说明
key
description
能否为空
connector-class
实现Connector接口的类名称(包含包名)
false
topicNames
sink需要处理数据消息topics
false
task-class
实现SourceTask类名称(包含包名)
false
filename
sink拉去的数据保存到文件
false
update-timestamp
配置更新时间戳
true
source-record-converter
用于将SourceDataEntry转换为byte[]的转换器
实现的完整类名
false
启动示例
http://localhost:8081/connectors/fileConnectorSink?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
"topicNames":"fileTopic",
"filename":"/opt/sink-files/sink-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
在浏览器上使用上面请求(或者使用其他工具发送GET请求),可以看到浏览器返回success
,并且控制台输出以下信息即表示成功。
2019-11-05 18:48:51 INFO qtp726181440-26 - fileConnectorSink?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
"topicNames":"fileTopic",
"filename":"/opt/sink-files/sink-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
如果rocketmq集群没有过多的持久化消息,可以看到相同内容的文件/opt/sink-files/sink-file.txt
Last updated
Was this helpful?