一、 kafka-python和pykafka基本对比

对比 kafka-python pykafka
出现时间 更早 稍晚
最新版本 2.0.1(2020年2月20日) 2.8.0(2018年9月24日)
语言实现 纯python python和连接 librdkafka C 库
API风格 更类似java客户端 更类似python客户端
kafka版本支持 0.8以上 0.8.2 以上
zookeeper支持 kafka0.9以上支持 支持
python版本支持 2.7 3.4 3.5 3.6 3.7 2.7, 3.4, 3.5

二、kafka-python和pykafka使用对比

对比 kafka-python pykafka
生产初始化 初始化时,不需要知道 topic 初始化 producer 时,需要知道 topic
广大网友测试稳定性(早期版本) broker 重启后,producer 会死掉 broker 重启后,producer 不会死掉
广大网友测试结论(早期版本) 消费速率更好一些 生产效率更高一些
gevent/eventlet 不支持gevent 可支持gevent
是否支持ssl 支持 支持
是否支持压缩格式 支持 支持
是否支持指定分区 支持 支持
是否线程安全 producer是线程安全,consumer不是线程安全 不是线程安全
个人使用感受 producer的参数更多,控制的更细 -

三、Pykafka使用介绍

官网使用 https://pykafka.readthedocs.io/en/latest/

3.1 生产

3.1.1 代码

from pykafka import KafkaClient
client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093,...")
topic = client.topics['my.test']
//不需要report
with topic.get_sync_producer(sync=False, delivery_reports=False) as producer:for i in range(4):producer.produce('test message ' + str(i ** 2))//需要report
with topic.get_producer(delivery_reports=True) as producer:count = 0while True:count += 1producer.produce('test msg', partition_key='{}'.format(count))if count % 10 ** 5 == 0:  # adjust this or bring lots of RAM ;)while True:try:msg, exc = producer.get_delivery_report(block=False)if exc is not None:print 'Failed to deliver msg {}: {}'.format(msg.partition_key, repr(exc))else:print 'Successfully delivered msg {}'.format(msg.partition_key)except Queue.Empty:break

注意:
1)在程序退出时调用producer.stop(),否则会报 ReferenceError: weakly-referenced object no longer exists错误
2) delivery report 是只在当前线程存在,如果设置了delivery_reports=True,一定要调用get_delivery_report,否则会造成pykafka内存上涨。使用pending_timeout_ms参数配置等待报告时间

3.1.2 rdkafka使用

  1. pykafka producer支持use_rdkafka = True参数,但前提是安装了librdkafka这个库。同时pykafka的consumer只有simpleconsumer能用rdkafka,balanced_consumer就不支持rdkafka了
    pykafka.exceptions.ProducerQueueFullError错误。
  2. block_on_queue_full = True 这个参数在使用rdkafka时失效,当队列满时不会阻塞,会抛出异常。因为在rdkafka producer中block_on_queue_full参数。
  3. 虽然用greenlets能提高pykafka的效率,但是提升有限,但是如果使用rdkafka的话效率有成倍的提升。

3.1.3 多进程使用

多进程使用pykafka共享一个client,会造成只有进程能够正常的写入数据,如果使用了delivery_reports(包括同步),会导致子进程彻底阻塞掉不可用

3.2 消费

有 get_simple_consumer 和get_balanced_consumer两种方式:
两个get_simple_consumer会消费相同的数据
get_balanced_consumer会根据指定的groupid进行动态分配,保证相同的组不会消费到相同数据

四、kafka-python的使用介绍

使用 https://kafka-python.readthedocs.io/en/master/

4.1 消费

4.1.1 关键参数介绍

