Customizing Stream Control
Akka Streams and Reactive Streams pipelines need to be integrated with the runtime lifecycle of the server. An automated or semi-automated integration is provided through the PerpetualStream
infrastructure. If you need more fine-grained control over a stream, the following sections explain the facilities available.
Dependency
In general, you do not need to add an extra dependency. The classes are part of the following dependency:
"org.squbs" %% "squbs-unicomplex" % squbsVersion
Making A Lifecycle-Sensitive Source
If you wish to have a source that is fully connected to the lifecycle events of squbs, you can wrap any source with LifecycleManaged.
Scala
val inSource = <your-original-source>
val aggregatedSource = LifecycleManaged().source(inSource)
Java
final Source inSource = <your-original-source>;
final Source aggregatedSource = new LifecycleManaged(system).source(inSource);
The resulting source is an aggregated source that materializes to a (T, ActorRef), where T is the materialized type of inSource and ActorRef is the materialized type of the trigger actor, which receives events from the Unicomplex, the squbs container.
The aggregated source does not emit elements from the original source until the lifecycle state becomes Active. It stops emitting elements and shuts down the stream after the lifecycle state becomes Stopping.
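The gating behavior described above can be illustrated with a small plain-Java model. This is only a sketch of the semantics, not the squbs implementation: it uses no Akka or squbs classes, the names LifecycleGateSketch and State are hypothetical, and it assumes that elements offered before Active are held back rather than dropped, which the text above does not specify.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of a lifecycle-gated source; not the squbs implementation.
final class LifecycleGateSketch {
    // Hypothetical stand-ins for the squbs lifecycle states.
    enum State { STARTING, ACTIVE, STOPPING }

    private final Deque<String> pending = new ArrayDeque<>(); // elements waiting upstream
    private final List<String> emitted = new ArrayList<>();   // elements seen downstream
    private boolean open = false;      // closed until the lifecycle becomes ACTIVE
    private boolean completed = false; // set once the lifecycle becomes STOPPING

    // Upstream offers an element. Assumption: it is held while the gate is closed.
    void offer(String element) {
        if (completed) return;         // the stream has already shut down
        pending.add(element);
        drain();
    }

    // The container reports a lifecycle transition.
    void onLifecycle(State state) {
        if (completed) return;
        if (state == State.ACTIVE) {
            open = true;
            drain();
        } else if (state == State.STOPPING) {
            open = false;
            completed = true;          // nothing is emitted after this point
            pending.clear();
        }
    }

    private void drain() {
        while (open && !pending.isEmpty()) emitted.add(pending.poll());
    }

    List<String> emitted() { return emitted; }
    boolean isCompleted() { return completed; }

    public static void main(String[] args) {
        LifecycleGateSketch gate = new LifecycleGateSketch();
        gate.offer("a");                    // held: lifecycle is not yet Active
        gate.onLifecycle(State.ACTIVE);     // "a" is released
        gate.offer("b");                    // passes straight through
        gate.onLifecycle(State.STOPPING);   // the stream shuts down
        gate.offer("c");                    // ignored
        System.out.println(gate.emitted()); // prints [a, b]
    }
}
```

In the real library the same effect is obtained by wrapping the source with LifecycleManaged; the model only makes the two state transitions explicit.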
Custom Aggregated Triggered Source
If you want your flow to be enabled or disabled by custom events, you can integrate it with a custom trigger source. The element true will enable the flow, false will disable it.
Note that Trigger takes an argument eagerComplete, which defaults to false in Scala but has to be passed explicitly in Java. If eagerComplete is set to false, completion and/or termination of the trigger source actor will detach the trigger. If set to true, such termination will complete the stream.
Scala
import org.squbs.stream.TriggerEvent._
val inSource = <your-original-source>
val trigger = <your-custom-trigger-source>.collect {
case 0 => DISABLE
case 1 => ENABLE
}
val aggregatedSource = new Trigger().source(inSource, trigger)
Java
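import akka.japi.pf.PFBuilder;
import akka.stream.javadsl.Source;
import org.squbs.stream.TriggerEvent;
import static org.squbs.stream.TriggerEvent.DISABLE;
import static org.squbs.stream.TriggerEvent.ENABLE;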
final Source<?, ?> inSource = <your-original-source>;
final Source<?, ?> trigger = <your-custom-trigger-source>.collect(new PFBuilder<Integer, TriggerEvent>()
.match(Integer.class, p -> p == 1, p -> ENABLE)
.match(Integer.class, p -> p == 0, p -> DISABLE)
.build());
final Source aggregatedSource = new Trigger(false).source(inSource, trigger);
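The effect of the eagerComplete argument used in the examples above can be modelled in plain Java. This is a minimal sketch of the described semantics rather than the squbs code: the class EagerCompleteSketch and its methods are hypothetical, and it assumes that a detached trigger leaves the flow in its last enabled or disabled state, which the text above does not state.

```java
// Plain-Java model of the eagerComplete flag; not the squbs implementation.
final class EagerCompleteSketch {
    private final boolean eagerComplete;
    private boolean enabled = false;         // last value received from the trigger
    private boolean triggerAttached = true;
    private boolean streamCompleted = false;

    EagerCompleteSketch(boolean eagerComplete) {
        this.eagerComplete = eagerComplete;
    }

    // A trigger element arrives: true enables the flow, false disables it.
    void onTrigger(boolean enable) {
        if (triggerAttached && !streamCompleted) enabled = enable;
    }

    // The trigger source completes or its actor terminates.
    void onTriggerTerminated() {
        triggerAttached = false;
        if (eagerComplete) {
            streamCompleted = true;          // true: the whole stream completes
        }
        // false: the trigger is only detached.
        // Assumption: the flow keeps whatever state it had last.
    }

    boolean isFlowing() { return enabled && !streamCompleted; }
    boolean isStreamCompleted() { return streamCompleted; }

    public static void main(String[] args) {
        EagerCompleteSketch detaching = new EagerCompleteSketch(false);
        detaching.onTrigger(true);
        detaching.onTriggerTerminated();
        System.out.println("eagerComplete=false, completed: " + detaching.isStreamCompleted());

        EagerCompleteSketch completing = new EagerCompleteSketch(true);
        completing.onTrigger(true);
        completing.onTriggerTerminated();
        System.out.println("eagerComplete=true, completed: " + completing.isStreamCompleted());
    }
}
```

Passing false, as the Java examples here do, is the choice that lets the main source outlive its trigger.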
Custom Lifecycle Event(s) for Trigger
If you want to respond to more lifecycle events beyond Active and Stopping, for example if you want Failed to also stop the flow, you can modify the lifecycle event mapping.
Scala
import org.squbs.stream.TriggerEvent._
val inSource = <your-original-source>
val trigger = Source.actorPublisher[LifecycleState](Props.create(classOf[UnicomplexActorPublisher]))
.collect {
case Active => ENABLE
case Stopping | Failed => DISABLE
}
val aggregatedSource = new Trigger().source(inSource, trigger)
Java
import akka.japi.pf.PFBuilder;
import akka.stream.javadsl.Source;
import org.squbs.stream.TriggerEvent;
import static org.squbs.stream.TriggerEvent.DISABLE;
import static org.squbs.stream.TriggerEvent.ENABLE;
final Source<?, ?> inSource = <your-original-source>;
final Source<?, ActorRef> trigger = Source.<LifecycleState>actorPublisher(Props.create(UnicomplexActorPublisher.class))
.collect(new PFBuilder<LifecycleState, TriggerEvent>()
.matchEquals(Active.instance(), p -> ENABLE)
.matchEquals(Stopping.instance(), p -> DISABLE)
.matchEquals(Failed.instance(), p -> DISABLE)
.build()
);
final Source aggregatedSource = new Trigger(false).source(inSource, trigger);
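The mapping above relies on collect being partial: lifecycle states without a matching case produce no trigger event at all. A plain-Java sketch of that idea follows. The enums here are local stand-ins and not the squbs LifecycleState and TriggerEvent types, and LifecycleMappingSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Plain-Java model of a partial lifecycle-to-trigger mapping; not squbs code.
final class LifecycleMappingSketch {
    // Local stand-ins for the squbs types, for illustration only.
    enum LifecycleState { STARTING, INITIALIZING, ACTIVE, FAILED, STOPPING, STOPPED }
    enum TriggerEvent { ENABLE, DISABLE }

    // Partial mapping: states without a case yield no trigger event.
    static Optional<TriggerEvent> toTrigger(LifecycleState state) {
        switch (state) {
            case ACTIVE:
                return Optional.of(TriggerEvent.ENABLE);
            case STOPPING:
            case FAILED:
                return Optional.of(TriggerEvent.DISABLE);
            default:
                return Optional.empty();
        }
    }

    // Mirrors collect: keep only the states that map to a trigger event.
    static List<TriggerEvent> collect(List<LifecycleState> states) {
        return states.stream()
                .map(LifecycleMappingSketch::toTrigger)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<LifecycleState> seen = Arrays.asList(
                LifecycleState.STARTING, LifecycleState.ACTIVE, LifecycleState.FAILED);
        System.out.println(collect(seen));
    }
}
```

Adding or removing a case in toTrigger corresponds to adding or removing a match in the PFBuilder of the Java example.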