Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save guizmaii/9df958c2010b4dc593b85c3c11bdeaa2 to your computer and use it in GitHub Desktop.

Select an option

Save guizmaii/9df958c2010b4dc593b85c3c11bdeaa2 to your computer and use it in GitHub Desktop.

Revisions

  1. @guidoschmidt17 guidoschmidt17 revised this gist Feb 23, 2023. 1 changed file with 15 additions and 0 deletions.
    15 changes: 15 additions & 0 deletions example_zpure_eventsourcing_1.scala
    Original file line number Diff line number Diff line change
    @@ -409,4 +409,19 @@ def test(uuid: Uuid): LoggedProgram[Unit] =
    _ <- createMaster(uuid)
    _ <- modify.repeatN(1999)
    yield ()

    // using Transition to recreate the AggregateRoot

    import syntax.*

    def readById(uuid: Uuid) =
    ZIO.scoped(eventstore.readFactsByAggregateRootId(uuid).flatMap(aggregate(_)))

    def aggregate(
    facts: RawValueStream
    )(using transition: Transition): ZIO[Any, StreamingError, ZPure[Nothing, Instance, Instance, Any, Error, Unit]] =
    facts
    .mapZIO(transport.fromRawValue(_))
    .runFold(setWithError(Instance.empty))((s, value) => s.flatMap(_ => transition(value.event, value.identified.latest)))


  2. @guidoschmidt17 guidoschmidt17 revised this gist Feb 23, 2023. 1 changed file with 69 additions and 0 deletions.
    69 changes: 69 additions & 0 deletions example_zpure_eventsourcing_1.scala
    Original file line number Diff line number Diff line change
    @@ -341,3 +341,72 @@ object Syntax:
    transport <- ZIO.service[Transport]
    result = Syntax(transport)
    yield result

    // example usage of domain "mastermanagement"

    case class Person(lastname: String, firstname: String, age: Int)
    given Codec[Person] = deriveCodec[Person]

    case class Matrix(
    m1: Double,
    m2: Double,
    m3: Double,
    m4: Double,
    m5: Double,
    m6: Double,
    m7: Double,
    m8: Double,
    m9: Double,
    m10: Double,
    m11: Double,
    m12: Double
    )
    given Codec[Matrix] = deriveCodec[Matrix]

    object Matrix:
    final val identity = Matrix(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0)

    import Properties.*
    import Properties.given

    def test(uuid: Uuid): LoggedProgram[Unit] =
    val largebytes = util.Random.alphanumeric.take(200).toList.toString.nn
    val modify =
    for
    _ <- addPropertyMaster("name" -> StringElem("this is my name."))
    _ <- addPropertyMaster("description" -> StringElem("this is my description."))
    _ <- addPropertyVersion("creator" -> StringElem("Bob"))
    _ <- addPropertyRevision("size" -> IntElem(4711))
    _ <- addPropertyIteration("pi" -> DoubleElem(3.14159))
    _ <- addPropertyIteration("somebytes" -> ByteArrayElem("blablabla".getBytes.nn))
    _ <- addPropertyIteration("largebytes" -> StringElem(largebytes))
    _ <- addPropertyIteration("happy" -> BooleanElem(true))
    _ <- addPropertyIteration("noidea" -> NullElem)
    _ <- addPropertyIteration("dontcare" -> UndefinedElem)
    _ <- addPropertyIteration("file" -> AnyElem(File(Uuid.randomUuid, 100, 10, "test.txt")))
    _ <- addPropertyIteration("person" -> AnyElem(Person("Smith", "Joe", 37)))
    _ <- addPropertyIteration("matrix" -> AnyElem(Matrix.identity))
    _ <- newIteration
    _ <- newVersion
    _ <- newRevision
    _ <- newIteration
    _ <- newVersion
    _ <- newIteration
    _ <- removePropertyIteration("pi")
    _ <- addPropertyIteration("e" -> DoubleElem(2.71828))
    _ <- removePropertyMaster("name")
    _ <- deleteIteration
    _ <- selectIndex((2, 1, 1))
    _ <- removePropertyIteration("name")
    _ <- removePropertyIteration("name")
    _ <- newIteration
    _ <- addPropertyIteration("name" -> StringElem("this is my new name."))
    _ <- newVersion
    _ <- selectIndex((1, 1, 2))
    _ <- selectLatest
    yield ()
    for
    _ <- createMaster(uuid)
    _ <- modify.repeatN(1999)
    yield ()

  3. @guidoschmidt17 guidoschmidt17 created this gist Feb 23, 2023.
    343 changes: 343 additions & 0 deletions example_zpure_eventsourcing_1.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,343 @@
    package cqrs
    package eventstore

    import zio.*
    import zio.prelude.*
    import zio.prelude.fx.*

    object BaseSyntax:

    type Program[S, R, E, A] = ZPure[Nothing, S, S, R, E, A]

    type LoggedProgram[W, S, R, E, A] = ZPure[W, S, S, R, E, A]

    def pure[S, R, A](a: A): Program[S, R, Nothing, A] =
    ZPure.succeed(a)

    def unit[S, R]: Program[S, R, Nothing, Unit] =
    ZPure.unit

    def raiseError[S, R, E](t: => E): Program[S, R, E, Nothing] =
    ZPure.fail(t)

    def assertThat[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    if cond then unit else raiseError(t)

    def assertThatNot[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    assertThat(!cond, t)

    def extractOption[S, R, E, A](a: Option[A], t: => E): Program[S, R, E, A] =
    a match
    case Some(value) => pure(value)
    case None => raiseError(t)

    def get[S, R]: Program[S, R, Nothing, S] =
    ZPure.get[S]

    def set[S, R](s: S): Program[S, R, Nothing, Unit] =
    EState.set(s)

    def setWithError[S, R, E](s: S): Program[S, R, E, Unit] =
    EState.set(s)

    def update[S, R](f: S => S): Program[S, R, Nothing, Unit] =
    EState.update(f.apply)

    def inspect[S, R, A](f: S => A): Program[S, R, Nothing, A] =
    get.map(f(_))

    def inquire[S, R: Tag, A](f: R => A): Program[S, R, Nothing, A] =
    ZPure.service[S, R].map(f(_))

    def log[W, S, R, E](w: W): LoggedProgram[W, S, R, E, Unit] =
    ZPure.log(w)

    def set[W, S, R](previouslog: Chunk[W]): LoggedProgram[W, S, R, Nothing, Unit] =
    ZPure.forEach(previouslog)(log(_)).unit

    def restore[W, S, R](previouslog: Chunk[W], previousstate: S): LoggedProgram[W, S, R, Nothing, Unit] =
    set(previouslog) *> set(previousstate)


    package cqrs
    package domain
    package mastermanagement

    import zio.*

    import eventstore.*
    import Error.*
    import Error.ValidationError.*
    import Event.*
    import Fact.*
    import Transport.*

    final class Syntax private (transport: Transport):

    type Program[A] = BaseSyntax.Program[Instance, Any, Error, A]
    type LoggedProgram[A] = BaseSyntax.LoggedProgram[Value, Instance, Any, Error, A]

    val get = BaseSyntax.get[Instance, Any]
    val set = BaseSyntax.set[Instance, Any]
    val update = BaseSyntax.update[Instance, Any]
    def setWithError = BaseSyntax.setWithError[Instance, Any, Error]
    def inspect[A] = BaseSyntax.inspect[Instance, Any, A]

    import BaseSyntax.*

    def lift(event: Event)(using transition: Transition): LoggedProgram[Unit] =
    for
    previous <- inspect(_.master.latest)
    _ <- transition(event, previous + 1)
    m <- inspect(_.master)
    _ <- log(Value(m, event))
    yield ()

    private def snapshot(raws: RawValues, instance: Instance): ZIO[Any, Any, (Values, Instance)] =
    val withsnapshot = for
    _ <- set(instance)
    weight = raws.foldLeft(0)((s, r) =>
    if r.eventCategory == Category.Created && r.aggregateLatest > 1 then s else s + r.eventData.length + MinSnapshotEventSize
    )
    _ = println(s"weight $weight $DefaultSnapshotThreshold")
    _ <-
    if weight > DefaultSnapshotThreshold then
    for
    m <- inspect(_.master)
    currentindex <- inspect(_.index)
    _ <- lift(SnapshotTaken(m.snapshot(currentindex)))
    yield ()
    else unit
    yield ()
    withsnapshot.runAll(instance) match
    case (snapshot, Right(instance, _)) => ZIO.succeed(snapshot, instance)
    case (_, Left(cause)) => ZIO.fail(cause)

    extension (program: LoggedProgram[Unit])
    def runFactsEither: (Values, Either[zio.Cause[Error], Instance]) =
    program.runAll(Instance.empty) match
    case (facts, Right(instance, _)) => (facts, Right(instance))
    case (_, Left(cause)) => (Chunk.empty, Left(cause.toCause))
    def runFacts(tags: Tags): ZIO[Any, Any, (RawValues, Instance)] =
    runFactsEither match
    case (facts, Right(instance)) =>
    for
    raws <- facts.mapZIO(transport.toRawValue(_, tags))
    (s, instance) <- snapshot(raws, instance)
    withsnapshot <- if s.size == 1 then transport.toRawValue(s(0), tags).flatMap(s => ZIO.succeed(raws :+ s)) else ZIO.succeed(raws)
    yield (withsnapshot, instance)
    case (_, Left(cause)) => ZIO.fail(cause)

    final given instanceTransition: Transition = new Transition

    final class Transition:
    def apply(event: Event, latest: Int): Program[Unit] =
    import Entity.EntityHolder
    for
    previous <- inspect(_.master.latest)
    _ <- event match
    case SnapshotTaken(_) => unit
    case Created(_) | _ =>
    assertThat(latest - previous == 1, EventNotInSequence(s"expected ${previous + 1}, but found ${latest}}, event ${event}"))
    _ <- event match
    case Nested(Nested(Nested(nested))) =>
    update(i =>
    val iter = nested match
    case Deleted => i.iteration
    case Frozen(index: Int) => i.revision.entities(index - 1).freeze
    case AddedProperty(property) => i.iteration.add(property)
    case RemovedProperty(property) => i.iteration.remove(property)
    case NewEntity(ie: EntityHolder) =>
    i.iteration.copy(index = ie.index, workable = ie.workable)
    val (r, iternew) = nested match
    case NewEntity(_) => (i.revision.add(iter), iter)
    case Deleted =>
    val r = i.revision.remove(iter)
    (r, r.entities.last.unfreeze)
    case _ => (i.revision.update(iter), iter)
    val v = i.version.update(r)
    val m = i.master.update(v)
    i.copy(m, v, r, iternew)
    )
    case Nested(Nested(nested)) =>
    update(i =>
    val r = nested match
    case Deleted => i.revision
    case Frozen(index: Int) => i.version.entities(index - 1).freeze
    case AddedProperty(property) => i.revision.add(property)
    case RemovedProperty(property) => i.revision.remove(property)
    case NewEntity(re: EntityHolder) =>
    val ie = re.entitystack.last
    val iter = i.iteration.copy(index = ie.index, workable = ie.workable)
    i.revision.copy(index = re.index, workable = re.workable, entities = Vector(iter))
    val (v, rnew) = nested match
    case NewEntity(_) => (i.version.add(r), r)
    case Deleted =>
    val v = i.version.remove(r)
    (v, v.entities.last.unfreeze)
    case _ => (i.version.update(r), r)
    val m = i.master.update(v)
    nested match
    case NewEntity(_) =>
    val iter = r.entities.last
    i.copy(m, v, r, iter)
    case _ => i.copy(m, v, rnew)
    )
    case Nested(nested) =>
    update(i =>
    val v = nested match
    case Deleted => i.version
    case Frozen(index: Int) => i.master.entities(index - 1).freeze
    case AddedProperty(property) => i.version.add(property)
    case RemovedProperty(property) => i.version.remove(property)
    case NewEntity(ve: EntityHolder) =>
    val ie = ve.entitystack.head
    val re = ve.entitystack.last
    val iter = i.iteration.copy(index = ie.index, workable = ie.workable)
    val r =
    i.revision.copy(index = re.index, workable = re.workable, entities = Vector(iter))
    i.version.copy(index = ve.index, workable = ve.workable, entities = Vector(r))
    val (m, vnew) = nested match
    case NewEntity(_) => (i.master.add(v), v)
    case Deleted =>
    val m = i.master.remove(v)
    (m, m.entities.last.unfreeze)
    case _ => (i.master.update(v), v)
    nested match
    case NewEntity(_) =>
    val r = v.entities.last
    val iter = r.entities.last
    i.copy(m, v, r, iter)
    case _ => i.copy(m, vnew)
    )
    case Created(m: Master) =>
    update(i =>
    val (iv, ir, iiter) = m.currentindex
    val v = m.entities(iv - 1)
    val r = v.entities(ir - 1)
    val iter = r.entities(iiter - 1)
    i.copy(m, v, r, iter)
    )
    case SnapshotTaken(m: Master) =>
    update(i =>
    val (iv, ir, iiter) = m.currentindex
    val v = m.entities(iv - 1)
    val r = v.entities(ir - 1)
    val iter = r.entities(iiter - 1)
    i.copy(m, v, r, iter)
    )
    case Unfrozen((iv, ir, iiter)) =>
    update(i =>
    val m = i.master.unfreeze
    val v = m.entities(iv - 1).unfreeze
    val r = v.entities(ir - 1).unfreeze
    val iter = r.entities(iiter - 1).unfreeze
    i.copy(m, v, r, iter)
    )
    case Deleted => update(i => i.copy(i.master.delete, Version.empty, Revision.empty, Iteration.empty))
    case AddedProperty(property) => update(i => i.copy(i.master.add(property)))
    case RemovedProperty(property) => update(i => i.copy(i.master.remove(property)))
    case _ => raiseError(UnhandledEvent(event.toString))
    yield ()

    def deleteMaster = change(_.master, Deleted)
    def deleteVersion = delete(_.master)
    def deleteRevision = delete(_.version)
    def deleteIteration = delete(_.revision)
    def addPropertyMaster(property: Property) = change(_.master, AddedProperty(property))
    def addPropertyVersion(property: Property) = change(_.version, AddedProperty(property))
    def addPropertyRevision(property: Property) = change(_.revision, AddedProperty(property))
    def addPropertyIteration(property: Property) = change(_.iteration, AddedProperty(property))
    def removePropertyMaster(property: String) = change(_.master, RemovedProperty(property))
    def removePropertyVersion(property: String) = change(_.version, RemovedProperty(property))
    def removePropertyRevision(property: String) = change(_.revision, RemovedProperty(property))
    def removePropertyIteration(property: String) = change(_.iteration, RemovedProperty(property))
    def createMaster(uuid: Uuid) =
    val iter = Iteration(Properties.empty, true, 1)
    val r = Revision(Properties.empty, true, 1, Vector(iter))
    val v = Version(Properties.empty, true, 1, Vector(r))
    val m = Master(uuid, 1, Properties.empty, false, true, Vector(v), (1, 1, 1))
    lift(Created(m))
    def newVersion =
    for
    i <- get
    iter = Entity(index = 1)
    r = Entity(index = 1)
    v = Entity(index = i.master.nextIndex, Vector(iter, r))
    _ <- change(_.iteration, Frozen(i.iteration))
    _ <- change(_.revision, Frozen(i.revision))
    _ <- change(_.version, Frozen(i.version))
    _ <- lift(Nested(NewEntity(v)))
    yield ()
    def newRevision =
    for
    i <- get
    iter = Entity(index = 1)
    r = Entity(index = i.version.nextIndex, Vector(iter))
    _ <- change(_.iteration, Frozen(i.iteration))
    _ <- change(_.revision, Frozen(i.revision))
    _ <- lift(Nested(Nested(NewEntity(r))))
    yield ()
    def newIteration =
    for
    i <- get
    iter = Entity(index = i.revision.nextIndex)
    _ <- change(_.iteration, Frozen(i.iteration))
    _ <- lift(Nested(Nested(Nested(NewEntity(iter)))))
    yield ()
    def selectIndex(index: (Int, Int, Int)) =
    def check[A <: Entity[A]](i: Int, entities: Vector[A], m: Master): Program[A] =
    for _ <- assertThat(
    0 < i && i <= entities.size,
    InvalidInput(s"index out of range $index, $i not in [1, ${entities.size}], aggregateroot: $m")
    )
    yield entities(i - 1)
    val (iv, ir, ii) = index
    for
    m <- inspect(_.master)
    v <- check(iv, m.entities, m)
    r <- check(ir, v.entities, m)
    iter <- check(ii, r.entities, m)
    _ <- lift(Unfrozen(index))
    yield ()
    def selectLatest =
    for
    m <- inspect(_.master)
    v = m.entities.last
    r = v.entities.last
    iter = r.entities.last
    _ <- selectIndex((v.index, r.index, iter.index))
    yield ()
    private def delete(f: Instance => Aggregate[?, ?]) =
    for
    a <- inspect(f)
    _ <- assertThat(a.workable, ValidationError.NotWorkable(s"Cannot delete entity in $a"))
    i <- get
    _ <- assertThat(a.entities.size > 1, ValidationError.InvalidState(s"Cannot delete last entity ${i.index} for $a"))
    _ <- lift(a match
    case _: Revision => Nested(Nested(Nested(Deleted)))
    case _: Version => Nested(Nested(Deleted))
    case _: Master => Nested(Deleted)
    )
    yield ()
    private def change(f: Instance => Workable[?], event: Event) =
    for
    w <- inspect(f)
    _ <- assertThat(w.workable, ValidationError.NotWorkable(s"$w event $event"))
    _ <- lift(w match
    case _: Iteration => Nested(Nested(Nested(event)))
    case _: Revision => Nested(Nested(event))
    case _: Version => Nested(event)
    case _: Master => event
    )
    yield ()

    end Syntax

    object Syntax:
    val layer = ZLayer.fromZIO(makeLayer)

    private def makeLayer = for
    transport <- ZIO.service[Transport]
    result = Syntax(transport)
    yield result