Skip to content

Instantly share code, notes, and snippets.

@gmr
Created July 18, 2020 19:40
Show Gist options
  • Select an option

  • Save gmr/535c68a72b0338b3c4dd1832403422b1 to your computer and use it in GitHub Desktop.

Select an option

Save gmr/535c68a72b0338b3c4dd1832403422b1 to your computer and use it in GitHub Desktop.

Revisions

  1. gmr created this gist Jul 18, 2020.
    197 changes: 197 additions & 0 deletions quorum-to-classic.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,197 @@
    #!/usr/bin/env python3
    import argparse
    import json
    import logging
    import sys
    import time
    import typing
    from urllib import parse

    import httpx

    LOGGER = logging.getLogger(__name__)


    class Converter:

    def __init__(self, url: str, vhost: str) -> None:
    self.base_url = url.rstrip('/')
    self.vhost = parse.quote(vhost, '')
    response = httpx.get('{}/api/queues/{}'.format(
    self.base_url, self.vhost))
    self.queues = sorted((queue for queue in response.json()
    if queue['type'] == 'quorum'
    and not queue['auto_delete']),
    key=lambda queue: queue['name'])

    def run(self) -> None:
    for queue in self.queues:
    LOGGER.info('Processing %s', queue['name'])
    temp_queue = '{}-temp'.format(queue['name'])
    if not self.create_queue(
    temp_queue,
    self._strip_x_queue_type(queue['arguments']),
    durable=queue['durable'],
    exclusive=queue['exclusive']):
    LOGGER.error('Failed to create temporary queue: %s',
    temp_queue)
    sys.exit(1)
    self.switch_bindings(queue['name'], temp_queue)
    self.wait_for_queue_to_drain(queue['name'])
    if not self.delete_queue(queue['name']):
    LOGGER.error('Failed to delete queue: %s', queue['name'])
    sys.exit(1)
    if not self.create_queue(
    queue['name'],
    self._strip_x_queue_type(queue['arguments']),
    durable=queue['durable'],
    exclusive=queue['exclusive']):
    LOGGER.error('Failed to create queue: %s', queue['name'])
    sys.exit(1)
    self.switch_bindings(temp_queue, queue['name'])
    if not self.create_temp_shovel(temp_queue, queue['name']):
    LOGGER.error('Failed to create temp shovel from %s to %s',
    temp_queue, queue['name'])
    sys.exit(1)
    self.wait_for_queue_to_drain(temp_queue)
    if not self.delete_queue(temp_queue):
    LOGGER.error('Failed to delete queue: %s', temp_queue)
    sys.exit(1)

    def bind_queue(self,
    exchange: str,
    routing_key: str,
    queue: str,
    arguments: dict) -> bool:
    url = '{}/api/bindings/{}/e/{}/q/{}'.format(
    self.base_url, self.vhost, exchange, queue)
    response = httpx.post(
    url, headers={'Content-Type': 'application/json'},
    data=json.dumps({'routing_key': routing_key,
    'arguments': arguments}))
    return response.status_code == 201

    def create_queue(self,
    name: str,
    arguments: dict,
    durable: bool,
    exclusive: bool) -> bool:
    response = httpx.put(self._api_url_queue(name),
    headers={'Content-Type': 'application/json'},
    data=json.dumps({
    'arguments': arguments,
    'auto_delete': False,
    'durable': durable,
    'exclusive': exclusive,
    'type': 'classic'}))
    LOGGER.debug('Response: %r', response)
    return response.status_code == 201

    def create_temp_shovel(self, from_queue: str, to_queue: str) -> bool:
    shovel_name = '{}-to-{}'.format(from_queue, to_queue)
    response = httpx.put(
    '{}/api/parameters/shovel/{}/{}'.format(
    self.base_url, self.vhost, shovel_name),
    headers={'Content-Type': 'application/json'},
    data=json.dumps({
    'component': 'shovel',
    'name': shovel_name,
    'value': {
    'ack-mode': 'on-confirm',
    'add-forward-headers': False,
    'delete-after': 'queue-length',
    'dest-queue': to_queue,
    'dest-uri': 'amqp://',
    'prefetch-count': 100,
    'reconnect-delay': 30,
    'src-queue': from_queue,
    'src-uri': 'amqp://'
    },
    'vhost': self.vhost}))
    if response.status_code != 201:
    LOGGER.error('Response: %r: %r', response, response.json())
    return response.status_code == 201

    def delete_queue(self, name: str) -> bool:
    response = httpx.delete(self._api_url_queue(name))
    LOGGER.debug('Response: %r', response)
    return response.status_code == 204

    def get_queue_bindings(self, name: str) -> typing.List[typing.Dict]:
    response = httpx.get(
    '{}/api/queues/{}/{}/bindings'.format(
    self.base_url, self.vhost, name))
    return [binding for binding in response.json()
    if binding['destination'] != binding['properties_key']
    and binding['destination'] != binding['routing_key']
    and binding['source'] != '']

    def switch_bindings(self, from_queue: str, to_queue: str) -> None:
    for binding in self.get_queue_bindings(from_queue):
    if not self.bind_queue(
    binding['source'],
    binding['routing_key'],
    to_queue,
    binding['arguments']):
    LOGGER.error('Failed to bind queue %s to %s with %s',
    to_queue, binding['source'],
    binding['routing_key'])
    sys.exit(1)
    if not self.unbind_queue(
    binding['source'],
    from_queue,
    binding['properties_key']):
    LOGGER.error('Failed to unbind queue %s to %s with %s',
    from_queue, binding['source'],
    binding['routing_key'])
    sys.exit(1)

    def unbind_queue(self,
    exchange: str,
    queue: str,
    properties_key: str) -> bool:
    url = '{}/api/bindings/{}/e/{}/q/{}/{}'.format(
    self.base_url, self.vhost, exchange, queue, properties_key)
    response = httpx.delete(url)
    return response.status_code == 204

    def wait_for_queue_to_drain(self, name: str) -> None:
    while True:
    response = httpx.get(self._api_url_queue(name))
    if not response.status_code == 200:
    LOGGER.error('Failed to get queue details', name)
    sys.exit(1)
    body = response.json()
    if body['messages'] == 0:
    return
    LOGGER.info('Queue %s has %i messages', name, body['messages'])
    time.sleep(5)

    def _api_url_queue(self, name: str) -> str:
    return '{}/api/queues/{}/{}'.format(self.base_url, self.vhost, name)

    @staticmethod
    def _strip_x_queue_type(args: dict) -> dict:
    if 'x-queue-type' in args:
    del args['x-queue-type']
    return args


    def main() -> None:
    parser = argparse.ArgumentParser(
    description='Convert Quorum queues to Classic queues',
    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--vhost', default='/')
    parser.add_argument(
    'url', metavar='URL', nargs='?',
    default='http://guest:guest@localhost:15672',
    help='The Base URL to the RabbitMQ Management UI, '
    'including credentials')

    args = parser.parse_args()
    logging.basicConfig(level=logging.INFO)
    Converter(args.url, args.vhost).run()


    if __name__ == '__main__':
    main()