Skip to content

Instantly share code, notes, and snippets.

@dmannock
Created August 29, 2015 12:21
Show Gist options
  • Select an option

  • Save dmannock/f8d99e4172243c70cbc4 to your computer and use it in GitHub Desktop.

Select an option

Save dmannock/f8d99e4172243c70cbc4 to your computer and use it in GitHub Desktop.
/// <summary>
/// Registers one Rx pipeline per command type that groups incoming requests by id,
/// throttles each group, and sends the resulting command on the service bus.
/// </summary>
/// <remarks>
/// The registry of pipelines is static, so a command type can only be registered once
/// across all <see cref="CommandThrottler"/> instances.
/// NOTE(review): the registry is a plain Dictionary with no synchronization; confirm that
/// Register and ThrottleCommand are not called concurrently.
/// </remarks>
public class CommandThrottler
{
    // Maps a command type to its Subject<ThrottleCommand<TCommand>>; stored as object
    // because the generic argument differs per entry.
    private static readonly Dictionary<Type, object> observables = new Dictionary<Type, object>();

    /// <summary>Throttle window used by the parameterless <c>Register</c> overload (500 ms).</summary>
    public static readonly TimeSpan DefaultThrottle = TimeSpan.FromMilliseconds(500);

    /// <summary>A 200 ms throttle window.</summary>
    public static readonly TimeSpan SmallThrottle = TimeSpan.FromMilliseconds(200);

    /// <summary>A one second throttle window.</summary>
    public static readonly TimeSpan LargeThrottle = TimeSpan.FromSeconds(1);

    /// <summary>A zero-length throttle window.</summary>
    public static readonly TimeSpan None = TimeSpan.Zero;

    private readonly IServiceBus _bus;
    private readonly IEventStore _store;

    /// <summary>Bus that throttled commands are sent on.</summary>
    public IServiceBus Bus { get { return _bus; } }

    /// <summary>Store queried for the latest version of an id just before sending.</summary>
    public IEventStore Store { get { return _store; } }

    /// <summary>Creates a throttler that sends on <paramref name="bus"/> and reads versions from <paramref name="store"/>.</summary>
    /// <param name="bus">Service bus; must be non-null by the time a command type is registered.</param>
    /// <param name="store">Event store; must be non-null by the time a command type is registered.</param>
    public CommandThrottler(IServiceBus bus, IEventStore store)
    {
        _bus = bus;
        _store = store;
    }

    /// <summary>Registers <typeparamref name="T"/> with the <see cref="DefaultThrottle"/> window.</summary>
    public void Register<T>() where T : Command
    {
        Register<T>(DefaultThrottle);
    }

    /// <summary>Registers <typeparamref name="T"/> with a throttle window given in milliseconds.</summary>
    /// <param name="throttleMilliseconds">Throttle window in milliseconds.</param>
    public void Register<T>(int throttleMilliseconds) where T : Command
    {
        Register<T>(TimeSpan.FromMilliseconds(throttleMilliseconds));
    }

    /// <summary>
    /// Builds and subscribes the throttling pipeline for <typeparamref name="TCommand"/>.
    /// </summary>
    /// <typeparam name="TCommand">Type of command to throttle.</typeparam>
    /// <param name="dueTime">Throttle window passed to Rx <c>Throttle</c> for each id.</param>
    /// <exception cref="CommandThrottleException">The command type is already registered.</exception>
    /// <exception cref="ArgumentNullException">The bus or store supplied to the constructor was null.</exception>
    public void Register<TCommand>(TimeSpan dueTime)
        where TCommand : Command
    {
        Type type = typeof(TCommand);

        // Only register the command once.
        if (observables.ContainsKey(type))
            throw new CommandThrottleException(string.Format("Failed to register throttle command of type {0}. Command type already registered.", type.Name));

        // The single-string ArgumentNullException constructor takes a parameter NAME,
        // so the (paramName, message) overload is used to keep the message readable.
        if (Bus == null)
            throw new ArgumentNullException("bus", string.Format("Failed to register throttle command of type {0}. Bus is null.", type.Name));
        if (Store == null)
            throw new ArgumentNullException("store", string.Format("Failed to register throttle command of type {0}. Store is null.", type.Name));

        var observable = new Subject<ThrottleCommand<TCommand>>();
        observable
            // Bus and Store are fixed for this instance, so the id alone identifies a group.
            .GroupBy(g => g.Id)
            .SelectMany(s =>
                // NOTE(review): ThrottleCommand does not override Equals, so
                // DistinctUntilChanged compares by reference here.
                s.DistinctUntilChanged()
                 .Throttle(dueTime)
            )
            .Finally(() =>
            {
                // The pipeline has terminated: drop the dead subject and re-subscribe
                // so later ThrottleCommand calls for this type keep working.
                observables.Remove(type);
                Register<TCommand>(dueTime);
            })
            .Subscribe(c =>
            {
                // The version is looked up only now, when the command is actually sent.
                var version = Store.LatestVersion(c.Id);
                Bus.Send(c.CreateCommand(version));
            },
            e =>
            {
                // Surface the failure instead of silently swallowing it.
                System.Diagnostics.Trace.TraceError(
                    "Command Throttling Observable threw an exception for type {0}, dueTime {1}: {2}",
                    type.Name, dueTime, e);
            });

        observables.Add(type, observable);
    }

    /// <summary>
    /// Queues a command request on the pipeline registered for <typeparamref name="TCommand"/>.
    /// The command itself is created later, when the pipeline sends it on the bus.
    /// </summary>
    /// <typeparam name="TCommand">Type of command</typeparam>
    /// <param name="id">enquiry id</param>
    /// <param name="deferredCommand">function that takes an integer param (version) and returns a command of type TCommand</param>
    /// <exception cref="CommandThrottleException">No pipeline has been registered for the command type.</exception>
    public static void ThrottleCommand<TCommand>(Guid id, Func<int, TCommand> deferredCommand)
        where TCommand : Command
    {
        object registered;
        if (!observables.TryGetValue(typeof(TCommand), out registered))
            throw new CommandThrottleException(string.Format("No handler registered to throttle command of type {0}. Details: Id={1}", typeof(TCommand).Name, id));

        // Register always stores a Subject<ThrottleCommand<TCommand>> under typeof(TCommand),
        // so a direct cast is safe and fails loudly if that ever stops being true.
        var observable = (Subject<ThrottleCommand<TCommand>>)registered;
        observable.OnNext(new ThrottleCommand<TCommand>(id, deferredCommand));
    }
}
/// <summary>
/// A pending request to send a command for a given id. The command is not built
/// up front; <see cref="CreateCommand"/> is invoked with a version to produce it.
/// </summary>
/// <typeparam name="TCommand">Type of command the factory delegate produces.</typeparam>
public class ThrottleCommand<TCommand>
    where TCommand : Command
{
    /// <summary>Id the request belongs to.</summary>
    public readonly Guid Id;

    /// <summary>Factory that takes a version and returns the command to send. Never null.</summary>
    public readonly Func<int, TCommand> CreateCommand;

    /// <summary>Creates a request for <paramref name="id"/>.</summary>
    /// <param name="id">Id the request belongs to.</param>
    /// <param name="createCommand">Factory that takes a version and returns the command.</param>
    /// <exception cref="ArgumentNullException"><paramref name="createCommand"/> is null.</exception>
    public ThrottleCommand(Guid id, Func<int, TCommand> createCommand)
    {
        // Fail at the call site rather than with a NullReferenceException later,
        // when the delegate is finally invoked.
        if (createCommand == null)
            throw new ArgumentNullException("createCommand");

        Id = id;
        CreateCommand = createCommand;
    }
}
/// <summary>
/// Creates instances of <c>T</c> by reflection, calling the public constructor whose
/// parameters are (id, ...commandParams, version).
/// </summary>
public class CommandFactory
{
    /// <summary>
    /// Creates a <typeparamref name="T"/> using the latest version reported by <paramref name="store"/> for <paramref name="id"/>.
    /// </summary>
    /// <param name="id">Passed as the first constructor argument.</param>
    /// <param name="store">Queried via <c>LatestVersion(id)</c> for the trailing version argument.</param>
    /// <param name="commandParams">Constructor arguments placed between id and version.</param>
    /// <exception cref="ArgumentNullException"><paramref name="store"/> is null.</exception>
    /// <exception cref="ArgumentException">No matching constructor exists.</exception>
    public T Create<T>(Guid id, IEventStore store, params object[] commandParams)
    {
        if (store == null)
            throw new ArgumentNullException("store");

        var version = store.LatestVersion(id);
        return Create<T>(id, version, commandParams);
    }

