kafka简介(摘自百度百科)

一、简介:

详见:https://blog.csdn.net/Beyond_F4/article/details/80310507

二、安装

详见博客:https://blog.csdn.net/beyond_f4/article/details/80095689

三、按照官网的样例,先跑一个应用

1、生产者:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092']) #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]

for i in range(3):

msg = "msg%d" % i

producer.send('test', msg)

producer.close()

2、消费者(简单demo):

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',

bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

启动后生产者、消费者可以正常消费。

3、消费者(消费群组)

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',

group_id='my-group',

bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力

4、消费者(读取目前最早可读的消息)

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',

auto_offset_reset='earliest',

bootstrap_servers=['172.21.10.136:9092'])

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest

源码定义:{'smallest': 'earliest', 'largest': 'latest'}

5、消费者(手动设置偏移量)

from kafka import KafkaConsumer

from kafka.structs import TopicPartition

consumer = KafkaConsumer('test',

bootstrap_servers=['172.21.10.136:9092'])

print consumer.partitions_for_topic("test") #获取test主题的分区信息

print consumer.topics() #获取主题列表

print consumer.subscription() #获取当前消费者订阅的主题

print consumer.assignment() #获取当前消费者topic、分区信息

print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量

consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,从第5个偏移量消费

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

6、消费者(订阅多个主题)

from kafka import KafkaConsumer

from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])

consumer.subscribe(topics=('test','test0')) #订阅要消费的主题

print consumer.topics()

print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量

for message in consumer:

print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

message.offset, message.key,

message.value))

7、消费者(手动拉取消息)

from kafka import KafkaConsumer

import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])

consumer.subscribe(topics=('test','test0'))

while True:

msg = consumer.poll(timeout_ms=5) #从kafka获取消息

print msg

time.sleep(1)

8、消费者(消息挂起与恢复)

from kafka import KafkaConsumer

from kafka.structs import TopicPartition

import time

consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])

consumer.subscribe(topics=('test'))

consumer.topics()

consumer.pause(TopicPartition(topic=u'test', partition=0))

num = 0

while True:

print num

print consumer.paused() #获取当前挂起的消费者

msg = consumer.poll(timeout_ms=5)

print msg

time.sleep(2)

num = num + 1

if num == 10:

print "resume..."

consumer.resume(TopicPartition(topic=u'test', partition=0))

print "resume......"

pause执行后,consumer不能读取,直到调用resume后恢复。

如果对您有帮助,记得给我点赞诺

如果对您有帮助,记得给我点赞诺

python消费kafka_Python脚本消费kafka数据相关推荐

  1. kafka 的pom文件_Flink 消费 Kafka 数据

    kafka核心概念: Kafka 是一个消息队列,生产者向消息队列中写入数据,消费者从队列中获取数据并进行消费.可以认为一个 Topic 就是一个队列,每个 Topic 又会被分成多个 Partiti ...

  2. sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案

    下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...

  3. 客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中

    目录 将消费的kafka数据同步到Kudu中 一.导入表名映射关系类

  4. Storm 消费Kafka数据及相关异常解决

    Storm 消费Kafka数据及相关异常解决 问题 KafkaTopoDemo类 bolt类 问题 storm报错:Exception in thread "main" java. ...

  5. java消费kafka数据之后,进行堆积之后在插入数据库

    java高频的获取kafka数据,导致数据库数据一致在高频读写,为了降低数据库的高频连接搞高频读写,可以将数据堆积一段时间之后,进行插入数据库操作. 主要采用了队列和缓存,将获取到的数据放入java队 ...

  6. SparkStreaming安全消费Kafka数据

    前言 在这之前做SparkStreaming连接Kafka,我会这么写: val sparkConf = new SparkConf().setAppName("Spark2Kafka&qu ...

  7. java debug非同期ski,简记kafka group id相同导致的不同consumers启动后不消费和延时消费问题...

    场景: 在一个线程内,使用相同的brokers和group id等配置,根据传入的topic数量N,分别定义N个consumer,按定义顺序先后调用consumers消费 现象: 程序启动后,kafk ...

  8. kafka控制台模拟消费_Flink初试——对接Kafka

    本篇文章我们用 Flink Kafka Connector对接Kafka,实现一个简单的报警业务.我们暂时不去谈论理论,先上手实现这个简单的需求. flink-connector-kafka是 fli ...

  9. mac系统下使用flink消费docker运行的kafka

    版本 flink 1.12.0 scala 2.11 java 1.8 kafka 2.0.2 首先使用maven创建一个新的工程 mvn archetype:generate -Darchetype ...

  10. 【python】使用python脚本将LFW数据中1672组同一个人多张照片拷贝出来

    使用python脚本将LFW数据中1672组同一个人多张照片拷贝出来 dataCleaning4multiple.py 源码如下: import os, random, shutil import s ...

最新文章

  1. Mysql分析-profile详解
  2. 谷歌语音转录背后的神经网络
  3. Codeforces 576D Flights for Regular Customers (图论、矩阵乘法、Bitset)
  4. 64位MinGW和MSYS的安装
  5. 数据预处理之归一化/标准化/正则化/零均值化
  6. 中国科学院数学与系统科学研究院关于2019年招收硕士研究生复试规程
  7. cloud源码-Feign
  8. 商业智能在公安交通管理领域的应用
  9. Redis学习笔记整理(黑马程序员视频课程)
  10. android禁止屏幕自动旋转_Android 禁止屏幕旋转 屏幕旋转不刷新 Activity 动态更改屏幕方向...
  11. 中文邮件分类[朴素贝叶斯、支持向量机、Logistic,TF-IDF,词袋模型]
  12. 学习笔记:Github(1)站在巨人的肩膀上
  13. Unity Profiler
  14. 照片像素化项目(on Github)
  15. 互联网协议学习笔记-----IP协议与传输协议
  16. 阿里巴巴牵头发起对雅虎的250亿美元并购
  17. 局部非饱和性的含义_范里安-微观经济学现代观点讲义(new)
  18. 数据增强 data augmentation
  19. MySQL-Test-Run测试工具
  20. 设置阿里云镜像(registry.cn-beijing.aliyuncs.com)登录凭证

热门文章

  1. CMD/Dos下远程开启3389与远程改3389端口
  2. 如何通过U盘等外部设备启动带有T2芯片的Mac?
  3. Photoshop 入门教程「3」如何缩放和平移图像?
  4. Loopback for Mac虚拟音频传输管理工具
  5. Galaxy Fold上市时间推延 屏幕瑕疵是主因
  6. background: radial-gradient径向渐变
  7. 烂泥:windows server 2008取消关闭事件跟踪
  8. 6、PXE安装ESXI6.0
  9. 进阶之路(基础篇) - 003 I/O的模拟的读取
  10. Debian更新软件源提示There is no public key available for the following key IDs的解决方法