转自:https://www.cnblogs.com/kaishan1990/p/7228683.html,但是其中实例有细微修改。
Kafka 简易教程
1.初识概念

Apache Kafka是一个分布式消息发布订阅系统。

Topic
Kafka将消息种子(Feed)分门别类, 每一类的消息称之为话题(Topic).

Producer
发布消息的对象称之为话题生产者(Kafka topic producer)

Consumer
订阅消息并处理发布的消息的种子的对象称之为话题消费者(consumers)
Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。

分区

一个topic可以有一个或多个分区,每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。

每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求而follower被动的复制数据。如果leader当机,其它的一个follower会被推举为新的leader。

通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

Kafka 只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要 topic 中所有消息的有序性,那就只能让这个 topic 只有一个分区,当然也就只有一个 consumer 组消费它。

2.安装使用

  1. 下载 Kafka

下载 wget http://apache.01link.hk/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz 或者
wget http://ftp.cuhk.edu.hk/pub/packages/apache.org/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz(看哪个源比较快)
解压 tar -xzf kafka_2.11-0.10.0.0.tgz
进入文件夹 cd kafka_2.11-0.10.0.0/
2. 启动服务

启动 ZooKeeper bin/zookeeper-server-start.sh config/zookeeper.properties &(利用 &放到后台方便继续操作)
启动 Kafka bin/kafka-server-start.sh config/server.properties &
3. 创建一个叫做 dawang 的 topic,它只有一个分区,一个副本

创建 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dawang
查看 bin/kafka-topics.sh --list --zookeeper localhost:2181
还可以配置 broker 让它自动创建 topic
4. 发送消息。Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

发送消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dawang(然后可以随意输入内容,回车可以发送,ctrl+c 退出)
5. 启动 consumer。可以读取消息并输出到标准输出:

接收消息 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dawang --from-beginning
在一个终端中运行 consumer 命令行,另一个终端中运行 producer 命令行,就可以在一个终端输入消息,另一个终端读取消息。这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。
6. 搭建一个多个 broker 的集群,启动有 3 个 broker 组成的集群,这些 broker 节点也都在本机

首先复制一下配置文件:cp config/server.properties config/server-1.properties 和 cp config/server.properties config/server-2.properties

两个文件需要改动的内容为:

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
这里我们把 broker id, 端口号和日志地址配置成和之前不一样,然后我们启动这两个 broker:

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
然后创建一个复制因子为 3 的 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic oh3topic

可以使用 describe 命令来显示 topic 详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topic
Topic:oh3topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: oh3topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
我们也可以来看看之前的另一个 topic 的情况

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic dawang
Topic:dawang PartitionCount:1 ReplicationFactor:1 Configs:
Topic: dawang Partition: 0 Leader: 0 Replicas: 0 Isr: 0
最后我们可以按照同样的方法来生产和消费消息,例如
#生产
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic oh3topic

消费

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic oh3topic
开俩终端就可以一边生产消息,一边消费消息了。

测试一下容错. 干掉leader,也就是Broker 1:

ps -ef | grep server-1.properties

Leader被切换到一个follower上节, 点 1 不会被列在isr中了,因为它死了:

再次使用 describe 命令来显示 topic 详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topic

但是,消息没丢啊,不信你试试:

彻底删除kafka中的topic

1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
2、Kafka 删除topic的命令是:

如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic oh3topic

3.代码实例

需要自行安装librdkafka库

https://github.com/edenhill/librdkafka

