Until now, we've covered coroutines, suspending functions, and how to deal with streams using Channels. We saw in the previous chapter that working with Channels implies starting coroutines to send to and/or receive from those Channels. Those coroutines are hot entities which are sometimes hard to debug, or can leak resources if they aren't cancelled when they should be.
Flows, like channels, are meant to handle asynchronous streams of data, but at a higher level of abstraction and with better library tooling. Conceptually, Flows are similar to Sequences, except that each step of a flow can be asynchronous. It is also easy to integrate flows into structured concurrency, to avoid leaking resources. However, Flows aren't meant to replace channels. Channels are building blocks for flows, and they remain appropriate in some architectures, such as CSP (see Chapter 5). Nevertheless, you'll see that flows suit most needs in asynchronous data processing.
In this chapter, we'll introduce you to cold and hot flows. You'll see how cold flows can be a better choice when you want to make sure never to leak any resources. Hot flows, on the other hand, serve a different purpose, such as when you need a "publish-subscribe" relationship between entities in your app. For example, you can implement an event bus using hot flows.
The best way to understand flows is to see how they are used in real-life applications. This chapter will therefore go through a series of typical use cases.
Let's re-implement Example 5-7, from the end of the previous chapter, using a Flow:
fun numbers(): Flow<Int> = flow {
    emit(1)
    emit(2)
    // emit other values
}
Several aspects are important to notice:
Instead of returning a Channel instance, we're returning a Flow instance.
Inside the flow, we use the emit suspending function instead of send.
The numbers function, which returns a Flow instance, isn't a suspending function. Invoking numbers doesn't start anything by itself - it just immediately returns a Flow instance. To sum up, you define in the flow block the emission of values. When invoked, the numbers function quickly returns a Flow instance without running anything in the background.
On the consuming site:
fun main() = runBlocking {
    val flow = numbers()
    flow.collect {
        println(it)
    }
}
We get an instance of Flow, using the numbers function. Once we have a flow, instead of looping over it (as we would with a channel), we use the collect function, which in flow parlance is called a terminal operator. We'll expand on flow operators and terminal operators in "Operators". For now, we can summarize the purpose of the collect terminal operator: it consumes the flow - i.e., it iterates over the flow and executes the given lambda on each element.
That's it - you've seen the basic usage of a flow. As we mentioned earlier, we'll now take a more realistic example, so you'll see the real benefit of Flows.
A more realistic example
Imagine that you need to get tokens from a remote database, then query additional data for each of those tokens. You need to do that only once in a while, so you decide not to maintain an active connection to the database (which could be expensive). Instead, you create a connection only when fetching the data, and close it when you're done.
Your implementation should first establish the connection to the database. Then, you get a token using a suspending function, getToken. This getToken function performs a request to the database and returns a token. Next, you asynchronously get optional data associated with this token. In our example, this is done by invoking the suspending function getData, which takes a token as a parameter. Once you get the result of getData, you wrap both the token and the result in a TokenData class instance, defined as:
data class TokenData(val token: String, val opt: String? = null)
To sum up, you need to produce a stream of TokenData objects. This stream requires first establishing a database connection, then performing asynchronous queries to retrieve tokens and get the associated data. You choose how many tokens you need. When you're done processing all tokens, you disconnect and release the underlying database connection resources.
Figure 6-1 shows how you would implement such a flow.
In this chapter, we sometimes use images instead of code blocks, because the screenshots from our IDE show suspension points (in the margin) and type hints, which are really helpful.
You can find the corresponding source code here.
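Since the screenshot isn't reproduced here, below is a minimal sketch of what getDataFlow might look like. The connect, disconnect, getToken, and getData functions are placeholders standing in for the real database API, and TokenData is repeated for self-containment:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion

data class TokenData(val token: String, val opt: String? = null)

// Placeholder suspending functions simulating the database API
suspend fun connect() { delay(10) }
suspend fun disconnect() { delay(10) }
suspend fun getToken(): String { delay(10); return "token" }
suspend fun getData(token: String): String { delay(10); return "data" }

fun getDataFlow(n: Int): Flow<TokenData> = flow {
    connect()
    repeat(n) {
        val token = getToken()
        val data = getData(token)
        emit(TokenData(token, data))
    }
}.onCompletion {
    // Executes when the collecting coroutine reaches the end of the
    // flow or gets cancelled, so the connection is always released
    disconnect()
}

// Helper collecting n tokens into a list, for demonstration purposes
suspend fun collectTokens(n: Int): List<TokenData> {
    val result = mutableListOf<TokenData>()
    getDataFlow(n).collect { result.add(it) }
    return result
}
```

The important part is the shape: the connection lifecycle lives entirely inside the flow definition, invisible to collectors.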
Several aspects of this implementation are particularly important to notice:
Creating a connection to the database and closing it on completion is completely transparent to the client code which consumes the flow. Client code only sees a flow of TokenData.
All operations inside the flow are sequential. For example, once we get the first token (say "token1"), the flow invokes getData("token1") and suspends until it gets the result (say "data1"). Then, the flow emits the first TokenData("token1", "data1"). Only after that does the execution proceed with "token2", etc.
Invoking the getDataFlow function does nothing on its own. It simply returns a flow. The code inside the flow executes only when a coroutine collects the flow, as shown in Example 6-1.
If the coroutine that collects the flow gets cancelled or reaches the end of the flow, the code inside the onCompletion block executes. This guarantees that we properly release the connection to the database.
fun main() = runBlocking<Unit> {
    val flow = getDataFlow(3) // Nothing runs at initialization

    // A coroutine collects the flow
    launch {
        flow.collect { data ->
            println(data)
        }
    }
}
As we already mentioned, collect is a terminal operator that consumes all elements of the flow. In this example, collect invokes a function on each collected element of the flow (i.e., println(data) is invoked 3 times). We'll cover other terminal operators in "Examples of cold flow usage".
Until now, you've seen examples of flows that don't run any code until a coroutine collects them. In flow parlance, they are cold flows.
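To make this cold behavior concrete, here is a small sketch: nothing inside the flow block runs until collect is invoked.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val cold = flow {
        // This line only runs once a collector shows up
        println("Flow started")
        emit(1)
    }
    println("Flow created, nothing has run yet")
    cold.collect { println("Received $it") }
}
```

Running this prints "Flow created, nothing has run yet" before "Flow started", confirming that building the flow triggers no work.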
If you need to perform transformations on a flow, much like you would on collections, the coroutines library provides functions such as map, filter, debounce, buffer, onCompletion, etc. Those functions are called flow operators or intermediate operators, as they operate on a flow and return another flow. A regular operator shouldn't be confused with a terminal operator, as you'll see later.
For example, here is a usage of the map operator:
fun main() = runBlocking<Unit> {
    val numbers: Flow<Int> = // implementation hidden for brevity
    val newFlow: Flow<String> = numbers.map {
        transform(it)
    }
}

suspend fun transform(i: Int): String = withContext(Dispatchers.Default) {
    delay(10) // simulate real work
    "${i + 1}"
}
The interesting bit here is that map turns a Flow<Int> into a Flow<String>. The type of the resulting flow is determined by the return type of the lambda passed to the operator.
The map flow operator is conceptually very close to the map extension function on collections. There's a noticeable difference, though: the lambda passed to the map flow operator can be a suspending function.
We’ll cover most of the common operators in a series of use cases in the next section.
A terminal operator can easily be distinguished from regular operators: it's a suspending function which starts the collection of the flow. You've previously seen collect.
Other terminal operators are available, like toList, collectLatest, first, etc. Here is a brief description of those terminal operators:
toList collects the given flow and returns a List containing all collected elements.
collectLatest collects the given flow with the provided action. The difference from collect is that when the original flow emits a new value, the action block for the previous value is cancelled.
first returns the first element emitted by the flow, then cancels the flow's collection. It throws NoSuchElementException if the flow was empty. There's also a variant, firstOrNull, which returns null if the flow was empty.
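The behavior of these terminal operators can be demonstrated with a tiny flow:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val numbers = flowOf(1, 2, 3)

    // toList collects every element of the flow
    println(numbers.toList())                // [1, 2, 3]

    // first collects one element, then cancels the collection
    println(numbers.first())                 // 1

    // firstOrNull returns null instead of throwing on an empty flow
    println(emptyFlow<Int>().firstOrNull())  // null
}
```

Note that since the flow is cold, each terminal operator restarts the collection from scratch.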
As it turns out, picking a single example that makes use of all possible operators isn't the best path to follow. Instead, we'll provide different use cases, which will illustrate the usage of several flow operators.
Suppose that you're developing a chat application. Your users can send messages to each other. A message has a date, a reference to the author of the message, and content as plain text.
Here is the Message class:
data class Message(
    val user: String,
    val date: LocalDateTime,
    val content: String
)
Unsurprisingly, we'll represent the stream of messages as a flow of Message instances. Every time a user posts a message in the app, the flow will transmit that message. For now, assume that you can invoke a function getMessageFlow, which returns an instance of Flow<Message>. With the Kotlin Flows library, you are able to create your own custom flows. However, it makes the most sense to start by exploring how the flow API can be used in common use cases.
fun getMessageFlow(): Flow<Message> {
    // we'll implement it later
}
Now, suppose that you want to translate all messages from a given user in a different language, on the fly. Moreover, you’d like to perform the translation on a background thread.
To do that, you start by getting the flow of messages, by invoking getMessageFlow(). Then, you apply operators to the original flow, as shown in Example 6-3.
fun getMessagesFromUser(user: String, language: String): Flow<Message> {
    return getMessageFlow()
        .filter { it.user == user }
        .map { it.translate(language) }
        .flowOn(Dispatchers.Default)
}
The first operator, filter, operates on the original flow and returns another flow of messages which all originate from the user passed as a parameter.
The second operator, map, operates on the flow returned by filter and returns a flow of translated messages. From the filter operator's standpoint, the original flow (returned by getMessageFlow()) is the upstream flow, while the downstream flow is made of all the operators that come after filter. The same reasoning applies to all intermediate operators: each has its own relative upstream and downstream flows, as illustrated in Figure 6-2.
Finally, the flowOn operator changes the context of the flow it operates on. It changes the coroutine context of the upstream flow, while not affecting the downstream flow. Consequently, steps 1 and 2 are done using the Dispatchers.Default dispatcher.
In other words, the upstream flow's operators (filter and map) are now encapsulated: their execution context will always be Dispatchers.Default. No matter in which context the resulting flow is collected, the previously mentioned operators will be executed using Dispatchers.Default.
This is a highly important property of flows, called context preservation. For example, imagine that you're collecting the flow on the UI thread of your application - typically, you would do that using the viewModelScope of a ViewModel. It would be embarrassing if the execution context of one of the flow's operators leaked downstream and affected the thread on which the flow is ultimately collected. Thankfully, that will never happen. For example, if you collect a flow on the UI thread, all values are emitted by a coroutine which uses Dispatchers.Main. All the necessary context switches are automatically managed for you.
Under the hood, flowOn starts a new coroutine when it detects that the context is about to change. This new coroutine interacts with the rest of the flow through a channel which is internally managed.
In flow parlance, an intermediate operator like map operates on the upstream flow and returns another flow. From the map operator's standpoint, the returned flow is the downstream flow.
The map operator accepts a suspending function as its transformation block. So if you wanted to perform only the message translation using Dispatchers.Default (and not the message filtering), you could remove the flowOn operator and declare the translate function like so:
private suspend fun Message.translate(language: String): Message =
    withContext(Dispatchers.Default) {
        // this is a dummy implementation
        copy(content = "translated content")
    }
See how easy it is to offload parts of the data transformation to other threads, while still keeping the big picture of the data flow.
As you can see, the Flow API allows for a declarative way of expressing data transformations. When you invoke getMessagesFromUser("Amanda", "en-us"), nothing is actually running. All those transformations involve intermediate operators, which will be triggered when the flow is collected.
On the consuming site, if you need to act on each received message, you can use the collect function like so:
fun main() = runBlocking {
    getMessagesFromUser("Amanda", "en-us").collect {
        println("Received message from ${it.user}: ${it.content}")
    }
}
Now that we've shown how to transform the flow and consume it, we can provide an implementation for the flow itself - the getMessageFlow function. This function returns a flow of Messages. In this particular situation, we can reasonably assume that the message machinery is actually a service which runs in its own thread. We'll name this service MessageFactory.
Like most services of that kind, the message factory has a publish/subscribe mechanism - we can register or unregister observers for new incoming messages, as shown in Example 6-4.
abstract class MessageFactory : Thread() {
    /* The internal list of observers must be thread-safe */
    private val observers = Collections.synchronizedList(
        mutableListOf<MessageObserver>()
    )
    private var isActive = true

    override fun run() = runBlocking {
        while (isActive) {
            val message = fetchMessage()
            for (observer in observers) {
                observer.onMessage(message)
            }
            delay(1000)
        }
    }

    abstract fun fetchMessage(): Message

    fun registerObserver(observer: MessageObserver) {
        observers.add(observer)
    }

    fun unregisterObserver(observer: MessageObserver) {
        observers.removeAll { it == observer }
    }

    fun cancel() {
        isActive = false
        observers.forEach { it.onCancelled() }
        observers.clear()
    }

    interface MessageObserver {
        fun onMessage(msg: Message)
        fun onCancelled()
        fun onError(cause: Throwable)
    }
}
This implementation polls for new messages every second and notifies observers. Now the question is: how do we turn a hot entity such as this MessageFactory into a flow? MessageFactory is also said to be callback-based, because it holds references to MessageObserver instances and calls methods on those instances when new messages are retrieved. To bridge the flow world with the "callback" world, you can use the callbackFlow flow builder. This is how you can use it in this example:
fun getMessageFlow(factory: MessageFactory) = callbackFlow<Message> {
    val observer = object : MessageFactory.MessageObserver {
        override fun onMessage(msg: Message) {
            offer(msg)
        }

        override fun onCancelled() {
            channel.close()
        }

        override fun onError(cause: Throwable) {
            cancel(CancellationException("Message factory error", cause))
        }
    }

    factory.registerObserver(observer)
    awaitClose {
        factory.unregisterObserver(observer)
    }
}
The callbackFlow builder creates a cold flow which doesn't perform anything until you invoke a terminal operator. Let's break it down. First off, it's a parametrized function which returns a Flow of the given type. It's always done in three steps:
callbackFlow {
    /*
     1. Instantiate the "callback". In this case, it's an observer.
     2. Register that callback using the available api.
     3. Listen for the close event using `awaitClose`, and provide a
        relevant action to take in this case. Most probably,
        you'll have to unregister the callback.
     */
}
It's worth having a look at the signature of callbackFlow:
public inline fun <T> callbackFlow(
    @BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit
): Flow<T>
Don't be intimidated by this. The key information here is that callbackFlow takes a suspending function with a ProducerScope receiver as its argument. This means that inside the curly braces of the block following callbackFlow, you have a ProducerScope instance as an implicit this.
Here is the signature of ProducerScope:
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E>
So a ProducerScope is a SendChannel - and that's what you should remember: callbackFlow provides you with an instance of SendChannel, which you can use inside your implementation. You send to this channel the object instances you get from your callback. This is what we do in step 1 of Example 6-5.
Sometimes, you have to apply a transformation on a collection or stream of objects, to get a new collection of transformed objects. When those transformations should be done asynchronously, things start to get complicated. Not with flows!
Imagine that you have a list of Location instances. Each location can be resolved to a Content instance, using the transform function:
suspend fun transform(loc: Location): Content = withContext(Dispatchers.IO) {
    // Actual implementation doesn't matter
}
So you are receiving Location instances, and you have to transform them on the fly using the transform function. However, processing one Location instance might take quite some time, and you don't want the processing of one location to delay the transformation of the next incoming locations. In other words, transformations should be done in parallel, like so:
In the above schema, we've limited the concurrency to 4 - i.e., at most 4 locations can be transformed simultaneously at a given point in time.
This is how you would implement this behavior using flows:
You can find the corresponding source code here.
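As the listing is shown as a screenshot, here is a sketch of the same idea. Location, Content, and the transform body are placeholders standing in for the real types:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Placeholder types standing in for the real Location and Content
data class Location(val id: Int)
data class Content(val location: Location)

suspend fun transform(loc: Location): Content = withContext(Dispatchers.IO) {
    delay(10) // simulate some I/O-bound work
    Content(loc)
}

@OptIn(FlowPreview::class)
fun main() = runBlocking {
    val locationsFlow: Flow<Location> = (1..5).asFlow().map { Location(it) }

    val contentFlow: Flow<Content> = locationsFlow
        .map { loc ->
            // Each incoming location yields its own one-element flow
            flow { emit(transform(loc)) }
        }
        .flattenMerge(concurrency = 4) // at most 4 transformations in parallel

    contentFlow.collect { println(it) }
}
```

With a concurrency level of 4, up to 4 of those inner flows are collected (and therefore up to 4 transform calls run) at the same time.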
To understand what's going on here, you should realize that locations.map {..} returns a flow of flows (i.e., the type is Flow<Flow<Content>>). Indeed, inside the map {..} operator, a new flow is created upon each emission of a location by the upstream flow (which is locationsFlow).
Each of those created flows is of type Flow<Content> and individually performs the location transformation. The last statement, flattenMerge, merges all those created flows into a new resulting Flow<Content> (which we assign to contentFlow). flattenMerge also has a "concurrency" parameter. Indeed, it would most probably be inappropriate to concurrently create and collect a flow every time we receive a location. With a concurrency level of 4, we ensure that no more than 4 flows are collected at a given point in time. It's handy in the case of CPU-bound tasks, when you know that your CPU won't be able to transform more than 4 locations in parallel (assuming the CPU has 4 cores). In other words, flattenMerge's concurrency level refers to how many operations/transformations will be done in parallel, at most, at a given point in time.
Thanks to the suspending nature of flows, you get back pressure for free. New locations are collected from locationsFlow only when the machinery is available to process them. A similar mechanism could be implemented without flows or coroutines, using a thread pool and a blocking queue. However, that would require considerably more lines of code.
As of this writing, the flattenMerge operator is marked as @FlowPreview in the source code, which means that this declaration is in a preview state and can be changed in a backwards-incompatible manner, with a best-effort migration. We hope that by the time we finish this book, the flow-merging API will be stabilized. Otherwise, a similar operator might replace flattenMerge.
What happens in case of error?
If one of the transform calls raises an exception, the entire flow will be cancelled, and the exception will be propagated downstream. While this is a good default behavior, you might want to handle some exceptions right inside the flow itself.
We’ll show how to do that in “Error handling”.
Final thoughts
Do you realize that we’ve just created a worker pool which concurrently transforms an incoming stream of objects, using only five lines of code?
You’re guaranteed that the flow machinery is thread-safe. No more headache figuring out the proper synchronization strategy to pass object references from a thread pool to a collecting thread.
You can easily tweak the concurrency level, which in this case means the maximum number of parallel transformations.
Even if a lot of flow operators are available out of the box, sometimes you'll have to make your own. Thankfully, flows are composable, and it's not that difficult to implement custom reactive logic.
For example, as we write these lines, there's no flow operator equivalent of Project Reactor's bufferTimeout.
So what is bufferTimeout supposed to do? Imagine that you have an upstream flow of elements, but you want to process those elements in batches, at a fixed maximum rate. The flow returned by bufferTimeout should buffer elements and emit a list (batch) of elements when either:
The buffer is full, or
A predefined maximum amount of time has elapsed (timeout)
Before going through the implementation, let's talk about the key idea. The flow returned by bufferTimeout should internally consume the upstream flow and buffer elements. When the buffer is full, or when a timeout has elapsed, the flow should emit the content of the buffer (a list). You can imagine that internally we'll start a coroutine which receives two types of events:
"An element has just been received from the upstream flow. Should we just add it to the buffer, or also send the whole buffer?"
"Timeout! Send the content of the buffer right now."
In Chapter 5 (the CSP section), we discussed a similar situation. The select expression is perfect for dealing with multiple events coming from several channels.
Now, we're going to implement our bufferTimeout flow operator:
You can find the corresponding source code here.
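Since the implementation is shown as a screenshot, below is a sketch reconstructing it from the explanation given in the text. Keep in mind that the "Line" numbers in the explanation refer to the book's listing, not to this sketch:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.whileSelect

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class)
fun <T> Flow<T>.bufferTimeout(size: Int, durationMillis: Long): Flow<List<T>> {
    require(size > 0) { "Buffer size should be greater than 0" }
    require(durationMillis > 0) { "Timeout should be greater than 0" }

    return flow {
        coroutineScope {
            // Inner coroutine consuming the upstream flow, exposing
            // its elements through a channel
            val upstream: ReceiveChannel<T> = this@bufferTimeout.produceIn(this)
            // Channel producing "timeout" events at a fixed rate
            val ticker = ticker(durationMillis)
            val buffer = mutableListOf<T>()

            try {
                whileSelect {
                    upstream.onReceive { value ->
                        buffer.add(value)
                        if (buffer.size == size) {
                            emit(buffer.toList())
                            buffer.clear()
                        }
                        true // keep looping
                    }
                    ticker.onReceive {
                        if (buffer.isNotEmpty()) {
                            emit(buffer.toList())
                            buffer.clear()
                        }
                        true
                    }
                }
            } catch (e: ClosedReceiveChannelException) {
                // The upstream flow completed: emit the remaining elements
                if (buffer.isNotEmpty()) emit(buffer.toList())
            } finally {
                // Channels are hot entities: cancel them when done
                ticker.cancel()
                upstream.cancel()
            }
        }
    }
}

// Demonstration helper: batches the numbers 1..n into lists
suspend fun batchNumbers(n: Int, size: Int): List<List<Int>> {
    val result = mutableListOf<List<Int>>()
    flow { for (i in 1..n) emit(i) }
        .bufferTimeout(size, 1_000)
        .collect { result.add(it) }
    return result
}
```

This sketch follows the same structure as the book's listing: a flow {} builder, a coroutineScope, produceIn and ticker feeding a whileSelect loop, and a final emission of the leftover buffer on ClosedReceiveChannelException.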
Explanation
First of all, the signature of the operator tells a lot. It's declared as an extension function of Flow<T>, so you can use it like upstreamFlow.bufferTimeout(10, 100). As for the return type, it's Flow<List<T>>. Remember that you want to process elements in batches, so the flow returned by bufferTimeout should return elements as List<T>.
Line 17. We're using the flow {} builder. As a reminder, the builder provides you with an instance of FlowCollector, and the block of code is an extension function with FlowCollector as its receiver type. In other words, you can invoke emit from inside the block of code.
Line 21. We're using coroutineScope {} because we'll start new coroutines, which is only possible within a CoroutineScope.
Line 22. From our coroutine's standpoint, received elements should come from a ReceiveChannel. So another, inner coroutine should be started to consume the upstream flow and send its elements over a channel. This is exactly the purpose of the produceIn flow operator.
Line 23. We need to generate "timeout" events. A library function already exists for exactly that purpose: ticker. It creates a channel that produces the first item after the given initial delay, and subsequent items with the given delay between them. As specified in the documentation, ticker starts a new coroutine eagerly, and we're fully responsible for cancelling it.
Line 34. We're using whileSelect, which really is just syntactic sugar for looping in a select expression while the clauses return true. Inside the whileSelect {} block, you can see the logic of adding an element to the buffer only if the buffer isn't full, and emitting the whole buffer otherwise.
Line 46. When the upstream flow collection completes, the coroutine started with produceIn still attempts to read from that flow, and a ClosedReceiveChannelException is raised. We catch that exception, and we then know that we should emit the content of the buffer.
Lines 48 & 49. Channels are hot entities - they should be cancelled when they aren't supposed to be used anymore. The same goes for the ticker: it should be cancelled too.
Usage
Figure 6-5 shows an example of how bufferTimeout can be used.
You can find the corresponding source code here.
The output is:
139 ms: [1, 2, 3, 4]
172 ms: [5, 6, 7, 8]
223 ms: [9, 10, 11, 12, 13]
272 ms: [14, 15, 16, 17]
322 ms: [18, 19, 20, 21, 22]
...
1022 ms: [86, 87, 88, 89, 90]
1072 ms: [91, 92, 93, 94, 95]
1117 ms: [96, 97, 98, 99, 100]
As you can see, the upstream flow is emitting the numbers from 1 to 100, with a delay of 10 ms between each emission. Since we set a timeout of 50 ms in this example, each emitted list can contain at most 5 numbers.
Error handling is fundamental in reactive programming. If you're familiar with RxJava, you probably handle exceptions using the onError callback of the subscribe method:
// RxJava sample
someObservable().subscribe(
    { value -> /* Do something useful */ },
    { error -> println("Error: $error") }
)
Using flows, you can handle errors using a combination of techniques, involving:
The classic try/catch block
The catch operator - we'll cover this new operator right after discussing the try/catch block
If we define a dummy upstream flow made of only 3 Ints, and purposely throw an exception inside the collect {} block, we can catch the exception by wrapping the whole chain in a try/catch block:
You can find the corresponding source code here.
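A minimal reconstruction of that example might look like this (the dummy flow and the exact failure condition are our assumptions):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val upstream: Flow<Int> = flowOf(1, 2, 3)

fun main() = runBlocking {
    try {
        upstream.collect { value ->
            println("Received $value")
            // Purposely fail once we receive a value greater than 2
            if (value > 2) throw RuntimeException()
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
```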
The output is:
Received 1
Received 2
Caught java.lang.RuntimeException
It is important to note that try/catch also works when the exception is raised from inside the upstream flow. For example, if we change the definition of the upstream flow to:
You can find the corresponding source code here.
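For instance, the redefined flow could throw the exception from inside the builder, along these lines (our reconstruction of the linked code):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val upstream: Flow<Int> = flow {
    for (i in 1..3) {
        // The exception now originates from inside the flow itself
        if (i > 2) throw RuntimeException()
        emit(i)
    }
}

fun main() = runBlocking {
    try {
        upstream.collect { value -> println("Received $value") }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
```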
We get the exact same result.
However, if you try to intercept an exception in the flow itself, you’re likely to get unexpected results. Here is an example:
// Warning: DON'T DO THIS, this flow swallows downstream exceptions
val upstream: Flow<Int> = flow {
    for (i in 1..3) {
        try {
            emit(i)
        } catch (e: Throwable) {
            println("Intercept downstream exception $e")
        }
    }
}
fun main() = runBlocking {
    try {
        upstream.collect { value ->
            println("Received $value")
            check(value <= 2) {
                "Collected $value while we expect values below 2"
            }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
In this example, we're using the flow builder to define upstream, and we've wrapped the emit invocation inside a try/catch statement. Even if it seems useless here, because emit isn't throwing exceptions, it could nevertheless make sense with non-trivial emission logic. At the consuming site, in the main function, we collect that flow and check that we don't get values strictly greater than 2. Otherwise, the catch block should print Caught java.lang.IllegalStateException: Collected x while we expect values below 2.
We expect the following output:
Received 1
Received 2
Caught java.lang.IllegalStateException: Collected 3 while we expect values below 2
However, this is what we actually get:
Received 1
Received 2
Received 3
Intercept downstream exception java.lang.IllegalStateException: Collected 3 while we expect values below 2
Despite the exception raised by check(value <= 2) {..}, that exception gets caught not by the try/catch statement in the main function, but by the try/catch statement inside the flow.
A try/catch statement inside a flow builder might catch downstream exceptions - which includes exceptions raised during the collection of the flow.
Separation of concerns is important
A flow implementation shouldn't have side effects on the code which collects that flow. Likewise, the code that collects a flow shouldn't be aware of the implementation details of the upstream flow. A flow should always be transparent to exceptions: it should propagate exceptions coming from a collector. In other words, a flow should never swallow downstream exceptions.
Throughout this book, we'll use exception transparency to designate a flow which is transparent to exceptions.
Exception transparency violation
The previous example was one example of an exception transparency violation. Trying to emit values from inside a try/catch block is another violation. Here is an example (again, don't do this!):
val violatesExceptionTransparency: Flow<Int> = flow {
    for (i in 1..3) {
        try {
            emit(i)
        } catch (e: Throwable) {
            emit(-1)
        }
    }
}
fun main() = runBlocking {
    try {
        violatesExceptionTransparency.collect { value ->
            check(value <= 2) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}
The output is:
Caught java.lang.IllegalStateException: Flow exception transparency is violated: Previous 'emit' call has thrown exception java.lang.IllegalStateException: Collected 3, but then emission attempt of value '-1' has been detected. Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead. For a more detailed explanation, please refer to Flow documentation.
Summary
The try/catch block should only be used to surround the collector, to handle exceptions raised from the collector itself, or (possibly, though not ideally) exceptions raised from the flow. To handle exceptions inside the flow, you should use the catch operator.
The catch operator allows for a declarative style of catching exceptions. It catches all upstream exceptions - and by all exceptions, we mean that it even catches Throwables. Since it only catches upstream exceptions, the catch operator doesn't suffer from the exception transparency issue of the try/catch block.
Declarative style
You can find the corresponding source code here.
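The linked example might look roughly like this (the flow definition and failure condition are our reconstruction; the output below matches):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flow {
        for (i in 1..3) {
            // The flow itself fails past the value 2
            if (i > 2) throw RuntimeException()
            emit(i)
        }
    }.catch { e ->
        // Declarative handling of all upstream exceptions
        println("Caught $e")
    }.collect { value ->
        println("Received $value")
    }
}
```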
The output is:
Received 1
Received 2
Caught java.lang.RuntimeException
As you can see, the flow raises a RuntimeException if it's passed a value greater than 2. Right after, in the catch operator, we print to the console. However, the collector never gets the value 3: the catch operator automatically cancels the flow.
Exception transparency
From inside this operator, you can only catch upstream exceptions - upstream relatively to the catch operator, that is. To show what we mean, we'll pick an example where the collector throws an exception before the flow internally throws another one. The collector should be able to catch the raised exception (the exception shouldn't be caught by the flow):
You can find the corresponding source code here.
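A sketch of such an example follows; the emitted values and the exact messages are our reconstruction of the linked code:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val numbers: Flow<Int> = flow {
    emit(0)
    emit(-1)
}.map { value ->
    // The flow would fail on negative values...
    if (value < 0) throw NumberFormatException("Negative value: $value")
    value
}.catch { e ->
    // ...and catch acts as a safeguard, but only for exceptions
    // thrown upstream of it - never for collector exceptions
    println("Flow caught $e")
}

fun main() = runBlocking {
    try {
        numbers.collect { value ->
            println("Received $value")
            throw RuntimeException("Collector stopped collecting the flow")
        }
    } catch (e: Throwable) {
        println(e.message)
    }
}
```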
In this example, the collector throws a RuntimeException if it collects a value greater than 2. The collection logic is wrapped in a try/catch statement because we don't want our program to crash; we just log the exception. The flow internally raises a NumberFormatException if the value is negative, and the catch operator acts as a safeguard (it logs the exception and cancels the flow).
The output is:
Received 0
Collector stopped collecting the flow
As you can see, the flow did not intercept the exception raised inside the collector, because it was caught in the catch clause of the try/catch statement. The flow never got to raise a NumberFormatException, because the collector prematurely cancelled the collection.
Another example
In "Use-case #2: Concurrently transform a stream of values", we held off on talking about error handling. Suppose that the transform function might raise exceptions, among which is NumberFormatException. You can selectively handle NumberFormatException using the catch operator:
fun main() = runBlocking {
    // Defining the Flow of Content - nothing is executing yet
    val contentFlow = locationsFlow.map { loc ->
        flow {
            emit(transform(loc))
        }.catch { cause: Throwable ->
            if (cause is NumberFormatException) {
                println("Handling $cause")
            } else {
                throw cause
            }
        }
    }.flattenMerge(4)

    // We now collect the entire flow using the toList terminal operator
    val contents = contentFlow.toList()
}
As the catch operator catches Throwables, we need to check the type of the error. If the error is a NumberFormatException, we handle it inside the if statement. You can add other checks there for different error types. Otherwise, the error's type is unknown to you, and in most cases it's preferable not to swallow the error, and to rethrow it.
You can use emit
from inside catch
Sometimes, it will make sense to emit a particular value when you catch an exception from inside the flow:
You can find the corresponding source code here.
The output is:
Received 1
Received 3
Received 0
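The corresponding code might look like this sketch (our reconstruction, with emitted values inferred from the output above):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val numbers = flow {
        emit(1)
        emit(3)
        throw NumberFormatException("Something went wrong")
    }.catch {
        // Instead of re-throwing, we emit a fallback value
        emit(0)
    }

    numbers.collect { println("Received $it") }
}
```

The catch block runs in the context of a flow collector, so it can call emit to substitute a value for the failed emission.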
Emitting values from inside catch
is especially useful to materialize exceptions.
Materializing5 exceptions is the process of catching exceptions and emitting special values or objects which represent those exceptions. The goal is to avoid throwing exceptions from inside the flow, because those exceptions would then propagate to whatever code collects the flow. If the flow throws exceptions, the collection code needs to be aware of them and catch them in order to avoid undefined behavior. Consequently, the flow has a side effect on the collection code, and this is a violation of the exception transparency principle.
The collection code shouldn't be aware of the implementation details of the flow. For example, if the flow is a Flow<Number>, you should only expect to get Number values (or subtypes) - not exceptions.
Let’s take another example. Imagine you’re fetching images, given their URLs. You have an incoming flow of URLs:
// We don't use realistic URLs for brevity
val urlFlow = flowOf("url-1", "url-2", "url-retry")
You also have this function already available:
suspend fun fetchImage(url: String): Image {
    // Simulate some remote call
    delay(10)

    // Simulate an exception thrown by the server or API
    if (url.contains("retry")) {
        throw IOException("Server returned HTTP response code 503")
    }
    return Image(url)
}

data class Image(val url: String)
This fetchImage function may throw IOExceptions. In order to craft a “flow of images” using urlFlow and the fetchImage function, you should materialize IOExceptions. The fetchImage function either succeeds or fails - you either get an Image instance, or an exception is thrown. You can represent these outcomes with a Result type, with Success and Error subclasses6:
sealed class Result
data class Success(val image: Image) : Result()
data class Error(val url: String) : Result()
In the case of success, we wrap the actual result - the Image instance. In the case of failure, we felt it appropriate to wrap the URL for which image retrieval failed. However, you're free to wrap any data that might be useful to the collection code, such as the exception itself.
Now you can encapsulate fetchImage usage by creating a fetchResult function which returns Result instances:
suspend fun fetchResult(url: String): Result {
    println("Fetching $url..")
    return try {
        val image = fetchImage(url)
        Success(image)
    } catch (e: IOException) {
        Error(url)
    }
}
Finally, you can implement a resultFlow
(see below) and collect it safely:
fun main() = runBlocking {
    val urlFlow = flowOf("url-1", "url-2", "url-retry")

    val resultFlow = urlFlow.map { url ->
        fetchResult(url)
    }

    val results = resultFlow.toList()
    println("Results: $results")
}
The output is:
Fetching url-1..
Fetching url-2..
Fetching url-retry..
Results: [Success(image=Image(url=url-1)), Success(image=Image(url=url-2)), Error(url=url-retry)]
A bonus
Imagine that you'd like to automatically retry fetching an image in case of error. You can implement a custom flow operator that retries an action while a predicate returns true:
fun <T, R : Any> Flow<T>.mapWithRetry(
    action: suspend (T) -> R,
    predicate: suspend (R, attempt: Int) -> Boolean
) = map { data ->
    var attempt = 0
    var shallRetry: Boolean
    var lastValue: R? = null
    do {
        val tr = action(data)
        shallRetry = predicate(tr, ++attempt)
        if (!shallRetry) lastValue = tr
    } while (shallRetry)
    return@map lastValue
}
If you'd like to retry, for example, at most three times before returning an error, you can use this operator like so:
fun main() = runBlocking {
    val urlFlow = flowOf("url-1", "url-2", "url-retry")

    val resultFlowWithRetry = urlFlow.mapWithRetry(
        { url -> fetchResult(url) },
        { value, attempt -> value is Error && attempt < 3 }
    )

    val results = resultFlowWithRetry.toList()
    println("Results: $results")
}
The output is:
Fetching url-1..
Fetching url-2..
Fetching url-retry..
Fetching url-retry..
Fetching url-retry..
Results: [Success(image=Image(url=url-1)), Success(image=Image(url=url-2)), Error(url=url-retry)]
Previous implementations of flow were cold: nothing runs until you start collecting the flow. This is possible because each emitted value goes to only one collector, so there's no need to run anything until that collector is ready to collect values.
However, what if you need to share emitted values among several collectors? For example, imagine an event in your app, like when a file download completes. You might want to notify various components, such as some view-models, repositories, or even some views directly. Your file downloader shouldn't have to be aware of the existence of other parts of your app. A good separation of concerns starts with a loose coupling of classes, and the event-bus is one architectural pattern which helps in this situation.
The principle is simple: the downloader emits an event (an instance of a class, optionally holding some state) by giving
it to the event-bus, and all subscribers subsequently receive that event. A SharedFlow
can act just like that, as
shown in Figure 6-6.
A SharedFlow
broadcasts events to all its subscribers.
Actually, SharedFlow
really is a toolbox which can be used in many situations - not just to implement an event-bus.
Before giving examples of usage, we’ll show how to create a SharedFlow
and how you can tune it.
Create a SharedFlow
In its simplest usage, you invoke MutableSharedFlow() with no parameters. As its name suggests, you can mutate its state by sending values to it. A common pattern when creating a SharedFlow is to create a private mutable version and a public read-only one using asSharedFlow(), as shown in Example 6-6. This pattern is useful when you want to ensure that subscribers can only read the flow (and not send values to it).
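Example 6-6 isn't reproduced here, but the pattern looks like this sketch (the class and property names are our own):

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow

class DownloadManager {
    // Private, mutable: only this class can emit values
    private val _events = MutableSharedFlow<String>()

    // Public, read-only view handed out to subscribers
    val events = _events.asSharedFlow()

    suspend fun notifyCompleted(file: String) {
        _events.emit("Downloaded $file")
    }
}
```

Subscribers only ever see the read-only `events` property, so they can't emit values into the flow themselves.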
You might be surprised to find that MutableSharedFlow isn't a class. It's actually a function accepting parameters, which we'll cover later in this chapter. For now, we're only showing the default no-arg version of MutableSharedFlow.
Register a subscriber
A subscriber registers when it starts collecting the SharedFlow
- preferably the public non-mutable version:
scope.launch {
    sharedFlow.collect { data ->
        println(data)
    }
}
A subscriber can only live in a scope, because the collect
terminal operator is a suspending function. This is good
for structured concurrency: if the scope is cancelled, so is the subscriber.
Send values to the SharedFlow
A MutableSharedFlow
exposes two methods to emit values: emit
and tryEmit
:
emit
- suspends under some conditions, see below.
tryEmit
- never suspends. Tries to emit the value immediately.
Why are there two methods to emit values? This is because by default, when a MutableSharedFlow
emits a value using
emit
, it suspends until all subscribers start processing the value. We will give an example of emit
usage in
“Using SharedFlow to stream data”.
However, sometimes this isn’t what you want to do. You’ll find situations where you have to emit values from
non-suspending code (see “Using SharedFlow as an event-bus”). So here comes tryEmit
, which tries to emit a value immediately
and returns true
if it succeeded, and false
otherwise. We’ll provide more details on the nuances between emit
and
tryEmit
in the upcoming sections.
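To make the difference concrete, here's a small sketch of our own (assuming a shared flow created with default parameters, i.e. no replay and no extra buffer):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()  // no replay, no extra buffer

    // With no subscribers, tryEmit "succeeds": the value is simply dropped
    println(sharedFlow.tryEmit(1))  // true

    val job = launch {
        sharedFlow.collect { println("Received $it") }
    }
    yield()  // let the subscriber start collecting

    // Now there's a subscriber and no buffer: tryEmit can't deliver
    // the value without suspending, so it fails
    println(sharedFlow.tryEmit(2))  // false

    // emit suspends until the subscriber is ready to process the value
    sharedFlow.emit(3)

    delay(50)
    job.cancel()
}
```

With a positive extraBufferCapacity, tryEmit would succeed as long as there's room in the buffer.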
When sharing a state, a state flow:
Shares only one value: the current state.
Replays the state: subscribers get the last state even if they subscribe afterwards.
Emits an initial value - much like a LiveData
has an initial value.
Emits new values only when the state changes.
As you’ve learned previously, this behavior can be achieved using a shared flow, as shown in Example 6-12.
StateFlow
8 is a shorthand for the above code. In practice, all you have to write is:
val state = MutableStateFlow(initialValue)
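A small sketch of our own, illustrating those four properties:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow

fun main() = runBlocking {
    val state = MutableStateFlow(0)  // the initial value is mandatory

    state.value = 1
    state.value = 1  // same value: subscribers won't be notified twice

    val job = launch {
        // A late subscriber still receives the current state (1) immediately
        state.collect { println("State: $it") }
    }

    delay(50)
    state.value = 2  // state change: subscribers are notified

    delay(50)
    job.cancel()  // a state flow never completes; cancel the subscriber
}
```

This prints "State: 1" followed by "State: 2" - the intermediate duplicate assignment is conflated away.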
An example of StateFlow
usage
Imagine that you have a download service which can emit three possible download states: download started, downloading, and download finished, as shown in Figure 6-10.
Exposing a flow from an Android service can be done in several ways. If you need high decoupling, for testability purposes for example, a DI-injected “repository” object can expose the flow. The repository is then injected into all components which need to subscribe. Alternatively, the service can statically expose the flow in a companion object. This induces tight coupling between all components which use the flow. However, it might be acceptable in a small app or for demo purposes, such as in the following example.
class DownloadService : Service() {
    companion object {
        private val _downloadState =
            MutableStateFlow<ServiceStatus>(Stopped)
        val downloadState = _downloadState.asStateFlow()
    }

    // Rest of the code hidden for brevity
}

sealed class ServiceStatus
object Started : ServiceStatus()
data class Downloading(val progress: Int) : ServiceStatus()
object Stopped : ServiceStatus()
Internally, the service can update its state using, e.g., _downloadState.tryEmit(Stopped).
When declared inside a companion object, the state flow can be easily accessed from a view-model, and exposed as a
LiveData
using asLiveData()
:
class DownloadViewModel : ViewModel() {
    val downloadServiceStatus = DownloadService.downloadState.asLiveData()
}
Subsequently, a view can subscribe to the LiveData
:
class DownloadFragment : Fragment() {
    private val viewModel: DownloadViewModel by viewModels()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        viewModel.downloadServiceStatus.observe(this) {
            it?.also {
                onDownloadServiceStatus(it)
            }
        }
    }

    private fun onDownloadServiceStatus(
        status: ServiceStatus
    ): Nothing = when (status) {
        Started -> TODO("Show download is about to start")
        Stopped -> TODO("Show download stopped")
        is Downloading -> TODO("Show progress")
    }
}
We subscribe to the LiveData. If we receive a non-null value, we invoke the onDownloadServiceStatus method.
We purposely use when as an expression, so that the Kotlin compiler guarantees that all possible types of ServiceStatus are taken into account.
You might be wondering why we used a state flow, and why we didn't use a LiveData in the first place - eliminating the need for asLiveData() in the view-model.
The reason is simple. LiveData
is Android specific. It’s a lifecycle-aware component which is meaningful when used
within Android views. You might design your application with Kotlin multiplatform in mind. When targeting Android and
iOS, only multiplatform code can be shared as common code. The coroutine library is multiplatform. LiveData
isn’t.
However, even without considering Kotlin multiplatform, the Flow API makes more sense, since it provides greater flexibility through all its flow operators. The Flow API allows for asynchronous data-stream transformation. Many operators are available out of the box, covering most use-cases.
Thanks to the composable nature of flow operators, you can fairly easily design your own if you need to. Some parts of the flow can be offloaded to a background thread or thread pool, while keeping a high-level view of the data transformation.
A shared flow broadcasts values to all its subscribers. You can enable buffering and/or replay of values. Shared flows really are a toolbox. You can use them as an event-bus for one-time events, or in more complex interactions between components.
When a component shares its state, a special kind of shared flow is appropriate for use: state flow. It replays the last state for new subscribers and only notifies subscribers when the state changes.
1 We’ll refer to Flow
s as to flows in the rest of this chapter.
2 A token is generally encrypted registration data which the client application stores in memory so that further database access don’t require explicit authentication.
3 As opposed to cold, a hot entity lives on its own until explicitly stopped.
4 The coroutine started with coroutineScope{}
.
5 “Materialize” comes from the Rx operator of the same name. See http://introtorx.com/Content/v1.0.10621.0/08_Transformation.html#MaterializeAndDematerialize for more insight.
6 Which is an Algebraic Data Type
7 “DAO” stands for Data Access Object.
8 Actually, StateFlow
is a SharedFlow
under the hood.