using System;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace ChannelsAreCool
{
    // Disclaimer: I didn't actually run this code so it might not quite work.
    // Feel free to complain or ask questions and I'll fix it.

    /// <summary>
    /// Demonstrates a bounded <see cref="Channel{T}"/> of strings with one producer
    /// (the caller of <see cref="RunExample"/>) and one or more consumers running
    /// <c>ListenToChannel</c>.
    /// </summary>
    public static class Example
    {
        /// <summary>
        /// Writes 1000 messages into a bounded channel while a background listener
        /// drains and prints them, then waits for the channel to be drained and for
        /// the listener to finish.
        /// </summary>
        /// <returns>A task that completes once the listener has processed every message.</returns>
        public static async Task RunExample()
        {
            const int maxMessagesToBuffer = 100;

            // Bounded channels are important if the consumer/reader is slower than the
            // producer. You don't want your app to keep buffering until you explode with
            // an OutOfMemoryException in production... or use Channel.CreateUnbounded<T>()
            // if you don't care.
            var channel = Channel.CreateBounded<string>(maxMessagesToBuffer);

            var reader = channel.Reader;
            var writer = channel.Writer;

            // You would typically need a dedicated thread to await and process
            // messages from the channel, but we can use Task.Run to start the
            // listener on the thread pool for the same purpose.
            var worker1 = Task.Run(() => ListenToChannel(reader));

            // You can uncomment these to have multiple readers listening to the
            // same channel. It works and it is safe.
            //var worker2 = Task.Run(() => ListenToChannel(reader));
            //var worker3 = Task.Run(() => ListenToChannel(reader));
            //var worker4 = Task.Run(() => ListenToChannel(reader));

            // This will try to write lots of messages to the channel, but since the
            // reader is slower (because of the Task.Delay), WriteAsync will wait
            // asynchronously until there is space in the channel.
            // This technique is called back pressure and can help keep a writer
            // from getting too far ahead.
            // NOTE: Unbounded channels never make the writer wait.
            for (int i = 0; i < 1000; i++)
            {
                await writer.WriteAsync($"Message # {i}");
            }

            // ALWAYS do this to wake up the readers and tell them you are done.
            // If you don't, they will stay awaiting 'WaitToReadAsync()' forever.
            writer.Complete();

            // [Optional]
            // This waits until the readers have read all remaining messages in the
            // channel. It is optional, but it can be important.
            await reader.Completion;

            // Completion only signals that the channel has been drained; the listener
            // may still be processing the last message it dequeued. Awaiting the worker
            // waits for that processing to finish and surfaces any exception it threw.
            // NOTE: With several workers use
            // 'await Task.WhenAll(worker1, worker2, worker3, worker4);' instead.
            await worker1;
        }

        /// <summary>
        /// Reads messages from <paramref name="reader"/> until the channel is completed,
        /// printing each one to the console and delaying 25 ms per message to simulate work.
        /// </summary>
        /// <param name="reader">The reading half of the channel to consume.</param>
        /// <returns>A task that completes once the channel is completed and empty.</returns>
        private static async Task ListenToChannel(ChannelReader<string> reader)
        {
            // Because async methods use a state machine to handle awaits,
            // it is safe to await in an infinite loop. Thank you C# compiler gods!
            // WaitToReadAsync returns false once the channel is completed and empty.
            while (await reader.WaitToReadAsync())
            {
                // As a note, if there are multiple readers, each message is handed to
                // exactly one of them: TryRead succeeds for a single reader and the
                // others simply go back to waiting.
                string messageString;
                while (reader.TryRead(out messageString)) // yes, yes I know about 'out var messageString'...
                {
                    Console.WriteLine($"The listener just read {messageString}!");
                    await Task.Delay(25); // this simulates some work...
                }
            }
        }
    }
}