高性能分布式队列系统 Beanstalkd 介绍及使用

 2017年05月30日    1358     声明


Beanstalkd是一个简单、高效的工作队列系统,其最初设计目的是通过后台异步执行耗时任务方式降低高容量Web应用的页面延时。而其简单、轻量、易用等特点,和对任务优先级、延时 超时重发等控制,以及众多语言版本的客户端的良好支持,使其可以很好的在各种需要队列系统的场景中应用。

  1. Beanstalkd介绍
  2. Beanstalkd安装使用

1. Beanstalkd介绍

1.1 核心概念

Beanstalkd 架构

Beanstalkd使用Producer-Consumer设计模式,无论是其协议结构还是使用方式都是类Memcached风格的。以下是Beanstalkd设计思想中核心概念:

job - 任务

job是一个需要异步处理的任务,是Beanstalkd中的基本单元,job需要放在一个tube中。

Beanstalkd中的任务(job)类似于其它队列系统中的消息(message)的概念,详细参考任务生命周期

tube - 管道

管道即某一种类型的任务队列,其类似与消息的主题(topic),是ProducerConsumer的操作对象。

一个Beanstalkd中可以有多个管道, 每个管道都有自己的发布者(Producer)和消费者Consumer,管道之间互相不影响。

producer - 生产者

任务(job)的生产者,通过put命令来将一个job放到一个tube中。

consumer - 消费者

任务(job)的消费者,通过reservereleaseburydelete命令来获取或改变job的状态。


1.2 任务生命周期

Beanstalkd中的任务(job)替代了消息(message)的概念,任务会有一系列的状态。任务的生命周期如下:

Beanstalkd-job-status

一个Beanstalkd任务可能会包含以下状态:

  • READY - 需要立即处理的任务。当producer直接put一个任务时,任务就处于READY状态,以等待consumer来处理。当延时 (DELAYED) 任务到期后会自动成为当前READY状态的任务
  • DELAYED - 延迟执行的任务。当任务被延时put时,任务就处于DELAYED状态。等待时间过后,任务会被迁移到READY状态。当消费者处理任务后,可以用将消息再次放回DELAYED队列延迟执行
  • RESERVED - 已经被消费者获取,正在执行的任务。当consumer获取了当前READY的任务后,该任务的状态就会迁移到RESERVED状态,这时其它的consumer就不能再操作该任务。Beanstalkd会检查任务是否在TTR(time-to-run)内完成
  • BURIED - 保留的任务,这时任务不会被执行,也不会消失。当consumer完成该任务后,可以选择deleterelease或者bury操作。

    delete后,任务会被删除,生命周期结束;

    release操作可以重新把任务状态迁移回READY状态或DELAYED状态,使其他consumer可以继续获取和执行该任务

    bury会拔任务休眠,等需要该任务时,再将休眠的任务kickREADY;也可能过delete删除BURIED状态的任务

  • DELETED - 消息被删除,Beanstalkd不再维持这些消息。即任务生命周期结束。

任务优先级(priority

任务 (job) 可以有0~2^32个优先级,0表示优先级最高。Beanstalkd采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn)

任务延时(delay

Beanstalkd中可以通过两种方式延时执行任务: 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with delay)。这种机制可以实现分布式定时任务,这种任务机制的优势是:如果某个消费者节点故障,任务超时重发(time-to-run)以保证任务转移到其它节点执行。