produce

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string.h>
#include "librdkafka/rdkafkacpp.h"
//#include "librdkafka/rdkafka.h"
using namespace std;bool run = true;class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{public:void dr_cb (RdKafka::Message &message) {std::cout << "Message delivery for (" << message.len() << " bytes): " <<message.errstr() << std::endl;if (message.key())std::cout << "Key: " << *(message.key()) << ";" << std::endl;}
};class ExampleEventCb : public RdKafka::EventCb {public:void event_cb (RdKafka::Event &event) {switch (event.type()){case RdKafka::Event::EVENT_ERROR:std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)run = false;break;case RdKafka::Event::EVENT_STATS:std::cerr << "\"STATS\": " << event.str() << std::endl;break;case RdKafka::Event::EVENT_LOG:fprintf(stderr, "LOG-%i-%s: %s\n",event.severity(), event.fac().c_str(), event.str().c_str());break;default:std::cerr << "EVENT " << event.type() <<" (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;break;}}
};/* Use of this partitioner is pretty pointless since no key is provided * in the produce() call.so when you need input your key */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {public:int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque){std::cout<<"partition_cnt="<<partition_cnt<<std::endl;return djb_hash(key->c_str(), key->size()) % partition_cnt;}private:static inline unsigned int djb_hash (const char *str, size_t len){unsigned int hash = 5381;for (size_t i = 0 ; i < len ; i++)hash = ((hash << 5) + hash) + str[i];std::cout<<"hash1="<<hash<<std::endl;return hash;}
};void TestProducer()
{std::string brokers = "localhost";std::string errstr;std::string topic_str="helloworld_kugou_test";//自行制定主题topicMyHashPartitionerCb hash_partitioner;int32_t partition = RdKafka::Topic::PARTITION_UA;int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;bool do_conf_dump = false;int opt;int use_ccb = 0;//Create configuration objectsRdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK){std::cerr << errstr << std::endl;exit(1);}/* * Set configuration properties */conf->set("metadata.broker.list", brokers, errstr);ExampleEventCb ex_event_cb;conf->set("event_cb", &ex_event_cb, errstr);ExampleDeliveryReportCb ex_dr_cb;/* Set delivery report callback */conf->set("dr_cb", &ex_dr_cb, errstr);/* * Create producer using accumulated global configuration. */RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer){std::cerr << "Failed to create producer: " << errstr << std::endl;exit(1);}std::cout << "% Created producer " << producer->name() << std::endl;/* * Create topic handle. */RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;exit(1);}/* * Read messages from stdin and produce to broker. */for (std::string line; run && std::getline(std::cin, line);){if (line.empty()){producer->poll(0);continue;}/* * Produce message // 1. topic // 2. partition // 3. flags // 4. payload // 5. payload len // 6. std::string key // 7. msg_opaque? NULL */std::string key=line.substr(0,5);//根据line前5个字符串作为key值// int a = MyHashPartitionerCb::djb_hash(key.c_str(),key.size());// std::cout<<"hash="<<a<<std::endl;RdKafka::ErrorCode resp = producer->produce(topic, partition,RdKafka::Producer::RK_MSG_COPY /* Copy payload */,const_cast<char *>(line.c_str()), line.size(),key.c_str(), key.size(), NULL);//这里可以设计key值,因为会根据key值放在对应的partitionif (resp != RdKafka::ERR_NO_ERROR)std::cerr << "% Produce failed: " <<RdKafka::err2str(resp) << std::endl;elsestd::cerr << "% Produced message (" << line.size() << " bytes)" <<std::endl;producer->poll(0);//对于socket进行读写操作。poll方法才是做实际的IO操作的。return the number of events served}//run = true;while (run && producer->outq_len() > 0) {std::cerr << "Waiting for " << producer->outq_len() << std::endl;producer->poll(1000);}delete topic;delete producer;
}int main(int argc, char *argv[])
{TestProducer();return EXIT_SUCCESS;
}

