Skip to content

Instantly share code, notes, and snippets.

@mmautner
Last active July 11, 2023 17:41
Show Gist options
  • Select an option

  • Save mmautner/c102bff31d386679f021 to your computer and use it in GitHub Desktop.

Select an option

Save mmautner/c102bff31d386679f021 to your computer and use it in GitHub Desktop.
Redshift Upserts w/ Python
import uuid
import psycopg2
from secret import REDSHIFT_CREDS
from secret import AWS_ACCESS_KEY, AWS_SECRET_KEY
def get_primary_keys(tablename, db):
    """Return the column names of *tablename*'s index, parsed from its DDL.

    Looks up the table's entry in ``pg_indexes`` and extracts the
    parenthesized column list out of the ``indexdef`` string, e.g.
    ``CREATE UNIQUE INDEX idx ON t ("id", "name")`` -> ``['id', 'name']``.

    tablename -- (str) name of the table to inspect
    db        -- open DB-API connection (psycopg2-compatible)

    Raises ValueError if the table has no index row in pg_indexes.
    """
    c = db.cursor()
    # Parameterized query instead of '%s' string interpolation: building the
    # SQL by hand is an injection risk if tablename is ever untrusted.
    c.execute("select indexdef from pg_indexes where tablename = %s;",
              (tablename,))
    rows = c.fetchall()
    if not rows:
        # Previously this crashed with a bare IndexError on [0][0].
        raise ValueError("no index found for table %r" % tablename)
    indexdef = rows[0][0]
    # Everything between the parentheses is the comma-separated column list.
    raw_fields = indexdef.split('(')[1].strip(')').split(',')
    return [field.strip().strip('"') for field in raw_fields]
def load_s3_file_into_redshift(s3path, tablename, delimiter='|'):
    """Upsert a gzipped, delimited S3 file into a Redshift table.

    COPYs the file into a temporary staging table created ``LIKE`` the
    target, then in a single transaction LOCKs the target, deletes the
    rows whose primary-key columns match staged rows, and inserts the
    staged rows — the standard Redshift merge/upsert pattern.

    File must be gzipped.

    s3path    -- (str) s3://... path of the gzipped file on Amazon S3
    tablename -- (str) destination table name
    delimiter -- (str) field delimiter used in the file

    Returns s3path on success.
    """
    # FIX: uuid.uuid4().get_hex() exists only on Python 2; the .hex
    # attribute works on both Python 2 and 3.
    temp_tablename = 'temp_%s' % uuid.uuid4().hex
    db = psycopg2.connect(**REDSHIFT_CREDS)
    try:
        primary_keys = get_primary_keys(tablename, db)
        equals_clause = '{dest}.%s = {src}.%s'
        join_clause = ' AND '.join(equals_clause % (pk, pk)
                                   for pk in primary_keys)
        join_clause = join_clause.format(dest=tablename, src=temp_tablename)
        upsert_qry = """\
CREATE TEMPORARY TABLE {src} (LIKE {dest});
COPY {src} FROM '{s3path}'
CREDENTIALS 'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
DELIMITER '{delimiter}' gzip;
BEGIN;
LOCK {dest};
DELETE FROM {dest} USING {src} WHERE {join_clause};
INSERT INTO {dest} SELECT * FROM {src};
END;
""".format(dest=tablename,
           src=temp_tablename,
           s3path=s3path,
           join_clause=join_clause,
           access_key=AWS_ACCESS_KEY,
           secret_key=AWS_SECRET_KEY,
           delimiter=delimiter)
        c = db.cursor()
        c.execute(upsert_qry)
        db.commit()
    finally:
        # FIX: the connection was previously never closed (leak).
        db.close()
    return s3path
@Uzzije
Copy link
Copy Markdown

Uzzije commented Nov 3, 2022

Thanks for this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment