Backpressure

core.async coordinates backpressure mainly through buffering. It doesn't allow unbounded buffers, since they can hide bugs and consume memory without limit.

Instead, we are required to think hard about our application's unique needs and choose an appropriate buffering strategy.

Fixed buffer

This is the simplest form of buffering. Its size is fixed to a chosen number n, allowing producers to put up to n items in the channel without having to wait for consumers:

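;; Assumes core.async is on the classpath. These requires cover
;; every example in this section.
(require '[clojure.core.async :as async
           :refer [chan buffer dropping-buffer sliding-buffer
                   go go-loop <! >! close!]])

(def result (chan (buffer 5)))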
(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

(go  (doseq [n (range 5)]
       (>! result n))
     (prn "Done putting values!")
     (close! result))

;; "Done putting values!"
;; "Got value: " 0
;; "Got value: " 1
;; "Got value: " 2
;; "Got value: " 3
;; "Got value: " 4

In the preceding example, we created a channel with a fixed buffer of size 5 and started a go loop to consume values from it. The go loop waits on a one-second timeout channel before each take, which makes it a slow consumer.

Then, we start another go block that puts numbers from 0 to 4 into the result channel and prints to the console once it's done.

Because the buffer has room for all five values, the producer finishes right away. Once the first timeout expires, the values are printed to the REPL, one per second.
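
The same behavior can be checked without timeouts. The following is a minimal sketch, assuming core.async is on the classpath; the names ch and received are made up for illustration. It uses the blocking put >!! from the main thread, which returns immediately here because the buffer has room for every value:

```clojure
(require '[clojure.core.async :as async])

;; Fixed buffer with room for all five values.
(def ch (async/chan (async/buffer 5)))

;; No consumer is running, yet none of these puts has to wait.
(doseq [n (range 5)]
  (async/>!! ch n))

;; Values buffered before close! can still be taken afterwards.
(async/close! ch)

;; async/into collects everything left on the channel into a vector.
(def received (async/<!! (async/into [] ch)))
;; received => [0 1 2 3 4]
```

Had the buffer been smaller than 5, the doseq would have blocked the calling thread on the first put that did not fit.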

Now let's watch what happens if the buffer isn't large enough:

(def result (chan (buffer 2)))
(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

(go  (doseq [n (range 5)]
       (>! result n))
     (prn "Done putting values!")
     (close! result))

;; "Got value: " 0
;; "Got value: " 1
;; "Got value: " 2
;; "Done putting values!"
;; "Got value: " 3
;; "Got value: " 4

This time our buffer size is 2 but everything else is the same. As you can see, the producer go block finishes much later: when it attempted to put a third value in the result channel, it was parked because the buffer was full.

Depending on the application, this might be OK. But if we are not willing to hold back a fast producer just because we can't consume its items fast enough, we must look for another option.
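
To see exactly when a fixed buffer fills up, here is a small sketch, assuming core.async is on the classpath; the names fixed, accepted and received are hypothetical. It uses offer!, which puts a value only if that can be done immediately and never blocks, so a refused offer marks the point where a regular put would have parked:

```clojure
(require '[clojure.core.async :as async])

(def fixed (async/chan (async/buffer 2)))

;; offer! returns true when the value was accepted and a falsey
;; value when the put could not complete immediately.
(def accepted
  (mapv #(boolean (async/offer! fixed %)) (range 5)))
;; accepted => [true true false false false]

(async/close! fixed)

;; Only the two accepted values are on the channel.
(def received (async/<!! (async/into [] fixed)))
;; received => [0 1]
```

Note that offer! is only a probe here. The examples in this section use >!, which waits for room instead of giving up.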

Dropping buffer

A dropping buffer also has a fixed size. However, instead of blocking producers when it is full, it silently discards any new items, as shown here:

(def result (chan (dropping-buffer 2)))
(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

(go  (doseq [n (range 5)]
       (>! result n))
     (prn "Done putting values!")
     (close! result))

;; "Done putting values!"
;; "Got value: " 0
;; "Got value: " 1

As before, we still have a buffer of size two, but this time the producer ends quickly without ever getting blocked. The dropping-buffer simply ignored all items over its limit.
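
A deterministic version of this behavior, without timeouts, might look like the following sketch. It assumes core.async is on the classpath, and the names dropping and kept are made up for illustration:

```clojure
(require '[clojure.core.async :as async])

(def dropping (async/chan (async/dropping-buffer 2)))

;; Every put completes immediately. Once the buffer holds two
;; items, further values are discarded.
(doseq [n (range 5)]
  (async/>!! dropping n))

(async/close! dropping)

;; Only the first two values survived.
(def kept (async/<!! (async/into [] dropping)))
;; kept => [0 1]
```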

Sliding buffer

A drawback of dropping buffers is that we might not be processing the latest items at a given time. When processing the latest information is a must, we can use a sliding buffer:

(def result (chan (sliding-buffer 2)))
(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

(go  (doseq [n (range 5)]
       (>! result n))
     (prn "Done putting values!")
     (close! result))

;; "Done putting values!"
;; "Got value: " 3
;; "Got value: " 4

As before, we only get two values, but this time they are the latest ones put by the producer go block.

When the limit of the sliding buffer is overrun, core.async drops the oldest items to make room for the newest ones. I end up using this buffering strategy most of the time.
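
The same check for a sliding buffer, again as a sketch that assumes core.async is on the classpath and uses the made-up names sliding and kept:

```clojure
(require '[clojure.core.async :as async])

(def sliding (async/chan (async/sliding-buffer 2)))

;; Every put completes immediately. Once the buffer holds two
;; items, each new value pushes out the oldest one.
(doseq [n (range 5)]
  (async/>!! sliding n))

(async/close! sliding)

;; Only the two most recent values survived.
(def kept (async/<!! (async/into [] sliding)))
;; kept => [3 4]
```

Compared with the dropping buffer, the only change is the buffer constructor, which makes it easy to switch strategies once the channel is in place.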
