Asynchronous logging in Rust, or how to create a predominantly lock-free bounded queue without data loss
I’d like to describe an asynchronous logger library implemented in Rust. Its goal is to make the logging operation as fast as possible, so the decision was made to implement it with a lock-free queue, but one with limited capacity and occasional blocking: of the three possible strategies (to wait, to lose data, or to run out of memory), the first is preferable. The idea is described below, and the implementation is available here: https://crates.io/crates/async_logger
In short, having a buffer of a certain fixed size, we can use an atomic integer to reserve space in the buffer: threads concurrently increment the atomic until the buffer is full. Then we switch to another buffer and let the first one be processed and become available again.
But it is not quite that simple.
Let’s assume we have a contiguous buffer of bytes of a limited size S. We want to write to that buffer from multiple threads T1, T2, T3… without locking.
Suppose thread Ti (i = 1, 2, 3…) is going to write a chunk of data of size si, 0 < si < S. Suppose also there is an atomic integer A shared by all threads, representing the currently used buffer space. Thread Ti uses a CAS operation to increment A by si concurrently with the other threads. As soon as the increment succeeds, thread Ti has the previous value V of A returned by the CAS operation, and the interval [V; V + si) is the range of bytes in the buffer where thread Ti can write its data.
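As a minimal sketch of this reservation step (the names, the capacity `S`, and the `try_reserve` helper are illustrative, not the crate’s actual API), the CAS loop can be written in Rust like this:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative buffer capacity; not the crate's actual constant.
const S: usize = 1024;

// Try to reserve `s` bytes in the buffer. On success, returns the
// previous value V of A: the caller owns bytes [V, V + s), but must
// still check that V + s <= S before actually writing.
fn try_reserve(a: &AtomicUsize, s: usize) -> Option<usize> {
    let mut v = a.load(Ordering::Relaxed);
    loop {
        if v >= S {
            return None; // buffer already full: caller should block
        }
        match a.compare_exchange_weak(v, v + s, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(prev) => return Some(prev),
            Err(cur) => v = cur, // another thread moved A; retry
        }
    }
}

fn main() {
    let a = AtomicUsize::new(0);
    assert_eq!(try_reserve(&a, 100), Some(0));   // first chunk at offset 0
    assert_eq!(try_reserve(&a, 100), Some(100)); // next chunk right after
    assert_eq!(a.load(Ordering::Relaxed), 200);
}
```

Note that a successful reservation may still run past the end of the buffer (V + s > S); that out-of-bounds case is handled separately below.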
So far, all writes are lock-free. At some moment the buffer becomes full and should be processed by a dedicated thread W, which will, for example, write the data to the log file. Threads T1, T2, T3… should block when they try writing to the full buffer, while thread W should be notified and start working.
Thread W can be notified only when all of the threads T1, T2, T3… have finished writing to the buffer. For that, another atomic D representing the “done” size of the buffer is used. As soon as Ti finishes writing its data, it increments D by si.
If the CAS operation on A for thread Ti resulted in V < S < V + si (the required size runs out of bounds), then Ti doesn’t write anything to the buffer but still increments D by S - V. With this in mind, we can conclude that as soon as all the threads T1, T2, T3… have finished writing, the “done” size D equals S. The first thread to reach the state D = S sends a notification to thread W.
Because in the case V < S < V + si described above thread Ti doesn’t write anything to the buffer, we additionally need to record how much data has actually been written and can therefore be processed by W. For that, an additional atomic U can be used: U is incremented only by the sizes of actually written chunks.
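The bookkeeping on D and U can be sketched as follows (again illustrative: the `finish_write` helper and capacity `S` are assumptions, not the crate’s API; the function models what a writer does right after its CAS on A returned the previous value `v`):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative buffer capacity; not the crate's actual constant.
const S: usize = 1024;

// Bookkeeping a writer performs after its CAS on A returned the
// previous value `v` (with v < S) for a request of `s` bytes.
// Returns true when this thread drove the "done" counter D to S
// and therefore has to notify thread W.
fn finish_write(d: &AtomicUsize, u: &AtomicUsize, v: usize, s: usize) -> bool {
    let done = if v + s <= S {
        // the chunk fits: the caller has already copied the bytes
        // into [v, v + s), so account for them in U as well
        u.fetch_add(s, Ordering::Relaxed);
        s
    } else {
        // out-of-bounds reservation: nothing was written, but the
        // unused tail [v, S) must still be counted so D can reach S
        S - v
    };
    // release ordering: the copied bytes happen-before thread W,
    // which pairs this with an acquire when picking up the buffer
    d.fetch_add(done, Ordering::Release) + done == S
}

fn main() {
    let (d, u) = (AtomicUsize::new(0), AtomicUsize::new(0));
    // one writer fills bytes [0, 1000), another overflows at 1000
    assert!(!finish_write(&d, &u, 0, 1000));
    assert!(finish_write(&d, &u, 1000, 100)); // D reaches S: notify W
    assert_eq!(u.load(Ordering::Relaxed), 1000); // only 1000 bytes written
}
```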
After the buffer is full, any thread Ti attempting to write to it will first examine atomic A and only then go into a blocking state, waiting for the buffer to free up.
After being notified, thread W acquires the buffer for reading, processes the data of size U, then resets U and D to 0 first, and only after that sets A to 0. Finally, thread W notifies all possibly waiting threads T1, T2, T3…
To reduce contention, a pair of buffers can be used: while thread W is processing one of them, the other is available to T1, T2, T3… for writing.
The approach can be summarized in the following pseudocode:

    ; write to buffer and return true, or return false if buffer is full
    function try_write(data: byte_array, s: integer,
                       buffer, S, A, U, D): boolean
        V = A.load(mem_order_relaxed)
        loop
            if V >= S then
                return false
            end
            cur_V = A.compare_and_swap(V, V + s, mem_order_relaxed)
            if cur_V == V then
                break
            end
            V = cur_V
        end
        ; value of A has been updated by the current thread,
        ; now we only check if there is enough space left in
        ; buffer and if current thread needs to notify thread W
        if V + s <= S then
            copy(buffer, V, data, s)
            U.fetch_and_add(s, mem_order_relaxed)
            done = s
            return_value = true
        else
            done = S - V
            return_value = false
        end
        if done > 0 then
            total_done = D.fetch_and_add(done, mem_order_release)
            if total_done + done == S then
                notify(W)
            end
        end
        return return_value
    end

    ; write data of size s to any of two buffers
    function write_to_buffer(data: byte_array, s: integer)
        loop
            if try_write(data, s, buffer1, S1, A1, U1, D1) then
                return
            end
            if try_write(data, s, buffer2, S2, A2, U2, D2) then
                return
            end
            wait(T)  ; wait until thread W performs 'notify_all(T)'
        end
    end

    ; thread W
    loop
        ; wait until some thread performs 'notify(W)';
        ; this operation also includes acquire fence
        (buffer, A, U, D) = wait_for_any_buffer_full()
        process_data(buffer, U.load(mem_order_relaxed))
        U.store(0, mem_order_relaxed)
        D.store(0, mem_order_relaxed)
        A.store(0, mem_order_release)  ; release semantics to be sure
                                       ; U and D are updated before A
        notify_all(T)  ; notify all possibly waiting threads T1, T2, T3...
    end
With large enough buffers, threads T1, T2, T3… work almost lock-free, only rarely performing the ‘notify(W)’ call or blocking on a full buffer.
A specific trait of this approach is that data does not become available to thread W immediately after being written by any of the threads T1, T2, T3… Explicit flushing can be implemented by reserving all of the remaining space in the buffer, although frequent flushing may degrade performance.
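Such a flush might look like the following sketch (a hypothetical `flush` helper, not the crate’s API): it claims the whole remaining range [V; S) in one CAS without writing into it, so D can reach S and thread W picks the buffer up; U stays untouched because no new data was written.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Illustrative buffer capacity; not the crate's actual constant.
const S: usize = 1024;

// Hypothetical flush: make the buffer look full by reserving the
// whole remaining range [v, S) without writing into it. Returns
// true when this call drove D to S (caller then notifies W).
fn flush(a: &AtomicUsize, d: &AtomicUsize) -> bool {
    let mut v = a.load(Ordering::Relaxed);
    while v < S {
        match a.compare_exchange_weak(v, S, Ordering::Relaxed, Ordering::Relaxed) {
            // the tail [v, S) is now ours: count it as "done"
            Ok(_) => return d.fetch_add(S - v, Ordering::Release) + (S - v) == S,
            Err(cur) => v = cur, // another thread moved A; retry
        }
    }
    false // buffer already full (or already flushed): nothing to do
}

fn main() {
    // 200 bytes reserved, and already marked done by their writers
    let (a, d) = (AtomicUsize::new(200), AtomicUsize::new(200));
    assert!(flush(&a, &d)); // the flush completes the buffer
    assert_eq!(a.load(Ordering::Relaxed), S);
}
```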