
FreeRTOS Task Notifications: A Lightweight Method for Waking Threads

I was recently implementing a FreeRTOS-based system and needed a simple way to wake my thread from an ISR. I was poking around the FreeRTOS API manual looking at semaphores when I discovered a new feature: task notifications.

FreeRTOS claims that waking up a task using the new notification system is ~45% faster and uses less RAM than using a binary semaphore.

The notification system relies on the task handle (TaskHandle_t) that is set during the task creation process:

BaseType_t xTaskCreate(    TaskFunction_t pvTaskCode,
                            const char * const pcName,
                            unsigned short usStackDepth,
                            void *pvParameters,
                            UBaseType_t uxPriority,
                            TaskHandle_t *pxCreatedTask
                          );

Task notifications can be used to emulate mailboxes, binary semaphores, counting semaphores, and event groups.

While task notifications have speed and RAM advantages over other FreeRTOS structures, they are limited in that they can only be used to wake a single task with an event. If multiple tasks must be woken by an event, you must utilize the traditional structures.

Real World Example

I took advantage of the task notifications to implement a simple thread-waking scheme.

I have a button which can be used to request a system power state change. The power state changes can take quite a bit of time, and we don't want to be locked up inside of our interrupt handler for 1-10s.

Instead, I created a simple thread which sleeps until a notification is received. When the thread wakes, we check to see if there is an action we need to take:

void powerTaskEntry(__unused void const* argument)
{
    static uint32_t thread_notification;

    while(1)
    {
        /* Sleep until we are notified of a state change by an 
        * interrupt handler. Note the first parameter is pdTRUE, 
        * which has the effect of clearing the task's notification 
        * value back to 0, making the notification value act like
        * a binary (rather than a counting) semaphore.  */

        thread_notification = ulTaskNotifyTake(pdTRUE, 
                        portMAX_DELAY);

        if(thread_notification)
        {
            system_power_evaluate_state();
        }
    }
}

In order to wake the thread, we send a notification from the interrupt handler:

void system_power_interrupt_handler(uint32_t time)
{
    BaseType_t xHigherPriorityTaskWoken = pdFALSE;

    if(time < SYS_RESET_SHUTDOWN_THRESHOLD_MS)
    {
        // Perform a reset of computers
        computer_reset_request_ = RESET_ALL_COMPUTERS;
    }
    else if((time >= SYS_RESET_SHUTDOWN_THRESHOLD_MS) &&
            (time < SYS_RESET_HARD_CUTOFF_THRESHOLD_MS))
    {
        system_power_request_state(SYSTEM_POWER_LOW);
    }
    else
    {
        system_power_request_state(SYSTEM_POWER_OFF_HARD);
    }

    // Notify the thread so it will wake up when the ISR is complete
    vTaskNotifyGiveFromISR(powerTaskHandle, 
                         &xHigherPriorityTaskWoken);
    portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
}

Note the use of portYIELD_FROM_ISR(). This is required when waking a task from an interrupt handler. If vTaskNotifyGiveFromISR indicates that a higher priority task is being woken, the portYIELD_FROM_ISR() routine will context switch to that task after returning from the ISR.

Failure to use this function will result in execution resuming at the previous point rather than switching to the new context.


Implementing std::mutex with FreeRTOS

Last week we looked at an implementation of std::mutex using the ThreadX RTOS. We will build upon the work in the previous article and add support for FreeRTOS.

In this edition, we will only be creating __external_threading definitions for FreeRTOS. If you are interested in the underlying work, please refer to the ThreadX RTOS port for more information.

Table of Contents

  1. A Review of FreeRTOS Mutex Support
  2. Implementing std::mutex with FreeRTOS
    1. Populating __external_threading_freertos
    2. Implementing the Recursive Mutex Shims
    3. Implementing the Mutex Shims
    4. Breaking with the Standard: Initializing std::mutex
  3. Building Our Custom std::mutex
  4. Putting It All Together
  5. Further Reading

A Review of FreeRTOS Mutex Support

Unlike ThreadX, FreeRTOS differentiates between recursive and non-recursive mutexes. However, FreeRTOS uses the SemaphoreHandle_t as a shared handle type for semaphores, mutexes, and recursive mutexes.

FreeRTOS uses the following APIs to interact with a non-recursive mutex:

SemaphoreHandle_t xSemaphoreCreateMutex( void );
SemaphoreHandle_t xSemaphoreCreateMutexStatic(
        StaticSemaphore_t *pxMutexBuffer );
xSemaphoreTake( SemaphoreHandle_t xSemaphore,
                 TickType_t xTicksToWait );
xSemaphoreGive( SemaphoreHandle_t xSemaphore );

FreeRTOS uses the following APIs to interact with a recursive mutex:

SemaphoreHandle_t xSemaphoreCreateRecursiveMutex( void );
SemaphoreHandle_t xSemaphoreCreateRecursiveMutexStatic(
                     StaticSemaphore_t *pxMutexBuffer );
xSemaphoreTakeRecursive( SemaphoreHandle_t xMutex,
        TickType_t xTicksToWait );
xSemaphoreGiveRecursive( SemaphoreHandle_t xMutex );

Both recursive and non-recursive mutexes use the same deleter function:

void vSemaphoreDelete( SemaphoreHandle_t xSemaphore );

When attempting to claim the mutex, both the recursive and non-recursive "Take" functions allow you to specify a number of ticks to wait before failing. A value of 0 indicates no waiting. Unlike ThreadX, there is no explicit "wait forever" argument, but FreeRTOS defines portMAX_DELAY to represent the longest timeout available on the system. (If INCLUDE_vTaskSuspend is set to 1 in FreeRTOSConfig.h, a block time of portMAX_DELAY causes the task to block indefinitely; otherwise our "Take" calls can still time out, albeit over an extremely long period.)

All of the mutex functions shown above are available when including the semphr.h header.

Implementing std::mutex with FreeRTOS

Now that we've familiarized ourselves with the FreeRTOS mutex APIs, let's get started with our std::mutex port.

Thanks to a well-designed shim layer, we can build off of the ThreadX std::mutex implementation and focus only on the FreeRTOS shims.

We need to adjust our __external_threading file for FreeRTOS:

#ifndef _LIBCPP_EXTERNAL_THREADING_SUPPORT
#define _LIBCPP_EXTERNAL_THREADING_SUPPORT

#if THREADX
#include <__external_threading_threadx>
#elif FREERTOS
#include <__external_threading_freertos>
#endif

#endif //_LIBCPP_EXTERNAL_THREADING_SUPPORT

An alternative approach is to create an __external_threading file inside of separate include folders. When compiling for different RTOSes, you simply change the include path so that each RTOS implementation is paired with the correct __external_threading header.

Populating __external_threading_freertos

We'll create the __external_threading_freertos file for storing our custom threading definitions.

First we'll include the FreeRTOS and semaphore headers:

#include <freertos/FreeRTOS.h>
#include <freertos/semphr.h>

We'll include some boilerplate which is also defined in __threading_support:

#ifndef _LIBCPP_HAS_NO_PRAGMA_SYSTEM_HEADER
#pragma GCC system_header
#endif

_LIBCPP_PUSH_MACROS
#include <__undef_macros>

#if defined(__FreeBSD__) && defined(__clang__) && __has_attribute(no_thread_safety_analysis)
#define _LIBCPP_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis))
#else
#define _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
#endif

_LIBCPP_BEGIN_NAMESPACE_STD

#define _LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_FUNC_VIS

Supplying the mutex definitions for FreeRTOS is straightforward:

// Mutex
typedef SemaphoreHandle_t __libcpp_mutex_t;

#define _LIBCPP_MUTEX_INITIALIZER 0

// FreeRTOS Mutex requires a function to initialize
#define _MUTEX_REQUIRES_INITIALIZATION 1

typedef SemaphoreHandle_t __libcpp_recursive_mutex_t;

Like the ThreadX std::mutex initialization, we'll need to use a non-constexpr constructor for std::mutex. We define _MUTEX_REQUIRES_INITIALIZATION to enable this support.

We're not quite ready to port std::condition_variable or std::thread yet, so we'll import the generic type definitions from the "no pthread API" case in __threading_support:

// Condition Variable
typedef void* __libcpp_condvar_t;
#define _LIBCPP_CONDVAR_INITIALIZER 0

// Execute Once
typedef void* __libcpp_exec_once_flag;
#define _LIBCPP_EXEC_ONCE_INITIALIZER 0

// Thread ID
typedef long __libcpp_thread_id;

// Thread
#define _LIBCPP_NULL_THREAD 0U

typedef void* __libcpp_thread_t;

// Thread Local Storage
typedef long __libcpp_tls_key;

#define _LIBCPP_TLS_DESTRUCTOR_CC __stdcall

In coming articles we'll work on porting std::thread and possibly std::condition_variable, so these definitions will be updated. For now I'm just focused on std::mutex.

I also imported the shim function prototypes from __threading_support, including our custom __libcpp_mutex_init function:

// Mutex
_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_recursive_mutex_init(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_recursive_mutex_lock(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
bool __libcpp_recursive_mutex_trylock(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_recursive_mutex_unlock(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_recursive_mutex_destroy(__libcpp_recursive_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_mutex_init(__libcpp_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_mutex_lock(__libcpp_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
bool __libcpp_mutex_trylock(__libcpp_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_mutex_unlock(__libcpp_mutex_t *__m);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_mutex_destroy(__libcpp_mutex_t *__m);

I also imported the std::thread and std::condition_variable shims so the compiler will be happy:

// Condition variable
_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_condvar_signal(__libcpp_condvar_t* __cv);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_condvar_broadcast(__libcpp_condvar_t* __cv);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_condvar_wait(__libcpp_condvar_t* __cv, __libcpp_mutex_t* __m);

_LIBCPP_THREAD_ABI_VISIBILITY _LIBCPP_NO_THREAD_SAFETY_ANALYSIS
int __libcpp_condvar_timedwait(__libcpp_condvar_t *__cv, __libcpp_mutex_t *__m,
                               timespec *__ts);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_condvar_destroy(__libcpp_condvar_t* __cv);

// Execute once
_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_execute_once(__libcpp_exec_once_flag *flag,
                          void (*init_routine)(void));

// Thread id
_LIBCPP_THREAD_ABI_VISIBILITY
bool __libcpp_thread_id_equal(__libcpp_thread_id t1, __libcpp_thread_id t2);

_LIBCPP_THREAD_ABI_VISIBILITY
bool __libcpp_thread_id_less(__libcpp_thread_id t1, __libcpp_thread_id t2);

// Thread
_LIBCPP_THREAD_ABI_VISIBILITY
bool __libcpp_thread_isnull(const __libcpp_thread_t *__t);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_thread_create(__libcpp_thread_t *__t, void *(*__func)(void *),
                           void *__arg);

_LIBCPP_THREAD_ABI_VISIBILITY
__libcpp_thread_id __libcpp_thread_get_current_id();

_LIBCPP_THREAD_ABI_VISIBILITY
__libcpp_thread_id __libcpp_thread_get_id(const __libcpp_thread_t *__t);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_thread_join(__libcpp_thread_t *__t);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_thread_detach(__libcpp_thread_t *__t);

_LIBCPP_THREAD_ABI_VISIBILITY
void __libcpp_thread_yield();

_LIBCPP_THREAD_ABI_VISIBILITY
void __libcpp_thread_sleep_for(const chrono::nanoseconds& __ns);

// Thread local storage
_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_tls_create(__libcpp_tls_key* __key,
                        void(_LIBCPP_TLS_DESTRUCTOR_CC* __at_exit)(void*));

_LIBCPP_THREAD_ABI_VISIBILITY
void *__libcpp_tls_get(__libcpp_tls_key __key);

_LIBCPP_THREAD_ABI_VISIBILITY
int __libcpp_tls_set(__libcpp_tls_key __key, void *__p);

Implementing the Recursive Mutex Shims

The recursive mutex shim functions are just wrappers around our underlying FreeRTOS recursive mutex calls:

int __libcpp_recursive_mutex_init(__libcpp_recursive_mutex_t *__m)
{
  *__m = xSemaphoreCreateRecursiveMutex();

  return (*__m == NULL);
}

int __libcpp_recursive_mutex_lock(__libcpp_recursive_mutex_t *__m)
{
  // libc++ treats a nonzero return as failure, while
  // xSemaphoreTakeRecursive() returns pdTRUE on success
  return (xSemaphoreTakeRecursive(*__m, portMAX_DELAY) == pdTRUE) ? 0 : -1;
}

bool __libcpp_recursive_mutex_trylock(__libcpp_recursive_mutex_t *__m)
{
  //intentional no wait for try_lock
  return xSemaphoreTakeRecursive(*__m, 0);
}

int __libcpp_recursive_mutex_unlock(__libcpp_recursive_mutex_t *__m)
{
  // Map pdTRUE (success) to 0, as libc++ expects
  return (xSemaphoreGiveRecursive(*__m) == pdTRUE) ? 0 : -1;
}

int __libcpp_recursive_mutex_destroy(__libcpp_recursive_mutex_t *__m)
{
  vSemaphoreDelete(*__m);

  return 0;
}

Implementing the Mutex Shims

The basic mutex shim functions are just wrappers around our underlying FreeRTOS non-recursive mutex calls:

int __libcpp_mutex_init(__libcpp_mutex_t *__m)
{
  *__m = xSemaphoreCreateMutex();

  return (*__m == NULL);
}

int __libcpp_mutex_lock(__libcpp_mutex_t *__m)
{
  // libc++ treats a nonzero return as failure, while
  // xSemaphoreTake() returns pdTRUE on success
  return (xSemaphoreTake(*__m, portMAX_DELAY) == pdTRUE) ? 0 : -1;
}

bool __libcpp_mutex_trylock(__libcpp_mutex_t *__m)
{
  //intentional no wait for try_lock
  return xSemaphoreTake(*__m, 0);
}

int __libcpp_mutex_unlock(__libcpp_mutex_t *__m)
{
  // Map pdTRUE (success) to 0, as libc++ expects
  return (xSemaphoreGive(*__m) == pdTRUE) ? 0 : -1;
}

int __libcpp_mutex_destroy(__libcpp_mutex_t *__m)
{
  vSemaphoreDelete(*__m);

  return 0;
}

Building Our Custom std::mutex

A few compilation options need to be set in order to build std::mutex with support for FreeRTOS. Since a variety of build systems are in use, I am only providing general build strategies.

First, we'll need to set _LIBCPP_HAS_THREAD_API_EXTERNAL so that the compiler looks for the __external_threading header.

-D _LIBCPP_HAS_THREAD_API_EXTERNAL

If you're using the __external_threading implementation that will support multiple RTOSes, you'll also need to set a FREERTOS definition:

-DFREERTOS=1

You'll also want to set the following compiler flags so that the default C++ libraries are not linked:

-fno-builtin -nodefaultlibs

As well as the following link options:

-nodefaultlibs

Because we didn't include all of the C++ headers, we need to point our compiler to the include location for the other C++ headers. Make sure your local path is placed ahead of the mainstream path so that your compiler picks it up first.

Here's an example if you're compiling with clang on OSX:

-I/path/to/src/include -I/usr/local/opt/llvm/include/c++/v1/

Putting It All Together

I've included my example std::mutex implementation in the embedded-resources GitHub repository. The implementation can be found in examples/libcpp.

The example is currently built as a static library. Only FreeRTOS headers are included in the repository, so the current example is only runnable if you have a FreeRTOS implementation.

To compile the example, simply run:

cd examples/libcpp
make

Further Reading

Implementing an Asynchronous Dispatch Queue with FreeRTOS

Updated: 2018-12-19

Last week I provided an implementation of a dispatch queue using ThreadX RTOS primitives. In this article, I'll provide an example C++ dispatch queue implementation using the popular FreeRTOS.

We'll start with a review of what dispatch queues are. If you're familiar with them, feel free to skip to the following section.

A Review of Dispatch Queues

A dispatch queue contains multiple generic-use threads and a work queue. Consumers can dispatch standalone functional operations to the work queue. Each thread pulls from the work queue (or sleeps and waits for new work).

To quote Apple on the advantages of using dispatch queues instead of threads:

  • It reduces the memory penalty your application pays for storing thread stacks in the application’s memory space.
  • It eliminates the code needed to create and configure your threads.
  • It eliminates the code needed to manage and schedule work on threads.
  • It simplifies the code you have to write.

These benefits are real and tangible. As we saw in The Problem With Threads, threading introduces nondeterminism into our system. By controlling our threading models using concurrent and serial dispatch queues, we get a better grasp on that nondeterminism.

The dispatch queue concept simplifies many of the threading scenarios encountered in embedded programming. Often, I just need to run small simple tasks asynchronously without blocking the primary thread. This results in spawning numerous threads with single small purposes:

  • When user presses a button, update the drawing on the screen
  • When charging is complete, change LEDs and notify the system
  • When recording starts, turn on an LED and start drawing the elapsed record time on the screen

These simple steps can run on any generic thread. Such trivial operations don't justify the overhead of explicit thread management, the extra context switching, or the increased potential for threading errors.

A C++11 and FreeRTOS Dispatch Queue

We'll be modifying the C++11 dispatch queue implementation to use FreeRTOS primitives instead of the C++11 types std::mutex, std::thread, and std::condition_variable. We will stick to C++11 features in places where RTOS primitives are not required.

FreeRTOS Requirements

The asynchronous dispatch queue shown below requires the following FreeRTOS headers:

#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include <freertos/event_groups.h>
#include <freertos/semphr.h>

The following FreeRTOSConfig.h settings are required for this project:

  • configSUPPORT_DYNAMIC_ALLOCATION is set to 1 (or undefined)
  • INCLUDE_eTaskGetState is set to 1
  • configUSE_TIME_SLICING is set to 1

Additionally, make sure that FreeRTOS/source/event_groups.c is included in your build.

std::function refresher

std::function is a useful C++11 feature for capturing Callable objects. As a refresher:

Instances of std::function can store, copy, and invoke any Callable target -- functions, lambda expressions, bind expressions, or other function objects, as well as pointers to member functions and pointers to data members.

For this example, we will prototype our function objects as:

typedef std::function<void(void)> fp_t;

Bounce Refresher

FreeRTOS is implemented in C, and our dispatch queue is being implemented in C++. We'll need to utilize the bounce function to make sure our FreeRTOS thread interfaces with the correct object's dispatch handler. For more information on the bounce function, please see the bounce article.

Here's the implementation of bounce that we will use:

/// This Bounce implementation is pulled from bounce.cpp
template<class T, class Method, Method m, class ...Params>
static auto bounce(void *priv, Params... params) ->
        decltype(((*reinterpret_cast<T *>(priv)).*m)(params...))
{
    return ((*reinterpret_cast<T *>(priv)).*m)(params...);
}

/// Convenience macro to simplify bounce statement usage
#define BOUNCE(c,m) bounce<c, decltype(&c::m), &c::m>

A queue of functions

The primary purpose of using a dispatch queue is to provide a first-in, first-out processing model.

C++ luckily provides us a simple std::queue type which we can use for this purpose:

std::queue<fp_t> q_;

To add to the queue we push:

q_.push(op);

And to get the next item:

auto op = q_.front(); //get the front item
q_.pop(); //and pop it from the queue

Allocating Queue Threads

Our goal is to make our dispatch queue generic enough that we can change the number of threads for each queue we create. This allows us to create concurrent queues that allow generic tasks to run in parallel, as well as serial queues that only utilize one thread to protect a resource.

Instead of using a std::vector of std::thread, we'll instead build a container based on the FreeRTOS type TaskHandle_t:

/// Thread type
struct freertos_thread_t {
    TaskHandle_t thread;
    std::string name;
};

Each thread's handle and name will be tracked with the internal thread object. We'll then create a std::vector of freertos_thread_t to keep track of our dispatch threads:

std::vector<freertos_thread_t> threads_;

Making Our Dispatch Queue Thread-Safe

Our dispatch queue is a shared resource in two potential directions:

  • Any thread can add work to the queue
  • The queue may have multiple threads which remove work from the queue for processing

In order to make sure we implement this safely, we must rely on a locking mechanism. In this case we will utilize FreeRTOS's "semaphore" type, which also works as a mutex:

SemaphoreHandle_t mutex_;

The queue itself is the critical piece, so we will lock around all queue modifications.

Constructing Our Dispatch Queue

Our FreeRTOS dispatch queue constructor is responsible for instantiating three components:

  1. The internal mutex which protects the work queue
  2. The event flags which wake the threads
  3. The worker threads

Our constructor prototype will also take an additional argument: thread_stack_size. This has a default value (such as 1 KB), and you can also specify a custom value during construction.

dispatch_queue(std::string name, size_t thread_cnt = 1,
    size_t thread_stack_size = 1024) :
        name_(name),
        threads_(thread_cnt)

Creating the mutex and event flags structures involves straightforward FreeRTOS calls:

// Create the Mutex
mutex_ = xSemaphoreCreateRecursiveMutex();
assert(mutex_ != NULL && "Failed to create mutex!");

// Create the event flags
notify_flags_ = xEventGroupCreate();
assert(notify_flags_ != NULL && "Failed to create event group!");

When constructing our dispatch queue, we can specify the number of threads desired. Our constructor does the work of creating the required number of freertos_thread_t objects in our std::vector container. For each thread, we'll need to create a unique thread name and create the thread.

In this example, I've chosen the xTaskCreate API, which allocates thread stacks from the heap automatically. You can also use the xTaskCreateStatic API if you wish to provide your own thread stack buffers.

In order for FreeRTOS to find its way back to the correct dispatch_queue object, we'll utilize BOUNCE when supplying the thread entry function and pass this as the task parameter:

reinterpret_cast<void(*)(void*)>(
    BOUNCE(dispatch_queue, dispatch_thread_handler)),
reinterpret_cast<void*>(this),

Here's our full thread initialization loop:

// Dispatch thread setup
for(size_t i = 0; i < threads_.size(); i++)
{
    // Define the name
    threads_[i].name = std::string("Dispatch Thread " +
        std::to_string(i));

    // Create the thread
    BaseType_t status = xTaskCreate(
            reinterpret_cast<void(*)(void*)>(
                BOUNCE(dispatch_queue,
                dispatch_thread_handler)),
            threads_[i].name.c_str(),
            thread_stack_size,
            reinterpret_cast<void*>(this),
            DISPATCH_Q_PRIORITY,
            &threads_[i].thread);
    assert(status == pdPASS && "Failed to create thread!");
}

Note that the xTaskCreate function requires you to specify a thread priority. For this example I've defined a default value:

/// Example thread priority
#define DISPATCH_Q_PRIORITY 15

For further discussion on selecting thread priority, see Thread Priorities and Time Slicing below.

Dispatch Thread Handler Requirements

The dispatch queue worker thread handler should be a simple one. Its only requirements are:

  1. Wait until there is something to run
  2. Pop that item from the queue
  3. Run the item
  4. Check whether I need to quit, if not: wait again

Once we understand our requirements for the worker threads, we encounter a question: how do I know that there's something to execute without keeping these threads awake?

Event Flags: Our Condition Variable Replacement

Instead of using std::condition_variable to wake threads when work is ready, we will utilize the FreeRTOS builtin event flags type:

/// FreeRTOS event flags - like condition variable
EventGroupHandle_t notify_flags_;

We will define two event flags to be used by the queue. One flag will tell threads to wake up, and the other flag will be set when a thread exits.

/// Definitions for dispatch event flags
#define DISPATCH_WAKE_EVT    (0x1)
#define DISPATCH_EXIT_EVT    (0x2)

Adding Work to the Queue

We can let our threads sleep until there is work in the queue. When work is added, we set an event flag; the next available thread wakes up, removes work from the queue, and executes it.

The mutex will always protect our queue, so we lock it before pushing a new piece of work and unlock it afterward.

void dispatch_queue::dispatch(const fp_t& op)
{
    BaseType_t status = xSemaphoreTakeRecursive(mutex_,
        portMAX_DELAY);
    assert(status == pdTRUE && "Failed to lock mutex!");

    q_.push(op);

    status = xSemaphoreGiveRecursive(mutex_);
    assert(status == pdTRUE && "Failed to unlock mutex!");

    // Notifies threads that new work is in the queue
    xEventGroupSetBits(notify_flags_, DISPATCH_WAKE_EVT);
}

Exiting

The next question is: how do I know when to stop running and exit?

The simplest way is to add an exit_ or active_ boolean flag to our dispatch queue. When instructed to stop() or when destructing the queue, you can set this flag, notify threads that they need to wake up, and wait for confirmation that they have finished.

Because FreeRTOS does not have its own "join" function, we will imitate the behavior. We'll tell threads to wake up until we have confirmation that every thread is destroyed. We set the "wake" flag to wake any remaining threads, and then we wait for an "exit" event. Because we are not guaranteed that threads will exit in order, we use a timeout with xEventGroupWaitBits. The timeout lets the loop continue even when the exit event we received came from a different thread than the one we are currently checking.

Each thread will delete itself once woken by the exit notification, so our "join" emulation will wait for each thread to report an eDeleted status.

dispatch_queue::~dispatch_queue()
{
    BaseType_t status;

    // Signal to dispatch threads that it's time to wrap up
    quit_ = true;

    // We will join each thread to confirm exiting
    for (size_t i = 0; i < threads_.size(); ++i) {
        eTaskState state;

        do {
            // Signal wake - check exit flag
            xEventGroupSetBits(notify_flags_,
                DISPATCH_WAKE_EVT);

            // Wait until a thread signals exit.
            // Timeout is acceptable.
            xEventGroupWaitBits(notify_flags_,
                DISPATCH_EXIT_EVT,
                pdTRUE, pdFALSE, 10);

            // If the exit came from a different thread, that is ok;
            // we will loop around until threads_[i] has exited
            state = eTaskGetState(threads_[i].thread);
        } while (state != eDeleted);

        threads_[i].name.clear();
    }

    // Cleanup event flags and mutex
    vEventGroupDelete(notify_flags_);

    vSemaphoreDelete(mutex_);
}

We can then add state checking to the thread handler. The thread handler can monitor the quit_ flag and exit when requested.

The thread handler will also need to set the DISPATCH_EXIT_EVT flag when quitting to work with the logic shown above:

// Set a signal to indicate a thread exited. Note that
// xEventGroupSetBits returns the event bits at the time of its
// return (not pdTRUE/pdFALSE), so we do not assert on its result
xEventGroupSetBits(notify_flags_, DISPATCH_EXIT_EVT);

After setting the notification, the worker thread then deletes itself:

// Delete the current thread
vTaskDelete(NULL);

Dispatch Thread Handler Implementation

In our worker thread, we primarily sleep until there is new work. Upon waking, the thread takes the lock, pulls an item from the queue, and resumes operation.

If there is no work to execute, we will release the lock and sleep until new work is in the queue.

void dispatch_queue::dispatch_thread_handler(void)
{
    BaseType_t status = xSemaphoreTakeRecursive(mutex_,
        portMAX_DELAY);
    assert(status == pdTRUE && "Failed to lock mutex!");

    do {
        //after wait, we own the lock
        if(q_.size() && !quit_)
        {
            auto op = std::move(q_.front());
            q_.pop();

            //unlock now that we're done messing with the queue
            status = xSemaphoreGiveRecursive(mutex_);
            assert(status == pdTRUE && "Failed to unlock mutex!");

            op();

            status = xSemaphoreTakeRecursive(mutex_,
                portMAX_DELAY);
            assert(status == pdTRUE && "Failed to lock mutex!");
        }
        else if(!quit_)
        {
            status = xSemaphoreGiveRecursive(mutex_);
            assert(status == pdTRUE && "Failed to unlock mutex!");

            // Wait for new work - clear flags on exit
            xEventGroupWaitBits(notify_flags_,
                DISPATCH_WAKE_EVT,
                pdTRUE, pdFALSE,
                portMAX_DELAY);

            status = xSemaphoreTakeRecursive(mutex_,
                portMAX_DELAY);
            assert(status == pdTRUE && "Failed to lock mutex!");
        }
    } while (!quit_);

    // We were holding the mutex after we woke up
    status = xSemaphoreGiveRecursive(mutex_);
    assert(status == pdTRUE && "Failed to unlock mutex!");

    // Set a signal to indicate a thread exited. xEventGroupSetBits
    // returns the current event bits rather than pdTRUE/pdFALSE,
    // so there is nothing meaningful to assert on here
    xEventGroupSetBits(notify_flags_, DISPATCH_EXIT_EVT);

    // Delete the current thread
    vTaskDelete(NULL);
}

Thread Priorities and Time Slicing

Selecting thread priorities and ensuring your system runs smoothly without any priority inversions can be a difficult task. In general, your dispatch queue should have a mid-level or low thread priority. If the priority of the dispatch queue is too high, low-priority asynchronous work may end up starving the CPU and blocking primary system threads from running.

If you need queues of differing priorities, you can always create multiple queues and utilize different priorities in each queue.

FreeRTOS does not enable time slicing by default. You'll need to set configUSE_TIME_SLICING to 1 so that threads of equal priority will share available processing time fairly.

Putting It All Together

I've added the complete FreeRTOS dispatch queue implementation to GitHub.

Because the example uses FreeRTOS function calls, I have only built the example as a static library. It will not link or execute unless you supply a FreeRTOS library for your platform.

To see how the dispatch queue is used, refer to the original dispatch example:

dispatch_queue q("Phillip's Demo Dispatch Queue", 4);

q.dispatch([]{printf("Dispatch 1!\n");});
q.dispatch([]{printf("Dispatch 2!\n");});
q.dispatch([]{printf("Dispatch 3!\n");});
q.dispatch([]{printf("Dispatch 4!\n");});

Further Reading:

Change Log

  • 20181219:
    • Updated links to open in external tabs
    • Added more links to Further Reading section
    • Improved grammar