文章目录

  • POM 依赖
  • 生产者
  • 消费者
  • 测试

POM 依赖

版本请同使用的kafka服务端的版本保持一致

     <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>

生产者

请小伙伴注意一下注释,这里就不做多余的解释啦

package com.artisan.kafka.first;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 19:45* @mark: show me the code , change the world*/
public class ProduceClient {private static final String TOPIC = "artisanTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {// 属性设置 Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.126.140:9092");properties.put(ProducerConfig.ACKS_CONFIG,"1");properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 根据属性实例化KafkaProducerProducer<String,String>  producer = new KafkaProducer<String, String>(properties);// 创建消息  三个参数,分别是 Topic ,消息的 key ,消息的 message 。String message = "mockValue";ProducerRecord<String ,String> producerRecord = new ProducerRecord<>(TOPIC, "mockKey", message);// 发送消息  (同步)Future<RecordMetadata> result = producer.send(producerRecord);// 获取同步发送的结果RecordMetadata recordMetadata = result.get();System.out.println(String.format("Message[ %s ] sent to Topic: %s  || Partition: %s || Offset: %s",message, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));}}

消费者

请小伙伴注意一下注释,这里就不做多余的解释啦

package com.artisan.kafka.first;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 20:09* @mark: show me the code , change the world*/
public class ConsumerClient {private static final String TOPIC = "artisanTopic";public static void main(String[] args) {// 属性设置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG , "192.168.126.140:9092");  // Broker 的地址properties.put(ConsumerConfig.GROUP_ID_CONFIG,"artisan-consumer-group");// 消费者分组properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");  // 设置消费者分组最初的消费进度为 earliestproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 是否自动提交消费进度properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); // 自动提交消费进度频率properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息的 key 的反序列化方式properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 消息的 value 的反序列化方式// 根据设置实例化KafkaConsumerConsumer<String,String>  consumer = new KafkaConsumer<>(properties);// 订阅消息consumer.subscribe(Collections.singleton(TOPIC));// 循环拉取消息while (true){// 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));// 遍历处理消息records.forEach(record -> System.out.println(String.format("接收到消息:Key %s  || 内容 %s" , record.key(),record.value())));}}
}

属性的话,需要结合kafka的特性来讲解,后面的单独介绍


测试

运行Produce

运行消费端

Apache Kafka-生产消费基础篇相关推荐

  1. Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析

    文章目录 概述 Spring-kafka生产者源码流程 Spring-kafka消费者源码流程(`@EnableKafka`和`@KafkaListener` ) Flow KafkaListener ...

  2. kafka生产消费原理笔记

    一.什么是kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性 ...

  3. java利用kafka生产消费消息

    2019独角兽企业重金招聘Python工程师标准>>> 1.producer程序 package com.test.frame.kafka.controller;import kaf ...

  4. java kafka 消费_java利用kafka生产消费消息

    1.producer程序 package com.test.frame.kafka.controller; import kafka.javaapi.producer.Producer; import ...

  5. 本地windows下新建kafka生产消费数据

    Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper 1.安装zookeeper 1.1 下载安装文件: http://mirror.bit.edu. ...

  6. Apache Doris 系列: 基础篇-Routine Load

    简介 Routine Load 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中. 目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CS ...

  7. Apache Doris 系列: 基础篇-Flink SQL写入Doris

    简介 本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分: Flink Doris connector Doris FE 节点配置 Flink SQL 写 Doris ...

  8. KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)

    文章目录 1. 技术选型 2. 导入依赖 3. kafka配置 4. 生产者(同步) 5. 生产者(异步) 6. 消费者 1. 技术选型 软件/框架 版本 jdk 1.8.0_202 springbo ...

  9. MAC搭建kafka客户端以及实现生产消费

    Kafka 部分参数说明 (1)max.in.flight.requests.per.connection Kafka 可以保证同一个分区里的消息是有序的.也就是说,如果生产者按照一定的顺序发送消息, ...

最新文章

  1. python什么时候进入中国-Python什么时候开始流行的?还能流行多久?
  2. 计算机缺失esul.dll,SceneUI.ES.dll
  3. $.ajax所犯的错误。success后面不执行
  4. LeetCode 第 198 场周赛(434/5778,前7.51%)
  5. Fedora/RedHat上使用Docker命令搭建Mysql服务器
  6. 如何提高码农产量,基于ASP.NET MVC的敏捷开发框架之移动端开发随笔二
  7. 【番外篇】ASP.NET MVC快速入门之免费jQuery控件库(MVC5+EF6)
  8. Spring Security的HTTP基本验证示例
  9. java future模式 所线程实现异步调用
  10. 移动平台开发项目(推箱子小游戏)
  11. 拓端tecdat|Excel中计算票面利率Coupon Rate
  12. 辞职的新方式:一言不合就消失!
  13. 商务与经济统计学 第五章案例题
  14. 冰点还原精灵有什么作用
  15. java分布式日志组件
  16. 基于HTML+CSS+JavaScript+Bootstarp响应式健身网站(web前端期末大作业)
  17. Android Studio 如何查看Sqlite数据文件
  18. 我的世界服务器登录显示motd,[信息]ColorMOTD —— 究极Motd插件,外带反压测[1.7-1.8]...
  19. 【论文简述】Rethinking Depth Estimation for Multi-View Stereo: A Unified Representation(CVPR 2022)
  20. 铰削和铰刀的基本概念

热门文章

  1. java数组的clone方法_深入浅出,如何更彻底地理解Java数组的clone方法
  2. java osgi web开发_基于 OSGi 和 Spring 开发 Web 应用
  3. python输入逗号分隔值文件_如何在Python(Pygame)中显示逗号分隔值(.txt)文件中的前10个高分...
  4. python解析html模块_Python HTMLParser模块解析html获取url实例
  5. 重温强化学习之基于模型方法:动态规划
  6. 文巾解题 929. 独特的电子邮件地址
  7. MATLAB从入门到精通-Simulink模块连续模块之积分(Continuous-Integrator)
  8. 深度学习核心技术精讲100篇(三十六)-EdgeRec:边缘计算在淘宝推荐系统中的大规模应用
  9. 增强学习(五)----- 时间差分学习(Q learning, Sarsa learning)
  10. 使用Python绘制热图的库