Best practices
A common practice for testing threadsafe code, which Anthony Williams describes in his excellent book, is to execute the code-under-test on multiple threads while ensuring that all threads start their tasks at the same time (or as close to it as possible). This may not be obvious, so let’s look at an example.
Assume that we’re trying to test the following basic implementation of a ThreadsafeQueue:
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>
template <typename T>
class ThreadsafeQueue
{
public:
    ThreadsafeQueue() = default;
    ThreadsafeQueue& operator=(ThreadsafeQueue&&) = delete;
    ~ThreadsafeQueue() noexcept { clear(); }
    T wait_pop_front()
    {
        std::unique_lock lk{mutex_};
        cv_.wait(lk, [this]() noexcept { return !data_.empty(); });
        auto val{std::move(data_.front())};
        data_.pop_front();
        return val;
    }
    void push_back(const T& val)
    {
        std::lock_guard lk{mutex_};
        data_.push_back(val);
        cv_.notify_one();
    }
    bool empty() const noexcept
    {
        std::lock_guard lk{mutex_};
        return data_.empty();
    }
    void clear() noexcept
    {
        std::lock_guard lk{mutex_};
        data_.clear();
    }
private:
    mutable std::mutex mutex_{};
    mutable std::condition_variable cv_{};
    std::deque<T> data_{};
};
The interface is purposefully minimal – it allows only pushing at the back and popping at the front – which is all that’s necessary to illustrate the point.
When trying to test this code, the seemingly obvious idea might be to spawn two threads – one calling push_back and the other calling wait_pop_front, like so:
void test_push_pop()
{
    ThreadsafeQueue<int> queue;
    auto push_result = std::async(std::launch::async, [&queue] {
                            queue.push_back(42);
                        });
    auto pop_result = std::async(std::launch::async, [&queue] {
                            return queue.wait_pop_front();
                        });
    push_result.get();
    auto const val{pop_result.get()};
    assert(val == 42);
    assert(queue.empty());
}
By specifying std::launch::async we guarantee that the tasks given to std::async execute on separate threads, but that’s where the guarantees end. We cannot be sure how the operating system schedules these tasks. Given that the cost of starting a thread is much greater than that of the push_back and wait_pop_front operations, it’s likely that one of these threads finishes execution before the other has even started. We might get “lucky” sometimes and the scheduler will actually run both tasks in parallel, but we’d like something more reliable.
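To observe this skew on your own machine, a rough sketch like the following records when each naively launched task actually begins. The helper name naive_start_skew is made up for illustration, and the numbers it reports depend entirely on the hardware and the scheduler.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Hypothetical helper (not from the book): launches two tasks the naive way
// and reports how far apart their start times were.
std::chrono::nanoseconds naive_start_skew()
{
    using Clock = std::chrono::steady_clock;
    auto const stamp = [] { return Clock::now(); };

    // No synchronization: each task runs as soon as its thread is up.
    auto first = std::async(std::launch::async, stamp);
    auto second = std::async(std::launch::async, stamp);

    auto const t1 = first.get();
    auto const t2 = second.get();
    auto const diff = t1 < t2 ? t2 - t1 : t1 - t2;   // absolute difference
    return std::chrono::duration_cast<std::chrono::nanoseconds>(diff);
}
```

Calling it a few times in a loop and printing the results gives a feel for how unevenly the two tasks tend to start.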
The solution that Anthony gives in his book is to utilize the mechanism of std::promise and std::future to ensure that the spawned threads run the code-under-test concurrently:
void test_push_pop_concurrent()
{
    ThreadsafeQueue<int> queue;
    std::promise<void> go, push_ready, pop_ready;       // synchronization flags
    std::shared_future<void> ready{go.get_future()};
    std::future<void> push_result = std::async(std::launch::async,
                                        [&queue, &push_ready, ready{ready}](){
                                            push_ready.set_value();
                                            ready.wait();
                                            queue.push_back(42);
                                    });
    std::future<int> pop_result = std::async(std::launch::async,
                                        [&queue, &pop_ready, ready{ready}](){
                                            pop_ready.set_value();
                                            ready.wait();
                                            return queue.wait_pop_front();
                                    });
    push_ready.get_future().wait();     // wait until the `push thread` is ready
    pop_ready.get_future().wait();      // wait until the `pop thread` is ready
    go.set_value();                     // signal both threads to continue
    push_result.get();
    auto const val{pop_result.get()};
    assert(val == 42);
    assert(queue.empty());
}
The gist of the idea is to use std::promise objects as flags that signal readiness. Each thread gets its own flag to announce that it’s ready – these are the push_ready and pop_ready variables. Each thread also gets a copy of a std::shared_future – the ready flag. A thread signals that it has started successfully and is ready to execute the code-under-test by setting its push_ready or pop_ready flag, and then blocks on the ready flag. Once both threads have reported in, the main thread sets the go promise, which makes the ready flag ready and tells both threads to continue. This is as close as we can get to starting the code concurrently using standard library facilities. The remainder of the function is the same as in the naive case.
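If the idea of a promise acting as a flag feels abstract, the following single-threaded sketch may help. It only shows the one-shot behaviour: the flag is unset until set_value() is called, and every std::shared_future copy observes the change. The function name flag_demo is an illustrative assumption.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Returns true if the one-shot flag behaves as described: unset until
// set_value() is called, and then visible through every shared_future copy.
bool flag_demo()
{
    using namespace std::chrono_literals;

    std::promise<void> flag;
    std::shared_future<void> observer{flag.get_future()};
    std::shared_future<void> copy{observer};   // copies share the same state

    // Polling with a zero timeout reports whether the flag has been raised.
    bool const unset_before = observer.wait_for(0s) == std::future_status::timeout;
    flag.set_value();                          // raise the flag exactly once
    bool const set_after = observer.wait_for(0s) == std::future_status::ready
                        && copy.wait_for(0s) == std::future_status::ready;
    return unset_before && set_after;
}
```

In the test above the threads call wait() rather than polling, so they simply block until the flag is raised.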
Note that I’m omitting some details and many considerations described in Anthony’s book, including exception safety. I’m relaying just enough here, to provide the necessary context for the main part of this post. For the complete picture please check out Anthony’s book. I really can’t recommend it highly enough.
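To give at least a hint of the exception-safety concern: a future returned by std::async blocks in its destructor until the task has finished, so if the main thread throws before opening the gate, stack unwinding can hang on a task that is still waiting. One way to avoid that, sketched here under my own assumptions rather than copied from the book, is to open the gate in a catch block before rethrowing. The run_gated helper and its between parameter are hypothetical.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>

// Hypothetical helper: launches one gated task, then runs `between`
// (which may throw) before opening the gate.
template <typename Between>
int run_gated(Between between)
{
    std::promise<void> go, task_ready;
    std::shared_future<void> gate{go.get_future()};
    std::future<int> result;   // declared outside the try block so it outlives it
    try {
        result = std::async(std::launch::async, [&task_ready, gate] {
            task_ready.set_value();
            gate.wait();
            return 42;
        });
        task_ready.get_future().wait();
        between();             // stands in for any code that might throw
        go.set_value();
    } catch (...) {
        go.set_value();        // open the gate so the task can finish
        throw;                 // result's destructor can now complete
    }
    return result.get();
}
```

With this in place a throwing between propagates its exception to the caller instead of leaving the test stuck.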
This setup achieves the goal of actually running the code concurrently, which is great. However, we would likely need to test many, many cases – single producer, multiple consumers, multiple producers, single consumer, empty queue, full queue, just to name a few – so we’re destined to repeat this code just as many times in our tests. Remembering the DRY (Don’t Repeat Yourself!) principle, let’s try to generalize this pattern so that we can effortlessly cover all the cases we can think of.
Recognizing the pattern
Let’s again state what exactly we’d like to achieve.
We would like to execute two or more blocks of code concurrently, while ensuring that the execution of each block starts at exactly the same time (or as close to that as possible).
Let’s now look more closely at the above listing. Aside from constructing the queue and the queue.push_back(42); and return queue.wait_pop_front(); calls, everything else is clutter that we’re destined to repeat every time we write a similar test, and this is exactly what we’d like to abstract away.
The pattern here is relatively straightforward:
- every thread gets a dedicated flag that it sets to signal readiness
- every thread gets a shared flag that signals readiness of all threads
- the main thread controls the execution – it waits until all threads signal readiness and then signals them to continue
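The three bullets above can be sketched for a thread count chosen at runtime. This is only an illustration of the pattern, not the generic utility developed later; the names GateReport and run_behind_gate are invented for the example, and an atomic increment stands in for the code-under-test.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <future>
#include <vector>

struct GateReport {
    int passed_before_go;   // workers past the gate before it was opened
    int passed_after_go;    // workers past the gate once all have finished
};

// Hypothetical helper illustrating the pattern for `count` threads.
GateReport run_behind_gate(std::size_t count)
{
    std::vector<std::promise<void>> ready_flags(count);   // one dedicated flag per thread
    std::promise<void> go;
    std::shared_future<void> gate{go.get_future()};       // the shared flag
    std::atomic<int> passed{0};

    std::vector<std::future<void>> workers;
    workers.reserve(count);
    for (auto& ready : ready_flags) {
        workers.push_back(std::async(std::launch::async, [&ready, &passed, gate] {
            ready.set_value();   // signal readiness
            gate.wait();         // block until the main thread says go
            ++passed;            // stands in for the code-under-test
        }));
    }

    for (auto& ready : ready_flags) {
        ready.get_future().wait();       // main thread waits for every worker
    }
    int const before = passed.load();    // nobody can have passed the gate yet
    go.set_value();                      // release all workers together

    for (auto& worker : workers) {
        worker.get();
    }
    return {before, passed.load()};
}
```

The passed_before_go field should always be zero, since no worker can get past gate.wait() until the main thread calls go.set_value().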
Defining the goal
Let’s for a moment forget about all the implementation details necessary to achieve the goal of starting the code concurrently, and think about the code we’d like to write. We’d really like to just specify that the given two code blocks, or more specifically callables, should be started concurrently. So, in (pseudo-ish) code:
void test_push_pop_concurrently()
{
    ThreadsafeQueue<int> queue{};
    auto [push_result, pop_result] = start_concurrently(
                                        [&queue](){ queue.push_back(42); },
                                        [&queue](){ return queue.wait_pop_front(); }
                                     );
    assert(pop_result == 42);
    assert(queue.empty());
}
Two lambdas executing the code we’d like to test are passed to the start_concurrently function. This is our abstraction: it does all of the heavy lifting we were previously exposed to. We then extract the results using C++17’s structured bindings and assert on whatever expectations we have.
This kind of abstraction would let us easily write as many test cases as necessary. It would leave us with pure business logic, hiding all of the cumbersome synchronization code. This is the ideal. Let’s see if it is achievable.
Reaching the goal
The implementation will require a little bit of metaprogramming. We’re given a parameter pack of callables, which need to be dispatched to separate threads of execution, and we would of course like to support an arbitrary number of threads. Recalling our synchronization requirements, each thread will need a dedicated (write) ready-flag and a shared (read) go-flag. The listing below shows the code I was able to arrive at. A description of what’s going on follows.
template<std::size_t... Is, typename... Fs>
auto start_concurrently_impl(std::index_sequence<Is...>, Fs&&... fs)
{
    std::array<std::promise<void>, sizeof...(Is)> ready_flags{};
    std::promise<void> go{};
    std::shared_future<void> go_future{go.get_future()};
    auto futures = std::make_tuple(
        std::async(
            std::launch::async,
            [&ready = std::get<Is>(ready_flags),
             go_flag = go_future,
             f = std::forward<Fs>(fs)]() {
                ready.set_value();
                go_flag.wait();
                return f();
            })...
        );
    // (..., ready_flags[Is].get_future().wait());
    for (std::size_t i{0}; i != sizeof...(Is); ++i)
    {
        ready_flags[i].get_future().wait();
    }
    go.set_value();
    return futures;
}
template<typename... Fs>
auto start_concurrently(Fs&&... fs)
{
    using Indices = std::make_index_sequence<sizeof...(Fs)>;
    return start_concurrently_impl(Indices{}, std::forward<Fs>(fs)...);
}
Let’s start from the start_concurrently function template. It takes a parameter pack of callables and takes advantage of the auto return type deduction available since C++14. The actual implementation needs to know the number of callables given and to index over them, so a std::index_sequence is constructed and passed down.
The start_concurrently_impl function template constructs an array of ready_flags – each thread will receive one of these flags to signal readiness.
A single go flag, along with an associated shared go_future object is constructed – each thread will get a copy of the go_future, so that it’s able to receive a signal when it should resume execution.
Let’s for now skip over the std::make_tuple and take a closer look at the std::async call. This is the core of the implementation.
Same as before, the std::launch::async flag is passed in to ensure that each callable runs on its own thread and isn’t just deferred.
The lambda passed to std::async captures a reference to one of the ready-flags. A copy of the go_future is also captured, as go_flag. It is used to block, in the go_flag.wait() call, until all other threads are ready and execution should be resumed. Finally, the callable object is forwarded into the lambda. It is invoked in the return f(); statement.
Note that the captures &ready = std::get<Is>(ready_flags) and f = std::forward<Fs>(fs) take advantage of parameter pack expansion. The ready_flags are indexed using the indices of the passed-in std::index_sequence, which is why it was required.
Going back to the std::make_tuple – it is given a parameter pack expansion of the described std::async calls. The ellipsis that follows the std::async call forces expansion of both the Is and fs parameter packs. This creates a tuple of std::future objects, which will hold the execution results of each callable.
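Expanding two packs with a single ellipsis can look odd at first, so here is a small standalone sketch of the same mechanism with the threading removed. The label and label_impl names are made up; each tuple element is built from the index and the matching argument.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <tuple>
#include <utility>

// Each element of the result is built from the Is-th index and the matching
// argument; the single trailing `...` expands both packs together.
template <std::size_t... Is, typename... Ts>
auto label_impl(std::index_sequence<Is...>, Ts&&... ts)
{
    return std::make_tuple(
        (std::to_string(Is) + ":" + std::to_string(std::forward<Ts>(ts)))...
    );
}

template <typename... Ts>
auto label(Ts&&... ts)
{
    return label_impl(std::index_sequence_for<Ts...>{}, std::forward<Ts>(ts)...);
}
```

For example, label(10, 20) yields a tuple holding the strings "0:10" and "1:20". Both packs must have the same length, which the std::index_sequence guarantees here.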
The remaining lines enforce the thread synchronization. All of the ready-flags are waited on in a simple for-loop. Alternatively, a C++17 fold expression could be used instead (the commented-out line). All threads are then signalled to resume by setting the go flag.
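For readers who haven’t used fold expressions yet, here is a minimal sketch of the comma-fold alternative in isolation. The wait_all helper is hypothetical, and the counter exists only to make the effect observable.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <future>
#include <utility>

// Waits on every flag via a C++17 comma fold and returns how many were waited on.
template <std::size_t... Is>
std::size_t wait_all_impl(std::array<std::promise<void>, sizeof...(Is)>& flags,
                          std::index_sequence<Is...>)
{
    std::size_t waited{0};
    // Expands to (wait on flags[0], ++waited), (wait on flags[1], ++waited), ...
    (..., (flags[Is].get_future().wait(), ++waited));
    return waited;
}

template <std::size_t N>
std::size_t wait_all(std::array<std::promise<void>, N>& flags)
{
    return wait_all_impl(flags, std::make_index_sequence<N>{});
}
```

The fold visits the flags in index order, just like the for-loop, so the choice between the two is mostly a matter of taste.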
The resulting tuple of futures is then returned to the caller. Note that the threads may still be running at this point. This is a design decision. An alternative might be to actually wait until all threads finished execution, or even return a tuple of results, rather than futures. The last alternative would impose additional requirements on the caller – passing in callables returning void would not be allowed. This is because void is not a regular type and can not be stored in a tuple. If the regular-void proposal is ever adopted into the standard (or if we implemented it ourselves…) this could be a valid alternative. I think that returning the future objects is perfectly fine here and avoids the void issues altogether.
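The tuple-of-results alternative could be layered on top as a separate step. The sketch below assumes a hypothetical get_all helper that converts a tuple of futures into a tuple of values. It also shows the void limitation in practice: passing a std::future<void> should fail to compile, because std::tuple<void> cannot be instantiated.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <tuple>
#include <utility>

// Hypothetical helper: turns a tuple of futures into a tuple of results.
// Only usable when none of the futures is a std::future<void>.
template <typename... Ts>
std::tuple<Ts...> get_all(std::tuple<std::future<Ts>...> futures)
{
    return std::apply(
        [](auto&... fs) {
            // Braced initialization evaluates the get() calls left to right.
            return std::tuple<Ts...>{fs.get()...};
        },
        futures);
}
```

A caller who wants plain values could then wrap the returned futures in get_all, while tests that involve void callables keep using the futures directly.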
Testing the implementation
Let’s prove to ourselves that the implementation performs as advertised. A simple sanity-check could be done by querying the current time on each thread and checking the difference.
void start_concurrently_sanity_check()
{
    auto const get_now = []() { return std::chrono::system_clock::now(); };
    auto futures = start_concurrently(get_now, get_now, get_now);
    const auto time0 = std::get<0>(futures).get();
    const auto time1 = std::get<1>(futures).get();
    const auto time2 = std::get<2>(futures).get();
    std::cout << "time0 - time1 = "
              << std::chrono::duration_cast<std::chrono::microseconds>(time0 - time1).count() << " us\n";
    std::cout << "time0 - time2 = "
              << std::chrono::duration_cast<std::chrono::microseconds>(time0 - time2).count() << " us\n";
    std::cout << "time1 - time2 = "
              << std::chrono::duration_cast<std::chrono::microseconds>(time1 - time2).count() << " us\n";
}
Executing this multiple times results in time differences that are mostly under 20 microseconds, and often under 10. These are the same results I was getting with the non-generic version.
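Printing pairwise differences gets unwieldy as the number of threads grows. A possible refinement, sketched here with an invented max_spread helper, is to reduce the timestamps to a single number: the gap between the earliest and the latest start. The sketch assumes std::chrono::steady_clock, which is monotonic, whereas the listing above uses system_clock, which may be adjusted while the program runs.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <initializer_list>

using Clock = std::chrono::steady_clock;

// Hypothetical helper: the gap between the earliest and the latest start time.
// The list must not be empty.
std::chrono::microseconds max_spread(std::initializer_list<Clock::time_point> times)
{
    auto const [earliest, latest] = std::minmax(times);
    return std::chrono::duration_cast<std::chrono::microseconds>(latest - earliest);
}
```

To use it with the sanity check, get_now would have to return Clock::now() so that the time points are of the matching type.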
The ThreadsafeQueue could now be tested as follows:
void test_push_pop_concurrently()
{
    ThreadsafeQueue<int> queue;
    auto [result_push, result_pop] = start_concurrently(
                                        [&queue](){ queue.push_back(42); },
                                        [&queue](){ return queue.wait_pop_front(); }
                                     );
    assert(result_pop.get() == 42);
    assert(queue.empty());
}
The ideal interface was almost achieved. The only difference is that start_concurrently returns a tuple of std::future objects rather than the results themselves, so .get() must be called.
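The same applies to callables that return void: their futures carry no value, but calling .get() still waits for completion. The sketch below repeats the utility (using the fold-expression variant) so that it compiles on its own, and adds a hypothetical count_concurrently test in which two tasks race on a shared atomic counter.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <future>
#include <tuple>
#include <utility>

// start_concurrently, repeated from above so the example is self-contained.
template <std::size_t... Is, typename... Fs>
auto start_concurrently_impl(std::index_sequence<Is...>, Fs&&... fs)
{
    std::array<std::promise<void>, sizeof...(Is)> ready_flags{};
    std::promise<void> go{};
    std::shared_future<void> go_future{go.get_future()};
    auto futures = std::make_tuple(
        std::async(std::launch::async,
                   [&ready = std::get<Is>(ready_flags),
                    go_flag = go_future,
                    f = std::forward<Fs>(fs)]() {
                       ready.set_value();
                       go_flag.wait();
                       return f();
                   })...);
    (..., ready_flags[Is].get_future().wait());   // wait for every thread
    go.set_value();                               // release them together
    return futures;
}

template <typename... Fs>
auto start_concurrently(Fs&&... fs)
{
    return start_concurrently_impl(std::index_sequence_for<Fs...>{},
                                   std::forward<Fs>(fs)...);
}

// Hypothetical test: two incrementing tasks race on a shared atomic counter.
int count_concurrently()
{
    std::atomic<int> counter{0};
    auto const bump = [&counter] {
        for (int i = 0; i != 1000; ++i) { ++counter; }
    };
    auto [first, second] = start_concurrently(bump, bump);
    first.get();    // std::future<void>: get() just waits for completion
    second.get();
    return counter.load();
}
```

Because the counter is atomic, the total should come out as 2000 regardless of how the two tasks interleave.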
Summary
Testing threadsafe code is not an easy task, but as always raising the level of abstraction greatly reduces its complexity. What do you think about the start_concurrently test utility? Do you see any uses for it in your own code? I’m not sure the name fully expresses what this code does. Does a better one come to your mind?
 
 
					