Forked from guidoschmidt17/example_zpure_eventsourcing_1.scala
Created
February 24, 2023 03:17
-
-
Save guizmaii/9df958c2010b4dc593b85c3c11bdeaa2 to your computer and use it in GitHub Desktop.
Revisions
-
guidoschmidt17 revised this gist
Feb 23, 2023 . 1 changed file with 15 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -409,4 +409,19 @@ def test(uuid: Uuid): LoggedProgram[Unit] = _ <- createMaster(uuid) _ <- modify.repeatN(1999) yield () // using Transition to recreate the AggregateRoot import syntax.* def readById(uuid: Uuid) = ZIO.scoped(eventstore.readFactsByAggregateRootId(uuid).flatMap(aggregate(_))) def aggregate( facts: RawValueStream )(using transition: Transition): ZIO[Any, StreamingError, ZPure[Nothing, Instance, Instance, Any, Error, Unit]] = facts .mapZIO(transport.fromRawValue(_)) .runFold(setWithError(Instance.empty))((s, value) => s.flatMap(_ => transition(value.event, value.identified.latest))) -
guidoschmidt17 revised this gist
Feb 23, 2023 . 1 changed file with 69 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -341,3 +341,72 @@ object Syntax: transport <- ZIO.service[Transport] result = Syntax(transport) yield result // example usage of domain "mastermanagement" case class Person(lastname: String, firstname: String, age: Int) given Codec[Person] = deriveCodec[Person] case class Matrix( m1: Double, m2: Double, m3: Double, m4: Double, m5: Double, m6: Double, m7: Double, m8: Double, m9: Double, m10: Double, m11: Double, m12: Double ) given Codec[Matrix] = deriveCodec[Matrix] object Matrix: final val identity = Matrix(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0) import Properties.* import Properties.given def test(uuid: Uuid): LoggedProgram[Unit] = val largebytes = util.Random.alphanumeric.take(200).toList.toString.nn val modify = for _ <- addPropertyMaster("name" -> StringElem("this is my name.")) _ <- addPropertyMaster("description" -> StringElem("this is my description.")) _ <- addPropertyVersion("creator" -> StringElem("Bob")) _ <- addPropertyRevision("size" -> IntElem(4711)) _ <- addPropertyIteration("pi" -> DoubleElem(3.14159)) _ <- addPropertyIteration("somebytes" -> ByteArrayElem("blablabla".getBytes.nn)) _ <- addPropertyIteration("largebytes" -> StringElem(largebytes)) _ <- addPropertyIteration("happy" -> BooleanElem(true)) _ <- addPropertyIteration("noidea" -> NullElem) _ <- addPropertyIteration("dontcare" -> UndefinedElem) _ <- addPropertyIteration("file" -> AnyElem(File(Uuid.randomUuid, 100, 10, "test.txt"))) _ <- addPropertyIteration("person" -> AnyElem(Person("Smith", "Joe", 37))) _ <- addPropertyIteration("matrix" -> AnyElem(Matrix.identity)) _ <- newIteration _ <- newVersion _ <- newRevision _ <- newIteration _ <- newVersion _ <- newIteration _ <- removePropertyIteration("pi") _ <- addPropertyIteration("e" -> DoubleElem(2.71828)) _ <- removePropertyMaster("name") _ <- deleteIteration _ <- selectIndex((2, 1, 1)) _ <- removePropertyIteration("name") _ <- removePropertyIteration("name") _ <- newIteration _ <- addPropertyIteration("name" -> StringElem("this is my new name.")) _ <- newVersion _ <- selectIndex((1, 1, 2)) _ <- selectLatest yield () for _ <- createMaster(uuid) _ <- modify.repeatN(1999) yield () -
guidoschmidt17 created this gist
Feb 23, 2023 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,343 @@ package cqrs package eventstore import zio.* import zio.prelude.* import zio.prelude.fx.* object BaseSyntax: type Program[S, R, E, A] = ZPure[Nothing, S, S, R, E, A] type LoggedProgram[W, S, R, E, A] = ZPure[W, S, S, R, E, A] def pure[S, R, A](a: A): Program[S, R, Nothing, A] = ZPure.succeed(a) def unit[S, R]: Program[S, R, Nothing, Unit] = ZPure.unit def raiseError[S, R, E](t: => E): Program[S, R, E, Nothing] = ZPure.fail(t) def assertThat[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] = if cond then unit else raiseError(t) def assertThatNot[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] = assertThat(!cond, t) def extractOption[S, R, E, A](a: Option[A], t: => E): Program[S, R, E, A] = a match case Some(value) => pure(value) case None => raiseError(t) def get[S, R]: Program[S, R, Nothing, S] = ZPure.get[S] def set[S, R](s: S): Program[S, R, Nothing, Unit] = EState.set(s) def setWithError[S, R, E](s: S): Program[S, R, E, Unit] = EState.set(s) def update[S, R](f: S => S): Program[S, R, Nothing, Unit] = EState.update(f.apply) def inspect[S, R, A](f: S => A): Program[S, R, Nothing, A] = get.map(f(_)) def inquire[S, R: Tag, A](f: R => A): Program[S, R, Nothing, A] = ZPure.service[S, R].map(f(_)) def log[W, S, R, E](w: W): LoggedProgram[W, S, R, E, Unit] = ZPure.log(w) def set[W, S, R](previouslog: Chunk[W]): LoggedProgram[W, S, R, Nothing, Unit] = ZPure.forEach(previouslog)(log(_)).unit def restore[W, S, R](previouslog: Chunk[W], previousstate: S): LoggedProgram[W, S, R, Nothing, Unit] = set(previouslog) *> set(previousstate) package cqrs package domain package mastermanagement import zio.* import eventstore.* import Error.* import Error.ValidationError.* import Event.* import Fact.* import Transport.* final class Syntax private (transport: Transport): type Program[A] = BaseSyntax.Program[Instance, Any, Error, A] type LoggedProgram[A] = BaseSyntax.LoggedProgram[Value, Instance, Any, Error, A] val get = BaseSyntax.get[Instance, Any] val set = BaseSyntax.set[Instance, Any] val update = BaseSyntax.update[Instance, Any] def setWithError = BaseSyntax.setWithError[Instance, Any, Error] def inspect[A] = BaseSyntax.inspect[Instance, Any, A] import BaseSyntax.* def lift(event: Event)(using transition: Transition): LoggedProgram[Unit] = for previous <- inspect(_.master.latest) _ <- transition(event, previous + 1) m <- inspect(_.master) _ <- log(Value(m, event)) yield () private def snapshot(raws: RawValues, instance: Instance): ZIO[Any, Any, (Values, Instance)] = val withsnapshot = for _ <- set(instance) weight = raws.foldLeft(0)((s, r) => if r.eventCategory == Category.Created && r.aggregateLatest > 1 then s else s + r.eventData.length + MinSnapshotEventSize ) _ = println(s"weight $weight $DefaultSnapshotThreshold") _ <- if weight > DefaultSnapshotThreshold then for m <- inspect(_.master) currentindex <- inspect(_.index) _ <- lift(SnapshotTaken(m.snapshot(currentindex))) yield () else unit yield () withsnapshot.runAll(instance) match case (snapshot, Right(instance, _)) => ZIO.succeed(snapshot, instance) case (_, Left(cause)) => ZIO.fail(cause) extension (program: LoggedProgram[Unit]) def runFactsEither: (Values, Either[zio.Cause[Error], Instance]) = program.runAll(Instance.empty) match case (facts, Right(instance, _)) => (facts, Right(instance)) case (_, Left(cause)) => (Chunk.empty, Left(cause.toCause)) def runFacts(tags: Tags): ZIO[Any, Any, (RawValues, Instance)] = runFactsEither match case (facts, Right(instance)) => for raws <- facts.mapZIO(transport.toRawValue(_, tags)) (s, instance) <- snapshot(raws, instance) withsnapshot <- if s.size == 1 then transport.toRawValue(s(0), tags).flatMap(s => ZIO.succeed(raws :+ s)) else ZIO.succeed(raws) yield (withsnapshot, instance) case (_, Left(cause)) => ZIO.fail(cause) final given instanceTransition: Transition = new Transition final class Transition: def apply(event: Event, latest: Int): Program[Unit] = import Entity.EntityHolder for previous <- inspect(_.master.latest) _ <- event match case SnapshotTaken(_) => unit case Created(_) | _ => assertThat(latest - previous == 1, EventNotInSequence(s"expected ${previous + 1}, but found ${latest}}, event ${event}")) _ <- event match case Nested(Nested(Nested(nested))) => update(i => val iter = nested match case Deleted => i.iteration case Frozen(index: Int) => i.revision.entities(index - 1).freeze case AddedProperty(property) => i.iteration.add(property) case RemovedProperty(property) => i.iteration.remove(property) case NewEntity(ie: EntityHolder) => i.iteration.copy(index = ie.index, workable = ie.workable) val (r, iternew) = nested match case NewEntity(_) => (i.revision.add(iter), iter) case Deleted => val r = i.revision.remove(iter) (r, r.entities.last.unfreeze) case _ => (i.revision.update(iter), iter) val v = i.version.update(r) val m = i.master.update(v) i.copy(m, v, r, iternew) ) case Nested(Nested(nested)) => update(i => val r = nested match case Deleted => i.revision case Frozen(index: Int) => i.version.entities(index - 1).freeze case AddedProperty(property) => i.revision.add(property) case RemovedProperty(property) => i.revision.remove(property) case NewEntity(re: EntityHolder) => val ie = re.entitystack.last val iter = i.iteration.copy(index = ie.index, workable = ie.workable) i.revision.copy(index = re.index, workable = re.workable, entities = Vector(iter)) val (v, rnew) = nested match case NewEntity(_) => (i.version.add(r), r) case Deleted => val v = i.version.remove(r) (v, v.entities.last.unfreeze) case _ => (i.version.update(r), r) val m = i.master.update(v) nested match case NewEntity(_) => val iter = r.entities.last i.copy(m, v, r, iter) case _ => i.copy(m, v, rnew) ) case Nested(nested) => update(i => val v = nested match case Deleted => i.version case Frozen(index: Int) => i.master.entities(index - 1).freeze case AddedProperty(property) => i.version.add(property) case RemovedProperty(property) => i.version.remove(property) case NewEntity(ve: EntityHolder) => val ie = ve.entitystack.head val re = ve.entitystack.last val iter = i.iteration.copy(index = ie.index, workable = ie.workable) val r = i.revision.copy(index = re.index, workable = re.workable, entities = Vector(iter)) i.version.copy(index = ve.index, workable = ve.workable, entities = Vector(r)) val (m, vnew) = nested match case NewEntity(_) => (i.master.add(v), v) case Deleted => val m = i.master.remove(v) (m, m.entities.last.unfreeze) case _ => (i.master.update(v), v) nested match case NewEntity(_) => val r = v.entities.last val iter = r.entities.last i.copy(m, v, r, iter) case _ => i.copy(m, vnew) ) case Created(m: Master) => update(i => val (iv, ir, iiter) = m.currentindex val v = m.entities(iv - 1) val r = v.entities(ir - 1) val iter = r.entities(iiter - 1) i.copy(m, v, r, iter) ) case SnapshotTaken(m: Master) => update(i => val (iv, ir, iiter) = m.currentindex val v = m.entities(iv - 1) val r = v.entities(ir - 1) val iter = r.entities(iiter - 1) i.copy(m, v, r, iter) ) case Unfrozen((iv, ir, iiter)) => update(i => val m = i.master.unfreeze val v = m.entities(iv - 1).unfreeze val r = v.entities(ir - 1).unfreeze val iter = r.entities(iiter - 1).unfreeze i.copy(m, v, r, iter) ) case Deleted => update(i => i.copy(i.master.delete, Version.empty, Revision.empty, Iteration.empty)) case AddedProperty(property) => update(i => i.copy(i.master.add(property))) case RemovedProperty(property) => update(i => i.copy(i.master.remove(property))) case _ => raiseError(UnhandledEvent(event.toString)) yield () def deleteMaster = change(_.master, Deleted) def deleteVersion = delete(_.master) def deleteRevision = delete(_.version) def deleteIteration = delete(_.revision) def addPropertyMaster(property: Property) = change(_.master, AddedProperty(property)) def addPropertyVersion(property: Property) = change(_.version, AddedProperty(property)) def addPropertyRevision(property: Property) = change(_.revision, AddedProperty(property)) def addPropertyIteration(property: Property) = change(_.iteration, AddedProperty(property)) def removePropertyMaster(property: String) = change(_.master, RemovedProperty(property)) def removePropertyVersion(property: String) = change(_.version, RemovedProperty(property)) def removePropertyRevision(property: String) = change(_.revision, RemovedProperty(property)) def removePropertyIteration(property: String) = change(_.iteration, RemovedProperty(property)) def createMaster(uuid: Uuid) = val iter = Iteration(Properties.empty, true, 1) val r = Revision(Properties.empty, true, 1, Vector(iter)) val v = Version(Properties.empty, true, 1, Vector(r)) val m = Master(uuid, 1, Properties.empty, false, true, Vector(v), (1, 1, 1)) lift(Created(m)) def newVersion = for i <- get iter = Entity(index = 1) r = Entity(index = 1) v = Entity(index = i.master.nextIndex, Vector(iter, r)) _ <- change(_.iteration, Frozen(i.iteration)) _ <- change(_.revision, Frozen(i.revision)) _ <- change(_.version, Frozen(i.version)) _ <- lift(Nested(NewEntity(v))) yield () def newRevision = for i <- get iter = Entity(index = 1) r = Entity(index = i.version.nextIndex, Vector(iter)) _ <- change(_.iteration, Frozen(i.iteration)) _ <- change(_.revision, Frozen(i.revision)) _ <- lift(Nested(Nested(NewEntity(r)))) yield () def newIteration = for i <- get iter = Entity(index = i.revision.nextIndex) _ <- change(_.iteration, Frozen(i.iteration)) _ <- lift(Nested(Nested(Nested(NewEntity(iter))))) yield () def selectIndex(index: (Int, Int, Int)) = def check[A <: Entity[A]](i: Int, entities: Vector[A], m: Master): Program[A] = for _ <- assertThat( 0 < i && i <= entities.size, InvalidInput(s"index out of range $index, $i not in [1, ${entities.size}], aggregateroot: $m") ) yield entities(i - 1) val (iv, ir, ii) = index for m <- inspect(_.master) v <- check(iv, m.entities, m) r <- check(ir, v.entities, m) iter <- check(ii, r.entities, m) _ <- lift(Unfrozen(index)) yield () def selectLatest = for m <- inspect(_.master) v = m.entities.last r = v.entities.last iter = r.entities.last _ <- selectIndex((v.index, r.index, iter.index)) yield () private def delete(f: Instance => Aggregate[?, ?]) = for a <- inspect(f) _ <- assertThat(a.workable, ValidationError.NotWorkable(s"Cannot delete entity in $a")) i <- get _ <- assertThat(a.entities.size > 1, ValidationError.InvalidState(s"Cannot delete last entity ${i.index} for $a")) _ <- lift(a match case _: Revision => Nested(Nested(Nested(Deleted))) case _: Version => Nested(Nested(Deleted)) case _: Master => Nested(Deleted) ) yield () private def change(f: Instance => Workable[?], event: Event) = for w <- inspect(f) _ <- assertThat(w.workable, ValidationError.NotWorkable(s"$w event $event")) _ <- lift(w match case _: Iteration => Nested(Nested(Nested(event))) case _: Revision => Nested(Nested(event)) case _: Version => Nested(event) case _: Master => event ) yield () end Syntax object Syntax: val layer = ZLayer.fromZIO(makeLayer) private def makeLayer = for transport <- ZIO.service[Transport] result = Syntax(transport) yield result