最近在看rabbitmq的设计哲学和相关概念,有很多需要总结的学习心得可以分享一下。相比以往使用的轻量级高性能beanstalkd队列服务组件,rabbitmq提供了更更丰富的功能,当然他也是可以做到数据持久化的。
参考了不少资料,但是在PHP下使用rabbitmq的案例并不多,有些代码也略显陈旧,今天主要从一下几个方面开始介绍和总结。
1. php-amqp 扩展安装
2. exchange,queue,routing_key, name概念的相关梳理
3. php-amqp相关API解析
4. rabbitmq 持久化使用
php-amqp扩展安装
rabbitmq对应php的api其实有好几个,从性能上当然首选pecl的最佳。php-amqp是抽象的消息队列扩展,并不是只针对rabbitmq提供访问服务,只要是支持AMQP协议的消息队列服务程序,都能通过php-amqp插件获得访问能力。
在安装php-amqp扩展的时候需要先安装了librabbitmq依赖,如果你正在使用centos的系统,我的建议是最好不要使用yum仓库中提供的librabbitmq包,最好手工去下载最新版本的文件进行手工编译安装,因为楼主在用yum安装了ibrabbitmq-devel的时候发现了兼容性问题,导致php始终连接不上rabbitmq。
官方源文件rabbit-c的安装地址在https://github.com/alanxz/rabbitmq-c 这里,git下来后记得在configure的时候制定prefix安装路径,这样在pecl安装amqp扩展的时候才能让php-amqp包识别到librabbitmq的存在,如果直接configure的话会导致找不到librabbitmq相关文件。
rabbitmq-安装成功后,我们假定安装路径为/usr/local/librabbitmq ,那么接着安装php-amqp的时候步骤如下:
pecl install amqp
此时提示如下信息时:Set the path to librabbitmq install prefix [autodetect] :
输入刚才指定的安装路径/usr/local/librabbitmq 回车即可。
编译安装完成后将extension=amqp.so 放入php.ini文件中。
注意在使用rabbitmq的时候默认的账号是guest,但是这个账号限定只能本地访问操控,因此在我们使用rabbitmq远程访问的时候最好新建独立的账号,并进行相应授权,这个操作使用rabbitmqctl命令来控制。
exchange,queue,routing key, name概念的相关梳理
exchange相当于生产者的接收者,同时也相当于接收者数据派送者,类似于一个消息中转站。通常exchange的出现需要和queue配对,并且需要事先申明。exchange类型又主要分为三类:direct(定向直接投递),fanout(广播投递),topic(模糊主题投递)。exchange在申明创建的时候可以定义一个name名字,也可以不定义,不定义的时候rabbitmq会默认赋值一个固定的name。同时对exchange的属性也做了定义:AMQP_DURABLE(持久化,rabbitmq重启后exchange依然存在)和AMQP_PASSIVE(不自动创建exchange,而是检查exchange是否存在) 。
当我们需要检查一个name为Test的exchange是否存在的时候我们可以使用如下代码:
$connection = new AMQPConnection();
$connection->setHost('127.0.0.1');
$connection->setLogin('leeon');
$connection->setPassword('leeon');
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('Test'); // 设定要检查的exchange的名字
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_PASSIVE); // 使用PASSIVE参数
$exchange->declareExchange();
如果Test存在则不会抛出任何异常,如果不存在则会抛出异常
Fatal error: Uncaught AMQPExchangeException: Server channel error: 404, message: NOT_FOUND - no exchange 'Test' in vhost '/'
AMQP_DURABLE参数的设定将会保证rabbitmq的重启不会导致申明定义的exchange丢失。例如我们正常创建了一个exchange后可以在rabbitmq的web控制端看到此状态:
queue的定义主要在消费者这里做定义,定义queue的时候也可以定义一个name,这个name仅仅是对队列名字的命名,不同队列的名字name是不一样的。同样queue的属性定义也有好几个:
AMQP_DURABLE (持久化)
AMQP_PASSIVE (不自动创建queue,而是检查queue是否存在,不存在抛出异常)
AMQP_EXCLUSIVE (排他队列,以此定义命名的队列只有一个)
AMQP_AUTODELETE(当queue释放的时候是否自动删除这个队列在rabbitmq中的记录)
routing key 这个好比一个协商key,当发布者和消费者有协商一致的key策略的时候,消费者就可以合法从生产者手中获取数据。这个routing key主要当exchange设定为direct和topic模式的时候使用,fanout模式不使用routing key,那么这三个模式又有何区别呢?我们来看看如下三个图:
这三个图分别展示了direct,fanout,topic的消息投递规则。
php-amqp相关API解析
php-amqp在最近的版本中改动比较大,例如对于exchange和queue的delcare方法做了改变,修改为了declareExchange和declareQueue。因此网上很多相关的PHP教程事例代码并不能直接拿来学习了。
这里我们主要对publish,consume两个api方法进行讲解。
publish方法:
public function publish(
$message,
$routing_key = null,
$flags = AMQP_NOPARAM,
array $attributes = array()
) {
}
publish方法在生产者的代码逻辑中使用,message参数是需要传递的具体消息内容,这里我们可以通过序列化或者json字符串的形式进行复杂数据的封装。routing_key参数就是前文所说的exchange和queue之间的路由协商key。当我们使用fanout模式的时候,routing_key定义为null。 flags参数包含了两个定义:AMQP_MANDATORY 和 AMQP_IMMEDIATE. 这两个参数在当前php-amqp(1.8.0)版本中并没有实际的作用,因为php-amqp里面的实现调用的是异步模式,并不会等待结果的返回。
/* NOTE: basic.publish is asynchronous and thus will not indicate failure if something goes wrong on the broker */
int status = amqp_basic_publish(
channel_resource->connection_resource->connection_state,
channel_resource->channel_id,
(Z_TYPE_P(exchange_name) == IS_STRING && Z_STRLEN_P(exchange_name) > 0 ? amqp_cstring_bytes(Z_STRVAL_P(exchange_name)) : amqp_empty_bytes), /* exchange */
(key_len > 0 ? amqp_cstring_bytes(key_name) : amqp_empty_bytes), /* routing key */
(AMQP_MANDATORY & flags) ? 1 : 0, /* mandatory */
(AMQP_IMMEDIATE & flags) ? 1 : 0, /* immediate */
&props,
php_amqp_long_string(msg, msg_len) /* message body */
);
我们重点需要关注的是attributes的参数设定,这里是一个数组,可以同时定义多个属性,包括如下信息:
content_type: MIME类型定义,例如gzip
message_id: 消息唯一识别id
user_id: 用来记录这个消息的发送者唯一识别id
app_id: 用来记录这个消息的生产者的唯一识别id
delivery_mode: 消息的传送模式 1代表非持久态 2代表持久态(如果消息需要高可用,例如rabbitmq宕机后数据不丢失,name这里就要设置成2)
priority: 消息的优先级,设定值从0到9
timestamp: 消息当前被发送的时间的时间戳
expiration: 消息的过期时间,这个时间单位是毫秒
type: 消息的类型名字
reply_to: 通常用在RPC中,告知需要回复的队列
headers: 消息头部数据表,可以定义多个自定义数据。
consume方法:
public function consume(
callable $callback = null,
$flags = AMQP_NOPARAM,
$consumerTag = null
) {
}
consume方法需要传递一个回调函数,这里不支持以前我们php中的惯用法,传递一个方法名,而是需要传递一个匿名含函数的变量值。flags参数可选AMQP_AUTOACK设定,如果设定AMQP_AUTOACK则当消费者接收到消息后向rabbitmq自动确认消息接收成功,如果不设定AMQP_AUTOACK,则需要我们在代码中调用ack方法来确认消息接收状态。例如如下一段代码:
$connection = new AMQPConnection();
$connection->setHost('127.0.0.1');
$connection->setLogin('leeon');
$connection->setPassword('leeon');
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$queue = new AMQPQueue($channel);
$queue->setName('GGGGGG');
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declareQueue();
$queue->bind('topic','aaa.#');
$callback = function (AMQPEnvelope $message, AMQPQueue $queue) {
echo $message->getBody() . PHP_EOL;
$queue->ack($message->getDeliveryTag());
};
$queue->consume($callback);
当我们在编写消费者代码的时候,应当都需要调用bind方法来对exchange进行绑定,注意exchange的name一定要已经创建,否则调用bind方法会抛出异常。当使用topic和direct模型时,则需要制定明确的routing key。所以为什么大家在网上看到的PHP AMQP示例代码中,对于生产者和消费者都会申明一次exchange的缘由了,因为PHP的无状态性,为了保证程序代码的完整性,我们需要养成一个习惯,在生产者和消费者中都申明一次exchange和queue。
为何也要同时申明queue呢?因为消息的投递一定要有明确的queue队列来接收,当我们使用publish来发布后如果找不到合法的queue,这条消息就被rabbitmq丢弃掉了。
rabbitmq 持久化使用
rabbitmq的持久化分为三个部分:exchange,queue 和消息体。当需要一个高可用的消息队列服务时,我们需要同时对exchange和queue在创建的时候申明durable状态,在生产者发送的消息中定义delivery_mode为2即可。例如如下代码:
$connection = new AMQPConnection();
$connection->setHost('127.0.0.1');
$connection->setLogin('leeon');
$connection->setPassword('leeon');
$connection->connect();
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('ex');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
try {
$queue = new AMQPQueue($channel);
$queue->setName('AAA');
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind('ex','keykey');
var_dump( $exchange->publish(
'hello world!',
'keykey',
AMQP_NOPARAM,
[
'delivery_mode'=>2
]
));
}catch (Exception $e){
var_dump($e);
}