Bidirectional streaming with GRPC

The main advantage of GRPC over traditional HTTP/1.1 is that it uses a single TCP connection for sending and receiving multiple messages between the server and the client. We saw the example of a money transaction before. Another real-world use case is a GPS installed in a taxi. Here, the taxi is the client that sends its geographical points to the server along its route. Finally, the server can calculate the total fare amount depending on the time spent between points and the total distance. 

Another such use case is when a server needs to notify the client whenever some processing is performed. This is called a server push model. The server can send a stream of results back when a client asked for them only once. This is different to polling, where the client requests something each and every time. This can be useful when there are a series of time-taking steps that need to be done. The GRPC client can escalate that job to the GRPC server. Then, the server takes its time and relays the message back to the client, which reads them and does something useful. Let us implement this.

This concept is similar to WebSockets, but between any type of platform. Create a project called serverPush:

mkdir $GOPATH/src/github.com/narenaryan/serverPush
mkdir $GOPATH/src/github.com/narenaryan/serverPush/datafiles

Now, write in datafiles a protocol buffer that is similar to the previous one:

syntax = "proto3";
package datafiles;

message TransactionRequest {
   string from = 1;
   string to = 2;
   float amount = 3;
}

message TransactionResponse {
  string status = 1;
  int32 step = 2;
  string description = 3;
}

service MoneyTransaction {
    rpc MakeTransaction(TransactionRequest) returns (stream TransactionResponse) {}
}

We have two messages and one service defined in the protocol buffer file. The exciting part is in the service; we are returning a stream instead of a plain response:

rpc MakeTransaction(TransactionRequest) returns (stream TransactionResponse) {}

The use case of this project is: the client sends a money transfer request to the server, the server does a few tasks and sends those step details as a stream of responses back to the server. Now, let us compile that proto file:

protoc -I datafiles/ datafiles/transaction.proto --go_out=plugins=grpc:datafiles

This creates a new file called transaction.pb.go in the datafiles directory. We use the definitions in this file in our server and client programs, which we will create shortly. Now, let us write the GRPC server code. This code is a bit different compared to the previous example because of the introduction of streams:

mkdir $GOPATH/src/github.com/narenaryan/serverPush/grpcServer
vi $GOPATH/src/github.com/narenaryan/serverPush/grpcServer/server.go

Now, add this program to the file:

package main

import (
  "fmt"
  "log"
  "net"
  "time"

  pb "github.com/narenaryan/serverPush/datafiles"
  "google.golang.org/grpc"
  "google.golang.org/grpc/reflection"
)

const (
  port      = ":50051"
  noOfSteps = 3
)

// server is used to create MoneyTransactionServer.
type server struct{}

// MakeTransaction implements MoneyTransactionServer.MakeTransaction
func (s *server) MakeTransaction(in *pb.TransactionRequest, stream pb.MoneyTransaction_MakeTransactionServer) error {
  log.Printf("Got request for money transfer....")
  log.Printf("Amount: $%f, From A/c:%s, To A/c:%s", in.Amount, in.From, in.To)
  // Send streams here
  for i := 0; i < noOfSteps; i++ {
    // Simulating I/O or Computation process using sleep........
    // Usually this will be saving money transfer details in DB or
    // talk to the third party API
    time.Sleep(time.Second * 2)
    // Once task is done, send the successful message back to the client
    if err := stream.Send(&pb.TransactionResponse{Status: "good",
      Step:        int32(i),
      Description: fmt.Sprintf("Description of step %d", int32(i))}); err != nil {
      log.Fatalf("%v.Send(%v) = %v", stream, "status", err)
    }
  }
  log.Printf("Successfully transfered amount $%v from %v to %v", in.Amount, in.From, in.To)
  return nil
}

func main() {
  lis, err := net.Listen("tcp", port)
  if err != nil {
    log.Fatalf("Failed to listen: %v", err)
  }
  // Create a new GRPC Server
  s := grpc.NewServer()
  // Register it with Proto service
  pb.RegisterMoneyTransactionServer(s, &server{})
  // Register reflection service on gRPC server.
  reflection.Register(s)
  if err := s.Serve(lis); err != nil {
    log.Fatalf("Failed to serve: %v", err)
  }
}

