Akka HTTP Client on Steroids
Overview
squbs-httpclient
project adds operationalization aspects to Akka HTTP Host-Level Client-Side API while keeping the Akka HTTP API. Here is the list of features it adds:
- Service Discovery: Lets any service discovery mechanism to be plugged in and allows resolving HTTP endpoints by string identifiers, e.g.,
paymentserv
. - Per Client configuration: Let's each client to individually override defaults in
application.conf
. - Pipeline: Allows a
Bidi
Akka Streams flow to be registered globally or individually for clients. - Metrics: Provides Codahale Metrics out-of-the-box for each client without AspectJ.
- JMX Beans: Exposes the configuration of each client as JMX beans.
- Circuit Breaker: Provides resiliency with a stream based circuit breaker.
Dependency
Add the following dependency to your build.sbt
or scala build file:
"org.squbs" %% "squbs-httpclient" % squbsVersion
Usage
squbs-httpclient
project sticks to the Akka HTTP API. The only exception is during the creation of host connection pool. Instead of Http().cachedHostConnectionPool
, it defines ClientFlow
with the same set of parameters (and few extra optional parameters).
Scala
Similar to the example at Akka HTTP Host-Level Client-Side API, the Scala use of ClientFlow
is as follows:
// construct a pool client flow with context type `Int`
val poolClientFlow = ClientFlow[Int]("sample") // Only this line is specific to squbs. Takes implicit ActorSystem.
val responseFuture: Future[(Try[HttpResponse], Int)] =
Source.single(HttpRequest(uri = "/") -> 42)
.via(poolClientFlow)
.runWith(Sink.head)
You can pass optional settings, e.g., HttpsConnectionContext
and ConnectionPoolSettings
, to ClientFlow
.
val clientFlow = ClientFlow("sample", Some(connectionContext), Some(connectionPoolSettings))
Java
Also, similar to the example at Akka HTTP Host-Level Client-Side API, the Java use of ClientFlow
is as follows:
final ActorSystem system = ActorSystem.create();
final Materializer mat = Materializer.createMaterializer(system);
final Flow<Pair<HttpRequest, Integer>, Pair<Try<HttpResponse>, Integer>, HostConnectionPool>
clientFlow = ClientFlow.create("sample", system, mat); // Only this line is specific to squbs
final CompletionStage<Pair<Try<HttpResponse>, Integer>> responseFuture =
Source.single(Pair.create(HttpRequest.create().withUri("/"), 42))
.via(clientFlow)
.runWith(Sink.head(), mat);
You can pass Optional
settings, e.g., HttpsConnectionContext
and ConnectionPoolSettings
, to ClientFlow.create
. It also provides a fluent API:
final Flow<Pair<HttpRequest, Integer>, Pair<Try<HttpResponse>, Integer>, HostConnectionPool> clientFlow =
ClientFlow.<Integer>builder()
.withSettings(connectionPoolSettings)
.withConnectionContext(connectionContext)
.create("sample", system, mat);
HTTP Model
Scala
Below is an HttpRequest
creation example in Scala. Please see HTTP Model Scala documentation for more details:
import HttpProtocols._
import MediaTypes._
val charset = HttpCharsets.`UTF-8`
val userData = ByteString("abc", charset.nioCharset())
val authorization = headers.Authorization(BasicHttpCredentials("user", "pass"))
HttpRequest(
PUT,
uri = "/user",
entity = HttpEntity(`text/plain` withCharset charset, userData),
headers = List(authorization),
protocol = `HTTP/1.0`)
Java
Below is an HttpRequest
creation example in Java. Please see Http Model Java documentation for more details:
import HttpProtocols.*;
import MediaTypes.*;
Authorization authorization = Authorization.basic("user", "pass");
HttpRequest complexRequest =
HttpRequest.PUT("/user")
.withEntity(HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, "abc"))
.addHeader(authorization)
.withProtocol(HttpProtocols.HTTP_1_0);
Service Discovery Chain
squbs-httpclient
does not require a hostname/port combination to be provided during client pool creation. Instead it allows a service discovery chain to be registered which allows resolving HttpEndpoint
s by a string identifier by running through the registered service discovery mechanisms. For instance, in the above example, "sample"
is the logical name of the service that client wants to connect, the configured service discovery chain will resolve it to an HttpEndpoint
which includes host and port, e.g., http://akka.io:80
.
Please note, you can still pass a valid http URI as a string to ClientFlow
as a default resolver to resolve valid http URIs is pre-registered in the service discovery chain by default:
ClientFlow[Int]("http://akka.io")
in ScalaClientFlow.create("http://akka.io", system, mat)
in Java
There are two variations of registering resolvers as shown below. The closure style allows more compact and readable code. However, the subclass has the power to keep state and make resolution decisions based on such state:
Scala
Register function type (String, Env) => Option[HttpEndpoint]
:
ResolverRegistry(system).register[HttpEndpoint]("SampleEndpointResolver") { (svcName, env) =>
svcName match {
case "sample" => Some(HttpEndpoint("http://akka.io:80"))
case "google" => Some(HttpEndpoint("http://www.google.com:80"))
case _ => None
}
}
Register class extending Resolver[HttpEndpoint]
:
class SampleEndpointResolver extends Resolver[HttpEndpoint] {
override def name: String = "SampleEndpointResolver"
override def resolve(svcName: String, env: Environment): Option[HttpEndpoint] =
svcName match {
case "sample" => Some(Endpoint("http://akka.io:80"))
case "google" => Some(Endpoint("http://www.google.com:80"))
case _ => None
}
}
// Register EndpointResolver
ResolverRegistry(system).register[HttpEndpoint](new SampleEndpointResolver)
Java
Register BiFunction<String, Environment, Optional<HttpEndpoint>>
:
ResolverRegistry.get(system).register(HttpEndpoint.class, "SampleEndpointResolver", (svcName, env) -> {
if ("sample".equals(svcName)) {
return Optional.of(HttpEndpoint.create("http://akka.io:80"));
} else if ("google".equals(svcName))
return Optional.of(HttpEndpoint.create("http://www.google.com:80"));
} else {
return Optional.empty();
}
});
Register class extending AbstractResolver<HttpEndpoint>
:
class SampleEndpointResolver extends AbstractResolver<HttpEndpoint> {
String name() {
return "SampleEndpointResolver";
}
Optional<HttpEndpoint> resolve(svcName: String, env: Environment) {
if ("sample".equals(svcName)) {
return Optional.of(Endpoint.create("http://akka.io:80"));
} else if ("google".equals(svcName))
return Optional.of(Endpoint.create("http://www.google.com:80"));
} else {
return Optional.empty();
}
}
}
// Register EndpointResolver
ResolverRegistry.get(system).register(HttpEndpoint.class, new SampleEndpointResolver());
You can register multiple Resolver
s. The chain is executed in the reverse order of registration. If a resolver returns None
, it means it could not resolve it and the next resolver is tried.
If the resolved endpoint is a secure one, e.g., https, an SSLContext
can be passed to HttpEndpoint
as an optional parameter.
An optional Config
can also be passed to HttpEndpoint
to override the default configuration. However, the client specific configuration has higher precedence over the passed in configuration.
Please see Resource Resolution for details on resolution in general.
Per Client Configuration
Akka HTTP Configuration defines the default values for configuration. You can override these defaults in application.conf
; however, that would affect all the clients. To do a client specific override, Akka HTTP allows passing a ConnectionPoolSettings
during HostConnectionPool
flow creation. This is supported by squbs as well.
In addition to the above, squbs allows a client specific override in application.conf
. You just need to specify a configuration section with the client's name that has type = squbs.httpclient
. Then, you can specify any client configuration inside the section. For instance, if we would like to override the max-connections
setting only for the above "sample"
client, but no other client, we can do it as follows:
sample {
type = squbs.httpclient
akka.http.host-connection-pool {
max-connections = 10
}
}
Pipeline
We often need to have common infrastructure functionality or organizational standards across different clients. Such infrastructure includes, but is not limited to, logging, metrics collection, request tracing, authentication/authorization, tracking, cookie management, A/B testing, etc. As squbs promotes separation of concerns, such logic would belong to infrastructure and not client implementation. The squbs pipeline allows infrastructure to provide components installed into a client without the client owner having to worry about such aspects. Please see squbs pipeline for more details.
Generally speaking, a pipeline is a Bidi Flow acting as a bridge in between squbs client and the Akka HTTP layer. squbs-httpclient
allows registering a Bidi Akka Streams flow globally for all clients (default pipeline) or for individual clients. To register a client specific pipeline, set the pipeline
configuration. You can turn on/off the default pipeline via defaultPipeline
setting (it is set to on
, if not specified):
sample {
type = squbs.httpclient
pipeline = metricsFlow
defaultPipeline = on
}
Please see squbs pipeline to find out how to create a pipeline and configure default pipeline.
Metrics
squbs comes with pre-built pipeline elements for metrics collection and squbs giter8 templates sets those as default. Accordingly, each squbs http client is enabled to collect Codahale Metrics out-of-the-box without any code change or configuration. Please note, squbs metrics collection does NOT require AspectJ or any other runtime code weaving. The following metrics are available on JMX by default:
- Request Timer
- Request Count Meter
- Response Count Meter
- A meter for each http response status code category: 2xx, 3xx, 4xx, 5xx
- A meter for each exception type that was returned by
ClientFlow
.
You can access the MetricRegistry
by MetricsExtension(system).metrics
. This allows you to create further meters, timers, histograms, etc or to pass it to a different type metrics reporter.
JMX Beans
Visibility of the system configuration has utmost importance while trouble shooting an issue. squbs-httpclient
registers a JMX bean for each client. The JMX bean exposes all the configuration, e.g., endpoint, host connection pool settings, etc. The name of the bean is set as org.squbs.configuration.${system.name}:type=squbs.httpclient,name=$name
. So, if the actor system name is squbs
and the client name is sample
, then the name of the JMX bean would be org.squbs.configuration.squbs:type=squbs.httpclient,name=sample
.
Circuit Breaker
squbs provides CircuitBreakerBidi
Akka Streams GraphStage
to provide circuit breaker functionality for streams. It is a generic circuit breaker implementation for streams. Please see Circuit Breaker documentation for details.
Circuit Breaker might potentially change the order of messages, so it requires a Context
to be carried around, like ClientFlow
. But, in addition to that, it needs to be able to uniquely identify each element for its internal mechanics. Accordingly, the Context
passed to ClientFlow
or a mapping from Context
should be able to uniquely identify each element. If circuit breaker is enabled, and the Context
passed to ClientFlow
does not uniquely identify each element, then you will experience unexpected behavior. Please see Context to Unique Id Mapping section of Circuit Breaker documentation for details on providing a unique id.
Circuit Breaker is disabled by default. Please see Configuring in application.conf and Passing CircuitBreakerSettings Programmatically sections below for enabling.
Once enabled, by default, any Failure
or a Success
with http status code 400
or greater increments the failure count. The default CircuitBreakerState
implementation is AtomicCircuitBreakerState
, which can be shared across materializations and across flows. These can be customized by Passing CircuitBreakerSettings Programmatically.
Configuring in application.conf
Inside the client specific configuration, you can add circuit-breaker
and specify the configuration you would like to override. For the rest, it will use the default settings. Please see here for the default circuit breaker configuration.
sample {
type = squbs.httpclient
circuit-breaker {
max-failures = 2
call-timeout = 10 milliseconds
reset-timeout = 100 seconds
}
}
Passing CircuitBreakerSettings Programmatically
You can pass a CircuitBreakerSettings
instance with the programmatically. The API lets you to pass a custom CircuitBreakerState
and optional fallback, failure decider and unique id mapper functions. If a CircuitBreakerSettings
instance is passed in programmatically, then the circuit breaker settings in application.conf
are ignored.
In below examples a fallback response is provided via withFallback
. The default failure decider is overriden via withFailureDecider
to consider only status codes >= 500
to increment failure count of circuit breaker:
Scala
import org.squbs.streams.circuitbreaker.CircuitBreakerSettings
val circuitBreakerSettings =
CircuitBreakerSettings[HttpRequest, HttpResponse, Int](circuitBreakerState)
.withFallback( _ => Try(HttpResponse(entity = "Fallback Response")))
.withFailureDecider(
response => response.isFailure || response.get.status.intValue() >= StatusCodes.InternalServerError.intValue)
val clientFlow = ClientFlow[Int]("sample", circuitBreakerSettings = Some(circuitBreakerSettings))
Java
import org.squbs.streams.circuitbreaker.japi.CircuitBreakerSettings;
final CircuitBreakerSettings circuitBreakerSettings =
CircuitBreakerSettings.<HttpRequest, HttpResponse, Integer>create(circuitBreakerState)
.withFallback(httpRequest -> Success.apply(HttpResponse.create().withEntity("Fallback Response")))
.withFailureDecider(response ->
response.isFailure() || response.get().status().intValue() >= StatusCodes.INTERNAL_SERVER_ERROR.intValue());
Calling another service as a fallback
A common scenario in circuit breaker usage is to call another service as the fallback. Calling another service would require a new stream materialization within the fallback function; so, we suggest users to get the fail fast message downstream instead and branch off accordingly. This lets defining the fallback ClientFlow
within the same stream.