Completely async file writerby Rob Galanakis on 26/04/2011
I’ve been doing more and more asynchronous programming lately. I needed to implement a logging system recently, but wanted to use asynchronous file IO. There was also the possibility that log calls could come in faster than they could complete, so I needed to synchronize the logging calls, marking them finished when the async IO completed. I wanted to make this as lock-free as possible.
The idea is, you have a ‘AsyncFileWriter’ field on a class, with a ‘QueueWrite’ method that threads can call to request a file write, passing the string/bytes, and a callback that fires when the IO is complete.
The ‘AsyncFileWriter’ has a Queue on it, and when calls to QueueWrite are made, a request (args and callback) is added to the Queue, and a call to a ‘WriteImpl’ is queued on the ThreadPool. ’WriteImpl’ has basically a semaphore, only allowing 1 thread to dequeue a request (args and callback) and begin the file IO. This ‘semaphore’ is released in the async IO complete callback. After the semaphore is released, another call to WriteImpl is queued on the ThreadPool, which will just return immediately if the request queue is empty.
The net effect is we have, basically, a producer-consumer queue, where producer threads can also be consumers (or queue up other ThreadPool threads to be consumers). When a request is processed, the consumer thread that finished processing it queues up another consumer thread. If a request comes in while the ‘semaphore’ is taken, the thread just drops its request into the request Queue and immediately returns- if the ‘semaphore’ is not taken, the thread dropping off the request then takes the lock and acts as a consumer thread.
I think the pseudocode is more clear than the actual code:
class AsyncFileWriter string filename #initialized from constructor Queue
_requestQueue bool _insideWrite public void QueueWrite(byte bytes, Action callback) lock (_requestQueue) _requestQueue.Add(bytes, callback) ThreadPool.QueueWork(WriteImpl) private void WriteImpl() if (_insideWrite) return lock (_requestQueue) request = _requestQueue.Dequeue() _insideWrite = true if request != null: fs = new FileStream(filename) fs.BeginWrite(request.Bytes, () => OnIOComplete(fs, request.Callback)) private void InIOComplete(FileStream fs, Action callback) fs.EndWrite() fs.Dispose() _insideWrite = false ThreadPool.QueueWork(WriteImpl) #Keep clearing the queue callback()
So you can see that, we only take locks when adding or removing items from the queue. If 5 threads enter WriteImpl at the same time, only one will take the lock, and the rest will return. And when the IO is complete, it’ll queue up another ThreadPool request to keep clearing the queue.
I initially implemented this for async IO as mentioned above, but have abstracted the pattern for anything- it is just basically a way to throttle processing with a ThreadPool-managed producer/consumer queue. I’ll go over this pattern along with some code in a future email@example.com