Bridging ZeroMQ and HTTP

As you have seen in this chapter, ZeroMQ offers a lot of different possibilities for implementing communication between separate services. In particular, patterns such as publish/subscribe and push/pull are not that easy to implement with PHP's de-facto standard protocol, HTTP.

On the other hand, HTTP is more widely adopted and offers a richer set of protocol semantics, handling concerns such as caching and authentication at the protocol level. Because of this, especially when offering external APIs, you might prefer an HTTP-based API over a ZeroMQ-based one. Luckily, it's easy to bridge between the two protocols. In our example architecture, the checkout service is the only service that will be used by outside services. To offer a better interface for it, we will now implement an HTTP-based wrapper for the checkout service that can be used in a RESTful way.

For this, you can use the react/http package. This package offers a minimalist HTTP server that - just like react/zmq - works asynchronously and uses an event loop for handling requests. This means that a react-based HTTP server can even run in the same process using the same event loop as the REP ZeroMQ socket that is already offered by the checkout service. Start by installing the react/http package by running the following command in the checkout/ folder in your project directory:

$ composer require react/http

Before extending the checkout service with an HTTP server, the server.php script needs a bit of refactoring. Currently, server.php creates a REP ZeroMQ socket with an event listener function in which the request is processed. As our goal is now to add an HTTP API that triggers the same functionality, we'll need to extract this logic into a separate class. Start by creating the Packt\Chp7\Checkout\CheckoutService class:

namespace Packt\Chp7\Checkout;

use React\Promise\PromiseInterface;
 
class CheckoutService 
{ 
    private $client; 
 
    public function __construct(JsonRpcClient $client) 
    { 
        $this->client = $client; 
    } 
 
    public function handleCheckoutOrder(string $msg): PromiseInterface 
    { 
    } 
} 

The handleCheckoutOrder method will be holding the logic that was previously implemented directly in the server.php file. As this method will later be used by both the ZeroMQ REP socket and the HTTP server, this method cannot directly send a response message, but will simply return a promise that can then be used in the server.php:

public function handleCheckoutOrder(string $msg): PromiseInterface 
{ 
    $request = json_decode($msg); 
    $promises = []; 
 
    foreach ($request->cart as $article) { 
        $promises[] = $this->client->request('checkArticle', [$article->articlenumber, $article->amount]); 
    } 
 
    // Wait for all stock checks, then fail if any of them was negative
    return \React\Promise\all($promises)
        ->then(function(array $values):bool {
            if (array_sum($values) != count($values)) {
                throw new \Exception('not all articles are in stock');
            }
            return true;
        })->then(function() use ($request):PromiseInterface {
            $promises = [];

            foreach ($request->cart as $article) {
                $promises[] = $this->client->request('takeArticle', [$article->articlenumber, $article->amount]);
            }

            return \React\Promise\all($promises);
        })->then(function(array $values):bool {
            if (array_sum($values) != count($values)) {
                throw new \Exception('not all articles are in stock');
            }
            return true;
        });
} 

The consistent use of promises and not caring about the return message actually allows some simplifications; instead of directly sending back an error message, you can simply throw an exception, which will cause the promise returned by this function to be automatically rejected.
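
The array_sum()/count() comparison used in both callbacks is compact but easy to misread. The following minimal sketch isolates it in a standalone function, assuming that each value resolved by the inventory service is a boolean. The function name allSucceeded is hypothetical and not part of the checkout service:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the original service): returns true only
 * if every element of $values is true.
 */
function allSucceeded(array $values): bool
{
    // array_sum() counts each true as 1 and each false as 0, so the sum
    // equals the element count only when no single check failed.
    return array_sum($values) == count($values);
}

var_dump(allSucceeded([true, true, true]));  // bool(true)
var_dump(allSucceeded([true, false, true])); // bool(false)
```

Note that an empty array also passes this check, because both the sum and the count are 0.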

The existing server.php file can now be simplified by quite a few lines of code:

$client          = new JsonRpcClient($ctx, 'tcp://inventory:5557'); 
$checkoutService = new CheckoutService($client); 
 
$socket->on('message', function($msg) use ($ctx, $checkoutService, $pubSocket, $socket) {
    echo "received checkout order $msg\n";

    $checkoutService->handleCheckoutOrder($msg)->then(function() use ($pubSocket, $msg, $socket) {
        // Success: publish the order and acknowledge it to the requester
        $pubSocket->send($msg);
        $socket->send(json_encode(['msg' => 'OK']));
    }, function(Exception $err) use ($socket) {
        // Rejected promise: report the exception message to the requester
        $socket->send(json_encode(['error' => $err->getMessage()]));
    });
});

Next, you can get to work on the HTTP server. For this, you'll first need a simple socket server that you can then pass into the actual HTTP server class. This can be done at any point in the server.php before the event loop is run:

$httpSocket = new React\Socket\Server($loop);
$httpSocket->listen(8080, '0.0.0.0');

$httpServer = new React\Http\Server($httpSocket);
 
$loop->run(); 

The HTTP server itself has a 'request' event for which you can register a listener function (similar to the 'message' event of the ZeroMQ sockets). The listener function receives a request object and a response object as parameters. These are instances of the React\Http\Request and React\Http\Response classes, respectively:

$httpServer->on('request', function(React\Http\Request $req, React\Http\Response $res) {
    $res->writeHead(200); 
    $res->end('Hello World'); 
}); 

Unfortunately, React HTTP's Request and Response classes are not compatible with the respective PSR-7 interfaces. However, if the need arises you can convert them relatively easily, as already seen in the section Bridging Ratchet and PSR-7 applications in Chapter 6, Building a Chat Application.

Within this listener function, you can first check for the correct request method and path, and send an error code otherwise:

$httpServer->on('request', function(React\Http\Request $req, React\Http\Response $res) {
    // Any path other than /orders is answered with a 404
    if ($req->getPath() != '/orders') {
        $msg = json_encode(['msg' => 'this resource does not exist']);
        $res->writeHead(404, [
            'Content-Type' => 'application/json;charset=utf8',
            'Content-Length' => strlen($msg)
        ]);
        $res->end($msg);
        return;
    }

    // Only POST is allowed on /orders
    if ($req->getMethod() != 'POST') {
        $msg = json_encode(['msg' => 'this method is not allowed']);
        $res->writeHead(405, [
            'Content-Type' => 'application/json;charset=utf8',
            'Content-Length' => strlen($msg)
        ]);
        $res->end($msg);
        return;
    }
});
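
Both error branches build their response in the same way, so the checks can also be kept apart from the response objects. The following is a minimal sketch of that idea in plain PHP, without any ReactPHP dependency. The function names checkRoute and jsonResponse are hypothetical and do not exist in react/http:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns null if the request may proceed, or
 * [status, headers, body] describing the error response to send.
 */
function checkRoute(string $method, string $path): ?array
{
    if ($path !== '/orders') {
        return jsonResponse(404, ['msg' => 'this resource does not exist']);
    }
    if ($method !== 'POST') {
        return jsonResponse(405, ['msg' => 'this method is not allowed']);
    }
    return null;
}

/** Hypothetical helper: encodes $payload and computes matching headers. */
function jsonResponse(int $status, array $payload): array
{
    $body = json_encode($payload);
    $headers = [
        'Content-Type'   => 'application/json;charset=utf8',
        'Content-Length' => strlen($body),
    ];
    return [$status, $headers, $body];
}

var_dump(checkRoute('POST', '/orders'));   // NULL
var_dump(checkRoute('GET', '/orders')[0]); // int(405)
```

In the listener function, a non-null result could then be passed on with writeHead($status, $headers) and end($body) on the response object.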

This is where it gets tricky. The ReactPHP HTTP server is asynchronous to the point that, when the request event is triggered, the request body has not yet been read from the network socket. To get the actual request body, you need to listen on the request's data event. However, the request body is read in chunks of 4096 bytes, so for large request bodies, the data event may be triggered multiple times. The easiest way to read the full request body is to check the Content-Length header and then check in the data event handler whether exactly this number of bytes has already been read:

$httpServer->on('request', function(React\Http\Request $req, React\Http\Response $res) {
    // error checking omitted...

    $length = $req->getHeaders()['Content-Length'];
    $body   = '';

    // $length has to be imported into the closure together with $body
    $req->on('data', function(string $chunk) use (&$body, $length) {
        $body .= $chunk;
        if (strlen($body) == $length) {
            // body is complete!
        }
    });
});

Of course, this approach won't work when the sender uses the so-called chunked transfer encoding, because such a request carries no Content-Length header. However, reading a request body that uses chunked transfer encoding works in a similar way; in this case, the exit condition does not depend on the Content-Length header, but is met when the first empty chunk has been read.
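
The length-based completion check can be tried out in isolation, without a running server. The following minimal sketch assumes PHP 7.4 or later (for typed properties). The BodyBuffer class is hypothetical and not part of react/http:

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: collects body chunks until the number of bytes
 * announced in the Content-Length header has been received.
 */
final class BodyBuffer
{
    private string $body = '';
    private int $expected;

    public function __construct(int $expectedLength)
    {
        $this->expected = $expectedLength;
    }

    /** Appends a chunk; returns the full body once complete, null before. */
    public function append(string $chunk): ?string
    {
        $this->body .= $chunk;

        // ">=" instead of "==" so that a sender transmitting more bytes than
        // announced cannot keep the buffer waiting forever.
        return strlen($this->body) >= $this->expected ? $this->body : null;
    }
}

$buffer = new BodyBuffer(11);
var_dump($buffer->append('hello ')); // NULL
var_dump($buffer->append('world'));  // string(11) "hello world"
```

In a data event handler, each incoming chunk would be passed to append(), and the checkout logic would start as soon as the return value is no longer null.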

After the complete request body has been read, you can then pass this body into the $checkoutService that you have already used before:

$httpServer->on('request', function(React\Http\Request $req, React\Http\Response $res) use ($pubSocket, $checkoutService) {
    // error checking omitted...

    $length = $req->getHeaders()['Content-Length'];
    $body   = '';

    $req->on('data', function(string $chunk) use (&$body, $length, $res, $pubSocket, $checkoutService) {
        $body .= $chunk;
        if (strlen($body) == $length) {
            $checkoutService->handleCheckoutOrder($body)
                ->then(function() use ($res, $body, $pubSocket) {
                    // Success: publish the order and confirm with a 200
                    $pubSocket->send($body);

                    $msg = json_encode(['msg' => 'OK']);
                    $res->writeHead(200, [
                        'Content-Type' => 'application/json',
                        'Content-Length' => strlen($msg)
                    ]);
                    $res->end($msg);
                }, function(Exception $err) use ($res) {
                    // Rejected promise: report the exception message with a 500
                    $msg = json_encode(['msg' => $err->getMessage()]);
                    $res->writeHead(500, [
                        'Content-Type' => 'application/json',
                        'Content-Length' => strlen($msg)
                    ]);
                    $res->end($msg);
                });
        }
    });
});

The CheckoutService class is used in exactly the same way as before. The only difference is how the response is sent back to the client: if the request was received by the ZeroMQ REP socket, a corresponding response is sent to the REQ socket that sent the request. If the request was received by the HTTP server, an HTTP response with the same content is sent instead.

You can test your new HTTP API using a command-line tool such as curl or HTTPie:

$ http -v localhost:8080/orders \
    cart:='[{"articlenumber":1000,"amount":3}]' customer:='{"email":"[email protected]"}'

The following screenshot shows an example output when testing the new API endpoint using the preceding HTTPie command:

Testing the new HTTP API
