using CommonDomain; using CommonDomain.Core; using Microsoft.Practices.Unity; using NEventStore; using NEventStore.Dispatcher; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace WMSEventStorePrototype { class Program { static void Main(string[] args) { try { var container = new UnityContainer(); var dispatcher = new MyDispatcher(container); var store = Wireup.Init() .UsingInMemoryPersistence() .InitializeStorageEngine() .UsingJsonSerialization() .UsingSynchronousDispatchScheduler(dispatcher) .Build(); container.RegisterInstance(store); container.RegisterInstance(new MyAdapterService()); dispatcher.RegisterHandler(); dispatcher.RegisterHandler(); using (store) { using (var stream = store.CreateStream("I can build my own unique stream id")) { var rev = stream.StreamRevision; stream.Add(new EventMessage { Body = new EventA { Version = ++rev } }); stream.CommitChanges(Guid.NewGuid()); stream.Add(new EventMessage { Body = new EventA { Version = ++rev } }); stream.CommitChanges(Guid.NewGuid()); stream.Add(new EventMessage { Body = new EventA { Version = ++rev } }); stream.CommitChanges(Guid.NewGuid()); stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); stream.CommitChanges(Guid.NewGuid()); stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); stream.CommitChanges(Guid.NewGuid()); stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); stream.CommitChanges(Guid.NewGuid()); stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); stream.CommitChanges(Guid.NewGuid()); stream.Add(new EventMessage { Body = new EventB { Version = ++rev } }); stream.CommitChanges(Guid.NewGuid()); // the commit id arbitrarily identifies the changeset and doesn't need to be tied to the aggregate directly (unless I could generate a consitent guid from my hash) } } } catch (Exception ex) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); Console.ResetColor(); } } } public class MyDispatcher : IDispatchCommits { IUnityContainer container; Dictionary> registry = new Dictionary>(); public MyDispatcher(IUnityContainer container) { this.container = container; } public void Dispatch(ICommit commit) { var store = container.Resolve(); int minRevision = int.MinValue; var snapshot = store.Advanced.GetSnapshot(commit.StreamId, int.MaxValue); if (snapshot != null) minRevision = snapshot.StreamRevision; else snapshot = new Snapshot(commit.StreamId, commit.StreamRevision, new MyMemento()); var stream = store.OpenStream(commit.StreamId, minRevision, int.MaxValue); using (var child = container.CreateChildContainer()) { child.RegisterInstance(snapshot); child.RegisterInstance(stream); foreach (var msg in commit.Events) { // TODO: keep handler around so we don't have to re-initialize on each iteration (don't worry about it for now - all our commits only have a single event) registry[msg.Body.GetType()](child, (IEvent)msg.Body); } } } public void Dispose() { } public void RegisterHandler() where H : IHandler where T : IEvent { registry.Add(typeof(T), (unity, evt) => ((IHandler)unity.Resolve(typeof(H))).Handle(evt)); } } public class MyAdapterService { public void UpdateA(int id, int value, int version) { Console.WriteLine("UpdateA - Id: {0}, Value: {1}, Version: {2}", id, value, version); } public void UpdateB(int id, int value, int version) { Console.WriteLine("UpdateB - Id: {0}, Value: {1}, Version: {2}", id, value, version); } } public interface IHandler where T : IEvent { void Handle(object message); } public interface IEvent { string MessageId { get; set; } int Version { get; set; } } public class MyMemento : IMemento { public int RecordId { get; set; } public int TotalA { get; set; } public int TotalB { get; set; } public Guid Id { get; set; } public int Version { get; set; } } public class MyAggregate : IHandler, IHandler { IEventStream stream; ISnapshot snapshot; MyAdapterService adapter; int id = 0; int totalA = 0; int totalB = 0; public MyAggregate(IEventStream stream, ISnapshot snapshot, MyAdapterService adapter) { this.stream = stream; this.snapshot = snapshot; this.adapter = adapter; if (snapshot != null) { var m = (MyMemento)snapshot.Payload; id = m.RecordId; totalA = m.TotalA; totalB = m.TotalB; } Register(applyRegistry, Apply); Register(applyRegistry, Apply); Register(updateRegistry, Update); Register(updateRegistry, Update); foreach (var msg in stream.CommittedEvents) { Apply(msg); } } Dictionary> updateRegistry = new Dictionary>(); Dictionary> applyRegistry = new Dictionary>(); private void Register(IDictionary> registry, Action action) { if (action == null) throw new ArgumentNullException("action"); registry.Add(typeof(T), evt => action((T)evt)); } public void Handle(object msg) { Action handler; if (updateRegistry.TryGetValue(msg.GetType(), out handler)) { handler(msg); } else { throw new NotSupportedException(String.Format("No update handler is registered for the type '{0}'", msg.GetType())); } } private void Update(EventA @event) { adapter.UpdateA(id, totalA, @event.Version); } private void Update(EventB @event) { adapter.UpdateB(id, totalB, @event.Version); } private void Apply(EventMessage msg) { Action handler; if (applyRegistry.TryGetValue(msg.Body.GetType(), out handler)) { handler(msg.Body); } else { throw new NotSupportedException(String.Format("No apply handler is registered for the type '{0}'", msg.Body.GetType())); } } private void Apply(EventA @event) { totalA++; } private void Apply(EventB @event) { totalB++; } } public class EventA : IEvent { public int Version { get; set; } public string MessageId { get; set; } } public class EventB : IEvent { public int Version { get; set; } public string MessageId { get; set; } } }