Last active
August 20, 2018 09:52
-
-
Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Apex.LoggingUtils; | |
| using Microsoft.Extensions.DependencyInjection; | |
| using Microsoft.Extensions.DependencyInjection.Extensions; | |
| using Microsoft.Extensions.Logging; | |
| using Nito.AsyncEx; | |
| using Orleans; | |
| using Orleans.Runtime; | |
| using Orleans.Streams; | |
| using System; | |
| using System.Collections.Generic; | |
| using System.Linq; | |
| using System.Text; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
namespace Apex.Grains {

    /// <summary>
    /// An <see cref="IClusterClient"/> wrapper that recreates and reconnects its inner client after a
    /// disconnection, raising events to inform consumers.
    /// An example of this might be the interval while a cluster is being restarted or upgraded.
    /// </summary>
    /// <remarks>
    /// Connection attempts are started on a background task from the constructor, so
    /// <see cref="Configure"/> may be invoked before a derived class's constructor body has finished.
    /// Implementations of <see cref="Configure"/> should not depend on state assigned in derived constructors.
    /// </remarks>
    public abstract class ReconnectingClusterClient : IClusterClient {

        /// <summary>Delay between two consecutive connection attempts.</summary>
        static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(1);

        /// <summary>Raised after a connection to the cluster has been established.</summary>
        public event Action<ReconnectingClusterClient> Connected;

        /// <summary>Raised after the cluster connection has been reported lost, before reconnection starts.</summary>
        public event Action<ReconnectingClusterClient> Disconnected;

        /// <summary>True while an inner client is connected to the cluster.</summary>
        public bool IsConnected { get; set; }

        /// <summary>
        /// The most recent exception caught while configuring or connecting the inner client.
        /// It is not cleared by a later successful connection.
        /// </summary>
        public Exception Error { get; private set; }

        readonly ILogger Log;

        // This ManualResetEvent allows multiple consumers to await the same connection event without affecting each other.
        // The trick is to make sure we manually reset it when disconnected.
        readonly AsyncManualResetEvent ConnectedEvent = new AsyncManualResetEvent(false);

        // Written by Dispose() and polled by the background reconnection loop, hence volatile.
        volatile bool _isDisposed;

        IClusterClient _client;

