The Boost C++ Libraries


Chapter 6: Multithreading


This book is licensed under a Creative Commons License.

A new edition of this book is available! It has been published as a print book and can be bought from Barnes and Noble, Amazon and other bookstores. The new edition is up-to-date and based on the Boost C++ Libraries 1.47.0 (released in July 2011). Several chapters have been updated (for example to Boost.Spirit 2.x, Boost.Signals 2 and Boost.Filesystem 3) and many new libraries are covered (for example Boost.CircularBuffer, Boost.Intrusive and Boost.MultiArray). For more information please see the publisher's website XML Press.


6.1 General

Threads are discrete processing sequences that allow different functions within an application to execute at the same time. This matters when a function that is known to take a long time for a calculation should not block another function while it runs. Threads allow two functions to execute simultaneously without one waiting for the other.

Once an application is launched, it contains only one thread by default. This thread executes the main() function. Functions called from within main() are executed sequentially in the context of this thread. Such a program is called a single-threaded application.

In contrast, programs that create new threads are called multithreaded applications. They can execute multiple functions at the same time, and they have grown in importance now that computers ship with CPUs containing more than one core by default. Because multiple cores can execute different functions simultaneously, it falls to the developer to use the available processing capacity. Threads have always been used whenever different functions should execute concurrently, but developers are now increasingly forced to structure their applications carefully to support this concurrency. Knowledge of multithreaded programming is therefore essential in the age of multi-core systems.

This chapter introduces Boost.Thread, the Boost C++ library for developing platform-independent multithreaded applications.


6.2 Thread Management

The most important class in this library is boost::thread, defined in boost/thread.hpp, which is used to create a new thread. The following example shows how to apply it.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    std::cout << i << std::endl; 
  } 
} 

int main() 
{ 
  boost::thread t(thread); 
  t.join(); 
} 

The name of the function that should execute within the thread is passed to the constructor of boost::thread. Once the variable t in the above example is created, the thread() function starts executing in its own thread immediately. At this point, thread() executes concurrently with the main() function.

To keep the application from terminating, the join() method is called on the newly created thread. join() is a blocking call: It blocks the current thread until the thread for which join() was called has terminated. This causes main() to wait until thread() has terminated.

As seen in the above example, a particular thread can be accessed via a variable such as t, for example to wait for its termination using the join() method. However, the thread continues to execute even if t goes out of scope and is destroyed. A thread is always bound to a variable of type boost::thread at first, but once created it no longer depends on it. There is even a method called detach() that decouples a variable of type boost::thread from its corresponding thread. Naturally, methods such as join() cannot be called afterwards since the variable no longer represents a valid thread.

Anything that can be done inside a function can also be done inside a thread. Ultimately, a thread is no different than a function - except that it is executed concurrently. In the example above, five numbers are written to the standard output stream using a loop. To slow down the output, every iteration of the loop calls the wait() function to stall execution for one second. wait() employs a function named sleep() that also originates from Boost.Thread and resides in the boost::this_thread namespace.

sleep() either expects a period of time or a specific point in time indicating how long or until when the current thread should be stalled. By passing an object of type boost::posix_time::seconds, a period of time is specified in this example. boost::posix_time::seconds comes from the Boost.DateTime library that is used by Boost.Thread to manage and process time data.
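The create-then-join pattern described above can be sketched in a form that returns a checkable result. This is only an illustration under stated assumptions: it uses the C++11 standard library analogues std::thread and std::this_thread::sleep_for instead of the Boost.Thread calls shown in this chapter, and the helper name run_and_join() is hypothetical.

```cpp
#include <chrono>
#include <thread>
#include <vector>

// Hypothetical helper: runs a worker thread that records `count` loop
// indices, waits for it with join(), and returns what it recorded.
// Assumption: a C++11 compiler; std::thread stands in for boost::thread.
std::vector<int> run_and_join(int count)
{
  std::vector<int> seen;
  std::thread t([&seen, count] {
    for (int i = 0; i < count; ++i)
    {
      // Short pause standing in for the one-second wait() of the example.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      seen.push_back(i);
    }
  });
  t.join(); // blocks until the worker has finished, so `seen` is complete
  return seen;
}
```

Because join() only returns after the worker has terminated, the caller can read the vector afterwards without any further synchronization; run_and_join(5) yields the indices 0 to 4 in order.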

While the previous example shows how to wait for a different thread, the following example shows how a thread can be interrupted by means of so-called interruption points.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

void thread() 
{ 
  try 
  { 
    for (int i = 0; i < 5; ++i) 
    { 
      wait(1); 
      std::cout << i << std::endl; 
    } 
  } 
  catch (boost::thread_interrupted&) 
  { 
  } 
} 

int main() 
{ 
  boost::thread t(thread); 
  wait(3); 
  t.interrupt(); 
  t.join(); 
} 

Calling interrupt() on a thread object interrupts the corresponding thread. In this context, interrupted means that an exception of type boost::thread_interrupted is thrown inside the thread. However, this only happens once the thread reaches an interruption point.

Simply calling interrupt() has no effect if the given thread does not contain any interruption points. Whenever a thread reaches an interruption point, it checks whether the interrupt() method has been called. Only if it has been called is a boost::thread_interrupted exception thrown.

Boost.Thread defines a series of interruption points such as the sleep() function. Since sleep() is called five times in the example, the thread checks up to five times whether it has been interrupted. Between the calls to sleep(), however, the thread cannot be interrupted.

When the program is executed, it prints only about three numbers to the standard output stream, because main() calls interrupt() after three seconds; the exact count depends on timing. The corresponding thread is interrupted and a boost::thread_interrupted exception is thrown inside it. The exception is caught inside the thread, although the catch handler is empty. Since the thread() function returns after the handler, the thread terminates as well. This in turn ends the entire application, since main() was only waiting for the thread to terminate using the join() method.

Boost.Thread defines about ten interruption points including the mentioned sleep() function. Thanks to these interruption points, threads can easily be interrupted in a timely manner. However, interruption may not always be the best choice, since the thread must first reach an interruption point before a pending request is noticed and the boost::thread_interrupted exception is thrown.
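The idea behind interruption points can be sketched with a hand-rolled flag. This is a conceptual illustration only and not how Boost.Thread implements interruption: the names interrupted, interrupt_requested, interruption_point() and run_until_interrupted() are all hypothetical, and the sketch assumes the C++11 standard library (std::thread, std::atomic).

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical stand-in for boost::thread_interrupted.
struct interrupted {};

// Hypothetical request flag; setting it plays the role of t.interrupt().
std::atomic<bool> interrupt_requested(false);

// Hypothetical interruption point: throws only if a request is pending.
void interruption_point()
{
  if (interrupt_requested.load())
    throw interrupted();
}

// Starts a worker that loops until interrupted. Returns true if the worker
// left its loop through the `interrupted` exception.
bool run_until_interrupted()
{
  interrupt_requested.store(false);
  bool was_interrupted = false;
  std::thread t([&was_interrupted] {
    try
    {
      for (;;)
      {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        interruption_point(); // the only place where the request is noticed
      }
    }
    catch (const interrupted&)
    {
      was_interrupted = true;
    }
  });
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  interrupt_requested.store(true); // request the interruption
  t.join();
  return was_interrupted;
}
```

The request itself does nothing until the worker next reaches interruption_point(), which mirrors the behavior described above: a thread that never reaches an interruption point never notices the request.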

To provide a rough overview of the different functions provided by Boost.Thread, the following example introduces two more.

#include <boost/thread.hpp> 
#include <iostream> 

int main() 
{ 
  std::cout << boost::this_thread::get_id() << std::endl; 
  std::cout << boost::thread::hardware_concurrency() << std::endl; 
} 

The boost::this_thread namespace provides free functions that apply to the current thread, such as the sleep() function shown before. Another one is get_id(): It returns an object of type boost::thread::id that identifies the current thread and can be written to a stream. get_id() is also provided as a method of boost::thread.

The static hardware_concurrency() method, provided by the boost::thread class, returns the number of threads that can physically execute at the same time, based on the underlying number of CPUs or CPU cores. On a typical dual-core machine, it returns 2; if the information is not available, it returns 0. This provides a simple way to determine the theoretical maximum number of threads that a multithreaded application should use simultaneously.
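Since the reported value can be 0 when the number of hardware threads is unknown, code that sizes a pool of threads from it usually needs a fallback. The following is a minimal sketch, assuming the C++11 analogue std::thread::hardware_concurrency(); the helper names clamp_worker_count() and worker_count() are hypothetical.

```cpp
#include <thread>

// Hypothetical helper: turns the reported value into a usable thread count.
// A reported value of 0 means "unknown", so fall back to a single thread.
unsigned clamp_worker_count(unsigned reported)
{
  return reported == 0 ? 1u : reported;
}

// Hypothetical helper: number of worker threads to start on this machine.
// Assumption: std::thread::hardware_concurrency() stands in for
// boost::thread::hardware_concurrency().
unsigned worker_count()
{
  return clamp_worker_count(std::thread::hardware_concurrency());
}
```

Separating the clamping from the query keeps the fallback rule testable on any machine, regardless of how many cores it actually has.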


6.3 Synchronization

While the usage of multiple threads can increase the performance of an application, it usually also increases its complexity. If several functions execute at the same time using threads, access to shared resources must be synchronized. This involves considerable work once the application reaches a certain size. This section introduces the classes provided by Boost.Thread for synchronizing threads.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::mutex mutex; 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    mutex.lock(); 
    std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; 
    mutex.unlock(); 
  } 
} 

int main() 
{ 
  boost::thread t1(thread); 
  boost::thread t2(thread); 
  t1.join(); 
  t2.join(); 
} 

Multithreaded programs use so-called mutex objects for synchronization. Boost.Thread provides different mutex classes, boost::mutex being the simplest one. The basic principle of a mutex is to prevent other threads from taking ownership while a particular thread owns it. Once released, a different thread can take ownership. Threads therefore wait until the owning thread has finished its work and releases its ownership of the mutex object.

The above example uses a global mutex object of type boost::mutex named mutex. The thread() function takes ownership of this object right before it writes to the standard output stream inside the for loop using the lock() method. Once a message has been written, the ownership is released using the unlock() method.

main() creates two threads, both executing the thread() function. Using the for loop, every thread counts to five, writing a message to the standard output stream with each iteration. Unfortunately, the standard output stream is a global object which is shared among all threads. The standard does not provide any guarantee that std::cout can be safely accessed from multiple threads. Thus, access to the standard output stream must be synchronized: At any given time, only one thread is allowed to access std::cout.

Since both threads try to acquire the mutex before writing to the standard output stream, it is guaranteed that only one thread at a time actually accesses std::cout. Regardless of which thread successfully calls the lock() method, all other threads need to wait until unlock() has been called.

Acquiring and releasing mutexes is a typical scheme and is supported by Boost.Thread through different data types. For example, instead of calling lock() and unlock() directly, the boost::lock_guard class can be used.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::mutex mutex; 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    boost::lock_guard<boost::mutex> lock(mutex); 
    std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; 
  } 
} 

int main() 
{ 
  boost::thread t1(thread); 
  boost::thread t2(thread); 
  t1.join(); 
  t2.join(); 
} 

boost::lock_guard automatically calls lock() and unlock() inside its constructor and destructor, respectively. Access to the shared resource is synchronized just as it was when both methods were called explicitly. The boost::lock_guard class is yet another example of the RAII idiom presented in Chapter 2, Smart Pointers.
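What such a guard does in its constructor and destructor can be sketched with a few lines of code. The class simple_guard and the instrumented counting_mutex below are hypothetical and written only for illustration; real code should use boost::lock_guard (or std::lock_guard). The sketch assumes C++11 and shows one practical benefit of the idiom: the mutex is also released when the guarded region is left through an exception.

```cpp
#include <stdexcept>

// Hypothetical minimal guard: locks in the constructor, unlocks in the
// destructor. It works with any type offering lock() and unlock().
template <typename Mutex>
class simple_guard
{
public:
  explicit simple_guard(Mutex& m) : m_(m) { m_.lock(); }
  ~simple_guard() { m_.unlock(); }
  simple_guard(const simple_guard&) = delete;
  simple_guard& operator=(const simple_guard&) = delete;

private:
  Mutex& m_;
};

// Hypothetical instrumented "mutex" that only counts calls, so the effect
// of the guard can be observed without starting any threads.
struct counting_mutex
{
  int locks = 0;
  int unlocks = 0;
  void lock() { ++locks; }
  void unlock() { ++unlocks; }
};

// Throws while the guard is alive; the guard's destructor still unlocks.
void guarded_work_that_throws(counting_mutex& m)
{
  simple_guard<counting_mutex> lock(m);
  throw std::runtime_error("failure inside the guarded region");
}

// Returns true if the single lock() was matched by an unlock() although
// the guarded region was left through an exception.
bool unlocks_on_exception()
{
  counting_mutex m;
  try
  {
    guarded_work_that_throws(m);
  }
  catch (const std::runtime_error&)
  {
  }
  return m.locks == 1 && m.unlocks == 1;
}
```

With explicit lock() and unlock() calls, the same exception would skip the unlock() call and leave the mutex locked.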

Besides boost::mutex and boost::lock_guard, Boost.Thread provides additional classes to support variations of synchronization. One of the essential ones is boost::unique_lock which, compared to boost::lock_guard, provides a number of helpful methods.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::timed_mutex mutex; 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    boost::unique_lock<boost::timed_mutex> lock(mutex, boost::try_to_lock); 
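    if (!lock.owns_lock()) 
      lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(1)); 
    if (!lock.owns_lock()) 
      continue; // not acquired within one second: do not write or unlock a mutex we do not own 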
    std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; 
    boost::timed_mutex *m = lock.release(); 
    m->unlock(); 
  } 
} 

int main() 
{ 
  boost::thread t1(thread); 
  boost::thread t2(thread); 
  t1.join(); 
  t2.join(); 
} 

The above example uses various methods to illustrate some of the features provided by boost::unique_lock. Admittedly, these features are not really needed for the given scenario; boost::lock_guard in the previous example was already adequate. This example is meant to demonstrate the possibilities offered by boost::unique_lock.

boost::unique_lock offers different ways of acquiring a mutex by providing multiple constructors. The one expecting only a mutex object simply calls the lock() method and waits until the mutex has been acquired. It thus behaves exactly like the constructor of boost::lock_guard.

If the value boost::try_to_lock is passed as the second parameter, the corresponding constructor calls try_lock() instead. This method returns a value of type bool: true if the mutex could be acquired, false if not. Unlike lock(), try_lock() returns immediately and does not block until the mutex has been acquired.

The above program passes boost::try_to_lock as the second parameter to the constructor of boost::unique_lock. Whether or not the mutex has been acquired can be checked via the owns_lock() method afterwards. In case it has not - owns_lock() returns false - another function provided by boost::unique_lock is used: timed_lock() waits for a certain time to acquire the mutex. The given program waits for up to one second which should be more than enough time to acquire the mutex.

The example actually shows the three fundamental ways of acquiring a mutex: lock() waits until the mutex has been acquired. try_lock() does not wait but acquires the mutex if it is available at the time of the call and returns false otherwise. Finally, timed_lock() tries to acquire the mutex within a given period of time. As with try_lock(), success or failure is indicated by the return value of type bool.

While both lock() and try_lock() are provided by boost::mutex, timed_lock() is only supported by boost::timed_mutex, which is why that class is used in the above example program. Without timed_lock(), the mutex can be of type boost::mutex as seen in the previous example.

Just as with boost::lock_guard, the destructor of boost::unique_lock releases the mutex. In addition, the mutex can be released manually using the unlock() method. It is also possible to remove the association between the boost::unique_lock object and the mutex by calling release() as done in the above example. In this case, however, the mutex must be released explicitly using the unlock() method, as this is no longer done automatically by the destructor of boost::unique_lock.
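The three ways of acquiring a mutex, and the need to check whether an attempt succeeded, can be sketched as follows. The sketch assumes the C++11 standard library analogues: std::timed_mutex and std::unique_lock, where the counterpart of timed_lock() is named try_lock_for(). The names demo_mutex, attempt_result and attempt_while_held() are hypothetical.

```cpp
#include <chrono>
#include <mutex>
#include <thread>

// Hypothetical mutex used only by this sketch.
std::timed_mutex demo_mutex;

// Hypothetical result type recording which attempts succeeded.
struct attempt_result
{
  bool acquired_immediately;  // outcome of the try_to_lock attempt
  bool acquired_with_timeout; // outcome of the timed attempt
};

// The calling thread holds the mutex while a second thread tries to
// acquire it, first without waiting and then with a short timeout.
attempt_result attempt_while_held()
{
  attempt_result r = {false, false};
  // Plain constructor: calls lock() and blocks until the mutex is acquired.
  std::unique_lock<std::timed_mutex> holder(demo_mutex);

  std::thread t([&r] {
    // try_to_lock: calls try_lock(), which returns without waiting.
    std::unique_lock<std::timed_mutex> lock(demo_mutex, std::try_to_lock);
    r.acquired_immediately = lock.owns_lock();
    if (!lock.owns_lock())
    {
      // Timed attempt: waits for at most 50 milliseconds.
      r.acquired_with_timeout =
        lock.try_lock_for(std::chrono::milliseconds(50));
    }
  });
  t.join(); // `holder` is still locked here, so both attempts fail
  return r;
}
```

Both attempts in the second thread report failure because the first thread never releases the mutex in time. Code that ignores these results and continues would use the protected resource without owning the mutex.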

boost::unique_lock is a so-called exclusive lock meaning that only one thread at a time can acquire the mutex. Other threads are required to wait until the mutex has been released again. Besides exclusive locks there are also non-exclusive ones. Boost.Thread provides a class named boost::shared_lock to support non-exclusive locks accordingly. This class must be used together with a mutex of type boost::shared_mutex as shown in the following example.

#include <boost/thread.hpp> 
#include <iostream> 
#include <vector> 
#include <cstdlib> 
#include <ctime> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::shared_mutex mutex; 
std::vector<int> random_numbers; 

void fill() 
{ 
  std::srand(static_cast<unsigned int>(std::time(0))); 
  for (int i = 0; i < 3; ++i) 
  { 
    boost::unique_lock<boost::shared_mutex> lock(mutex); 
    random_numbers.push_back(std::rand()); 
    lock.unlock(); 
    wait(1); 
  } 
} 

void print() 
{ 
  for (int i = 0; i < 3; ++i) 
  { 
    wait(1); 
    boost::shared_lock<boost::shared_mutex> lock(mutex); 
    std::cout << random_numbers.back() << std::endl; 
  } 
} 

int sum = 0; 

void count() 
{ 
  for (int i = 0; i < 3; ++i) 
  { 
    wait(1); 
    boost::shared_lock<boost::shared_mutex> lock(mutex); 
    sum += random_numbers.back(); 
  } 
} 

int main() 
{ 
  boost::thread t1(fill); 
  boost::thread t2(print); 
  boost::thread t3(count); 
  t1.join(); 
  t2.join(); 
  t3.join(); 
  std::cout << "Sum: " << sum << std::endl; 
} 

Non-exclusive locks of type boost::shared_lock can be used if threads only need read-only access to a specific resource. A thread modifying the resource needs write access and thus requires an exclusive lock. The reason is straightforward: Threads with read-only access are not affected by other threads reading the same resource at the same time. Non-exclusive locks can therefore share a mutex.

In the given example, both print() and count() access random_numbers read-only. While the print() function writes the last number of random_numbers to the standard output stream, the count() function adds it to the variable sum. Since neither function modifies random_numbers, both can access it at the same time using a non-exclusive lock of type boost::shared_lock.

Inside the fill() function, an exclusive lock of type boost::unique_lock is required since it inserts new random numbers into random_numbers. The mutex is explicitly released using the unlock() method before fill() waits for one second. Unlike in the previous example, wait() is called at the end of the for loop to ensure that at least one random number exists in the container before it is accessed by either print() or count(). Both of these functions call the wait() function at the beginning of their for loop for the same reason.
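The split between exclusive and shared locks can be reduced to two small accessor functions. The sketch below assumes a C++17 compiler and uses the standard library analogues std::shared_mutex and std::shared_lock in place of the Boost classes; the names numbers_mutex, shared_numbers, append(), current_size() and run_readers_and_writer() are hypothetical.

```cpp
#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

// Hypothetical shared state for this sketch.
std::shared_mutex numbers_mutex;
std::vector<int> shared_numbers;

// Writer: modifies the container, so it needs an exclusive lock.
void append(int value)
{
  std::unique_lock<std::shared_mutex> lock(numbers_mutex);
  shared_numbers.push_back(value);
}

// Reader: only inspects the container, so a shared lock is sufficient.
std::size_t current_size()
{
  std::shared_lock<std::shared_mutex> lock(numbers_mutex);
  return shared_numbers.size();
}

// Runs one writer and two readers concurrently; returns the final size.
std::size_t run_readers_and_writer(int count)
{
  {
    std::unique_lock<std::shared_mutex> lock(numbers_mutex);
    shared_numbers.clear(); // start from a known state
  }
  std::thread writer([count] {
    for (int i = 0; i < count; ++i)
      append(i);
  });
  std::thread reader1([count] {
    for (int i = 0; i < count; ++i)
      (void)current_size();
  });
  std::thread reader2([count] {
    for (int i = 0; i < count; ++i)
      (void)current_size();
  });
  writer.join();
  reader1.join();
  reader2.join();
  return current_size();
}
```

The two readers may hold their shared locks at the same time, while each call to append() waits until no reader holds the mutex and then excludes all other threads for the duration of the insertion.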

Looking at the individual calls to the wait() function from different locations, one potential issue becomes apparent: The order of the function calls is directly affected by the order in which the CPU actually executes the individual threads. Using so-called condition variables, the individual threads can be synchronized so that elements added to random_numbers are immediately processed by a different thread accordingly.

#include <boost/thread.hpp> 
#include <iostream> 
#include <vector> 
#include <cstdlib> 
#include <ctime> 

boost::mutex mutex; 
boost::condition_variable_any cond; 
std::vector<int> random_numbers; 

void fill() 
{ 
  std::srand(static_cast<unsigned int>(std::time(0))); 
  for (int i = 0; i < 3; ++i) 
  { 
    boost::unique_lock<boost::mutex> lock(mutex); 
    random_numbers.push_back(std::rand()); 
    cond.notify_all(); 
    cond.wait(mutex); 
  } 
} 

void print() 
{ 
  std::size_t next_size = 1; 
  for (int i = 0; i < 3; ++i) 
  { 
    boost::unique_lock<boost::mutex> lock(mutex); 
    while (random_numbers.size() != next_size) 
      cond.wait(mutex); 
    std::cout << random_numbers.back() << std::endl; 
    ++next_size; 
    cond.notify_all(); 
  } 
} 

int main() 
{ 
  boost::thread t1(fill); 
  boost::thread t2(print); 
  t1.join(); 
  t2.join(); 
} 

The program in this example no longer contains the wait() and count() functions. Threads no longer wait for one second with every loop iteration but execute as fast as possible. Additionally, no total is calculated; the numbers are only written to the standard output stream.

To ensure correct processing of the random numbers, the individual threads are synchronized by a condition variable, which lets a thread wait until another thread signals that a certain condition has been met.

As before, the fill() function generates a random number with each iteration and places it in the random_numbers container. To block other threads from accessing the container at the same time, an exclusive lock is taken. Instead of waiting for one second, this example uses a condition variable. Calling the notify_all() method wakes up every thread that has been waiting for this notification by calling the wait() method.

Looking at the for loop of the print() function, one can see that the wait() method is called on the same condition variable. When the thread is woken up by a call to notify_all(), it tries to acquire the mutex, which only succeeds after the mutex has been released in the fill() function.

The trick here is that calling wait() also releases the corresponding mutex passed as parameter. After calling notify_all(), the fill() function releases the mutex by calling wait() accordingly. It then blocks and waits for some other thread to call notify_all() which happens in the print() function once the random number has been written to the standard output stream.

Notice that the call to the wait() method inside the print() function happens within a separate while loop. This handles the scenario where a random number has already been placed in the container before wait() is called for the first time in print(). It also guards against spurious wakeups, in which wait() returns although no notification was sent. By comparing the number of elements stored in random_numbers with the expected number of elements, print() only proceeds once the next random number is actually available and then writes it to the standard output stream.
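The same hand-over can be written so that both sides wait on an explicit condition, which makes the exchange robust against spurious wakeups in either direction. This is a sketch under stated assumptions, not the program above: it uses the C++11 analogues std::condition_variable and std::mutex, replaces the random numbers with loop indices so that the result is predictable, and the function name hand_over() is hypothetical.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: a producer hands `count` values to a consumer one
// at a time; returns the values in the order the consumer received them.
std::vector<int> hand_over(int count)
{
  std::mutex m;
  std::condition_variable cv;
  std::vector<int> produced;
  std::vector<int> consumed;

  std::thread producer([&] {
    for (int i = 0; i < count; ++i)
    {
      std::unique_lock<std::mutex> lock(m);
      produced.push_back(i);
      cv.notify_all();
      // wait() releases the mutex while blocked. The predicate is
      // re-checked after every wakeup, so a spurious wakeup is harmless.
      cv.wait(lock, [&] { return consumed.size() == produced.size(); });
    }
  });

  std::thread consumer([&] {
    for (int i = 0; i < count; ++i)
    {
      std::unique_lock<std::mutex> lock(m);
      // Proceed only once a value has been produced but not yet consumed.
      cv.wait(lock, [&] { return produced.size() > consumed.size(); });
      consumed.push_back(produced.back());
      cv.notify_all();
    }
  });

  producer.join();
  consumer.join();
  return consumed;
}
```

The predicate form of wait() is shorthand for the while loop used in print(): the thread goes back to waiting whenever it wakes up and the condition does not yet hold.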


6.4 Thread Local Storage

Thread Local Storage (TLS) is a dedicated storage area that can only be accessed by one thread. TLS variables can be seen as global variables only visible to a particular thread instead of the whole program. The following example shows the benefits of these variables.

#include <boost/thread.hpp> 
#include <iostream> 
#include <cstdlib> 
#include <ctime> 

void init_number_generator() 
{ 
  static bool done = false; 
  if (!done) 
  { 
    done = true; 
    std::srand(static_cast<unsigned int>(std::time(0))); 
  } 
} 

boost::mutex mutex; 

void random_number_generator() 
{ 
  init_number_generator(); 
  int i = std::rand(); 
  boost::lock_guard<boost::mutex> lock(mutex); 
  std::cout << i << std::endl; 
} 

int main() 
{ 
  boost::thread t[3]; 

  for (int i = 0; i < 3; ++i) 
    t[i] = boost::thread(random_number_generator); 

  for (int i = 0; i < 3; ++i) 
    t[i].join(); 
} 

The example creates three threads, each writing a random number to the standard output stream. The random_number_generator() function utilizes the std::rand() function, defined in the C++ standard, to create the random numbers. However, the generator used for std::rand() must be properly initialized with the std::srand() function. If not, the program will always print the same random number.

Initializing the random number generator takes place inside the init_number_generator() function with the aid of the std::time() function returning the current time. Since this value differs every time the application is executed, it is guaranteed that the generator is always initialized with a different value and thus provides new random numbers. Since the generator only needs to be initialized once, init_number_generator() uses a static variable named done as the condition.

If the program is executed a couple of times on a platform where std::rand() keeps separate generator state for each thread (Microsoft's C runtime is one such implementation), it becomes apparent that two of the three random numbers written are always the same. The program has a defect: On such platforms, the generator on which std::rand() is based must be initialized in every thread that uses the function. The implementation of init_number_generator() is therefore incorrect since it only calls std::srand() once. Using TLS, this defect can be rectified.

#include <boost/thread.hpp> 
#include <iostream> 
#include <cstdlib> 
#include <ctime> 

void init_number_generator() 
{ 
  static boost::thread_specific_ptr<bool> tls; 
  if (!tls.get()) 
    tls.reset(new bool(false)); 
  if (!*tls) 
  { 
    *tls = true; 
    std::srand(static_cast<unsigned int>(std::time(0))); 
  } 
} 

boost::mutex mutex; 

void random_number_generator() 
{ 
  init_number_generator(); 
  int i = std::rand(); 
  boost::lock_guard<boost::mutex> lock(mutex); 
  std::cout << i << std::endl; 
} 

int main() 
{ 
  boost::thread t[3]; 

  for (int i = 0; i < 3; ++i) 
    t[i] = boost::thread(random_number_generator); 

  for (int i = 0; i < 3; ++i) 
    t[i].join(); 
} 

The static variable done has been replaced with a TLS variable tls which is based on the template class boost::thread_specific_ptr - instantiated with the data type bool. In principle, tls works just like done: It acts as a condition indicating whether or not the random number generator has already been initialized. The crucial difference, however, is that the value stored by tls is only visible and available to the corresponding thread.

Once a variable of type boost::thread_specific_ptr is created, it can be set accordingly. However, it now expects the address of a variable of type bool instead of the variable itself. Using the reset() method, the corresponding address can be stored in tls. In the given example, a variable of type bool is dynamically allocated and its address, returned by new, is stored in tls. To avoid setting tls every time the init_number_generator() function is called, it first checks whether an address is already stored via the get() method.

Since boost::thread_specific_ptr stores an address, it behaves like a regular pointer. Consequently, both the operator*() and operator->() operators are overloaded to simplify the usage. The example uses *tls to check whether the condition is currently true or false. Depending on the current condition, the random number generator is either initialized or not.

As seen, boost::thread_specific_ptr allows storing the address of an object for the current thread and only allows the current thread to retrieve the same address later on. While one thread may already have stored an address, a different thread may not have.

If the program is now executed, it may come as a surprise that despite the TLS variable, the random numbers generated are still identical. This is because all three threads are created at the same time, so the random number generators are all initialized with the same time value. If the program is executed a couple of times, however, the random numbers change, indicating that the generators themselves are initialized correctly.
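That every thread sees its own copy of a TLS variable can be observed directly with a small counter. The sketch below assumes a C++11 compiler and uses the thread_local keyword, a language-level alternative to boost::thread_specific_ptr; the names calls_in_this_thread, bump() and bump_twice_in_new_thread() are hypothetical.

```cpp
#include <thread>

// Each thread gets its own copy of this counter, initialized to 0.
thread_local int calls_in_this_thread = 0;

// Increments the calling thread's copy and returns its new value.
int bump()
{
  return ++calls_in_this_thread;
}

// Calls bump() twice in a new thread and returns the value that thread saw.
int bump_twice_in_new_thread()
{
  int seen = 0;
  std::thread t([&seen] {
    bump();
    seen = bump();
  });
  t.join();
  return seen;
}
```

The new thread always counts from zero, and its two calls leave the counter of the calling thread untouched. A plain static variable, like done in the first version of init_number_generator(), would instead be shared by all threads.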


6.5 Exercises

  1. Refactor the following program to calculate the total using two threads. Since many processors nowadays have two cores, the execution time should decrease by utilizing threads.

    #include <boost/date_time/posix_time/posix_time.hpp> 
    #include <boost/cstdint.hpp> 
    #include <iostream> 
    
    int main() 
    { 
      boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time(); 
    
      boost::uint64_t sum = 0; 
      for (int i = 0; i < 1000000000; ++i) 
        sum += i; 
    
      boost::posix_time::ptime end = boost::posix_time::microsec_clock::local_time(); 
      std::cout << end - start << std::endl; 
    
      std::cout << sum << std::endl; 
    } 
  2. Generalize the program from exercise 1 by utilizing as many threads as the processor can physically execute at the same time. For example, if the processor contains four cores, a total of four threads should be utilized.

  3. Change the following program by executing the thread() function in its own thread created in main(). The program should calculate the total and write it to the standard output stream twice. The implementation of calculate(), print() and thread() can be modified; however, the signature of each function needs to stay the same. In other words, each function should still neither require any parameters nor return a value.

    #include <iostream> 
    
    int sum = 0; 
    
    void calculate() 
    { 
      for (int i = 0; i < 1000; ++i) 
        sum += i; 
    } 
    
    void print() 
    { 
      std::cout << sum << std::endl; 
    } 
    
    void thread() 
    { 
      calculate(); 
      print(); 
    } 
    
    int main() 
    { 
      thread(); 
    }