
Manually queueing messages

Erlang actors usually handle messages one at a time, in the order received. Sometimes this is not efficient enough, and you might want to handle multiple messages at once, or change the ordering of messages. However, Erlang gives you only limited possibilities to control your message queue, and the available approaches feel more like hacks. Nevertheless, custom queueing can be achieved fairly easily, and in this article I will show you how.

Real life problem

In a production system I have developed and maintain, there is an actor which receives messages and in turn sends the data over the network to another, remotely located Erlang node. Messages arrive at the actor at a rate of up to 10 messages/sec, and the data being sent is fairly small, up to 100 kilobytes. The available bandwidth is sufficient, and most of the time this works fine.

Occasionally, the network gets congested. This happens randomly, at irregular intervals - sometimes a couple of times a day, sometimes a couple of times a month. When this occurs, the actor falls behind, and the message queue builds up. After the network conditions stabilize, the actor slowly catches up.

During the congestions, I noticed that sending a single message over the network could take up to 20 seconds. If the actor takes 20 seconds to process a message while new messages arrive at the rate of 10 messages/sec, then after the first message is processed the message queue holds 200 messages, with even more new messages arriving.

Let's assume that the normal sending time is 50 ms, and that a new message arrives every 100 ms. This means that the actor can reduce the queue by one message every 100 ms. Consequently, the queue will be emptied after 200 * 100 ms = 20 seconds, assuming no additional congestion occurs. In this case, we have a total of 40 seconds of out-of-sync time. In practice, I have noticed outages which would last for a couple of minutes, which was unacceptable due to business constraints.

In addition, I noticed that the sending time doesn't correlate with the size of the data being sent. The delay was caused by many different services using the same network output, and perhaps by the network of the receiving machine. The situation was probably aggravated by the fact that many small messages were being sent, which required frequent handshakes.

So the messages somehow had to be sent in batches. An obvious solution is to send the data at regular intervals instead of doing it immediately. When the actor receives a message, it stores the data in an internal list instead of sending it. Another, separate process periodically sends flush messages to the actor, which in response sends the accumulated data over the network. This introduces a constant delay, but it is more resistant to random network conditions.
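
As a rough illustration of this periodic-flush idea, here is a minimal sketch written as a plain GenServer rather than the ExActor-based production code; the FlushingSender name, the push function and the one-second interval are illustrative assumptions, not the actual implementation:

defmodule FlushingSender do
  use GenServer

  # arbitrary flush interval (ms), chosen only for illustration
  @flush_interval 1000

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, nil, opts)
  def push(pid, msg), do: GenServer.cast(pid, {:push, msg})

  def init(_) do
    schedule_flush()
    {:ok, []}
  end

  # store the incoming data instead of sending it immediately
  def handle_cast({:push, msg}, pending), do: {:noreply, [msg | pending]}

  # on every tick, send everything collected so far as a single batch
  def handle_info(:flush, pending) do
    unless pending == [], do: send_batch(Enum.reverse(pending))
    schedule_flush()
    {:noreply, []}
  end

  defp schedule_flush, do: Process.send_after(self(), :flush, @flush_interval)

  # placeholder for the actual network send
  defp send_batch(batch), do: IO.inspect(batch)
end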

This was fairly quick to implement, and it worked fine, but I was not happy with the solution. First, I had to come up with some arbitrary flush interval which was small enough to meet business needs, and at the same time big enough to mitigate the network problem. In addition, I had now introduced a constant delay to mitigate problems which only occur occasionally.

The solution

A better approach is to make the actor send the data over the network immediately, collect new messages while doing so, and after the data is sent, process all new messages at once.

This approach works better, since it self-adapts to the network conditions. If the network works fine, the actor sends messages immediately, one by one. When congestion occurs, the actor adapts by processing multiple messages at once. The technique works as long as the processing time of N messages is less than N times the processing time of one message, which in my case was definitely true.

However, Erlang doesn't give you much control over your message queue. There are ways to get all messages from the queue, but some of those messages might not be the ones you sent; they may be "system" messages which should be handled by the underlying OTP abstractions.
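
Just to illustrate what such mailbox flushing looks like, here is a minimal sketch for a plain process (the module name is made up); run inside a gen_server, a loop like this would also consume those internal OTP messages, which is exactly the problem:

defmodule MailboxDrain do
  # Returns all messages currently queued in the calling process' mailbox,
  # in the order they arrived. Only safe in a plain process: under an OTP
  # behaviour it would also consume system messages meant for the behaviour.
  def drain(acc \\ []) do
    receive do
      msg -> drain([msg | acc])
    after 0 ->
      Enum.reverse(acc)
    end
  end
end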

So instead, I opted for a cleaner solution which involves two tightly coupled actors: the queue and the worker.

The queue is the interface point for the clients while the worker is an implementation detail. The clients send messages to the queue, which somehow knows whether the worker is idle or busy processing messages. If the worker is idle, the queue immediately forwards a message to it. Otherwise, it queues all new messages in its own structure and waits for the worker to become idle. When that happens, the queue passes all of the collected messages to the worker.

The important thing to notice is that the queue actor processes its messages immediately, which allows it to take over the queueing itself.

The code

The simplified problem is represented with the following code:

defmodule Job do
  def handle(messages) do
    :timer.sleep(100)
    IO.puts Enum.join(messages, ", ")
  end
end

The handle function expects a list of messages and prints them. Regardless of the length of the received list, its execution time is about 100 ms (unless the messages are really large strings).

So how can we implement the described queue/worker algorithm? Let's see the code of the worker first:

defmodule Worker do
  use ExActor
  import Job

  defcast process(messages), state: queue do
    handle(messages)
    queue.worker_finished
  end
end

The worker exposes one service, process, which does the actual message processing. The function simply invokes the Job.handle function and then notifies its queue that it has finished. The worker's state is a "reference" to the queue, which is also an actor:

 1  defmodule Queue do
 2    use ExActor
 3
 4    defrecord State, worker: nil, worker_busy: false, messages: []
 5
 6    def init(_) do
 7      initial_state(State.new(worker: Worker.actor_start(this)))
 8    end
 9
10    # implementation
11  end

The queue actor's state is represented by the State record defined in line 4. It holds a reference to the worker, a flag indicating whether the worker is busy, and the message queue.
In line 6 we initialize the actor by creating the worker and storing the "reference" to it in an instance of the State record. The rest of the state data is populated with the defaults from the record definition in line 4.

Now we have to add some code to the queue actor. Let's start with the worker_finished message which is sent from the worker:

defcast worker_finished, state: state do
  state.worker_busy(false) |>
  maybe_notify_worker |>
  new_state
end

The strange-looking |> (pipe) operator chains function calls together. It is a more elegant way to write:
new_state(maybe_notify_worker(state.worker_busy(false)))

The code reads almost like English: we set the worker_busy flag to false, then maybe notify the worker (for example, if we have new messages), and finally use the return value of maybe_notify_worker as the new state.

When a client wants to push a message, it will invoke the push operation:

defcast push(msg), state: state do
  queue_message(state, msg) |>
  maybe_notify_worker |>
  new_state
end

This code is very similar to the one above, the difference being in the first line, where we call the queue_message function to store the message in the queue:

defp queue_message(state, msg) do
  state.update_messages(fn(messages) -> [msg | messages] end)
end

Storing the message simply amounts to prepending it to the list of already queued messages. This is the most efficient approach, since both Erlang and Elixir are functional languages operating on immutable data: prepending to a list is a constant-time operation, while appending would require copying the whole list.

Finally all we have to do is implement the maybe_notify_worker function. We want to notify the worker if it is not busy and there are some messages in the queue:

1  defp maybe_notify_worker(State[worker_busy: false, messages: [_|_]] = state) do
2    state.worker.process(Enum.reverse(state.messages))
3    state.update(worker_busy: true, messages: [])
4  end
5
6  defp maybe_notify_worker(state), do: state

This is an example of a multi-clause function. Depending on the value of the function argument, we will either enter the first clause (line 1) or the second one (line 6).

If the argument is an instance of the State record, its worker_busy field is false, and its messages field is a non-empty list, then we enter the first clause. Here, we send all messages to the worker and then update the state by setting worker_busy to true and the messages queue to an empty list. Notice that when sending to the worker, the messages list is reversed. This is done because during queueing, we pushed new messages to the top. By reversing the messages, we preserve the order in which they were sent by the clients.

If we don't enter the first clause, we will always end up in the second one. Here we do nothing, since the worker is busy or there are no messages for it.

That concludes the implementation of the worker/queue pair. The structure can be used in the following way:

queue = Queue.actor_start

Enum.each(1..10, fn(i) ->
  queue.push(i)
  :timer.sleep(30)
end)

Which gives the output:

1
2, 3, 4
5, 6, 7
8, 9, 10

As I have mentioned, the queue/worker pair adapts to the incoming rate. Since it can process messages every 100 ms and the incoming rate is 1 message every 30 ms, it consumes roughly three messages at once. The total execution time is 400 ms, significantly less than 1 second - the time it would take if the actor handled one message at a time.

The full code of the example can be found here.

Generalizing the pattern

The described pattern decouples the queueing from processing, and this now opens many possibilities for different queueing strategies.

The simplest variant is to avoid reversing the messages when sending them to the worker, which is useful if the ordering is not important and we don't want to waste time on reversing.

We can further modify this by sending the worker only the first message in the queue, which introduces a completely different strategy. When the actor falls behind, it will compensate by handling the newest messages first. When it finds the time, it will resolve older messages. Additionally, we might choose to delete older messages if the queue gets too large, thus controlling the consumed memory.
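
As a sketch in the same style as the code above, the first maybe_notify_worker clause could be replaced with something like the following; the limit of 100 retained messages is an arbitrary example value:

defp maybe_notify_worker(State[worker_busy: false, messages: [newest | older]] = state) do
  # hand over only the most recent message
  state.worker.process([newest])
  # keep at most 100 of the older messages, dropping the oldest ones
  state.update(worker_busy: true, messages: Enum.take(older, 100))
end

The catch-all second clause remains unchanged.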

Another approach is to add category information to each message, and store messages in a hash, using the category as the key. When a message arrives, it overwrites the pending message of the same category, which allows us to avoid processing duplicated messages.
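
A sketch of that idea, assuming messages arrive as {category, data} tuples and the queue is kept as a list of such pairs, could replace queue_message like this; List.keystore either overwrites the pending entry for the same category or appends a new one, so the list stays in arrival order and no reversing is needed:

defp queue_message(state, {category, _data} = msg) do
  state.update_messages(fn(messages) ->
    # overwrite a pending message of the same category, or append a new entry
    List.keystore(messages, category, 0, msg)
  end)
end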

Finally, we might add priority information to messages, which allows us to handle more important messages first.
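
As another illustrative sketch, assuming each message is a {priority, data} tuple where a lower number means more urgent, the batch can simply be sorted before it is handed to the worker:

defp maybe_notify_worker(State[worker_busy: false, messages: [_|_]] = state) do
  # restore arrival order, then sort (stably) so the most urgent messages come first
  batch =
    state.messages |>
    Enum.reverse |>
    Enum.sort(fn({p1, _}, {p2, _}) -> p1 <= p2 end)
  state.worker.process(batch)
  state.update(worker_busy: true, messages: [])
end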

On top of all this, sometimes we don't want the actor to process messages too often. The worker could sleep for some time before reporting back to the queue, which ensures that it doesn't perform the processing too frequently.
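
A sketch of such throttling, modeled directly on the Worker shown earlier, just adds a sleep before reporting back; the 500 ms pause is an arbitrary example value:

defmodule ThrottledWorker do
  use ExActor
  import Job

  defcast process(messages), state: queue do
    handle(messages)
    # pause before asking for more work, so batches are processed
    # at most once per 500 ms
    :timer.sleep(500)
    queue.worker_finished
  end
end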

These are all approaches I have used in production in different situations, and they helped me increase the overall throughput and stabilize the system without optimizing the underlying code. I have developed an Elixir library which implements many of the above-mentioned techniques and made it publicly available here.

The downside of this technique is that each message is sent twice: first from a client to the queue, and then from the queue to the worker process. Since sending a message in Erlang usually means deep copying, this might have a significant negative impact on performance, depending on the message size, data type and incoming rate.
Additionally, the queueing itself consumes some CPU time. You should therefore be careful about which strategy you choose, given the incoming message rate and the worker's throughput.

Conclusion

I have demonstrated how to control the message queueing process, even though Erlang itself doesn't allow it directly. I consider the solution elegant, easy to reason about, and not a hack. The described technique can help in various situations where processing multiple messages at once is cheaper than processing each message separately, or where some rearranging of messages is called for.