/*
  Start searches on each DB
  Start pulling values from each iterator in a "puller"
  "pullers" keep track of the latest state of an iterator, and enable pulling more data out
  While there is still new data:
    Find the puller with the latest piece of data
    Get its value
    Pull the next item in that iterator
    Yield the found value
  This will sip just the latest values from each iterator until all iterators are settled

  Use:

  for await (const item of search(dbs, someQueryHere, 32)) {
    // Will pull the 32 latest items across all queries and close the iterators once it has enough
    console.log(item)
  }
*/

/**
 * Merge the search results of several DBs into one newest-first stream.
 *
 * Each DB is queried with `db.search(query).sort('timestamp', -1)` and the
 * resulting async iterators are consumed lazily: only the iterator that
 * supplied the yielded item is advanced.
 *
 * @param {Iterable<object>} dbs - DBs exposing `search(query).sort(field, direction)`,
 *   whose result is async-iterable.
 * @param {*} query - Query passed unchanged to every `db.search`.
 * @param {number} [limit=Infinity] - Maximum number of items to yield.
 * @yields {*} The next newest item across all DBs.
 */
async function * search (dbs, query, limit = Infinity) {
  const pullers = []

  try {
    // Pullers are registered one by one so that, if a later DB throws while
    // starting its search, the ones already opened still get closed below.
    for (const db of dbs) {
      const iterator = db.search(query).sort('timestamp', -1)[Symbol.asyncIterator]()
      pullers.push(await makePuller(iterator))
    }

    // Prime every puller with its first (newest) item so they can be compared.
    await Promise.all(pullers.map((puller) => puller.pull()))

    let count = 0
    while (count < limit) {
      const latest = await getLatest()
      // Every iterator is exhausted
      if (latest === null) return
      count++
      yield latest
    }
  } finally {
    // Make sure all the iterators get closed, also when the consumer stops early
    await Promise.all(pullers.map((puller) => puller.close()))
  }

  // Take the newest value currently buffered by any puller and advance that
  // puller to its next item. Resolves to null once no puller has data left.
  async function getLatest () {
    let found = null
    let value = null

    for (const puller of pullers) {
      if (puller.done) continue
      if (isNewer(puller.value, value)) {
        value = puller.value
        found = puller
      }
    }

    if (!found) return null

    await found.pull()

    return value
  }
}

/**
 * Tell whether `time1` is strictly newer than `time2`.
 *
 * Either argument may be a raw timestamp or an object carrying a `timestamp`
 * property. NOTE(review): the `timestamp` field is assumed from the
 * `.sort('timestamp', -1)` call in `search` - confirm against the DB schema.
 *
 * @param {*} time1 - Candidate value; null/undefined is never newer.
 * @param {*} time2 - Value to beat; null/undefined is beaten by anything present.
 * @returns {boolean} True when `time1` should be picked over `time2`.
 */
function isNewer (time1, time2) {
  if (time1 == null) return false
  if (time2 == null) return true
  return timestampOf(time1) > timestampOf(time2)
}

// Extract the comparable timestamp from an item, or return the item itself
// when it is already a bare timestamp (number, string, Date).
function timestampOf (entry) {
  const hasTimestamp = typeof entry === 'object' && entry !== null && 'timestamp' in entry
  return hasTimestamp ? entry.timestamp : entry
}

/**
 * Wrap an async iterator in a "puller" that remembers the last item read.
 *
 * The puller starts empty (`value: null`, `done: false`); call `pull()` to
 * load the first item.
 *
 * @param {AsyncIterator} iterator - Iterator to wrap.
 * @returns {Promise<{value: *, done: boolean, pull: Function, close: Function}>}
 *   State object; `pull()` advances the iterator and refreshes `value`/`done`,
 *   `close()` ends the iterator early if it is not finished yet.
 */
async function makePuller (iterator) {
  const state = {
    value: null,
    done: false,
    pull,
    close
  }

  async function pull () {
    const { done, value } = await iterator.next()
    state.done = done
    state.value = value
  }

  async function close () {
    if (state.done) return
    // Mark as finished first so a repeated close() is a no-op
    state.done = true
    if (iterator.return) await iterator.return()
  }

  return state
}