This is the third part of the Parallel execution for contests series. For your convenience you can find other parts in the table of contents in Part 1 — PowerShell

Hello! Today we continue working on parallel codes for contests. Let’s go.

Idea

Last time we saw how to implement simple thread pooling in C# using ManualResetEvents. We basically want to achieve the same using C++ mechanisms. We are going to utilize constructs from C++11 in order to write portable code. I performed the tests using VC++ 2015.

Implementation

We first start with a simple ThreadPool implementation available on GitHub.
We start with the following implementation:

// A fixed-size pool of worker threads consuming a shared FIFO task queue.
// Tasks are submitted via enqueue() and their results retrieved through
// the returned std::future. run() layers a pipelined, ordered execution
// scheme for contest test cases on top of the pool.
class ThreadPool {
public:
	// Launches the given number of worker threads immediately.
	ThreadPool(size_t);
	// Queues a callable with bound arguments; returns a future for its result.
	template < class F , class... Args >
	auto enqueue(F&& f, Args&&... args)
		->std::future < typename std::result_of < F (Args...)>::type>;

	// Runs taskCount pipelined tasks; f receives two signal callables and a 1-based id.
	template < class F >
	auto run(int taskCount, F&& f)->void;
	// Signals shutdown and joins all workers (pending tasks are drained first).
	~ThreadPool();
private:
	// need to keep track of threads so we can join them
	std::vector < std::thread > workers;
	// the task queue
	std::queue < std::function < void()> > tasks;

	// synchronization
	std::mutex queue_mutex;          // guards `tasks` and `stop`
	std::condition_variable condition; // signalled when work arrives or on shutdown
	bool stop;                       // set by the destructor, read by workers under the mutex
};

// The constructor spins up `threads` workers. Each worker loops forever:
// it sleeps until a task is queued (or the pool is shutting down), pops
// one task while holding the lock, and executes it with the lock released.
inline ThreadPool::ThreadPool(size_t threads)
	: stop(false)
{
	auto worker_loop = [this] {
		while (true) {
			std::function<void()> job;
			{
				std::unique_lock<std::mutex> guard(queue_mutex);
				// Wake on shutdown or when work becomes available.
				condition.wait(guard,
					[this] { return stop || !tasks.empty(); });
				// On shutdown, drain any remaining tasks before exiting.
				if (stop && tasks.empty())
					return;
				job = std::move(tasks.front());
				tasks.pop();
			}
			// Run outside the lock so other workers can dequeue concurrently.
			job();
		}
	};

	workers.reserve(threads);
	for (size_t i = 0; i < threads; ++i)
		workers.emplace_back(worker_loop);
}

// add new work item to the pool
template < class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future < typename std::result_of::type>
{
	using return_type = typename std::result_of < F(Args...)>::type;

	auto task = std::make_shared < std::packaged_task < return_type()> >(
		std::bind(std::forward < F>(f), std::forward < Args>(args)...)
		);

	std::future < return_type> res = task->get_future();
	{
		std::unique_lock < std::mutex> lock(queue_mutex);

		// don't allow enqueueing after stopping the pool
		if (stop)
			throw std::runtime_error("enqueue on stopped ThreadPool");

		tasks.emplace([task]() { (*task)(); });
	}
	condition.notify_one();
	return res;
}

// The destructor requests shutdown and joins every worker. Tasks still in
// the queue are finished first (see the worker loop's exit condition).
inline ThreadPool::~ThreadPool()
{
	{
		// lock_guard suffices: we never need to unlock early here.
		std::lock_guard<std::mutex> guard(queue_mutex);
		stop = true;
	}
	// Wake all workers so each one observes `stop` and can exit.
	condition.notify_all();
	for (auto &worker : workers)
		worker.join();
}

Next, we implement a custom ManualResetEvent:

// A set-once synchronization primitive modelled on .NET's ManualResetEvent:
// once Set() is called, every current and future WaitOne() returns.
class ManualResetEvent
{
public:
	ManualResetEvent(bool state = false) : _state(state) {}
	ManualResetEvent(const ManualResetEvent& other) = delete;
	// FIX: copy assignment was implicitly available while the copy
	// constructor was deleted; delete it explicitly for consistency.
	ManualResetEvent& operator=(const ManualResetEvent& other) = delete;

	// Blocks until the event is signalled. The while-loop guards against
	// spurious condition-variable wakeups.
	void WaitOne() {
		// FIX: `std::unique_lock lock(_sync)` relies on C++17 class template
		// argument deduction; the article targets C++11 (VC++ 2015), so the
		// template argument must be spelled out.
		std::unique_lock<std::mutex> lock(_sync);
		while (!_state) {
			_underlying.wait(lock);
		}
	}

	// Signals the event and wakes all current waiters.
	void Set() {
		std::unique_lock<std::mutex> lock(_sync);
		_state = true;
		_underlying.notify_all();
	}

private:
	std::condition_variable _underlying;
	std::mutex _sync;
	// FIX: `std::atomic _state;` was missing its template argument.
	// (The atomic is belt-and-braces: _state is always accessed under _sync.)
	std::atomic<bool> _state;
};

Finally, we implement a custom function to queue the tasks:

template < class F>
auto ThreadPool::run(int taskCount, F&& f)->void {
	ThreadPool pool(taskCount);
	std::vector< std::future > tasks;

	ManualResetEvent *readEvents = new ManualResetEvent[taskCount + 1];
	ManualResetEvent *writeEvents = new ManualResetEvent[taskCount + 1];
	bool *readingFlags = new bool[taskCount + 1];
	bool *writingFlags = new bool[taskCount + 1];
	for (int i = 0; i < taskCount + 1; ++i) {
		readingFlags[i] = false;
		writingFlags[i] = false;
	}

	{
		tasks.push_back(pool.enqueue([&] {
			f([&] {
				if (!readingFlags[1]) {
					readingFlags[1] = true;
					return;
				}
				readEvents[1].Set();
			}, [&] {
				if (!writingFlags[1]) {
					writingFlags[1] = true;
					return;
				}
				writeEvents[1].Set();
			}, 1);
			return 1;
		}));
	}

	for (int i = 2; i < taskCount; ++i)
	{
		int id = i;
		tasks.push_back(pool.enqueue([&, id] {
			f([&, id] {
				if (!readingFlags[id]) {
					readEvents[id - 1].WaitOne();
					readingFlags[id] = true;
					return;
				}
				readEvents[id].Set();
			}, [&, id] {
				if (!writingFlags[id]) {
					writeEvents[id - 1].WaitOne();
					writingFlags[id] = true;
					return;
				}
				writeEvents[id].Set();
			}, id);
			return id;
		}));
	}

	if (taskCount > 1)
	{
		tasks.push_back(pool.enqueue([&] {
			f([&] {
				if (!readingFlags[taskCount]) {
					readEvents[taskCount - 1].WaitOne();
					readingFlags[taskCount] = true;
				}
			}, [&] {
				if (!writingFlags[taskCount]) {
					writeEvents[taskCount - 1].WaitOne();
					writingFlags[taskCount] = true;
				}
			}, taskCount);
			return taskCount;
		}));
	}

	for (auto && result : tasks) {
		result.get();
	}

	delete[] readEvents;
	delete[] writeEvents;
	delete[] readingFlags;
	delete[] writingFlags;
}

The code is very similar to the C# implementation. Finally, we want to execute the code:

// Example: 10 pipelined test cases. Each case brackets its read phase and
// its write phase with the corresponding signal; run() makes case N start
// reading (writing) only after case N-1 has finished reading (writing),
// while the middle "Transforming" phase runs in parallel.
// NOTE(review): run() constructs its own internal pool sized taskCount,
// so the 8 threads here only serve other enqueue() calls — verify intent.
ThreadPool pool(8);
pool.run(10, [](auto signalRead, auto signalWrite, int id) {
	signalRead();   // first call: wait for the previous case to finish reading
	printf("Reading %d\n", id);
	std::this_thread::sleep_for(std::chrono::milliseconds((10 - id) * 500));
	signalRead();   // second call: release the next case's read phase

	printf("Transforming %d\n", id);
	std::this_thread::sleep_for(std::chrono::milliseconds((10 - id) * 500));

	signalWrite();  // first call: wait for the previous case to finish writing
	printf("Writing %d\n", id);
	std::this_thread::sleep_for(std::chrono::milliseconds((10 - id) * 100));
	signalWrite();  // second call: release the next case's write phase
});

Summary

This simple thread pool allows us to run multiple test cases in parallel. We also don’t need to modify our algorithm very much — all we need to do is invoke the two signal functions in the correct order and everything else will go smoothly. Using this C++ code and the PowerShell code from part 1 we can easily execute multiple tests in parallel and save some time when our algorithm is not optimal.