ThreadX

Refactoring the ThreadX Dispatch Queue To Use std::mutex

Now that we've implemented std::mutex for an RTOS, let's refactor a library that uses RTOS-specific calls so that it uses std::mutex instead.

Since we have a ThreadX implementation for std::mutex, let's update our ThreadX-based dispatch queue. Moving to std::mutex will result in a simpler code structure. We still need to port std::thread and std::condition_variable to achieve true portability, but utilizing std::mutex is still a step in the right direction.

For a quick refresher on dispatch queues, refer to the earlier articles in this series.

Table of Contents

  1. How std::mutex Helps Us
    1. C++ Mutex Wrappers
  2. Refactoring the Asynchronous Dispatch Queue
    1. Class Definition
    2. Constructor
    3. Destructor
    4. Dispatch
    5. Thread Handler
  3. Putting It All Together
  4. Further Reading

How std::mutex Helps Us

Even though we can't yet make our dispatch queue fully portable, we still benefit from using std::mutex in the following ways:

  1. We no longer have to worry about initializing or deleting our mutex, since the std::mutex constructor and destructor handle that for us
  2. We can take advantage of RAII to lock whenever we enter a scope, and to automatically unlock when we leave that scope
  3. We can utilize standard calls (with no arguments!), reducing the burden of remembering the exact ThreadX functions and arguments

If those arguments don't sway you, just take a look at the ThreadX native calls:

uint8_t status = tx_mutex_get(&mutex_, TX_WAIT_FOREVER);

// do some stuff

status = tx_mutex_put(&mutex_);

And here's the std::mutex equivalent:

mutex_.lock();

// do some stuff

mutex_.unlock();

Don't you prefer the std::mutex version?

C++ Mutex Wrappers

While we could manually call lock() and unlock() on our mutex object, we'll utilize two helpful C++ mutex constructs: std::lock_guard and std::unique_lock.

The std::lock_guard wrapper provides an RAII mechanism for our mutex. When we construct a std::lock_guard, the mutex is locked immediately (blocking until the lock can be acquired, if necessary). Whenever we leave that scope, the mutex is released automatically. A std::lock_guard is especially useful in functions that can return at multiple points. No longer do you have to worry about releasing the mutex at each exit point: the destructor has your back.
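Here's a minimal sketch of that pattern; the try_push function, max_depth parameter, and example variables are hypothetical, purely for illustration:

#include <mutex>
#include <queue>

// Hypothetical standalone example - not part of the dispatch queue class
static std::mutex example_mutex;
static std::queue<int> example_queue;

bool try_push(int value, size_t max_depth)
{
    std::lock_guard<std::mutex> lock(example_mutex);

    if(example_queue.size() >= max_depth)
    {
        return false; // the lock_guard destructor releases the mutex here
    }

    example_queue.push(value);
    return true; // ...and here, with no extra unlock code
}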

We'll also take advantage of the std::unique_lock wrapper. Using std::unique_lock provides similar benefits to std::lock_guard: the mutex is locked when the std::unique_lock is constructed, and unlocked automatically during destruction. However, it provides much more flexibility than std::lock_guard, as we can manually call lock() and unlock(), transfer ownership of the lock, and use it with condition variables.
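A minimal sketch of that flexibility (the function name is hypothetical; std::defer_lock, lock(), and unlock() are standard std::unique_lock features):

#include <mutex>

static std::mutex example_mutex;

void unique_lock_demo()
{
    // Construct without locking, then lock explicitly later
    std::unique_lock<std::mutex> lock(example_mutex, std::defer_lock);
    lock.lock();

    // ... touch shared state ...

    lock.unlock(); // release early for work that doesn't need the mutex

    // ... mutex-free work ...

    lock.lock(); // re-acquire for another critical section
} // still locked here, so the destructor unlocks for us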

Refactoring the Asynchronous Dispatch Queue

We will utilize both std::lock_guard and std::unique_lock to simplify our ThreadX dispatch queue.

Our starting point for this refactor will be the dispatch_threadx.cpp file in the embedded-resources repository.

Class Definition

In our dispatch class, we need to change the mutex definition from TX_MUTEX to std::mutex:

std::mutex mutex_;

Constructor

Mutex initialization is handled for us by the std::mutex constructor. We can remove the tx_mutex_create call from our dispatch queue constructor:

// Initialize the Mutex
uint8_t status = tx_mutex_create(&mutex_, "Dispatch Mutex", TX_INHERIT);
assert(status == TX_SUCCESS && "Failed to create mutex!");

Destructor

Mutex deletion is handled for us by the std::mutex destructor. We can remove the tx_mutex_delete call from the dispatch queue destructor:

status = tx_mutex_delete(&mutex_);
assert(status == TX_SUCCESS && "Failed to delete mutex!");

Dispatch

By using std::lock_guard, we can remove both the mutex get and put calls. RAII will ensure that the mutex is unlocked when we leave the function.

Here's the dispatch() implementation using std::lock_guard:

void dispatch_queue::dispatch(const fp_t& op)
{
    std::lock_guard<std::mutex> lock(mutex_);

    q_.push(op);

    // Notifies threads that new work has been added to the queue
    tx_event_flags_set(&notify_flags_, DISPATCH_WAKE_EVT, TX_OR);
}

If you want to unlock the mutex before setting the event flag, use std::unique_lock instead of std::lock_guard. std::unique_lock allows you to call unlock() manually:

void dispatch_queue::dispatch(const fp_t& op)
{
    std::unique_lock<std::mutex> lock(mutex_);

    q_.push(op);

    lock.unlock();

    // Notifies threads that new work has been added to the queue
    tx_event_flags_set(&notify_flags_, DISPATCH_WAKE_EVT, TX_OR);
}

Either approach is acceptable and looks much cleaner than the native calls.

Why might you care about calling unlock() early? With std::lock_guard, the event flag is set while the mutex is still held: a woken thread may immediately try to grab the mutex and go back to sleep until dispatch() exits. In practice the cost is small, since dispatch() releases the mutex as soon as it returns and the waiting thread then wakes up and resumes operation.

Thread Handler

We need to manually lock and unlock around specific points in our thread handler. Instead of std::lock_guard, we will use std::unique_lock so we can call unlock().

Here's our simplified thread handler:

void dispatch_queue::dispatch_thread_handler(void)
{
    ULONG flags;
    uint8_t status;

    std::unique_lock<std::mutex> lock(mutex_);

    do {
        //after wait, we own the lock
        if(q_.size() && !quit_)
        {
            auto op = std::move(q_.front());
            q_.pop();

            //unlock now that we're done messing with the queue
            lock.unlock();

            op();

            lock.lock();
        }
        else if(!quit_)
        {
            lock.unlock();

            // Wait for new work
            status = tx_event_flags_get(&notify_flags_, 
                DISPATCH_WAKE_EVT, 
                TX_OR_CLEAR,
                &flags, 
                TX_WAIT_FOREVER);
            assert(status == TX_SUCCESS && 
                "Failed to get event flags!");

            lock.lock();
        }
    } while (!quit_);

    // We were holding the mutex after we woke up
    lock.unlock();

    // Set a signal to indicate a thread exited
    status = tx_event_flags_set(&notify_flags_, 
        DISPATCH_EXIT_EVT, TX_OR);
    assert(status == TX_SUCCESS && "Failed to set event flags!");
}

Looks a bit saner already!

Putting It All Together

Example source code can be found in the embedded-resources GitHub repository. The original ThreadX dispatch queue implementation can also be found in embedded-resources.

To build the example, run make at the top-level or inside of the examples/cpp folder.

The example is built as a static library. ThreadX headers are provided in the repository, but not binaries or source.

As we implement std::thread and std::condition_variable in the future, we will simplify our RTOS-based dispatch queue even further.

Further Reading

Implementing std::mutex with ThreadX

As embedded systems developers, we are often constrained in the APIs and libraries that we can use in our programs. Many library creators assume a full-fledged computer environment and build their software on top of other host-machine libraries. This works extremely well until an embedded developer tries to port the software to a bare metal system - we often quickly find out that there are other dependencies or assumptions that we haven't ported, don't want to port, or can't feasibly meet on our system.

Unfortunately, the C++ STL falls into the "built for host machines" category. Many useful STL features are built upon pthreads or other support that is not always implemented for bare metal systems. However, I still want to use the C++ STL on my embedded systems, and with a little bit of work we can make that happen.

Today we'll start with an implementation of std::mutex using ThreadX RTOS primitives. Even if you aren't using ThreadX, you can generalize this approach to the RTOS of your choice. Next week I'll also provide a std::mutex implementation using the increasingly popular FreeRTOS.

Table of Contents

  1. A Review of ThreadX Mutex Support
  2. Implementing std::mutex with ThreadX
    1. About libc++ Threading Abstractions
    2. Creating Our __external_threading Header
    3. Populating __external_threading_threadx
    4. Implementing the Recursive Mutex Shims
    5. Implementing the Mutex Shims
    6. Breaking with the Standard: Initializing std::mutex
    7. Modifying __mutex_base
    8. Modifying mutex.cpp
  3. Building Our Custom std::mutex
  4. Putting It All Together
  5. Further Reading

A Review of ThreadX Mutex Support

The ThreadX RTOS provides the mutex type TX_MUTEX. The ThreadX mutex is implemented as a recursive mutex, so we will use the same type and functions for both the std::mutex and std::recursive_mutex implementations.

ThreadX uses four functions to interact with the mutex:

UINT tx_mutex_create(TX_MUTEX* ptr, const char* name, UINT inherit);
UINT tx_mutex_delete(TX_MUTEX *ptr);
UINT tx_mutex_get(TX_MUTEX* ptr, ULONG wait_option);
UINT tx_mutex_put(TX_MUTEX* ptr);

Each mutex can be given a name and priority inheritance option during creation. Two inheritance values are defined:

#define TX_INHERIT 1
#define TX_NO_INHERIT 0

If TX_INHERIT is supplied during mutex creation, priority inheritance will be enabled for the mutex. A lower-priority thread will temporarily assume the priority of a higher-priority thread that is waiting for the mutex it owns. If TX_NO_INHERIT is supplied during mutex creation, priority inheritance will not be enabled.

Note: Generally we want priority inheritance to be enabled to prevent priority inversions or deadlocks.

The tx_mutex_get function takes a wait_option parameter, which sets a timeout for acquiring the mutex. Two standard wait options are defined by ThreadX:

#define TX_NO_WAIT  0
#define TX_WAIT_FOREVER ((ULONG) 0xFFFFFFFF)

If TX_NO_WAIT is supplied, tx_mutex_get will immediately return and report whether or not getting the mutex was successful.

If TX_WAIT_FOREVER is supplied, the thread will block until the mutex is successfully acquired.

If a value other than TX_NO_WAIT or TX_WAIT_FOREVER is supplied, it is interpreted as a timeout. The thread will block until the specified number of ticks have elapsed or the mutex was successfully locked.

Success in any of the functions is indicated by a return code of TX_SUCCESS. Other return codes indicate failure.
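Putting the full life cycle together, here's a hedged sketch of typical usage. The 100-tick timeout is arbitrary, and TX_NOT_AVAILABLE is the return code I'd expect on a timeout based on the ThreadX documentation:

TX_MUTEX demo_mutex;

UINT status = tx_mutex_create(&demo_mutex, "Demo Mutex", TX_INHERIT);
assert(status == TX_SUCCESS && "Failed to create mutex!");

// Block for at most 100 ticks while waiting for the mutex
status = tx_mutex_get(&demo_mutex, 100);
if(status == TX_SUCCESS)
{
    // ... critical section ...
    status = tx_mutex_put(&demo_mutex);
}
else if(status == TX_NOT_AVAILABLE)
{
    // The timeout expired without acquiring the mutex
}

status = tx_mutex_delete(&demo_mutex);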

Implementing std::mutex with ThreadX

Now that we've familiarized ourselves with the ThreadX mutex APIs, let's get started with our std::mutex port.

Before we can dive into the code, we need to find ourselves a working set of C++ files to start from. I recommend the LLVM libc++ implementation.

To get the LLVM libc++ files, you can perform an svn checkout:

svn co http://llvm.org/svn/llvm-project/libcxx/trunk libcxx

The entire std::mutex implementation encompasses the following files:

  • mutex (header)
  • __mutex_base (header)
  • __threading_support (header, defines underlying mutex types)
  • mutex.cpp

We'll be making minor modifications to the files above and creating our own definitions for the underlying mutex types.

If you don't want to start from scratch, you can always modify the example mutex files from my embedded-resources repository for your RTOS.

About libc++ Threading Abstractions

We'll start our std::mutex port by exploring the __threading_support file. This file defines the underlying libc++ threading types and shim functions for std::mutex and friends, std::thread, and std::condition_variable.

Luckily the threading code is easily decoupled from the underlying pthread implementation. If you look inside the __threading_support header, you'll notice that we can supply our own __external_threading header to override the default definitions:

#if defined(_LIBCPP_HAS_THREAD_API_EXTERNAL)
# include <__external_threading>

We will use this __external_threading header to re-implement the threading types and shim functions for our target RTOS.

Creating Our __external_threading Header

First we need to create our __external_threading header. I'm interested in building a C++ library that supports multiple RTOSes, so I will be implementing a two-header system. The basic __external_threading header will re-route the compiler to another header based on the RTOS I am targeting. For now we will just populate it with our ThreadX definitions:

#ifndef _LIBCPP_EXTERNAL_THREADING_SUPPORT
#define _LIBCPP_EXTERNAL_THREADING_SUPPORT

#if THREADX
#include <__external_threading_threadx>
#endif

#endif //_LIBCPP_EXTERNAL_THREADING_SUPPORT

If you are only supporting a single RTOS, feel free to skip the layer of redirection.

Populating __external_threading_threadx

We'll create the __external_threading_threadx file for storing our custom threading definitions.

First we'll include the tx_api.h header:

#include <threadx/tx_api.h>

I also included some of the boilerplate definitions from the start of the __threading_support header:

#if defined(__FreeBSD__) && defined(__clang__) && __has_attribute(no_thread_safety_analysis)
#define _LIBCPP_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis))
#else
#define _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
#endif

_LIBCPP_BEGIN_NAMESPACE_STD

#define _LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_FUNC_VIS

Supplying the mutex definitions for ThreadX is straightforward:

// Mutex
typedef TX_MUTEX __libcpp_mutex_t;

//TX_MUTEX is a struct, so we need a struct initializer
#define _LIBCPP_MUTEX_INITIALIZER {0}

typedef TX_MUTEX __libcpp_recursive_mutex_t;

We're not quite ready to port std::condition_variable or std::thread yet, so we'll import the generic type definitions from the "no pthread API" case in __threading_support:

// Condition Variable
typedef void* __libcpp_condvar_t;
#define _LIBCPP_CONDVAR_INITIALIZER 0

// Execute Once
typedef void* __libcpp_exec_once_flag;
#define _LIBCPP_EXEC_ONCE_INITIALIZER 0

// Thread ID
typedef long __libcpp_thread_id;

// Thread
#define _LIBCPP_NULL_THREAD 0U

typedef void* __libcpp_thread_t;

// Thread Local Storage
typedef long __libcpp_tls_key;

#define _LIBCPP_TLS_DESTRUCTOR_CC __stdcall

In coming articles we'll work on porting std::thread and possibly std::condition_variable, so these definitions will be updated. For now I'm just focused on std::mutex.

I also imported the shim function prototypes from __threading_support:

// Mutex
_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_recursive_mutex_init(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_recursive_mutex_lock(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
bool __libcpp_recursive_mutex_trylock(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_recursive_mutex_unlock(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_recursive_mutex_destroy(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_mutex_lock(__libcpp_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
bool __libcpp_mutex_trylock(__libcpp_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_mutex_unlock(__libcpp_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_mutex_destroy(__libcpp_mutex_t *__m);

I also imported the std::thread and std::condition_variable shims so the compiler will be happy:

// Condition variable
_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_condvar_signal(__libcpp_condvar_t* __cv);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_condvar_broadcast(__libcpp_condvar_t* __cv);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_condvar_wait(__libcpp_condvar_t* __cv, __libcpp_mutex_t* __m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_condvar_timedwait(__libcpp_condvar_t *__cv, __libcpp_mutex_t *__m,
                               timespec *__ts);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_condvar_destroy(__libcpp_condvar_t* __cv);

// Execute once
_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_execute_once(__libcpp_exec_once_flag *flag,
                          void (*init_routine)(void));

// Thread id
_LIBCPP_THREAD_ABI_VISIBILITY
bool __libcpp_thread_id_equal(__libcpp_thread_id t1, __libcpp_thread_id t2);

_LIBCPP_THREAD_ABI_VISIBILITY
bool __libcpp_thread_id_less(__libcpp_thread_id t1, __libcpp_thread_id t2);

// Thread
_LIBCPP_THREAD_ABI_VISIBILITY
bool __libcpp_thread_isnull(const __libcpp_thread_t *__t);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_thread_create(__libcpp_thread_t *__t, void *(*__func)(void *),
                           void *__arg);

_LIBCPP_THREAD_ABI_VISIBILITY
__libcpp_thread_id __libcpp_thread_get_current_id();

_LIBCPP_THREAD_ABI_VISIBILITY
__libcpp_thread_id __libcpp_thread_get_id(const __libcpp_thread_t *__t);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_thread_join(__libcpp_thread_t *__t);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_thread_detach(__libcpp_thread_t *__t);

_LIBCPP_THREAD_ABI_VISIBILITY
void __libcpp_thread_yield();

_LIBCPP_THREAD_ABI_VISIBILITY
void __libcpp_thread_sleep_for(const chrono::nanoseconds& __ns);

// Thread local storage
_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_tls_create(__libcpp_tls_key* __key,
                        void(_LIBCPP_TLS_DESTRUCTOR_CC* __at_exit)(void*));

_LIBCPP_THREAD_ABI_VISIBILITY
void *__libcpp_tls_get(__libcpp_tls_key __key);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_tls_set(__libcpp_tls_key __key, void *__p);

Implementing the Recursive Mutex Shims

The recursive mutex shim functions are just wrappers around our underlying ThreadX mutex calls:

int __libcpp_recursive_mutex_init(__libcpp_recursive_mutex_t *__m)
{
  return tx_mutex_create(__m, "std::recursive_mutex", TX_INHERIT);
}

int __libcpp_recursive_mutex_lock(__libcpp_recursive_mutex_t *__m)
{
  //tx mutex is already recursive
  return tx_mutex_get(__m, TX_WAIT_FOREVER);
}

bool __libcpp_recursive_mutex_trylock(__libcpp_recursive_mutex_t *__m)
{
  //intentional no wait for try_lock
  return tx_mutex_get(__m, TX_NO_WAIT) == TX_SUCCESS;
}

int __libcpp_recursive_mutex_unlock(__libcpp_recursive_mutex_t *__m)
{
  return tx_mutex_put(__m);
}

int __libcpp_recursive_mutex_destroy(__libcpp_recursive_mutex_t *__m)
{
  return tx_mutex_delete(__m);
}

Implementing the Mutex Shims

Since ThreadX has no distinction between a recursive and non-recursive mutex, our mutex shim functions look exactly the same:

int __libcpp_mutex_lock(__libcpp_mutex_t *__m)
{
  return tx_mutex_get(__m, TX_WAIT_FOREVER);
}

bool __libcpp_mutex_trylock(__libcpp_mutex_t *__m)
{
  //intentional no wait for try_lock
  return tx_mutex_get(__m, TX_NO_WAIT) == TX_SUCCESS;
}

int __libcpp_mutex_unlock(__libcpp_mutex_t *__m)
{
  return tx_mutex_put(__m);
}

int __libcpp_mutex_destroy(__libcpp_mutex_t *__m)
{
  return tx_mutex_delete(__m);
}

Breaking with the Standard: Initializing std::mutex

Unfortunately, we'll need to break with the standard to support std::mutex with an RTOS.

The default mutex implementation assumes you can create your mutex with the default value:

#define _LIBCPP_MUTEX_INITIALIZER {0}

The std::mutex constructor assumes the mutex can be created by supplying the default initialization value:

mutex() _NOEXCEPT {__m_ = (__libcpp_mutex_t)_LIBCPP_MUTEX_INITIALIZER;}

We need to call a function to initialize our mutex. This function might fail, and if it does we should throw an exception. Since std::recursive_mutex has an initialization function and can throw, I've decided to break with the standard on this point. (I'm still investigating why things have been set up this way…)

We'll add a definition to our __external_threading_threadx header that controls the behavior of the std::mutex constructor:

// TX_MUTEX requires a function to initialize
#define _MUTEX_REQUIRES_INITIALIZATION 1

We'll also define a new mutex shim function that our constructor can call:

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_mutex_init(__libcpp_mutex_t *__m);

int __libcpp_mutex_init(__libcpp_mutex_t *__m)
{
  return tx_mutex_create(__m, "std::mutex", TX_INHERIT);
}

Modifying __mutex_base

In order to support our custom shim function, we will need to modify the __mutex_base header.

If _MUTEX_REQUIRES_INITIALIZATION is defined and equal to 1, we want to use our custom constructor. Otherwise, we keep the default behavior.

Unfortunately, the need to initialize our std::mutex means we cannot use the constexpr constructor, so we are deviating from the C++ standard.

We'll adjust the std::mutex constructor logic to meet this behavioral requirement:

#if defined(_MUTEX_REQUIRES_INITIALIZATION) && \
     (_MUTEX_REQUIRES_INITIALIZATION == 1)
    // We currently deviate from the standard on this point
    // since some of our mutexes require dynamic initialization
    mutex() _NOEXCEPT;
#else
#ifndef _LIBCPP_CXX03_LANG
    constexpr mutex() = default;
#else
    mutex() _NOEXCEPT {__m_ = (__libcpp_mutex_t)_LIBCPP_MUTEX_INITIALIZER;}
#endif
#endif

I am investigating alternative methods to satisfy the constexpr requirement. Unfortunately, it might require initializing the mutex on the first call to lock().
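One hypothetical direction, sketched below purely as an illustration (this is not what the embedded-resources example does), is lazy creation on the first lock() call. It relies on the {0} initializer above leaving the TX_MUTEX's tx_mutex_id field zeroed until tx_mutex_create() runs:

// HYPOTHETICAL sketch only - not the approach used in the example code
int __libcpp_mutex_lock(__libcpp_mutex_t *__m)
{
  // Detect an uninitialized mutex by its zeroed ID field.
  // Note this check is racy - two threads could both see zero and
  // call create, so a real implementation needs an atomic guard.
  if(__m->tx_mutex_id == 0)
  {
    tx_mutex_create(__m, "std::mutex", TX_INHERIT);
  }

  return tx_mutex_get(__m, TX_WAIT_FOREVER);
}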

Modifying mutex.cpp

The shim functions are called in mutex.cpp, so we need to add our std::mutex constructor there. The constructor should only be included if _MUTEX_REQUIRES_INITIALIZATION is set.

#if defined(_MUTEX_REQUIRES_INITIALIZATION) && (_MUTEX_REQUIRES_INITIALIZATION == 1)
mutex::mutex()
{
    int ec = __libcpp_mutex_init(&__m_);
    if (ec)
        __throw_system_error(ec, "mutex constructor failed");
}
#endif

And that completes the port! Thanks to a well-designed shim layer, we didn't need to do too much work at all.

Building Our Custom std::mutex

A few compilation options need to be set in order to build std::mutex with support for ThreadX. Since a variety of build systems are in use, I am only providing general build strategies.

First, we'll need to set _LIBCPP_HAS_THREAD_API_EXTERNAL so that the compiler looks for the __external_threading header.

-D _LIBCPP_HAS_THREAD_API_EXTERNAL

If you're using the __external_threading implementation that will support multiple RTOSes, you'll also need to set a THREADX definition:

-DTHREADX=1

You'll also want to set the following compiler flags so that the default C++ libraries are not linked:

-fno-builtin -nodefaultlibs

As well as the following link options:

-nodefaultlibs

Because we didn't include all of the C++ headers, we need to point our compiler to the include location for the other C++ headers. Make sure your local path is placed ahead of the mainstream path so that your compiler picks it up first.

Here's an example if you're compiling with clang on OSX:

-I/path/to/src/include -I/usr/local/opt/llvm/include/c++/v1/

Putting It All Together

I've included my example std::mutex implementation in the embedded-resources GitHub repository. The implementation can be found in examples/libcpp.

The example is currently built as a static library. Only ThreadX headers are included in the repository, so the current example is only runnable if you have a ThreadX implementation.

To compile the example, simply run:

cd examples/libcpp
make

In a coming article I will walk through a std::mutex implementation using FreeRTOS.

Further Reading

Implementing an Asynchronous Dispatch Queue with ThreadX

Updated: 2018-12-19

I previously introduced the concept of dispatch queues and walked through the creation of a simple C++ dispatch queue implementation. The original dispatch queue example is implemented using std::mutex, std::thread, and std::condition_variable. Today I'd like to demonstrate the creation of a dispatch queue using ThreadX RTOS primitives instead of the C++ builtin types.

We'll start with a review of what dispatch queues are. If you're familiar with them, feel free to skip to the following section.

A Review of Dispatch Queues

A dispatch queue contains multiple generic-use threads and a work queue. Consumers can dispatch standalone functional operations to the work queue. Each thread pulls from the work queue (or sleeps and waits for new work).

To quote Apple on the advantages of using dispatch queues instead of threads:

  • It reduces the memory penalty your application pays for storing thread stacks in the application’s memory space.
  • It eliminates the code needed to create and configure your threads.
  • It eliminates the code needed to manage and schedule work on threads.
  • It simplifies the code you have to write.

These benefits are pretty real and tangible. As we saw in "The Problem With Threads", threading introduces nondeterminism into our system. By controlling our threading models using concurrent and serial dispatch queues, we gain a better grasp on the nondeterminism of our system.

The dispatch queue concept simplifies many of the threading scenarios encountered in embedded programming. Often, I just need to run small simple tasks asynchronously without blocking the primary thread. This results in spawning numerous threads with single small purposes:

  • When a user presses a button, update the drawing on the screen
  • When charging is complete, change LEDs and notify the system
  • When recording starts, turn on an LED and start drawing the elapsed record time on the screen

These simple steps can run on any generic thread. Such trivial operations don't justify the overhead of explicit thread management, the cost of extra context switching, or the increased potential for threading errors.
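As a preview, here's a hedged sketch of how such work might be dispatched with the queue we build below (update_screen and on_button_press are hypothetical functions):

// Hypothetical sketch: dispatching trivial work to the queue built below
void update_screen(void); // hypothetical function

dispatch_queue button_queue("Button Events", 1 /* one thread = serial */);

void on_button_press(void)
{
    button_queue.dispatch([]{ update_screen(); });
}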

A C++11 and ThreadX Dispatch Queue

We'll be modifying the C++11 dispatch queue implementation to use ThreadX RTOS primitives instead of the C++11 types std::mutex, std::thread, and std::condition_variable. We will stick to C++11 features in places where RTOS primitives are not required.

std::function refresher

std::function is a useful C++11 feature for capturing Callable objects. As a refresher:

Instances of std::function can store, copy, and invoke any Callable target -- functions, lambda expressions, bind expressions, or other function objects, as well as pointers to member functions and pointers to data members.

For this example, we will prototype our function objects as:

typedef std::function<void(void)> fp_t;
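For example, fp_t can hold a capture-free lambda; here's a minimal, self-contained sketch:

#include <cstdio>
#include <functional>

typedef std::function<void(void)> fp_t;

int main(void)
{
    fp_t op = []{ printf("hello from a dispatched op\n"); };
    op(); // invoke the stored callable
    return 0;
}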

Bounce Refresher

ThreadX is implemented in C, and our dispatch queue is implemented in C++. We'll need the bounce function to make sure our ThreadX thread interfaces with the correct object's dispatch handler. For more information on the bounce function, please see the linked article.

Here's the implementation of bounce:

/// This Bounce implementation is pulled from bounce.cpp
template<class T, class Method, Method m, class ...Params>
static auto bounce(void *priv, Params... params) ->
        decltype(((*reinterpret_cast<T *>(priv)).*m)(params...))
{
    return ((*reinterpret_cast<T *>(priv)).*m)(params...);
}

/// Convenience macro to simplify bounce statement usage
#define BOUNCE(c,m) bounce<c, decltype(&c::m), &c::m>

A queue of functions

The primary purpose of using a dispatch queue is to provide a first-in, first-out processing model.

C++ luckily provides us a simple std::queue type which we can use for this purpose:

std::queue<fp_t> q_;

In order to add to the queue we simply need to:

q_.push(op);

And to get the next item:

auto op = q_.front(); //get the front item
q_.pop(); //and pop it from the queue

Allocating Queue Threads

Our goal is to make our dispatch queue generic enough that we can change the number of threads for each queue we create. This allows us to create concurrent queues that allow generic tasks to run in parallel, as well as serial queues that only utilize one thread to protect a resource.

Instead of using a std::vector of std::thread, we'll instead build a container based on the ThreadX type TX_THREAD:

/// Thread type
struct tx_thread_t {
    TX_THREAD thread;
    std::string name;
    std::unique_ptr<uint8_t[]> stack; // array form ensures delete[] is used
};

Each thread's stack and name will be tracked with the internal thread object. We'll then create a std::vector of tx_thread_t to keep track of our dispatch threads:

std::vector<tx_thread_t> threads_;

Making Our Dispatch Queue Thread-Safe

Our dispatch queue is a shared resource in two potential directions:

  • Any thread can add work to the queue
  • The queue may have multiple threads which remove work from the queue for processing

In order to make sure we implement this safely, we must rely on a locking mechanism. In this case we will utilize ThreadX's builtin mutex type:

TX_MUTEX mutex_;

The queue itself is the critical piece, so we will lock around all queue modifications.

Constructing Our Dispatch Queue

Our ThreadX dispatch queue constructor is responsible for instantiating three components:

  1. The internal mutex which protects the work queue
  2. The event flags which wake the threads
  3. The worker threads

Our constructor prototype will also take an additional argument: thread_stack_size. This can have a default value (such as 1KB) or can be specified during construction.

dispatch_queue(std::string name, size_t thread_cnt = 1,
    size_t thread_stack_size = 1024) :
    name_(name), threads_(thread_cnt)

Creating the mutex and event flags structures involves straightforward ThreadX calls:

// Initialize the Mutex
uint8_t status = tx_mutex_create(&mutex_, "Dispatch Mutex", TX_INHERIT);

// Create the event flags
status = tx_event_flags_create(&notify_flags_, "Dispatch Event Flags");

When constructing our dispatch queue, we can specify the number of threads desired. Our constructor does the work of creating the required number of tx_thread_t objects in our std::vector container. For each thread, we'll need to allocate a stack, create a unique thread name, and then finally create the thread.

In order for ThreadX to find its way to the correct dispatch_queue object, we'll utilize BOUNCE to make sure we get back to the correct object:

reinterpret_cast<void(*)(ULONG)>(
    BOUNCE(dispatch_queue, dispatch_thread_handler)), reinterpret_cast<ULONG>(this),

Here's our full thread initialization loop:

// Dispatch thread setup
for(size_t i = 0; i < threads_.size(); i++)
{
    // allocate the thread stack
    threads_[i].stack.reset(new uint8_t[thread_stack_size]);

    // Define the name
    threads_[i].name = std::string("Dispatch Thread "
        + std::to_string(i));

    // Create and autostart the thread
    status = tx_thread_create(&threads_[i].thread,
            threads_[i].name.c_str(),
            reinterpret_cast<void(*)(ULONG)>(
                BOUNCE(dispatch_queue, dispatch_thread_handler)),
            reinterpret_cast<ULONG>(this),
            threads_[i].stack.get(), thread_stack_size,
            DISPATCH_Q_PRIORITY, DISPATCH_Q_PRIORITY,
            DISPATCH_TIME_SLICE,
            TX_AUTO_START);
    assert(status == TX_SUCCESS && "Failed to create thread!");
}

Note that the tx_thread_create function requires you to specify a thread priority and a time slice value. For this example I've defined two default values:

/// Example thread priority and time slice
#define DISPATCH_Q_PRIORITY 15
#define DISPATCH_TIME_SLICE 5

For further discussion on selecting thread priority and time slice, see Thread Priorities and Time Slicing below.

Dispatch Thread Handler Requirements

The dispatch queue worker thread handler should be a simple one. Its only requirements are:

  1. Wait until there is something to run
  2. Pop that item from the queue
  3. Run the item
  4. Check whether I need to quit, if not: wait again

Once we understand our requirements for the worker threads, we encounter a question: how do I know that there's something to execute without keeping these threads awake?

Event Flags: Our Condition Variable Replacement

Instead of using std::condition_variable to wake threads when work is ready, we will utilize the ThreadX builtin event flags type:

/// TX event flags - like condition variable
TX_EVENT_FLAGS_GROUP notify_flags_;

We will define two event flags for use by the queue. One flag will tell threads to wake up, and the other flag will be set when a thread exits.

/// Definitions for dispatch event flags
#define DISPATCH_WAKE_EVT    (0x1)
#define DISPATCH_EXIT_EVT    (0x2)

Adding Work to the Queue

We can let our threads sleep until there is work in the queue. By setting an event flag, the next available thread will wake up, remove work from the queue, and execute.

The mutex will always protect our queue, so we need to lock and unlock before pushing a new piece of work onto the queue.

void dispatch_queue::dispatch(const fp_t& op)
{
    uint8_t status = tx_mutex_get(&mutex_, TX_WAIT_FOREVER);
    assert(status == TX_SUCCESS && "Failed to lock mutex!");

    q_.push(op);

    status = tx_mutex_put(&mutex_);
    assert(status == TX_SUCCESS && "Failed to unlock mutex!");

    // Notifies threads that new work is present in the queue
    tx_event_flags_set(&notify_flags_, DISPATCH_WAKE_EVT, TX_OR);
}

Exiting

The next question is: how do I know when to stop running and exit?

The simplest way is to add an exit_ or active_ boolean flag to our dispatch queue. When instructed to stop() or when destructing the queue, you can set this flag, notify threads that they need to wake up, and wait for confirmation that they have finished.

Because ThreadX does not have its own "join" function, we will imitate the behavior. We'll keep telling threads to wake up until we have confirmation that every thread has exited. We set the "wake" flag to rouse any remaining threads, then wait for an "exit" event. Because we are not guaranteed that threads will exit in order, we utilize a timeout with tx_event_flags_get. The timeout allows us to continue through our loop even if all threads have already exited.

dispatch_queue::~dispatch_queue()
{
    uint8_t status;

    // Signal to dispatch threads that it's time to wrap up
    quit_ = true;

    // We will join each thread to confirm exiting
    for (size_t i = 0; i < threads_.size(); ++i) {
        UINT state;
        ULONG flags;
        do {
            // Signal wake - check exit flag
            tx_event_flags_set(&notify_flags_,
                DISPATCH_WAKE_EVT,
                TX_OR);

            // Wait until a thread signals exit.
            // Timeout is acceptable - all threads may have exited
            tx_event_flags_get(&notify_flags_,
                DISPATCH_EXIT_EVT,
                TX_OR_CLEAR,
                &flags, 10);

            // If the exit signal came from a thread other than
            // threads_[i], that is ok - we will loop around
            // until threads_[i] has exited
            tx_thread_info_get(&threads_[i].thread, nullptr, &state,
                    nullptr, nullptr, nullptr, nullptr,
                    nullptr, nullptr);
        } while (state != TX_COMPLETED);

        // threads_[i] has exited - let's delete it
        // and move on to the next one
        status = tx_thread_delete(&threads_[i].thread);
        assert(status == TX_SUCCESS && "Failed to delete thread!");
        threads_[i].name.clear();
        threads_[i].stack.reset();
    }

    // Cleanup event flags and mutex
    tx_event_flags_delete(&notify_flags_);

    status = tx_mutex_delete(&mutex_);
    assert(status == TX_SUCCESS && "Failed to delete mutex!");
}

We can then add state checking to the thread handler. The thread handler can monitor the quit_ flag and exit when instructed.

The thread handler will also need to set the DISPATCH_EXIT_EVT flag when quitting to work with the logic shown above:

// Set a signal to indicate a thread exited
status = tx_event_flags_set(&notify_flags_,
    DISPATCH_EXIT_EVT,
    TX_OR);
assert(status == TX_SUCCESS && "Failed to set event flags!");

Dispatch Thread Handler Implementation

In our worker thread, we primarily wait until we receive a notification that there is new work. Upon waking, the thread will take the lock, get an item from the queue, and resume operation.

If there is no work to execute, we will release the lock and sleep. The thread wakes after receiving a notification that there is work in the queue.

void dispatch_queue::dispatch_thread_handler(void)
{
    ULONG flags;

    uint8_t status = tx_mutex_get(&mutex_, TX_WAIT_FOREVER);
    assert(status == TX_SUCCESS && "Failed to lock mutex!");

    do {
        //after wait, we own the lock
        if(q_.size() && !quit_)
        {
            auto op = std::move(q_.front());
            q_.pop();

            //unlock now that we're done messing with the queue
            status = tx_mutex_put(&mutex_);
            assert(status == TX_SUCCESS &&
                "Failed to unlock mutex!");

            op();

            status = tx_mutex_get(&mutex_, TX_WAIT_FOREVER);
            assert(status == TX_SUCCESS && "Failed to lock mutex!");
        }
        else if(!quit_)
        {
            status = tx_mutex_put(&mutex_);
            assert(status == TX_SUCCESS &&
                "Failed to unlock mutex!");

            // Wait for new work
            status = tx_event_flags_get(&notify_flags_,
                    DISPATCH_WAKE_EVT,
                    TX_OR_CLEAR,
                    &flags, TX_WAIT_FOREVER);
            assert(status == TX_SUCCESS &&
                "Failed to get event flags!");

            status = tx_mutex_get(&mutex_, TX_WAIT_FOREVER);
            assert(status == TX_SUCCESS && "Failed to lock mutex!");
        }
    } while (!quit_);

    // We were holding the mutex after we woke up
    status = tx_mutex_put(&mutex_);
    assert(status == TX_SUCCESS && "Failed to unlock mutex!");

    // Set a signal to indicate a thread exited
    status = tx_event_flags_set(&notify_flags_,
        DISPATCH_EXIT_EVT,
        TX_OR);
    assert(status == TX_SUCCESS && "Failed to set event flags!");
}

Thread Priorities and Time Slicing

Selecting thread priorities and ensuring your system runs smoothly without any priority inversions can be a difficult task. In general, your dispatch queue needs a mid-level or low thread priority. If the priority of the dispatch queue threads is too high, low-priority asynchronous work may end up starving the CPU and blocking primary system threads from running.

If you need queues of differing priorities, you can always create multiple queues and utilize different priorities in each queue.

Set time-slicing to a non-zero value. Since we have multiple dispatch threads running at equal priorities, we need to utilize time-slicing to prevent the scheduler from running a single dispatch thread and starving the others. Time-slicing enables us to rotate from worker thread to worker thread as long as the threads are active.

You can tune the time-slice value to ensure the dispatch queues only utilize a certain percentage of the CPU's cycles. This example uses a low time-slice value and gives equal priority to each dispatch thread.

Putting It All Together

I've added the complete ThreadX dispatch queue implementation to GitHub.

Because the example uses ThreadX function calls, I have only built the example as a static library. It will not link or execute unless you supply a ThreadX library for your platform.

Further Reading

Change Log

  • 20181219:
    • Updated links to open in external tabs
    • Added more links to Further Reading section
    • Improved grammar