Kafka Connect

  • 概念
  • 特点
  • 组件
    • Connectors
    • Tasks
    • Workers
    • Converters
    • Transforms
  • Dead Letter Queue
  • rebalance触发场景
  • 参考文章

概念

Kafka Connect是一个用于将数据流输入和输出Kafka的框架。Confluent平台附带了几个内置connector,可以使用这些connector进行关系数据库或HDFS等常用系统到kafka的数据传输

特点

  1. 数据从数据源读出或写入时延低;
  2. 从从不同数据源获取数据或将数据写入到不同数据源(如:客户端、数据库、HDFS、静态文件等等)

组件

Connectors

通过管理task来协调数据流的高级抽象。
可分为两种connectors:

  • Source connector
    源连接器可以从多种渠道(如:数据库、静态文件、HDFS客户端等)拉取数据到kafka topic中
  • Sink connector
    宿连接器将topic中的数据push到多种目的端消费。将Kafka主题中的数据传递到Elasticsearch等二级索引中,或Hadoop等批处理系统中,用于离线分析。

Tasks

如何将数据复制到Kafka或从Kafka复制数据的实现。实际进行数据传输的单元,和连接器一样同样分为 Source和Sink
Task是Connect数据模型中的主要处理数据的角色。每个connector实例协调一组实际复制数据的task。通过允许connector将单个作业分解为多个task,Kafka Connect提供了内置的对并行性和可伸缩数据复制的支持,只需很少的配置。这些任务没有存储任何状态。任务状态存储在Kafka中的特殊主题config.storage.topic和status.storage.topic中。因此,可以在任何时候启动、停止或重新启动任务,以提供弹性的、可伸缩的数据管道。

Workers

执行Connector和Task的运行进程。Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程

Standalone Workers
Standalone模式是最简单的模式,用单一进程负责执行所有connector和task。适用于特定场景,如收集主机日志

Distributed Workers
分布式模式为Kafka Connect提供了可扩展性和自动容错能力,使用更广。在分布式模式下,相同group.id的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,其余的worker将检测到这一点,集群会自动协调分配所有的Connector 和 Task(这个过程称为Rebalance)

Converters

用于在Connect和外部系统发送或接收数据之间转换数据的代码。Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换
在向Kafka写入或从Kafka读取数据时,Converter是使Kafka Connect支持特定数据格式所必需的。task使用转换器将数据格式从字节更改为连接内部数据格式,反之亦然。

默认支持以下Converter

  • AvroConverter(建议)

io.confluent.connect.avro.AvroConverter 与Schema Registry一起使用

  • ProtobufConverter

io.confluent.connect.protobuf.ProtobufConverter: 需要使用 Schema Registry

  • JsonConverter
    org.apache.kafka.connect.json.JsonConverter (无需 Schema Registry): 转换为json结构
  • JsonSchemaConverter

io.confluent.connect.json.JsonSchemaConverter: 需要使用 Schema Registry。适合结构数据

  • StringConverter

org.apache.kafka.connect.storage.StringConverter: 简单的字符串格式

  • ByteArrayConverter

org.apache.kafka.connect.converters.ByteArrayConverter 提供不进行转换的“传递”选项

转换器与连接器本身解耦,以便在连接器之间自然地重用转换器。下图展示了在Kafka Connect中,Converter 在何时进行数据转换

Transforms

更改由连接器生成或发送到连接器的每个消息的简单逻辑

Connector可以配置转换,以便对单个消息(对应代码中的Record)进行简单且轻量的修改。可以配置多个Transform 组成一个链。例如让所有消息的topic加一个前缀、sink无法消费source 写入的数据格式,这些场景都可以使用Transform 解决。
然而,应用于多个消息的更复杂的转换最好使用KSQL和Kafka Stream实现。

转换是一个简单的函数,输入一条记录,并输出一条修改过的记录。Kafka Connect提供许多转换,它们都执行简单但有用的修改。可以使用自己的逻辑定制实现转换接口,将它们打包为Kafka Connect插件,将它们与connector一起使用。

Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行。当转换与source connector一起使用时,Kafka Connect通过第一个转换传递connector生成的每条源记录,第一个转换对其进行修改并输出一个新的源记录。将更新后的源记录传递到链中的下一个转换,该转换再生成一个新的修改后的源记录。最后更新的源记录会被转换为二进制格式写入到kafka。转换也可以与sink connector一起使用。

转换算子 功能
Cast 将字段或整个键或值强制转换为特定类型(例如,将整数字段强制转换为较小的宽度)
Drop 从记录中删除键或值,并将其设置为null。
ExtractField 在存在架构时,从结构中提取指定的字段,在无架构数据的情况下,从map中提取特定字段。null不做修改直接传递。
ExtractTopic 用新topic替换旧topic
Filter (Apache Kafka) Drop all records. Designed to be used in conjunction with a Predicate.
Filter (Confluent) Include or drop records that match a configurable filter.condition.
Flatten Flatten a nested data structure. This generates names for each field by concatenating the field names at each level with a configurable delimiter character.
HoistField Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data.
InsertField Insert field using attributes from the record metadata or a configured static value.
MaskField Mask specified fields with a valid null value for the field type.
MessageTimeStampRouter Update the record’s topic field as a function of the original topic value and the record’s timestamp field.
RegexRouter 当前不适用于managed connector。使用配置的正则表达式和替换字符串更新记录主题。
ReplaceField 过滤或重命名字段。
SetSchemaMetadata Set the schema name, version, or both on the record’s key or value schema.
TimestampConverter Convert timestamps between different formats such as Unix epoch, strings, and Connect Date and Timestamp types.
TimestampRouter TimestampRouter
TombstoneHandler Manage tombstone records. A tombstone record is defined as a record with the entire value field being null, whether or not it has ValueSchema.
ValueToKey 将记录键替换为由记录值中的字段子集形成的新键。

Dead Letter Queue

与其他MQ不同,Kafka 并没有死信队列这个功能。但是Kafka Connect提供了这一功能。

当Sink Task遇到无法处理的消息,会根据errors.tolerance配置项决定如何处理,默认情况下(errors.tolerance=none) Sink 遇到无法处理的记录会直接抛出异常,Task进入Fail 状态。开发人员需要根据Worker的错误日志解决问题,然后重启Task,才能继续消费数据

设置 errors.tolerance=all,Sink Task 会忽略所有的错误,继续处理。Worker中不会有任何错误日志。可以通过配置errors.deadletterqueue.topic.name = 让无法处理的消息路由到 Dead Letter Topic

rebalance触发场景

  1. 有新的connector加入或退出集群
  2. 当connector增加或减少它们所需的task数量,或者更改connector的配置时
  3. 当一个worker失败时,task在active的worker之间重新平衡

PS: 当一个task失败时,不会触发再平衡,因为task失败被认为是一个例外情况。因此,失败的task不会被框架自动重新启动,应该通过REST API重新启动

参考文章

https://docs.confluent.io/platform/current/connect/index.html
https://www.jianshu.com/p/fae25cc63997
https://segmentfault.com/a/1190000039395164

【Kafka Connect】相关推荐

  1. 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)

    简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...

  2. 【AAD Connect】01:AAD Connect把本地AD账户同步到Office365(AD域账户迁移)

    前言 现在很多企业都会遇到把本地AD账号同步到Office365上,接下来就和大家一起了解一下,如果通过AAD Connect来进行同步操作 [AAD Connect]01:AAD Connect把本 ...

  3. 【AAD Connect】03:使用AAD Connect同步到Office365时的同步规则(AD账号同步到O365)

    前言 配置并启动AAD Connect之后,同步程序将会自动进行同步,根据测试结果,同步规则如下 [AAD Connect]01:AAD Connect把本地AD账户同步到Office365(AD域账 ...

  4. 【AAD Connect】05:通过AAD Connect疑难解答检查同步问题,以及根据提示如何解决问题(AD账户迁移到O365)

    前言 使用aad connect把本地AD用户同步到Office 365时候,如果同步过程中有同步不成功的问题,可以通过AAD Connect疑难解答工具进行问题的自查,具体操作如下 [AAD Con ...

  5. 【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,附视频)

    超强!!! Kafka高质量专栏学习大全,点我获取!!! 文章目录 前提 所有异常情况 1. TargetBroker若不在线,迁移脚本执行会失败 情景演示 2. TargetBroker在开始迁移过 ...

  6. js判断object对象中是否存在某个key【Mirth Connect】

    @js判断object对象中是否存在某个key[Mirth Connect] 背景 在配置Mirth Connect(ETL工具)时,由于场景需求,根据输出的JSON中的某个key是否存在value来 ...

  7. 【NewTek Connect】Studio Monitor无法获取NewTek Connect内容的问题

    [NewTek Connect]Studio Monitor无法获取NewTek Connect内容的问题 解决办法: 1.检查防火墙,在防火墙应用设置那块对所有NDI都放行. 2.(我遇到的)安装N ...

  8. 【AAD Connect】04:AD账户同步到O365报错:同步服务未运行,启动“ADSync”服务或Unable to connect to the Synchronization Service

    前言 通过AADC(Azure Active Directory Connet)同步AD账号到office 365 AAD时,如果服务器重启之后: 1.如果打开Azuer AD Connect遇到&q ...

  9. confluent【kafka企业版】安装配置————附带详细信息

    文章目录 7 confluent配置 7.0 背景 7.1 下载安装 7.2 配置环境变量 7.3 开放监听端口 7.4 安装debezium插件 7.5 启动confluent 7.6 配置debe ...

最新文章

  1. MXNet中依赖库介绍及简单使用
  2. Python标准库:内置函数dict(mapping, **kwarg)
  3. 自学python需要下载什么软件-一个零基础学习Python应该知道的学习步骤与规划
  4. 重邮计算机导师评价,李章勇_重庆邮电大学研究生导师信息
  5. python数据类型-元组(tuple)
  6. python人门指南小说-Python入门深度学习完整指南
  7. Vue router路由懒加载
  8. 微信小程序demo测试实践
  9. Map与数组、对象之间的转换
  10. N字霸气多空博弈大师能量潮拐点战法通达信 主图/副图/选股指标
  11. 三菱FX2N系列PLC的模拟量扩展模块简介
  12. 需求分析师如何做好非功能性需求
  13. 文件上传的测试点整理
  14. php经典实例博客管理,PHP经典项目案例-(一)博客管理系统2
  15. python 正数变成负数_LeetCode 007:整数反转 (Python)
  16. iOS之AR开发--demo制作篇:图片识别
  17. 学计算机心得体会50字,阅读心得体会50字
  18. vue 动态渲染图片 不出来
  19. 【Codeforces 1157F】 Maximum Balanced Circle | 思维、dp、二分
  20. 简单的小青蛙跳一跳问题

热门文章

  1. 2019,愿AI和新芒与你同在
  2. 京东/京粉/京东联盟商品查返利转返利链接接口API
  3. java使用poi将excel转csv文件(所有sheet页、值)
  4. Java的平台独立性
  5. 【数学知识】欧几里得空间
  6. 大学数学视频课程推荐
  7. CIDR(Classless InterDomain Routing ,无类别域际路由选择)
  8. Matlab基本函数length、size、numel
  9. Linux下IIC驱动编写,介绍IIC子系统框架的使用
  10. 小觅深度版-realsense系列,深度相机对比