The main mechanism by which core.async allows for coordinating backpressure is buffering. core.async doesn't allow unbounded buffers, as these can be a source of bugs and a resource hog. Instead, we are required to think hard about our application's unique needs and choose an appropriate buffering strategy.
The fixed buffer is the simplest form of buffering. It is bounded by a chosen number, n, allowing producers to put items into the channel without having to wait for consumers:
(def result (chan (buffer 5)))

(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

(go (doseq [n (range 5)]
      (>! result n))
    (prn "Done putting values!")
    (close! result))

;; "Done putting values!"
;; "Got value: " 0
;; "Got value: " 1
;; "Got value: " 2
;; "Got value: " 3
;; "Got value: " 4
In the preceding example, we created a buffer of size 5 and started a go loop to consume values from it. The go loop uses a timeout channel to wait one second before each take.
Then we start another go block that puts the numbers 0 through 4 into the result channel and prints to the console once it's done. By the time it finishes, the first timeout will have expired, and we see the values printed to the REPL.
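As a side note, core.async also accepts a plain number as the argument to chan, which creates a fixed buffer of that size. The following sketch (channel names c1 and c2 are ours, not from the example above) shows the two equivalent spellings:

```clojure
(require '[clojure.core.async :refer [chan buffer >!! <!!]])

;; Passing a number to chan creates a fixed buffer of that size,
;; so these two channels behave identically:
(def c1 (chan (buffer 5)))
(def c2 (chan 5))

;; Either accepts five puts without a waiting consumer; a sixth put
;; would park (or block, with >!!) until someone takes a value.
(doseq [n (range 5)]
  (>!! c2 n))

(println (<!! c2)) ; => 0
```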
Now let's watch what happens if the buffer isn't large enough:
(def result (chan (buffer 2)))

(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

(go (doseq [n (range 5)]
      (>! result n))
    (prn "Done putting values!")
    (close! result))

;; "Got value: " 0
;; "Got value: " 1
;; "Got value: " 2
;; "Done putting values!"
;; "Got value: " 3
;; "Got value: " 4
This time our buffer size is 2, but everything else is the same. As you can see, the producing go block finishes much later: it attempted to put another value into the result channel and was blocked/parked because the buffer was full.
As with most things, this might be acceptable. But if we are not willing to block a fast producer just because we can't consume its items fast enough, we must look for another option.
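One option on the producer side, separate from the buffering strategies below, is core.async's offer!, which attempts a put without ever blocking or parking. A minimal sketch (the channel name c is ours):

```clojure
(require '[clojure.core.async :refer [chan buffer offer!]])

(def c (chan (buffer 2)))

;; offer! puts a value only if it can do so immediately: it returns
;; true when the value is accepted and a logical false when the
;; buffer is full, leaving the producer free to react as it sees fit.
(offer! c :a) ; => true
(offer! c :b) ; => true
(offer! c :c) ; buffer is full, so the value is rejected
```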
A dropping buffer also has a fixed size. However, instead of blocking producers when it is full, it simply discards any new items, as shown here:
(def result (chan (dropping-buffer 2)))

(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

(go (doseq [n (range 5)]
      (>! result n))
    (prn "Done putting values!")
    (close! result))

;; "Done putting values!"
;; "Got value: " 0
;; "Got value: " 1
As before, we still have a buffer of size 2, but this time the producer finishes quickly without ever being blocked. The dropping-buffer simply discarded all items over its limit.
A drawback of dropping buffers is that we might not be processing the latest items at a given time. When processing the latest information is a must, we can use a sliding buffer:
(def result (chan (sliding-buffer 2)))

(go-loop []
  (<! (async/timeout 1000))
  (when-let [x (<! result)]
    (prn "Got value: " x)
    (recur)))

(go (doseq [n (range 5)]
      (>! result n))
    (prn "Done putting values!")
    (close! result))

;; "Done putting values!"
;; "Got value: " 3
;; "Got value: " 4
As before, we only get two values, but this time they are the latest ones produced by the go block.
When the limit of the sliding buffer is overrun, core.async
drops the oldest items to make room for the newest ones. I end up using this buffering strategy most of the time.
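To make the contrast concrete, here is a small side-by-side sketch. It uses the blocking variants >!! and <!! so it runs outside a go block; since puts into dropping and sliding buffers never block, the producer loop completes immediately (the names dropping and sliding are ours):

```clojure
(require '[clojure.core.async :refer [chan dropping-buffer sliding-buffer
                                      >!! <!! close!]])

;; Both buffers are bounded at 2, and puts into them never block.
(def dropping (chan (dropping-buffer 2)))
(def sliding  (chan (sliding-buffer 2)))

(doseq [n (range 5)]
  (>!! dropping n)
  (>!! sliding n))
(close! dropping)
(close! sliding)

;; The dropping buffer kept the oldest items; the sliding buffer the newest.
(def from-dropping [(<!! dropping) (<!! dropping)])
(def from-sliding  [(<!! sliding)  (<!! sliding)])

(println from-dropping) ; => [0 1]
(println from-sliding)  ; => [3 4]
```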