Real-life problem
defmodule Job do
  def handle(messages) do
    :timer.sleep(100)
    IO.puts Enum.join(messages, ", ")
  end
end
The handle function expects a list of messages and prints them. Regardless of the length of the received list, its execution time is about 100 ms (unless the messages are really large strings).
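For example (assuming the module above is compiled), calling it from an iex session looks like this:

```elixir
Job.handle(["Hello", "World"])
# After roughly 100 ms, prints: Hello, World
```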
So how can we implement the described queue/worker algorithm? Let's look at the worker's code first:
defmodule Worker do
  use ExActor
  import Job

  defcast process(messages), state: queue do
    handle(messages)
    queue.worker_finished
  end
end
The worker exposes one operation, process, which does the actual message processing. It simply invokes the Job.handle function and then notifies its queue that it is finished. The worker's state is a "reference" to the queue, which is also an actor:
 1  defmodule Queue do
 2    use ExActor
 3
 4    defrecord State, worker: nil, worker_busy: false, messages: []
 5
 6    def init(_) do
 7      initial_state(State.new(worker: Worker.actor_start(this)))
 8    end
 9
10    # implementation
11  end
The queue actor's state is represented by the State record defined in line 4. It holds a reference to the worker, a flag indicating whether the worker is busy, and the message queue.
In line 6 we initialize the actor by creating the worker and storing the "reference" to it in an instance of the State record. The rest of the state is populated with the defaults from the record definition in line 4.
Now we have to add some code to the queue actor. Let's start with the worker_finished message which is sent from the worker:
defcast worker_finished, state: state do
  state.worker_busy(false)
  |> maybe_notify_worker
  |> new_state
end
The strange-looking |> (pipeline) operator chains function calls together. It is a more elegant way to write:

new_state(maybe_notify_worker(state.worker_busy(false)))
The code reads like English: we set the worker_busy flag to false, then maybe notify the worker (for example, if we have new messages), and finally use the return value of maybe_notify_worker as the new state.
When a client wants to push a message, it will invoke the push operation:
defcast push(msg), state: state do
  queue_message(state, msg)
  |> maybe_notify_worker
  |> new_state
end
This code is very similar to the one above; the difference is in the first line, where we call the queue_message function to store the message in the queue:
defp queue_message(state, msg) do
  state.update_messages(fn(messages) -> [msg | messages] end)
end
Storing the message simply amounts to prepending it to the head of the list of already queued messages. This is the most efficient way, since both Erlang and Elixir are functional languages operating on immutable data: prepending to a list is a constant-time operation, while appending would require copying the whole list.
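The prepend-then-reverse pattern can be seen in isolation (a standalone sketch, independent of the queue code):

```elixir
queue = []
queue = [1 | queue]   # [1]
queue = [2 | queue]   # [2, 1]
queue = [3 | queue]   # [3, 2, 1]
Enum.reverse(queue)   # [1, 2, 3] - original insertion order restored
```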
Finally all we have to do is implement the maybe_notify_worker function. We want to notify the worker if it is not busy and there are some messages in the queue:
1  defp maybe_notify_worker(State[worker_busy: false, messages: [_|_]] = state) do
2    state.worker.process(Enum.reverse(state.messages))
3    state.update(worker_busy: true, messages: [])
4  end
5
6  defp maybe_notify_worker(state), do: state
This is an example of a multi-clause function. Depending on the value of the argument, we will enter either the first clause (line 1) or the second one (line 6).
If the argument is an instance of the State record, its worker_busy field is false, and its messages field is a non-empty list, we enter the first clause. Here, we send all messages to the worker and then update the state, setting worker_busy to true and the messages queue to an empty list. Notice that when sending to the worker, the messages list is reversed. During queueing we pushed new messages onto the head of the list, so reversing restores the order in which the clients sent them.
If we don't enter the first clause, we end up in the second one. Here we do nothing, since either the worker is busy or there are no messages for it.
That concludes the implementation of the worker/queue pair. The structure can be used in the following way:
queue = Queue.actor_start
Enum.each(1..10, fn(i) ->
  queue.push(i)
  :timer.sleep(30)
end)
Which gives the output:
1
2, 3, 4
5, 6, 7
8, 9, 10
As I have mentioned, the queue/worker pair adapts to the incoming rate. Since it can process a batch every 100 ms and the incoming rate is one message every 30 ms, it consumes three messages at once. The total execution time is about 400 ms (four batches at 100 ms each), significantly less than the 1 second it would take if the actor handled one message at a time.
The full code of the example can be found here.
Generalizing the pattern
We can further modify this by sending the worker only the first (i.e. newest) message in the queue, which introduces a completely different strategy: when the actor falls behind, it compensates by handling new messages first, resolving older ones when it finds the time. Additionally, we might delete older messages if the queue grows too large, thus controlling the consumed memory.
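A sketch of that variant (hypothetical code of my own; the @max_queued attribute and clause shape are not from the original example) could replace the first maybe_notify_worker clause:

```elixir
@max_queued 1000

defp maybe_notify_worker(State[worker_busy: false, messages: [newest | older]] = state) do
  # Send only the newest message; it sits at the head of the list.
  state.worker.process([newest])
  # Keep at most @max_queued of the older messages, dropping the rest.
  state.update(worker_busy: true, messages: Enum.take(older, @max_queued))
end

defp maybe_notify_worker(state), do: state
```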
On top of all this, sometimes we don't want the actor to process messages too fast. The worker could sleep for some time before reporting back to the queue, which ensures that processing doesn't happen too frequently.
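For example, the worker from the beginning of the article could be throttled like this (the 50 ms pause is an arbitrary value of my choosing, not part of the original code):

```elixir
defmodule Worker do
  use ExActor
  import Job

  defcast process(messages), state: queue do
    handle(messages)
    # Enforce a minimum pause between batches before asking for more work.
    :timer.sleep(50)
    queue.worker_finished
  end
end
```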
Additionally, the queueing itself consumes some CPU time, so you should choose your strategy carefully, taking into account the incoming message rate and the worker's throughput.