Request/Response Pipeline
Overview
We often need to have common infrastructure functionality or organizational standards across different services/clients. Such infrastructure includes, but is not limited, to logging, metrics collection, request tracing, authentication/authorization, tracking, cookie management, A/B testing, etc.
As squbs promotes separation of concerns, such logic would belong to infrastructure and not service/client implementation. The squbs pipeline allows infrastructure to provide components installed into a service/client without service/client owner having to worry about such aspects.
Generally speaking, a squbs pipeline is a Bidi Flow acting as a bridge in between:
- the Akka HTTP layer and the squbs service:
- All request messages sent from Akka Http to squbs service will go thru the pipeline
- Vice versa, all response messages sent from squbs service will go thru the pipeline.
- squbs client and Akka HTTP host connection pool flow:
- All request messages sent from squbs client to Akka HTTP host connection pool will go thru the pipeline
- Vice versa, all response messages sent from Akka HTTP host connection pool to squbs client will go thru the pipeline.
Pipeline declaration
Default pre/post flows specified via the below configuration are automatically connected to the server/client side pipeline unless defaultPipeline
in individual service/client configuration is set to off
:
squbs.pipeline.server.default {
pre-flow = defaultServerPreFlow
post-flow = defaultServerPostFlow
}
squbs.pipeline.client.default {
pre-flow = defaultClientPreFlow
post-flow = defaultClientPostFlow
}
Pipeline declaration for services
In squbs-meta.conf
, you can specify a pipeline for a service:
squbs-services = [
{
class-name = org.squbs.sample.MyActor
web-context = mypath
pipeline = dummyflow
}
]
If there is no custom pipeline for a squbs-service, just omit.
With the above configuration, the pipeline would look like:
+---------+ +---------+ +---------+ +---------+
RequestContext ~>| |~> | |~> | |~> | |
| default | | dummy | | default | | squbs |
| PreFlow | | flow | | PostFlow| | service |
RequestContext <~| |<~ | |<~ | |<~ | |
+---------+ +---------+ +---------+ +---------+
RequestContext
is basically a wrapper around HttpRequest
and HttpResponse
, which also allows carrying context information.
Pipeline declaration for clients
In application.conf
, you can specify a pipeline for a client:
sample {
type = squbs.httpclient
pipeline = dummyFlow
}
If there is no custom pipeline for a squbs-client, just omit.
With the above configuration, the pipeline would look like:
+---------+ +---------+ +---------+ +----------+
RequestContext ~>| |~> | |~> | |~> | Host |
| default | | dummy | | default | |Connection|
| PreFlow | | flow | | PostFlow| | Pool |
RequestContext <~| |<~ | |<~ | |<~ | Flow |
+---------+ +---------+ +---------+ +----------+
Bidi Flow Configuration
A bidi flow can be specified as below:
dummyflow {
type = squbs.pipelineflow
factory = org.squbs.sample.DummyBidiFlow
}
- type: to idenfity the configuration as a
squbs.pipelineflow
. - factory: the factory class to create the
BidiFlow
from.
A sample DummyBidiFlow
looks like below:
class DummyBidiFlow extends PipelineFlowFactory {
override def create(context: Context)(implicit system: ActorSystem): PipelineFlow = {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
val inbound = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("DummyRequest", "ReqValue")) })
val outbound = b.add(Flow[RequestContext].map{ rc => rc.addResponseHeader(RawHeader("DummyResponse", "ResValue"))})
BidiShape.fromFlows(inbound, outbound)
})
}
}
Aborting the flow
In certain scenarios, a stage in pipeline may have a need to abort the flow and return an HttpResponse
, e.g., in case of authentication/authorization. In such scenarios, the rest of the pipeline should be skipped and the request should not reach to the squbs service. To skip the rest of the flow:
- the flow needs to be added to builder with
abortable
, e.g.,b.add(authorization abortable)
. - call
abortWith
onRequestContext
with anHttpResponse
when you need to abort.
In the below DummyAbortableBidiFlow
example, authorization
is a bidi flow with abortable
and it aborts the flow is user is not authorized:
class DummyAbortableBidiFlow extends PipelineFlowFactory {
override def create(context: Context)(implicit system: ActorSystem): PipelineFlow = {
BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val inboundA = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInA", "valInA")) })
val inboundC = b.add(Flow[RequestContext].map { rc => rc.addRequestHeader(RawHeader("keyInC", "valInC")) })
val outboundA = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutA", "valOutA"))})
val outboundC = b.add(Flow[RequestContext].map { rc => rc.addResponseHeaders(RawHeader("keyOutC", "valOutC"))})
val inboundOutboundB = b.add(authorization abortable)
inboundA ~> inboundOutboundB.in1
inboundOutboundB.out1 ~> inboundC
inboundOutboundB.in2 <~ outboundC
outboundA <~ inboundOutboundB.out2
BidiShape(inboundA.in, inboundC.out, outboundC.in, outboundA.out)
})
}
val authorization = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
val authorization = b.add(Flow[RequestContext] map { rc =>
if(!isAuthorized) rc.abortWith(HttpResponse(StatusCodes.Unauthorized, entity = "Not Authorized!"))
else rc
})
val noneFlow = b.add(Flow[RequestContext]) // Do nothing
BidiShape.fromFlows(authorization, noneFlow)
})
}
Once a flow is added with abortable
, a bidi flow gets connected. This bidi flow checks the existence of HttpResponse
and bypasses or sends the request downstream. Here is how the above DummyAbortableBidiFlow
looks:
+-----------------------------------+
| +-----------+ +-----------+ | +-----------+
+-----------+ +---------+ | | | ~> | filter o~~~0 ~>| |
| | | | | | | |not aborted| | | inboundC | ~> RequestContext
RequestContext ~> | inboundA |~> | |~> 0~~o broadcast | +-----------+ | | |
| | | | | | | | +-----------+
+-----------+ | | | | | ~> +-----------+ |
| inbound | | +-----------+ | filter | |
| outbound| | | aborted | |
+-----------+ | B | | +-----------+ <~ +-----------+ | +-----------+
| | | | | | | | | |
RequestContext <~ | outboundA | <~| | <~0~~o merge | | | outboundC | <~ RequestContext
| | | | | | o~~~~~~~~~~~~~~~~~~~~0 <~| |
+-----------+ +---------+ | +-----------+ | +-----------+
+-----------------------------------+