This is the third part of the Parallel execution for contests series. For your convenience, the other parts are listed in the table of contents in Part 1 — Powershell.
Hello! Today we continue working on parallel code for contests. Let’s go.
Idea
Last time we saw how to implement simple thread pooling in C# using ManualResetEvents. We want to achieve the same thing with C++ mechanisms, relying on constructs from C++11 so the code stays portable. I performed the tests using VC++ 2015.
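As a quick reminder of the C++11 building blocks we will lean on, here is a minimal, hypothetical warm-up (not part of the final code) that launches a few std::thread workers and joins them; the real pool below adds a shared task queue and a condition variable on top of this:

#include <cstdio>
#include <thread>
#include <vector>

int main() {
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i) {
        // each worker just reports its id; the real pool pulls tasks from a queue instead
        workers.emplace_back([i] { std::printf("worker %d started\n", i); });
    }
    for (auto& w : workers) {
        w.join(); // wait for every worker before leaving main
    }
    return 0;
}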
Implementation
We start with the simple ThreadPool implementation available on GitHub:
// headers used by the code in this post
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;
    template<class F>
    auto run(int taskCount, F&& f) -> void;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector<std::thread> workers;
    // the task queue
    std::queue<std::function<void()>> tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.emplace_back(
            [this]
            {
                for (;;) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers)
        worker.join();
}
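Before building anything on top of it, here is a small usage example (my own illustration, not part of the original code) showing how enqueue hands back a std::future for every task; it relies on the headers included above:

int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;

    for (int i = 0; i < 8; ++i) {
        // the lambda's return value becomes available through the returned future
        results.push_back(pool.enqueue([i] { return i * i; }));
    }

    for (auto& r : results) {
        std::printf("%d\n", r.get()); // get() blocks until the task has completed
    }
    return 0;
}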
Next, we implement a custom ManualResetEvent:
class ManualResetEvent {
public:
    ManualResetEvent(bool state = false) : _state(state) {}
    ManualResetEvent(const ManualResetEvent& other) = delete;

    void WaitOne()
    {
        std::unique_lock<std::mutex> lock(_sync);
        while (!_state) {
            _underlying.wait(lock);
        }
    }

    void Set()
    {
        std::unique_lock<std::mutex> lock(_sync);
        _state = true;
        _underlying.notify_all();
    }

private:
    std::condition_variable _underlying;
    std::mutex _sync;
    std::atomic<bool> _state;
};
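A quick, hypothetical demonstration of the event (again, my own illustration): one thread blocks in WaitOne until another thread calls Set, after which every waiter is released:

int main() {
    ManualResetEvent ready; // starts non-signaled

    std::thread waiter([&] {
        ready.WaitOne(); // blocks here until Set() is called
        std::printf("event was signaled\n");
    });

    std::thread setter([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        ready.Set(); // wakes up all threads waiting on the event
    });

    waiter.join();
    setter.join();
    return 0;
}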
Finally, we implement the run function that queues the tasks and chains them together with the events:
template<class F>
auto ThreadPool::run(int taskCount, F&& f) -> void
{
    ThreadPool pool(taskCount);
    std::vector<std::future<int>> tasks;

    ManualResetEvent* readEvents = new ManualResetEvent[taskCount + 1];
    ManualResetEvent* writeEvents = new ManualResetEvent[taskCount + 1];
    bool* readingFlags = new bool[taskCount + 1];
    bool* writingFlags = new bool[taskCount + 1];

    for (int i = 0; i < taskCount + 1; ++i) {
        readingFlags[i] = false;
        writingFlags[i] = false;
    }

    // the first task doesn't wait for anyone; it only signals task 2 when each phase is done
    {
        tasks.push_back(pool.enqueue([&] {
            f([&] {
                if (!readingFlags[1]) {
                    readingFlags[1] = true;
                    return;
                }
                readEvents[1].Set();
            }, [&] {
                if (!writingFlags[1]) {
                    writingFlags[1] = true;
                    return;
                }
                writeEvents[1].Set();
            }, 1);
            return 1;
        }));
    }

    // middle tasks wait for their predecessor's event, then signal their own event for the successor
    for (int i = 2; i < taskCount; ++i) {
        int id = i;
        tasks.push_back(pool.enqueue([&, id] {
            f([&, id] {
                if (!readingFlags[id]) {
                    readEvents[id - 1].WaitOne();
                    readingFlags[id] = true;
                    return;
                }
                readEvents[id].Set();
            }, [&, id] {
                if (!writingFlags[id]) {
                    writeEvents[id - 1].WaitOne();
                    writingFlags[id] = true;
                    return;
                }
                writeEvents[id].Set();
            }, id);
            return id;
        }));
    }

    // the last task only waits; there is nobody left to signal
    if (taskCount > 1) {
        tasks.push_back(pool.enqueue([&] {
            f([&] {
                if (!readingFlags[taskCount]) {
                    readEvents[taskCount - 1].WaitOne();
                    readingFlags[taskCount] = true;
                }
            }, [&] {
                if (!writingFlags[taskCount]) {
                    writeEvents[taskCount - 1].WaitOne();
                    writingFlags[taskCount] = true;
                }
            }, taskCount);
            return taskCount;
        }));
    }

    // block until every task has finished
    for (auto&& result : tasks) {
        result.get();
    }

    delete[] readEvents;
    delete[] writeEvents;
    delete[] readingFlags;
    delete[] writingFlags;
}
The code closely mirrors the C# implementation: each task waits for its predecessor's event before reading or writing and signals its successor when it is done, so every signal function ends up being called exactly twice, once when the phase starts and once when it finishes. Finally, we execute the code:
ThreadPool pool(8);
pool.run(10, [](auto signalRead, auto signalWrite, int id) {
    signalRead();
    printf("Reading %d\n", id);
    std::this_thread::sleep_for(std::chrono::milliseconds((10 - id) * 500));
    signalRead();

    printf("Transforming %d\n", id);
    std::this_thread::sleep_for(std::chrono::milliseconds((10 - id) * 500));

    signalWrite();
    printf("Writing %d\n", id);
    std::this_thread::sleep_for(std::chrono::milliseconds((10 - id) * 100));
    signalWrite();
});
Summary
This simple thread pool allows us to run multiple test cases in parallel. We also don’t need to modify our algorithm very much: all we need to do is invoke the two signal functions in the correct order and everything else falls into place. Using this C++ code and the PowerShell code from Part 1, we can easily execute multiple tests in parallel and save some time when our algorithm is not optimal.
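To make that concrete, here is a hedged sketch of what a contest run might look like; the input%d.txt / output%d.txt file names and the trivial summing "solution" are hypothetical placeholders rather than anything defined earlier in this series, and the snippet additionally needs <fstream>:

ThreadPool pool(8);
pool.run(10, [](auto signalRead, auto signalWrite, int id) {
    char inName[64], outName[64];
    std::snprintf(inName, sizeof(inName), "input%d.txt", id);   // hypothetical test file names
    std::snprintf(outName, sizeof(outName), "output%d.txt", id);

    signalRead();                          // wait until the previous test has finished reading
    std::ifstream in(inName);
    std::vector<long long> data;
    for (long long x; in >> x; ) data.push_back(x);
    signalRead();                          // let the next test start reading

    long long answer = 0;                  // the "solve" step runs fully in parallel
    for (long long x : data) answer += x;

    signalWrite();                         // wait until the previous test has finished writing
    std::ofstream out(outName);
    out << answer << "\n";
    signalWrite();                         // let the next test start writing
});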