This is the third part of the Parallel execution for contests series. For your convenience, you can find links to the other parts in the table of contents in Part 1 — Powershell.
Hello! Today we continue working on parallel code for contests. Let's go.
Idea
Last time we saw how to implement simple thread pooling in C# using ManualResetEvents. We want to achieve the same thing with C++ mechanisms. We are going to use C++11 constructs in order to keep the code portable. I performed the tests using VC++ 2015.
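For reference, the snippets below use only the standard library. The includes are not shown in the listings, so here is the set of headers the code relies on (gathered by me from the listings, plus <cstdio> and <chrono> for the demo at the end):

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <vector>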
Implementation
We base our solution on a simple ThreadPool available on GitHub. We start with the following implementation:
class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;
    template<class F>
    auto run(int taskCount, F&& f) -> void;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector<std::thread> workers;
    // the task queue
    std::queue<std::function<void()>> tasks;
    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.emplace_back([this] {
            for (;;) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(this->queue_mutex);
                    this->condition.wait(lock,
                        [this] { return this->stop || !this->tasks.empty(); });
                    if (this->stop && this->tasks.empty())
                        return;
                    task = std::move(this->tasks.front());
                    this->tasks.pop();
                }
                task();
            }
        });
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread &worker : workers)
        worker.join();
}
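Before we build anything on top of the pool, here is a minimal usage sketch (my own example, not part of the original listing): enqueue returns a std::future, so we can hand a job to the pool and collect its result later.

int main() {
    ThreadPool pool(4); // four worker threads

    // the callable and its arguments are forwarded to a worker thread
    auto answer = pool.enqueue([](int a, int b) { return a + b; }, 2, 3);

    // get() blocks until a worker has executed the task
    printf("2 + 3 = %d\n", answer.get());
    return 0;
}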
Next, we implement a custom ManualResetEvent:
class ManualResetEvent {
public:
    ManualResetEvent(bool state = false) : _state(state) {}
    ManualResetEvent(const ManualResetEvent& other) = delete;

    void WaitOne() {
        std::unique_lock<std::mutex> lock(_sync);
        while (!_state) {
            _underlying.wait(lock);
        }
    }

    void Set() {
        std::unique_lock<std::mutex> lock(_sync);
        _state = true;
        _underlying.notify_all();
    }

private:
    std::condition_variable _underlying;
    std::mutex _sync;
    std::atomic<bool> _state;
};
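The semantics mirror the .NET ManualResetEvent we used in the C# part: WaitOne blocks until another thread calls Set, and once set the event stays signalled for every waiter. A small illustration (the names and the demo function are mine):

void manualResetEventDemo() {
    ManualResetEvent dataReady;

    std::thread consumer([&] {
        dataReady.WaitOne();  // blocks until the producer signals
        printf("Consumer woke up\n");
    });

    std::thread producer([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        dataReady.Set();      // wakes every thread waiting on the event
    });

    producer.join();
    consumer.join();
}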
Finally, we implement a custom function that queues the tasks:
template<class F>
auto ThreadPool::run(int taskCount, F&& f) -> void
{
    ThreadPool pool(taskCount);
    std::vector<std::future<int>> tasks;

    ManualResetEvent *readEvents = new ManualResetEvent[taskCount + 1];
    ManualResetEvent *writeEvents = new ManualResetEvent[taskCount + 1];
    bool *readingFlags = new bool[taskCount + 1];
    bool *writingFlags = new bool[taskCount + 1];

    for (int i = 0; i < taskCount + 1; ++i) {
        readingFlags[i] = false;
        writingFlags[i] = false;
    }

    // first task: waits for nobody, it only signals the next task
    tasks.push_back(pool.enqueue([&] {
        f([&] {
            if (!readingFlags[1]) {
                readingFlags[1] = true;
                return;
            }
            readEvents[1].Set();
        }, [&] {
            if (!writingFlags[1]) {
                writingFlags[1] = true;
                return;
            }
            writeEvents[1].Set();
        }, 1);
        return 1;
    }));

    // middle tasks: first call waits for the previous task, second call releases the next one
    for (int i = 2; i < taskCount; ++i)
    {
        int id = i;
        tasks.push_back(pool.enqueue([&, id] {
            f([&, id] {
                if (!readingFlags[id]) {
                    readEvents[id - 1].WaitOne();
                    readingFlags[id] = true;
                    return;
                }
                readEvents[id].Set();
            }, [&, id] {
                if (!writingFlags[id]) {
                    writeEvents[id - 1].WaitOne();
                    writingFlags[id] = true;
                    return;
                }
                writeEvents[id].Set();
            }, id);
            return id;
        }));
    }

    // last task: only waits for the previous one, there is nothing left to signal
    if (taskCount > 1)
    {
        tasks.push_back(pool.enqueue([&] {
            f([&] {
                if (!readingFlags[taskCount]) {
                    readEvents[taskCount - 1].WaitOne();
                    readingFlags[taskCount] = true;
                }
            }, [&] {
                if (!writingFlags[taskCount]) {
                    writeEvents[taskCount - 1].WaitOne();
                    writingFlags[taskCount] = true;
                }
            }, taskCount);
            return taskCount;
        }));
    }

    // wait for every task to finish
    for (auto && result : tasks) {
        result.get();
    }

    delete[] readEvents;
    delete[] writeEvents;
    delete[] readingFlags;
    delete[] writingFlags;
}
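One caveat worth mentioning: the arrays allocated with new[] are never freed if one of the tasks throws, because result.get() rethrows before the delete[] calls run. Since ManualResetEvent is default-constructible, the same bookkeeping could use standard containers, which clean up automatically; a sketch of that variation (mine, not the original code):

// hypothetical variation of run(): RAII bookkeeping instead of raw arrays
std::vector<ManualResetEvent> readEvents(taskCount + 1);
std::vector<ManualResetEvent> writeEvents(taskCount + 1);
// std::vector<bool> packs bits, which is unsafe to write from several threads,
// so plain chars serve as flags here
std::vector<char> readingFlags(taskCount + 1, 0);
std::vector<char> writingFlags(taskCount + 1, 0);
// the rest of run() stays the same, indexing into the vectors instead of the pointers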
The code is very similar to the C# implementation. Finally, we want to execute the code:
ThreadPool pool(8);

pool.run(10, [](auto signalRead, auto signalWrite, int id) {
    signalRead();
    printf("Reading %d\n", id);
    std::this_thread::sleep_for(std::chrono::milliseconds((10 - id) * 500));
    signalRead();

    printf("Transforming %d\n", id);
    std::this_thread::sleep_for(std::chrono::milliseconds((10 - id) * 500));

    signalWrite();
    printf("Writing %d\n", id);
    std::this_thread::sleep_for(std::chrono::milliseconds((10 - id) * 100));
    signalWrite();
});
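I tested this with VC++ 2015, but nothing here is compiler specific. One detail to watch out for: the driver lambda takes auto parameters, which is technically a C++14 feature (VC++ 2015 accepts it), so when building with GCC or Clang I would request that standard explicitly. Assuming everything sits in a single main.cpp, a build along these lines should work:

# hypothetical build: -pthread links the threading runtime
g++ -std=c++14 -pthread -O2 main.cpp -o parallel_tests
./parallel_tests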
Summary
This simple thread pool allows us to run multiple test cases in parallel. We also don't need to modify our algorithm much: all we need to do is invoke the two signal functions in the correct order and everything else goes smoothly. Using this C++ code and the Powershell code from Part 1, we can easily execute multiple tests in parallel and save some time when our algorithm is not optimal.