Condition Variables

Condition Variables

Condition Variable

  • The most basic communication device

  • Everything else can be built around it (and a mutex)

    • Semaphores

    • Events

    • Message queues

    • Promises and futures (⟶ later)

Condition Variables: By Example

#include <mutex>
#include <condition_variable>
#include <deque>
#include <thread>
#include <chrono>
#include <iostream>


// message queue which can hold up to maxelem elements of type
// T. associated with the queue are two wait conditions:

// * a consumer has to wait if the queue is empty. he then waits for
//   the condition "not empty" which is made true by a producer.

// * a producer has to wait if the queue is full. he then waits for
//   the condition "not full" which is made true by a consumer.

// note that std::condition_variable must be used with
// std::unique_lock<std::mutex>, which is a moveable version of a lock
// guard. this is to guarantee that the mutex remains locked when a
// thread enters the wait and an exception is thrown inside the
// std::condition_variable
// implementation. std::condition_variable::wait(guard) moves the
// guard into the condition variable before suspending the thread, and
// moves it back out to the caller again after wakeup.
template <typename T> class MessageQueue
{
public:
    MessageQueue(size_t maxelem) : maxelem_(maxelem) {}

    void push(T elem)
    {
        {
            std::unique_lock<std::mutex> guard(lock_);
            // wait if the queue is full. (use while instead of if to
            // guard against spurious wakeups.)
            while (q_.size() == maxelem_)
                // atomically release the mutex and put the calling
                // thread to sleep until the condition becomes true
                // (not_full_ is notified).
                not_full_.wait(guard);
            q_.push_back(elem);
        }
        // notify a waiting consumer that there might is an item
        // available.
        not_empty_.notify_one();
    }

    T pop()
    {
        T rv;
        {
            std::unique_lock<std::mutex> guard(lock_);
            // wait if the queue is empty. (use while instead of if to
            // guard against spurious wakeups.)
            while (q_.size() == 0)
                // atomically release the mutex and put the calling
                // thread to sleep until the condition becomes true
                // (not_empty_ is notified).
                not_empty_.wait(guard);
            rv = q_.front();
            q_.pop_front();
        }
        // notify a waiting producer that there is room.
        not_full_.notify_one();
        return rv;
    }

private:
    std::mutex lock_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;

    size_t maxelem_;
    std::deque<T> q_;
};

void producer_func(MessageQueue<int>& mq, int nelem)
{
    for (int i=0; i<nelem; i++) {
        mq.push(i);
        std::cout << "produced " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(300));
    }
}

void consumer_func(MessageQueue<int>& mq, int threadno, int nelem)
{
    for (int i=0; i<nelem; i++) {
        int elem = mq.pop();
        std::cout << "consumer #" << threadno << " consumed " << elem << std::endl;
    }
}

int main()
{
    MessageQueue<int> mq(2);

    std::thread producer(producer_func, std::ref(mq), 10);
    std::thread consumers[5]{
        std::thread(consumer_func, std::ref(mq), 0, 2),
        std::thread(consumer_func, std::ref(mq), 1, 2),
        std::thread(consumer_func, std::ref(mq), 2, 2),
        std::thread(consumer_func, std::ref(mq), 3, 2),
        std::thread(consumer_func, std::ref(mq), 4, 2),
    };

    for (auto& thread: consumers)
        thread.join();
    producer.join();
}

Danger

while instead of ifSpurious Wakeups!

More Communication: Future

Problem:

  • Worker thread calculates something in the background

  • Somebody waits (synchronizes) for that something to become ready

  • That something will become ready in the future

Solution:

  • Condition variable and mutex

  • encapsulated in a “future” device.

  • Manually coded here for fun …

#include <mutex>
#include <condition_variable>
#include <thread>
#include <iostream>


// a Future can be thought of as a handle to something (of type T)
// that is not yet available (will be produced by a different thread).

// any thread that retrieves the element (using get()) will be
// suspended until the element is available.

// a thread that makes the element available wakes a possible waiter.
template <typename T> class Future
{
public:
    Future() : is_ready_(false) {}

    // if the element is available, it is returned
    // immediately. otherwise, the caller is suspended until the
    // element is available.
    T get()
    {
        std::unique_lock<std::mutex> guard(lock_);
        // wait(..., pred): does the bloody spurious wakeup loop
        // inside
        cond_ready_.wait(guard, [this]() { return this->is_ready_; });
        return value_;
    }

    // make the element available.
    void set(T value)
    {
        {
            std::unique_lock<std::mutex> guard(lock_);
            value_ = value;
            is_ready_ = true;
        }
        cond_ready_.notify_one();
    }
    
private:
    std::mutex lock_;
    bool is_ready_;
    std::condition_variable cond_ready_;

    T value_;
};


int main()
{
    Future<int> future;

    // this thread waits a couple of seconds and then produces the
    // element
    std::thread([&future]() {
            // countdown
            int i=5;
            while (i) {
                std::cout << i-- << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(500));
            }
            // fire
            std::cout << "fire!" << std::endl;
            future.set(666);
        }).detach();

    // the main thread immediately get()s the element - which is not
    // likely to be available already.
    std::cout << "waiting for future" << std::endl;
    std::cout << "result: " << future.get() << std::endl;
}

std::promise and std::future

Same scenario, but different responsibilities

  • Somebody promises to have textit{something} ready in the future

  • Two objects …

    • std::promise is used by producer (the one who promises)

    • std::future is used by consumer (who relies on the promise that has been made)

Best done by example

#include <future>
#include <thread>
#include <iostream>


int main()
{
    // as promised *by worker*: will be ready in the future
    std::promise<int> promise;
    // a handle to the bright future; somebody can wait if he wants
    std::future<int> future = promise.get_future();

    std::thread([&promise]() {
            // countdown
            int i=5;
            while (i) {
                std::cout << i-- << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(500));
            }
            // fire
            std::cout << "fire!" << std::endl;
            promise.set_value(666);
        }).detach();

    std::cout << "waiting for future" << std::endl;
    std::cout << "result: " << future.get() << std::endl;
}