concurrency – Random IT Utensils
https://blog.adamfurmanek.pl
IT, operating systems, maths, and more.

Concurrency Part 10: Reentrant mutex
https://blog.adamfurmanek.pl/2019/11/02/concurrency-part-10/
Sat, 02 Nov 2019 09:00:31 +0000

This is the tenth part of the Concurrency series. For your convenience you can find other parts in the table of contents in Part 1 – Mutex performance in .NET.

We have already seen how to implement a custom mutex using memory mapped files and a CAS operation. It has one drawback: it is not reentrant. In fact, if we try to take it recursively, we will end up with a deadlock. Today we will fix that.

Since our mutex protocol requires a delegate to execute while holding the mutex (a kind of RAII pattern or try-with-resources approach), we don’t need to worry about taking the mutex and not releasing it. Conceptually, this is not allowed:

mutex.Lock()
mutex.Lock() // again
...
mutex.Release()
// Not releasing again

It is guaranteed that the mutex will be released once it’s taken. Well, this is not 100% true, as there are AccessViolation exceptions that result in the finally block not being executed, but we will ignore this fact. So we don’t need to take the lock a second time, as long as we can guarantee that the nested call won’t try to release it.

Let’s see this code:

using System;
using System.Diagnostics;
using System.IO.MemoryMappedFiles;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;

namespace MutexUtils
{
    public static class MutexUtils
    {
        [DllImport("kernel32.dll", SetLastError = true)]
        unsafe public static extern long InterlockedCompareExchange64(long* destination, long exchange, long comperand);

        public static unsafe bool DoWithMutex(string name, Action action, int timeoutInSeconds = int.MaxValue, int spinTimeout = 250)
        {
            var myPid = Process.GetCurrentProcess().Id;
            var myTid = AppDomain.GetCurrentThreadId();
			
            // Store <PID><TID> as one field
            long myIdentifier = ((long)myPid << 32) + myTid;

            // Calculate timeout wall clock
            DateTime end = DateTime.UtcNow.AddSeconds(timeoutInSeconds);

            // Open memory mapped file initialized with zeros
            using (var memoryMappedFile = MemoryMappedFile.CreateOrOpen(name, 8))
            using (var viewStream = memoryMappedFile.CreateViewStream(8, 8))
            {
                var pointer = viewStream.SafeMemoryMappedViewHandle.DangerousGetHandle();
				
                try
                {
                    var currentLock = Interlocked.Read(ref *(long*)pointer); // Interlocked.Read takes a ref, not a pointer
                    if (GetPid(currentLock) == myPid && GetTid(currentLock) == myTid)
                    {
                        action(); // We already hold the lock; the outer call will release it
                        return true;
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Something very bad happened: {e}");
                    return false;
                }

                try
                {
                    int holderPid = -1;
                    int holderTid = -1;

                    while (DateTime.UtcNow < end)
                    {
                        // Take lock only if it is not taken
                        var currentLock = InterlockedCompareExchange64((long*)pointer, myIdentifier, 0);

                        if (currentLock == 0)
                        {
                            action();
                            return true;
                        }

                        // Lock is taken, let's see who holds it
                        holderTid = GetTid(currentLock);
                        holderPid = GetPid(currentLock);

                        bool exists = false;
                        try
                        {
                            exists = Process.GetProcessById(holderPid).Threads.OfType<ProcessThread>().Any(t => t.Id == holderTid);
                        }
                        catch
                        {
                        }

                        // If holding thread doesn't exist then the lock is abandoned
                        if (!exists)
                        {
                            // Clear lock only if it is still held by previous owner
                            var currentLock2 = InterlockedCompareExchange64((long*)pointer, 0, currentLock);
                            if (currentLock == currentLock2)
                            {
                               Console.WriteLine($"Mutex {name} was abandoned by pid={holderPid} tid={holderTid}");
                            }
                        }

                        Thread.Sleep(spinTimeout);
                    }

                    Console.WriteLine($"Timeout when waiting on mutex {name} held by pid={holderPid} tid={holderTid}");
                }
                finally
                {
                    // Clear lock only if I'm the one holding it
                    var currentLock = InterlockedCompareExchange64((long*)pointer, 0, myIdentifier);
                    if (currentLock != myIdentifier)
                    {
                        Console.WriteLine($"I tried to release mutex held by someone else, pid={GetPid(currentLock)} tid={GetTid(currentLock)}");
                    }
                }
            }

            return false;
        }

        private static int GetPid(long value)
        {
            return (int)(value >> 32);
        }

        private static int GetTid(long value)
        {
            return (int)(value & 0xFFFFFFFF);
        }
    }
}

In line 34 we read the lock value (the Interlocked.Read call before the spin loop), and in the next line we check whether the current owner of the lock is us. If so, we just execute the action and return early, leaving the release to the outer call. If something went wrong (we got an exception), we cannot assume anything about the lock owner or the executed action, so we need to return immediately. The only thing we know for sure is that we didn’t modify the lock. You may be tempted to set a flag saying that the lock is ours and return it in the exception handler (assuming that the exception was thrown by the executed action), but you cannot guarantee that you actually started executing the action (think about an OOM thrown after setting the flag and before calling the action). You could try to improve this with CER (constrained execution regions), but then you constrain yourself a lot.
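
The owner check before the CAS can be tried out in a single process without memory mapped files. The following is only a minimal sketch of that idea: ReentrantSpinLock, DoWithLock and the timeout parameters are hypothetical names that do not come from the code above, and the managed thread ID stands in for the PID/TID pair.

```csharp
using System;
using System.Threading;

// In-process sketch of "check the owner first, then CAS".
// All names here are illustrative, not part of MutexUtils.
public sealed class ReentrantSpinLock
{
    // 0 means free; otherwise the managed thread id of the owner.
    private long owner;

    public bool DoWithLock(Action action, int timeoutMs = 1000, int spinMs = 10)
    {
        long me = Environment.CurrentManagedThreadId;

        // Reentrant path: we already hold the lock, so run the action
        // without touching the lock word. The outer call releases it.
        if (Interlocked.Read(ref owner) == me)
        {
            action();
            return true;
        }

        DateTime end = DateTime.UtcNow.AddMilliseconds(timeoutMs);
        while (DateTime.UtcNow < end)
        {
            // Take the lock only if it is free.
            if (Interlocked.CompareExchange(ref owner, me, 0) == 0)
            {
                try
                {
                    action();
                    return true;
                }
                finally
                {
                    // Release only if we are still the owner.
                    Interlocked.CompareExchange(ref owner, 0, me);
                }
            }

            Thread.Sleep(spinMs);
        }

        return false;
    }
}
```

Calling DoWithLock from inside an action that is already running under the same lock returns immediately through the reentrant path, while a different thread keeps spinning until the outermost call releases the lock or its own timeout expires.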

Concurrency Part 9: Semaphores with custom locks
https://blog.adamfurmanek.pl/2019/10/26/concurrency-part-9/
Sat, 26 Oct 2019 08:00:16 +0000

This is the ninth part of the Concurrency series. For your convenience you can find other parts in the table of contents in Part 1 – Mutex performance in .NET.

Last time we implemented a custom mutex based on memory mapped files. We can use it to track who owns the lock in a much simpler way. Today we will use it to implement a counting semaphore.

The idea is simple: we create multiple mutexes, iterate through all of them and take the first one available. If none is free, we just pick one and wait on it a little longer. This is a very dirty implementation and can lead to starvation, though.

public static void DoWithSemaphore(string name, int semaphoresCount, Action action)
{
	bool checkedAll = false;
	string fullName;

	for (int index = 0; ; index = (index + 1) % semaphoresCount)
	{
		fullName = name + index;
		if (DoWithMutex(fullName, action, checkedAll ? 20 : 1))
		{
			break;
		}

		if (index == semaphoresCount - 1)
		{
			checkedAll = true;
		}
	}
}

So we accept the number of semaphores (each of them being a binary semaphore) and implement a naive WaitAny: on the first pass each mutex gets a 1-second timeout, and once all of them have been checked the timeout grows to 20 seconds. This is obviously less than optimal and doesn’t work every time, but in a reasonable system it should perform pretty well. Once again, your mileage may vary and I take no responsibility.
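
To experiment with the same round-robin logic inside one process, here is a rough sketch that swaps the named mutexes for an array of SemaphoreSlim(1, 1) slots. SlotSemaphore, quickMs and patientMs are made-up names for illustration, and the returned slot index is an addition that the original method does not have.

```csharp
using System;
using System.Threading;

// Counting semaphore built from N binary slots, tried in round-robin order.
// Illustrative only; the post uses named cross-process mutexes instead.
public sealed class SlotSemaphore
{
    private readonly SemaphoreSlim[] slots;

    public SlotSemaphore(int count)
    {
        slots = new SemaphoreSlim[count];
        for (int i = 0; i < count; i++)
        {
            slots[i] = new SemaphoreSlim(1, 1);
        }
    }

    // Runs the action while holding one slot and returns the slot index.
    public int DoWithSemaphore(Action action, int quickMs = 10, int patientMs = 200)
    {
        bool checkedAll = false;

        for (int index = 0; ; index = (index + 1) % slots.Length)
        {
            // Short wait on the first pass, longer waits afterwards.
            if (slots[index].Wait(checkedAll ? patientMs : quickMs))
            {
                try
                {
                    action();
                    return index;
                }
                finally
                {
                    slots[index].Release();
                }
            }

            if (index == slots.Length - 1)
            {
                checkedAll = true;
            }
        }
    }
}
```

Like the original, this loops forever when every slot stays busy, and a waiter that is parked on one slot does not notice another slot becoming free until its current wait times out.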

Concurrency Part 8: Tracking mutex owner
https://blog.adamfurmanek.pl/2019/10/19/concurrency-part-8/
Sat, 19 Oct 2019 08:00:51 +0000

This is the eighth part of the Concurrency series. For your convenience you can find other parts in the table of contents in Part 1 – Mutex performance in .NET.

We know how to use global mutexes to synchronize processes. However, there is a big drawback: we don’t know who owns the mutex and we cannot get that information easily. There is the Wait Chain Traversal API (https://docs.microsoft.com/en-us/windows/win32/debug/wait-chain-traversal) for reading that information, but it is not easy to follow. Can we do better?

One common trick is to use memory mapped files to store the information. Let’s see the code:

using System;
using System.Diagnostics;
using System.IO.MemoryMappedFiles;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;

namespace MutexUtils
{
    public static class MutexUtils
    {
        [DllImport("kernel32.dll", SetLastError = true)]
        unsafe public static extern long InterlockedCompareExchange64(long* destination, long exchange, long comperand);

        public static unsafe bool DoWithMutex(string name, Action action, int timeoutInSeconds = int.MaxValue, int spinTimeout = 250)
        {
            // Store <PID><TID> as one field
            long myIdentifier = ((long)Process.GetCurrentProcess().Id << 32) + AppDomain.GetCurrentThreadId();

            // Calculate timeout wall clock
            DateTime end = DateTime.UtcNow.AddSeconds(timeoutInSeconds);

            // Open memory mapped file initialized with zeros
            using (var memoryMappedFile = MemoryMappedFile.CreateOrOpen(name, 8))
            using (var viewStream = memoryMappedFile.CreateViewStream(8, 8))
            {
                var pointer = viewStream.SafeMemoryMappedViewHandle.DangerousGetHandle();

                try
                {
                    int holderPid = -1;
                    int holderTid = -1;

                    while (DateTime.UtcNow < end)
                    {
                        // Take lock only if it is not taken
                        var currentLock = InterlockedCompareExchange64((long*)pointer, myIdentifier, 0);

                        if (currentLock == 0)
                        {
                            action();
                            return true;
                        }

                        // Lock is taken, let's see who holds it
                        holderTid = GetTid(currentLock);
                        holderPid = GetPid(currentLock);

                        bool exists = false;
                        try
                        {
                            exists = Process.GetProcessById(holderPid).Threads.OfType<ProcessThread>().Any(t => t.Id == holderTid);
                        }
                        catch
                        {
                        }

                        // If holding thread doesn't exist then the lock is abandoned
                        if (!exists)
                        {
                            // Clear lock only if it is still held by previous owner
                            var currentLock2 = InterlockedCompareExchange64((long*)pointer, 0, currentLock);
                            if (currentLock == currentLock2)
                            {
                               Console.WriteLine($"Mutex {name} was abandoned by pid={holderPid} tid={holderTid}");
                            }
                        }

                        Thread.Sleep(spinTimeout);
                    }

                    Console.WriteLine($"Timeout when waiting on mutex {name} held by pid={holderPid} tid={holderTid}");
                }
                finally
                {
                    // Clear lock only if I'm the one holding it
                    var currentLock = InterlockedCompareExchange64((long*)pointer, 0, myIdentifier);
                    if (currentLock != myIdentifier)
                    {
                        Console.WriteLine($"I tried to release mutex held by someone else, pid={GetPid(currentLock)} tid={GetTid(currentLock)}");
                    }
                }
            }

            return false;
        }

        private static int GetPid(long value)
        {
            return (int)(value >> 32);
        }

        private static int GetTid(long value)
        {
            return (int)(value & 0xFFFFFFFF);
        }
    }
}

Magic, a lot. Let’s go part by part.

First, we want to store the process ID and the thread ID somewhere in the lock to be able to read them easily later. In line 18 we encode both in one 64-bit long variable, with the PID in the upper 32 bits and the TID in the lower 32 bits, which we can later swap using a CAS operation.
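
The packing and unpacking can be checked in isolation. Below is a small sketch, assuming the same layout as above; LockIdentifier and Pack are hypothetical names, and the mask on the TID is an extra precaution that the original expression (which simply adds the two values) does not have.

```csharp
using System;

// Packs <PID><TID> into one 64-bit value and reads the halves back.
// Illustrative helper, not part of MutexUtils.
public static class LockIdentifier
{
    public static long Pack(int pid, int tid)
    {
        // Mask the TID so that a negative int cannot sign-extend
        // into the upper half that holds the PID.
        return ((long)pid << 32) | ((long)tid & 0xFFFFFFFFL);
    }

    public static int GetPid(long value)
    {
        return unchecked((int)(value >> 32));
    }

    public static int GetTid(long value)
    {
        return unchecked((int)(value & 0xFFFFFFFFL));
    }
}
```

For non-negative thread IDs the result is the same as with the addition used in DoWithMutex. A packed value of zero still means that the lock is free, which works as long as no real owner has both PID and TID equal to zero.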

In line 21 we calculate the time at which we should stop trying to get the lock. This is for timeouts.

Next, we create a memory mapped file (lines 24, 25) not backed by any physical file. This is to avoid permission problems: we cannot map the same file in two processes without copy-on-write semantics. We will need a separate reader for debugging.

Next, we spin in a loop. Each time we try to take the lock (line 37). If the current lock value is zero (line 39), the lock was available and we have just taken it. We execute the action and then return (which jumps to the finally block).

However, if the lock wasn’t available, we now know the owner of the lock (lines 46 and 47), so we need to check whether the owner is still alive. We look up the process and search for the thread in line 52.

If the thread doesn’t exist, we try to clear the lock (line 62). We clear it only if it is still held by the same owner. And here is a big warning: process IDs and thread IDs can be recycled, so we may inadvertently release a mutex that is still in use!

Then in line 69 we sleep for the spin timeout and loop again.

Finally, in line 77 we try to clear the lock if it is taken by us. We may end up in this line if some exception is thrown, so we cannot just blindly release the mutex.

That’s all. You can verify that it works pretty well. Just be aware of the chance of clearing a mutex held by someone else; you may come up with a different identifier if needed.

In theory, this can be solved by using a 128-bit CAS that stores the acquisition time next to the identifier:

public static class MutexUtils
{
	public static void DoWithSemaphore(string name, int semaphoresCount, Action action)
	{
		bool checkedAll = false;
		string fullName;

		for (int index = 0; ; index = (index + 1) % semaphoresCount)
		{
			fullName = name + index;
			if (DoWithMutex(fullName, action, checkedAll ? 20 : 1))
			{
				break;
			}

			if (index == semaphoresCount - 1)
			{
				checkedAll = true;
			}
		}
	}

	[DllImport("kernel32.dll", SetLastError = true)]
	unsafe public static extern bool InterlockedCompareExchange128(long* destination, long exchangeHigh, long exchangeLow, long* comperand);

	public static unsafe bool DoWithMutex(string name, Action action, int timeoutInSeconds = int.MaxValue, int spinTimeout = 250)
	{
		// Store <PID><TID> as one field
		long myIdentifier = ((long)Process.GetCurrentProcess().Id << 32) + AppDomain.GetCurrentThreadId();
		long myTime = DateTime.UtcNow.Ticks;

		// Calculate timeout wall clock
		DateTime end = DateTime.UtcNow.AddSeconds(timeoutInSeconds);

		// Open memory mapped file initialized with zeros
		using (var memoryMappedFile = MemoryMappedFile.CreateOrOpen(name, 16))
		using (var viewStream = memoryMappedFile.CreateViewStream(0, 16))
		{
			var pointer = viewStream.SafeMemoryMappedViewHandle.DangerousGetHandle();
			long* currentLock = stackalloc long[2];
			currentLock[0] = 0;
			currentLock[1] = 0;

			try
			{
				int holderPid = -1;
				int holderTid = -1;

				while (DateTime.UtcNow < end)
				{
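					// Take lock only if it is not taken. A failed CAS overwrites the comparand
					// with the current value, so reset it to zero before every attempt
					currentLock[0] = 0;
					currentLock[1] = 0;
					var isLockFree = InterlockedCompareExchange128((long*)pointer, myIdentifier, myTime, currentLock);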

					if (isLockFree)
					{
						action();
						return true;
					}

					// Lock is taken, let's see who holds it (the identifier is the high half, index 1)
					holderTid = GetTid(currentLock[1]);
					holderPid = GetPid(currentLock[1]);

					bool exists = false;
					try
					{
						exists = Process.GetProcessById(holderPid).Threads.OfType<ProcessThread>().Any(t => t.Id == holderTid);
					}
					catch
					{
					}

					// If holding thread doesn't exist then the lock is abandoned
					if (!exists)
					{
						// Clear lock only if it is still held by previous owner
						var isLockStillOwnedByTheSameOwner = InterlockedCompareExchange128((long*)pointer, 0, 0, currentLock);
						if (isLockStillOwnedByTheSameOwner)
						{
						   Console.WriteLine($"Mutex {name} was abandoned by pid={holderPid} tid={holderTid}");
						}
					}

					Thread.Sleep(spinTimeout);
				}

				Console.WriteLine($"Timeout when waiting on mutex {name} held by pid={holderPid} tid={holderTid}");
			}
			finally
			{
				// Clear lock only if I'm the one holding it (low half is the time, high half is the identifier)
				currentLock[0] = myTime;
				currentLock[1] = myIdentifier;
				if (!InterlockedCompareExchange128((long*)pointer, 0, 0, currentLock))
				{
					Console.WriteLine($"I tried to release mutex held by someone else, pid={GetPid(currentLock[1])} tid={GetTid(currentLock[1])}");
				}
			}
		}

		return false;
	}

	private static int GetPid(long value)
	{
		return (int)(value >> 32);
	}

	private static int GetTid(long value)
	{
		return (int)(value & 0xFFFFFFFF);
	}
}

However, it doesn’t work on my machine: my kernel32.dll doesn’t support this CAS operation.

And since this is playing with very dangerous primitives, I take no responsibility for any failures in your systems if it doesn’t work. I tested it in my situation and it behaved correctly, though.

Concurrency Part 7: Semaphores trickery
https://blog.adamfurmanek.pl/2019/10/12/concurrency-part-7/
Sat, 12 Oct 2019 08:00:33 +0000

This is the seventh part of the Concurrency series. For your convenience you can find other parts in the table of contents in Part 1 – Mutex performance in .NET.

Last time we examined an interesting behavior of Mutex when it is abandoned. Today we will look into Semaphore.

A typical interview question is: what is the difference between a mutex and a binary semaphore? An exemplary answer is: a mutex tracks its owner, a semaphore doesn’t. This is not always true, since not all mutexes do that (the ones that do are called recursive mutexes). However, based on this definition we can easily see a problem:

using System;
using System.Threading;

namespace AbandonedSemaphore
{
    class Program
    {
        static void Main(string[] args)
        {
            var semaphore = new Semaphore(2, 2, "semaphoreName");

            semaphore.WaitOne();
            Console.WriteLine("Acquired!");

            Console.ReadLine();
            semaphore.Release();
            Console.WriteLine("Done");
        }
    }
}

Run three instances and then kill the first two. The last instance will never get the semaphore. Why? Because a semaphore doesn’t track its owner, so it cannot throw any exception when the owning process dies.

Can we make it better? Yes:

using System;
using System.Linq;
using System.Threading;

namespace AbandonedSemaphoreBetter
{
    class Program
    {
        static void Main(string[] args)
        {
            var mutexes = Enumerable.Range(0, 2).Select(i => new Mutex(false, "mutexName" + i)).ToArray();

            int id = -1;
            try
            {
                id = WaitHandle.WaitAny(mutexes);
            }
            catch(AbandonedMutexException e)
            {
                id = e.MutexIndex;
            }

            Console.WriteLine("Acquired");

            Console.ReadLine();

            mutexes[id].ReleaseMutex();

            Console.WriteLine("Released");
        }
    }
}

We have an array of mutexes and track which one was acquired. Notice that the index is also available in the exception, so we know which mutex was freed by killing the process.
