Chapter 5. Network I/O and Web Services

5.0. Introduction

More and more these days, it seems like every system we build has to talk to something, somewhere.[16] We’d hardly be doing anything if we didn’t actually talk with some other computers over some kind of network.

This chapter covers all of the normal remote communication modes you would expect—HTTP, TCP, UDP, and the like—as well as some relative newcomers[17] like message-oriented architectures.

5.1. Making HTTP Requests

Problem

You want to make a simple HTTP GET or POST request.

Solution

Use slurp to make simple HTTP GET requests:

(slurp "http://example.com")
;; -> "<!doctype html>
<html>
<head>
    <title>Example Domain</title> ...

Use the clj-http library to make GET, POST, and other requests with specific parameters or headers, to handle redirects and other special circumstances, or to get specific details about the response.

To follow along, add [clj-http "0.7.7"] to your project’s dependencies, or use lein-try to start a REPL:

$ lein try clj-http

Use clj-http.client/get to make GET requests:

(require '[clj-http.client :as http])

(:status (http/get "http://clojure.org"))
;; -> 200

(-> (http/get "http://clojure.org")
    :headers
    (get "server"))
;; -> "nginx"

(-> (http/get "http://www.amazon.com/")
    :cookies
    keys)
;; -> ("session-id" "session-id-time" "x-wl-uid" "skin")

Parameters can be included in both GET and POST requests. Use clj-http.client/post to make POST requests:

(http/get "http://google.com/" {:query-params {:q "clojure"}})
;; -> {:status 200 ...}

(http/post "http://example.com" {:form-params {:username "joecoder"
                                               :password "il0v3clojure"}})
;; -> {:status 200 ...}

You can even use the :multipart option to upload files, as from an HTML form via a web browser.
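
For example, a minimal sketch of a multipart upload (the endpoint path and file are hypothetical placeholders):

(require '[clojure.java.io :as io])

(http/post "http://example.com/upload"
           {:multipart [{:name "title" :content "My Photo"}
                        {:name "file"
                         :content (io/file "/tmp/photo.jpg")}]})
;; -> {:status 200 ...}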

Discussion

slurp works to make HTTP GET requests because its arguments are passed to clojure.java.io/reader, which in turn correctly handles opening URL strings. This is totally sufficient for issuing a quick HTTP GET to a well-behaved URL. Unfortunately, this is where slurp’s usefulness ends. Among other limitations, it will not behave correctly for responses with HTTP redirects.

clj-http is an extremely flexible Clojure wrapper around the very robust Apache HttpComponents library. Its features include convenient functions for other HTTP verbs like PUT and DELETE; for reading and sending cookies, headers, and other request metadata; for reading and writing data using streams, files, or byte arrays; and lots more. Refer to the GitHub repository to learn about the huge variety of options available and to see many more examples.
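
For example, here is a brief sketch of a PUT with a custom header and a DELETE (the URLs and header are placeholders, and the statuses shown are merely what a typical server might return):

(http/put "http://example.com/resources/3"
          {:body "updated content"
           :headers {"X-Api-Version" "2"}})
;; -> {:status 200 ...}

(http/delete "http://example.com/resources/3")
;; -> {:status 204 ...}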

If you’re building production systems that rely on external services, you may want to consider wrapping HTTP calls in Netflix’s Hystrix library to make your application more fault-tolerant and resilient. Hystrix provides Clojure bindings that you can use to wrap network calls and more easily manage complex failure scenarios involving external services.

See Also

5.2. Performing Asynchronous HTTP Requests

Problem

You want to perform asynchronous HTTP requests.

Solution

Use HTTP Kit, a highly performant, event-driven HTTP client/server library.

Before starting, add [http-kit "2.1.12"] to your project’s dependencies, or follow along in a REPL using lein-try:

$ lein try http-kit

Use any of org.httpkit.client’s HTTP verb functions to perform asynchronous HTTP requests. In their base form, these functions return a promise that you can await with deref or the @ reader shorthand:

(require '[org.httpkit.client :as http])

(def response (http/get "http://example.com"))

;; Some time later...

(:status @response)
;; -> 200

;; Or, using deref to specify a timeout length in milliseconds and
;; a value
(deref response 2000 nil)
;; -> {:opts {:url "http://example.com", :method :get}
;;     :body "..."
;;     :headers {:content-type "text/html", :content-length "1270" ...}
;;     :status 200}

Discussion

The bulk of time spent performing HTTP requests is establishing the connection and awaiting the server’s response. Asynchronous requests enable your application to continue working while awaiting the delivery of data.

In this vein, HTTP Kit provides both a highly concurrent web server and a powerful HTTP client. It offers both callbacks and promises for asynchronous requests, as well as persistent connections and alternate SSL engines for dealing with unsigned SSL certificates.

The org.httpkit.client namespace defines asynchronous versions of numerous HTTP methods, including get, delete, head, post, put, options, and patch. Each of these verbs derives from org.httpkit.client/request, which defines a common interface. An asynchronous request of a given method is made, and a promise is returned. Upon completion of the request, the promise will be fulfilled with the results/response.
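
If you prefer, you can also invoke org.httpkit.client/request directly with an options map; this sketch is equivalent to the simple GET above:

(def resp (http/request {:url "http://example.com"
                         :method :get}))

(:status @resp)
;; -> 200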

All request functions accept an optional map of options where you can specify keys like :query-params, :form-params, or :headers. Functions also allow specifying a callback function to be called upon request completion:

(http/get "http://example.com"
          {:timeout 1000 ;; ms
           :query-params {:search "value"}}
          (fn [{:keys [status headers body error]}]
            (if error
              (binding [*out* *err*]
                (println "Failed with" error))
              (println body))))
;; -> #<core$promise$reify__6310@582e6c93: :pending>
;; *out*
;; <html>
;; <head>
;;   <title>Example Domain</title>
;; ...

See Also

5.3. Sending a Ping Request

Problem

You want to ping an IP address to check availability.

Solution

Use the java.net.InetAddress class to test if the address isReachable:

(.isReachable (java.net.InetAddress/getByName "oreilly.com") 5000)
;; -> true

Discussion

Using isReachable works great if the correct permissions can be obtained. On a typical Unix-like implementation, you will need to start your Clojure instance with sudo to get an actual ICMP ping sent. Otherwise, a standard connection will be attempted on port 7, which in most cases will be blocked by a firewall. More information can be found in the javadoc.

A common need when pinging another machine is to time the ping. You can wrap an .isReachable invocation in a function timed-ping to return timing values with every ping:

(defn timed-ping
  "Time an .isReachable ping to a given domain"
  [domain timeout]
  (let [addr (java.net.InetAddress/getByName domain)
        start (. System (nanoTime))
        result (.isReachable addr timeout)
        total (/ (double (- (. System (nanoTime)) start)) 1000000.0)]
    {:time total
     :result result}))

(timed-ping "oreilly.com" 5000)
;; -> {:time 88.07, :result true}

5.4. Retrieving and Parsing RSS Data

Problem

You need to parse RSS data.

Solution

Use the feedparser-clj library to parse RSS data.

Before starting, add [org.clojars.scsibug/feedparser-clj "0.4.0"] to your project’s dependencies, or follow along in a REPL using lein-try:

$ lein try org.clojars.scsibug/feedparser-clj

Invoke feedparser-clj.core/parse-feed with the URI of an RSS feed to retrieve that feed and parse it into Clojure data:

(require '[feedparser-clj.core :as rss])

(rss/parse-feed (str "https://github.com/clojure-cookbook/clojure-cookbook/"
                     "commits/master.atom"))
;; -> {:authors [...]
;;     :entries [{:link "LINK" :title "TITLE" :contents "CONTENT"} ...]
;;      ...}

You can also invoke parse-feed with a java.io.InputStream to read from a file or other location:

(with-open [writer (clojure.java.io/writer "master.atom")]
  (spit writer
        (slurp (str "https://github.com/clojure-cookbook/clojure-cookbook/"
                    "commits/master.atom"))))

(with-open [stream (clojure.java.io/input-stream "master.atom")]
  (rss/parse-feed stream))
;; -> {:authors [...]
;;     :entries [{:link "LINK" :title "TITLE" :contents "CONTENT"} ...]
;;     ...}

Discussion

feedparser-clj is a wrapper around the Java ROME library that is capable of processing a variety of RSS and Atom feed formats. feedparser-clj.core/parse-feed returns a Clojure map that closely mimics the underlying XML feed.

Most of the time, what you care about will be under the :entries key, which contains an array of maps corresponding to each RSS entry.
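
For example, pulling the title and link out of the first couple of entries parsed earlier is a matter of ordinary map and sequence functions:

(->> (rss/parse-feed (str "https://github.com/clojure-cookbook/clojure-cookbook/"
                          "commits/master.atom"))
     :entries
     (map (juxt :title :link))
     (take 2))
;; -> (["TITLE" "LINK"] ["TITLE" "LINK"])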

Some RSS feeds will have <link rel="next"> elements that indicate that the returned list is incomplete and more entries can be retrieved by following the link. A lazy list of these RSS entries can be generated:

(defn next-uri
  "Return the rel=next href in a feed."
  [feed]
  (-> feed
      :entry-links
      (->> (filter #(= (:rel %) "next")))
      first
      :href))


(defn lazy-stream
  "Return a lazy stream of RSS entries."
  [uri]
  (let [raw-response (rss/parse-feed uri)]
    (lazy-cat (:entries raw-response)
              (if-let [nxt (next-uri raw-response)]
                (lazy-stream nxt)))))

To verify that lazy loading is happening, logging or tracing can be added to lazy-stream, but it is also easy to confirm that you can retrieve more entries than are present in a single fetch:

(def youtube-feed "http://gdata.youtube.com/feeds/api/videos")

(count (:entries (rss/parse-feed youtube-feed)))
;; -> 15

(count (take 50 (lazy-stream youtube-feed)))
;; -> 50

Warning

Be careful when evaluating a lazy sequence in a REPL, since it will attempt to print the entire sequence. Use take to only realize part of the sequence.

See Also

5.5. Sending Email

Problem

You need to send emails from inside a Clojure application.

Solution

Use postal, a thin wrapper over the JavaMail package, to send email messages.

To follow along with this recipe, start a REPL using lein-try:

$ lein try com.draines/postal

Send a message by invoking the postal.core/send-message function with two maps, the first containing connection details and the second containing message details. For example, to send an email message to yourself via a Gmail account:

(require '[postal.core :refer [send-message]])

;; Replace the following with your own credentials
(def email "<your gmail address>")
(def pass "<your gmail password>")

(def conn {:host "smtp.gmail.com"
           :ssl true
           :user email
           :pass pass})

(send-message conn {:from email
                    :to email
                    :subject "A message, from the past"
                    :body "Hi there, me!"})
;; -> {:error :SUCCESS, :code 0, :message "messages sent"}

If all is well, you should receive an email from yourself shortly thereafter.

Discussion

With the venerable JavaMail at its core, there isn’t much postal leaves for you to worry about. Even Gmail’s oft-maligned authentication setup can be tackled with a single :ssl key. While we might normally suggest giving the native Java API a try for simple email delivery, we prefer postal because it presents an API oriented around data rather than objects.

One of the places data orientation really shines is in specifying connection details. The first argument to the send-message function is a (versatile) map of connection details. Valid connection details are:

:host
Hostname of the desired SMTP server. Optional if running locally.
:port
Port of SMTP server. Numerous contextual defaults exist, including 465 when :ssl is set or 25 when :tls is set.
:user
Username to authenticate with (if authenticating).
:pass
Password to authenticate with (if authenticating).
:ssl
Enables SSL encryption if value is truthy.
:tls
Enables TLS encryption if value is truthy.

When provided no connection details (either by omitting the first argument or passing nil), postal will attempt to route email through a local sendmail instance.
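
For example, on a host with a working local sendmail, a message-only invocation looks like this (the addresses are placeholders):

(send-message {:from "me@example.com"
               :to "you@example.com"
               :subject "Delivered locally"
               :body "No SMTP connection details required."})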

Note

Since Amazon’s Simple Email Service (SES) can operate over SMTP, it is possible to use postal to send email via Amazon’s infrastructure.

Similar to connection details, messages themselves are represented as simple maps of data. The full complement of standard headers are supported as message keys:

  • Sender options

    • :from
    • :reply-to
  • Recipient options

    • :to
    • :cc
    • :bcc
  • Content options

    • :subject
    • :body
  • Metadata options

    • :date
    • :message-id
    • :user-agent

Options specified beyond these will be attached to the message as ancillary headers.

When specifying recipients on the :to, :cc, or :bcc keys, values may be either a single address or a sequence of addresses:

{:to "[email protected]"
 :cc ["[email protected]", "[email protected]", "[email protected]"]
 :bcc "[email protected]"}

A message’s body can be specified as either a string or a sequence of part maps. While the former delivers a simple plain-text email, the latter will deliver a multipart MIME message. MIME (Multipurpose Internet Mail Extensions) is the standard that allows email messages to contain attachments or other rich content, such as HTML.

A part map is made up of two values: :type and :content. For message body parts, :type is the MIME type of the content, and :content is the textual representation of said content. For example, to create a message with both plain text and HTML representations of the content:

:body [:alternative
       {:type "text/plain"
        :content "You just won the lottery!"}
       {:type "text/html"
        :content "<html>
                    <body>
                      <p>You just <b>won</b> the lottery!</p>
                    </body>
                  </html>"}]

You’ll notice the first “part” in the preceding body was not, in fact, a part map, but the keyword :alternative. Messages are normally sent in “mixed” mode, indicating to an email client that each part constitutes a piece of the whole message. Messages of the :alternative type, however, inform a client that each part represents the entire message, albeit in differing formats.

Note

If you need to send complicated multipart messages and require a high level of control over message creation, you should use the raw JavaMail API to construct messages.

For attachments, the :type parameter behaves a little differently, controlling whether the attachment resides inline (:inline) or as an attachment (:attachment). The contents of an attachment are specified by providing a File object for the :content key. An attachment’s content type and name are generally inferred from the File object, but they may be overridden via the :content-type and :file-name keys, respectively.

For example, forwarding all of your closest friends a picture of your cat might look something like this:

:body [{:type "text/plain"
        :content "Hey folks,

Check out these pictures of my cat!"}
       {:type :inline
        :content (File. "/tmp/lester-flying-photoshop")
        :content-type "image/jpeg"
        :file-name "lester-flying.jpeg"}
       {:type :attachment
        :content (File. "/tmp/lester-upside-down.jpeg")}]

See Also

5.6. Communicating over Queues Using RabbitMQ

Problem

You want to communicate between a number of applications using a queueing broker such as RabbitMQ.

Solution

Use Langohr, a small RabbitMQ client, to communicate with RabbitMQ.

Before starting, add [com.novemberain/langohr "1.6.0"] to your project’s dependencies, or follow along in a REPL using lein-try:

$ lein try com.novemberain/langohr

In order to follow along with this recipe, you need to have RabbitMQ installed and running.

Once installed, start a standalone RabbitMQ server with the command rabbitmq-server:

$ rabbitmq-server

Prior to performing any operations against RabbitMQ, you must connect to a server and open a communication channel. A channel is the medium over which you can produce and consume messages:

(require 'langohr.core
         'langohr.channel)

;; Connect to local RabbitMQ cluster node on localhost:5672
(def conn (langohr.core/connect {:hostname "localhost"}))

;; Open a channel against the connection
(def ch (langohr.channel/open conn))

In RabbitMQ, messages are published to exchanges, routed to queues via a binding, then finally consumed by consumers. There are a number of different exchange types that vary the semantics of delivery; the most basic exchange type is direct, which routes messages based on their routing key.

To construct a pipeline between producer and consumer, start by invoking langohr.queue/declare to create a queue with the desired name:

(require '[langohr.queue :as lq])

(def resize-queue "imaging.resize")

(lq/declare ch resize-queue)
;; -> {:queue "imaging.resize",
;;     :consumer-count 0,
;;     :message_count 0,
;;     :consumer_count 0,
;;     :message-count 0}

By default, RabbitMQ creates a binding between the default exchange (whose name is the empty string) and each queue. You can now publish a message to the "imaging.resize" queue by invoking langohr.basic/publish with the channel, the default exchange, a routing key (your queue name), and a message:

(require '[langohr.basic :as lb])

(lb/publish ch "" resize-queue "hello.jpg")

To consume messages from a queue synchronously, invoke langohr.basic/get with the channel and queue name:

(def hello-msg (lb/get ch resize-queue))

hello-msg
;; -> [{:routing-key "imaging.resize", :headers nil ...} #<byte[] [B@2b195c88>]

(String. (last hello-msg) "UTF-8")
;; -> "hello.jpg"

To consume messages asynchronously as they appear, use langohr.consumers/subscribe to subscribe to a queue. The handler function you provide to subscribe will be called for each message published to the queue:

(require '[langohr.consumers :as lc])

(defn resize-image-handler
  "Spawn a resize process for each resize message received"
  [ch metadata ^bytes payload]
  (let [filename (String. payload "UTF-8")]
    (println (format "Resizing file %s" filename))))

;; Subscribe to the queue with the handler function
(def tag (lc/subscribe ch resize-queue resize-image-handler))

;; The return value of subscribe is a subscription tag
tag
;; -> "amq.ctag-7hsNsSqLDEEoES5AkIC6XQ"

(lb/publish ch "" resize-queue "hello-again.jpg")
;; *out*
;; Resizing file hello-again.jpg

;; Unsubscribe resize-image-handler via the tag value
(lb/cancel ch tag)

Discussion

At this point, you’ve round-tripped a few messages to RabbitMQ, but you’ve barely scratched the surface of what Langohr and RabbitMQ are capable of. Langohr is a small RabbitMQ client wrapping the Java RabbitMQ library that supports AMQP 0-9-1 and RabbitMQ extensions of AMQP, and provides an HTTP API client.

AMQP 0-9-1, and by extension, Langohr, centers around a few main concepts: exchanges, queues, and bindings.

Exchanges

An exchange is very much like a post office: when a message is published to an exchange, the exchange will route the message to one or more queues. How those messages are routed to queues is dependent on both the exchange type and the bindings between the exchange/queues.

There are multiple exchange types, each with its own routing semantics—see Table 5-1. Custom exchange types can be created to deal with sophisticated routing scenarios (e.g., routing based on content or geolocation data) or just for convenience.

Table 5-1. Built-in exchange types

Name      Behavior                                               Predeclared exchange
Direct    1:1, routed based on routing key                       ""
Fanout    1:N, ignoring routing key                              "amq.fanout"
Topic     1:N, taking routing key into consideration             "amq.topic"
Headers   1:1, taking into consideration any number of headers   "amq.match"

To declare one of the built-in exchanges, use one of langohr.exchange/fanout, langohr.exchange/topic, langohr.exchange/direct, or langohr.exchange/headers. Each of these functions exposes the relevant options for that exchange type, ultimately invoking langohr.exchange/declare:

(require '[langohr.exchange :as le])

;; Create a fanout exchange for image processing completion
(le/fanout ch "imaging.complete")

Exchanges have several attributes associated with them:

  • Name
  • Type (direct, fanout, topic, headers, or some custom type)
  • Durability (should it survive broker restarts?)
  • Whether the exchange is autodeleted when no longer used
  • Custom metadata (sometimes known as x-arguments)

Using langohr.exchange/declare directly, you can customize these attributes to create your own types of exchanges.
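
As a rough sketch (assuming the keyword-argument style this version of Langohr uses elsewhere, e.g. with langohr.queue/declare), a durable, non-auto-deleted topic exchange might be declared like so:

;; A durable topic exchange that survives broker restarts
(le/declare ch "imaging.events" "topic"
            :durable true
            :auto-delete false)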

Queues

A queue is like a mailbox in a post office. The langohr.queue/declare function creates named queues. Apart from the name, this function accepts a number of keyword arguments that vary the characteristics of the queue, including whether it is :durable, :exclusive, or :auto-delete. Other arguments can be specified in an :arguments value:

(lq/declare ch "imaging.transcode" :durable true)
;; -> {:queue "imaging.transcode", ...}

Queues with unique names can be generated using the langohr.queue/declare-server-named function. This functions similarly to langohr.queue/declare, but without a name argument:

(lq/declare-server-named ch)
;; -> "amq.gen-FcFv8JD9K8-4NuT8kC3jKA"

Unlike exchanges, queues in RabbitMQ are all of the same type.

Bindings

As you saw in the solution, there is an implicit binding between the default (direct) exchange and every queue, keyed by queue name. In the wild, however, queues are usually bound to exchanges explicitly. You can create your own bindings by invoking langohr.queue/bind with a channel, queue name, and exchange name:

;; Create a unique completion queue...
(def completion-queue (lq/declare-server-named ch))

;; and bind it to the imaging.complete fanout
(lq/bind ch completion-queue "imaging.complete")

Publishing

Messages are published to an exchange using the langohr.basic/publish function. This function takes three primary arguments (beyond channel):

The name of an exchange
Either a user-made exchange such as "imaging.complete", or a built-in like "amq.fanout" or ""
A routing key
Used by the exchange to perform type-specific routing of messages to queue(s)
A message
A string body for the message to be delivered to the queue

As optional arguments, publish allows users to specify a plethora of message headers as keyword arguments. For the full list, see the docstring for the publish function.
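
For instance, here is a sketch of a publish that sets a couple of common message properties (assuming this version of Langohr exposes keyword arguments such as :content-type and :persistent on publish):

(lb/publish ch "" resize-queue "hello.jpg"
            :content-type "text/plain"
            :persistent true)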

Consuming

Having declared a number of queues, there are two ways to consume messages from them:

  • Pull, using langohr.basic/get
  • Push, using langohr.consumers/subscribe

In the pull API, you make a synchronous invocation of the get function to retrieve a single message from a queue. The return value of get is a tuple of a metadata map and a body. The body payload, as returned, is an array of bytes; for plain-text messages, you can use the String constructor (String.) to convert those bytes to a string. Since string message bodies are encoded as UTF-8, it is important to invoke the String constructor with an encoding of "UTF-8":

(lb/publish ch "" resize-queue "hello.jpg")
(let [[_ body] (lb/get ch resize-queue)]
  (String. body "UTF-8"))
;; -> "hello.jpg"

When no messages are present on a queue, get will return nil.

In the push API, you subscribe to a queue using langohr.consumers/subscribe, providing a message handler function that will be invoked for each message the queue receives. This function will be invoked with three arguments: a channel, a metadata map, and the body bytes:

;; A run-of-the-mill handler function
(defn resize-image-handler
  "Spawn a resize process for each resize message received"
  [ch metadata ^bytes payload]
  (let [filename (String. payload "UTF-8")]
    (println (format "Resizing file %s" filename))))

subscribe is a nonblocking call, and upon completion will return a tag string that can be used to later cancel the subscription using langohr.consumers/cancel.

The subscribe function also allows you to specify a large number of queue life cycle functions, documented at length in the langohr.consumers/create-default docstring.

Acknowledgment

Consumed messages need to be acknowledged. That can happen automatically (RabbitMQ will consider a message acknowledged as soon as it sends it to a consumer) or manually.

When a message is acknowledged, it is removed from the queue. If a channel closes unexpectedly before a delivery is acknowledged, it will be automatically requeued by RabbitMQ. Note that these acknowledgments have application-specific semantics and help ensure that messages are processed properly.

With manual acknowledgment, it is the application's responsibility to either acknowledge or reject a delivery. This is done with langohr.basic/ack and langohr.basic/nack, respectively, each of which takes a metadata attribute called delivery-tag (the delivery ID). To enable manual acknowledgments, pass :auto-ack false to langohr.consumers/subscribe:

(defn manual-ack-handler
  "Spawn a resize process for each resize message received"
  [ch {:keys [delivery-tag]} ^bytes payload]
  (try
    (String. payload "UTF-8")
    ;; Do some work, then acknowledge the message
    (lb/ack ch delivery-tag)
    (catch Throwable t
      ;; Reject message
      (lb/nack ch delivery-tag))))

(lc/subscribe ch resize-queue manual-ack-handler :auto-ack false)

Note that if you requeue a message on a queue with just one consumer, it will be redelivered to that consumer immediately.

It is also possible to control how many messages will be pushed to the client before it must receive an ack for at least one of them. This is known as the prefetch setting and is set using langohr.basic/qos. This setting applies across an entire channel:

;; Prefetch a dozen messages
(lb/qos ch 12)

RabbitMQ queues can also be mirrored between cluster nodes for high availability, have a bounded length or expiration period for messages, and more. To learn more, see the RabbitMQ and Langohr documentation sites.
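
For example, a per-message expiration period can be requested when declaring a queue by passing broker-specific x-arguments; this sketch assumes the standard RabbitMQ x-message-ttl argument and the :arguments option to langohr.queue/declare:

;; Messages on this queue expire after 60 seconds
(lq/declare ch "imaging.ephemeral"
            :durable true
            :arguments {"x-message-ttl" 60000})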

See Also

5.7. Communicating with Embedded Devices via MQTT

Problem

You want to communicate with embedded devices (think “Internet of things”) using a publish/subscribe model.

Solution

Use Machine Head, a Clojure library that enables machine-to-machine (M2M) communication via the MQTT protocol. The protocol requires an existing MQTT broker with which all devices (or machines) will communicate by publishing messages or subscribing to messages on specific topics. You can use the Mosquitto broker with its test installation at tcp://test.mosquitto.org:1883 (of course, you need a functional Internet connection on your machine).

To follow along with this recipe, launch a REPL using lein-try:

$ lein try clojurewerkz/machine_head

To start, create a simple connect-and-subscribe function that listens to a topic and prints messages it receives:

(require '[clojurewerkz.machine-head.client :as mh])

(defn message-handler [topic meta payload]
  (let [p (apply str (map char payload))]
    (println "received " p "on topic " topic)))

(defn connect-and-subscribe [broker-addr topics subscriberid]
  (let [qos-levels (vec (repeat (count topics) 2)) ;; All at qos 2
        conn-sub (mh/connect broker-addr subscriberid)]
    (if (mh/connected? conn-sub)
      (do
        (mh/subscribe conn-sub topics message-handler {:qos qos-levels})
        conn-sub)))) ;; Return conn-sub for later mh/disconnect...

(def subscriberid (mh/generate-id))
;; or use a unique id
;; (def subscriberid "SNSubscriber01")

(connect-and-subscribe "tcp://test.mosquitto.org:1883"
                       ["SNControlNetwork/Florida/device1"] subscriberid)

Open another terminal window and start a second lein-try REPL session. Use the following code to publish messages to the broker. Note that the subscriber must already be connected so that incoming messages are not lost:

(require '[clojurewerkz.machine-head.client :as mh])

(defn connect-and-publish [broker-addr client-id topic]
  (let [qos 2
        retained false
        conn  (mh/connect broker-addr client-id)]
    (if (mh/connected? conn)
      (do (dotimes [n 5]
            (let [payload (str "msg" n)]
              (mh/publish conn topic payload qos retained)
              (println "published " payload)))
          (mh/disconnect conn)))))

(def pubclientid (mh/generate-id))
pubclientid
;; -> "ryan.1384135173618"

(connect-and-publish "tcp://test.mosquitto.org:1883" pubclientid
                       "SNControlNetwork/Florida/device1")
;; *out* of publish REPL
;; published  msg0
;; published  msg1
;; published  msg2
;; published  msg3
;; published  msg4
;; *out* of client REPL
;; received  msg0 on topic  SNControlNetwork/Florida/device1
;; received  msg1 on topic  SNControlNetwork/Florida/device1
;; received  msg2 on topic  SNControlNetwork/Florida/device1
;; received  msg3 on topic  SNControlNetwork/Florida/device1
;; received  msg4 on topic  SNControlNetwork/Florida/device1

Discussion

MQTT is an open, lightweight publish/subscribe messaging protocol. It is useful for connections where bandwidth is at a premium and/or connections are unreliable. While the AMQP protocol excels at various scenarios for business messaging, MQTT is usually the choice for smaller payloads and last-mile connectivity because it is simple to implement in hardware. The MQTT protocol has the following properties that make it good for constrained networks:

  • Designed for devices with limited resources, like battery-operated 8-bit controllers.
  • Internally compresses into bitwise headers and variable-length fields. The smallest possible packet size is a mere two bytes.
  • No polling required. Implements asynchronous bidirectional push delivery of messages.
  • Supports always-connected and sometimes-connected models.
  • Tested with low-bandwidth networks like VSAT and GPRS.

The protocol defines three possible Quality of Service (QoS) values: 0, 1, and 2, corresponding to fire-and-forget, at-least-once, and exactly-once qualities of service. QoS parameters 1 and 2 require persistent storage on the client so as to save the message until an acknowledgment arrives. In the preceding recipe, the default persistence implementation provided by the library is used.

MQTT also has a concept of retention of messages. If you were to set retained to true in the connect-and-publish function, the broker would remember the last known retained message on the topic. When the subscriber connects, it is given the last message (for which retained was true) by the broker and does not have to wait to receive the first message.
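
Publishing a retained message uses the same publish arity as connect-and-publish, with the final flag set to true (a sketch against the same test broker):

(let [conn (mh/connect "tcp://test.mosquitto.org:1883" (mh/generate-id))]
  ;; qos 1, retained true: the broker keeps this as the topic's last known value
  (mh/publish conn "SNControlNetwork/Florida/device1" "last-known-state" 1 true)
  (mh/disconnect conn))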

Note

WebSphere and RabbitMQ also implement MQTT and can be used instead of Mosquitto. While the preceding code used the Mosquitto test broker (tcp://test.mosquitto.org:1883), you can install your own Mosquitto broker using the MQTT installation instructions.

The topics are usually defined with the separator / defining hierarchies. As an example, the sensor devices of a particular domain, SNControl, might be publishing their values to SNControl/Florida/device1, SNControl/Florida/device2, and so on. Meanwhile, the devices in domain RKNControl might publish their values to RKNControl/Washington/device1, for example. Naming the topics in this way helps in subscribing to multiple topics based on wildcards.

This is how wildcards are used:

/
Used as a separator.
+
The single-level wildcard and can appear anywhere in the string.
#
A multilevel wildcard and needs to appear at the end of the string.

For example, these subscriptions are possible:

SNControl/#
Any topic under SNControl will match (e.g., SNControl/Florida/device1/sensor1, SNControl/Florida/device1/sensor2, and SNControl/California/device1).
SNControl/+/device1
Any device1 in states under domain SNControl will match (e.g., SNControl/Florida/device1 and SNControl/California/device1).
SNControl/+/+/sensor1
Any sensor1 in states under domain SNControl will match (e.g., SNControl/Florida/device1/sensor1 and SNControl/Florida/device2/sensor1).
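
Subscribing with a wildcard works just like the topic subscription in the solution; here is a sketch that reuses message-handler and the same subscribe call shape as connect-and-subscribe:

;; Receive messages from device1 in any state under SNControl
(def wild-conn (mh/connect "tcp://test.mosquitto.org:1883" (mh/generate-id)))

(mh/subscribe wild-conn ["SNControl/+/device1"] message-handler {:qos [2]})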

In the preceding code, the connect-and-subscribe function uses the callback handler message-handler to process incoming messages arriving from the broker. It invokes the connect function from the Machine Head library with the broker address and a client ID (generated using generate-id, or some other unique ID), then checks that the connection has been established using connected?. The subscribe function is invoked with the connection, a vector of topics to subscribe to, a message handler, and a :qos option. Finally, the connection is returned so that the subscriber can later be disconnected with disconnect.

The connect-and-publish function calls connect, which accepts the broker address and client ID and returns the connection conn. It then checks that the connection succeeded with connected? and invokes publish several times to publish messages to the broker. publish accepts the connection, topic string, payload, QoS value, and retained flag. A QoS value of 2 corresponds to exactly-once delivery, and a retained value of false instructs the broker not to retain the messages. Finally, disconnect closes the connection to the broker.

While the preceding code fragment just prints the incoming messages, you could potentially use the messages in some other way (e.g., triggering some actions based on an alarm that the code has received).

See Also

5.8. Using ZeroMQ Concurrently

Problem

You want to use ZeroMQ concurrently, but ZeroMQ sockets are not thread-safe. You could manually set up mutual exclusion via locks or other Java concurrency primitives, but you’d rather use a simpler method.

Solution

Use the zmq-async library to simplify concurrent usage of ZeroMQ via core.async.

In order to follow along with this recipe, your system should have ZeroMQ 3.2 installed.

If you’re on a Mac and have the Homebrew package manager installed, use this command to install it:

$ brew install zeromq

Or, if you are on Ubuntu:

$ apt-get install libzmq3

Otherwise, visit ØMQ’s downloads page.

Before starting, add [com.keminglabs/zmq-async "0.1.0"] to your project’s dependencies, or follow along in a REPL using lein-try:

$ lein try com.keminglabs/zmq-async

Here’s a simple ping-pong between two asynchronous go blocks in core.async, communicating via a ZeroMQ in-process socket:

(require '[com.keminglabs.zmq-async.core :refer [register-socket!]]
         '[clojure.core.async :refer [>! <! go chan sliding-buffer close!]])

(def addr "inproc://ping-pong")

(def server-in  (chan (sliding-buffer 64)))
(def server-out (chan (sliding-buffer 64)))
(def client-in  (chan (sliding-buffer 64)))
(def client-out (chan (sliding-buffer 64)))

(register-socket! {:in server-in
                   :out server-out
                   :socket-type :rep
                   :configurator (fn [socket] (.bind socket addr))})

(register-socket! {:in client-in
                   :out client-out
                   :socket-type :req
                   :configurator (fn [socket] (.connect socket addr))})

(do
  ;; A simple server worker that waits for incoming requests and
  ;; responds with "pong"
  (go
    (dotimes [_ 3]
      (println (String. (<! server-out)))
      (>! server-in "pong"))
    (close! server-in))

  ;; A simple client worker that sends a "ping" request and awaits
  ;;  a response
  (go
    (dotimes [_ 3]
      (>! client-in "ping")
      (println (String. (<! client-out))))
    (close! client-in)))
;; *out*
;; ping
;; pong
;; ping
;; pong
;; ping
;; pong

Discussion

ZeroMQ is a message-oriented socket system that supports many communication styles (request/reply, pub/sub, etc.) on top of many transport layers (intra-process, inter-process, inter-machine via TCP, etc.) with bindings to many languages. ZeroMQ sockets are a great substrate upon which to build service-oriented architectures. ZeroMQ sockets have less overhead than HTTP and are architecturally more flexible, supporting publish/subscribe, fanout, and other topologies in addition to request/reply.

However, ZeroMQ sockets are not thread-safe—concurrent usage typically requires explicit locking or dedicated threads and queues. The zmq-async library handles all of that for you, creating ZeroMQ sockets on your behalf and giving you access to them via thread-safe core.async channels.

The zmq-async library provides one function, com.keminglabs.zmq-async.core/register-socket!, which associates a ZeroMQ socket with either one or two core.async channels: :in (to which you can write strings or byte arrays) and :out (from which you can read byte arrays). Writing a Clojure collection of strings and/or byte arrays to a channel using >! sends a multipart message. Received multipart messages are placed on core.async channels. Reading these messages with <! will yield a vector of byte arrays.

To simulate two asynchronous processes interacting over ZeroMQ, the preceding sample uses two go blocks that read from and write to the registered channels. Each go block will begin executing immediately in background threads. The “server” block will wait for and reply to three requests (<! blocks until it receives a value), replying with “pong” each time. Concurrently, the “client” block will make three “ping” requests, awaiting a reply before moving on to the next request. Finally, after both blocks are done working, they each close their channels using close!.

The register-socket! function can be given an already-created ZeroMQ socket, but typically you would have the library create a socket for you by passing the :socket-type and a :configurator. The configurator is a function that is passed the raw ZeroMQ socket object. This function is run on the socket after it is created in order to connect/bind addresses, set pub/sub subscriptions, and otherwise configure the socket.
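
For instance, here is a sketch of registering a SUB socket whose configurator both connects and subscribes to every topic. The address is a hypothetical one, distinct from the REQ/REP pair above, and this assumes zmq-async supports the :sub socket type with an :out-only registration:

(def sub-addr "inproc://pub-sub-demo")

(def sub-out (chan (sliding-buffer 64)))

(register-socket! {:out sub-out
                   :socket-type :sub
                   :configurator (fn [socket]
                                   (doto socket
                                     (.connect sub-addr)
                                     ;; an empty prefix subscribes to everything
                                     (.subscribe (byte-array 0))))})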

Warning

The implicit context supporting register-socket! can only handle one incoming/outgoing message at a time. If you need sockets to work in parallel (i.e., you don’t want to miss a small control message just because you’re slurping in a 10 GB message on another socket), then you’ll need multiple zmq-async contexts.

See Also

5.9. Creating a TCP Client

Problem

You want to open a TCP connection to a remote host, on a particular port.

Solution

Use Java interop to create an instance of java.net.Socket and connect to a remote host.

For example, the following code uses a Socket to create a TCP connection and send an HTTP GET request, returning the result as a string:

(require '[clojure.java.io :as io])
(import '[java.io StringWriter]
        '[java.net Socket])

(defn send-request
  "Sends an HTTP GET request to the specified host, port, and path"
  [host port path]
  (with-open [sock (Socket. host port)
              writer (io/writer sock)
              reader (io/reader sock)
              response (StringWriter.)]
    (.append writer (str "GET " path "\n"))
    (.flush writer)
    (io/copy reader response)
    (str response)))

This function obtains instances of java.io.Writer and java.io.Reader to send and receive data to and from the remote server. By appending strings that conform to the HTTP specification to the writer, it forms a rudimentary HTTP client and executes a GET request to the specified endpoint. The results are then copied into an instance of java.io.StringWriter using the clojure.java.io/copy utility function, and returned as a string.

Invoking (send-request "google.com" 80 "/") at the REPL should return a very long string, consisting of the entire HTTP response that is the Google home page.

Discussion

This example uses the clojure.java.io namespace to obtain instances of java.io.Writer and java.io.Reader to read and write textual data to/from the network socket. In point of fact, Socket instances are not actually limited to textual data, and it would be possible to obtain raw binary input and output streams just as easily using clojure.java.io/input-stream and clojure.java.io/output-stream, respectively. Since HTTP is a textual protocol, however, it makes more sense to use the higher-level features of Reader and Writer.
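
For example, a minimal sketch of writing raw bytes over the same kind of socket (the payload is arbitrary, and example.com is just a placeholder host):

(with-open [sock (Socket. "example.com" 80)
            out  (io/output-stream sock)]
  ;; Write five arbitrary bytes directly to the connection
  (.write out (byte-array [72 69 76 76 79]))
  (.flush out))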

Caution

This example uses HTTP because it’s a protocol that many readers are familiar with. In the real world, using a raw TCP socket for HTTP requests is almost certainly a terrible idea. There are a plethora of libraries that provide a much higher-level interface to HTTP requests and responses, and encapsulate a lot of pesky details such as escaping, encoding, and formatting.

Also note that the reader, the writer, and the socket itself are bound within the context of a with-open macro. This guarantees that the close method is called when they are finished, which releases the TCP connection. If the connection is not released, it will continue to consume resources on both the client and the server and may be subject to termination on the remote side.

When returning lazy sequences from a with-open context, it is important to fully realize those sequences using doall. This is because resources opened by with-open are only available inside the with-open block. The doall function fully realizes a collection, retaining its entire contents in memory:

(realized? (range 100))
;; -> false

(realized? (doall (range 100)))
;; -> true
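
For example, here is a sketch of reading every line a server sends, realizing them all before with-open closes the reader:

(defn read-all-lines
  "Read all lines from host:port, realizing them before the socket closes"
  [host port]
  (with-open [sock   (Socket. host port)
              reader (io/reader sock)]
    (doall (line-seq reader))))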

Depending on your application, you may prefer to use the doseq macro. Instead of retaining the entire sequence, doseq executes its body for each element of the sequence. This is useful if you need to cause side effects for each element of a sequence, but don't need to hang on to the entire thing:

(doseq [n (range 3)]
  (println n))
;; *out*
;; 0
;; 1
;; 2

5.10. Creating a TCP Server

Problem

You want to open up a socket on a port to use as a low-level TCP server.

Solution

Use Java interop on the java.net.ServerSocket class to create a TCP listener. Use the functions in clojure.java.io to obtain input and output streams (or readers and writers) to read and write data to the socket:

(require '[clojure.java.io :as io])
(import '[java.net ServerSocket])

(defn receive
  "Read a line of textual data from the given socket"
  [socket]
  (.readLine (io/reader socket)))

(defn send
  "Send the given string message out over the given socket"
  [socket msg]
  (let [writer (io/writer socket)]
      (.write writer msg)
      (.flush writer)))

(defn serve [port handler]
  (with-open [server-sock (ServerSocket. port)
              sock (.accept server-sock)]
    (let [msg-in (receive sock)
          msg-out (handler msg-in)]
      (send sock msg-out))))

This code defines three functions. receive and send deal with reading and writing string data from and to a socket, using the clojure.java.io/reader and clojure.java.io/writer functions. Both of these accept a java.net.Socket as an argument and will return a java.io.Reader or java.io.Writer built from the socket’s input and output streams.

serve handles actually creating an instance of ServerSocket on a particular port. It also takes a handler function, which will be used to process the incoming request and determine a response message.

After creating an instance of ServerSocket, serve immediately calls its accept method, which blocks until a TCP connection is established. When a client connects, it returns the session as an instance of java.net.Socket.

It then passes the socket to the receive function, which opens up a reader on it and blocks until it receives a full line of input, terminated by a newline character (\n). When it receives one, it calls the handler function with the resulting value, and calls send to send the response using a writer opened on the same socket. send also calls the flush method on the writer to ensure that all the data is actually sent back to the client, instead of being buffered in the Writer instance.

After sending the response, the serve method returns. Because it used the with-open macro when creating the server socket and the TCP session socket, it will invoke the close method on each before returning, which disconnects the client and ends the session.

To try it out, invoke the serve function in the REPL. For a simple example, use (serve 8888 #(.toUpperCase %)). Note that it won’t return right away; it blocks, waiting for a client to connect.

To connect to the server you can use a telnet client, which is installed by default on nearly every operating system. To use it, open up a command-line window:

$ telnet localhost 8888
Trying ::1...
Connected to localhost.
Escape character is '^]'.

At this point you can type anything you like (in the following example, the input is “Hello, World!”). When you finish, make sure you type Enter or Return to send a newline character:

$ telnet localhost 8888
Trying ::1...
Connected to localhost.
Escape character is '^]'.
Hello, World!
HELLO, WORLD!Connection closed by foreign host

As you can see, as soon as you type a newline, the server responds with the uppercase version of your input (as per the handler function) and then immediately terminates the connection. In the REPL, you will find that the serve function has finally returned.

Discussion

This example uses readers and writers, which deal solely in textual data, to make the concepts of working with sockets easier to demonstrate. Of course, an actual socket is not limited to strings and can send and receive any kind of binary data.

To do this, simply use the clojure.java.io/input-stream and clojure.java.io/output-stream functions instead of the clojure.java.io/reader and clojure.java.io/writer functions, respectively, which return java.io.InputStream and java.io.OutputStream objects. These provide APIs for reading and writing raw bytes, rather than just strings and characters.

One thing you may have noticed about the example is that, unlike a traditional server, it doesn't keep running: it handles a single connection and then returns from the serve function. For ongoing use, typically you'd like to be able to serve multiple incoming connections.

Fortunately, this is relatively straightforward to do given the concurrency tools that Clojure provides. Modifying the serve function to work as a persistent server requires three changes:

  • Run the server on a separate thread so it doesn’t block the REPL.
  • Don’t close the server socket after handling the first request.
  • After handling a request, loop back to immediately handle another.

Also, because the server will be running on a non-REPL thread, it would be good to provide a mechanism for terminating the server other than killing the whole JVM.

The modified code looks like this:

(defn serve-persistent [port handler]
  (let [running (atom true)]
    (future
      (with-open [server-sock (ServerSocket. port)]
        (while @running
          (with-open [sock (.accept server-sock)]
            (let [msg-in (receive sock)
                  msg-out (handler msg-in)]
              (send sock msg-out))))))
    running))

The key feature of this code is that it launches the server socket asynchronously inside a future and calls the accept method inside of a loop. It also creates an atom called running and returns it, checking it each time it loops. To stop the server, reset the atom to false, and the loop will break:

(def a (serve-persistent 8888 #(.toUpperCase %)))
;; -> #'my-server/a

;; Server is running, will respond to multiple requests

(reset! a false)
;; -> false
;; Server is stopped, will stop serving requests after the next one

See Also

5.11. Sending and Receiving UDP Packets

Problem

You want to send asynchronous UDP packets from your application, or receive them.

Solution

Use Java interop with the java.net.DatagramSocket and java.net.DatagramPacket classes to send and receive UDP messages.

The following example demonstrates functions that send and receive short strings encoded into UDP packets:

(import '[java.net DatagramSocket
                   DatagramPacket
                   InetSocketAddress])

(defn send
  "Send a short textual message over a DatagramSocket to the specified
  host and port. If the string is over 512 bytes long, it will be
  truncated."
  [^DatagramSocket socket msg host port]
  (let [payload (.getBytes msg)
        length (min (alength payload) 512)
        address (InetSocketAddress. host port)
        packet (DatagramPacket. payload length address)]
    (.send socket packet)))

(defn receive
  "Block until a UDP message is received on the given DatagramSocket, and
  return the payload message as a string."
  [^DatagramSocket socket]
  (let [buffer (byte-array 512)
        packet (DatagramPacket. buffer 512)]
    (.receive socket packet)
    (String. (.getData packet)
             0 (.getLength packet))))

(defn receive-loop
  "Given a function and DatagramSocket, will (in another thread) wait
  for the socket to receive a message, and whenever it does, will call
  the provided function on the incoming message."
  [socket f]
  (future (while true (f (receive socket)))))

The send function is fairly straightforward—most of its content is devoted to constructing a byte array as a payload for the DatagramPacket and invoking constructor forms. The most interesting thing is its limitation of the payload size to 512 bytes, using the length argument to the DatagramPacket constructor. This is because it generally isn’t safe to attempt to send over 512 bytes of payload in a single UDP packet; although some network infrastructures may support it, others do not.

The receive function creates an incoming byte array, adds it to a mutable empty DatagramPacket instance, and invokes the DatagramSocket.receive method on the socket. When incoming data is received, the receive method will return after populating the instance of DatagramPacket. The Clojure code then constructs and returns a new String using the populated range of the byte array (that is, between 0 and the value reported by the DatagramPacket.getLength method).

Because the receive function blocks and only returns a single value, it isn’t particularly useful for accepting multiple messages or using from the REPL. receive-loop wraps the receive function, calling it repeatedly on a separate thread. Whenever it returns a value, it invokes the supplied function, then loops back to wait for more input.

To execute this code, you’ll first need to create an instance of DatagramSocket. At the REPL:

(def socket (DatagramSocket. 8888))
;; -> #'udp/socket

This creates a UDP socket on the specified port (in this case, 8888).

Next, start up a listener using the receive-loop function. For this example, simply pass it the println function so it will print out all received values:

(receive-loop socket println)
;; -> #<core$future_call$reify__6267@2783890e: :pending>

Then you can send a message! If you started the listener thread with receive-loop properly, you should see it print out the incoming message immediately:

(send socket "hello, world!" "localhost" 8888)
;; *out*
;; hello, world!
;;
;; -> nil

In this case, sending to localhost, the message transmission happens so quickly that the message is actually received before the send function even returns.

Discussion

Unlike TCP, UDP (the User Datagram Protocol) is an asynchronous protocol that makes no guarantees regarding the order in which messages arrive, whether their contents are correct, or even if they arrive at all. In exchange, UDP typically has a lower per-packet latency than protocols like TCP, since it does not need to perform error checking or recovery.

Before you decide to use UDP, make sure your application is designed to continue working even if packets are dropped or corrupted.

Because UDP uses asynchronous messages as its model, it is fairly easy to use core.async to wrap the raw DatagramSocket instances. core.async provides a very nice channel abstraction that lets you consume and produce inherently asynchronous events (such as UDP messages) in a clean, managed way.
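
A rough sketch of such a wrapper, reusing the receive-loop function above (and assuming core.async has been added as a dependency):

(require '[clojure.core.async :refer [chan put! <!!]])

;; Push every incoming UDP message onto a core.async channel
(def messages (chan 64))

(receive-loop socket #(put! messages %))

;; Elsewhere, consume messages from the channel
;; (blocks until the next message arrives)
(<!! messages)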

Multicast UDP

UDP is also capable of sending the same datagram packet to multiple destinations using a technique called UDP multicast. To use multicast, create an instance of java.net.MulticastSocket instead of java.net.DatagramSocket.

A full explanation of how to use MulticastSocket is very well documented on Oracle’s website and would be redundant to reproduce here, since it is straightforward Java interop. After reading the preceding example, extending it to MulticastSocket should be relatively self-explanatory.



[17] As Australian songwriter Peter Allen so aptly put it: “Everything old is new again.”
