Circuit Breaker
Overview
Akka Streams and Akka HTTP are great technologies to build highly resilient systems. They provide back-pressure to ensure you do not overload the system and the slowness of one component does not cause its work queue to pile up and end up in memory leaks. But, we need another safeguard to ensure our service stays responsive in the event of external or internal failures, and have alternate paths to satisfying the requests/messages due to such failures. If a downstream service is not responding or slow, we could alternatively try another service or fetch cached results instead of back-pressuring the stream and potentially the whole system.
squbs introduces CircuitBreakerBidi
Akka Streams GraphStage
to provide circuit breaker functionality for streams.
Dependency
Add the following dependency to your build.sbt
or scala build file:
"org.squbs" %% "squbs-ext" % squbsVersion
Usage
The circuit breaker functionality is provided as a BidiFlow
that can be connected to a flow via the join
operator. CircuitBreakerBidi
might potentially change the order of messages, so it requires a Context
to be carried around. In addition, it needs to be able to uniquely identify each element for its internal mechanics. The requirement is that either the Context
itself or a mapping from Context
should be able to uniquely identify an element (see Context to Unique Id Mapping section for more details). Along with the Context
, a Try
is pushed downstream:
Circuit is Closed
:
- If an output
msg
is provided by the wrapped flow within the timeout, then(Success(msg), context)
is passed down. - Otherwise, a
(Failure(FlowTimeoutException()), context)
is pushed to downstream.
Circuit is Open
:
- The result of
fallback
function, along with thecontext
, is pushed to downstream if one provided - Otherwise, a
(Failure(CircuitBreakerOpenException()), context)
is pushed to downstream
Circuit is HalfOpen
:
- The first request/element is let to go through the wrapped flow, and the behavior should be same as
Closed
state. - Rest of the elements are short circuited and the behaviour is same to
Open
state.
The state of the circuit breaker is hold in a CircuitBreakerState
implementation. The default implementation AtomicCircuitBreakerState
is based on Atomic
variables, which allows it to be updated concurrently across multiple flow materializations.
Scala
import org.squbs.streams.circuitbreaker.CircuitBreakerSettings
val circuitBreakerState = AtomicCircuitBreakerState("sample", 2, 100 milliseconds, 1 second)
val circuitBreakerSettings = CircuitBreakerSettings[String, String, UUID](circuitBreakerState)
val circuitBreakerBidiFlow = CircuitBreakerBidiFlow(circuitBreakerSettings)
val flow = Flow[(String, UUID)].mapAsyncUnordered(10) { elem =>
(ref ? elem).mapTo[(String, UUID)]
}
Source("a" :: "b" :: "c" :: Nil)
.map(s => (s, UUID.randomUUID()))
.via(circuitBreakerBidiFlow.join(flow))
.runWith(Sink.seq)
Java
For Java, use CircuitBreakerSettings
from the org.squbs.streams.circuitbreaker.japi
package:
import org.squbs.streams.circuitbreaker.japi.CircuitBreakerSettings;
final CircuitBreakerState circuitBreakerState =
AtomicCircuitBreakerState.create(
"sample",
2,
FiniteDuration.apply(100, TimeUnit.MILLISECONDS),
FiniteDuration.apply(1, TimeUnit.SECONDS),
system.dispatcher(),
system.scheduler());
CircuitBreakerSettings<String, String, UUID> circuitBreakerSettings =
CircuitBreakerSettings.create(circuitBreakerState);
final BidiFlow<Pair<String, UUID>, Pair<String, UUID>, Pair<String, UUID>, Pair<Try<String>, UUID>, NotUsed>
circuitBreakerBidiFlow = CircuitBreakerBidiFlow.create(circuitBreakerSettings);
final Flow<Pair<String, UUID>, Pair<String, UUID>, NotUsed> flow =
Flow.<Pair<String, UUID>>create()
.mapAsyncUnordered(10, elem -> ask(ref, elem, 5000))
.map(elem -> (Pair<String, UUID>)elem);
Source.from(Arrays.asList("a", "b", "c"))
.map(s -> Pair.create(s, UUID.randomUUID()))
.via(circuitBreakerBidiFlow.join(flow))
.runWith(Sink.seq(), mat);
Fallback Response
CircuitBreakerSettings
optionally takes a fallback function that gets called when the circuit is Open
.
Scala
A function of type In => Try[Out]
can be provided via withFallback
function:
import org.squbs.streams.circuitbreaker.CircuitBreakerSettings
val circuitBreakerSettings =
CircuitBreakerSettings[String, String, UUID](circuitBreakerState)
.withFallback((elem: String) => Success("Fallback Response!"))
Java
A Function<In, Try<Out>>
can be provided via withFallback
function:
import org.squbs.streams.circuitbreaker.japi.CircuitBreakerSettings;
CircuitBreakerSettings circuitBreakerSettings =
CircuitBreakerSettings.<String, String, UUID>create(circuitBreakerState)
.withFallback(s -> Success.apply("Fallback Response!"));
Failure Decider
By default, any Failure
from the joined Flow
is considered a problem and causes the circuit breaker failure count to be incremented. However, CircuitBreakerSettings
also accepts an optional failureDecider
to decide on whether an element passed by the joined Flow
is actually considered a failure. For instance, if Circuit Breaker is joined with an Akka HTTP flow, a Success
Http Response with status code 500 internal server error should be considered a failure.
Scala
A function of type Try[Out] => Boolean
can be provided via withFailureDecider
function. Below is an example where, along with any Failure
message, a Success
of HttpResponse
with status code 400
and above is also considered a failure:
import org.squbs.streams.circuitbreaker.CircuitBreakerSettings
val circuitBreakerSettings =
CircuitBreakerSettings[HttpRequest, HttpResponse, UUID](circuitBreakerState)
.withFailureDecider(tryHttpResponse => tryHttpResponse.isFailure || tryHttpResponse.get.status.isFailure)
Java
A Function<Try<Out>, Boolean>
can be provided via withFailureDecider
function. Below is an example where, along with any Failure
message, a Success
of HttpResponse
with status code 400
and above is also considered a failure:
import org.squbs.streams.circuitbreaker.japi.CircuitBreakerSettings;
CircuitBreakerSettings circuitBreakerSettings =
CircuitBreakerSettings.<HttpRequest, HttpResponse, UUID>create(circuitBreakerState)
.withFailureDecider(
tryHttpResponse -> tryHttpResponse.isFailure() || tryHttpResponse.get().status().isFailure());
Creating CircuitBreakerState from a configuration
While AtomicCircuitBreakerState
has programmatic API for configuration, it also allows the configuration to be provided through a Config
object. The Config
can partially define some settings, and for the rest, it will fallback to default values. Please see here for the default circuit breaker configuration.
Scala
val config = ConfigFactory.parseString(
"""
|max-failures = 5
|call-timeout = 50 ms
|reset-timeout = 100 ms
|max-reset-timeout = 2 seconds
|exponential-backoff-factor = 2.0
""".stripMargin)
val circuitBreakerState = AtomicCircuitBreakerState("sample", config)
Java
Config config = ConfigFactory.parseString(
"max-failures = 5\n" +
"call-timeout = 50 ms\n" +
"reset-timeout = 100 ms\n" +
"max-reset-timeout = 2 seconds\n" +
"exponential-backoff-factor = 2.0");
final CircuitBreakerState circuitBreakerState = AtomicCircuitBreakerState.create("sample", config, system);
Circuit Breaker across materializations
Please note, in many scenarios, the same circuit breaker instance is used across multiple materializations of the same flow. For such scenarios, make sure to use a CircuitBreakerState
instance that can be modified concurrently. The default implementation AtomicCircuitBreakerState
uses Atomic
variables and can be used across multiple materializations. More implementations, e.g., Agent
based, can be introduced in the future.
Context to Unique Id Mapping
Context
itself might be used as a unique id. However, in many scenarios, Context
contains more than the unique id itself or the unique id might be retrieved as a mapping from the Context
. squbs allows different options to provide a unique id:
Context
itself is a type that can be used as a unique id, e.g.,Int
,Long
,java.util.UUID
Context
extendsUniqueId.Provider
and implementsdef uniqueId
Context
is wrapped withUniqueId.Envelope
Context
is mapped to a unique id by calling a function
With the first three options, a unique id can be retrieved directly through the context. For the last option, CircuitBreakerSettings
allows a function to be provided.
Scala
A function of type Context => Option[Any]
can be provided via withUniqueIdMapper
function:
import org.squbs.streams.circuitbreaker.CircuitBreakerSettings
case class MyContext(s: String, id: Long)
val circuitBreakerSettings =
CircuitBreakerSettings[String, String, MyContext](circuitBreakerState)
.withUniqueIdMapper((context: MyContext) => Some(context.id))
Java
A Function<Context, Optional<Any>>
can be provided via withUniqueIdMapper
function:
import org.squbs.streams.circuitbreaker.japi.CircuitBreakerSettings;
class MyContext {
private String s;
private long id;
public MyContext(String s, long id) {
this.s = s;
this.id = id;
}
public long id() {
return id;
}
}
CircuitBreakerSettings circuitBreakerSettings =
CircuitBreakerSettings.<String, String, MyContext>create(circuitBreakerState)
.withUniqueIdMapper(context -> Optional.of(context.id));
Notifications
An ActorRef
can be subscribed to receive all TransitionEvents
or any transition event that it is intered in, e.g., Closed
, Open
, HalfOpen
.
Scala
Here is an example that registers an ActorRef
to receive events when circuit transitions to Open
state:
circuitBreakerState.subscribe(self, Open)
Java
Here is an example that registers an ActorRef
to receive events when circuit transitions to Open
state:
circuitBreakerState.subscribe(getRef(), Open.instance());
Metrics
The CircuitBreakerState
keeps Codahale meters for:
- Success count
- Failure count
- Short circuit times
It also keeps a Gauge
for the circuit breaker state.
To differentiate metrics across instance, CircuitBreakerState
implementation require a name to passed in.
CircuitBreakerState
also allows a MetricRegistry
instance to be passed in. If no MetricRegistry
is passed in, it creates one internally.
Potential effects of Circuit Open state on throughput
If the upstream does not control the throughput, then the throughput of the stream might temporarily increase once the circuit is Open
: The downstream demand will be addressed with short circuit/fallback messages, which might (or might not) take less time than it takes the joined Flow
to process an element. To eliminate this problem, a throttle can be applied specifically for circuit breaker Open
messages (or fallback messages).