The pipeline pattern

Let's continue with the pipeline pattern. In this pattern, data flows continuously through a chain of nodes arranged in stages, and each stage is connected to one or more nodes in the next stage. When a stage has several downstream nodes, messages are distributed among them with a round-robin strategy. This makes the pattern somewhat similar to request-reply, except that data flows in one direction only and no replies travel back upstream.

The divide and conquer strategy

There seems to be no escape from divide and conquer when you program. Remember enrolling in your algorithms class, when your annoying professor introduced divide and conquer using merge sort and a week later half of the class dropped the unit? We remember as well. It is a small world, and here is divide and conquer again.

Let's do something in parallel with ZeroMQ. Consider a scenario where we have a producer that generates some random numbers. We also have workers, which find the square root of those numbers with Newton's method. Then we have a collector that collects the results from the workers.
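The worker code below uses Newton's method, so a short derivation may help. Applying Newton's root-finding step to f(g) = g^2 - x gives exactly the update that the worker's `improve` function computes: the average of `guess` and `x / guess`. The worker starts from g_0 = 1 and stops once |g_n^2 - x| < 10^{-6}.

```latex
\begin{aligned}
f(g) &= g^2 - x, \qquad f'(g) = 2g \\
g_{n+1} &= g_n - \frac{f(g_n)}{f'(g_n)}
         = g_n - \frac{g_n^2 - x}{2 g_n}
         = \frac{1}{2}\left(g_n + \frac{x}{g_n}\right) \\
\text{e.g. } x = 9,\ g_0 = 1:&\quad g_1 = 5,\quad g_2 = 3.4,\quad g_3 \approx 3.0235,\quad g_4 \approx 3.00009
\end{aligned}
```

Each step roughly squares the error once the guess is close, which is why only a handful of iterations are needed.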

The following is our server code:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include "zmq.h"

int main (int argc, char const *argv[]) {

  void* context = zmq_ctx_new();
  // This is the socket that we send the tasks to the workers on.
  void* socket = zmq_socket(context, ZMQ_PUSH);
  zmq_bind(socket, "tcp://*:4040");

  // This socket sends the start-of-batch signal to the collector.
  void* connector = zmq_socket(context, ZMQ_PUSH);
  zmq_connect(connector, "tcp://localhost:5050");

  printf("Please press enter when workers are ready...");
  getchar();
  printf("Sending tasks to workers...\n");

  // The first message is "-1"; it signals the start of the batch.
  int length = strlen("-1");
  zmq_msg_t message;
  zmq_msg_init_size(&message, length);
  memcpy(zmq_msg_data(&message), "-1", length);
  zmq_msg_send(&message, connector, 0);
  zmq_msg_close(&message);

  // Seed the random number generator.
  srandom((unsigned) time(NULL));

  // Send 100 tasks, each a random number between 0 and 100.
  int count;
  int msec = 0;
  for(count = 0; count < 100; count++) {
    int load = (int) ((double) (100) * random () / RAND_MAX);
    msec += load;
    char string[10];
    snprintf(string, sizeof(string), "%d", load);

    // PUSH hands each task to the next worker in round-robin order.
    int task_length = strlen(string);
    zmq_msg_t task;
    zmq_msg_init_size(&task, task_length);
    memcpy(zmq_msg_data(&task), string, task_length);
    zmq_msg_send(&task, socket, 0);
    zmq_msg_close(&task);
  }
  printf("Total: %d msec\n", msec);
  // Give ZeroMQ time to deliver the queued messages before exiting.
  sleep(1);

  zmq_close(connector);
  zmq_close(socket);
  zmq_ctx_destroy(context);

  return 0;
}

The following is the worker code where we do some square root calculations using Newton's method:

#include <math.h>   // fabs; link with -lm if your toolchain needs it
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "zmq.h"

double square(double x) {
  return x * x;
}

double average(double x, double y) {
  return (x + y) / 2.0;
}

// Use fabs, not abs: abs takes an int and would truncate the difference.
int good_enough(double guess, double x) {
  return fabs(square(guess) - x) < 0.000001;
}

double improve(double guess, double x) {
  return average(guess, x / guess);
}

double sqrt_inner(double guess, double x) {
  if(good_enough(guess, x))
    return guess;
  else
    return sqrt_inner(improve(guess, x), x);
}

double newton_sqrt(double x) {
  return sqrt_inner(1.0, x);
}

int main (int argc, char const *argv[]) {

  void* context = zmq_ctx_new();

  // Let's initialize a socket to receive messages.
  void* receiver = zmq_socket(context, ZMQ_PULL);
  zmq_connect(receiver, "tcp://localhost:4040");
  // Let's initialize a socket to send the messages.
  void* sender = zmq_socket(context, ZMQ_PUSH);
  zmq_connect(sender, "tcp://localhost:5050");

  for(;;) {
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    zmq_msg_recv(&reply, receiver, 0);

    // Message data is not null-terminated, so copy it and add '\0'.
    int length = zmq_msg_size(&reply);
    char* msg = malloc(length + 1);
    memcpy(msg, zmq_msg_data(&reply), length);
    msg[length] = '\0';
    zmq_msg_close(&reply);

    double val = atof(msg);
    free(msg);
    double result = newton_sqrt(val);
    printf("%.1f: %.1f\n", val, result);
    fflush(stdout);

    // Simulate a slow task.
    sleep(1);

    // Send the result downstream to the collector (on the PUSH socket,
    // since a PULL socket cannot send).
    char ssend[32];
    snprintf(ssend, sizeof(ssend), "%f", result);
    int t_length = strlen(ssend);
    zmq_msg_t message;
    zmq_msg_init_size(&message, t_length);
    memcpy(zmq_msg_data(&message), ssend, t_length);
    zmq_msg_send(&message, sender, 0);
    zmq_msg_close(&message);
  }
  // Never reached; shown for completeness.
  zmq_close(receiver);
  zmq_close(sender);
  zmq_ctx_destroy(context);

  return 0;
}

The following is the collector (receiver) code:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "zmq.h"

int main (int argc, char const *argv[]) {

  void* context = zmq_ctx_new();
  void* receiver = zmq_socket(context, ZMQ_PULL);
  zmq_bind(receiver, "tcp://*:5050");

  // We receive the first message and discard it, since it is the
  // start-of-batch signal ("-1") sent by the server.
  zmq_msg_t start;
  zmq_msg_init(&start);
  zmq_msg_recv(&start, receiver, 0);
  zmq_msg_close(&start);

  int count;
  for(count = 0; count < 100; count++) {
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    zmq_msg_recv(&reply, receiver, 0);

    // Copy the result into a null-terminated string.
    int length = zmq_msg_size(&reply);
    char* value = malloc(length + 1);
    memcpy(value, zmq_msg_data(&reply), length);
    value[length] = '\0';
    zmq_msg_close(&reply);
    free(value);

    // Report progress after every 10 results.
    if((count + 1) % 10 == 0)
      printf("10 tasks have been processed.\n");
    fflush(stdout);
  }

  zmq_close(receiver);
  zmq_ctx_destroy(context);

  return 0;
}

The following diagram represents the code we have written so far:

Figure: The divide and conquer strategy

What we have done so far:

  • The start of the batch must be synchronized with the workers being up and running, which is why the server waits for you to press Enter before sending anything. As we said earlier, connecting takes some time. Without this synchronization, the first worker to connect would receive most of the messages while the other workers are still connecting, and the tasks would not be processed in parallel.
  • The collector's PULL socket fetches the results using the fair-queue scheduling that we described in Chapter 1, Getting Started.
  • The server's PUSH socket distributes the tasks evenly (round-robin) among the connected workers.
  • The workers are connected upstream to the server and downstream to the collector. You can add more workers simply by starting more worker processes.

We have mentioned that the workers connect downstream to the collector and upstream to the server. Now, let's examine more closely what this means.

Let's have a look at the following code snippet from our worker code:

// Let's initialize a socket to receive messages.
void* receiver = zmq_socket(context, ZMQ_PULL);
zmq_connect(receiver, "tcp://localhost:4040");

The ZMQ_PULL socket

When a node needs to receive data from upstream nodes, it uses ZMQ_PULL. A ZMQ_PULL socket receives messages from the upstream nodes in the pipeline; as we said earlier, incoming messages are fair-queued among all connected upstream nodes.

Note

The zmq_send(3) function is not implemented for ZMQ_PULL sockets; a PULL socket can only receive.

The ZMQ_PUSH socket

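When a node needs to send data to downstream nodes, it uses ZMQ_PUSH. A ZMQ_PUSH socket sends messages to the downstream nodes in the pipeline, distributing them round-robin among all connected downstream nodes. The zmq_recv(3) function is not implemented for ZMQ_PUSH sockets.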

ZMQ_PUSH never discards messages. If the high-water mark has been reached for all downstream nodes, or if there are no downstream nodes at all, the socket enters a mute state: any zmq_send(3) call on it blocks until the mute state ends or at least one downstream node becomes available to receive messages.
