Perpetual Stream
Overview
The PerpetualStream allows declaration of a stream that starts when the server starts and stops gracefully, without dropping messages, when the server stops. It is commonly used for consuming messages from Kafka or JMS, but also as a consolidation point for data from multiple streams received through HTTP requests.
PerpetualStream can be customized in various ways to fit your stream's needs. These are discussed in the sections listed below:
- Basic Use
- Override Lifecycle State to run the stream
- Shutdown Overrides
- Kill Switch Overrides
- Receiving and forwarding a message to the stream
- Handling Stream Errors
- Connecting a Perpetual Stream with an HTTP Flow
Dependency
PerpetualStream is part of core squbs. In general, you do not need to add an extra dependency. The classes are part of the following dependency:
"org.squbs" %% "squbs-unicomplex" % squbsVersion
Usage
The PerpetualStream is exposed as the PerpetualStream trait for Scala and the AbstractPerpetualStream abstract class for Java. For brevity, we'll refer to both as PerpetualStream.
Basic Use
Scala
Streams making use of PerpetualStream should materialize to certain known types, allowing the hooks in PerpetualStream to work seamlessly with a minimal amount of custom overrides. The options are:
- Materialize to a Future[_], meaning a future of any type. In this case, the shared killSwitch from PerpetualStream should be embedded in the stream, or shutdown() needs to be overridden.
- Materialize to a (KillSwitch, Future[_]) tuple. The KillSwitch will be used for initiating the shutdown of the stream.
- Materialize to a List or any Product (Lists and Tuples are all subtypes of Product) where the first element is a KillSwitch and the last element is a Future.
Streams with different materialized values can still be used, but shutdown() needs to be overridden.
Common examples of well-behaved streams are shown below:
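import akka.Done
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import org.squbs.stream.PerpetualStream
import scala.concurrent.Future
class WellBehavedStream extends PerpetualStream[Future[Done]] {
  // Counts up from 0 and wraps back to 0 after Int.MaxValue instead of overflowing.
  def generator = Iterator.iterate(0) { p =>
    if (p == Int.MaxValue) 0 else p + 1
  }
  val source = Source.fromIterator(generator _)
  val ignoreSink = Sink.ignore
  override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(ignoreSink) {
    implicit builder =>
      sink =>
        import GraphDSL.Implicits._
        // Embed the shared killSwitch so the default shutdown() can stop the stream.
        source ~> killSwitch.flow[Int] ~> sink
        ClosedShape
  })
}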
Alternatively, the following code shows another conformant PerpetualStream, whose materialized value is a tuple with a KillSwitch as its first element:
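import akka.Done
import akka.stream.{ClosedShape, KillSwitch, KillSwitches}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import org.squbs.stream.PerpetualStream
import scala.concurrent.Future
class WellBehavedStream2 extends PerpetualStream[(KillSwitch, Future[Done])] {
  def generator = Iterator.iterate(0) { p =>
    if (p == Int.MaxValue) 0 else p + 1
  }
  val source = Source.fromIterator(generator _)
  val ignoreSink = Sink.ignore
  override def streamGraph = RunnableGraph.fromGraph(
    // Keep both materialized values: (kill switch, sink's Future[Done]).
    GraphDSL.create(KillSwitches.single[Int], ignoreSink)((_, _)) { implicit builder =>
      (kill, sink) =>
        import GraphDSL.Implicits._
        source ~> kill ~> sink
        ClosedShape
    })
}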
That's it. These streams are well behaved because they materialize to the sink's materialized value, a Future[Done], in the first example, or to a (KillSwitch, Future[Done]) tuple pairing the kill switch with the sink's materialized value in the second one.
Java
Streams making use of AbstractPerpetualStream should materialize to certain known types, allowing the hooks in AbstractPerpetualStream to work seamlessly with a minimal amount of custom overrides. The options are:
- Materialize to a CompletionStage<?>, meaning a Java CompletionStage of any type. In this case, the shared killSwitch from AbstractPerpetualStream should be embedded in the stream, or shutdown() needs to be overridden.
- Materialize to a Pair<KillSwitch, CompletionStage<?>>. The KillSwitch will be used for initiating the shutdown of the stream.
- Materialize to a java.util.List where the first element is a KillSwitch and the last element is a CompletionStage.
Streams with different materialized values can still be used, but shutdown() needs to be overridden.
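To make the accepted shapes concrete, here is a plain-JDK sketch (no Akka on the classpath) of how a default shutdown could pick the kill switch and the completion signal out of a materialized value. KillSwitchLike, ShutdownPlan and MatValueConventions.plan are hypothetical names for illustration, and a Pair is modelled as a two-element List; squbs' actual implementation may differ.

```java
import java.util.List;
import java.util.concurrent.CompletionStage;

/** Hypothetical stand-in for Akka's KillSwitch: only shutdown() matters here. */
interface KillSwitchLike {
    void shutdown();
}

/** What a default shutdown needs: which switch to trip and which stage signals completion. */
final class ShutdownPlan {
    final KillSwitchLike killSwitch;
    final CompletionStage<?> done;

    ShutdownPlan(KillSwitchLike killSwitch, CompletionStage<?> done) {
        this.killSwitch = killSwitch;
        this.done = done;
    }
}

final class MatValueConventions {
    /**
     * Mirrors the conventions listed above:
     * a bare CompletionStage means "trip the shared kill switch" (it must be embedded in the graph);
     * a List whose first element is a kill switch and last element is a CompletionStage
     * supplies its own switch. Anything else means shutdown() must be overridden.
     */
    static ShutdownPlan plan(Object matValue, KillSwitchLike sharedKillSwitch) {
        if (matValue instanceof CompletionStage) {
            return new ShutdownPlan(sharedKillSwitch, (CompletionStage<?>) matValue);
        }
        if (matValue instanceof List) {
            List<?> values = (List<?>) matValue;
            if (!values.isEmpty()
                    && values.get(0) instanceof KillSwitchLike
                    && values.get(values.size() - 1) instanceof CompletionStage) {
                return new ShutdownPlan((KillSwitchLike) values.get(0),
                        (CompletionStage<?>) values.get(values.size() - 1));
            }
        }
        throw new IllegalArgumentException("Unsupported materialized value; override shutdown()");
    }
}
```

Only the first and last elements are inspected, which is why anything may sit in between in the List form.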
Common examples of well-behaved streams are shown below:
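import akka.Done;
import akka.japi.Pair;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.SourceShape;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.squbs.stream.AbstractPerpetualStream;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
public class WellBehavedStream extends AbstractPerpetualStream<CompletionStage<Done>> {
    Sink<Integer, CompletionStage<Done>> ignoreSink = Sink.ignore();
    @Override
    public RunnableGraph<CompletionStage<Done>> streamGraph() {
        return RunnableGraph.fromGraph(GraphDSL.create(ignoreSink, (builder, sink) -> {
            // unfold returns (next state, element): counts up, wrapping to 0 after MAX_VALUE.
            SourceShape<Integer> source = builder.add(
                Source.unfold(0, i -> {
                    if (i == Integer.MAX_VALUE) {
                        return Optional.of(Pair.create(0, i));
                    } else {
                        return Optional.of(Pair.create(i + 1, i));
                    }
                })
            );
            // Embed the shared kill switch so the default shutdown() can stop the stream.
            FlowShape<Integer, Integer> killSwitch = builder.add(killSwitch().<Integer>flow());
            builder.from(source).via(killSwitch).to(sink);
            return ClosedShape.getInstance();
        }));
    }
}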
Alternatively, the following code shows another conformant PerpetualStream, whose materialized value is a Pair with a KillSwitch as its first element:
public class WellBehavedStream2 extends
        AbstractPerpetualStream<Pair<KillSwitch, CompletionStage<Done>>> {
    Sink<Integer, CompletionStage<Done>> ignoreSink = Sink.ignore();
    @Override
    public RunnableGraph<Pair<KillSwitch, CompletionStage<Done>>> streamGraph() {
        // Keep both materialized values: Pair(kill switch, sink's CompletionStage<Done>).
        return RunnableGraph.fromGraph(GraphDSL.create(KillSwitches.<Integer>single(),
            ignoreSink, Pair::create, (builder, kill, sink) -> {
                SourceShape<Integer> source = builder.add(
                    Source.unfold(0, i -> {
                        if (i == Integer.MAX_VALUE) {
                            return Optional.of(Pair.create(0, i));
                        } else {
                            return Optional.of(Pair.create(i + 1, i));
                        }
                    })
                );
                builder.from(source).via(kill).to(sink);
                return ClosedShape.getInstance();
            }));
    }
}
That's it. These streams are well behaved because they materialize to the sink's materialized value, a CompletionStage<Done>, in the first example, or to a Pair<KillSwitch, CompletionStage<Done>> pairing the kill switch with the sink's materialized value in the second one.
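The Source.unfold step function in both Java examples takes the current state and returns a Pair of (next state, element to emit). The plain-JDK sketch below mirrors that step with an Iterator so the wrap-around at Integer.MAX_VALUE can be checked without running a stream; WrappingCounter is a hypothetical name.

```java
import java.util.Iterator;

/** Mirrors the unfold step: emit the current state, then advance it, wrapping to 0 after MAX_VALUE. */
final class WrappingCounter implements Iterator<Integer> {
    private int state;

    WrappingCounter(int start) {
        this.state = start;
    }

    @Override
    public boolean hasNext() {
        return true; // like the stream source, the counter never completes on its own
    }

    @Override
    public Integer next() {
        int element = state;                                  // Pair.create(next, i): i is emitted
        state = (state == Integer.MAX_VALUE) ? 0 : state + 1; // explicit check avoids overflow
        return element;
    }
}
```

Without the explicit check, i + 1 at Integer.MAX_VALUE would overflow to Integer.MIN_VALUE and the stream would start emitting negative numbers.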
Override Lifecycle State to run the stream
There may be scenarios where a stream needs to be materialized at a lifecycle state other than Active. In such scenarios, override streamRunLifecycleState, e.g.:
Scala
override lazy val streamRunLifecycleState: LifecycleState = Initializing
Java
@Override
public LifecycleState streamRunLifecycleState() {
    return Initializing.instance();
}
Shutdown Overrides
It is sometimes not possible to define a well-behaved stream. For instance, the Sink may not materialize to a Future or CompletionStage, or you may need to do further cleanup at shutdown. For this reason, it is possible to override shutdown as in the following code:
Scala
override def shutdown(): Future[Done] = {
  // Do all your cleanup
  // For safety, call super
  super.shutdown()
  // The Future from super.shutdown may not mean anything.
  // Feel free to create your own future that identifies the
  // stream being done. Return your Future instead.
}
Java
@Override
public CompletionStage<Done> shutdown() {
    // Do all your cleanup
    // For safety, call super
    CompletionStage<Done> done = super.shutdown();
    // The CompletionStage from super.shutdown may not mean anything.
    // Feel free to create your own CompletionStage that identifies the
    // stream being done. Return your CompletionStage instead.
    return done;
}
shutdown needs to do the following:
- Initiate the shutdown of the stream.
- Do any other cleanup.
- Return the future that completes when the stream has finished processing.
Note: It is always advisable to call super.shutdown. There is no harm or other side effect in making this call.
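The three duties above can be sketched with plain CompletableFutures. This is not the squbs API: StreamHandle and ShutdownSketch.shutdownSequence are hypothetical names standing in for your stream's kill switch, its completion signal and your own resources. The point is the ordering: initiate shutdown, clean up, and return a stage that completes only after the stream has drained.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical handle: a way to initiate shutdown plus a stage that completes when processing ends. */
final class StreamHandle {
    final CompletableFuture<Void> done = new CompletableFuture<>();
    boolean shutdownInitiated = false;

    void initiateShutdown() {
        shutdownInitiated = true; // in a real stream: killSwitch().shutdown() or super.shutdown()
    }
}

final class ShutdownSketch {
    static CompletionStage<String> shutdownSequence(StreamHandle stream, List<String> log) {
        // 1. Initiate the shutdown of the stream.
        stream.initiateShutdown();
        log.add("initiated");
        // 2 and 3. Clean up once the stream has drained, and return a stage that
        // completes only after both the stream and the cleanup are finished.
        return stream.done.thenApply(ignored -> {
            log.add("cleaned up");
            return "Done";
        });
    }
}
```

Running the cleanup after the stream's completion signal, rather than immediately, is a design choice: it avoids releasing a resource that in-flight messages still need.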
Alternate Shutdown Mechanisms
The source may not materialize to a KillSwitch, but may instead provide a better way to shut down properly than the killSwitch. In such cases, just use the shutdown mechanism of the source and override shutdown to initiate the shutdown of the source. The killSwitch remains unused.
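As an illustration of a source-specific shutdown that still avoids dropping messages, the sketch below models a queue-backed source. In Akka this role could be played by something like Source.queue, whose materialized queue can be completed; here DrainingQueueSource is hypothetical and uses only the JDK. Calling complete() stops accepting new elements but lets everything already buffered drain before completion is signalled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical queue-backed source with its own graceful completion. */
final class DrainingQueueSource {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private boolean completing = false;
    final CompletableFuture<List<String>> done = new CompletableFuture<>();

    /** Returns false once completion has been requested, so no new work is accepted. */
    boolean offer(String element) {
        if (completing) return false;
        buffer.addLast(element);
        return true;
    }

    /** The source's own shutdown mechanism: stop intake, keep what is buffered. */
    void complete() {
        completing = true;
        completeIfDrained();
    }

    /** Downstream pulls one element; completion is signalled only once the buffer is empty. */
    void pull() {
        String next = buffer.pollFirst();
        if (next != null) delivered.add(next);
        completeIfDrained();
    }

    private void completeIfDrained() {
        if (completing && buffer.isEmpty()) done.complete(new ArrayList<>(delivered));
    }
}
```

An overridden shutdown would then call the source's complete() and return the stage that signals the stream has finished.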
Kill Switch Overrides
If the killSwitch needs to be shared across multiple streams, you can override killSwitch to reflect the shared instance.
Scala
override lazy val killSwitch = mySharedKillSwitch
Java
@Override
public SharedKillSwitch killSwitch() {
return KillSwitches.shared("myKillSwitch");
}
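The value of a shared kill switch is that a single shutdown() call reaches every stream it is embedded in, which only works if all of them see the same instance. The plain-JDK model below (SharedSwitchModel and register are hypothetical, not Akka classes) shows that effect, and it is also why an override should hand back one stored instance rather than build a new one on each call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a shared kill switch: several streams register, one call stops them all. */
final class SharedSwitchModel {
    private final List<Runnable> stopHooks = new ArrayList<>();
    private boolean tripped = false;

    /** Each stream embedding the switch registers how it stops. */
    void register(Runnable stopHook) {
        if (tripped) {
            stopHook.run(); // in this model, a stream attached after the trip stops immediately
        } else {
            stopHooks.add(stopHook);
        }
    }

    /** Tripping once stops every registered stream; further calls do nothing. */
    void shutdown() {
        if (tripped) return;
        tripped = true;
        stopHooks.forEach(Runnable::run);
    }
}
```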
Receiving and forwarding a message to the stream
Some streams take input from actor messages. While some stream configurations can materialize to the ActorRef of the source, it is difficult to address this actor. Since PerpetualStream is itself an actor, it can have a well-known address/path and forward the message to the stream source. To do so, override receive or createReceive() as follows:
Scala
override def receive = {
  case msg: MyStreamMessage =>
    val (sourceActorRef, _) = matValue
    sourceActorRef forward msg
}
Java
@Override
public Receive createReceive() {
    return receiveBuilder()
        .match(MyStreamMessage.class, msg -> {
            ActorRef sourceActorRef = matValue().first();
            sourceActorRef.forward(msg, getContext());
        })
        .build();
}
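forward is used rather than tell so that the stream source sees the original sender, not the PerpetualStream actor, which matters if anything downstream replies. The plain-JDK model below captures just that difference; Envelope and Ref are hypothetical types, not Akka classes, and the actor paths are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** A message plus the reference replies should go to. */
final class Envelope {
    final Object message;
    final String sender;

    Envelope(Object message, String sender) {
        this.message = message;
        this.sender = sender;
    }
}

/** Hypothetical actor reference that records what it receives. */
final class Ref {
    final String path;
    final List<Envelope> mailbox = new ArrayList<>();

    Ref(String path) {
        this.path = path;
    }

    /** tell: the calling actor becomes the sender. */
    void tell(Object message, Ref self) {
        mailbox.add(new Envelope(message, self.path));
    }

    /** forward: the sender of the envelope currently being handled is kept. */
    void forward(Envelope current) {
        mailbox.add(new Envelope(current.message, current.sender));
    }
}
```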
Handling Stream Errors
By default, PerpetualStream resumes on errors not caught by the stream stages, and the element causing the error is dropped. Override decider if a different behavior is desired.
Scala
override def decider: Supervision.Decider = { t =>
  log.error("Uncaught error {} from stream", t)
  t.printStackTrace()
  Supervision.Restart
}
Java
@Override
public akka.japi.function.Function<Throwable, Supervision.Directive> decider() {
    return t -> {
        log().error("Uncaught error {} from stream", t);
        t.printStackTrace();
        return Supervision.restart();
    };
}
Restart will restart the stage that has an error without shutting down the stream. Please see Supervision Strategies for possible strategies.
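The difference between the directives is easiest to see on a stage that keeps state. The plain-JDK model below (RunningSum and Directive are hypothetical, not Akka code) follows the semantics described in the Akka supervision documentation: Resume drops the failing element and keeps the stage's state, Restart drops it and resets the state, and Stop fails the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

enum Directive { RESUME, RESTART, STOP }

/** Hypothetical stateful stage: emits the running sum and fails on negative input. */
final class RunningSum {
    static List<Integer> run(List<Integer> input, Function<Throwable, Directive> decider) {
        List<Integer> out = new ArrayList<>();
        int sum = 0; // the stage's internal state
        for (int x : input) {
            try {
                if (x < 0) throw new IllegalArgumentException("negative: " + x);
                sum += x;
                out.add(sum);
            } catch (IllegalArgumentException e) {
                Directive d = decider.apply(e);
                if (d == Directive.STOP) throw new IllegalStateException("stream failed", e);
                if (d == Directive.RESTART) sum = 0; // restart discards accumulated state
                // RESUME and RESTART both drop the failing element and continue
            }
        }
        return out;
    }
}
```

For the input 1, 2, -1, 3, resuming yields 1, 3, 6 while restarting yields 1, 3, 3, because the running sum is reset after the failure.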
Connecting a Perpetual Stream with an HTTP Flow
Akka HTTP allows defining a Flow[HttpRequest, HttpResponse, NotUsed], which gets materialized for each HTTP connection. There are scenarios where an app needs to connect the HTTP flow to a long-running stream that needs to be materialized only once (e.g., publishing to Kafka). Akka Streams' MergeHub enables end-to-end streaming in such scenarios. squbs provides utilities to connect an HTTP flow with a PerpetualStream that uses MergeHub.
Below are sample PerpetualStream implementations, two in Scala and their two Java equivalents, all using MergeHub.
The type parameter Sink[MyMessage, NotUsed] describes the materialized value of the RunnableGraph, which the HTTP flow in HttpFlowWithMergeHub (further down) uses as its destination (Sink).
First, the simplest outline of the logic:
Scala
class PerpetualStreamWithMergeHub extends PerpetualStream[Sink[MyMessage, NotUsed]] {
  override lazy val streamRunLifecycleState: LifecycleState = Initializing
  /**
   * Describe your graph by implementing streamGraph
   *
   * @return The graph.
   */
  override def streamGraph = MergeHub.source[MyMessage].to(Sink.ignore)
}
Java
public class PerpetualStreamWithMergeHub extends AbstractPerpetualStream<Sink<MyMessage, NotUsed>> {
    @Override
    public LifecycleState streamRunLifecycleState() {
        return Initializing.instance();
    }
    /**
     * Describe your graph by implementing streamGraph
     *
     * @return The graph.
     */
    @Override
    public RunnableGraph<Sink<MyMessage, NotUsed>> streamGraph() {
        return MergeHub.of(MyMessage.class).to(Sink.ignore());
    }
}
From the outside perspective (that of the HTTP flow), this class is seen as a terminal Sink[MyMessage, NotUsed], which means that PerpetualStreamWithMergeHub expects to receive MyMessage on its inlet and will not emit anything, i.e., its outlet is plugged.
From the inside perspective, MergeHub is the source of MyMessages. Those messages are passed to Sink.ignore, which discards them.
MergeHub.source[MyMessage] materializes to a Sink[MyMessage, NotUsed] at runtime, which conforms to the PerpetualStream[Sink[MyMessage, NotUsed]] type parameter.
The .to(Sink.ignore) completes, or "closes", this shape with a plugged outlet. Because .to keeps the materialized value of the source side, the end result is an instance of RunnableGraph[Sink[MyMessage, NotUsed]].
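The outside/inside duality can be modelled without Akka. In the hypothetical MergeHubModel below, sink() is what each HTTP connection is handed (playing the role of the materialized Sink[MyMessage, NotUsed]), while the single consumer passed to the constructor plays the role of the graph after MergeHub.source, built once and shared by every producer. The real MergeHub adds backpressure and per-producer buffering, which this single-threaded sketch omits.

```java
import java.util.function.Consumer;

/** Hypothetical model: many producers, one long-running consumer materialized once. */
final class MergeHubModel<T> {
    private final Consumer<T> downstream;
    private int producers = 0;

    /** The "inside" view: everything merged flows into this single consumer. */
    MergeHubModel(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** The "outside" view: each caller (e.g. one HTTP connection) gets its own entry point. */
    Consumer<T> sink() {
        producers++;
        return downstream::accept;
    }

    int producerCount() {
        return producers;
    }
}
```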
A more involved example using GraphDSL:
Scala
final case class MyMessage(ip: String, ts: Long)
final case class MyMessageEnrich(ip: String, ts: Long, enrichTs: List[Long])
class PerpetualStreamWithMergeHub extends PerpetualStream[Sink[MyMessage, NotUsed]] {
  override lazy val streamRunLifecycleState: LifecycleState = Initializing
  // inlet - destination for MyMessage messages
  val source = MergeHub.source[MyMessage]
  // outlet - discard messages
  val sink = Sink.ignore
  // flow component, which converts MyMessage into MyMessageEnrich
  val preprocess = Flow[MyMessage].map { inMsg =>
    val outMsg = MyMessageEnrich(ip = inMsg.ip, ts = inMsg.ts, enrichTs = List.empty[Long])
    println(s"Message inside stream=$inMsg")
    outMsg
  }
  // building a flow based on another flow, to do some dummy enrichment
  val enrichment = Flow[MyMessageEnrich].map { inMsg =>
    val outMsg = MyMessageEnrich(
      ip = inMsg.ip.replaceAll("\\.", "-"),
      ts = inMsg.ts,
      enrichTs = System.currentTimeMillis() :: inMsg.enrichTs)
    println(s"Enriched Message inside enrich step=$outMsg")
    outMsg
  }
  /**
   * Describe your graph by implementing streamGraph
   *
   * @return The graph.
   */
  override def streamGraph: RunnableGraph[Sink[MyMessage, NotUsed]] = RunnableGraph.fromGraph(
    GraphDSL.create(source) { implicit builder =>
      input =>
        import GraphDSL.Implicits._
        input ~> killSwitch.flow[MyMessage] ~> preprocess ~> enrichment ~> sink
        ClosedShape
    })
}
Java
public class PerpetualStreamWithMergeHub extends AbstractPerpetualStream<Sink<MyMessage, NotUsed>> {
    // inlet - destination for MyMessage messages
    Source<MyMessage, Sink<MyMessage, NotUsed>> source = MergeHub.of(MyMessage.class);
    @Override
    public LifecycleState streamRunLifecycleState() {
        return Initializing.instance();
    }
    /**
     * Describe your graph by implementing streamGraph
     *
     * @return The graph.
     */
    @Override
    public RunnableGraph<Sink<MyMessage, NotUsed>> streamGraph() {
        return RunnableGraph.fromGraph(GraphDSL.create(source, (builder, input) -> {
            FlowShape<MyMessage, MyMessage> killSwitch = builder.add(killSwitch().<MyMessage>flow());
            // flow component, which converts MyMessage into MyMessageEnrich
            FlowShape<MyMessage, MyMessageEnrich> preProcess = builder.add(Flow.<MyMessage>create().map(inMsg -> {
                MyMessageEnrich outMsg = new MyMessageEnrich(inMsg.ip, inMsg.ts, new ArrayList<>());
                System.out.println("Message inside stream=" + inMsg);
                return outMsg;
            }));
            // building a flow based on another flow, to do some dummy enrichment
            FlowShape<MyMessageEnrich, MyMessageEnrich> enrichment =
                builder.add(Flow.<MyMessageEnrich>create().map(inMsg -> {
                    inMsg.enrichTs.add(System.currentTimeMillis());
                    MyMessageEnrich outMsg = new MyMessageEnrich(inMsg.ip.replaceAll("\\.", "-"),
                        inMsg.ts, inMsg.enrichTs);
                    System.out.println("Enriched Message inside enrich step=" + outMsg);
                    return outMsg;
                }));
            // outlet - discard messages
            SinkShape<Object> sink = builder.add(Sink.ignore());
            builder.from(input).via(killSwitch).via(preProcess).via(enrichment).to(sink);
            return ClosedShape.getInstance();
        }));
    }
}
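Because preprocess and enrichment are plain map stages, their logic can be pulled out and checked in isolation. The sketch below is a JDK-only restatement of the two lambdas; MyMessage and MyMessageEnrich are redeclared here as simple immutable classes since the Java declarations of these types are not shown, and EnrichmentSteps is a hypothetical name. Unlike the Java stage above, which adds to inMsg.enrichTs in place, this version copies the list and prepends the timestamp as the Scala version does, so an upstream message is never mutated.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class MyMessage {
    final String ip;
    final long ts;

    MyMessage(String ip, long ts) {
        this.ip = ip;
        this.ts = ts;
    }
}

final class MyMessageEnrich {
    final String ip;
    final long ts;
    final List<Long> enrichTs;

    MyMessageEnrich(String ip, long ts, List<Long> enrichTs) {
        this.ip = ip;
        this.ts = ts;
        this.enrichTs = Collections.unmodifiableList(new ArrayList<>(enrichTs)); // defensive copy
    }
}

final class EnrichmentSteps {
    /** preprocess: copy the fields and start with no enrichment timestamps. */
    static MyMessageEnrich preprocess(MyMessage in) {
        return new MyMessageEnrich(in.ip, in.ts, Collections.emptyList());
    }

    /** enrichment: dots become dashes and nowMillis is prepended to the timestamps. */
    static MyMessageEnrich enrich(MyMessageEnrich in, long nowMillis) {
        List<Long> ts = new ArrayList<>();
        ts.add(nowMillis);
        ts.addAll(in.enrichTs);
        return new MyMessageEnrich(in.ip.replaceAll("\\.", "-"), in.ts, ts);
    }
}
```

Passing the timestamp in, instead of calling System.currentTimeMillis() inside, keeps the step deterministic for testing.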
Let's see how all the parts fall into place:
streamGraph is expected to return a RunnableGraph with the same type parameter as declared in PerpetualStream[Sink[MyMessage, NotUsed]].
Our source is a MergeHub. It is expected to receive MyMessage, which makes its materialized (runtime) type a Sink[MyMessage, NotUsed].
Our graph is built starting with our source, by passing it as the parameter to GraphDSL.create(...). The materialized value of that parameter becomes the materialized value of the whole graph.
The result is an instance of RunnableGraph[Sink[MyMessage, NotUsed]], which is a ClosedShape that materializes to a Sink (the inlet seen from the outside) and has its outlet plugged.
A potentially confusing part of this example is the mix of the Sink and source names to refer to the same thing. It looks a bit strange in English.
Let's use the outside vs. inside perspective explanation again:
From the outside perspective, our component is seen as a Sink[MyMessage, NotUsed]. This is achieved by using MergeHub, which from the inside perspective is a source of the messages, hence the val name source.
Correspondingly, if we needed to emit something out, our val sink would actually be a shape that materializes to a Source[MyMessage, NotUsed] (for example, a BroadcastHub sink).
Let's add the above PerpetualStream to squbs-meta.conf. Please see Well Known Actors for more details.
cube-name = org.squbs.stream.mycube
cube-version = "0.0.1"
squbs-services = [
  {
    class-name = org.squbs.stream.HttpFlowWithMergeHub
    web-context = mergehub
  }
]
squbs-actors = [
  {
    class-name = org.squbs.stream.PerpetualStreamWithMergeHub
    name = perpetualStreamWithMergeHub
  }
]
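The path used later with matValue, /user/mycube/perpetualStreamWithMergeHub, lines up with this file: the last segment of cube-name followed by the actor's name. The helper below merely restates that relationship as inferred from this example; wellKnownPath is a hypothetical name, and the Well Known Actors documentation remains the authority on how squbs actually names cube actors.

```java
final class WellKnownPaths {
    /** Builds "/user/<last segment of cube-name>/<actor name>", as inferred from the example above. */
    static String wellKnownPath(String cubeName, String actorName) {
        // lastIndexOf returns -1 when there is no dot, so the whole name is used.
        String cube = cubeName.substring(cubeName.lastIndexOf('.') + 1);
        return "/user/" + cube + "/" + actorName;
    }
}
```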
Scala
The HTTP FlowDefinition can be connected to the PerpetualStream as follows, by extending PerpetualStreamMatValue and using the matValue method.
The type parameter of PerpetualStreamMatValue describes the data type flowing between the HTTP flow and the MergeHub (both versions of PerpetualStreamWithMergeHub above expect to receive MyMessage, i.e., both have an inlet of type Sink[MyMessage, NotUsed]).
class HttpFlowWithMergeHub extends FlowDefinition with PerpetualStreamMatValue[MyMessage] {
  override val flow: Flow[HttpRequest, HttpResponse, NotUsed] =
    Flow[HttpRequest]
      .mapAsync(1)(Unmarshal(_).to[MyMessage])
      .alsoTo(matValue("/user/mycube/perpetualStreamWithMergeHub"))
      .map { myMessage => HttpResponse(entity = s"Received Id: ${myMessage.ip}") }
}
Java
The HTTP FlowDefinition can be connected to the PerpetualStream as follows, by extending FlowToPerpetualStream instead of FlowDefinition directly. Note that FlowToPerpetualStream is a FlowDefinition. We use the matValue method as the sink to send HTTP messages to the MergeHub defined in the PerpetualStream.
class HttpFlowWithMergeHub extends FlowToPerpetualStream {
    private final Materializer mat = ActorMaterializer.create(context().system());
    private final MarshalUnmarshal mu = new MarshalUnmarshal(context().system().dispatcher(), mat);
    @Override
    public Flow<HttpRequest, HttpResponse, NotUsed> flow() {
        return Flow.<HttpRequest>create()
            .mapAsync(1, req -> mu.apply(unmarshaller(MyMessage.class), req.entity()))
            .alsoTo(matValue("/user/mycube/perpetualStreamWithMergeHub"))
            .map(myMessage -> HttpResponse.create().withEntity("Received Id: " + myMessage.ip));
    }
}
Let's see what's happening here:
The matValue method finds the materialized value of the RunnableGraph registered under /user/mycube/perpetualStreamWithMergeHub.
This happens to be the bootstrapped instance of our PerpetualStreamWithMergeHub.
alsoTo expects the result of matValue to be a Sink for MyMessage, i.e., a Sink[MyMessage, NotUsed]. As we've seen above, this is exactly what PerpetualStreamWithMergeHub.streamGraph materializes to.
(Remember our two perspectives: here alsoTo looks at PerpetualStreamWithMergeHub from the outside perspective and sees a Sink[MyMessage, NotUsed].)
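alsoTo sends each element to the attached sink and also passes it on downstream, so every unmarshalled MyMessage both reaches the MergeHub and produces an HTTP response. The plain-JDK model below restates that per-request pipeline; HttpFlowModel.handle and the string-based "unmarshalling" are hypothetical stand-ins for the real unmarshaller, the matValue sink and HttpResponse.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class HttpFlowModel {
    /** A stand-in for mapAsync(unmarshal).alsoTo(hubSink).map(toResponse), one request at a time. */
    static <M> List<String> handle(List<String> requests,
                                   Function<String, M> unmarshal,
                                   Consumer<M> hubSink,
                                   Function<M, String> toResponse) {
        List<String> responses = new ArrayList<>();
        for (String request : requests) {
            M message = unmarshal.apply(request);      // mapAsync(1)(Unmarshal(...))
            hubSink.accept(message);                   // alsoTo(matValue(...)): copy to the hub
            responses.add(toResponse.apply(message));  // map { ... HttpResponse(...) }
        }
        return responses;
    }
}
```

In the real flow, alsoTo also propagates backpressure from the hub, so a slow PerpetualStream slows down HTTP responses; this sketch does not model that.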