Implementing an Asynchronous Dispatch Queue

Previously, I introduced the concept of dispatch queues. Here’s a quick review in case you haven’t read the previous article: a dispatcher contains multiple generic-use worker threads and a work queue. Client threads dispatch individual functional operations to the queue, and the worker threads run them asynchronously.

Apple encourages its app developers to move away from threads and to instead run asynchronous processing on Grand Central Dispatch queues. To quote Apple on the advantages of using dispatch queues instead of threads:

  • It reduces the memory penalty your application pays for storing thread stacks in the application’s memory space.
  • It eliminates the code needed to create and configure your threads.
  • It eliminates the code needed to manage and schedule work on threads.
  • It simplifies the code you have to write.

These benefits are real and tangible. As we saw in “The Problem With Threads”, threading introduces nondeterminism into our systems. By controlling our threading models using concurrent and serial dispatch queues, we gain a better grasp on the nondeterminism in our system.

The dispatch queue concept simplifies many of the threading scenarios encountered in embedded programming. Often, I just need to run small, simple tasks asynchronously without blocking the primary thread. Handling each of these cases with a dedicated thread results in numerous threads, each with a single small purpose:

  • When the user presses a button, update the drawing on the screen
  • When charging is complete, change LEDs and notify the system
  • When recording starts, turn on an LED and start drawing the elapsed record time on the screen

These simple steps can run on any generic thread. Such trivial operations don’t justify the overhead of explicit thread management, the excessive context switching, or the increased potential for threading errors.

Let’s see how we can implement an asynchronous dispatch queue of our own with C++11.

std::function refresher

As we saw last week with callbacks, std::function is a useful C++11 feature for capturing Callable objects. As a refresher:

Instances of std::function can store, copy, and invoke any Callable target – functions, lambda expressions, bind expressions, or other function objects, as well as pointers to member functions and pointers to data members.

For this example, we will prototype our function objects as:

typedef std::function<void(void)> fp_t;
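
As a quick sketch of what an fp_t can hold (a free function, a lambda, a bind expression, and so on), consider this hypothetical snippet:

#include <cstdio>
#include <functional>

typedef std::function<void(void)> fp_t;

void free_function(void) { std::printf("free function\n"); }

int main(void)
{
    fp_t f1 = free_function;                    // plain function
    fp_t f2 = [] { std::printf("lambda\n"); };  // lambda expression

    f1(); // invoke the stored targets
    f2();
}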

A queue of functions

The primary purpose of using a dispatch queue is to provide a first-in, first-out processing model.

Luckily, C++ provides a simple std::queue type that we can use for this purpose:

std::queue<fp_t> q_;

To add an item to the queue, we push:

q_.push(op);

And to get the next item:

auto op = q_.front(); //get the front item
q_.pop(); //and pop it from the queue

Allocating Queue Threads

Our goal is to make our dispatch queue generic enough that we can change the number of threads for each queue we create. This allows us to create concurrent queues that allow generic tasks to run in parallel, as well as serial queues that only utilize one thread to protect a resource.

C++11 provides a generic thread type: std::thread. Using std::vector, we can manage as many std::thread objects as we need:

std::vector<std::thread> threads_;

When constructing our dispatch queue, we can specify the number of threads desired. Then our constructor does the work of creating the required number of std::thread objects in our std::vector container.

dispatch_queue(std::string name, size_t thread_cnt = 1) :
    name_{std::move(name)}, threads_(thread_cnt)
{
    for(size_t i = 0; i < threads_.size(); i++)
    {
        //Initialize each thread object
        threads_[i] = std::thread(
            &dispatch_queue::dispatch_thread_handler, this);
    }
}
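
As a hypothetical usage sketch, a single-threaded instance behaves as a serial queue, while a multi-threaded instance behaves as a concurrent queue (dispatch() is the enqueue call we flesh out below):

dispatch_queue serial_q("serial", 1);      // one worker thread: a serial queue
dispatch_queue concurrent_q("workers", 4); // four worker threads: a concurrent queue

serial_q.dispatch([] { /* e.g., update the drawing on the screen */ });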

Making Our Dispatch Queue Thread-Safe

Our dispatch queue is a shared resource in two potential directions:

  • Any thread can add work to the queue
  • The queue may have multiple threads which remove work from the queue for processing

To make sure we implement this safely, we must rely on a locking mechanism. Luckily, C++11 also provides std::mutex:

std::mutex lock_;

The queue itself is the critical resource, so we take the lock around any modification to the queue.

Dispatch Thread Handler

The dispatch queue worker thread handler should be a simple one. Its requirements are:

  1. Wait until there is something to run
  2. Pop that item from the queue
  3. Run the item
  4. Check whether we need to quit; if not, wait again

How do I know when to run?

Once we understand our requirements for the worker threads, we encounter a question: how do I know that there’s something to execute without keeping these threads awake?

At this point, it may not surprise you to learn that C++11 also provides us a tool for this: std::condition_variable.

std::condition_variable cv_;

By pairing a condition variable with our std::mutex, we can let our threads sleep whenever there is no work to do. We wake them up again when there is data in the queue:

std::unique_lock<std::mutex> lock(lock_);
q_.push(op);
cv_.notify_one();
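
Wrapped up as a member function, a minimal sketch of dispatch() looks like this (the GitHub example is the reference; an overload taking fp_t&& would avoid a copy for rvalue arguments):

void dispatch_queue::dispatch(const fp_t& op)
{
    std::unique_lock<std::mutex> lock(lock_);
    q_.push(op);

    // Wake one worker thread to process the new item
    cv_.notify_one();
}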

In our worker thread, we wait until there is a new data notification. Upon waking, the thread will take the lock, get an item from the queue, and resume operation.

void dispatch_queue::dispatch_thread_handler(void)
{
    std::unique_lock<std::mutex> lock(lock_);

    do {
        //Wait until we have data
        cv_.wait(lock, [this]{
            return (q_.size());
        });

        //after wait, we own the lock
        if(q_.size())
        {
            auto op = std::move(q_.front());
            q_.pop();

            //unlock now that we're done messing with the queue
            lock.unlock();

            op();

            lock.lock();
        }
    } while (1);
}

It’s worth noting that the condition variable’s wait() function requires a lock. While the thread is waiting for data, wait() releases the mutex; it re-locks the mutex when the thread is notified and ready to run. Because wait() must be called with the lock held, we re-acquire the lock at the end of the loop body after running each operation.

Exiting

The next question is: how do I know when to stop running and exit?

The simplest way is to add a boolean flag, such as quit_ or active_. When instructed to stop() or when destructing the queue, you set this flag and wait for all threads to finish any work-in-progress.

dispatch_queue::~dispatch_queue()
{
	// Signal to dispatch threads that it's time to wrap up
	std::unique_lock<std::mutex> lock(lock_);
	quit_ = true;
	cv_.notify_all();
	lock.unlock();

	// Wait for threads to finish before we exit
	for(size_t i = 0; i < threads_.size(); i++)
	{
		if(threads_[i].joinable())
		{
			threads_[i].join();
		}
	}
}

Your worker thread can also add the exit signal to the condition variable wait:

cv_.wait(lock, [this]{
    return (q_.size() || quit_);
});

And use the following while structure:

do {
    // ...
} while(!quit_);
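
Putting both changes together, the quit-aware worker loop ends up looking something like this sketch:

void dispatch_queue::dispatch_thread_handler(void)
{
    std::unique_lock<std::mutex> lock(lock_);

    do {
        //Wait until we have data or a quit signal
        cv_.wait(lock, [this]{
            return (q_.size() || quit_);
        });

        //after wait, we own the lock
        if(!quit_ && q_.size())
        {
            auto op = std::move(q_.front());
            q_.pop();

            //unlock now that we're done messing with the queue
            lock.unlock();

            op();

            lock.lock();
        }
    } while(!quit_);
}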

Putting it all Together

I’ve added an example dispatch queue implementation on GitHub. Check it out!

Don’t forget to compile with -std=c++11.

What if I haven’t ported std::thread or std::condition_variable?

Porting std::thread and std::condition_variable, while useful, may not be worth the time on your system. Never fear – you can use your OS’s native thread creation APIs. Most RTOSes also have support for semaphores or event flags – these are also useful for implementing a dispatch queue. Later I will show examples using ThreadX APIs.
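As a rough sketch of the semaphore approach (names like queue_sem, queue_mutex, and worker_thread_handler are hypothetical, and error checking is omitted), a counting semaphore can replace the condition variable: dispatch() posts the semaphore once per queued item, and each worker blocks on it:

#include "tx_api.h"

extern TX_SEMAPHORE queue_sem; // counts pending items; dispatch() calls tx_semaphore_put()
extern TX_MUTEX queue_mutex;   // protects the underlying queue

void worker_thread_handler(ULONG arg)
{
    while(1)
    {
        // Sleep until dispatch() signals that an item was queued
        tx_semaphore_get(&queue_sem, TX_WAIT_FOREVER);

        // Pop the next item under the lock; q_ is the same
        // std::queue<fp_t> from earlier, assumed accessible here
        tx_mutex_get(&queue_mutex, TX_WAIT_FOREVER);
        auto op = std::move(q_.front());
        q_.pop();
        tx_mutex_put(&queue_mutex);

        op();
    }
}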


Major Revisions

  • 20200602:
    • Converted notify_all() in dispatch() to notify_one()
  • 20190909:
    • Updated constructor to move the name string into the class member (Thanks to kixorz for the suggestion)
  • 20190201:
    • Updated std::thread constructor examples to remove std::bind, which is unnecessary

21 Replies to “Implementing an Asynchronous Dispatch Queue”

  1. Hi, great article!!
    I am an amateur when it comes to multithreading, and I have a computing algorithm which would benefit from both serial and async dispatch queues. I looked at your example and found it inspiring.
    What I would like to implement is something along the lines of:
    a serial queue followed by an async queue followed by another async queue. Basically something where the data flows between queues. I was wondering how to re-dispatch work between these queues. For instance, when op() is done, I would like some mechanism to "dispatch" the result of op() to the next queue. I am toying with the idea of extending the dispatch_queue class into a list with a pointer to the next queue. I would like your opinion on whether this is worth doing based on your example, or if another design would be better suited to my application?

    1. Hi Zila,

      I’ve been thinking about your question off-and-on this week. I’ve concluded that you do not need a complex solution. Here’s how I would tackle it:

      Create three separate queues: one serial (one thread) and the others async (multi-threaded). These queues remain logically separate. You could call them something obvious, such as stage1_q, stage2_q, stage3_q.

      Rather than putting the state flow management into the dispatch queue, I would localize that requirement to the dispatched operation itself.

      First, an operation is dispatched to stage1_q. The dispatched function is then responsible for dispatching its result to stage2_q once it is completed. The stage2 function would then dispatch to the stage3_q.
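      In code, the idea is something like this rough sketch (the process_stageN() functions are hypothetical, and the queues are assumed to be long-lived):

      stage1_q.dispatch([] {
          auto result = process_stage1();
          stage2_q.dispatch([result] {
              auto result2 = process_stage2(result);
              stage3_q.dispatch([result2] { process_stage3(result2); });
          });
      });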

      Does that make sense? Seems simple.

        1. I was checking this every day and then gave up! I was pretty sad, thinking the blog was dead, because your articles are really good! So thanks for getting back.
          Indeed, this is in line with what I went for. I perhaps forgot to mention that some of those queues were offloaded to an FPGA, where the host CPU basically prepared the data, launched the job on the FPGA, and waited for an interrupt.
          I will try and clean the code and share it!

          Thanks again for the reply!
          Zila (from Sweden).

          1. I am glad you returned :). It’s hard to balance writing, community involvement, and running a consulting business. But my true passion is writing and sharing knowledge, so I am increasingly focused on the site.

            Looking forward to seeing what you came up with!

  2. Hi Phillip!

    Thanks for the great article.

    I am trying to use a dispatch queue for emitting signals that can be queued. The tasks can last several seconds, but I would like to cancel processing directly.

    I have two solutions in mind:
    – kill the thread: that is dangerous and may leak memory, so I don’t want that.
    – split the signal tasks into smaller subtasks that don’t last as long. This seems like the better solution, but it comes with the drawback of extra code to split the signal into subtasks, etc.

    Am I missing another option here? Thanks for any hints, pointers and help;)

    1. My first question: why do you need to cancel processing of these tasks? Is that something you intend for normal operation?

      split the signal tasks into smaller subtasks that don’t last as long. This seems like the better solution, but it comes with the drawback of extra code to split the signal into subtasks, etc.

      If there are defined stages in each subtask, you could break them up into smaller dispatch-able chunks. Each stage is responsible for dispatching the next stage. This represents a pipeline model.

  3. You’re passing std::string by value. Use std::move to move it to destination.
    Also consider using curly braces: name_ {std::move(name)}

  4. I was wondering why you use notify_all() in the dispatch command. Wouldn’t notify_one() be enough? You only add a single item to the queue.

  5. Is it possible to provide an update to this asynchronous queue so it works as a synchronous dispatch queue? What would change in the code? (I can use this async queue as an equivalent to Apple’s DispatchQueue.async, but I’m not sure how to use it as DispatchQueue.sync.) Many thanks!

    1. Mo,

      I will have to experiment to see if I can come up with a nice synchronous dispatch queue design. Fundamentally you need a way to sleep the submitting thread until the task is completed in the sync queue, which then notifies the submitting thread to wake up and resume its work.
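
      Off the top of my head, something like this rough sketch captures the idea (dispatch_sync() and its internals are hypothetical and untested; fp_t is the typedef from the article):

      void dispatch_sync(dispatch_queue& q, const fp_t& op)
      {
          std::mutex m;
          std::condition_variable done_cv;
          bool done = false;

          // Wrap the task so it flips a flag and notifies when finished
          q.dispatch([&] {
              op();
              {
                  std::lock_guard<std::mutex> guard(m);
                  done = true;
              }
              done_cv.notify_one();
          });

          // Sleep the submitting thread until the task completes
          std::unique_lock<std::mutex> lock(m);
          done_cv.wait(lock, [&]{ return done; });
      }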

  6. Great code! In the GitHub code, it should be “printf("Creating dispatch queue: %s\n", name_.c_str());”, or the name won’t show up.

  7. Great post!
    I’m having some trouble using the code you posted on GitHub, however. The destructor is called when we leave scope, as it should be. However, rather than waiting for the queue to empty, it tells all the threads to finish their current task and then kills them. This leaves most of the tasks in the queue, never to be run. But if I put something that makes control flow wait before exiting the scope, like waiting for input, all the tasks run as expected. How should I fix this? Thanks!

    1. Hi Zac,

      The behavior you noted was an intentional design decision; if a dispatch queue goes out of scope, I think that all threads should be destroyed ASAP, regardless of work remaining in the queue. If you preferred all submitted tasks to be completed, the required change would be in the thread logic. What I would do is still have the queue notify_all() during destruction. Each thread would then clear its WIP and exit. The most straightforward way to do this is to skip the condition variable wait step if the exit_ signal has been received.

      Phillip

  8. Thanks for the response, Phillip! Knowing now that it was a design decision, I was able to understand the control flow well enough to alter it for my purposes. Very well written code, thanks again!
    For anyone in the future who wants to queue up several tasks and not continue until they are all finished, you just need to alter some code in the thread_handler function. The while loop shouldn’t stop until the quit signal is reached and the queue is empty (!quit_ || !q_.empty()) and the if statement should run even if the quit signal has been reached (q_.size()). Then, when the queue would destruct, it will pause and empty itself before leaving.

  9. Thank you for your very nice article Phillip!
    I have a quick question concerning the destructor from your github repo.
    Why is the quit_ variable protected by a mutex lock?
    The mutex is used to protect the queue everywhere, except in the destructor where it is used to protect the quit_ variable.
    Is that lock used to force a CPU Cache clear or something?
    Thanks & regards,
    Barrett Davis
