Beanstalkd 是一个高性能的消息队列中间件,本博文宅鸟将介绍一下这个东东的使用。

一、先通过概念让大家了解Beanstalkd的特性和工作场景。

Beanstalkd 是一个轻量级消息中间件,它最大特点是将自己定位为基于管道  (tube) 和任务 (job) 的工作队列 (work-queue):

Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。

它的内部实现采用 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,具有有很高的性能。

尽管是内存队列, beanstalkd 提供了 binlog 机制, 当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。

管道 (tube):

管道类似于消息主题 (topic), 在一个 Beanstalkd 中可以支持多个管道, 每个管道都有自己的发布者 (producer) 和消费者 (consumer). 管道之间互相不影响。

任务 (job):

Beanstalkd 用任务 (job) 代替消息 (message) 的概念。与消息不同,任务有一系列状态:

READY- 需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;

DELAYED- 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回 DELAYED 队列延迟执行;

RESERVED- 已经被消费者获取, 正在执行的任务。Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;

BURIED- 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;

DELETED- 消息被彻底删除。Beanstalkd 不再维持这些消息。

任务优先级 (priority):

任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn).

延时任务 (delay):

有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with <delay>)。这种机制可以实现分布式的 java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run) 能够保证任务转移到另外的节点执行。

任务超时重发 (time-to-run):

Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run).

任务预留 (buried):

如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。

Beanstalkd 协议:

Beanstalkd 采用类 memcached 协议, 客户端通过文本命令与服务器交互。这些命令可以简单的分成三组:

生产类 - use <tube> / put <priority> <delay> <ttr> [bytes]:

生产者用 use 选择一个管道 (tube), 然后用 put 命令向管道发布任务 (job).

消费类 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>

消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。

维护类 - peek job / peek delayed / peek ready / peek buried / kick <n>

用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。可以用消费类命令改变这些任务的状态。

被保留 (buried) 的任务可以用 kick 命令 "踢" 回队列。

协议文档: https://raw.github.com/kr/beanstalkd/master/doc/protocol.txt

Beanstalkd 不足:

Beanstalkd 没有提供主备同步 + 故障切换机制, 在应用中有成为单点的风险。实际应用中,可以用数据库为任务 (job) 提供持久化存储。

另外, 和 memcached 类似, Beanstalkd 依赖 libevent 的单线程事件分发机制, 不能有效利用多核 cpu 的性能。这一点可以通过单机部署多个实例克服。

二、部署安装:

Beanstalkd 的安装非常简单:

在Ubuntu和debian下使用下面命令:

sudo apt-get install beanstalkd

安装后编辑配置文件:

vim /etc/default/beanstalkd

把START=NO改为:START=yes即可

更多关于安装可以参考官网

通过命令可以启动、停止Beanstalk

/etc/init.d/beanstalkd start
lsof -i:11300
/etc/init.d/beanstalkd stop

启动后,就可以通过客户端进行调用了:

Beanstalk支持多种客户端语言:

php,java,perl,c,c++,lua,python,go,ruby等等(了解更多可以来官网)。

我们将通过php给大家介绍在生产环境下面的使用。

就拿录视频制程序使用到的Beanstalk来给大家介绍:

先介绍一下程序结构:

视频录制程序分为两个方面,一个是产生录制任务的脚本(生产者),还有一个处理录制任务脚本(消费者)。

首先把php的客户端下载后,加入到项目中。下面把代码贴出来:

生产者:

#!/usr/bin/php
<?php
require_once 'Configuration.php';
require_once 'Record.class.php';
require_once 'BeanStalk.class.php';
$now=time();
$model = new RecordModel ();
$records=$model->checkStartRecord($now);
//print_r($records);
//exit();
$beanstalk = BeanStalk::open ( array ('servers' => array (Configuration::$record_config['beanStak']),'select' => 'random peek') );
$beanstalk->use_tube ( 'records' );
foreach ( $records as $record ) {$beanstalk->put ( 0, 0, 10, json_encode ( $record ) );
}
?>

消费者:

<?php
require_once('config.php');
require_once('func.php');
require_once('BeanStalk.class.php');
$beanstalk = BeanStalk::open(array('servers'       => array( $config['beanStak'] ),'select'        => 'random peek'));
$beanstalk->watch('records');
while(true){//$beanstalk->watch('records');$job = $beanstalk->reserve_with_timeout();if(is_object($job)){$data=$job->get();$json=json_decode($data,true);print_r($json);if(!empty($json["live_name"])&&!empty($json["start_time"])&&!empty($json["end_time"])&&!empty($json["vod_id"])){//print_r($json);if(!empty($json["afterplay"])&&$json["afterplay"]==1)$cmd="{$config['afterplaycmd']} {$json["live_name"]} {$json["vod_id"]} {$json['start_time']} {$json['end_time']}";else$cmd="{$config['recordcmd']} {$json["live_name"]} {$json["vod_id"]} {$json['start_time']} {$json['end_time']}";echo $cmd;$chkcmd="ps -ef |grep '".$cmd."'  |grep -v 'grep'|wc -l";//$chkcmd="ps -ef |wc -l";//echo $chkcmd;$count=system($chkcmd);//echo $count;if($count==0){//system($cmd);exec($cmd,$res,$rc);//print_r($res);//print_r($rc);}Beanstalk::delete($job);   // Delete the job.$info=array();$info["vod_id"]=$json['vod_id'];$info["record_msg"]="startjob";$data=array();$data["type"]="reciveRecords";$data["message"]=$info;$url=$config['recordStatus'];$httpcode = 200;$result = test_api($httpcode,$url,"post",json_encode($data));print_r($data);}//$beanstalk->watch('records');}sleep(1);
}
?>