consumer

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <string.h>
#include "librdkafka/rdkafkacpp.h"
using namespace std;bool run = true;
bool exit_eof = true;
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{public:void dr_cb (RdKafka::Message &message) {std::cout << "Message delivery for (" << message.len() << " bytes): " <<message.errstr() << std::endl;if (message.key())std::cout << "Key: " << *(message.key()) << ";" << std::endl;}
};class ExampleEventCb : public RdKafka::EventCb {public:void event_cb (RdKafka::Event &event) {switch (event.type()){case RdKafka::Event::EVENT_ERROR:std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)run = false;break;case RdKafka::Event::EVENT_STATS:std::cerr << "\"STATS\": " << event.str() << std::endl;break;case RdKafka::Event::EVENT_LOG:fprintf(stderr, "LOG-%i-%s: %s\n",event.severity(), event.fac().c_str(), event.str().c_str());break;default:std::cerr << "EVENT " << event.type() <<" (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;break;}}
};/* Use of this partitioner is pretty pointless since no key is provided * in the produce() call.so when you need input your key */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {public:int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque){std::cout<<"partition_cnt="<<partition_cnt<<std::endl;return djb_hash(key->c_str(), key->size()) % partition_cnt;}private:static inline unsigned int djb_hash (const char *str, size_t len){unsigned int hash = 5381;for (size_t i = 0 ; i < len ; i++)hash = ((hash << 5) + hash) + str[i];std::cout<<"hash1="<<hash<<std::endl;return hash;}
};void msg_consume(RdKafka::Message* message, void* opaque)
{switch (message->err()){case RdKafka::ERR__TIMED_OUT:break;case RdKafka::ERR_NO_ERROR:/* Real message */std::cout << "Read msg at offset " << message->offset() << std::endl;if (message->key()){std::cout << "Key: " << *message->key() << std::endl;}printf("%.*s\n", static_cast<int>(message->len()),static_cast<const char *>(message->payload()));break;case RdKafka::ERR__PARTITION_EOF:/* Last message */if (exit_eof){run = false;cout << "ERR__PARTITION_EOF" << endl;}break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cerr << "Consume failed: " << message->errstr() << std::endl;run = false;break;default:/* Errors */std::cerr << "Consume failed: " << message->errstr() << std::endl;run = false;}
}
class ExampleConsumeCb : public RdKafka::ConsumeCb {public:void consume_cb (RdKafka::Message &msg, void *opaque){msg_consume(&msg, opaque);}
};
void TestConsumer()
{std::string brokers = "localhost";std::string errstr;std::string topic_str="helloworld_kugou_test";//helloworld_kugouMyHashPartitionerCb hash_partitioner;int32_t partition = RdKafka::Topic::PARTITION_UA;//为何不能用??在Consumer这里只能写0???无法自动吗???partition = 0;int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;bool do_conf_dump = false;int opt;int use_ccb = 0;//Create configuration objectsRdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK){std::cerr << errstr << std::endl;exit(1);}/* * Set configuration properties */conf->set("metadata.broker.list", brokers, errstr);ExampleEventCb ex_event_cb;conf->set("event_cb", &ex_event_cb, errstr);ExampleDeliveryReportCb ex_dr_cb;/* Set delivery report callback */conf->set("dr_cb", &ex_dr_cb, errstr);/* * Create consumer using accumulated global configuration. */RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);if (!consumer){std::cerr << "Failed to create consumer: " << errstr << std::endl;exit(1);}std::cout << "% Created consumer " << consumer->name() << std::endl;/* * Create topic handle. */RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);if (!topic){std::cerr << "Failed to create topic: " << errstr << std::endl;exit(1);}/* * Start consumer for topic+partition at start offset */RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;exit(1);}ExampleConsumeCb ex_consume_cb;/* * Consume messages */while (run){if (use_ccb){consumer->consume_callback(topic, partition, 1000, &ex_consume_cb, &use_ccb);}else{RdKafka::Message *msg = consumer->consume(topic, partition, 1000);msg_consume(msg, NULL);delete msg;}consumer->poll(0);}/* * Stop consumer */consumer->stop(topic, partition);consumer->poll(1000);delete topic;delete consumer;
}int main(int argc, char *argv[])
{TestConsumer();return EXIT_SUCCESS;
}

