Last active
August 20, 2018 09:52
-
-
Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Microsoft.Extensions.Logging; | |
| using Orleans; | |
| using Orleans.Runtime; | |
| using Orleans.Streams; | |
| using System; | |
| using System.Collections.Generic; | |
| using System.Text; | |
| using System.Threading.Tasks; | |
namespace Orleans {

    /// <summary>
    /// An <see cref="IClusterClient"/> wrapper that builds and connects a new inner client after the
    /// connection to the cluster is lost, raising events to inform consumers.
    /// An example of such an outage is the interval while a cluster is being restarted or upgraded.
    /// </summary>
    /// <remarks>
    /// Usage: construct the instance, attach handlers to <see cref="Connected"/> and
    /// <see cref="Disconnected"/>, then call <see cref="Start"/>. Call <see cref="Dispose"/> to stop
    /// reconnecting and release the inner client.
    /// Grain and stream members are forwarded to the current inner client; apart from
    /// <see cref="IsInitialized"/> they throw if called before the first inner client has been built.
    /// </remarks>
    public class ReconnectingClusterClient : IClusterClient {

        /// <summary>Raised after an inner client has connected successfully.</summary>
        public event Action<ReconnectingClusterClient> Connected;

        /// <summary>Raised when the inner client reports that its cluster connection was lost.</summary>
        public event Action<ReconnectingClusterClient> Disconnected;

        /// <summary>The exception passed to the retry filter by the most recent failed connection attempt, if any.</summary>
        public Exception Error { get; private set; }

        readonly ILogger Log;
        readonly Func<ClientBuilder, Task> Configure;
        bool _isStarted = false;

        // Written by Dispose() and read from thread-pool tasks and the retry callback, so both fields
        // are volatile to make writes visible across threads.
        volatile bool _isDisposed = false;
        volatile IClusterClient _client;

        /// <summary>
        /// Creates the wrapper. No connection is attempted until <see cref="Start"/> is called.
        /// </summary>
        /// <param name="loggerFactory">Factory used to create this class's logger.</param>
        /// <param name="configure">Callback that configures each newly created <see cref="ClientBuilder"/>. It is invoked once per connection cycle.</param>
        /// <exception cref="ArgumentNullException">Either argument is null.</exception>
        public ReconnectingClusterClient(ILoggerFactory loggerFactory, Func<ClientBuilder, Task> configure) {
            if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory));
            Log = loggerFactory.CreateLogger<ReconnectingClusterClient>();
            Configure = configure ?? throw new ArgumentNullException(nameof(configure));
        }

        /// <summary>
        /// Begins connecting in the background.
        /// The Start method is provided so the consuming code has time to attach event handlers before
        /// we start attempting to connect.
        /// </summary>
        /// <exception cref="InvalidOperationException">Start has already been called.</exception>
        public void Start() {
            if (_isStarted) throw new InvalidOperationException("Can only be started once.");
            _isStarted = true;
            _ = Task.Run(Connect);
        }

        /// <summary>
        /// Builds a new inner client and connects it, retrying until it succeeds or this instance is disposed.
        /// Raises <see cref="Connected"/> on success.
        /// </summary>
        async Task Connect() {
            using (Log.BeginDataScopeUsingMethodName()) { // Custom extension method for creating a log scope.
                try {
                    // Nothing to do if we were disposed before this attempt started.
                    if (_isDisposed) return;

                    int connectionFailCount = 0;
                    var builder = new ClientBuilder();

                    // Allow consumers to set up the client builder.
                    await Configure(builder);

                    // Add a client disconnection handler.
                    builder.AddClusterConnectionLostHandler(OnDisconnected);

                    // Build the client and publish it BEFORE connecting, so that Dispose() and the
                    // forwarding members below can see it.
                    var client = builder.Build();
                    _client = client;

                    // The retry filter keeps the connection attempt going until we are disposed.
                    await client.Connect(x => {
                        Error = x;
                        // Only log the connection failure the first time it happens.
                        if (connectionFailCount++ == 0) Log.LogWarning(x, "Failed to connect.");
                        // Keep automatically trying to reconnect until disposed.
                        return Task.FromResult(!_isDisposed);
                    });

                    if (_isDisposed) {
                        // Dispose() may have run before this client was published to _client, in which
                        // case it has not been released yet. Release it here.
                        client.Dispose();
                        return;
                    }

                    // Inform consumers that a connection has been established.
                    // Consumers will now re-subscribe streams and begin calling grain methods again.
                    Log.LogInformation("Connected.");
                    Connected?.Invoke(this);
                } catch (Exception x) {
                    // Log before rethrowing. This method runs on a discarded Task.Run task, so the
                    // rethrown exception is captured by that task rather than propagated to a caller;
                    // this log entry is where the failure becomes visible.
                    // (Exceptions are expected mainly from Configure(builder) or a Connected handler.)
                    Log.LogError(x, "Exception thrown while connecting.");
                    throw;
                }
            }
        }

        /// <summary>
        /// Connection-lost handler registered on every inner client: disposes the current client, raises
        /// <see cref="Disconnected"/>, and starts a new connection cycle unless this instance is disposed.
        /// </summary>
        void OnDisconnected(object sender, EventArgs e) {
            _client?.Dispose();
            Disconnected?.Invoke(this);
            if (_isDisposed) return;
            _ = Task.Run(Connect);
        }

        /// <summary>
        /// Stops any further reconnection attempts and disposes the current inner client, if one exists.
        /// </summary>
        public void Dispose() {
            _isDisposed = true;
            _client?.Dispose();
        }

        #region IClusterClient

        // The forwarding members below (other than IsInitialized) throw a NullReferenceException if they
        // are used before the first inner client has been built, i.e. before Start() has been called.
        // An exception could also be thrown if the inner client is not in a connected state.
        // For simplicity, and because the consumers "know" about it, no handling is added for that scenario.

        /// <summary>False until an inner client exists; afterwards the inner client's value.</summary>
        public bool IsInitialized => _client?.IsInitialized ?? false;

        public IServiceProvider ServiceProvider => _client.ServiceProvider;

        // Lifecycle is owned by this wrapper, so the IClusterClient lifecycle members are deliberately unsupported.
        public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("Use Start() instead.");
        public Task Close() => throw new NotImplementedException("Use Dispose() instead.");
        public void Abort() => throw new NotImplementedException("Use Dispose() instead.");

        public IStreamProvider GetStreamProvider(string name) => _client.GetStreamProvider(name);

        public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
            => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

        public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
            => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

        public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
            => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

        public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
            => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

        public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
            => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

        public Task<TGrainObserverInterface> CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
            => _client.CreateObjectReference<TGrainObserverInterface>(obj);

        public Task DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
            => _client.DeleteObjectReference<TGrainObserverInterface>(obj);

        public void BindGrainReference(IAddressable grain)
            => _client.BindGrainReference(grain);

        #endregion
    }
}
Author
@martinothamar, I've updated it. In this new version, you inherit and override the "Configure()" method.
Author
@martinothamar, updated.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is nice, thanks for sharing!