async – Random IT Utensils (https://blog.adamfurmanek.pl): IT, operating systems, maths, and more.

Async Wandering Part 12 - Fibers with generics
https://blog.adamfurmanek.pl/2021/01/30/async-wandering-part-12/
Sat, 30 Jan 2021 09:00:11 +0000

This is the twelfth part of the Async Wandering series. For your convenience you can find other parts in the table of contents in Part 1 – Why creating Form from WinForms in unit tests breaks async?

Today we are going to color our functions in a different way.

Last time we saw how to return values from each function using monads. However, all we need is the ability to run code in some context when we expect that it may be asynchronous. If we know it is going to be synchronous, there is no reason to go through any monads. Instead of reverse-coloring functions, we may use generics to propagate the context and call it as needed.

public abstract class Builder
{
	public abstract Monad Build();
}

public class IdBuilder : Builder
{
	public override Monad Build()
	{
		return new Id();
	}
}

public class AsyncBuilder : Builder
{
	public override Monad Build()
	{
		return new Async();
	}
}

public interface Monad
{
	U Map<T, U>(T value, Func<T, U> lambda);
	void Complete(object t);
}

public class Id : Monad
{
	private object t;

	public U Map<T, U>(T value, Func<T, U> lambda)
	{
		this.t = value;
		lock (this)
		{
			while (t == null)
			{
				Monitor.Wait(this);
			}
		}

		return lambda((T)this.t);
	}

	public void Complete(object t)
	{
		lock (this)
		{
			this.t = t;
			Monitor.PulseAll(this);
		}
	}
}

public class Async : Monad
{
	private object t;
	private int current;

	public U Map<T, U>(T value, Func<T, U> lambda)
	{
		this.t = value;
		if (t == null)
		{
			this.current = HKTMonadFiberAsync.current;
			byte b;
			HKTMonadFiberAsync.readyToGo.TryRemove(this.current, out b);
			HKTMonadFiberAsync.helper.Switch(0);
		}

		return lambda((T)this.t);
	}

	public void Complete(object t)
	{
		this.t = t;
		HKTMonadFiberAsync.readyToGo.TryAdd(this.current, 0);
	}
}

Super similar to the code from the last part. However, this time we don’t hold the value in the monad, we pass it as a parameter and run it through the context.

How do we use it? This way:

private static void RunInternal()
{
	WhereAmI("Before nesting");

	RunInternalNested<AsyncBuilder>();

	WhereAmI("After nesting");
}

private static void RunInternalNested<T>() where T : Builder, new()
{
	WhereAmI("Before creating delay");

	Delay<T>(2000);

	WhereAmI("After sleeping");

	// Both type arguments are spelled out because C# cannot infer only some of them.
	var data = Data<T, string>("Some string");
	
	WhereAmI($"After creating data {data}");
}

private static void Delay<T>(int timeout) where T : Builder, new()
{
	var context = new T().Build();
	var timer = new Timer(_ => context.Complete(new object()), null, timeout, Timeout.Infinite);
	GC.KeepAlive(timer);
	context.Map((object)null, _ => timeout);
}

private static U Data<T, U>(U d) where T : Builder, new()
{
	var context = new T().Build();
	return context.Map(d, _ => d);
}

Notice that the call to Delay passes the generic parameter indicating the context. We can also wrap any value through the context, just like Task.FromResult, if needed. And the output is as expected:

Thread 1 Time 8/12/2020 5:16:21 PM: Start - HKTMonadFiberAsync
Thread 1 Time 8/12/2020 5:16:21 PM: Before nesting
Thread 1 Time 8/12/2020 5:16:21 PM: Before creating delay
Thread 1 Time 8/12/2020 5:16:21 PM: Side job
Thread 1 Time 8/12/2020 5:16:23 PM: After sleeping
Thread 1 Time 8/12/2020 5:16:23 PM: After creating data Some string
Thread 1 Time 8/12/2020 5:16:23 PM: After nesting
Thread 1 Time 8/12/2020 5:16:23 PM: End - HKTMonadFiberAsync

See that the side job was executed while we were sleeping. But if we change the call in RunInternal (line 5) to RunInternalNested<IdBuilder>();, we get this:

Thread 1 Time 8/12/2020 5:17:10 PM: Start - HKTMonadFiberAsync
Thread 1 Time 8/12/2020 5:17:10 PM: Before nesting
Thread 1 Time 8/12/2020 5:17:10 PM: Before creating delay
Thread 1 Time 8/12/2020 5:17:12 PM: After sleeping
Thread 1 Time 8/12/2020 5:17:12 PM: After creating data Some string
Thread 1 Time 8/12/2020 5:17:12 PM: After nesting
Thread 1 Time 8/12/2020 5:17:12 PM: Side job
Thread 1 Time 8/12/2020 5:17:12 PM: End - HKTMonadFiberAsync

So the side job is executed after the main one finishes, which is synchronous execution.
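The switch between the two runs relies on nothing more than a type argument. Taken in isolation, the mechanism can be sketched as follows. The names Context, PlainContext, TracingContext and Work are hypothetical and exist only for this illustration; they are not part of the code above.

```csharp
using System;

// Hypothetical names, used only for this illustration.
public abstract class Context
{
    public abstract string Run(Func<string> body);
}

public class PlainContext : Context
{
    public override string Run(Func<string> body)
    {
        return "plain:" + body();
    }
}

public class TracingContext : Context
{
    public override string Run(Func<string> body)
    {
        return "tracing:" + body();
    }
}

public static class GenericContextDemo
{
    // The caller selects the context with a type argument; the body of Work never changes.
    public static string Work<TContext>() where TContext : Context, new()
    {
        return new TContext().Run(() => "done");
    }

    public static void Main()
    {
        Console.WriteLine(Work<PlainContext>());   // plain:done
        Console.WriteLine(Work<TracingContext>()); // tracing:done
    }
}
```

The new() constraint is what lets the function create its own context, so nothing has to be passed as a parameter or kept in a static field.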

This way we have no colors, no static state, just a generic parameter which could be optimized by the compiler. We can go even further and get rid of boxing:

public abstract class Builder
{
	public abstract Monad<T> Build<T>();
}

public class IdBuilder : Builder
{
	public override Monad<T> Build<T>()
	{
		return new Id<T>();
	}
}

public class AsyncBuilder : Builder
{
	public override Monad<T> Build<T>()
	{
		return new Async<T>();
	}
}

public interface Monad<T>
{
	U Map<U>(T value, Func<T, U> lambda);
	void Complete(T t);
}

public class Id<T> : Monad<T>
{
	private T t;

	public U Map<U>(T value, Func<T, U> lambda)
	{
		this.t = value;
		lock (this)
		{
			while (t == null)
			{
				Monitor.Wait(this);
			}
		}

		return lambda(this.t);
	}

	public void Complete(T t)
	{
		lock (this)
		{
			this.t = t;
			Monitor.PulseAll(this);
		}
	}
}

public class Async<T> : Monad<T>
{
	private T t;
	private int current;

	public U Map<U>(T value, Func<T, U> lambda)
	{
		this.t = value;
		if (t == null)
		{
			this.current = HKTMonadFiberAsync.current;
			byte b;
			HKTMonadFiberAsync.readyToGo.TryRemove(this.current, out b);
			HKTMonadFiberAsync.helper.Switch(0);
		}

		return lambda(this.t);
	}

	public void Complete(T t)
	{
		this.t = t;
		HKTMonadFiberAsync.readyToGo.TryAdd(this.current, 0);
	}
}

Bonus points for getting sort of Higher Kinded Type in C# without doing the Brand transformation.

Async Wandering Part 11 - Wrapping fibers in context
https://blog.adamfurmanek.pl/2020/12/26/async-wandering-part-11/
Sat, 26 Dec 2020 09:00:18 +0000

This is the eleventh part of the Async Wandering series. For your convenience you can find other parts in the table of contents in Part 1 – Why creating Form from WinForms in unit tests breaks async?

We continue exploring async code.

Last time we saw how to use fibers to wait for tasks. This effectively allows us to have async code without any function coloring.

We already know function coloring has big drawbacks, and I also provided an example of how to use monads to address that. However, monads were actually reverse-coloring everything: instead of having functions of two colors (one for sync and one for async), we had all functions of the same color with some monad type returned.

Today we are going to merge that approach. We’ll have reverse-colored functions with fibers. Let’s begin.

I start with the same glue code as the last time:

public static ConcurrentDictionary<int, byte> readyToGo = new ConcurrentDictionary<int, byte>();
public static ConcurrentDictionary<int, Action> allJobs = new ConcurrentDictionary<int, Action>();
public static AsyncLoomCli.FiberHelper helper = new AsyncLoomCli.FiberHelper();
public static int current;
public static bool done;

public static int StartFiber(string arg)
{
	int actionId = (int)arg[0];
	allJobs[actionId]();
	if (actionId != 0)
	{
		MonadFiberAsync.done = true;
		MonadFiberAsync.helper.Switch(0);
	}

	return 0;
}

public static void Run()
{
	helper.Convert();

	allJobs.TryAdd(1, RunInternal);
	readyToGo.TryAdd(1, 0);
	helper.Create(1);

	allJobs.TryAdd(2, SideJob);
	readyToGo.TryAdd(2, 0);
	helper.Create(2);


	while (true)
	{
		done = false;
		var keys = readyToGo.Keys.GetEnumerator();
		while (keys.MoveNext())
		{
			current = keys.Current;
			helper.Switch(current);
			if (done)
			{
				helper.Delete(current);
				Action action;
				allJobs.TryRemove(current, out action);
				byte b;
				readyToGo.TryRemove(current, out b);
			}
		}

		if (allJobs.IsEmpty)
		{
			break;
		}

		Thread.Sleep(1);
	}
}

Next, I introduce monads:

public interface Monad<T>
{
	Monad<U> Map<U>(Func<T, Monad<U>> lambda);
	void Complete(T t);
	T Value();
}

Super simple interface, not adhering to all monad laws. This is just for showing the idea, not a bulletproof implementation.

We go with this base class:

public abstract class BaseMonad<T> : Monad<T>
{
	public T Value()
	{
		T value = default(T);
		Map(t =>
		{
			value = t;
			return (Monad<T>)null;
		});
		return value;
	}

	public abstract Monad<U> Map<U>(Func<T, Monad<U>> lambda);
	public abstract void Complete(T t);
}

We can see that Value is a glue to just extract the value from the monad. It’s an unwrap operation.

Next, we have our identity monad:

public class Id<T> : BaseMonad<T>
{
	private T t;

	public override Monad<U> Map<U>(Func<T, Monad<U>> lambda)
	{
		lock (this) {
			while (t == null)
			{
				Monitor.Wait(this);
			}
		}

		return lambda(this.t);
	}

	public override void Complete(T t)
	{
		lock (this) {
			this.t = t;
			Monitor.PulseAll(this);
		}
	}
}

We have value, simple map operation waiting for the value to appear, and a callback to fill the monad. Notice how this is effectively a promise.
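To see the promise aspect on its own, here is a minimal sketch of a blocking promise built on the same Monitor.Wait and Monitor.PulseAll pair. BlockingPromise, its private gate object and the completed flag are assumptions of this sketch, not part of the code above. The explicit flag is there because a check like t == null is never true when T is a value type such as int, so a promise of int would not wait at all.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified promise; it mirrors the Wait/PulseAll pattern of Id.
public class BlockingPromise<T>
{
    private readonly object gate = new object();
    private T value;
    private bool completed; // explicit flag, so value types and null results also work

    public void Complete(T result)
    {
        lock (gate)
        {
            value = result;
            completed = true;
            Monitor.PulseAll(gate); // wake every waiter
        }
    }

    public T Get()
    {
        lock (gate)
        {
            while (!completed)
            {
                Monitor.Wait(gate); // releases the lock while the thread is blocked
            }
            return value;
        }
    }
}

public static class BlockingPromiseDemo
{
    public static int CompleteFromTimer()
    {
        var promise = new BlockingPromise<int>();
        using (var timer = new Timer(_ => promise.Complete(42), null, 100, Timeout.Infinite))
        {
            return promise.Get(); // blocks the calling thread until the timer fires
        }
    }

    public static void Main()
    {
        Console.WriteLine(CompleteFromTimer());
    }
}
```

The calling thread does nothing useful while it sits in Get, which is exactly the behavior the async monad is meant to avoid.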

Now, async monad:

public class Async<T> : BaseMonad<T>
{
	private T t;
	private int current;

	public override Monad<U> Map<U>(Func<T, Monad<U>> lambda)
	{
		if (t == null)
		{
			this.current = MonadFiberAsync.current;
			byte b;
			MonadFiberAsync.readyToGo.TryRemove(this.current, out b);
			MonadFiberAsync.helper.Switch(0);
		}

		return lambda(this.t);
	}

	public override void Complete(T t)
	{
		this.t = t;
		MonadFiberAsync.readyToGo.TryAdd(this.current, 0);
	}
}

The only difference here is that we do the cooperative scheduling instead of just pausing the thread.

Now, we’d like to decide how to use these monads. For that we need builders which we’ll be able to replace at runtime (similar to a synchronization context):

public abstract class Builder
{
	public abstract Monad<T> Build<T>();
}

public class IdBuilder : Builder
{
	public override Monad<T> Build<T>()
	{
		return new Id<T>();
	}
}

public class AsyncBuilder : Builder
{
	public override Monad<T> Build<T>()
	{
		return new Async<T>();
	}
}

No magic here. And the env:

public class ExecutionEnvironment
{
	public Builder SyncBuilder;
	public Builder AsyncBuilder;
}

If you think about task builders in C# (which let you return any task type) then you’re right.

Okay, let’s go with some operation now. Again, I’ll have 2 jobs:

private static void RunInternal()
{
	WhereAmI("Before nesting");

	var env = new ExecutionEnvironment
	{
		SyncBuilder = new IdBuilder(),
		AsyncBuilder = new AsyncBuilder()
	};

	RunInternalNested(env);

	WhereAmI("After nesting");
}

We create environment with builders and then continue:

private static void RunInternalNested(ExecutionEnvironment env)
{
	var start = env.SyncBuilder.Build<int>();
	start.Complete(0);
	start.Map(i =>
	{
		WhereAmI("Before creating task");

		var delay = Delay(2000, env);

		WhereAmI("After creating delay");

		return delay;
	}).Map(i =>
	{
		WhereAmI("After sleeping");

		var data = Data("Some string", env);

		WhereAmI("After creating data");

		return data;
	}).Map(result => {
		WhereAmI($"After reading data {result}");

		return env.SyncBuilder.Build<object>();
	});
}

We glue some lambdas together thanks to mapping. Notice how we return delay from the first lambda and then call Map on it which makes the waiting. Also, notice how I pass environment explicitly. In some other language we could pass it via implicits, or we could utilize compiler to do the reverse-coloring for us (to avoid parameters and lambdas!).

Moving on:

private static Monad<object> Delay(int timeout, ExecutionEnvironment env)
{
	var a = env.AsyncBuilder.Build<object>();
	var timer = new Timer(_ => a.Complete(new object()), null, timeout, Timeout.Infinite);
	GC.KeepAlive(timer);
	return a;
}

Instead of blocking the thread with sleep, we create a timer which calls the callback and resolves the monad.

private static Monad<string> Data(string d, ExecutionEnvironment env)
{
	var monad = env.SyncBuilder.Build<string>();
	monad.Complete(d);
	return monad;
}

Here we just return the data.

And another side job:

private static void SideJob()
{
	WhereAmI("Side job");
}

Output:

Thread 1 Time 7/23/2020 10:56:48 PM: Start - MonadFiberAsync
Thread 1 Time 7/23/2020 10:56:48 PM: Before nesting
Thread 1 Time 7/23/2020 10:56:48 PM: Before creating task
Thread 1 Time 7/23/2020 10:56:48 PM: After creating delay
Thread 1 Time 7/23/2020 10:56:48 PM: Side job
Thread 1 Time 7/23/2020 10:56:50 PM: After sleeping
Thread 1 Time 7/23/2020 10:56:50 PM: After creating data
Thread 1 Time 7/23/2020 10:56:50 PM: After reading data Some string
Thread 1 Time 7/23/2020 10:56:50 PM: After nesting
Thread 1 Time 7/23/2020 10:56:50 PM: End - MonadFiberAsync

Okay, works like a charm. However, let’s now do some magic. Instead of using the async builder for async methods, let’s do AsyncBuilder = new IdBuilder(). Output:

Thread 1 Time 7/23/2020 10:57:23 PM: Start - MonadFiberAsync
Thread 1 Time 7/23/2020 10:57:23 PM: Before nesting
Thread 1 Time 7/23/2020 10:57:23 PM: Before creating task
Thread 1 Time 7/23/2020 10:57:23 PM: After creating delay
Thread 1 Time 7/23/2020 10:57:25 PM: After sleeping
Thread 1 Time 7/23/2020 10:57:25 PM: After creating data
Thread 1 Time 7/23/2020 10:57:25 PM: After reading data Some string
Thread 1 Time 7/23/2020 10:57:25 PM: After nesting
Thread 1 Time 7/23/2020 10:57:25 PM: Side job
Thread 1 Time 7/23/2020 10:57:25 PM: End - MonadFiberAsync

Notice that Side job waited until the first job finished and wasn’t run in the middle. The thread was sleeping since we blocked it by using synchronous monad. One line of code and we disabled asynchronous code.

Is this approach better? In theory this has the best of both worlds: you can wait synchronously for async code, not block the thread, and change the logic any way you like. Allocation is much higher, but that’s expected with this approach. Also, there is a lot of plumbing code, so we’d probably need a clever compiler doing the magic behind the scenes, but that could bring us back to the C# solution. Apart from different coloring, I pass the environment explicitly, but once I stop doing that (and it’s added by the compiler) it isn’t much different from thread-local variables for the synchronization context.

However, the difference now is that nothing can escape the monad (well, at least when we have compiler checking that) so we can control how things behave deep down. If we better encapsulate the environment then it won’t be possible to change the async machinery by replacing some global state.

Ultimately these solutions may not necessarily differ that much.

Async Wandering Part 10 - Project Loom in .NET – awaiting with fibers
https://blog.adamfurmanek.pl/2020/10/03/async-wandering-part-10/
Sat, 03 Oct 2020 08:00:16 +0000

This is the tenth part of the Async Wandering series. For your convenience you can find other parts in the table of contents in Part 1 – Why creating Form from WinForms in unit tests breaks async?

In Part 8 I mentioned that async could be implemented with fibers, and this is the goal of Project Loom in the JVM world. Let’s see a super simple implementation. It is not supposed to work reliably in production (as .NET officially doesn’t support fibers) but is presented as a sample showing that it could be done.

Sidenote: you may want to read about internals of async state machine and synchronization contexts before moving forward. I’ll skip details of async code under the hood but will assume you know how it works.

Regular async code

Okay, let’s begin with defining what we would like to do. Let’s take this code:

using System;
using System.Threading;

namespace AsyncLoom
{
    class Program
    {
        static void Main(string[] args)
        {
            WhereAmI("Start - RegularAsync");
            RegularAsync.Run().Wait();
            WhereAmI("End - RegularAsync\n");
        }

        public static void WhereAmI(string what)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} Time {DateTime.Now}: {what}");
        }
    }
}

This is just a skeleton which we will use to trigger jobs. We have a helper method for printing the thread ID and a timestamp.

Let’s take regular async code:

using System;
using System.Threading.Tasks;

namespace AsyncLoom
{
    public class RegularAsync
    {
        public static async Task Run()
        {
            Program.WhereAmI("Before nesting");

            var nested = RunNested();

            Program.WhereAmI("Between nesting");

            await nested;

            Program.WhereAmI("After nesting");
        }

        public static async Task RunNested()
        {
            Program.WhereAmI("Before creating task");

            var delay = Task.Delay(2000);

            Program.WhereAmI("After creating delay");

            await delay;

            Program.WhereAmI("After sleeping");

            var data = Task.Run(() => "Some string");

            Program.WhereAmI("After creating data");

            var result = await data;

            Program.WhereAmI($"After reading data {result}");
        }
    }
}

Nothing magical here. We start with top async function called Run which starts executing RunNested. We start executing nested function until line 29 in which we call await delay which results in checking whether the task is finished (it is not) and we return from the function. Next, we print in line 14 and then wait in line 16.

At this point the thread is free to do anything it wants. In our case we just block it but it could observe message pump or network requests.

Some time later we finish waiting and resume in line 31. Output:

Thread 1 Time 6/2/2020 9:48:52 PM: Start - RegularAsync
Thread 1 Time 6/2/2020 9:48:52 PM: Before nesting
Thread 1 Time 6/2/2020 9:48:52 PM: Before creating task
Thread 1 Time 6/2/2020 9:48:52 PM: After creating delay
Thread 1 Time 6/2/2020 9:48:52 PM: Between nesting
Thread 4 Time 6/2/2020 9:48:54 PM: After sleeping
Thread 4 Time 6/2/2020 9:48:54 PM: After creating data
Thread 4 Time 6/2/2020 9:48:54 PM: After reading data Some string
Thread 4 Time 6/2/2020 9:48:54 PM: After nesting
Thread 1 Time 6/2/2020 9:48:54 PM: End - RegularAsync

You can see a 2-second delay between lines 5 and 6 of the output.

Coroutines

Let’s now reimplement this solution with coroutines. You probably know that fibers, coroutines, and user mode threads are equivalent to some extent. By going with coroutines we’ll see the general logic.

Coroutines in C# can be easily implemented with yield return and this is what we’re going to do. We need to implement a message queue (just like for a regular thread pool thread) and continuations. The implementation will be messy because it’s just for experimentation, so it has little to no error checking (and probably has race conditions as well).

Message loop

We start with this:

using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace AsyncLoom
{
    public static class GeneratorAsync
    {
        public static ConcurrentDictionary<EnumeratorWithFlag, byte> readyToGo = new ConcurrentDictionary<EnumeratorWithFlag, byte>();
        public static ConcurrentDictionary<EnumeratorWithFlag, byte> allJobs = new ConcurrentDictionary<EnumeratorWithFlag, byte>();
        public static Semaphore semaphore = new Semaphore(0, int.MaxValue);
        public static EnumeratorWithFlag current;

        public class EnumeratorWithFlag
        {
            public IEnumerator<int> enumerator;
            private bool lastState;
            private EnumeratorWithFlag previous;

            public EnumeratorWithFlag(IEnumerator<int> enumerator)
            {
                this.enumerator = enumerator;
                this.lastState = true;
            }

            public bool MoveNext()
            {
                lastState = lastState && enumerator.MoveNext();

                if (!lastState && previous != null)
                {
                    this.enumerator = this.previous.enumerator;
                    this.lastState = true;
                    this.previous = this.previous.previous;
                }

                return lastState;
            }

            public void Push(IEnumerator<int> enumerator)
            {
                this.previous = new EnumeratorWithFlag(this.enumerator);
                this.enumerator = enumerator;
            }
        }
    }
}

We have a bunch of collections. First in line 10 is readyToGo — these are coroutines which can be executed because they finished waiting for dependencies.
Next in line 11 we have a collection of all jobs so we know when we’re done with calculations.
We also need to have semaphore for synchronization in line 12, and current executed coroutine in line 13.

Next, we have a helper class EnumeratorWithFlag which keeps a chain of continuations and stack traces. It wraps regular enumerator and propagates context if needed. It also has method Push which adds new enumerator on top of existing one.

Idea here is: EnumeratorWithFlag represents current coroutine. If we move to some other (by calling it) we push it to the chain with Push method. However, MoveNext is clever enough to restore the state if needed so we traverse whole chain with just one instance of EnumeratorWithFlag stored in collections.
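The same idea can be sketched with an explicit stack of enumerators. This is a simplified variant and not the class above: EnumeratorChain and the demo coroutines are hypothetical names, and a finished frame is popped and its caller advanced within the same MoveNext call. The sketch also assumes that a frame yields right after pushing a callee.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: one object that walks a whole chain of nested generators.
public class EnumeratorChain
{
    private readonly Stack<IEnumerator<int>> frames = new Stack<IEnumerator<int>>();

    public EnumeratorChain(IEnumerator<int> root)
    {
        frames.Push(root);
    }

    // "Calls" a nested coroutine by putting it on top of the current one.
    public void Push(IEnumerator<int> callee)
    {
        frames.Push(callee);
    }

    // Advances the innermost frame by one step and drops frames that have finished.
    public bool MoveNext()
    {
        while (frames.Count > 0)
        {
            if (frames.Peek().MoveNext())
            {
                return true;
            }
            frames.Pop();
        }
        return false;
    }
}

public static class EnumeratorChainDemo
{
    private static EnumeratorChain chain;
    private static List<string> log;

    private static IEnumerable<int> Outer()
    {
        log.Add("outer start");
        chain.Push(Inner().GetEnumerator());
        yield return 0; // hand control back to the driver
        log.Add("outer end");
    }

    private static IEnumerable<int> Inner()
    {
        log.Add("inner step 1");
        yield return 0;
        log.Add("inner step 2");
    }

    public static List<string> Run()
    {
        log = new List<string>();
        chain = new EnumeratorChain(Outer().GetEnumerator());
        while (chain.MoveNext()) { }
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The driver only ever holds one object, yet the log shows the inner coroutine running to completion before the outer one resumes, like frames on a call stack.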

Now the main method:

public static void Run()
{
	var runInternal = new EnumeratorWithFlag(RunInternal().GetEnumerator());
	allJobs.TryAdd(runInternal, 0);
	readyToGo.TryAdd(runInternal, 0);
	semaphore.Release();

	var sideJob = new EnumeratorWithFlag(SideJob().GetEnumerator());
	allJobs.TryAdd(sideJob, 0);
	readyToGo.TryAdd(sideJob, 0);
	semaphore.Release();


	bool triggered;
	while (true) {
		triggered = false;
		var keys = readyToGo.Keys.GetEnumerator();

		while(keys.MoveNext()){
			current = keys.Current;
			triggered = true;

			if (!current.MoveNext())
			{
				byte b;
				readyToGo.TryRemove(current, out b);
				allJobs.TryRemove(current, out b);
			}
		}

		if (allJobs.IsEmpty)
		{
			break;
		}

		if (!triggered)
		{
			semaphore.WaitOne();
		}
	}
}

This is quite a lot. First, we schedule a normal job to execute in lines 3-6. We do some bookkeeping and also add new token in the semaphore to signal that there may be something running under the hood.

Next, in lines 8-11 we simulate new request coming to the thread in the meantime. Think of message in message pump or network call while we execute something.

Finally, lines 14-40 execute the message loop.
We start in line 16 with setting flag if we executed any step in this iteration. This is to avoid deadlocks if we have something to do and we don’t need to wait.
Next, we get something to execute (lines 17-20). In line 23 we execute one step of coroutine. If it finished whole generator (and returned false), we remove it from collections to not bother with it anymore.

Finally, we break in line 33 if we’re done. However, if we didn’t execute a thing and there are unfinished coroutines, we wait with semaphore.

It is important to remember that we’re generally single threaded here. Collections may be modified outside of this thread only when we execute some continuation on other thread but whole message loop generally runs coroutines synchronously.

async as coroutines

Now jobs:

private static IEnumerable<int> RunInternal()
{
	Program.WhereAmI("Before nesting");

	var nested = RunInternalNested().GetEnumerator().Start();

	Program.WhereAmI("Between nesting");
	
	yield return nested.MagicWait();

	Program.WhereAmI("After nesting");
}

private static IEnumerable<int> RunInternalNested()
{
	Program.WhereAmI("Before creating task");

	var delay = Task.Delay(2000);

	Program.WhereAmI("After creating delay");

	yield return delay.MagicWait();

	Program.WhereAmI("After sleeping");

	var data = Task.Run(() => "Some string");

	Program.WhereAmI("After creating data");

	yield return data.MagicWait();
	var result = data.Result;

	Program.WhereAmI($"After reading data {result}");
}

private static IEnumerable<int> SideJob()
{
	Program.WhereAmI("Side job");
	yield break;
}

We start with code very similar to last time. Notice in line 5 that we need to manually trigger the coroutine. This is the difference between generators and async methods in C#: a generator runs only when something iterates it (for example foreach), while an async method starts running as soon as it is called (we don’t need to await it).
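The difference in start-up behavior is easy to observe. The following sketch uses hypothetical names (LazyVersusEagerDemo, Generator, AsyncMethod) and records the order in which things happen. The await on an already completed task is there only to make the method a genuine async method.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LazyVersusEagerDemo
{
    private static IEnumerable<int> Generator(List<string> log)
    {
        log.Add("generator body");
        yield return 0;
    }

    private static async Task AsyncMethod(List<string> log)
    {
        log.Add("async body");
        await Task.CompletedTask; // already completed, so the method finishes synchronously
    }

    public static List<string> Run()
    {
        var log = new List<string>();

        var enumerator = Generator(log).GetEnumerator();
        log.Add("generator created");   // the generator body has not run yet
        enumerator.MoveNext();          // now it runs up to the first yield

        var task = AsyncMethod(log);    // the body starts running during this call
        log.Add("async method called");

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The log comes out as "generator created", "generator body", "async body", "async method called", which is why the coroutine version needs an explicit Start helper.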

Next, notice how we use yield return in lines 9, 22, and 30. These are equivalent to doing await, only we work with enumerators. This also shows how enumerators in C# have the same issue as await — they require specific return type so we need to pollute our codebase (yield return and IEnumerable all the way up).

Finally, see in line 31 how we get result from the Task. We magically wait for it in line 30 so in line 31 it is already resolved. We just read the result directly as it must be available (so this is non-blocking).

Okay, let’s see magic:

static class TaskExtensions
{
	public static int MagicWait(this Task t)
	{
		var current = GeneratorAsync.current;
		byte b;
		GeneratorAsync.readyToGo.TryRemove(current, out b);
		t.ContinueWith(r => {
			GeneratorAsync.readyToGo.TryAdd(current, 0);
			GeneratorAsync.semaphore.Release();
		});
		return 0;
	}

	public static int MagicWait(this IEnumerator<int> e)
	{
		if (GeneratorAsync.current.enumerator != e)
		{
			e.Start();
		}

		return 0;
	}

	public static IEnumerator<int> Start(this IEnumerator<int> e)
	{
		GeneratorAsync.current.Push(e);
		GeneratorAsync.current.MoveNext();
		return e;
	}
}

So we have 3 helper methods. MagicWait in line 3 takes current coroutine (line 5), deschedules it (line 7), and then sets continuation which restores the coroutine (line 9). Also, continuation notifies main thread that there is something ready for execution (line 10). This is the only place where we are multithreaded — continuation runs anywhere (specifically on other thread pool thread) and sets flags, we synchronize later using message queue.
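The wake-up part of the first MagicWait can be tried in isolation. In this sketch (WakeUpDemo, the ready queue and job number 7 are made up for the illustration) a continuation marks a job as runnable and releases a semaphore, while the calling thread plays the role of the message loop and sleeps until that happens.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class WakeUpDemo
{
    public static int Run()
    {
        var ready = new ConcurrentQueue<int>();
        using (var semaphore = new Semaphore(0, int.MaxValue))
        {
            // The continuation runs on a thread pool thread once the delay elapses.
            Task.Delay(100).ContinueWith(_ =>
            {
                ready.Enqueue(7);     // mark hypothetical job 7 as runnable again
                semaphore.Release();  // wake the loop
            });

            semaphore.WaitOne();      // the "message loop" sleeps instead of spinning

            int job;
            ready.TryDequeue(out job);
            return job;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

Because the job is enqueued before the semaphore is released, the loop is guaranteed to find it once WaitOne returns.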

Next, MagicWait in line 15 waits for coroutine called directly (not something requiring waiting). We check if it is not current coroutine and start it if needed.

Finally, Start method pushes the chain (so it’s like a new frame on a call stack) and executes one step of it (by calling MoveNext).

Notice how we access static fields here and there. They could be hidden but I wanted to show something similar to real async implementation. It accesses hidden global state in a very similar manner.

Output:

Thread 1 Time 6/2/2020 9:48:54 PM: Start - GeneratorAsync
Thread 1 Time 6/2/2020 9:48:54 PM: Before nesting
Thread 1 Time 6/2/2020 9:48:54 PM: Before creating task
Thread 1 Time 6/2/2020 9:48:54 PM: After creating delay
Thread 1 Time 6/2/2020 9:48:54 PM: Between nesting
Thread 1 Time 6/2/2020 9:48:54 PM: Side job
Thread 1 Time 6/2/2020 9:48:56 PM: After sleeping
Thread 1 Time 6/2/2020 9:48:56 PM: After creating data
Thread 1 Time 6/2/2020 9:48:56 PM: After reading data Some string
Thread 1 Time 6/2/2020 9:48:56 PM: After nesting
Thread 1 Time 6/2/2020 9:48:56 PM: End - GeneratorAsync

Couple of important things here. First, we execute everything on one thread only (compare with previous output).
Second, we see second job executed while waiting (line 6). So we can see that the thread is effectively unblocked and can handle other messages in the middle, while waiting for continuations.
Finally, we see nesting works correctly — it would be super easy to write After nesting in line 6 but that would break the code flow.

Fibers in C++/CLI

Okay, now we want to implement previous example with fibers. They are provided by WinAPI but not exposed to .NET so we need to start with wrappers.
I’m working with .NET Framework 4.7.2 on Windows 10 Pro x64. You can play with other platforms; the changes should be relatively simple.

We create C++/CLI wrapper. First part:

using namespace System;
using namespace System::Collections::Generic;

namespace AsyncLoomCli {
	public ref class FiberHelper
	{
	public:
		Dictionary<int, long>^ actions;

		FiberHelper() {
			actions = gcnew Dictionary<int, long>();
		}

		void Convert() {
			actions->Add(0, (long)ConvertThreadToFiber(NULL));
		}

		void Create(int action) {
			actions->Add(action, (long)CreateFiber(1024 * 1024, (LPFIBER_START_ROUTINE)FiberStart, (LPVOID)action));
		}

		void Switch(int action) {
			SwitchToFiber((LPVOID)actions[action]);
		}

		void Delete(int action) {
			DeleteFiber((LPVOID)actions[action]);
		}
	};
}

We will just number lambdas provided by the C# code so we can find fiber address (represented as long here in line 8) by id.

We wrap 4 basic methods:
Convert in line 15 converts current thread to fiber. Current thread will have id 0.
Create in line 19 creates new fiber and passes lambda id to the fiber method.
Switch in line 23 just schedules the fiber.
Delete in line 27 deletes the fiber.

Now, we need to have a fiber function (just like a thread function when creating new thread). The flow is: we start in C# code creating fibers using wrapper which we just implemented. At some point C# code calls Switch which is supposed to schedule a fiber. Next, fiber must return to C# code and execute managed lambda. So we go with this:

#pragma once

#include <Windows.h>
#include <cstdio>

#include <metahost.h>

#pragma comment(lib, "mscoree.lib")

#import "mscorlib.tlb" raw_interfaces_only				\
    high_property_prefixes("_get","_put","_putref")		\
    rename("ReportEvent", "InteropServices_ReportEvent")

void FiberStart(LPVOID param) {
	HRESULT hr;
	ICLRMetaHost* pMetaHost = NULL;
	ICLRRuntimeInfo* pRuntimeInfo = NULL;
	ICLRRuntimeHost* pClrRuntimeHost = NULL;

	hr = CLRCreateInstance(CLSID_CLRMetaHost, IID_PPV_ARGS(&pMetaHost));
	hr = pMetaHost->GetRuntime(L"v4.0.30319", IID_PPV_ARGS(&pRuntimeInfo));
	hr = pRuntimeInfo->GetInterface(CLSID_CLRRuntimeHost, IID_PPV_ARGS(&pClrRuntimeHost));

	hr = pClrRuntimeHost->Start();

	wchar_t id[2];
	id[0] = (wchar_t)param;
	id[1] = 0;

	DWORD pReturnValue;
	hr = pClrRuntimeHost->ExecuteInDefaultAppDomain(
		L"AsyncLoomDll.dll",
		L"AsyncLoomDll.FiberAsync",
		L"StartFiber",
		id,
		&pReturnValue);
}

We import necessary .NET stubs in lines 8-12. Next, we load CLR libraries into the process (they are already there as we came from managed world to this place) in lines 15-22. Finally, we prepare a string which marshals lambda id into first character (to avoid conversions etc).

Finally, in line 31 we execute method in C# code. That’s all.

Fibers in C#

Message loop

We’ll have a very similar code. Let’s start with this:

using System;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Threading;

namespace AsyncLoomDll
{
    public class FiberAsync
    {
        public static ConcurrentDictionary<int, byte> readyToGo = new ConcurrentDictionary<int, byte>();
        public static ConcurrentDictionary<int, Action> allJobs = new ConcurrentDictionary<int, Action>();
        public static Semaphore semaphore = new Semaphore(0, int.MaxValue);
        public static AsyncLoomCli.FiberHelper helper = new AsyncLoomCli.FiberHelper();
        public static int current;
        public static bool done;

        public static int StartFiber(string arg)
        {
            int actionId = (int)arg[0];
            allJobs[actionId]();

            return 0;
        }

        public static void Run()
        {
            helper.Convert();

            allJobs.TryAdd(1, RunInternal);
            readyToGo.TryAdd(1, 0);
            semaphore.Release();
            helper.Create(1);

            allJobs.TryAdd(2, SideJob);
            readyToGo.TryAdd(2, 0);
            semaphore.Release();
            helper.Create(2);


            bool triggered;
            while (true)
            {
                triggered = false;
                done = false;
                var keys = readyToGo.Keys.GetEnumerator();

                while (keys.MoveNext())
                {
                    current = keys.Current;
                    helper.Switch(current);
                    if (done)
                    {
                        helper.Delete(current);
                        Action action;
                        allJobs.TryRemove(current, out action);
                        byte b;
                        readyToGo.TryRemove(current, out b);
                    }
                }

                if (allJobs.IsEmpty)
                {
                    break;
                }

                if (!triggered)
                {
                    semaphore.WaitOne();
                }
            }
        }
   }
}

We have similar collections in lines 10-12 and some bookkeeping fields in lines 13-15.

In lines 17-23 you can see the StartFiber method which is called by the C++ code. We extract the lambda id from the string (line 19) and execute the lambda (line 20). That’s all.

Now, the scheduler in lines 25-71. We first convert the current thread to a fiber (line 27). Next, we schedule two fibers to imitate two jobs to be done (just like the two requests in the coroutines sample).

Finally, in lines 40-71 we run the same loop as previously. This time, instead of calling MoveNext on a coroutine, we call Switch in line 50, which effectively pauses this part of the code and starts executing the fiber (potentially via the StartFiber method in line 17). We also need to remove fibers when they are done (once again, no locking, just some flags).

async as fibers

Okay, now let’s see the meat. We need the following helpers:

static class TaskExtensions
{
	public static int MagicWait(this Task t)
	{
		var current = FiberAsync.current;
		byte b;
		FiberAsync.readyToGo.TryRemove(current, out b);
		t.ContinueWith(r => {
			FiberAsync.readyToGo.TryAdd(current, 0);
			FiberAsync.semaphore.Release();
		});

		FiberAsync.helper.Switch(0);
		return 0;
	}

	public static void Done()
	{
		FiberAsync.done = true;
		FiberAsync.helper.Switch(0);
	}
}

Now MagicWait just deschedules the current fiber and resumes the main one (with the message loop).

The Done method is just for bookkeeping. We could hide it if we wanted.

And now the most important part — async without pollution:

private static void RunInternal()
{
	WhereAmI("Before nesting");

	RunInternalNested();

	WhereAmI("After nesting");
}

private static void RunInternalNested()
{
	WhereAmI("Before creating task");

	var delay = Task.Delay(2000);

	WhereAmI("After creating delay");

	delay.MagicWait();

	WhereAmI("After sleeping");

	var data = Task.Run(() => "Some string");

	WhereAmI("After creating data");

	data.MagicWait();
	var result = data.Result;

	WhereAmI($"After reading data {result}");
	TaskExtensions.Done();
}

private static void SideJob()
{
	WhereAmI("Side job");
	TaskExtensions.Done();
}

Look how the methods are just void. We don’t have any magical keywords and we don’t need to use tasks everywhere. The waiting is done by calling MagicWait in line 18 or 26. We literally write synchronous code (just replace Wait with MagicWait to see). The only ugly parts are lines 30 and 36, where we explicitly finish the fiber, but this could be hidden in the StartFiber method.

Output:

Thread 1 Time 6/2/2020 9:48:56 PM: Start - FiberAsync
Thread 1 Time 6/2/2020 9:48:56 PM: Before nesting
Thread 1 Time 6/2/2020 9:48:56 PM: Before creating task
Thread 1 Time 6/2/2020 9:48:56 PM: After creating delay
Thread 1 Time 6/2/2020 9:48:56 PM: Side job
Thread 1 Time 6/2/2020 9:48:58 PM: After sleeping
Thread 1 Time 6/2/2020 9:48:58 PM: After creating data
Thread 1 Time 6/2/2020 9:48:58 PM: After reading data Some string
Thread 1 Time 6/2/2020 9:48:58 PM: End - FiberAsync

Notice in line 5 how we execute the side job while waiting for Task.Delay and how we then move on. This IS synchronous code which is managed by a user-mode scheduler under the hood.

Summary

We have seen a couple of approaches and finally implemented asynchronous code in a purely synchronous way. This is the goal of Project Loom.

Obviously, the presented approach is not production ready and may result in multiple issues. Imagine locks being incorrectly passed (and now think for a second how the async machinery must manage locks between threads, as continuations may easily execute on any thread), deadlocks, exceptions and all the other unhappy scenarios. However, the essence of the solution works. Is it worth it? We get pure code with no artificial types and no async all the way up. Would it work? Time will tell, as Project Loom is not ready as I’m writing these words.

Async Wandering Part 9 — awaiting with timeout
https://blog.adamfurmanek.pl/2020/09/12/async-wandering-part-9/
Sat, 12 Sep 2020 08:00:14 +0000

This is the ninth part of the Async Wandering series. For your convenience you can find other parts in the table of contents in Part 1 – Why creating Form from WinForms in unit tests breaks async?

In the previous part I mentioned multiple ways to await a task with a timeout. Let’s see them.

Sketch

Let’s start with this code:

static async Task Main(string[] args)
{
	await Do();
}

public static async Task Do()
{
	await Hang(); // How to timeout here?
}

public static async Task Hang()
{
	await Task.Delay(TimeSpan.FromDays(1));
	Console.WriteLine("Worked!");
}

We will say that this code runs forever (even though it would finish after a day). The question is: how do we call the method in line 8 with some timeout?

Let’s also add the following helper method:

public static async Task<T> ThrowTimeoutException<T>()
{
	await Task.Delay(TimeSpan.FromSeconds(1));
	throw new Exception("Timeout!");
}

It waits for a second and throws an exception, nothing big here.

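WARNING: In the examples below I don’t handle the timeout exception properly. Don’t do it this way! Always await all your tasks. Otherwise a faulted task that nobody observed reports its exception out of band, from the finalizer thread once the GC collects it, and depending on the runtime version and configuration this can kill your application.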

Solution 1 — timeouting manually

The general trick is to wait for two tasks and throw an exception when the timeout happens:

public static async Task Do()
{
	var completed = await Task.WhenAny(Program.Hang(), Program.ThrowTimeoutException<bool>()).ConfigureAwait(false);
	await completed.ConfigureAwait(false);
}

Since Task.WhenAny returns the task which completed first, we need to await it as well. That second await is what rethrows the timeout exception.
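A slightly hardened variant of the same trick could look as follows. This is only a sketch and not part of the original sample: the TimeoutSketch class and the WithTimeout name are hypothetical, and it throws TimeoutException directly instead of going through ThrowTimeoutException. Compared to the snippet above, it cancels the timer once the work wins the race, so no faulted timeout task is left behind unobserved.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Hypothetical helper: awaits `work` and throws TimeoutException
    // if it does not finish within `timeout`.
    public static async Task WithTimeout(Task work, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            Task delay = Task.Delay(timeout, cts.Token);
            Task first = await Task.WhenAny(work, delay).ConfigureAwait(false);
            if (first != work)
            {
                // The timer won. Note that `work` keeps running in the background.
                throw new TimeoutException("Timeout!");
            }

            cts.Cancel();                      // stop the timer, it is no longer needed
            await work.ConfigureAwait(false);  // propagate the result or the exception of the work
        }
    }
}
```

A call site would then read await TimeoutSketch.WithTimeout(Program.Hang(), TimeSpan.FromSeconds(1)).ConfigureAwait(false).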

Writing that code everywhere may be tedious. Let’s look further.

Solution 2 — extension method

public static async Task Do()
{
	await Program.Hang().Timeout().ConfigureAwait(false);
}

static class TaskExtensions
{
	public static async Task Timeout(this Task t)
	{
		await (await Task.WhenAny(t, Program.ThrowTimeoutException<bool>()).ConfigureAwait(false)).ConfigureAwait(false);
	}
}

That’s basically the same as before. However, this time we wrap the logic in one method and that’s probably the solution we are looking for.
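On newer runtimes you may not need your own extension at all. Assuming .NET 6 or later, Task.WaitAsync(TimeSpan) does the same job and fails with a TimeoutException when the time runs out. The sketch below relies on that assumption; the WaitAsyncSketch class and the DoWithTimeout method are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class WaitAsyncSketch
{
    // Stand-in for the Hang method from the sketch at the top of this post.
    public static async Task Hang()
    {
        await Task.Delay(TimeSpan.FromDays(1)).ConfigureAwait(false);
    }

    // Hypothetical caller: returns true if the work timed out.
    public static async Task<bool> DoWithTimeout(TimeSpan timeout)
    {
        try
        {
            // Requires .NET 6 or later.
            await Hang().WaitAsync(timeout).ConfigureAwait(false);
            return false;
        }
        catch (TimeoutException)
        {
            return true;
        }
    }
}
```

Just like the manual approach, this only stops waiting. The underlying task is not cancelled.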

Let’s explore some more.

Solution 3 — custom SynchronizationContext

This time we are going to use a custom context and apply the timeout there:

public static async Task Do()
{
	await MyContext.Run(() => Program.Hang()).ConfigureAwait(false);
}

class MyTaskScheduler : TaskScheduler
{
	private readonly MyContext context;
	public BlockingCollection<Task> tasks = new BlockingCollection<Task>();

	public MyTaskScheduler(MyContext context)
	{
		this.context = context;
	}

	protected override IEnumerable<Task> GetScheduledTasks()
	{
		return tasks;
	}

	protected override void QueueTask(Task task)
	{
		Queue(task);
	}

	protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
	{
		return TryExecuteTask(task);
	}

	public new bool TryExecuteTask(Task task)
	{
		return base.TryExecuteTask(task);
	}

	public void Queue(Task task)
	{
		tasks.Add(task);
	}
}

class MyContext : SynchronizationContext
{
	public MyTaskScheduler scheduler;
	public TaskFactory factory;
	private int operations;

	public MyContext()
	{
		scheduler = new MyTaskScheduler(this);
		factory = new TaskFactory(CancellationToken.None, TaskCreationOptions.HideScheduler, TaskContinuationOptions.HideScheduler, scheduler);
	}

	public override void Post(SendOrPostCallback d, object state)
	{
		var task = factory.StartNew(() => d(state));
		scheduler.Queue(task);
	}

	public override void Send(SendOrPostCallback d, object state)
	{
		d(state);
	}

	public override void OperationCompleted()
	{
		operations--;
		if (operations == 0)
		{
			scheduler.tasks.CompleteAdding();
		}
	}

	public override void OperationStarted()
	{
		operations++;
	}

	public static Task Run(Action action)
	{
		TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
		Program.ThrowTimeoutException<bool>().ContinueWith(t => taskCompletionSource.TrySetException(t.Exception));

		Task.Run(() =>
		{
			var oldContext = SynchronizationContext.Current;
			var newContext = new MyContext();
			try
			{
				SynchronizationContext.SetSynchronizationContext(newContext);
				var spanningTask = newContext.factory.StartNew(action);
				foreach (var task in newContext.scheduler.tasks.GetConsumingEnumerable())
				{
					newContext.scheduler.TryExecuteTask(task);
					task.GetAwaiter().GetResult();
				}
				spanningTask.GetAwaiter().GetResult();
			}
			finally
			{
				SynchronizationContext.SetSynchronizationContext(oldContext);
			}
		}).ContinueWith(t => taskCompletionSource.TrySetException(t.Exception), TaskContinuationOptions.OnlyOnFaulted).ContinueWith(_ => taskCompletionSource.TrySetResult(true));

		return taskCompletionSource.Task;
	}
}

That’s super similar to awaiting void, which we have already seen in Part 5. Once we start executing the method, we fire a timeout as well. As a bonus, this works for async void methods too. The advantage is that we handle all continuations on the context, so we can time them out selectively, as needed. This is probably overkill, though.

Solution 4 — custom task type

We can use a custom task type and apply the timeout there:

public static async TimeoutableTask Do()
{
	await Program.Hang().ConfigureAwait(false);
}

[AsyncMethodBuilder(typeof(TimeoutableTaskMethodBuilder))]
public class TimeoutableTask
{
	public TaskCompletionSource<object> Promise { get; } = new TaskCompletionSource<object>();

	public Task AsTask() => Promise.Task;

	public TaskAwaiter<object> GetAwaiter()
	{
		return Promise.Task.GetAwaiter();
	}

	public static implicit operator Task(TimeoutableTask task) => task.AsTask();
}

public class TimeoutableTaskMethodBuilder
{
	public void Start<TStateMachine>(ref TStateMachine stateMachine)
		where TStateMachine : IAsyncStateMachine
	{
		Program.ThrowTimeoutException<bool>().ContinueWith(t => {
			if (!Task.GetAwaiter().IsCompleted)
			{
				Task.Promise.SetException(t.Exception);
			}
		});
		stateMachine.MoveNext();
	}

	public static TimeoutableTaskMethodBuilder Create()
	{
		return new TimeoutableTaskMethodBuilder();
	}

	public void SetStateMachine(IAsyncStateMachine stateMachine)
	{
	}

	public void SetResult()
	{
		Task.Promise.SetResult(null);
	}

	public void SetException(Exception exception)
	{
		Task.Promise.SetException(exception);
	}

	public TimeoutableTask Task { get; } = new TimeoutableTask();

	public void AwaitOnCompleted<TAwaiter, TStateMachine>(
		ref TAwaiter awaiter,
		ref TStateMachine stateMachine)
		where TAwaiter : INotifyCompletion
		where TStateMachine : IAsyncStateMachine
	{
		awaiter.OnCompleted(ResumeAfterAwait(stateMachine));
	}

	public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(
		ref TAwaiter awaiter,
		ref TStateMachine stateMachine)
		where TAwaiter : ICriticalNotifyCompletion
		where TStateMachine : IAsyncStateMachine
	{
		awaiter.UnsafeOnCompleted(ResumeAfterAwait(stateMachine));
	}

	private Action ResumeAfterAwait<TStateMachine>(TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
	{
		return () =>
		{
			stateMachine.MoveNext();
		};
	}
}

Notice the first line: we use async TimeoutableTask. We need to provide a builder for it, which is trivial in our case. We just delegate to the regular logic and also fire a timeout task in line 26.

Async Wandering Part 8 — async and await — the biggest C# mistake?
https://blog.adamfurmanek.pl/2020/05/09/async-wandering-part-8/
Sat, 09 May 2020 08:00:14 +0000

This is the eighth part of the Async Wandering series. For your convenience you can find other parts in the table of contents in Part 1 – Why creating Form from WinForms in unit tests breaks async?

Async is one of the most popular C# features nowadays. It changes the way we write applications and gives a lot of advantages. However, it has a lot of significant drawbacks which we tend to ignore. In this post I’ll talk about some of them.

It’s expensive!

State machine cost

Every single time you mark your method as async, the C# compiler does a lot of magic. It introduces a new type for the state machine, does some bookkeeping for it, and handles exceptions. While these things seem harmless, they do pile up. If your whole API is async-based, then introducing a state machine at every level becomes expensive.

This may be very bad in high-performance code. Actually, one of the first things you do when you want to improve performance is to get rid of the state machine, which is against Microsoft guidelines.

Garbage collection

Each task is allocated on the heap. This takes memory and requires the Garbage Collector to do additional work. Even though a task doesn’t have a finalizer, it does manage state which is released when the GC cleans up the objects. See Part 7 for more details.

Because of the GC pressure, C# introduced ValueTask, which is a value type and avoids the heap allocation when the result is already available synchronously. However, because of issues I’ll discuss later, the transition between the two is not that easy.
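To illustrate where ValueTask helps, here is a minimal sketch (the CachedReader class and its members are hypothetical, and the delay only pretends to be I/O). The cached path wraps the result in a struct and allocates no Task; only the slow path pays for a real task.

```csharp
using System.Threading.Tasks;

public sealed class CachedReader
{
    private string cached;

    public ValueTask<string> ReadAsync()
    {
        if (cached != null)
        {
            // Fast path: the result is wrapped in a struct, no Task object is allocated.
            return new ValueTask<string>(cached);
        }

        // Slow path: fall back to a regular, heap-allocated Task.
        return new ValueTask<string>(LoadAsync());
    }

    private async Task<string> LoadAsync()
    {
        await Task.Delay(10).ConfigureAwait(false); // pretend to do I/O
        cached = "content";
        return cached;
    }
}
```

The first call goes through LoadAsync, and every later call completes synchronously from the cache.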

It doesn’t integrate with the rest of the platform that nicely

async all the way up

When you use async you need to use it all the way up in your code. If you try waiting for a task synchronously, you’ll most likely end up with a deadlock. Consider the code below:

using System;
using System.Threading.Tasks;

namespace AsyncSyncWait
{
    class Program
    {
        static void Main(string[] args)
        {
            async Task operationOnContext()
            {
                await Task.Delay(1000);
                Console.WriteLine("Done waiting!");
            }

            operationOnContext().Wait();
        }
    }
}

If you try running this code in a console application, it works. If you use this code in a desktop app (WPF for instance), it hangs forever. Why? Because you synchronously wait for the task on the UI thread, and the task relies on global state under the hood (the captured synchronization context): it must post its continuation to the same thread where it started, and that thread is blocked. It is not a problem in a console app since it works differently there (continue reading to understand why).

ConfigureAwait(false) all the way down

“async all the way up!” is a common phrase. However, it’s not the end of the story. Take this code:

using System;
using System.Threading.Tasks;

namespace AsyncSyncWait
{
    class Program
    {
        static void Main(string[] args)
        {
            async Task operationOnContext()
            {
                await operationOnContext2().ConfigureAwait(false);
            }

            async Task operationOnContext2()
            {
                await Task.Delay(1000);
                Console.WriteLine("Done waiting!");
            }

            operationOnContext().Wait();
        }
    }
}

Again, this hangs in UI applications. Why? We call ConfigureAwait(false) in operationOnContext, so the continuation after awaiting should run on the thread pool, right?

That’s right, but there are more continuations here. operationOnContext2 awaits another method but doesn’t use ConfigureAwait(false), so it captures the context. It doesn’t matter that the method one level above doesn’t capture it; the context can be captured anywhere. Again, after waiting, the continuation wants to return to the UI thread, which is blocked.
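For comparison, this is a sketch of the same call chain with ConfigureAwait(false) applied at every level (the NoCaptureSketch class and the method names are illustrative only). No continuation asks for the original context anymore, so a blocking Wait on the calling thread has nothing to deadlock on.

```csharp
using System;
using System.Threading.Tasks;

public static class NoCaptureSketch
{
    public static async Task Outer()
    {
        await Inner().ConfigureAwait(false);
    }

    public static async Task Inner()
    {
        // This await no longer captures the caller's context either.
        await Task.Delay(100).ConfigureAwait(false);
        Console.WriteLine("Done waiting!");
    }
}
```

Calling NoCaptureSketch.Outer().Wait() then completes, because both continuations are free to run on the thread pool.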

void

Using async void methods is a very bad practice. But we need to do that sometimes! First, a lot of UI frameworks use handlers which expect you to provide a void method. Delegates are even worse: you cannot return a value from a delegate reliably. Some languages don’t support that at all (VB.NET), so the code is not portable anymore. What’s more, only the last value from a multicast delegate is returned; if multiple handlers returned values, most of them are lost. So you cannot await the event easily. It is doable, as shown in Part 4, but it is a very hacky solution.

void in disguise

void is not necessarily about returning nothing from the function; it is about not returning the Task as expected. There is one more place where we can’t use tasks easily: the constructor. This collides with good OOP design when you want to run some checks when creating the object but they require going to a database, the network, etc.
There are other places where you can’t use tasks easily, though some of them have been fixed by the C# language over time: disposable items, foreach loops, etc. They show that async wasn’t integrated properly from the very beginning.

Exceptions

async changes exception behavior. First, it automatically captures all the exceptions and stores them on the Task object, which may result in an exception going unnoticed. See Part 7 for more details.

But it also changes the behavior of aggregated exceptions, as shown in Part 6. You can await tasks, but await rethrows only one of the exceptions rather than the whole AggregateException you would normally get. Again, exceptions may go unnoticed because of that.
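The difference can be seen in a small sketch (all names are made up for this illustration): two tasks fail, await rethrows just one exception, while the Task object still holds both.

```csharp
using System;
using System.Threading.Tasks;

public static class AggregateSketch
{
    private static async Task Fail(string message)
    {
        await Task.Yield();
        throw new InvalidOperationException(message);
    }

    // Returns how many exceptions the awaited task really holds.
    public static async Task<int> CountStoredExceptions()
    {
        Task all = Task.WhenAll(Fail("first"), Fail("second"));
        try
        {
            await all.ConfigureAwait(false);
            return 0;
        }
        catch (InvalidOperationException e)
        {
            // Only a single exception reaches the catch block...
            Console.WriteLine($"await reported: {e.Message}");
            // ...but the task stores every failure.
            return all.Exception.InnerExceptions.Count;
        }
    }
}
```

If you need all the failures, you have to inspect the Exception property of the task yourself.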

It doesn’t work with synchronization primitives

What happens if you try awaiting inside a lock, like here?

using System;
using System.Threading.Tasks;
using System.Threading;
					
public class Program
{
	public static void Main()
	{
		Test().Wait();
	}
	
	public static async Task Test(){
		var o = new object();
		lock(o){
			Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
			await Task.Delay(100);
			Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
		}
	}
}

We get this error:

Compilation error (line 16, col 4): The 'await' operator cannot be used in the body of a lock statement

And it makes perfect sense. If we await, we may continue on another thread, and we cannot prevent that. How would we move the lock to the continuation thread? Well, we could do some magic on the CLR level, but then it wouldn’t work with OS primitives anyway (unless we integrate .NET with the system).
There is another issue: await releases the thread so it can do something else. Should the lock still be held? If so, we risk another deadlock if the thread runs code trying to take the same lock. Should we release the lock? Well, that would be even worse because the critical section wouldn’t be protected anymore.
So it looks like everything is perfect? Not so fast. We know how lock is implemented in C#, so why not unroll it manually, like here?

using System;
using System.Threading.Tasks;
using System.Threading;
					
public class Program
{
	public static void Main()
	{
		Test().Wait();
	}
	
	public static async Task Test(){
		var o = new object();
		bool taken = false;
		try{
			Monitor.Enter(o, ref taken);
			Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
			await Task.Delay(100);
			Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
		}finally{
			if(taken){
				Monitor.Exit(o);
			}
		}
	}
}

It compiles and prints the following:

1
4
Unhandled exception. System.AggregateException: One or more errors occurred. (Object synchronization method was called from an unsynchronized block of code.)
 ---> System.Threading.SynchronizationLockException: Object synchronization method was called from an unsynchronized block of code.
   at System.Threading.Monitor.Exit(Object obj)
   at Program.Test()
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at Program.Main()
Command terminated by signal 6

So we switch threads after waiting, and the monitor is not held by the new thread. But you know what? The monitor is still taken by thread 1. So we caused yet another deadlock. This will behave the same way with any other thread-affine synchronization primitive, managed or not.
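A common workaround, which is not part of the original sample, is to use a primitive that is not tied to a thread. The sketch below uses SemaphoreSlim as an asynchronous mutex; the AsyncGate class and its members are hypothetical names. Note that, unlike Monitor, it is not reentrant.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncGate
{
    private static readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);
    private static int inside;

    // Highest number of callers ever seen inside the critical section.
    public static int MaxInside;

    public static async Task Critical()
    {
        await gate.WaitAsync().ConfigureAwait(false);
        try
        {
            int now = Interlocked.Increment(ref inside);
            MaxInside = Math.Max(MaxInside, now);
            await Task.Delay(20).ConfigureAwait(false); // may resume on another thread
            Interlocked.Decrement(ref inside);
        }
        finally
        {
            // Release has no thread affinity, so it works after the thread switch.
            gate.Release();
        }
    }
}
```

Running several Critical calls concurrently should leave MaxInside at 1, since only one caller at a time gets past WaitAsync.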

It can change thread anytime

What else is broken? Basically anything depending on the thread. Do you have thread statics? They won’t work. Do you tie permissions to a thread? Most likely broken. Thread priorities? The same.

This impacts other things as well. What about call stacks? It’s not simple to retrieve the trace, especially when things are moved to the heap.
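For data that should follow the logical flow rather than the thread, .NET offers AsyncLocal<T>. Here is a minimal sketch, with a hypothetical FlowSketch class, contrasting it with a thread static:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FlowSketch
{
    [ThreadStatic]
    private static string threadValue;

    private static readonly AsyncLocal<string> flowValue = new AsyncLocal<string>();

    public static async Task<string> Run()
    {
        threadValue = "request-1";
        flowValue.Value = "request-1";

        await Task.Delay(50).ConfigureAwait(false);

        // The thread static belongs to whichever thread runs the continuation,
        // so it is typically empty here. The AsyncLocal value flows with the execution context.
        Console.WriteLine($"Thread static: {threadValue ?? "(null)"}");
        return flowValue.Value;
    }
}
```

This helps with ambient data only. Thread priorities and permissions tied to a thread are still left behind.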

It uses hidden state

Synchronization context capture

Wherever you await a task, you capture the synchronization context by default. This is why some code may work in a console application but may break in a UI one. There are multiple synchronization contexts and they differ significantly.

In a console-based application there is no synchronization context. In that situation, await doesn’t capture anything (because there is nothing to capture). Later on, when the continuation is ready to run, it goes through the task scheduler and ends up on the thread pool, so it always has some thread to continue on.

In UI applications there is a synchronization context which uses exactly one thread: the UI thread. Whenever you post a continuation, it must be executed on that one thread. If it is blocked, you are out of luck and get a deadlock. See Part 3 for how to fix that in UI applications.

What’s worse, the library you use may decide not to capture the synchronization context and thereby change the behavior of your application (by running on the thread pool). Also, when it comes to Microsoft guidelines, at the time of writing this post there are none. However, most people (myself included) recommend always calling ConfigureAwait(false) unless you know what you’re doing.

Context on a thread

Things get even worse. In the previous section I wrote that in a console app there is no synchronization context. But this doesn’t depend on the application type; it can be changed at runtime. This is effectively global state which anyone can modify. For instance, see Part 1 for how UI frameworks modify the context on initialization. This is something which can easily go unnoticed, and you have absolutely no control over it.
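How little it takes to change that state can be seen in this sketch (the ContextSketch class is hypothetical). It installs a context on the current thread and restores the previous one afterwards:

```csharp
using System.Threading;

public static class ContextSketch
{
    // Returns the type name of the context visible after the swap.
    public static string InstallAndInspect()
    {
        SynchronizationContext previous = SynchronizationContext.Current;
        try
        {
            // Any code running on this thread can do this, including a library you call.
            SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
            return SynchronizationContext.Current.GetType().Name;
        }
        finally
        {
            // Put the original context (possibly null) back.
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }
}
```

Every await that runs on this thread between the two SetSynchronizationContext calls would capture the new context.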

It breaks (or at least doesn’t follow) best practices

Endless waiting

The first rule of waiting in concurrent scenarios is: always wait with a timeout. It’s not easy to choose the correct value; it depends on the application, CPU usage, peak/non-peak time and many more details. However, you should always have at least some timeout, so when your application hangs it can heal itself automatically.

How do you specify a timeout using await? You probably don’t want to add any logic in the callee, as it shouldn’t care. So you either wrap the call site and end up with something like await Foo().SetTimeout(123).ConfigureAwait(false) (where SetTimeout is a nice extension method), or you modify the synchronization context, or you introduce a new task type with AsyncMethodBuilder (if you have no idea what that is, read the fantastic post by Kevin Gosse).

The first solution makes your codebase even more polluted (and it’s already pretty bad with the ConfigureAwait calls), not to mention that one day you’ll find a place where you forgot to call the method. You can obviously use Fody or other nice tricks to modify your code on the fly, but that’s risky.

The second solution won’t work unless you rewrite a lot. You probably don’t want to mess with the synchronization context (especially not in UI apps), not to mention that it doesn’t need to be captured. You can rewrite thread pools, create custom schedulers and apply the context automatically, but this is magical again.

The third solution seems pretty nice. However, once your new task is cast to a good old Task, you’re done.

DRY — Synchronous and asynchronous methods

What happens if you want to expose synchronous and asynchronous versions of your API? Well, you just create two methods. See File.ReadAllText and similar methods. They all have an async counterpart. Imagine now what may happen in a few years if C# introduces another task type: yet another method, just to support the new use case.

And it’s not that the async method just calls the synchronous one under the hood. The synchronous path and the asynchronous path are separate implementations. As I’m writing this post, they look like this:

Synchronous:

private static string InternalReadAllText(string path, Encoding encoding)
{
	Debug.Assert(path != null);
	Debug.Assert(encoding != null);
	Debug.Assert(path.Length > 0);

	using (StreamReader sr = new StreamReader(path, encoding, detectEncodingFromByteOrderMarks: true))
		return sr.ReadToEnd();
}

Asynchronous:

private static async Task<string> InternalReadAllTextAsync(string path, Encoding encoding, CancellationToken cancellationToken)
{
	Debug.Assert(!string.IsNullOrEmpty(path));
	Debug.Assert(encoding != null);

	char[] buffer = null;
	StreamReader sr = AsyncStreamReader(path, encoding);
	try
	{
		cancellationToken.ThrowIfCancellationRequested();
		buffer = ArrayPool<char>.Shared.Rent(sr.CurrentEncoding.GetMaxCharCount(DefaultBufferSize));
		StringBuilder sb = new StringBuilder();
		while (true)
		{
#if MS_IO_REDIST
			int read = await sr.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
#else
			int read = await sr.ReadAsync(new Memory<char>(buffer), cancellationToken).ConfigureAwait(false);
#endif
			if (read == 0)
			{
				return sb.ToString();
			}

			sb.Append(buffer, 0, read);
		}
	}
	finally
	{
		sr.Dispose();
		if (buffer != null)
		{
			ArrayPool<char>.Shared.Return(buffer);
		}
	}
}

This is inextensible and doesn’t allow the old code to use new features. And we already have two task types (Task and ValueTask) so it is not that unlikely to get yet another one in future.

Dependency Inversion principle

The biggest issue is it breaks the Dependency Inversion principle. The principle says:

1. High-level modules should not depend on low-level modules. Both should depend on abstractions (e.g. interfaces).
2. Abstractions should not depend on details. Details (concrete implementations) should depend on abstractions.

Digression: every time you use a concrete implementation instead of an interface, you break this principle. Have you ever thought about strings? Is using System.String correct? And before you answer “nobody is going to rewrite string”, let me just point you to some cases where people implement their own strings.

Now each async method in your interface depends on a concrete implementation, which is Task. Since it also relies on global state, you effectively cannot control how it works. What if you’d like to run it on a constrained thread pool? What if you’d like to use only one thread? What if you’d like to wait for all possible continuations generated by a given flow? No way, it cannot be done, because Task uses the synchronization context and a thread pool internally, and these can be changed beyond your control.

So how should we fix this? Well, creating new APIs with another parameter like IThreadPool won’t work. It breaks DRY and the parameter may be ignored. What should be used instead is a special type controlling the behavior. And functional programming already has a type for that — it’s called Monad.

I’m not going to explain monads here (there are tons of articles out there). Monads capture values and put them in a context, and this is exactly what we want with async. Just as a monad may model a value that may be missing (Maybe/Option), exactly one value (Id), or multiple values (List), the context here is the asynchronous execution. See the code below:

We start with very simple abstraction:

public interface Context<T>
{
    Context<U> Apply<U>(Func<T, U> lambda);
}

Now, we implement a context with exactly one value:

public class Id<T> : Context<T>
{
    public T Val { get; set; }

    public static Id<U> Return<U>(U val)
    {
        return new Id<U>() { Val = val };
    }

    public Context<U> Apply<U>(Func<T, U> lambda)
    {
        return Id<U>.Return(lambda(this.Val));
    }
}

What does this do? Well, it just wraps a value in a no-op context. It’s like a boxed value type, or a nullable. We can see that it doesn’t do anything special, it just holds the value and performs an operation on it.

Now, let’s do this:

public static void Main()
{
    Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} at {DateTime.Now}");
    Context<string> path = Id<string>.Return("file.txt");
    Context<string> content = Download(path);
    Console.WriteLine($"After downloading at {DateTime.Now}");
    content.Apply(p => {
        Console.WriteLine(p);
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} at {DateTime.Now}");
        return true;
    });
}
        
public static Context<string> Download(Context<string> path)
{
    return path.Apply(p => {
        Thread.Sleep(5000);
        Console.WriteLine($"Downloading at {DateTime.Now}");
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} at {DateTime.Now}");
        var content = "Grab the content somehow";
        return content;
    });
}

So we start with a context holding a file name, then we read the file and return a context holding the content. Let’s run the code and see:

Thread 1 at 11/29/2019 4:36:38 PM
Downloading at 11/29/2019 4:36:44 PM
Thread 1 at 11/29/2019 4:36:44 PM
After downloading at 11/29/2019 4:36:44 PM
Grab the content somehow
Thread 1 at 11/29/2019 4:36:44 PM

Okay, so we see our content and we see that the thread is the same before, during and after reading. Also, notice the delay between first two lines — because we run everything synchronously, the sleep is there. Now, what if we wanted to introduce asynchronous execution?

public class MyTask<T> : Context<T>
{
    public Task<T> Val { get; set; }

    public static MyTask<U> Return<U>(U val)
    {
        return new MyTask<U>() { Val = Task.FromResult(val) };
    }

    public Context<U> Apply<U>(Func<T, U> lambda)
    {
        return new MyTask<U>() { Val = this.Val.ContinueWith(v => lambda(v.Result)) };
    }
}

Our new context holds the value in a task created with Task.FromResult, so this is nothing new. However, when we bind the operation, we use the ContinueWith method to execute the operation in the context. Let’s now do this:

void Main()
{
    Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} at {DateTime.Now}");
    Context<string> path = MyTask<string>.Return("file.txt");
    Context<string> content = Download(path);
    Console.WriteLine($"After downloading at {DateTime.Now}");
    content.Apply(p => {
        Console.WriteLine(p);
        Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} at {DateTime.Now}");
        return true;
    });
}

Notice, this is exactly the same code as before. What we actually change is we replace

Context<string> path = Id<string>.Return("file.txt");

with

Context<string> path = MyTask<string>.Return("file.txt");

See the output now:

Thread 1 at 11/29/2019 4:38:09 PM
After downloading at 11/29/2019 4:38:09 PM

What happened? We introduced asynchronous execution and didn’t await the task. Let’s add some sleep at the end and then the output is:

Thread 1 at 11/29/2019 4:38:46 PM
After downloading at 11/29/2019 4:38:46 PM
Downloading at 11/29/2019 4:38:51 PM
Thread 3 at 11/29/2019 4:38:51 PM
Grab the content somehow
Thread 4 at 11/29/2019 4:38:51 PM

Notice that the delay is now between lines 2 and 3. That’s because the sleep is wrapped in ContinueWith and runs on another thread, so it doesn’t block the main thread.

The important part here is: we encapsulated details of execution (synchronous vs asynchronous) in the type, not in the internals of the Download method. This means that the method and any infrastructure under the hood don’t need to know about thread pools, tasks, synchronization contexts or whatever else. They don’t care — they just create a series of transformations.

Now, in your production code you can use whatever thread pool you like, and in your unit tests you can go with synchronous execution. All with just one line of code change. You can now imagine having different implementations binding different infrastructure as needed, using dependency injection, factories or whatever you like. But the business code stays the same.
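To make the one-line switch concrete, here is a minimal sketch of how the choice of context could be pushed into a factory. The Downloader and Wiring types and the ContentLength method are hypothetical names invented for this illustration. Context, Id and MyTask are simplified re-declarations of the types discussed above so that the snippet compiles on its own, and Context is assumed to expose only Apply.

```csharp
using System;
using System.Threading.Tasks;

// Simplified re-declaration (assumption: Context only needs Apply).
public interface Context<T>
{
    Context<U> Apply<U>(Func<T, U> lambda);
}

// Synchronous context: the lambda runs immediately on the calling thread.
public class Id<T> : Context<T>
{
    public T Val { get; set; }

    public Context<U> Apply<U>(Func<T, U> lambda) => new Id<U> { Val = lambda(Val) };
}

// Task-based context: the lambda runs as a continuation of the task.
public class MyTask<T> : Context<T>
{
    public Task<T> Val { get; set; }

    public Context<U> Apply<U>(Func<T, U> lambda) =>
        new MyTask<U> { Val = Val.ContinueWith(v => lambda(v.Result)) };
}

// Hypothetical business code: it only composes transformations
// and never mentions threads, tasks or schedulers.
public class Downloader
{
    private readonly Func<string, Context<string>> wrap;

    public Downloader(Func<string, Context<string>> wrap)
    {
        this.wrap = wrap;
    }

    public Context<int> ContentLength(string path) =>
        wrap(path)
            .Apply(p => "Grab the content somehow")
            .Apply(content => content.Length);
}

// Hypothetical composition root: the only place that picks the context.
public static class Wiring
{
    public static Downloader ForTests() =>
        new Downloader(p => new Id<string> { Val = p });

    public static Downloader ForProduction() =>
        new Downloader(p => new MyTask<string> { Val = Task.FromResult(p) });
}
```

In a unit test the result of Wiring.ForTests().ContentLength("file.txt") can be cast to Id<int> and read immediately, while the production variant hands back a MyTask<int> whose Val is a task to wait on. The Downloader code is identical in both cases.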

This is something you cannot do with async because it depends on implementation details. However, it doesn’t solve all the problems. TPL tried to solve the problem of callback hell by encapsulating continuations into an object. People often complain about lambdas in TPL and present async as the solution (which is not its main benefit, but still an important factor). Can we get the same encapsulation as above without lambdas?

Typically, the language needs to support this pattern somehow. It’s similar to Continuation Passing Style, and the translation can be done by the compiler. In fact, C# has supported such a translation for a long time: LINQ with query syntax.

There are plenty of articles on how to implement the required methods. Here, I’ll just show this little extension:

public static class IdExtensions
{
    public static Id<V> SelectMany<T, U, V>(this Id<T> context, Func<T, Id<U>> transformation, Func<T, U, V> projection)
    {
        return Id<V>.Return(projection(context.Val, transformation(context.Val).Val));
    }
}

Now, we can do this:

var result = from a in Id<int>.Return(3)
    from b in Id<int>.Return(4)
    select a + b;
result.Apply(c =>
{
    Console.WriteLine(c);
    return true;
});

And this prints 7 as expected. We can implement the same logic for MyTask. Having one interface with all methods implemented as defaults is harder as we don’t have higher kinded types.
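As a rough idea of what the same logic for MyTask might look like, here is a minimal sketch. It is not taken from the original code: MyTaskExtensions and MyTaskQueryDemo are hypothetical names, MyTask is re-declared in a reduced form (without the Context interface) to keep the snippet self-contained, and chaining with ContinueWith plus Unwrap is just one possible way to sequence the two tasks.

```csharp
using System;
using System.Threading.Tasks;

// Reduced re-declaration of MyTask, only what the query needs.
public class MyTask<T>
{
    public Task<T> Val { get; set; }

    public static MyTask<U> Return<U>(U val)
    {
        return new MyTask<U> { Val = Task.FromResult(val) };
    }
}

public static class MyTaskExtensions
{
    public static MyTask<V> SelectMany<T, U, V>(
        this MyTask<T> context,
        Func<T, MyTask<U>> transformation,
        Func<T, U, V> projection)
    {
        // Start the second task only after the first one has a result,
        // then project both results. The outer ContinueWith yields
        // Task<Task<V>>, which Unwrap flattens into Task<V>.
        Task<V> combined = context.Val
            .ContinueWith(first => transformation(first.Result).Val
                .ContinueWith(second => projection(first.Result, second.Result)))
            .Unwrap();

        return new MyTask<V> { Val = combined };
    }
}

public static class MyTaskQueryDemo
{
    public static Task<int> Sum()
    {
        // The compiler translates the two from clauses into one SelectMany call.
        var result = from a in MyTask<int>.Return(3)
                     from b in MyTask<int>.Return(4)
                     select a + b;
        return result.Val;
    }
}
```

Waiting on MyTaskQueryDemo.Sum() yields 7, the same value as the Id version, but nothing in the query itself blocks a thread.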

The syntax is now very similar to do notation in Haskell or for comprehensions in Scala. However, this doesn’t address the main point of async, which is releasing the thread. We would like to trigger some calculations, let the thread do something else, and then come back to the place where we left off. If you are thinking of coroutines, you’re on the right track. Actually, these things (coroutines, CPS, fibers, green threads) are pretty similar and can simulate each other (entirely or to some extent). Project Loom in the JVM world is exploring this field to implement async in a way that doesn’t result in incompatible method flavours (synchronous and asynchronous).

All that being said, we can think of a different implementation. You start with a fiber that sits on top of the message loop of the UI thread. It takes a message, creates a new fiber and lets it process the message. When the new fiber needs to wait (just like with await), it lets the original fiber carry on. The original fiber keeps checking the message loop and, once it realizes the paused fiber can continue, it schedules it again.

That is not trivial to implement, not to mention that fibers are officially not supported by .NET and are generally considered problematic. C# doesn’t support higher kinded types which limits the options even more.

Async Wandering Part 7 — Exceptions on unobserved tasks
https://blog.adamfurmanek.pl/2020/05/02/async-wandering-part-7/
Sat, 02 May 2020 08:00:47 +0000

This is the seventh part of the Async Wandering series. For your convenience you can find other parts in the table of contents in Part 1 – Why creating Form from WinForms in unit tests breaks async?

Let’s take this code:

using System;
using System.Threading.Tasks;

namespace UnobservedException
{
    class Program
    {
        static void Main(string[] args)
        {
            Test();
        }

        public static async Task Test()
        {
            throw new Exception();
        }
    }
}

It doesn’t print any exception because we don’t await the task. Can we do something about it?
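Before answering, it may help to see where the exception actually ends up. The sketch below is an illustration rather than part of the original sample: FaultedTaskDemo and Inspect are made-up names. It shows that calling the async method does not throw and that the exception is stored on the returned task instead.

```csharp
using System;
using System.Threading.Tasks;

public static class FaultedTaskDemo
{
    // Same shape as Test() above: an async method that throws right away.
    public static async Task Test()
    {
        throw new Exception("boom");
    }

    public static string Inspect()
    {
        // The call itself does not throw; the exception is captured by the
        // async state machine and stored on the returned task.
        Task task = Test();

        // task.Exception is an AggregateException wrapping the original one.
        return $"{task.Status}: {task.Exception.InnerException.Message}";
    }
}
```

Inspect() returns "Faulted: boom". Reading task.Exception also counts as observing the exception, which is exactly what the original program never does.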

Let’s use the UnobservedTaskException handler:

using System;
using System.Threading.Tasks;

namespace UnobservedException
{
    class Program
    {
        static void Main(string[] args)
        {
            TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
            Test();

            Console.ReadLine();

            GC.Collect();
            GC.WaitForPendingFinalizers();
        }

        static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
        {
            Console.WriteLine("Unobserved exception!");
            Console.WriteLine(e.Exception);
        }

        public static async Task Test()
        {
            throw new Exception();
        }
    }
}

You can now verify that the exception is printed after you press Enter. Once the GC collects the faulted task whose exception was never observed, the UnobservedTaskException event is raised and our delegate is called.

But if you look at the Task source, you will see it has no finalizer. So how does this work?

If you decompile the async method, you can see this:

// UnobservedException.Program.<Test>d__2
// Token: 0x06000006 RID: 6 RVA: 0x000020E0 File Offset: 0x000002E0
void IAsyncStateMachine.MoveNext()
{
	int num = this.<>1__state;
	try
	{
		throw new Exception();
	}
	catch (Exception exception)
	{
		this.<>1__state = -2;
		this.<>t__builder.SetException(exception);
	}
}

So the state machine calls SetException on the builder. Under the hood this creates a TaskExceptionHolder. That type has a finalizer, which does the following:

AggregateException ex4 = new AggregateException(Environment.GetResourceString("TaskExceptionHolder_UnhandledException"), this.m_faultExceptions);
UnobservedTaskExceptionEventArgs unobservedTaskExceptionEventArgs = new UnobservedTaskExceptionEventArgs(ex4);
TaskScheduler.PublishUnobservedTaskException(this.m_task, unobservedTaskExceptionEventArgs);

So it is the finalizer of the holder, not of the task itself, that ends up calling the handler.

Async Wandering Part 6 — Exceptions logging
https://blog.adamfurmanek.pl/2020/04/25/async-wandering-part-6/
Sat, 25 Apr 2020 08:00:47 +0000

This is the sixth part of the Async Wandering series. For your convenience you can find other parts in the table of contents in Part 1 – Why creating Form from WinForms in unit tests breaks async?

Let’s take the following code:

using System;
using System.Threading.Tasks;

namespace AggregateException
{
    class Program
    {
        static void Main(string[] args)
        {
            CreateAndAwait().Wait();
        }

        static async Task CreateAndAwait()
        {
            await CreateTask();
        }

        static Task CreateTask()
        {
            return Task.Factory.StartNew(() =>
            {
                Task.Factory.StartNew(() => { throw new Exception("FIRST TASK EXCEPTION!!!"); }, 
                    TaskCreationOptions.AttachedToParent);
                Task.Factory.StartNew(() => { throw new Exception("SECOND TASK EXCEPTION!!!"); }, 
                    TaskCreationOptions.AttachedToParent);
                Console.WriteLine("Attached both children");
            });
        }
    }
}

We create a task and add two child tasks to it, both of them throwing exceptions. What is the output?

Attached both children

Unhandled Exception: System.AggregateException: One or more errors occurred. ---> System.AggregateException: One or more errors occurred. ---> System.Exception: SECOND TASK EXCEPTION!!!
   at AggregateException.Program.<>c.<CreateTask>b__3_2() in C:\Users\afish\Desktop\msp_windowsinternals\AggregateException\Program.cs:line 35
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.Execute()
   --- End of inner exception stack trace ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at AggregateException.Program.<CreateAndAwait>d__2.MoveNext() in C:\Users\afish\Desktop\msp_windowsinternals\AggregateException\Program.cs:line 26
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at AggregateException.Program.Main(String[] args) in C:\Users\afish\Desktop\msp_windowsinternals\AggregateException\Program.cs:line 10

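So we lost one exception: await unwraps the AggregateException stored on the task and rethrows only its first inner exception. This is a case where introducing async changes the semantics and doesn’t integrate nicely with the platform. How can we fix that?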
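Before looking at the fix, the difference between what await rethrows and what the task holds can be reproduced in isolation. The sketch below is a simplification, not the sample above: it uses Task.WhenAll over two already faulted tasks instead of attached child tasks, and AwaitVersusException and Compare are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class AwaitVersusException
{
    public static async Task<string> Compare()
    {
        Task first = Task.FromException(new InvalidOperationException("first"));
        Task second = Task.FromException(new ArgumentException("second"));
        Task both = Task.WhenAll(first, second);

        string caught;
        try
        {
            // await rethrows only the first exception stored on the task.
            await both;
            caught = "none";
        }
        catch (Exception e)
        {
            caught = e.Message;
        }

        // The task itself still carries every exception.
        return $"await saw '{caught}', task holds {both.Exception.InnerExceptions.Count}";
    }
}
```

Waiting on Compare() gives "await saw 'first', task holds 2", so the information is still on the task. It is only the await that drops it.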

With this:

static Task CreateAndWait()
{
    Task action = CreateTask();
    Task faulted = action.ContinueWith(p => Console.WriteLine(p.Exception), TaskContinuationOptions.OnlyOnFaulted);
    Task succeeded = action.ContinueWith(r => { }, TaskContinuationOptions.OnlyOnRanToCompletion);
    return Task.WhenAll(faulted, succeeded);
}

This prints both exceptions. The price is that we have to use plain TPL methods instead of await.
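One thing to be aware of with the two-continuation version: the continuation whose condition is not met ends up canceled, and Task.WhenAll reports cancellation when any of its tasks is canceled, so waiting on the combined task may itself throw. A possible alternative, sketched below under my own assumptions, uses a single unconditional continuation and flattens the nested AggregateException. FlattenedLogging, CollectFailures and Demo are hypothetical names, and the exception messages are shortened.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FlattenedLogging
{
    // Returns the messages of all exceptions stored on the task,
    // or an empty list if the task did not fault.
    public static Task<List<string>> CollectFailures(Task action)
    {
        return action.ContinueWith(p =>
            p.IsFaulted
                ? p.Exception.Flatten().InnerExceptions.Select(e => e.Message).ToList()
                : new List<string>());
    }

    public static Task<List<string>> Demo()
    {
        // Same structure as CreateTask above: a parent with two attached,
        // failing children.
        Task parent = Task.Factory.StartNew(() =>
        {
            Task.Factory.StartNew(() => { throw new Exception("FIRST"); },
                TaskCreationOptions.AttachedToParent);
            Task.Factory.StartNew(() => { throw new Exception("SECOND"); },
                TaskCreationOptions.AttachedToParent);
        });

        return CollectFailures(parent);
    }
}
```

The task returned by Demo() completes successfully with both messages in its list (in no guaranteed order), so the caller can log them without catching anything.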