Kafka 简易教程 C++ 实例相关推荐

  1. Android实战简易教程-第三十九枪(第三方短信验证平台Mob和验证码自动填入功能结合实例)

    用户注册或者找回密码时一般会用到短信验证功能,这里我们使用第三方的短信平台进行验证实例. 我们用到第三方短信验证平台是Mob,地址为:http://mob.com/ 一.注册用户.获取SDK 大家可以 ...

  2. Kafka入门教程与详解

    1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件负责处理连接服务.消息的路由和传送.持久 ...

  3. 文件上传利器SWFUpload入门简易教程

    凡做过网站开发的都应该知道表单file的确鸡肋. Ajax解决了不刷新页面提交表单,但是却没有解决文件上传不刷新页面,当然也有其它技术让不刷新页面而提交文件,该技术主要是利用隐藏的iFrame, 较A ...

  4. Kafka—简明教程

    目录 学习目标: 一.Kafka发展历程 1.Kafka一代 - 消息队列 2.Kafka二代 - Partition 3.Kafka三代 - Broker集群 二.Kafka的消息结构? 三.Zoo ...

  5. Ocelot简易教程(六)之重写配置文件存储方式并优化响应数据

    本来这篇文章在昨天晚上就能发布的,悲剧的是写了两三千字的文章居然没保存,结果我懵逼了.今天重新来写这篇文章.今天我们就一起来探讨下如何重写Ocelot配置文件的存储方式以及获取方式. 作者:依乐祝 原 ...

  6. Ocelot简易教程(五)之集成IdentityServer认证以及授权

    最近比较懒(编者注:作者不是真懒,而是在憋大招,他最近实现了把Ocelot的配置使用数据库存储),所以隔了N天才来继续更新第五篇Ocelot简易教程,本篇教程会先简单介绍下官方文档记录的内容然后在前几 ...

  7. Ocelot简易教程(四)之请求聚合以及服务发现

    上篇文章给大家讲解了Ocelot的一些特性并对路由进行了详细的介绍,今天呢就大家一起来学习下Ocelot的请求聚合以及服务发现功能.希望能对大家有所帮助. 作者:依乐祝 原文地址:https://ww ...

  8. Ocelot简易教程(三)之主要特性及路由详解

    作者:依乐祝 原文地址:https://www.cnblogs.com/yilezhu/p/9664977.html 上篇<Ocelot简易教程(二)之快速开始2>教大家如何快速跑起来一个 ...

  9. Solidity 简易教程0x001

    Solidity是以太坊的主要编程语言,它是一种静态类型的 JavaScript-esque 语言,是面向合约的.为实现智能合约而创建的高级编程语言,设计的目的是能在以太坊虚拟机(EVM)上运行. 本 ...

最新文章

  1. 《数字视频和高清:算法和接口》一第2章 图像的采样和显示
  2. 全国计算机等级考试python教材-全国计算机等级考试重大改革!新增Python科目
  3. HTML——a标签实现空链接(禁止跳转)
  4. oracle+去括号,关于001 TK的几个问题,请大家一起讨论一下
  5. 程鑫峰:1.23日央行推行负利率政策,伦敦金后市行情解析
  6. Java 7:使用NIO.2进行文件过滤-第2部分
  7. 阿里云推PostgreSQL 10 高可用版
  8. 洛谷P2050 美食节
  9. Linux安装MATLAB Compiler Runtime操作
  10. 将GRUB输出重定向到串口
  11. 要点初见:用Python进行微观交通模型仿真——基于SUMO的伯克利开源项目Flow Project初探与拓展
  12. 微商分销代理商城源码带代理等级和升级条件 thinkphp框架
  13. 如何屏蔽所有国外ip,禁止国外IP访问网站
  14. android中jni的调用过程,Android 调用jni的过程简述
  15. 【Matplotlib】(二)图例legend
  16. 机器视觉入门——VisionPro软件简介
  17. MATLAB绘制雷达图并导出矢量图到Visio编辑(论文用图)
  18. 这本书,豆瓣评分9.3,送给大家!
  19. (二)S7Comm协议分析
  20. 《ANSYS Workbench有限元分析实例详解(静力学)》——2.4 ACT插件

热门文章

  1. fgo服务器维护后抽奖,fgo:抽卡机制详解 彩圈是必定会出五星的
  2. php微信app支付2次签名,微信APP支付,第二次生成签名问题
  3. 深入了解el-scrollbar
  4. Java项目:ssm在线考试系统
  5. 【实用小脚本】Python实现文件/目录的复制
  6. 设​置​L​i​n​u​x​开​机​以​字​符​界​面​登​陆
  7. 今年双十一,区块链怎么玩?CSDN“免费+质量局”沙龙邀你参加
  8. EDA大作业——交通灯
  9. Exoplayer+Exomedia打造自定义播放器(二)
  10. Ubuntu Nvidia 驱动更新操作