Skip to content

Instantly share code, notes, and snippets.

@CarsonMcKinstry
Last active October 7, 2019 17:53
Show Gist options
  • Select an option

  • Save CarsonMcKinstry/fb1d8052c1ade30efb3f18721f21e6a4 to your computer and use it in GitHub Desktop.

Select an option

Save CarsonMcKinstry/fb1d8052c1ade30efb3f18721f21e6a4 to your computer and use it in GitHub Desktop.
import { Observable, merge, interval } from "rxjs";
import { map, tap, pluck } from "rxjs/operators";
type KV = Record<any, any>;
interface Cacheable<K, V> {
delete: () => void;
get: (key: K) => V | undefined;
has: (key: K) => boolean;
set: (key: K, value: V) => this;
readonly size: number;
}
function join(
left: Observable<KV>,
right: Observable<KV>,
makeCache?: () => Cacheable<any, any>
) {
return new Observable(observer => {
const cache = makeCache ? makeCache() : new Map<any, any>();
merge(
left.pipe(map(i => ({ observable: i, cacheKey: "left" }))),
right.pipe(map(i => ({ observable: i, cacheKey: "right" })))
)
.pipe(
map(obj => {
const [[key, value]] = Object.entries(obj.observable);
return {
key,
value,
cacheKey: obj.cacheKey
};
}),
tap(({ key, value, cacheKey }) => {
const current = cache.get(key);
if (current) {
cache.set(key, {
...current,
[cacheKey]: value
});
} else {
cache.set(key, {
[cacheKey]: value
});
}
}),
pluck("key")
)
.subscribe(key => {
const current = cache.get(key);
if (current && current.left && current.right) {
observer.next({
key,
data: current
});
}
});
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment