Skip to content

Instantly share code, notes, and snippets.

@TylerHendrickson
Last active January 11, 2022 14:39
Show Gist options
  • Select an option

  • Save TylerHendrickson/eff26096e6cc8626e4a57b2edb9895f0 to your computer and use it in GitHub Desktop.

Select an option

Save TylerHendrickson/eff26096e6cc8626e4a57b2edb9895f0 to your computer and use it in GitHub Desktop.
Republish SQS DLQ messages
"""Republishes messages in an AWS SQS dead-letter queue (DLQ)
Useful for moving messages in a DLQ back to a "normal" SQS queue
(e.g. once bugs are fixed or initial error is otherwise resolved).
Workflow:
1. Get a message from the configured source DLQ
2. Publish a new message to the configured destination queue,
using the body of the message from #1
3. After a successful publish, delete the message retrieved in #1 from the DLQ
4a. If deletion in #3 was successful, repeat until the configured DLQ is empty
or configured limit is reached
4b. If deletion #3 failed, get user confirmation before continuing
Tested with Python 3.6.8
"""
import argparse
import boto3  # third-party AWS SDK; credentials/region come from the environment

# Module-level SQS service resource shared by all queue helpers below.
sqs = boto3.resource('sqs')
def load_queue(url):
    """Return an SQS Queue resource for *url*, verified reachable via reload()."""
    q = sqs.Queue(url)
    q.reload()  # fetch attributes now so a bad URL fails fast
    return q
def get_next_message(source_queue):
    """Fetch at most one message from *source_queue*; return it, or None if empty.

    VisibilityTimeout=0 keeps the message visible so an aborted run does not
    temporarily hide it; the caller deletes it before polling again.
    """
    batch = source_queue.receive_messages(
        MaxNumberOfMessages=1,
        VisibilityTimeout=0,
        WaitTimeSeconds=2,
    )
    if not batch:
        return None
    msg = batch[0]
    print(
        f'Received DLQ message with ID {msg.message_id} '
        f'(receipt handle: {msg.receipt_handle})'
    )
    return msg
def publish_message(destination_queue, message):
    """Re-send *message*'s body to *destination_queue* with a 60s delivery delay."""
    response = destination_queue.send_message(
        MessageBody=message.body,
        DelaySeconds=60,
    )
    print(f"Published new message to {destination_queue.url} with ID {response['MessageId']}")
def delete_message(source_queue, message):
    """Delete *message* from *source_queue*; True on success, False on failure."""
    entry = {'Id': message.message_id, 'ReceiptHandle': message.receipt_handle}
    result = source_queue.delete_messages(Entries=[entry])
    failures = result.get('Failed')
    if not failures:
        return True
    print(f'Failed to delete message with ID {message.message_id}')
    print(failures[0])
    return False
def republish_all_messages(source_queue, destination_queue, limit=None):
    """Drain *source_queue* into *destination_queue*, one message at a time.

    Stops when the DLQ is empty, when *limit* messages have been republished,
    or when the user declines to continue after a failed delete.
    """
    processed = 1
    while True:
        msg = get_next_message(source_queue)
        if msg is None:
            return
        publish_message(destination_queue, msg)
        # A failed delete means the message may be replayed again; ask first.
        if not delete_message(source_queue, msg) and not _prompt_to_continue():
            return
        if limit is not None and processed >= limit:
            return
        processed += 1
def _prompt_to_continue() -> bool:
    """Ask the user to confirm; True only for an exact (stripped) "yes"."""
    answer = input('Type "yes" to continue (all other input aborts): ')
    return answer.strip() == 'yes'
def _get_parsed_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--source-dlq', '-s', required=True, help='URL of the source DLQ')
parser.add_argument(
'--destination-queue',
'-d',
required=True,
help='URL of the destination queue'
)
parser.add_argument('--limit', '-l', type=int, help='Optional limit for republish operations')
parser.add_argument(
'--no-confirm',
action='store_true',
default=False,
help=(
'Set to disable confirmation of configured settings '
'before proceeding with republish operations'
)
)
args = parser.parse_args()
if args.limit is not None and args.limit <= 0:
print(
'Error:',
' --limit must be an integer greater than or equal to 1',
sep='\n',
)
exit(1)
return args
def _main():
    """Entry point: resolve queues, show the config, confirm, then republish."""
    args = _get_parsed_arguments()
    dlq = load_queue(args.source_dlq)
    queue = load_queue(args.destination_queue)
    print(
        'Configuration:',
        f' Configured source DLQ with URL: {dlq.url}',
        f' Configured destination queue with URL: {queue.url}',
        f' Configured Limit: {args.limit}',
        sep='\n'
    )
    proceed = args.no_confirm or _prompt_to_continue()
    if not proceed:
        print('Exiting due to user input')
        exit(1)
    print('Starting to republish...')
    republish_all_messages(source_queue=dlq, destination_queue=queue, limit=args.limit)
    print('Republish operation(s) complete!')


if __name__ == '__main__':
    _main()
"""Republishes DDB stream events that were sent to an AWS SQS dead-letter queue (DLQ)
Useful for retrying DynamoDB stream events that were sent to a DLQ after failing
too many times (e.g. once bugs are fixed or initial error is otherwise resolved).
Workflow:
1. Get a message from the configured source DLQ
2. Publish a new message to the configured destination lambda,
using the DDB stream event derived from the body of the message from #1
3. After a successful async publish, delete the message retrieved in #1 from the DLQ
4a. If deletion in #3 was successful, repeat until the configured DLQ is empty
or configured limit is reached
4b. If deletion #3 failed, get user confirmation before continuing
Tested with Python 3.6.8, Python 3.8
"""
import argparse
import json
import boto3  # third-party AWS SDK; credentials/region come from the environment

# Module-level SQS service resource shared by the queue helpers below.
sqs = boto3.resource('sqs')
def to_json_helper(obj):
    """`json.dumps(default=...)` hook: render date/datetime values via isoformat().

    Anything without an isoformat() method is handed to the stock encoder's
    default(), which raises TypeError for unserializable values.
    """
    isoformat = getattr(obj, 'isoformat', None)
    if isoformat is not None:
        return isoformat()
    return json.JSONEncoder().default(obj)
def load_queue(url):
    """Return an SQS Queue resource for *url*, verified reachable via reload()."""
    q = sqs.Queue(url)
    q.reload()  # fetch attributes now so a bad URL fails fast
    return q
def get_next_message(source_queue):
    """Receive up to one message from *source_queue*; return it, else None.

    VisibilityTimeout=0 keeps the message visible so an aborted run does not
    temporarily hide it; the caller deletes it before polling again.
    """
    batch = source_queue.receive_messages(
        MaxNumberOfMessages=1,
        VisibilityTimeout=0,
        WaitTimeSeconds=2,
    )
    if not batch:
        return None
    msg = batch[0]
    print(
        f'Received DLQ message with ID {msg.message_id} '
        f'(receipt handle: {msg.receipt_handle})'
    )
    return msg
def publish_message(destination_queue, message):
    """Re-send *message*'s body to *destination_queue* with a 60s delivery delay.

    NOTE(review): nothing in this script calls this function — it appears to be
    a leftover from the plain SQS-to-SQS variant; confirm before removing.
    """
    response = destination_queue.send_message(
        MessageBody=message.body,
        DelaySeconds=60,
    )
    print(f"Published new message to {destination_queue.url} with ID {response['MessageId']}")
def publish_stream_event(dlq_message):
    """Replay the DDB stream batch described by *dlq_message* against the
    original consumer Lambda; return True when the async invoke was accepted.
    """
    payload = get_stream_event_payload(dlq_message['DDBStreamBatchInfo'])
    function_arn = dlq_message['requestContext']['functionArn']
    response = boto3.client('lambda').invoke(
        FunctionName=function_arn,
        InvocationType='Event',  # async invoke; acceptance is HTTP 202
        Payload=json.dumps(payload, default=to_json_helper).encode('utf-8')
    )
    print('Sent stream event to', function_arn)
    return response['StatusCode'] == 202
def get_stream_event_payload(stream_batch_info):
    """Rebuild a Lambda-style DDB stream event from DLQ batch metadata.

    Positions a shard iterator at the batch's starting sequence number and
    reads records from there.  NOTE(review): Limit=1 replays at most one
    record even if the original batch contained more — confirm intended.
    """
    streams = boto3.client('dynamodbstreams')
    iterator = streams.get_shard_iterator(
        StreamArn=stream_batch_info['streamArn'],
        ShardId=stream_batch_info['shardId'],
        ShardIteratorType='AT_SEQUENCE_NUMBER',
        SequenceNumber=stream_batch_info['startSequenceNumber']
    )['ShardIterator']
    records = streams.get_records(ShardIterator=iterator, Limit=1)['Records']
    return {'Records': records}
def delete_message(source_queue, message):
    """Delete *message* from *source_queue*; True on success, False on failure."""
    entry = {'Id': message.message_id, 'ReceiptHandle': message.receipt_handle}
    result = source_queue.delete_messages(Entries=[entry])
    failures = result.get('Failed')
    if not failures:
        return True
    print(f'Failed to delete message with ID {message.message_id}')
    print(failures[0])
    return False
def republish_all_messages(source_queue, limit=None):
    """Replay DLQ'd stream events until the queue is empty or *limit* is hit.

    A failed invoke, or a failed delete, pauses for user confirmation before
    the loop continues.
    """
    attempts = 1
    while True:
        msg = get_next_message(source_queue)
        if msg is None:
            return
        if publish_stream_event(json.loads(msg.body)):
            # Republished OK but not deleted: the message could be replayed
            # again on a later pass, so ask before carrying on.
            if not delete_message(source_queue, msg) and not _prompt_to_continue():
                return
        elif not _prompt_to_continue():
            return
        if limit is not None and attempts >= limit:
            return
        attempts += 1
def _prompt_to_continue() -> bool:
    """Ask the user to confirm; True only for an exact (stripped) "yes"."""
    answer = input('Type "yes" to continue (all other input aborts): ')
    return answer.strip() == 'yes'
def _get_parsed_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--source-dlq', '-s', required=True, help='URL of the source DLQ')
parser.add_argument('--limit', '-l', type=int, help='Optional limit for republish operations')
parser.add_argument(
'--no-confirm',
action='store_true',
default=False,
help=(
'Set to disable confirmation of configured settings '
'before proceeding with republish operations'
)
)
args = parser.parse_args()
if args.limit is not None and args.limit <= 0:
print(
'Error:',
' --limit must be an integer greater than or equal to 1',
sep='\n',
)
exit(1)
return args
def _main():
    """Entry point: resolve the DLQ, show the config, confirm, then republish."""
    args = _get_parsed_arguments()
    dlq = load_queue(args.source_dlq)
    print(
        'Configuration:',
        f' Configured source DLQ with URL: {dlq.url}',
        f' Configured Limit: {args.limit}',
        sep='\n'
    )
    proceed = args.no_confirm or _prompt_to_continue()
    if not proceed:
        print('Exiting due to user input')
        exit(1)
    print('Starting to republish...')
    republish_all_messages(source_queue=dlq, limit=args.limit)
    print('Republish operation(s) complete!')


if __name__ == '__main__':
    _main()
"""Republishes EventBridge events that were sent to an AWS SQS dead-letter queue (DLQ)
Useful for retrying EventBridge events that were sent to a DLQ after failing
too many times (e.g. once bugs are fixed or initial error is otherwise resolved).
Workflow:
1. Get a message from the configured source DLQ
2. Publish a new event to the configured destination lambda,
using the EventBridge event derived from the body of the message from #1
3. After a successful async publish, delete the message retrieved in #1 from the DLQ
4a. If deletion in #3 was successful, repeat until the configured DLQ is empty
or configured limit is reached
4b. If deletion #3 failed, get user confirmation before continuing
Tested with Python 3.6.8, Python 3.8
"""
import argparse
import json
import boto3  # third-party AWS SDK; credentials/region come from the environment

# Module-level SQS service resource shared by the queue helpers below.
sqs = boto3.resource('sqs')
def load_queue(url):
    """Return an SQS Queue resource for *url*, verified reachable via reload()."""
    q = sqs.Queue(url)
    q.reload()  # fetch attributes now so a bad URL fails fast
    return q
def get_next_message(source_queue):
    """Receive up to one message from *source_queue*; return it, else None.

    VisibilityTimeout=0 keeps the message visible so an aborted run does not
    temporarily hide it; the caller deletes it before polling again.
    """
    batch = source_queue.receive_messages(
        MaxNumberOfMessages=1,
        VisibilityTimeout=0,
        WaitTimeSeconds=2,
    )
    if not batch:
        return None
    msg = batch[0]
    print(
        f'Received DLQ message with ID {msg.message_id} '
        f'(receipt handle: {msg.receipt_handle})'
    )
    return msg
def publish_event(dlq_message, target_lambda):
    """Asynchronously re-invoke *target_lambda* with the DLQ message body as
    the event payload; return True when Lambda accepted the invoke (HTTP 202).

    Args:
        dlq_message: SQS Message whose body is the original EventBridge event JSON.
        target_lambda: ARN or name of the Lambda function to invoke.
    """
    rs = boto3.client('lambda').invoke(
        FunctionName=target_lambda,
        InvocationType='Event',  # async invoke; acceptance is HTTP 202
        Payload=dlq_message.body.encode('utf-8')
    )
    # Bug fix: message previously said 'Sent stream event to' — copy-pasted
    # from the DDB-stream variant; this script replays EventBridge events.
    print('Sent event to', target_lambda)
    return rs['StatusCode'] == 202
def delete_message(source_queue, message):
    """Delete *message* from *source_queue*; True on success, False on failure."""
    entry = {'Id': message.message_id, 'ReceiptHandle': message.receipt_handle}
    result = source_queue.delete_messages(Entries=[entry])
    failures = result.get('Failed')
    if not failures:
        return True
    print(f'Failed to delete message with ID {message.message_id}')
    print(failures[0])
    return False
def republish_all_messages(source_queue, target_lambda, limit=None):
    """Replay DLQ'd events against *target_lambda* until the queue is empty or
    *limit* republish attempts have been made.

    A failed invoke, or a failed delete, pauses for user confirmation before
    the loop continues.
    """
    attempts = 1
    while True:
        msg = get_next_message(source_queue)
        if msg is None:
            return
        if publish_event(msg, target_lambda):
            # Republished OK but not deleted: the message could be replayed
            # again on a later pass, so ask before carrying on.
            if not delete_message(source_queue, msg) and not _prompt_to_continue():
                return
        elif not _prompt_to_continue():
            return
        if limit is not None and attempts >= limit:
            return
        attempts += 1
def _prompt_to_continue() -> bool:
    """Ask the user to confirm; True only for an exact (stripped) "yes"."""
    answer = input('Type "yes" to continue (all other input aborts): ')
    return answer.strip() == 'yes'
def _get_parsed_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--source-dlq', '-s', required=True, help='URL of the source DLQ')
parser.add_argument('--target-lambda', '-t', required=True, help='ARN or name of target Lambda')
parser.add_argument('--limit', '-l', type=int, help='Optional limit for republish operations')
parser.add_argument(
'--no-confirm',
action='store_true',
default=False,
help=(
'Set to disable confirmation of configured settings '
'before proceeding with republish operations'
)
)
args = parser.parse_args()
if args.limit is not None and args.limit <= 0:
print(
'Error:',
' --limit must be an integer greater than or equal to 1',
sep='\n',
)
exit(1)
return args
def _main():
    """Entry point: resolve the DLQ, show the config, confirm, then republish."""
    args = _get_parsed_arguments()
    dlq = load_queue(args.source_dlq)
    target_lambda = args.target_lambda
    print(
        'Configuration:',
        f' Configured source DLQ with URL: {dlq.url}',
        f' Configured Lambda: {target_lambda}',
        f' Configured Limit: {args.limit}',
        sep='\n'
    )
    proceed = args.no_confirm or _prompt_to_continue()
    if not proceed:
        print('Exiting due to user input')
        exit(1)
    print('Starting to republish...')
    republish_all_messages(source_queue=dlq, target_lambda=target_lambda, limit=args.limit)
    print('Republish operation(s) complete!')


if __name__ == '__main__':
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment