Apache Kafka-生产消费基础篇
文章目录
- POM 依赖
- 生产者
- 消费者
- 测试
POM 依赖
版本请同使用的kafka服务端的版本保持一致
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>
生产者
请小伙伴注意一下注释,这里就不做多余的解释啦
package com.artisan.kafka.first;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 19:45* @mark: show me the code , change the world*/
public class ProduceClient {private static final String TOPIC = "artisanTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {// 属性设置 Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.126.140:9092");properties.put(ProducerConfig.ACKS_CONFIG,"1");properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 根据属性实例化KafkaProducerProducer<String,String> producer = new KafkaProducer<String, String>(properties);// 创建消息 三个参数,分别是 Topic ,消息的 key ,消息的 message 。String message = "mockValue";ProducerRecord<String ,String> producerRecord = new ProducerRecord<>(TOPIC, "mockKey", message);// 发送消息 (同步)Future<RecordMetadata> result = producer.send(producerRecord);// 获取同步发送的结果RecordMetadata recordMetadata = result.get();System.out.println(String.format("Message[ %s ] sent to Topic: %s || Partition: %s || Offset: %s",message, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));}}
消费者
请小伙伴注意一下注释,这里就不做多余的解释啦
package com.artisan.kafka.first;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 20:09* @mark: show me the code , change the world*/
public class ConsumerClient {private static final String TOPIC = "artisanTopic";public static void main(String[] args) {// 属性设置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG , "192.168.126.140:9092"); // Broker 的地址properties.put(ConsumerConfig.GROUP_ID_CONFIG,"artisan-consumer-group");// 消费者分组properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); // 设置消费者分组最初的消费进度为 earliestproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true"); // 是否自动提交消费进度properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); // 自动提交消费进度频率properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息的 key 的反序列化方式properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 消息的 value 的反序列化方式// 根据设置实例化KafkaConsumerConsumer<String,String> consumer = new KafkaConsumer<>(properties);// 订阅消息consumer.subscribe(Collections.singleton(TOPIC));// 循环拉取消息while (true){// 拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));// 遍历处理消息records.forEach(record -> System.out.println(String.format("接收到消息:Key %s || 内容 %s" , record.key(),record.value())));}}
}
属性的话,需要结合kafka的特性来讲解,后面的单独介绍
测试
运行Produce
运行消费端
Apache Kafka-生产消费基础篇相关推荐
- Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析
文章目录 概述 Spring-kafka生产者源码流程 Spring-kafka消费者源码流程(`@EnableKafka`和`@KafkaListener` ) Flow KafkaListener ...
- kafka生产消费原理笔记
一.什么是kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性 ...
- java利用kafka生产消费消息
2019独角兽企业重金招聘Python工程师标准>>> 1.producer程序 package com.test.frame.kafka.controller;import kaf ...
- java kafka 消费_java利用kafka生产消费消息
1.producer程序 package com.test.frame.kafka.controller; import kafka.javaapi.producer.Producer; import ...
- 本地windows下新建kafka生产消费数据
Kafka的运行依赖于Zookeeper,所以在运行Kafka之前我们需要安装并运行Zookeeper 1.安装zookeeper 1.1 下载安装文件: http://mirror.bit.edu. ...
- Apache Doris 系列: 基础篇-Routine Load
简介 Routine Load 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中. 目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CS ...
- Apache Doris 系列: 基础篇-Flink SQL写入Doris
简介 本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分: Flink Doris connector Doris FE 节点配置 Flink SQL 写 Doris ...
- KAFKA 集成 SpringBoot2 消息发送和消费消息(基础篇)
文章目录 1. 技术选型 2. 导入依赖 3. kafka配置 4. 生产者(同步) 5. 生产者(异步) 6. 消费者 1. 技术选型 软件/框架 版本 jdk 1.8.0_202 springbo ...
- MAC搭建kafka客户端以及实现生产消费
Kafka 部分参数说明 (1)max.in.flight.requests.per.connection Kafka 可以保证同一个分区里的消息是有序的.也就是说,如果生产者按照一定的顺序发送消息, ...
最新文章
- python什么时候进入中国-Python什么时候开始流行的?还能流行多久?
- 计算机缺失esul.dll,SceneUI.ES.dll
- $.ajax所犯的错误。success后面不执行
- LeetCode 第 198 场周赛(434/5778,前7.51%)
- Fedora/RedHat上使用Docker命令搭建Mysql服务器
- 如何提高码农产量,基于ASP.NET MVC的敏捷开发框架之移动端开发随笔二
- 【番外篇】ASP.NET MVC快速入门之免费jQuery控件库(MVC5+EF6)
- Spring Security的HTTP基本验证示例
- java future模式 所线程实现异步调用
- 移动平台开发项目(推箱子小游戏)
- 拓端tecdat|Excel中计算票面利率Coupon Rate
- 辞职的新方式:一言不合就消失!
- 商务与经济统计学 第五章案例题
- 冰点还原精灵有什么作用
- java分布式日志组件
- 基于HTML+CSS+JavaScript+Bootstarp响应式健身网站(web前端期末大作业)
- Android Studio 如何查看Sqlite数据文件
- 我的世界服务器登录显示motd,[信息]ColorMOTD —— 究极Motd插件,外带反压测[1.7-1.8]...
- 【论文简述】Rethinking Depth Estimation for Multi-View Stereo: A Unified Representation(CVPR 2022)
- 铰削和铰刀的基本概念
热门文章
- java数组的clone方法_深入浅出,如何更彻底地理解Java数组的clone方法
- java osgi web开发_基于 OSGi 和 Spring 开发 Web 应用
- python输入逗号分隔值文件_如何在Python(Pygame)中显示逗号分隔值(.txt)文件中的前10个高分...
- python解析html模块_Python HTMLParser模块解析html获取url实例
- 重温强化学习之基于模型方法:动态规划
- 文巾解题 929. 独特的电子邮件地址
- MATLAB从入门到精通-Simulink模块连续模块之积分(Continuous-Integrator)
- 深度学习核心技术精讲100篇(三十六)-EdgeRec:边缘计算在淘宝推荐系统中的大规模应用
- 增强学习(五)----- 时间差分学习(Q learning, Sarsa learning)
- 使用Python绘制热图的库