在程序開發中,很多時候我們需要使用消息隊列來解決一些應用場景,比如異步處理,任務調度等等,而RabbitMQ作為知名的輕量級開源消息隊列系統,被廣泛地應用于多個領域,包括金融、醫療、電商等等。
PHP中有很多操作RabbitMQ的類庫,這里我們就以php-amqplib這個類庫為例來演示如何使用RabbitMQ。
安裝php-amqplib
composer require php-amqplib/php-amqplib
這里我們先介紹RabbitMQ中幾個主要的概念:
- Producer:生產者,一般是指生產消息的應用。
- Exchange:交換機,一般是指消息的分配中心。
- Queue:隊列,一般是指存儲消息的地方。
- Consumer:消費者,一般是指消費消息的應用。
生產者
生產者主要是指生產消息的應用,我們可以使用php-amqplib來實現。
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello World!'); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close();
這段代碼的意思是發送一條消息到隊列名為hello的隊列中。
消費者
消費者主要是指消費消息的應用,我們也可以使用php-amqplib來實現。
require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
這段代碼的意思是從隊列名為hello的隊列中接收一條消息并打印出來。
運行結果
我們在命令行中分別運行生產者和消費者,可以看到結果如下:
生產者:
[x] Sent 'Hello World!'
消費者:
[*] Waiting for messages. To exit press CTRL+C [x] Received Hello World!
總結
通過上述例子可以看出,使用php-amqplib實現RabbitMQ非常簡單。RabbitMQ已經成為消息隊列領域的佼佼者,它的高性能、高可用性和可擴展性是很多開發者青睞的選擇。而php-amqplib又是PHP中最實用的RabbitMQ開發庫之一,可以方便地連接、新建隊列、生產消息和消費消息,并且使用起來非常簡單,因此,筆者在這里強烈推薦使用php-amqplib來實現RabbitMQ。