Skip to content

Instantly share code, notes, and snippets.

@obazoud
Forked from marcocitus/create_table.sql
Created April 8, 2016 15:26
Show Gist options
  • Select an option

  • Save obazoud/329e5e974411260cf8950f78e512bbd7 to your computer and use it in GitHub Desktop.

Select an option

Save obazoud/329e5e974411260cf8950f78e512bbd7 to your computer and use it in GitHub Desktop.

Revisions

  1. @marcocitus marcocitus revised this gist Mar 10, 2016. 2 changed files with 29 additions and 29 deletions.
    22 changes: 11 additions & 11 deletions get_date_shard.sql
    Original file line number Diff line number Diff line change
    @@ -2,21 +2,21 @@ CREATE OR REPLACE FUNCTION get_date_shard(events_date date)
    RETURNS bigint AS
    $BODY$
    DECLARE
    date_shard_id bigint;
    date_shard_id bigint;
    BEGIN
    SELECT shardid INTO date_shard_id FROM pg_dist_shard
    WHERE logicalrelid = 'github_events'::regclass AND shardminvalue::date = events_date;
    SELECT shardid INTO date_shard_id FROM pg_dist_shard
    WHERE logicalrelid = 'github_events'::regclass AND shardminvalue::date = events_date;

    IF NOT FOUND THEN
    SELECT master_create_empty_shard('github_events') INTO date_shard_id;
    IF NOT FOUND THEN
    SELECT master_create_empty_shard('github_events') INTO date_shard_id;

    UPDATE pg_dist_shard
    SET shardminvalue = events_date::timestamp,
    shardmaxvalue = events_date::timestamp + interval '1 day' - interval '1 second'
    WHERE shardid = date_shard_id;
    END IF;
    UPDATE pg_dist_shard
    SET shardminvalue = events_date::timestamp,
    shardmaxvalue = events_date::timestamp + interval '1 day' - interval '1 second'
    WHERE shardid = date_shard_id;
    END IF;

    RETURN date_shard_id;
    RETURN date_shard_id;
    END;
    $BODY$
    LANGUAGE plpgsql;
    36 changes: 18 additions & 18 deletions load_github_events.sql
    Original file line number Diff line number Diff line change
    @@ -3,28 +3,28 @@ CREATE SEQUENCE IF NOT EXISTS stage_id;
    CREATE OR REPLACE FUNCTION load_github_events(events_date date, start_hour int, end_hour int) RETURNS text AS
    $BODY$
    DECLARE
    stage_table text := 'stage_'||nextval('stage_id');
    stage_table text := 'stage_'||nextval('stage_id');
    BEGIN
    CREATE TEMPORARY TABLE input (data jsonb);
    CREATE TEMPORARY TABLE input (data jsonb);

    /* Download, decompress, and filter JSON data */
    EXECUTE format('COPY input FROM PROGRAM ''curl -s http://data.githubarchive.org/%s-{%s..%s}.json.gz | zcat | grep -v "\\u0000"'''||
    'CSV QUOTE e''\x01'' DELIMITER e''\x02''', events_date, start_hour, end_hour);
    /* Download, decompress, and filter JSON data */
    EXECUTE format('COPY input FROM PROGRAM ''curl -s http://data.githubarchive.org/%s-{%s..%s}.json.gz | zcat | grep -v "\\u0000"'''||
    'CSV QUOTE e''\x01'' DELIMITER e''\x02''', events_date, start_hour, end_hour);

    /* Convert raw JSON to table format */
    EXECUTE format('CREATE TABLE %I AS '||
    'SELECT (data->>''id'')::bigint AS event_id, '||
    '(data->>''type'')::text AS event_type, '||
    '(data->>''public'')::boolean AS event_public, '||
    '(data->''repo''->>''id'')::bigint AS repo_id, '||
    'data->''payload'' AS payload, '||
    'data->''repo'' AS repo, '||
    'data->''actor'' AS actor, '||
    'data->''org'' AS org, '||
    '(data->>''created_at'')::timestamp AS created_at '||
    'FROM input', stage_table);
    /* Convert raw JSON to table format */
    EXECUTE format('CREATE TABLE %I AS '||
    'SELECT (data->>''id'')::bigint AS event_id, '||
    '(data->>''type'')::text AS event_type, '||
    '(data->>''public'')::boolean AS event_public, '||
    '(data->''repo''->>''id'')::bigint AS repo_id, '||
    'data->''payload'' AS payload, '||
    'data->''repo'' AS repo, '||
    'data->''actor'' AS actor, '||
    'data->''org'' AS org, '||
    '(data->>''created_at'')::timestamp AS created_at '||
    'FROM input', stage_table);

    RETURN stage_table;
    RETURN stage_table;
    END;
    $BODY$
    LANGUAGE plpgsql;
  2. @marcocitus marcocitus revised this gist Mar 10, 2016. No changes.
  3. @marcocitus marcocitus created this gist Mar 10, 2016.
    18 changes: 18 additions & 0 deletions create_table.sql
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,18 @@
    CREATE TABLE github_events
    (
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    actor jsonb,
    org jsonb,
    created_at timestamp
    );

    SELECT master_create_distributed_table('github_events', 'created_at', 'append');

    CREATE INDEX ON github_events (event_type);
    CREATE INDEX ON github_events USING GIN (actor jsonb_path_ops);
    CREATE INDEX ON github_events USING GIN (repo jsonb_path_ops);
    22 changes: 22 additions & 0 deletions get_date_shard.sql
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,22 @@
    CREATE OR REPLACE FUNCTION get_date_shard(events_date date)
    RETURNS bigint AS
    $BODY$
    DECLARE
    date_shard_id bigint;
    BEGIN
    SELECT shardid INTO date_shard_id FROM pg_dist_shard
    WHERE logicalrelid = 'github_events'::regclass AND shardminvalue::date = events_date;

    IF NOT FOUND THEN
    SELECT master_create_empty_shard('github_events') INTO date_shard_id;

    UPDATE pg_dist_shard
    SET shardminvalue = events_date::timestamp,
    shardmaxvalue = events_date::timestamp + interval '1 day' - interval '1 second'
    WHERE shardid = date_shard_id;
    END IF;

    RETURN date_shard_id;
    END;
    $BODY$
    LANGUAGE plpgsql;
    21 changes: 21 additions & 0 deletions load_github_events
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,21 @@
    #!/bin/sh -e
    date=$1
    start_hour=$2
    end_hour=${3:-$2}

    # Get a shard for the date
    shard_id=$(psql -tA -c "SELECT get_date_shard('$date')")

    # Stage from one of the shard placements
    worker_name=$(psql -tA -F" " -c "SELECT nodename FROM pg_dist_shard_placement WHERE shardid = $shard_id LIMIT 1")

    # Load the raw data from githubarchive.org
    stage_table=$(psql -tA -h $worker_name -c "SELECT load_github_events('$date', $start_hour, $end_hour)")

    # Append the data to the appropriate shard
    psql -c "SELECT master_append_table_to_shard($shard_id, '$stage_table', '$worker_name', 5432)" >/dev/null

    # Drop the stage table
    psql -h $worker_name -c "DROP TABLE $stage_table" >/dev/null

    echo loaded $date from $start_hour:00:00 to $end_hour:59:59
    30 changes: 30 additions & 0 deletions load_github_events.sql
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    CREATE SEQUENCE IF NOT EXISTS stage_id;

    CREATE OR REPLACE FUNCTION load_github_events(events_date date, start_hour int, end_hour int) RETURNS text AS
    $BODY$
    DECLARE
    stage_table text := 'stage_'||nextval('stage_id');
    BEGIN
    CREATE TEMPORARY TABLE input (data jsonb);

    /* Download, decompress, and filter JSON data */
    EXECUTE format('COPY input FROM PROGRAM ''curl -s http://data.githubarchive.org/%s-{%s..%s}.json.gz | zcat | grep -v "\\u0000"'''||
    'CSV QUOTE e''\x01'' DELIMITER e''\x02''', events_date, start_hour, end_hour);

    /* Convert raw JSON to table format */
    EXECUTE format('CREATE TABLE %I AS '||
    'SELECT (data->>''id'')::bigint AS event_id, '||
    '(data->>''type'')::text AS event_type, '||
    '(data->>''public'')::boolean AS event_public, '||
    '(data->''repo''->>''id'')::bigint AS repo_id, '||
    'data->''payload'' AS payload, '||
    'data->''repo'' AS repo, '||
    'data->''actor'' AS actor, '||
    'data->''org'' AS org, '||
    '(data->>''created_at'')::timestamp AS created_at '||
    'FROM input', stage_table);

    RETURN stage_table;
    END;
    $BODY$
    LANGUAGE plpgsql;