本篇概要:

  • 1. 消息队列相关概念;
  • 2. Kafka 消息队列;
  • 3. 安装 Kafka 服务;
  • 4. 安装PHP的 Kafka 扩展 rdkafka;
  • 5. 编写 Kafka 的生产者方法;
  • 6. 编写 Kafka 的异步消费者方法。

1. 消息队列相关概念;

相关概念:
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合异步消息流量削峰等问题。实现高性能高可用可伸缩最终一致性架构。是大型分布式系统不可缺少的中间件。
使用场景:异步处理
场景说明:用户注册成功后,发送注册邮件,再发送注册短信。
串行方式:将注册信息写入数据库成功后,向用户发送邮件,再发送注册短信,将结果返回客户端。
并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信,以上三个任务完成后,返回给客户端。
消息队列:将注册信息写入数据库成功后,注册信息写入消息队列,就将结果返回给客户端。然后发送邮件和短信的消费者异步读取消息队列。
使用场景:应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。
传统方式:订单系统调用库存系统接口(并不可靠,访问接口的时候网络可能挂了,库存系统挂了,用户下单会失败)
消息队列
‐ 订单系统:在用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
‐ 库存系统:订阅下单的消息,采用拉 / 推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。这样用户在下单的时候,不会去访问库存系统相关内容
使用场景:流量削峰
场景说明:秒杀活动,一般会因为流量过大,导致流量暴增。
传统方式:服务端突然接收来自前端的大量订单请求。
消息队列:在应用的前端加入消息队列。
‐ 用户的请求,服务端接收后,首先写入消息队列。假如消息队列长度超过最大数量,则字节抛弃用户请求或跳转到错误页面
‐ 秒杀业务根据消息队列中的请求信息,再做后续处理
使用场景:日志处理
解决大量日志传输的问题
日志采集客户端(比如 Yii2 框架的 log 组件),负责日志数据采集,写入 Kafka 队列
Kafka 消息队列负责日志数据的接收,存储和转发
日志处理应用:订阅并消费 Kafka 队列中的日志数据
使用场景:消息通讯
点对点消息队列,或者聊天室
客户端 A 和客户端 B 使用同一队列,进行消息通讯
客户端 A、客户端 B、客户端 N 订阅同一主题,进行消息发布和接收
消息队列组要产品
目前在生产环境,使用较多的消息队列有 ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ 等

2. Kafka 消息队列;

相关概念:
Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模较大的网站中的所有动作流数据。
Kafka 官网:http://kafka.apache.org
优势:
高吞吐量:非常普通的硬件 Kafka 也可以支持每秒数百万的消息
支持通过 Kafka 服务器和消费机集群来区分消息。可以对消息进行分类,可以使用不同分类的服务器、不同分类的消费机去消费不同分类的消息
支持 Hadoop 并行数据加载
关键概念:
Broker:Kafka 集群中的一台或多台服务器统称为 Broker (服务器)
Topic:Kafka 处理的消息源(feeds of message)的不同分类(消息分类)
Partition:Topic 物理上的分组,一个 Topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)可以把 topic 理解为群名称,消费 topic 的时候可以进行物理上的分组。比如一个 partition 不够用,可以分给多个 partition
Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息
Producers:消息和数据生产者,向 Kafka 的一个 topic 发布消息的过程叫做 producers
Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers

3. 安装 Kafka 服务;

  • ZooKeeper 依赖 java 虚拟机,所以需要安装 java 环境
# 升级包和系统内核(可选)
yum -y update# 查找安装 java
yum list java*
yum -y install java-1.8.0-openjdk*
java -version
  • 打开 http://kafka.apache.org,点击 Download
  • 下载
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
  • 解压
tar zxvf kafka_2.11-2.3.0.tgz -C /usr/local/
# 目录改名
cd /usr/local/
mv kafka_2.11-2.3.0/ kafka/
  • 配置
cd kafka/config/# kafka 是基于 zookeeper 启动,管理集群的
# 需要先配置 zookeeper.properties 再配置 server.properties
vim server.properties# 打开监听端口
#listeners=PLAINTEXT://:9092
# 修改为
listeners=PLAINTEXT://192.168.2.214:9092#advertised.listeners=PLAINTEXT://your.host.name:9092#zookeeper.connect=localhost:2181
# 修改为
zookeeper.connect=192.168.2.214:2181
# 保存退出
  • 启动
# 启动 zookeeper
cd ../bin/
# 启动 zookeeper 和 kafka 的命令分别是
# zookeeper-server-start.sh 和 kafka-server-start.sh
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties# 再启动一个终端,启动 kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties # 在启动一个终端
netstat -tunpl | grep 9092
# 返回
tcp6       0      0 192.168.2.214:9092      :::*                    LISTEN      25675/java
  • 测试(生产者)
# 新开一个终端
# 启动一个**生产者**(Producer),准备生产信息
/usr/local/kafka/bin/kafka-console-producer.sh \
--broker-list 192.168.2.214:9092 --topic test

  • 测试(消费者)
# 再新开一个终端
# 启动一个**消费者**(Consumer),监听有没有消息
# 当有消息生产过来,就进行相关的消费
# 注意:启动消费者的方法 --zookeeper 已经过时(服务器端口号在新旧方法里也有区别)
# /usr/local/kafka/bin/kafka-console-consumer.sh \
# --zookeeper 192.168.2.214:2181 --topic test --from-beginning
# 0.90 版本之后启动方法是 --bootstrap-server
/usr/local/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.2.214:9092 --topic test --from-beginning

4. 安装PHP的 Kafka 扩展 rdkafka;

  • 安装 librdkafka:https://github.com/edenhill/librdkafka
# 下载
cd /usr/local/src
git clone https://github.com/edenhill/librdkafka.git# 进入目录
cd librdkafka/# 编译安装
./configure
make && make install
  • 安装 php-rdkafka:https://github.com/arnaud-lb/php-rdkafka
# 下载
cd /usr/local/src
git clone https://github.com/arnaud-lb/php-rdkafka.git# 进入目录
cd php-rdkafka/# 通过 phpize 建立 PHP 的外挂模块,生成 configure
phpize# 编译安装
./configure --with-php-config=/usr/local/php/bin/php-config
make && make install# 配置 php.ini
echo "[rdkafka]" >> /usr/local/php/etc/php.ini
echo "extension = rdkafka.so" >> /usr/local/php/etc/php.ini# 重启 Nginx 和 PHP
systemctl restart nginx
/etc/init.d/php-fpm restart# 查看是否安装成功
php -m

5. 编写 Kafka 的生产者方法;

  • 以 Yii2 框架为例,新建 basic/models/Kafka.php
  • 参考:https://github.com/arnaud-lb/php-rdkafka
<?phpnamespace app\models;class Kafka{public $broker_list = '192.168.2.214:9092';public $topic = "topic";public $partition = 0;  // 物理分组protected $producer = null;protected $consumer = null;// 参考:https://arnaud.le-blanc.net/php-rdkafka/phpdoc/rdkafka.examples-producer.htmlpublic function __construct(){if(empty($this->broker_list)){throw new \yii\base\InvalidConfigException("broker not config");}$conf = new \RdKafka\Conf();if (empty($conf)) {throw new \yii\base\InvalidConfigException("Conf error");}$conf->set('log_level', LOG_DEBUG);$conf->set('debug', 'all');$rk = new \RdKafka\Producer($conf);if (empty($rk)) {throw new \yii\base\InvalidConfigException("rk error");}if (!$rk->addBrokers($this->broker_list)) {throw new \yii\base\InvalidConfigException("addBrokers error");}$this->producer = $rk;}/*** 生产者方法,把日志往消息队列里发送* @param array $messages*/public function send($messages = []){$t = $this->producer->newTopic($this->topic);$result = $t->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));// poll() 方法传递的是时间(ms)$this->producer->poll(50);return $result;}}
  • 修改配置文件 basic/config/web.php
// config->components 添加
'asyncLog' => ['class' => '\\app\\models\\Kafka','broker_list' => '192.168.2.214:9092','topic' => 'asynclog',
],
  • 新建 basic/controllers/IndexController.php
<?phpnamespace app\controllers;use Yii;
use yii\web\Controller;class IndexController extends controller{public function actionIndex(){Yii::$app->asyncLog->send(['this is IndexController']);}}
  • 操作
# 启动 zookeeper
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties# 启动 kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties# 启动消费者,订阅 topic 为 asynclog
/usr/local/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.2.214:9092 --topic asynclog --from-beginning# 运行生产者脚本
http://192.168.2.214/yii2/basic/web/index.php?r=index/index# 消费者 consumer 控制台会接收到消息,输出
# ["this is IndexController"]

6. 编写 Kafka 的异步消费者方法。

  • 修改 basic/models/Kafka.php
// 添加消费者方法 /*** 消费者方法* @param $object   对象* @param $callback 回调方法*/
public function consumer($object, $callback){$conf = new \RdKafka\Conf();$conf->set('group.id', 0);  // 定义 groupid,默认为 0$conf->set('metadata.broker.list', $this->broker_list);// 定义 topic 相关内容$topicConf = new \RdKafka\TopicConf();$topicConf->set('auto.offset.reset', 'smallest');   // 从开头消费最新消息$conf->setDefaultTopicConf($topicConf);$consumer = new \RdKafka\KafkaConsumer($conf);$consumer->subscribe([$this->topic]);   // 可以订阅多个,所以是数组// 开始监听消息队列echo "waiting for messages.....\n";while(true) {$message = $consumer->consume(120*1000);    //switch ($message->err) {    // 判断消息接收错误case RD_KAFKA_RESP_ERR_NO_ERROR:    // 如果没有错误,处理消息echo "message payload....\n";// Yii::info($message->payload);$object->$callback($message->payload);break;}sleep(1);   // 必须加程序休眠,否则 CPU 消耗会越来越大}}
  • 修改配置文件 basic/config/console.php
// 消费者需要在终端启动// config->components 添加
'asyncLog' => ['class' => '\\app\\models\\Kafka','broker_list' => '192.168.2.214:9092','topic' => 'asynclog',
],// config->components->log->targets 添加['class' => 'yii\log\FileTarget','levels' => ['info'],'categories' => ['testkafka'],'logVars' => [],'exportInterval' => 1,  // 有一条消息就刷到下面的文件里'logFile' =>'@app/runtime/logs/Kafka.log',
],
  • 新建 basic/command/KafkaController.php
<?phpnamespace app\commands;use Yii;
use yii\console\Controller;class KafkaController extends Controller{public function actionConsume(){Yii::$app->asyncLog->consumer($this, 'callback');}public function callback($message){Yii::info($message, 'testkafka');// 信息写入 logYii::$app->log->setflushInterval(1);// 业务代码全部写这里。。。}}
  • 操作
# 进入项目目录
cd /data/project/test/yii2/basic/
./yii
# 确认一下方法存在
- kafka                        kafka/consume# 运行方法,开始跑消费脚本
./yii kafka/consume
# 返回 waiting for message.....# 打开以下页面,开始生产消息
http://192.168.2.214/yii2/basic/web/index.php?r=index/index# 再打开消费者脚本所在终端,返回
message payload....# 打开 basic/runtime/logs/Kafka.log
2019-10-12 15:37:04 [-][-][-][info][testkafka] ["this is IndexController"]
# 有信息写入,成功
  • 出现的问题(待解决):

Kafka 消息队列的使用相关推荐

  1. kafka消息队列的概念理解

    kafka在大数据.分布式架构中都很流行.kafka可以进行流式计算,也可以做为日志系统,还可以用于消息队列. kafka作为消息队列的优点: 分布式的系统 高吞吐量.即使存储了许多TB的消息,它也保 ...

  2. 使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯

    作者 | 陈屹       责编 | 欧阳姝黎 近来工作上接收到一项任务,实现c++后台服务器程序,要求它能承载千万级别的DAU读写请求.目前实现千万级高并发海量数据请求的服务器设计在"套路 ...

  3. 19 kafka消息队列

    文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...

  4. kafka 消息队列

    kafka 消息队列 kafka 架构原理 大数据时代来临,如果你还不知道Kafka那就真的out了!据统计,有三分之一的世界财富500强企业正在使用Kafka,包括所有TOP10旅游公司,7家TOP ...

  5. Java+Kafka消息队列

    本文主要针对,Java端对Kafka消息队列的生产和消费.Kafka的安装部署,请看查看相关文章. 笔者最近所用的是Spring mvc,监听文件路径,然后将读取到的文件内容发送到消息队列中.由另外系 ...

  6. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  7. kafka消息队列应用总结

    kafka官网: Apache Kafka 公司使用阿里云提供的kafka消息队列服务,分别为测试环境与生产环境,部署了多个集群. 使用场景:应用对外提供API接口调用,同时支持kafka增量消息推送 ...

  8. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  9. Kafka—消息队列

    Kafka-消息队列(理论部分) 一.Kafka概述 1.1.简介 kafka是一个分布式的基于发布/订阅模式的消息队列 主要应用场景:大数据实时处理领域 1.2.什么是消息队列? 消息队列 = 消息 ...

  10. kafka消息队列使用场景

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,Rabbit ...

最新文章

  1. 小技巧之chm文件无法显示
  2. 凸优化中如何改进GD方法以防止陷入局部最优解
  3. Matlab中将数据保存为txt或dat格式四种方案
  4. select_arg_from_python相关的测试程序
  5. Visual Studio 10将面世 微软走得太快?
  6. python计算a的平方加b的平方_NumPy计算范数2的平方
  7. idea中新建.xml文件找不到选项的解决方法
  8. Selenium爬虫 -- 图片视频的src绝对地址链接分析
  9. iOS 地图移动中心点获取
  10. 浙江理工考研c语言程序设计,浙江理工大学C程序设计期末试卷A卷
  11. PyGame:Python 游戏编程入门-1
  12. 非线性动力学中的同步,同步有哪几类?
  13. 如果你还是“程序员”,我劝你别创业!
  14. Python下载新浪微博视频(流式下载)
  15. 用iperf在ambarella s2l上进行网络性能测试
  16. 华为机试题 2014
  17. 蕴含深刻道理的经典语录
  18. codeforces 1567 A. Domino Disaster
  19. 关于MacOS降系统版本的处理方法
  20. R语言删除包含NA的列

热门文章

  1. 秒杀系统的设计与实现(三)(限时抢购、抢救接口、单用户限制实现)
  2. php查看curl扩展重新安装,关于php安装curl扩展
  3. 关于taskAffinity属性的使用
  4. js获取当前时间并且实时更新
  5. Tacotron2中文训练笔记
  6. 跟踪报道:中国国际数字城市建设技术与设备博览会(简报)
  7. SqlServer报表统计
  8. 给定一个二维的 0-1 矩阵,其中 0 表示海洋,1 表示陆地。单独的或相邻的陆地可以形成岛屿,每个格子只与其上下左右四个格子相邻。求最大的岛屿面积。
  9. linux桌面和发行版关系,Linux 发行版/桌面 体验报告(细节向)
  10. 需求及政策加速电子劳动合同应用,君子签助推企业优化用工管理