Previously, I introduced the concept of dispatch queues. Here’s a quick review in case you haven’t read the previous article: the dispatcher contains a work queue and multiple general-purpose worker threads. Client threads can dispatch individual function objects to the queue, and these run asynchronously on the dispatcher’s worker threads.
Apple is encouraging its app developers to move away from threads and to instead utilize dispatch queues (Grand Central Dispatch) for asynchronous processing. To quote Apple on the advantages of using dispatch queues instead of threads:
- It reduces the memory penalty your application pays for storing thread stacks in the application’s memory space.
- It eliminates the code needed to create and configure your threads.
- It eliminates the code needed to manage and schedule work on threads.
- It simplifies the code you have to write.
These benefits are pretty real and tangible. As we saw in “The Problem With Threads”, threading introduces nondeterminism into our system. By controlling our threading models using concurrent and serial dispatch queues, we gain a better grasp on the nondeterminism of our system.
The dispatch queue concept simplifies many of the threading scenarios encountered in embedded programming. Often, I just need to run small simple tasks asynchronously without blocking the primary thread. This results in spawning numerous threads with single small purposes:
- When the user presses a button, update the drawing on the screen
- When charging is complete, change LEDs and notify the system
- When recording starts, turn on an LED and start drawing the elapsed record time on the screen
These simple steps can run on any generic thread. Such trivial operations don’t justify the overhead of explicit thread management, the cost of excessive context switching, or the increased potential for threading errors.
Let’s see how we can implement an asynchronous dispatch queue of our own with C++11.
Table of Contents:
- std::function Refresher
- A Queue of Functions
- Allocating Queue Threads
- Making Our Dispatch Queue Thread-Safe
- Dispatch Thread Handler
- How do I know when to run?
- Exiting
- Putting it all Together
- Further Reading
std::function Refresher
As we saw last week with callbacks, std::function is a useful C++11 feature for capturing Callable objects. As a refresher:
“Instances of std::function can store, copy, and invoke any Callable target – functions, lambda expressions, bind expressions, or other function objects, as well as pointers to member functions and pointers to data members.”
For this example, we will prototype our function objects as:
typedef std::function<void(void)> fp_t;
A Queue of Functions
The primary purpose of using a dispatch queue is to provide a first-in, first-out processing model.
C++ luckily provides us a simple std::queue type which we can use for this purpose:
std::queue<fp_t> q_;
To add to the queue we push:
q_.push(op);
And to get the next item:
auto op = q_.front(); // get the front item
q_.pop();             // and pop it from the queue
Allocating Queue Threads
Our goal is to make our dispatch queue generic enough that we can change the number of threads for each queue we create. This allows us to create concurrent queues that allow generic tasks to run in parallel, as well as serial queues that only utilize one thread to protect a resource.
C++11 provides a generic thread type: std::thread. Using std::vector, we can manage as many std::thread objects as we need:
std::vector<std::thread> threads_;
When constructing our dispatch queue, we can specify the number of threads desired. Then our constructor does the work of creating the required number of std::thread objects in our std::vector container.
dispatch_queue(std::string name, size_t thread_cnt = 1) :
    name_{std::move(name)}, threads_(thread_cnt)
{
    for(size_t i = 0; i < threads_.size(); i++)
    {
        // Initialize each thread object
        threads_[i] = std::thread(
            &dispatch_queue::dispatch_thread_handler, this);
    }
}
Making Our Dispatch Queue Thread-Safe
Our dispatch queue is a shared resource in two potential directions:
- Any thread can add work to the queue
- The queue may have multiple threads which remove work from the queue for processing
To make sure we implement this safely, we must rely on a locking mechanism. Luckily, C++11 also provides std::mutex:
std::mutex lock_;
The queue itself is the critical piece, so we lock around all queue modifications.
Dispatch Thread Handler
The dispatch queue worker thread handler should be a simple one. Its requirements are:
- Wait until there is something to run
- Pop that item from the queue
- Run the item
- Check whether I need to quit, if not: wait again
How do I know when to run?
Once we understand our requirements for the worker threads, we encounter a question: how do I know that there’s something to execute without keeping these threads awake?
At this point, it may not surprise you to learn that C++11 also provides us a tool for this: std::condition_variable.
std::condition_variable cv_;
By pairing a condition variable with our std::mutex, we can let our threads sleep whenever there is no work to do. We wake them up again when there is data in the queue:
std::unique_lock<std::mutex> lock(lock_);
q_.push(op);
cv_.notify_one();
In our worker thread, we wait until there is a new data notification. Upon waking, the thread will take the lock, get an item from the queue, and resume operation.
void dispatch_queue::dispatch_thread_handler(void)
{
    std::unique_lock<std::mutex> lock(lock_);

    do {
        // Wait until we have data
        cv_.wait(lock, [this]{
            return !q_.empty();
        });

        // After wait, we own the lock
        if(!q_.empty())
        {
            auto op = std::move(q_.front());
            q_.pop();

            // Unlock now that we're done messing with the queue
            lock.unlock();

            op();

            lock.lock();
        }
    } while(1);
}
It’s worth noting that the condition variable’s wait function requires a lock. While a thread is waiting for data, it releases the mutex and re-acquires it only when notified and ready to run. This is why we re-lock at the end of the loop body: the next call to wait must own the lock.
Exiting
The next question is: how do I know when to stop running and exit?
The simplest way is to add an exit_ or active_ boolean flag. When instructed to stop() or when destructing the queue, you can set this flag and wait for all threads to finish any work-in-progress.
dispatch_queue::~dispatch_queue()
{
    // Signal to dispatch threads that it's time to wrap up
    std::unique_lock<std::mutex> lock(lock_);
    quit_ = true;
    cv_.notify_all();
    lock.unlock();

    // Wait for threads to finish before we exit
    for(size_t i = 0; i < threads_.size(); i++)
    {
        if(threads_[i].joinable())
        {
            threads_[i].join();
        }
    }
}
Your worker thread can also add the exit signal to the condition variable wait:
cv_.wait(lock, [this]{
    return (!q_.empty() || quit_);
});
And use the following while structure:
do {
// ...
} while(!quit_);
Putting it all Together
I’ve added an example dispatch queue implementation on GitHub. Check it out!
Don’t forget to compile with -std=c++11.
What if I haven’t ported std::thread or std::condition_variable?
Porting std::thread and std::condition_variable, while useful, may not be worth the time on your system. Never fear – you can use your OS’s native thread creation APIs. Most RTOSes also have support for semaphores or event flags – these are also useful for implementing a dispatch queue. Later I will show examples using ThreadX APIs.
Further Reading:
- Dispatch queues
- The Problem with Threads
- Implementing an Asynchronous Dispatch Queue with FreeRTOS
- Implementing an Asynchronous Dispatch Queue with ThreadX
- Improving Your Callback Game
- Apple’s Guide on Dispatch Queues
- std::thread
- std::function
- std::mutex
- std::condition_variable
- std::queue
Major Revisions
- 20200602: Converted notify_all() in dispatch() to notify_one()
- 20190909: Update constructor to move the name string into the class member (Thanks to kixorz for the suggestion)
- 20190201: Updated std::thread constructor examples to remove std::bind, which is unnecessary
Hi, great article!!
I am an amateur when it comes to multithreading and I have a computing algorithm which would benefit from both serial as well as async dispatch queues. I looked at your example and it was inspiring.
What I would like to implement is something along the lines of:
a serial queue followed by an async queue followed by another async queue – basically something where the data flows between queues. I was wondering how to re-dispatch work between these queues. For instance, when op() is done, I would like some mechanism to "dispatch" the result of op() to the next queue. I am toying with the idea of extending the dispatch_queue class into a list with a pointer to the next queue. I would like to have your opinion on whether this is something worth doing based on your example, or if another design would be better suited for my application.
Hi Zila,
I’ve been thinking about your question off-and-on this week. I’ve concluded that you do not need a complex solution. Here’s how I would tackle it:
Create three separate queues, one synchronous (one thread) and the others async (multi-threaded). These queues remain logically separate. You could call them something obvious such as stage1_q, stage2_q, stage3_q.
Rather than putting the state flow management into the dispatch queue, I would localize that requirement to the dispatched operation itself.
First, an operation is dispatched to stage1_q. The dispatched function is then responsible for dispatching its result to stage2_q once it is completed. The stage2 function would then dispatch to stage3_q.
Does that make sense? Seems simple.
"Seems simple" meaning you don’t need any additional machinery, and I like those kinds of solutions 🙂
I was checking this every day and then gave up! I was pretty sad thinking the blog is dead, because your articles are really good! so thanks for getting back.
Indeed, this is in line with what I went for. I forgot perhaps to mention that the reason for this is that some of those queues were offloaded to an FPGA, where the host CPU basically prepared the data, launched the job on the FPGA, and waited for an interrupt.
I will try and clean the code and share it!
Thanks again for the reply!
Zila (from Sweden).
I am glad you returned :). It’s hard to balance writing, community involvement, and running a consulting business. But my true passion is writing and sharing knowledge, so I am increasingly focused on the site.
Looking forward to seeing what you came up with!
Hi Philipp!
Thanks for the great article.
I am trying to use a dispatch queue for emitting signals that can be queued. The tasks can last several seconds, but I would like to cancel processing directly.
I have two solutions in mind:
– kill the thread: that is dangerous and may leak memory, so I don’t want that.
– split the signal tasks into smaller subtasks that don’t last as long. This seems the better solution, but comes with the drawback of extra code to split the signal into subtasks, etc.
Am I missing another option here? Thanks for any hints, pointers and help;)
Actually only referring to the dispatch queue in general, not the async especially:)
My first question: why do you need to cancel processing of these tasks? Is that something you intend for normal operation?
If there are defined stages in each subtask, you could break them up into smaller dispatch-able chunks. Each stage is responsible for dispatching the next stage. This represents a pipeline model.
You’re passing std::string by value. Use std::move to move it to the destination.
Also consider using curly braces: name_{std::move(name)}
Thanks, will correct it.
I appreciate the feedback.
What is the difference with the curly braces in the constructor initialization list?
I was wondering why you use notify_all() in the dispatch command. Wouldn’t notify_one() be enough? You only add a single item to the queue.
Habit more than anything. You’re absolutely right that notify_one() is the appropriate function to use, I will update the article and example code.
Is it possible to provide an update to this asynchronous queue to make it work as a synchronous dispatch queue? What would change in the code? (I can use this async queue as an equivalent to Apple’s DispatchQueue.async, but I’m not sure how to use it as DispatchQueue.sync.) Many thanks!
Mo,
I will have to experiment to see if I can come up with a nice synchronous dispatch queue design. Fundamentally you need a way to sleep the submitting thread until the task is completed in the sync queue, which then notifies the submitting thread to wake up and resume its work.
Great code! In the GitHub code, it should be printf("Creating dispatch queue: %s\n", name_.c_str());, or the name won’t show up.
Wow, I can’t believe I haven’t noticed that in all this time! Thanks! Corrected the code.
Great post!
I’m having some troubles using the code you posted on GitHub, however. The destructor calls when we leave scope, as it should. However, rather than waiting for the queue to empty, it tells all the threads to finish their current task and then kills them. This leaves most of the tasks in the queue, never to be run. However, if I put something that makes control flow wait before exiting the scope, like waiting for input, all the tasks run as expected. How should I fix this? Thanks!
Hi Zac,
The behavior you noted was an intentional design decision; if a dispatch queue goes out of scope, I think that all threads should be destroyed ASAP, regardless of work remaining in the queue. If you preferred all submitted tasks to be completed, the required change would be in the thread logic. What I would do is still have the queue notify_all() during destruction. Each thread would then clear its WIP and exit. The most straightforward way to do this is to skip the condition variable wait step if the exit_ signal has been received.
Phillip
Thanks for the response, Phillip! Knowing now that it was a design decision, I was able to understand the control flow well enough to alter it for my purposes. Very well written code, thanks again!
For anyone in the future who wants to queue up several tasks and not continue until they are all finished, you just need to alter some code in the thread_handler function. The while loop shouldn’t stop until the quit signal is reached and the queue is empty (!quit_ || !q_.empty()) and the if statement should run even if the quit signal has been reached (q_.size()). Then, when the queue would destruct, it will pause and empty itself before leaving.
Thank you for your very nice article Phillip!
I have a quick question concerning the destructor from your github repo.
Why is the quit_ variable protected by a mutex lock?
The mutex is used to protect the queue everywhere, except in the destructor where it is used to protect the quit_ variable.
Is that lock used to force a CPU Cache clear or something?
Thanks & regards,
Barrett Davis