参数名称及默认值 含义
‘fetch_min_bytes’: 1 一次请求时,server返给client的最小字节,如果没达到,会等fetch_max_wait_ms时间
‘fetch_max_wait_ms’: 500 server最长阻塞时间,当达到fetch_min_bytes时,返回给client
‘fetch_max_bytes’: 52428800=50MB 一次请求时,server返给client的最大字节
‘max_partition_fetch_bytes’: 1 * 1024 * 1024 server返给每个partition最大字节数
‘request_timeout_ms’: 40 * 1000 一次request超时时间
‘auto_offset_reset’: ‘latest’ 可选:earliest
‘enable_auto_commit’: True 是否在后台周期自动提交offset
‘auto_commit_interval_ms’: 5000 自动周期提交offset时间
‘metadata_max_age_ms’: 5 * 60 * 1000 每隔5分钟强制进行metadata刷新
‘heartbeat_interval_ms’: 3000 客户端与服务端保活包发送周期,每3秒发送一个,这个值一定要比session_timeout_ms小,且小于1/3的session_timeout_ms
‘session_timeout_ms’: 30000 session超时时间
‘max_poll_records’: 500 调取poll时默认最大读取数量

4.1.2 Rebalance问题记录

记录kafka-python1.4.6 的一个rebalance错误:https://www.cnblogs.com/piperck/p/11373018.html
总结原因就是:
1)消费者消费太慢,导致超时(我们通常会一次性 poll 拉默认 500 条数据下来。我们需要在 300s 中完成 500 条数据的处理。如果不能完成的话就可能会触发rebalance问题)
2)消费太快,导致消费者处于空poll的状态,阻塞发送心跳线程;
以上两个原因都会让server认为需要rebalance。

第一种原因的话,按照文中的方案,降低max_poll_records(默认500)或提高metadata_max_age_ms(默认5分钟强制刷新metadata)
第二种原因的话,解决方案是修改heartbeat_interval_ms(默认3秒)和metadata_max_age_ms(默认5分钟)为差不多的大小,例如将metadata_max_age_ms修改为3s

五、kafka-python和pykafka 生产关键参数对比

对比 kafka-python pykafka
生产模式 异步处理,可通过acks的值支持是否需要收到全部broker确认 sync=False,默认异步
指定分区 如果指定了key,那么每次会分到当前可用的相同分区,如果没有指定key,则随机分配;支持回调 无论是否指定key,默认随机,支持hash分配
出错重试 retries:0 max_retries=3,这会无法导致数据顺序到达broker,如果要保序,置成同步模式
队列最大缓存数量 ‘buffer_memory’: 33554432,即32MB,如果超过时,会阻塞max_block_ms=60秒,之后会抛出异常 max_queued_messages=100000,如果发送给broker速度小于接收速度,超过此大小时,根据block_on_queue_full参数,判断是阻塞producer还是抛出异常:block_on_queue_full=True,默认阻塞,设成False为抛出异常。如果同步模式下,此参数设置时效
队列最小缓存数量 一个分区batch_size:16384即16K min_queued_messages=70000,若当前为同步模式,此值自动覆盖为1
最长缓存超时 ‘linger_ms’: 0 linger_ms=5 * 1000,设置为0则不缓存,一旦produce,就会发出去
空队列时超时检测 - queue_empty_timeout_ms=0,若当前队列为空,则每等待queue_empty_timeout_ms时间,启动linger_ms的检测
一次请求最大支持字节数 ‘max_request_size’: 1048576 即1MB max_request_size=1000012,约等于1MB

参考链接:
两个安装包网友总结对比 https://www.iteye.com/blog/chenxiuheng-2265887
gevent下不支持kafka-python
https://www.cnblogs.com/shengulong/p/10473600.html
https://www.zhihu.com/question/31809734
rdkafka使用注意和性能测试参考 https://blog.csdn.net/liuxingen/article/details/71746218
pykafka 生产、消费的详细例子 https://www.jianshu.com/p/0da5d81cf649

Kafka-python Pykafka对比使用参数大全相关推荐

  1. python常用扩展模块资源(大全)

    本文由 大侠(AhcaoZhu)整理并转载,转载请请引用原出处. 链接: https://blog.csdn.net/Ahcao2008 python常用扩展模块资源(大全) Python 资源大全中 ...

  2. python图像对比_用python实现对比两张图片的不同

    from PIL import Image from PIL import ImageChops def compare_images(path_one, path_two, diff_save_lo ...

  3. python中函数的参数_Python小知识-Python中的函数参数(基础篇)

    0 总述 这个题目分为2篇,第一篇基本上是概念以及实例,有个相对系统的认识,第二篇想写的深入一些,不同参数类型的区别以及和c/c++的函数参数对比下异同-下面开始是第一篇的内容啦- Python中的函 ...

  4. 【Python】| 基于Python实现对比Excel的小工具

    目的:设计一个应用GUI用于对比两个Excel文件 背景:因工作中长于数据打交道,不可避免的会进行简单繁琐数据的对比,故人生苦短我用Python! 思路 1.参数 同一个excel文件两个sheet页 ...

  5. python快速对比两个excel的数据是否一致

    python快速对比两个excel的数据是否一致 功能概述 导入包 封装函数 程序开始 功能概述 两个sheet里面的表头有部分不一致,但是数据对应的内容是一致的,因此需要匹配表格多的值是否一致. 输 ...

  6. 【转】python改文件夹名称大全_python文件和目录操作方法大全(含更改文件夹下所有文件名称的实例)

    原文:​​​​​​ ​​​​​​​​​​​​​​​​​​​python改文件夹名称大全_python文件和目录操作方法大全(含更改文件夹下所有文件名称的实例)..._心失荒野的博客-CSDN博客 一. ...

  7. 【AIGC提示工程 - MidJourney教程:二】《MidJourney参数大全指南:实现最佳图像输出的关键》

    关注元壤教育公众号系统学习AIGC提示工程课程. 更多AIGC好博客,请移步访问AIGC博客派 这篇文章介绍了不同的MidJourney参数和提示词,帮助你创建你选择的图像.探索如何使用不同的风格和参 ...

  8. python命令之m参数 局域网传输

    在命令行中使用python时,python支持在其后面添加可选参数. python命令的可选参数有很多,例如:使用可选参数h可以查询python的帮助信息: 可选参数m 下面我们来说说python命令 ...

  9. Python 函数的可变参数(*paramter与**paramter)的使用

    Python 函数的可变参数主要有 *paramter与**paramter 可变参数主要有 *paramter的作用 接受任意多个实际参数并放到一个元组中 def people(*people):f ...

最新文章

  1. AttributeError: ‘NoneType‘ object has no attribute ‘span‘
  2. 从此不再为Web页面中的Tooltip烦恼
  3. 以色列研发出0经验AI,无需训练就能学会抠图
  4. Laravel5.2之Filesystem-从Dropbox中下载文件到AWS S3
  5. python实验七答案_Python实验报告七
  6. springMVC请求流程详解
  7. Java多线程编程实战(读书笔记)
  8. Cocos2dx 之 ButtonSprite
  9. python怎么读取excel-python 读取 Excel
  10. [转载] python迭代器
  11. 一致性Hash与负载均衡
  12. Juphoon RTC年度成绩单,请查收!
  13. 实时调试WebDriver代码
  14. 使用Hourglass网络来理解人体姿态
  15. printf()中%n格式说明符
  16. Vue,React,微信小程序,快应用,TS 和 Koa 一把梭
  17. 基于JAVA干洗店订单管理系统设计与实现计算机毕业设计源码+数据库+lw文档+系统+部署
  18. “着色器”是什么意思? 如何使用HTML5和WebGL创建它们
  19. 电磁波的加上Mur吸收边界条件的FDTD算法
  20. 三星2016换电池教程

热门文章

  1. Python:实现sylvester西尔维斯特方程算法(附完整源码)
  2. python人机对话_人机交互程序 python实现人机对话
  3. C语言生成随机字符串
  4. Python 游戏框架搭建
  5. Django计算机毕业设计母婴用品店管理系统python(源码程序+lw+远程部署)
  6. 深度学习与自然语言处理(8)_斯坦福cs224d RNN,MV-RNN与RNTN
  7. 收纳箱底部滚轮怎样装_抽屉式收纳箱怎么样 收纳其实很简单
  8. 全球市场药丸收纳盒市场销售情况及需求规模预测报告(新版)2022年
  9. Sandcastle 学习教程 (1) 基础入门
  10. MySQL表锁、行锁、排它锁和共享锁