Last active
July 28, 2024 15:33
-
-
Save VldMrgnn/0a4b3082e33a48fbee27b6d057866bca to your computer and use it in GitHub Desktop.
Revisions
-
VldMrgnn revised this gist
Jul 28, 2024 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -91,7 +91,7 @@ function* setStoreParams({ email: string; }) { const stopSync = yield* call(pxSync(email, perseed)); console.log("stopSync >> SHOULD BE THE TEXT OF THE FUNCTION:", stopSync); // to really stop the sync we need to call the stopSync function } -
VldMrgnn revised this gist
Jul 28, 2024 . 2 changed files with 149 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,31 @@ // we could store on filesystem as well... export async function setPersisted(req: Request) { const group = Object.values(req.session.groups || {}).find( (x) => x.PERSEED === req.session.INSIDER ); if (!group) { console.log("setPersisted", "no group found"); return {}; } // const { data } = req.body; const data = req.body; if (process.env.NODE_ENV === "development") { const sizeInMB = data.length / 1_048_576; console.log("[dev.] sizeInMB", sizeInMB); } const key = `${rkey.INSIDER_STATE}:${req.session.email}`; // Store buffer in Redis directly as binary data rasydb0.hset(key, ["persist", data]); return {}; // We don't use the return value } export async function getPersisted(req: Request) { const key = `${rkey.INSIDER_STATE}:${req.session.email}`; const base64Data = (await rasydb0.hget(key, "persist")) as string; return base64Data; // (!) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,118 @@ /// IN PROGRESS. BUT PROVEN IT IS A VIABLE SOLUTION import { each, ensure, main, on, resource, run } from 'effection'; import { gzip } from 'fflate'; import { call, createApi, createThunks, keepAlive, mdw, parallel, put, sleep, spawn, take, takeLeading } from 'starfx'; import { IndexeddbPersistence } from 'y-indexeddb'; import { WebsocketProvider } from 'y-websocket'; import * as Y from 'yjs'; import { service } from '@app/service'; import { PERSISTOR_NAME } from '@app/state/constants'; import { isErr, isOk } from '../helpers'; import { parseMessageEvent } from './helpers'; const dev = process.env.NODE_ENV === "development"; const serverUrl = process.env.NODE_ENV === "production" ? process.env.REACT_APP_SERVERURL_PROD || "" : process.env.REACT_APP_SERVERURL_DEV || ""; const wsUrl = serverUrl.replace("http", "ws"); const pxMap = new Map<string, any>(); pxMap.set("hydrate", true); pxMap.set("email", ""); pxMap.set("perseed", ""); // // --------------- // this works somehow but not in production yet. let pyDoc = new Y.Doc(); let provider: IndexeddbPersistence; let wsProvider: WebsocketProvider; const pxSync = (email, perseed) => { return resource(function* (provide) { if (!email) { return; } /// some parameters mitigation pyDoc = new Y.Doc(); provider = new IndexeddbPersistence(`${PERSISTOR_NAME}.sync`, pyDoc); wsProvider = new WebsocketProvider(wsUrl, "starfx-persist", pyDoc, { params: { email, perseed, }, }); wsProvider.connect(); function* stopSync() { console.log("stopSync"); yield* call(() => wsProvider.disconnect()); yield* call(() => provider.destroy()); } wsProvider.on("status", (event) => { console.log("WSPROVIDER", event.status); // logs "connected" or "disconnected" }); pyDoc.on("afterTransaction", async (transaction, remote) => { const once = pxMap.get("hydrate"); console.log("hydrate", once); if (once) { const persisted = remote.getMap("persistor"); const data = persisted.get("data"); if (data) { //guardian :: compare the current login/perseed with the persisted one. // some logic here self.postMessage({ type: "/rehydrate", payload: JSON.stringify(statePersisted), }); pxMap.set("hydrate", false); } return; } }); yield* provide(stopSync); }); }; // todo: with logout operation at the end, after sync const shardPersist = pxThunks.create<ArrayBuffer>( "/shardPersist/yjs", { supervisor: takeLeading }, function* (ctx, next) { const jsonString = new TextDecoder().decode(ctx.payload); const fullDataset = JSON.parse(jsonString); const filtered = filterStore(fullDataset); const persistor = pyDoc.getMap("persistor"); persistor.set("data", filtered); // yield* call(() => provider.whenSynced); yield* next(); } ); const getWsPersist = pxThunks.create( "/persist/first", { supervisor: takeLeading }, function* (ctx, next) { pxMap.set("hydrate", true); // yield* call(() => provider.whenSynced); yield* next(); } ); -
VldMrgnn revised this gist
Jul 28, 2024 . 1 changed file with 59 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,59 @@ // px.worker-factory.ts interface IWorkerOne { hello?: (name: string) => Promise<string>; postMessage?: (data: any) => void; onmessage?: (data: any) => void | null; } let workerInstance = null; export const startPxWorker = () => { if (!workerInstance) { console.log("px.worker INSTANCE STARTING startWorker"); workerInstance = new Worker(new URL("./px.worker.ts", import.meta.url)); } return workerInstance; }; export const terminatePxWorker = () => { if (workerInstance) { workerInstance.terminate(); workerInstance = null; } }; export const getPxWorker = () => { return workerInstance; }; // index.ts export const initPxWorker = (userseed: string) => { return resource(function* (provide) { const pxWorker = startPxWorker(); pxWorker.postMessage(`idb@${userseed}`); pxWorker.onmessage = (e) => { switch (e.data.type) { case "hello": console.log("px.worker: hello", e.data); break; case "pong": console.log("px.worker: pong", e.data); break; case "/rehydrate": { fxDispatch(redisRehydrate(e.data.payload)); break; } default: { const { type, payload, json } = e.data; fxDispatch({ type, payload: json ?? payload }); if (process.env.NODE_ENV === "development") { console.log("[dev]. px.worker: default action", e.data); } break; } } }; yield* provide(pxWorker); }); }; -
VldMrgnn created this gist
Jul 28, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,143 @@ import { openDB } from "idb"; import storage from "localforage"; import { AnyState, call, Err, Ok, Operation, PersistAdapter, Result, select as starfxSelect, sleep, } from "starfx"; import { PERSISTOR_NAME } from "@app/state/constants"; import { schema } from "@app/statefx/schema"; import { processSkipForwardPersistor } from "@app/statefx/selectors/applog.sel"; import { getPxWorker } from "@app/statefx/workers/pxworker-factory"; import { allowlist, AppState } from "@app/statefx/xstore"; import { debounceThx } from "@app/statefx/xThunks/ui-persist"; const CONST_PERSISTOR_VERSION = 9; export function* forwardPersitence( key: string = null, jsonState: string = null ) { if (jsonState === null) { const s_ = yield* starfxSelect((a) => a); const s = {}; for (const k of allowlist) { s[k] = s_[k]; } jsonState = JSON.stringify(s); } const who = yield* starfxSelect(schema.whoami.select); if (who.wai_database) { const pxWorker = getPxWorker(); if (!pxWorker) { console.log("no pxWorker"); } if (pxWorker) { // custom debounce middleware const canPost = yield* call(() => debounceThx.run()); if (canPost.key === "out") { return Ok(undefined); } // includes the lock and the collection active const shouldSkip = yield* starfxSelect(processSkipForwardPersistor); // create the waiting mechanism but exit if the lock is on if (shouldSkip) { return Ok(undefined); } if (canPost.key === "in") { // efficient way to send the data // Encode the JSON string to an ArrayBuffer const buffer = new TextEncoder().encode(jsonState).buffer; console.log(buffer.byteLength, "before"); // 4MB pxWorker.postMessage( { type: "/runner/arrayBuffer", data: buffer, }, [buffer] ); console.log(buffer.byteLength, "after"); //0 bytes } } } } export const openDbfn = (pName = PERSISTOR_NAME) => { if (!pName) throw new Error("pName is required"); return openDB(pName, CONST_PERSISTOR_VERSION, { upgrade(db) { if (!db.objectStoreNames.contains("persist")) { db.createObjectStore("persist"); } // ui if (!db.objectStoreNames.contains("ui")) { db.createObjectStore("ui"); } // workermain if (!db.objectStoreNames.contains("workermain")) { db.createObjectStore("workermain"); } }, }); }; export function createIDBStorageAdapter<S extends AnyState>( pName: string ): PersistAdapter<S> { storage.config({ driver: storage.INDEXEDDB, name: pName, version: CONST_PERSISTOR_VERSION, storeName: "persist", }); return { getItem: function* get(key: string): Operation<Result<S>> { try { const ostorage = yield* call(storage.getItem(key)) as Operation<string>; const returnvalue = JSON.parse(ostorage || "{}"); return Ok(returnvalue); } catch (error) { console.error("Error getting item from storage:", error); return Err(error); } }, setItem: function* set(key: string, s: Partial<S>) { const state = JSON.stringify(s); try { yield* call(storage.setItem(key, state)); // forward a slice of the state to persist in redis. yield* forwardPersitence(key, state); // maybe is too much .... return Ok(undefined); } catch (error) { console.error("Error setting item to storage:", error, key, state); return Err(error); } }, removeItem: function* remove(key: string) { try { yield* call(storage.removeItem(key)); return Ok(undefined); } catch (error) { console.error("Error removing item from storage:", error); return Err(error); } }, }; } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,270 @@ import { each, ensure, main, on, resource, run } from "effection"; import { gzip } from "fflate"; import { call, createApi, createThunks, keepAlive, mdw, parallel, put, sleep, spawn, take, takeLeading, } from "starfx"; import { IndexeddbPersistence } from "y-indexeddb"; import { WebsocketProvider } from "y-websocket"; import * as Y from "yjs"; import { service } from "@app/service"; import { PERSISTOR_NAME } from "@app/state/constants"; import { isErr, isOk } from "../helpers"; import { parseMessageEvent } from "./helpers"; const dev = process.env.NODE_ENV === "development"; const serverUrl = process.env.NODE_ENV === "production" ? process.env.REACT_APP_SERVERURL_PROD || "" : process.env.REACT_APP_SERVERURL_DEV || ""; const wsUrl = serverUrl.replace("http", "ws"); const pxMap = new Map<string, any>(); pxMap.set("hydrate", true); pxMap.set("email", ""); pxMap.set("perseed", ""); // import type { ApiCtx, ThunkCtx, Next } from "starfx"; let storeName; // [findme#320] type TWorkerParams = { key: string; url: URL; req: Request; data: any; body?: any; isLast?: boolean; meta?: { halt: boolean; }; }; // gas const persitInRedis = [ //.. ui stuff ]; //ice = where shid0 is null //liquid = where shid0 is not null const tables = [ "currencyRates", //'.... ]; const shid0s = []; // ["currencyRates", "invoiceFiscal", "SPVInvoiceRaw", "SPVMessages", "SPVMessageKeys", "SPVMessageUserState"]; /* -------------------------------------------------------------------------- */ const setStoreName = (name: string) => { if (!name) { return; } // first we have no email. // the worker starts on saga-query store bootup storeName = name; run(function* () { yield* call(() => getWsPersist.run()); }); }; const getStoreName = () => { return storeName; }; function* setStoreParams({ perseed, email, }: { perseed: string; email: string; }) { const stopSync = yield* call(pxSync(email, perseed)); console.log("stopSync >> SHOULD BE THE TEXT OF TEH FUNCTION:", stopSync); // to really stop the sync we need to call the stopSync function } const pxApi = createApi<ApiCtx>(); pxApi.use(mdw.err); pxApi.use(mdw.queryCtx); pxApi.use(mdw.nameParser); pxApi.use(pxApi.routes()); pxApi.use(mdw.fetch({ baseUrl: service })); export const pxThunks = createThunks<ThunkCtx>(); pxThunks.use(mdw.err); pxThunks.use(pxThunks.routes()); const filterStore = (s) => { const subset = {}; for (const key of persitInRedis) { if (key in s) { subset[key] = s[key]; } } // latest data: maybe is too much for (const key of shid0s) { if (key in s) { subset[key] = Object.fromEntries( Object.entries(s[key]).filter(([k, v]: [string, any]) => !!v?.shid0) ); } } // the full dataset. idk how o optimize for (const key of tables) { if (key in s) { subset[key] = s[key]; } } return subset; }; /* -------------------------------------------------------------------------- v1. hand made /* -------------------------------------------------------------------------- */ function compressData(data: unknown): Promise<ArrayBuffer> { // Convert the object to a JSON string const jsonString = JSON.stringify(data); return new Promise((resolve, reject) => { // Gzip compress the JSON string gzip(new TextEncoder().encode(jsonString), (err, compressed) => { if (err) { reject(err); } else { // Resolve with the buffer part of the Uint8Array resolve( compressed.buffer.slice( compressed.byteOffset, compressed.byteOffset + compressed.byteLength ) ); } }); }); } const getPersist = pxApi.get( "/persist", { supervisor: takeLeading }, function* (ctx: ApiCtx, next: Next) { yield* next(); if (isErr(ctx.json)) { return; } if (isOk(ctx.json)) { if (ctx?.json?.value) { /// can we transfer the data to the main thread? self.postMessage({ type: "/rehydrate", payload: ctx.json.value }); } } } ); const postPersist = pxApi.post<ArrayBuffer>( "/persist", { supervisor: takeLeading }, function* (ctx, next) { const jsonString = new TextDecoder().decode(ctx.payload); const data = JSON.parse(jsonString); const relevantData = filterStore(data); const compressed = yield* call(() => compressData(relevantData)); // inspect the size of the compressed data in MB // console.log( // "compressed.byteLength", // compressed.byteLength / 1024 / 1024, // "MB" // ); ctx.request = ctx.req({ method: "POST", url: `/persist`, headers: { "Content-Type": "application/octet-stream", "Content-Encoding": "gzip", credentials: "include", }, body: compressed, }); yield* next(); } ); function* worker() { const dispatcher = function* () { while (true) { const action = yield* take("*"); const { type: actionType, payload } = action as { type: string; payload: any; }; switch (actionType) { default: console.log("did you forget this action: ", actionType, "?"); if (actionType.startsWith("/nop/")) { break; } break; } } }; yield* spawn(() => run(function* () { const group = yield* parallel([ dispatcher, pxApi.bootup, pxThunks.bootup, ]); yield* group; }) ); // listener // for (const event of yield* each(on(self, "message"))) { //the fastest track // if (event.data?.type === "/runner/arrayBuffer") { // yield* spawn(() => postPersist.run(event.data.data)); yield* spawn(() => shardPersist.run(event.data.data)); yield* each.next(); continue; } // on login, set the email and perseed if (event.data?.type === "/upgrade") { try { const { email, perseed } = JSON.parse(event.data.data); yield* spawn(() => setStoreParams({ perseed, email })); } finally { yield* each.next(); } continue; } const paramResult = yield* parseMessageEvent( event, setStoreName, getStoreName ); if (isErr(paramResult)) { yield* each.next(); continue; } const { url } = paramResult.value; yield* put({ type: url.pathname, payload: paramResult.value }); yield* each.next(); } } keepAlive([main(worker)]); This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,35 @@ //.... const allowlist = [...] as (keyof AppState)[]; export const persistor = createPersistor<AppState>({ //m. main adapter: createIDBStorageAdapter<AppState>(PERSISTOR_NAME), allowlist: allowlist, }); export const store = createStore({ initialState: initialState, middleware: [persistStoreMdw(persistor)], }); window["fx"] = store; //important // export const runFxState = () => { devtoolsEnabled && setupDevTool({}, { name: "starfx", enabled: true }); store.run(function* () { yield* persistor.rehydrate(); yield* schema.update(schema.loaders.success({ id: PERSIST_LOADER_ID })); // yield* spawn(() => initWorker()); const group = yield* parallel([ ...tasks, xApi.bootup, xThunk.bootup, xWorker.bootup, xouterApi.bootup, xDbc.bootup, ]); yield* group; }); return store; };