文章目录

  • 四、Kafka API
    • 1、Producer API
      • 1.1 消息发送流程
      • 1.2 异步发送 API
      • 1.3 分区器
      • 1.4 同步发送 API

四、Kafka API

1、Producer API

1.1 消息发送流程

  • Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

顺序:监控器 —> 序列化器 —> 分区器

相关参数:

  • batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
  • linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

1.2 异步发送 API

1)导入依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.12.0</version></dependency>
</dependencies>

2)添加log4j2配置文件(用于打印日志)

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig"><Appenders><!-- 类型名为Console,名称为必须属性 --><Appender type="Console" name="STDOUT"><!-- 布局为PatternLayout的方式,输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here --><Layout type="PatternLayout"pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" /></Appender></Appenders><Loggers><!-- 可加性为false --><Logger name="test" level="info" additivity="false"><AppenderRef ref="STDOUT" /></Logger><!-- root loggerConfig设置 --><Root level="info"><AppenderRef ref="STDOUT" /></Root></Loggers></Configuration>

3)编写代码

需要用到的类:

KafkaProducer:需要创建一个生产者对象,用来发送数据

ProducerConfig:获取所需的一系列配置参数

ProducerRecord:每条数据都要封装成一个ProducerRecord对象

  • kafka生产者的api最简单版本
public class MyProducer{public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers","hadoop105:9092");properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<String,String>(properties);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("bajie", "--gaolaozhuangshuiyundong--" + i));}producer.close();}
}
[xiaoxq@hadoop105 kafka_2.11-2.4.1]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop105:9092 --topic bajie
--gaolaozhuangshuiyundong--0
--gaolaozhuangshuiyundong--1
--gaolaozhuangshuiyundong--2
--gaolaozhuangshuiyundong--3
--gaolaozhuangshuiyundong--4
--gaolaozhuangshuiyundong--5
--gaolaozhuangshuiyundong--6
--gaolaozhuangshuiyundong--7
--gaolaozhuangshuiyundong--8
--gaolaozhuangshuiyundong--9

(1)不带回调函数的API

public class MyProducer2 {public static void main(String[] args) {//todo 创建生产者的配置对象Properties properties = new Properties();//1.设定集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop105:9092");//2.设定ack等级properties.put(ProducerConfig.ACKS_CONFIG,"all");//3.设定重试的次数properties.put(ProducerConfig.RETRIES_CONFIG,"2");//4.设定batch的大小properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");//5.设定间隔时间properties.put(ProducerConfig.LINGER_MS_CONFIG,1);//6.设定缓冲区大小properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");//7.设定key和value的序列化方式properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//todo 创建生产者的对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//发送数据for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("bajie","gaolaozhuang---" + i));}//TODO 关闭生产者对象producer.close();}
}
[xiaoxq@hadoop105 kafka_2.11-2.4.1]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop105:9092 --topic bajiegaolaozhuang---0
gaolaozhuang---1
gaolaozhuang---2
gaolaozhuang---3
gaolaozhuang---4
gaolaozhuang---5
gaolaozhuang---6
gaolaozhuang---7
gaolaozhuang---8
gaolaozhuang---9

(2)带回调函数的API

  • 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadataException,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。
  • 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
public class MyProducer3 {public static void main(String[] args) {//todo 创建生产者的配置对象Properties properties = new Properties();//1.设定集群,broker-listproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop105:9092");//2.设定ack等级properties.put(ProducerConfig.ACKS_CONFIG,"all");//3.设定重试的次数properties.put(ProducerConfig.RETRIES_CONFIG,"2");//4.设定batch的大小 16kbproperties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");//5.设定间隔时间 1msproperties.put(ProducerConfig.LINGER_MS_CONFIG,1);//6.设定缓冲区大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");//7.设定key和value的序列化方式properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//todo 创建生产者的对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//发送数据for (int i = 0; i < 10; i++) {//采用生产者对象发送数据producer.send(new ProducerRecord<String, String>("bajie","er shi xiong shi fu bei yao guai zhua zou le---" + i),new Callback() {@Override//这个回调函数 在ack正常返回的时候是返回metadata,如果不正常返回则返回异常public void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){System.out.println("topic:" + recordMetadata.topic()+ "--partition:" + recordMetadata.partition()+ "--offset:" + recordMetadata.offset() );}}});}//TODO 关闭生产者对象//这个close会在关闭前将所有数据处理完再退出生产者.producer.close();}
}

控制台显示

topic:bajie--partition:1--offset:10
topic:bajie--partition:1--offset:11
topic:bajie--partition:1--offset:12
topic:bajie--partition:1--offset:13
topic:bajie--partition:1--offset:14
topic:bajie--partition:1--offset:15
topic:bajie--partition:1--offset:16
topic:bajie--partition:1--offset:17
topic:bajie--partition:1--offset:18
topic:bajie--partition:1--offset:19

主题中添加的内容

er shi xiong shi fu bei yao guai zhua zou le---0
er shi xiong shi fu bei yao guai zhua zou le---1
er shi xiong shi fu bei yao guai zhua zou le---2
er shi xiong shi fu bei yao guai zhua zou le---3
er shi xiong shi fu bei yao guai zhua zou le---4
er shi xiong shi fu bei yao guai zhua zou le---5
er shi xiong shi fu bei yao guai zhua zou le---6
er shi xiong shi fu bei yao guai zhua zou le---7
er shi xiong shi fu bei yao guai zhua zou le---8
er shi xiong shi fu bei yao guai zhua zou le---9

1.3 分区器

1) 默认的分区器 DefaultPartitioner

