Implementing an Asynchronous Dispatch Queue with ThreadX

I previously introduced the concept of dispatch queues and walked through the creation of a simple C++ dispatch queue implementation. The original dispatch queue example was implemented using std::mutex, std::thread, and std::condition_variable. Today I'd like to demonstrate the creation of a dispatch queue using ThreadX RTOS primitives instead of the C++ builtin types.

We'll start with a review of what dispatch queues are. If you're already familiar with them, feel free to skip ahead to the implementation.

A Review of Dispatch Queues

A dispatch queue pairs a work queue with one or more generic-use worker threads. Consumers dispatch standalone functional operations to the work queue. Each thread pulls work from the queue (or sleeps until new work arrives).

To quote Apple on the advantages of using dispatch queues instead of threads:

  • It reduces the memory penalty your application pays for storing thread stacks in the application’s memory space.
  • It eliminates the code needed to create and configure your threads.
  • It eliminates the code needed to manage and schedule work on threads.
  • It simplifies the code you have to write.

These benefits are pretty real and tangible. As we saw in "The Problem With Threads", threading introduces nondeterminism into our system. By controlling our threading models using concurrent and serial dispatch queues, we gain a better grasp on the nondeterminism of our system.

The dispatch queue concept simplifies many of the threading scenarios encountered in embedded programming. Often, I just need to run small, simple tasks asynchronously without blocking the primary thread. This results in spawning numerous threads, each with a single small purpose:

  • When a button is pressed, update the drawing on the screen
  • When charging is complete, change LEDs and notify the system
  • When recording starts, turn on an LED and start drawing the elapsed record time on the screen

These simple operations could run on any generic set of threads, avoiding the overhead of per-task thread management, excessive context switching, and the increased potential for threading errors.

A C++11 and ThreadX Dispatch Queue

We'll be modifying the C++11 dispatch queue implementation to use ThreadX RTOS primitives instead of the C++11 types std::mutex, std::thread, and std::condition_variable. We will stick to C++11 features in places where RTOS primitives are not required.

std::function refresher

std::function is a very useful C++11 feature for capturing Callable objects. As a refresher:

Instances of std::function can store, copy, and invoke any Callable target -- functions, lambda expressions, bind expressions, or other function objects, as well as pointers to member functions and pointers to data members.

For this example, we will prototype our function objects as:

typedef std::function<void(void)> fp_t;
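
Any Callable matching the void(void) signature can be stored in an fp_t and invoked later. Here's a minimal sketch (example() and some_free_function() are hypothetical placeholders, not part of the dispatch queue):

void some_free_function(void); // hypothetical stand-alone work item

void example(void)
{
    fp_t op1 = []() { /* do some work */ }; // store a lambda
    fp_t op2 = some_free_function;          // store a free function

    op1(); // invokes the lambda
    op2(); // invokes the free function
}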

Bounce Refresher

ThreadX is implemented in C, and our dispatch queue is being implemented in C++. We'll need to utilize the bounce function to make sure our ThreadX thread interfaces with the correct object's dispatch handler. For more information on the bounce function, please see the previous article.

Here's the implementation of bounce:

/// This Bounce implementation is pulled from bounce.cpp
template<class T, class Method, Method m, class ...Params>
static auto bounce(void *priv, Params... params) ->
        decltype(((*reinterpret_cast<T *>(priv)).*m)(params...))
{
    return ((*reinterpret_cast<T *>(priv)).*m)(params...);
}

/// Convenience macro to simplify bounce statement usage
#define BOUNCE(c,m) bounce<c, decltype(&c::m), &c::m>
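
To see how this is used outside of ThreadX, here's a hedged sketch of routing a generic C-style callback (one that supplies a void * context argument) back into a member function. The register_callback() API, the Handler class, and setup() are hypothetical placeholders:

/// Hypothetical C-style API: takes a plain function pointer plus a context pointer
void register_callback(void (*cb)(void *ctx, int value), void *ctx);

class Handler {
  public:
    void on_event(int value) { /* handle the event */ }
};

void setup(Handler& h)
{
    // BOUNCE(Handler, on_event) instantiates
    // bounce<Handler, decltype(&Handler::on_event), &Handler::on_event>;
    // at runtime the void * context is cast back to Handler * and on_event() is invoked
    register_callback(BOUNCE(Handler, on_event), &h);
}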

A queue of functions

At its core, our dispatch queue needs to store work items and hand them out in first-in, first-out order.

C++ luckily provides us a simple std::queue type which we can use for this purpose:

std::queue<fp_t> q_;

In order to add to the queue we simply need to:

q_.push(op);

And to get the next item:

auto op = q_.front(); //get the front item
q_.pop(); //and pop it from the queue
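
Note that calling front() on an empty queue is undefined behavior, so consumers should check for pending work first, as the thread handler does later on. A minimal sketch of the retrieval pattern:

if(!q_.empty())
{
    auto op = std::move(q_.front()); // take ownership of the next work item
    q_.pop();                        // remove it from the queue
    op();                            // run it
}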

Allocating Queue Threads

Our goal is to make our dispatch queue generic enough that we can change the number of threads for each queue we create. This allows us to create concurrent queues that allow generic tasks to run in parallel, as well as serial queues that only utilize one thread in order to protect a resource.

Instead of using a std::vector of std::thread, we'll instead build a container based on the ThreadX type TX_THREAD:

/// Thread type
struct tx_thread_t {
    TX_THREAD thread;
    std::string name;
    std::unique_ptr<uint8_t[]> stack;
};

Each thread's stack and name will be tracked with the internal thread object. We'll then create a std::vector of tx_thread_t to keep track of our dispatch threads:

std::vector<tx_thread_t> threads_;

Making Our Dispatch Queue Thread-Safe

Our dispatch queue is a shared resource in two potential directions:

  • Any thread can add work to the queue
  • The queue may have multiple threads which remove work from the queue for processing

In order to make sure we implement this safely, we must rely on a locking mechanism. In this case we will utilize ThreadX's builtin mutex type:

TX_MUTEX mutex_;

The queue itself is the critical piece, so we will lock around all queue modifications.

Constructing Our Dispatch Queue

Our ThreadX dispatch queue constructor is responsible for instantiating three components:

  1. The internal mutex which protects the work queue
  2. The event flags which wake the threads
  3. The worker threads

Our constructor prototype will also take an additional function argument: thread_stack_size. This can have a default value (such as 1KB) or can be specified during construction.

dispatch_queue(std::string name, size_t thread_cnt = 1,
    size_t thread_stack_size = 1024) :
    name_(name), threads_(thread_cnt)

Creating the mutex and the event flags group involves straightforward ThreadX calls:

// Initialize the Mutex
uint8_t status = tx_mutex_create(&mutex_, "Dispatch Mutex", TX_INHERIT);

// Create the event flags
status = tx_event_flags_create(&notify_flags_, "Dispatch Event Flags");

When constructing our dispatch queue, we can specify the number of threads desired. Our constructor does the work of creating the required number of tx_thread_t objects in our std::vector container. For each thread, we'll need to allocate a stack, create a unique thread name, and then finally create the thread.

In order for ThreadX to find its way to the correct dispatch_queue object, we'll utilize BOUNCE to make sure we get back to the correct object:

reinterpret_cast<void(*)(ULONG)>(
    BOUNCE(dispatch_queue, dispatch_thread_handler)), reinterpret_cast<ULONG>(this),

Here's our full thread initialization loop:

// Dispatch thread setup
for(size_t i = 0; i < threads_.size(); i++)
{
    // allocate the thread stack
    threads_[i].stack.reset(new uint8_t[thread_stack_size]);

    // Define the name
    threads_[i].name = std::string("Dispatch Thread " 
        + std::to_string(i));

    // Create and autostart the thread
    status = tx_thread_create(&threads_[i].thread, 
            threads_[i].name.c_str(),
            reinterpret_cast<void(*)(ULONG)>(
                BOUNCE(dispatch_queue, dispatch_thread_handler)),
            reinterpret_cast<ULONG>(this),
            threads_[i].stack.get(), thread_stack_size,
            DISPATCH_Q_PRIORITY, DISPATCH_Q_PRIORITY, 
            DISPATCH_TIME_SLICE,
            TX_AUTO_START);
    assert(status == TX_SUCCESS && "Failed to create thread!");
}

Note that the tx_thread_create function requires you to specify a thread priority, a preemption threshold (passed equal to the priority here, which disables preemption-threshold behavior), and a time-slice value. For this example I've defined two default values:

/// Example thread priority and time slice
#define DISPATCH_Q_PRIORITY 15
#define DISPATCH_TIME_SLICE 5

For further discussion on selecting thread priority and time slice, see Thread Priorities and Time Slicing below.

Dispatch Thread Handler Requirements

The dispatch queue worker thread handler should be a simple one. Its only requirements are:

  1. Wait until there is something to run
  2. Pop that item from the queue
  3. Run the item
  4. Check whether I need to quit, if not: wait again

Once we understand our requirements for the worker threads, we encounter a question: how do I know that there's something to execute without keeping these threads awake?

Event Flags: Our Condition Variable Replacement

Instead of using std::condition_variable to wake threads when work is ready, we will utilize the ThreadX builtin event flags type:

/// TX event flags - like condition variable
TX_EVENT_FLAGS_GROUP notify_flags_;

We will define two event flags to be used by the queue. One flag will tell threads to wake up, and the other flag will be set when a thread exits.

/// Definitions for dispatch event flags
#define DISPATCH_WAKE_EVT    (0x1)
#define DISPATCH_EXIT_EVT    (0x2)
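
The handshake between producers and workers is a set on one side and a get on the other. A condensed sketch of calls that appear in full later (return-value checks omitted for brevity):

// Producer side: wake a sleeping worker after queuing new work
tx_event_flags_set(&notify_flags_, DISPATCH_WAKE_EVT, TX_OR);

// Worker side: sleep until the wake flag is set, then clear it atomically
ULONG flags;
tx_event_flags_get(&notify_flags_, DISPATCH_WAKE_EVT, TX_OR_CLEAR,
        &flags, TX_WAIT_FOREVER);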

Adding Work to the Queue

We can let our threads sleep until work has been added to the queue. By setting an event flag, the next available thread will wake up, remove work from the queue, and execute.

Our queue will always be protected by the mutex, so we need to lock and unlock before pushing a new piece of work onto the queue.

void dispatch_queue::dispatch(const fp_t& op)
{
    uint8_t status = tx_mutex_get(&mutex_, TX_WAIT_FOREVER);
    assert(status == TX_SUCCESS && "Failed to lock mutex!");

    q_.push(op);

    status = tx_mutex_put(&mutex_);
    assert(status == TX_SUCCESS && "Failed to unlock mutex!");

    // Notifies threads that new work has been added to the queue
    tx_event_flags_set(&notify_flags_, DISPATCH_WAKE_EVT, TX_OR);
}
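
Callers can then hand work to the queue from any thread. For example (a usage sketch; update_screen() is a hypothetical placeholder):

dispatch_queue button_queue("Button Queue");

// Runs asynchronously on one of the queue's worker threads
button_queue.dispatch([]() {
    update_screen(); // hypothetical: redraw after a button press
});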

Exiting

The next question is: how do I know when to stop running and exit?

The simplest way is to add a boolean flag (quit_ in this implementation) to our dispatch queue. When stop() is called or when the queue is destructed, we set this flag, notify the threads that they need to wake up, and wait for confirmation that they have finished.
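
In this implementation the flag is simply a member of the dispatch queue (a minimal sketch; the exact declaration may differ in the full source):

/// Signals the worker threads to stop processing and exit
bool quit_ = false;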

Because ThreadX does not have its own "join" function, we will imitate the behavior. We'll keep telling threads to wake up until we have confirmation that every thread has exited: we set the "wake" flag to rouse any remaining threads and then wait for an "exit" event. Because we are not guaranteed that threads will exit in order, we utilize a timeout with tx_event_flags_get. The timeout allows us to continue through our loop even if all threads have already exited.

dispatch_queue::~dispatch_queue()
{
    // Signal to dispatch threads that it's time to wrap up
    quit_ = true;

    uint8_t status;

    // We will join each thread to confirm exiting
    for (size_t i = 0; i < threads_.size(); ++i) {
        UINT state;
        ULONG flags;
        do {
            // Signal wake - check exit flag
            tx_event_flags_set(&notify_flags_, 
                DISPATCH_WAKE_EVT, 
                TX_OR);

            // Wait until a thread signals exit. 
            // Timeout is acceptable - all threads may have exited
            tx_event_flags_get(&notify_flags_, 
                DISPATCH_EXIT_EVT, 
                TX_OR_CLEAR,
                &flags, 10);

            // If it was not thread_[i], that is ok, 
            // but we will loop around
            // until threads_[i] has exited
            tx_thread_info_get(&threads_[i].thread, nullptr, &state,
                    nullptr, nullptr, nullptr, nullptr,
                    nullptr, nullptr);
        } while (state != TX_COMPLETED);

        // threads_[i] has exited - let's delete it 
        // and move on to the next one
        status = tx_thread_delete(&threads_[i].thread);
        assert(status == TX_SUCCESS && "Failed to delete thread!");
        threads_[i].name.clear();
        threads_[i].stack.reset();
    }

    // Cleanup event flags and mutex
    tx_event_flags_delete(&notify_flags_);

    status = tx_mutex_delete(&mutex_);
    assert(status == TX_SUCCESS && "Failed to delete mutex!");
}

We can then add state checking to the thread handler. The thread handler can monitor the quit_ flag and exit when it is set.

The thread handler will also need to set the DISPATCH_EXIT_EVT flag when quitting in order to work with the logic shown above:

// Set a signal to indicate a thread exited
status = tx_event_flags_set(&notify_flags_, 
    DISPATCH_EXIT_EVT, 
    TX_OR);
assert(status == TX_SUCCESS && "Failed to set event flags!");

Dispatch Thread Handler Implementation

In our worker thread, we primarily wait until we are notified that there is new work. Upon waking, the thread takes the lock, pops an item from the queue, releases the lock, and runs the work item.

If there is no work to execute, we release the lock and sleep until we are notified that new work has been added to the queue.

void dispatch_queue::dispatch_thread_handler(void)
{
    ULONG flags;

    uint8_t status = tx_mutex_get(&mutex_, TX_WAIT_FOREVER);
    assert(status == TX_SUCCESS && "Failed to lock mutex!");

    do {
        //after wait, we own the lock
        if(q_.size() && !quit_)
        {
            auto op = std::move(q_.front());
            q_.pop();

            //unlock now that we're done messing with the queue
            status = tx_mutex_put(&mutex_);
            assert(status == TX_SUCCESS && 
                "Failed to unlock mutex!");

            op();

            status = tx_mutex_get(&mutex_, TX_WAIT_FOREVER);
            assert(status == TX_SUCCESS && "Failed to lock mutex!");
        }
        else if(!quit_)
        {
            status = tx_mutex_put(&mutex_);
            assert(status == TX_SUCCESS && 
                "Failed to unlock mutex!");

            // Wait for new work
            status = tx_event_flags_get(&notify_flags_, 
                    DISPATCH_WAKE_EVT, 
                    TX_OR_CLEAR,
                    &flags, TX_WAIT_FOREVER);
            assert(status == TX_SUCCESS && 
                "Failed to get event flags!");

            status = tx_mutex_get(&mutex_, TX_WAIT_FOREVER);
            assert(status == TX_SUCCESS && "Failed to lock mutex!");
        }
    } while (!quit_);

    // We were holding the mutex after we woke up
    status = tx_mutex_put(&mutex_);
    assert(status == TX_SUCCESS && "Failed to unlock mutex!");

    // Set a signal to indicate a thread exited
    status = tx_event_flags_set(&notify_flags_, 
        DISPATCH_EXIT_EVT, 
        TX_OR);
    assert(status == TX_SUCCESS && "Failed to set event flags!");
}

Thread Priorities and Time Slicing

Selecting thread priorities and ensuring your system runs smoothly without any priority inversions can be a difficult task. In general, it is best if your dispatch queue has a mid-level or low priority. If the priority of the dispatch queue is set too high, low-priority asynchronous work may end up starving the CPU and blocking primary system threads from running.

If you need queues of differing priorities, you can always create multiple queues and utilize different priorities in each queue.

Time-slicing also needs to be set to a non-zero value. Since we have multiple dispatch threads running at the same priority, we need time-slicing to prevent the scheduler from running a single dispatch thread and starving the others. Time-slicing rotates execution from worker thread to worker thread as long as the threads are active.

You can tune the time-slice value to ensure the dispatch queues only utilize a certain percentage of the CPU's cycles. This example uses a low time-slice value and gives equal priority to each dispatch thread.

Putting it all Together

I've added the complete ThreadX dispatch queue implementation to GitHub.

Because the example uses ThreadX function calls, I have only built the example as a static library. It will not link or execute unless you supply a ThreadX library for your platform.

Further Reading: