
Bryan Gilbert - Random.next()

akka-stream: Akka's reactive stream implementation

If you've been following recent developments around Akka, then I'm sure you've heard about the Reactive Streams Specification and Akka's adoption of it in the form of the akka-stream module. Akka's implementation of reactive streams provides a useful set of abstractions that make thinking about and working with stream processing much simpler and more robust. This, coupled with Akka's partnership with Mathias Doenitz of Spray to create an http module (which heavily leverages akka-stream), makes for a particularly exciting set of developments.

In this post, I hope to provide a good foundation for understanding the Reactive Streams specification and how Akka implements it in akka-stream. I do not plan on covering any of the DSL provided with akka-stream, but will instead take a lower-level look at its core abstractions in an effort to gain a better understanding of how it works. I plan on turning this into a set of three posts, with the next post building on these fundamentals with the Flow DSL, and the last looking at akka-http and how to create a proper reactive stream across multiple servers using HTTP. All code in this post can be found in this repo.

Let's start by taking a look at the Reactive Streams Specification.

Reactive Streams

The reactive streams specification essentially boils down to a minimal set of interfaces that together can be used to achieve asynchronous streaming with the ability to apply non-blocking back pressure. At a high level these interfaces provide a two-way mechanism that allows subscribers to signal that they can handle more items and publishers to push items to the subscribers when they're ready. To illustrate this I've shamelessly stolen a diagram created by Mathias Doenitz for his akka-http talk at ScalaDays 2014 (you can check it out here).

Reactive Streams w/ Backpressure

Each step in the image acts as both a publisher and a subscriber. The large arrows along the bottom show the direction in which data is published through the stream. The top set of arrows pointing back from each step shows the direction in which demand is signaled back through the stream. A publisher in a reactive stream may push data to a subscriber only if the subscriber has indicated that it can handle more data. As long as this chain is unbroken and every step obeys the rules of being a participant in a reactive stream, step 1 can properly react to back pressure generated by step 4. This means that if a computation far downstream runs slowly and begins to back up, it can prevent data from being sent at the source of the stream until it has had a chance to catch up.

In order to get a more concrete understanding of exactly how this back pressure mechanism is achieved let's take a look at these interfaces.

Subscriber

First we'll take a look at the Subscriber interface:

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

This interface is fairly straightforward, with the onNext, onError, and onComplete functions being fairly standard to most stream implementations. As new elements are produced, a publisher invokes the onNext function with each one. If an error is encountered while producing values, the publisher invokes the onError function with the exception that was encountered. Finally, when there are no more elements to be published in the stream, the onComplete function is invoked.

This leaves us with one final function, onSubscribe, which gets invoked when a subscriber is subscribed to a publisher. This function is key to enabling proper back pressure handling in reactive streams. In order to understand how this works, let's move on to the Subscription interface.

Subscription

A subscription is a subscriber's communication channel back to the publisher that allows it to either cancel the subscription or signal demand by requesting more data. The interface for a Subscription is very simple:

public interface Subscription {
    public void request(long n);
    public void cancel();
}

A subscription contains just two functions: request, which signals to the publisher how many more elements a subscriber can handle, and cancel, which allows a subscriber to terminate a subscription, meaning the publisher will no longer send data to the subscriber. This line of communication between the publisher and subscriber provided by the subscription is the mechanism by which non-blocking back pressure is achieved in Reactive Streams.
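
To make the request/cancel handshake a little more concrete, here is a minimal sketch in plain Java. The Subscription and Subscriber interfaces are local copies of the ones shown above so the snippet compiles on its own. TakeSubscriber is a hypothetical name of my own, not part of the specification: it asks for one element at a time and cancels once it has seen `limit` elements (assumed to be at least 1).

```java
import java.util.ArrayList;
import java.util.List;

// Local copies of the interfaces shown above, so the sketch is self-contained.
interface Subscription {
    void request(long n);
    void cancel();
}

interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

// Hypothetical subscriber: pulls one element at a time, cancels after `limit` elements.
class TakeSubscriber<T> implements Subscriber<T> {
    final List<T> received = new ArrayList<>();
    private final int limit;            // assumed to be >= 1
    private Subscription subscription;

    TakeSubscriber(int limit) { this.limit = limit; }

    @Override public void onSubscribe(Subscription s) {
        subscription = s;               // keep the channel back to the publisher
        s.request(1);                   // signal initial demand for a single element
    }

    @Override public void onNext(T t) {
        received.add(t);
        if (received.size() >= limit) {
            subscription.cancel();      // we have enough: tell the publisher to stop
        } else {
            subscription.request(1);    // ready for exactly one more element
        }
    }

    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}
```

The important part is that the publisher never learns anything about the subscriber except what arrives through request and cancel.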

Publisher

The last interface that needs to be implemented to tie all of this together is the publisher. The publisher is responsible for implementing a single function subscribe that takes a subscriber instance.

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

While this may be the simplest interface we've seen so far, the publisher is responsible for tying all of the above functionality together. When a subscriber is subscribed via the subscribe function, the publisher is responsible for providing the subscriber with a subscription. The publisher must provide stream elements to the subscriber by invoking the onNext function, but must not exceed the total number of elements that the subscriber has expressed demand for. Additionally, the publisher must honor the subscriber's cancel request if the cancel function is invoked on the subscription, and it is responsible for invoking the onError and onComplete functions if an error is encountered or the stream is completed, respectively.
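
As a rough illustration of those responsibilities, here is a sketch of a synchronous publisher in plain Java. RangePublisher is a hypothetical class of my own, and the interfaces are again local copies so the snippet stands alone. It is single-threaded and ignores many of the specification's rules (for example, it only completes in response to a request), so treat it as a toy that shows demand accounting and nothing more.

```java
interface Subscription { void request(long n); void cancel(); }

interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

interface Publisher<T> { void subscribe(Subscriber<? super T> s); }

// Hypothetical publisher of the integers from `from` (inclusive) to `to` (exclusive).
class RangePublisher implements Publisher<Integer> {
    private final int from;
    private final int to;

    RangePublisher(int from, int to) { this.from = from; this.to = to; }

    @Override public void subscribe(final Subscriber<? super Integer> s) {
        s.onSubscribe(new Subscription() {
            private int next = from;
            private long demand = 0;          // elements requested but not yet sent
            private boolean done = false;     // cancelled, completed, or failed
            private boolean emitting = false; // guards against re-entrant request calls

            @Override public void request(long n) {
                if (done) return;
                if (n <= 0) {                 // non-positive demand is an error
                    done = true;
                    s.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                demand += n;
                if (emitting) return;         // the outer loop will pick up the new demand
                emitting = true;
                while (!done && demand > 0 && next < to) {
                    demand--;                 // never send more than was requested
                    s.onNext(next++);
                }
                emitting = false;
                if (!done && next >= to) {
                    done = true;
                    s.onComplete();
                }
            }

            @Override public void cancel() { done = true; } // honor cancellation
        });
    }
}
```

Requesting 3 elements from a RangePublisher(0, 5) delivers 0, 1 and 2 and then goes quiet until more demand is signaled.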

The Reactive Streams specification defines one more interface, the processor interface.

Processor

The Processor interface is very simple and consists of a single interface that combines the Subscriber and Publisher interfaces and is defined as such:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

The Processor interface is very important because it allows stream processors to be created that can consume, transform, and publish streams while retaining back pressure propagation to the original stream source. In the image shown above, each of the intermediary steps that both take in a stream and produce a corresponding stream are themselves processors. Why the processor concept is integral to retaining back pressure propagation will become clear later on when we implement one.
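
Here is a small plain-Java sketch of why a processor keeps the back pressure chain intact. DoublingStage is a hypothetical one-to-one stage of my own: it doubles each element on the way down and relays request and cancel on the way up unchanged. It assumes a single downstream subscriber and assumes that the upstream subscription arrives before any downstream demand does.

```java
interface Subscription { void request(long n); void cancel(); }

interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

interface Publisher<T> { void subscribe(Subscriber<? super T> s); }

interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

// Hypothetical one-to-one stage: doubles every element and relays demand upstream.
class DoublingStage implements Processor<Integer, Integer> {
    private Subscription upstream;                   // our link to the source
    private Subscriber<? super Integer> downstream;  // the single consumer (assumption)

    // Publisher side: the downstream's demand is passed straight through to the source.
    @Override public void subscribe(Subscriber<? super Integer> s) {
        downstream = s;
        s.onSubscribe(new Subscription() {
            @Override public void request(long n) { upstream.request(n); }
            @Override public void cancel() { upstream.cancel(); }
        });
    }

    // Subscriber side: transform and forward each signal.
    @Override public void onSubscribe(Subscription s) { upstream = s; }
    @Override public void onNext(Integer t) { downstream.onNext(t * 2); }
    @Override public void onError(Throwable t) { downstream.onError(t); }
    @Override public void onComplete() { downstream.onComplete(); }
}
```

Because the stage never asks the source for more than its own subscriber asked for, a slow consumer at the end of the chain still throttles the source.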

Now that we've been introduced to Reactive Streams, let's take a look at how Akka implements them.

Akka-Stream

Akka currently has an experimental akka-stream module that implements the Reactive Streams specification. This experimental module can be included in any sbt project by adding the following dependency:

"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M2"

While we are exploring how the Reactive Streams specification is implemented in the akka-stream module, we'll also be implementing a trivial example with a publisher that produces fibonacci values, a transformer that doubles every value that comes through, and a subscriber that merely logs values as they are consumed along with a fixed delay to simulate a slow consumer. Let's get started by implementing the publisher.

Actor Publisher

Before we implement a publisher that calculates and publishes fibonacci values, let's first look at how we can use akka-stream to create a publisher. Since Akka is at its heart an actor-based concurrency framework, it makes sense that Akka allows us to create reactive streams completely out of actors. To that end, akka-stream provides a mixable trait to help us out: ActorPublisher[T] (scaladoc).

The ActorPublisher[T] trait can be mixed into any actor and provides important publisher-specific lifecycle management. It handles most of the book keeping required to properly manage the life cycle of a publisher under the covers and provides a number of functions to interrogate the current state of the subscription and manipulate the stream. Specifically, it provides functions that track the state of the subscription:

  • isActive - Returns a boolean indicating if the stream is in an active state
  • isCompleted - Returns a boolean indicating if the stream has been completed
  • isErrorEmitted - Returns a boolean indicating if the stream has encountered an exception
  • totalDemand - Returns a long indicating the total demand that has been signaled by the subscriber

Each of these values is updated automatically during the lifetime of the subscription. For example, as the subscriber signals more demand, the ActorPublisher trait automatically increases totalDemand.

The ActorPublisher[T] trait also defines several functions for interacting with the stream:

  • onNext - Pushes an element into the stream (Throws an exception if isActive is false or totalDemand < 1)
  • onComplete - Completes the stream
  • onError - Terminates the stream with an error

Finally two messages are used to signal to the implementing actor when the subscriber either cancels the subscription or signals more demand: Cancel and Request(n: Long).
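
To get a feel for the bookkeeping the trait does for us, here is a plain-Java model of it. This is not Akka's code: DemandTracker is a hypothetical class that only mirrors the behaviour described above, and the choice of IllegalStateException is my own assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the state an ActorPublisher-style helper tracks; not Akka's source.
class DemandTracker<T> {
    private long totalDemand = 0;     // demand signaled by the subscriber, not yet fulfilled
    private boolean active = true;    // false once completed or cancelled
    final List<T> emitted = new ArrayList<>();

    boolean isActive() { return active; }
    long totalDemand() { return totalDemand; }

    // Models the arrival of a Request(n) message.
    void request(long n) {
        if (active) totalDemand += n;
    }

    // Models the arrival of a Cancel message.
    void cancel() { active = false; }

    // Pushing is only legal while the stream is active and demand is outstanding.
    void onNext(T element) {
        if (!active || totalDemand < 1) {
            throw new IllegalStateException("onNext requires an active stream and outstanding demand");
        }
        totalDemand--;
        emitted.add(element);
    }

    void onComplete() { active = false; }
}
```

Seen this way, checking isActive and totalDemand before every onNext is simply how an implementing actor avoids that exception.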

Fibonacci Publisher

Now that we've looked at how we can use the ActorPublisher[T] trait to help implement a reactive publisher, let's implement our fibonacci publisher.

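// Imports assume the akka-stream-experimental 1.0-M2 package layout
import java.math.BigInteger
import akka.actor.{ActorLogging, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}

class FibonacciPublisher extends ActorPublisher[BigInteger] with ActorLogging {      // 1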
  var prev = BigInteger.ZERO
  var curr = BigInteger.ZERO

  def receive = {
    case Request(cnt) =>                                                             // 2
      log.debug("[FibonacciPublisher] Received Request ({}) from Subscriber", cnt)
      sendFibs()
    case Cancel =>                                                                   // 3
      log.info("[FibonacciPublisher] Cancel Message Received -- Stopping")
      context.stop(self)
    case _ =>
  }

  def sendFibs() {
    while(isActive && totalDemand > 0) {                                             // 4
      onNext(nextFib())
    }
  }

  def nextFib(): BigInteger = {    
    if(curr == BigInteger.ZERO) {
      curr = BigInteger.ONE
    } else {
      val tmp = prev.add(curr)
      prev = curr
      curr = tmp
    }
    curr
  }
}

Our fibonacci publisher is fairly simple: it maintains two values, prev and curr, which hold the state needed to calculate the next value in the fibonacci sequence. We're using BigInteger here to prevent overflows. At a high level all our Publisher is doing is generating and emitting fibonacci values based on demand from a consumer. Let's take a look at some of the important points in more detail:

  1. Extend ActorPublisher[BigInteger], which declares that we intend to produce BigInteger values

    class FibonacciPublisher extends ActorPublisher[BigInteger] with ActorLogging {
    
  2. Here we react to request messages and send as many fibonacci values as we can to fulfill the subscriber's demand

        case Request(cnt) =>
          log.debug("[FibonacciPublisher] Received Request ({}) from Subscriber", cnt)
          sendFibs()
    
  3. Here we react to cancel messages indicating that the subscriber has cancelled the subscription and promptly stop the actor

        case Cancel =>
          log.info("[FibonacciPublisher] Cancel Message Received -- Stopping")
          context.stop(self)
    
  4. Here we generate and send as many fibonacci values as we can, checking after pushing each value if there is more demand

        while(isActive && totalDemand > 0) {
          onNext(nextFib())
        }
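
If you want to convince yourself that the prev/curr logic produces the right sequence, the same state machine can be run on its own in plain Java, without Akka. FibonacciState and its take helper are hypothetical names of my own. The take loop stands in for the demand-driven loop in sendFibs, with a plain counter playing the role of totalDemand.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Standalone copy of the publisher's prev/curr state, without any actor machinery.
class FibonacciState {
    private BigInteger prev = BigInteger.ZERO;
    private BigInteger curr = BigInteger.ZERO;

    BigInteger nextFib() {
        if (curr.equals(BigInteger.ZERO)) {
            curr = BigInteger.ONE;            // first call: seed the sequence
        } else {
            BigInteger tmp = prev.add(curr);  // next value is the sum of the last two
            prev = curr;
            curr = tmp;
        }
        return curr;
    }

    // Hypothetical helper: emit values only while demand remains.
    List<BigInteger> take(long demand) {
        List<BigInteger> out = new ArrayList<>();
        while (demand > 0) {
            out.add(nextFib());
            demand--;
        }
        return out;
    }
}
```

Calling take(6) on a fresh instance yields 1, 1, 2, 3, 5, 8, and a following take(2) picks up where it left off with 13 and 21.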
    

Finally we can instantiate our actor and use the ActorPublisher apply function to create a bona fide reactive streams publisher:

val pubRef = system.actorOf(Props[FibonacciPublisher])
val publisher = ActorPublisher[BigInteger](pubRef) // => org.reactivestreams.Publisher

Now that we've created a reactive publisher, let's look at creating a subscriber to subscribe to it!

Actor Subscriber

To help us with the task of creating a reactive subscriber, Akka provides a trait to manage the lifecycle of a subscriber: ActorSubscriber (scaladoc). This trait behaves very similarly to ActorPublisher[T] in that it provides several facilities to help us behave the way a reactive subscriber should. It defines a number of messages that will be sent to the actor as normal stream lifecycle events occur:

  • OnNext - Emitted when publisher pushes a new element into the stream
  • OnComplete - Emitted when publisher completes the stream
  • OnError - Emitted when the publisher ends the stream due to an exception

Additionally, the ActorSubscriber trait forces any subclass to define a request strategy. A request strategy is used to control stream back pressure, and as such dictates when a request is made of the subscription and for how many elements. A request strategy is a simple object that the subscriber consults after every element pushed into the stream to determine whether or not more elements should be requested. Its interface is defined as:

trait RequestStrategy {
  def requestDemand(remainingRequested: Int): Int
}

There are several implementations provided by akka-stream, however it is also very simple to roll your own. The available implementations are:

  • WatermarkRequestStrategy - This strategy allows us to specify a high and low watermark that is used to request more elements. If the number of unhandled messages is less than the low watermark this strategy will request enough elements to meet the high watermark. This is a good choice if all work is done directly inside the current actor, however it would be a poor choice to use if this actor is delegating work to a third party.
  • MaxInFlightRequestStrategy - This strategy tracks the number of elements that are currently "in flight", meaning they have been received but not yet fully handled. To use this strategy we need to implement a subclass that defines the function inFlightInternally: Int, which reports the current number of elements being processed. We will use this strategy to implement our Doubling Processor.
  • ZeroRequestStrategy - Finally, we can opt out of automatic requests altogether and simply call the request function provided by the ActorSubscriber trait explicitly when we want more elements.
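
To show how little logic a request strategy needs, here is a plain-Java rendering of the watermark rule as described above. The names mirror the text, but this is a sketch and not Akka's source. Defaulting the low watermark to half the high watermark is an assumption on my part.

```java
// Java rendering of the single-method strategy interface shown above.
interface RequestStrategy {
    int requestDemand(int remainingRequested);
}

// Sketch of the watermark rule: top up to `high` whenever we drop below `low`.
class WatermarkStrategy implements RequestStrategy {
    private final int highWatermark;
    private final int lowWatermark;

    // Assumption: the low watermark defaults to half of the high watermark.
    WatermarkStrategy(int highWatermark) {
        this(highWatermark, Math.max(1, highWatermark / 2));
    }

    WatermarkStrategy(int highWatermark, int lowWatermark) {
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
    }

    @Override public int requestDemand(int remainingRequested) {
        return remainingRequested < lowWatermark
            ? highWatermark - remainingRequested  // ask for enough to reach the high mark
            : 0;                                  // still plenty outstanding: ask for nothing
    }
}

// Sketch of the "zero" rule: never request automatically.
class ZeroStrategy implements RequestStrategy {
    @Override public int requestDemand(int remainingRequested) { return 0; }
}
```

With a high watermark of 50, the strategy asks for 50 elements when nothing is outstanding, 26 when 24 are outstanding, and nothing at all once 25 or more are outstanding.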

Now that we're a bit familiar with the ActorSubscriber trait, let's implement a sloooow fibonacci subscriber so that we can see some of this back pressure propagation in action.

Slow Fibonacci Subscriber

Let's jump straight into our subscriber implementation and walk through the details:

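// Imports assume the akka-stream-experimental 1.0-M2 package layout
import java.math.BigInteger
import akka.actor.{ActorLogging, Props}
import akka.stream.actor.{ActorSubscriber, WatermarkRequestStrategy}
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}

class FibonacciSubscriber(delay: Long) extends ActorSubscriber with ActorLogging {       // 1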
  val requestStrategy = WatermarkRequestStrategy(50)                                     // 2

  def receive = {
    case OnNext(fib: BigInteger) =>                                                      // 3
      log.debug("[FibonacciSubscriber] Received Fibonacci Number: {}", fib)
      Thread.sleep(delay)
    case OnError(err: Exception) =>                                                      // 4
      log.error(err, "[FibonacciSubscriber] Received Exception in Fibonacci Stream")
      context.stop(self)
    case OnComplete =>                                                                   // 5
      log.info("[FibonacciSubscriber] Fibonacci Stream Completed!")
      context.stop(self)
    case _ =>
  }
}

Not too exciting huh? Notice that we're using the WatermarkRequestStrategy with a high watermark of 50 and a low watermark of 25 (automatically set to half the high watermark if not specified). Since we are using a sleep to simulate work and introduce a delay we can use the WatermarkRequestStrategy since all the "work" is happening inside the actor. This actor will request enough elements to reach the high watermark whenever the number of elements queued up is lower than the low watermark.

Let's step through this:

  1. We extend the ActorSubscriber trait

    class FibonacciSubscriber(delay: Long) extends ActorSubscriber with ActorLogging {
    
  2. We define our WatermarkRequestStrategy with a high watermark of 50 and a low watermark of 25

        val requestStrategy = WatermarkRequestStrategy(50)
    
  3. We handle each OnNext message containing the next fibonacci value pushed into the stream by the publisher by logging it and introducing a simulated delay using Thread.sleep(delay)

        case OnNext(fib: BigInteger) =>
          log.debug("[FibonacciSubscriber] Received Fibonacci Number: {}", fib)
          Thread.sleep(delay)
    
  4. We handle any OnError messages emitted by the publisher by logging the error and stopping the subscriber

        case OnError(err: Exception) =>
          log.error(err, "[FibonacciSubscriber] Received Exception in Fibonacci Stream")
          context.stop(self)
    
  5. We handle any OnComplete messages by logging and stopping the subscriber

        case OnComplete =>
          log.info("[FibonacciSubscriber] Fibonacci Stream Completed!")
          context.stop(self)
    

Finally, let's create an instance of our subscriber:

val subRef = system.actorOf(Props(new FibonacciSubscriber(500L))) // 500 ms delay
val subscriber = ActorSubscriber[BigInteger](subRef) // => org.reactivestreams.Subscriber

We can now hook our slow subscriber up to our publisher:

publisher.subscribe(subscriber)

And that's all there is to it: our subscriber will slowly log each fibonacci value without becoming overwhelmed!

Now, let's move on to our last component by introducing a middle man into our stream that alters the stream in a meaningful way while correctly propagating back pressure to our FibonacciPublisher from our ever so slow FibonacciSubscriber.

Doubling Fibonacci Processor

Our middle man will be a processor that consumes the fibonacci numbers from the FibonacciPublisher and emits a new stream consisting of each fibonacci number doubled, which our slow FibonacciSubscriber will then consume. In order to create this processor we simply need to create an actor that is both an ActorPublisher[T] and an ActorSubscriber. Without further ado, let's implement our processor:

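// Imports assume the akka-stream-experimental 1.0-M2 package layout
import java.math.BigInteger
import scala.collection.mutable.{Queue => MQueue}
import akka.stream.actor.{ActorPublisher, ActorSubscriber, MaxInFlightRequestStrategy}
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}

class DoublingProcessor extends ActorSubscriber with ActorPublisher[BigInteger] {   // 1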
  val dos = BigInteger.valueOf(2L)
  val doubledQueue = MQueue[BigInteger]()                                           // 2

  def receive = {
    case OnNext(biggie: BigInteger) =>                                              // 3
      doubledQueue.enqueue(biggie.multiply(dos))
      sendDoubled()
    case OnError(err: Exception) =>                                                 // 4
      onError(err)
      context.stop(self)
    case OnComplete =>                                                              // 5
      onComplete()
      context.stop(self)
    case Request(cnt) =>                                                            // 6
      sendDoubled()
    case Cancel =>                                                                  // 7
      cancel()
      context.stop(self)
    case _ =>
  }

  def sendDoubled() {
    while(isActive && totalDemand > 0 && !doubledQueue.isEmpty) {                   // 8
      onNext(doubledQueue.dequeue())
    }
  }

  val requestStrategy = new MaxInFlightRequestStrategy(50) {                        // 9
    def inFlightInternally(): Int = { doubledQueue.size }
  }
}

The DoublingProcessor acts as both a publisher and a subscriber that doubles and subsequently publishes every element it receives. Note that we use a queue to store our doubled values that have yet to be sent to the subscriber. We are able to create an anonymous subclass of MaxInFlightRequestStrategy that uses the current size of the queue to determine the number of unprocessed messages ('inFlightInternally'). This request strategy will request new elements in batches of at least 5 (the default) until it reaches the maximum of 50 and will only ever request enough elements to reach that number.
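
The decision this strategy makes can be sketched in a few lines of plain Java. This is one plausible reading of the behaviour described above and not Akka's source. MaxInFlightStrategy is a hypothetical class, and passing the batch size and the in-flight count as constructor arguments is my own simplification.

```java
import java.util.function.IntSupplier;

// Sketch: cap "requested but not received" plus "received but not handled" at `max`.
class MaxInFlightStrategy {
    private final int max;
    private final int batchSize;
    private final IntSupplier inFlightInternally;  // e.g. the size of an internal queue

    MaxInFlightStrategy(int max, int batchSize, IntSupplier inFlightInternally) {
        this.max = max;
        this.batchSize = batchSize;
        this.inFlightInternally = inFlightInternally;
    }

    int requestDemand(int remainingRequested) {
        int batch = Math.min(batchSize, max);
        int outstanding = remainingRequested + inFlightInternally.getAsInt();
        // Only request once at least a full batch of room has opened up,
        // and then request exactly enough to get back to `max`.
        return outstanding <= max - batch ? Math.max(0, max - outstanding) : 0;
    }
}
```

With a maximum of 50, a batch size of 5 and 10 elements sitting in the queue, nothing is requested while 36 elements are still outstanding upstream, but 5 are requested as soon as that number drops to 35.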

Let's step through the above example:

  1. Here we are extending both ActorSubscriber and ActorPublisher

    class DoublingProcessor extends ActorSubscriber with ActorPublisher[BigInteger] {
    
  2. We are using a mutable.Queue to store all unpublished doubled values

        val doubledQueue = MQueue[BigInteger]()
    
  3. Here we are reacting to a new element and handling the OnNext message by putting the doubled value on the queue and publishing as many values as the subscriber has expressed demand for

        case OnNext(biggie: BigInteger) =>
          doubledQueue.enqueue(biggie.multiply(dos))
          sendDoubled()
    
  4. Here we are reacting to the OnError message by pushing that error forward to the subscriber and stopping the processor

        case OnError(err: Exception) =>
          onError(err)
          context.stop(self)
    
  5. Here we are reacting to the OnComplete message by completing the subscription and stopping the processor

        case OnComplete =>
          onComplete()
          context.stop(self)
    
  6. Here we are reacting to a Request for more elements from the subscriber and sending as many elements down as possible

        case Request(cnt) =>
          sendDoubled()
    
  7. Here we are reacting to a Cancel message from the subscriber and cancelling the subscription with our publisher

        case Cancel =>
          cancel()
          context.stop(self)
    
  8. Here we are publishing as many elements as possible while ensuring the subscription is active and demand exists

      def sendDoubled() {
        while(isActive && totalDemand > 0 && !doubledQueue.isEmpty) {
          onNext(doubledQueue.dequeue())
        }
      }
    
  9. Here we are defining a subclass of MaxInFlightRequestStrategy that will have a maximum of 50 elements in flight using the queue size to determine how many elements are currently in flight

      val requestStrategy = new MaxInFlightRequestStrategy(50) {
        def inFlightInternally(): Int = { doubledQueue.size }
      }
    

Now that we have our processor that will double our fibonacci values, we can use it to create a stream with three players in it and see that back pressure is properly propagated from our subscriber through our processor back to the publisher:

val pubRef = system.actorOf(Props[FibonacciPublisher])
val publisher = ActorPublisher[BigInteger](pubRef)

val subRef = system.actorOf(Props(new FibonacciSubscriber(500L))) // 500 ms delay
val subscriber = ActorSubscriber[BigInteger](subRef)

val procRef = system.actorOf(Props[DoublingProcessor])
val procSubscriber = ActorSubscriber[BigInteger](procRef)
val procPublisher = ActorPublisher[BigInteger](procRef)

publisher.subscribe(procSubscriber)
procPublisher.subscribe(subscriber)

Running the above will result in the slow subscriber receiving and printing doubled fibonacci values at a slow rate without being overwhelmed. The processor will properly push back on the fibonacci publisher so that it produces data at roughly the same rate the slow subscriber is consuming elements.
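
The overall effect can be approximated without Akka using a tiny deterministic model. Everything in it is an assumption made for illustration: there is no processor, the source fills all outstanding demand instantly, the sink handles exactly one element per step, and demand follows the watermark rule. BackpressureSimulation is a hypothetical name.

```java
// Toy model: an infinitely fast source feeding a slow sink that uses a watermark rule.
class BackpressureSimulation {
    /** Returns {produced, processed, maxBuffered} after the sink has handled `steps` elements. */
    static long[] run(int high, int low, int steps) {
        long requested = high;   // initial demand: nothing outstanding, so ask for a full window
        long produced = 0;
        long processed = 0;
        long maxBuffered = 0;
        for (int i = 0; i < steps; i++) {
            produced = requested;                                      // source fills all demand at once
            maxBuffered = Math.max(maxBuffered, produced - processed); // elements waiting at the sink
            processed++;                                               // sink handles one element per step
            long remaining = requested - processed;
            if (remaining < low) {
                requested += high - remaining;                         // top back up to the high watermark
            }
        }
        return new long[] { produced, processed, maxBuffered };
    }
}
```

Running run(50, 25, 100) shows the source held to 128 elements while the sink works through 100, with never more than 50 waiting. Without back pressure the source would have had no such limit.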

And there you have it! All the tools required to create a reactive stream using akka-stream. In the next post, I'll cover how to condense most of this code into just a few lines using the Flow DSL.

All of the code from this post can be found here. Thanks for reading!