Kenan Bandaliyev


Thread Pool implementation in C++


19-06-2024

Today I will talk about a thread pool implementation I wrote for image loading in the CR3Converter project I have been working on.

Why do we need a thread pool?

If you are reading this, you probably know what a thread is. If you don't, a thread is a sequence of instructions that can be scheduled and executed independently of other code. Threads are used to execute multiple tasks concurrently. There is, naturally, a wide range of thread implementations across programming languages. The most common one, which you have probably seen in your OS course, is pthreads in C. While we are not going to explain the details of how threads work, we will talk about the problems that arise when you have too many of them.

I mentioned above that the threads in my application are used for image loading. The naive way of loading images is to load them sequentially: the program loads the first image, then the second, then the third, and so on. This is not a problem if you have a small number of images. But what if you have hundreds of images? Loading them sequentially will take a lot of time. Such a task is a perfect use case for threads, because you can load multiple images at the same time. But what if you have thousands of images? Creating a thread for each image is not a good idea.

The wrong way

Let's imagine that we have a thousand images in a directory that have to be loaded. Spawning a thousand threads with C++'s std::thread will consume an enormous amount of resources, since each thread needs its own stack. On top of that, the threads have to be scheduled by the OS, which brings a large overhead. If you are not convinced, let's test our hypothesis. I have a folder that contains close to 200 images. I have written a simple program that launches a separate asynchronous task for every image with std::async (using std::launch::async, so each task runs on its own thread), so that we can measure how long it takes to load them all.


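#include <filesystem>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <vector>

#include <libraw/libraw.h>

struct Image {
    char* data;
    int width;
    int height;
};

// One mutex shared by every loader thread. A mutex declared inside
// LoadImageImpl would be a new object on each call and protect nothing.
std::mutex images_mutex;

void LoadImageImpl(std::queue<Image*>& gl_queue, const std::filesystem::path& path) {
    LibRaw iProcessor;
    // Skip files that LibRaw cannot open or unpack.
    if (iProcessor.open_file(path.string().c_str()) != LIBRAW_SUCCESS) return;
    if (iProcessor.unpack() != LIBRAW_SUCCESS) return;

    auto data = iProcessor.dcraw_make_mem_image();
    if (data == nullptr) return;

    Image* image = new Image();
    image->data = (char*)data->data;
    image->width = iProcessor.imgdata.thumbnail.twidth;
    image->height = iProcessor.imgdata.thumbnail.theight;

    {
        // Only one thread at a time may touch the shared queue.
        std::lock_guard lock(images_mutex);
        gl_queue.push(image);
    }
    iProcessor.recycle();
    std::cout << "Loaded image " << path << std::endl;
}

int main() {
    std::queue<Image*> gl_queue;
    std::vector<std::future<void>> futures;
    // One asynchronous task, and therefore one thread, per image.
    for (const auto& entry : std::filesystem::directory_iterator("images")) {
        futures.push_back(std::async(std::launch::async, 
                          LoadImageImpl,
                          std::ref(gl_queue), 
                          entry.path()));
    }

    // Wait for every image to finish loading.
    for (auto& future : futures) {
        future.get();
    }
}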

We can see how much time it takes to load all the images using the time command. This will give us the following output:


________________________________________________________
Executed in    7.89 secs    fish           external
   usr time   44.49 secs  585.00 micros   44.49 secs
   sys time    7.23 secs  359.00 micros    7.23 secs

The execution of the program took 7.89 seconds. This is not terrible, but it could be improved a lot.

Thread Pool

A thread pool is a group of threads that are created once and reused to execute tasks. When a task is submitted to the pool, it is added to a queue, and one of the idle threads picks it up. When the task is completed, the thread goes back to waiting and is ready to execute another task. My implementation of a thread pool is a pretty basic class: it contains a queue of tasks and a group of threads that wait for tasks. We can simply push a callable (a function pointer, a lambda, or a std::function) onto the queue, and a worker thread will execute it.

What is beautiful about thread pools is that the number of threads in the pool is fixed, so it is not going to grow indefinitely no matter how many tasks are submitted. Another advantage of a thread pool is that the structure that we are going to implement doesn't care about what the type of the task is. As long as the task is a void function that takes no arguments, it can be added to the queue. This is the basic idea of a thread pool.
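To make the "void function that takes no arguments" point concrete, here is a minimal sketch of how work that does need arguments can still be packed into that single task type. The names Task, AddTo, RunAll and Demo are hypothetical and only serve the illustration; they are not part of CR3Converter.

```cpp
#include <functional>
#include <queue>
#include <utility>

// Hypothetical alias: any callable that takes nothing and returns nothing.
using Task = std::function<void()>;

// An ordinary function with arguments; on its own it does not match Task.
void AddTo(int& total, int amount) { total += amount; }

// Runs every queued task in FIFO order on the calling thread and
// returns how many tasks were executed.
int RunAll(std::queue<Task>& tasks) {
    int executed = 0;
    while (!tasks.empty()) {
        Task task = std::move(tasks.front());
        tasks.pop();
        task();
        ++executed;
    }
    return executed;
}

// Queues three differently shaped pieces of work and runs them.
int Demo() {
    int total = 0;
    std::queue<Task> tasks;
    tasks.push(std::bind(AddTo, std::ref(total), 5));  // arguments bound up front
    tasks.push([&total] { AddTo(total, 10); });         // lambda capturing its arguments
    tasks.push([&total] { total *= 2; });               // unrelated work, same task type
    RunAll(tasks);
    return total;  // (0 + 5 + 10) * 2
}
```

The queue only ever sees "something callable with no arguments", which is why the same pool can serve image loading and any other kind of job.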

Let's see how we can use the thread pool to load the images.


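#include <condition_variable>
#include <filesystem>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
public:
    explicit ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; i++) {
            threads.push_back(std::thread(&ThreadPool::Worker, this));
        }
    }

    // Signals the workers to stop, then waits for them. Workers only exit
    // once the queue is empty, so every queued task still gets executed.
    ~ThreadPool() {
        {
            std::unique_lock lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (auto& thread : threads) {
            thread.join();
        }
    }

    // Adds a task to the queue and wakes up one waiting worker.
    void Enqueue(std::function<void()> task) {
        {
            std::unique_lock lock(queue_mutex);
            tasks.push(std::move(task));
        }
        condition.notify_one();
    }

private:
    void Worker() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock lock(queue_mutex);
                // Sleep until there is work to do or the pool is shutting down.
                condition.wait(lock, [this] { return stop || !tasks.empty(); });
                if (stop && tasks.empty()) {
                    return;
                }
                task = std::move(tasks.front());
                tasks.pop();
            }
            // Run the task outside the lock so other workers can dequeue.
            task();
        }
    }

    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop = false;
};

int main() {
    // Previously defined Image struct and LoadImageImpl function.
    // gl_queue is declared before the pool so that it outlives the workers.
    std::queue<Image*> gl_queue;
    ThreadPool pool(std::thread::hardware_concurrency());
    for (const auto& entry : std::filesystem::directory_iterator("images")) {
        pool.Enqueue(std::bind(LoadImageImpl, std::ref(gl_queue), entry.path()));
    }
    // The pool's destructor runs here and waits for all queued tasks.
}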

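Note that here we spawned as many threads as std::thread::hardware_concurrency() reports, which is usually the number of logical cores in the system. For CPU-bound work such as decoding images, this is a sensible default, because extra threads would only compete for the same cores. Keep in mind that the value is only a hint and may be 0 if it cannot be determined. Let's see how this implementation performs: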
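Because std::thread::hardware_concurrency() is allowed to return 0, a pool constructed directly from it could end up with no workers at all. A small guard avoids that. This is only a sketch: the helper names PickThreadCount and DefaultThreadCount, and the fallback of 4 threads, are my own assumptions rather than anything taken from CR3Converter.

```cpp
#include <cstddef>
#include <thread>

// Hypothetical helper: turns the hardware hint into a usable worker count.
// `fallback` is an assumed default for when the hint is unavailable (0).
std::size_t PickThreadCount(unsigned int hint, std::size_t fallback = 4) {
    return hint == 0 ? fallback : static_cast<std::size_t>(hint);
}

// Convenience wrapper that queries the current machine.
std::size_t DefaultThreadCount() {
    return PickThreadCount(std::thread::hardware_concurrency());
}
```

The pool could then be created as ThreadPool pool(DefaultThreadCount()); instead of passing the raw hint.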


________________________________________________________
Executed in    4.20 secs    fish           external
   usr time   42.81 secs  675.00 micros   42.81 secs
   sys time    5.18 secs  420.00 micros    5.18 secs

The execution of the program took 4.20 seconds. This is almost twice as fast as the previous implementation (7.89 / 4.20 is roughly 1.9x). Moreover, we no longer need to worry about older systems that might struggle under a large number of threads. Overall, a simple thread pool can be a great tool to speed up an application at a limited cost. Of course, this is a very basic implementation of a thread pool, and there are many other features that can be added to it; for example, we can add a way to return the result of a task. However, because we are dealing with C++, writing a generic function that can return any type is not that aesthetically pleasing. This is as far as we go today. I hope you have learned something new.
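As a footnote to the remark about returning results: one common way to do it is to wrap each task in a std::packaged_task and hand the caller a std::future. The sketch below is not the CR3Converter implementation; ResultPool, Submit and SumOfSquares are hypothetical names, and it assumes C++17. The packaged_task is held in a std::shared_ptr because std::function requires a copyable callable, while packaged_task is move-only.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical pool: same shape as ThreadPool, plus a Submit() that returns a future.
class ResultPool {
public:
    explicit ResultPool(std::size_t num_threads) {
        for (std::size_t i = 0; i < num_threads; i++) {
            threads.emplace_back(&ResultPool::Worker, this);
        }
    }

    ~ResultPool() {
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (auto& thread : threads) {
            thread.join();
        }
    }

    // Queues a no-argument callable and returns a future for its result.
    template <typename F>
    auto Submit(F&& f) -> std::future<std::invoke_result_t<std::decay_t<F>&>> {
        using R = std::invoke_result_t<std::decay_t<F>&>;
        auto task = std::make_shared<std::packaged_task<R()>>(std::forward<F>(f));
        std::future<R> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            tasks.push([task] { (*task)(); });  // type-erased back to void()
        }
        condition.notify_one();
        return result;
    }

private:
    void Worker() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock, [this] { return stop || !tasks.empty(); });
                if (stop && tasks.empty()) {
                    return;
                }
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();
        }
    }

    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop = false;
};

// Usage example: computes 1*1 + 2*2 + ... + n*n on two worker threads.
int SumOfSquares(int n) {
    ResultPool pool(2);
    std::vector<std::future<int>> results;
    for (int i = 1; i <= n; i++) {
        results.push_back(pool.Submit([i] { return i * i; }));
    }
    int sum = 0;
    for (auto& r : results) {
        sum += r.get();  // blocks until that task has finished
    }
    return sum;
}
```

An exception thrown inside a task is stored in the future as well and rethrown from get(), so the worker thread itself keeps running.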