任务超时重发(time-to-run

Beanstalkd把任务返回给消费者后:消费者必须在预设的TTR(time-to-run) 时间内发送deleterelease或者bury命令改变任务状态;否则Beanstalkd会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在TTR时间内无法完成任务, 可以发送touch命令,以使Beanstalkd重新计算TTR

任务预留(buried

RESERVED状态的任务因为某些原因无法执行时,消费者可以将其设置为buried状态,这时Beanstalkd会继续保留这些任务。在具备任务执行条件时,再通过kick将任务迁移回READY状态。


2. Beanstalkd安装使用

Beanstalkd分为服务端客户端两部分。可以在其官网查找相关安装包及安装方法:

2.1 服务端

要使用Beanstalkd,首先需要在一或多台机器安装并运行beanstalkd服务端。安装beanstalkd需要Linux (2.6.17 or later) 、Mac OS X、或 FreeBSD,可以通过源码编译或安装包来安装。

源码安装

下载、解压并进入源码目录后,执行makemake install命令即可:

$ sudo make
// 或
$ sudo make install 
// 或
$ sudo make install PERFIX=/usr/bin/beanstalkd

安装包安装

在Unbuntu或Debian系统中,可以使用以下命令安装:

$ sudo apt-get install beanstalkd

在CentOS或RHEL系统中,首先需要更新EPEL源,然后再使用yum命令安装。

RHEL6中使用以下命令更新源:

su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm'

RHEL7中:

su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-9.noarch.rpm'

执行安装:

$ sudo yum install beanstalkd

运行beanstalkd

前台运行:

$ beanstalkd

这时,beanstalkd会保持在前台,可以通过control+C命令结束进程。

要使beanstalkd后台运行,可以在命令结尾增加&

$ beanstalkd & 

启动beanstalkd时还可以增加一些启动选项:

$ beanstalkd -l 127.0.0.1 -p 11300 &

在以上命令中,我们通过-l指定了监听地址、-p参数指定了监听端口、&指定为后台运行。

beanstalkd运行参数

Beanstalkd安装后,就可以通过beanstalkd命令来启动或配置Beanstalkd。该命令的使用格式如下:

beanstalkd [OPTIONS]

可选[OPTIONS]参数有:

  • -b DIR - wal目录
  • -f MS - 指定MS毫秒内的 fsync (-f0 为"always fsync")
  • -F - 从不 fsync (默认)
  • -l ADDR - 指定监听地址(默认为:0.0.0.0)
  • -p PORT - 指定监听端口(默认为:11300)
  • -u USER - 用户与用户组
  • -z BYTE - 最大的任务大小(默认为:65535)
  • -s BYTE - 每个wal文件的大小(默认为:10485760)
  • -c - 压缩binlog(默认)
  • -n - 不压缩binlog


2.2 客户端

客户端包含了Beanstalkd设计概念中的任务生产者(Producer)和消费者(Consumer)。Beanstalkd有很多语言版本客户端的实现,点击Beanstalkd 客户端查找自大所需要的版本,如果都不能满足需要,还可以根据Beanstalkd 协议自行实现。

笔者日常工作中,接触Node.js语言较多,以下用一个Node.js版本的Beanstalkd 客户端:fivebeans为例,简单演示Beanstalkd的任务处理流程。

安装fivebeans模块后,创建一个consumer客户端。代码如下:

var fivebeans = require('fivebeans');

var consumer = new fivebeans.client('192.168.3.218', 11300);

// 连接服务端
consumer.connect();
// 临听名为 itbilu 的tube
consumer.watch('itbilu', function(err, numwatched) {})
// 收到任务后,处理任务
consumer.reserve(function(err, jobid, payload) {
  console.log('收到任务:%s,任务内容:%s', jobid, payload);
});

再创建一个producer客户端。代码如下:

var fivebeans = require('fivebeans');

var producer = new fivebeans.client('192.168.3.218', 11300);
// 连接服务端
producer.connect();
// 使用名为 itbilu 的tube
producer.use('itbilu', function(err, tubename) {});
// 向tube 发布任务
producer.put(1024, 0, 2, '内容', function(err, jobid) {
  console.log('发布了一个任务,任务ID:%s', jobid);
});

完成并保存代码后,首先运行客户端consumer.js

node consumer.js

然后运行客户端producer.js

node producer.js

producer运行后,会自动put一个任务,而consumer处于监听状态,就会收到这个任务:

// procuer 运行后
发布了一个任务,任务ID:1
// consumer 收到任务后
收到任务:1,任务内容:内容