WebSocket using Iteratee

Let's define a WebSocket connection that accepts strings and sends back each string reversed, using an Iteratee:

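def websocketBroadcast = WebSocket.using[String] {
  request =>
    // out is the outbound Enumerator; channel is used to push data into it
    val (out, channel) = Concurrent.broadcast[String]
    // in consumes every incoming string and pushes its reverse to the channel
    val in = Iteratee.foreach[String] {
      word => channel.push(word.reverse)
    }
    (in, out)
}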

The WebSocket.using method creates a WebSocket of a specific message type from an Iteratee (the inbound channel) and its corresponding Enumerator (the outbound channel). In the preceding code snippet, we return a tuple of the Iteratee, in, and the Enumerator, out.

The Concurrent object is a helper that provides utilities for using Iteratees, Enumerators, and Enumeratees concurrently. The broadcast[E] method creates an Enumerator and a channel and returns them as an (Enumerator[E], Channel[E]) tuple. The Enumerator and channel thus obtained can be used to broadcast data to multiple Iteratees.

After this, we need to bind it to a path in the routes file, which is similar to what we do for an Action:

GET        /ws                  controllers.Application.websocketBroadcast

Now, using a browser plugin such as Simple WebSocket Client for Chrome (refer to https://chrome.google.com/webstore/detail/simple-websocket-client/pfdhoblngboilpfeibdedpjgfnlcodoo), we can send messages through the WebSocket while the application is running, as shown here:

[Figure: WebSocket using Iteratee]

Since we do not use multiple Iteratees in our application, we can use Concurrent.unicast. This will require us to modify our code slightly:

def websocketUnicast = WebSocket.using[String] {
  request =>
    var channel: Concurrent.Channel[String] = null
    val out = Concurrent.unicast[String] {
      ch =>
        channel = ch
    }
    val in = Iteratee.foreach[String] {
      word => channel.push(word.reverse)
    }
    (in, out)
}

Notice that, unlike the broadcast method, the unicast method does not return a tuple of an Enumerator and a channel; it returns only an Enumerator. We therefore declare a channel variable, initialized to null, so that it is accessible within the Iteratee. The function passed to unicast receives the channel that unicast creates and assigns it to this variable.

Note

The unicast method also allows us to define the onComplete and onError callbacks, but they are not aware of the Iteratee; that is, we cannot refer to the Iteratee within these callbacks.
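
As a rough sketch of what passing these callbacks might look like, the following variant of the unicast handler supplies all three by name. It assumes Play 2.3.x, where unicast takes onStart, onComplete, and onError parameters plus an implicit ExecutionContext. The controller name UnicastExample, the handler name, and the log messages are illustrative and do not come from the original application.

```scala
import play.api.Logger
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.{Concurrent, Input, Iteratee}
import play.api.mvc.{Controller, WebSocket}

// Hypothetical controller, used only to host the sketch.
object UnicastExample extends Controller {

  def websocketUnicastLogged = WebSocket.using[String] { request =>
    var channel: Concurrent.Channel[String] = null

    val out = Concurrent.unicast[String](
      // Keep a reference to the channel so that the Iteratee can push to it.
      onStart = (ch: Concurrent.Channel[String]) => { channel = ch },
      // Neither callback below has any access to the Iteratee `in`.
      onComplete = Logger.info("Outbound enumerator completed"),
      onError = (message: String, input: Input[String]) =>
        Logger.error(s"Outbound enumerator failed: $message")
    )

    val in = Iteratee.foreach[String] { word =>
      channel.push(word.reverse)
    }
    (in, out)
  }
}
```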

The string-reversing example is overly simple and does not highlight the complications involved in defining and using Iteratees. Let's try a more challenging use case: a web application that lets users connect to their database and load or view data over a WebSocket. In this application, the frontend sends JSON messages.

The WebSocket can receive any of the following messages:

  • Connection request: A message that carries the information required to connect to a database (such as a host, port, user ID, and password)
  • Query string: The query to be executed in the database
  • Disconnect request: A message that closes the connection with the database

Each message is then translated and sent to the DBActor, which replies with a status message or a result containing row data. The reply is translated to JSON and sent back over the WebSocket.

The response received from the DBActor can be one of the following:

  • A successful connection
  • Connection failure
  • Query result
  • Invalid query
  • Disconnected
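
One possible way to model these five responses is a sealed trait with one case per outcome, so that the compiler can check that every response is handled. The sketch below is an assumption: only the name DBResponse appears in the application code, while the subtype names, their fields, the wrapper object DBResponses, and the statusOf helper are invented for illustration.

```scala
object DBResponses {
  // Only the name DBResponse comes from the text; every subtype is hypothetical.
  sealed trait DBResponse
  case object Connected extends DBResponse
  final case class ConnectionFailure(reason: String) extends DBResponse
  final case class QueryResult(rows: Seq[Map[String, String]]) extends DBResponse
  final case class InvalidQuery(reason: String) extends DBResponse
  case object Disconnected extends DBResponse

  // Maps each response to a short status label; the match is exhaustive,
  // so adding a new subtype without a case produces a compiler warning.
  def statusOf(response: DBResponse): String = response match {
    case Connected            => "connected"
    case ConnectionFailure(_) => "connection_failure"
    case QueryResult(_)       => "query_result"
    case InvalidQuery(_)      => "invalid_query"
    case Disconnected         => "disconnected"
  }
}
```

A label such as the one returned by statusOf could serve as a discriminator field when the response is serialized to JSON.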

We can define a WebSocket handler for this scenario in the following manner:

def dbWebsocket = WebSocket.using[JsValue] {
  request =>
    WebSocketChannel.init
}

Here, WebSocketChannel is an actor that communicates with the DBActor. Its companion object is defined as follows:

object WebSocketChannel {
  def props(channel: Concurrent.Channel[JsValue]): Props =
    Props(classOf[WebSocketChannel], channel)

  def init: (Iteratee[JsValue, _], Enumerator[JsValue]) = {

    var actor: ActorRef = null
    val out = Concurrent.unicast[JsValue] {
      channel =>
        actor = Akka.system.actorOf(WebSocketChannel.props(channel))
    }

    val in = Iteratee.foreach[JsValue] {
      jsReq => actor ! jsReq
    }
    (in, out)
  }
}

The WebSocketChannel actor itself is defined as follows:

class WebSocketChannel(wsChannel: Concurrent.Channel[JsValue])
  extends Actor with ActorLogging {

  val backend = Akka.system.actorOf(Props(classOf[DBActor]))

  def receive: Actor.Receive = {
    case jsRequest: JsValue =>
      backend ! convertJson(jsRequest)
    case x: DBResponse =>
      wsChannel.push(x.toJson)
  }
}

In the preceding code, convertJson translates JsValue to the format that is understood by the DBActor.
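
The text does not show convertJson, so the following is only a minimal sketch of how such a translation might look with play-json. The JSON field names ("type", "host", "port", "user", "password", "query"), the request types, the wrapper object DBMessages, and the Unrecognized fallback are all assumptions made for illustration.

```scala
import play.api.libs.json.JsValue

object DBMessages {
  // Hypothetical request model understood by the DBActor.
  sealed trait DBRequest
  final case class Connect(host: String, port: Int, user: String, password: String) extends DBRequest
  final case class Query(sql: String) extends DBRequest
  case object Disconnect extends DBRequest
  // Fallback for messages that do not match any known shape.
  final case class Unrecognized(raw: JsValue) extends DBRequest

  // Dispatches on an assumed "type" field and extracts the fields each request needs.
  def convertJson(js: JsValue): DBRequest =
    (js \ "type").asOpt[String] match {
      case Some("connect") =>
        val parsed = for {
          host     <- (js \ "host").asOpt[String]
          port     <- (js \ "port").asOpt[Int]
          user     <- (js \ "user").asOpt[String]
          password <- (js \ "password").asOpt[String]
        } yield Connect(host, port, user, password)
        parsed.getOrElse(Unrecognized(js))
      case Some("query") =>
        (js \ "query").asOpt[String].map(Query(_)).getOrElse(Unrecognized(js))
      case Some("disconnect") => Disconnect
      case _                  => Unrecognized(js)
    }
}
```

Returning an Unrecognized value rather than throwing lets the DBActor decide how to report malformed input back to the client.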

In the following section, we will implement the same application using the new WebSocket methods available in Play since the 2.3.x version.
