const {Readable, Transform} = require('stream')

// SplitArrayStream can be used two ways:
//
// - An intermediary stream that receives arrays and emits their items
// - A source stream that takes a function as an argument; the function is
//   expected to return an array (or a promise for one) each time it is called
//
// In both cases, a new array is only requested once the destination stream
// is ready for more data.
class SplitArrayStream extends Transform {
  constructor(getArrayFn) {
    super({objectMode: true})

    if (typeof getArrayFn === 'function') {
      this.getArrayFn = getArrayFn
      // Items from the last array that have not been pushed yet
      this.pendingItems = []
      // Replace Transform's own _read() so this instance acts as a source
      this._read = this._readFromFn.bind(this)
    }
  }

  async _readFromFn() {
    try {
      // Only fetch a new array once every item of the previous one has been
      // pushed. Keep fetching if the function returns an empty array, because
      // _read() must eventually push something or the stream stalls.
      while (this.pendingItems.length === 0) {
        this.pendingItems = await this.getArrayFn()
      }
    } catch (err) {
      this.destroy(err)
      return
    }

    // Push until the consumer's buffer is full. Whatever is left stays in
    // pendingItems and is pushed on the next _read() call instead of being
    // dropped.
    let consumerStreamReady = true
    while (consumerStreamReady && this.pendingItems.length > 0) {
      consumerStreamReady = this.push(this.pendingItems.shift())
    }
  }

  _transform(array, enc, next) {
    // Push every item of the incoming array, then ask for the next one.
    // Transform applies backpressure per chunk: while the readable buffer is
    // full, it holds back the next array until the consumer reads.
    for (const item of array) {
      this.push(item)
    }
    next()
  }
}

// For testing purposes, only returns 5 pages of results
// (2 buckets per page, 10 buckets total)
let numPagesReturned = 0

async function makeGoogleApiRequest() {
  const simulatedApiResponse = {
    kind: 'storage#listBuckets',
    items: [
      {
        kind: 'storage#bucket',
        id: `bucket-a-from-page-${numPagesReturned + 1}`,
      },
      {
        kind: 'storage#bucket',
        id: `bucket-b-from-page-${numPagesReturned + 1}`,
      },
    ],
    nextPageToken: '...',
  }

  // The fifth page (numPagesReturned === 4) is the last one
  if (numPagesReturned > 3) {
    delete simulatedApiResponse.nextPageToken
  }

  numPagesReturned++
  return simulatedApiResponse
}

// Simulated "storage.getBuckets()" method
async function getBuckets(options) {
  const reqOpts = {
    method: 'GET',
    uri: 'https://www.googleapis.com/storage/v1/buckets/list',
  }

  if (options.pageToken) {
    reqOpts.qs = {pageToken: options.pageToken}
  }

  const apiResponse = await makeGoogleApiRequest(reqOpts)

  const simulatedBucketObjects = apiResponse.items.map(bucket => {
    return {
      name: bucket.id,
      metadata: bucket,
    }
  })

  return [simulatedBucketObjects, apiResponse]
}

function getBucketsStreamOptionA() {
  let nextPageToken

  // Emits one array of buckets per page
  const readStream = new Readable({
    objectMode: true,
    read: async function() {
      const options = {autoPaginate: false}
      if (nextPageToken) {
        options.pageToken = nextPageToken
      }

      try {
        const [buckets, apiResponse] = await getBuckets(options)
        nextPageToken = apiResponse.nextPageToken
        this.push(buckets)
        if (!nextPageToken) {
          this.push(null)
        }
      } catch (err) {
        this.destroy(err)
      }
    },
  })

  // Split each page array into individual bucket objects
  return readStream.pipe(new SplitArrayStream())
}

function getBucketsStreamOptionB() {
  let nextPageToken

  // Called by SplitArrayStream each time it needs another page
  const getBucketsAsStream = async () => {
    const options = {autoPaginate: false}
    if (nextPageToken) {
      options.pageToken = nextPageToken
    }

    const [buckets, apiResponse] = await getBuckets(options)
    nextPageToken = apiResponse.nextPageToken
    if (!nextPageToken) {
      // A trailing null makes SplitArrayStream end the stream
      buckets.push(null)
    }
    return buckets
  }

  return new SplitArrayStream(getBucketsAsStream)
}

// Both options share the simulated API's page counter, so run them one after
// the other and reset the counter in between.
getBucketsStreamOptionA()
  .on('data', bucket => {
    console.log('Bucket name:', bucket.name)
  })
  .on('end', () => {
    numPagesReturned = 0
    getBucketsStreamOptionB()
      .on('data', bucket => {
        console.log('Bucket name:', bucket.name)
      })
  })
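
// Option C (a sketch, not part of the original comparison): on Node.js 12.3+,
// stream.Readable.from() turns an async generator into an object-mode stream.
// The stream only advances the generator while its buffer is below its
// highWaterMark, so pages are fetched on demand much like options A and B,
// without a custom stream class. pagesToObjectStream() and its
// fetchPage(pageToken) -> [items, nextPageToken] contract are hypothetical
// names chosen for this example.
const stream = require('stream')

function pagesToObjectStream(fetchPage) {
  async function* generateItems() {
    let pageToken
    do {
      const [items, nextPageToken] = await fetchPage(pageToken)
      // yield* hands the page's items to the stream one at a time
      yield* items
      pageToken = nextPageToken
    } while (pageToken)
  }
  // Readable.from() defaults to objectMode: true
  return stream.Readable.from(generateItems())
}

// Possible usage with the simulated getBuckets() above:
//
// pagesToObjectStream(async pageToken => {
//   const options = {autoPaginate: false}
//   if (pageToken) options.pageToken = pageToken
//   const [buckets, apiResponse] = await getBuckets(options)
//   return [buckets, apiResponse.nextPageToken]
// }).on('data', bucket => console.log('Bucket name:', bucket.name))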