rocketmq-connector
  • Message Connector 简介
  • 架构设计
    • Woker
    • Runtime
    • Connector
    • Routing Task
    • 消费位点同步
    • 消息路由监控
  • Quick Start
    • 前期准备
      • RocketMQ单机环境
      • RocketMQ集群搭建
    • RocketMQ Runtime
    • File Connector
  • RocketMQ Connect
    • RocketMQ Connect 简介
    • 应用场景
    • RocketMQ Runtime
      • 启动流程
      • Runtime 参数配置
      • RESTful 接口
      • Connector生命周期
      • 负载均衡
    • RocketMQ Console
      • Console 简介
      • 安装和使用
      • 使用指南
  • RocketMQ Connector
    • RocketMQ Replicator
      • Replicator简介
      • Replicator快速开始
      • Replicator 参数配置
    • Connect-Mongo
    • Connect-MySQL
    • Connect-Redis
  • 开发者中心
    • RELEASE NOTE
    • mqadmin 操作指南
    • 最佳实践
    • 工业实践
      • connector-mongo实践
  • 贡献者指北
    • 参与告知
    • 文档贡献指北
    • 本地调试向导
    • 一个简单Connector的实现
    • Contributor二三言
Powered by GitBook
On this page
  • 拷贝流程
  • 项目安装使用
  • 环境依赖
  • 项目构建
  • 项目安装
  • 启动Source Connector
  • 启动前准备
  • Source Connector启动
  • 启动Sink Connector
  • 启动前准备
  • Sink Connector 启动

Was this helpful?

  1. Quick Start

File Connector

一个简单的文件同步connector

PreviousRocketMQ RuntimeNextRocketMQ Connect 简介

Last updated 5 years ago

Was this helpful?

拷贝流程

首先有个源文件,文件中保存有一些数据。启动Runtime,通过RESTful接口启动Source和Sink Connector,启动完成以后Source Connector会读取文件内容发送到RocketMQ指定的Topic中,Sink Connector会从这个Topic拉去数据写入指定文件中,这样就完成从源数据存储到目标存储之间的数据同步。

这里只是个简单的文件拷贝,其它数据源流程也是类似,通过RocketMQ Connect几乎可以实现任意数据源之间数据同步,包括异构数据源之间数据同步。

项目安装使用

环境依赖

  1. 64bit JDK 1.8+;

  2. Maven 3.2.x或以上版本;

  3. 两套RocketMQ集群环境;

  4. RocketMQ Runtime环境

项目构建

file-connector 是runtime的样例Connector,所以在构建runtime时已经构建过,具体参照前一节runtime构建过程。

项目安装

建议将connector插件放于一个公共的目录下,推荐为

/usr/local/connector-plugins/

  • 将jar包拷贝到connector插件目录下

# 进入到rocketmq-connect-sample所在的目录
$ cd {rocketmq-external目录}/rocketmq-connect/rocketmq-connect-sample/
# 进入jar包所在的目录
$ cd target
# 将jar包拷贝到connector插件目录下
$ cp rocketmq-connect-sample-0.0.1-SNAPSHOT.jar /usr/local/connector-plugins/
  • 启动runtime,具体参照前一节的项目启动

启动Source Connector

启动前准备

  • 在/opt/文件下创建source-file文件夹(当然也可以自主选择合适的位置)

$ cd /opt/
$ mkdir source-file
$ cd source-file
  • 创建一个source-file.txt用于测试

$ vim source-file.txt
# 在其中写入一些文字,并保存
  • 创建Topic

# 创建Topic
# 注意这里的ip地址要换成自己服务器ip
$ sh mqadmin updateTopic -b 4xx.1xx.2xx.2xx:10911 -t fileTopic
....
create topic to 4xx.1xx.2xx.2xx:10911 success.

Source Connector启动

启动模板

# GET请求  
http://(worker ip):(port)/connectors/(connector name)?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}   

参数说明

参数

含义

能否为空

示例

worker ip

runtime上wokerde启动ip

false

127.0.0.1

port

端口号,可以在配置项修改,默认为8081

false

8081

connector name

connector配置的唯一key,

除保留字`stopAll`外的任一字符串,

但是connector示例之间不可以重名

false

file-connector

配置说明

key

description

能否为空

connector-class

实现Connector接口的类名称(包含包名)

false

filename

数据源文件名称

false

task-class

实现SourceTask类名称(包含包名)

false

topic

同步文件数据所需topic

false

update-timestamp

配置更新时间戳

true

source-record-converter

用于将SourceDataEntry转换为byte[]的转换器

实现的完整类名

false

启动示例

http://localhost:8081/connectors/fileConnectorSource?config={
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}

在浏览器上使用上面请求(或者使用其他工具发送GET请求),可以看到浏览器返回success,并且控制台输出以下信息即表示成功

2019-11-05 13:34:49 INFO qtp726181440-29 - config: {
"connector-class":"org.apache.rocketmq.connect.file.FileSourceConnector",
"topic":"fileTopic",
"filename":"/opt/source-file/source-file.txt",
"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}

启动Sink Connector

启动前准备

注意:启动Source Connector之前已经在rocketmq集群上创建了Topic即fileTopic,所以Sink Connector 不需要再创建Topic

Sink Connector 启动

启动模板

 # GET请求  
 http://(your worker ip):(port)/connectors/(connector name)?config={
    "connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
    "topicNames":"fileTopic",
    "filename":"/opt/sink-files/sink-file.txt",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }

参数说明

配置说明

key

description

能否为空

connector-class

实现Connector接口的类名称(包含包名)

false

topicNames

sink需要处理数据消息topics

false

task-class

实现SourceTask类名称(包含包名)

false

filename

sink拉去的数据保存到文件

false

update-timestamp

配置更新时间戳

true

source-record-converter

用于将SourceDataEntry转换为byte[]的转换器

实现的完整类名

false

启动示例

 http://localhost:8081/connectors/fileConnectorSink?config={
    "connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
    "topicNames":"fileTopic",
    "filename":"/opt/sink-files/sink-file.txt",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }

在浏览器上使用上面请求(或者使用其他工具发送GET请求),可以看到浏览器返回success,并且控制台输出以下信息即表示成功。

2019-11-05 18:48:51 INFO qtp726181440-26 - fileConnectorSink?config={
    "connector-class":"org.apache.rocketmq.connect.file.FileSinkConnector",
    "topicNames":"fileTopic",
    "filename":"/opt/sink-files/sink-file.txt",
    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }

如果rocketmq集群没有过多的持久化消息,可以看到相同内容的文件/opt/sink-files/sink-file.txt

更多创建Topic内容参照

参照Source Connector

参数说明
文件传输总览
runtime项目构建
runtime项目运行
创建Topic