Skip to content

Instantly share code, notes, and snippets.

@jasonkuhrt
Last active November 12, 2017 03:50
Show Gist options
  • Select an option

  • Save jasonkuhrt/571bc70980a03eeb88751477a812c77f to your computer and use it in GitHub Desktop.

Select an option

Save jasonkuhrt/571bc70980a03eeb88751477a812c77f to your computer and use it in GitHub Desktop.

Revisions

  1. jasonkuhrt revised this gist Nov 12, 2017. 2 changed files with 15 additions and 0 deletions.
    File renamed without changes.
    15 changes: 15 additions & 0 deletions output
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,15 @@
    tap: 0
    tap: 10
    tap: 20
    tap: 30
    tap: 40
    tap: 50
    tap: 60
    tap: 70
    tap: 80
    tap: 90
    tap: 100
    tap: 110
    tap: 120
    tap: 130
    tap: 140
  2. jasonkuhrt created this gist Nov 12, 2017.
    49 changes: 49 additions & 0 deletions untitled
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,49 @@
    function bufferUntil(stream, promise) {
    return {
    run: (sink, scheduler) => {
    let didResolve = false
    const buffer = []

    stream.source.run(
    {
    event: (t, x) => {
    if (didResolve) {
    sink.event(t, x)
    } else {
    buffer.push([t, x])
    }
    },
    error: sink.error.bind(sink),
    },
    scheduler,
    )

    promise
    .then(() => {
    setImmediate(() => {
    didResolve = true
    buffer.forEach(([t, x]) => sink.event(t, x))
    buffer.length = 0
    }, 0)
    })
    .catch(e => {
    sink.error(e)
    })
    },
    }
    }

    Most.Stream.prototype.bufferUntil = function(promise) {
    return new Most.Stream(bufferUntil(this, promise))
    }

    const promiseOfFunction = Promise.resolve(x => x * 10)

    Most.fromPromise(promiseOfFunction)
    .ap(
    Most.periodic(100)
    .scan((acc, _) => acc + 1, 0)
    .bufferUntil(promiseOfFunction),
    )
    .tap(console.log.bind(null, "tap:"))
    .drain()