Timeout stage
Overview
Some stream use cases may require each message in a flow to be processed within a bounded time or to send a timeout failure message instead. squbs introduces the Timeout Akka Streams stage to add timeout functionality to streams.
Dependency
Add the following dependency to your build.sbt or scala build file:
"org.squbs" %% "squbs-ext" % squbsVersion
Usage
The timeout functionality is provided as a BidiFlow that can be connected to a flow via the join operator. The timeout BidiFlow sends a Try to downstream:
- If an output msg is provided by the wrapped flow within the timeout, then Success(msg) is passed down.
- Otherwise, a Failure(FlowTimeoutException()) is pushed to downstream.
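The two outcomes above can be modelled in a few lines of plain Java. This is only a sketch of the decision, not the squbs implementation: Outcome and TimedOut are hypothetical stand-ins for Try and FlowTimeoutException, and treating an element that takes exactly the timeout as being in time is an assumption.

```java
/** Plain-Java model of the two outcomes; none of these types are squbs or Akka APIs. */
final class TimeoutOutcomeSketch {

    /** Hypothetical stand-in for scala.util.Try: holds either a value or a failure. */
    static final class Outcome<T> {
        final T value;
        final Exception failure;

        private Outcome(T value, Exception failure) {
            this.value = value;
            this.failure = failure;
        }

        static <T> Outcome<T> success(T value) {
            return new Outcome<>(value, null);
        }

        static <T> Outcome<T> failure(Exception failure) {
            return new Outcome<>(null, failure);
        }

        boolean isSuccess() {
            return failure == null;
        }
    }

    /** Hypothetical stand-in for FlowTimeoutException. */
    static final class TimedOut extends Exception {
        TimedOut(String message) {
            super(message);
        }
    }

    /** Decides what is passed downstream for one element of the wrapped flow. */
    static <T> Outcome<T> decide(T msg, long elapsedMillis, long timeoutMillis) {
        // Assumption: an element that takes exactly the timeout still counts as in time.
        if (elapsedMillis <= timeoutMillis) {
            return Outcome.success(msg);
        }
        return Outcome.failure(new TimedOut("no output within " + timeoutMillis + " ms"));
    }

    public static void main(String[] args) {
        System.out.println(decide("apple", 200, 1000).isSuccess());   // in time
        System.out.println(decide("banana", 1500, 1000).isSuccess()); // too late
    }
}
```

A consumer of the real stage would branch in the same way on the Try it receives, handling the failure case as the timeout message.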
If there is no downstream demand, then no timeout message will be pushed down. Moreover, no timeout check will be done unless there is downstream demand. Accordingly, depending on the demand scenario, some messages might be sent as a Success even after the timeout duration.
Timeout precision is at best 10ms to avoid unnecessary timer scheduling cycles.
The timeout feature is supported for flows that guarantee message ordering as well as for the ones that do not guarantee message ordering. For the flows that do not guarantee message ordering, a context needs to be carried by the wrapped flow to uniquely identify each element. While the usage of the BidiFlow is the same for both types of flows, the creation of the BidiFlow is done via two different APIs.
Flows with message order guarantee
TimeoutOrdered is used to create a timeout BidiFlow to wrap flows that keep the order of messages. Please note that, with the message order guarantee, head-of-line blocking can occur: a slower element might cause subsequent elements to time out.
Scala
val timeout = TimeoutOrdered[String, String](1 second)
val flow = Flow[String].map(s => findAnEnglishWordThatStartWith(s))
Source("a" :: "b" :: "c" :: Nil)
.via(timeout.join(flow))
.runWith(Sink.seq)
Java
final FiniteDuration duration = Duration.create(1, TimeUnit.SECONDS);
final BidiFlow<String, String, String, Try<String>, NotUsed> timeout = TimeoutOrdered.create(duration);
final Flow<String, String, NotUsed> flow =
Flow.<String>create().map(s -> findAnEnglishWordThatStartWith(s));
Source.from(Arrays.asList("a", "b", "c"))
.via(timeout.join(flow))
.runWith(Sink.seq(), mat);
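The head-of-line blocking mentioned for ordered flows can be illustrated with a small plain-Java simulation. It is a simplified model rather than squbs code: it assumes all elements enter the wrapped flow at time 0, are processed concurrently, and are emitted strictly in input order. The class and method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of an order-preserving flow; not squbs or Akka code. */
final class HeadOfLineBlockingSketch {

    /**
     * For each element, reports whether its output would arrive after the timeout.
     * Assumption: every element enters at time 0 and takes processingMillis[i] to finish.
     */
    static List<Boolean> timedOut(long[] processingMillis, long timeoutMillis) {
        List<Boolean> result = new ArrayList<>();
        long previousEmitAt = 0;
        for (long processing : processingMillis) {
            // An ordered flow cannot emit an element before all of its predecessors.
            long emitAt = Math.max(previousEmitAt, processing);
            result.add(emitAt > timeoutMillis);
            previousEmitAt = emitAt;
        }
        return result;
    }

    public static void main(String[] args) {
        // The second element is slow, so the fast third element is held back behind it.
        System.out.println(timedOut(new long[] {100, 1500, 100}, 1000)); // [false, true, true]
    }
}
```

In this model the third element needs only 100 ms of work, yet it is reported as timed out because it cannot overtake the slow second element.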
Flows without message order guarantee
Timeout is used to create a timeout BidiFlow to wrap flows that do not guarantee the order of messages. To uniquely identify each element and its corresponding timing marks, a context, of any type defined by the application, needs to be carried around by the wrapped flow. The requirement is that either the Context itself or a mapping from Context should be able to uniquely identify an element (see the Context to Unique Id Mapping section for more details).
Scala
Timeout[In, Out, Context](timeout: FiniteDuration) is used to create an unordered timeout BidiFlow. This BidiFlow can be joined with any flow that takes in an (In, Context) and outputs an (Out, Context).
val timeout = Timeout[String, String, UUID](1 second)
val flow = Flow[(String, UUID)].mapAsyncUnordered(10) { elem =>
(ref ? elem).mapTo[(String, UUID)]
}
Source("a" :: "b" :: "c" :: Nil)
.map { s => (s, UUID.randomUUID()) }
.via(timeout.join(flow))
.runWith(Sink.seq)
Java
The following API is used to create an unordered timeout BidiFlow:
public class Timeout {
public static <In, Out, Context> BidiFlow<Pair<In, Context>,
Pair<In, Context>,
Pair<Out, Context>,
Pair<Try<Out>, Context>,
NotUsed>
create(FiniteDuration timeout);
}
This BidiFlow can be joined with any flow that takes in an akka.japi.Pair<In, Context> and outputs an akka.japi.Pair<Out, Context>.
final FiniteDuration duration = Duration.create(1, TimeUnit.SECONDS);
final BidiFlow<Pair<String, UUID>,
Pair<String, UUID>,
Pair<String, UUID>,
Pair<Try<String>, UUID>, NotUsed> timeout = Timeout.create(duration);
final Flow<Pair<String, UUID>, Pair<String, UUID>, NotUsed> flow =
Flow.<Pair<String, UUID>>create()
.mapAsyncUnordered(10, elem -> ask(ref, elem, 5000))
.map(elem -> (Pair<String, UUID>)elem);
Source.from(Arrays.asList("a", "b", "c"))
.map(s -> new Pair<>(s, UUID.randomUUID()))
.via(timeout.join(flow))
.runWith(Sink.seq(), mat);
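To see why an unordered flow needs a context, consider how timing marks can be matched to outputs that arrive in any order. The following plain-Java sketch keeps one start time per unique id. It is an illustration under stated assumptions, not the squbs implementation: the class, its methods, and the explicit nowMillis parameters are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative bookkeeping for unordered outputs; not squbs or Akka code. */
final class UnorderedTimingSketch {
    private final Map<Object, Long> startedAt = new HashMap<>();
    private final long timeoutMillis;

    UnorderedTimingSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records the timing mark of an element when it enters the wrapped flow. */
    void onInput(Object uniqueId, long nowMillis) {
        startedAt.put(uniqueId, nowMillis);
    }

    /** Returns true if the output identified by uniqueId arrived within the timeout. */
    boolean onOutput(Object uniqueId, long nowMillis) {
        Long start = startedAt.remove(uniqueId);
        if (start == null) {
            throw new IllegalArgumentException("no timing mark for id " + uniqueId);
        }
        return nowMillis - start <= timeoutMillis;
    }

    public static void main(String[] args) {
        UnorderedTimingSketch timing = new UnorderedTimingSketch(1000);
        timing.onInput("a", 0);
        timing.onInput("b", 0);
        // "b" overtakes "a"; the id is what links each output back to its own start time.
        System.out.println(timing.onOutput("b", 100));  // true
        System.out.println(timing.onOutput("a", 1500)); // false
    }
}
```

Unlike the ordered case, a slow element here does not affect the verdict for faster elements, because each output is checked against its own timing mark.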
Context to Unique Id Mapping
Context itself might be used as a unique id. However, in many scenarios, Context contains more than the unique id itself or the unique id might be retrieved as a mapping from the Context. squbs allows different options to provide a unique id:
- Context itself is a type that can be used as a unique id, e.g., Int, Long, java.util.UUID
- Context extends UniqueId.Provider and implements def uniqueId
- Context is wrapped with UniqueId.Envelope
- Context is mapped to a unique id by calling a function
With the first three options, a unique id can be retrieved directly through the context.
For the last option, Timeout allows a function to be passed in via TimeoutSettings:
Scala
case class MyContext(s: String, uuid: UUID)
val settings =
TimeoutSettings[String, String, MyContext](1 second)
.withUniqueIdMapper(context => context.uuid)
val timeout = Timeout(settings)
val flow = Flow[(String, MyContext)].mapAsyncUnordered(10) { elem =>
(ref ? elem).mapTo[(String, MyContext)]
}
Source("a" :: "b" :: "c" :: Nil)
.map( _ -> MyContext("dummy", UUID.randomUUID))
.via(timeout.join(flow))
.runWith(Sink.seq)
Java
class MyContext {
private String s;
private UUID uuid;
public MyContext(String s, UUID uuid) {
this.s = s;
this.uuid = uuid;
}
public UUID uuid() {
return uuid;
}
}
final FiniteDuration duration = Duration.create(1, TimeUnit.SECONDS);
final TimeoutSettings<String, String, MyContext> settings =
TimeoutSettings.<String, String, MyContext>create(duration)
.withUniqueIdMapper(context -> context.uuid());
final BidiFlow<Pair<String, MyContext>,
Pair<String, MyContext>,
Pair<String, MyContext>,
Pair<Try<String>, MyContext>,
NotUsed> timeout = Timeout.create(settings);
final Flow<Pair<String, MyContext>, Pair<String, MyContext>, NotUsed> flow =
Flow.<Pair<String, MyContext>>create()
.mapAsyncUnordered(10, elem -> ask(ref, elem, 5000))
.map(elem -> (Pair<String, MyContext>)elem);
Source.from(Arrays.asList("a", "b", "c"))
.map(s -> new Pair<>(s, new MyContext("dummy", UUID.randomUUID())))
.via(timeout.join(flow))
.runWith(Sink.seq(), mat);
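The four ways of providing a unique id can be summarised as a single lookup. The sketch below is plain Java and does not use the squbs types: IdProvider and Envelope are hypothetical stand-ins for UniqueId.Provider and UniqueId.Envelope, and the order in which the options are tried is an assumption made for illustration.

```java
import java.util.UUID;
import java.util.function.Function;

/** Illustrative id lookup; the types here only mimic the squbs concepts. */
final class UniqueIdSketch {

    /** Hypothetical stand-in for UniqueId.Provider. */
    interface IdProvider {
        Object uniqueId();
    }

    /** Hypothetical stand-in for UniqueId.Envelope: a context paired with its id. */
    static final class Envelope {
        final Object context;
        final Object id;

        Envelope(Object context, Object id) {
            this.context = context;
            this.id = id;
        }
    }

    /** Application-defined context, as in the example above. */
    static final class MyContext {
        final String s;
        final UUID uuid;

        MyContext(String s, UUID uuid) {
            this.s = s;
            this.uuid = uuid;
        }
    }

    /** Resolves the unique id; mapper may be null when no mapping function is configured. */
    static Object resolveId(Object context, Function<Object, Object> mapper) {
        if (mapper != null) {
            return mapper.apply(context);               // option 4: explicit mapping function
        }
        if (context instanceof IdProvider) {
            return ((IdProvider) context).uniqueId();   // option 2: context provides its id
        }
        if (context instanceof Envelope) {
            return ((Envelope) context).id;             // option 3: context wrapped with an id
        }
        return context;                                 // option 1: context is the id
    }

    public static void main(String[] args) {
        UUID uuid = UUID.randomUUID();
        System.out.println(resolveId(42L, null));
        System.out.println(resolveId(new Envelope("payload", uuid), null).equals(uuid));
        System.out.println(resolveId(new MyContext("dummy", uuid), c -> ((MyContext) c).uuid).equals(uuid));
    }
}
```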
Clean up callback
Timeout also provides a clean up callback function that can be passed in via TimeoutSettings. This function will be called for emitted elements that were already considered timed out.
An example use case for this functionality is when Timeout is used with the Akka HTTP client. As described in Implications of the streaming nature of Request/Response Entities, all HTTP responses must be consumed or discarded. By passing a clean up callback that discards the responses of timed out requests when they complete, we avoid clogging the stream.
Scala
val akkaHttpDiscard = (response: HttpResponse) => response.discardEntityBytes()
val settings =
TimeoutSettings[HttpRequest, HttpResponse, Context](1 second)
.withCleanUp(response => response.discardEntityBytes())
val timeout = Timeout(settings)
Java
final FiniteDuration duration = Duration.create(20, TimeUnit.MILLISECONDS);
final TimeoutSettings<HttpRequest, HttpResponse, UUID> settings =
TimeoutSettings.<HttpRequest, HttpResponse, UUID>create(duration)
.withCleanUp(httpResponse -> httpResponse.discardEntityBytes(materializer));
final BidiFlow<Pair<HttpRequest, UUID>,
Pair<HttpRequest, UUID>,
Pair<HttpResponse, UUID>,
Pair<Try<HttpResponse>, UUID>,
NotUsed> timeout = Timeout.create(settings);
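The role of the clean up callback can be pictured with a small plain-Java model: outputs that arrive after their element was already reported as timed out are handed to the callback instead of being passed downstream. This is a sketch under that assumption, not the squbs implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Illustrative model of a clean up callback for late outputs; not squbs or Akka code. */
final class CleanUpSketch<Out> {
    private final Set<Object> timedOutIds = new HashSet<>();
    private final List<Out> emitted = new ArrayList<>();
    private final Consumer<Out> cleanUp;

    CleanUpSketch(Consumer<Out> cleanUp) {
        this.cleanUp = cleanUp;
    }

    /** Remembers that a timeout failure was already sent for this element. */
    void markTimedOut(Object uniqueId) {
        timedOutIds.add(uniqueId);
    }

    /** Handles an output of the wrapped flow. */
    void onOutput(Object uniqueId, Out out) {
        if (timedOutIds.remove(uniqueId)) {
            cleanUp.accept(out); // late output: release its resources, do not emit it
        } else {
            emitted.add(out);    // output in time: pass it on
        }
    }

    List<Out> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        List<String> discarded = new ArrayList<>();
        CleanUpSketch<String> stage = new CleanUpSketch<>(discarded::add);
        stage.markTimedOut("a");
        stage.onOutput("a", "late response");
        stage.onOutput("b", "timely response");
        System.out.println(discarded);       // [late response]
        System.out.println(stage.emitted()); // [timely response]
    }
}
```

With the Akka HTTP client, the callback would be the place to discard the entity bytes of the late response, as in the examples above.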