An interval-based rate limiter for Akka Stream

Marcin Kubala

26 May 2017 · 8-minute read

Introduction

Imagine that you have to interact with an external service that limits how many operations its clients may perform per unit of time. Such services often penalize clients that exceed the limit: depending on the provider, you may be charged extra or banned for several seconds or minutes.

I first encountered this problem while working on a high-throughput, heavy-load messaging system (see our case study for more details about the project).
In short, the system routes messages to specific SMS providers, and each provider imposes its own maximum throughput per connection (bind).

In this post I’d like to describe how I searched for a proper solution and the challenges I faced while implementing it.

The Problem

When you only need to limit how often events occur within your system’s boundaries, e.g. to keep the load on some subsystems at a moderate level, you can simply use the built-in akka-streams throttler:

Flow[T].throttle(elements, per, ..)

This is how it works (let’s assume that we limit throughput to 3 elements / arbitrary time unit):

Diagram #1

Each element passing through the throttler consumes a token, and the tokens are replenished once per defined interval. If no tokens are available, elements are not pushed downstream until the next replenishment.
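To make this behaviour concrete, here is a small plain-Scala model of the token bucket described above, in which all tokens are restored at every multiple of the interval. It is a simplified sketch for illustration, not the actual akka-streams throttle implementation. TokenBucketModel and its parameters are hypothetical names, and time is measured in abstract Long units.

```scala
// Hypothetical model of the throttler described in the text (not Akka's code):
// `capacity` tokens are all restored at every multiple of `interval`.
// Times are abstract Long units; arrivals must be sorted.
object TokenBucketModel {
  def emissions(arrivals: Seq[Long], capacity: Int, interval: Long): Seq[Long] = {
    require(capacity > 0 && interval > 0)
    var tokens = capacity
    var lastRefill = 0L // start of the replenishment period we are currently in
    var clock = 0L      // elements leave in order, never before the previous one
    arrivals.map { arrival =>
      clock = math.max(clock, arrival)
      // Apply the replenishment tick that happened at or before `clock`.
      val period = (clock / interval) * interval
      if (period > lastRefill) { tokens = capacity; lastRefill = period }
      // No token left: hold the element until the next replenishment tick.
      if (tokens == 0) {
        clock = lastRefill + interval
        lastRefill = clock
        tokens = capacity
      }
      tokens -= 1
      clock
    }
  }
}
```

With a capacity of 3 and an interval of 10, arrivals at 7, 8, 9, 10, 11, 12 and 13 leave at 7, 8, 9, 10, 11, 12 and 20. Every replenishment period respects the limit, yet six elements leave within a span of only six time units.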

This sounds like a reasonable solution to our problem, but only in an ideal world, where the service’s throttler ticks occur at the same moment as our limiter’s replenishment ticks.
In the real world, however, you will encounter some tick skew, which may lead to the following situation:

Diagram #2

What happened?

We let at most 3 elements per time unit through.
Unfortunately, the third element was emitted just before the limiter received the next replenishment tick. Because the service’s throttler starts its time units slightly earlier than our rate limiter does, it counts 4 elements within a single time unit and detects that the limit was exceeded.

The Theoretical Solution

Instead of letting the messages go down through the limiter as soon as they arrive and there are enough tokens left, we can try to distribute them in time.

Unfortunately, it’s extremely difficult (not to say impossible) to keep the same time gap between all elements and distribute them evenly in time, due to the latencies caused by the downstream stages, garbage collector activity, limitations of the built-in Akka scheduler and many other things that we don’t control.
Instead, it’s much easier to guarantee that the gap between consecutive emitted elements is never shorter than a defined minimum interval, which is what I did in my Interval-Based Rate Limiter.

Let’s take a look at the diagram:

Diagram #3

As you can see, elements are not always forwarded right after they reach the rate limiter. Although this may introduce some latency (visible in the diagram above), the service throttler’s tick timing becomes irrelevant: no two elements are ever closer together than the minimum interval, so no time unit can contain more elements than allowed.
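The minimal-interval rule can be written as a single recurrence: an element leaves at the later of its arrival time and the previous emission plus the minimum interval. The plain-Scala sketch below illustrates that rule. MinIntervalModel is a hypothetical name, and the sketch ignores scheduler imprecision.

```scala
// Hypothetical model of the minimal-interval rule: an element leaves no earlier than
// it arrives and at least `minInterval` after the previous element left.
object MinIntervalModel {
  def emissions(arrivals: Seq[Long], minInterval: Long): Vector[Long] =
    arrivals.foldLeft(Vector.empty[Long]) { (out, arrival) =>
      val earliest = out.lastOption.fold(arrival)(prev => math.max(arrival, prev + minInterval))
      out :+ earliest
    }
}
```

To allow 3 elements per 12 time units, use a minimum interval of 12 / 3 = 4. Arrivals at 7, 8, 9, 10, 11 and 12 then leave at 7, 11, 15, 19, 23 and 27. Any half-open window of 12 units contains at most 3 of them, regardless of where the service places its window. The growing delay is the latency mentioned above.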

Failure is essential to learning

The very first version of the Interval-Based Rate Limiter was assembled from just two basic building blocks: Source and ZipWith2.
Let me start the explanation with a simple diagram:

Diagram #4

Source emits a Unit tick once per defined interval.
Zip emits only when each of its inlets has received an element.

Therefore, when an element arrives at Zip’s inlet In0 before the tick, it waits for the tick.

Here is the code:

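import akka.NotUsed
import akka.stream.{Attributes, FlowShape, Graph}
import akka.stream.scaladsl.{GraphDSL, Source, ZipWith}
import scala.concurrent.duration.FiniteDuration

object IntervalBasedRateLimiter {

  def create[T](interval: FiniteDuration): Graph[FlowShape[T, T], NotUsed] = {
    val graph = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._ // provides the ~> operator
      val tickSource = Source.tick(interval, interval, ())
      // Emits the element from in0 once a tick has also arrived on in1; the tick itself is discarded
      val zip = builder.add(ZipWith[T, Unit, T]((elem, _) => elem))
      tickSource ~> zip.in1
      FlowShape(zip.in0, zip.out)
    }
    // One-element input buffers, so at most one tick is held while waiting for an element
    graph.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
  }

}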

Pretty simple and clear, right?
I thought so as well, until I realized that the same rule applies to all of Zip’s inlets. If a tick arrives on In1 while In0 is empty, Zip holds the tick and waits for an element on In0. When an element finally arrives, it is paired with that stale tick and emitted immediately, and the next tick may arrive shortly afterwards. The result is an effective interval that is too short:

Diagram #5
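The stale-tick problem can be reproduced with a small plain-Scala model of the Source.tick plus ZipWith limiter. The model rests on several simplifying assumptions. Ticks occur at interval, 2 × interval, and so on. The tick inlet holds at most one unused tick. Ticks that fire while that buffer is full are dropped. Each element is paired with the buffered tick if there is one; otherwise it waits for the next tick. ZipTickModel is a hypothetical name.

```scala
// Hypothetical, simplified model of Source.tick ~> ZipWith with a one-element tick buffer.
object ZipTickModel {
  def emissions(arrivals: Seq[Long], interval: Long): Seq[Long] = {
    var lastEmit = Long.MinValue
    var ticksUsedUpTo = 0L // every tick at or before this time was consumed or dropped
    arrivals.map { arrival =>
      val now = math.max(arrival, lastEmit)
      val firstUnusedTick = (ticksUsedUpTo / interval + 1) * interval
      val emitAt =
        if (firstUnusedTick <= now) {
          // A stale tick is waiting in the buffer: the element passes immediately,
          // and any further ticks that fired in the meantime were dropped.
          ticksUsedUpTo = (now / interval) * interval
          now
        } else {
          // No buffered tick: the element waits for the next one.
          ticksUsedUpTo = firstUnusedTick
          firstUnusedTick
        }
      lastEmit = emitAt
      emitAt
    }
  }
}
```

With an interval of 10, elements arriving at 5 and 6 leave at 10 and 20, as intended. Elements arriving at 25 and 26, however, leave at 25 (paired with the stale tick from time 10) and 30, only 5 units apart.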

I needed a stage that discards ticks when no element is waiting. I could not achieve this with the aforementioned FlowOps.throttle either: even with both the maximum number of elements and the burst set to 1 (see the code below), it controls how many elements pass within a period of time, but still not when they pass:

import akka.NotUsed
import akka.stream.{FlowShape, Graph, ThrottleMode}
import akka.stream.scaladsl.Flow
import scala.concurrent.duration.FiniteDuration

object IntervalBasedRateLimiter {

  // At most 1 element per interval with a maximum burst of 1; Shaping delays elements instead of failing
  def create[T](interval: FiniteDuration): Graph[FlowShape[T, T], NotUsed] =
    Flow[T].throttle(1, interval, 1, ThrottleMode.Shaping)

}

Diagram #6

Again, tick skew breaks everything...

Enlightenment

That was the moment when I realized that, to build a tick-skew-proof rate limiter, I needed to create a new type of stage: a kind of timer-based gateway to the downstream.

Meanwhile, I found something very similar to what I was working on: Pulse, a recent contribution by Marcin Rzeźnicki to akka-stream-contrib.
Unfortunately, I could not reuse it, since Pulse is an upstream throttler, while I needed to control the downstream.

Moreover, Pulse indirectly relies on the Akka scheduler’s schedule method, which does not meet my requirements because of the limitations described below.

Limitations of the Akka scheduler

Let’s take a look at how the Akka scheduler works. It’s based on a hashed wheel timer, similar to Netty’s HashedWheelTimer. This kind of timer is optimized for approximate I/O timeout scheduling: on every tick, it checks whether any tasks are due or overdue and executes them.
This means that the scheduler may fire some ticks late and others just in time, so the length of the intervals between ticks varies:

Diagram #7
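The effect can be approximated with a small plain-Scala model. A periodic task is due at every multiple of its period, but runs only at the first wheel tick at or after its due time. This is a simplified sketch, not Akka's scheduler code. WheelTimerModel is a hypothetical name, and the wheel ticks are passed in explicitly.

```scala
// Hypothetical model of fixed-rate scheduling on a coarse timer: a task due at
// time d runs at the first wheel tick at or after d.
object WheelTimerModel {
  def fireTimes(period: Long, count: Int, wheelTicks: Seq[Long]): Seq[Long] =
    (1 to count).map { k =>
      val due = k * period
      wheelTicks.find(_ >= due).getOrElse(sys.error(s"no wheel tick at or after $due"))
    }

  /** Distances between consecutive fire times. */
  def gaps(times: Seq[Long]): Seq[Long] =
    times.zip(times.drop(1)).map { case (a, b) => b - a }
}
```

With a period of 25 and wheel ticks every 10 units, the task fires at 30, 50 and 80. The gaps are 20 and 30, so one of them is shorter than the requested period.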

Another disadvantage of the scheduler is its limited tick frequency. The default tick duration is 10ms, and given the imprecision described above, the actual gaps between ticks may vary.

It’s possible to increase (or decrease, if you wish) the timer resolution by customizing the akka.scheduler.tick-duration configuration property.
If you are going to run your streams on Windows, I have bad news: the minimum supported tick duration on this platform is 10ms.

A 10ms timer resolution means that we can schedule at most 100 ticks per second, which caps the maximum throughput of our limiter at 100 elements per second.

We could send more than a single element after each tick (for example, to reach a maximum throughput of 200 elements per second, we could pass 2 elements at 10ms intervals). However, because of the non-deterministic delay between two or more subsequent pushes, we would end up with the same problem as shown in the first diagram.
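The arithmetic behind these numbers is simple: ticks per second multiplied by elements per tick. The sketch below uses a hypothetical helper, ThroughputMath.maxPerSecond, and counts only whole ticks that fit into one second.

```scala
// Hypothetical helper: upper bound on throughput when at most `perTick`
// elements leave per timer tick.
object ThroughputMath {
  def maxPerSecond(tickDurationMs: Long, perTick: Int): Long = {
    require(tickDurationMs > 0 && perTick > 0)
    val ticksPerSecond = 1000L / tickDurationMs // whole ticks that fit into one second
    ticksPerSecond * perTick
  }
}
```

maxPerSecond(10, 1) gives 100 and maxPerSecond(10, 2) gives 200, matching the figures above.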

Let’s focus on the first limitation before we go further.

Achieving minimal interval guarantee in a non-deterministic world of HashedWheelTimers

So we can’t really trust the ticks emitted by schedule, but the Scheduler has another interesting method: scheduleOnce, which runs a function once after a given delay.
This sounds like a perfect fit for our problem - we could schedule a tick right after the previous one is received and processed:

Diagram #8

One may note that, because we schedule the next tick only after the previous one has been handled, the intervals between ticks may not be equal. That’s true, but this way we can keep those intervals no shorter than the scheduleOnce delay.
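Chaining one-shot timers can be modelled in the same way as the periodic schedule. Each tick is due a fixed delay after the previous tick has been handled, and it fires at the first wheel tick at or after that due time. This is a simplified plain-Scala sketch. ScheduleOnceModel and its handlingTime parameter are hypothetical.

```scala
// Hypothetical model of chained scheduleOnce calls on a coarse timer: each next tick
// is due `delay` after the previous one was handled (handling takes `handlingTime`)
// and fires at the first wheel tick at or after its due time, never earlier.
object ScheduleOnceModel {
  def fireTimes(delay: Long, handlingTime: Long, count: Int, wheelTicks: Seq[Long]): Seq[Long] = {
    require(count >= 1)
    // The first tick is scheduled at time 0, e.g. from preStart.
    (1 until count).scanLeft(firstAtOrAfter(delay, wheelTicks)) { (prev, _) =>
      firstAtOrAfter(prev + handlingTime + delay, wheelTicks)
    }
  }

  private def firstAtOrAfter(due: Long, wheelTicks: Seq[Long]): Long =
    wheelTicks.find(_ >= due).getOrElse(sys.error(s"no wheel tick at or after $due"))
}
```

With a delay of 25 and wheel ticks every 10 units, the ticks fire at 30, 60 and 90. Compared with fixed-rate scheduling on the same wheel, the gaps are longer, but none of them is ever shorter than the delay.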

How about the lack of timer precision you mentioned in the Limitations of the Akka scheduler section? Isn’t it a problem?

Well, it’s not. The timer may fire a tick on time or with some delay, but never in advance, so there is no way to pass the next element too early.
I must admit that my solution has a disadvantage as well: since every gap equals the minimal interval plus the handling time and any timer delay, the effective throughput may never reach the maximum allowed one.

Passing more than one element at a time

Speaking of throughput, let’s get back to the Scheduler’s second flaw: its limited tick frequency.

I have not found a way to pass many elements at the same time other than batching (emitting sequences of elements instead of single ones):

Diagram #9

As you can see, it’s very close to the ideal solution described in The Theoretical Solution section.
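Batching can be sketched by letting each tick release up to a fixed number of queued elements at once. This raises the throughput bound to maxBatch elements per minimum interval. The plain-Scala model below is hypothetical (BatchingModel is not from the original code). It places ticks exactly at multiples of the interval and returns an empty batch when nothing is ready, whereas a real stage would simply push nothing.

```scala
// Hypothetical model: on every tick, emit up to `maxBatch` elements that have already
// arrived, as one batch. Ticks are idealized at k * minInterval.
object BatchingModel {
  def batches(arrivals: Seq[Long], minInterval: Long, maxBatch: Int, ticks: Int): Seq[(Long, Seq[Long])] = {
    var queue = arrivals.sorted.toList
    (1 to ticks).map { k =>
      val now = k * minInterval
      val (ready, notYet) = queue.span(_ <= now) // elements that arrived by this tick
      val batch = ready.take(maxBatch)
      queue = ready.drop(maxBatch) ++ notYet     // leftovers wait for the next tick
      (now, batch)
    }
  }
}
```

With ten elements arriving at 0 to 9, an interval of 10 and a maximum batch of 4, the ticks at 10, 20 and 30 emit batches of 4, 4 and 2 elements.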

The Code

With the research done, I was ready to write the code for the Interval-Based Rate Limiter.
I’m not going to describe how to write custom akka-streams graph stages, since my teammate Jacek has already covered that.

To gain access to the timer, my custom GraphStageLogic extends TimerGraphStageLogic, which brings two important methods:

  • final protected def scheduleOnce(timerKey: Any, delay: FiniteDuration): Unit used to schedule a tick with some delay, and
  • protected def onTimer(timerKey: Any): Unit which is called whenever the timer emits a tick.

The preStart method has been overridden to schedule the first tick and start pulling from the upstream.

My onTimer implementation tries to push a batch of elements (if there are any waiting) and then schedules the next tick.

To accumulate the elements to be sent in a single batch, I’m using a ConcurrentLinkedQueue. Because computing its size requires an O(n) traversal and I need that value quite often, I decided to track the number of awaiting elements in a separate AtomicInteger field.
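The queue and counter bookkeeping can be illustrated without Akka. In the sketch below, a hypothetical BatchBuffer class enqueues elements as they arrive. A drainBatch method, which an onTimer callback could call, removes at most maxBatchSize of them, while an AtomicInteger keeps the size available in O(1). This is an illustration under stated assumptions, not the author's stage. In particular, it assumes a single consumer draining the queue.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, Akka-free sketch of batch bookkeeping with a separate size counter,
// since ConcurrentLinkedQueue.size is an O(n) traversal.
final class BatchBuffer[T](maxBatchSize: Int) {
  private val queue = new ConcurrentLinkedQueue[T]()
  private val awaiting = new AtomicInteger(0)

  def enqueue(elem: T): Unit = {
    queue.add(elem)
    awaiting.incrementAndGet()
  }

  /** Number of awaiting elements, read in O(1). */
  def size: Int = awaiting.get()

  /** Called on a timer tick: removes and returns up to maxBatchSize elements. */
  def drainBatch(): Vector[T] = {
    val builder = Vector.newBuilder[T]
    var taken = 0
    var drained = false
    while (taken < maxBatchSize && !drained) {
      val elem = queue.poll() // null when the queue is empty
      if (elem == null) drained = true
      else { builder += elem; taken += 1 }
    }
    awaiting.addAndGet(-taken)
    builder.result()
  }
}
```

After enqueuing 1 to 5 into a BatchBuffer[Int](2), successive drainBatch() calls return Vector(1, 2), Vector(3, 4), Vector(5) and then an empty vector, and size drops from 5 to 0.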

The whole code, as well as the unit tests, is available on GitHub.

The Epilogue

I opened a pull request in akka-stream-contrib and had a discussion with Martynas Mickevičius, who pointed me to another PR introducing a new stage called DelayFlow.

This stage is based on the scheduleOnce method, just like mine, and therefore can be used to keep intervals with guaranteed minimum length.

Combined with groupedWithin, it can be used to implement the interval-based rate limiter in a concise and elegant way:

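import akka.NotUsed
import akka.stream.{FlowShape, Graph}
import akka.stream.contrib.DelayFlow // from akka-stream-contrib
import akka.stream.scaladsl.Flow
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration

object IntervalBasedRateLimiter {
  // Collect up to maxBatchSize elements per minInterval, then delay each batch by minInterval
  def create[T](minInterval: FiniteDuration, maxBatchSize: Int): Graph[FlowShape[T, immutable.Seq[T]], NotUsed] =
    Flow[T].groupedWithin(maxBatchSize, minInterval).via(DelayFlow[immutable.Seq[T]](minInterval))
}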

I really enjoyed looking for the right solution and facing the implementation challenges.
I hope you find the outcome, in the form of this blog post and the code, interesting and useful.

P.S.
If you are looking for a team of skilled software developers or just need a bit of help with choosing the right technology, tools and architecture for your product, or maybe you’re looking for somebody who could review your project, then don’t hesitate to drop us a line.

We are always eager to take on new challenges.
