using Microsoft.Extensions.Logging; using Orleans; using Orleans.Runtime; using Orleans.Streams; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace Orleans { /// /// Class recreates and connects a cluster client after a disconnection, raising events to inform consumers. /// An example of this might be the interval while a cluster being restarted or upgraded. /// public class ReconnectingClusterClient : IClusterClient { public event Action Connected; public event Action Disconnected; public Exception Error { get; private set; } readonly ILogger Log; readonly Func Configure; bool _isStarted = false; bool _isDisposed = false; IClusterClient _client; public ReconnectingClusterClient(ILoggerFactory loggerFactory, Func configure) { Log = loggerFactory.CreateLogger(); Configure = configure; } /// /// The Start method is provided so the consuming code has time to attache event handlers before /// we start attempting to connect. /// public void Start() { if (_isStarted) throw new InvalidOperationException("Can only be started once."); _isStarted = true; Task.Run(Connect); } async Task Connect() { using (Log.BeginDataScopeUsingMethodName()) { // My custom extension method for creating a log scope. try { int connectionFailCount = 0; var builder = new ClientBuilder(); // Allow consumers to setup the client builder. await Configure(builder); _client = builder.Build(); // Add a client disconnection handler. builder.AddClusterConnectionLostHandler(OnDisconnected); // This code blocks until a connection is made or we are disposed. await _client.Connect(x => { Error = x; /// Only log the connection failure the first time it happens. if (connectionFailCount++ == 0) Log.LogWarning(x, $"Failed to connect."); // Keep automatically trying to reconnect until disposed. return Task.FromResult(!_isDisposed); }); if (_isDisposed) return; // We never managed to connect before we were disposed. /// Inform consumers that a connection has been established. /// Consumers will now re-subscribe streams and begin calling grain methods again. Log.LogInformation("Connected."); Connected?.Invoke(this); } catch (Exception x) { /// Log the exception before throwing it, which will crash the application. /// (We actually don't expect any exception to be thrown in this method unless it is in the Configure(builder) method or the Connected event handler.) Log.LogError(x, $"Exception thrown while connecting."); throw; } } } void OnDisconnected(object sender, EventArgs e) { _client.Dispose(); Disconnected?.Invoke(this); if (_isDisposed) return; Task.Run(Connect); } public void Dispose() { _isDisposed = true; _client?.Dispose(); } #region IClusterClient // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called. // An exception could also be thrown if the _client is not in a connected state. // For simplicity, and because the consumers "know" about it, I've decided not to add handling for that scenario. public bool IsInitialized => _client.IsInitialized; public IServiceProvider ServiceProvider => _client.ServiceProvider; // uhhh --- yep, code smell, I know. public Task Connect(Func> retryFilter = null) => throw new NotImplementedException("Use Start() instead."); public Task Close() => throw new NotImplementedException("Use Dispose() instead."); public void Abort() => throw new NotImplementedException("Use Dispose() instead."); public IStreamProvider GetStreamProvider(string name) => _client.GetStreamProvider(name); public TGrainInterface GetGrain(Guid primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey => _client.GetGrain(primaryKey, grainClassNamePrefix); public TGrainInterface GetGrain(long primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey => _client.GetGrain(primaryKey, grainClassNamePrefix); public TGrainInterface GetGrain(string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey => _client.GetGrain(primaryKey, grainClassNamePrefix); public TGrainInterface GetGrain(Guid primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey => _client.GetGrain(primaryKey, keyExtension, grainClassNamePrefix); public TGrainInterface GetGrain(long primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey => _client.GetGrain(primaryKey, keyExtension, grainClassNamePrefix); public Task CreateObjectReference(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver => _client.CreateObjectReference(obj); public Task DeleteObjectReference(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver => _client.DeleteObjectReference(obj); public void BindGrainReference(IAddressable grain) => _client.BindGrainReference(grain); #endregion } }