欧美一区二区三区,国内熟女精品熟女A片视频小说,日本av网,小鲜肉男男GAY做受XXX网站

php rabbitmq視頻

錢琪琛1年前6瀏覽0評論

今天我來跟大家聊一下php rabbitmq。那么什么是rabbitmq呢?簡單來說,它是一個消息隊列中間件,用于處理異步消息。如果你有過秒殺系統(tǒng)的開發(fā)經(jīng)歷,你可能會知道為應對高并發(fā),這時候使用消息隊列可以很好地處理消峰限流的問題,而rabbitmq就是其中名聲遠播的開源解決方案。

在php中,rabbitmq有非常豐富的擴展庫,如PhpAmqpLib和Enqueue/AmqpBunny等。我們以PhpAmqpLib為例子,來看看rabbitmq的基本使用流程。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new \PhpAmqpLib\Message\AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();

上述代碼中,我們首先使用autoload將庫文件加載進來,然后使用AMQPStreamConnection來連接到rabbitmq,queue_declare來聲明一個隊列,basic_publish來發(fā)送一條消息,echo輸出發(fā)送成功,并最后關閉連接。

針對不同的業(yè)務場景,我們可以結合rabbitmq提供的豐富功能來完成更復雜的業(yè)務操作。比如,我們可以通過交換機實現(xiàn)發(fā)布-訂閱模式,通過路由鍵實現(xiàn)消息路由、通過無限autopoll來進行消息輪詢等等。下面我們舉例說明幾種應用場景。

1.延時消息

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('my_delay_exchange', 'x-delayed-message', false, true, false, false, false, ['x-delayed-type'=>'direct']);
$channel->queue_declare('my_delay_msg_queue', false, false, false, false, false, ['x-dead-letter-exchange'=>'normal_exchange', 'x-dead-letter-routing-key'=>'normal', 'x-message-ttl'=>20000]);
$channel->queue_bind('my_delay_msg_queue', 'my_delay_exchange', 'delay');
$msg = new AMQPMessage('Hello World!');
$msg->set('application_headers', new \PhpAmqpLib\Wire\AMQPTable(array('x-delay'=>30000)));
$channel->basic_publish($msg, 'my_delay_exchange', 'delay', true);
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();

上述代碼中,我們首先聲明了一個exchange(my_delay_exchange)并指定了exchange的類型為延遲消息類型(x-delayed-message),同時聲明了一個queue(my_delay_msg_queue)并進行了死信隊列的綁定,指定過期時間為20秒,這里有一個用于測試的具體場景:我們向rabbitmq發(fā)送一條延時30秒的消息,當消息過期后,該消息將移入到名為normal的routingKey對應的queue中。

2.普通消息-消費者方處理失敗重復處理

如果是普通消息,為了防止在消息消費過程中失敗,可以結合rabbitmq的ack與reject機制來進行消息重新投遞,下面是一個簡單的例子。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$msg = new AMQPMessage('Hello World!');
$channel->queue_declare('normal', false, true, false, false);
$channel->basic_publish($msg, '', 'normal');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();

上述代碼中我們聲明了一個普通的queue,發(fā)送了一條消息。消費方代碼如下:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('normal', false, true, false, false);
$callback = function($msg){
echo ' [x] Received ', $msg->body, '\n';
$channel->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('normal', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();

注意到在$callback回調函數(shù)中,我們使用了basic_ack方法來確認收到消息,并告知rabbitmq可以安全刪除該消息。此時,rabbitmq將認為該條消息已經(jīng)被正確消費。如果消費方在消費的過程中遇到問題(比如消息格式解析錯誤),可以使用basic_reject方法拒絕消息并且指定是否重新投遞,或使用basic_nack方法拒絕消息并批量確認之前未確認的消息。

結語

本文我們介紹了php rabbitmq的基本使用方法以及應用示例,對于沒有接觸過消息隊列的同學,可以嘗試通過學習rabbitmq來豐富自己的技術棧,同時也能更好地理解和使用異步消息。