Rx.net – Random IT Utensils https://blog.adamfurmanek.pl IT, operating systems, maths, and more. Sat, 02 Jan 2021 08:52:49 +0000 en-US hourly 1 https://wordpress.org/?v=6.6.2 Rx — Custom traffic shaper https://blog.adamfurmanek.pl/2020/12/19/rx-custom-traffic-shaper/ https://blog.adamfurmanek.pl/2020/12/19/rx-custom-traffic-shaper/#respond Sat, 19 Dec 2020 09:00:16 +0000 https://blog.adamfurmanek.pl/?p=3692 Continue reading Rx — Custom traffic shaper]]> Regular Rx Buffer(200) emits data every 200 milliseconds. Let’s implement an operator which will emit data each 200 milliseconds or immediately if nothing was emitted in last interval. It’s like a traffic shaper which will give you data as soon as possible bot not more often than each 200 millis.

Sample implementation:

using System;
using System.Threading.Tasks;
using System.Threading;
using System.Reactive;
using System.Reactive.Linq;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;
					
public class Program
{
	public static void Main()
	{
		Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " beginning");
		var subject = new BehaviorSubject(0);
		subject.MagicBuffer(TimeSpan.FromMilliseconds(200)).Subscribe(x => Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " Received " + string.Join(",", x))); // Should emit [0] immediately
		Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " emitting 1,2,3");
		subject.OnNext(1); // Should cache
		subject.OnNext(2); // Should cache
		subject.OnNext(3); // Should cache
		Thread.Sleep(250); // Should emit [1, 2, 3]
		Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " should emit empty in a bit");
		Thread.Sleep(250); // Should emit []
		Console.WriteLine(DateTime.Now.ToString("hh:mm:ss.fff") + " emitting 4");
		subject.OnNext(4); // Should emit immediately
		subject.OnNext(5); // Should cache
		Thread.Sleep(200);
	}
}

public static class Ext{
	public static IObservable<IList> MagicBuffer(this IObservable source, TimeSpan delay)
	{
		return Observable.Create<IList>(o => {
			DateTime lastEmitTime = DateTime.MinValue;
			DateTime lastCallTime = DateTime.MinValue;
			IList values = new List();
			Task continuation = Task.CompletedTask;
			Action callback = _ => {
				lock(values){
					if(DateTime.Now - lastCallTime >= delay){
						lastCallTime = DateTime.Now;
						o.OnNext(values);
						values.Clear();
					}
				}
			};
			Action recur = null;
			recur = _ => Task.Delay(delay).ContinueWith(ignored => {
				continuation = continuation.ContinueWith(callback).ContinueWith(recur);
			});
			recur(null);

			return source.Subscribe(
				v => {
					lock(values){
						if(DateTime.Now - lastEmitTime >= delay){
							lastEmitTime = lastCallTime = DateTime.Now;
							o.OnNext(new List{v});
							recur(null);
						}else{
							values.Add(v);
						}
					}
				},
				o.OnError,
				o.OnCompleted
			);
		});
	}
}

Output:

06:47:07.311 beginning
06:47:07.374 Received 0
06:47:07.374 emitting 1,2,3
06:47:07.577 Received 1,2,3
06:47:07.640 should emit empty in a bit
06:47:07.780 Received 
06:47:07.905 emitting 4
06:47:07.905 Received 4
06:47:08.108 Received 5

This is not optimal, there are probably too many continuations scheduled, also we lock directly instead of using some interlocked. However, seems like it’s working.

]]>
https://blog.adamfurmanek.pl/2020/12/19/rx-custom-traffic-shaper/feed/ 0