Php MQ(Message Queue) 是一個開源的消息隊列,允許分布式系統(tǒng)組件之間進行異步通信。它的基本原理就是消息發(fā)送者將消息發(fā)布到一個隊列中,消息接收者則從隊列中獲取并處理消息。相較于直接在組件之間進行同步通信,MQ 具有更高的可靠性和靈活性,降低了系統(tǒng)組件之間的耦合性。
舉個例子,假設(shè)一個電商網(wǎng)站需要在用戶下單后,發(fā)送一封電子郵件通知用戶訂單詳情和預(yù)計送貨時間。傳統(tǒng)做法是用戶下單后直接調(diào)用發(fā)送郵件的函數(shù)來完成此操作。但是,如果郵件發(fā)送失敗,這個函數(shù)將會返回錯誤信息,導(dǎo)致下單操作無法成功完成,降低了系統(tǒng)的可靠性。而使用 MQ,我們將消息發(fā)布到一個郵件隊列中,郵件發(fā)送過程則由另一個組件來處理。這個組件可以在多個實例之間進行負載均衡,提高了系統(tǒng)的靈活性和可靠性。如果發(fā)送郵件失敗,MQ 會把消息重新推到郵件隊列中,直到成功為止。
PHP MQ 的常用實現(xiàn)包括 RabbitMQ、Kafka 和 RocketMQ。下面我們以 RabbitMQ 為例,來介紹 PHP MQ 的使用。
首先,我們需要安裝 RabbitMQ 的客戶端 PHP_AMQPLib。可以使用 Composer 進行安裝:
$ composer require php-amqplib/php-amqplib
接下來,我們需要連接到 RabbitMQ 服務(wù)器,并創(chuàng)建一個消息隊列。對于 RabbitMQ,我們可以使用 PHP_AMQPLib 的相應(yīng) API 進行操作:use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection(
'localhost', // RabbitMQ 服務(wù)器地址
5672, // RabbitMQ 服務(wù)器端口
'guest', // RabbitMQ 用戶名
'guest' // RabbitMQ 密碼
);
$channel = $connection->channel();
$queueName = 'email-queue';
$channel->queue_declare($queueName, false, true, false, false);
以上代碼創(chuàng)建了一個名為 email-queue 的隊列,并且開啟了持久化。持久化意味著即使 RabbitMQ 服務(wù)器或者 PHP 應(yīng)用程序出現(xiàn)故障,消息隊列也能夠保存在硬盤上,并且在服務(wù)器或者應(yīng)用程序恢復(fù)后繼續(xù)工作。
接下來,我們需要在應(yīng)用程序中發(fā)布一條消息到這個隊列中。以下代碼演示了如何發(fā)送一條包含用戶下單信息的消息到 email-queue 隊列:$emailMessage = [
'id' =>123456,
'email' =>'user@example.com',
'product' =>'Apple iPhone 13',
'quantity' =>1,
'price' =>1299
];
$messageBody = json_encode($emailMessage);
$queueName = 'email-queue';
$message = new AMQPMessage($messageBody, [
'delivery_mode' =>AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
$channel->basic_publish($message, '', $queueName);
以上代碼通過將用戶下單信息序列化為 JSON 字符串,并創(chuàng)建一個 AMQPMessage 實例來表示這條消息。這個消息被設(shè)置成了持久化,確保即使在 RabbitMQ 服務(wù)器或者 PHP 應(yīng)用程序失敗的情況下也能夠被保存下來。最后,我們將這條消息發(fā)布到 email-queue 隊列中。
最后,我們需要消費這個隊列中的消息,并在郵件發(fā)送成功后發(fā)送一個確認消息給 RabbitMQ。以下代碼演示了如何為 email-queue 隊列設(shè)置一個消費者,處理隊列中的每一條消息:$queueName = 'email-queue';
$callback = function ($message) {
$emailMessage = json_decode($message->body, true);
// 發(fā)送郵件
$sendEmailResult = sendEmail($emailMessage);
if ($sendEmailResult) {
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} else {
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
}
};
$channel->basic_qos(null, 1, null); // 設(shè)置每次只取一個消息
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
以上代碼首先設(shè)置一個回調(diào)函數(shù) $callback,處理隊列中每一條消息。在此例中,$callback 函數(shù)會將消息反序列化為 PHP 數(shù)組 $emailMessage,并且調(diào)用 sendEmail 函數(shù)來發(fā)送郵件。如果郵件發(fā)送成功,我們會向 RabbitMQ 發(fā)送一個 ACK 確認消息,告知 RabbitMQ 可以將這條消息從隊列中刪除。反之,我們會發(fā)送一個 NACK 消息,要求 RabbitMQ 將這條消息重新放回隊列中等待處理。
接下來,我們?yōu)?email-queue 隊列設(shè)置一個消費者,調(diào)用 $callback 函數(shù)來處理每一條消息。在處理消息的過程中,如果沒有新的消息,我們將在 $channel->wait() 這個調(diào)用處阻塞線程,直到隊列中有新的消息到來。
綜上所述,PHP MQ 是一個強大的工具,可以讓分布式系統(tǒng)中的組件進行異步通信,提高系統(tǒng)的可靠性和靈活性。通過 PHP_AMQPLib,我們可以輕松地連接到 RabbitMQ 這樣的 MQ 服務(wù)器,并且使用消息隊列來進行組件之間的通信。