Today we are going to implement a trivial ScheduledExecutorService in C#.
Introduction
We want to have a thread pool of fixed size which we can use to execute three types of tasks:
- One-time tasks
- Tasks with a fixed delay between executions
- Tasks with a fixed rate of execution
Let’s go.
Bookkeeping
We start with the following:
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public class ScheduledThreadPoolExecutor
{
    public int ThreadCount => threads.Length;
    public EventHandler<Exception> OnException;

    private ManualResetEvent waiter;
    private Thread[] threads;
    private SortedSet<Tuple<DateTime, Action>> queue;

    public ScheduledThreadPoolExecutor(int threadCount)
    {
        waiter = new ManualResetEvent(false);
        queue = new SortedSet<Tuple<DateTime, Action>>();
        OnException += (o, e) => { };
        threads = Enumerable.Range(0, threadCount).Select(i => new Thread(RunLoop)).ToArray();
        foreach (var thread in threads)
        {
            thread.Start();
        }
    }

    private void RunLoop()
    {
        while (true)
        {
            // Timeout.Infinite (-1) means "wait until the event is set".
            int sleepingTime = Timeout.Infinite;
            Action task = null;
            try
            {
                lock (waiter)
                {
                    var now = DateTime.Now;
                    if (queue.Any() && queue.Min.Item1 <= now)
                    {
                        // The earliest task is due: take it out of the queue.
                        task = queue.Min.Item2;
                        queue.Remove(queue.Min);
                    }
                    else
                    {
                        if (queue.Any())
                        {
                            // Clamp to the range accepted by WaitOne(int).
                            double remaining = (queue.Min.Item1 - now).TotalMilliseconds;
                            sleepingTime = (int)Math.Min(int.MaxValue, Math.Ceiling(remaining));
                        }
                        // Reset under the lock so a Set() issued after a later
                        // queue.Add is not lost and the event does not stay set forever.
                        waiter.Reset();
                    }
                }

                if (task == null)
                {
                    waiter.WaitOne(sleepingTime);
                }
                else
                {
                    task();
                }
            }
            catch (Exception e)
            {
                OnException(task, e);
            }
        }
    }
}
```
We use a queue of pairs: a `DateTime` indicating the expected start time and an `Action` to execute. We have an array of threads doing the job, a `ManualResetEvent` for synchronizing the threads, and an exception handling mechanism.
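One caveat about the queue type: `SortedSet` orders tuples by comparing `Item1` and then `Item2`, and `Action` is not comparable, so adding two different tasks with exactly the same `DateTime` fails with an exception. A possible workaround, sketched here and not part of the implementation in this post, is to add a tie-breaking sequence number to the tuple. The names `TieBreakSketch`, `Entry` and `sequence` are hypothetical.

```csharp
using System;
using System.Threading;

public static class TieBreakSketch
{
    // Hypothetical global counter; every entry gets a unique, increasing number.
    private static long sequence;

    // Builds a queue entry ordered by time first and by insertion order second.
    // Because the sequence numbers never collide, the Action is never compared.
    public static Tuple<DateTime, long, Action> Entry(DateTime time, Action action)
    {
        return Tuple.Create(time, Interlocked.Increment(ref sequence), action);
    }
}
```

With this shape the queue would be declared as `SortedSet<Tuple<DateTime, long, Action>>`, and tasks scheduled for the same instant would run in submission order.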
We want to do the following: whenever a new task is submitted, we add it to the queue and set the event to notify the other threads that there is a job to do. Every thread runs an infinite loop. On each iteration it checks whether there is something to do.
If there is a task ready for execution, the thread removes it from the queue and does the job.
If the earliest task is not yet due, the thread waits for the event to be set, with a timeout equal to the time remaining until that task is due. Now two things can happen: either the wait times out or the event is set. The former means that nothing new was submitted, so we should loop again and take the task from the queue, which should now be ready for execution. The latter means that a new task was submitted, so we need to check it to see how long we should sleep.
If there are no tasks in the queue, we just wait for the event to be set.
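The three cases above can be condensed into one decision: how long should a worker wait before looking at the queue again? The helper below is only an illustrative sketch (the names `RunLoopSketch` and `NextWaitMilliseconds` are made up for this example). It relies on the fact that `WaitOne(int)` accepts only `-1` (`Timeout.Infinite`) or a non-negative number of milliseconds, so the remaining time is clamped.

```csharp
using System;
using System.Threading;

public static class RunLoopSketch
{
    // Returns 0 when the head task is due (run it now), Timeout.Infinite when
    // the queue is empty, and otherwise the milliseconds left until the head task.
    public static int NextWaitMilliseconds(DateTime? headTime, DateTime now)
    {
        if (headTime == null)
        {
            return Timeout.Infinite;
        }
        if (headTime.Value <= now)
        {
            return 0;
        }
        // Round up so we never wake before the task is due, and clamp to int range.
        double remaining = Math.Ceiling((headTime.Value - now).TotalMilliseconds);
        return (int)Math.Min(int.MaxValue, remaining);
    }
}
```

Passing `null` stands for an empty queue; a real loop would read the head of the queue under the lock and pass its time instead.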
One time execution
Now we want to schedule a task which is to be executed exactly once after some delay. So we do this:
```csharp
public void Schedule(Action action, TimeSpan initialDelay)
{
    Schedule(action, DateTime.Now + initialDelay);
}

private void Schedule(Action action, DateTime time)
{
    lock (waiter)
    {
        queue.Add(Tuple.Create(time, action));
    }

    waiter.Set();
}
```
We calculate the start time, add the task to the queue and set the event. Notice that the task might be executed much later if there are no available threads.
Execution with fixed delay
```csharp
public void ScheduleWithFixedDelay(Action action, TimeSpan initialDelay, TimeSpan delay)
{
    Schedule(() =>
    {
        action();
        ScheduleWithFixedDelay(action, delay, delay);
    }, DateTime.Now + initialDelay);
}
```
We schedule a task which does the job and then registers itself again, with `delay` passed as `initialDelay`. Notice that if we get an exception during execution, we don't register the task again (as expected). If we wanted the task to keep running after a failure, we could put the re-registration in a `finally` block.
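The `finally` variant could look roughly like the sketch below. To keep the example self-contained, the executor's `Schedule(Action, TimeSpan)` method is passed in as a delegate; the `schedule` parameter and the `FixedDelaySketch` class are assumptions made for this illustration only.

```csharp
using System;

public static class FixedDelaySketch
{
    // `schedule` stands in for the executor's Schedule(Action, TimeSpan) method.
    public static void ScheduleWithFixedDelay(
        Action<Action, TimeSpan> schedule, Action action, TimeSpan initialDelay, TimeSpan delay)
    {
        schedule(() =>
        {
            try
            {
                action();
            }
            finally
            {
                // Runs whether action() returned or threw, so the task survives failures.
                ScheduleWithFixedDelay(schedule, action, delay, delay);
            }
        }, initialDelay);
    }
}
```

The exception still propagates out of the scheduled task, so the run loop can report it through `OnException`; the only difference is that the next execution is already queued by then.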
Execution at fixed rate
Now comes the tricky part:
```csharp
public void ScheduleAtFixedRate(Action action, TimeSpan initialDelay, TimeSpan delay)
{
    DateTime scheduleTime = DateTime.Now + initialDelay;

    Action registerTask = null;
    registerTask = () =>
    {
        Schedule(() =>
        {
            action();
            scheduleTime += delay;
            registerTask();
        }, scheduleTime);
    };

    registerTask();
}
```
We calculate `scheduleTime`, which is the time of the first execution. We do the job and then add `delay` to calculate the time of the next invocation. Notice that the result might be in the past (if the `action` was too slow). This is perfectly fine: we effectively queue the missed execution, and if things go faster next time we can catch up.
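To see the catch-up behaviour in numbers, here is a small simulation. It is only a sketch under simplifying assumptions: a single worker thread, no other tasks, and times expressed as milliseconds since the first scheduled run. `FixedRateSketch` and `StartTimes` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class FixedRateSketch
{
    // Returns the actual start time of each run, given the period and
    // how long each run of the action takes.
    public static List<long> StartTimes(long period, long[] durations)
    {
        var starts = new List<long>();
        long scheduled = 0; // mirrors scheduleTime in ScheduleAtFixedRate
        long clock = 0;     // the moment the worker becomes free
        foreach (long duration in durations)
        {
            // A run starts at its scheduled time, or as soon as the worker is free.
            long start = Math.Max(clock, scheduled);
            starts.Add(start);
            clock = start + duration;
            scheduled += period;
        }
        return starts;
    }
}
```

With a period of 1000 ms and durations of 2500, 100, 100 and 100 ms, the runs start at 0, 2500, 2600 and 3000 ms: the second and third runs are late, and the fourth is back on schedule.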
Summary
This is a very simple implementation. It should generally do the job, but it may be too slow for bigger thread pools. We might want to use spin locks and lock-free collections to speed things up, but for basic purposes this should be enough.