Skip to content

Instantly share code, notes, and snippets.

@bboyle1234
Last active August 20, 2018 09:52
Show Gist options
  • Select an option

  • Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.

Select an option

Save bboyle1234/9ac5a469dbeb5ea38e7994837b008cda to your computer and use it in GitHub Desktop.
using Apex.LoggingUtils;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
using Orleans;
using Orleans.Runtime;
using Orleans.Streams;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Apex.Grains {
/// <summary>
/// Wraps an Orleans <see cref="IClusterClient"/> and, after a disconnection, builds and connects a brand-new
/// client, raising events to inform consumers.
/// An example of this might be the interval while a cluster is being restarted or upgraded.
/// </summary>
/// <remarks>
/// Connection attempts are started on a background task from the constructor, so <see cref="Configure"/> may be
/// invoked before a derived class's constructor body has finished running. Implementations of
/// <see cref="Configure"/> should therefore not depend on state assigned in the derived constructor body.
/// </remarks>
public abstract class ReconnectingClusterClient : IClusterClient {

    /// <summary>Raised (on a background task) each time a connection to the cluster has been established.</summary>
    public event Action<ReconnectingClusterClient> Connected;

    /// <summary>Raised from the cluster-connection-lost handler each time the connection is lost.</summary>
    public event Action<ReconnectingClusterClient> Disconnected;

    /// <summary>
    /// True between a successful connection and the next connection-lost notification.
    /// The value is maintained by this class; the public setter is kept only for backward compatibility.
    /// </summary>
    public bool IsConnected { get; set; }

    /// <summary>The most recent exception thrown while configuring or connecting a client, if any.</summary>
    public Exception Error { get; private set; }

    // Delay between two consecutive connection attempts.
    static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(1);

    readonly ILogger Log;

    // This ManualResetEvent allows multiple consumers to await the same connection event without affecting each other.
    // The trick is to make sure we manually reset it when disconnected.
    readonly AsyncManualResetEvent ConnectedEvent = new AsyncManualResetEvent(false);

    // Written by Dispose() and polled by the background connect loop, hence volatile.
    volatile bool _isDisposed;

    IClusterClient _client;

    /// <summary>Creates the client and immediately starts connection attempts on a background task.</summary>
    /// <param name="loggerFactory">Factory used to create a logger named after the concrete (derived) type.</param>
    public ReconnectingClusterClient(ILoggerFactory loggerFactory) {
        // Because we're an abstract class we don't use the generic method.
        Log = loggerFactory.CreateLogger(GetType());
        Task.Run(Connect);
    }

    /// <summary>Waits until the client is connected, giving up after <paramref name="waitTime"/>.</summary>
    /// <param name="waitTime">Maximum time to wait before the wait is canceled.</param>
    /// <returns>A task that completes once connected, or is canceled when the wait time elapses.</returns>
    public async Task WaitForConnection(TimeSpan waitTime) {
        using (var cts = new CancellationTokenSource()) {
            cts.CancelAfter(waitTime);
            // Await inside the using block: returning the un-awaited task would dispose the token source
            // (and with it the CancelAfter timer) immediately, so the timeout would never fire.
            await WaitForConnection(cts.Token);
        }
    }

    /// <summary>Waits until the client is connected or <paramref name="cancellationToken"/> is canceled.</summary>
    /// <param name="cancellationToken">Token used to cancel the wait.</param>
    /// <returns>The wait task of the internal connected-event.</returns>
    public Task WaitForConnection(CancellationToken cancellationToken)
        => ConnectedEvent.WaitAsync(cancellationToken);

    // Connects (retrying until success or disposal), then publishes the connected state to consumers.
    async Task Connect() {
        using (Log.BeginDataScopeUsingMethodName()) {
            await CreateAndConnect();
            if (_isDisposed) return; // We never managed to connect before we were disposed.
            IsConnected = true;
            ConnectedEvent.Set();
            RaiseEvent(Connected, nameof(Connected));
        }
    }

    // Builds a fresh client and tries to connect it, once per RetryDelay, until it connects or we are disposed.
    // An exception thrown while configuring/building the client is logged and rethrown (no retry).
    async Task CreateAndConnect() {
        // The orleans client has the ability to keep retrying connections before its first connection. (It won't retry after a disconnection)
        // In this implementation we don't use the built-in client's ability to retry connections during its first connection attempt, because
        // we found that it messed up stream subscriptions. Therefore we now recreate the client for each connection attempt.
        int connectionFailCount = 0;
        while (!_isDisposed) {
            try {
                var builder = new ClientBuilder();
                Configure(builder);
                builder.AddClusterConnectionLostHandler(OnDisconnected);
                _client = builder.Build();
            } catch (Exception x) {
                Error = x;
                Log.LogError(x, "Exception thrown while configuring client.");
                throw;
            }
            try {
                await _client.Connect(); // This overload does not allow the client to reattempt connection.
                Log.LogInformation("Connected.");
                return;
            } catch (Exception x) {
                Error = x;
                _client.Dispose();
                // Only the first failure of a connection cycle is logged, to avoid flooding the log while retrying.
                if (connectionFailCount++ == 0) {
                    Log.LogWarning(x, "Failed to connect.");
                }
            }
            await Task.Delay(RetryDelay); // Attempt reconnection once per second.
        }
    }

    /// <summary>Implemented by derived classes to configure the builder used for every connection attempt.</summary>
    /// <param name="builder">A new builder; the connection-lost handler is added after this method returns.</param>
    protected abstract void Configure(ClientBuilder builder);

    // Cluster-connection-lost handler: publishes the disconnected state and starts a new connection cycle.
    void OnDisconnected(object sender, EventArgs e) {
        ConnectedEvent.Reset();
        _client?.Dispose();
        IsConnected = false;
        // Guarded like 'Connected': a throwing subscriber must not prevent the reconnection below.
        RaiseEvent(Disconnected, nameof(Disconnected));
        if (_isDisposed) return;
        Task.Run(Connect);
    }

    // Invokes an event's handlers, logging (instead of propagating) any exception a subscriber throws.
    void RaiseEvent(Action<ReconnectingClusterClient> handlers, string eventName) {
        try {
            handlers?.Invoke(this);
        } catch (Exception x) {
            Log.LogError(x, "Exception thrown while executing the '{EventName}' event handler.", eventName);
        }
    }

    /// <summary>Stops further connection attempts and disposes the current client, if one exists.</summary>
    public void Dispose() {
        _isDisposed = true;
        _client?.Dispose();
    }

    #region IClusterClient

    // The members below forward to the current inner client. '_client' is null until the background task has
    // built the first client, so calling them before that point throws a NullReferenceException.
    // For simplicity, and because the consumers "know" about it, no handling has been added for that scenario.

    /// <inheritdoc/>
    public bool IsInitialized => _client.IsInitialized;

    /// <inheritdoc/>
    public IServiceProvider ServiceProvider => _client.ServiceProvider;

    // uhhh --- yep, code smell, I know.

    /// <summary>Not supported: this client starts connecting on construction. Always throws.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public Task Connect(Func<Exception, Task<bool>> retryFilter = null) => throw new NotImplementedException("This client starts connection attempts immediately. Try using 'WaitForConnection()' instead.");

    /// <summary>Not supported; use <see cref="Dispose"/>. Always throws.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public Task Close() => throw new NotImplementedException("Use Dispose() instead.");

    /// <summary>Not supported; use <see cref="Dispose"/>. Always throws.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public void Abort() => throw new NotImplementedException("Use Dispose() instead.");

    /// <inheritdoc/>
    public IStreamProvider GetStreamProvider(string name) => _client.GetStreamProvider(name);

    /// <inheritdoc/>
    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey
        => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    /// <inheritdoc/>
    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey
        => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    /// <inheritdoc/>
    public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
        => _client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);

    /// <inheritdoc/>
    public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey
        => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

    /// <inheritdoc/>
    public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey
        => _client.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix);

    /// <inheritdoc/>
    public Task<TGrainObserverInterface> CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => _client.CreateObjectReference<TGrainObserverInterface>(obj);

    /// <inheritdoc/>
    public Task DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver
        => _client.DeleteObjectReference<TGrainObserverInterface>(obj);

    /// <inheritdoc/>
    public void BindGrainReference(IAddressable grain)
        => _client.BindGrainReference(grain);

    #endregion
}
}
@martinothamar
Copy link

This is nice, thanks for sharing!

@bboyle1234
Copy link
Author

@martinothamar, I've updated it. In this new version, you inherit from the class and override the "Configure()" method.

@bboyle1234
Copy link
Author

@martinothamar, updated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment