Let's continue with the pipeline pattern. The pipeline pattern moves data through an ordered series of nodes. Data flows continuously, and each stage of the pipeline is connected to one or more nodes. Messages are distributed among the connected nodes with a round-robin strategy. It is somewhat similar to the request-reply pattern.
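To make the round-robin idea concrete, here is a minimal sketch in plain C that does not use ZeroMQ at all. It only simulates how successive messages might be spread over a fixed set of connected nodes. The names rr_state, rr_init, and rr_next are hypothetical and exist only for this illustration; the real library also considers which peers are currently connected and ready, which this sketch ignores.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper type: remembers which node gets the next message. */
typedef struct {
    size_t node_count; /* number of connected downstream nodes */
    size_t next;       /* index of the node that receives the next message */
} rr_state;

/* Start the rotation at node 0. */
static void rr_init(rr_state *state, size_t node_count) {
    assert(node_count > 0);
    state->node_count = node_count;
    state->next = 0;
}

/* Return the node for the current message and advance the rotation. */
static size_t rr_next(rr_state *state) {
    size_t chosen = state->next;
    state->next = (state->next + 1) % state->node_count;
    return chosen;
}
```

With three nodes, successive calls to rr_next return 0, 1, 2, 0, 1, and so on, so 100 tasks would be split 34/33/33 between the nodes.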
There seems to be no escape from the divide and conquer strategy in programming. Remember when you enrolled in your algorithms class, your annoying professor introduced divide and conquer using merge sort, and a week later half of the class dropped the unit? We remember as well. It is a small world, and here is divide and conquer again.
Let's do something in parallel with ZeroMQ. Consider a scenario where we have a producer that generates some random numbers. We also have workers, which find the square root of those numbers with Newton's method. Then we have a collector that collects the results from the workers.
The following is our server code:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include "zmq.h"

int main (int argc, char const *argv[]) {
  void* context = zmq_ctx_new();
  // This is the socket on which we send tasks to the workers.
  void* socket = zmq_socket(context, ZMQ_PUSH);
  zmq_bind(socket, "tcp://*:4040");
  // This is the socket on which we send the start-of-batch message
  // to the collector.
  void* connector = zmq_socket(context, ZMQ_PUSH);
  zmq_connect(connector, "tcp://localhost:5050");

  printf("Please press enter when workers are ready...");
  getchar();
  printf("Sending tasks to workers...\n");

  // The first message. It's also the signal start of batch.
  int length = strlen("-1");
  zmq_msg_t message;
  zmq_msg_init_size(&message, length);
  memcpy(zmq_msg_data(&message), "-1", length);
  zmq_msg_send(&message, connector, 0);
  zmq_msg_close(&message);

  // Seed the random number generator.
  srandom((unsigned) time(NULL));

  // Send the tasks.
  int count;
  int msec = 0;
  for(count = 0; count < 100; count++) {
    int load = (int) ((double) (100) * random () / RAND_MAX);
    msec += load;
    char string[10];
    sprintf(string, "%d", load);
    // Push the number to the workers as a task.
    length = strlen(string);
    zmq_msg_init_size(&message, length);
    memcpy(zmq_msg_data(&message), string, length);
    zmq_msg_send(&message, socket, 0);
    zmq_msg_close(&message);
  }
  printf("Total: %d msec\n", msec);
  sleep(1);

  zmq_close(connector);
  zmq_close(socket);
  zmq_ctx_destroy(context);
  return 0;
}
The following is the worker code where we do some square root calculations using Newton's method:
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "zmq.h"

double square(double x) {
  return x * x;
}

double average(double x, double y) {
  return (x + y) / 2.0;
}

// Returns non-zero when guess squared is within 0.000001 of x.
// fabs() is needed here; abs() would truncate the difference to an int.
int good_enough(double guess, double x) {
  return fabs(square(guess) - x) < 0.000001;
}

double improve(double guess, double x) {
  return average(guess, x / guess);
}

double sqrt_inner(double guess, double x) {
  if(good_enough(guess, x))
    return guess;
  else
    return sqrt_inner(improve(guess, x), x);
}

double newton_sqrt(double x) {
  return sqrt_inner(1.0, x);
}

int main (int argc, char const *argv[]) {
  void* context = zmq_ctx_new();
  // Let's initialize a socket to receive messages.
  void* receiver = zmq_socket(context, ZMQ_PULL);
  zmq_connect(receiver, "tcp://localhost:4040");
  // Let's initialize a socket to send the messages.
  void* sender = zmq_socket(context, ZMQ_PUSH);
  zmq_connect(sender, "tcp://localhost:5050");

  for(;;) {
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    zmq_msg_recv(&reply, receiver, 0);
    int length = zmq_msg_size(&reply);
    char* msg = malloc(length + 1);
    memcpy(msg, zmq_msg_data(&reply), length);
    msg[length] = '\0'; // Terminate the string before parsing it.
    zmq_msg_close(&reply);

    double val = atof(msg);
    printf("%.1f: %.1f\n", val, newton_sqrt(val));
    fflush(stdout);
    sleep(1);
    free(msg);

    // Tell the collector that this task is done.
    zmq_msg_t message;
    char* ssend = "T";
    int t_length = strlen(ssend);
    zmq_msg_init_size(&message, t_length);
    memcpy(zmq_msg_data(&message), ssend, t_length);
    zmq_msg_send(&message, sender, 0);
    zmq_msg_close(&message);
  }
  zmq_close(receiver);
  zmq_close(sender);
  zmq_ctx_destroy(context);
  return 0;
}
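The helper functions in the worker implement Newton's iteration for square roots: starting from a guess, each step replaces the guess with the average of the guess and x / guess, and the process stops once the guess squared is within 0.000001 of x. The same computation can be sketched as a loop, which avoids recursion and lets us cap the number of steps. The function name newton_sqrt_iter and the cap of 100 iterations are assumptions made for this illustration; they are not part of the worker code.

```c
#include <assert.h>
#include <math.h>

/* Hypothetical iterative variant of the worker's newton_sqrt().
 * Assumes x >= 0; the cap of 100 steps is an arbitrary safety limit. */
static double newton_sqrt_iter(double x) {
    const int max_steps = 100;
    double guess = 1.0;
    assert(x >= 0.0);
    for (int step = 0; step < max_steps; step++) {
        /* Stop when guess squared is close enough to x. */
        if (fabs(guess * guess - x) < 0.000001)
            break;
        /* Newton update: average the guess with x / guess. */
        guess = (guess + x / guess) / 2.0;
    }
    return guess;
}
```

For example, newton_sqrt_iter(16.0) returns a value within 0.000001 of 4.0 after a handful of steps.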
The following is the collector code:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "zmq.h"

int main (int argc, char const *argv[]) {
  void* context = zmq_ctx_new();
  void* receiver = zmq_socket(context, ZMQ_PULL);
  zmq_bind(receiver, "tcp://*:5050");

  // We receive the first message and discard it since it's the
  // signal start of batch which is -1.
  zmq_msg_t reply;
  zmq_msg_init(&reply);
  zmq_msg_recv(&reply, receiver, 0);
  int length = zmq_msg_size(&reply);
  char* msg = malloc(length + 1);
  memcpy(msg, zmq_msg_data(&reply), length);
  zmq_msg_close(&reply);
  free(msg);

  int count;
  for(count = 0; count < 100; count++) {
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    zmq_msg_recv(&reply, receiver, 0);
    int length = zmq_msg_size(&reply);
    char* value = malloc(length + 1);
    memcpy(value, zmq_msg_data(&reply), length);
    zmq_msg_close(&reply);
    free(value);
    // Report progress after every tenth task.
    if((count + 1) % 10 == 0)
      printf("10 Tasks have been processed.\n");
    fflush(stdout);
  }
  zmq_close(receiver);
  zmq_ctx_destroy(context);
  return 0;
}
The following diagram represents the code we have written so far:
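What we have done so far: the server binds a ZMQ_PUSH socket on port 4040 and sends out 100 tasks; each worker connects upstream to the server with a ZMQ_PULL socket and downstream to the collector with a ZMQ_PUSH socket; the collector binds a ZMQ_PULL socket on port 5050 and gathers the results.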
We have mentioned that workers connect downstream to the collector and upstream to the server. Now, let's examine more closely what this means.
Let's have a look at the following code snippet from our worker code:
// Let's initialize a socket to receive messages.
void* receiver = zmq_socket(context, ZMQ_PULL);
zmq_connect(receiver, "tcp://localhost:4040");
When we want to retrieve data from upstream nodes, we use ZMQ_PULL. ZMQ_PULL sockets receive messages from upstream nodes in the pipeline. As we have said earlier, incoming messages are handled with fair-queue scheduling.
When we want to send data to downstream nodes, we use ZMQ_PUSH. ZMQ_PUSH sockets send messages to downstream nodes in the pipeline.
ZMQ_PUSH never discards messages. If the high water mark is reached for the downstream nodes, or if there are no downstream nodes available, any call to zmq_send(3) on the socket blocks until a downstream node is available to receive the message.