  • interceptors拦截器过滤 —–> serializer序列化器进行序列化方便数据的传输 —–> partitions分区

DefaultPartitioner分区实现:

  • 当指定了key,则采用固定的算法,每次都会计算出同一分区号,如果key不存在则计算出一个可用分区的的一个任意分区

2)自定义分区器

public class MyPartitioner implements Partitioner {@Override/*** 主要写分区逻辑* 有bajie 的就往0号分区* 没有bajie 的就往1号分区*/public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {if (new String(bytes1).contains("bajie")){return 0;}else {return 1;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

编写partitionerProducer

public class PartitionerProducer {public static void main(String[] args) {//todo 创建生产者的配置对象Properties properties = new Properties();//1.设定集群,broker-listproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop105:9092");//2.设定ack等级properties.put(ProducerConfig.ACKS_CONFIG,"all");//3.设定重试的次数properties.put(ProducerConfig.RETRIES_CONFIG,"2");//4.设定batch的大小 16kbproperties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");//5.设定间隔时间 1msproperties.put(ProducerConfig.LINGER_MS_CONFIG,1);//6.设定缓冲区大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");//7.设定分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"partitioner.MyPartitioner");//8.设定key和value的序列化方式properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//todo 创建生产者的对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//发送数据for (int i = 0; i < 10; i++) {//采用生产者对象发送数据if (i<5){producer.send(new ProducerRecord<String, String>("bajie","er shi xiong shi bajie ---" + i),new Callback() {@Override//这个回调函数 在ack正常返回的时候是返回 metadata,如果不正常返回则返回异常public void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){System.out.println("topic:" + recordMetadata.topic()+ "--partition:" + recordMetadata.partition()+ "--offset:" + recordMetadata.offset() );}}});}else{producer.send(new ProducerRecord<String, String>("bajie","da shi xiong shi wu kong ---" + i),new Callback() {@Override//这个回调函数 在ack正常返回的时候是返回 metadata,如果不正常返回则返回异常public void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){System.out.println("topic:" + recordMetadata.topic()+ "--partition:" + recordMetadata.partition()+ "--offset:" + recordMetadata.offset() );}}});}}//TODO 关闭生产者对象//这个close会在关闭前将所有数据处理完再退出生产者.producer.close();}
}

执行后结果为

topic:bajie--partition:1--offset:25
topic:bajie--partition:1--offset:26
topic:bajie--partition:1--offset:27
topic:bajie--partition:1--offset:28
topic:bajie--partition:1--offset:29
topic:bajie--partition:0--offset:14
topic:bajie--partition:0--offset:15
topic:bajie--partition:0--offset:16
topic:bajie--partition:0--offset:17
topic:bajie--partition:0--offset:18

1.4 同步发送 API

  • 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
  • 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方法即可。

编写SyncProducer

public class SyncProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//todo 创建生产者的配置对象Properties properties = new Properties();//1.设定集群,broker-listproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop105:9092");//2.设定ack等级properties.put(ProducerConfig.ACKS_CONFIG,"all");//3.设定重试的次数properties.put(ProducerConfig.RETRIES_CONFIG,"2");//4.设定batch的大小 16kbproperties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");//5.设定间隔时间 1msproperties.put(ProducerConfig.LINGER_MS_CONFIG,1);//6.设定缓冲区大小 32Mproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");//7.设定key和value的序列化方式properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//todo 创建生产者的对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);//发送数据for (int i = 0; i < 10; i++) {//采用生产者对象发送数据Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("bajie", "er shi xiong shi bajie ---" + i), new Callback() {@Override//这个回调函数 在ack正常返回的时候是返回 metadata,如果不正常返回则返回异常public void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println("topic:" + recordMetadata.topic()+ "--partition:" + recordMetadata.partition()+ "--offset:" + recordMetadata.offset());}}});//System.out.println("消息已经发送了");future.get();}//TODO 关闭生产者对象//这个close会在关闭前将所有数据处理完再退出生产者.producer.close();}
}

执行后结果

opic:bajie--partition:0--offset:34
topic:bajie--partition:1--offset:35
topic:bajie--partition:0--offset:35
topic:bajie--partition:1--offset:36
topic:bajie--partition:0--offset:36
topic:bajie--partition:1--offset:37
topic:bajie--partition:0--offset:37
topic:bajie--partition:1--offset:38
topic:bajie--partition:0--offset:38
topic:bajie--partition:1--offset:39当 //System.out.println("消息已经发送了");注释打开后执行结果
消息已经发送了
topic:bajie--partition:0--offset:39
消息已经发送了
topic:bajie--partition:1--offset:40
消息已经发送了
topic:bajie--partition:0--offset:40
消息已经发送了
topic:bajie--partition:1--offset:41
消息已经发送了
topic:bajie--partition:0--offset:41
消息已经发送了
topic:bajie--partition:1--offset:42
消息已经发送了
topic:bajie--partition:0--offset:42
消息已经发送了
topic:bajie--partition:1--offset:43
消息已经发送了
topic:bajie--partition:0--offset:43
消息已经发送了
topic:bajie--partition:1--offset:44

Kafka API的运用(Producer API)相关推荐

