Asynchronous logging in Rust, or how to create a mostly lock-free bounded queue without data loss

I’d like to describe an asynchronous logger library implemented in Rust. Its goal is to make the logging operation as fast as possible, so the decision was made to implement it on top of a lock-free queue, but one with limited capacity and occasional blocking: of the three possible strategies — waiting, losing data, or running out of memory — the first is preferable. The idea is described below, and the implementation is available here: https://crates.io/crates/async_logger

The idea.

In short, having a buffer of a certain fixed size, we can use an atomic integer to reserve space in the buffer by incrementing its value concurrently until the buffer is full. Then we switch to another buffer and let the first one be processed and become available again.

But it is not that simple.

Let’s assume we have a contiguous buffer of bytes of a limited size S. We want to write to that buffer from multiple threads T1, T2, T3… without locking.
Suppose thread Ti (i = 1, 2, 3…) is going to write a chunk of data of size si, 0 < si < S. Suppose there is an atomic integer A shared by all threads that represents the currently used buffer space. Thread Ti uses a CAS operation to increment A by si concurrently. As soon as the value is successfully incremented, thread Ti has the previous value V of A returned by the CAS operation, and the interval [V; V + si) is the range of bytes in the buffer where thread Ti can write its data.
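The reservation step can be sketched in Rust roughly as follows. This is a minimal illustration, not the actual async_logger code, and try_reserve is a hypothetical helper. Note that the returned offset may satisfy offset + s > cap; in that case the caller must not write anything, and instead accounts for the remaining cap - offset bytes in the “done” counter, as described further below.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Try to reserve `s` bytes in a buffer of capacity `cap` by advancing
/// the shared cursor `a` (the atomic A from the text).
/// Returns `Some(offset)` for the claimed range [offset, offset + s),
/// or `None` if the buffer is already fully reserved. The caller must
/// still check that `offset + s <= cap` before writing.
fn try_reserve(a: &AtomicUsize, cap: usize, s: usize) -> Option<usize> {
    let mut v = a.load(Ordering::Relaxed);
    loop {
        if v >= cap {
            return None; // no space left at all
        }
        // Attempt to claim [v, v + s) by moving the cursor forward.
        match a.compare_exchange_weak(v, v + s, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(prev) => return Some(prev),
            Err(cur) => v = cur, // another thread moved the cursor; retry
        }
    }
}
```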

So far, all writes are lock-free. At some moment the buffer becomes full and should be processed by the thread W, which will, for example, write the data to the log file. Threads T1, T2, T3… should block when they try to write to the buffer, while thread W should be notified and start working.

Thread W can be notified only when all the threads T1, T2, T3… have finished writing to the buffer. For that, another atomic D representing the “done” size of the buffer is used. As soon as Ti finishes writing its data, it increments D by si.
If the CAS operation on A for thread Ti resulted in V < S < V + si (the required size is out of bounds), then Ti doesn’t write anything to the buffer but still increments D by S - V. With this in mind we can conclude that as soon as all the threads T1, T2, T3… have finished writing, the “done” size D equals S. The first thread that reaches the state D = S sends a notification to thread W.
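The “done” bookkeeping can be sketched like this (mark_done is a hypothetical helper, not part of the crate’s API). Relaxed ordering is enough for the counter itself because, as in the pseudocode below, the notification mechanism is assumed to pair with an acquire fence on thread W’s side:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Record `done` bytes as finished in the "done" counter D of a buffer
/// of capacity `cap`. Returns true exactly when this call makes D reach
/// `cap`, i.e. when the caller is the thread that should notify W.
fn mark_done(d: &AtomicUsize, cap: usize, done: usize) -> bool {
    d.fetch_add(done, Ordering::Relaxed) + done == cap
}
```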

Because in the case V < S < V + si described above thread Ti doesn’t write anything to the buffer, we additionally need to track how much data has actually been written to the buffer and can therefore be processed by W. For that, an additional atomic U can be used: U is incremented only by the sizes of data that were actually written.

After the buffer is full, any thread Ti attempting to write to it will first examine atomic A and only then enter the blocking state and wait for the buffer to free up.
After being notified, thread W acquires the buffer for reading, processes the data of size U, then resets U and D to 0 first, and only after that sets A to 0. Finally, thread W notifies all possibly waiting threads T1, T2, T3…
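The ordering of thread W’s reset step matters and can be sketched as follows (a hypothetical helper for illustration): U and D are cleared first, and A is published last with release semantics, so any writer thread that observes A == 0 with a matching acquire also sees the cleared counters.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Thread W's reset step after processing a buffer: clear U and D
/// first, then publish A = 0 with release semantics.
fn reset_counters(a: &AtomicUsize, u: &AtomicUsize, d: &AtomicUsize) {
    u.store(0, Ordering::Relaxed);
    d.store(0, Ordering::Relaxed);
    a.store(0, Ordering::Release);
}
```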

To reduce contention, a pair of buffers can be used. While thread W is processing one of them, the other is available to T1, T2, T3… for writing.

In pseudocode:

; write to buffer and return true, or return false if buffer is full
function try_write(data: byte_array,
                   s: integer,
                   buffer: byte_array,
                   S: integer,
                   A: atomic_integer,
                   U: atomic_integer,
                   D: atomic_integer)
begin
    V = A.load(mem_order_relaxed)
    loop
        if V >= S then
            return false
        else
            cur_V = A.compare_and_swap(V, V + s,
                                       mem_order_relaxed,
                                       mem_order_relaxed)
            if cur_V != V then
                V = cur_V
            else
                ; value of A has been updated by the current thread,
                ; now we only check if there is enough space left in
                ; the buffer and if the current thread needs to notify
                ; thread W
                if V + s <= S then
                    copy(buffer, V, data, s)
                    U.fetch_and_add(s, mem_order_release)
                    done = s
                    return_value = true
                else
                    done = S - V
                    return_value = false
                end
                if done > 0 then
                    total_done = D.fetch_and_add(done,
                                                 mem_order_relaxed)
                                 + done
                    if total_done == S then
                        notify(W)
                    end
                end
                return return_value
            end
        end
    end
end
; write data of size s to any of two buffers
function write_to_buffer(data: byte_array, s: integer)
begin
    loop
        if try_write(data, s, buffer1, S1, A1, U1, D1) then
            break
        end
        if try_write(data, s, buffer2, S2, A2, U2, D2) then
            break
        end
        ; wait until thread W performs 'notify_all(T)'
        wait_for_any_buffer_free()
    end
end
; thread W
function process_buffers()
begin
    loop
        ; wait until some thread performs 'notify(W)';
        ; this operation also includes an acquire fence
        (buffer, A, U, D) = wait_for_any_buffer_full()
        process_data(buffer, U.load(mem_order_relaxed))
        U.store(0, mem_order_relaxed)
        D.store(0, mem_order_relaxed)
        A.store(0, mem_order_release) ; release semantics to be sure
                                      ; U and D are updated before A
        notify_all(T) ; notify all possibly waiting
                      ; threads T1, T2, T3...
    end
end

With large enough buffers, threads T1, T2, T3… work almost lock-free, only rarely performing the ‘notify(W)’ call or blocking on a full buffer.

A specific trait of this approach is that data does not become available to thread W immediately after being written by one of the threads T1, T2, T3… Explicit flushing can be implemented by reserving all the remaining space in the buffer, although frequent flushing may degrade performance.
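Such a flush could be sketched as follows (flush_reserve is a hypothetical helper, not the crate’s API): the remaining space is claimed without writing anything, so that the “done” counter D can reach S and thread W gets notified even for a partially filled buffer. The caller is expected to add the returned amount to D and leave U untouched, since nothing was actually written.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Claim all remaining space in a buffer of capacity `cap` without
/// writing anything, forcing an early hand-off to thread W.
/// Returns the number of bytes claimed (0 if the buffer was already
/// fully reserved).
fn flush_reserve(a: &AtomicUsize, cap: usize) -> usize {
    let mut v = a.load(Ordering::Relaxed);
    loop {
        if v >= cap {
            return 0; // buffer is already fully reserved
        }
        // Claim everything from the current cursor up to the capacity.
        match a.compare_exchange_weak(v, cap, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(prev) => return cap - prev,
            Err(cur) => v = cur, // cursor moved concurrently; retry
        }
    }
}
```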
