Threading: Waiting for Events

In this post, we shall look at various techniques for one thread to wait on another, with their advantages and disadvantages along with corresponding code samples.

Multiple threads can execute concurrently with each other, and, if the hardware supports it, in parallel. There are situations where a thread needs to wait for another thread to finish before it can continue its own work. There needs to be a way for the first thread to notify the second that it has finished, so that the second thread can continue executing. In these situations, we say there is a dependency between the threads; depending on the context and use case, this dependency may or may not be exclusive.

We shall now look at various techniques to introduce waiting between threads.

Waiting For Completion

When there is an order between threads, where one thread must start only after another has finished, it is easy to structure the code accordingly: the second thread joins on the first and waits for it to finish before continuing. In both C++ and Python, you join on a thread using the handle to the newly created thread.

#include <iostream>
#include <thread>

void do_some_work() {
    std::cout << "Doing some work..." << std::endl;
    // Some long operation
}

int main(int argc, char *argv[]) {
    std::thread t1(do_some_work);
    t1.join();

    // Main thread does more work

    return 0;
}

In the code above, the main thread that executes main() spawns a new thread t1 to do some work in the background (by calling do_some_work()). The main thread then waits for t1 to finish before doing additional work. Here, the main thread handles the dependency using the join() API provided by the thread library. join() is a blocking call: the main thread is suspended until t1 completes. Note that without the join(), it is nondeterministic which thread runs first; it is up to the OS scheduler to decide which thread gets to run. In our example, we impose an ordering using the join() call.

Python is similar, except that you need to call start() on the thread handle to get it running (a std::thread starts executing as soon as it is constructed).

import threading

def do_some_work(id):
    print("{}: Doing some work...".format(id))
    # Some long operation

if __name__ == "__main__":
    t1 = threading.Thread(target=do_some_work, args=(42,))
    t1.start()
    # Some other work
    t1.join()

In both C++ and Python, these threads map to underlying OS threads and can execute concurrently with other threads in the system. On Linux, they are most of the time abstractions over the pthreads library and its corresponding pthread_create and pthread_join calls.

Signaling

When two threads want to signal each other at arbitrary points, there are multiple options available. This sort of use case pops up regularly in multi-threaded applications, where one thread can continue running after unblocking the threads that depend on it. For example, one thread might be responsible for downloading a file as part of its operations and then signaling other threads, which are waiting on the completion of the download, to start their work on the downloaded file; the downloading thread can then continue with its own work after signaling the dependent threads. There are also Producer-Consumer setups where you need to notify the consumers as soon as data is produced by the producer.

Busy Wait

In this approach, you use a shared variable to check whether the waiting threads can proceed. A waiting thread repeatedly checks the shared variable and, once it finds that it can indeed move forward, starts performing its work. The waited-upon thread sets the shared variable once it completes its work, signaling the waiting threads. Busy waiting can be seen in the example code below:

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

int main(int argc, char *argv[])
{
    // std::atomic is required here: concurrently reading and writing
    // a plain bool is a data race (undefined behavior).
    std::atomic<bool> download_complete{false};
    std::thread download_thread([&download_complete](){
        std::cout << "Starting Downloading Task." << std::endl;
        std::cout << "Downloading file..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(5));
        std::cout << "Download complete." << std::endl;
        download_complete = true;
        std::cout << "Doing something else..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(5));
        std::cout << "Done Downloading Task." << std::endl;
    });

    std::thread waiting_thread1([&download_complete]() {
        std::cout << "Starting Task1." << std::endl;
        while (!download_complete) {}
        std::cout << "Doing Task1." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(7));
        std::cout << "Done Task1." << std::endl;
    });

    std::thread waiting_thread2([&download_complete]() {
        std::cout << "Starting Task2." << std::endl;
        while (!download_complete) {}
        std::cout << "Doing Task2." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(3));
        std::cout << "Done Task2." << std::endl;
    });

    download_thread.join();
    waiting_thread1.join();
    waiting_thread2.join();

    return 0;
}

In the code above, waiting_thread1 and waiting_thread2 wait on the download_thread. The shared variable download_complete signals the completion of the action; note that in C++ it must be a std::atomic<bool>, since unsynchronized concurrent access to a plain bool is a data race. waiting_thread1 and waiting_thread2 spin doing nothing until download_complete is set by the download_thread. This type of waiting is called busy waiting, since CPU cycles are burned doing nothing. The scheduler can still preempt the OS thread running this code, but whenever the thread does get a chance to run, it burns CPU cycles until the shared variable is set.

The equivalent Python version would be:

import threading
import time

download_complete = False


def download_file_and_do_other_things():
    print("Starting Downloading Task.")
    print("Downloading file...")
    time.sleep(5)
    print("Download complete.")
    global download_complete
    download_complete = True
    print("Doing something else...")
    time.sleep(5)
    print("Done Downloading Task.")


def work1():
    print("Starting Task1.")
    while not download_complete:
        pass
    print("Doing Task1.")
    time.sleep(7)
    print("Done Task1.")


def work2():
    print("Starting Task2.")
    while not download_complete:
        pass
    print("Doing Task2.")
    time.sleep(3)
    print("Done Task2.")


if __name__ == "__main__":
    download_complete = False
    download_thread = threading.Thread(
        target=download_file_and_do_other_things,
    )
    waiting_thread1 = threading.Thread(
        target=work1,
    )
    waiting_thread2 = threading.Thread(
        target=work2,
    )

    # Start Work
    download_thread.start()
    waiting_thread1.start()
    waiting_thread2.start()

    # Join threads
    download_thread.join()
    waiting_thread1.join()
    waiting_thread2.join()

There are multiple ways of sharing the flag in Python; above, we use a global variable for the notification between the waiting and the waited-upon threads.

Waiting with Sleeping/Yielding

This version minimizes the busy wait seen in the previous one: we sleep for some amount of time between checks of the shared variable. As a consequence, we will not wake up at the exact moment the event occurs; the latency depends on how long you choose to sleep after each check of the shared variable.

The below call:

while (!download_complete) {
}

is replaced with:

while (!download_complete) {
   std::this_thread::sleep_for(some_time); 
}

And, equivalently in Python, the call:

while not download_complete:
    pass

is replaced with:

while not download_complete:
    time.sleep(some_time)

The calls to sleep_for() and sleep() make the scheduler suspend the thread, so the busy wait is avoided: download_complete is now evaluated only about once every some_time. With this technique you avoid spinning the CPU, but you give up waking at the exact moment the event happens. Whether that trade-off is acceptable depends on your use case.

Also, in the C++ code above, you can call yield() to relinquish the CPU after each check of the shared download_complete variable:

while (!download_complete) {
   std::this_thread::yield();
}

Note that sleeping also relinquishes the CPU, and the two methods result in broadly similar behavior, though yield() only gives up the remainder of the current time slice rather than sleeping for a fixed duration. On Linux, the yield() call is typically an abstraction over the sched_yield system call.

One-Off Events

Here, we want to wait for one-off events efficiently. In C++, we could wait on an event using a std::condition_variable. But for truly one-off events, I personally feel that a std::condition_variable is overkill, since you need an extra lock (and flag) just to wait on it. As we shall see later, std::condition_variable is tailor-made for producer-consumer code.

To signal one-off events, we shall instead use a std::promise with its associated std::future. When one thread is waiting on a signal from another, we can use the following code to send a one-off signal between them:

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

int main(int argc, char *argv[])
{
    std::promise<bool> prom;
    std::thread download_thread([&prom](){
        std::cout << "Starting Downloading Task." << std::endl;
        std::cout << "Downloading file..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(5));
        std::cout << "Download complete." << std::endl;
        // Notify here
        std::cout << "Setting value for future" << std::endl;
        prom.set_value(true);
        std::cout << "Doing something else..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(5));
        std::cout << "Done Downloading Task." << std::endl;
    });

    std::thread waiting_thread1([&prom]() {
        std::cout << "Running waiting thread1" << std::endl;
        std::cout << "Waiting for result. Thread 1" << std::endl;
        std::future<bool> fut = prom.get_future();
        fut.wait();
        std::cout << "Done waiting. Thread 1" << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(15));
        std::cout << "Doing something else. Thread 1" << std::endl;
    });

    download_thread.join();
    waiting_thread1.join();

    return 0;
}

Above, we use a std::promise to signal the event by calling its set_value() member function when ready. The waiting thread obtains the std::future associated with the promise by calling get_future(), then calls wait() on the future, where it is suspended until a value is made available on the associated std::promise object.

And if you need multiple threads waiting on a one-off event, you can use a std::shared_future to signal them all. The shared future is initialized like so:

std::promise<bool> prom;
std::shared_future<bool> sf(prom.get_future());

The waiting threads then capture the std::shared_future object (by value, so that each thread gets its own copy) and call wait() on it, as sketched below:

std::thread waiting_thread1([sf]() { sf.wait(); /* ... */ });
std::thread waiting_thread2([sf]() { sf.wait(); /* ... */ });

Using a std::promise along with its associated std::future is great for one-off event signaling, since a future's result can be retrieved only once.

Producer-Consumer

For a producer-consumer design, we can use a std::condition_variable, which is aptly suited for this use case. The design is such that the consumer waits until the producer produces something for it to consume; the consumer waits on a notification from the producer saying that a value has been produced. This design can be seen below:

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <thread>

int main(int argc, char *argv[])
{
    std::random_device rdm;
    std::uniform_int_distribution<int> dist(3, 15);

    std::condition_variable cv;
    std::mutex mux;

    std::queue<int> q;

    std::thread producer([&cv, &mux, &dist, &rdm, &q]{
        for (;;) {
            int sleep_for = dist(rdm);
            std::this_thread::sleep_for(std::chrono::seconds{sleep_for});
            {
                std::lock_guard lg(mux);
                std::cout << "Producer -> " << sleep_for << std::endl;
                q.push(sleep_for);
            }
            cv.notify_all();
        }
    });

    std::thread consumer([&cv, &mux, &q]{
        for (;;) {
            std::unique_lock ul(mux);
            cv.wait(ul, [&q]{ return !q.empty(); });
            int front = q.front();
            std::cout << "Consumer <- " << front << std::endl;
            q.pop();
            ul.unlock();
        }
    });

    producer.join();
    consumer.join();

    return 0;
}

Above, a std::queue passes the data between the producer and the consumer. A mutex, mux, guards the shared queue when pushing and popping, and hence is shared between the producer and the consumer. After producing a value, the producer pushes it into the queue, releases the lock (which is scoped), and calls notify_all() on the std::condition_variable. The consumer wraps the same mux in a std::unique_lock and passes it to wait() on the std::condition_variable, along with a predicate closure. wait() evaluates the closure with the lock held; if the closure returns false, the lock is released and the thread is suspended until a notification arrives and the closure returns true, at which point the value produced by the producer is consumed.

Note, it is good practice for the producer to release the lock on the mutex before calling notify_all(), since the mutex is also shared with the consumer. In our example the consumption is straightforward, but if processing a value takes some time, the consumer should likewise release the lock before processing it.

On Linux, std::condition_variable is abstracted on top of the pthread_cond_signal, pthread_cond_broadcast, and pthread_cond_wait functions.

In Python, the threading.Event interface is simple enough to use as a one-off event mechanism or in a Producer-Consumer setup. You can adapt a variant of the code below to your needs:

import threading
import time

notif = threading.Event()


def download_file_and_do_other_things():
    print("Starting Downloading Task.")
    print("Downloading file...")
    time.sleep(5)
    print("Download complete.")
    print("Notifying consumers.")
    notif.set()
    print("Doing something else...")
    time.sleep(5)
    print("Done Downloading Task.")


def work1():
    print("Starting Task1.")
    print("Waiting on producer. Task 1.")
    notif.wait()
    print("Doing Task1.")
    time.sleep(7)
    print("Done Task1.")


def work2():
    print("Starting Task2.")
    print("Waiting on producer. Task 2.")
    notif.wait()
    print("Doing Task2.")
    time.sleep(3)
    print("Done Task2.")


if __name__ == "__main__":
    download_thread = threading.Thread(
        target=download_file_and_do_other_things,
    )
    waiting_thread1 = threading.Thread(
        target=work1,
    )
    waiting_thread2 = threading.Thread(
        target=work2,
    )

    # Start Work
    download_thread.start()
    waiting_thread1.start()
    waiting_thread2.start()

    # Join threads
    download_thread.join()
    waiting_thread1.join()
    waiting_thread2.join()

Finally, note that these are not the only ways to wait on events between threads. You can use clever techniques, or misuse the standard synchronization utilities provided by C++/Python, but performance and semantics may get in your way with such solutions. Using the right tool makes your code easier to understand and work with.

That’s it. Hope this post briefly explained the various ways in which you can wait between threads. For any discussion, tweet here.