Kafka MirrorMaker是Kafka官网提供的跨数据中心流数据同步方案,其实现原理是通过从Source集群消费消息,然后将消息生产到Target集群从而完成数据迁移操作。用户只需要通过简单的consumer配置和producer配置,启动MirrorMaker,即可实现实时数据同步。

本文章主要聚焦跑通Kafka MirrorMaker数据迁移流程。实验中的Source Kafka版本为2.12,基于本地机器搭建。Sink集群为火山引擎Kafka中间件

步骤1:本地Kafka创建测试Topic

以下我们将以名称为“testTopic”的Topic为例演示。

创建Topic命令:

kafka-topics.sh \
--create \
--zookeeper localhost:2181 \ #根据实际情况填写
--replication-factor 1 \
--partitions 1 \
--topic testTopic

创建成功后可以通过以下命令对topic进行检查

bin/kafka-topics.sh \
--list \
--zookeeper localhost:2181 #根据实际情况填写

执行如下:

步骤2:同步创建火山kafka Topic

创建Kafka实例后,在“Topic管理”页签下,创建同名Topic。注意分区数最好保持与原集群分区保持一致。

步骤3:下载SASL_SSL证书

在下载SASL_SSL证书前,先确认用于访问的用户是否已经存在:

如果未建立,请先创建用户。

确认完成后,在“实例管理”页签下下载SASL_SSL证书

关于使用SASL_SSL可参考:文档中心-火山引擎

步骤4:修改Mirror Maker 生产者/消费者配置

consumer生产者的配置(consumer.properties)一般在kafka目录下的config目录下。修改如下:

bootstrap.servers=localhost:9092 # 需要根据实际情况修改
group.id=test-consumer-group # 需要根据实际情况修改

同样,producer消费者的配置(producer.properties)也在此config目录下,该文件有较大修改:

bootstrap.servers= SASL接入点(公网) # 需要根据实际情况修改

接入点获取途径如下:

外网访问,需要添加SASL认证信息:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="替换用户名" password="替换密码";
sasl.mechanism=PLAIN security.protocol=SASL_SSL
ssl.truststore.location=/xxx/Kafka.client.truststore.jks #根据实际情况替换证书路径
ssl.truststore.password=KafkaTrustStorePass ssl.endpoint.identification.algorithm=

步骤5:启动MirrorMaker

文件配置修改完成后,通过下方命令启动MirrorMaker实现本地Kafka与火山Kafka联通

kafka-mirror-maker.sh \
--consumer.config ../config/consumer.properties \ #根据实际情况指定consumer.properteis
--producer.config ../config/producer.properties \ #根据实际情况指定
producer.properties --whitelist "testTopic"

步骤6:启动Producer生产数据

我们启动生产者对测试Topic进行消息生产

kafka-console-producer.sh \
--broker-list localhost:9092 \ #根据实际情况填写
--topic testTopic

步骤7:数据同步结果检查

在火山引擎Kafka实例“消息查询”页签,我们可以查询testTopic最近的数据,发现是有数据写入的。此时数量上和我们写入的数量一致。

由于火山对下载的消息进行了 Base64 编码传输,因此很难确认消息是否正确性、完整性。

可以通过客户端消费如下(客户端下载与使用可参考:文档中心-火山引擎):

经过检查,消息与发送端数据保持一致。

可能出现的问题:

(1)MirrorMakker启动报错:java.lang.OutOfMemoryError: Java heap space

解决方法:修改 /bin/kafka-run-class.sh,找到 Memory options处,默认设置是256M,将其修改为如下值:

if [ -z "$KAFKA_HEAP_OPTS" ]; then KAFKA_HEAP_OPTS="-Xmx1024M -Xms512M" fi

保存退出。

(2)Error while fetching metadata with correlation

修改 config\server.properties,修改内容如下:

listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092

重启kafka,启动程序进行入库操作

MirrorMaker的配置说明:

--consumer.config # 消费者配置,详情参考kafka consumer配置

--producer.config # 生产者配置,详情参考kafka producer配置

--whitelist #需要mirror的topic,支持Java正则表达式,例如'ABTestMsg,AppColdStartMsg’

--blacklist #不需要拷贝的topic,支持Java正则表达式

--num.producers #producer数量,默认为1

--num.streams #consumer数量,默认为1

--queue.size #consumer和producer之间缓存的queue size,默认10000

详情见:Kafka mirroring (MirrorMaker) - Apache Kafka - Apache Software Foundation

其他注意事项:

1)whitelist和blacklist支持正则表达式。比如需要包含两个topic可以这样写,--whitelist 'A|B' or --whitelist 'A,B' ,或者想迁移所有topic可以这样写 --whitelist '*'

2)注意在迁移之前创建好相关topic以及规划好partition数量。

3)老版本和新版本迁移主要考虑consumer和producer的兼容性

4)如果允许的话,建议将MirrorMaker部署在目标集群内,这是因为如果一旦发生网络分区,消费者与源集群断开连接比生产者与目标集群断开连接要安全。如果消费者断开连接,那么只是当前读取不到数据,但是数据仍然在源集群内,并不会丢失;而生产者断开连接,MirrorMaker便生产不了数据,如果MirrorMaker本身处理不当,可能会丢失数据。

5)开始之前配置好限流,防止影响原来集群的正常工作。

基于MirrorMaker与火山引擎的Kafka数据同步相关推荐

  1. 火山引擎正式发布大数据研发治理套件

    在数字化程度日益加深的今天,数据对企业增长的作用越来越重要.如何使用并发挥数据的价值,是当下企业所面临的主要问题. 然而企业的数字化转型并非一蹴而就,需要在组织.业务流程和技术等方面持续投入.调研发现 ...

  2. 客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中

    目录 将消费的kafka数据同步到Kudu中 一.导入表名映射关系类

  3. 智能学习灯赛道竞争日趋激烈 火山引擎VeDI用数据技术助力打造新优势

    更多技术交流.求职机会,欢迎关注字节跳动数据平台微信公众号,并进入官方交流群 智能学习灯的赛道正变得越来越拥挤. 2021 年 3 月 2 日,腾讯教育联合暗物智能科技联合发布"AILA 智 ...

  4. 微信小程序接入火山引擎埋点数据

    1,文档中心-火山引擎 2,文档中心-火山引擎 3,文档中心-火山引擎 文档中心-火山引擎验证埋点.小程序实时扫码验证 上面的链接是三个最重点的部分. 首先按照1的步骤安装SDK.npm instal ...

  5. Clickhouse Engine kafka 将kafka数据同步clickhouse

    本篇文章转自:https://blog.csdn.net/weixin_41461992/article/details/106790507 起因 由于需要做各种数据库摆渡到kafka的组件研究. 其 ...

  6. kafka数据同步Elasticsearch深入详解

    1.kafka同步到Elasticsearch方式? 目前已知常用的方式有四种: 1)logstash_input_kafka插件: 缺点:不稳定(ES中文社区讨论) 2)spark stream同步 ...

  7. 30.kafka数据同步Elasticsearch深入详解(ES与Kafka同步)

    1.kafka同步到Elasticsearch方式? 目前已知常用的方式有四种:  1)logstash_input_kafka插件:  缺点:不稳定(ES中文社区讨论)  2)spark strea ...

  8. Kafka-connect将Kafka数据同步到Mysql

    一.背景信息 Kafka Connect主要用于将数据流输入和输出消息队列Kafka版.Kafka Connect主要通过各种Source Connector的实现,将数据从第三方系统输入到Kafka ...

  9. GBase 8a Kafka 数据同步

    数据同步系统通过 Oracle Goldengate.GBase RTSync 等工具复制 Oracle.GBase 8s等数据库的业务数据到 GBase 8a MPP Cluster,为了应对业务系 ...

最新文章

  1. Ehcache缓存配置
  2. redistemplate set方法_spring boot整合redis ---- RedisTemplate 三分钟快速入门
  3. Linux Vim多窗口编辑,Vim打开多个窗口方法详解
  4. Echarts地图添加自定义图标
  5. *【HDU - 2819】Swap(二分图匹配,输出路径)(待证明:是否是最少交换次数?)
  6. 春节快到了,来写个烟花动效吧
  7. Swift解决【闭包引起的循环强引用】
  8. 中国AI英雄风云榜十位领军人揭晓 | 网易2018中国年度AI领域人物评选
  9. kubernetes资源对象--pod和job
  10. java事件处理的题目_介绍一下java的事件机制
  11. Activiz 使用笔记-4 数据源(2)
  12. FreeSWITCH 初步
  13. 空间回归分析笔记3——OLS、GWR输出结果的意义
  14. chrome安装JSONview插件,即可在浏览中查看json文件
  15. 消失的遗传力--wiki
  16. 什么是ISP(网络业务提供商)?
  17. maven打包报错failed: Unable to find a single main class from the following candidates []
  18. 实例分析神经网络传播过程
  19. Android获取视频文件时长
  20. U-boot启动过程之——relocate_code分析

热门文章

  1. 推荐系统的评价指标及其Python实现
  2. Jeston NX 学习笔记(一)欢迎咨询
  3. uniapp字体图标的使用步骤详细版【前端开发】
  4. 【LaTeX应用】绘制椭圆曲线图形
  5. Python range函数
  6. java 橡皮擦_js canvas实现橡皮擦效果
  7. 真正优秀的质量工程师,都有这些特质
  8. 万字长文解读电商搜索——如何让你买得又快又好
  9. python ansys接口_以高效的方式从Python运行ANSYS Mechanical APDL
  10. opencv 学习笔记——读入一张图片,并将其转为灰度图