Last active
September 21, 2022 02:13
-
-
Save jimmoffet/ca8dd87309f878e5bd8ffb8903e08702 to your computer and use it in GitHub Desktop.
SNS http/s fanout signature verification
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import base64 | |
| import re | |
| from urllib.parse import urlparse | |
| import requests | |
| from M2Crypto import X509 | |
| from app import redis_store | |
# Look certificates up in (and store them to) redis_store before downloading.
USE_CACHE = True

# Reject payloads whose TopicArn does not name one of VALID_SNS_TOPICS.
VALIDATE_ARN = True

# Topic names (the sixth ':'-separated component of the ARN) that are accepted.
VALID_SNS_TOPICS = [
    'my_bounce_topic_name',
    'my_success_topic_name',
    'my_complaint_topic_name',
]

# Per-process cache of parsed signing certificates, keyed by certificate URL.
_signing_cert_cache = {}

# Host name pattern of a regional SNS endpoint; the group captures the region.
# The pattern carries no anchors of its own, so how much of the host it must
# cover depends on the matching call used by the caller.
_cert_url_re = re.compile(r'sns\.([a-z]{1,3}-[a-z]+-[0-9]{1,2})\.amazonaws\.com')
def get_certificate(url, timeout=10):
    """Return the text of the signing certificate served at *url*.

    When ``USE_CACHE`` is true the text is looked up in ``redis_store`` first
    and, on a miss, stored there for 60 minutes after a successful download.
    Only successful (non-4xx/5xx) responses are returned or cached, so an
    error page can never be cached in place of a certificate.

    Args:
        url: URL of the certificate to download. This function performs no
            validation of the URL; the caller is responsible for that.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The certificate text as ``str``.

    Raises:
        requests.RequestException: On connection failure, timeout, or an
            HTTP error status.
    """
    if USE_CACHE:
        cached = redis_store.get(url)
        if cached is not None:
            # The store may hand back bytes (usual for Redis clients that do
            # not decode responses); normalise so both paths return str.
            return cached.decode() if isinstance(cached, bytes) else cached

    # Without a timeout a stalled connection would block the caller forever.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    certificate_text = response.text

    if USE_CACHE:
        redis_store.set(url, certificate_text, ex=60 * 60)  # 60 minutes
    return certificate_text
def valid_sns_message(sns_payload):
    """Return True only if *sns_payload* carries a valid Amazon SNS signature.

    Adapted from the solution posted at
    https://github.com/boto/boto3/issues/2508#issuecomment-992931814

    The payload is rejected (False) when any of the following holds:

    * it is not a dict, or its ``SignatureVersion`` is neither '1' nor '2';
    * ``VALIDATE_ARN`` is true and the topic name in ``TopicArn`` is missing,
      malformed, or not listed in ``VALID_SNS_TOPICS``;
    * ``Type`` is not a known message type, or a signed field is missing or
      not a string;
    * ``Signature`` is not valid base64;
    * the signing certificate URL is not ``https`` on an SNS host name, or
      the certificate subject is not ``CN=sns.amazonaws.com``;
    * the signature does not verify against the certificate's public key.

    Args:
        sns_payload: The decoded JSON body of the SNS HTTP(S) request.

    Returns:
        bool: True when the signature verifies, False otherwise.

    Raises:
        Exceptions raised while downloading or parsing the certificate
        (by ``get_certificate`` / ``X509.load_cert_string``) are not caught
        here and propagate to the caller.
    """
    if not isinstance(sns_payload, dict):
        return False

    # SignatureVersion 1 is SHA1withRSA, SignatureVersion 2 is SHA256withRSA.
    # http://sns-public-resources.s3.amazonaws.com/SNS_Message_Signing_Release_Note_Jan_25_2011.pdf
    signature_version = sns_payload.get('SignatureVersion')
    if signature_version == '1':
        digest_name = 'sha1'
    elif signature_version == '2':
        digest_name = 'sha256'
    else:
        return False

    if VALIDATE_ARN:
        arn = sns_payload.get('TopicArn')
        # The payload is untrusted: a missing or malformed ARN must be
        # rejected rather than raise AttributeError / IndexError.
        if not isinstance(arn, str):
            return False
        arn_parts = arn.split(':')
        if len(arn_parts) < 6 or arn_parts[5] not in VALID_SNS_TOPICS:
            return False

    payload_type = sns_payload.get('Type')
    if payload_type in ('SubscriptionConfirmation', 'UnsubscribeConfirmation'):
        fields = ['Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type']
    elif payload_type == 'Notification':
        fields = ['Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type']
        # Subject is optional and is part of the signed string only when the
        # message actually carries one.
        if sns_payload.get('Subject') is None:
            fields.remove('Subject')
    else:
        return False

    # Build the string to be signed: "name\nvalue\n" for every field, in order.
    signed_parts = []
    for field in fields:
        field_value = sns_payload.get(field)
        if not isinstance(field_value, str):
            return False
        signed_parts.append(field + '\n' + field_value + '\n')
    string_to_sign = ''.join(signed_parts)

    # Get the signature.
    try:
        decoded_signature = base64.b64decode(sns_payload.get('Signature'))
    except (TypeError, ValueError):
        return False

    # Get the signing certificate URL (either spelling of the key).
    signing_cert_url = sns_payload.get('SigningCertUrl', sns_payload.get('SigningCertURL'))
    if not isinstance(signing_cert_url, str):
        return False
    try:
        cert_scheme, cert_netloc, *_ = urlparse(signing_cert_url)
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. a broken IPv6 literal).
        return False

    # The *entire* network location must be an SNS host. A prefix match alone
    # would accept "sns.<region>.amazonaws.com.evil.example" or a URL that
    # hides the real host behind "user:pass@". A trailing ".cn" is allowed for
    # the AWS China endpoints (amazonaws.com.cn).
    host_match = _cert_url_re.match(cert_netloc)
    if (
        cert_scheme != 'https'
        or host_match is None
        or cert_netloc[host_match.end():] not in ('', '.cn')
    ):
        # The cert doesn't seem to be from AWS
        return False

    certificate = _signing_cert_cache.get(signing_cert_url)
    if certificate is None:
        certificate = X509.load_cert_string(get_certificate(signing_cert_url))
        _signing_cert_cache[signing_cert_url] = certificate

    if certificate.get_subject().as_text() != 'CN=sns.amazonaws.com':
        return False

    # Extract the public key and select the digest for this signature version.
    public_key = certificate.get_pubkey()
    public_key.reset_context(md=digest_name)
    public_key.verify_init()

    # Feed the canonical string to the verifier.
    public_key.verify_update(string_to_sign.encode())

    # M2Crypto uses EVP_VerifyFinal() from openssl as the underlying
    # verification function. 1 indicates success, anything else is either
    # a failure or an error.
    return public_key.verify_final(decoded_signature) == 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment