Created
August 29, 2015 12:21
-
-
Save dmannock/f8d99e4172243c70cbc4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Registers one throttled Rx pipeline per command type and feeds deferred commands into it.
/// Items pushed via <see cref="ThrottleCommand{TCommand}(Guid, Func{int, TCommand})"/> are grouped
/// by id, passed through Throttle(dueTime), and then sent on the bus with the latest
/// version read from the event store.
/// </summary>
/// <remarks>
/// The registry of pipelines is static, so it is shared by every instance of this class.
/// NOTE(review): the registry is a plain Dictionary and Subject.OnNext is called without
/// synchronization; confirm that callers register at start-up and do not publish concurrently.
/// </remarks>
public class CommandThrottler
{
    // Holds a Subject<ThrottleCommand<TCommand>> per registered command type, keyed by typeof(TCommand).
    private static readonly Dictionary<Type, object> observables = new Dictionary<Type, object>();

    public static readonly TimeSpan DefaultThrottle = TimeSpan.FromMilliseconds(500);
    public static readonly TimeSpan SmallThrottle = TimeSpan.FromMilliseconds(200);
    public static readonly TimeSpan LargeThrottle = TimeSpan.FromSeconds(1);
    public static readonly TimeSpan None = TimeSpan.Zero;

    private readonly IServiceBus _bus;
    private readonly IEventStore _store;

    /// <summary>Bus that throttled commands are sent on.</summary>
    public IServiceBus Bus { get { return _bus; } }

    /// <summary>Store queried for the latest version of an id just before a command is sent.</summary>
    public IEventStore Store { get { return _store; } }

    /// <summary>
    /// Creates a throttler. Null arguments are accepted here and rejected later by Register.
    /// </summary>
    /// <param name="bus">Bus used to send commands.</param>
    /// <param name="store">Event store used to look up the latest version.</param>
    public CommandThrottler(IServiceBus bus, IEventStore store)
    {
        this._bus = bus;
        this._store = store;
    }

    /// <summary>Registers <typeparamref name="T"/> using <see cref="DefaultThrottle"/>.</summary>
    public void Register<T>() where T : Command
    {
        Register<T>(DefaultThrottle);
    }

    /// <summary>Registers <typeparamref name="T"/> with a throttle given in milliseconds.</summary>
    /// <param name="throttleMilliseconds">Throttle window in milliseconds.</param>
    public void Register<T>(int throttleMilliseconds) where T : Command
    {
        Register<T>(TimeSpan.FromMilliseconds(throttleMilliseconds));
    }

    /// <summary>
    /// Builds and subscribes the throttling pipeline for <typeparamref name="TCommand"/> and
    /// stores its subject in the static registry.
    /// </summary>
    /// <typeparam name="TCommand">Type of command to throttle.</typeparam>
    /// <param name="dueTime">Throttle window handed to Rx's Throttle operator.</param>
    /// <exception cref="CommandThrottleException">The command type is already registered.</exception>
    /// <exception cref="ArgumentNullException">Bus or Store is null.</exception>
    public void Register<TCommand>(TimeSpan dueTime)
        where TCommand : Command
    {
        Type type = typeof(TCommand);

        //only register the command once
        if (observables.ContainsKey(type))
            throw new CommandThrottleException(string.Format("Failed to register throttle command of type {0}. Command type already registered.", type.Name));
        // Use the (paramName, message) overload: the single-string overload would treat the message as the parameter name.
        if (Bus == null)
            throw new ArgumentNullException("bus", string.Format("Failed to register throttle command of type {0}. Bus is null.", type.Name));
        if (Store == null)
            throw new ArgumentNullException("store", string.Format("Failed to register throttle command of type {0}. Store is null.", type.Name));

        var observable = new Subject<ThrottleCommand<TCommand>>();
        observable
            // Bus and Store are fixed per instance, so in effect each Id gets its own throttled stream.
            .GroupBy(g => new { g.Id, Bus, Store })
            .SelectMany(s =>
                s.DistinctUntilChanged()
                 .Throttle(dueTime)
            )
            .Finally(() =>
            {
                // The pipeline has ended: drop the dead subject and build a fresh one so that
                // later ThrottleCommand calls for this type keep working.
                observables.Remove(type);
                //re-subscribe
                Register<TCommand>(dueTime);
                // Logging hook (previously Elmah, disabled): "completed and re-subscribed".
            })
            .Subscribe(c =>
            {
                // Resolve the version as late as possible, at send time rather than enqueue time.
                var version = Store.LatestVersion(c.Id);
                Bus.Send(c.CreateCommand(version));
            },
            e =>
            {
                // Logging hook (previously Elmah, disabled). Errors are intentionally not rethrown;
                // the Finally block above re-registers the pipeline.
            },
            () =>
            {
                // Logging hook (previously Elmah, disabled): "completed".
            });

        observables.Add(type, observable);
    }

    /// <summary>
    /// Throttles sending of commands, deferring execution
    /// until when the command is sent on the bus
    /// </summary>
    /// <typeparam name="TCommand">Type of command</typeparam>
    /// <param name="id">enquiry id</param>
    /// <param name="deferredCommand">function that takes an integer param (version) and returns a command of type T</param>
    /// <exception cref="CommandThrottleException">No pipeline has been registered for <typeparamref name="TCommand"/>.</exception>
    public static void ThrottleCommand<TCommand>(Guid id, Func<int, TCommand> deferredCommand)
        where TCommand : Command
    {
        object registered;
        if (!observables.TryGetValue(typeof(TCommand), out registered))
            throw new CommandThrottleException(string.Format("No handler registered to throttle command of type {0}. Details: Id={1}", typeof(TCommand).Name, id));

        // Entries are keyed by typeof(TCommand), so the stored subject always has this exact type.
        var observable = (Subject<ThrottleCommand<TCommand>>)registered;
        observable.OnNext(
            new ThrottleCommand<TCommand>(id, deferredCommand)
        );
    }
}
/// <summary>
/// Immutable item pushed through the throttling pipeline: the id the command targets
/// plus a factory that builds the command once the version is known.
/// </summary>
/// <typeparam name="TCommand">Type of command the factory produces.</typeparam>
public class ThrottleCommand<TCommand>
    where TCommand : Command
{
    /// <summary>Id used for grouping and for the version lookup.</summary>
    public readonly Guid Id;

    /// <summary>Builds the command from a version number.</summary>
    public readonly Func<int, TCommand> CreateCommand;

    /// <param name="id">Id the command targets.</param>
    /// <param name="createCommand">Factory taking the version and returning the command; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="createCommand"/> is null.</exception>
    public ThrottleCommand(Guid id, Func<int, TCommand> createCommand)
    {
        // Fail at the call site instead of with a NullReferenceException later,
        // when the factory is finally invoked.
        if (createCommand == null)
            throw new ArgumentNullException("createCommand");

        this.Id = id;
        this.CreateCommand = createCommand;
    }
}
/// <summary>
/// Creates command instances by reflection, calling the public constructor whose
/// parameters are (Guid id, ...commandParams, int version).
/// </summary>
public class CommandFactory
{
    /// <summary>
    /// Creates a command using the latest version reported by <paramref name="store"/> for <paramref name="id"/>.
    /// </summary>
    /// <typeparam name="T">Command type to construct.</typeparam>
    /// <param name="id">Id passed as the first constructor argument.</param>
    /// <param name="store">Store queried for the latest version; must not be null.</param>
    /// <param name="commandParams">Arguments placed between id and version.</param>
    /// <exception cref="ArgumentNullException"><paramref name="store"/> is null.</exception>
    public T Create<T>(Guid id, IEventStore store, params object[] commandParams)
    {
        if (store == null)
            throw new ArgumentNullException("store");

        var version = store.LatestVersion(id);
        return Create<T>(id, version, commandParams);
    }

    /// <summary>
    /// Creates a command by invoking the constructor matching (Guid, runtime types of
    /// <paramref name="commandParams"/>..., int).
    /// </summary>
    /// <typeparam name="T">Command type to construct.</typeparam>
    /// <param name="id">Id passed as the first constructor argument.</param>
    /// <param name="version">Version passed as the last constructor argument.</param>
    /// <param name="commandParams">Arguments placed between id and version; null is treated as none.</param>
    /// <exception cref="ArgumentException">
    /// A command parameter is null (its type cannot be inferred), or no matching constructor exists.
    /// </exception>
    public T Create<T>(Guid id, int version, params object[] commandParams)
    {
        // An explicitly passed null array is treated the same as "no extra parameters".
        object[] extras = commandParams ?? new object[0];

        // Build the argument values and their types side by side: id first, version last.
        int total = extras.Length + 2;
        var arguments = new object[total];
        var argumentTypes = new Type[total];

        arguments[0] = id;
        argumentTypes[0] = typeof(Guid);

        for (int i = 0; i < extras.Length; i++)
        {
            // The constructor is selected from runtime types, which a null value does not have.
            if (extras[i] == null)
                throw new ArgumentException(string.Format("Command parameter at index {0} is null; its type cannot be inferred for constructor lookup.", i), "commandParams");

            arguments[i + 1] = extras[i];
            argumentTypes[i + 1] = extras[i].GetType();
        }

        arguments[total - 1] = version;
        argumentTypes[total - 1] = typeof(int);

        var ctor = typeof(T).GetConstructor(argumentTypes);
        if (ctor == null)
            throw new ArgumentException("Command Parameters do not match the Constructor");

        return (T)ctor.Invoke(arguments);
    }
}
/// <summary>
/// Exception type raised by the command throttling code, e.g. when a command type
/// is registered twice.
/// </summary>
public class CommandThrottleException : Exception
{
    /// <summary>Creates the exception with a message only.</summary>
    /// <param name="message">Description of the failure.</param>
    public CommandThrottleException(string message) : base(message) { }

    /// <summary>Creates the exception with a message and the exception that caused it.</summary>
    /// <param name="message">Description of the failure.</param>
    /// <param name="ex">Underlying cause, exposed as InnerException.</param>
    public CommandThrottleException(string message, Exception ex) : base(message, ex) { }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment