欧美一区二区三区,国内熟女精品熟女A片视频小说,日本av网,小鲜肉男男GAY做受XXX网站

php rabbitmq延時

沈明麗1年前8瀏覽0評論

RabbitMQ是一個高效穩(wěn)定的消息隊列系統(tǒng),可在分布式系統(tǒng)之間傳遞消息。RabbitMQ使用AMQP協(xié)議,支持消息路由,傳輸確認和ACK機制,在消息的可靠性和穩(wěn)定性上表現(xiàn)出了優(yōu)異的性能。

在實際應用中,我們常常需要實現(xiàn)延時任務,例如:訂單支付成功后需要5分鐘后才能發(fā)貨;用戶發(fā)表了文章需要5秒后才能顯示等待。RabbitMQ提供了一種解決方案——延時隊列。

在延時隊列中,消息先被發(fā)送到特定的隊列,該隊列中的每個消息都附帶一個過期時間, 直到該過期時間到達才會釋放消息到原始隊列,或流向失敗隊列。如圖所示:

// 創(chuàng)建延時隊列并綁定死信交換機
$delayExchangeName = 'delay.order';
$delayExchangeArgs = new AMQPTable(['x-delayed-type' => 'direct']);
$delayExchange = new AMQPExchange($channel);
$delayExchange->setType('x-delayed-message');
$delayExchange->setName($delayExchangeName);
$delayExchange->setArguments($delayExchangeArgs);
$delayExchange->declare();
// 創(chuàng)建綁定死信隊列
$queueName = 'order';
$deadExchangeName = 'dead.order';
$deadRoutingKey = 'order.dead';
$deadQueueName = 'dead.order';
$deadExchange = new AMQPExchange($channel);
$deadExchange->setName($deadExchangeName);
$deadExchange->setType(AMQP_EX_TYPE_DIRECT);
$deadExchange->declare();
$queueArgs = new AMQPTable([
'x-dead-letter-exchange' => $deadExchangeName,
'x-dead-letter-routing-key' => $deadRoutingKey,
'x-message-ttl' => $delayTime * 1000,
]);
$queue = new AMQPQueue($channel);
$queue->setArguments($queueArgs);
$queue->setName($queueName);
$queue->declare();
$queue->bind($delayExchangeName, $queueName);

本文中,我們創(chuàng)建了一個延遲隊列delay.order,綁定了死信交換機dead.order。 消息在該隊列中保留的時間是由消費者處理延遲消息的時間大于TTL的時長度,當過期時間到達時,消息將被發(fā)送到死信交換機,然后轉移到“order.dead”隊列。 如果在該期間內成功處理,則消息將被刪除,否則將被置于失敗隊列。

當我們發(fā)布一個延遲消息時,我們需要在消息的Header中添加一個x-delay參數(shù)來表示延遲時間,具體如下:

$delayTime = 5 * 60 * 1000;
$msg = new AMQPMessage('order paid', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => strval($delayTime),
'application_headers' => new AMQPTable(['x-delay' => $delayTime])
]);
$exchange->publish($msg, 'order', AMQP_NOPARAM, [
'delivery_mode' => 2,
'delay' => $delayTime,
]);

在這里,我們定義了一個在5分鐘后才能到達的消息,然后發(fā)布到名為“order”的交換器。 'delay'參數(shù)是AMQP協(xié)議的延時擴展,時間以毫秒為單位。

當消費者處理消息時,需要從“order”交換機中接收消息,但是此時應使用“對應的死信交換機和死信路由鍵”命名并創(chuàng)建消費隊列:

$deadQueue = new AMQPQueue($channel);
$deadQueue->setName($deadQueueName);
$deadQueue->declare();
$deadQueue->bind($deadExchangeName, $deadRoutingKey);
while (true) {
$msg = $queue->get(AMQP_AUTOACK);
if (!$msg) {
usleep(100000);
continue;
}
// 處理消息
$deadExchange->publish($msg->getBody(), $deadRoutingKey, AMQP_NOPARAM, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
}

在這里,消費者從一個名為“order”的AMQP隊列接收消息。如前所述,隊列被聲明為具有針對"dead.order"隊列的TTL和x-dead-letter-exchange。 此消費者將消息送到"dead.order"隊列。

此時,就可以用它來處理延遲消息了。 只需在死信交換機中監(jiān)聽即可。

$deadQueue = new AMQPQueue($channel);
$deadQueue->setName($deadQueueName);
$deadQueue->declare();
$deadQueue->bind($deadExchangeName, $deadRoutingKey);
while (true) {
$msg = $deadQueue->get(AMQP_AUTOACK);
if (!$msg) {
usleep(100000);
continue;
}
// 處理死信消息
}

總的來說,延時隊列可以用來處理訂單支付成功后等待一段時間后再發(fā)貨等場景。而使用RabbitMQ的延時隊列可以有效的解決這類問題,提升系統(tǒng)的穩(wěn)定性和可靠性。