Skip to content

Instantly share code, notes, and snippets.

@VldMrgnn
Last active July 28, 2024 15:33
Show Gist options
  • Select an option

  • Save VldMrgnn/0a4b3082e33a48fbee27b6d057866bca to your computer and use it in GitHub Desktop.

Select an option

Save VldMrgnn/0a4b3082e33a48fbee27b6d057866bca to your computer and use it in GitHub Desktop.

Revisions

  1. VldMrgnn revised this gist Jul 28, 2024. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion px.worker__v1.ts
    Original file line number Diff line number Diff line change
    @@ -91,7 +91,7 @@ function* setStoreParams({
    email: string;
    }) {
    const stopSync = yield* call(pxSync(email, perseed));
    console.log("stopSync >> SHOULD BE THE TEXT OF TEH FUNCTION:", stopSync);
    console.log("stopSync >> SHOULD BE THE TEXT OF THE FUNCTION:", stopSync);
    // to really stop the sync we need to call the stopSync function
    }

  2. VldMrgnn revised this gist Jul 28, 2024. 2 changed files with 149 additions and 0 deletions.
    31 changes: 31 additions & 0 deletions backend_express_endpoints.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,31 @@
    // we could store on filesystem as well...

    export async function setPersisted(req: Request) {
    const group = Object.values(req.session.groups || {}).find(
    (x) => x.PERSEED === req.session.INSIDER
    );
    if (!group) {
    console.log("setPersisted", "no group found");
    return {};
    }
    // const { data } = req.body;
    const data = req.body;

    if (process.env.NODE_ENV === "development") {
    const sizeInMB = data.length / 1_048_576;
    console.log("[dev.] sizeInMB", sizeInMB);
    }

    const key = `${rkey.INSIDER_STATE}:${req.session.email}`;

    // Store buffer in Redis directly as binary data
    rasydb0.hset(key, ["persist", data]);
    return {}; // We don't use the return value
    }

    export async function getPersisted(req: Request) {
    const key = `${rkey.INSIDER_STATE}:${req.session.email}`;
    const base64Data = (await rasydb0.hget(key, "persist")) as string;
    return base64Data; // (!)
    }

    118 changes: 118 additions & 0 deletions px.worker__v2_inprogress.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,118 @@

    /// IN PROGRESS. BUT PROVEN IT IS A VIABLE SOLUTION

    import { each, ensure, main, on, resource, run } from 'effection';
    import { gzip } from 'fflate';
    import {
    call, createApi, createThunks, keepAlive, mdw, parallel, put, sleep, spawn, take, takeLeading
    } from 'starfx';
    import { IndexeddbPersistence } from 'y-indexeddb';
    import { WebsocketProvider } from 'y-websocket';
    import * as Y from 'yjs';

    import { service } from '@app/service';
    import { PERSISTOR_NAME } from '@app/state/constants';

    import { isErr, isOk } from '../helpers';
    import { parseMessageEvent } from './helpers';

    const dev = process.env.NODE_ENV === "development";
    const serverUrl =
    process.env.NODE_ENV === "production"
    ? process.env.REACT_APP_SERVERURL_PROD || ""
    : process.env.REACT_APP_SERVERURL_DEV || "";
    const wsUrl = serverUrl.replace("http", "ws");

    const pxMap = new Map<string, any>();
    pxMap.set("hydrate", true);
    pxMap.set("email", "");
    pxMap.set("perseed", "");
    //


    // ---------------
    // this works somehow but not in production yet.

    let pyDoc = new Y.Doc();

    let provider: IndexeddbPersistence;
    let wsProvider: WebsocketProvider;

    const pxSync = (email, perseed) => {
    return resource(function* (provide) {
    if (!email) {
    return;
    }
    /// some parameters mitigation

    pyDoc = new Y.Doc();

    provider = new IndexeddbPersistence(`${PERSISTOR_NAME}.sync`, pyDoc);
    wsProvider = new WebsocketProvider(wsUrl, "starfx-persist", pyDoc, {
    params: {
    email,
    perseed,
    },
    });

    wsProvider.connect();

    function* stopSync() {
    console.log("stopSync");
    yield* call(() => wsProvider.disconnect());
    yield* call(() => provider.destroy());
    }

    wsProvider.on("status", (event) => {
    console.log("WSPROVIDER", event.status); // logs "connected" or "disconnected"
    });

    pyDoc.on("afterTransaction", async (transaction, remote) => {
    const once = pxMap.get("hydrate");
    console.log("hydrate", once);
    if (once) {
    const persisted = remote.getMap("persistor");
    const data = persisted.get("data");
    if (data) {
    //guardian :: compare the current login/perseed with the persisted one.
    // some logic here


    self.postMessage({
    type: "/rehydrate",
    payload: JSON.stringify(statePersisted),
    });
    pxMap.set("hydrate", false);
    }
    return;
    }
    });

    yield* provide(stopSync);
    });
    };

    // todo: with logout operation at the end, after sync
    const shardPersist = pxThunks.create<ArrayBuffer>(
    "/shardPersist/yjs",
    { supervisor: takeLeading },
    function* (ctx, next) {
    const jsonString = new TextDecoder().decode(ctx.payload);
    const fullDataset = JSON.parse(jsonString);
    const filtered = filterStore(fullDataset);
    const persistor = pyDoc.getMap("persistor");
    persistor.set("data", filtered);
    // yield* call(() => provider.whenSynced);
    yield* next();
    }
    );

    const getWsPersist = pxThunks.create(
    "/persist/first",
    { supervisor: takeLeading },
    function* (ctx, next) {
    pxMap.set("hydrate", true);
    // yield* call(() => provider.whenSynced);
    yield* next();
    }
    );
  3. VldMrgnn revised this gist Jul 28, 2024. 1 changed file with 59 additions and 0 deletions.
    59 changes: 59 additions & 0 deletions worker.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,59 @@
    // px.worker-factory.ts
    interface IWorkerOne {
    hello?: (name: string) => Promise<string>;
    postMessage?: (data: any) => void;
    onmessage?: (data: any) => void | null;
    }

    let workerInstance = null;

    export const startPxWorker = () => {
    if (!workerInstance) {
    console.log("px.worker INSTANCE STARTING startWorker");
    workerInstance = new Worker(new URL("./px.worker.ts", import.meta.url));
    }
    return workerInstance;
    };

    export const terminatePxWorker = () => {
    if (workerInstance) {
    workerInstance.terminate();
    workerInstance = null;
    }
    };

    export const getPxWorker = () => {
    return workerInstance;
    };


    // index.ts
    export const initPxWorker = (userseed: string) => {
    return resource(function* (provide) {
    const pxWorker = startPxWorker();
    pxWorker.postMessage(`idb@${userseed}`);
    pxWorker.onmessage = (e) => {
    switch (e.data.type) {
    case "hello":
    console.log("px.worker: hello", e.data);
    break;
    case "pong":
    console.log("px.worker: pong", e.data);
    break;
    case "/rehydrate": {
    fxDispatch(redisRehydrate(e.data.payload));
    break;
    }
    default: {
    const { type, payload, json } = e.data;
    fxDispatch({ type, payload: json ?? payload });
    if (process.env.NODE_ENV === "development") {
    console.log("[dev]. px.worker: default action", e.data);
    }
    break;
    }
    }
    };
    yield* provide(pxWorker);
    });
    };
  4. VldMrgnn created this gist Jul 28, 2024.
    143 changes: 143 additions & 0 deletions idbAdapter.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,143 @@
    import { openDB } from "idb";
    import storage from "localforage";
    import {
    AnyState,
    call,
    Err,
    Ok,
    Operation,
    PersistAdapter,
    Result,
    select as starfxSelect,
    sleep,
    } from "starfx";

    import { PERSISTOR_NAME } from "@app/state/constants";
    import { schema } from "@app/statefx/schema";
    import { processSkipForwardPersistor } from "@app/statefx/selectors/applog.sel";
    import { getPxWorker } from "@app/statefx/workers/pxworker-factory";
    import { allowlist, AppState } from "@app/statefx/xstore";
    import { debounceThx } from "@app/statefx/xThunks/ui-persist";

    const CONST_PERSISTOR_VERSION = 9;

    export function* forwardPersitence(
    key: string = null,
    jsonState: string = null
    ) {
    if (jsonState === null) {
    const s_ = yield* starfxSelect((a) => a);
    const s = {};
    for (const k of allowlist) {
    s[k] = s_[k];
    }
    jsonState = JSON.stringify(s);
    }

    const who = yield* starfxSelect(schema.whoami.select);

    if (who.wai_database) {
    const pxWorker = getPxWorker();
    if (!pxWorker) {
    console.log("no pxWorker");
    }
    if (pxWorker) {
    // custom debounce middleware
    const canPost = yield* call(() => debounceThx.run());

    if (canPost.key === "out") {
    return Ok(undefined);
    }

    // includes the lock and the collection active
    const shouldSkip = yield* starfxSelect(processSkipForwardPersistor);
    // create the waiting mechanism but exit if the lock is on
    if (shouldSkip) {
    return Ok(undefined);
    }
    if (canPost.key === "in") {
    // efficient way to send the data
    // Encode the JSON string to an ArrayBuffer
    const buffer = new TextEncoder().encode(jsonState).buffer;
    console.log(buffer.byteLength, "before"); // 4MB
    pxWorker.postMessage(
    {
    type: "/runner/arrayBuffer",
    data: buffer,
    },
    [buffer]
    );
    console.log(buffer.byteLength, "after"); //0 bytes
    }
    }
    }
    }



    export const openDbfn = (pName = PERSISTOR_NAME) => {
    if (!pName) throw new Error("pName is required");

    return openDB(pName, CONST_PERSISTOR_VERSION, {
    upgrade(db) {
    if (!db.objectStoreNames.contains("persist")) {
    db.createObjectStore("persist");
    }
    // ui
    if (!db.objectStoreNames.contains("ui")) {
    db.createObjectStore("ui");
    }
    // workermain
    if (!db.objectStoreNames.contains("workermain")) {
    db.createObjectStore("workermain");
    }
    },
    });
    };

    export function createIDBStorageAdapter<S extends AnyState>(
    pName: string
    ): PersistAdapter<S> {
    storage.config({
    driver: storage.INDEXEDDB,
    name: pName,
    version: CONST_PERSISTOR_VERSION,
    storeName: "persist",
    });

    return {
    getItem: function* get(key: string): Operation<Result<S>> {
    try {
    const ostorage = yield* call(storage.getItem(key)) as Operation<string>;
    const returnvalue = JSON.parse(ostorage || "{}");
    return Ok(returnvalue);
    } catch (error) {
    console.error("Error getting item from storage:", error);
    return Err(error);
    }
    },
    setItem: function* set(key: string, s: Partial<S>) {
    const state = JSON.stringify(s);
    try {
    yield* call(storage.setItem(key, state));

    // forward a slice of the state to persist in redis.
    yield* forwardPersitence(key, state);
    // maybe is too much ....
    return Ok(undefined);
    } catch (error) {
    console.error("Error setting item to storage:", error, key, state);
    return Err(error);
    }
    },
    removeItem: function* remove(key: string) {
    try {
    yield* call(storage.removeItem(key));
    return Ok(undefined);
    } catch (error) {
    console.error("Error removing item from storage:", error);
    return Err(error);
    }
    },
    };
    }
    270 changes: 270 additions & 0 deletions px.worker__v1.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,270 @@
    import { each, ensure, main, on, resource, run } from "effection";
    import { gzip } from "fflate";
    import {
    call,
    createApi,
    createThunks,
    keepAlive,
    mdw,
    parallel,
    put,
    sleep,
    spawn,
    take,
    takeLeading,
    } from "starfx";
    import { IndexeddbPersistence } from "y-indexeddb";
    import { WebsocketProvider } from "y-websocket";
    import * as Y from "yjs";

    import { service } from "@app/service";
    import { PERSISTOR_NAME } from "@app/state/constants";

    import { isErr, isOk } from "../helpers";
    import { parseMessageEvent } from "./helpers";

    const dev = process.env.NODE_ENV === "development";
    const serverUrl =
    process.env.NODE_ENV === "production"
    ? process.env.REACT_APP_SERVERURL_PROD || ""
    : process.env.REACT_APP_SERVERURL_DEV || "";
    const wsUrl = serverUrl.replace("http", "ws");

    const pxMap = new Map<string, any>();
    pxMap.set("hydrate", true);
    pxMap.set("email", "");
    pxMap.set("perseed", "");
    //

    import type { ApiCtx, ThunkCtx, Next } from "starfx";
    let storeName; // [findme#320]

    type TWorkerParams = {
    key: string;
    url: URL;
    req: Request;
    data: any;
    body?: any;
    isLast?: boolean;
    meta?: {
    halt: boolean;
    };
    };

    // gas
    const persitInRedis = [
    //.. ui stuff
    ];

    //ice = where shid0 is null
    //liquid = where shid0 is not null
    const tables = [
    "currencyRates",
    //'....
    ];

    const shid0s = [];
    // ["currencyRates", "invoiceFiscal", "SPVInvoiceRaw", "SPVMessages", "SPVMessageKeys", "SPVMessageUserState"];

    /* -------------------------------------------------------------------------- */
    const setStoreName = (name: string) => {
    if (!name) {
    return;
    }
    // first we have no email.
    // the worker starts on saga-query store bootup
    storeName = name;
    run(function* () {
    yield* call(() => getWsPersist.run());
    });
    };

    const getStoreName = () => {
    return storeName;
    };

    function* setStoreParams({
    perseed,
    email,
    }: {
    perseed: string;
    email: string;
    }) {
    const stopSync = yield* call(pxSync(email, perseed));
    console.log("stopSync >> SHOULD BE THE TEXT OF TEH FUNCTION:", stopSync);
    // to really stop the sync we need to call the stopSync function
    }

    const pxApi = createApi<ApiCtx>();
    pxApi.use(mdw.err);
    pxApi.use(mdw.queryCtx);
    pxApi.use(mdw.nameParser);
    pxApi.use(pxApi.routes());
    pxApi.use(mdw.fetch({ baseUrl: service }));

    export const pxThunks = createThunks<ThunkCtx>();
    pxThunks.use(mdw.err);
    pxThunks.use(pxThunks.routes());

    const filterStore = (s) => {
    const subset = {};
    for (const key of persitInRedis) {
    if (key in s) {
    subset[key] = s[key];
    }
    }
    // latest data: maybe is too much
    for (const key of shid0s) {
    if (key in s) {
    subset[key] = Object.fromEntries(
    Object.entries(s[key]).filter(([k, v]: [string, any]) => !!v?.shid0)
    );
    }
    }
    // the full dataset. idk how o optimize
    for (const key of tables) {
    if (key in s) {
    subset[key] = s[key];
    }
    }
    return subset;
    };

    /* --------------------------------------------------------------------------
    v1. hand made
    /* -------------------------------------------------------------------------- */

    function compressData(data: unknown): Promise<ArrayBuffer> {
    // Convert the object to a JSON string
    const jsonString = JSON.stringify(data);
    return new Promise((resolve, reject) => {
    // Gzip compress the JSON string
    gzip(new TextEncoder().encode(jsonString), (err, compressed) => {
    if (err) {
    reject(err);
    } else {
    // Resolve with the buffer part of the Uint8Array
    resolve(
    compressed.buffer.slice(
    compressed.byteOffset,
    compressed.byteOffset + compressed.byteLength
    )
    );
    }
    });
    });
    }
    const getPersist = pxApi.get(
    "/persist",
    { supervisor: takeLeading },
    function* (ctx: ApiCtx, next: Next) {
    yield* next();
    if (isErr(ctx.json)) {
    return;
    }
    if (isOk(ctx.json)) {
    if (ctx?.json?.value) {
    /// can we transfer the data to the main thread?
    self.postMessage({ type: "/rehydrate", payload: ctx.json.value });
    }
    }
    }
    );

    const postPersist = pxApi.post<ArrayBuffer>(
    "/persist",
    { supervisor: takeLeading },
    function* (ctx, next) {
    const jsonString = new TextDecoder().decode(ctx.payload);
    const data = JSON.parse(jsonString);
    const relevantData = filterStore(data);
    const compressed = yield* call(() => compressData(relevantData));

    // inspect the size of the compressed data in MB
    // console.log(
    // "compressed.byteLength",
    // compressed.byteLength / 1024 / 1024,
    // "MB"
    // );

    ctx.request = ctx.req({
    method: "POST",
    url: `/persist`,
    headers: {
    "Content-Type": "application/octet-stream",
    "Content-Encoding": "gzip",
    credentials: "include",
    },
    body: compressed,
    });
    yield* next();
    }
    );


    function* worker() {
    const dispatcher = function* () {
    while (true) {
    const action = yield* take("*");
    const { type: actionType, payload } = action as {
    type: string;
    payload: any;
    };
    switch (actionType) {
    default:
    console.log("did you forget this action: ", actionType, "?");
    if (actionType.startsWith("/nop/")) {
    break;
    }
    break;
    }
    }
    };

    yield* spawn(() =>
    run(function* () {
    const group = yield* parallel([
    dispatcher,
    pxApi.bootup,
    pxThunks.bootup,
    ]);
    yield* group;
    })
    );

    // listener //
    for (const event of yield* each(on(self, "message"))) {
    //the fastest track //
    if (event.data?.type === "/runner/arrayBuffer") {
    // yield* spawn(() => postPersist.run(event.data.data));
    yield* spawn(() => shardPersist.run(event.data.data));
    yield* each.next();
    continue;
    }
    // on login, set the email and perseed
    if (event.data?.type === "/upgrade") {
    try {
    const { email, perseed } = JSON.parse(event.data.data);
    yield* spawn(() => setStoreParams({ perseed, email }));
    } finally {
    yield* each.next();
    }
    continue;
    }
    const paramResult = yield* parseMessageEvent(
    event,
    setStoreName,
    getStoreName
    );

    if (isErr(paramResult)) {
    yield* each.next();
    continue;
    }
    const { url } = paramResult.value;
    yield* put({ type: url.pathname, payload: paramResult.value });
    yield* each.next();
    }
    }

    keepAlive([main(worker)]);
    35 changes: 35 additions & 0 deletions store.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,35 @@
    //....


    const allowlist = [...] as (keyof AppState)[];
    export const persistor = createPersistor<AppState>({
    //m. main
    adapter: createIDBStorageAdapter<AppState>(PERSISTOR_NAME),
    allowlist: allowlist,
    });

    export const store = createStore({
    initialState: initialState,
    middleware: [persistStoreMdw(persistor)],
    });
    window["fx"] = store; //important //

    export const runFxState = () => {
    devtoolsEnabled && setupDevTool({}, { name: "starfx", enabled: true });
    store.run(function* () {
    yield* persistor.rehydrate();
    yield* schema.update(schema.loaders.success({ id: PERSIST_LOADER_ID }));
    // yield* spawn(() => initWorker());
    const group = yield* parallel([
    ...tasks,
    xApi.bootup,
    xThunk.bootup,
    xWorker.bootup,
    xouterApi.bootup,
    xDbc.bootup,
    ]);

    yield* group;
    });
    return store;
    };