c踩坑

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld

将localhost必须和PLAINTEXT配置的地址保持一致,否则的话会无限警告不能接收数据

生产者;

import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerDemo  {    public static void main(String[] args){

        try {        //新建一个类这个类用来配置kafka       Properties properties = new Properties();//用来配置 kafka的IP地址和端口号             properties.put("bootstrap.servers", "master:9092");//ack 的状态可以分为三种  1,-1,all    1代表当生产者            properties.put("acks", "1");//重要//最多重复几次生产操作            properties.put("retries", 3);//重要  //缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。            properties.put("batch.size", 16384);//监听间隔
            properties.put("linger.ms", 1);//控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。            properties.put("buffer.memory", 33554432);//序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//初始化生产者            Producer<String, String> producer = null;            producer = new KafkaProducer<String, String>(properties);            for (int i = 0; i < 100; i++) {                String msg = "Messagea" + i;//topic的名字:HelloWorld                producer.send(new ProducerRecord<String, String>("HelloWorld1", msg));                System.out.println("Sent:" + msg);                //Thread.sleep(1000);            }        } catch (Exception e) {            e.printStackTrace();

        }

    }}之后再启动消费者  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld --from-beginning

//消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;import java.util.Properties;

public class ConsumerDemo {    public static void main(String[] args) {

    Properties properties = new Properties();

properties.put("bootstrap.servers", "master:9092");

properties.put("group.id", "i");    //是否自动提交properties.put("enable.auto.commit", "false");

properties.put("auto.commit.interval.ms", "1000");//偏移量自增properties.put("auto.offset.reset", "earliest");////session保存时间properties.put("session.timeout.ms", "30000");//序列化properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);//topictest2kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));        while (true) {        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);        for (ConsumerRecord<String, String> record : records)            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());    }

}}最后启动生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HelloWorld

但是以上操作都有可能遇到数据重复或者数据丢失,在kafka0.11后引入了幂等和事务完美的解决了这个问题切忌是0.11版本之后包括0.11,如欲了解详情请看下篇

老规矩:散文欣赏《不老苍穹》今后的日子,我的头发越来越少,我写给你的句子越来越少,我剩下的时间越来越少,但世界永恒,星辰常在,苍穹不老。

最后的日子,我抱起你会很难,我看清你会很难,我记起你会很难,但此心至诚,百年不变,千岁未寒。

转载于:https://www.cnblogs.com/lppz/p/9991993.html

kafka java api 生产者 producer 与消费者consumer相关推荐

  1. Kafka学习笔记 --- 生产者producer与消费者关系comsumer

    生产者:生产者可以将数据发布到所选择的topic(主题)中.生产者负责将记录分配到topic的哪一个 partition(分区)中.可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数( ...

  2. kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用

    常用命令 启动Zookeeper ./zkServer.sh start-foreground 可选参数: ./zkServer.sh {start|start-foreground|stop|res ...

  3. 【Kafka】kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小

    1.概述 kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小 package com.dtwave.kafka.storage;import org. ...

  4. kafka java api 删除,Kafka:删除闲置的消费者组ID

    In some cases, I use Kafka-stream to model a small in memory (hashmap) projection of a topic. The K, ...

  5. kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...

    使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...

  6. kafka java api 入数据报错:Error: MESSAGE_TOO_LARGE

    1.kafka版本:kafka_2.11-2.1.0-kafka-4.0.0.jar 2.server.properties:所有调优参数都是默认 3.topic :null,所有参数默认 4.入库1 ...

  7. 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)

    简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...

  8. java kafka api_kafka java API的使用

    Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...

  9. java kafka 消费_java编程之Kafka_消费者API详解

    1 消息发送 1.异步发送导入依赖 org.apache.kafka kafka-clients 0.11.0.0 编写代码 需要用到的类: KafkaProducer:需要创建一个生产者对象,用来发 ...

最新文章

  1. Lumen配置文件按需加载出现的坑
  2. Java进程占用内存过高,排查解决方法
  3. Tomcat - 源码分析Tomcat是如何处理一个Servlet请求的
  4. 计算机应用基础寒假作业,计算机应用基础理论试卷寒假作业.doc
  5. Python操作Memcache
  6. 20200114每日一句
  7. 【数据挖掘学习笔记】数据挖掘中主要问题有哪些?
  8. 科研论文检索方法入门(计算机领域)
  9. 2018年个人学习计划总结
  10. AutoJs+mitmproxy App爬虫
  11. 一些杂乱的C语言算法
  12. Pycharm 配置 Autopep8
  13. 软件卸载清理工具IObit Uninstaller Pro 中文版的下载、安装与注册激活教程
  14. Java实现子序列问题
  15. python 拼音相似度_Python实现简单的文本相似度分析操作详解
  16. 代码随想录训练营day60
  17. 学计算机的人有多可怕,人类的大脑其实很可怕 不会感到疼痛还会自相残杀
  18. 微型计算机就是指pc,pc机是什么
  19. zufeoj 2350 贪吃的松鼠
  20. Leetcode_1279_红绿灯路口_多线程

热门文章

  1. lv655液晶电视东芝
  2. MySql添加用户以及授予权限
  3. C语言合并8位数据为16位数据
  4. 怎么样才能做好seo?我们如何做好seo
  5. Docker-SaltStack-Foreman-Puppet一体化安装说明
  6. docker快速搭建smokeping
  7. d=[张三,李四,王五] 输出d[0] 结果 '\xe5\xbc\xa0\xe4\xb8\x89' Python2.6列表中文输出问题怎么解决?
  8. 福利!囤书正当时——
  9. 好文转载:90后妹纸学习CPA广告推广 创业就是这么简单
  10. Android TextView自动缩放字体