Last active
July 30, 2021 22:22
-
-
Save andersstorhaug/c36b7c0e12200c752230c64b0cdfd134 to your computer and use it in GitHub Desktop.
`IAsyncEnumerable` catch-up subscription with EventStore gRPC v21.2.0 API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.Runtime.CompilerServices; | |
| using System.Threading; | |
| using System.Threading.Channels; | |
| using System.Threading.Tasks; | |
| using EventStore.Client; | |
| namespace Example | |
| { | |
/// <summary>
/// Extension methods exposing EventStore subscriptions as continuously running
/// <see cref="IAsyncEnumerable{T}"/> sequences that re-subscribe after failures.
/// </summary>
public static class EventStoreClientEx
{
    /// <summary>
    /// Reads events from a single stream and keeps following it, re-subscribing from the
    /// last yielded event when the subscription fails.
    /// </summary>
    /// <param name="client">Client used to create the subscriptions.</param>
    /// <param name="streamName">Name of the stream to subscribe to.</param>
    /// <param name="start">Position handed to the first subscription attempt.</param>
    /// <param name="resolveLinkTos">Forwarded to the client when subscribing.</param>
    /// <param name="configureOperationOptions">Forwarded to the client when subscribing.</param>
    /// <param name="userCredentials">Forwarded to the client when subscribing.</param>
    /// <param name="boundedCapacity">Capacity of the channel buffering events between the subscription and the consumer.</param>
    /// <param name="retryOptions">Backoff and limit settings for restarts; <c>null</c> selects the reader's defaults.</param>
    /// <param name="cancellationToken">Stops the enumeration.</param>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> or <paramref name="streamName"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="boundedCapacity"/> is less than 1.</exception>
    public static IAsyncEnumerable<ResolvedEvent> ReadStreamContinuousAsync(
        this EventStoreClient client,
        string streamName,
        StreamPosition start,
        bool resolveLinkTos = false,
        Action<EventStoreClientOperationOptions> configureOperationOptions = null,
        UserCredentials userCredentials = null,
        int boundedCapacity = 1,
        SubscriptionRestartOptions retryOptions = null,
        CancellationToken cancellationToken = default)
    {
        // Validate eagerly: a null client would otherwise only fail inside the reader's
        // retry loop, where the failure is treated as a restartable subscription error.
        if (client is null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        if (streamName is null)
        {
            throw new ArgumentNullException(nameof(streamName));
        }

        if (boundedCapacity < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
        }

        var reader = new StreamSubscriptionReader(
            client,
            streamName,
            new SubscriptionReaderOptions(
                boundedCapacity,
                resolveLinkTos,
                configureOperationOptions,
                userCredentials,
                retryOptions));

        return reader.Read(start, cancellationToken);
    }

    /// <summary>
    /// Reads events through an <see cref="AllSubscriptionReader"/> and keeps following,
    /// re-subscribing when the subscription fails.
    /// </summary>
    /// <param name="client">Client used to create the subscriptions.</param>
    /// <param name="start">Position handed to the reader as the starting checkpoint.</param>
    /// <param name="resolveLinkTos">Forwarded to the client when subscribing.</param>
    /// <param name="filterOptions">Optional server-side filter forwarded to the client.</param>
    /// <param name="configureOperationOptions">Forwarded to the client when subscribing.</param>
    /// <param name="userCredentials">Forwarded to the client when subscribing.</param>
    /// <param name="boundedCapacity">Capacity of the channel buffering events between the subscription and the consumer.</param>
    /// <param name="retryOptions">Backoff and limit settings for restarts; <c>null</c> selects the reader's defaults.</param>
    /// <param name="cancellationToken">Stops the enumeration.</param>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="boundedCapacity"/> is less than 1.</exception>
    public static IAsyncEnumerable<ResolvedEvent> ReadAllContinuousAsync(
        this EventStoreClient client,
        Position start,
        bool resolveLinkTos = false,
        SubscriptionFilterOptions filterOptions = null,
        Action<EventStoreClientOperationOptions> configureOperationOptions = null,
        UserCredentials userCredentials = null,
        int boundedCapacity = 1,
        SubscriptionRestartOptions retryOptions = null,
        CancellationToken cancellationToken = default)
    {
        if (client is null)
        {
            throw new ArgumentNullException(nameof(client));
        }

        if (boundedCapacity < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
        }

        var reader = new AllSubscriptionReader(
            client,
            filterOptions,
            new SubscriptionReaderOptions(
                boundedCapacity,
                resolveLinkTos,
                configureOperationOptions,
                userCredentials,
                retryOptions));

        return reader.Read(start, cancellationToken);
    }
}
/// <summary>
/// Base class that adapts a callback-based EventStore subscription to an
/// <see cref="IAsyncEnumerable{T}"/> and re-subscribes with exponential backoff when the
/// subscription fails.
/// </summary>
/// <typeparam name="TPosition">Type of the checkpoint used to start and restart the subscription.</typeparam>
public abstract class SubscriptionReaderBase<TPosition>
{
    private readonly int _boundedCapacity;
    private readonly SubscriptionRestartOptions _restartOptions;

    /// <summary>Creates the reader.</summary>
    /// <param name="boundedCapacity">Capacity of the channel created for each subscription attempt.</param>
    /// <param name="restartOptions">
    /// Restart settings; when <c>null</c>, a 1 second minimum backoff, 30 second maximum
    /// backoff and a random factor of 0.2 with no restart limit are used.
    /// </param>
    protected SubscriptionReaderBase(int boundedCapacity, SubscriptionRestartOptions restartOptions)
    {
        _boundedCapacity = boundedCapacity;
        _restartOptions = restartOptions ?? new SubscriptionRestartOptions(
            MinBackoff: TimeSpan.FromSeconds(1),
            MaxBackoff: TimeSpan.FromSeconds(30),
            RandomFactor: 0.2);
    }

    /// <summary>
    /// Yields events from the subscription until it completes without an error or
    /// <paramref name="cancellationToken"/> is cancelled, restarting it after failures.
    /// </summary>
    /// <param name="start">
    /// Checkpoint passed to the first <see cref="SubscribeAsync"/> call.
    /// NOTE(review): whether the client treats this position as inclusive or exclusive is
    /// not visible here; confirm against the client documentation.
    /// </param>
    /// <param name="cancellationToken">Stops the enumeration.</param>
    /// <returns>The events delivered by the subscription, in delivery order.</returns>
    /// <remarks>
    /// The checkpoint and attempt start time are kept in locals rather than instance
    /// fields so that separate enumerations of the same reader do not overwrite each
    /// other's state.
    /// </remarks>
    public async IAsyncEnumerable<ResolvedEvent> Read(
        TPosition start,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var position = start;

        for (var restartCount = 0; !cancellationToken.IsCancellationRequested; restartCount++)
        {
            var attemptStarted = DateTimeOffset.UtcNow;

            // A fresh channel per attempt: a channel completed by a dropped subscription
            // cannot be reused.
            var channel = Channel.CreateBounded<ResolvedEvent>(new BoundedChannelOptions(_boundedCapacity)
            {
                SingleReader = true,
                SingleWriter = true,
                FullMode = BoundedChannelFullMode.Wait
            });

            Task EventAppeared(StreamSubscription _, ResolvedEvent resolvedEvent, CancellationToken cancellation)
            {
                return channel.Writer.WriteAsync(resolvedEvent, cancellation).AsTask();
            }

            void SubscriptionDropped(StreamSubscription _, SubscriptionDroppedReason __, Exception exception)
            {
                // A null exception completes the channel normally, which ends the enumeration.
                channel.Writer.TryComplete(exception);
            }

            var subscription = default(StreamSubscription);

            try
            {
                subscription = await SubscribeAsync(
                    position,
                    EventAppeared,
                    SubscriptionDropped,
                    cancellationToken);
            }
            catch (Exception exception)
            {
                // Surface subscribe-time failures through the channel so they go through
                // the same restart handling as a dropped subscription.
                channel.Writer.TryComplete(exception);
            }

            using var subscriptionLifetime = subscription;

            await using var enumerator = channel.Reader
                .ReadAllAsync(cancellationToken)
                .GetAsyncEnumerator(cancellationToken);

            while (true)
            {
                var hasNext = false;
                var shouldRestart = false;

                try
                {
                    hasNext = await enumerator.MoveNextAsync();
                }
                catch (Exception exception)
                    when (!(exception is OperationCanceledException canceled && canceled.CancellationToken == cancellationToken))
                {
                    // An attempt that outlived the minimum backoff resets the backoff progression.
                    if (DateTimeOffset.UtcNow - attemptStarted > _restartOptions.MinBackoff)
                    {
                        restartCount = 0;
                    }

                    shouldRestart = true;

                    if (_restartOptions.RestartLimit is { } limit)
                    {
                        shouldRestart = restartCount < limit;
                    }

                    if (!shouldRestart)
                    {
                        throw;
                    }
                }

                if (shouldRestart)
                {
                    await Task.Delay(CalculateDelay(restartCount), cancellationToken);
                    break;
                }

                if (!hasNext)
                {
                    yield break;
                }

                var resolvedEvent = enumerator.Current;

                yield return resolvedEvent;

                // Advance the checkpoint only once the consumer has asked for the next event.
                position = GetPosition(resolvedEvent);
            }
        }

        cancellationToken.ThrowIfCancellationRequested();
    }

    /// <summary>
    /// Computes the delay before the next restart: the minimum backoff doubled per restart
    /// and capped at the maximum backoff, then multiplied by a jitter factor in
    /// [1, 1 + RandomFactor). The jitter is applied after the cap, so the result can
    /// exceed the maximum backoff by up to that factor.
    /// </summary>
    /// <param name="restartCount">Number of consecutive restarts so far.</param>
    /// <returns>The delay, or the maximum backoff when the computed value is out of range.</returns>
    private TimeSpan CalculateDelay(int restartCount)
    {
        var jitter = 1.0 + ThreadLocalRandom.Current.NextDouble() * _restartOptions.RandomFactor;

        var ticks = Math.Min(
            _restartOptions.MaxBackoff.Ticks,
            _restartOptions.MinBackoff.Ticks * Math.Pow(2, restartCount)) * jitter;

        return ticks is < 0d or >= long.MaxValue
            ? _restartOptions.MaxBackoff
            : new TimeSpan((long) ticks);
    }

    /// <summary>Creates the underlying subscription starting from <paramref name="position"/>.</summary>
    /// <param name="position">Checkpoint to subscribe from.</param>
    /// <param name="eventAppeared">Callback to invoke for each delivered event.</param>
    /// <param name="subscriptionDropped">Callback to invoke when the subscription ends.</param>
    /// <param name="cancellationToken">Cancels the subscription.</param>
    protected abstract Task<StreamSubscription> SubscribeAsync(
        TPosition position,
        Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
        Action<StreamSubscription, SubscriptionDroppedReason, Exception> subscriptionDropped,
        CancellationToken cancellationToken);

    /// <summary>Extracts the checkpoint to restart from after <paramref name="resolvedEvent"/> has been yielded.</summary>
    protected abstract TPosition GetPosition(ResolvedEvent resolvedEvent);
}
/// <summary>
/// Subscription reader for a single named stream, checkpointed by <see cref="StreamPosition"/>.
/// </summary>
public class StreamSubscriptionReader : SubscriptionReaderBase<StreamPosition>
{
    private readonly EventStoreClient _client;
    private readonly string _streamName;
    private readonly SubscriptionReaderOptions _options;

    /// <summary>Creates a reader for <paramref name="streamName"/>.</summary>
    /// <param name="client">Client used to create subscriptions.</param>
    /// <param name="streamName">Stream to subscribe to.</param>
    /// <param name="options">Channel capacity, subscription settings and restart settings.</param>
    public StreamSubscriptionReader(EventStoreClient client, string streamName, SubscriptionReaderOptions options)
        // SubscriptionReaderOptions exposes the restart settings as RestartOptions.
        : base(options.BoundedCapacity, options.RestartOptions)
    {
        _client = client;
        _streamName = streamName;
        _options = options;
    }

    /// <summary>Subscribes to the stream from <paramref name="position"/> using the configured options.</summary>
    protected override Task<StreamSubscription> SubscribeAsync(
        StreamPosition position,
        Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
        Action<StreamSubscription, SubscriptionDroppedReason, Exception> subscriptionDropped,
        CancellationToken cancellationToken)
    {
        return _client.SubscribeToStreamAsync(
            _streamName,
            position,
            eventAppeared,
            _options.ResolveLinkTos,
            subscriptionDropped,
            _options.ConfigureOperationOptions,
            _options.UserCredentials,
            cancellationToken);
    }

    /// <summary>
    /// Returns the event's number within the subscribed stream. The original event number
    /// is used because, for a resolved link, the linked event's own number belongs to a
    /// different stream and is not a valid checkpoint for this one.
    /// </summary>
    protected override StreamPosition GetPosition(ResolvedEvent resolvedEvent) => resolvedEvent.OriginalEventNumber;
}
/// <summary>
/// Subscription reader that subscribes via <c>SubscribeToAllAsync</c>, checkpointed by
/// <see cref="Position"/>.
/// </summary>
public class AllSubscriptionReader : SubscriptionReaderBase<Position>
{
    private readonly EventStoreClient _client;
    private readonly SubscriptionFilterOptions _filterOptions;
    private readonly SubscriptionReaderOptions _options;

    /// <summary>Creates the reader.</summary>
    /// <param name="client">Client used to create subscriptions.</param>
    /// <param name="filterOptions">Optional filter forwarded to the client on every subscribe.</param>
    /// <param name="options">Channel capacity, subscription settings and restart settings.</param>
    public AllSubscriptionReader(EventStoreClient client, SubscriptionFilterOptions filterOptions, SubscriptionReaderOptions options)
        : base(options.BoundedCapacity, options.RestartOptions)
    {
        _client = client;
        _filterOptions = filterOptions;
        _options = options;
    }

    /// <summary>
    /// Subscribes from <paramref name="position"/> using the configured options. The
    /// position must be forwarded; otherwise every restart would begin again from the
    /// client's default start instead of the last checkpoint.
    /// </summary>
    protected override Task<StreamSubscription> SubscribeAsync(
        Position position,
        Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
        Action<StreamSubscription, SubscriptionDroppedReason, Exception> subscriptionDropped,
        CancellationToken cancellationToken)
    {
        return _client.SubscribeToAllAsync(
            position,
            eventAppeared,
            _options.ResolveLinkTos,
            subscriptionDropped,
            _filterOptions,
            _options.ConfigureOperationOptions,
            _options.UserCredentials,
            cancellationToken);
    }

    /// <summary>
    /// Returns the position of the event as it was delivered by the subscription (the
    /// link itself when the event is a resolved link), which is the checkpoint to resume from.
    /// </summary>
    protected override Position GetPosition(ResolvedEvent resolvedEvent) => resolvedEvent.OriginalEvent.Position;
}
/// <summary>Settings shared by the subscription readers.</summary>
/// <param name="BoundedCapacity">Capacity handed to the reader base class for its bounded channel.</param>
/// <param name="ResolveLinkTos">Forwarded to the client's subscribe call.</param>
/// <param name="ConfigureOperationOptions">Forwarded to the client's subscribe call; may be <c>null</c>.</param>
/// <param name="UserCredentials">Forwarded to the client's subscribe call; may be <c>null</c>.</param>
/// <param name="RestartOptions">Restart settings handed to the reader base class; may be <c>null</c>.</param>
public record SubscriptionReaderOptions(
    int BoundedCapacity,
    bool ResolveLinkTos,
    Action<EventStoreClientOperationOptions> ConfigureOperationOptions,
    UserCredentials UserCredentials,
    SubscriptionRestartOptions RestartOptions);
/// <summary>Controls how a dropped subscription is restarted.</summary>
/// <param name="MinBackoff">
/// Base delay, doubled for each consecutive restart. An attempt that runs longer than
/// this before failing resets the restart count.
/// </param>
/// <param name="MaxBackoff">Upper bound applied to the doubled delay before jitter is added.</param>
/// <param name="RandomFactor">Jitter factor: the delay is multiplied by a random value in [1, 1 + RandomFactor).</param>
/// <param name="RestartLimit">Restarts are allowed only while the restart count is below this value; <c>null</c> means no limit.</param>
public record SubscriptionRestartOptions(
    TimeSpan MinBackoff,
    TimeSpan MaxBackoff,
    double RandomFactor,
    int? RestartLimit = null);
/// <summary>
/// Hands out one <see cref="Random"/> instance per thread, each seeded from a shared
/// counter that starts at <see cref="Environment.TickCount"/>.
/// </summary>
internal static class ThreadLocalRandom
{
    private static int _seed = Environment.TickCount;

    private static readonly ThreadLocal<Random> Local = new ThreadLocal<Random>(CreateForCurrentThread);

    /// <summary>The <see cref="Random"/> instance belonging to the calling thread.</summary>
    public static Random Current
    {
        get { return Local.Value; }
    }

    // Each thread takes the next counter value so that threads do not share a seed.
    private static Random CreateForCurrentThread()
    {
        var threadSeed = Interlocked.Increment(ref _seed);
        return new Random(threadSeed);
    }
}
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Threading.Tasks; | |
| using EventStore.Client; | |
| using Example; | |
// Sample program: follows one stream and a filtered $all subscription concurrently,
// printing each event as it arrives.
if (args.Length == 0)
{
    Console.Error.WriteLine("Expected the EventStore connection string as the first argument.");
    return;
}

var connectionString = args[0];

using var client = new EventStoreClient(EventStoreClientSettings.Create(connectionString));

var streamReader = Task.Run(async () =>
{
    await foreach (var resolvedEvent in client.ReadStreamContinuousAsync("example_stream", StreamPosition.Start))
    {
        Console.WriteLine($"{resolvedEvent.OriginalStreamId}@{resolvedEvent.OriginalEventNumber}");
    }
});

var allReader = Task.Run(async () =>
{
    var filterOptions = new SubscriptionFilterOptions(StreamFilter.Prefix("test-", "other-"));

    await foreach (var resolvedEvent in client.ReadAllContinuousAsync(Position.Start, filterOptions: filterOptions))
    {
        Console.WriteLine($"$all@{resolvedEvent.OriginalPosition}");
    }
});

// Keep the process alive while both readers run. Discarding the tasks would let the
// program exit immediately and leave any reader exception unobserved.
await Task.WhenAll(streamReader, allReader);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment