Persistent Buffer

PersistentBuffer is the first of a series of practical Akka Streams flow components. It works like the Akka Streams buffer, with the difference that the content of the buffer is stored in a series of memory-mapped files in the directory given at construction of the PersistentBuffer. This allows the buffer size to be virtually limitless, keeps buffered elements off the JVM heap, and at the same time achieves very good performance, in the range of a million messages/second.

Dependencies

The following dependencies are required to use PersistentBuffer:

"org.squbs" %% "squbs-pattern" % squbsVersion,
"net.openhft" % "chronicle-queue" % "4.5.13"

Examples
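
The examples below assume imports and an implicit materializer in scope, along the following lines (a minimal setup sketch; the system name is illustrative, and the buffer classes are assumed to come from squbs' org.squbs.pattern.stream package):

import java.io.File

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import org.squbs.pattern.stream._

implicit val system = ActorSystem("persistent-buffer-example")
implicit val mat = ActorMaterializer()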

The following example shows the use of PersistentBuffer in a stream:

implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val countFuture = source.via(buffer.async).runWith(counter)
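
The counter sink materializes a Future[Long], so countFuture completes with the number of elements that passed through the buffer, 1,000,000 in this case, once the stream finishes.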

This version shows the same in a GraphDSL:

implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val buffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    source ~> buffer.async ~> sink
    ClosedShape
})
val countFuture = streamGraph.run()

Back-Pressure

PersistentBuffer does not back-pressure upstream. It accepts all the stream elements given to it and grows its storage by increasing, or rotating, the number of queue files. It has no means of limiting the buffer size or detecting the storage capacity. Downstream back-pressure is honored as per Akka Streams and Reactive Streams requirements.

If the PersistentBuffer stage gets fused with its downstream, it would not buffer but would actually back-pressure. To make sure PersistentBuffer runs at its own pace, add an async boundary just after it.
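
For illustration, a sketch reusing source and buffer from the examples above (only one of these wirings should run against a given queue directory):

// Fused with its downstream: the buffer acts as a pass-through and back-pressures.
val fused = source.via(buffer)

// Detached by an async boundary: the buffer accepts elements at its own pace
// and spools them to disk regardless of downstream speed.
val detached = source.via(buffer.async)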

Failure & Recovery

Due to its persistent nature, PersistentBuffer can recover from abrupt stream shutdowns, failures, JVM failures, or even potential system failures. Restarting a stream with a PersistentBuffer on the same directory first emits the elements that were stored in the buffer but not yet consumed, before the newly added elements. Only the elements that had been consumed from the buffer but whose processing had not yet finished at the time of the previous stream failure or shutdown are lost.
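
For example, a minimal recovery sketch, assuming a previous run of the stream above wrote to /tmp/myqueue and was interrupted:

// A new buffer over the same directory first emits the stored, unconsumed
// elements, then whatever the new source adds.
val recoveredBuffer = new PersistentBuffer[ByteString](new File("/tmp/myqueue"))
val resumeFuture = Source(1000001 to 2000000)
  .map { n => ByteString(s"Hello $n") }
  .via(recoveredBuffer.async)
  .runWith(Sink.ignore)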

Since the buffer is backed by local storage, spindles or SSD, the performance and durability of this buffer also depend on the durability of that storage. A system malfunction or storage corruption may cause total loss of all elements in the buffer. So it is important to understand that this buffer trades the durability level of databases or other off-host persistent stores for much higher performance.

Akka Streams stages batch requests and buffer records internally. PersistentBuffer guarantees persistence and recovery only of the records that have reached its onPush handler; records still sitting in an Akka Streams stage's internal buffer that have not yet reached onPush are lost on failure.

Commit Guarantee

In case of an unexpected failure, elements emitted from the PersistentBuffer stage that have not yet reached a sink would be lost. Sometimes such data loss needs to be avoided. Adding a commit stage before the sink helps in such cases. To obtain a commit stage, use PersistentBufferAtLeastOnce instead. The following example shows the usage of the commit stage:

implicit val serializer = QueueSerializer[ByteString]()
val source = Source(1 to 1000000).map { n => ByteString(s"Hello $n") }
val tempPath = new File("/tmp/myqueue")
// ConfigFactory.parseMap expects a java.util.Map, hence the asJava conversion.
import scala.collection.JavaConverters._
val config = ConfigFactory.parseMap(
  Map(
    "persist-dir" -> tempPath.getAbsolutePath
  ).asJava
)
val buffer = new PersistentBufferAtLeastOnce[ByteString](config)
val commit = buffer.commit[ByteString]
val flowSink = // do some transformation or a sink flow with expected failure
val counter = Flow[Any].map( _ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(counter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    // ensures that records are reprocessed when something fails at the transform flow
    source ~> buffer ~> flowSink ~> commit ~> sink 
    ClosedShape
})
val countFuture = streamGraph.run()

Please note, commit does not prevent the loss of messages buffered internally by the sink (or by any other stage after commit).

Commit Order

The commit stage should normally receive elements in index order. However, a potential bug in a stream may cause an element to be dropped or to reach the commit stage out of order. The default commit-order-policy is set to lenient to let the stream continue in such scenarios. You can set it to strict to have a CommitOrderException thrown instead, and let the Supervision.Decider determine what action to take.
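
For example, a sketch of strict commit-order checking using the Config-based constructor described under Configuration below (the decider shown is illustrative, assuming CommitOrderException comes from the same squbs package as the buffer):

import akka.stream.Supervision

val strictConfig = ConfigFactory.parseString(
  """
    | persist-dir = /tmp/myqueue
    | commit-order-policy = strict
  """.stripMargin)
val strictBuffer = new PersistentBufferAtLeastOnce[ByteString](strictConfig)

// An illustrative decider: restart on out-of-order commits, stop otherwise.
// Attach it to the stream, e.g. via ActorAttributes.supervisionStrategy(decider).
val decider: Supervision.Decider = {
  case _: CommitOrderException => Supervision.Restart
  case _                       => Supervision.Stop
}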

Space Management

A typical directory for persisting the queue looks like the following:

$ ls -l
-rw-r--r--  1 squbs_user     110054053  83886080 May 17 20:00 20160518.cq4
-rw-r--r--  1 squbs_user     110054053      8192 May 17 20:00 tailer.idx

Queue files are deleted automatically once all the readers have successfully finished reading them.

Configuration

The queue can be created by passing just the location of the persistence directory, keeping all default configuration, as seen in the earlier examples. Alternatively, it can be created by passing a Config object at construction. The Config object is a standard HOCON configuration. The following example shows how to construct a PersistentBuffer using a Config:

val configText =
  """
    | persist-dir = /tmp/myQueue
    | roll-cycle = xlarge_daily
    | wire-type = compressed_binary
    | block-size = 80m
  """.stripMargin
val config = ConfigFactory.parseString(configText)

// Construct the buffer using a Config.
val buffer = new PersistentBuffer[ByteString](config)

The following configuration properties are used for the PersistentBuffer:

persist-dir = /tmp/myQueue # Required
roll-cycle = daily         # Optional, defaults to daily
wire-type = binary         # Optional, defaults to binary
block-size = 80m           # Optional, defaults to 64m
index-spacing = 16k        # Optional, defaults to roll-cycle's spacing 
index-count = 16           # Optional, defaults to roll-cycle's count
commit-order-policy = lenient # Optional, defaults to lenient

Roll-cycle can be specified in lower or upper case. Supported values for roll-cycle are as follows:

Roll Cycle     Capacity
MINUTELY       64 million entries per minute
HOURLY         256 million entries per hour
SMALL_DAILY    512 million entries per day
DAILY          4 billion entries per day
LARGE_DAILY    32 billion entries per day
XLARGE_DAILY   2 trillion entries per day
HUGE_DAILY     256 trillion entries per day

Wire-type can be specified in lower or upper case. Supported values for wire-type are as follows:

  • TEXT
  • BINARY
  • FIELDLESS_BINARY
  • COMPRESSED_BINARY
  • JSON
  • RAW
  • CSV

The memory sizes such as block-size and index-spacing are specified according to the memory size format defined in the HOCON specification.

Serialization

A QueueSerializer[T] needs to be implicitly provided for a PersistentBuffer[T], as seen in the examples above:

implicit val serializer = QueueSerializer[ByteString]()

The QueueSerializer[T]() call produces a serializer for your target type. It depends on the serialization and deserialization of the underlying infrastructure.

Implementing a Serializer

To control the fine-grained persistence format of elements in the queue, you may want to implement your own serializer as follows:

case class Person(name: String, age: Int)

class PersonSerializer extends QueueSerializer[Person] {

  override def readElement(wire: WireIn): Option[Person] = {
    for {
      name <- Option(wire.read().`object`(classOf[String]))
      age <- Option(wire.read().int32)
    } yield { Person(name, age) }
  }

  override def writeElement(element: Person, wire: WireOut): Unit = {
    wire.write().`object`(classOf[String], element.name)
    wire.write().int32(element.age)
  }
}

To use this serializer, just declare it implicitly before constructing the PersistentBuffer as follows:

implicit val serializer = new PersonSerializer()
val buffer = new PersistentBuffer[Person](new File("/tmp/myqueue"))
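
As a quick usage sketch with illustrative data, elements then round-trip through the queue as usual:

Source(List(Person("Alice", 30), Person("Bob", 25)))
  .via(buffer.async)
  .runWith(Sink.foreach(println)) // prints each Person read back from the queue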

Broadcast Buffer

BroadcastBuffer is a variant of PersistentBuffer. It works similarly to PersistentBuffer, except that stream elements are broadcast to multiple output ports. It is, in effect, a combination of the buffer and broadcast stages. Its configuration takes an additional parameter, output-ports, which specifies the number of output ports.

A broadcast buffer is especially useful when stream elements are to be emitted from each output port at an independent rate, depending on the speed of downstream demand.

val configText =
  """
    | persist-dir = /tmp/myQueue
    | roll-cycle = xlarge_daily
    | wire-type = compressed_binary
    | block-size = 80m
    | output-ports = 3
  """.stripMargin
val config = ConfigFactory.parseString(configText)

// Construct the buffer using a Config.
val bcBuffer = new BroadcastBuffer[ByteString](config)

Examples

implicit val serializer = QueueSerializer[ByteString]()

val in = Source(1 to 100000)
// Illustrative stand-ins for the stages this example assumes:
val transform = Flow[Int].map { n => ByteString(s"Hello $n") }
val merge = Merge[Any](3) // matches output-ports = 3 in the config above
val flowCounter = Flow[Any].map(_ => 1L).reduce(_ + _).toMat(Sink.head)(Keep.right)

val streamGraph = RunnableGraph.fromGraph(GraphDSL.create(flowCounter) { implicit builder =>
  sink =>
    import GraphDSL.Implicits._
    val buffer = new BroadcastBufferAtLeastOnce[ByteString](config)
    val commit = buffer.commit[ByteString]
    val bcBuffer = builder.add(buffer.async)
    val mr = builder.add(merge)
    in ~> transform ~> bcBuffer ~> commit ~> mr ~> sink
                       bcBuffer ~> commit ~> mr
                       bcBuffer ~> commit ~> mr
    ClosedShape
})

val countFuture = streamGraph.run()

Credits

PersistentBuffer utilizes Chronicle-Queue 4.x as its high-performance, memory-mapped queue persistence layer.