Last active
August 20, 2018 09:52
-
-
Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.
Revisions
-
bboyle1234 revised this gist
Aug 8, 2018 . 1 changed file with 0 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -95,7 +95,6 @@ async Task CreateAndConnect() { _client.Dispose(); if (connectionFailCount++ < 5) Log.LogWarning(x, $"Failed to connect."); } await Task.Delay(1000); // Attempt reconnection once per second. } -
bboyle1234 revised this gist
Aug 8, 2018 . 1 changed file with 39 additions and 29 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -12,6 +12,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using static Apex.TaskUtilities.Tasks; namespace Apex.Grains { @@ -28,41 +29,40 @@ public abstract class ReconnectingClusterClient : IClusterClient { public Exception Error { get; private set; } readonly ILogger Log; readonly CancellationTokenSource Disposed = new CancellationTokenSource(); // This ManualResetEvent allows multiple consumers to await the same connection event without affecting each other. // The trick is to make sure we manually reset it when disconnected. readonly AsyncManualResetEvent ConnectedEvent = new AsyncManualResetEvent(false); IClusterClient _client; public ReconnectingClusterClient(ILoggerFactory loggerFactory) { // Because we're an abstract class we don't use the generic method. Log = loggerFactory.CreateLogger(GetType()); FireAndForget(Connect); } public async Task WaitForConnection(TimeSpan waitTime) { using (var cts = new CancellationTokenSource(waitTime)) { await WaitForConnection(cts.Token); } } public async Task WaitForConnection(CancellationToken ct) { using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct, Disposed.Token)) { await ConnectedEvent.WaitAsync(cts.Token); } } async Task Connect() { await CreateAndConnect(); if (Disposed.IsCancellationRequested) return; // We never managed to connect before we were disposed. IsConnected = true; ConnectedEvent.Set(); try { Connected?.Invoke(this); } catch (Exception x) { Log.LogError(x, $"Exception thrown while executing the 'Connected' event handler."); } } @@ -71,7 +71,8 @@ async Task CreateAndConnect() { /// In this implementation we don't use the built-in client's ability to retry connections during its first connection attempt, because /// we found that it messed up stream subscriptions. Therefore we now recreate the client for each connection attempt. int connectionFailCount = 0; while (true) { if (Disposed.IsCancellationRequested) return; try { var builder = new ClientBuilder(); Configure(builder); @@ -83,14 +84,18 @@ async Task CreateAndConnect() { throw; } try { /// This overload does not allow the client to reattempt connection if it fails the first time. /// Tests showed that stream subscriptions didn't work so well when re-attempted connections were made, /// so it was decided that on failed connection attempt, the client would be disposed, and a new one created. await _client.Connect(); Log.LogInformation("Connected."); return; } catch (Exception x) { Error = x; _client.Dispose(); if (connectionFailCount++ < 5) Log.LogWarning(x, $"Failed to connect."); FireAndForget(Connect); } await Task.Delay(1000); // Attempt reconnection once per second. } @@ -99,17 +104,22 @@ async Task CreateAndConnect() { protected abstract void Configure(ClientBuilder builder); void OnDisconnected(object sender, EventArgs e) { IsConnected = false; ConnectedEvent.Reset(); _client?.Dispose(); if (Disposed.IsCancellationRequested) return; try { Disconnected?.Invoke(this); } catch (Exception x) { Log.LogError(x, $"Exception thrown while executing the 'Disconnected' event handler."); } FireAndForget(Connect); } public void Dispose() { Disposed.Cancel(); _client?.Close(); // required for graceful (informing silo) disconnection _client?.Dispose(); // does not inform silo } #region IClusterClient @@ -145,4 +155,4 @@ public void BindGrainReference(IAddressable grain) #endregion } } -
bboyle1234 revised this gist
May 10, 2018 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -115,7 +115,7 @@ public void Dispose() { #region IClusterClient // There is potential for null reference / object disposed exceptions here if these properties/methods are called while the _client is not properly connected. // For code simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario. public bool IsInitialized => _client.IsInitialized; public IServiceProvider ServiceProvider => _client.ServiceProvider; -
bboyle1234 revised this gist
May 10, 2018 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -114,8 +114,8 @@ public void Dispose() { #region IClusterClient // There is potential for null reference / object disposed exceptions here if these properties/methods are called while the _client is not properly connected. // For code simplicity, and because the consumers "know" about it, I've decided to add handling for that scenario. public bool IsInitialized => _client.IsInitialized; public IServiceProvider ServiceProvider => _client.ServiceProvider; -
bboyle1234 revised this gist
May 10, 2018 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -88,6 +88,7 @@ async Task CreateAndConnect() { return; } catch (Exception x) { Error = x; _client.Dispose(); if (connectionFailCount++ == 0) Log.LogWarning(x, $"Failed to connect."); } -
bboyle1234 revised this gist
May 10, 2018 . 1 changed file with 65 additions and 41 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,81 +1,106 @@ using Apex.LoggingUtils; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; using Nito.AsyncEx; using Orleans; using Orleans.Runtime; using Orleans.Streams; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Apex.Grains { /// <summary> /// Class recreates and connects a cluster client after a disconnection, raising events to inform consumers. /// An example of this might be the interval while a cluster being restarted or upgraded. /// </summary> public abstract class ReconnectingClusterClient : IClusterClient { public event Action<ReconnectingClusterClient> Connected; public event Action<ReconnectingClusterClient> Disconnected; public bool IsConnected { get; set; } public Exception Error { get; private set; } readonly ILogger Log; // This ManualResetEvent allows multiple consumers to await the same connection event without affecting each other. // The trick is to make sure we manually reset it when disconnected. readonly AsyncManualResetEvent ConnectedEvent = new AsyncManualResetEvent(false); bool _isDisposed = false; IClusterClient _client; public ReconnectingClusterClient(ILoggerFactory loggerFactory) { // Because we're an abstract class we don't use the generic method. Log = loggerFactory.CreateLogger(GetType()); Task.Run(Connect); } public Task WaitForConnection(TimeSpan waitTime) { using (var cts = new CancellationTokenSource()) { cts.CancelAfter(waitTime); return WaitForConnection(cts.Token); } } public Task WaitForConnection(CancellationToken cancellationToken) => ConnectedEvent.WaitAsync(cancellationToken); async Task Connect() { using (Log.BeginDataScopeUsingMethodName()) { await CreateAndConnect(); if (_isDisposed) return; // We never managed to connect before we were disposed. IsConnected = true; ConnectedEvent.Set(); try { Connected?.Invoke(this); } catch (Exception x) { Log.LogError(x, $"Exception thrown while executing the 'Connected' event handler."); } } } async Task CreateAndConnect() { /// The orleans client has the ability to keep retrying connections before its first connection. (It won't retry after a disconnection) /// In this implementation we don't use the built-in client's ability to retry connections during its first connection attempt, because /// we found that it messed up stream subscriptions. Therefore we now recreate the client for each connection attempt. int connectionFailCount = 0; while (!_isDisposed) { try { var builder = new ClientBuilder(); Configure(builder); builder.AddClusterConnectionLostHandler(OnDisconnected); _client = builder.Build(); } catch (Exception x) { Error = x; Log.LogError(x, $"Exception thrown while configuring client."); throw; } try { await _client.Connect(); // This overload does not allow the client to reattempt connection. Log.LogInformation("Connected."); return; } catch (Exception x) { Error = x; if (connectionFailCount++ == 0) Log.LogWarning(x, $"Failed to connect."); } await Task.Delay(1000); // Attempt reconnection once per second. } } protected abstract void Configure(ClientBuilder builder); void OnDisconnected(object sender, EventArgs e) { ConnectedEvent.Reset(); _client?.Dispose(); IsConnected = false; Disconnected?.Invoke(this); if (_isDisposed) return; Task.Run(Connect); @@ -89,14 +114,13 @@ public void Dispose() { #region IClusterClient // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called. // For simplicity, and because the consumers "know" about it, I've decided to add handling for that scenario. public bool IsInitialized => _client.IsInitialized; public IServiceProvider ServiceProvider => _client.ServiceProvider; // uhhh --- yep, code smell, I know. public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("This client starts connection attempts immediately. Try using 'WaitForConnection()' instead."); public Task Close() => throw new NotImplementedException("Use Dispose() instead."); public void Abort() => throw new NotImplementedException("Use Dispose() instead."); -
bboyle1234 revised this gist
Apr 18, 2018 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -49,6 +49,7 @@ async Task Connect() { var builder = new ClientBuilder(); // Allow consumers to setup the client builder. await Configure(builder); _client = builder.Build(); // Add a client disconnection handler. builder.AddClusterConnectionLostHandler(OnDisconnected); // This code blocks until a connection is made or we are disposed. -
bboyle1234 revised this gist
Apr 18, 2018 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -88,6 +88,7 @@ public void Dispose() { #region IClusterClient // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called. // An exception could also be thrown if the _client is not in a connected state. // For simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario. public bool IsInitialized => _client.IsInitialized; -
bboyle1234 revised this gist
Apr 18, 2018 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -74,7 +74,7 @@ await _client.Connect(x => { } void OnDisconnected(object sender, EventArgs e) { _client.Dispose(); Disconnected?.Invoke(this); if (_isDisposed) return; Task.Run(Connect); @@ -88,7 +88,7 @@ public void Dispose() { #region IClusterClient // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called. // For simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario. public bool IsInitialized => _client.IsInitialized; public IServiceProvider ServiceProvider => _client.ServiceProvider; -
bboyle1234 created this gist
Apr 18, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,121 @@ using Microsoft.Extensions.Logging; using Orleans; using Orleans.Runtime; using Orleans.Streams; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace Orleans { /// <summary> /// Class recreates and connects a cluster client after a disconnection, raising events to inform consumers. /// An example of this might be the interval while a cluster being restarted or upgraded. /// </summary> public class ReconnectingClusterClient : IClusterClient { public event Action<ReconnectingClusterClient> Connected; public event Action<ReconnectingClusterClient> Disconnected; public Exception Error { get; private set; } readonly ILogger Log; readonly Func<ClientBuilder, Task> Configure; bool _isStarted = false; bool _isDisposed = false; IClusterClient _client; public ReconnectingClusterClient(ILoggerFactory loggerFactory, Func<ClientBuilder, Task> configure) { Log = loggerFactory.CreateLogger<ReconnectingClusterClient>(); Configure = configure; } /// <summary> /// The Start method is provided so the consuming code has time to attache event handlers before /// we start attempting to connect. /// </summary> public void Start() { if (_isStarted) throw new InvalidOperationException("Can only be started once."); _isStarted = true; Task.Run(Connect); } async Task Connect() { using (Log.BeginDataScopeUsingMethodName()) { // My custom extension method for creating a log scope. try { int connectionFailCount = 0; var builder = new ClientBuilder(); // Allow consumers to setup the client builder. await Configure(builder); // Add a client disconnection handler. builder.AddClusterConnectionLostHandler(OnDisconnected); // This code blocks until a connection is made or we are disposed. await _client.Connect(x => { Error = x; /// Only log the connection failure the first time it happens. if (connectionFailCount++ == 0) Log.LogWarning(x, $"Failed to connect."); // Keep automatically trying to reconnect until disposed. return Task.FromResult(!_isDisposed); }); if (_isDisposed) return; // We never managed to connect before we were disposed. /// Inform consumers that a connection has been established. /// Consumers will now re-subscribe streams and begin calling grain methods again. Log.LogInformation("Connected."); Connected?.Invoke(this); } catch (Exception x) { /// Log the exception before throwing it, which will crash the application. /// (We actually don't expect any exception to be thrown in this method unless it is in the Configure(builder) method or the Connected event handler.) Log.LogError(x, $"Exception thrown while connecting."); throw; } } } void OnDisconnected(object sender, EventArgs e) { _client?.Dispose(); Disconnected?.Invoke(this); if (_isDisposed) return; Task.Run(Connect); } public void Dispose() { _isDisposed = true; _client?.Dispose(); } #region IClusterClient // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called. // For simplicity, and because the consumers "know" about it, I've decided to add handling for that scenario. public bool IsInitialized => _client.IsInitialized; public IServiceProvider ServiceProvider => _client.ServiceProvider; // uhhh --- yep, code smell, I know. public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("Use Start() instead."); public Task Close() => throw new NotImplementedException("Use Dispose() instead."); public void Abort() => throw new NotImplementedException("Use Dispose() instead."); public IStreamProvider GetStreamProvider(string name) => _client.GetStreamProvider(name); public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix); public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix); public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix); public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix); public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix); public Task<TGrainObserverInterface> CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver => _client.CreateObjectReference<TGrainObserverInterface>(obj); public Task DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver => _client.DeleteObjectReference<TGrainObserverInterface>(obj); public void BindGrainReference(IAddressable grain) => _client.BindGrainReference(grain); #endregion } }