A Superlean Inter-Thread Queue

Download the complete sample solution – scroll to bottom of post.

There are times when a design calls for the ability to pass information from one thread to another within an application. The Actor design pattern hinges upon such a capability, as do other bespoke architectures in which dedicated threads play a central role. In an asynchronous design, information passes between threads by queuing requests to a thread pool. The system internally schedules the processing of queued work by selecting some arbitrary thread within the thread pool and causing that thread to invoke a callback that you supply, either directly or indirectly (as when using async/await).

The above pattern is well suited to a variety of problem domains, including user interface processing and network servers, but it is not a good fit for designs that call for communication between specific threads, as is the case with the Actor pattern.

There are several ways to accomplish such communication. They usually involve a managed queue, possibly with synchronization, along with a synchronous wait on an event of some form. It’s also possible to leverage a thread’s window message queue and use the API that underpins Windows GUI messaging, but doing so in a Windows app that already uses windows of its own can present difficulties.

Asynchronous Procedure Calls

The Windows kernel supports a mechanism called Asynchronous Procedure Calls or APCs. These are implemented at a fundamental level within the OS and are one of the foundational abstractions exposed by the kernel.

They are powerful and fast, yet surprisingly few developers have even heard of APCs, and fewer still have actually used them, particularly from managed code.

Every native thread in Windows has an APC queue. This is a true queue managed by a fast API in the operating system. The queue contains small opaque messages, each of which encapsulates a small amount of data and a function address. When a message is queued (typically by another thread) it remains in the queue until the target thread enters what is known as an alertable state, at which point it gets processed.

A thread can enter the alertable state by calling one of several kernel wait functions, including SleepEx. When a thread is waiting in the alertable state and an APC is queued, the scheduler dequeues the message and schedules the thread to run at the address specified in the originally queued message.

The designated function is therefore invoked by the target thread. When that function eventually returns, the original wait operation (the one that put the thread in the alertable state) ends and returns the status code WAIT_IO_COMPLETION, so the thread’s code can ascertain why the wait ended and optionally resume waiting.

The thread doesn’t have to be waiting at the moment an APC is queued. The message simply stays in the queue, and it will not be passed to the handler function until the thread next performs an alertable wait.
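To make the queue-then-wait behaviour concrete, here is a minimal sketch that is not taken from the sample project. It assumes Windows and declares its own P/Invoke signatures; the class name ApcBasics and its members are hypothetical. A thread queues an APC to itself, observes that nothing has run yet, and only sees the handler run once it makes an alertable call to SleepEx.

```csharp
using System;
using System.Runtime.InteropServices;

static class ApcBasics
{
    // Mirrors the native PAPCFUNC signature: VOID CALLBACK ApcProc(ULONG_PTR parameter).
    delegate void ApcCallback(UIntPtr parameter);

    [DllImport("kernel32.dll", SetLastError = true)]
    static extern uint QueueUserAPC(ApcCallback callback, IntPtr thread, UIntPtr data);

    [DllImport("kernel32.dll")]
    static extern uint SleepEx(uint milliseconds, bool alertable);

    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread(); // pseudo-handle for the calling thread

    static bool handlerRan;

    // Held in a static field so the delegate cannot be collected while the APC is pending.
    static readonly ApcCallback Handler = OnApc;

    static void OnApc(UIntPtr parameter)
    {
        handlerRan = true;
    }

    // Returns true if the APC stayed queued until the alertable wait and then ran.
    public static bool Demo()
    {
        handlerRan = false;

        if (QueueUserAPC(Handler, GetCurrentThread(), UIntPtr.Zero) == 0)
            throw new InvalidOperationException("QueueUserAPC failed: " + Marshal.GetLastWin32Error());

        bool ranBeforeWait = handlerRan; // no alertable wait has happened yet

        SleepEx(0, true);                // alertable wait: pending APCs are delivered here

        return !ranBeforeWait && handlerRan;
    }

    static void Main()
    {
        Console.WriteLine("APC deferred until alertable wait: " + Demo());
    }
}
```

Queuing to the current thread keeps the sketch short. In a real design the handle would refer to a different thread, which is what the rest of this post deals with.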

Leveraging APCs from Managed Code

I’ve used APCs from managed code several times and seen impressive performance. Bear in mind that APC queue entries are allocated by the kernel from the non-paged pool, so code never experiences a (time consuming) page fault when manipulating the APC mechanism.

I’ve created a small example console project that demonstrates how to use APCs from a managed environment. This project presents a rudimentary Actor model, which fits well with how APCs work.

One of the strengths of APCs is that they are relatively fast by design. They involve no IO, buffering or copying. In essence they are a command sent by one thread to the operating system saying, metaphorically, “Dear Windows, please tell thread XXX to execute the following function and pass it the following pointer as the argument to that function”.

Of course you might have noticed that the sender of the APC needs to specify the function to be called. That is, it needs to know the address (the actual address within the process’s address space) of the function. In managed code this is easily obtained by use of a delegate. The operating system function QueueUserAPC has no managed equivalent, so we must rely on the native Win32 function, which resides in KERNEL32.DLL.

.Net takes care of converting a managed delegate instance into a function address when we call a native function, so this presents no particular challenge to us. However, the native QueueUserAPC function requires a native handle to the thread to which we want to send the APC. To get a handle to an existing Windows thread we call OpenThread and specify the native thread ID of the target thread. .Net doesn’t expose a thread’s native thread ID, though, only its managed ID, which has no role to play when it comes to APCs.

The sample code I put together has a started thread getting its own native thread ID by calling GetCurrentThreadId. This is done in the Actor’s thread start function, so it always runs in the context of the started thread. That ID is then exposed (as an internal property) to the rest of the code in the sample. Once we have a thread’s ID we can open a handle to it, and once we have a handle we can send APCs.
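The chain described above (native thread ID, then handle, then APC) can be sketched end to end as follows. This is not the sample’s code: it assumes Windows, the class name CrossThreadApc and all of its members are hypothetical, and the P/Invoke declarations are written out here rather than taken from the sample’s Kernel.NativeMethods class.

```csharp
using System;
using System.Runtime.InteropServices;
using System.Threading;

static class CrossThreadApc
{
    const uint THREAD_SET_CONTEXT = 0x0010; // access right that QueueUserAPC requires
    const uint INFINITE = 0xFFFFFFFF;

    delegate void ApcCallback(UIntPtr parameter);

    [DllImport("kernel32.dll")]
    static extern uint GetCurrentThreadId();

    [DllImport("kernel32.dll", SetLastError = true)]
    static extern IntPtr OpenThread(uint desiredAccess, bool inheritHandle, uint threadId);

    [DllImport("kernel32.dll", SetLastError = true)]
    static extern uint QueueUserAPC(ApcCallback callback, IntPtr thread, UIntPtr data);

    [DllImport("kernel32.dll")]
    static extern uint SleepEx(uint milliseconds, bool alertable);

    [DllImport("kernel32.dll")]
    static extern bool CloseHandle(IntPtr handle);

    static uint workerTid;      // published by the worker itself
    static uint handlerTid;     // thread on which the APC handler actually ran
    static ulong received;      // value carried by the APC
    static volatile bool done;
    static readonly ManualResetEventSlim tidReady = new ManualResetEventSlim();
    static readonly ApcCallback Handler = OnApc; // keeps the delegate alive

    static void OnApc(UIntPtr parameter)
    {
        received = parameter.ToUInt64();
        handlerTid = GetCurrentThreadId();
        done = true;
    }

    static void Worker()
    {
        // Only the thread itself can ask for its own native ID.
        workerTid = GetCurrentThreadId();
        tidReady.Set();

        // Wait alertably until the handler has run.
        while (!done)
            SleepEx(INFINITE, true);
    }

    // Returns true if the handler ran on the worker thread with the value we sent.
    public static bool Demo()
    {
        done = false;
        tidReady.Reset();

        Thread worker = new Thread(Worker);
        worker.Start();
        tidReady.Wait();

        IntPtr handle = OpenThread(THREAD_SET_CONTEXT, false, workerTid);
        if (handle == IntPtr.Zero)
            throw new InvalidOperationException("OpenThread failed: " + Marshal.GetLastWin32Error());

        try
        {
            if (QueueUserAPC(Handler, handle, new UIntPtr(42u)) == 0)
                throw new InvalidOperationException("QueueUserAPC failed: " + Marshal.GetLastWin32Error());
        }
        finally
        {
            CloseHandle(handle);
        }

        worker.Join();
        return received == 42 && handlerTid == workerTid;
    }

    static void Main()
    {
        Console.WriteLine("Handler ran on the worker thread: " + Demo());
    }
}
```

The sketch holds the thread handle in an IntPtr, which is pointer sized on both 32-bit and 64-bit processes. The sender never calls the handler itself; it only hands the operating system a function address and a value.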

Handles

Opening a handle to a thread is straightforward, but we need two things: 1) the native ID of the thread we want a handle to and 2) suitable permissions to manipulate (send APCs to) that thread. I’ve abstracted this with a class called ActorHandle, which encapsulates the Win32 handle itself and the act of sending an APC (via the method SendMessage).

An ActorHandle is opened by passing a reference to the target Actor into the Connect method, which is fine for Actors running within the same AppDomain:

        /// <summary>
        /// Opens a Win32 handle to another thread and encapsulates it within an ActorHandle.
        /// </summary>
        /// <param name="actor">The identity of the target thread.</param>
        /// <returns>A new ActorHandle object.</returns>
        public static ActorHandle Connect(Actor actor)
        {
            uint tid = actor.NativeTid;

            // THREAD_SET_CONTEXT is the access right that QueueUserAPC requires.
            UInt32 handle = Kernel.NativeMethods.OpenThread(Kernel.Constants.THREAD_SET_CONTEXT, false, tid);

            // OpenThread returns zero (a null handle) on failure.
            if (handle == 0)
                throw new InvalidOperationException("Unable to open a handle to thread: " + tid);

            ActorHandle context = new ActorHandle(handle, tid);
            context.handler = actor.ProcessAPC;
            return context;
        }

As I’ll be discussing in a future post, an APC is a pure Win32 thread-to-thread operation, and the location of a thread (that is, which process it’s situated in) has no bearing on the functioning of APCs. This means that the only restrictions are those arising from .Net itself. Under .Net we could use some form of remoting to allow threads in different AppDomains to send APCs to one another. That would let threads interact using .Net object references, but it carries a runtime cost that might be prohibitive.

All we actually need to open a handle is the native thread ID. This is why there’s a “Name” property defined on an Actor: it will enable us to identify Actors in a controlled manner. But opening a handle isn’t the challenging part. It’s sending the APC that’s the real challenge, because all we can send is (basically) a pointer. For threads running in the same AppDomain this is fine: we can just pass an object reference to our “payload”, as the sample does, and it all works.

Passing a managed reference between AppDomains is technically possible but requires remoting, which may carry an overhead cost. Even if it doesn’t, it certainly would between two different processes, and that cost (basically copying the entire payload object) might swamp the performance attraction of APCs.

However, there is another approach which offers the same performance as using APCs in the same AppDomain: we’d pass a structure by reference. We must then design the message passing interface to use a generic method with “(ref T Message)” as the signature, and calling native (interop) methods within generic classes often raises runtime problems.

These problems can be addressed, and I have successfully implemented process-to-process APCs before. Such a design is out of scope for this introductory post, however, because a sender also needs to know the actual target function address within the target thread’s process. This is quite involved to pull off, and I’ll discuss a more powerful solution to this problem in a future post.

Sleeping

A thread will only be able to process APCs if it’s in an alertable state; fortunately, calling Thread.Sleep puts it in exactly that state. If a thread calls Thread.Sleep with an infinite timeout, that sleep will actually return to the caller after an APC has been processed. It’s not widely known that a Thread.Sleep(-1) can ever return, but it can. The native SleepEx always returns a code which indicates whether a sleep exited due to an APC being processed or due to a genuine timeout, but the managed Thread.Sleep does not expose that return code.

If we could get that return code, we could implement an idle processing feature within this simple Actor model. With that feature an Actor could periodically “wake up” to perform useful work unrelated to processing APCs.

In the code I put together for this sample I replaced Thread.Sleep with a call to the native SleepEx, so we can now examine the return code and recognize whether a timeout occurred or an APC was processed. This is a small detail, but I wanted to include it to make the code more interesting and to show how a useful model can be built on the APC concept.
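As a small illustration of that return code, separate from the sample project, the sketch below calls SleepEx twice on the current thread: once with nothing queued and once with an APC already pending. It assumes Windows; the class name SleepExStatus and its members are hypothetical.

```csharp
using System;
using System.Runtime.InteropServices;

static class SleepExStatus
{
    const uint WAIT_IO_COMPLETION = 0xC0; // SleepEx result when an APC was processed

    delegate void ApcCallback(UIntPtr parameter);

    [DllImport("kernel32.dll", SetLastError = true)]
    static extern uint QueueUserAPC(ApcCallback callback, IntPtr thread, UIntPtr data);

    [DllImport("kernel32.dll")]
    static extern uint SleepEx(uint milliseconds, bool alertable);

    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread(); // pseudo-handle for the calling thread

    // A do-nothing handler, kept in a static field so it stays alive while queued.
    static readonly ApcCallback Handler = parameter => { };

    // Performs one alertable 10 ms sleep, optionally with an APC already queued,
    // and returns the raw status code from SleepEx.
    public static uint WaitOnce(bool queueApcFirst)
    {
        if (queueApcFirst && QueueUserAPC(Handler, GetCurrentThread(), UIntPtr.Zero) == 0)
            throw new InvalidOperationException("QueueUserAPC failed: " + Marshal.GetLastWin32Error());

        return SleepEx(10, true);
    }

    static string Describe(uint status)
    {
        return status == WAIT_IO_COMPLETION ? "woken by an APC" : "idle timeout";
    }

    static void Main()
    {
        Console.WriteLine(Describe(WaitOnce(false))); // nothing queued
        Console.WriteLine(Describe(WaitOnce(true)));  // one APC pending
    }
}
```

SleepEx is documented to return zero when the interval elapses and WAIT_IO_COMPLETION when it returns because of a callback, so the first call should report the timeout and the second the APC. That distinction is all the idle processing feature needs.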

Here’s the complete function that implements the core wait loop for the Actor class:

        private void ThreadStart(object info)
        {
            ExceptionSource source = ExceptionSource.OnMessageReceived;

            uint os_status;

            // The system always passes something in; if not, a serious error has occurred.

            if (info == null)
                throw new InvalidOperationException("An internal error has occurred.");

            ActorStartInfo start_info = (ActorStartInfo)info;

            // Because the Actor instance is created by the calling thread, thread-specific details
            // can't be acquired by the Actor constructor; they can only be acquired after the thread
            // has started running code in the Actor. That's why we do this here: in effect we're
            // 'completing' the object initialization in the context of the started thread.

            name = start_info.Name;
            thread = Thread.CurrentThread;
            native_tid = Kernel.NativeMethods.GetCurrentThreadId();

            try
            {
                // Allow the user's derived class to perform its own initialization.
                // It can indicate failure if it wants, so we check for this too.

                if (OnThreadStartup(start_info.Startup) == false)
                    return;
            }
            catch (Exception E)
            {
                // Expose the exception via a member in this Actor object. The code that
                // started us can see this and use it to report helpful information.

                start_exception = E; // Allows CreateActor to see failure reason...
                Debug.WriteLine("An exception was thrown during actor startup: " + E.Message);
                continue_to_run = false; // Prevent any further work from being done.
            }
            finally
            {
                // Always wake the waiter (the thread starting us) whether we start OK or not.
                ievent[0].Set();
            }

            // This is the core processing loop. We wait using the Win32 SleepEx, which allows
            // us to see why a sleep exited. Ordinarily a sleep can only exit if the time expires,
            // but when the thread gets an APC that too will exit the sleep.

            while (continue_to_run)
            {
                try
                {
                    // If the thread is blocked inside this call and an APC is sent to the thread,
                    // the function specified in the APC is called. When that function ends,
                    // this sleep will subsequently exit with the specific status code.

                    source = ExceptionSource.OnMessageReceived;
                    os_status = Kernel.NativeMethods.SleepEx((UInt32)start_info.IdlePeriod, true);

                    // This is the Win32 status code telling us why the wait ended.

                    if (os_status != Kernel.Constants.WAIT_IO_COMPLETION)
                    {
                        source = ExceptionSource.OnTimeoutExpired;
                        continue_to_run = OnTimeoutExpired();
                    }
                }
                catch (Exception E)
                {
                    // Always trap exceptions not trapped by the user's handler methods and forward
                    // them to the special handler. This handler will run if an exception occurs
                    // within either the timeout handler OR the APC handler.
                    OnUnhandledException(source, E);
                    Debug.WriteLine("An exception was thrown by an APC or timeout handler: " + E.Message);
                }
            }
        }

Abstract Class

In the sample there’s an abstract class called Actor that you use as a base for your own actor classes. You are therefore compelled to implement handlers for three different kinds of events: thread startup, APC receipt and idle timer expiration. You create an Actor using the generic static method CreateActor<A>, whose generic argument is your derived actor class.

An Actor instance has a dedicated thread which waits for APCs and timer expirations. Timeouts were included in the sample simply because APCs require a thread to wait alertably, and waiting itself always has an optional timeout. The model is simple yet useful: if your Actor isn’t receiving work via APCs, the timeouts allow it (or rather its thread) to perform useful occasional work. Timeouts can be as frequent as once per millisecond, and an Actor can adjust the current timeout interval at any time.

Under .Net, if any thread allows an exception to propagate up the stack to the CLR then the process is promptly terminated. Unhandled exceptions in background worker threads, like those used by async/await, Task, Timers, or APM callbacks, are a common cause of sudden application failure and can be challenging to track down. For this reason the Actor implementation strives to ensure that no exception arising during execution of your handler implementations can ever cause process termination.

QueueUserAPC

The heart of the simple Actor implementation is the Win32 function QueueUserAPC. Here is the code that implements the managed interface to this function; it can be found inside the ActorHandle class.

        public void SendMessage (int MsgID,object Message)
        {
            uint retcode;

            if (handle == 0)
                throw new InvalidOperationException("The handle to the Actor has not been initialized.");

            if (Message == null)
                throw new ArgumentNullException("Message");

            // Create an APCMessage instance. This is used internally to facilitate
            // the inclusion of system information as well as the user's data.

            APCMessage msg = new APCMessage(MsgID, Message, my_tid);

            retcode = Kernel.NativeMethods.QueueUserAPC(handler,handle,msg);

            // QueueUserAPC returns zero on failure and a nonzero value on success.
            if (retcode == 0)
            {
                throw new InvalidOperationException("Unable to queue an APC to native thread: " + tid + ", Code: " + retcode);
            }
        }

The QueueUserAPC function never blocks and never induces page faults. It is a low cost operation, well suited to high performance ITC (Inter-Thread Communication).
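The listing above hands the APCMessage object straight to the native call, and how the sample marshals it is not shown here. As one possible way to carry a managed payload in the single pointer-sized argument, the sketch below swaps in a different technique: it wraps the payload in a GCHandle and unwraps it in the handler. It assumes Windows; the class name ApcPayload and its members are hypothetical, and it queues to the current thread only to stay short.

```csharp
using System;
using System.Runtime.InteropServices;

static class ApcPayload
{
    // The native ULONG_PTR argument is pointer sized, so IntPtr is used for it here.
    delegate void ApcCallback(IntPtr parameter);

    [DllImport("kernel32.dll", SetLastError = true)]
    static extern uint QueueUserAPC(ApcCallback callback, IntPtr thread, IntPtr data);

    [DllImport("kernel32.dll")]
    static extern uint SleepEx(uint milliseconds, bool alertable);

    [DllImport("kernel32.dll")]
    static extern IntPtr GetCurrentThread(); // pseudo-handle for the calling thread

    static string lastMessage;
    static readonly ApcCallback Handler = OnApc; // keeps the delegate alive

    static void OnApc(IntPtr parameter)
    {
        // Recover the managed object from the opaque value, then release the handle.
        GCHandle payload = GCHandle.FromIntPtr(parameter);
        try
        {
            lastMessage = (string)payload.Target;
        }
        finally
        {
            payload.Free();
        }
    }

    // Sends a string through the APC queue and returns what the handler received.
    public static string RoundTrip(string message)
    {
        lastMessage = null;

        // The GCHandle keeps the payload alive while the APC sits in the queue.
        GCHandle payload = GCHandle.Alloc(message);

        if (QueueUserAPC(Handler, GetCurrentThread(), GCHandle.ToIntPtr(payload)) == 0)
        {
            int error = Marshal.GetLastWin32Error();
            payload.Free();
            throw new InvalidOperationException("QueueUserAPC failed: " + error);
        }

        SleepEx(0, true); // alertable wait so the pending APC is delivered
        return lastMessage;
    }

    static void Main()
    {
        Console.WriteLine(RoundTrip("hello from an APC"));
    }
}
```

Nothing is copied here. The handler receives a reference to the same object, which is why this only makes sense for threads that share an AppDomain.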

Flexibility

The above examples and the accompanying project (in a downloadable ZIP, see below) leverage APCs for inter-thread communication between managed threads within a single AppDomain. However, QueueUserAPC can be used between any two threads on the machine, even threads running in different processes. This is the true power of APCs when leveraged fully.

In a future post I’ll explain how to implement a more general purpose design that allows managed threads in differing applications to use APCs for communication. This is a very fast and very powerful technique that opens up a whole host of possibilities.

Download the complete sample solution APCSample.

The solution was created with Visual Studio 2013.