  1. 学习笔记Kafka(五)—— Kafka 开发环境配置及Producer API

    一.开发环境说明 1.创建Maven工程 1.1.开发环境 Maven && JDK 1.2.Pom配置 Compiler Configuration 在pom.xml添加: < ...

  2. 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)

    简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...

  3. kafka系列九、kafka事务原理、事务API和使用场景

    一.事务场景 最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 . producer可能会给多个topic,多个partition发消息,这些 ...

  4. kafka 主动消费_Kafka核心API——Consumer消费者

    Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了.因此,本文 ...

  5. kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用

    常用命令 启动Zookeeper ./zkServer.sh start-foreground 可选参数: ./zkServer.sh {start|start-foreground|stop|res ...

  6. Producer API

    Producer API 文章目录 一.消息发送流程 二.异步发送API 1.1.不带回调函数的API 1.2在集群上启动消费者 2.1带回调函数的API 三.同步发送API 一.消息发送流程 ​ K ...

  7. Kafka的四个核心API

    · 使用 Producer API 发布消息到kafka集群中一个或多个topic. · 使用 Consumer API 来订阅一个或多个topic,并处理产生的消息. · 使用 Streams AP ...

  8. 海边的卡夫卡之 - kafka的基本概念以及Api使用

    海边的卡夫卡之 - kafka的基本概念以及Api使用 kafka的应用以及与其他MQ的对比 关于kafka的介绍,也许没有人能比官网更具有话语权,所以这里可以参考官网了解一下kafka:Kafka介 ...

  9. kafka Java客户端之 consumer API 消费消息

    背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...

最新文章

  1. VMware安装CentOS6
  2. apache 下实现防盗链
  3. SpringBoot+Thymeleaf+DataTables实现数据的查询与显示
  4. boost::hana::is_convertible用法的测试程序
  5. 自己有電腦的一定要看看,非常有用呢。
  6. 【软考高项】信息系统项目管理师 论文写作技巧分享 (上)
  7. Opencv_黑白素描
  8. python基础:os.path的相关操作
  9. sql compare mysql版本_SQL Compare 13免费版
  10. <C++>初识类的继承,用三行情诗打开继承的大门
  11. 【机器学习】数据驱动方法在电网稳定分析应用浅谈
  12. Linux如何测试驱动性能,掌握 Linux PC 性能之基准测试
  13. 大武口计算机考试培训,大武口电脑培训地址
  14. linux mkdir命令用法,linux里面的mkdir命令
  15. 高新技术企业申请容易吗?如何提高申报通过的机率?
  16. java实现m3u8文件抓取器
  17. PX4仿真时,如何在Gazebo下添加物理环境
  18. AUTOCAD使用心得
  19. 【TVM帮助文档学习】使用TVMC编译和优化模型
  20. 盐酸除铁方法 盐酸除铁树脂 盐酸里面的铁怎么去除

热门文章

  1. 医院药品进销存管理系统
  2. Springboot科研项目申报网站设计6109r计算机毕业设计-课程设计-期末作业-毕设程序代做
  3. 多线程的十二种设计模式
  4. ubuntu18.04 Intel Realsense T265与Realsense D435i 使用教程
  5. 吴恩达说人工智能永恒的春天已经到来你准备好了吗?
  6. 斯坦福SR810锁相放大器常用的噪声分析方法
  7. 嵌入式Linux开发(转载)
  8. 择校秘籍|浙大工程师、华科机械 和 南开人工智能 应该怎么选?
  9. ie9 去掉bing 加速
  10. PR片头模板 超震撼大气蓝色(颜色可自定义)粒子特效开场片头PR模板