using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Runtime;
using Orleans.Streams;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Orleans {
/// <summary>
/// Recreates and connects a cluster client after a disconnection, raising events to inform consumers.
/// An example of this might be the interval while a cluster is being restarted or upgraded.
/// </summary>
public class ReconnectingClusterClient : IClusterClient {
    /// <summary>Raised (with this instance) each time a connection to the cluster has been established.</summary>
    public event Action<ReconnectingClusterClient> Connected;

    /// <summary>Raised (with this instance) when the connection-lost handler fires, before a reconnect is scheduled.</summary>
    public event Action<ReconnectingClusterClient> Disconnected;

    /// <summary>
    /// The most recent exception seen while connecting: either one passed to the retry filter,
    /// or one that escaped the connect routine. It is not cleared on a successful connection.
    /// </summary>
    public Exception Error { get; private set; }

    readonly ILogger _log;
    readonly Func<IClientBuilder, Task> _configure;
    bool _isStarted = false;
    // Written by Dispose() and polled from the connect retry callback on a pool thread.
    volatile bool _isDisposed = false;
    IClusterClient _client;

    /// <summary>Creates the wrapper. No connection is attempted until <see cref="Start"/> is called.</summary>
    /// <param name="loggerFactory">Factory used to create this class's logger.</param>
    /// <param name="configure">Callback that configures each freshly created client builder.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public ReconnectingClusterClient(ILoggerFactory loggerFactory, Func<IClientBuilder, Task> configure) {
        if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory));
        _configure = configure ?? throw new ArgumentNullException(nameof(configure));
        _log = loggerFactory.CreateLogger<ReconnectingClusterClient>();
    }

    /// <summary>
    /// The Start method is provided so the consuming code has time to attach event handlers before
    /// we start attempting to connect.
    /// </summary>
    /// <exception cref="InvalidOperationException">Start has already been called.</exception>
    public void Start() {
        if (_isStarted) throw new InvalidOperationException("Can only be started once.");
        _isStarted = true;
        Task.Run(() => ConnectAsync());
    }

    /// <summary>
    /// Builds a new client, connects it (retrying until disposed) and raises <see cref="Connected"/>.
    /// Runs on a pool thread; nothing awaits the returned task.
    /// </summary>
    async Task ConnectAsync() {
        using (_log.BeginDataScopeUsingMethodName()) { // Custom extension method for creating a log scope.
            try {
                int connectionFailCount = 0;
                var builder = new ClientBuilder();
                // Allow consumers to set up the client builder.
                await _configure(builder);
                // The disconnection handler must be registered BEFORE Build(); a handler added to the
                // builder afterwards is not part of the client that was already built.
                builder.AddClusterConnectionLostHandler(OnDisconnected);
                var client = builder.Build();
                _client = client;
                // This call does not complete until a connection is made or the retry filter gives up.
                await client.Connect(x => {
                    Error = x;
                    // Only log the connection failure the first time it happens.
                    if (connectionFailCount++ == 0) _log.LogWarning(x, "Failed to connect.");
                    // Keep automatically trying to reconnect until disposed.
                    return Task.FromResult(!_isDisposed);
                });
                if (_isDisposed) {
                    // Disposed while we were connecting: release the client we just built instead of leaking it.
                    client.Dispose();
                    return;
                }
                // Inform consumers that a connection has been established.
                // Consumers will now re-subscribe streams and begin calling grain methods again.
                _log.LogInformation("Connected.");
                Connected?.Invoke(this);
            } catch (Exception x) {
                // Record and log the exception before rethrowing. The rethrow only faults the task started
                // by Task.Run, which nobody awaits, so the log entry and Error are where it surfaces.
                // (Exceptions are expected here mainly from the configure callback or a Connected handler.)
                Error = x;
                _log.LogError(x, "Exception thrown while connecting.");
                throw;
            }
        }
    }

    /// <summary>
    /// Connection-lost handler: disposes the dead client, raises <see cref="Disconnected"/> and,
    /// unless this instance has been disposed, schedules a fresh connect.
    /// </summary>
    void OnDisconnected(object sender, EventArgs e) {
        _client?.Dispose();
        Disconnected?.Invoke(this);
        if (_isDisposed) return;
        Task.Run(() => ConnectAsync());
    }

    /// <summary>Stops any further reconnect attempts and disposes the current client, if there is one.</summary>
    public void Dispose() {
        _isDisposed = true;
        _client?.Dispose();
    }

    #region IClusterClient
    // There is potential for null reference exceptions here if these properties/methods are called before the Start method has been called.
    // An exception could also be thrown if the _client is not in a connected state.
    // For simplicity, and because the consumers "know" about it, no handling has been added for that scenario.
    public bool IsInitialized => _client.IsInitialized;
    public IServiceProvider ServiceProvider => _client.ServiceProvider;

    // The connection lifecycle is owned by Start()/Dispose(), so the interface's own lifecycle members are rejected.
    public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("Use Start() instead.");
    public Task Close() => throw new NotImplementedException("Use Dispose() instead.");
    public void Abort() => throw new NotImplementedException("Use Dispose() instead.");

    // Everything below simply forwards to the current underlying client.
    public IStreamProvider GetStreamProvider(string name) => _client.GetStreamProvider(name);
    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
        => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
        => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
    public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
        => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
        => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);
    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
        => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);
    public Task<TGrainObserverInterface> CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => _client.CreateObjectReference<TGrainObserverInterface>(obj);
    public Task DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => _client.DeleteObjectReference<TGrainObserverInterface>(obj);
    public void BindGrainReference(IAddressable grain)
        => _client.BindGrainReference(grain);
    #endregion
}
}