Manipulating data streams by combining iteratees, enumerators, and enumeratees

Well, that's enough concepts for now. Let's put this in practice by implementing a new feature in our shop application: an auction room.

The idea is to allow multiple users to bid for an item of the shop in a room where all connected users instantly see the bids of others, as depicted in the following screenshot:

Manipulating data streams by combining iteratees, enumerators, and enumeratees

An auction room for the item Play Framework Essentials. The prices offered by Alice and Bob are followed by the form at the bottom where users can make new offers

Unidirectional streaming with server-sent events

In order to instantly send a notification to all the users of an auction room when one makes a bid, we have to provide an HTTP endpoint streaming these events. We can achieve this using server-sent events. Thus, we have at least two endpoints: one to make a bid and one to get the stream of bid notifications. We actually need a third endpoint to get the HTML page showing an auction room (illustrated in the previous screenshot).

Preparing the ground

Let's create an Auctions controller and implement these three endpoints:

package controllers
import play.api.mvc.{Action, Controller}

object Auctions extends Controller {

  /* Show an auction room for an item of the shop */
  def room(id: Long) = Action {
    models.Shop.get(id) match {
      case Some(item) => Ok(views.html.auctionRoom(item))
      case None => NotFound
    }
  }

  /* Make a bid for an item */
  def bid(id: Long) = Action { NotImplemented }

  /* Get a stream of bid notifications for an item */
  def notifications(id: Long) = Action { NotImplemented }

}

The Java implementation of the controller is the following:

package controllers;
import models.Item;
import static models.Shop.Shop;
import play.mvc.Controller;
import play.mvc.Result;

public class Auctions extends Controller {

    /* Show an auction room for an item of the shop */
    public static Result room(Long id) {
        Item item = Shop.get(id);
        if (item != null) {
            return ok(views.html.auctionRoom.render(item));
        } else return notFound();
    }

    /* Make a bid for an item */
    public static Result bid(Long id) {
        return status(NOT_IMPLEMENTED);
    }

    /* Get a stream of bid notifications for an item */
    public static Result notifications(Long id) {
        return status(NOT_IMPLEMENTED);
    }
}

Now, we will define the routes for the actions we just defined earlier:

GET  /items/:id/auction      controllers.Auctions.room(id: Long)
POST /items/:id/auction      controllers.Auctions.bid(id: Long)
GET  /items/:id/auction/notificationscontrollers.Auctions.notifications(id: Long)

The room action is straightforwardly implemented; it retrieves the item in the shop and renders an HTML template showing the auction room. This template could be as simple as the following code:

@(item: models.Item)
@layout {
  <h2>Auction room</h2>
  <p><strong>@item.name</strong>: @(f"${item.price}%.2f")&nbsp;€</p>
  <div id="auction-room"></div>
  <script src="@routes.Assets.versioned("javascripts/auction.js")">
  </script>
}

This template only shows the item and its starting price. The display of the bids and the bid form will happen in the empty div tag with the auction-room ID and is delegated to a script auction.js. This script has to perform the following tasks:

  • Display a form allowing users to make bids and, on submission, send an Ajax request to the corresponding server endpoint
  • Get the stream of bid notifications and continuously update the room display so that users can see who has made which bid

For the sake of brevity, I will show only the part of the code that retrieves the stream of notifications using the server-sent events' API (I assume that you already know how to build DOM fragments and perform Ajax requests):

/* Handles the logic of an item's auction room */
var auctionRoom = function (item) {
  var route = routes.controllers.Auctions.notifications(item.id);
  var notifications = new EventSource(route.url);
  notifications.addEventListener('message', function (event) {
    updateUIWithAddedBid(JSON.parse(event.data));
  });
};

This function requests the item's bid notifications and adds an event handler updating the user interface each time a bid is received.

Transforming streams of data using enumeratees

On the server side, we can represent a bid simply as a pair (String, Double), containing the name of the user making the bid and the bid price. For convenience, let's define a type alias Bid for this type:

type Bid = (String, Double)

The stream of an item bid can then be represented with an Enumerator[Bid] object in the Scala API. Let's assume that we have a function AuctionRooms.notifications, in the service layer, which takes an item ID as the parameter and returns such an enumerator of bids. To implement the Auctions.notifications action, we need to take this stream and format each element according to the server-sent events' specification. For the purpose of transforming data streams, Play provides an abstraction called Enumeratee. If an enumerator can be thought of as a data source and an iteratee as a data sink, an enumeratee can be thought of as an adaptor that can be plugged to both enumerators and iteratees. This is described in the following diagram:

Transforming streams of data using enumeratees

The preceding diagram shows an enumeratee (in the middle) that transforms square elements into triangle elements. By combining it with an enumerator producing squares (on the top left), it gives an enumerator producing triangles. Conversely, combining it with an iteratee consuming triangles (on the top right) gives an iteratee consuming squares. It is worth noting that enumeratees can be combined together too. For instance, if we had an enumeratee transforming triangles into circles, we could combine it with the enumeratee transforming squares into triangles in order to get an enumeratee transforming squares into circles.

In our case, we have an enumerator of bids and we want to have an enumerator of server-sent events. So we need an Enumeratee[Bid, String] object that formats a bid according to the server-sent events' specification. However, server-sent events represent data as text, meaning that, on the client side, we have to parse each event data to interpret its value. In our case, I propose to serialize our bids as JSON objects containing a name field and a price field and then parse them on the client side using JSON.parse. It means that, finally, our enumeratee must first format a bid as a JSON object and then format the JSON according to the server-sent events' specification.

A simple way to implement an enumeratee is to use the Enumeratee.map function:

val bidToFormattedJson = Enumeratee.map[Bid] {
  case (name, price) => … // return something from bid
}

To combine it with an enumerator, we can use the through method:

val notifications: Enumerator[Bid] =AuctionRooms.notifications(item.id)
val formattedNotifications: Enumerator[String] =notifications.through(bidToFormattedJson)

Alternatively, the API also supports a symbolic operator &>:

val formattedNotifications = notifications &> bidToFormattedJson

So, the complete code of the Auctions.notifications action is as follows:

def notifications(id: Long) = Action {
  val notifications = AuctionRooms.notifications(id)
  Ok.chunked(notifications &> bidToFormattedJson).as(EVENT_STREAM)
}

This action retrieves the stream of bids corresponding to a given item ID and returns it after transforming it into a stream of formatted events. Note that the chunked method does not infer the response content type, that's why we explicitly have to set it.

We still need to implement the bidToFormattedJson enumeratee, but hopefully Play already provides some pieces that we can just reuse. First, we can get an Enumeratee[Bid, JsValue] object, transforming a bid into a JSON value by defining an implicit Writes[Bid] method and calling the Json.toJson[Bid] method. Secondly, we can get an Enumeratee[JsValue, String] object that formats JSON values according to the server-sent events' specification by calling the play.api.libs.EventSource[JsValue]() method. Finally, we can combine these two enumeratees to get an Enumeratee[Bid, String] object that transforms bids into JSON objects and then formats them according to the server-sent event's specification, as follows:

val bidToFormattedJson = Json.toJson[Bid].compose(EventSource())

You can also use the equivalent symbolic operator, as follows:

val bidToFormattedJson = Json.toJson[Bid] ><> EventSource()

In Java, since Play provides no equivalent to the Enumerator API, a simple way to implement something close is to use a callback-based approach.

Note

It is worth noting that some reactive programming libraries do exist in Java, such as RxJava, and that there is an ongoing initiative for establishing a standard specification for reactive programming on the JVM. You can follow this initiative at http://www.reactive-streams.org/.

In practice, it means that instead of having an AuctionRooms.notifications method returning an Enumerator[Bid]object, we have an AuctionRooms.subscribe method that takes as parameter a callback consuming a Bid object. We can use it to implement the Auctions.notifications action, as follows:

import play.libs.EventSource;
import static play.libs.EventSource.Event.event;

public static Result notifications(Long id) {
  return ok(EventSource.whenConnected(eventSource -> {
    AuctionRooms
      .subscribe(id, bid -> eventSource.send(event(Json.toJson(bid))));
  }));
}

The EventSource.whenConnected method creates an HTTP response streaming data according to the server-sent events' specification. It takes a function as parameter that itself takes an EventSource object as parameter and allows us to define when to send notifications to users. In our case, we call the AuctionRooms.subscribe method and pass it a callback that sends a notification each time a bid is made. The notification message just contains a JSON object describing the bid.

Implementing a publish/subscribe system using Akka

So far, the client-side part and the HTTP layer have been implemented; the part that remains to be implemented is the AuctionRooms service, which holds the state of the bids for each item. The specificity of this service is that besides storing data, it has to notify all the participants of an auction room each time a new bid is made. This could be achieved by using some publish/subscribe system such as Redis Pub/Sub or MongoDB-tailed cursors but integrating with these systems is out of the scope of this book. For the sake of simplicity, we will use an in-memory implementation of a publish/subscribe system using Akka (in the AuctionRooms.scala file of the app/models/ folder):

package models
import akka.actor.Actor
import play.api.libs.iteratee.Concurrent

class AuctionRoomsActor extends Actor {
  import AuctionRooms._

  var rooms = Map.empty[Long, Room]

  def lookupOrCreate(id: Long): Room = rooms.getOrElse(id, {
    val room = new Room
    rooms += id -> room
    room
  })

  def receive = {
    case Notifications(id) =>
      sender() ! lookupOrCreate(id).notifications
    case ItemBid(id, name, price) =>
      lookupOrCreate(id).addBid(name, price)
  }

  class Room {
    var bids = Map.empty[String, Double]
    val (notifications, channel) = Concurrent.broadcast[Bid]

    def addBid(name: String, price: Double): Unit = {
      if (bids.forall { case (_, p) => p < price}) {
        bids += name -> price
        channel.push(name -> price)
      }
    }
  }
}

object AuctionRooms {
  case class Notifications(id: Long)
  case class ItemBid(id: Long, name: String, price: Double)
}

The Java version is a bit different since it does not use the Enumerator API. Let's explain the previous Scala version of the AuctionRoomsActor actor before showing the Java version.

The AuctionRoomsActor actor manages the item's auction rooms, where each room (represented with the class Room) contains a map of per user bids (represented by the bids field). The Room classes also contain an enumerator streaming bid notifications (represented by the notifications field), which is obtained from the Concurrent.broadcast call. The Concurrent API, provided by Play, gives convenient functions to implement a publish/subscribe system. In our case, the broadcast method creates an enumerator paired with a channel. The channel allows us to push (or publish) elements, which are then streamed by the enumerator. The enumerator is the part that is publicly shared with the outside world, while we keep the channel for internal use, so that whenever we push data into the channel, the outside world can see it through the enumerator. We effectively push data into the channel in the addBid method, which both updates the state of the room with a new bid and publishes it into the channel, after it has checked whether the new bid is higher than all the previous bids.

Actors are event-driven. In our case, we define two events: Notifications(id) and ItemBid(id, name, price), which ask for the notifications stream of a Room class and make a bid, respectively. For convenience, let's enrich the AuctionRooms object with two methods corresponding to these events:

object AuctionRooms {

  import play.api.Play.current
  import akka.pattern.ask
  import scala.concurrent.duration.DurationInt
  implicit val timeout: akka.util.Timeout = 1.second

  private lazy val ref =
    Akka.system.actorOf(Props[AuctionRoomsActor])

  def notifications(id: Long): Future[Enumerator[Bid]] =
    (ref ? Notifications(id)).mapTo[Enumerator[Bid]]

  def bid(id: Long, name: String, price: Double): Unit =
    ref ! ItemBid(id, name, price)

  case class Notifications(id: Long)
  case class ItemBid(id: Long, name: String, price: Double)
}

The two methods, notifications and bid, provide the public (and typed) API of our actor, whose instance, ref, is kept private.

That's all for the Scala implementation of the AuctionRooms service.

The Java implementation of the AuctionRoomsActor actor is as follows (in the AuctionRoomsActor.java file in the app/models folder):

package models;
import akka.actor.UntypedActor;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class AuctionRoomsActor extends UntypedActor {

    Map<Long, Room> rooms = new HashMap<>();

    Room lookupOrCreate(Long id) {
        Room room = rooms.get(id);
        if (room == null) {
            room = new Room();
            rooms.put(id, room);
        }
        return room;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Subscribe) {
            Subscribe subscribe = (Subscribe) message;
            Room room = lookupOrCreate(subscribe.id);
            room.subscribers.add(subscribe.subscriber);
        } else if (message instanceof ItemBid) {
            ItemBid itemBid = (ItemBid) message;
            Room room = lookupOrCreate(itemBid.id);
            room.addBid(itemBid.name, itemBid.price);
        } else unhandled(message);
    }

    static class Room {
        Map<String, Double> bids = new HashMap<>();
        List<Consumer<Bid>> subscribers = new ArrayList<>();

        void addBid(String name, Double price) {
            if (bids.values().stream().allMatch(p -> p < price)) {
                bids.put(name, price);
                subscribers.forEach(subscriber -> {
                  subscriber.accept(new Bid(name, price))
                });
            }
        }
    }

    static class Subscribe {
        public final Long id;
        public final Consumer<Bid> subscriber;

        public Subscribe(Long id, Consumer<Bid> subscriber) {
            this.id = id;
            this.subscriber = subscriber;
        }
    }

    static class ItemBid {
        public final Long id;
        public final String name;
        public final Double price;

        public ItemBid(Long id, String name, Double price) {
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }

    public static class Bid {
        public String name;
        public Double price;

        public Bid() {}

        public Bid(String name, Double price) {
            this.name = name;
            this.price = price;
        }
    }
}

The architecture is similar to the Scala version; the AuctionRoomsActor actor manages the item's auction rooms, represented by the Room class, which contains a map of per user bids and a list of notification subscribers. The addBid method adds a new bid; it makes sure that its price is higher than the previous bids, stores it, and notifies all the subscribers.

The two events that drive our actor are Subscribe(id, subscriber) and ItemBid(id, name, price), that subscribe to an auction room and make a bid, respectively. As in the Scala version, it is convenient to provide two static methods, in an AuctionRooms class, as a public API to communicate with the actor:

package models;
import play.libs.Akka;
import akka.actor.Props;

public class AuctionRooms {

  static final ActorRef ref =
    Akka.system().actorOf(Props.create(AuctionRoomsActor.class));

  public static void subscribe(Long id, Consumer<Bid> subscriber) {
    ref.tell(new Subscribe(id, subscriber), null);
  }

  public static void bid(Long id, String name, Double price) {
    ref.tell(new ItemBid(id, name, price), null);
  }
}

Actually, there is a memory leak issue with this first implementation; we register subscriptions but never remove them when users close their web browser. To fix it, we need to give subscribers a chance to cancel their subscription (the following code lives in the AuctionRooms class):

import play.libs.F;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
import static akka.patterns.Patterns.ask;

static final Timeout t = new Timeout(Duration.create(1, TimeUnit.SECONDS));

public static F.Promise<Subscription> subscribe(Long id,Consumer<Bid> subscriber) {
  return F.Promise.wrap(
    (Future)ask(ref, new Subscribe(id, subscriber), t)
  );
}

public static class Subscription {
    private final Long id;
    private final Consumer<Bid> subscriber;

    public Subscription(Long id, Consumer<Bid> subscriber) {
        this.id = id;
        this.subscriber = subscriber;
    }

    public void cancel() {
        ref.tell(new Unsubscribe(id, subscriber), null);
    }
}

static class Unsubscribe {
    public final Long id;
    public final Consumer<Bid> subscriber;

    public Unsubscribe(Long id, Consumer<Bid> subscriber) {
        this.id = id;
        this.subscriber = subscriber;
    }
}

The subscribe method now returns a Subscription (or, more precisely, a Promise<Subscription>, because message passing is asynchronous in Akka), which has a cancel method. This one sends an Unsubscribe message to the actor. Finally, the Unsubscribe message is handled by the onReceive method of the actor, as follows:

if (message instanceof Unsubscribe) {
    Unsubscribe unsubscribe = (Unsubscribe) message;
    Room room = rooms.get(unsubscribe.id);
    if (room != null) {
        room.subscribers.remove(unsubscribe.subscriber);
    }
}

The preceding code just removes the subscriber from the room.

We can leverage this mechanism to cancel our subscriptions when the connection with the client is closed. This can be achieved as follows (in the Auctions controller):

public static Result notifications(Long id) {
  return ok(EventSource.whenConnected(eventSource -> {
    AuctionRooms
        .subscribe(
            id,
            bid -> eventSource.send(event(Json.toJson(bid)))
        ).onRedeem(subscription -> {
          eventSource.onDisconnected(() -> subscription.cancel());
        });
  });
}

The preceding code registers a callback that calls the cancel method of the subscription when the client is disconnected. The onRedeem method allows us to eventually do something with the Promise<Subscription> object returned by the subscribe method; it takes a function as the parameter, which is called when the promise is redeemed, and which is supplied the redeemed value.

Note

In Scala, the Enumerator API handles the unsubscription problem for us.

Our Java and Scala implementations of the AuctionRooms service are now complete. What remains is to implement the Auctions.bid action. This can be straightforwardly achieved, as follows:

def bid(id: Long) =
  Action(parse.json(bidValidator)) { implicit request =>
    val (name, bid) = request.body
    AuctionRooms.bid(id, name, bid)
    Ok
  }

Alternatively, in Java, it can be done as follows:

@BodyParser.Of(BodyParser.Json.class)
public static Result bid(Long id) {
    Bid bid = Json.fromJson(request().body().asJson(), Bid.class);
    AuctionRooms.bid(id, bid.name, bid.price);
    return ok();
}

This implementation accepts requests whose body contains a JSON object describing a bid (that is, having a name and a price). Then, it just calls the AuctionRooms.bid function with the request data.

Our auction room's implementation is now finished! Connected users can make bids and instantly see bids made by others. There are two issues with the implementation proposed in this book, though.

First, the stream of bid notifications notifies users only when someone makes a new bid, so when a user joins an auction room, he or she do not get the bids that have been made in the past. This could be solved by changing the Auctions.notifications endpoint to prepend bids that have been made in the past to the stream of notifications.

Secondly, our implementation is not stateless since the state of the auction rooms and the list of subscribers is kept in our server's memory. This system would not scale horizontally. However, as previously mentioned, this could be solved using a publish/subscribe system separated from the server. Also, note that because Akka actors can transparently be migrated to remote locations, we could separate our actor from our server with minimal effort.

Bidirectional streaming with WebSockets

In the implementation presented earlier, making a bid is performed by an Ajax call, while bid notifications are retrieved by an event source. Alternatively, we can use a single bidirectional WebSocket endpoint to both send bids and receive notifications. On the server side, it changes the way we handle bid requests; instead of using the Auctions.bid endpoint (though this one can coexist with the WebSocket endpoint), we react to data sent by the client through the WebSocket. We obviously want to do that in a non-blocking way.

First, let's define an Auctions.channel WebSocket action and its corresponding route:

GET  /items/:id/auction/channelcontrollers.Auctions.channel(id: Long)

On the client side, we receive the notifications via the WebSocket, as follows:

var route = routes.controllers.Auctions.channel(item.id);
var ws = new WebSocket(route.webSocketURL());
ws.addEventListener('message', function (event) {
  updateUIWithAddedBid(JSON.parse(event.data));
});

Note that we obtain the corresponding route URL by calling the webSocketURL method of the route instead of using its url property.

We also use the WebSocket to send bids, instead of performing an Ajax request:

ws.send(JSON.stringify({ name: name, price: price }))

This code assumes that variables name and price contain the values of the corresponding form fields.

On the server side, the Auctions.channel action now returns a WebSocket handler instead of an Action. It needs two parameters: an iteratee defining how to process incoming data and an enumerator defining the outgoing data stream. The outgoing data stream is the same as in the server-sent events version, except that we don't need to format the events according to the server-sent events' specification; we can send them as JSON objects. The iteratee defining how incoming data is processed can be thought of as an infinite computation returning nothing and calling the AuctionRooms.bid service method for each incoming bid. This leads to the following code:

def channel(id: Long) = WebSocket.tryAccept[JsValue] { request =>
  AuctionRooms.notifications(id).map { notifications =>
    val bidsHandler = Iteratee.foreach[JsValue] { json =>
      for ((name, bid) <- json.validate(bidValidator)) {
        AuctionRooms.bid(id, name, bid)
      }
    }
    Right((bidsHandler, notifications &> Json.toJson[Bid]))
  }
}

The WebSocket.tryAccept method defines a WebSocket handler. In our case, we say that the incoming and outgoing data should be interpreted as JSON objects. The tryAccept method takes one parameter: a function that takes the current request headers as the parameter and returns either a Result (in that case the WebSocket is not created) or a pair of iteratees and enumerators defining the logic of the WebSocket handler. In our case, we always return an iteratee and an enumerator. The bidsHandler iteratee is defined using the Iteratee.foreach method; we simply forward each incoming bid to the AuctionRooms.bid service method.

The Java version is as follows:

public static WebSocket<JsonNode> channel(Long id) {
  return WebSocket.whenReady((in, out) -> {
    in.onMessage(json -> {
      Bid bid = Json.fromJson(json, Bid.class);
      AuctionRooms.bid(id, bid.name, bid.price);
    });
    AuctionRooms
        .subscribe(id, bid -> out.write(Json.toJson(bid)))
        .onRedeem(subscription -> {
          in.onClose(() -> subscription.cancel());
        });
  });
}

The WebSocket.whenReady method creates a WebSocket handler, whose logic is defined by the function it is passed as parameter. This function takes two parameters: the WebSocket's inbound and outbound. Incoming data processing logic is defined using inbound's onMessage method. In our case, we forward each bid to the AuctionRooms.bid service method. The inbound's onClose method cancels the notifications subscription when the WebSocket is closed.

Finally, note that Play also supports a way to define WebSocket using actors instead of iteratees and enumerators in Scala or callbacks in Java. In that case, incoming data is sent to the actor as messages, and outgoing messages are sent by the actor to another actor reference managed by Play. This approach can be particularly useful in Java where there is currently no reactive programming model.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.15.47.218