Handling failure

To make the mixing step more realistic, we'll define a few exceptions and throw them at random during mixing. This simulates hardware failures occurring at unpredictable times. The mixer can throw one of three exceptions, just as it did in the actor-based examples in the previous two chapters:

object MotorOverheatException extends Exception
object SlowRotationSpeedException extends Exception
object StrongVibrationException extends Exception

val exceptions = Seq(MotorOverheatException,
                     SlowRotationSpeedException,
                     StrongVibrationException)

The realistic mix method could look like this:

private def mix(g: Groceries) = {
  if (Random.nextBoolean()) throw exceptions(Random.nextInt(exceptions.size))
  Thread.sleep(mixTime.toMillis)
  import g._
  Dough(eggs * 50 + flour + sugar + chocolate)
}

There are a couple of different ways the exceptions can be dealt with. The most straightforward approach would be to catch them directly in the logic: 

private def mix(g: Groceries) = try {
  if (Random.nextBoolean()) throw exceptions(Random.nextInt(exceptions.size))
  Thread.sleep(mixTime.toMillis)
  import g._
  Dough(eggs * 50 + flour + sugar + chocolate)
} catch {
  case SlowRotationSpeedException =>
    Thread.sleep(mixTime.toMillis * 2)
    import g._
    Dough(eggs * 50 + flour + sugar + chocolate)
}

In the case of slow rotation, we decide to ignore the issue, keep the mixed stuff, and just give the mixer double the time to finish the mixing.

This works, but it has an obvious disadvantage: it tangles business logic with error handling. More often than not this is undesirable because the two concerns differ in nature: the happy path contains business-related code, whereas error handling is technical. It is therefore usually preferable to separate these code paths. In our case, handling the failure at the stage level is justified because we don't want to drop the element from the stream.

Akka offers alternative ways to specify failure handling. One of them is recovery logic which can be defined for the stage so that failure is converted into the final element of the stream:

def subMixFlow: Flow[Groceries, Dough, NotUsed] =
  Flow[Groceries].async("mixers-dispatcher", 1).map(mix).recover {
    case MotorOverheatException => Dough(0)
  }

Here we decide to return an empty bowl of dough in the case of motor failure. The stream is then completed but this is fine in our case because our mixers are one-off sub-flows anyway.
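To make the "final element, then completion" behavior concrete, here is a minimal sketch that uses plain integers instead of the bakery types. The Overheat exception and the RecoverDemo object are hypothetical stand-ins, and the materializer setup assumes the classic Akka 2.5.x API used elsewhere in this chapter.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RecoverDemo {
  // Hypothetical stand-in for MotorOverheatException.
  case object Overheat extends Exception

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("recover-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to 5)
        // The third element fails; the others are multiplied by ten.
        .map(n => if (n == 3) throw Overheat else n * 10)
        // The failure is replaced by one last element, then the stream completes.
        .recover { case Overheat => 0 }
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Calling RecoverDemo.run() yields 10, 20, 0: the elements 4 and 5 are never mixed, because the stream is complete once the replacement element has been emitted.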

The recover method is a special case of recoverWithRetries. The latter takes not only a partial function for decision-making but also the maximum number of recovery attempts to make if failures keep occurring in the same processing stage. Its partial function maps a failure to a replacement source rather than to a single element.
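As a rough illustration, the following sketch switches to a fallback source after a failure. The Overheat exception, the fallback values, and the RetriesDemo object are made up for the example; the setup again assumes classic Akka 2.5.x.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RetriesDemo {
  // Hypothetical stand-in for a mixer failure.
  case object Overheat extends Exception

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("retries-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    // Hypothetical replacement source used once the original one has failed.
    val fallback = Source(List(-1, -2))
    try {
      val result = Source(1 to 5)
        .map(n => if (n == 3) throw Overheat else n)
        // At most one switch to a replacement source is attempted.
        .recoverWithRetries(attempts = 1, {
          case Overheat => fallback
        })
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

Here the first two elements pass through, the failure on the third triggers the switch, and the stream continues with the fallback elements, giving 1, 2, -1, -2.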

Now we are only missing a decision as to how to handle the StrongVibrationException. If we decide not to handle it, the default behavior is to stop the whole stream. If that happens, the downstream stages are informed of the failure and the upstream stages are cancelled.

We definitely don't want to close our bakery just because one of our mixers vibrates too much. Quite the opposite: we'd like to ignore this completely. Some stages support defining a supervision strategy the same way actors do. We can use this possibility to define a common error-handling behavior. First, we need to define a decision strategy:

val strategy: Supervision.Decider = {
  case StrongVibrationException => Supervision.Resume
  case _ => Supervision.Stop
}

There are three strategies available—stop, restart, and resume:

  • The stopping strategy is the default one; it stops the processing stage and propagates the failure to the upstream and downstream stages.
  • The resuming strategy just drops the current element and the stream continues.
  • Restart is similar to resume: it drops the current element and the stream continues, but the stage is restarted first, so any internal state is cleared.
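The difference between resume and restart only shows up in stages that keep state. The sketch below, loosely modeled on the example in the Akka Streams error-handling documentation, runs a running sum with scan under a configurable directive. The SupervisionDemo object and its onBadInput parameter are hypothetical, and the setup assumes classic Akka 2.5.x.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SupervisionDemo {
  // onBadInput is the directive applied when a negative number arrives.
  def run(onBadInput: Supervision.Directive): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("supervision-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    val decider: Supervision.Decider = {
      case _: IllegalArgumentException => onBadInput
      case _                           => Supervision.Stop
    }
    // scan keeps the running sum as internal state of the stage.
    val runningSum = Flow[Int]
      .scan(0) { (acc, n) =>
        if (n < 0) throw new IllegalArgumentException("negative input")
        else acc + n
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
    try {
      val result = Source(List(1, 3, -1, 5, 7)).via(runningSum).runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

With Supervision.Resume the faulty element is skipped and the sum carries on from 4, giving 0, 1, 4, 9, 16. With Supervision.Restart the accumulator is reset and the initial value is emitted again, giving 0, 1, 4, 0, 5, 12. With Supervision.Stop the materialized future fails, so Await.result throws the exception.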

In our decider, we want the stream to continue in the case of strong vibrations but stop on any other failure. The other two exception types are already handled elsewhere (SlowRotationSpeedException inside mix and MotorOverheatException by recover), so this decision is safe.

This is how we apply our supervision strategy to the definition of the processing stage:

private def subMixFlow: Flow[Groceries, Dough, NotUsed] =
  Flow[Groceries].async("mixers-dispatcher", 1).map(mix).recover {
    case MotorOverheatException => Dough(0)
  }.withAttributes(ActorAttributes.supervisionStrategy(strategy))
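Because the real mixer fails at random, the interplay of recover and the supervision strategy is easier to see with deterministic failures. The following sketch replaces groceries with integers and fails on fixed elements; the Overheat and Vibration exceptions, the mixFlow value, and the BakerySupervisionDemo object are hypothetical stand-ins, and the setup assumes classic Akka 2.5.x.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BakerySupervisionDemo {
  // Hypothetical stand-ins for the mixer exceptions.
  case object Overheat extends Exception
  case object Vibration extends Exception

  val decider: Supervision.Decider = {
    case Vibration => Supervision.Resume // ignore the element and carry on
    case _         => Supervision.Stop   // fail the stage, so recover can step in
  }

  val mixFlow: Flow[Int, Int, NotUsed] =
    Flow[Int].map {
      case 2 => throw Vibration
      case 4 => throw Overheat
      case n => n * 10
    }.recover {
      case Overheat => 0
    }.withAttributes(ActorAttributes.supervisionStrategy(decider))

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("bakery-supervision-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 5).via(mixFlow).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

The run produces 10, 30, 0: the vibrating element 2 is silently dropped, the overheating element 4 is replaced by the recovery value, and the stream completes before element 5 is processed.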

Now, if we start our example, it will run and deal with hardware failures as expected.

It looks good but we're not done because we haven't tested our bakery yet.