下面我们介绍一个可以管理Beanstalk的php工具,地址如下

https://github.com/jimbojsb/bstools

把该工具安装后,就可以查看Beanstalk的各种情况了

到此结束,不足之处欢迎拍砖

转载于:https://blog.51cto.com/birdinroom/1344109

beanstalkd消息队列在生产环境的应用相关推荐

  1. 消息队列入门案例-环境搭建

    我们就来编写一个RabbitMQ的入门案例,首先第一步呢,先要去创建一个Project,那么至于Project的创建方式呢,我们都可以,比如用Spring官网创建的方式,创建项目也可以,你也可以在ID ...

  2. Redis实现消息队列之生产消费模式

    简单的介绍下消息队列,使用消息队列首先我们得有一个队列,那么这个队列之前讲过就是先进先出的一个数据结构:那么有了队列以后我们还需要有人在队列里面放东西,那么这个放东西的人我们称之为生产者:有了生产者对 ...

  3. SpringBoot使用Redis消息队列 实现生产/消费者

    文章目录 一.redis 依赖和配置源 二.消费者 2.1.生产者和消息公共的代码 消息队列 key 2.2.redis 消息队列相关配置 1).MsgConsumer 定义公共消息接口 2).Red ...

  4. Java应用beanstalkd消息队列

    应用场景 最近做一个项目,处理每一个从队列收到的消息都要去获取一个锁(使用Redis实现的分布式锁),如果没有获取到锁,也不能把这个消息给丢了,那可不可以把这个没有获取到锁的消息再发回队列? 如果是用 ...

  5. Beanstalkd消息队列的安装与使用

    一.Beanstalkd是什么? Beanstalkd是一个高性能,轻量级的分布式内存队列 二.Beanstalkd特性 1.支持优先级(支持任务插队) 2.延迟(实现定时任务) 3.持久化(定时把内 ...

  6. 腾讯云CMQ消息队列在Windows环境下的使用

    版权声明:本文由李少华原创文章,转载请注明出处:  文章原文链接:https://www.qcloud.com/community/article/100 来源:腾云阁 https://www.qcl ...

  7. 李少华 linux内核,腾讯云CMQ消息队列在Linux环境下的使用

    环境配置: 操作系统 Ubuntu 内存 8G 选择自己语言版本的 SDK(下载地址),这里我选择的是 C++版本 SDK. 1. 环境依赖 安装 curl: Ubuntu 安装 curl 可以用如下 ...

  8. linux启动队列mq,腾讯云CMQ消息队列在Linux环境下的使用

    环境配置: 操作系统Ubuntu 内存 8G 选择自己语言版本的 SDK(下载地址),这里我选择的是 C++版本 SDK. 1. 环境依赖 安装 curl: Ubuntu 安装 curl 可以用如下命 ...

  9. 用了8年MQ,聊聊消息队列的技术选型,哪个最香?

    谈起消息队列,内心还是会有些波澜. 消息队列,缓存,分库分表是高并发解决方案三剑客,而消息队列是我最喜欢,也是思考最多的技术. 我想按照下面的四个阶段分享我与消息队列的故事,同时也是对我技术成长经历的 ...

最新文章

  1. 纯jsp实现评论功能_基于云开发的小程序版本更新、评论功能改进、后台管理的实现...
  2. Maven 常见问题
  3. 用友BIP助力大型企业构建“敏态+稳态”的数智企业摩天
  4. 2021年第十二届蓝桥杯 - 省赛 - C/C++大学B组 - I.双向排序
  5. UOJ#77. A+B Problem [可持久化线段树优化建边 最小割]
  6. 解决 阶段02 商品类与初始商品
  7. 生孩子时,你们公婆给了多少钱?
  8. Sublime Text3插件管理
  9. ubuntu18.10 编译安装caffe gpu版本
  10. java做安卓开发需要学什么,安卓开发要学什么 需要什么基础知识
  11. lopatkin俄大神精简中文系统Windows 8.1 Pro 18655 x86-x64 ZH-CN PIP
  12. 2021华为软挑赛题_思路分析——实时更新,做多少更多少(四)
  13. 常用的Mysql数据库操作语句大全
  14. ubuntu16.04使用腾达U6网卡驱动+建立无线热点(手机可链接)
  15. 网页菜单设计html5,点靓网页的10种导航菜单设计
  16. /Volumes/TeXLive2019/install-tl: No binary platform specified/available, quitting.
  17. 中国巡游帆船行业市场供需与战略研究报告
  18. NGUI图集分解 切割
  19. kindlefire刷安卓系统_测试kindle fire 刷安卓4系统用手机GPS模块导航
  20. 禅道安装及团队成员登录

热门文章

  1. 英特尔用ViT做密集预测效果超越卷积,性能提高28%,mIoU直达SOTA|在线可玩
  2. AI之父图灵登上50英镑钞票,荣耀比肩牛顿达尔文;吴恩达:将激励更多人
  3. sprint计划会议总结
  4. Google因数据泄露关闭Google+消费者版本
  5. springboot集成mybatis-generator时候遇到的问题
  6. 冷热分治,DT时代的数据存储必由之路
  7. sklearn API 文档 - 0.18 中文翻译
  8. GraphQL 进阶: 基于Websocket的实时Web应用开发
  9. linux FTP配置详解
  10. 重启nagios有异常提示Starting nagios:This account is currently not available