MakeTransaction is the function that interests us. It takes a request and a stream as its arguments. In the function, we are looping through the number of steps (here, it is three), and performing the computation. The server is simulating the mock I/O or computation using the time.Sleep function: 

stream.Send()

This function sends a stream response from the server to the client. Now, let us compose the client program. This is also a bit different to the basic GRPC client that we saw in the preceding code. Create a new directory for the client program:

mkdir $GOPATH/src/github.com/narenaryan/serverPush/grpcClient
vi $GOPATH/src/github.com/narenaryan/serverPush/grpcClient/cilent.go

Now, start writing the client logic in that file:

package main

import (
  "io"
  "log"

  pb "github.com/narenaryan/serverPush/datafiles"
  "golang.org/x/net/context"
  "google.golang.org/grpc"
)

const (
  address = "localhost:50051"
)

// ReceiveStream listens to the stream contents and use them
func ReceiveStream(client pb.MoneyTransactionClient, request *pb.TransactionRequest) {
  log.Println("Started listening to the server stream!")
  stream, err := client.MakeTransaction(context.Background(), request)
  if err != nil {
    log.Fatalf("%v.MakeTransaction(_) = _, %v", client, err)
  }
  // Listen to the stream of messages
  for {
    response, err := stream.Recv()
    if err == io.EOF {
      // If there are no more messages, get out of loop
      break
    }
    if err != nil {
      log.Fatalf("%v.MakeTransaction(_) = _, %v", client, err)
    }
    log.Printf("Status: %v, Operation: %v", response.Status, response.Description)
  }
}

func main() {
  // Set up a connection to the server.
  conn, err := grpc.Dial(address, grpc.WithInsecure())
  if err != nil {
    log.Fatalf("Did not connect: %v", err)
  }
  defer conn.Close()
  client := pb.NewMoneyTransactionClient(conn)

  // Prepare data. Get this from clients like Front-end or Android App
  from := "1234"
  to := "5678"
  amount := float32(1250.75)

  // Contact the server and print out its response.
  ReceiveStream(client, &pb.TransactionRequest{From: from,
    To: to, Amount: amount})
}

Here, ReceiveStream is the custom function we wrote for the sake of sending a request and receiving a stream of messages. It takes two arguments: MoneyTransactionClient and TransactionRequest. It uses the first argument to create a stream and starts listening to it. Whenever the server exhausts all the messages, the client will stop listening and terminate. Then, an io.EOF error will be returned if the client tries to receive messages. We are logging the responses collected from the GRPC server. The second argument, TransactionRequest, is used to send the request to the server for the first time. Now, running it will make it more clear to us. On terminal one, run the GRPC server:

go run $GOPATH/src/github.com/narenaryan/serverPush/grpcServer/server.go

It will keep on listening for incoming requests. Now, run the client on the second terminal to see the action:

go run $GOPATH/src/github.com/narenaryan/serverPush/grpcClient/client.go

This outputs the following to the console:

2017/07/16 15:08:15 Started listening to the server stream!
2017/07/16 15:08:17 Status: good, Operation: Description of step 0
2017/07/16 15:08:19 Status: good, Operation: Description of step 1
2017/07/16 15:08:21 Status: good, Operation: Description of step 2

At the same time, the server also logs its own messages on terminal one:

2017/07/16 15:08:15 Got request for money Transfer....
2017/07/16 15:08:15 Amount: $1250.750000, From A/c:1234, To A/c:5678
2017/07/16 15:08:21 Successfully transfered amount $1250.75 from 1234 to 5678

This process happens in sync with the server. The client stays alive until all the streaming messages are sent back. The server can handle any number of clients at a given time. Every client request is considered as an individual entity. This is an example of the server sending a stream of responses. There are other cases that can also be implemented with protocol buffers and GRPC:

  • The client sending streamed requests to get one final response from the server
  • The client and server are both sending streamed requests and responses at the same time
The official GRPC team has provided a nice example of routing a taxi on GitHub. You can take a look at it to learn more about the functioning of bidirectional streams at
https://github.com/grpc/grpc-go/tree/master/examples/route_guide.
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.227.190.211