Hi, I am Saša Jurić, a software developer with 10+ years of professional experience programming web and desktop applications using Elixir, Erlang, Ruby, JavaScript, C# and C++. I'm also the author of the upcoming Elixir in Action book. In this blog you can read about Erlang and other programming-related topics.

Concurrency in a long polling comet server

The last couple of articles were part of a loose miniseries in which I presented a few concurrency patterns I have used in my Erlang systems. Today, I will wrap up this series with some thoughts on how to deal with many concurrent clients and minimize CPU load in an Erlang-based comet (HTTP push) server.

The article is based on my own experience building a long-polling server which pushes fast-changing data to connected clients. The data is dynamically categorized into channels to which clients can subscribe. At any moment, there are no more than 50 channels and 4000 connected clients. All clients receive data using the so-called long polling technique: a client issues an AJAX request asking the server for new data and receives a response only when something new is available, after which it must issue a new request. This results in a fairly high number of HTTP requests, about 2000 per second at peak times, all of which are handled in Erlang, without any caches in front of the server. Each request comes from a separate client which is subscribed to its own subset of the available channels, and therefore must be treated individually.

Compared to big success stories such as WhatsApp, Facebook, Heroku and others, this server might seem almost like a hello world app. Nevertheless, it took some attention and tuning to handle that kind of load, and here I will present some approaches which helped me accomplish the task.

Basic principles

A typical Erlang-based web server handles each request in a separate Erlang process, so at any point in time there are at least as many processes as there are connected users. These processes are usually short-lived: they produce the response, send it back over the network and terminate. I will refer to such processes as request processes.

Additionally, the server will usually run a smaller number of long-running actors, which the request processes use to obtain data. I will refer to these processes as data providers.

The basic strategy for minimizing CPU load is to make data providers do everything that is common to multiple requests, and at the same time move all request-specific operations to request processes. Whatever is common, we want to execute exactly once, whereas whatever is specific, we want to execute in parallel, thus promoting vertical scalability of the system. This strategy is pure common sense and not specific to Erlang, but in Erlang it is especially easy to implement due to its lightweight concurrency, message passing and some other available mechanisms.

It is not always obvious what is common to multiple clients. For example, in each request the server returns the current time in RFC 1123 format. Creating such a string might take about 1-2 milliseconds, which is not insignificant at 2000 requests/sec. Since this information is the same for all processes, it is better to have a "time provider" process which computes the string once per second (instead of 2000 times per second).
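A back-of-the-envelope version of that estimate, using only the figures above (1-2 ms per string, 2000 requests per second):

```latex
\begin{aligned}
\text{per request:}\quad & 2000\,\tfrac{\text{req}}{\text{s}} \times (1\ \text{to}\ 2)\,\tfrac{\text{ms}}{\text{req}} = 2\ \text{to}\ 4\ \tfrac{\text{s of CPU time}}{\text{s}} \\
\text{time provider:}\quad & 1\,\tfrac{\text{computation}}{\text{s}} \times (1\ \text{to}\ 2)\,\text{ms} = 1\ \text{to}\ 2\ \tfrac{\text{ms of CPU time}}{\text{s}}
\end{aligned}
```

In other words, formatting the date per request would keep the equivalent of two to four cores busy, while the provider needs a fraction of a percent of one core.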

Although this might seem like a case of premature optimization, it is important to notice that each CPU-bound operation in a request process might become very expensive when there are many simultaneous requests. If 4000 clients each perform a 1 millisecond operation, they effectively create 4 seconds of single-core work. The good news is that, depending on the load and available CPU resources, you can usually get away with doing it wrong, thanks to the way the Erlang scheduler works (there is a great article about it here). I most certainly did many things wrong (and probably still do), but the Erlang VM was able to cope, and the server worked fine. So I don't recommend meditating at length over whether every single statement belongs in the request or in the provider process. Start with some reasonable assumptions, then measure and optimize where needed.

Removing the single-process reader bottleneck

Data providers are typically implemented as Erlang processes. Since a process handles messages sequentially, requesting data directly from it might cause a bottleneck. Imagine 4000 clients asking a single provider process for some information at the same time. These clients effectively become "deparallelized", meaning they get their answers one by one instead of simultaneously. Moreover, the provider process will be overloaded handling fetch requests instead of doing other meaningful work (e.g. producing new data).

We can get around this problem by using Erlang Term Storage (ETS), an in-memory key-value store whose keys and values are Erlang terms. ETS supports fast lookups and truly concurrent reads and writes. You can think of it as a lower-level Erlang version of Redis. While not as feature-rich, ETS has the advantage of being in-process (it lives inside the Erlang VM), which makes it easy to use and requires no serialization/deserialization of the stored data.

Recall the time provider process mentioned earlier. It is implemented as a separate Erlang process which calculates the time string once per second. Instead of exposing a read interface, the process can store the computed string in an ETS table, from which request processes read it directly. This makes reads truly parallel and at the same time takes the load off the time provider process. This technique is used, for example, in the popular Cowboy web server.
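Here is a rough Python analogue of the two read paths. Python is used only for illustration: a thread plays the provider process, a `queue.Queue` its mailbox, and a lock-protected dict the ETS table, so the sketch shows the structure rather than Erlang's parallel performance; every class and function name is hypothetical. `MailboxProvider` funnels each read through one thread, whereas `TableTimeProvider` writes the RFC 1123 string into the shared table once per tick and readers fetch it with `current_date`, never messaging the provider.

```python
import queue
import threading
from email.utils import formatdate


class MailboxProvider:
    """Answers every read through its own mailbox: one thread handles all
    requests one after another (the single-reader bottleneck)."""

    def __init__(self):
        self._mailbox = queue.Queue()
        threading.Thread(target=self._loop, daemon=True).start()

    def _loop(self):
        while True:
            reply_to = self._mailbox.get()
            reply_to.put(formatdate(usegmt=True))  # recomputed for every read

    def call(self):
        # Synchronous request/response, similar in spirit to a gen_server call.
        reply_to = queue.Queue(maxsize=1)
        self._mailbox.put(reply_to)
        return reply_to.get()


class SharedTable:
    """Minimal stand-in for an ETS table: concurrent get/put by key."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def put(self, key, value):
        with self._lock:
            self._data[key] = value

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)


class TableTimeProvider:
    """Writes the formatted time into the table once per tick; readers
    never send it a message."""

    def __init__(self, table):
        self.table = table
        self.computations = 0
        self.tick()

    def tick(self, now=None):
        # now=None means "current time"; formatdate handles that itself.
        self.table.put("rfc1123_date", formatdate(now, usegmt=True))
        self.computations += 1

    def run(self, stop, interval=1.0):
        # Refresh once per interval until `stop` (a threading.Event) is set.
        while not stop.wait(interval):
            self.tick()


def current_date(table):
    # What a request process would do: a direct table read.
    return table.get("rfc1123_date")
```

The refresh loop would run in its own thread, e.g. `threading.Thread(target=provider.run, args=(stop_event,), daemon=True).start()`.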

ETS implements only a very basic locking mechanism, so you usually want each type of data to be modified by a single process. However, it's ok to have multiple processes modifying different data. For example, in the comet server, when a client subscribes to channels, a client-specific process stores the subscription information in the subscription ETS table. Thus, many processes simultaneously write to the same ETS table, while many other processes read from it. This works fine because each client process handles only its own subscription information, so no record-level contention occurs.
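A Python sketch of that ownership rule, with a hypothetical `SubscriptionTable` class standing in for the ETS table and threads standing in for client processes. Each client writes only the row keyed by its own id, and the row has to be removed explicitly when the client leaves, since nothing reclaims it. The single lock below only keeps the Python dict consistent and serializes all writes; ETS can lock at a finer grain, so the point here is the one-writer-per-key rule, not the locking.

```python
import threading


class SubscriptionTable:
    """Stand-in for a public ETS table: client_id -> frozenset of channels.
    Each client writes only its own row, so no two writers share a key."""

    def __init__(self):
        self._rows = {}
        self._lock = threading.Lock()  # protects the dict structure itself

    def subscribe(self, client_id, channels):
        with self._lock:
            row = self._rows.get(client_id, frozenset())
            self._rows[client_id] = row | frozenset(channels)

    def unsubscribe(self, client_id, channels):
        with self._lock:
            row = self._rows.get(client_id, frozenset())
            self._rows[client_id] = row - frozenset(channels)

    def subscribers(self, channel):
        # Read path used by other processes, e.g. a dispatcher.
        with self._lock:
            return {cid for cid, row in self._rows.items() if channel in row}

    def remove_client(self, client_id):
        # No garbage collection: the row stays until it is deleted explicitly.
        with self._lock:
            self._rows.pop(client_id, None)


def client_worker(table, client_id, channels):
    # Each simulated client process touches only its own key.
    table.subscribe(client_id, channels)
```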

ETS tables have their own drawbacks: all data is copied during reads and writes, and there is no garbage collection, so you must manually delete data when it's no longer needed. They are not appropriate for all types of problems, but when used properly they can really boost performance. I often use them to hold server-wide information, meaning anything that will be read simultaneously by multiple processes.

Permanent client processes

As stated earlier, each HTTP request is handled in a separate Erlang process. Consequently, for every client request we have to fetch the client's state again, which might introduce some processing overhead. In the long polling comet server, clients constantly return to the server, so some processing time is spent repeatedly gathering client-specific data. To overcome this, we can create permanent client processes which maintain the client's state. All client-related operations are executed in these processes. When an HTTP request arrives from a client, the request process only has to find the corresponding client process (for example via ETS) and ask it to perform the requested operation. The client process already has all the data it needs.

Let's see how this works in practice. In the comet server, when a message for a client is generated, it is sent to the client process. Since the server is based on long polling, the real client may not have returned yet, for example if it is still processing the HTTP response to the previous message, or its next HTTP request has not yet reached the server. In that case the client process simply stores the message in its internal message queue and does nothing else (since we don't know whether the client will return at all). When/if the client returns, the client process immediately knows whether there are new messages and has them ready to serve.
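A minimal Python sketch of such a client process, assuming a condition variable guarding a list is an acceptable stand-in for the process's state and mailbox (`ClientProcess`, `deliver` and `poll` are hypothetical names). The dispatcher calls `deliver` whether or not the client is connected; a request handler calls `poll`, which returns queued messages immediately or waits for new ones until a timeout.

```python
import threading
import time


class ClientProcess:
    """Long-lived per-client state: an inbox that outlives individual
    HTTP requests."""

    def __init__(self, client_id):
        self.client_id = client_id
        self._inbox = []
        self._cond = threading.Condition()
        self.last_seen = time.monotonic()

    def deliver(self, message):
        # Called by the dispatcher whether or not the client is connected.
        with self._cond:
            self._inbox.append(message)
            self._cond.notify_all()

    def poll(self, timeout):
        # Called from a request handler: returns queued messages at once,
        # otherwise waits up to `timeout` seconds for new ones.
        with self._cond:
            self.last_seen = time.monotonic()
            self._cond.wait_for(lambda: self._inbox, timeout=timeout)
            messages, self._inbox = self._inbox, []
            return messages
```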

Although this effectively doubles the number of Erlang processes, it should not present a problem due to their low cost. However, some additional processing is needed to perform client expiry checks and terminate processes of clients which didn't return. Additionally, since there will always be some phantom client processes for clients which didn't return, some extra memory and CPU will be used. As a rule, I would say the technique is appropriate when you expect mostly returning clients which issue regular, frequent requests over a longer period of time.
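One possible shape for the expiry check, again as a Python sketch: a registry, playing the role of the table a request handler uses to find a client's state, records when each client was last seen, and a periodic `sweep` drops clients that have been silent longer than a TTL. The class name, the TTL and the injectable clock are assumptions made for illustration.

```python
import threading
import time


class ClientRegistry:
    """client_id -> (state, last_seen). Stand-in for the table a request
    handler uses to locate a client's long-lived state."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl          # seconds of silence before a client expires
        self.clock = clock      # injectable so the sketch is testable
        self._clients = {}
        self._lock = threading.Lock()

    def touch(self, client_id, new_state=dict):
        # Called on every request: find or create the client, mark it seen.
        with self._lock:
            state, _ = self._clients.get(client_id, (None, None))
            if state is None:
                state = new_state()
            self._clients[client_id] = (state, self.clock())
            return state

    def sweep(self):
        # Periodic expiry check: drop phantom clients that never returned.
        now = self.clock()
        with self._lock:
            expired = [cid for cid, (_, seen) in self._clients.items()
                       if now - seen > self.ttl]
            for cid in expired:
                del self._clients[cid]
        return expired

    def __len__(self):
        with self._lock:
            return len(self._clients)
```

One reasonable choice is to run `sweep` on a timer and set the TTL somewhat above the long-polling timeout, so that clients which are merely slow are not expired.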

An alternative approach is to hold client data in an ETS table. The request process can then retrieve it directly from the table, which eliminates the need for an extra process per client. I initially used this approach and then switched to explicit client processes. The problem in my case was that there are multiple parallel modifiers of client state: long polling requests, channel (un)subscribe requests, and the message dispatcher process which modifies the client's inbox. Multiple parallel modifiers of the same ETS data require some manual synchronization. An additional downside of ETS is that data is copied on every read and write, which can cause a performance penalty depending on the data size. These issues disappear when using pure processes, and taking this approach increased performance in my case. In a different situation, e.g. with small state data and a single writer, the ETS-based approach might work better.
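The synchronization problem can be illustrated with a Python sketch in which the poll handler, the subscription handler and the dispatcher all modify the same client row of a shared table (the `ClientStateTable` class and its row layout are hypothetical). Every change is a read-modify-write, so each one has to run under that row's lock; a dedicated client process gets the same guarantee for free because it handles one message at a time.

```python
import threading
from collections import defaultdict


class ClientStateTable:
    """client_id -> {"channels": set, "inbox": list}, modified by several
    independent writers."""

    def __init__(self):
        self._rows = {}
        self._row_locks = defaultdict(threading.Lock)
        self._meta = threading.Lock()  # guards creation of rows and locks

    def _row_lock(self, client_id):
        with self._meta:
            self._rows.setdefault(client_id, {"channels": set(), "inbox": []})
            return self._row_locks[client_id]

    def update_row(self, client_id, fn):
        # Read-modify-write under the row's lock, so concurrent writers
        # (poll handler, (un)subscribe handler, dispatcher) never lose updates.
        with self._row_lock(client_id):
            fn(self._rows[client_id])  # fn mutates the row in place

    def read(self, client_id):
        # Return a copy, much as an ETS lookup copies the stored term.
        with self._row_lock(client_id):
            row = self._rows[client_id]
            return {"channels": set(row["channels"]), "inbox": list(row["inbox"])}
```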

Throttling dispatches

In the comet server, during peak times the data changes fast, a couple of times per second. Since long polling is used, a typical client usually returns after some new messages have been generated, and when this happens the request must be processed separately: for each request, new messages must be collected and joined, and the combined final message must be compressed. This again results in extra computation and higher CPU load.

Knowing that many clients listen to the same channels, what I wanted to achieve is to have most clients connected and waiting before the message dispatch takes place. This opens up the possibility to identify clients which receive the same set of messages, and perform the join/compress computation only once per distinct subscription group (all clients listening to the same channels).
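As a rough Python sketch of what "once per subscription group" means: clients with identical channel sets are grouped, and the join plus compression runs once per group, with the resulting bytes shared by every client in it. Plain-string messages and `zlib` compression are assumptions for the example, and the function names are hypothetical.

```python
import zlib
from collections import defaultdict


def group_by_subscription(subscriptions):
    """subscriptions: client_id -> iterable of channel names.
    Returns frozenset(channels) -> list of client ids."""
    groups = defaultdict(list)
    for client_id, channels in subscriptions.items():
        groups[frozenset(channels)].append(client_id)
    return dict(groups)


def build_responses(subscriptions, new_messages):
    """new_messages: channel -> list of message strings.
    Joins and compresses once per group; every client in the group gets
    the same bytes object."""
    responses, computations = {}, 0
    for channels, clients in group_by_subscription(subscriptions).items():
        parts = [m for ch in sorted(channels) for m in new_messages.get(ch, [])]
        payload = zlib.compress("\n".join(parts).encode("utf-8"))
        computations += 1
        for client_id in clients:
            responses[client_id] = payload
    return responses, computations
```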

To achieve this, I resorted to throttling. All messages are generated in channel-specific processes, and initially these processes dispatched messages to clients immediately. In the throttling version, I introduced a single dispatcher process, which receives messages from channels, aggregates them and dispatches them to clients every two seconds. The interval was chosen to be big enough to accommodate reasonably fast clients, and small enough not to introduce a significant delay, because data must be served as fast as possible. As a result, most clients return to the server before new messages are dispatched, so I could now identify which clients listen to the same messages and minimize join/compress computations (see next section). As an added bonus, since this approach reduces the incoming request rate, both CPU load and generated traffic have dropped. Prior to introducing throttling, I observed peak loads of about 3000 reqs/sec. Since it was introduced, the request rate has never gone above 2000 reqs/sec, even though the number of parallel clients has since increased by 33% (from 3000 to 4000).
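A minimal Python sketch of a throttling dispatcher along these lines; `publish`, `flush` and the client `deliver` method are hypothetical names. Channel processes only append to a buffer, and a loop fans the accumulated batch out to subscribers once per interval (two seconds by default, matching the text).

```python
import threading


class ThrottlingDispatcher:
    """Buffers messages from channel processes and dispatches them in
    batches, at most once per `interval` seconds."""

    def __init__(self, subscribers_of, interval=2.0):
        self.subscribers_of = subscribers_of   # channel -> iterable of clients
        self.interval = interval
        self._pending = {}                     # channel -> [messages]
        self._lock = threading.Lock()

    def publish(self, channel, message):
        # Called by channel processes; cheap, no dispatch happens here.
        with self._lock:
            self._pending.setdefault(channel, []).append(message)

    def flush(self):
        # Take the whole batch atomically, then fan it out per client.
        with self._lock:
            batch, self._pending = self._pending, {}
        per_client = {}
        for channel, messages in batch.items():
            for client in self.subscribers_of(channel):
                per_client.setdefault(client, []).extend(messages)
        for client, messages in per_client.items():
            client.deliver(messages)
        return len(per_client)

    def run(self, stop):
        # Dispatch loop: flush every `interval` until `stop` (an Event) is set.
        while not stop.wait(self.interval):
            self.flush()
```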

Admittedly, fixed-time throttling seems like a "lame" approach, but it was an easy solution to the problem at hand, which helped me take control of the CPU load and paved the way for further optimizations. It might be better to monitor returning clients and dispatch new messages once some desired percentage of them has returned, which would eliminate the arbitrary dispatch delay. Additionally, switching to a truly persistent connection, such as WebSockets or server-sent events, might eliminate the problem completely.
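The percentage-based alternative is only suggested in the text; as a purely hypothetical sketch, the dispatch decision could combine a return-rate threshold with a maximum wait, so that slow clients cannot postpone delivery indefinitely. Both parameter values below are made up.

```python
def should_dispatch(returned, connected, elapsed, threshold=0.8, max_wait=2.0):
    """Decide whether to dispatch pending messages now (hypothetical policy).

    returned  -- clients currently waiting on a long-poll request
    connected -- clients known to the server
    elapsed   -- seconds since the oldest pending message arrived
    threshold -- fraction of clients that should be waiting (assumed value)
    max_wait  -- upper bound on the added delay, in seconds (assumed value)
    """
    if elapsed >= max_wait:
        return True          # never delay longer than the fixed interval
    if connected == 0:
        return False         # nobody to deliver to yet
    return returned / connected >= threshold
```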

Concurrent caching

Although we usually want data providers to do all the common processing, sometimes it is easier to identify that commonality from inside client processes. For example, when profiling the comet server I noticed that many requests were spending a considerable amount of time joining new messages and compressing the response. Now, as I have mentioned, each client is its own case, with its own set of subscriptions, and some clients might not support compression. So at first glance, it seemed as if those joins and compressions were not shared between different clients. However, knowing the nature of the data and the usage flow of the application, I was sure that most clients were actually listening to the same subset of channels. On top of this, owing to the throttling approach described above, I knew most clients were receiving the same messages most of the time.

To take advantage of this, I introduced logic in the client processes which allows them to reuse a join/compress result produced by another process. The algorithm is a bit involved, but the general idea goes like this. When a client process receives the list of messages it is interested in, it checks whether a combined message has already been computed by another client process. It does so via a lookup in an ETS table, using the messages' unique ids as the key. If a cached version exists, it is served. Otherwise, the client process joins the messages, compresses the result and stores it in ETS.

The key point is that while one process is joining messages, all others interested in the same set of messages wait for it to finish, so they can reuse its result. However, two processes which try to join different sets of messages will not block each other. Consequently, the join/compress computation is performed only once per distinct set of messages, but it runs in parallel with join/compress computations for other sets of messages. There are a couple of ways of accomplishing this in Erlang, and after some trial and error, I settled on a custom locking implementation. The generic Elixir library which I used for this task can be found here.
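The locking pattern can be sketched in Python. This is not the Elixir library mentioned above, only an illustration of the same idea with hypothetical names: one lock per cache key, so callers asking for the same set of message ids wait for the first caller and reuse its result, while callers with other keys are not blocked. The key is the sorted tuple of message ids, and `zlib` stands in for the server's compression.

```python
import threading
import zlib


class KeyLockedCache:
    """Compute-once cache: one lock per key, plus a table-level lock used
    only for short bookkeeping. Same key: callers wait and reuse the result.
    Different keys: no blocking on each other's computation."""

    def __init__(self):
        self._table = {}                 # stand-in for the ETS cache table
        self._key_locks = {}
        self._meta = threading.Lock()
        self.computations = 0

    def _lookup(self, key):
        with self._meta:
            return self._table.get(key)

    def get_or_compute(self, key, compute):
        cached = self._lookup(key)
        if cached is not None:
            return cached
        with self._meta:
            key_lock = self._key_locks.setdefault(key, threading.Lock())
        with key_lock:
            cached = self._lookup(key)   # another caller may have finished
            if cached is not None:
                return cached
            value = compute()            # runs outside the table-level lock
            with self._meta:
                self._table[key] = value
                self.computations += 1
            return value

    def delete(self, key):
        # Cached entries are not reclaimed automatically; expire them explicitly.
        with self._meta:
            self._table.pop(key, None)
            self._key_locks.pop(key, None)


def combined_message(cache, messages):
    """messages: list of (message_id, text). Joins and compresses once per
    distinct set of message ids."""
    key = tuple(sorted(mid for mid, _ in messages))

    def compute():
        ordered = sorted(messages)
        return zlib.compress("\n".join(text for _, text in ordered).encode("utf-8"))

    return cache.get_or_compute(key, compute)
```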

As a result, message join/compress is performed much less often. A glance at the production log tells me that in a single minute, out of about 80,000 join/compress operations needed, only 1,300 (roughly 1.6%) are actually computed! For all others, the pre-generated result is fetched from ETS.

This is generally a more complicated approach. Not only do we have to worry about custom inter-process synchronization, but there is also an expiry issue: the cached data is stored in an ETS table, so we must manually delete it at some point. Therefore, I generally find it better to prepare data in the provider process if possible, because this should result in simpler and better-performing code.

A careful reader might also wonder whether the message dispatcher process could determine which clients receive the same messages, since all messages are sent from that process. This is possible in principle, but it has its drawbacks. First, the dispatcher doesn't know which clients have returned to the server. A client might return after the next dispatch, so whatever the dispatcher has joined and compressed would then be useless for that client (since additional new messages have arrived), making the computation completely redundant. Additionally, shifting the join/compress responsibility to the dispatcher effectively removes the parallelization of independent computations, turning the dispatcher into a sequential bottleneck.

Some final thoughts

The most important point of this post is that in Erlang it is fairly easy to minimize the number of redundant computations without sacrificing scalability. I described a couple of techniques which I find interesting and which (hopefully) illustrate a higher-level view of concurrency in Erlang systems. The motivation for these approaches came from observing the system from a higher-level perspective: thinking about the nature of the data, how it is transported through the system and how clients use it. Of course, profiling, benchmarking and performance monitoring are always useful tools which can help identify the more problematic parts of the system.
