rocketmq-connector
  • Message Connector 简介
  • 架构设计
    • Woker
    • Runtime
    • Connector
    • Routing Task
    • 消费位点同步
    • 消息路由监控
  • Quick Start
    • 前期准备
      • RocketMQ单机环境
      • RocketMQ集群搭建
    • RocketMQ Runtime
    • File Connector
  • RocketMQ Connect
    • RocketMQ Connect 简介
    • 应用场景
    • RocketMQ Runtime
      • 启动流程
      • Runtime 参数配置
      • RESTful 接口
      • Connector生命周期
      • 负载均衡
    • RocketMQ Console
      • Console 简介
      • 安装和使用
      • 使用指南
  • RocketMQ Connector
    • RocketMQ Replicator
      • Replicator简介
      • Replicator快速开始
      • Replicator 参数配置
    • Connect-Mongo
    • Connect-MySQL
    • Connect-Redis
  • 开发者中心
    • RELEASE NOTE
    • mqadmin 操作指南
    • 最佳实践
    • 工业实践
      • connector-mongo实践
  • 贡献者指北
    • 参与告知
    • 文档贡献指北
    • 本地调试向导
    • 一个简单Connector的实现
    • Contributor二三言
Powered by GitBook
On this page
  • runtime配置参数说明
  • runtime支持JVM参数说明
  • Runtime config 参数详细说明
  • allocTaskStrategy说明
  • Runtime JVM 参数详细说明
  • VirtualNode
  • consistentHashFunc

Was this helpful?

  1. RocketMQ Connect
  2. RocketMQ Runtime

Runtime 参数配置

Runtime 主要参数配置

runtime配置参数说明

key

nullable

default

description

workerId

false

DEFAULT_WORKER_1

集群节点唯一标识

namesrvAddr

false

RocketMQ Name Server地址列表,多个NameServer地址用分号隔开

httpPort

false

8081

runtime提供restful接口服务端口

pluginPaths

false

source或者sink目录,启动runttime时加载

storePathRootDir

true

(user.home)/connectorStore

持久化文件保存目录

positionPersistInterval

true

20s

source端持久化position数据间隔

offsetPersistInterval

true

20s

sink端持久化offset数据间隔

configPersistInterval

true

20s

集群中配置信息持久化间隔

rmqProducerGroup

true

defaultProducerGroup

Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组

rmqConsumerGroup

true

defaultConsumerGroup

Consumer组名,多个Consumer如果属于一个应用,发送同样的消息,则应该将它们归为同一组

maxMessageSize

true

4MB

RocketMQ最大消息大小

operationTimeout

true

3s

Producer发送消息超时时间

rmqMaxRedeliveryTimes

true

最大重新消费次数

rmqMessageConsumeTimeout

true

3s

Consumer超时时间

rmqMaxConsumeThreadNums

true

32

Consumer客户端最大线程数

rmqMinConsumeThreadNums

true

1

Consumer客户端最小线程数

allocTaskStrategy

true

org.apache.rocketmq.

connect.runtime.service.strategy.

DefaultAllocateConn

AndTaskStrategy

负载均衡实现类

注:以上表格为表达方便,对表格内数据进行了换行。

runtime支持JVM参数说明

key

nullable

default

description

rocketmq.runtime.cluster.rebalance.waitInterval

true

20s

负载均衡间隔

rocketmq.runtime.max.message.size

true

4M

Runtime限制最大消息大小

virtualNode

true

1

一致性hash负载均衡的虚拟节点数

consistentHashFunc

true

MD5Hash

一致性hash负载均衡算法实现类

注:以上表格为表达方便,对表格内数据进行了换行。

Runtime config 参数详细说明

allocTaskStrategy说明

该参数默认可省略,这是一个可选参数,目前选项如下:

  • 默认值

org.apache.rocketmq.connect.runtime.service.strategy.DefaultAllocateConnAndTaskStrategy
  • 一致性Hash

org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategyByConsistentHash

Runtime JVM 参数详细说明

VirtualNode

一致性hash中虚拟节点数

consistentHashFunc

hash算法具体实现类,可以自己实现,在后续版本中会增加更多策略,该类应该实现

org.apache.rocketmq.common.consistenthash.HashFunction;

package org.apache.rocketmq.common.consistenthash;

public interface HashFunction {
    long hash(String var1);
}

默认情况下采用的是MD5Hash算法

private static class MD5Hash implements HashFunction {
        MessageDigest instance;

        public MD5Hash() {
            try {
                this.instance = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException var2) {
            }

        }

        public long hash(String key) {
            this.instance.reset();
            this.instance.update(key.getBytes());
            byte[] digest = this.instance.digest();
            long h = 0L;

            for(int i = 0; i < 4; ++i) {
                h <<= 8;
                h |= (long)(digest[i] & 255);
            }

            return h;
        }
    }
Previous启动流程NextRESTful 接口

Last updated 5 years ago

Was this helpful?