Asynchronous queueing

Message queues provide an asynchronous communication protocol. In an asynchronous communication protocol, the sender and the receiver need not interact with the message queue simultaneously.

Typical HTTP, on the other hand, is a synchronous communication protocol, meaning that the client is blocked until the operation is completed.

Consider this: you call someone on the phone, wait for it to ring, and when the person answers they listen to whatever you have to say then and there. At the end of the call you say goodbye, and the person on the other end acknowledges that by saying goodbye back. This can be considered synchronous, as you don't do anything else until you get the response that ends the communication.

However, if you were to send a text message to someone instead, you can go off and do whatever you please once the message is sent; the other person replies whenever they want to, and while they are drafting that response you are free to carry on with other things. You don't communicate directly with the recipient, but you do still maintain synchronous communication with your phone, which notifies you when a new message arrives (or which you simply check every few minutes); the communication with the other party itself is asynchronous. Neither party needs to know what the other is doing; each merely looks out for their own text messages in order to communicate.
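The text-message analogy can be sketched in a few lines of PHP. This is only an illustration under stated assumptions: the SplQueue object is a hypothetical in-memory stand-in for a broker, and the message texts are made up. It shows the sender finishing its work before the receiver has read anything.

```php
<?php
// Hypothetical in-memory stand-in for a message broker (not RabbitMQ itself).
$queue = new SplQueue();

// The sender drops its messages into the queue and carries on immediately;
// it never waits for the receiver to be present.
$queue->enqueue('Are you free for lunch?');
$queue->enqueue('Call me when you can.');
$senderStatus = 'sender finished without waiting for a reply';

// Later, and independently, the receiver drains whatever has accumulated.
$received = [];
while (!$queue->isEmpty()) {
    $received[] = $queue->dequeue();
}

echo $senderStatus . PHP_EOL;
echo 'receiver read ' . count($received) . ' message(s)' . PHP_EOL;
```

A real broker adds what this sketch leaves out: the two sides run as separate processes, and the queue lives outside both of them.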

Message Queue pattern (Getting started with RabbitMQ)

RabbitMQ is a message broker; it accepts and forwards messages. Here, let's configure it so that we can send messages from one PHP script to another.

Imagine we are handing a package to a courier so that they can deliver it to a client; RabbitMQ is the courier, while the two scripts are the individuals sending and receiving the package.

As the first step, let's install RabbitMQ; I'm going to demonstrate this on an Ubuntu 14.04 system.

To start with, we need to add the RabbitMQ APT repository to our /etc/apt/sources.list.d folder. This can fortunately be actioned with a command, like this:

echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list

Note that the repository location may change; if it does, you can find the latest details at https://www.rabbitmq.com/install-debian.html.

We can optionally also add the RabbitMQ public key to the trusted key list to avoid any warnings indicating packages are unsigned when we install or upgrade the packages through the apt command:

wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -

So far, so good.

Next, let's run apt-get update to fetch the package lists from the new repository we've included, then install the package we need with apt-get install rabbitmq-server:

sudo apt-get update
sudo apt-get install rabbitmq-server

Be sure to accept the various prompts when asked.

After installation, you can run rabbitmqctl status to check that the application is running OK.

Let's make our lives easier for a second. We can use a web GUI to manage RabbitMQ; simply run the following command:

rabbitmq-plugins enable rabbitmq_management


We can now see an admin interface at <your server IP here>:15672.

But before we can log in, we're going to have to create some login credentials. In order to do this we're going to have to head back to the command line.

Firstly, we'll need to set a new account with a username of junade and a password of insecurepassword:

rabbitmqctl add_user junade insecurepassword

Then we can add some admin privileges:

rabbitmqctl set_user_tags junade administrator
rabbitmqctl set_permissions -p / junade ".*" ".*" ".*"

Returning to the login page and entering these credentials, we can now see our admin interface: the web interface for the RabbitMQ service, accessible through our web browser.

Now we can test what we've installed. Let's start off by writing a composer.json file for this new project:

{ 
  "require": { 
    "php-amqplib/php-amqplib": "2.5.*" 
  } 
} 

RabbitMQ uses the Advanced Message Queuing Protocol (AMQP), which is why we're installing a PHP library that will help us communicate with it over this protocol. Run composer install in the project directory to download the library into the vendor folder.

Next up, let's write a little PHP script, send.php, that sends a message using the RabbitMQ message broker we just installed. It assumes the broker is on localhost and listening on port 5672, which may differ depending on your circumstances:

<?php 
 
require_once(__DIR__ . '/vendor/autoload.php'); 
use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
 
$connection = new AMQPStreamConnection('localhost', 5672, 'junade', 'insecurepassword'); 
$channel    = $connection->channel(); 
 
$channel->queue_declare( 
  'sayHello',     // queue name 
  false,          // passive 
  true,           // durable 
  false,          // exclusive 
  false           // autodelete 
); 
 
$msg = new AMQPMessage("Hello world!"); 
 
$channel->basic_publish( 
  $msg,           // message 
  '',             // exchange 
  'sayHello'      // routing key 
); 
 
$channel->close(); 
$connection->close(); 
 
echo "Sent hello world message." . PHP_EOL; 

So let's break this down a little bit. In the first few lines, we include the library through the Composer autoloader and state which classes we're going to use. Instantiating the AMQPStreamConnection object is what actually connects us to the message broker; from it we create a channel object, which we then use to declare a queue by calling the queue_declare method. The durable option makes the queue itself survive a restart of RabbitMQ; for the messages in it to survive as well, they would also have to be published as persistent (a delivery_mode of 2), which this example doesn't do. Finally, we publish our message to the default exchange (the empty string), which routes it to the queue whose name matches the routing key.

Let's now run this script:

php send.php

The script prints a single confirmation line to the terminal: Sent hello world message.

If you now go to the web interface for RabbitMQ, click the Queues tab and toggle the Get Message(s) dialog, you should be able to pull in the message we just sent to the broker. Using this page in the interface, we can extract messages from the queue so we can look at their contents.

Of course, this is just half the story. We now need to actually retrieve this message using another app.

Let's write a receive.php script:

<?php 
 
require_once(__DIR__ . '/vendor/autoload.php'); 
use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
 
$connection = new AMQPStreamConnection('localhost', 5672, 'junade', 'insecurepassword'); 
$channel    = $connection->channel(); 
 
$channel->queue_declare( 
  'sayHello',     // queue name 
  false,          // passive 
  true,           // durable (must match the declaration in send.php) 
  false,          // exclusive 
  false           // autodelete 
); 
 
$callback = function ($msg) { 
  echo "Received: " . $msg->body . PHP_EOL; 
}; 
 
$channel->basic_consume( 
  'sayHello',                     // queue 
  '',                             // consumer tag 
  false,                          // no local 
  true,                           // no ack 
  false,                          // exclusive 
  false,                          // no wait 
  $callback                       // callback 
); 
 
while (count($channel->callbacks)) { 
  $channel->wait(); 
} 

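Note that the first few lines are identical to our sending script; we even re-declare the queue in case this receive script is run before the send.php script. The arguments of the two declarations must match, because RabbitMQ rejects an attempt to re-declare an existing queue with different settings.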

Let's run our receive.php script:

php receive.php

In another bash Terminal, let's run the send.php script a few times:

php send.php

Accordingly, in the receive.php Terminal tab, we can now see we've received the messages we've been sending; each one is printed as Received: Hello world!

The RabbitMQ documentation describes this basic accepting and forwarding of messages with a diagram in which a producer sends messages to a queue, from which a consumer receives them.

Publish-Subscriber pattern

The Publish-Subscriber pattern (or Pub/Sub for short) is a design pattern whereby messages aren't sent directly from publisher to subscribers; instead, publishers push out messages without any knowledge of which subscribers, if any, will receive them.

In RabbitMQ, the producer never sends any messages directly to the queue. Quite often, the producer doesn't even know if the message will end up in a queue at all. Instead, the producer must send messages to an exchange, which receives messages from producers and then pushes them out to queues.

The consumer is the application that will receive the messages.

The exchange must be told exactly how to handle a given message, and which queue(s) it should be appended to. These rules are defined by the exchange type.

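The RabbitMQ documentation describes a Publish-Subscriber relationship as a chain connecting the publisher, exchange, queue, and consumer: the publisher sends a message to the exchange, the exchange routes it to one or more queues, and each consumer receives messages from its queue.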

A direct exchange type delivers messages based on a routing key. It can be used both for one-to-one and one-to-many forms of routing, but it is best suited to a one-to-one relationship.
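To make the direct routing rule concrete, here is a minimal sketch in plain PHP. It is an in-memory model rather than the php-amqplib API: the publishDirect function, the queue names, and the routing keys are all hypothetical, chosen only for illustration.

```php
<?php
// Hypothetical in-memory model of a direct exchange (not the php-amqplib API).
// Each binding maps a routing key to the queues bound with that exact key.
$bindings = [
    'invoice' => ['billingQueue'],
    'signup'  => ['welcomeQueue', 'auditQueue'], // one-to-many is also possible
];
$queues = ['billingQueue' => [], 'welcomeQueue' => [], 'auditQueue' => []];

function publishDirect(array $bindings, array &$queues, string $routingKey, string $body): void
{
    // Only queues whose binding key equals the routing key receive a copy.
    foreach ($bindings[$routingKey] ?? [] as $queueName) {
        $queues[$queueName][] = $body;
    }
}

publishDirect($bindings, $queues, 'invoice', 'Invoice created');
publishDirect($bindings, $queues, 'signup', 'New user registered');
publishDirect($bindings, $queues, 'unknown', 'No binding, so no queue gets this');

foreach ($queues as $name => $messages) {
    echo $name . ': ' . implode(', ', $messages) . PHP_EOL;
}
```

The message published with the unknown key reaches no queue at all, which mirrors the earlier point that a producer often doesn't know whether its message ends up in a queue.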

A fanout exchange type routes messages to all queues that are bound to it, and the routing key is completely ignored. In effect, you cannot use the routing key to choose which workers a message is distributed to.
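The fanout behaviour can be modelled in the same in-memory style. Again this is a sketch under assumptions, not broker code: publishFanout and the queue names are hypothetical, and the routing key parameter exists only to show that it is never consulted.

```php
<?php
// Hypothetical in-memory model of a fanout exchange (not the php-amqplib API).
$boundQueues = ['emailQueue' => [], 'smsQueue' => [], 'logQueue' => []];

function publishFanout(array &$boundQueues, string $body, string $routingKey = ''): void
{
    // The routing key is accepted but deliberately never inspected:
    // every bound queue receives its own copy of the message.
    foreach (array_keys($boundQueues) as $queueName) {
        $boundQueues[$queueName][] = $body;
    }
}

publishFanout($boundQueues, 'Hello world!', 'this.key.is.ignored');
publishFanout($boundQueues, 'Second message');

foreach ($boundQueues as $name => $messages) {
    echo $name . ' holds ' . count($messages) . ' message(s)' . PHP_EOL;
}
```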

A topic exchange type works by routing messages to one or many queues on the basis of the message's routing key and the pattern that was used to bind a queue to the exchange. This exchange has the potential to work well when there are multiple consumers/applications that want to choose the type of messages they want to receive, usually in a many-to-many relationship.
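In AMQP, a topic binding pattern is a list of dot-separated words in which * stands for exactly one word and # for zero or more words. The matching itself happens inside the broker; the sketch below only imitates it in plain PHP so the rule is easier to see. The function names topicMatches and matchWords and the sample keys are hypothetical.

```php
<?php
// Sketch of AMQP-style topic matching; an imitation of broker behaviour,
// not part of the php-amqplib API.
function topicMatches(string $bindingPattern, string $routingKey): bool
{
    return matchWords(explode('.', $bindingPattern), explode('.', $routingKey));
}

function matchWords(array $pattern, array $key): bool
{
    if ($pattern === []) {
        // The pattern is used up: succeed only if the key is used up too.
        return $key === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // '#' may swallow zero or more words, so try every possible split.
        for ($skip = 0; $skip <= count($key); $skip++) {
            if (matchWords($pattern, array_slice($key, $skip))) {
                return true;
            }
        }
        return false;
    }
    if ($key === []) {
        return false;
    }
    $word = array_shift($key);
    // '*' stands for exactly one word; anything else must match literally.
    return ($head === '*' || $head === $word) && matchWords($pattern, $key);
}

var_dump(topicMatches('logs.*.error', 'logs.billing.error'));     // bool(true)
var_dump(topicMatches('logs.*.error', 'logs.billing.api.error')); // bool(false)
var_dump(topicMatches('logs.#', 'logs.billing.api.error'));       // bool(true)
```

A queue bound with logs.# would therefore see every log message, while one bound with logs.*.error would see only errors that have exactly one word between logs and error.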

The headers exchange type is commonly used to route on a set of attributes that are better expressed as message headers than as a routing key. Instead of using the routing key, the exchange routes on the attributes found in the message's headers.
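Header-based matching can be sketched in the same way. In RabbitMQ, a binding on a headers exchange carries an x-match argument that selects whether all of the listed headers or any one of them must match; the sketch below assumes all is the default when the argument is absent. The headersMatch function and the sample headers are hypothetical, and the real comparison is done by the broker.

```php
<?php
// Sketch of headers-exchange matching; an imitation of broker behaviour,
// not part of the php-amqplib API.
function headersMatch(array $bindingArgs, array $messageHeaders): bool
{
    // 'x-match' selects the mode: 'all' (assumed default here) or 'any'.
    $mode = $bindingArgs['x-match'] ?? 'all';
    unset($bindingArgs['x-match']);

    // Count how many of the binding's headers the message carries
    // with exactly the expected value.
    $hits = 0;
    foreach ($bindingArgs as $name => $expected) {
        if (array_key_exists($name, $messageHeaders) && $messageHeaders[$name] === $expected) {
            $hits++;
        }
    }

    return $mode === 'any' ? $hits > 0 : $hits === count($bindingArgs);
}

$headers = ['format' => 'pdf', 'type' => 'log'];

var_dump(headersMatch(['x-match' => 'any', 'format' => 'pdf', 'type' => 'report'], $headers)); // bool(true)
var_dump(headersMatch(['x-match' => 'all', 'format' => 'pdf', 'type' => 'report'], $headers)); // bool(false)
```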

In order to test a Pub/Sub queue, we will be using the following scripts. They are similar to the ones in the earlier example, except I have modified them so that they use exchanges. Here is our send.php file:

<?php 
 
require_once(__DIR__ . '/vendor/autoload.php'); 
use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
 
$connection = new AMQPStreamConnection('localhost', 5672, 'junade', 'insecurepassword'); 
$channel    = $connection->channel(); 
 
$channel->exchange_declare( 
  'helloHello',   // exchange 
  'fanout',       // exchange type 
  false,          // passive 
  false,          // durable 
  false           // auto-delete 
); 
 
$msg = new AMQPMessage("Hello world!"); 
 
$channel->basic_publish( 
  $msg,           // message 
  'helloHello'    // exchange 
); 
 
$channel->close(); 
$connection->close(); 
 
echo "Sent hello world message." . PHP_EOL; 

Here is our receive.php file. Like before, I have modified this script so that it also uses exchanges:

<?php 
 
require_once(__DIR__ . '/vendor/autoload.php'); 
use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
 
$connection = new AMQPStreamConnection('localhost', 5672, 'junade', 'insecurepassword'); 
$channel    = $connection->channel(); 
 
$channel->exchange_declare( 
  'helloHello',   // exchange 
  'fanout',       // exchange type 
  false,          // passive 
  false,          // durable 
  false           // auto-delete 
); 
 
$callback = function ($msg) { 
  echo "Received: " . $msg->body . PHP_EOL; 
}; 
 
list($queueName, ,) = $channel->queue_declare("", false, false, true, false); 
 
$channel->queue_bind($queueName, 'helloHello'); 
 
$channel->basic_consume($queueName, '', false, true, false, false, $callback); 
 
while (count($channel->callbacks)) { 
  $channel->wait(); 
} 
 
$channel->close(); 
$connection->close(); 

Now, let's test these scripts. We'll first need to have our receive.php script running, then we can send messages across using our send.php script.

First, let's trigger our receive.php script so that it starts running:

php receive.php

Once it is running, we can move on to sending messages by running our send.php script in another Terminal:

php send.php

This will now populate our Terminal running receive.php with a line reading Received: Hello world! for each message sent.
