【Kafka Connect】
Kafka Connect
- 概念
- 特点
- 组件
- Connectors
- Tasks
- Workers
- Converters
- Transforms
- Dead Letter Queue
- rebalance触发场景
- 参考文章
概念
Kafka Connect是一个用于将数据流输入和输出Kafka的框架。Confluent平台附带了几个内置connector,可以使用这些connector进行关系数据库或HDFS等常用系统到kafka的数据传输
特点
- 数据从数据源读出或写入时延低;
- 从从不同数据源获取数据或将数据写入到不同数据源(如:客户端、数据库、HDFS、静态文件等等)
组件
Connectors
通过管理task来协调数据流的高级抽象。
可分为两种connectors:
- Source connector
源连接器可以从多种渠道(如:数据库、静态文件、HDFS客户端等)拉取数据到kafka topic中 - Sink connector
宿连接器将topic中的数据push到多种目的端消费。将Kafka主题中的数据传递到Elasticsearch等二级索引中,或Hadoop等批处理系统中,用于离线分析。
Tasks
如何将数据复制到Kafka或从Kafka复制数据的实现。实际进行数据传输的单元,和连接器一样同样分为 Source和Sink
Task是Connect数据模型中的主要处理数据的角色。每个connector实例协调一组实际复制数据的task。通过允许connector将单个作业分解为多个task,Kafka Connect提供了内置的对并行性和可伸缩数据复制的支持,只需很少的配置。这些任务没有存储任何状态。任务状态存储在Kafka中的特殊主题config.storage.topic和status.storage.topic中。因此,可以在任何时候启动、停止或重新启动任务,以提供弹性的、可伸缩的数据管道。
Workers
执行Connector和Task的运行进程。Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程
Standalone Workers
Standalone模式是最简单的模式,用单一进程负责执行所有connector和task。适用于特定场景,如收集主机日志
Distributed Workers
分布式模式为Kafka Connect提供了可扩展性和自动容错能力,使用更广。在分布式模式下,相同group.id的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,其余的worker将检测到这一点,集群会自动协调分配所有的Connector 和 Task(这个过程称为Rebalance)
Converters
用于在Connect和外部系统发送或接收数据之间转换数据的代码。Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换。
在向Kafka写入或从Kafka读取数据时,Converter是使Kafka Connect支持特定数据格式所必需的。task使用转换器将数据格式从字节更改为连接内部数据格式,反之亦然。
默认支持以下Converter
- AvroConverter(建议)
io.confluent.connect.avro.AvroConverter 与Schema Registry一起使用
- ProtobufConverter
io.confluent.connect.protobuf.ProtobufConverter: 需要使用 Schema Registry
- JsonConverter
org.apache.kafka.connect.json.JsonConverter (无需 Schema Registry): 转换为json结构 - JsonSchemaConverter
io.confluent.connect.json.JsonSchemaConverter: 需要使用 Schema Registry。适合结构数据
- StringConverter
org.apache.kafka.connect.storage.StringConverter: 简单的字符串格式
- ByteArrayConverter
org.apache.kafka.connect.converters.ByteArrayConverter 提供不进行转换的“传递”选项
转换器与连接器本身解耦,以便在连接器之间自然地重用转换器。下图展示了在Kafka Connect中,Converter 在何时进行数据转换
Transforms
更改由连接器生成或发送到连接器的每个消息的简单逻辑
Connector可以配置转换,以便对单个消息(对应代码中的Record)进行简单且轻量的修改。可以配置多个Transform 组成一个链。例如让所有消息的topic加一个前缀、sink无法消费source 写入的数据格式,这些场景都可以使用Transform 解决。
然而,应用于多个消息的更复杂的转换最好使用KSQL和Kafka Stream实现。
转换是一个简单的函数,输入一条记录,并输出一条修改过的记录。Kafka Connect提供许多转换,它们都执行简单但有用的修改。可以使用自己的逻辑定制实现转换接口,将它们打包为Kafka Connect插件,将它们与connector一起使用。
Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行。当转换与source connector一起使用时,Kafka Connect通过第一个转换传递connector生成的每条源记录,第一个转换对其进行修改并输出一个新的源记录。将更新后的源记录传递到链中的下一个转换,该转换再生成一个新的修改后的源记录。最后更新的源记录会被转换为二进制格式写入到kafka。转换也可以与sink connector一起使用。
转换算子 | 功能 |
---|---|
Cast | 将字段或整个键或值强制转换为特定类型(例如,将整数字段强制转换为较小的宽度) |
Drop | 从记录中删除键或值,并将其设置为null。 |
ExtractField | 在存在架构时,从结构中提取指定的字段,在无架构数据的情况下,从map中提取特定字段。null不做修改直接传递。 |
ExtractTopic | 用新topic替换旧topic |
Filter (Apache Kafka) | Drop all records. Designed to be used in conjunction with a Predicate. |
Filter (Confluent) | Include or drop records that match a configurable filter.condition. |
Flatten | Flatten a nested data structure. This generates names for each field by concatenating the field names at each level with a configurable delimiter character. |
HoistField | Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data. |
InsertField | Insert field using attributes from the record metadata or a configured static value. |
MaskField | Mask specified fields with a valid null value for the field type. |
MessageTimeStampRouter | Update the record’s topic field as a function of the original topic value and the record’s timestamp field. |
RegexRouter | 当前不适用于managed connector。使用配置的正则表达式和替换字符串更新记录主题。 |
ReplaceField | 过滤或重命名字段。 |
SetSchemaMetadata | Set the schema name, version, or both on the record’s key or value schema. |
TimestampConverter | Convert timestamps between different formats such as Unix epoch, strings, and Connect Date and Timestamp types. |
TimestampRouter | TimestampRouter |
TombstoneHandler | Manage tombstone records. A tombstone record is defined as a record with the entire value field being null, whether or not it has ValueSchema. |
ValueToKey | 将记录键替换为由记录值中的字段子集形成的新键。 |
Dead Letter Queue
与其他MQ不同,Kafka 并没有死信队列这个功能。但是Kafka Connect提供了这一功能。
当Sink Task遇到无法处理的消息,会根据errors.tolerance配置项决定如何处理,默认情况下(errors.tolerance=none) Sink 遇到无法处理的记录会直接抛出异常,Task进入Fail 状态。开发人员需要根据Worker的错误日志解决问题,然后重启Task,才能继续消费数据
设置 errors.tolerance=all,Sink Task 会忽略所有的错误,继续处理。Worker中不会有任何错误日志。可以通过配置errors.deadletterqueue.topic.name = 让无法处理的消息路由到 Dead Letter Topic
rebalance触发场景
- 有新的connector加入或退出集群
- 当connector增加或减少它们所需的task数量,或者更改connector的配置时
- 当一个worker失败时,task在active的worker之间重新平衡
PS: 当一个task失败时,不会触发再平衡,因为task失败被认为是一个例外情况。因此,失败的task不会被框架自动重新启动,应该通过REST API重新启动
参考文章
https://docs.confluent.io/platform/current/connect/index.html
https://www.jianshu.com/p/fae25cc63997
https://segmentfault.com/a/1190000039395164
【Kafka Connect】相关推荐
- 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)
简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...
- 【AAD Connect】01:AAD Connect把本地AD账户同步到Office365(AD域账户迁移)
前言 现在很多企业都会遇到把本地AD账号同步到Office365上,接下来就和大家一起了解一下,如果通过AAD Connect来进行同步操作 [AAD Connect]01:AAD Connect把本 ...
- 【AAD Connect】03:使用AAD Connect同步到Office365时的同步规则(AD账号同步到O365)
前言 配置并启动AAD Connect之后,同步程序将会自动进行同步,根据测试结果,同步规则如下 [AAD Connect]01:AAD Connect把本地AD账户同步到Office365(AD域账 ...
- 【AAD Connect】05:通过AAD Connect疑难解答检查同步问题,以及根据提示如何解决问题(AD账户迁移到O365)
前言 使用aad connect把本地AD用户同步到Office 365时候,如果同步过程中有同步不成功的问题,可以通过AAD Connect疑难解答工具进行问题的自查,具体操作如下 [AAD Con ...
- 【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,附视频)
超强!!! Kafka高质量专栏学习大全,点我获取!!! 文章目录 前提 所有异常情况 1. TargetBroker若不在线,迁移脚本执行会失败 情景演示 2. TargetBroker在开始迁移过 ...
- js判断object对象中是否存在某个key【Mirth Connect】
@js判断object对象中是否存在某个key[Mirth Connect] 背景 在配置Mirth Connect(ETL工具)时,由于场景需求,根据输出的JSON中的某个key是否存在value来 ...
- 【NewTek Connect】Studio Monitor无法获取NewTek Connect内容的问题
[NewTek Connect]Studio Monitor无法获取NewTek Connect内容的问题 解决办法: 1.检查防火墙,在防火墙应用设置那块对所有NDI都放行. 2.(我遇到的)安装N ...
- 【AAD Connect】04:AD账户同步到O365报错:同步服务未运行,启动“ADSync”服务或Unable to connect to the Synchronization Service
前言 通过AADC(Azure Active Directory Connet)同步AD账号到office 365 AAD时,如果服务器重启之后: 1.如果打开Azuer AD Connect遇到&q ...
- confluent【kafka企业版】安装配置————附带详细信息
文章目录 7 confluent配置 7.0 背景 7.1 下载安装 7.2 配置环境变量 7.3 开放监听端口 7.4 安装debezium插件 7.5 启动confluent 7.6 配置debe ...
最新文章
- MXNet中依赖库介绍及简单使用
- Python标准库:内置函数dict(mapping, **kwarg)
- 自学python需要下载什么软件-一个零基础学习Python应该知道的学习步骤与规划
- 重邮计算机导师评价,李章勇_重庆邮电大学研究生导师信息
- python数据类型-元组(tuple)
- python人门指南小说-Python入门深度学习完整指南
- Vue router路由懒加载
- 微信小程序demo测试实践
- Map与数组、对象之间的转换
- N字霸气多空博弈大师能量潮拐点战法通达信 主图/副图/选股指标
- 三菱FX2N系列PLC的模拟量扩展模块简介
- 需求分析师如何做好非功能性需求
- 文件上传的测试点整理
- php经典实例博客管理,PHP经典项目案例-(一)博客管理系统2
- python 正数变成负数_LeetCode 007:整数反转 (Python)
- iOS之AR开发--demo制作篇:图片识别
- 学计算机心得体会50字,阅读心得体会50字
- vue 动态渲染图片 不出来
- 【Codeforces 1157F】 Maximum Balanced Circle | 思维、dp、二分
- 简单的小青蛙跳一跳问题