This is the tenth part of the Async Wandering series. For your convenience you can find other parts in the table of contents in Part 1 – Why creating Form from WinForms in unit tests breaks async?
In Part 8 I mentioned that async could be implemented with fibers, which is the goal of Project Loom in the JVM world. Let’s see a super simple implementation. It is not meant to work reliably in production (.NET officially doesn’t support fibers); it is only a sample showing that it can be done.
Sidenote: you may want to read about internals of async state machine and synchronization contexts before moving forward. I’ll skip details of async code under the hood but will assume you know how it works.
Regular async code
Okay, let’s begin with defining what we would like to do. Let’s take this code:
```csharp
using System;
using System.Threading;

namespace AsyncLoom
{
    class Program
    {
        static void Main(string[] args)
        {
            WhereAmI("Start - RegularAsync");
            RegularAsync.Run().Wait();
            WhereAmI("End - RegularAsync\n");
        }

        public static void WhereAmI(string what)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} Time {DateTime.Now}: {what}");
        }
    }
}
```
This is just a skeleton which we will use to trigger jobs. We have a helper method for printing the thread ID and a timestamp.
Let’s take regular async code:
```csharp
using System;
using System.Threading.Tasks;

namespace AsyncLoom
{
    public class RegularAsync
    {
        public static async Task Run()
        {
            Program.WhereAmI("Before nesting");

            var nested = RunNested();

            Program.WhereAmI("Between nesting");

            await nested;

            Program.WhereAmI("After nesting");
        }

        public static async Task RunNested()
        {
            Program.WhereAmI("Before creating task");

            var delay = Task.Delay(2000);

            Program.WhereAmI("After creating delay");

            await delay;

            Program.WhereAmI("After sleeping");

            var data = Task.Run(() => "Some string");

            Program.WhereAmI("After creating data");

            var result = await data;

            Program.WhereAmI($"After reading data {result}");
        }
    }
}
```
Nothing magical here. We start with the top-level async function `Run`, which starts executing `RunNested`. We execute the nested function until line 29, where `await delay` checks whether the task has finished (it has not), so we return from the function. Next, we print in line 14 and then wait in line 16.
At this point the thread is free to do anything it wants. In our case we just block it, but it could pump messages or handle network requests.
Some time later we finish waiting and resume in line 31. Output:
```
Thread 1 Time 6/2/2020 9:48:52 PM: Start - RegularAsync
Thread 1 Time 6/2/2020 9:48:52 PM: Before nesting
Thread 1 Time 6/2/2020 9:48:52 PM: Before creating task
Thread 1 Time 6/2/2020 9:48:52 PM: After creating delay
Thread 1 Time 6/2/2020 9:48:52 PM: Between nesting
Thread 4 Time 6/2/2020 9:48:54 PM: After sleeping
Thread 4 Time 6/2/2020 9:48:54 PM: After creating data
Thread 4 Time 6/2/2020 9:48:54 PM: After reading data Some string
Thread 4 Time 6/2/2020 9:48:54 PM: After nesting
Thread 1 Time 6/2/2020 9:48:54 PM: End - RegularAsync
```
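You can see the 2-second delay between lines 5 and 6. Notice also that the continuation runs on a different thread (4) than the one that started the work (1).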
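To make the “check whether the task is finished, otherwise return” step more concrete, here is a rough hand-written approximation of a single `await`. It is only a sketch: the compiler generates a state machine rather than a lambda, and the names `ManualAwait` and `WaitThenContinue` are made up for this illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class ManualAwait
{
    // Hypothetical helper approximating `await task; continuation();`.
    // Returns true when the continuation ran synchronously.
    public static bool WaitThenContinue(Task task, Action continuation)
    {
        var awaiter = task.GetAwaiter();
        if (awaiter.IsCompleted)
        {
            // Fast path: the task is already done, so we simply keep going.
            awaiter.GetResult();
            continuation();
            return true;
        }

        // Slow path: register the rest of the method as a callback and
        // return to the caller, which frees the current thread.
        awaiter.OnCompleted(() =>
        {
            awaiter.GetResult(); // rethrows the task's exception, if any
            continuation();
        });
        return false;
    }
}
```

Calling `ManualAwait.WaitThenContinue(Task.Delay(2000), () => Program.WhereAmI("After sleeping"))` returns immediately, and the callback typically runs later on a thread pool thread, which matches the thread switch visible in the output above.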
Coroutines
Let’s now reimplement this solution with coroutines. You probably know that fibers, coroutines, and user mode threads are equivalent to some extent. By going with coroutines we’ll see the general logic.
Coroutines in C# can be easily implemented with `yield return`, and this is what we’re going to do. We need to implement a message queue (just like for a regular thread pool thread) and continuations. The implementation will be messy because it’s just an experiment, so it has little to no error checking (and probably has race conditions as well).
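Before the real thing, here is a minimal sketch of the basic idea: iterator methods used as coroutines and driven round-robin by calling `MoveNext`. The names `CoroutineDemo`, `Worker` and `RunAll` are invented for this example and are not part of the implementation that follows.

```csharp
using System;
using System.Collections.Generic;

public static class CoroutineDemo
{
    public static readonly List<string> Log = new List<string>();

    // Each `yield return` is a suspension point; the yielded value is unused.
    public static IEnumerable<int> Worker(string name)
    {
        Log.Add($"{name}: step 1");
        yield return 0;
        Log.Add($"{name}: step 2");
        yield return 0;
        Log.Add($"{name}: done");
    }

    // Round-robin over the coroutines until all of them have finished.
    public static void RunAll(params IEnumerable<int>[] coroutines)
    {
        var queue = new Queue<IEnumerator<int>>();
        foreach (var coroutine in coroutines)
        {
            queue.Enqueue(coroutine.GetEnumerator());
        }

        while (queue.Count > 0)
        {
            var current = queue.Dequeue();
            if (current.MoveNext())
            {
                queue.Enqueue(current); // still has work left
            }
            else
            {
                current.Dispose();
            }
        }
    }
}
```

Running `CoroutineDemo.RunAll(CoroutineDemo.Worker("A"), CoroutineDemo.Worker("B"))` interleaves the two workers step by step on a single thread. What this sketch lacks is any notion of waiting for something external, which is why the real loop below needs a semaphore and continuations.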
Message loop
We start with this:
```csharp
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace AsyncLoom
{
    public static class GeneratorAsync
    {
        public static ConcurrentDictionary<EnumeratorWithFlag, byte> readyToGo = new ConcurrentDictionary<EnumeratorWithFlag, byte>();
        public static ConcurrentDictionary<EnumeratorWithFlag, byte> allJobs = new ConcurrentDictionary<EnumeratorWithFlag, byte>();
        public static Semaphore semaphore = new Semaphore(0, int.MaxValue);
        public static EnumeratorWithFlag current;

        public class EnumeratorWithFlag
        {
            public IEnumerator<int> enumerator;
            private bool lastState;
            private EnumeratorWithFlag previous;

            public EnumeratorWithFlag(IEnumerator<int> enumerator)
            {
                this.enumerator = enumerator;
                this.lastState = true;
            }

            public bool MoveNext()
            {
                lastState = lastState && enumerator.MoveNext();

                if (!lastState && previous != null)
                {
                    this.enumerator = this.previous.enumerator;
                    this.lastState = true;
                    this.previous = this.previous.previous;
                }

                return lastState;
            }

            public void Push(IEnumerator<int> enumerator)
            {
                this.previous = new EnumeratorWithFlag(this.enumerator);
                this.enumerator = enumerator;
            }
        }
    }
}
```
We have a bunch of collections. First, in line 10, is `readyToGo`: these are the coroutines which can be executed because they have finished waiting for their dependencies.
Next, in line 11, we have a collection of all jobs so we know when we’re done with the calculations.
We also need a semaphore for synchronization in line 12, and the currently executing coroutine in line 13.
Next, we have a helper class `EnumeratorWithFlag` which keeps a chain of continuations and stack traces. It wraps a regular enumerator and propagates context if needed. It also has a `Push` method which adds a new enumerator on top of the existing one.
The idea here is: `EnumeratorWithFlag` represents the current coroutine. If we move to some other coroutine (by calling it), we push it onto the chain with the `Push` method. However, `MoveNext` is clever enough to restore the state if needed, so we traverse the whole chain with just one instance of `EnumeratorWithFlag` stored in the collections.
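The same “one object stands for the whole call chain” idea can be sketched with an explicit stack instead of linked `previous` references. This is a simplified variant for illustration, not the code used in this post: here a coroutine “calls” another one by yielding its enumerator, and the names `CoroutineStack` and `StackDemo` are hypothetical.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

public sealed class CoroutineStack
{
    private readonly Stack<IEnumerator> frames = new Stack<IEnumerator>();

    public CoroutineStack(IEnumerator root)
    {
        frames.Push(root);
    }

    // Runs until the next suspension point; false means the whole chain finished.
    public bool MoveNext()
    {
        while (frames.Count > 0)
        {
            var top = frames.Peek();
            if (!top.MoveNext())
            {
                frames.Pop();        // callee finished: resume the caller
                continue;
            }

            if (top.Current is IEnumerator callee)
            {
                frames.Push(callee); // "call": a new frame on top of the chain
                continue;
            }

            return true;             // plain suspension point
        }

        return false;
    }
}

public static class StackDemo
{
    public static readonly List<string> Log = new List<string>();

    static IEnumerator Outer()
    {
        Log.Add("outer: before");
        yield return Inner();        // behaves like a nested call
        Log.Add("outer: after");
    }

    static IEnumerator Inner()
    {
        Log.Add("inner: start");
        yield return null;           // suspend once
        Log.Add("inner: end");
    }

    // Returns the number of suspension points hit by the whole chain.
    public static int Run()
    {
        Log.Clear();
        var coroutine = new CoroutineStack(Outer());
        int steps = 0;
        while (coroutine.MoveNext())
        {
            steps++;
        }
        return steps;
    }
}
```

The scheduler only ever sees the single `CoroutineStack` object, yet `outer: after` is logged only once `Inner` has completed, which is the behaviour `EnumeratorWithFlag` provides with `Push` and `MoveNext`.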
Now the main method:
```csharp
public static void Run()
{
    var runInternal = new EnumeratorWithFlag(RunInternal().GetEnumerator());
    allJobs.TryAdd(runInternal, 0);
    readyToGo.TryAdd(runInternal, 0);
    semaphore.Release();

    var sideJob = new EnumeratorWithFlag(SideJob().GetEnumerator());
    allJobs.TryAdd(sideJob, 0);
    readyToGo.TryAdd(sideJob, 0);
    semaphore.Release();

    bool triggered;
    while (true)
    {
        triggered = false;

        var keys = readyToGo.Keys.GetEnumerator();
        while(keys.MoveNext()){
            current = keys.Current;
            triggered = true;

            if (!current.MoveNext())
            {
                byte b;
                readyToGo.TryRemove(current, out b);
                allJobs.TryRemove(current, out b);
            }
        }

        if (allJobs.IsEmpty)
        {
            break;
        }

        if (!triggered)
        {
            semaphore.WaitOne();
        }
    }
}
```
This is quite a lot. First, we schedule a normal job to execute in lines 3-6. We do some bookkeeping and also add new token in the semaphore to signal that there may be something running under the hood.
Next, in lines 8-11 we simulate new request coming to the thread in the meantime. Think of message in message pump or network call while we execute something.
Finally, lines 14-40 execute the message loop.
We start in line 16 by resetting the flag which records whether we executed any step in this iteration. It lets us skip waiting on the semaphore when there was something to do.
Next, we get something to execute (lines 17-20). In line 23 we execute one step of the coroutine. If it finished the whole generator (and returned `false`), we remove it from the collections so we don’t bother with it anymore.
Finally, we break in line 33 if we’re done. However, if we didn’t execute a thing and there are unfinished coroutines, we wait with semaphore.
It is important to remember that we’re generally single threaded here. Collections may be modified outside of this thread only when we execute some continuation on other thread but whole message loop generally runs coroutines synchronously.
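The threading model described above (one thread runs all the jobs, other threads only enqueue work and signal) can be isolated in a small sketch. This is an illustration under my own assumptions rather than the loop from this post: `MiniPump`, `Post`, `Expect` and `PumpDemo` are hypothetical names, and a `SemaphoreSlim` plus a `ConcurrentQueue` replace the dictionaries.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class MiniPump
{
    private readonly ConcurrentQueue<Action> ready = new ConcurrentQueue<Action>();
    private readonly SemaphoreSlim signal = new SemaphoreSlim(0);
    private int pending; // jobs announced but not yet executed

    // Announces a job that will be posted now or later.
    public void Expect()
    {
        Interlocked.Increment(ref pending);
    }

    // May be called from any thread, for example from a task continuation.
    public void Post(Action job)
    {
        ready.Enqueue(job);
        signal.Release();
    }

    // Runs jobs on the calling thread until nothing is expected anymore.
    public void Run()
    {
        while (Volatile.Read(ref pending) > 0)
        {
            signal.Wait(); // sleep until something is ready
            if (ready.TryDequeue(out var job))
            {
                job();
                Interlocked.Decrement(ref pending);
            }
        }
    }
}

public static class PumpDemo
{
    // Returns the ids of: the pump thread, the thread that ran the delayed job,
    // and the thread that ran the immediately posted job.
    public static int[] Run()
    {
        var pump = new MiniPump();
        int pumpThread = Thread.CurrentThread.ManagedThreadId;
        int delayedThread = -1;
        int sideThread = -1;

        pump.Expect();
        Task.Delay(200).ContinueWith(_ =>
            pump.Post(() => delayedThread = Thread.CurrentThread.ManagedThreadId));

        pump.Expect();
        pump.Post(() => sideThread = Thread.CurrentThread.ManagedThreadId);

        pump.Run();
        return new[] { pumpThread, delayedThread, sideThread };
    }
}
```

All three ids returned by `PumpDemo.Run()` are equal: the continuation of `Task.Delay` fires on another thread, but it only posts the job, and the job itself runs on the thread that owns the loop.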
async as coroutines
Now jobs:
```csharp
private static IEnumerable<int> RunInternal()
{
    Program.WhereAmI("Before nesting");

    var nested = RunInternalNested().GetEnumerator().Start();

    Program.WhereAmI("Between nesting");

    yield return nested.MagicWait();

    Program.WhereAmI("After nesting");
}

private static IEnumerable<int> RunInternalNested()
{
    Program.WhereAmI("Before creating task");

    var delay = Task.Delay(2000);

    Program.WhereAmI("After creating delay");

    yield return delay.MagicWait();

    Program.WhereAmI("After sleeping");

    var data = Task.Run(() => "Some string");

    Program.WhereAmI("After creating data");

    yield return data.MagicWait();
    var result = data.Result;

    Program.WhereAmI($"After reading data {result}");
}

private static IEnumerable<int> SideJob()
{
    Program.WhereAmI("Side job");
    yield break;
}
```
We start with code very similar to last time. Notice in line 5 that we need to trigger the coroutine manually: this is the difference between `foreach` and `await` in C#. The former executes the coroutine only when iterating through the collection, while the latter starts the coroutine immediately (we don’t need to `await` it).
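The lazy-versus-eager difference is easy to observe in isolation. The following sketch (with made-up names `EagerVsLazy`, `Iterator` and `AsyncMethod`) logs when each body actually starts running.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EagerVsLazy
{
    public static readonly List<string> Log = new List<string>();

    static IEnumerable<int> Iterator()
    {
        Log.Add("iterator body");
        yield return 0;
    }

    static async Task AsyncMethod()
    {
        Log.Add("async body");
        await Task.CompletedTask;
    }

    public static void Run()
    {
        Log.Clear();

        // Creating the enumerator does not run any user code yet.
        var enumerator = Iterator().GetEnumerator();
        Log.Add("iterator created");
        enumerator.MoveNext(); // only now does the iterator body start

        // Calling an async method runs it immediately, up to its first
        // await on an incomplete task.
        var task = AsyncMethod();
        Log.Add("async method called");
    }
}
```

After `EagerVsLazy.Run()`, the log reads `iterator created`, `iterator body`, `async body`, `async method called`. This is why the coroutine version needs the explicit `Start()` call to mimic what calling an async method does for free.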
Next, notice how we use `yield return` in lines 9, 22, and 30. These are equivalent to doing `await`, only we work with enumerators. This also shows how enumerators in C# have the same issue as `await`: they require a specific return type, so we need to pollute our codebase (`yield return` and `IEnumerable` all the way up).
Finally, see in line 31 how we get the result from the `Task`. We magically wait for it in line 30, so in line 31 it is already resolved. We just read the result directly as it must be available (so this is non-blocking).
Okay, let’s see magic:
```csharp
static class TaskExtensions
{
    public static int MagicWait(this Task t)
    {
        var current = GeneratorAsync.current;
        byte b;
        GeneratorAsync.readyToGo.TryRemove(current, out b);
        t.ContinueWith(r => {
            GeneratorAsync.readyToGo.TryAdd(current, 0);
            GeneratorAsync.semaphore.Release();
        });
        return 0;
    }

    public static int MagicWait(this IEnumerator<int> e)
    {
        if (GeneratorAsync.current.enumerator != e)
        {
            e.Start();
        }

        return 0;
    }

    public static IEnumerator<int> Start(this IEnumerator<int> e)
    {
        GeneratorAsync.current.Push(e);
        GeneratorAsync.current.MoveNext();
        return e;
    }
}
```
So we have 3 helper methods. `MagicWait` in line 3 takes the current coroutine (line 5), deschedules it (line 7), and then sets a continuation which restores the coroutine (line 9). The continuation also notifies the main thread that there is something ready for execution (line 10). This is the only place where we are multithreaded: the continuation runs anywhere (specifically on some thread pool thread) and sets flags, and we synchronize later using the message queue.
Next, `MagicWait` in line 15 waits for a coroutine called directly (not something requiring waiting). We check whether it is the current coroutine and start it if it is not.
Finally, the `Start` method pushes onto the chain (so it’s like a new frame on a call stack) and executes one step of the coroutine (by calling `MoveNext`).
Notice how we access static fields here and there. They could be hidden, but I wanted to show something similar to the real `async` implementation. It accesses hidden global state in a very similar manner.
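One piece of that hidden state in real `async` code is `SynchronizationContext.Current`, which `await` captures in order to decide where the continuation runs. The sketch below installs a custom context and pumps it by hand; `RecordingContext` and `AmbientStateDemo` are names invented for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// A context that merely queues whatever is posted to it.
public sealed class RecordingContext : SynchronizationContext
{
    public readonly BlockingCollection<Action> Queue = new BlockingCollection<Action>();

    public override void Post(SendOrPostCallback d, object state)
    {
        Queue.Add(() => d(state));
    }
}

public static class AmbientStateDemo
{
    // Returns true when the code after `await` ran on the calling thread.
    public static bool Run()
    {
        var previous = SynchronizationContext.Current;
        var context = new RecordingContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try
        {
            int callerThread = Thread.CurrentThread.ManagedThreadId;
            int resumedThread = -1;
            var task = Work(() => resumedThread = Thread.CurrentThread.ManagedThreadId);

            // Hand-written pump: the continuation arrives through Post.
            while (!task.IsCompleted)
            {
                var callback = context.Queue.Take();
                callback();
            }

            return callerThread == resumedThread;
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    static async Task Work(Action afterAwait)
    {
        await Task.Delay(100); // captures SynchronizationContext.Current
        afterAwait();
    }
}
```

Nothing in `Work` mentions the context, yet its continuation ends up in our queue. `GeneratorAsync.current` plays a comparable role in the coroutine version: `MagicWait` reads it implicitly instead of receiving it as a parameter.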
Output:
```
Thread 1 Time 6/2/2020 9:48:54 PM: Start - GeneratorAsync
Thread 1 Time 6/2/2020 9:48:54 PM: Before nesting
Thread 1 Time 6/2/2020 9:48:54 PM: Before creating task
Thread 1 Time 6/2/2020 9:48:54 PM: After creating delay
Thread 1 Time 6/2/2020 9:48:54 PM: Between nesting
Thread 1 Time 6/2/2020 9:48:54 PM: Side job
Thread 1 Time 6/2/2020 9:48:56 PM: After sleeping
Thread 1 Time 6/2/2020 9:48:56 PM: After creating data
Thread 1 Time 6/2/2020 9:48:56 PM: After reading data Some string
Thread 1 Time 6/2/2020 9:48:56 PM: After nesting
Thread 1 Time 6/2/2020 9:48:56 PM: End - GeneratorAsync
```
A couple of important things here. First, we execute everything on one thread only (compare with the previous output).
Second, we see the second job executed while waiting (line 6). So the thread is effectively unblocked and can handle other messages in the middle, while waiting for continuations.
Finally, we see that nesting works correctly: it would be super easy to write `After nesting` in line 6, but that would break the code flow.
Fibers in C++/CLI
Okay, now we want to implement previous example with fibers. They are provided by WinAPI but not exposed to .NET so we need to start with wrappers.
I’m working with .NET Framework 4.7.2 on Windows 10 Pro x64. You can play with other platforms; the changes should be relatively simple.
We create C++/CLI wrapper. First part:
```cpp
using namespace System;
using namespace System::Collections::Generic;

namespace AsyncLoomCli
{
    public ref class FiberHelper {
    public:
        Dictionary<int, long>^ actions; // NOTE: long is 32 bits wide with MSVC, so a fiber address fits only in a 32-bit process

        FiberHelper()
        {
            actions = gcnew Dictionary<int, long>();
        }

        void Convert() {
            actions->Add(0, (long)ConvertThreadToFiber(NULL));
        }

        void Create(int action) {
            actions->Add(action, (long)CreateFiber(1024 * 1024, (LPFIBER_START_ROUTINE)FiberStart, (LPVOID)action));
        }

        void Switch(int action) {
            SwitchToFiber((LPVOID)actions[action]);
        }

        void Delete(int action) {
            DeleteFiber((LPVOID)actions[action]);
        }
    };
}
```
We will just number the lambdas provided by the C# code so we can find the fiber address (represented as `long` here in line 8) by id.
We wrap 4 basic methods:
`Convert` in line 15 converts the current thread to a fiber. The current thread will have id 0.
`Create` in line 19 creates a new fiber and passes the lambda id to the fiber function.
`Switch` in line 23 just schedules the fiber.
`Delete` in line 27 deletes the fiber.
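For comparison, the same four WinAPI functions could also be declared from C# with P/Invoke instead of a C++/CLI wrapper. This is not what this post uses; it is only a sketch of the declarations, with `NativeFibers` and `FiberStartRoutine` as hypothetical names. Actually switching fibers underneath managed code is unsupported by the CLR, so treat it as Windows-only experimental material.

```csharp
using System;
using System.Runtime.InteropServices;

public static class NativeFibers
{
    // Matches the native start routine: void CALLBACK FiberProc(LPVOID lpParameter).
    [UnmanagedFunctionPointer(CallingConvention.StdCall)]
    public delegate void FiberStartRoutine(IntPtr parameter);

    // Turns the calling thread into a fiber and returns its address.
    [DllImport("kernel32.dll", SetLastError = true)]
    public static extern IntPtr ConvertThreadToFiber(IntPtr parameter);

    // Allocates a new fiber with its own stack; it does not start running yet.
    [DllImport("kernel32.dll", SetLastError = true)]
    public static extern IntPtr CreateFiber(UIntPtr stackSize, FiberStartRoutine startRoutine, IntPtr parameter);

    // Suspends the current fiber and resumes the given one.
    [DllImport("kernel32.dll")]
    public static extern void SwitchToFiber(IntPtr fiber);

    // Frees the fiber's stack and bookkeeping data.
    [DllImport("kernel32.dll")]
    public static extern void DeleteFiber(IntPtr fiber);
}
```

One design difference is worth noting: `IntPtr` is always pointer-sized, so a dictionary of `IntPtr` values could hold fiber addresses in both 32-bit and 64-bit processes. A delegate passed to `CreateFiber` would also have to be kept alive for as long as the fiber exists.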
Now, we need a fiber function (just like a thread function when creating a new thread). The flow is: we start in C# code, creating fibers with the wrapper we just implemented. At some point the C# code calls `Switch`, which is supposed to schedule a fiber. Next, the fiber must return to C# code and execute a managed lambda. So we go with this:
```cpp
#pragma once

#include <Windows.h>
#include <cstdio>
#include <metahost.h>

// Link against the CLR hosting library.
#pragma comment(lib, "mscoree.lib")

#import "mscorlib.tlb" raw_interfaces_only \
    high_property_prefixes("_get","_put","_putref") \
    rename("ReportEvent", "InteropServices_ReportEvent")

void FiberStart(LPVOID param) {
    HRESULT hr;
    ICLRMetaHost* pMetaHost = NULL;
    ICLRRuntimeInfo* pRuntimeInfo = NULL;
    ICLRRuntimeHost* pClrRuntimeHost = NULL;
    hr = CLRCreateInstance(CLSID_CLRMetaHost, IID_PPV_ARGS(&pMetaHost));
    hr = pMetaHost->GetRuntime(L"v4.0.30319", IID_PPV_ARGS(&pRuntimeInfo));
    hr = pRuntimeInfo->GetInterface(CLSID_CLRRuntimeHost, IID_PPV_ARGS(&pClrRuntimeHost));
    hr = pClrRuntimeHost->Start();

    wchar_t id[2];
    id[0] = (wchar_t)param;
    id[1] = 0;

    DWORD pReturnValue;

    // Call back into managed code: AsyncLoomDll.FiberAsync.StartFiber(id).
    hr = pClrRuntimeHost->ExecuteInDefaultAppDomain(
        L"AsyncLoomDll.dll",
        L"AsyncLoomDll.FiberAsync",
        L"StartFiber",
        id,
        &pReturnValue);
}
```
We import the necessary .NET stubs in lines 8-12. Next, in lines 15-22, we load the CLR libraries into the process (they are already there, as we came to this place from the managed world). Then we prepare a string which carries the lambda id in its first character (to avoid conversions etc.).
Finally, in line 31 we execute a method in the C# code. That’s all.
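The “id in the first character” trick can be written down from the managed side as well. The helper below is a hypothetical illustration (`FiberIdCodec` is not part of the project); it mirrors what `wchar_t id[2]` does natively and makes the limits of the encoding explicit.

```csharp
using System;

public static class FiberIdCodec
{
    // Packs an id into a one-character string, like the native wchar_t id[2].
    public static string Encode(int id)
    {
        // 0 would be read as the string terminator on the native side,
        // and anything above char.MaxValue does not fit in one UTF-16 code unit.
        if (id <= 0 || id > char.MaxValue)
        {
            throw new ArgumentOutOfRangeException(nameof(id));
        }

        return new string((char)id, 1);
    }

    // Reads the id back, the same way StartFiber does with (int)arg[0].
    public static int Decode(string arg)
    {
        return (int)arg[0];
    }
}
```

For example, `FiberIdCodec.Decode(FiberIdCodec.Encode(2))` gives back `2`. The ids used in this post (1 and 2) are well within range, and id 0 is reserved for the main fiber, which is never started through `FiberStart`.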
Fibers in C#
Message loop
The code will be very similar. Let’s start with this:
```csharp
using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;

namespace AsyncLoomDll
{
    public class FiberAsync
    {
        public static ConcurrentDictionary<int, byte> readyToGo = new ConcurrentDictionary<int, byte>();
        public static ConcurrentDictionary<int, Action> allJobs = new ConcurrentDictionary<int, Action>();
        public static Semaphore semaphore = new Semaphore(0, int.MaxValue);
        public static AsyncLoomCli.FiberHelper helper = new AsyncLoomCli.FiberHelper();
        public static int current;
        public static bool done;

        public static int StartFiber(string arg)
        {
            int actionId = (int)arg[0];
            allJobs[actionId]();

            return 0;
        }

        public static void Run()
        {
            helper.Convert();

            allJobs.TryAdd(1, RunInternal);
            readyToGo.TryAdd(1, 0);
            semaphore.Release();
            helper.Create(1);

            allJobs.TryAdd(2, SideJob);
            readyToGo.TryAdd(2, 0);
            semaphore.Release();
            helper.Create(2);

            bool triggered;
            while (true)
            {
                triggered = false;
                done = false;

                var keys = readyToGo.Keys.GetEnumerator();
                while (keys.MoveNext())
                {
                    current = keys.Current;

                    helper.Switch(current);
                    if (done)
                    {
                        helper.Delete(current);
                        Action action;
                        allJobs.TryRemove(current, out action);
                        byte b;
                        readyToGo.TryRemove(current, out b);
                    }
                }

                if (allJobs.IsEmpty)
                {
                    break;
                }

                if (!triggered)
                {
                    semaphore.WaitOne();
                }
            }
        }
    }
}
```
We have similar collections in lines 10-12, and some bookkeeping as well in lines 13-15.
In lines 17-23 you can see the `StartFiber` method, which is called by the C++ code. You can see how we extract the lambda id from the string (line 19) and execute the lambda (line 20). That’s all.
Now, scheduler in lines 25-71. We first convert current thread to fiber (line 27). Next, we schedule two fibers to imitate two jobs to be done (just like two requests in coroutines sample).
Finally, in lines 40-71 we run the same loop as previously. This time, instead of calling `MoveNext` on a coroutine, we call `Switch` in line 50, which effectively pauses this part of the code and starts executing the fiber (potentially via the `StartFiber` method in line 17). We also need to remove fibers when they are done (once again, no locking, just some flags).
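Real fibers are awkward to experiment with, so here is a portable approximation of the `Switch` control flow built from ordinary threads that hand control to each other. To be clear, these are not fibers: every “fiber” is a full thread that is parked on a semaphore unless it has been switched to. `FakeFibers` and `FakeFiberDemo` are made-up names for this sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Exactly one participant runs at a time; control moves only through Switch.
public sealed class FakeFibers
{
    private readonly Dictionary<int, SemaphoreSlim> gates = new Dictionary<int, SemaphoreSlim>();
    private int current; // id 0 is the thread that created this object

    public FakeFibers()
    {
        gates[0] = new SemaphoreSlim(0);
    }

    // Must be called before the first Switch; the body has to end with Switch(0).
    public void Create(int id, Action body)
    {
        var gate = new SemaphoreSlim(0);
        gates[id] = gate;
        var thread = new Thread(() =>
        {
            gate.Wait(); // parked until someone switches to us
            body();
        });
        thread.IsBackground = true;
        thread.Start();
    }

    // Hands control to `target` and parks the caller until it is switched back to.
    public void Switch(int target)
    {
        var self = gates[current];
        current = target;
        gates[target].Release();
        self.Wait();
    }
}

public static class FakeFiberDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var fibers = new FakeFibers();
        fibers.Create(1, () =>
        {
            log.Add("job: part 1");
            fibers.Switch(0); // plays the role of MagicWait
            log.Add("job: part 2");
            fibers.Switch(0); // plays the role of Done
        });

        log.Add("loop: start");
        fibers.Switch(1);
        log.Add("loop: between");
        fibers.Switch(1);
        log.Add("loop: end");
        return log;
    }
}
```

`FakeFiberDemo.Run()` logs `loop: start`, `job: part 1`, `loop: between`, `job: part 2`, `loop: end`. The job is plain `void` code that gets suspended in the middle of a method call, which is exactly what `helper.Switch` gives us, only without paying for a thread per job.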
async as fibers
Okay, now let’s see the meat. We need the following helpers:
```csharp
static class TaskExtensions
{
    public static int MagicWait(this Task t)
    {
        var current = FiberAsync.current;
        byte b;
        FiberAsync.readyToGo.TryRemove(current, out b);
        t.ContinueWith(r => {
            FiberAsync.readyToGo.TryAdd(current, 0);
            FiberAsync.semaphore.Release();
        });
        FiberAsync.helper.Switch(0);
        return 0;
    }

    public static void Done()
    {
        FiberAsync.done = true;
        FiberAsync.helper.Switch(0);
    }
}
```
Now `MagicWait` just deschedules the fiber and resumes the main one (with the message loop).
Also, the `Done` method is just for bookkeeping. We could hide it if we wanted.
And now the most important part, `async` without pollution:
```csharp
private static void RunInternal()
{
    WhereAmI("Before nesting");

    RunInternalNested();

    WhereAmI("After nesting");
}

private static void RunInternalNested()
{
    WhereAmI("Before creating task");

    var delay = Task.Delay(2000);

    WhereAmI("After creating delay");

    delay.MagicWait();

    WhereAmI("After sleeping");

    var data = Task.Run(() => "Some string");

    WhereAmI("After creating data");

    data.MagicWait();
    var result = data.Result;

    WhereAmI($"After reading data {result}");
    TaskExtensions.Done();
}

private static void SideJob()
{
    WhereAmI("Side job");
    TaskExtensions.Done();
}
```
Look how the methods are just `void`. We don’t have any magical keywords, and we don’t need to use tasks everywhere. The waiting is done by calling `MagicWait` in line 18 or 26. We literally write synchronous code (just replace `Wait` with `MagicWait` to see). The only ugly part is lines 30 and 36, where we explicitly close the fiber, but this could be hidden in the `StartFiber` method.
Output:
```
Thread 1 Time 6/2/2020 9:48:56 PM: Start - FiberAsync
Thread 1 Time 6/2/2020 9:48:56 PM: Before nesting
Thread 1 Time 6/2/2020 9:48:56 PM: Before creating task
Thread 1 Time 6/2/2020 9:48:56 PM: After creating delay
Thread 1 Time 6/2/2020 9:48:56 PM: Side job
Thread 1 Time 6/2/2020 9:48:58 PM: After sleeping
Thread 1 Time 6/2/2020 9:48:58 PM: After creating data
Thread 1 Time 6/2/2020 9:48:58 PM: After reading data Some string
Thread 1 Time 6/2/2020 9:48:58 PM: End - FiberAsync
```
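Notice in line 5 how we execute the side job while waiting for `Task.Delay`, and how we then move on. This IS synchronous code, managed by a user mode scheduler under the hood. Note also that `After nesting` never shows up in this output: `Done` is called at the end of `RunInternalNested`, so control goes back to the message loop and the fiber is deleted before `RunInternal` gets a chance to print it. Calling `Done` at the end of `RunInternal` (or hiding it in `StartFiber`) would avoid that.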
Summary
We have seen a couple of approaches and finally implemented asynchronous code in a purely synchronous way. This is the goal of Project Loom.
Obviously, the presented approach is not production ready and may result in multiple issues. Imagine locks being incorrectly passed (and now think for a second how the `async` machinery must manage locks between threads, as continuations may easily execute on any thread), deadlocks, exceptions, and all the other non-positive scenarios. However, the essence of the solution works. Is it worth it? We get pure code with no artificial types and no `async` all the way up. Would it work? Time will tell, as Project Loom is not ready as I’m writing these words.
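To give the remark about locks some substance, the sketch below shows why `Monitor` (and therefore `lock`) is a problem for any scheduler that moves code between threads or runs several fibers on one thread. The class and method names are made up for this example.

```csharp
using System;
using System.Threading;

public static class LockAffinityDemo
{
    // Returns true when releasing the lock from another thread fails,
    // which is what a continuation resumed on a different thread would hit.
    public static bool ExitFromOtherThreadFails()
    {
        var gate = new object();
        bool failed = false;

        Monitor.Enter(gate); // taken on the calling thread
        var other = new Thread(() =>
        {
            try { Monitor.Exit(gate); }
            catch (SynchronizationLockException) { failed = true; }
        });
        other.Start();
        other.Join();
        Monitor.Exit(gate); // the owning thread can still release it

        return failed;
    }

    // Returns true when the same thread can take the lock twice. Two fibers
    // sharing one thread would both get in, so the lock would not separate them.
    public static bool SameThreadReenters()
    {
        var gate = new object();
        Monitor.Enter(gate);
        bool again = Monitor.TryEnter(gate);
        if (again)
        {
            Monitor.Exit(gate);
        }
        Monitor.Exit(gate);
        return again;
    }
}
```

Both methods return `true`. The first case is the one regular `async` code has to worry about; the second is the one that would bite a fiber scheduler like the one in this post, because the runtime identifies the lock owner by thread, not by fiber.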