    /// <summary>
    /// Creates a <typeparamref name="T"/> by invoking its (id, ...commandParams, version) constructor.
    /// </summary>
    /// <param name="id">Passed as the first constructor argument.</param>
    /// <param name="version">Passed as the last constructor argument.</param>
    /// <param name="commandParams">
    /// Constructor arguments placed between id and version. A null array is treated as empty;
    /// null elements are allowed and match any parameter that can hold null.
    /// </param>
    /// <exception cref="ArgumentException">No constructor matches, or (with null elements) more than one does.</exception>
    public T Create<T>(Guid id, int version, params object[] commandParams)
    {
        var extras = commandParams ?? new object[0];

        // Final argument list: id first, caller-supplied values in the middle, version last.
        var arguments = new object[extras.Length + 2];
        arguments[0] = id;
        Array.Copy(extras, 0, arguments, 1, extras.Length);
        arguments[arguments.Length - 1] = version;

        System.Reflection.ConstructorInfo ctor;
        if (extras.Any(p => p == null))
        {
            // A null has no runtime type, so the constructor cannot be looked up by type list.
            ctor = FindConstructorAcceptingNulls(typeof(T), arguments);
        }
        else
        {
            ctor = typeof(T).GetConstructor(arguments.Select(a => a.GetType()).ToArray());
        }

        if (ctor == null)
            throw new ArgumentException("Command Parameters do not match the Constructor");

        return (T)ctor.Invoke(arguments);
    }

    // Finds the single public constructor whose parameters can accept the given arguments,
    // where a null argument fits any reference-type or Nullable<T> parameter.
    private static System.Reflection.ConstructorInfo FindConstructorAcceptingNulls(Type type, object[] arguments)
    {
        System.Reflection.ConstructorInfo match = null;
        foreach (var candidate in type.GetConstructors())
        {
            var parameters = candidate.GetParameters();
            if (parameters.Length != arguments.Length)
                continue;

            var fits = true;
            for (var i = 0; i < parameters.Length && fits; i++)
            {
                var parameterType = parameters[i].ParameterType;
                var argument = arguments[i];
                fits = argument == null
                    ? !parameterType.IsValueType || Nullable.GetUnderlyingType(parameterType) != null
                    : parameterType.IsInstanceOfType(argument);
            }

            if (!fits)
                continue;

            if (match != null)
                throw new ArgumentException("Command Parameters match more than one Constructor");
            match = candidate;
        }
        return match;
    }
}
/// <summary>
/// Exception thrown for command-throttling failures, such as registering
/// the same command type twice.
/// </summary>
public class CommandThrottleException : Exception
{
    /// <summary>Creates the exception with a message describing the failure.</summary>
    /// <param name="message">Description of the failure.</param>
    public CommandThrottleException(string message) : base(message) { }

    /// <summary>Creates the exception with a message and the exception that caused it.</summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="ex">Underlying cause, exposed as the inner exception.</param>
    public CommandThrottleException(string message, Exception ex) : base(message, ex) { }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment