Last active
January 11, 2022 14:39
-
-
Save TylerHendrickson/eff26096e6cc8626e4a57b2edb9895f0 to your computer and use it in GitHub Desktop.
Republish SQS DLQ messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Republishes messages in an AWS SQS dead-letter queue (DLQ) | |
| Useful for moving messages in a DLQ back to a "normal" SQS queue | |
| (e.g. once bugs are fixed or initial error is otherwise resolved). | |
| Workflow: | |
| 1. Get a message from the configured source DLQ | |
| 2. Publish a new message to the configured destination queue, | |
| using the body of the message from #1 | |
| 3. After a successful publish, delete the message retrieved in #1 from the DLQ | |
| 4a. If deletion in #3 was successful, repeat until the configured DLQ is empty | |
| or configured limit is reached | |
| 4b. If deletion #3 failed, get user confirmation before continuing | |
| Tested with Python 3.6.8 | |
| """ | |
| import argparse | |
| import boto3 | |
| sqs = boto3.resource('sqs') | |
def load_queue(url):
    """Return the SQS queue resource identified by *url*.

    The resource's attributes are fetched immediately via ``reload()``;
    presumably this is so a bad URL or missing permission fails here
    rather than part-way through a republish run.
    """
    queue_resource = sqs.Queue(url)
    queue_resource.reload()
    return queue_resource
def get_next_message(source_queue):
    """Fetch at most one message from *source_queue*.

    Returns the received message object, or ``None`` when the receive call
    comes back empty after its 2-second wait.
    """
    received = source_queue.receive_messages(
        MaxNumberOfMessages=1,
        # A zero visibility timeout is requested, so the message is not
        # hidden from other receivers while it is being republished.
        VisibilityTimeout=0,
        WaitTimeSeconds=2,
    )
    if not received:
        return None

    first = received[0]
    print(
        f'Received DLQ message with ID {first.message_id} '
        f'(receipt handle: {first.receipt_handle})'
    )
    return first
def publish_message(destination_queue, message, delay_seconds=60):
    """Send the body of *message* to *destination_queue* as a new message.

    Args:
        destination_queue: Queue resource exposing ``send_message`` and ``url``.
        message: Received message whose ``body`` is republished verbatim.
        delay_seconds: Per-message delivery delay. Defaults to 60, the value
            that used to be hard-coded. Pass ``None`` to omit the parameter
            and let the queue's own delay setting apply.
    """
    send_kwargs = {'MessageBody': message.body}
    if delay_seconds is not None:
        send_kwargs['DelaySeconds'] = delay_seconds
    response = destination_queue.send_message(**send_kwargs)
    print(
        f"Published new message to {destination_queue.url} "
        f"with ID {response['MessageId']}"
    )
def delete_message(source_queue, message):
    """Delete *message* from *source_queue* using the batch-delete call.

    Returns ``True`` when the response reports no failures. Otherwise the
    first failure entry is printed and ``False`` is returned.
    """
    entry = {'Id': message.message_id, 'ReceiptHandle': message.receipt_handle}
    response = source_queue.delete_messages(Entries=[entry])
    failures = response.get('Failed')
    if not failures:
        return True

    print(f'Failed to delete message with ID {message.message_id}')
    print(failures[0])
    return False
def republish_all_messages(source_queue, destination_queue, limit=None):
    """Move messages from *source_queue* to *destination_queue* one at a time.

    Stops when the source queue yields no message, when *limit* messages
    have been handled, or when a deletion fails and the user declines to
    continue.
    """
    handled = 0
    while True:
        message = get_next_message(source_queue)
        if message is None:
            return

        # Publish first, delete second: a failed publish raises before the
        # original is removed from the DLQ.
        publish_message(destination_queue, message)
        if not delete_message(source_queue, message):
            if not _prompt_to_continue():
                return

        handled += 1
        if limit is not None and handled >= limit:
            return
| def _prompt_to_continue() -> bool: | |
| return input('Type "yes" to continue (all other input aborts): ').strip() == 'yes' | |
| def _get_parsed_arguments(): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('--source-dlq', '-s', required=True, help='URL of the source DLQ') | |
| parser.add_argument( | |
| '--destination-queue', | |
| '-d', | |
| required=True, | |
| help='URL of the destination queue' | |
| ) | |
| parser.add_argument('--limit', '-l', type=int, help='Optional limit for republish operations') | |
| parser.add_argument( | |
| '--no-confirm', | |
| action='store_true', | |
| default=False, | |
| help=( | |
| 'Set to disable confirmation of configured settings ' | |
| 'before proceeding with republish operations' | |
| ) | |
| ) | |
| args = parser.parse_args() | |
| if args.limit is not None and args.limit <= 0: | |
| print( | |
| 'Error:', | |
| ' --limit must be an integer greater than or equal to 1', | |
| sep='\n', | |
| ) | |
| exit(1) | |
| return args | |
def _main():
    """Script entry point: parse options, confirm, then republish.

    Raises:
        SystemExit: With status 1 when the user declines the confirmation.
    """
    args = _get_parsed_arguments()
    dlq = load_queue(args.source_dlq)
    queue = load_queue(args.destination_queue)
    print(
        'Configuration:',
        f' Configured source DLQ with URL: {dlq.url}',
        f' Configured destination queue with URL: {queue.url}',
        f' Configured Limit: {args.limit}',
        sep='\n'
    )
    if not args.no_confirm and not _prompt_to_continue():
        print('Exiting due to user input')
        # ``exit`` is a ``site`` convenience for interactive use; raise
        # SystemExit directly so the script also works without it.
        raise SystemExit(1)
    print('Starting to republish...')
    republish_all_messages(
        source_queue=dlq,
        destination_queue=queue,
        limit=args.limit,
    )
    print('Republish operation(s) complete!')


if __name__ == '__main__':
    _main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Republishes DDB stream events that were sent to an AWS SQS dead-letter queue (DLQ) | |
| Useful for retrying DynamoDB stream events that were sent to a DLQ after failing | |
| too many times (e.g. once bugs are fixed or initial error is otherwise resolved). | |
| Workflow: | |
| 1. Get a message from the configured source DLQ | |
| 2. Asynchronously invoke the Lambda function named in the DLQ message's | |
| requestContext.functionArn, using the DDB stream event derived from the body of the message from #1 | |
| 3. After a successful async publish, delete the message retrieved in #1 from the DLQ | |
| 4a. If deletion in #3 was successful, repeat until the configured DLQ is empty | |
| or configured limit is reached | |
| 4b. If deletion #3 failed, get user confirmation before continuing | |
| Tested with Python 3.6.8, Python 3.8 | |
| """ | |
| import argparse | |
| import json | |
| import boto3 | |
| sqs = boto3.resource('sqs') | |
def to_json_helper(obj):
    """``default=`` hook for ``json.dumps``.

    Objects exposing ``isoformat()`` (dates, datetimes) are serialized
    through it. Anything else is delegated to ``JSONEncoder.default``,
    which raises ``TypeError`` for unsupported types.
    """
    try:
        serialized = obj.isoformat()
    except AttributeError:
        serialized = json.JSONEncoder().default(obj)
    return serialized
def load_queue(url):
    """Return the SQS queue resource identified by *url*.

    The resource's attributes are fetched immediately via ``reload()``;
    presumably this is so a bad URL or missing permission fails here
    rather than part-way through a republish run.
    """
    queue_resource = sqs.Queue(url)
    queue_resource.reload()
    return queue_resource
def get_next_message(source_queue):
    """Fetch at most one message from *source_queue*.

    Returns the received message object, or ``None`` when the receive call
    comes back empty after its 2-second wait.
    """
    received = source_queue.receive_messages(
        MaxNumberOfMessages=1,
        # A zero visibility timeout is requested, so the message is not
        # hidden from other receivers while it is being republished.
        VisibilityTimeout=0,
        WaitTimeSeconds=2,
    )
    if not received:
        return None

    first = received[0]
    print(
        f'Received DLQ message with ID {first.message_id} '
        f'(receipt handle: {first.receipt_handle})'
    )
    return first
def publish_message(destination_queue, message, delay_seconds=60):
    """Send the body of *message* to *destination_queue* as a new message.

    Args:
        destination_queue: Queue resource exposing ``send_message`` and ``url``.
        message: Received message whose ``body`` is republished verbatim.
        delay_seconds: Per-message delivery delay. Defaults to 60, the value
            that used to be hard-coded. Pass ``None`` to omit the parameter
            and let the queue's own delay setting apply.
    """
    send_kwargs = {'MessageBody': message.body}
    if delay_seconds is not None:
        send_kwargs['DelaySeconds'] = delay_seconds
    response = destination_queue.send_message(**send_kwargs)
    print(
        f"Published new message to {destination_queue.url} "
        f"with ID {response['MessageId']}"
    )
def publish_stream_event(dlq_message):
    """Re-send the stream event described by *dlq_message* to its Lambda.

    *dlq_message* is the decoded DLQ message body. Its ``DDBStreamBatchInfo``
    locates the stream records and its ``requestContext.functionArn`` names
    the function to invoke.

    Returns ``True`` when the asynchronous invocation is answered with
    status code 202.
    """
    stream_event = get_stream_event_payload(dlq_message['DDBStreamBatchInfo'])
    lambda_client = boto3.client('lambda')
    function_arn = dlq_message['requestContext']['functionArn']
    # The helper is passed as ``default=`` for values json cannot encode.
    payload = json.dumps(stream_event, default=to_json_helper).encode('utf-8')
    response = lambda_client.invoke(
        FunctionName=function_arn,
        InvocationType='Event',
        Payload=payload,
    )
    print('Sent stream event to', function_arn)
    return response['StatusCode'] == 202
def get_stream_event_payload(stream_batch_info, stream_client=None):
    """Re-read the failed batch's records from the DynamoDB stream.

    Args:
        stream_batch_info: The ``DDBStreamBatchInfo`` mapping from the DLQ
            message. ``streamArn``, ``shardId`` and ``startSequenceNumber``
            are required. ``batchSize`` is optional and defaults to 1.
        stream_client: Optional DynamoDB Streams client. When omitted, a
            new ``boto3`` client is created, as before.

    Returns:
        ``{'Records': [...]}`` holding up to ``batchSize`` records, starting
        at ``startSequenceNumber``.
    """
    if stream_client is None:
        stream_client = boto3.client('dynamodbstreams')

    shard_iterator = stream_client.get_shard_iterator(
        StreamArn=stream_batch_info['streamArn'],
        ShardId=stream_batch_info['shardId'],
        ShardIteratorType='AT_SEQUENCE_NUMBER',
        SequenceNumber=stream_batch_info['startSequenceNumber'],
    )['ShardIterator']

    # Fetch the whole failed batch, not just its first record. A fixed
    # Limit=1 silently dropped the rest of any batch larger than one.
    remaining = max(int(stream_batch_info.get('batchSize', 1)), 1)
    max_per_call = 1000  # upper bound GetRecords accepts for Limit
    records = []
    while remaining > 0 and shard_iterator:
        response = stream_client.get_records(
            ShardIterator=shard_iterator,
            Limit=min(remaining, max_per_call),
        )
        page = response['Records']
        if not page:
            # Nothing more to read at this position; do not spin.
            break
        records.extend(page)
        remaining -= len(page)
        shard_iterator = response.get('NextShardIterator')
    return {'Records': records}
def delete_message(source_queue, message):
    """Delete *message* from *source_queue* using the batch-delete call.

    Returns ``True`` when the response reports no failures. Otherwise the
    first failure entry is printed and ``False`` is returned.
    """
    entry = {'Id': message.message_id, 'ReceiptHandle': message.receipt_handle}
    response = source_queue.delete_messages(Entries=[entry])
    failures = response.get('Failed')
    if not failures:
        return True

    print(f'Failed to delete message with ID {message.message_id}')
    print(failures[0])
    return False
def republish_all_messages(source_queue, limit=None):
    """Replay DLQ messages from *source_queue* as Lambda stream events.

    Each message body is decoded as JSON and handed to
    ``publish_stream_event``. The DLQ message is deleted only after a
    successful hand-off. A failed hand-off or a failed deletion asks the
    user whether to continue. The loop ends when the queue yields no
    message, when *limit* messages have been attempted, or when the user
    declines.
    """
    attempted = 0
    while True:
        message = get_next_message(source_queue)
        if message is None:
            return

        event = json.loads(message.body)
        # Short-circuit: deletion is attempted only after a successful send.
        step_ok = publish_stream_event(event) and delete_message(source_queue, message)
        if not step_ok and not _prompt_to_continue():
            return

        attempted += 1
        if limit is not None and attempted >= limit:
            return
| def _prompt_to_continue() -> bool: | |
| return input('Type "yes" to continue (all other input aborts): ').strip() == 'yes' | |
| def _get_parsed_arguments(): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('--source-dlq', '-s', required=True, help='URL of the source DLQ') | |
| parser.add_argument('--limit', '-l', type=int, help='Optional limit for republish operations') | |
| parser.add_argument( | |
| '--no-confirm', | |
| action='store_true', | |
| default=False, | |
| help=( | |
| 'Set to disable confirmation of configured settings ' | |
| 'before proceeding with republish operations' | |
| ) | |
| ) | |
| args = parser.parse_args() | |
| if args.limit is not None and args.limit <= 0: | |
| print( | |
| 'Error:', | |
| ' --limit must be an integer greater than or equal to 1', | |
| sep='\n', | |
| ) | |
| exit(1) | |
| return args | |
def _main():
    """Script entry point: parse options, confirm, then republish.

    Raises:
        SystemExit: With status 1 when the user declines the confirmation.
    """
    args = _get_parsed_arguments()
    dlq = load_queue(args.source_dlq)
    print(
        'Configuration:',
        f' Configured source DLQ with URL: {dlq.url}',
        f' Configured Limit: {args.limit}',
        sep='\n'
    )
    if not args.no_confirm and not _prompt_to_continue():
        print('Exiting due to user input')
        # ``exit`` is a ``site`` convenience for interactive use; raise
        # SystemExit directly so the script also works without it.
        raise SystemExit(1)
    print('Starting to republish...')
    republish_all_messages(
        source_queue=dlq,
        limit=args.limit,
    )
    print('Republish operation(s) complete!')


if __name__ == '__main__':
    _main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Republishes EventBridge events that were sent to an AWS SQS dead-letter queue (DLQ) | |
| Useful for retrying EventBridge events that were sent to a DLQ after failing | |
| too many times (e.g. once bugs are fixed or initial error is otherwise resolved). | |
| Workflow: | |
| 1. Get a message from the configured source DLQ | |
| 2. Publish a new event to the configured destination lambda, | |
| using the EventBridge event derived from the body of the message from #1 | |
| 3. After a successful async publish, delete the message retrieved in #1 from the DLQ | |
| 4a. If deletion in #3 was successful, repeat until the configured DLQ is empty | |
| or configured limit is reached | |
| 4b. If deletion #3 failed, get user confirmation before continuing | |
| Tested with Python 3.6.8, Python 3.8 | |
| """ | |
| import argparse | |
| import json | |
| import boto3 | |
| sqs = boto3.resource('sqs') | |
def load_queue(url):
    """Return the SQS queue resource identified by *url*.

    The resource's attributes are fetched immediately via ``reload()``;
    presumably this is so a bad URL or missing permission fails here
    rather than part-way through a republish run.
    """
    queue_resource = sqs.Queue(url)
    queue_resource.reload()
    return queue_resource
def get_next_message(source_queue):
    """Fetch at most one message from *source_queue*.

    Returns the received message object, or ``None`` when the receive call
    comes back empty after its 2-second wait.
    """
    received = source_queue.receive_messages(
        MaxNumberOfMessages=1,
        # A zero visibility timeout is requested, so the message is not
        # hidden from other receivers while it is being republished.
        VisibilityTimeout=0,
        WaitTimeSeconds=2,
    )
    if not received:
        return None

    first = received[0]
    print(
        f'Received DLQ message with ID {first.message_id} '
        f'(receipt handle: {first.receipt_handle})'
    )
    return first
def publish_event(dlq_message, target_lambda, lambda_client=None):
    """Asynchronously invoke *target_lambda* with the DLQ message body.

    Args:
        dlq_message: Received SQS message. Its ``body`` is sent unchanged,
            UTF-8 encoded, as the invocation payload.
        target_lambda: Name or ARN of the Lambda function to invoke.
        lambda_client: Optional Lambda client to use. When omitted, a new
            ``boto3`` client is created, as before.

    Returns:
        ``True`` when the invocation is answered with status code 202.
    """
    if lambda_client is None:
        lambda_client = boto3.client('lambda')
    response = lambda_client.invoke(
        FunctionName=target_lambda,
        InvocationType='Event',
        Payload=dlq_message.body.encode('utf-8'),
    )
    # This script replays EventBridge events; the old "stream event" wording
    # was left over from the DynamoDB-stream variant.
    print('Sent event to', target_lambda)
    return response['StatusCode'] == 202
def delete_message(source_queue, message):
    """Delete *message* from *source_queue* using the batch-delete call.

    Returns ``True`` when the response reports no failures. Otherwise the
    first failure entry is printed and ``False`` is returned.
    """
    entry = {'Id': message.message_id, 'ReceiptHandle': message.receipt_handle}
    response = source_queue.delete_messages(Entries=[entry])
    failures = response.get('Failed')
    if not failures:
        return True

    print(f'Failed to delete message with ID {message.message_id}')
    print(failures[0])
    return False
def republish_all_messages(source_queue, target_lambda, limit=None):
    """Replay DLQ messages from *source_queue* to *target_lambda*.

    The DLQ message is deleted only after ``publish_event`` reports
    success. A failed hand-off or a failed deletion asks the user whether
    to continue. The loop ends when the queue yields no message, when
    *limit* messages have been attempted, or when the user declines.
    """
    attempted = 0
    while True:
        message = get_next_message(source_queue)
        if message is None:
            return

        # Short-circuit: deletion is attempted only after a successful send.
        step_ok = (
            publish_event(message, target_lambda)
            and delete_message(source_queue, message)
        )
        if not step_ok and not _prompt_to_continue():
            return

        attempted += 1
        if limit is not None and attempted >= limit:
            return
| def _prompt_to_continue() -> bool: | |
| return input('Type "yes" to continue (all other input aborts): ').strip() == 'yes' | |
| def _get_parsed_arguments(): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('--source-dlq', '-s', required=True, help='URL of the source DLQ') | |
| parser.add_argument('--target-lambda', '-t', required=True, help='ARN or name of target Lambda') | |
| parser.add_argument('--limit', '-l', type=int, help='Optional limit for republish operations') | |
| parser.add_argument( | |
| '--no-confirm', | |
| action='store_true', | |
| default=False, | |
| help=( | |
| 'Set to disable confirmation of configured settings ' | |
| 'before proceeding with republish operations' | |
| ) | |
| ) | |
| args = parser.parse_args() | |
| if args.limit is not None and args.limit <= 0: | |
| print( | |
| 'Error:', | |
| ' --limit must be an integer greater than or equal to 1', | |
| sep='\n', | |
| ) | |
| exit(1) | |
| return args | |
def _main():
    """Script entry point: parse options, confirm, then republish.

    Raises:
        SystemExit: With status 1 when the user declines the confirmation.
    """
    args = _get_parsed_arguments()
    dlq = load_queue(args.source_dlq)
    target_lambda = args.target_lambda
    print(
        'Configuration:',
        f' Configured source DLQ with URL: {dlq.url}',
        f' Configured Lambda: {target_lambda}',
        f' Configured Limit: {args.limit}',
        sep='\n'
    )
    if not args.no_confirm and not _prompt_to_continue():
        print('Exiting due to user input')
        # ``exit`` is a ``site`` convenience for interactive use; raise
        # SystemExit directly so the script also works without it.
        raise SystemExit(1)
    print('Starting to republish...')
    republish_all_messages(
        source_queue=dlq,
        target_lambda=target_lambda,
        limit=args.limit,
    )
    print('Republish operation(s) complete!')


if __name__ == '__main__':
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment