今天我來跟大家聊一下php rabbitmq。那么什么是rabbitmq呢?簡單來說,它是一個消息隊列中間件,用于處理異步消息。如果你有過秒殺系統(tǒng)的開發(fā)經(jīng)歷,你可能會知道為應對高并發(fā),這時候使用消息隊列可以很好地處理消峰限流的問題,而rabbitmq就是其中名聲遠播的開源解決方案。
在php中,rabbitmq有非常豐富的擴展庫,如PhpAmqpLib和Enqueue/AmqpBunny等。我們以PhpAmqpLib為例子,來看看rabbitmq的基本使用流程。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); $msg = new \PhpAmqpLib\Message\AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close();
上述代碼中,我們首先使用autoload將庫文件加載進來,然后使用AMQPStreamConnection來連接到rabbitmq,queue_declare來聲明一個隊列,basic_publish來發(fā)送一條消息,echo輸出發(fā)送成功,并最后關閉連接。
針對不同的業(yè)務場景,我們可以結合rabbitmq提供的豐富功能來完成更復雜的業(yè)務操作。比如,我們可以通過交換機實現(xiàn)發(fā)布-訂閱模式,通過路由鍵實現(xiàn)消息路由、通過無限autopoll來進行消息輪詢等等。下面我們舉例說明幾種應用場景。
1.延時消息
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('my_delay_exchange', 'x-delayed-message', false, true, false, false, false, ['x-delayed-type'=>'direct']); $channel->queue_declare('my_delay_msg_queue', false, false, false, false, false, ['x-dead-letter-exchange'=>'normal_exchange', 'x-dead-letter-routing-key'=>'normal', 'x-message-ttl'=>20000]); $channel->queue_bind('my_delay_msg_queue', 'my_delay_exchange', 'delay'); $msg = new AMQPMessage('Hello World!'); $msg->set('application_headers', new \PhpAmqpLib\Wire\AMQPTable(array('x-delay'=>30000))); $channel->basic_publish($msg, 'my_delay_exchange', 'delay', true); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close();
上述代碼中,我們首先聲明了一個exchange(my_delay_exchange)并指定了exchange的類型為延遲消息類型(x-delayed-message),同時聲明了一個queue(my_delay_msg_queue)并進行了死信隊列的綁定,指定過期時間為20秒,這里有一個用于測試的具體場景:我們向rabbitmq發(fā)送一條延時30秒的消息,當消息過期后,該消息將移入到名為normal的routingKey對應的queue中。
2.普通消息-消費者方處理失敗重復處理
如果是普通消息,為了防止在消息消費過程中失敗,可以結合rabbitmq的ack與reject機制來進行消息重新投遞,下面是一個簡單的例子。<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $msg = new AMQPMessage('Hello World!'); $channel->queue_declare('normal', false, true, false, false); $channel->basic_publish($msg, '', 'normal'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close();
上述代碼中我們聲明了一個普通的queue,發(fā)送了一條消息。消費方代碼如下:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('normal', false, true, false, false); $callback = function($msg){ echo ' [x] Received ', $msg->body, '\n'; $channel->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('normal', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
注意到在$callback回調函數(shù)中,我們使用了basic_ack方法來確認收到消息,并告知rabbitmq可以安全刪除該消息。此時,rabbitmq將認為該條消息已經(jīng)被正確消費。如果消費方在消費的過程中遇到問題(比如消息格式解析錯誤),可以使用basic_reject方法拒絕消息并且指定是否重新投遞,或使用basic_nack方法拒絕消息并批量確認之前未確認的消息。