        /// <summary>
        /// Creates the client and immediately starts connecting in the background.
        /// </summary>
        /// <param name="loggerFactory">Factory used to create a logger named after the concrete type.</param>
        /// <exception cref="ArgumentNullException"><paramref name="loggerFactory"/> is null.</exception>
        protected ReconnectingClusterClient(ILoggerFactory loggerFactory) {
            if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory));
            // Because we're an abstract class we don't use the generic method.
            Log = loggerFactory.CreateLogger(GetType());
            // Fire-and-forget: failures are recorded in Error and logged by CreateAndConnect.
            Task.Run(Connect);
        }

        /// <summary>
        /// Waits until the client is connected, or until <paramref name="waitTime"/> has elapsed.
        /// </summary>
        /// <param name="waitTime">Maximum time to wait for a connection.</param>
        /// <exception cref="OperationCanceledException">The wait time elapsed before a connection was made.</exception>
        public async Task WaitForConnection(TimeSpan waitTime) {
            using (var cts = new CancellationTokenSource()) {
                cts.CancelAfter(waitTime);
                // Await inside the using block: disposing the source before the wait has finished
                // would also dispose its timer, and the timeout would never fire.
                await WaitForConnection(cts.Token).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Waits until the client is connected, or until <paramref name="cancellationToken"/> is cancelled.
        /// </summary>
        /// <param name="cancellationToken">Token used to abandon the wait.</param>
        public Task WaitForConnection(CancellationToken cancellationToken)
            => ConnectedEvent.WaitAsync(cancellationToken);

        /// <summary>
        /// Connects a new inner client, then publishes the connected state and raises <see cref="Connected"/>.
        /// </summary>
        async Task Connect() {
            using (Log.BeginDataScopeUsingMethodName()) {
                await CreateAndConnect();
                if (_isDisposed) return; // We never managed to connect before we were disposed.
                IsConnected = true;
                ConnectedEvent.Set();
                try {
                    Connected?.Invoke(this);
                } catch (Exception x) {
                    Log.LogError(x, "Exception thrown while executing the 'Connected' event handler.");
                }
            }
        }

        /// <summary>
        /// Builds and connects a new inner client, retrying at <see cref="ReconnectInterval"/> until it
        /// succeeds or this instance is disposed. An exception thrown while configuring or building the
        /// client is logged and rethrown, which ends the retry loop.
        /// </summary>
        async Task CreateAndConnect() {
            // The orleans client has the ability to keep retrying connections before its first connection. (It won't retry after a disconnection)
            // In this implementation we don't use the built-in client's ability to retry connections during its first connection attempt, because
            // we found that it messed up stream subscriptions. Therefore we now recreate the client for each connection attempt.
            int connectionFailCount = 0;
            while (!_isDisposed) {
                try {
                    var builder = new ClientBuilder();
                    Configure(builder);
                    builder.AddClusterConnectionLostHandler(OnDisconnected);
                    _client = builder.Build();
                } catch (Exception x) {
                    Error = x;
                    Log.LogError(x, "Exception thrown while configuring client.");
                    throw;
                }
                try {
                    await _client.Connect(); // This overload does not allow the client to reattempt connection.
                    Log.LogInformation("Connected.");
                    return;
                } catch (Exception x) {
                    Error = x;
                    _client.Dispose();
                    // Only the first failure of a reconnection run is logged, to avoid one warning per second.
                    if (connectionFailCount++ == 0)
                        Log.LogWarning(x, "Failed to connect.");
                }
                await Task.Delay(ReconnectInterval); // Attempt reconnection once per second.
            }
        }

        /// <summary>
        /// Configures the builder used to create each inner client. Called once per connection attempt.
        /// </summary>
        /// <param name="builder">A fresh builder for the next inner client.</param>
        protected abstract void Configure(ClientBuilder builder);

        /// <summary>
        /// Connection-lost handler registered on every inner client: clears the connected state, raises
        /// <see cref="Disconnected"/> and, unless disposed, starts a new connection run.
        /// </summary>
        void OnDisconnected(object sender, EventArgs e) {
            ConnectedEvent.Reset();
            _client?.Dispose();
            IsConnected = false;
            try {
                Disconnected?.Invoke(this);
            } catch (Exception x) {
                // A faulty subscriber must not prevent the reconnection below from being scheduled.
                Log.LogError(x, "Exception thrown while executing the 'Disconnected' event handler.");
            }
            if (_isDisposed) return;
            Task.Run(Connect);
        }

        /// <summary>
        /// Stops reconnection attempts, clears the connected state and disposes the current inner client.
        /// </summary>
        public void Dispose() {
            _isDisposed = true;
            // NOTE(review): a Connect() run that has already passed its disposed check can still
            // publish the connected state after this point; closing that window needs a lock.
            IsConnected = false;
            ConnectedEvent.Reset();
            _client?.Dispose();
        }

        #region IClusterClient

        // There is potential for null reference exceptions here if these properties/methods are called before the
        // first inner client has been built. For simplicity, and because the consumers "know" about it, no handling
        // has been added for that scenario. Use WaitForConnection() before calling them.

        public bool IsInitialized => _client.IsInitialized;

        public IServiceProvider ServiceProvider => _client.ServiceProvider;

        // uhhh --- yep, code smell, I know.
        public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("This client starts connection attempts immediately. Try using 'WaitForConnection()' instead.");

        public Task Close() => throw new NotImplementedException("Use Dispose() instead.");

        public void Abort() => throw new NotImplementedException("Use Dispose() instead.");

        public IStreamProvider GetStreamProvider(string name) => _client.GetStreamProvider(name);

        public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
            => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

        public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
            => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

        public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
            => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

        public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
            => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

        public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
            => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

        public Task<TGrainObserverInterface> CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
            => _client.CreateObjectReference<TGrainObserverInterface>(obj);

        public Task DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
            => _client.DeleteObjectReference<TGrainObserverInterface>(obj);

        public void BindGrainReference(IAddressable grain)
            => _client.BindGrainReference(grain);

        #endregion
    }
}
Author
@martinothamar, I've updated it. In this new version, you inherit and override the "Configure()" method
Author
@martinothamar, updated.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is nice, thanks for sharing!