Now that we understand the varying types of actor systems that can be created, what are some patterns of usage that we can define so that we can avoid making common mistakes when writing actor-based applications? Let’s look at a few of them.
One of the most difficult tasks in asynchronous programming is trying to capture context so that the state of the world at the time the task was started can be accurately represented at the time the task finishes. However, creating anonymous instances of Akka actors is a very simple and lightweight solution for capturing the context at the time the message was handled to be utilized when the tasks are successfully completed. They are like extras in the cast of a movie—helping provide realistic context to the primary actors who are working around them.
A great example is an actor that is sequentially handling messages in its mailbox but performing the tasks based on those messages off-thread with futures. This is a great way to design your actors in that they will not block waiting for responses, allowing them to handle more messages concurrently and increase your application’s performance. However, the state of the actor will likely change with every message.
Let’s define the boilerplate of this example. These are classes that will be reused for each of the iterations of our development process going forward. Note that all of this code is available in my GitHub repository, should you want to clone it and test yourself. First, we have a message telling our actor to retrieve the customer account balances for a particular customer ID:
case class GetCustomerAccountBalances(id: Long)
Next, we have data transfer objects in which we return the requested account information. Because customers may or may not have any accounts of each type, and it is possible they may have more than one of any of the account types, we return Option[List[(Long, BigDecimal)]] in each case, where Long represents an account identifier, and BigDecimal represents a balance:
case class AccountBalances(
  val checking: Option[List[(Long, BigDecimal)]],
  val savings: Option[List[(Long, BigDecimal)]],
  val moneyMarket: Option[List[(Long, BigDecimal)]])
case class CheckingAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
I promised in the preface of this book that I would show how this ties back to Eric Evans’ concepts with domain-driven design. Look at the classes I have created to perform this work. We can consider the entire AccountService to be a bounded context, where an individual CheckingAccount or SavingsAccount is an entity. The number represented by the balance inside of one of those classes is a value. The checkingBalances, savingsBalances, and mmBalances fields are aggregates, and the AccountBalances return type is an aggregate root. Finally, Vaughn Vernon in his excellent “Implementing Domain-Driven Design” points to Akka as a possible implementation for an event-driven bounded context. It is also quite easy to implement command query responsibility segregation (per Greg Young’s specification) and event sourcing (using the open source eventsourced library) with Akka.
Finally, we have proxy traits that represent service interfaces. Just like with the Java best practice of exposing interfaces to services rather than the implementations of the classes, we will follow that convention here and define the service interfaces which can then be stubbed out in our tests:
trait SavingsAccountsProxy extends Actor
trait CheckingAccountsProxy extends Actor
trait MoneyMarketAccountsProxy extends Actor
Let’s take an example of an actor that will act as a proxy to get a customer’s account information for a financial services firm from multiple data sources. Further, let’s assume that each of the subsystem proxies for savings, checking and money market account balances will optionally return a list of accounts and their balances of that kind for this customer, and we’ll inject those as dependencies to the retriever class. Let’s write some basic Akka actor code to perform this task:
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

class AccountBalanceRetriever(savingsAccounts: ActorRef,
                              checkingAccounts: ActorRef,
                              moneyMarketAccounts: ActorRef) extends Actor {
  implicit val timeout: Timeout = 100 milliseconds
  implicit val ec: ExecutionContext = context.dispatcher

  def receive = {
    case GetCustomerAccountBalances(id) =>
      val futSavings = savingsAccounts ? GetCustomerAccountBalances(id)
      val futChecking = checkingAccounts ? GetCustomerAccountBalances(id)
      val futMM = moneyMarketAccounts ? GetCustomerAccountBalances(id)
      val futBalances = for {
        savings <- futSavings.mapTo[Option[List[(Long, BigDecimal)]]]
        checking <- futChecking.mapTo[Option[List[(Long, BigDecimal)]]]
        mm <- futMM.mapTo[Option[List[(Long, BigDecimal)]]]
        // Argument order follows the AccountBalances definition:
        // checking, savings, money market
      } yield AccountBalances(checking, savings, mm)
      futBalances map (sender ! _)
  }
}
This code is fairly concise. The AccountBalanceRetriever actor receives a message to get account balances for a customer, and then it fires off three futures in parallel. The first will get the customer’s savings account balances, the second will get the checking account balances, and the third will get the money market balances. Doing these tasks in parallel allows us to avoid the expensive cost of performing the retrievals sequentially. Also, note that while the futures will return Options of some account balances by account ID, if they return None, they will not short-circuit the for comprehension: the comprehension is over the futures, not over the Options inside them, so if None is returned from futSavings, it will still continue the for comprehension.
However, there are a couple of things about it that are not ideal. First of all, it is using futures to ask other actors for responses, which creates a new PromiseActorRef for every message sent behind the scenes. This is a waste of resources. It would be better to have our AccountBalanceRetriever actor send messages out in a “fire and forget” fashion and collect results asynchronously into one actor.
Furthermore, there is a glaring race condition in this code. Can you see it? We’re referencing the “sender” in our map operation on the result from futBalances, which may not be the same ActorRef when the future completes, because the AccountBalanceRetriever actor may now be handling another message from a different sender at that point!
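The same kind of race can be reproduced without Akka. What follows is a minimal sketch using only the Scala standard library, not code from the example project: CapturedSenderSketch, currentSender, and handle are hypothetical names, with a mutable field standing in for an actor's sender and a Promise used to hold the first "response" back until a second "message" has been handled.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CapturedSenderSketch {
  // Hypothetical stand-in for an actor's mutable `sender` reference
  @volatile var currentSender: String = "none"

  // "Handles" one message: returns a racy and a safe view of the sender
  def handle(from: String, work: Future[Int]): (Future[String], Future[String]) = {
    currentSender = from
    val captured = currentSender             // snapshot taken while handling
    val racy = work.map(_ => currentSender)  // reads the field when the future completes
    val safe = work.map(_ => captured)       // reads the snapshot
    (racy, safe)
  }

  def run(): (String, String) = {
    val gate = Promise[Int]()
    val (racy, safe) = handle("client-A", gate.future)
    handle("client-B", Future.successful(0)) // a second message arrives first
    gate.success(1)                          // only now does A's work complete
    (Await.result(racy, 1.second), Await.result(safe, 1.second))
  }
}
```

Under these assumptions, the racy callback observes the second client while the safe one still sees the first, which is the behavior the map on futBalances is exposed to.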
Let’s focus on eliminating the need to ask for responses in our actor first. We can send the messages with the ! operator and collect responses into an optional list of balances by account number. But how would we go about doing that?
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._

class AccountBalanceRetriever(savingsAccounts: ActorRef,
                              checkingAccounts: ActorRef,
                              moneyMarketAccounts: ActorRef) extends Actor {
  val checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None
  var originalSender: Option[ActorRef] = None

  def receive = {
    case GetCustomerAccountBalances(id) =>
      originalSender = Some(sender)
      savingsAccounts ! GetCustomerAccountBalances(id)
      checkingAccounts ! GetCustomerAccountBalances(id)
      moneyMarketAccounts ! GetCustomerAccountBalances(id)
    case AccountBalances(cBalances, sBalances, mmBalances) =>
      (checkingBalances, savingsBalances, mmBalances) match {
        case (Some(c), Some(s), Some(m)) =>
          originalSender.get ! AccountBalances(checkingBalances, savingsBalances, mmBalances)
        case _ =>
      }
  }
}
This is better but still leaves a lot to be desired. First of all, we’ve created our collection of balances we’ve received back at the instance level, which means we can’t differentiate the aggregation of responses to a single request to get account balances. Worse, we can’t time out a request back to our original requestor. Finally, while we’ve captured the original sender as an instance variable that may or may not have a value (since there is no originalSender when the AccountBalanceRetriever starts up), we have no way of being sure that the originalSender is who we want it to be when we want to send data back!
The problem is that we’re attempting to take the result of the off-thread operations of retrieving data from multiple sources and return it to whomever sent us the message in the first place. However, the actor will likely have moved on to handling additional messages in its mailbox by the time these futures complete, and the state represented in the AccountBalanceRetriever actor for “sender” at that time could be a completely different actor instance. So how do we get around this?
The trick is to create an anonymous inner actor for each GetCustomerAccountBalances message that is being handled. In doing so, you can capture the state you need to have available when the futures are fulfilled. Let’s see how:
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._

class AccountBalanceRetriever(savingsAccounts: ActorRef,
                              checkingAccounts: ActorRef,
                              moneyMarketAccounts: ActorRef) extends Actor {
  val checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None

  def receive = {
    case GetCustomerAccountBalances(id) => {
      context.actorOf(Props(new Actor() {
        var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None
        val originalSender = sender

        def receive = {
          case CheckingAccountBalances(balances) =>
            checkingBalances = balances
            isDone
          case SavingsAccountBalances(balances) =>
            savingsBalances = balances
            isDone
          case MoneyMarketAccountBalances(balances) =>
            mmBalances = balances
            isDone
        }

        def isDone = (checkingBalances, savingsBalances, mmBalances) match {
          case (Some(c), Some(s), Some(m)) =>
            originalSender ! AccountBalances(checkingBalances, savingsBalances, mmBalances)
            context.stop(self)
          case _ =>
        }

        savingsAccounts ! GetCustomerAccountBalances(id)
        checkingAccounts ! GetCustomerAccountBalances(id)
        moneyMarketAccounts ! GetCustomerAccountBalances(id)
      }))
    }
  }
}
This is much better. We’ve captured the state of each receive and only send it back to the originalSender when all three have values. But there are still two issues here. First, we haven’t defined how we can time out on the original request for all of the balances back to whomever requested them. Secondly, our originalSender is still getting a wrong value: the “sender” from which it is assigned is actually the sender value of the anonymous inner actor, not the one that sent the original GetCustomerAccountBalances message!
We can send ourselves a timeout message to handle our need to time out the original request, by allowing another task to compete for the right to complete the operation with a timeout. This is a very clean way to allow the work to occur, while still enforcing timeout semantics on the request. If the data for all three of the account types is enqueued in the mailbox before the timeout message, the proper response of an AccountBalances type is sent back to the original sender. However, if the timeout message from the scheduled task is enqueued before any one of those three responses, a timeout is returned to the client.
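The ordering rule in the paragraph above can be modelled as a pure function over the sequence of messages in the mailbox. This is a sketch only, with no Akka involved: MailboxOrderSketch, its message objects, and the Outcome type are hypothetical names invented for illustration, and the real actor processes one message at a time rather than a whole list.

```scala
object MailboxOrderSketch {
  sealed trait Msg
  case object Checking extends Msg
  case object Savings extends Msg
  case object MoneyMarket extends Msg
  case object Timeout extends Msg

  sealed trait Outcome
  case object Completed extends Outcome
  case object TimedOut extends Outcome
  case object StillWaiting extends Outcome

  // Walks the mailbox in arrival order; whichever condition is met first wins.
  def outcome(mailbox: List[Msg]): Outcome = {
    @annotation.tailrec
    def loop(rest: List[Msg], seen: Set[Msg]): Outcome = rest match {
      case Nil          => StillWaiting
      case Timeout :: _ => TimedOut     // timeout dequeued before all three responses
      case m :: tail =>
        val now = seen + m
        if (now.size == 3) Completed    // all three account types have responded
        else loop(tail, now)
    }
    loop(mailbox, Set.empty)
  }
}
```

For example, a mailbox of Checking, Savings, MoneyMarket, Timeout completes normally, while Checking, Timeout, Savings, MoneyMarket times out even though every response eventually arrives.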
Note that I am using None to represent only when I don’t have any data returned from one of my specific account type proxies. In the case where a customer is looked up and no data is found, I’m expecting to receive a response of Some(List()), meaning no data was found for that customer in that account type. This way, I can semantically differentiate whether or not I’ve received a response and when no data was found.
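The three states that this convention distinguishes can be spelled out with a small pattern match. This is an illustrative sketch; BalanceStates, describe, and allResponded are hypothetical helpers that do not appear in the example project.

```scala
object BalanceStates {
  type Balances = Option[List[(Long, BigDecimal)]]

  // None means "no response yet"; Some(Nil) means "responded, nothing found".
  def describe(b: Balances): String = b match {
    case None           => "no response yet"
    case Some(Nil)      => "responded: no accounts found"
    case Some(accounts) => s"responded: ${accounts.size} account(s)"
  }

  // A request is complete once every proxy has responded, even with empty lists.
  def allResponded(parts: Balances*): Boolean = parts.forall(_.isDefined)
}
```

With this encoding, a customer with no money market accounts still counts as a completed response, which is why the collecting actor can match on three Some values.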
For the sake of additional clarity, I am using the LoggingReceive block in this example. This tells Akka to automatically log the handling of each message dequeued from the mailbox. It is a best practice to give yourself as much information as possible at runtime so you can debug your actors, and it can easily be turned off in the configuration file for the application. For more information, see the Akka online documentation.
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import org.jamieallen.effectiveakka.common._
import akka.actor.{ Actor, ActorRef, Props, ActorLogging }
import akka.event.LoggingReceive

object AccountBalanceRetrieverFinal {
  case object AccountRetrievalTimeout
}

class AccountBalanceRetrieverFinal(savingsAccounts: ActorRef,
                                   checkingAccounts: ActorRef,
                                   moneyMarketAccounts: ActorRef)
  extends Actor with ActorLogging {
  import AccountBalanceRetrieverFinal._

  def receive = {
    case GetCustomerAccountBalances(id) => {
      log.debug(s"Received GetCustomerAccountBalances for ID: $id from $sender")
      val originalSender = sender

      context.actorOf(Props(new Actor() {
        var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None

        def receive = LoggingReceive {
          case CheckingAccountBalances(balances) =>
            log.debug(s"Received checking account balances: $balances")
            checkingBalances = balances
            collectBalances
          case SavingsAccountBalances(balances) =>
            log.debug(s"Received savings account balances: $balances")
            savingsBalances = balances
            collectBalances
          case MoneyMarketAccountBalances(balances) =>
            log.debug(s"Received money market account balances: $balances")
            mmBalances = balances
            collectBalances
          case AccountRetrievalTimeout =>
            sendResponseAndShutdown(AccountRetrievalTimeout)
        }

        def collectBalances = (checkingBalances, savingsBalances, mmBalances) match {
          case (Some(c), Some(s), Some(m)) =>
            log.debug(s"Values received for all three account types")
            timeoutMessager.cancel
            sendResponseAndShutdown(AccountBalances(checkingBalances, savingsBalances, mmBalances))
          case _ =>
        }

        def sendResponseAndShutdown(response: Any) = {
          originalSender ! response
          log.debug("Stopping context capturing actor")
          context.stop(self)
        }

        savingsAccounts ! GetCustomerAccountBalances(id)
        checkingAccounts ! GetCustomerAccountBalances(id)
        moneyMarketAccounts ! GetCustomerAccountBalances(id)

        import context.dispatcher
        val timeoutMessager = context.system.scheduler.scheduleOnce(250 milliseconds) {
          self ! AccountRetrievalTimeout
        }
      }))
    }
  }
}
Now we can collect our results and check to see if we received the expected values and place them into the AccountBalances result to return to the caller, while also cancelling the scheduled task so that it doesn’t waste resources. Finally, we must remember to stop our anonymous inner actor so that we do not leak memory for every GetCustomerAccountBalances message we receive, regardless of whether we received all three responses or the timeout!
So why do we have to send the AccountRetrievalTimeout message to ourselves, into the queue of our Extra actor, rather than just sending it directly back to the originalSender in our scheduleOnce lambda? The scheduled task will run on another thread! If we perform work relative to cleaning up the actor on that thread, we’re introducing concurrency into the actor. While we are only telling our actor to stop itself after sending the message in this example, it would be very easy to fall into the trap of closing over some state and manipulating it if you do not send a message to yourself. There are other interfaces for scheduling that might make it more apparent for some that the operation is asynchronous, such as the method call style seen here:
val timeoutMessager = context.system.scheduler.scheduleOnce(
  250 milliseconds, self, AccountRetrievalTimeout)
You have to be vigilant about this. Sometimes, it can be very easy to fall into the trap of introducing concurrency into our actors where there never should be any. If you see yourself using curly braces inside of an actor, think about what is happening inside of there and what you might be closing over.
In an earlier version of this example, I tried to use a promise to perform this work, where either the successful result of the AccountBalances type was put into the future inside of the promise, or the timeout failure was used to complete it. However, this is unnecessary complexity, as we can allow the ordering inside of the Extra actor’s queue of when messages are enqueued to perform the same basic task. Also, you cannot hand the future from a promise to another actor: futures cannot be sent to an actor, which may or may not be remote. And due to the beauty of location transparency, that is an implementation detail on which your actors should never focus.
Futures should never be passed between actors because you cannot serialize a thread.
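For comparison, here is roughly what a promise-based race between a result and a timeout might look like. This is not the earlier version of the example, which is not shown in the text; it is a hedged sketch using only the Scala standard library, and PromiseRaceSketch and race are hypothetical names. It relies on trySuccess and tryFailure completing a promise at most once, so whichever call happens first wins.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.Promise
import scala.util.{Failure, Success}

object PromiseRaceSketch {
  // Simulates the two competing completions in a fixed order.
  def race(resultArrivesFirst: Boolean): Either[String, String] = {
    val promise = Promise[String]()
    if (resultArrivesFirst) {
      promise.trySuccess("balances")
      promise.tryFailure(new TimeoutException("too late")) // ignored: already completed
    } else {
      promise.tryFailure(new TimeoutException("timed out"))
      promise.trySuccess("balances")                       // ignored: already completed
    }
    promise.future.value match {
      case Some(Success(v)) => Right(v)
      case Some(Failure(e)) => Left(e.getMessage)
      case None             => Left("not completed")
    }
  }
}
```

The mailbox-ordering approach achieves the same first-one-wins effect without a promise, and without creating a future that someone might be tempted to send to another actor.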
So now that we have some code that we think will work, we need to write tests to prove that it does. If you’re a TDD-adherent, you’re probably mortified that I didn’t do that up front. I’m not dogmatic about when someone writes tests; I just care that the tests get written.
The first thing we have to do is define the test stubs that will be used in our tests and injected as dependencies to the retriever actor. These stubs can be very simple actors: when asked for account information of their type by a specific customer ID, each non-failure test case stub will return an optional list of balances by account ID. Data for each customer to be used in tests needs to be placed into a map to be found, and if no data is returned, we must return a value of Some(List()) to meet our API:
import akka.actor.{ Actor, ActorLogging }
import akka.event.LoggingReceive

class CheckingAccountsProxyStub extends CheckingAccountsProxy with ActorLogging {
  val accountData = Map[Long, List[(Long, BigDecimal)]](
    1L -> List((3, 15000)),
    2L -> List((6, 640000), (7, 1125000), (8, 40000)))

  def receive = LoggingReceive {
    case GetCustomerAccountBalances(id: Long) =>
      log.debug(s"Received GetCustomerAccountBalances for ID: $id")
      accountData.get(id) match {
        case Some(data) => sender ! CheckingAccountBalances(Some(data))
        case None => sender ! CheckingAccountBalances(Some(List()))
      }
  }
}

class SavingsAccountsProxyStub extends SavingsAccountsProxy with ActorLogging {
  val accountData = Map[Long, List[(Long, BigDecimal)]](
    1L -> (List((1, 150000), (2, 29000))),
    2L -> (List((5, 80000))))

  def receive = LoggingReceive {
    case GetCustomerAccountBalances(id: Long) =>
      log.debug(s"Received GetCustomerAccountBalances for ID: $id")
      accountData.get(id) match {
        case Some(data) => sender ! SavingsAccountBalances(Some(data))
        case None => sender ! SavingsAccountBalances(Some(List()))
      }
  }
}

class MoneyMarketAccountsProxyStub extends MoneyMarketAccountsProxy with ActorLogging {
  val accountData = Map[Long, List[(Long, BigDecimal)]](
    2L -> List((9, 640000), (10, 1125000), (11, 40000)))

  def receive = LoggingReceive {
    case GetCustomerAccountBalances(id: Long) =>
      log.debug(s"Received GetCustomerAccountBalances for ID: $id")
      accountData.get(id) match {
        case Some(data) => sender ! MoneyMarketAccountBalances(Some(data))
        case None => sender ! MoneyMarketAccountBalances(Some(List()))
      }
  }
}
In the failure condition (represented by a timeout), a stub will simulate a long-running blocking database call that does not complete in time by never sending a response to the calling actor:
class TimingOutSavingsAccountProxyStub extends SavingsAccountsProxy with ActorLogging {
  def receive = LoggingReceive {
    case GetCustomerAccountBalances(id: Long) =>
      log.debug(s"Forcing timeout by not responding!")
  }
}
The following examples show how to write a test case for the successful return of AccountBalances. Since this example uses stubbed-out proxies to what would be services from which we receive account information, it is trivial to inject test-only stub proxies that will cause the timeout functionality to occur.
We also want to be sure that the integrity of the context of each message handled is maintained by our retriever. To do this, we send multiple messages from different TestProbe instances one after the other, and we verify that the different values were appropriately returned to each.
Note how I use the within block to verify the timing of expected responses. This is a great way to verify that your tests are executing to meet the nonfunctional requirements of your system. Use the within block to specify either a maximum time of execution, or as we see in the failure case, that we didn’t receive a response too early or too late.
Finally, we test the timeout condition by injecting a timing out stub into our retriever and making sure that the timeout response is what our test receives in response:
import akka.testkit.{ TestKit, TestProbe, ImplicitSender }
import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import org.scalatest.WordSpecLike
import org.scalatest.matchers.MustMatchers
import scala.concurrent.duration._
import org.jamieallen.effectiveakka.common._
import org.jamieallen.effectiveakka.pattern.extra.AccountBalanceRetrieverFinal._

class ExtraFinalSpec extends TestKit(ActorSystem("ExtraTestAS"))
  with ImplicitSender with WordSpecLike with MustMatchers {

  "An AccountBalanceRetriever" should {
    "return a list of account balances" in {
      val probe2 = TestProbe()
      val probe1 = TestProbe()
      val savingsAccountsProxy =
        system.actorOf(Props[SavingsAccountsProxyStub], "extra-success-savings")
      val checkingAccountsProxy =
        system.actorOf(Props[CheckingAccountsProxyStub], "extra-success-checkings")
      val moneyMarketAccountsProxy =
        system.actorOf(Props[MoneyMarketAccountsProxyStub], "extra-success-money-markets")
      val accountBalanceRetriever = system.actorOf(
        Props(new AccountBalanceRetrieverFinal(savingsAccountsProxy,
                                               checkingAccountsProxy,
                                               moneyMarketAccountsProxy)),
        "extra-retriever")

      within(300 milliseconds) {
        probe1.send(accountBalanceRetriever, GetCustomerAccountBalances(1L))
        val result = probe1.expectMsgType[AccountBalances]
        result must equal(AccountBalances(
          Some(List((3, 15000))),
          Some(List((1, 150000), (2, 29000))),
          Some(List())))
      }
      within(300 milliseconds) {
        probe2.send(accountBalanceRetriever, GetCustomerAccountBalances(2L))
        val result = probe2.expectMsgType[AccountBalances]
        result must equal(AccountBalances(
          Some(List((6, 640000), (7, 1125000), (8, 40000))),
          Some(List((5, 80000))),
          Some(List((9, 640000), (10, 1125000), (11, 40000)))))
      }
    }

    "return a TimeoutException when timeout is exceeded" in {
      val savingsAccountsProxy =
        system.actorOf(Props[TimingOutSavingsAccountProxyStub], "extra-timing-out-savings")
      val checkingAccountsProxy =
        system.actorOf(Props[CheckingAccountsProxyStub], "extra-timing-out-checkings")
      val moneyMarketAccountsProxy =
        system.actorOf(Props[MoneyMarketAccountsProxyStub], "extra-timing-out-money-markets")
      val accountBalanceRetriever = system.actorOf(
        Props(new AccountBalanceRetrieverFinal(savingsAccountsProxy,
                                               checkingAccountsProxy,
                                               moneyMarketAccountsProxy)),
        "extra-timing-out-retriever")
      val probe = TestProbe()

      within(250 milliseconds, 500 milliseconds) {
        probe.send(accountBalanceRetriever, GetCustomerAccountBalances(1L))
        probe.expectMsg(AccountRetrievalTimeout)
      }
    }
  }
}
Now our test checks the success case and that the failure results in expected behavior. And because the AccountRetrievalTimeout is a case object, it is a “term,” not a “type,” and I therefore can use the expectMsg() method instead of expectMsgType[].
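The distinction between a term and a type shows up in ordinary pattern matching as well. The sketch below is illustrative only: TermVsType redefines a local AccountRetrievalTimeout and a hypothetical Balances class so that it stands alone, and classify is an invented helper.

```scala
object TermVsType {
  // A case object is a single value (a term), so it can be compared by equality.
  case object AccountRetrievalTimeout
  // A case class is a type with many possible instances.
  final case class Balances(values: List[Long])

  def classify(msg: Any): String = msg match {
    case AccountRetrievalTimeout => "timeout: matched against the singleton value"
    case _: Balances             => "balances: matched by type"
    case _                       => "unknown"
  }
}
```

This mirrors the choice in the test: a value can be passed to expectMsg() for an equality check, while a message whose contents vary is checked by its type first and then inspected.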
Asynchronous programming is simply not easy, even with powerful tools at our disposal. We always must think about the state we need and the context from which we get it at the time we need it.
The Extra Pattern does help in some scenarios, but it could easily be argued that it muddles your code by putting too many details in one place. It is also similar to lambdas, in that using an anonymous instance gives you less information in stack traces on the JVM, is harder to use with a debugging tool, and makes it easier to close over state.
The good news is that it is very easily fixed by using pre-defined types. You get information specific to the class that you created in stack traces without an obfuscated, generated name. And you can’t close over external state, because there is none: you have to pass the data into the class for it to be visible.
There is a time and a place for everything, and that includes lambdas and anonymous implementations of interfaces, just like we did in the Extra Pattern. When the code is trivial, you can generally use these constructs without fear. However, learn to notice when the code in such literals is crossing a threshold of complexity that will make it harder to reason about in production when failure occurs.
Those are good reasons for pulling the type you’re creating with the Extra Pattern into a pre-defined type of actor, where you create an instance of that type for every message handled. To do so, we can move the anonymous implementation of the actor trait out into its own type definition. This results in a type only used for simple interactions between actors, similar to a cameo role in the movies.
Let’s pull the anonymous implementation out and bind it to a type. How would that look?
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import org.jamieallen.effectiveakka.common._
import akka.actor._
import akka.event.LoggingReceive

object AccountBalanceResponseHandler {
  case object AccountRetrievalTimeout

  // Factory method for our actor Props
  def props(savingsAccounts: ActorRef,
            checkingAccounts: ActorRef,
            moneyMarketAccounts: ActorRef,
            originalSender: ActorRef): Props = {
    Props(new AccountBalanceResponseHandler(savingsAccounts, checkingAccounts,
                                            moneyMarketAccounts, originalSender))
  }
}

class AccountBalanceResponseHandler(savingsAccounts: ActorRef,
                                    checkingAccounts: ActorRef,
                                    moneyMarketAccounts: ActorRef,
                                    originalSender: ActorRef)
  extends Actor with ActorLogging {
  import AccountBalanceResponseHandler._

  var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None

  def receive = LoggingReceive {
    case CheckingAccountBalances(balances) =>
      log.debug(s"Received checking account balances: $balances")
      checkingBalances = balances
      collectBalances
    case SavingsAccountBalances(balances) =>
      log.debug(s"Received savings account balances: $balances")
      savingsBalances = balances
      collectBalances
    case MoneyMarketAccountBalances(balances) =>
      log.debug(s"Received money market account balances: $balances")
      mmBalances = balances
      collectBalances
    case AccountRetrievalTimeout =>
      log.debug("Timeout occurred")
      sendResponseAndShutdown(AccountRetrievalTimeout)
  }

  def collectBalances = (checkingBalances, savingsBalances, mmBalances) match {
    case (Some(c), Some(s), Some(m)) =>
      log.debug(s"Values received for all three account types")
      timeoutMessager.cancel
      sendResponseAndShutdown(AccountBalances(checkingBalances, savingsBalances, mmBalances))
    case _ =>
  }

  def sendResponseAndShutdown(response: Any) = {
    originalSender ! response
    log.debug("Stopping context capturing actor")
    context.stop(self)
  }

  import context.dispatcher
  val timeoutMessager = context.system.scheduler.scheduleOnce(250 milliseconds) {
    self ! AccountRetrievalTimeout
  }
}

class AccountBalanceRetriever(savingsAccounts: ActorRef,
                              checkingAccounts: ActorRef,
                              moneyMarketAccounts: ActorRef) extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id) =>
      // Capture the sender in a local val; props expects an ActorRef, not an Option
      val originalSender = sender
      // I'm now using a factory method from the companion object above!
      val handler = context.actorOf(
        AccountBalanceResponseHandler.props(savingsAccounts, checkingAccounts,
                                            moneyMarketAccounts, originalSender),
        "cameo-message-handler")
      savingsAccounts.tell(GetCustomerAccountBalances(id), handler)
      checkingAccounts.tell(GetCustomerAccountBalances(id), handler)
      moneyMarketAccounts.tell(GetCustomerAccountBalances(id), handler)
  }
}
Note that now we have to use the tell method on the ActorRefs for the accounts so that we can pass the handler reference as the actor to receive all responses. But the code is much cleaner from having excised the anonymous actor implementation from the body of the AccountBalanceRetriever.
Despite the fact that we’ve created a new instance of the AccountBalanceResponseHandler for every request to get balances, I’m placing the AccountBalanceRetriever’s sender into a local variable in the receive block before passing it to the new instance of the AccountBalanceResponseHandler. Make certain you follow that pattern, since passing the sender ActorRef without first capturing it will expose your handler to the same problem (losing the sender of the message to whom we want to send our response) that we saw earlier where the sender ActorRef changed.
Also note that by using a “named” type, AccountBalanceResponseHandler, we’ll have more useful information when debugging because anonymous types are assigned names in the JVM which aren’t very easy to decipher. In my opinion, it is always preferable to have named types over anonymous actors for this reason.
You may have noticed the comment in the AccountBalanceResponseHandler companion object that I now have defined a props method as a factory for my actor. See The Companion Object Factory Method for more details about why I have done this.
Testing this code is virtually identical to how we did it previously, and we can reuse the common stubs that we created as well. After we test the success case, we can inject a stub that will induce timeout to test the failure case:
import akka.testkit.{ TestKit, TestProbe, ImplicitSender }
import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import org.scalatest.WordSpecLike
import org.scalatest.matchers.MustMatchers
import scala.concurrent.duration._
import org.jamieallen.effectiveakka.common._
import org.jamieallen.effectiveakka.pattern.cameo.AccountBalanceResponseHandler._

class CameoSpec extends TestKit(ActorSystem("CameoTestAS"))
  with ImplicitSender with WordSpecLike with MustMatchers {

  val checkingAccountsProxy =
    system.actorOf(Props[CheckingAccountsProxyStub], "checkings")
  val moneyMarketAccountsProxy =
    system.actorOf(Props[MoneyMarketAccountsProxyStub], "money-markets")

  "An AccountBalanceRetriever" should {
    "return a list of account balances" in {
      val probe1 = TestProbe()
      val probe2 = TestProbe()
      val savingsAccountsProxy =
        system.actorOf(Props[SavingsAccountsProxyStub], "cameo-success-savings")
      val checkingAccountsProxy =
        system.actorOf(Props[CheckingAccountsProxyStub], "cameo-success-checkings")
      val moneyMarketAccountsProxy =
        system.actorOf(Props[MoneyMarketAccountsProxyStub], "cameo-success-money-markets")
      val accountBalanceRetriever = system.actorOf(
        Props(new AccountBalanceRetriever(savingsAccountsProxy,
                                          checkingAccountsProxy,
                                          moneyMarketAccountsProxy)),
        "cameo-retriever1")

      within(300 milliseconds) {
        probe1.send(accountBalanceRetriever, GetCustomerAccountBalances(1L))
        val result = probe1.expectMsgType[AccountBalances]
        result must equal(AccountBalances(
          Some(List((3, 15000))),
          Some(List((1, 150000), (2, 29000))),
          Some(List())))
      }
      within(300 milliseconds) {
        probe2.send(accountBalanceRetriever, GetCustomerAccountBalances(2L))
        val result = probe2.expectMsgType[AccountBalances]
        result must equal(AccountBalances(
          Some(List((6, 640000), (7, 1125000), (8, 40000))),
          Some(List((5, 80000))),
          Some(List((9, 640000), (10, 1125000), (11, 40000)))))
      }
    }

    "return a TimeoutException when timeout is exceeded" in {
      val savingsAccountsProxy =
        system.actorOf(Props[TimingOutSavingsAccountProxyStub], "cameo-timing-out-savings")
      val checkingAccountsProxy =
        system.actorOf(Props[CheckingAccountsProxyStub], "cameo-timing-out-checkings")
      val moneyMarketAccountsProxy =
        system.actorOf(Props[MoneyMarketAccountsProxyStub], "cameo-timing-out-money-markets")
      val accountBalanceRetriever = system.actorOf(
        Props(new AccountBalanceRetriever(savingsAccountsProxy,
                                          checkingAccountsProxy,
                                          moneyMarketAccountsProxy)),
        "cameo-timing-out-retriever")
      val probe = TestProbe()

      within(250 milliseconds, 500 milliseconds) {
        probe.send(accountBalanceRetriever, GetCustomerAccountBalances(1L))
        probe.expectMsg(AccountRetrievalTimeout)
      }
    }
  }
}
When creating the stubs I use to inject mocks of services for the tests, I am not using a props() factory method in a companion object for those actors. Can you see why? In this case, the instantiation of the Props instance for each stubbed actor is happening inside the context of a test, not another actor. So I don’t have to worry about closing over “this” from the test context.
The cameo pattern allows you to be explicit about the type of the actor that will perform the work for each GetCustomerAccountBalances message sent to the AccountBalanceRetriever, which I generally prefer. I also think it separates concerns nicely, whereas the extra pattern can begin to make your code more difficult to read because of the amount of extra code it has inside of the body of the AccountBalanceRetriever.