python kafka收不到消息_python通过Pykafka库来连接kafka并收发消息
1.安装pykafka
pip install pykafka
2.下载安装
git clone https://github.com/Parsely/pykafka.git
然后将下载下来的pykafka文件夹下的pykafka文件(pykafka的库文件)放到/Library/Python/2.7/site-packages/路径下即可
3.假设你有至少一个卡夫卡实例在本地运行,你可以使用pykafka连接它。
consumer.py 消费者
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
#kafka默认端口为9092
client = KafkaClient(hosts='192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092')#这里连接多个客户端
topic = client.topics['test_kafka_topic']
#从zookeeper消费,zookeeper的默认端口为2181
balanced_consumer = topic.get_balanced_consumer(
consumer_group='test_kafka_group',
auto_commit_enable=True, # 设置为False的时候不需要添加consumer_group,直接连接topic即可取到消息
zookeeper_connect='192.168.1.140:2181,192.168.1.141:2181,192.168.1.142:2181'#这里就是连接多个zk
)
for message in balanced_consumer:
# print message
if message is not None:
print message.offset, message.value#打印接收到的消息体的偏移个数和值
producer.py 生产者
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
client = KafkaClient(hosts ="192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092") #可接受多个client
#查看所有的topic
client.topics
print client.topics
topic = client.topics['test_kafka_topic']#选择一个topic
message ="test message test message"
#当有了topic之后呢,可以创建一个producer,来发消息,生产kafka数据,通过字符串形式,
with topic.get_sync_producer() as producer:
producer.produce(message)
#The example above would produce to kafka synchronously -
#the call only returns after we have confirmation that the message made it to the cluster.
#以上的例子将产生kafka同步消息,这个调用仅仅在我们已经确认消息已经发送到集群之后
#但生产环境,为了达到高吞吐量,要采用异步的方式,通过delivery_reports =True来启用队列接口;
with topic.get_sync_producer() as producer:
producer.produce('test message',partition_key='{}'.)
producer=topic.get_producer()
producer.produce(message)
print message
python kafka收不到消息_python通过Pykafka库来连接kafka并收发消息相关推荐
- 怎么看rabbitmq的浏览器信息_没用过消息队列?一文带你体验RabbitMQ收发消息
人生终将是场单人旅途,孤独之前是迷茫,孤独过后是成长. 楔子 先给大家说声抱歉,最近一周都没有发文,有一些比较要紧重要的事需要处理. 今天正好得空,本来说准备写SpringIOC相关的东西,但是发现想 ...
- java连接rabbitmq_没用过消息队列?一文带你体验RabbitMQ收发消息
楔子 先给大家说声抱歉,最近一周都没有发文,有一些比较要紧重要的事需要处理. 今天正好得空,本来说准备写SpringIOC相关的东西,但是发现想要梳理一遍,还是需要很多时间,所以我打算慢慢写,先把MQ ...
- python撤销上一步操作_Python 神操作,还原已撤回的微信消息
项目环境 语言:Python3 编辑器:Pycharm 导包效果展示 以下截图显示的撤回消息类型依次是文字消息.微信自带表情.图片.语音.定位地图.名片.公众号文章.音乐.视频.有群里撤回的,也有个人 ...
- python 微信公众号回复图片_Python webpy微信公众号开发之 回复图文消息
新建图文回复模板reply_pictext.xml: $def with (toUser,fromUser,createTime,title1,description1,picurl1,url1) $ ...
- python中bin函数的用法_Python 3标准库用法--ascii()、bin()、breakpoint()函数
ascii(object) 就像函数 repr(),返回一个对象可打印的字符串,但是 repr() 返回的字符串中非 ASCII 编码的字符,会使用 \x.\u 和 \U 来转义.生成的字符串和 Py ...
- python中re的安装步骤_Python中requent库的安装与卸载【原创】
以管理员身份运行cmd,输入 pip install requests,点击回车键后,需要等待大概十几秒钟就可以安装成功了,出现如下界面 安装完成后进行安装测试,此时要先输入python,敲击回车,运 ...
- python接口测试之requests详解_Python接口测试-requests库
一.requests库 Requests 是用Python语言编写,基于 urllib,采用 Apache2 Licensed 开源协议的 HTTP 库.它比 urllib 更加方便,可以节约我们大量 ...
- python xlrd读取文件报错_python中xlrd库如何实现文件读取?
俗话说得好,技多不压身,虽然我们已经掌握了多种可以实现读取文件的方式,但是丝毫不影响我们要学会精益求精,他说学习文件读取的奥秘,况且,数据分析是十分重要的,一切的代码运行,总归都是要服务于数据,好啦, ...
- python处理一亿条数据_Python基础数据处理库
Numpy 简介 import numpy as np Numpy是应用Python进行科学计算的基础库.它的功能包括多维数组.基本线性代数.基本统计计算.随机模拟等.Numpy的核心功能是ndarr ...
最新文章
- MongoDB (二) MongoDB 优点
- Android核心分析 之一分析方法论探讨之设计意图
- windows简易使用composer 安装国内镜像
- CVE-2019-8660 iMessage 漏洞复现
- axios不发起请求_axios无法发起跨域请求
- LeetCode 112路径总和-简单
- 为什么Prim算法不适用于带权有向图
- typecho 去掉index.php,Typecho设置伪静态去掉url中的index.php
- 181219每日一句
- 青花瓷音乐的单片机c语言程序,单片机蜂鸣器演奏青花瓷的程序
- SolidWorks 2018 安装教程
- MapGuide应用最佳实践—MapGuide Server和MapGuide WebExtension分开部署
- matlab计算abc三相短路电流_供配电系统设计需要用到的计算公式(结合手册简要总结)...
- sqlserver中65535_Sql Server数据导出EXCEL 解决行数超过65535问题
- 调整html元素大小 resize
- 暴雪战网怎么修改服务器,战网昵称修改服务
- openstack资料-陈沙克整理
- Tms320F28335中软件触发信号采样(ADC)
- RV1126RV1109 buildroot 增加USB双摄支持(一)
- Apache Kylin在百度地图的实践