Skip to content

Instantly share code, notes, and snippets.

@gjcourt
Created December 4, 2014 01:51
Show Gist options
  • Select an option

  • Save gjcourt/4f3d1db1763399cbf4a0 to your computer and use it in GitHub Desktop.

Select an option

Save gjcourt/4f3d1db1763399cbf4a0 to your computer and use it in GitHub Desktop.

Revisions

  1. gjcourt renamed this gist Dec 4, 2014. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. gjcourt created this gist Dec 4, 2014.
    161 changes: 161 additions & 0 deletions -
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,161 @@
    import hashlib
    import hmac
    import pytz # pip install pytz
    from collections import namedtuple
    from datetime import datetime, timedelta
    from itertools import izip
    from urllib import quote


    def utcaware(dt):
    return dt.replace(tzinfo=pytz.utc)


    def utcnow():
    return utcaware(datetime.utcnow())


    # AWS Version 4 signing example
    # See: http://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html


    AmazonParameters = namedtuple('AmazonParameters', [
    'algorithm',
    'amz_date',
    'amz_request_id',
    'amz_customer_id',
    'amz_dta_version',
    'signed_headers',
    'credential_public_key',
    'credential_date_stamp',
    'signature',
    ])


    class Signer(object):
    def __init__(self, request):
    self.request = request

    self.public_key = 'public'
    self.secret_key = 'secret'

    self.service = ''
    self.region = ''

    def sha256(self, msg):
    return hashlib.sha256(msg.encode('utf-8')).hexdigest()

    def sign(self, key, msg, hex=False):
    if hex:
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).hexdigest()
    else:
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

    def get_amazon_params(self):
    if not hasattr(self, '_amazon_parameters'):
    amz_date = self.request.META['HTTP_X_AMZ_DATE']
    amz_request_id = self.request.META['HTTP_X_AMZ_REQUEST_ID']
    amz_customer_id = self.request.META['HTTP_X_AMZ_CUSTOMER_ID']
    amz_dta_version = self.request.META['HTTP_X_AMZ_DTA_VERSION']
    auth_header = self.request.META['HTTP_AUTHORIZATION']
    preamble, credential_protocol, signature_protocol = [l.strip() for l in auth_header.split(',')]
    algorithm, signed_headers_protocol = preamble.split(' ')
    _, signed_headers = signed_headers_protocol.split('=')
    _, credential = credential_protocol.split('=')
    _, signature = signature_protocol.split('=')

    credential_public_key, credential_date_stamp = credential.split('/')

    self._amazon_parameters = AmazonParameters(
    algorithm,
    amz_date,
    amz_request_id,
    amz_customer_id,
    amz_dta_version,
    signed_headers,
    credential_public_key,
    credential_date_stamp,
    signature,
    )

    return self._amazon_parameters

    def get_request_method(self):
    return self.request.method

    def get_canonical_uri(self):
    return quote(self.request.path)

    def get_canonical_querystring(self):
    return self.request.META['QUERY_STRING'] # usually empty?

    def get_canonical_headers(self):
    # Step 4: Create the canonical headers. Header names and values
    # must be trimmed and lowercase, and sorted in ASCII order.
    # Note that there is a trailing \n.
    params = self.get_amazon_params()
    canonical_headers = params.signed_headers.split(';')
    values = [self.request.META['HTTP_' + h.upper().replace('-', '_')] for h in canonical_headers]
    return '\n'.join([h + ':' + v for h, v in izip(canonical_headers, values)]) + '\n'

    def get_signed_headers(self):
    # Step 5: Create the list of signed headers. This lists the headers
    # in the canonical_headers list, delimited with ";" and in alpha order.
    # Note: The request can include any headers; canonical_headers and
    # signed_headers include those that you want to be included in the
    # hash of the request. "Host" and "http-x-amz-date" are always required.
    # For DynamoDB, content-type and http-x-amz-target are also required.
    params = self.get_amazon_params()
    return params.signed_headers

    def get_payload_hash(self):
    return self.sha256(self.request.body)

    def get_canonical_request(self):
    return self.get_request_method() + \
    '\n' + self.get_canonical_uri() + \
    '\n' + self.get_canonical_querystring() + \
    '\n' + self.get_canonical_headers() + \
    '\n' + self.get_signed_headers() + \
    '\n' + self.get_payload_hash()

    def get_signing_key(self):
    params = self.get_amazon_params()
    date_stamp = params.credential_date_stamp
    # date_key = self.sign(('AWS4' + self.secret_key).encode('utf-8'), date_stamp)
    # region_key = self.sign(date_key, self.region)
    # service_key = self.sign(region_key, self.service)
    # signing_key = self.sign(service_key, 'aws4_request')
    # return signing_key
    return self.sign(self.secret_key, date_stamp)

    def get_credential_scope(self):
    # params = self.get_amazon_params()
    # return params.credential_date_stamp + '/' + self.region + '/' + self.service + '/' + 'aws4_request'
    return ''

    def get_signing_string(self):
    params = self.get_amazon_params()
    credential_scope = self.get_credential_scope()
    canonical_request = self.get_canonical_request()
    return params.algorithm + \
    '\n' + params.amz_date + \
    '\n' + credential_scope + \
    '\n' + self.sha256(canonical_request)

    def get_signature(self):
    signing_key = self.get_signing_key()
    signing_string = self.get_signing_string()
    return self.sign(signing_key, signing_string, hex=True)

    def verify_signature(self):
    params = self.get_amazon_params()
    amz_datetime = utcaware(datetime.strptime(params.amz_date, '%Y%m%dT%H%M%SZ'))
    return utcnow() - amz_datetime < timedelta(minutes=45) \
    and params.credential_public_key == self.public_key \
    and params.signature == self.get_signature()


    # Example:
    # signer = Signer(request)
    # signer.verify_signature()