Well, that's enough concepts for now. Let's put this into practice by implementing a new feature in our shop application: an auction room.
The idea is to allow multiple users to bid for an item of the shop in a room where all connected users instantly see the bids of others, as depicted in the following screenshot:
In order to instantly send a notification to all the users of an auction room when one makes a bid, we have to provide an HTTP endpoint streaming these events. We can achieve this using server-sent events. Thus, we have at least two endpoints: one to make a bid and one to get the stream of bid notifications. We actually need a third endpoint to get the HTML page showing an auction room (illustrated in the previous screenshot).
Let's create an Auctions controller and implement these three endpoints:
package controllers

import play.api.mvc.{Action, Controller}

object Auctions extends Controller {

  /* Show an auction room for an item of the shop */
  def room(id: Long) = Action {
    models.Shop.get(id) match {
      case Some(item) => Ok(views.html.auctionRoom(item))
      case None => NotFound
    }
  }

  /* Make a bid for an item */
  def bid(id: Long) = Action { NotImplemented }

  /* Get a stream of bid notifications for an item */
  def notifications(id: Long) = Action { NotImplemented }
}
The Java implementation of the controller is the following:
package controllers;

import models.Item;
import static models.Shop.Shop;
import play.mvc.Controller;
import play.mvc.Result;

public class Auctions extends Controller {

  /* Show an auction room for an item of the shop */
  public static Result room(Long id) {
    Item item = Shop.get(id);
    if (item != null) {
      return ok(views.html.auctionRoom.render(item));
    } else return notFound();
  }

  /* Make a bid for an item */
  public static Result bid(Long id) {
    return status(NOT_IMPLEMENTED);
  }

  /* Get a stream of bid notifications for an item */
  public static Result notifications(Long id) {
    return status(NOT_IMPLEMENTED);
  }
}
Now, let's declare the routes for these three actions:
GET   /items/:id/auction                 controllers.Auctions.room(id: Long)
POST  /items/:id/auction                 controllers.Auctions.bid(id: Long)
GET   /items/:id/auction/notifications   controllers.Auctions.notifications(id: Long)
The room action is straightforwardly implemented; it retrieves the item in the shop and renders an HTML template showing the auction room. This template could be as simple as the following code:
@(item: models.Item)

@layout {
  <h2>Auction room</h2>
  <p><strong>@item.name</strong>: @(f"${item.price}%.2f") €</p>
  <div id="auction-room"></div>
  <script src="@routes.Assets.versioned("javascripts/auction.js")"></script>
}
This template only shows the item and its starting price. The display of the bids and the bid form will happen in the empty div tag with the auction-room ID and is delegated to the auction.js script. This script has to perform the following tasks:
For the sake of brevity, I will show only the part of the code that retrieves the stream of notifications using the server-sent events' API (I assume that you already know how to build DOM fragments and perform Ajax requests):
/* Handles the logic of an item's auction room */
var auctionRoom = function (item) {
  var route = routes.controllers.Auctions.notifications(item.id);
  var notifications = new EventSource(route.url);
  notifications.addEventListener('message', function (event) {
    updateUIWithAddedBid(JSON.parse(event.data));
  });
};
This function requests the item's bid notifications and adds an event handler updating the user interface each time a bid is received.
On the server side, we can represent a bid simply as a pair (String, Double), containing the name of the user making the bid and the bid price. For convenience, let's define a type alias Bid for this type:
type Bid = (String, Double)
The stream of bids for an item can then be represented with an Enumerator[Bid] object in the Scala API. Let's assume that we have a function AuctionRooms.notifications, in the service layer, which takes an item ID as the parameter and returns such an enumerator of bids. To implement the Auctions.notifications action, we need to take this stream and format each element according to the server-sent events' specification. For the purpose of transforming data streams, Play provides an abstraction called Enumeratee. If an enumerator can be thought of as a data source and an iteratee as a data sink, an enumeratee can be thought of as an adaptor that can be plugged into both enumerators and iteratees. This is described in the following diagram:
The preceding diagram shows an enumeratee (in the middle) that transforms square elements into triangle elements. By combining it with an enumerator producing squares (on the top left), it gives an enumerator producing triangles. Conversely, combining it with an iteratee consuming triangles (on the top right) gives an iteratee consuming squares. It is worth noting that enumeratees can be combined together too. For instance, if we had an enumeratee transforming triangles into circles, we could combine it with the enumeratee transforming squares into triangles in order to get an enumeratee transforming squares into circles.
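To make the adaptor idea more concrete, here is a minimal plain-Java analogy. It deliberately does not use Play's Enumeratee API: the Adaptor class and its method names are hypothetical, a java.util.stream.Stream stands in for a data source, and a Consumer stands in for a data sink.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

/** Plain-Java analogy of an enumeratee; this is NOT Play's API. */
final class Adaptor<A, B> {
    private final Function<A, B> transform;

    Adaptor(Function<A, B> transform) {
        this.transform = transform;
    }

    /** Source of A plus adaptor gives a source of B (the enumerator side). */
    Stream<B> applyToSource(Stream<A> source) {
        return source.map(transform);
    }

    /** Adaptor plus sink of B gives a sink of A (the iteratee side). */
    Consumer<A> applyToSink(Consumer<B> sink) {
        return a -> sink.accept(transform.apply(a));
    }

    /** Adaptor from A to B plus adaptor from B to C gives an adaptor from A to C. */
    <C> Adaptor<A, C> compose(Adaptor<B, C> next) {
        return new Adaptor<>(transform.andThen(next.transform));
    }
}
```

With this sketch, composing a squares-to-triangles adaptor with a triangles-to-circles adaptor yields a squares-to-circles adaptor, which mirrors the combination of enumeratees described above.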
In our case, we have an enumerator of bids and we want to have an enumerator of server-sent events. So we need an Enumeratee[Bid, String] object that formats a bid according to the server-sent events' specification. However, server-sent events represent data as text, meaning that, on the client side, we have to parse each event data to interpret its value. In our case, I propose to serialize our bids as JSON objects containing a name field and a price field and then parse them on the client side using JSON.parse. It means that, finally, our enumeratee must first format a bid as a JSON object and then format the JSON according to the server-sent events' specification.
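As an illustration of these two formatting steps, here is a minimal sketch in plain Java, without Play. The BidEvents class and its method names are hypothetical, and the JSON step only escapes quotes and backslashes, so a real application should rely on a JSON library instead. The second step follows the server-sent events wire format, where each line of the payload is prefixed with "data: " and an event ends with a blank line.

```java
/** Minimal sketch of the two formatting steps; helper names are hypothetical. */
final class BidEvents {
    private BidEvents() {}

    /** Step 1: serialize a bid as a JSON object with "name" and "price" fields. */
    static String bidToJson(String name, double price) {
        // Simplistic escaping (quotes and backslashes only), for illustration.
        String escaped = name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"name\":\"" + escaped + "\",\"price\":" + price + "}";
    }

    /** Step 2: wrap a payload as a server-sent event. */
    static String toServerSentEvent(String payload) {
        StringBuilder event = new StringBuilder();
        // One "data:" line per payload line, as the wire format requires.
        for (String line : payload.split("\n", -1)) {
            event.append("data: ").append(line).append('\n');
        }
        // A blank line terminates the event.
        return event.append('\n').toString();
    }
}
```

For a bid of 12.5 made by Alice, the first step gives {"name":"Alice","price":12.5} and the second step gives that same text prefixed with "data: " and followed by two newline characters.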
A simple way to implement an enumeratee is to use the Enumeratee.map function:
val bidToFormattedJson = Enumeratee.map[Bid] {
  case (name, price) => … // return something from bid
}
To combine it with an enumerator, we can use the through method:
val notifications: Enumerator[Bid] = AuctionRooms.notifications(item.id)
val formattedNotifications: Enumerator[String] = notifications.through(bidToFormattedJson)
Alternatively, the API also supports a symbolic operator, &>:
val formattedNotifications = notifications &> bidToFormattedJson
So, the complete code of the Auctions.notifications action is as follows:
def notifications(id: Long) = Action {
  val notifications = AuctionRooms.notifications(id)
  Ok.chunked(notifications &> bidToFormattedJson).as(EVENT_STREAM)
}
This action retrieves the stream of bids corresponding to a given item ID and returns it after transforming it into a stream of formatted events. Note that the chunked method does not infer the response content type, which is why we have to set it explicitly.
We still need to implement the bidToFormattedJson enumeratee, but fortunately Play already provides some pieces that we can just reuse. First, we can get an Enumeratee[Bid, JsValue] object, transforming a bid into a JSON value, by defining an implicit Writes[Bid] value and calling the Json.toJson[Bid] method. Secondly, we can get an Enumeratee[JsValue, String] object that formats JSON values according to the server-sent events' specification by calling the play.api.libs.EventSource[JsValue]() method. Finally, we can combine these two enumeratees to get an Enumeratee[Bid, String] object that transforms bids into JSON objects and then formats them according to the server-sent events' specification, as follows:
val bidToFormattedJson = Json.toJson[Bid].compose(EventSource())
You can also use the equivalent symbolic operator, as follows:
val bidToFormattedJson = Json.toJson[Bid] ><> EventSource()
In Java, since Play provides no equivalent to the Enumerator API, a simple way to implement something close is to use a callback-based approach.
It is worth noting that some reactive programming libraries do exist in Java, such as RxJava, and that there is an ongoing initiative for establishing a standard specification for reactive programming on the JVM. You can follow this initiative at http://www.reactive-streams.org/.
In practice, it means that instead of having an AuctionRooms.notifications method returning an Enumerator[Bid] object, we have an AuctionRooms.subscribe method that takes as parameter a callback consuming a Bid object. We can use it to implement the Auctions.notifications action, as follows:
import play.libs.EventSource;
import static play.libs.EventSource.Event.event;

public static Result notifications(Long id) {
  return ok(EventSource.whenConnected(eventSource -> {
    AuctionRooms
      .subscribe(id, bid -> eventSource.send(event(Json.toJson(bid))));
  }));
}
The EventSource.whenConnected method creates an HTTP response streaming data according to the server-sent events' specification. It takes a function as parameter that itself takes an EventSource object as parameter and allows us to define when to send notifications to users. In our case, we call the AuctionRooms.subscribe method and pass it a callback that sends a notification each time a bid is made. The notification message just contains a JSON object describing the bid.
So far, the client-side part and the HTTP layer have been implemented; the part that remains to be implemented is the AuctionRooms service, which holds the state of the bids for each item. The specificity of this service is that besides storing data, it has to notify all the participants of an auction room each time a new bid is made. This could be achieved by using some publish/subscribe system such as Redis Pub/Sub or MongoDB tailable cursors, but integrating with these systems is out of the scope of this book. For the sake of simplicity, we will use an in-memory implementation of a publish/subscribe system using Akka (in the AuctionRooms.scala file of the app/models/ folder):
package models

import akka.actor.Actor
import play.api.libs.iteratee.Concurrent

class AuctionRoomsActor extends Actor {
  import AuctionRooms._

  var rooms = Map.empty[Long, Room]

  def lookupOrCreate(id: Long): Room = rooms.getOrElse(id, {
    val room = new Room
    rooms += id -> room
    room
  })

  def receive = {
    case Notifications(id) => sender() ! lookupOrCreate(id).notifications
    case ItemBid(id, name, price) => lookupOrCreate(id).addBid(name, price)
  }

  class Room {
    var bids = Map.empty[String, Double]
    val (notifications, channel) = Concurrent.broadcast[Bid]

    def addBid(name: String, price: Double): Unit = {
      if (bids.forall { case (_, p) => p < price }) {
        bids += name -> price
        channel.push(name -> price)
      }
    }
  }
}

object AuctionRooms {
  case class Notifications(id: Long)
  case class ItemBid(id: Long, name: String, price: Double)
}
The Java version is a bit different since it does not use the Enumerator API. Let's explain the previous Scala version of the AuctionRoomsActor actor before showing the Java version.
The AuctionRoomsActor actor manages the items' auction rooms, where each room (represented by the Room class) contains a map of per-user bids (represented by the bids field). The Room class also contains an enumerator streaming bid notifications (represented by the notifications field), which is obtained from the Concurrent.broadcast call. The Concurrent API, provided by Play, gives convenient functions to implement a publish/subscribe system. In our case, the broadcast method creates an enumerator paired with a channel. The channel allows us to push (or publish) elements, which are then streamed by the enumerator. The enumerator is the part that is publicly shared with the outside world, while we keep the channel for internal use, so that whenever we push data into the channel, the outside world can see it through the enumerator. We effectively push data into the channel in the addBid method, which both updates the state of the room with a new bid and publishes it into the channel, after it has checked whether the new bid is higher than all the previous bids.
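The separation between a public "listen" side and a private "push" side, together with the highest-bid check, can be sketched in plain Java as follows. This is only an analogy of the Concurrent.broadcast pairing and not Play's API; the RoomSketch class, its Bid value type, and the boolean result of addBid are assumptions made for the example. The sketch is not thread-safe, because in the text an actor serializes access to the room.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Sketch of one auction room; hypothetical names, not thread-safe. */
final class RoomSketch {
    /** Hypothetical value type standing in for the (String, Double) pair. */
    static final class Bid {
        final String name;
        final double price;

        Bid(String name, double price) {
            this.name = name;
            this.price = price;
        }
    }

    private final Map<String, Double> bids = new HashMap<>();
    private final List<Consumer<Bid>> listeners = new ArrayList<>();

    /** Public side: anyone may listen (the role played by the enumerator). */
    void listen(Consumer<Bid> listener) {
        listeners.add(listener);
    }

    /** Private side: publish to every listener (the role played by the channel). */
    private void push(Bid bid) {
        for (Consumer<Bid> listener : listeners) {
            listener.accept(bid);
        }
    }

    /** Accepts a bid only if it is strictly higher than all the previous ones. */
    boolean addBid(String name, double price) {
        for (double previous : bids.values()) {
            if (previous >= price) {
                return false; // rejected: nothing is stored or published
            }
        }
        bids.put(name, price);
        push(new Bid(name, price));
        return true;
    }
}
```

Because push is private, code outside the room can only observe bids, never inject notifications, which is the same guarantee we get by sharing the enumerator and keeping the channel internal.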
Actors are event-driven. In our case, we define two events: Notifications(id) and ItemBid(id, name, price), which ask for the notifications stream of a room and make a bid, respectively. For convenience, let's enrich the AuctionRooms object with two methods corresponding to these events:
object AuctionRooms {
  import play.api.Play.current
  import play.api.libs.concurrent.Akka
  import play.api.libs.iteratee.Enumerator
  import akka.actor.Props
  import akka.pattern.ask
  import scala.concurrent.Future
  import scala.concurrent.duration.DurationInt

  implicit val timeout: akka.util.Timeout = 1.second

  private lazy val ref = Akka.system.actorOf(Props[AuctionRoomsActor])

  def notifications(id: Long): Future[Enumerator[Bid]] =
    (ref ? Notifications(id)).mapTo[Enumerator[Bid]]

  def bid(id: Long, name: String, price: Double): Unit =
    ref ! ItemBid(id, name, price)

  case class Notifications(id: Long)
  case class ItemBid(id: Long, name: String, price: Double)
}
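The two methods, notifications and bid, provide the public (and typed) API of our actor, whose instance, ref, is kept private. Note that notifications returns a Future[Enumerator[Bid]] because asking an actor is asynchronous, so a caller has to map over this future before it can use the enumerator.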
That's all for the Scala implementation of the AuctionRooms service.
The Java implementation of the AuctionRoomsActor actor is as follows (in the AuctionRoomsActor.java file in the app/models folder):
package models;

import akka.actor.UntypedActor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class AuctionRoomsActor extends UntypedActor {

  Map<Long, Room> rooms = new HashMap<>();

  Room lookupOrCreate(Long id) {
    Room room = rooms.get(id);
    if (room == null) {
      room = new Room();
      rooms.put(id, room);
    }
    return room;
  }

  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof Subscribe) {
      Subscribe subscribe = (Subscribe) message;
      Room room = lookupOrCreate(subscribe.id);
      room.subscribers.add(subscribe.subscriber);
    } else if (message instanceof ItemBid) {
      ItemBid itemBid = (ItemBid) message;
      Room room = lookupOrCreate(itemBid.id);
      room.addBid(itemBid.name, itemBid.price);
    } else unhandled(message);
  }

  static class Room {
    Map<String, Double> bids = new HashMap<>();
    List<Consumer<Bid>> subscribers = new ArrayList<>();

    void addBid(String name, Double price) {
      // Accept the bid only if it is higher than all the previous ones
      if (bids.values().stream().allMatch(p -> p < price)) {
        bids.put(name, price);
        subscribers.forEach(subscriber -> {
          subscriber.accept(new Bid(name, price));
        });
      }
    }
  }

  static class Subscribe {
    public final Long id;
    public final Consumer<Bid> subscriber;
    public Subscribe(Long id, Consumer<Bid> subscriber) {
      this.id = id;
      this.subscriber = subscriber;
    }
  }

  static class ItemBid {
    public final Long id;
    public final String name;
    public final Double price;
    public ItemBid(Long id, String name, Double price) {
      this.id = id;
      this.name = name;
      this.price = price;
    }
  }

  public static class Bid {
    public String name;
    public Double price;
    public Bid() {}
    public Bid(String name, Double price) {
      this.name = name;
      this.price = price;
    }
  }
}
The architecture is similar to the Scala version; the AuctionRoomsActor actor manages the items' auction rooms, represented by the Room class, which contains a map of per-user bids and a list of notification subscribers. The addBid method adds a new bid; it makes sure that its price is higher than the previous bids, stores it, and notifies all the subscribers.
The two events that drive our actor are Subscribe(id, subscriber) and ItemBid(id, name, price), which subscribe to an auction room and make a bid, respectively. As in the Scala version, it is convenient to provide two static methods, in an AuctionRooms class, as a public API to communicate with the actor:
package models;

import play.libs.Akka;
import akka.actor.ActorRef;
import akka.actor.Props;
import java.util.function.Consumer;
import models.AuctionRoomsActor.*;

public class AuctionRooms {

  static final ActorRef ref = Akka.system().actorOf(Props.create(AuctionRoomsActor.class));

  public static void subscribe(Long id, Consumer<Bid> subscriber) {
    ref.tell(new Subscribe(id, subscriber), null);
  }

  public static void bid(Long id, String name, Double price) {
    ref.tell(new ItemBid(id, name, price), null);
  }
}
Actually, there is a memory leak issue with this first implementation; we register subscriptions but never remove them when users close their web browser. To fix it, we need to give subscribers a chance to cancel their subscription (the following code lives in the AuctionRooms class):
import akka.util.Timeout;
import play.libs.F;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
import static akka.pattern.Patterns.ask;

static final Timeout t = new Timeout(Duration.create(1, TimeUnit.SECONDS));

public static F.Promise<Subscription> subscribe(Long id, Consumer<Bid> subscriber) {
  return F.Promise.wrap(
    (Future) ask(ref, new Subscribe(id, subscriber), t)
  );
}

public static class Subscription {
  private final Long id;
  private final Consumer<Bid> subscriber;
  public Subscription(Long id, Consumer<Bid> subscriber) {
    this.id = id;
    this.subscriber = subscriber;
  }
  public void cancel() {
    ref.tell(new Unsubscribe(id, subscriber), null);
  }
}

static class Unsubscribe {
  public final Long id;
  public final Consumer<Bid> subscriber;
  public Unsubscribe(Long id, Consumer<Bid> subscriber) {
    this.id = id;
    this.subscriber = subscriber;
  }
}
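The subscribe method now returns a Subscription (or, more precisely, a Promise<Subscription>, because message passing is asynchronous in Akka), which has a cancel method. This one sends an Unsubscribe message to the actor. Note that, for this promise to be redeemed, the actor also has to reply to each Subscribe message with the corresponding Subscription (for instance, by sending it back to getSender()). Finally, the Unsubscribe message is handled by the onReceive method of the actor, as follows: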
if (message instanceof Unsubscribe) {
  Unsubscribe unsubscribe = (Unsubscribe) message;
  Room room = rooms.get(unsubscribe.id);
  if (room != null) {
    room.subscribers.remove(unsubscribe.subscriber);
  }
}
The preceding code just removes the subscriber from the room.
We can leverage this mechanism to cancel our subscriptions when the connection with the client is closed. This can be achieved as follows (in the Auctions controller):
public static Result notifications(Long id) {
  return ok(EventSource.whenConnected(eventSource -> {
    AuctionRooms
      .subscribe(
        id,
        bid -> eventSource.send(event(Json.toJson(bid)))
      ).onRedeem(subscription -> {
        // Cancel the subscription as soon as the client goes away
        eventSource.onDisconnected(() -> subscription.cancel());
      });
  }));
}
The preceding code registers a callback that calls the cancel method of the subscription when the client is disconnected. The onRedeem method allows us to eventually do something with the Promise<Subscription> object returned by the subscribe method; it takes a function as the parameter, which is called when the promise is redeemed, and which is supplied the redeemed value.
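The subscribe-then-cancel flow can be tried out without Play or Akka. The following is a minimal sketch in which the JDK's CompletableFuture stands in for Play's F.Promise (its thenAccept method playing the role of onRedeem) and a plain list of Runnable objects stands in for the disconnection hook. All the names in it (SubscriptionSketch, publish, subscriberCount, demo) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Sketch of subscribe-then-cancel; CompletableFuture stands in for F.Promise. */
final class SubscriptionSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    /** Handle given back to the caller so that it can unsubscribe later. */
    final class Subscription {
        private final Consumer<String> subscriber;

        Subscription(Consumer<String> subscriber) {
            this.subscriber = subscriber;
        }

        void cancel() {
            subscribers.remove(subscriber);
        }
    }

    /** Registers the subscriber; the handle is delivered through a future. */
    CompletableFuture<Subscription> subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
        return CompletableFuture.completedFuture(new Subscription(subscriber));
    }

    void publish(String message) {
        // Iterate over a copy so that a subscriber may cancel during delivery.
        for (Consumer<String> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(message);
        }
    }

    int subscriberCount() {
        return subscribers.size();
    }

    /** Wires things the way the controller does: cancel when the client leaves. */
    static List<String> demo() {
        SubscriptionSketch room = new SubscriptionSketch();
        List<String> received = new ArrayList<>();
        List<Runnable> onDisconnected = new ArrayList<>();
        room.subscribe(received::add)
            .thenAccept(subscription -> onDisconnected.add(subscription::cancel));
        room.publish("first");                 // delivered to the subscriber
        onDisconnected.forEach(Runnable::run); // simulate the client disconnecting
        room.publish("second");                // nobody is listening any more
        return received;
    }
}
```

In this sketch, demo returns a list containing only "first", since the subscription is cancelled before the second message is published.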
Our Java and Scala implementations of the AuctionRooms service are now complete. What remains is to implement the Auctions.bid action. This can be straightforwardly achieved, as follows:
def bid(id: Long) =
  Action(parse.json(bidValidator)) { implicit request =>
    val (name, bid) = request.body
    AuctionRooms.bid(id, name, bid)
    Ok
  }
Alternatively, in Java, it can be done as follows:
@BodyParser.Of(BodyParser.Json.class)
public static Result bid(Long id) {
  Bid bid = Json.fromJson(request().body().asJson(), Bid.class);
  AuctionRooms.bid(id, bid.name, bid.price);
  return ok();
}
This implementation accepts requests whose body contains a JSON object describing a bid (that is, having a name and a price). Then, it just calls the AuctionRooms.bid function with the request data.
Our auction room's implementation is now finished! Connected users can make bids and instantly see bids made by others. There are two issues with the implementation proposed in this book, though.
First, the stream of bid notifications notifies users only when someone makes a new bid, so when a user joins an auction room, they do not get the bids that have been made in the past. This could be solved by changing the Auctions.notifications endpoint to prepend bids that have been made in the past to the stream of notifications.
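One possible way to prepend past bids, sketched here in plain Java rather than with the Play APIs, is to keep a history in the room and replay it to each new subscriber before registering it for live notifications. The ReplayingRoom class and its method names are hypothetical, bids are reduced to their price for brevity, and the sketch is not thread-safe (in the text, an actor would serialize access to the room).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch: replays the history to each new subscriber, then streams live bids. */
final class ReplayingRoom {
    private final List<Double> history = new ArrayList<>();
    private final List<Consumer<Double>> subscribers = new ArrayList<>();

    /** Sends the past bids first, then registers the subscriber for future ones. */
    void subscribe(Consumer<Double> subscriber) {
        for (Double past : history) {
            subscriber.accept(past);
        }
        subscribers.add(subscriber);
    }

    /** Records an accepted bid price and notifies the current subscribers. */
    void publish(double price) {
        history.add(price);
        for (Consumer<Double> subscriber : subscribers) {
            subscriber.accept(price);
        }
    }
}
```

A user who subscribes after bids of 10.0 and 12.0 have been made first receives those two prices and then any later one, in order. The trade-off is that the room has to keep its whole history in memory.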
Secondly, our implementation is not stateless since the state of the auction rooms and the list of subscribers is kept in our server's memory. This system would not scale horizontally. However, as previously mentioned, this could be solved using a publish/subscribe system separated from the server. Also, note that because Akka actors can transparently be migrated to remote locations, we could separate our actor from our server with minimal effort.
In the implementation presented earlier, making a bid is performed by an Ajax call, while bid notifications are retrieved by an event source. Alternatively, we can use a single bidirectional WebSocket endpoint to both send bids and receive notifications. On the server side, it changes the way we handle bid requests; instead of using the Auctions.bid endpoint (though this one can coexist with the WebSocket endpoint), we react to data sent by the client through the WebSocket. We obviously want to do that in a non-blocking way.
First, let's define an Auctions.channel WebSocket action and its corresponding route:
GET   /items/:id/auction/channel   controllers.Auctions.channel(id: Long)
On the client side, we receive the notifications via the WebSocket, as follows:
var route = routes.controllers.Auctions.channel(item.id);
var ws = new WebSocket(route.webSocketURL());
ws.addEventListener('message', function (event) {
  updateUIWithAddedBid(JSON.parse(event.data));
});
Note that we obtain the corresponding route URL by calling the webSocketURL method of the route instead of using its url property.
We also use the WebSocket to send bids, instead of performing an Ajax request:
ws.send(JSON.stringify({ name: name, price: price }))
This code assumes that the name and price variables contain the values of the corresponding form fields.
On the server side, the Auctions.channel action now returns a WebSocket handler instead of an Action. It needs two parameters: an iteratee defining how to process incoming data and an enumerator defining the outgoing data stream. The outgoing data stream is the same as in the server-sent events version, except that we don't need to format the events according to the server-sent events' specification; we can send them as JSON objects. The iteratee defining how incoming data is processed can be thought of as an infinite computation returning nothing and calling the AuctionRooms.bid service method for each incoming bid. This leads to the following code:
def channel(id: Long) = WebSocket.tryAccept[JsValue] { request =>
  AuctionRooms.notifications(id).map { notifications =>
    val bidsHandler = Iteratee.foreach[JsValue] { json =>
      for ((name, bid) <- json.validate(bidValidator)) {
        AuctionRooms.bid(id, name, bid)
      }
    }
    Right((bidsHandler, notifications &> Json.toJson[Bid]))
  }
}
The WebSocket.tryAccept method defines a WebSocket handler. In our case, we say that the incoming and outgoing data should be interpreted as JSON objects. The tryAccept method takes one parameter: a function that takes the current request headers as the parameter and returns, asynchronously, either a Result (in that case the WebSocket is not created) or a pair made of an iteratee and an enumerator defining the logic of the WebSocket handler. In our case, we always return an iteratee and an enumerator. The bidsHandler iteratee is defined using the Iteratee.foreach method; we simply forward each incoming bid to the AuctionRooms.bid service method.
The Java version is as follows:
public static WebSocket<JsonNode> channel(Long id) {
  return WebSocket.whenReady((in, out) -> {
    in.onMessage(json -> {
      Bid bid = Json.fromJson(json, Bid.class);
      AuctionRooms.bid(id, bid.name, bid.price);
    });
    AuctionRooms
      .subscribe(id, bid -> out.write(Json.toJson(bid)))
      .onRedeem(subscription -> {
        in.onClose(() -> subscription.cancel());
      });
  });
}
The WebSocket.whenReady method creates a WebSocket handler, whose logic is defined by the function it is passed as parameter. This function takes two parameters: the WebSocket's inbound and outbound channels. The incoming data processing logic is defined using the inbound's onMessage method. In our case, we forward each bid to the AuctionRooms.bid service method. The outbound's write method sends each bid notification to the client, and the inbound's onClose method cancels the notifications subscription when the WebSocket is closed.
Finally, note that Play also supports a way to define WebSocket handlers using actors instead of iteratees and enumerators in Scala or callbacks in Java. In that case, incoming data is sent to the actor as messages, and outgoing messages are sent by the actor to another actor reference managed by Play. This approach can be particularly useful in Java, where there is currently no reactive programming model.