Skip to content

Instantly share code, notes, and snippets.

@sbrl
Created May 26, 2020 19:35
Show Gist options
  • Select an option

  • Save sbrl/33bc3afc170a12f6eaf49b4ca8e03602 to your computer and use it in GitHub Desktop.

Select an option

Save sbrl/33bc3afc170a12f6eaf49b4ca8e03602 to your computer and use it in GitHub Desktop.

Revisions

  1. sbrl created this gist May 26, 2020.
    89 changes: 89 additions & 0 deletions main.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,89 @@
    "use strict";

    import EventEmitter from 'events';
    import child_process from 'child_process';

    /**
    * Helper method that waits for an event to be fired on a given object.
    * @param {EventEmitter} obj The object that will fire the event - must inherit from EventEmitter
    * @param {string} event_name The name of the event to wait for.
    * @return {Promise} A promise that resolves when the specified event is fired on the given object.
    */
    function wait_for_event(obj, event_name) {
    return new Promise((resolve) => {
    obj.once(event_name, resolve);
    });
    }

    class Controller extends EventEmitter {
    constructor() {
    super();

    this.worker_count = 4;
    this.workers = [];
    this.counter = 0;
    this.counter_last = 0;
    this.stats_update_last = 0;

    this.someone_is_reading = false;
    }

    log_msg(...msg) {
    console.log(`[master] `, ...msg);
    }

    stats_update() {
    if(new Date() - this.stats_update_last < 1000) return;

    let counter_diff = this.counter - this.counter_last;
    console.log(`Rate: ${counter_diff} items / sec`)
    this.counter_last = this.counter;
    this.stats_update_last = +new Date();
    }

    async send_work(worker) {
    while(this.someone_is_reading) {
    // this.log_msg(`someone is reading, waiting for slot`);
    await wait_for_event(this, "reading_complete");
    }
    // this.log_msg(`got slot`);
    this.someone_is_reading = true;
    worker.send({ event: "work", count: this.counter });
    this.counter++;
    }

    async handle_message(worker, i, message) {
    switch(message.event) {
    case "read_complete":
    this.someone_is_reading = false;
    this.emit("reading_complete");
    break;
    case "ready":
    case "done":
    this.stats_update();
    process.nextTick(() => this.send_work(worker));
    break;

    case "end":
    worker.send({ event: "exit" });
    this.log_msg(`Worker ${i} exited`);
    this.workers.splice(i, 1);
    break;
    }
    }

    start() {
    for(let i = 0; i < this.worker_count; i++) {
    this.log_msg(`Spawning worker ${i} / ${this.worker_count}`);
    let next = child_process.fork("worker.mjs", {
    stdio: [ 0, 1, 2, "ipc" ]
    });
    next.on("message", this.handle_message.bind(this, next, i));
    this.workers.push(next);
    }
    }
    }


    let controller = new Controller();
    controller.start();
    6 changes: 6 additions & 0 deletions multi-process-read.sh
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,6 @@
    #!/usr/bin/env bash

    # This test demonstrates reading from a single pipe using multiple processes.
    # License: Mozilla Public Licence 2.0 (MPL-2.0)

    seq 1 1000000 | awk '{ print("LINE_START This is a number [" $0 "] more text more text LINE_END") }' | node main.mjs
    66 changes: 66 additions & 0 deletions worker.mjs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,66 @@
    "use strict";

    import fs from 'fs';

    // Global buffer to avoid unnecessary memory churn
    let buffer = Buffer.alloc(4096);
    function read_line_unbuffered(fd) {
    let i = 0;
    while(true) {
    let bytes_read = fs.readSync(fd, buffer, i, 1);
    if(bytes_read !== 1 || buffer[i] == 0x0A) {
    if(i == 0 && bytes_read == null) return null;
    return buffer.toString("utf-8", 0, i); // This is not inclusive, so we can abuse it to trim the \n off the end
    }

    i++;
    if(i == buffer.length) {
    let new_buffer = new Buffer(Math.ceil(buffer.length * 1.5));
    buffer.copy(new_buffer);
    buffer = new_buffer;
    }
    }
    }

    function log_msg(...msg) {
    console.log(`[worker ${process.pid}] `, ...msg);
    }

    log_msg(`hello, world`);

    function sleep_async(ms) {
    return new Promise((resolve) => {
    setTimeout(resolve, ms);
    });
    }

    process.on("message", async (message) => {
    switch(message.event) {
    case "work":
    // log_msg(`got work, count ${message.count}`);
    // log_msg(`starting read`); let start = new Date();
    let next = read_line_unbuffered(0);
    // log_msg(`ended read in ${new Date() - start}ms`);
    process.send({ event: "read_complete" });
    if(next == null) {
    log_msg(`Done reading`);
    process.send({ event: "end" });
    }

    // Simulate doing hard work
    // let start = +new Date();
    // while(new Date() - start < 1000) {
    // // noop
    // }
    log_msg(`[${message.count}] Processed '${next}'`);
    // await sleep_async(500 + Math.random()*1000s);
    process.send({ event: "done" });
    break;

    case "exit":
    log_msg(`exiting`);
    process.exit(0);
    }
    });

    process.send({ event: "ready" });