前面已经讲到了,在Kafka中,Message是由Producer产生的,Producer产生的Message会发送到Topic的指定Partition中。Producer可以有多种形式,也可以由用户通过Java,C以及Python语言来自定义。
  Kafka中Producer的主要作用和地位如下图所示,Producer通过获取某个Topic指定Partition的Leader节点连接到Kafka集群中,

一、Java Producer API

  用户可以基于Kafka提供的API自定义Producer,在这些API中有几个主要的类:

1. kafka.javaapi.producer.Producer
  类定义:

class Producer[ K,V ](private val underlying: kafka.producer.Producer[K ,V])

  UML图:
  

2. kafka.producer.ProducerConfig
  类定义:   

class ProducerConfig private (val props: VerifiableProperties)extends AsyncProducerConfig with SyncProducerConfigShared

  UML图:
  

3. kafka.producer.KeyedMessage
  类定义:

case class KeyedMessage[ K, V ](val topic: String, val key: K, val partKey: Any , val message: V)

二、自定义简单的Producer

  接下来根据上面的三个类,使用Java代码实现一个简单的Producer向Broker发送Message。这个Producer会为特定的Topic生成Message并发送到默认的Partition中。
  具体代码和过程在代码和注释中。
1、Java代码

package ckm.kafka.producer;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;import java.util.Date;
import java.util.Properties;/*** 一个简单的Kafka Producer类,传入两个参数:* topic num* 设置主题和message条数** 执行过程:* 1、创建一个topic* kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic xxxx* 2、运行本类中的代码* 3、查看message* kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx* kafka*/
public class SimpleKafkaProducer {/*** Producer的两个泛型,第一个指定Key的类型,第二个指定value的类型*/private static Producer<String, String> producer;public SimpleKafkaProducer() {Properties props = new Properties();/*** 指定producer连接的broker列表*/props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092");/*** 指定message的序列化方法,用户可以通过实现kafka.serializer.Encoder接口自定义该类* 默认情况下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化*/props.put("serializer.class", "kafka.serializer.StringEncoder");/*** 这个参数用于通知broker接收到message后是否向producer发送确认信号*  0 - 表示producer不用等待任何确认信号,会一直发送消息,* 否则producer进入等待状态* -1 - 表示leader状态的replica需要等待所有in-sync状态的replica都接收到消息后才会向producer发送确认信号,* 再次之前producer一直处于等待状态*/props.put("request.required.acks", "1");ProducerConfig config = new ProducerConfig(props);producer = new Producer<String, String>(config);}public static void main(String[] args) {if (args.length < 2) {System.out.println("Please Input Topic and Message Numbers");}String topic = (String) args[0];int count = Integer.parseInt((String) args[1]);System.out.println("Topic = " + topic);System.out.println("Message Nums = " + count);SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer();simpleProducer.publishMessage(topic, count);}/*** 根据topic和消息条数发送消息* @param topic* @param count*/private void publishMessage(String topic, int count) {for (int i = 0; i < count; i ++) {String runtime = new Date().toString();String msg = "Message published time - " + runtime;System.out.println("msg = " + msg);/*** 第一个泛型指定用于分区的key的类型,第二个泛型指message的类型* topic只能为String类型*/KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);producer.send(data);}producer.close();}
}

2、运行
(1)启动ZooKeeper

$ZK_HOME/bin/zkServer.sh start


(2)启动Kafka集群

cd $KAFKA_HOME
nohup bin/kafka-server-start.sh config/server.properties &


(3)创建测试Topic

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 1 --partition 3 --topic simple-kafka-producer


(4)运行SimpleKafkaProducer 代码
  运行该代码,向simple-kafka-producer Topic发送10条Message

java -cp KafkaTestProgram.jar ckm.kafka.producer.SimpleKafkaProducer simple-kafka-producer 10


(5)查看simple-kafka-producer中的Message

bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic simple-kafka-producer

三、自定义Partition的Producer

  这一节中除了实现Producer之外,还自定义了Message的Partition划分过程。
  在这里,将会模拟一个网页访问日志生成的过程,每条随机生成的日志Message中包含三个部分的信息:
- 页面访问时间戳
- 页面名称
- 访问页面的IP地址
  
1、Java代码
(1)Producer

package ckm.kafka.producer;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;import java.util.Date;
import java.util.Properties;
import java.util.Random;/*** 一个自定义分区的Kafka Producer类,传入两个参数:* topic num* 设置主题和message条数** 模拟用户点击日志,日志格式为:“时间,网址,IP地址"格式** 自定义分区,通过IP地址最后一位与分区数求余,message分散到0~partition - 1这些分区中** 执行过程:* 1、创建一个topic* kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic xxxx* 2、运行本类中的代码* 3、查看message* kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx* kafka*/
public class KafkaProducerWithPartition {/*** Producer的两个泛型,第一个指定Key的类型,第二个指定value的类型*/private static Producer<String, String> producer;public KafkaProducerWithPartition() {Properties props = new Properties();/*** 指定producer连接的broker列表*/props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092");/*** 指定message的序列化方法,用户可以通过实现kafka.serializer.Encoder接口自定义该类* 默认情况下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化*/props.put("serializer.class", "kafka.serializer.StringEncoder");/*** 这个参数用于通知broker接收到message后是否向producer发送确认信号*  0 - 表示producer不用等待任何确认信号,会一直发送消息*  1 - 表示leader状态的replica在接收到message后需要向producer发送一个确认信号,否则producer进入等待状态* -1 - 表示leader状态的replica需要等待所有in-sync状态的replica都接收到消息后才会向producer发送确认信号,再次之前producer一直处于等待状态*/props.put("request.required.acks", "1");/*** 指定partition类,自定义的分区类,继承自kafka.producer.Partitioner接口*/props.put("partitioner.class", "ckm.kafka.producer.SimplePartitioner");ProducerConfig config = new ProducerConfig(props);producer = new Producer<String, String>(config);}public static void main(String[] args) {if (args.length < 2) {System.out.println("Please Input Topic and Message Numbers");}String topic = (String) args[0];int count = Integer.parseInt((String) args[1]);System.out.println("Topic = " + topic);System.out.println("Message Nums = " + count);KafkaProducerWithPartition simpleProducer = new KafkaProducerWithPartition();simpleProducer.publishMessage(topic, count);}/*** 根据topic和消息条数发送消息* @param topic* @param count*/private void publishMessage(String topic, int count) {Random random = new Random();for (int i = 0; i < count; i ++) {String runtime = new Date().toString();// 访问的IP地址String clientIP = "192.168.1." + random.nextInt(255);String msg = runtime + ",kafka.apache.org," + clientIP;System.out.println("msg = " + msg);/*** 第一个泛型指定用于分区的key的类型,第二个泛型指message的类型* topic只能为String类型* 和上一个Producer相比,多了一个用于分区的key*/KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, clientIP, msg);producer.send(data);}producer.close();}
}

(2)Partitioner

package ckm.kafka.producer;import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;/*** Created by ckm on 2016/8/3.*/
public class SimplePartitioner implements Partitioner {/*** 不写这个方法,会报错* Exception in thread "main" java.lang.NoSuchMethodException: ckm.kafka.producer.SimplePartitioner.<init>(kafka.utils.VerifiableProperties)* at java.lang.Class.getConstructor0(Class.java:2892)* at java.lang.Class.getConstructor(Class.java:1723)* at kafka.utils.Utils$.createObject(Utils.scala:436)* at kafka.producer.Producer.<init>(Producer.scala:61)* at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)* at ckm.kafka.producer.KafkaProducerWithPartition.<init>(KafkaProducerWithPartition.java:58)* at ckm.kafka.producer.KafkaProducerWithPartition.main(KafkaProducerWithPartition.java:70)* @param verifiableProperties*/public SimplePartitioner(VerifiableProperties verifiableProperties) {}public int partition(Object key, int numPartitions) {int partition = 0;String partitionKey = (String) key;int offset = partitionKey.lastIndexOf('.');if (offset > 0) {partition = Integer.parseInt(partitionKey.substring(offset + 1)) % numPartitions;}return partition;}
}

2、运行
  由于前面已经启动了ZooKeeper以及Kafka,这里直接从创建Topic开始
(1)创建Topic
  创建一个partition为3,replication为3的topic。

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 3 --partitions 3 --topic partition-kafka-producer


  如何使用list命令查看该Topic,可以参考前面的示例
 (2)运行Java代码

java -cp KafkaTestProgram.jar ckm.kafka.producer.KafkaProducerWithPartition partition-kafka-producer 100

  往partition-kafka-producer Topic中写入100条随机生成的Message。

(3)查看这些Message

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic partition-kafka-producer

四、自定义Producer的封装

  上面两种自定义的Producer中,其实有很多代码是重复性的。接下来对Kafka自定义Producer进行一定的封装,使其使用和配置更加简便。
  经过封装后,producer有关的参数都写在properties文件中。
  第二步中的Producer的调用方法为:

KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
kafkaProducerTool.publishMessage("test message");

  两行代码就可以将该message发送到配置的Kafka集群指定的topic中。

  第三步中的自定义Partitioner的Producer的调用方法为:

KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
Properties producerProperties = kafkaProducerTool.getProducerProperties();
// 如果properties配置文件中没有配置该参数的话,手动设置
producerProperties.put("partitioner.class", "SimplePartitioner");
kafkaProducerTool.publishPartitionedMessage("partition-key", "test messate");

  具体代码可以参考KafkaProducerTool。
  欢迎提出宝贵意见。

Kafka系列之-自定义Producer相关推荐

  1. Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用

    KafkaOffsetMonitor是一个可以用于监控Kafka的Topic及Consumer消费状况的工具,其配置和使用特别的方便.源项目Github地址为:https://github.com/q ...

  2. Kafka系列之-Kafka Protocol实例分析

    本文基于A Guide To The Kafka Protocol文档,以及Spark Streaming中实现的org.apache.spark.streaming.kafka.KafkaClust ...

  3. Kafka系列一之架构介绍和安装

    Kafka架构介绍和安装 写在前面 还是那句话,当你学习一个新的东西之前,你总得知道这个东西是什么?这个东西可以用来做什么?然后你才会去学习它,使用它.简单来说,kafka既是一个消息队列,如今,它也 ...

  4. java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器

    Java kafka如何实现自定义分区类和拦截器 2.producer配置文件指定,具体的分区类 // 具体的分区类 props.put(ProducerConfig.PARTITIONER_CLAS ...

  5. Kafka系列 —— 生产实践分享

    Kafka系列文章: Kafka系列 -- 入门及应用场景 & 部署 & 简单测试 Kafka系列 -- Kafka核心概念 Kafka系列 -- Kafka常用命令 Kafka系列 ...

  6. Kafka系列之:增加Kafka节点扩展Kafka集群

    Kafka系列之:增加Kafka节点扩展Kafka集群 一.增加Kafka节点 二.分区重新分配工具三种工作模式 三.自动将数据迁移到新机器 四.自定义分区分配和迁移 五.增加复制因子 六.在数据迁移 ...

  7. kafka系列之kafka生产者与分区(3)

    概要 当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量? 举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消 ...

  8. Kafka系列(五)、开启SASL安全认证以及配置ACL权限控制

    目录 开启SASL 控制台配置用户 ACL授权 Python客户端访问 ACL常用命令 Kafka系列: kafka 2.4.1单机版部署及使用 kafka监控系统kafka eagle安装使用 滴滴 ...

  9. Kafka系列(七)、Kafka套件 Confluent Platform 单机/集群部署

    目录 简介 单机部署 集群部署 尾巴 Kafka系列: kafka 2.4.1单机版部署及使用 kafka监控系统kafka eagle安装使用 滴滴开源的kafka-manager编译及部署使用 k ...

  10. Kafka系列(六)、Kafka开发套件kafka lenses 安装及使用(带WebUI)

    目录 介绍 安装 使用 Kafka系列: kafka 2.4.1单机版部署及使用 kafka监控系统kafka eagle安装使用 滴滴开源的kafka-manager编译及部署使用 kafka管理监 ...

最新文章

  1. 洛谷p1162填涂颜色(dfs写法)
  2. win10: Coursera 视频无法观看问题解决。
  3. 华为快应用引擎架构及开发实践
  4. 名图空间实测_骗子!20来万的大众SUV号称7.8秒破百,实测后我服了!
  5. APP视觉稿该怎么切图和标注
  6. 最佳实践|Spring Boot 应用如何快速接入 Prometheus 监控
  7. 数据可视化系列(二):艺术画笔见乾坤
  8. USACO 1.5 Number Triangles
  9. SSM博客 点赞和文章浏览量实现
  10. 三级java_java三级(最全的题库).doc
  11. 金额要用BigDecimal,原理分析
  12. 合成孔径雷达(微波遥感)的应用
  13. FastAPI 快速入门
  14. 计算机移动硬盘无法访问,移动硬盘无法访问参数不正确的解决方法
  15. 二进制除法原理——两种简便方法
  16. 使用Eclipse WTP进行快速Web开发
  17. Cisco WLC9800 CWA FlexConnect with ISE
  18. python爬取微博热搜榜
  19. 汉堡菜单html加logo,HTML+Sass实现HambergurMenu(汉堡包式菜单)
  20. 论大学生能参加的比赛,看这一篇文章就够了

热门文章

  1. 洛谷B2046 骑车与走路
  2. deep linux 看视频卡,在Deepin 20系统中用4K分辨率使桌面假死和卡顿的解决
  3. ant design的文档真的是一坨屎
  4. 【补交】 软工第0次作业
  5. Pix2Pix-GAN阅读笔记
  6. weblogic节点启动错误:<BEA-000110> Socket closed ;<BEA-000361>
  7. python局域网下载文件
  8. java购物网站设计计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
  9. U盘安装windows xp/win7系统
  10. git 错误信息 6 uncommitted changes would be overwritten by merge