Skip to content

Instantly share code, notes, and snippets.

@bboyle1234
Last active August 20, 2018 09:52
Show Gist options
  • Select an option

  • Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.

Select an option

Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.

Revisions

  1. bboyle1234 revised this gist Aug 8, 2018. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -95,7 +95,6 @@ async Task CreateAndConnect() {
    _client.Dispose();
    if (connectionFailCount++ < 5)
    Log.LogWarning(x, $"Failed to connect.");
    FireAndForget(Connect);
    }
    await Task.Delay(1000); // Attempt reconnection once per second.
    }
  2. bboyle1234 revised this gist Aug 8, 2018. 1 changed file with 39 additions and 29 deletions.
    68 changes: 39 additions & 29 deletions ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -12,6 +12,7 @@
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using static Apex.TaskUtilities.Tasks;

    namespace Apex.Grains {

    @@ -28,41 +29,40 @@ public abstract class ReconnectingClusterClient : IClusterClient {
    public Exception Error { get; private set; }

    readonly ILogger Log;

    readonly CancellationTokenSource Disposed = new CancellationTokenSource();
    // This ManualResetEvent allows multiple consumers to await the same connection event without affecting each other.
    // The trick is to make sure we manually reset it when disconnected.
    readonly AsyncManualResetEvent ConnectedEvent = new AsyncManualResetEvent(false);

    bool _isDisposed = false;
    IClusterClient _client;

    public ReconnectingClusterClient(ILoggerFactory loggerFactory) {
    // Because we're an abstract class we don't use the generic method.
    Log = loggerFactory.CreateLogger(GetType());
    Task.Run(Connect);
    FireAndForget(Connect);
    }

    public Task WaitForConnection(TimeSpan waitTime) {
    using (var cts = new CancellationTokenSource()) {
    cts.CancelAfter(waitTime);
    return WaitForConnection(cts.Token);
    public async Task WaitForConnection(TimeSpan waitTime) {
    using (var cts = new CancellationTokenSource(waitTime)) {
    await WaitForConnection(cts.Token);
    }
    }

    public Task WaitForConnection(CancellationToken cancellationToken)
    => ConnectedEvent.WaitAsync(cancellationToken);
    public async Task WaitForConnection(CancellationToken ct) {
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct, Disposed.Token)) {
    await ConnectedEvent.WaitAsync(cts.Token);
    }
    }

    async Task Connect() {
    using (Log.BeginDataScopeUsingMethodName()) {
    await CreateAndConnect();
    if (_isDisposed) return; // We never managed to connect before we were disposed.
    IsConnected = true;
    ConnectedEvent.Set();
    try {
    Connected?.Invoke(this);
    } catch (Exception x) {
    Log.LogError(x, $"Exception thrown while executing the 'Connected' event handler.");
    }
    await CreateAndConnect();
    if (Disposed.IsCancellationRequested) return; // We never managed to connect before we were disposed.
    IsConnected = true;
    ConnectedEvent.Set();
    try {
    Connected?.Invoke(this);
    } catch (Exception x) {
    Log.LogError(x, $"Exception thrown while executing the 'Connected' event handler.");
    }
    }

    @@ -71,7 +71,8 @@ async Task CreateAndConnect() {
    /// In this implementation we don't use the built-in client's ability to retry connections during its first connection attempt, because
    /// we found that it messed up stream subscriptions. Therefore we now recreate the client for each connection attempt.
    int connectionFailCount = 0;
    while (!_isDisposed) {
    while (true) {
    if (Disposed.IsCancellationRequested) return;
    try {
    var builder = new ClientBuilder();
    Configure(builder);
    @@ -83,14 +84,18 @@ async Task CreateAndConnect() {
    throw;
    }
    try {
    await _client.Connect(); // This overload does not allow the client to reattempt connection.
    /// This overload does not allow the client to reattempt connection if it fails the first time.
    /// Tests showed that stream subscriptions didn't work so well when re-attempted connections were made,
    /// so it was decided that on failed connection attempt, the client would be disposed, and a new one created.
    await _client.Connect();
    Log.LogInformation("Connected.");
    return;
    } catch (Exception x) {
    Error = x;
    _client.Dispose();
    if (connectionFailCount++ == 0)
    if (connectionFailCount++ < 5)
    Log.LogWarning(x, $"Failed to connect.");
    FireAndForget(Connect);
    }
    await Task.Delay(1000); // Attempt reconnection once per second.
    }
    @@ -99,17 +104,22 @@ async Task CreateAndConnect() {
    protected abstract void Configure(ClientBuilder builder);

    void OnDisconnected(object sender, EventArgs e) {
    IsConnected = false;
    ConnectedEvent.Reset();
    _client?.Dispose();
    IsConnected = false;
    Disconnected?.Invoke(this);
    if (_isDisposed) return;
    Task.Run(Connect);
    if (Disposed.IsCancellationRequested) return;
    try {
    Disconnected?.Invoke(this);
    } catch (Exception x) {
    Log.LogError(x, $"Exception thrown while executing the 'Disconnected' event handler.");
    }
    FireAndForget(Connect);
    }

    public void Dispose() {
    _isDisposed = true;
    _client?.Dispose();
    Disposed.Cancel();
    _client?.Close(); // required for graceful (informing silo) disconnection
    _client?.Dispose(); // does not inform silo
    }

    #region IClusterClient
    @@ -145,4 +155,4 @@ public void BindGrainReference(IAddressable grain)

    #endregion
    }
    }
    }
  3. bboyle1234 revised this gist May 10, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -115,7 +115,7 @@ public void Dispose() {
    #region IClusterClient

    // There is potential for null reference / object disposed exceptions here if these properties/methods are called while the _client is not properly connected.
    // For code simplicity, and because the consumers "know" about it, I've decided to add handling for that scenario.
    // For code simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario.

    public bool IsInitialized => _client.IsInitialized;
    public IServiceProvider ServiceProvider => _client.ServiceProvider;
  4. bboyle1234 revised this gist May 10, 2018. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -114,8 +114,8 @@ public void Dispose() {

    #region IClusterClient

    // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called.
    // For simplicity, and because the consumers "know" about it, I've decided to add handling for that scenario.
    // There is potential for null reference / object disposed exceptions here if these properties/methods are called while the _client is not properly connected.
    // For code simplicity, and because the consumers "know" about it, I've decided to add handling for that scenario.

    public bool IsInitialized => _client.IsInitialized;
    public IServiceProvider ServiceProvider => _client.ServiceProvider;
  5. bboyle1234 revised this gist May 10, 2018. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -88,6 +88,7 @@ async Task CreateAndConnect() {
    return;
    } catch (Exception x) {
    Error = x;
    _client.Dispose();
    if (connectionFailCount++ == 0)
    Log.LogWarning(x, $"Failed to connect.");
    }
  6. bboyle1234 revised this gist May 10, 2018. 1 changed file with 65 additions and 41 deletions.
    106 changes: 65 additions & 41 deletions ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -1,81 +1,106 @@
    using Apex.LoggingUtils;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.DependencyInjection.Extensions;
    using Microsoft.Extensions.Logging;
    using Nito.AsyncEx;
    using Orleans;
    using Orleans.Runtime;
    using Orleans.Streams;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Orleans {
    namespace Apex.Grains {

    /// <summary>
    /// Class recreates and connects a cluster client after a disconnection, raising events to inform consumers.
    /// An example of this might be the interval while a cluster being restarted or upgraded.
    /// </summary>
    public class ReconnectingClusterClient : IClusterClient {
    public abstract class ReconnectingClusterClient : IClusterClient {

    public event Action<ReconnectingClusterClient> Connected;
    public event Action<ReconnectingClusterClient> Disconnected;

    public bool IsConnected { get; set; }
    public Exception Error { get; private set; }

    readonly ILogger Log;
    readonly Func<ClientBuilder, Task> Configure;

    bool _isStarted = false;
    // This ManualResetEvent allows multiple consumers to await the same connection event without affecting each other.
    // The trick is to make sure we manually reset it when disconnected.
    readonly AsyncManualResetEvent ConnectedEvent = new AsyncManualResetEvent(false);

    bool _isDisposed = false;
    IClusterClient _client;

    public ReconnectingClusterClient(ILoggerFactory loggerFactory, Func<ClientBuilder, Task> configure) {
    Log = loggerFactory.CreateLogger<ReconnectingClusterClient>();
    Configure = configure;
    public ReconnectingClusterClient(ILoggerFactory loggerFactory) {
    // Because we're an abstract class we don't use the generic method.
    Log = loggerFactory.CreateLogger(GetType());
    Task.Run(Connect);
    }

    /// <summary>
    /// The Start method is provided so the consuming code has time to attache event handlers before
    /// we start attempting to connect.
    /// </summary>
    public void Start() {
    if (_isStarted) throw new InvalidOperationException("Can only be started once.");
    _isStarted = true;
    Task.Run(Connect);
    public Task WaitForConnection(TimeSpan waitTime) {
    using (var cts = new CancellationTokenSource()) {
    cts.CancelAfter(waitTime);
    return WaitForConnection(cts.Token);
    }
    }

    public Task WaitForConnection(CancellationToken cancellationToken)
    => ConnectedEvent.WaitAsync(cancellationToken);

    async Task Connect() {
    using (Log.BeginDataScopeUsingMethodName()) { // My custom extension method for creating a log scope.
    using (Log.BeginDataScopeUsingMethodName()) {
    await CreateAndConnect();
    if (_isDisposed) return; // We never managed to connect before we were disposed.
    IsConnected = true;
    ConnectedEvent.Set();
    try {
    Connected?.Invoke(this);
    } catch (Exception x) {
    Log.LogError(x, $"Exception thrown while executing the 'Connected' event handler.");
    }
    }
    }

    async Task CreateAndConnect() {
    /// The orleans client has the ability to keep retrying connections before its first connection. (It won't retry after a disconnection)
    /// In this implementation we don't use the built-in client's ability to retry connections during its first connection attempt, because
    /// we found that it messed up stream subscriptions. Therefore we now recreate the client for each connection attempt.
    int connectionFailCount = 0;
    while (!_isDisposed) {
    try {
    int connectionFailCount = 0;
    var builder = new ClientBuilder();
    // Allow consumers to setup the client builder.
    await Configure(builder);
    _client = builder.Build();
    // Add a client disconnection handler.
    Configure(builder);
    builder.AddClusterConnectionLostHandler(OnDisconnected);
    // This code blocks until a connection is made or we are disposed.
    await _client.Connect(x => {
    Error = x;
    /// Only log the connection failure the first time it happens.
    if (connectionFailCount++ == 0) Log.LogWarning(x, $"Failed to connect.");
    // Keep automatically trying to reconnect until disposed.
    return Task.FromResult(!_isDisposed);
    });
    if (_isDisposed) return; // We never managed to connect before we were disposed.
    /// Inform consumers that a connection has been established.
    /// Consumers will now re-subscribe streams and begin calling grain methods again.
    Log.LogInformation("Connected.");
    Connected?.Invoke(this);
    _client = builder.Build();
    } catch (Exception x) {
    /// Log the exception before throwing it, which will crash the application.
    /// (We actually don't expect any exception to be thrown in this method unless it is in the Configure(builder) method or the Connected event handler.)
    Log.LogError(x, $"Exception thrown while connecting.");
    Error = x;
    Log.LogError(x, $"Exception thrown while configuring client.");
    throw;
    }
    try {
    await _client.Connect(); // This overload does not allow the client to reattempt connection.
    Log.LogInformation("Connected.");
    return;
    } catch (Exception x) {
    Error = x;
    if (connectionFailCount++ == 0)
    Log.LogWarning(x, $"Failed to connect.");
    }
    await Task.Delay(1000); // Attempt reconnection once per second.
    }
    }

    protected abstract void Configure(ClientBuilder builder);

    void OnDisconnected(object sender, EventArgs e) {
    _client.Dispose();
    ConnectedEvent.Reset();
    _client?.Dispose();
    IsConnected = false;
    Disconnected?.Invoke(this);
    if (_isDisposed) return;
    Task.Run(Connect);
    @@ -89,14 +114,13 @@ public void Dispose() {
    #region IClusterClient

    // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called.
    // An exception could also be thrown if the _client is not in a connected state.
    // For simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario.
    // For simplicity, and because the consumers "know" about it, I've decided to add handling for that scenario.

    public bool IsInitialized => _client.IsInitialized;
    public IServiceProvider ServiceProvider => _client.ServiceProvider;

    // uhhh --- yep, code smell, I know.
    public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("Use Start() instead.");
    public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("This client starts connection attempts immediately. Try using 'WaitForConnection()' instead.");
    public Task Close() => throw new NotImplementedException("Use Dispose() instead.");
    public void Abort() => throw new NotImplementedException("Use Dispose() instead.");

  7. bboyle1234 revised this gist Apr 18, 2018. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -49,6 +49,7 @@ async Task Connect() {
    var builder = new ClientBuilder();
    // Allow consumers to setup the client builder.
    await Configure(builder);
    _client = builder.Build();
    // Add a client disconnection handler.
    builder.AddClusterConnectionLostHandler(OnDisconnected);
    // This code blocks until a connection is made or we are disposed.
  8. bboyle1234 revised this gist Apr 18, 2018. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -88,6 +88,7 @@ public void Dispose() {
    #region IClusterClient

    // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called.
    // An exception could also be thrown if the _client is not in a connected state.
    // For simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario.

    public bool IsInitialized => _client.IsInitialized;
  9. bboyle1234 revised this gist Apr 18, 2018. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -74,7 +74,7 @@ await _client.Connect(x => {
    }

    void OnDisconnected(object sender, EventArgs e) {
    _client?.Dispose();
    _client.Dispose();
    Disconnected?.Invoke(this);
    if (_isDisposed) return;
    Task.Run(Connect);
    @@ -88,7 +88,7 @@ public void Dispose() {
    #region IClusterClient

    // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called.
    // For simplicity, and because the consumers "know" about it, I've decided to add handling for that scenario.
    // For simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario.

    public bool IsInitialized => _client.IsInitialized;
    public IServiceProvider ServiceProvider => _client.ServiceProvider;
  10. bboyle1234 created this gist Apr 18, 2018.
    121 changes: 121 additions & 0 deletions ReconnectingClusterClient.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,121 @@
    using Microsoft.Extensions.Logging;
    using Orleans;
    using Orleans.Runtime;
    using Orleans.Streams;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;

    namespace Orleans {

    /// <summary>
    /// Wraps an Orleans <see cref="IClusterClient"/>: builds and connects a new client after every disconnection,
    /// raising events to inform consumers.
    /// An example of a disconnection is the interval while a cluster is being restarted or upgraded.
    /// </summary>
    public class ReconnectingClusterClient : IClusterClient {

        /// <summary>Raised after a connection to the cluster has been established.</summary>
        public event Action<ReconnectingClusterClient> Connected;

        /// <summary>Raised when the connection-lost handler fires, before a reconnection attempt is scheduled.</summary>
        public event Action<ReconnectingClusterClient> Disconnected;

        /// <summary>The most recent exception recorded while trying to connect, if any.</summary>
        public Exception Error { get; private set; }

        readonly ILogger Log;
        readonly Func<ClientBuilder, Task> Configure;

        bool _isStarted = false;
        bool _isDisposed = false;
        IClusterClient _client;

        /// <summary>
        /// Creates the wrapper. No connection attempt is made until <see cref="Start"/> is called.
        /// </summary>
        /// <param name="loggerFactory">Factory used to create this class's logger.</param>
        /// <param name="configure">Callback that configures each new <see cref="ClientBuilder"/>; invoked once per connection cycle.</param>
        public ReconnectingClusterClient(ILoggerFactory loggerFactory, Func<ClientBuilder, Task> configure) {
            Log = loggerFactory.CreateLogger<ReconnectingClusterClient>();
            Configure = configure;
        }

        /// <summary>
        /// The Start method is provided so the consuming code has time to attach event handlers before
        /// we start attempting to connect.
        /// </summary>
        /// <exception cref="InvalidOperationException">Thrown when called more than once.</exception>
        public void Start() {
            if (_isStarted) throw new InvalidOperationException("Can only be started once.");
            _isStarted = true;
            Task.Run(Connect);
        }

        /// <summary>
        /// Builds a new client, connects it (retrying until connected or disposed) and raises <see cref="Connected"/>.
        /// </summary>
        async Task Connect() {
            using (Log.BeginDataScopeUsingMethodName()) { // My custom extension method for creating a log scope.
                try {
                    int connectionFailCount = 0;
                    var builder = new ClientBuilder();
                    // Allow consumers to setup the client builder.
                    await Configure(builder);
                    // Add a client disconnection handler. This has to happen before the client is built.
                    builder.AddClusterConnectionLostHandler(OnDisconnected);
                    // Build the client for this connection cycle. Without this assignment _client would be null
                    // on the first attempt, or the previously disposed client on a reconnection.
                    _client = builder.Build();
                    // This await completes when a connection is made or we are disposed.
                    await _client.Connect(x => {
                        Error = x;
                        // Only log the connection failure the first time it happens.
                        if (connectionFailCount++ == 0) Log.LogWarning(x, $"Failed to connect.");
                        // Keep automatically trying to reconnect until disposed.
                        return Task.FromResult(!_isDisposed);
                    });
                    if (_isDisposed) return; // We never managed to connect before we were disposed.
                    // Inform consumers that a connection has been established.
                    // Consumers will now re-subscribe streams and begin calling grain methods again.
                    Log.LogInformation("Connected.");
                    Connected?.Invoke(this);
                } catch (Exception x) {
                    // Record and log the exception before rethrowing it.
                    // (We don't expect any exception here unless it comes from the Configure callback, the final
                    // failed connection attempt after disposal, or a Connected event handler.)
                    // NOTE(review): callers start this method with Task.Run and never await it, so the rethrown
                    // exception faults a task nobody observes; Error and the log entry are the visible record.
                    Error = x;
                    Log.LogError(x, $"Exception thrown while connecting.");
                    throw;
                }
            }
        }

        /// <summary>
        /// Connection-lost handler: disposes the dead client, notifies consumers and schedules a reconnection
        /// unless this instance has been disposed.
        /// </summary>
        void OnDisconnected(object sender, EventArgs e) {
            _client?.Dispose();
            try {
                Disconnected?.Invoke(this);
            } catch (Exception x) {
                // A faulty subscriber must not prevent the reconnection below from being scheduled.
                Log.LogError(x, $"Exception thrown while executing the 'Disconnected' event handler.");
            }
            if (_isDisposed) return;
            Task.Run(Connect);
        }

        /// <summary>
        /// Stops further reconnection attempts and disposes the current client, if one exists.
        /// </summary>
        public void Dispose() {
            _isDisposed = true;
            _client?.Dispose();
        }

        #region IClusterClient

        // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called.
        // For simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario.

        public bool IsInitialized => _client.IsInitialized;
        public IServiceProvider ServiceProvider => _client.ServiceProvider;

        // uhhh --- yep, code smell, I know.
        public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("Use Start() instead.");
        public Task Close() => throw new NotImplementedException("Use Dispose() instead.");
        public void Abort() => throw new NotImplementedException("Use Dispose() instead.");

        public IStreamProvider GetStreamProvider(string name) => _client.GetStreamProvider(name);
        public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
            => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
        public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
            => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
        public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
            => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
        public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
            => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);
        public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
            => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);
        public Task<TGrainObserverInterface> CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
            => _client.CreateObjectReference<TGrainObserverInterface>(obj);
        public Task DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
            => _client.DeleteObjectReference<TGrainObserverInterface>(obj);
        public void BindGrainReference(IAddressable grain)
            => _client.BindGrainReference(grain);

        #endregion
    }
    }