kafka java api 生产者 producer 与消费者consumer
c踩坑
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld
将localhost必须和PLAINTEXT配置的地址保持一致,否则的话会无限警告不能接收数据
生产者;
import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaProducerDemo { public static void main(String[] args){ try { //新建一个类这个类用来配置kafka Properties properties = new Properties();//用来配置 kafka的IP地址和端口号 properties.put("bootstrap.servers", "master:9092");//ack 的状态可以分为三种 1,-1,all 1代表当生产者 properties.put("acks", "1");//重要//最多重复几次生产操作 properties.put("retries", 3);//重要 //缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。 properties.put("batch.size", 16384);//监听间隔
properties.put("linger.ms", 1);//控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。 properties.put("buffer.memory", 33554432);//序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//初始化生产者 Producer<String, String> producer = null; producer = new KafkaProducer<String, String>(properties); for (int i = 0; i < 100; i++) { String msg = "Messagea" + i;//topic的名字:HelloWorld producer.send(new ProducerRecord<String, String>("HelloWorld1", msg)); System.out.println("Sent:" + msg); //Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } }}之后再启动消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld --from-beginning //消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays;import java.util.Properties; public class ConsumerDemo { public static void main(String[] args) { Properties properties = new Properties(); properties.put("bootstrap.servers", "master:9092"); properties.put("group.id", "i"); //是否自动提交properties.put("enable.auto.commit", "false"); properties.put("auto.commit.interval.ms", "1000");//偏移量自增properties.put("auto.offset.reset", "earliest");////session保存时间properties.put("session.timeout.ms", "30000");//序列化properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);//topictest2kafkaConsumer.subscribe(Arrays.asList("HelloWorld")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }}最后启动生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HelloWorld
但是以上操作都有可能遇到数据重复或者数据丢失,在kafka0.11后引入了幂等和事务完美的解决了这个问题切忌是0.11版本之后包括0.11,如欲了解详情请看下篇 老规矩:散文欣赏《不老苍穹》今后的日子,我的头发越来越少,我写给你的句子越来越少,我剩下的时间越来越少,但世界永恒,星辰常在,苍穹不老。 最后的日子,我抱起你会很难,我看清你会很难,我记起你会很难,但此心至诚,百年不变,千岁未寒。
转载于:https://www.cnblogs.com/lppz/p/9991993.html
kafka java api 生产者 producer 与消费者consumer相关推荐
- Kafka学习笔记 --- 生产者producer与消费者关系comsumer
生产者:生产者可以将数据发布到所选择的topic(主题)中.生产者负责将记录分配到topic的哪一个 partition(分区)中.可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数( ...
- kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用
常用命令 启动Zookeeper ./zkServer.sh start-foreground 可选参数: ./zkServer.sh {start|start-foreground|stop|res ...
- 【Kafka】kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小
1.概述 kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小 package com.dtwave.kafka.storage;import org. ...
- kafka java api 删除,Kafka:删除闲置的消费者组ID
In some cases, I use Kafka-stream to model a small in memory (hashmap) projection of a topic. The K, ...
- kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...
使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...
- kafka java api 入数据报错:Error: MESSAGE_TOO_LARGE
1.kafka版本:kafka_2.11-2.1.0-kafka-4.0.0.jar 2.server.properties:所有调优参数都是默认 3.topic :null,所有参数默认 4.入库1 ...
- 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)
简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...
- java kafka api_kafka java API的使用
Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...
- java kafka 消费_java编程之Kafka_消费者API详解
1 消息发送 1.异步发送导入依赖 org.apache.kafka kafka-clients 0.11.0.0 编写代码 需要用到的类: KafkaProducer:需要创建一个生产者对象,用来发 ...
最新文章
- Lumen配置文件按需加载出现的坑
- Java进程占用内存过高,排查解决方法
- Tomcat - 源码分析Tomcat是如何处理一个Servlet请求的
- 计算机应用基础寒假作业,计算机应用基础理论试卷寒假作业.doc
- Python操作Memcache
- 20200114每日一句
- 【数据挖掘学习笔记】数据挖掘中主要问题有哪些?
- 科研论文检索方法入门(计算机领域)
- 2018年个人学习计划总结
- AutoJs+mitmproxy App爬虫
- 一些杂乱的C语言算法
- Pycharm 配置 Autopep8
- 软件卸载清理工具IObit Uninstaller Pro 中文版的下载、安装与注册激活教程
- Java实现子序列问题
- python 拼音相似度_Python实现简单的文本相似度分析操作详解
- 代码随想录训练营day60
- 学计算机的人有多可怕,人类的大脑其实很可怕 不会感到疼痛还会自相残杀
- 微型计算机就是指pc,pc机是什么
- zufeoj 2350 贪吃的松鼠
- Leetcode_1279_红绿灯路口_多线程
热门文章
- lv655液晶电视东芝
- MySql添加用户以及授予权限
- C语言合并8位数据为16位数据
- 怎么样才能做好seo?我们如何做好seo
- Docker-SaltStack-Foreman-Puppet一体化安装说明
- docker快速搭建smokeping
- d=[张三,李四,王五] 输出d[0] 结果 '\xe5\xbc\xa0\xe4\xb8\x89' Python2.6列表中文输出问题怎么解决?
- 福利!囤书正当时——
- 好文转载:90后妹纸学习CPA广告推广 创业就是这么简单
- Android TextView自动缩放字体