Building the inventory service

We will start by implementing the inventory service, as it will use a simple request/reply pattern for communication and it does not have any other dependencies.

Getting started with ZeroMQ REQ/REP sockets

Start by creating the service's composer.json file in the inventory/ directory:

{ 
  "name": "packt-php7/chp7-inventory", 
  "type": "project", 
  "authors": [{ 
    "name": "Martin Helmich", 
    "email": "[email protected]" 
  }], 
  "require": { 
    "php": ">= 7.0", 
    "ext-zmq": "*" 
  }, 
  "autoload": { 
    "psr-4": { 
      "Packt\\Chp7\\Inventory\\": "src/" 
    } 
  } 
} 

After creating the composer.json file, install the project's dependencies using the composer install command on a command line within the inventory/ directory.

Let's start by creating a server.php file for the inventory. Just like the Ratchet application from Chapter 6, Building a Chat Application, this file will later be our main server process - remember, in this example, we're not even using HTTP as a communication protocol, so there's no web server and no FPM involved anywhere.

The starting point of each ZeroMQ application is the context. The context stores all the state that the ZeroMQ library needs for maintaining sockets and communicating with other sockets. You can then use this context to create a new socket and bind this socket to a port:

$args = getopt('p:', ['port:']); // a trailing colon marks an option that takes a value
$ctx = new ZMQContext(); 
 
$port = $args['p'] ?? $args['port'] ?? 5557; 
$addr = 'tcp://*:' . $port; 
 
$sock = $ctx->getSocket(ZMQ::SOCKET_REP); 
$sock->bind($addr); 

This code creates a new ZeroMQ REP socket (a socket that can reply to requests) and binds this socket to a configurable TCP port (5557 by default). You can now receive messages on this socket and reply to them:

while($message = $sock->recv()) { 
    echo "received message '" . $message . "'\n"; 
    $sock->send("this is my response message"); 
} 

As you can see, this loop will poll infinitely for new messages and then respond to them. The socket's recv() method will block the script execution until a new message has been received (you can later use the react/zmq library to easily implement non-blocking sockets, but this will suffice for now).
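One detail of the loop condition is worth a quick illustration: `while ($message = $sock->recv())` relies on the truthiness of the received string, so an empty message or the string "0" would end the loop. The following is a minimal sketch of that behavior using a hypothetical `FakeSocket` class (a stand-in invented for this example, not part of the ZeroMQ extension) whose `recv()` returns `false` once its queue is empty. A real blocking socket would simply wait for the next message instead.

```php
<?php
// Hypothetical stand-in for a socket: recv() hands out queued messages and
// returns false once the queue is empty. This is an assumption made for the
// sketch; a real blocking ZMQSocket waits for the next message instead.
class FakeSocket
{
    private $queue;

    public function __construct(array $messages)
    {
        $this->queue = $messages;
    }

    public function recv()
    {
        return count($this->queue) > 0 ? array_shift($this->queue) : false;
    }
}

// Loop condition as used in the text: stops at the first falsy message.
function countWithTruthinessCheck(FakeSocket $sock): int
{
    $handled = 0;
    while ($message = $sock->recv()) {
        $handled++;
    }
    return $handled;
}

// Stricter variant: only a real `false` ends the loop.
function countWithStrictCheck(FakeSocket $sock): int
{
    $handled = 0;
    while (($message = $sock->recv()) !== false) {
        $handled++;
    }
    return $handled;
}

$messages = ['first', '0', 'last'];
echo countWithTruthinessCheck(new FakeSocket($messages)), "\n"; // stops at '0'
echo countWithStrictCheck(new FakeSocket($messages)), "\n";     // handles all three
```

For the JSON messages exchanged later in this chapter this does not matter, since an encoded JSON-RPC message is never an empty string or "0".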

In order to test your ZeroMQ server, you can create a second file, client.php, in your inventory/ directory in which you can use an REQ socket to send requests to the server:

$args = getopt('h:', ['host:']); // both options take a value, hence the colons
$ctx = new ZMQContext(); 
 
$addr = $args['h'] ?? $args['host'] ?? 'tcp://127.0.0.1:5557'; 
 
$sock = $ctx->getSocket(ZMQ::SOCKET_REQ); 
$sock->connect($addr); 
 
$sock->send("This is my request"); 
var_dump($sock->recv()); 

When your server script is running, you can simply run the client.php script to connect to the server's REP socket, send a request, and wait for the server's reply. Just like with the REP socket, the REQ socket's recv() method will also block until a reply has been received from the server.

If you are using Docker Compose to manage the multitude of containers in your development environment (currently, it's only one, but there will be more), add the following section to your docker-compose.yml file:

inventory: 
  build: inventory 
  ports: 
    - 5557 
  volumes: 
    - ./inventory:/usr/src/app 

After adding the inventory service to the docker-compose.yml configuration file, you can start the container by simply running the following command on a command line:

$ docker-compose up

Using JSON-RPC for communication

Now we have a server that can receive text messages from a client and then send responses back to that client. However, in order to build a working and maintainable microservice architecture, we'll need some kind of protocol and format that these messages can follow and all services can agree upon. Often in microservice architectures, this common denominator is HTTP, whose rich protocol semantics can be used to easily build REST web services. However, ZeroMQ as a protocol is much more low-level and does not concern itself with different request methods, headers, caching, and all the other features that come for free with HTTP.

Instead of a RESTful service, we will implement the inventory service as a simple Remote Procedure Call (RPC) service. A quick and easy format that can be used for this is JSON-RPC, which implements RPCs with JSON messages. Using JSON-RPC, a client can send a method call using the following JSON format:

{ 
  "jsonrpc": "2.0", 
  "method": "methodName", 
  "params": ["foo", "bar", "baz"], 
  "id": "some-random-id" 
} 

The server can then respond to this message using the following format:

{ 
  "jsonrpc": "2.0", 
  "id": "id from request", 
  "result": "the result value" 
} 

Or alternatively, when an error occurs during processing:

{ 
  "jsonrpc": "2.0", 
  "id": "id from request", 
  "error": { 
    "message": "the error message", 
    "code": 1234 
  } 
} 
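To make the three message shapes concrete, here is a minimal sketch that builds them with json_encode(). The function names `jsonRpcRequest`, `jsonRpcResult`, and `jsonRpcError` are hypothetical helpers invented for this illustration; they are not part of any library and are not used by the server built in this chapter.

```php
<?php
// Hypothetical helpers (assumptions for this sketch) that build the three
// JSON-RPC 2.0 message shapes shown above.

function jsonRpcRequest(string $method, array $params, string $id): string
{
    return json_encode([
        'jsonrpc' => '2.0',
        'method'  => $method,
        'params'  => $params,
        'id'      => $id,
    ]);
}

function jsonRpcResult(string $id, $result): string
{
    return json_encode([
        'jsonrpc' => '2.0',
        'id'      => $id,
        'result'  => $result,
    ]);
}

function jsonRpcError(string $id, string $message, int $code): string
{
    return json_encode([
        'jsonrpc' => '2.0',
        'id'      => $id,
        'error'   => ['message' => $message, 'code' => $code],
    ]);
}

// The id ties a response to its request, so the reply reuses the request's id.
$request = jsonRpcRequest('methodName', ['foo', 'bar', 'baz'], uniqid());
$decoded = json_decode($request);

echo $request, "\n";
echo jsonRpcResult($decoded->id, 'the result value'), "\n";
echo jsonRpcError($decoded->id, 'the error message', 1234), "\n";
```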

This protocol is relatively simple and we can easily implement it on top of ZeroMQ. For this, start by creating a new Packt\Chp7\Inventory\JsonRpcServer class. This server will need a ZeroMQ socket and also an object that provides the methods that clients should be able to invoke using RPCs:

namespace Packt\Chp7\Inventory; 
 
class JsonRpcServer 
{ 
    private $socket; 
    private $server; 
 
    public function __construct(\ZMQSocket $socket, $server) 
    { 
        $this->socket = $socket; 
        $this->server = $server; 
    } 
} 

We can now implement a method that receives messages from the socket, tries to parse them as JSON-RPC messages, invokes the respective method on the $server object, and returns that method's result value:

public function run() 
{ 
    while ($msg = $this->socket->recv()) { 
        $resp = $this->handleMessage($msg); 
        $this->socket->send($resp); 
    } 
} 

As in the previous example, this method will run infinitely and will process any number of requests. Now, let's have a look at the handleMessage method:

private function handleMessage(string $req): string { 
    $json   = json_decode($req); 
    $method = [$this->server, $json->method]; 
 
    if (is_callable($method)) { 
        $result = call_user_func_array($method, $json->params ?? []); 
        return json_encode([ 
            'jsonrpc' => '2.0', 
            'id'      => $json->id, 
            'result'  => $result 
        ]); 
    } else { 
        return json_encode([ 
            'jsonrpc' => '2.0', 
            'id'      => $json->id, 
            'error'   => [ 
                'message' => 'uncallable method ' . $json->method, 
                'code'    => -32601 
            ] 
        ]); 
    } 
} 

This method checks if the $this->server object has a callable method with the same name as the method property of the JSON-RPC request. If so, this method is invoked with the request's params property as arguments and the return value is incorporated into the JSON-RPC response.
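The lookup described here can be tried out without any socket. The following is a minimal sketch in which `Greeter` and `dispatch` are hypothetical names invented for the illustration; it only mirrors the is_callable() and call_user_func_array() steps and returns plain arrays instead of encoded JSON-RPC responses.

```php
<?php
// Hypothetical stand-in for the object whose methods are exposed via RPC.
class Greeter
{
    public function greet(string $name, string $greeting = 'Hello'): string
    {
        return $greeting . ', ' . $name;
    }
}

// Mirrors the method lookup from handleMessage, with no socket involved.
function dispatch($server, string $requestJson): array
{
    $json   = json_decode($requestJson);
    $method = [$server, $json->method];

    if (is_callable($method)) {
        // Positional params from the request become the method's arguments.
        return ['result' => call_user_func_array($method, $json->params ?? [])];
    }

    return ['error' => 'uncallable method ' . $json->method];
}

var_dump(dispatch(new Greeter(), '{"method": "greet", "params": ["Ada"]}'));
var_dump(dispatch(new Greeter(), '{"method": "missing", "params": []}'));
```

Because any callable public method of the object can be invoked this way, the service class should only expose methods that clients are meant to call.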

Currently, this method is still missing some basic exception handling. As a single unhandled exception or fatal error can terminate the entire server process, we need to be extra careful here. First, we need to make sure that the incoming message is really a valid JSON string:

private function handleMessage(string $req): string { 
    $json   = json_decode($req); 
    if (json_last_error()) {
        return json_encode([
            'jsonrpc' => '2.0',
            'id'      => null,
            'error'   => [
                'message' => 'invalid json: ' . json_last_error_msg(),
                'code'    => -32700
            ]
        ]);
    } 
  
    // ... 
} 
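The reason for asking json_last_error() instead of checking the return value is that json_decode() returns null both for invalid input and for the valid JSON literal null. A minimal sketch of the difference, using a hypothetical helper named `decodeOrExplain` that is invented for this illustration:

```php
<?php
// Hypothetical helper: returns the decoded value and reports a decoding
// problem through the by-reference $error parameter (null means "no error").
function decodeOrExplain(string $raw, &$error)
{
    $error = null;
    $value = json_decode($raw);

    if (json_last_error() !== JSON_ERROR_NONE) {
        $error = json_last_error_msg();
        return null;
    }

    return $value;
}

// Valid JSON that happens to decode to null: no error is reported.
var_dump(decodeOrExplain('null', $error), $error);

// Truncated JSON: the value is null as well, but now $error is set.
var_dump(decodeOrExplain('{"jsonrpc": "2.0", "method":', $error), $error);
```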

Also, make sure that you catch anything that might be thrown from the actual service function. As we're working with PHP 7, remember that many regular PHP errors (type errors, for instance) are now also thrown, so it's important to not only catch exceptions, but errors as well. You can catch both exceptions and errors by using the Throwable interface in your catch clause:

if (is_callable($method)) { 
    try { 
        $result = call_user_func_array($method, $json->params ?? []); 
        return json_encode(/* ... */); 
    } catch (\Throwable $t) { // leading backslash: we are inside a namespace
        return json_encode([
            'jsonrpc' => '2.0',
            'id'      => $json->id,
            'error'   => [
                'message' => $t->getMessage(),
                'code'    => $t->getCode()
            ]
        ]);
    } 
} else { // ... 
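To see why catching Throwable matters, consider a service method with a scalar type hint that receives a non-numeric string: PHP raises a TypeError, which is an Error and would slip past a catch (Exception $e) clause. The following is a minimal sketch; `doubleValue`, `failLoudly`, and `callSafely` are hypothetical names invented for the illustration.

```php
<?php
// Hypothetical service function with a scalar type hint.
function doubleValue(int $value): int
{
    return $value * 2;
}

// Hypothetical service function that throws a regular exception.
function failLoudly()
{
    throw new \RuntimeException('out of stock', 42);
}

// Wraps a call the same way the server does: anything thrown is turned
// into an error structure instead of terminating the process.
function callSafely(callable $fn, array $params): array
{
    try {
        return ['result' => call_user_func_array($fn, $params)];
    } catch (\Throwable $t) {
        return ['error' => [
            'message' => $t->getMessage(),
            'code'    => $t->getCode(),
            'type'    => get_class($t), // added here only to show what was caught
        ]];
    }
}

var_dump(callSafely('doubleValue', [21]));             // regular result
var_dump(callSafely('doubleValue', ['not a number'])); // TypeError, an Error
var_dump(callSafely('failLoudly', []));                // RuntimeException, an Exception
```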

You can now continue by implementing the actual service containing the inventory service business logic. As we've spent a fair amount of time with low-level protocols until now, let's recapitulate the requirements for this service: the inventory service manages the inventories of articles in stock. During the checkout process, the inventory service needs to check if the required amount of an article is in stock, and if possible, reduce the inventory amount by the given amount.

We will implement this logic in the Packt\Chp7\Inventory\InventoryService class. Note that we'll try to keep the example simple and manage our article inventories simply in-memory. In a production setup, you'd probably use a database management system for storing your article data:

namespace Packt\Chp7\Inventory; 
 
class InventoryService 
{ 
    private $stock = [ 
        1000 => 123, 
        1001 => 4, 
        1002 => 12 
    ]; 
 
    public function checkArticle(int $articleNumber, int $amount = 1): bool 
    { 
        if (!array_key_exists($articleNumber, $this->stock)) { 
            return false; 
        } 
        return $this->stock[$articleNumber] >= $amount; 
    } 
 
    public function takeArticle(int $articleNumber, int $amount = 1): bool 
    { 
        if (!$this->checkArticle($articleNumber, $amount)) { 
            return false; 
        } 
 
        $this->stock[$articleNumber] -= $amount; 
        return true; 
    } 
} 

In this example, we're starting off with three articles with the article numbers 1000 to 1002. The checkArticle function tests if the required amount of a given article is in stock. The takeArticle function attempts to reduce the amount of articles by the required amount, if possible. If this was successful, the function returns true. If the required amount is not in stock, or the article is not known at all, the function will return false.
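The return values described above can be traced with a condensed stand-in. The function `takeFromStock` below is hypothetical and invented for this sketch; it folds the checkArticle and takeArticle logic into one function that works on a plain array passed by reference, so that it runs on its own.

```php
<?php
// Hypothetical condensed stand-in for the check-then-take logic above.
function takeFromStock(array &$stock, int $articleNumber, int $amount = 1): bool
{
    // Unknown article or not enough items: leave the stock untouched.
    if (!array_key_exists($articleNumber, $stock) || $stock[$articleNumber] < $amount) {
        return false;
    }

    $stock[$articleNumber] -= $amount;
    return true;
}

$stock = [1000 => 123, 1001 => 4, 1002 => 12];

var_dump(takeFromStock($stock, 1001, 3)); // enough items in stock
var_dump(takeFromStock($stock, 1001, 2)); // only one item left
var_dump(takeFromStock($stock, 9999));    // unknown article
var_dump($stock[1001]);                   // a failed attempt does not change the stock
```

Note that neither this sketch nor the class above rejects a zero or negative amount; a production service would probably validate that as well.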

We now have a class that implements a JSON-RPC server and another class containing the actual business logic for our inventory service. We can now put both of these classes together in our server.php file:

// Load Composer's autoloader so the Packt\Chp7\Inventory classes can be found
require __DIR__ . '/vendor/autoload.php'; 

$args = getopt('p:', ['port:']); 
$ctx = new ZMQContext(); 
 
$port = $args['p'] ?? $args['port'] ?? 5557; 
$addr = 'tcp://*:' . $port; 
 
$sock = $ctx->getSocket(ZMQ::SOCKET_REP); 
$sock->bind($addr); 
 
$service = new \Packt\Chp7\Inventory\InventoryService();
$server = new \Packt\Chp7\Inventory\JsonRpcServer($sock, $service);
$server->run();

To test this service, at least until you have a first version of the checkout service up and running, you can adjust the client.php script that you've created in the previous section to also send and receive JSON-RPC messages:

// ... 
 
$msg = [ 
    'jsonrpc' => '2.0', 
    'id'      => uniqid(), // the server copies this id into its response
    'method'  => 'takeArticle', 
    'params'  => [1001, 2] 
]; 
 
$sock->send(json_encode($msg)); 
$response = json_decode($sock->recv()); 
 
if (isset($response->error)) { 
    // handle error... 
} else { 
    $success = $response->result; 
    var_dump($success); 
} 

Each call of this script will remove two items of article #1001 from your inventory. In our example, we're working with a locally managed inventory that is always initialized with four items of this article, so the first two invocations of the client.php script will return true as a result, and all subsequent invocations will return false.
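The "handle error" branch of the client script could be filled in along the following lines. This is only a sketch: `unwrapResponse` is a hypothetical helper invented for the illustration, and turning JSON-RPC errors into a RuntimeException is one possible design choice, not something the chapter prescribes.

```php
<?php
// Hypothetical helper: returns the result of a JSON-RPC response, or throws
// if the response is malformed, carries an error, or answers another request.
function unwrapResponse(string $raw, string $expectedId)
{
    $response = json_decode($raw);

    if (json_last_error() !== JSON_ERROR_NONE || !is_object($response)) {
        throw new \RuntimeException('malformed response');
    }

    // Check for errors first: a parse error response carries a null id.
    if (isset($response->error)) {
        throw new \RuntimeException(
            $response->error->message,
            (int) $response->error->code
        );
    }

    if (($response->id ?? null) !== $expectedId) {
        throw new \RuntimeException('response id does not match request id');
    }

    return $response->result ?? null;
}

// Usage with canned responses instead of a live socket:
var_dump(unwrapResponse('{"jsonrpc": "2.0", "id": "abc", "result": true}', 'abc'));

try {
    unwrapResponse('{"jsonrpc": "2.0", "id": "abc", "error": {"message": "uncallable method foo", "code": -32601}}', 'abc');
} catch (\RuntimeException $e) {
    echo $e->getMessage(), ' (', $e->getCode(), ")\n";
}
```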
