General purpose thread-safe event class.

Features:
* Thread-safety.
* Supports capturing the `SynchronizationContext` (which makes it usable in the Unity3d engine).
* Provides a `Task` api for waiting for events.
* Provides a callback (Subscribe / Unsubscribe) api for receiving events.
* Reasonable performance.
* Ability to pass a custom `subscriptionToken` makes it easier to unsubscribe lambdas.
* Supports storing data that was invoked when no-one was subscribed (or waiting) yet.

Allocations:
* Invoke is allocation free (after it has warmed up some caches).
* Waiting using the `Task` api allocates a handler on the heap.
* New subscriptions allocate a handler on the heap.
* Unsubscriptions are allocation free (depending on the `Equals` implementation of your provided `subscriptionToken`).

Unity3d note: If you call `Subscribe` from the unity-thread (and leave `callOnCapturedContext` at `true`) then your callback will only be invoked on the unity-thread, making it very safe to use.

Example:
```c#
class Program
{
    static async Task Main()
    {
        var worker = new Worker();

        // Async style:
        var result = await worker.WorkReady.WaitAsync();
        Console.WriteLine($"Got async: '{result}'");

        // Callback style:
        worker.WorkReady.Subscribe(WorkReady);
        Console.ReadKey();
        worker.WorkReady.Unsubscribe(WorkReady);
    }

    static void WorkReady(int output) => Console.WriteLine($"Got from subscription: '{output}'");

    class Worker : IExceptionHandler
    {
        readonly SynchronizedEvent<int> outputEvent;
        readonly Task backgroundWork;

        public Worker()
        {
            this.outputEvent = new SynchronizedEvent<int>(
                exceptionHandler: this,
                storeUnobservedData: true);
            this.backgroundWork = Task.Run(BackgroundWorkAsync);
        }

        public IReadOnlySynchronizedEvent<int> WorkReady => this.outputEvent;

        void IExceptionHandler.Handle(Exception e) => Console.Error.Write(e);

        async Task BackgroundWorkAsync()
        {
            while (true)
            {
                await Task.Delay(1000);
                this.outputEvent.Invoke(42);
            }
        }
    }
}
```

`SynchronizedEvent`:
```c#
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Interface for exception handlers.
/// </summary>
public interface IExceptionHandler
{
    /// <summary>
    /// Handle given exception.
    /// </summary>
    /// <remarks>
    /// Implementation should not rethrow in this call-stack.
    /// </remarks>
    /// <param name="exception">Exception to handle.</param>
    void Handle(Exception exception);
}

/// <summary>
/// Thread-safe event that supports task and callback style listening.
/// </summary>
/// <typeparam name="T">Type of the event data.</typeparam>
public interface IReadOnlySynchronizedEvent<T>
{
    /// <summary>
    /// Wait for an event to be fired.
    /// </summary>
    /// <param name="cancelToken">Token to be able to cancel the task.</param>
    /// <returns>Task that completes when an event is received or is cancelled.</returns>
    Task<T> WaitAsync(CancellationToken cancelToken = default);

    /// <summary>
    /// Subscribe to events.
    /// </summary>
    /// <param name="action">Action to invoke when an event is fired.</param>
    /// <param name="subscriptionToken">
    /// Token to use for unsubscribing, if none is provided then the 'action' will be used.
    /// </param>
    /// <param name="callOnCapturedContext">
    /// Should the action only be called on the SynchronizationContext that was active when
    /// subscribing.
    /// </param>
    void Subscribe(Action<T> action, object subscriptionToken = null, bool callOnCapturedContext = true);

    /// <summary>
    /// Subscribe to events.
    /// </summary>
    /// <remarks>
    /// State parameter can be used to avoid having to use a closure to capture state.
    /// </remarks>
    /// <param name="action">Action to invoke when an event is fired.</param>
    /// <param name="state">State that is passed to the action.</param>
    /// <param name="subscriptionToken">
    /// Token to use for unsubscribing, if none is provided then the 'action' will be used.
    /// </param>
    /// <param name="callOnCapturedContext">
    /// Should the action only be called on the SynchronizationContext that was active when
    /// subscribing.
    /// </param>
    void Subscribe(
        Action<T, object> action,
        object state,
        object subscriptionToken = null,
        bool callOnCapturedContext = true);

    /// <summary>
    /// Unsubscribe from events.
    /// </summary>
    /// <param name="action">Action that was used as the subscriptionToken.</param>
    /// <returns>True if successfully unsubscribed otherwise False.</returns>
    bool Unsubscribe(Action<T> action);

    /// <summary>
    /// Unsubscribe from events.
    /// </summary>
    /// <param name="subscriptionToken">Token that was used for subscribing.</param>
    /// <returns>True if successfully unsubscribed otherwise False.</returns>
    bool Unsubscribe(object subscriptionToken);
}

/// <summary>
/// Thread-safe event that supports task and callback style listening.
/// </summary>
/// <remarks>
/// Thread safety: Api should be threadsafe.
/// </remarks>
/// <typeparam name="T">Type of the event data.</typeparam>
public sealed class SynchronizedEvent<T> : IReadOnlySynchronizedEvent<T>, IDisposable
{
    // Per-thread scratch lists so 'Invoke' does not have to allocate a snapshot list every call.
    // A list is taken out of its slot while in use (see 'InvokeInternal') so that a re-entrant
    // 'Invoke' on the same thread never touches a list that is still being enumerated.
    [ThreadStatic]
    private static List<AwaiterHandle> awaiterInvokeList;
    [ThreadStatic]
    private static List<SubscribeHandle> subInvokeList;

    private readonly IExceptionHandler exceptionHandler;
    private readonly bool storeUnobservedData;
    private readonly bool allowSynchronousContinuations;
    private readonly object awaitersLock = new object();
    private readonly List<AwaiterHandle> awaiters = new List<AwaiterHandle>();
    private readonly object subscribersLock = new object();
    private readonly List<SubscribeHandle> subscribers = new List<SubscribeHandle>();
    private readonly Lazy<ConcurrentQueue<T>> unobservedData = new Lazy<ConcurrentQueue<T>>();

    // Non-zero once 'Dispose' has been called.
    private volatile int disposeCount;

    /// <summary>
    /// Initializes a new instance of the <see cref="SynchronizedEvent{T}"/> class.
    /// </summary>
    /// <param name="exceptionHandler">Handler for dealing with exceptions during callback invoke.</param>
    /// <param name="storeUnobservedData">Should data be stored when there is no-one listening.</param>
    /// <param name="allowSynchronousContinuations">
    /// Are task-continuations allowed to execute synchronously.
    /// Use with caution as its easy to shoot yourself in the foot with this and cause deadlocks.
    /// For example if a awaiter would block its thread after the await that would also block the
    /// 'Invoke' call of this event.
    /// Why would you ever want to enable this? Well if you know its safe in your usecase then its
    /// faster as it requires less context switching.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="exceptionHandler"/> is null.
    /// </exception>
    public SynchronizedEvent(
        IExceptionHandler exceptionHandler,
        bool storeUnobservedData = false,
        bool allowSynchronousContinuations = false)
    {
        this.exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));
        this.storeUnobservedData = storeUnobservedData;
        this.allowSynchronousContinuations = allowSynchronousContinuations;
    }

    /// <summary>
    /// Check if the given task was created by <see cref="WaitAsync"/>.
    /// </summary>
    /// <param name="task">Task to check.</param>
    /// <returns>True if task was created by <see cref="WaitAsync"/> otherwise False.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="task"/> is null.</exception>
    public static bool IsOwnedBy(Task task)
    {
        if (task is null)
            throw new ArgumentNullException(nameof(task));

        // 'is' already evaluates to false for a null async-state.
        return task.AsyncState is AwaiterHandle;
    }

    /// <summary>
    /// Attempt to cancel a task that is created by <see cref="WaitAsync"/>.
    /// </summary>
    /// <remarks>
    /// Returns false when given a task that was not created from <see cref="WaitAsync"/>.
    /// </remarks>
    /// <param name="task">Task to attempt to cancel.</param>
    /// <returns>True if successfully cancelled, otherwise False.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="task"/> is null.</exception>
    public static bool TryCancel(Task task)
    {
        if (task is null)
            throw new ArgumentNullException(nameof(task));

        return task.AsyncState is AwaiterHandle handle && handle.TryCancel();
    }

    /// <inheritdoc/>
    public Task<T> WaitAsync(CancellationToken cancelToken = default) =>
        this.WaitAsyncInternal(cancelToken);

    /// <inheritdoc/>
    public void Subscribe(
        Action<T> action,
        object subscriptionToken = null,
        bool callOnCapturedContext = true)
    {
        if (action is null)
            throw new ArgumentNullException(nameof(action));

        this.SubscribeInternal(action, state: null, subscriptionToken ?? action, callOnCapturedContext);
    }

    /// <inheritdoc/>
    public void Subscribe(
        Action<T, object> action,
        object state,
        object subscriptionToken = null,
        bool callOnCapturedContext = true)
    {
        if (action is null)
            throw new ArgumentNullException(nameof(action));

        this.SubscribeInternal(action, state, subscriptionToken ?? action, callOnCapturedContext);
    }

    /// <inheritdoc/>
    public bool Unsubscribe(Action<T> action)
    {
        if (action is null)
            throw new ArgumentNullException(nameof(action));

        return this.UnsubscribeInternal(action);
    }

    /// <inheritdoc/>
    public bool Unsubscribe(object subscriptionToken)
    {
        if (subscriptionToken is null)
            throw new ArgumentNullException(nameof(subscriptionToken));

        return this.UnsubscribeInternal(subscriptionToken);
    }

    /// <summary>
    /// Invoke the event.
    /// </summary>
    /// <param name="data">Data of the event.</param>
    /// <exception cref="ObjectDisposedException">Thrown when the event has been disposed.</exception>
    public void Invoke(T data) => this.InvokeInternal(data);

    /// <summary>
    /// Dispose the event: cancels all pending waiters and drops all subscribers.
    /// </summary>
    public void Dispose()
    {
        // Using 'Interlocked.Increment' to make sure we only dispose once even when called concurrently.
        if (Interlocked.Increment(ref this.disposeCount) == 1)
            this.DisposeInternal();
    }

    private Task<T> WaitAsyncInternal(CancellationToken cancelToken = default)
    {
        // If cancellation is already requested then early out.
        if (cancelToken.IsCancellationRequested)
            return Task.FromCanceled<T>(cancelToken);

        /* If the event has been disposed then early out.
        Note: 'Task.FromCanceled' throws when given a token that is not cancelled, so we cannot
        pass the callers token here. */
        if (this.disposeCount != 0)
            return Task.FromCanceled<T>(new CancellationToken(canceled: true));

        // If there is any unobserved data then return that.
        if (this.unobservedData.IsValueCreated && this.unobservedData.Value.TryDequeue(out var data))
            return Task.FromResult(data);

        // Otherwise create a handle to wait for a invoke.
        var handle = new AwaiterHandle(cancelToken, this.allowSynchronousContinuations);

        /* Register the handle.
        Note: The dispose state is checked again while holding the lock, 'DisposeInternal' takes the
        same lock after marking the event as disposed, so a handle is either seen (and cancelled) by
        the dispose or rejected here. Without this a waiter could be left pending forever. */
        bool disposed;
        lock (this.awaitersLock)
        {
            disposed = this.disposeCount != 0;
            if (!disposed)
                this.awaiters.Add(handle);
        }

        // Cancel outside of the lock so continuations can never run while we hold it.
        if (disposed)
            handle.Dispose();

        return handle.WaitForInvoke;
    }

    private void SubscribeInternal(
        object action,
        object state,
        object subscriptionToken,
        bool callOnCapturedContext = true)
    {
        Debug.Assert(action is Action<T> || action is Action<T, object>, "Invalid action type");
        Debug.Assert(subscriptionToken != null, "Missing subscription token");

        // If the event has been disposed it can never receive messages anymore.
        if (this.disposeCount != 0)
            throw new ObjectDisposedException(nameof(SynchronizedEvent<T>));

        // Create handle for this subscription.
        var syncContext = callOnCapturedContext ? SynchronizationContext.Current : null;
        var handle = new SubscribeHandle(action, state, subscriptionToken, syncContext, this.exceptionHandler);
        lock (this.subscribersLock)
        {
            // Check again under the lock so we never register after 'DisposeInternal' cleared the list.
            if (this.disposeCount != 0)
                throw new ObjectDisposedException(nameof(SynchronizedEvent<T>));

            this.subscribers.Add(handle);
        }

        // Handle any unobserved data.
        if (this.unobservedData.IsValueCreated)
        {
            var queue = this.unobservedData.Value;
            while (queue.TryDequeue(out var data))
                handle.Invoke(data);
        }
    }

    private bool UnsubscribeInternal(object subscriptionToken)
    {
        // If the event has been disposed then there is no need to unsubscribe anymore.
        if (this.disposeCount != 0)
            return false;

        var removed = false;

        // Remove all subscribers with the same 'subscriptionToken'.
        lock (this.subscribersLock)
        {
            // Iterate backwards so removing an entry does not shift the entries we still have to visit.
            for (int i = this.subscribers.Count - 1; i >= 0; i--)
            {
                if (this.subscribers[i].SubscriptionToken.Equals(subscriptionToken))
                {
                    // Safe to call dispose here while holding the lock because it only sets a bool.
                    this.subscribers[i].Dispose();
                    this.subscribers.RemoveAt(i);
                    removed = true;
                }
            }
        }

        return removed;
    }

    private void InvokeInternal(T data)
    {
        if (this.disposeCount != 0)
            throw new ObjectDisposedException(nameof(SynchronizedEvent<T>));

        // Keep track if someone was invoked with this data.
        var observed = false;

        /* Complete all the awaiters.
        Note: We first make a list of things to invoke before invoking them, reason is otherwise
        there would be a deadlock if a continuation that executes synchronously would call
        'WaitAsync' on this event again.
        Note: The thread-static list is taken out of its slot while we use it, that way a
        re-entrant 'Invoke' on this thread (from a synchronous continuation or a callback) gets its
        own list instead of modifying the one we are enumerating. */
        var awaitersToComplete = awaiterInvokeList ?? new List<AwaiterHandle>();
        awaiterInvokeList = null;
        try
        {
            // Gather all awaiters to invoke and clear them.
            lock (this.awaitersLock)
            {
                awaitersToComplete.AddRange(this.awaiters);
                this.awaiters.Clear();
            }

            // Invoke the awaiters.
            foreach (var awaiter in awaitersToComplete)
                observed |= awaiter.TryComplete(data);
        }
        finally
        {
            // Clear before returning the list so the cache does not keep handles alive.
            awaitersToComplete.Clear();
            awaiterInvokeList = awaitersToComplete;
        }

        /* Invoke all the subscribers.
        Note: We first make a list of things to invoke before invoking them, reason is otherwise
        there would be a deadlock if someone called Subscribe / Unsubscribe from inside a invoke
        call. */
        var subscribersToInvoke = subInvokeList ?? new List<SubscribeHandle>();
        subInvokeList = null;
        try
        {
            // Gather all the subscribers to invoke.
            lock (this.subscribersLock)
            {
                subscribersToInvoke.AddRange(this.subscribers);
            }

            // Invoke the subscribers.
            foreach (var sub in subscribersToInvoke)
                sub.Invoke(data);
            observed |= subscribersToInvoke.Count != 0;
        }
        finally
        {
            subscribersToInvoke.Clear();
            subInvokeList = subscribersToInvoke;
        }

        // Store data that was not observed.
        if (!observed && this.storeUnobservedData)
            this.unobservedData.Value.Enqueue(data);
    }

    private void DisposeInternal()
    {
        // Dispose all the awaiters.
        lock (this.awaitersLock)
        {
            /* It is safe to call 'Dispose' here while holding the lock because we've already marked
            the event as disposed so new calls to 'WaitAsync' will be denied so they cannot cause a
            deadlock */
            foreach (var awaiter in this.awaiters)
                awaiter.Dispose();
            this.awaiters.Clear();
        }

        // Dispose all the subscribers.
        lock (this.subscribersLock)
        {
            foreach (var sub in this.subscribers)
                sub.Dispose();
            this.subscribers.Clear();
        }
    }

    /// <summary>
    /// Handle for a single pending 'WaitAsync' call.
    /// </summary>
    private sealed class AwaiterHandle
    {
        private readonly TaskCompletionSource<T> completeSource;
        private readonly CancellationTokenRegistration cancelReg;

        internal AwaiterHandle(CancellationToken cancelToken, bool allowSynchronousContinuations)
        {
            /* Save this handle in the async state of the 'WaitForInvoke' task, that way we can
            cancel a task by fishing the handle out of its state. */
            this.completeSource = new TaskCompletionSource<T>(
                state: this,
                creationOptions: allowSynchronousContinuations
                    ? TaskCreationOptions.None
                    : TaskCreationOptions.RunContinuationsAsynchronously);
            this.cancelReg = cancelToken.Register(this.Dispose, useSynchronizationContext: false);
        }

        /// <summary>Task that completes when the event is invoked or the wait is cancelled.</summary>
        internal Task<T> WaitForInvoke
        {
            get
            {
                var result = this.completeSource.Task;
                Debug.Assert(result.AsyncState == this, "Invalid async-state");
                return result;
            }
        }

        /// <summary>Complete the task with the given data.</summary>
        /// <returns>True if this call completed the task, otherwise False.</returns>
        internal bool TryComplete(T data)
        {
            this.cancelReg.Dispose();
            return this.completeSource.TrySetResult(data);
        }

        /// <summary>Cancel the task.</summary>
        /// <returns>True if this call cancelled the task, otherwise False.</returns>
        internal bool TryCancel()
        {
            this.cancelReg.Dispose();
            return this.completeSource.TrySetCanceled();
        }

        internal void Dispose() => this.TryCancel();
    }

    /// <summary>
    /// Handle for a single subscription.
    /// </summary>
    private sealed class SubscribeHandle
    {
        // Either an 'Action<T>' or an 'Action<T, object>' (which receives 'state' as second argument).
        private readonly object action;
        private readonly object state;
        private readonly object subscriptionToken;
        private readonly SynchronizationContext context;
        private readonly IExceptionHandler exceptionHandler;
        private volatile bool isDisposed;

        internal SubscribeHandle(
            object action,
            object state,
            object subscriptionToken,
            SynchronizationContext context,
            IExceptionHandler exceptionHandler)
        {
            Debug.Assert(
                action != null && (action is Action<T> || action is Action<T, object>),
                "Invalid action");
            Debug.Assert(subscriptionToken != null, "Missing subscription token");
            Debug.Assert(exceptionHandler != null, "Missing exception handler");

            this.action = action;
            this.state = state;
            this.subscriptionToken = subscriptionToken;
            this.context = context;
            this.exceptionHandler = exceptionHandler;
        }

        internal object SubscriptionToken => this.subscriptionToken;

        internal void Invoke(T data)
        {
            // Execute immediately if no context is required or if we are on the right context.
            if (this.context == null || SynchronizationContext.Current == this.context)
            {
                this.InvokeActionInline(data);
            }
            else
            {
                this.context.Post(this.OnPost, data);
            }
        }

        internal void Dispose() => this.isDisposed = true;

        private void OnPost(object input)
        {
            // Null is valid event data when 'T' is a reference type or a nullable value type.
            Debug.Assert(
                input is T || (input is null && default(T) == null),
                "Posted input is of incorrect type");

            /* Check if the handle has been disposed as otherwise its possible that user action
            is invoked after unsubscribing (if it was already posted to the sync-context) */
            if (this.isDisposed)
                return;

            this.InvokeActionInline((T)input);
        }

        private void InvokeActionInline(T input)
        {
            try
            {
                if (this.action is Action<T> statelessAction)
                    statelessAction.Invoke(input);
                else
                    ((Action<T, object>)this.action).Invoke(input, this.state);
            }
            catch (Exception e)
            {
                // User callbacks must never break the invoke loop, route failures to the handler.
                this.exceptionHandler.Handle(e);
            }
        }
    }
}
```