Making the inventory service multithreaded

Currently, the inventory service works in a single thread, and with a blocking socket. This means that it can handle only one request at a time; if a new request is received while other requests are being processed, the client will have to wait until all previous requests have finished processing. Obviously, this does not scale very well.

In order to implement a server that can handle multiple requests in parallel, you can employ ZeroMQ's ROUTER/DEALER pattern. A ROUTER is a special kind of ZeroMQ socket that behaves very much like a regular REP socket, with the only difference being that multiple REQ sockets can connect to in parallel. Likewise, a DEALER socket is another kind of socket that is similar to an REQ socket, only that it can be connected to multiple REP sockets. This allows you to construct a load balancer that simply consists of one ROUTER and one DEALER socket that pipes packages from a set of multiple clients to a set of multiple servers.

Making the inventory service multithreaded

The ROUTER/DEALER pattern

As PHP does not support multithreading (at least, not very well), we will resort to using multiple processes in this example. Our multithreaded server will consist of one master process that handles the ROUTER and DEALER sockets, and multiple worker processes that each work with one REP socket. To implement this, you can fork a number of worker processes using the pcntl_fork function.

Tip

For the pcntl_fork function to work, you need the pcntl extension enabled. In nearly all distributions, this extension is enabled by default; in the Dockerfile that you have built in the previous section, it is also explicitly installed. If you compiled PHP yourself, you will need the -- enable-pcntl flag when calling the configure script.

In this example, our inventory service will consist of multiple ZeroMQ sockets: first a multitude of worker processes, each listening on a RES socket that responds to requests, and a master process with each ROUTER and DEALER socket that accepts and dispatches these requests. Only the ROUTER socket will be visible to outside services and reachable via TCP; for all other sockets, we will use UNIX sockets for communicating - they are faster and not reachable via network.

Start by implementing a worker function; create a new file called server_multithreaded.php for this:

require 'vendor/autoload.php'; 
 
use PacktChp7InventoryInventoryService; 
use PacktChp7InventoryJsonRpcServer; 
 
function worker() 
{ 
    $ctx = new ZMQContext(); 
 
    $sock = $ctx->getSocket(ZMQ::SOCKET_REP); 
    $sock->connect('ipc://workers.ipc'); 
 
    $service = new InventoryService(); 
 
    $server = new JsonRpcServer($sock, $service); 
    $server->run(); 
} 

The worker() function creates a new REP socket and connects this socket to the UNIX socket ipc://workers.ipc (this will be created by the master process later). It then runs the usual JsonRpcServer that you've already worked with before.

You can now start any number (in this case, four) of these worker processes using the pcntl_fork function:

for ($i = 0; $i < 4; $i ++) { 
    $pid = pcntl_fork(); 
    if ($pid == 0) { 
        worker($i); 
        exit(); 
    } 
} 

In case you're not familiar with the fork function: it duplicates the currently running process. The forked process will continue to run at the same code location at which it was forked. However, in the parent process, the return value of pcntl_fork() will return the process ID of the newly created process. However, within the new process, this value will be 0. In this case, the child processes now become our worker processes and the actual master process will pass the loop without exiting.

After this, you can start the actual load balancer by creating a ROUTER and a DEALER socket:

$args = getopt('p:', ['port=']); 
$ctx = new ZMQContext(); 
 
$port = $args['p'] ?? $args['port'] ?? 5557; 
$addr = 'tcp://*:' . $port; 
 
$ctx = new ZMQContext(); 
 
//  Socket to talk to clients 
$clients = $ctx->getSocket(ZMQ::SOCKET_ROUTER); 
$clients->bind($addr); 
 
//  Socket to talk to workers 
$workers = $ctx->getSocket(ZMQ::SOCKET_DEALER); 
$workers->bind("ipc://workers.ipc"); 

The ROUTER socket is bound to the actual network address at which the service is intended to be reachable (in this case, a TCP socket, allowing the service to be reached via a network). The DEALER socket, on the other hand, is bound to a local UNIX socket that will not be exposed to the outside world. The only purpose of the UNIX socket ipc://workers.ipc is that the worker processes can connect their REP sockets to it.

After having created both the ROUTER and the DEALER socket, you can use the ZMQDevice class to pipe incoming packages from the ROUTER socket to the DEALER socket, which will then distribute equally to all connected REP sockets. Response packages that are sent back from the REP sockets will also be dispatched back to the original clients:

//  Connect work threads to client threads via a queue 
$device = new ZMQDevice($clients, $workers); 
$device->run(); 

Changing the inventory service this way does not require any modification of the client code; the ROUTER socket that the load balancer is listening on behaves very much like a REP socket, and any REQ socket can connect to it in exactly the same way.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.135.196.172