Skip to content

Instantly share code, notes, and snippets.

@andersstorhaug
Last active July 30, 2021 22:22
Show Gist options
  • Select an option

  • Save andersstorhaug/c36b7c0e12200c752230c64b0cdfd134 to your computer and use it in GitHub Desktop.

Select an option

Save andersstorhaug/c36b7c0e12200c752230c64b0cdfd134 to your computer and use it in GitHub Desktop.
`IAsyncEnumerable` catch-up subscription with EventStore gRPC v21.2.0 API
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Client;
namespace Example
{
/// <summary>
/// Extension methods that expose EventStore subscriptions as
/// <see cref="IAsyncEnumerable{T}"/> sequences which re-subscribe after failures.
/// </summary>
public static class EventStoreClientEx
{
    /// <summary>
    /// Creates a continuous reader over a single stream.
    /// </summary>
    /// <param name="client">Client used to open the subscription.</param>
    /// <param name="streamName">Name of the stream to subscribe to.</param>
    /// <param name="start">Position handed to the first subscription attempt.</param>
    /// <param name="resolveLinkTos">Forwarded to the underlying subscribe call.</param>
    /// <param name="configureOperationOptions">Forwarded to the underlying subscribe call.</param>
    /// <param name="userCredentials">Forwarded to the underlying subscribe call.</param>
    /// <param name="boundedCapacity">Capacity of the channel that buffers events between the subscription callback and the consumer.</param>
    /// <param name="retryOptions">Restart/backoff settings; when <c>null</c> the reader falls back to its built-in defaults.</param>
    /// <param name="cancellationToken">Token that stops the enumeration.</param>
    /// <returns>A lazily evaluated sequence; nothing is subscribed until it is enumerated.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> or <paramref name="streamName"/> is <c>null</c>.</exception>
    public static IAsyncEnumerable<ResolvedEvent> ReadStreamContinuousAsync(
        this EventStoreClient client,
        string streamName,
        StreamPosition start,
        bool resolveLinkTos = false,
        Action<EventStoreClientOperationOptions> configureOperationOptions = null,
        UserCredentials userCredentials = null,
        int boundedCapacity = 1,
        SubscriptionRestartOptions retryOptions = null,
        CancellationToken cancellationToken = new())
    {
        // Fail fast: a null argument would otherwise surface as a NullReferenceException
        // inside the reader's retry loop and be retried as if it were a transient failure.
        if (client is null)
            throw new ArgumentNullException(nameof(client));
        if (streamName is null)
            throw new ArgumentNullException(nameof(streamName));

        var options = new SubscriptionReaderOptions(
            boundedCapacity,
            resolveLinkTos,
            configureOperationOptions,
            userCredentials,
            retryOptions);

        var reader = new StreamSubscriptionReader(client, streamName, options);
        return reader.Read(start, cancellationToken);
    }

    /// <summary>
    /// Creates a continuous reader over the <c>$all</c> stream.
    /// </summary>
    /// <param name="client">Client used to open the subscription.</param>
    /// <param name="start">Position handed to the reader as its starting point.</param>
    /// <param name="resolveLinkTos">Forwarded to the underlying subscribe call.</param>
    /// <param name="filterOptions">Optional server-side filter forwarded to the underlying subscribe call.</param>
    /// <param name="configureOperationOptions">Forwarded to the underlying subscribe call.</param>
    /// <param name="userCredentials">Forwarded to the underlying subscribe call.</param>
    /// <param name="boundedCapacity">Capacity of the channel that buffers events between the subscription callback and the consumer.</param>
    /// <param name="retryOptions">Restart/backoff settings; when <c>null</c> the reader falls back to its built-in defaults.</param>
    /// <param name="cancellationToken">Token that stops the enumeration.</param>
    /// <returns>A lazily evaluated sequence; nothing is subscribed until it is enumerated.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is <c>null</c>.</exception>
    public static IAsyncEnumerable<ResolvedEvent> ReadAllContinuousAsync(
        this EventStoreClient client,
        Position start,
        bool resolveLinkTos = false,
        SubscriptionFilterOptions filterOptions = null,
        Action<EventStoreClientOperationOptions> configureOperationOptions = null,
        UserCredentials userCredentials = null,
        int boundedCapacity = 1,
        SubscriptionRestartOptions retryOptions = null,
        CancellationToken cancellationToken = new())
    {
        if (client is null)
            throw new ArgumentNullException(nameof(client));

        var options = new SubscriptionReaderOptions(
            boundedCapacity,
            resolveLinkTos,
            configureOperationOptions,
            userCredentials,
            retryOptions);

        var reader = new AllSubscriptionReader(client, filterOptions, options);
        return reader.Read(start, cancellationToken);
    }
}
/// <summary>
/// Base class that turns a callback-based subscription into an
/// <see cref="IAsyncEnumerable{T}"/> of <see cref="ResolvedEvent"/>, re-subscribing
/// with exponential backoff when the subscription fails.
/// </summary>
/// <typeparam name="TPosition">Position type passed to <see cref="SubscribeAsync"/> to start and resume the subscription.</typeparam>
public abstract class SubscriptionReaderBase<TPosition>
{
private readonly int _boundedCapacity;
private readonly SubscriptionRestartOptions _restartOptions;
// Time at which the current subscription attempt began; compared with MinBackoff when an attempt fails.
private DateTimeOffset _started;
// Position handed to SubscribeAsync on the next attempt; advanced after each event is consumed.
// NOTE(review): _started and _position are instance fields, so two concurrent enumerations
// of the same reader instance would share them - confirm readers are never shared.
private TPosition _position;
/// <summary>
/// Stores the channel capacity and restart settings.
/// </summary>
/// <param name="boundedCapacity">Capacity of the bounded channel created for each subscription attempt.</param>
/// <param name="restartOptions">
/// Restart settings; when <c>null</c>, defaults of 1 second minimum backoff,
/// 30 seconds maximum backoff and a random factor of 0.2 are used (no restart limit).
/// </param>
protected SubscriptionReaderBase(int boundedCapacity, SubscriptionRestartOptions restartOptions)
{
_boundedCapacity = boundedCapacity;
_restartOptions = restartOptions ?? new SubscriptionRestartOptions(
MinBackoff: TimeSpan.FromSeconds(1),
MaxBackoff: TimeSpan.FromSeconds(30),
RandomFactor: 0.2);
}
/// <summary>
/// Subscribes starting at <paramref name="start"/> and yields events as they arrive,
/// re-subscribing from the last consumed position after a failure.
/// </summary>
/// <param name="start">Position passed to the first subscription attempt.</param>
/// <param name="cancellationToken">Token that stops the enumeration.</param>
/// <returns>
/// The events delivered by the subscription. The sequence ends normally when the
/// subscription is dropped without an exception.
/// </returns>
/// <exception cref="OperationCanceledException">The token was cancelled.</exception>
/// <remarks>
/// When <c>RestartLimit</c> is set and reached, the exception that caused the last failure is rethrown.
/// </remarks>
public async IAsyncEnumerable<ResolvedEvent> Read(
TPosition start,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
_position = start;
// One iteration per subscription attempt; restartCount drives the backoff delay.
for (var restartCount = 0; !cancellationToken.IsCancellationRequested; restartCount++)
{
_started = DateTimeOffset.Now;
// A fresh channel per attempt bridges the subscription callbacks to this iterator.
// FullMode.Wait makes the writer wait while the channel is full instead of dropping events.
var channel = Channel.CreateBounded<ResolvedEvent>(new BoundedChannelOptions(_boundedCapacity)
{
SingleReader = true,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});
// Callback: forward each delivered event into the channel.
Task EventAppeared(StreamSubscription _, ResolvedEvent resolvedEvent, CancellationToken cancellation)
{
return channel.Writer.WriteAsync(resolvedEvent, cancellation).AsTask();
}
// Callback: complete the channel; a non-null exception is rethrown to the reader below,
// a null exception ends the sequence normally.
void SubscriptionDropped(StreamSubscription _, SubscriptionDroppedReason __, Exception exception)
{
channel.Writer.TryComplete(exception);
}
var subscription = default(StreamSubscription);
try
{
subscription = await SubscribeAsync(
_position,
EventAppeared,
SubscriptionDropped,
cancellationToken);
}
catch (Exception exception)
{
// Route subscribe failures through the channel so they hit the same restart logic as drops.
channel.Writer.TryComplete(exception);
}
// Disposed when this for-iteration ends; subscription is null if SubscribeAsync threw,
// in which case the using declaration does nothing.
using var ___ = subscription;
await using var enumerator = channel.Reader
.ReadAllAsync(cancellationToken)
.GetAsyncEnumerator(cancellationToken);
// Manual enumeration: C# does not allow 'yield return' inside a try block that has a catch clause,
// so only MoveNextAsync is wrapped in the try.
while (true)
{
var hasNext = false;
var shouldRestart = false;
try
{
hasNext = await enumerator.MoveNextAsync();
}
// Cancellation through the caller's own token is not treated as a failure; it bypasses this handler.
catch (Exception exception)
when (!(exception is OperationCanceledException canceled && canceled.CancellationToken == cancellationToken))
{
// An attempt that lasted longer than MinBackoff resets the backoff progression.
if (DateTimeOffset.Now - _started > _restartOptions.MinBackoff)
restartCount = 0;
shouldRestart = true;
// With a limit configured, restart only while the count is below it; otherwise rethrow.
if (_restartOptions.RestartLimit is { } limit)
shouldRestart = restartCount < limit;
if (!shouldRestart)
throw;
}
if (shouldRestart)
{
// Back off, then leave the inner loop so the outer loop re-subscribes from _position.
await Task.Delay(CalculateDelay(restartCount), cancellationToken);
break;
}
// Channel completed without an exception: end of sequence.
if (!hasNext)
yield break;
var resolvedEvent = enumerator.Current;
yield return resolvedEvent;
// Runs only when the consumer requests the next item, i.e. after the yielded event was handed over.
// NOTE(review): whether the resume position is inclusive or exclusive depends on the
// SubscribeAsync implementation - confirm against the client API.
_position = GetPosition(resolvedEvent);
}
}
// Reached only when the loop condition observed cancellation; surface it to the consumer.
cancellationToken.ThrowIfCancellationRequested();
}
/// <summary>
/// Computes the delay before the next subscription attempt: MinBackoff doubled
/// <paramref name="restartCount"/> times, capped at MaxBackoff, then multiplied by a
/// random jitter between 1 and 1 + RandomFactor.
/// </summary>
/// <param name="restartCount">Restart counter of the attempt that just failed.</param>
/// <returns>The delay; MaxBackoff is returned if the computed tick count is negative or does not fit in a <see cref="long"/>.</returns>
private TimeSpan CalculateDelay(int restartCount)
{
var jitter = 1.0 + ThreadLocalRandom.Current.NextDouble() * _restartOptions.RandomFactor;
// The cap is applied before the jitter, so the result may exceed MaxBackoff by up to RandomFactor.
var ticks = Math.Min(
_restartOptions.MaxBackoff.Ticks,
_restartOptions.MinBackoff.Ticks * Math.Pow(2, restartCount)) * jitter;
return ticks is < 0d or >= long.MaxValue
? _restartOptions.MaxBackoff
: new TimeSpan((long) ticks);
}
/// <summary>
/// Opens the underlying subscription.
/// </summary>
/// <param name="position">Position to subscribe from.</param>
/// <param name="eventAppeared">Callback to invoke for each delivered event.</param>
/// <param name="subscriptionDropped">Callback to invoke when the subscription ends.</param>
/// <param name="cancellationToken">Token passed through from <see cref="Read"/>.</param>
protected abstract Task<StreamSubscription> SubscribeAsync(
TPosition position,
Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
Action<StreamSubscription, SubscriptionDroppedReason, Exception> subscriptionDropped,
CancellationToken cancellationToken);
/// <summary>
/// Extracts from a consumed event the position to resume from after a restart.
/// </summary>
protected abstract TPosition GetPosition(ResolvedEvent resolvedEvent);
}
/// <summary>
/// Continuous reader over a single named stream, positioned by <see cref="StreamPosition"/>.
/// </summary>
public class StreamSubscriptionReader : SubscriptionReaderBase<StreamPosition>
{
    private readonly EventStoreClient _client;
    private readonly string _streamName;
    private readonly SubscriptionReaderOptions _options;

    /// <summary>
    /// Creates a reader for <paramref name="streamName"/>.
    /// </summary>
    /// <param name="client">Client used to open the subscription.</param>
    /// <param name="streamName">Name of the stream to subscribe to.</param>
    /// <param name="options">Channel capacity, restart settings and the values forwarded to the subscribe call.</param>
    public StreamSubscriptionReader(EventStoreClient client, string streamName, SubscriptionReaderOptions options)
        // The options record exposes the restart settings as RestartOptions.
        : base(options.BoundedCapacity, options.RestartOptions)
    {
        _client = client;
        _streamName = streamName;
        _options = options;
    }

    /// <summary>
    /// Subscribes to the configured stream from <paramref name="position"/>.
    /// </summary>
    protected override Task<StreamSubscription> SubscribeAsync(
        StreamPosition position,
        Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
        Action<StreamSubscription, SubscriptionDroppedReason, Exception> subscriptionDropped,
        CancellationToken cancellationToken)
    {
        return _client.SubscribeToStreamAsync(
            _streamName,
            position,
            eventAppeared,
            _options.ResolveLinkTos,
            subscriptionDropped,
            _options.ConfigureOperationOptions,
            _options.UserCredentials,
            cancellationToken);
    }

    /// <summary>
    /// Returns the event number within the subscribed stream.
    /// </summary>
    /// <remarks>
    /// Uses the original (link) event number rather than <c>Event.EventNumber</c>: with link
    /// resolution enabled, <c>Event</c> is the link target and its number belongs to a
    /// different stream, which would make a restart resume from the wrong place. Without
    /// link resolution both values are the same.
    /// </remarks>
    protected override StreamPosition GetPosition(ResolvedEvent resolvedEvent) => resolvedEvent.OriginalEventNumber;
}
/// <summary>
/// Continuous reader over the <c>$all</c> stream, positioned by <see cref="Position"/>.
/// </summary>
public class AllSubscriptionReader : SubscriptionReaderBase<Position>
{
    private readonly EventStoreClient _client;
    private readonly SubscriptionFilterOptions _filterOptions;
    private readonly SubscriptionReaderOptions _options;

    /// <summary>
    /// Creates a reader for the <c>$all</c> stream.
    /// </summary>
    /// <param name="client">Client used to open the subscription.</param>
    /// <param name="filterOptions">Optional filter forwarded to the subscribe call; may be <c>null</c>.</param>
    /// <param name="options">Channel capacity, restart settings and the values forwarded to the subscribe call.</param>
    public AllSubscriptionReader(EventStoreClient client, SubscriptionFilterOptions filterOptions, SubscriptionReaderOptions options)
        : base(options.BoundedCapacity, options.RestartOptions)
    {
        _client = client;
        _filterOptions = filterOptions;
        _options = options;
    }

    /// <summary>
    /// Subscribes to <c>$all</c> from <paramref name="position"/>.
    /// </summary>
    protected override Task<StreamSubscription> SubscribeAsync(
        Position position,
        Func<StreamSubscription, ResolvedEvent, CancellationToken, Task> eventAppeared,
        Action<StreamSubscription, SubscriptionDroppedReason, Exception> subscriptionDropped,
        CancellationToken cancellationToken)
    {
        // The position must be forwarded: without it the caller's start position is ignored
        // and every restart would replay $all from the beginning.
        return _client.SubscribeToAllAsync(
            position,
            eventAppeared,
            _options.ResolveLinkTos,
            subscriptionDropped,
            _filterOptions,
            _options.ConfigureOperationOptions,
            _options.UserCredentials,
            cancellationToken);
    }

    /// <summary>
    /// Returns the position in <c>$all</c> of the event as it was delivered.
    /// </summary>
    /// <remarks>
    /// Uses the original (link) event rather than <c>Event</c>: with link resolution enabled,
    /// <c>Event</c> is the link target, whose position differs from the link's, so resuming
    /// from it could re-deliver events. Without link resolution both are the same record.
    /// </remarks>
    protected override Position GetPosition(ResolvedEvent resolvedEvent) => resolvedEvent.OriginalEvent.Position;
}
/// <summary>
/// Settings shared by the subscription readers.
/// </summary>
/// <param name="BoundedCapacity">Capacity passed to the reader base class for its bounded event channel.</param>
/// <param name="ResolveLinkTos">Forwarded to the client's subscribe call.</param>
/// <param name="ConfigureOperationOptions">Forwarded to the client's subscribe call; may be <c>null</c>.</param>
/// <param name="UserCredentials">Forwarded to the client's subscribe call; may be <c>null</c>.</param>
/// <param name="RestartOptions">Restart/backoff settings; when <c>null</c> the reader base class substitutes its defaults.</param>
public record SubscriptionReaderOptions(
int BoundedCapacity,
bool ResolveLinkTos,
Action<EventStoreClientOperationOptions> ConfigureOperationOptions,
UserCredentials UserCredentials,
SubscriptionRestartOptions RestartOptions);
/// <summary>
/// Controls how a reader re-subscribes after its subscription fails.
/// </summary>
/// <param name="MinBackoff">
/// Base delay that is doubled per restart; also the duration an attempt must exceed
/// for the restart counter to be reset.
/// </param>
/// <param name="MaxBackoff">Upper bound applied to the doubled delay before jitter is added.</param>
/// <param name="RandomFactor">Jitter: the delay is multiplied by a random value between 1 and 1 + this factor.</param>
/// <param name="RestartLimit">Restart count at which the failure is rethrown instead of retried; <c>null</c> means no limit.</param>
public record SubscriptionRestartOptions(
TimeSpan MinBackoff,
TimeSpan MaxBackoff,
double RandomFactor,
int? RestartLimit = null);
/// <summary>
/// Hands out one <see cref="Random"/> instance per thread, each created from a distinct seed.
/// </summary>
internal static class ThreadLocalRandom
{
    // Starts at the tick count and is atomically incremented for every thread that asks for a generator.
    private static int s_nextSeed = Environment.TickCount;

    private static readonly ThreadLocal<Random> PerThread = new ThreadLocal<Random>(CreateForCurrentThread);

    /// <summary>
    /// The generator belonging to the calling thread; created on first access from that thread.
    /// </summary>
    public static Random Current
    {
        get { return PerThread.Value; }
    }

    // Factory invoked once per thread by the ThreadLocal.
    private static Random CreateForCurrentThread()
    {
        var seed = Interlocked.Increment(ref s_nextSeed);
        return new Random(seed);
    }
}
}
using System;
using System.Threading.Tasks;
using EventStore.Client;
using Example;
// Sample program: tails one stream and a filtered view of $all, printing each event.
// The connection string is taken from the first command-line argument.
var connectionString = args[0];
var client = new EventStoreClient(EventStoreClientSettings.Create(connectionString));

// Reader 1: every event of "example_stream", from the start of the stream.
var streamReader = Task.Run(async () =>
{
    await foreach (var resolvedEvent in client.ReadStreamContinuousAsync("example_stream", StreamPosition.Start))
    {
        Console.WriteLine($"{resolvedEvent.OriginalStreamId}@{resolvedEvent.OriginalEventNumber}");
    }
});

// Reader 2: events from $all whose stream name starts with "test-" or "other-".
var allReader = Task.Run(async () =>
{
    var filterOptions = new SubscriptionFilterOptions(StreamFilter.Prefix("test-", "other-"));
    await foreach (var resolvedEvent in client.ReadAllContinuousAsync(Position.Start, filterOptions: filterOptions))
    {
        Console.WriteLine($"$all@{resolvedEvent.OriginalPosition}");
    }
});

// Keep the process alive while the readers run. Discarding the tasks would let the
// program reach its end immediately and exit before any event is printed, and would
// also hide any exception thrown by a reader.
await Task.WhenAll(streamReader, allReader);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment