RabbitMQ是一個高效穩(wěn)定的消息隊列系統(tǒng),可在分布式系統(tǒng)之間傳遞消息。RabbitMQ使用AMQP協(xié)議,支持消息路由,傳輸確認和ACK機制,在消息的可靠性和穩(wěn)定性上表現(xiàn)出了優(yōu)異的性能。
在實際應用中,我們常常需要實現(xiàn)延時任務,例如:訂單支付成功后需要5分鐘后才能發(fā)貨;用戶發(fā)表了文章需要5秒后才能顯示等待。RabbitMQ提供了一種解決方案——延時隊列。
在延時隊列中,消息先被發(fā)送到特定的隊列,該隊列中的每個消息都附帶一個過期時間, 直到該過期時間到達才會釋放消息到原始隊列,或流向失敗隊列。如圖所示:
// 創(chuàng)建延時隊列并綁定死信交換機 $delayExchangeName = 'delay.order'; $delayExchangeArgs = new AMQPTable(['x-delayed-type' => 'direct']); $delayExchange = new AMQPExchange($channel); $delayExchange->setType('x-delayed-message'); $delayExchange->setName($delayExchangeName); $delayExchange->setArguments($delayExchangeArgs); $delayExchange->declare(); // 創(chuàng)建綁定死信隊列 $queueName = 'order'; $deadExchangeName = 'dead.order'; $deadRoutingKey = 'order.dead'; $deadQueueName = 'dead.order'; $deadExchange = new AMQPExchange($channel); $deadExchange->setName($deadExchangeName); $deadExchange->setType(AMQP_EX_TYPE_DIRECT); $deadExchange->declare(); $queueArgs = new AMQPTable([ 'x-dead-letter-exchange' => $deadExchangeName, 'x-dead-letter-routing-key' => $deadRoutingKey, 'x-message-ttl' => $delayTime * 1000, ]); $queue = new AMQPQueue($channel); $queue->setArguments($queueArgs); $queue->setName($queueName); $queue->declare(); $queue->bind($delayExchangeName, $queueName);
本文中,我們創(chuàng)建了一個延遲隊列delay.order,綁定了死信交換機dead.order。 消息在該隊列中保留的時間是由消費者處理延遲消息的時間大于TTL的時長度,當過期時間到達時,消息將被發(fā)送到死信交換機,然后轉移到“order.dead”隊列。 如果在該期間內成功處理,則消息將被刪除,否則將被置于失敗隊列。
當我們發(fā)布一個延遲消息時,我們需要在消息的Header中添加一個x-delay參數(shù)來表示延遲時間,具體如下:
$delayTime = 5 * 60 * 1000; $msg = new AMQPMessage('order paid', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 'expiration' => strval($delayTime), 'application_headers' => new AMQPTable(['x-delay' => $delayTime]) ]); $exchange->publish($msg, 'order', AMQP_NOPARAM, [ 'delivery_mode' => 2, 'delay' => $delayTime, ]);
在這里,我們定義了一個在5分鐘后才能到達的消息,然后發(fā)布到名為“order”的交換器。 'delay'參數(shù)是AMQP協(xié)議的延時擴展,時間以毫秒為單位。
當消費者處理消息時,需要從“order”交換機中接收消息,但是此時應使用“對應的死信交換機和死信路由鍵”命名并創(chuàng)建消費隊列:
$deadQueue = new AMQPQueue($channel); $deadQueue->setName($deadQueueName); $deadQueue->declare(); $deadQueue->bind($deadExchangeName, $deadRoutingKey); while (true) { $msg = $queue->get(AMQP_AUTOACK); if (!$msg) { usleep(100000); continue; } // 處理消息 $deadExchange->publish($msg->getBody(), $deadRoutingKey, AMQP_NOPARAM, [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); }
在這里,消費者從一個名為“order”的AMQP隊列接收消息。如前所述,隊列被聲明為具有針對"dead.order"隊列的TTL和x-dead-letter-exchange。 此消費者將消息送到"dead.order"隊列。
此時,就可以用它來處理延遲消息了。 只需在死信交換機中監(jiān)聽即可。
$deadQueue = new AMQPQueue($channel); $deadQueue->setName($deadQueueName); $deadQueue->declare(); $deadQueue->bind($deadExchangeName, $deadRoutingKey); while (true) { $msg = $deadQueue->get(AMQP_AUTOACK); if (!$msg) { usleep(100000); continue; } // 處理死信消息 }
總的來說,延時隊列可以用來處理訂單支付成功后等待一段時間后再發(fā)貨等場景。而使用RabbitMQ的延時隊列可以有效的解決這類問題,提升系統(tǒng)的穩(wěn)定性和可靠性。