Created
July 18, 2020 19:40
-
-
Save gmr/535c68a72b0338b3c4dd1832403422b1 to your computer and use it in GitHub Desktop.
Revisions
-
gmr created this gist
Jul 18, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,197 @@ #!/usr/bin/env python3 import argparse import json import logging import sys import time import typing from urllib import parse import httpx LOGGER = logging.getLogger(__name__) class Converter: def __init__(self, url: str, vhost: str) -> None: self.base_url = url.rstrip('/') self.vhost = parse.quote(vhost, '') response = httpx.get('{}/api/queues/{}'.format( self.base_url, self.vhost)) self.queues = sorted((queue for queue in response.json() if queue['type'] == 'quorum' and not queue['auto_delete']), key=lambda queue: queue['name']) def run(self) -> None: for queue in self.queues: LOGGER.info('Processing %s', queue['name']) temp_queue = '{}-temp'.format(queue['name']) if not self.create_queue( temp_queue, self._strip_x_queue_type(queue['arguments']), durable=queue['durable'], exclusive=queue['exclusive']): LOGGER.error('Failed to create temporary queue: %s', temp_queue) sys.exit(1) self.switch_bindings(queue['name'], temp_queue) self.wait_for_queue_to_drain(queue['name']) if not self.delete_queue(queue['name']): LOGGER.error('Failed to delete queue: %s', queue['name']) sys.exit(1) if not self.create_queue( queue['name'], self._strip_x_queue_type(queue['arguments']), durable=queue['durable'], exclusive=queue['exclusive']): LOGGER.error('Failed to create queue: %s', queue['name']) sys.exit(1) self.switch_bindings(temp_queue, queue['name']) if not self.create_temp_shovel(temp_queue, queue['name']): LOGGER.error('Failed to create temp shovel from %s to %s', temp_queue, queue['name']) sys.exit(1) self.wait_for_queue_to_drain(temp_queue) if not self.delete_queue(temp_queue): LOGGER.error('Failed to delete queue: %s', temp_queue) sys.exit(1) def bind_queue(self, exchange: str, routing_key: str, queue: str, arguments: dict) -> bool: url = '{}/api/bindings/{}/e/{}/q/{}'.format( self.base_url, self.vhost, exchange, queue) response = httpx.post( url, headers={'Content-Type': 'application/json'}, data=json.dumps({'routing_key': routing_key, 'arguments': arguments})) return response.status_code == 201 def create_queue(self, name: str, arguments: dict, durable: bool, exclusive: bool) -> bool: response = httpx.put(self._api_url_queue(name), headers={'Content-Type': 'application/json'}, data=json.dumps({ 'arguments': arguments, 'auto_delete': False, 'durable': durable, 'exclusive': exclusive, 'type': 'classic'})) LOGGER.debug('Response: %r', response) return response.status_code == 201 def create_temp_shovel(self, from_queue: str, to_queue: str) -> bool: shovel_name = '{}-to-{}'.format(from_queue, to_queue) response = httpx.put( '{}/api/parameters/shovel/{}/{}'.format( self.base_url, self.vhost, shovel_name), headers={'Content-Type': 'application/json'}, data=json.dumps({ 'component': 'shovel', 'name': shovel_name, 'value': { 'ack-mode': 'on-confirm', 'add-forward-headers': False, 'delete-after': 'queue-length', 'dest-queue': to_queue, 'dest-uri': 'amqp://', 'prefetch-count': 100, 'reconnect-delay': 30, 'src-queue': from_queue, 'src-uri': 'amqp://' }, 'vhost': self.vhost})) if response.status_code != 201: LOGGER.error('Response: %r: %r', response, response.json()) return response.status_code == 201 def delete_queue(self, name: str) -> bool: response = httpx.delete(self._api_url_queue(name)) LOGGER.debug('Response: %r', response) return response.status_code == 204 def get_queue_bindings(self, name: str) -> typing.List[typing.Dict]: response = httpx.get( '{}/api/queues/{}/{}/bindings'.format( self.base_url, self.vhost, name)) return [binding for binding in response.json() if binding['destination'] != binding['properties_key'] and binding['destination'] != binding['routing_key'] and binding['source'] != ''] def switch_bindings(self, from_queue: str, to_queue: str) -> None: for binding in self.get_queue_bindings(from_queue): if not self.bind_queue( binding['source'], binding['routing_key'], to_queue, binding['arguments']): LOGGER.error('Failed to bind queue %s to %s with %s', to_queue, binding['source'], binding['routing_key']) sys.exit(1) if not self.unbind_queue( binding['source'], from_queue, binding['properties_key']): LOGGER.error('Failed to unbind queue %s to %s with %s', from_queue, binding['source'], binding['routing_key']) sys.exit(1) def unbind_queue(self, exchange: str, queue: str, properties_key: str) -> bool: url = '{}/api/bindings/{}/e/{}/q/{}/{}'.format( self.base_url, self.vhost, exchange, queue, properties_key) response = httpx.delete(url) return response.status_code == 204 def wait_for_queue_to_drain(self, name: str) -> None: while True: response = httpx.get(self._api_url_queue(name)) if not response.status_code == 200: LOGGER.error('Failed to get queue details', name) sys.exit(1) body = response.json() if body['messages'] == 0: return LOGGER.info('Queue %s has %i messages', name, body['messages']) time.sleep(5) def _api_url_queue(self, name: str) -> str: return '{}/api/queues/{}/{}'.format(self.base_url, self.vhost, name) @staticmethod def _strip_x_queue_type(args: dict) -> dict: if 'x-queue-type' in args: del args['x-queue-type'] return args def main() -> None: parser = argparse.ArgumentParser( description='Convert Quorum queues to Classic queues', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--vhost', default='/') parser.add_argument( 'url', metavar='URL', nargs='?', default='http://guest:guest@localhost:15672', help='The Base URL to the RabbitMQ Management UI, ' 'including credentials') args = parser.parse_args() logging.basicConfig(level=logging.INFO) Converter(args.url, args.vhost).run() if __name__ == '__main__': main()