@apsamuel
Last active November 13, 2021 19:53
[SPIKE] declared queues unused by celery beat scheduled tasks

Expectations

  • Background tasks are defined in a Flask application namespace using Celery and the @app.task() decorator
  • Each task's results are routed to a different queue on the broker

Celery is loaded

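from celery import Celery
from flask import Flask


def middleware(
    app: Flask = None,
    mq_host: str = None,
    mq_port: int = None,
    mq_user: str = None,
    mq_password: str = None,
) -> Celery:
    # NOTE: the mq_* arguments are not passed on in this snippet;
    # broker_connection_string() (defined elsewhere) builds the broker URL.
    mq_host = "127.0.0.1" if mq_host is None else mq_host
    # 5672 is RabbitMQ's default AMQP port and matches broker_url in the celery report
    mq_port = 5672 if mq_port is None else mq_port
    # print(f"import name: {app.import_name}")
    celery = Celery(
        app.import_name,
        backend='rpc://',
        broker=broker_connection_string()
    )
    #celery.conf.task_default_queue = "metrics.system"
    celery.conf.update(app.config)

    # Run every task inside the Flask application context
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


# create_app() is the Flask application factory (defined elsewhere)
app = create_app()
celery = middleware(
    app=app
)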

Example usage

In [84]: pylibs.sidecars.metrics.app.celery
Out[84]: <Celery pylibs.sidecars.metrics.metrics at 0xaff9d690>

In [91]: pylibs.sidecars.metrics.app.celery.amqp.queues
Out[91]: 
{'default': <unbound Queue default -> [default->default]>,
 'metrics.system': <unbound Queue metrics.system -> [metrics->metrics.system]>,
 'metrics.system.vmem': <unbound Queue metrics.system.vmem -> [metrics->metrics.system.vmem]>,
 'metrics.system.swap': <unbound Queue metrics.system.swap -> [metrics->metrics.system.swap]>,
 'metrics.system.cputime': <unbound Queue metrics.system.cputime -> [metrics->metrics.system.cputime]>,
 'metrics.system.cpustats': <unbound Queue metrics.system.cpustats -> [metrics->metrics.system.cpustats]>,
 'metrics.system.temps': <unbound Queue metrics.system.temps -> [metrics->metrics.system.temps]>,
 'metrics.system.diskusage': <unbound Queue metrics.system.diskusage -> [metrics->metrics.system.diskusage]>,
 'metrics.system.netio': <unbound Queue metrics.system.netio -> [metrics->metrics.system.netio]>}
 
In [93]: pylibs.sidecars.metrics.app.celery.conf.task_queues
Out[93]: 
(<unbound Queue default -> [default->default]>,
 <unbound Queue metrics.system -> [metrics->metrics.system]>,
 <unbound Queue metrics.system.vmem -> [metrics->metrics.system.vmem]>,
 <unbound Queue metrics.system.swap -> [metrics->metrics.system.swap]>,
 <unbound Queue metrics.system.cputime -> [metrics->metrics.system.cputime]>,
 <unbound Queue metrics.system.cpustats -> [metrics->metrics.system.cpustats]>,
 <unbound Queue metrics.system.temps -> [metrics->metrics.system.temps]>,
 <unbound Queue metrics.system.diskusage -> [metrics->metrics.system.diskusage]>,
 <unbound Queue metrics.system.netio -> [metrics->metrics.system.netio]>)

I have defined a beat schedule as follows. These tasks execute with no problem; however, they are not routed to the queues named in each entry's options.

celery.conf.update(beat_schedule = {
    'publish-vmem-60s': {
        'task': 'pylibs.sidecars.metrics.app.publish_vmem',
        'schedule': 60.0,
        'options': {
            'queue': 'metrics.system.vmem',
            'routing_key': 'metrics.system.vmem',
            # 'exchange': 'metrics',            
            'exchange_type': 'direct',
            'priority': 1,
        },
    },
    'publish-swap-60s': {
        'task': 'pylibs.sidecars.metrics.app.publish_swap',
        'schedule': 60.0,
        'options': {
            'queue': 'metrics.system.swap',
            'routing_key': 'metrics.system.swap',
            # 'exchange': 'metrics',            
            'exchange_type': 'direct',
            'priority': 1,
        },
    },
    'publish-cputime-60s': {
        'task': 'pylibs.sidecars.metrics.app.publish_cputime',
        'schedule': 60.0,
        'options': {
            'queue': 'metrics.system.cputime',
            'routing_key': 'metrics.system.cputime',
            # 'exchange': 'metrics',            
            'exchange_type': 'direct',
            'priority': 1,
        },
    },
    'publish-cpustats-60s': {
        'task': 'pylibs.sidecars.metrics.app.publish_cpustats',
        'schedule': 60.0,
        'options': {
            'queue': 'metrics.system.cpustats',
            'routing_key': 'metrics.system.cpustats',
            #'exchange': 'metrics',            
            'exchange_type': 'direct',
            'priority': 1,
        },
    },
    'publish-temperature-60s': {
        'task': 'pylibs.sidecars.metrics.app.publish_temps',
        'schedule': 60.0,
        'options': {
            'queue': 'metrics.system.temps',
            'routing_key': 'metrics.system.temps',
            # 'exchange': 'metrics',            
            'exchange_type': 'direct',
            'priority': 1,
        },
    },
    'publish-disk-usage-60s': {
        'task': 'pylibs.sidecars.metrics.app.publish_diskusage',
        'schedule': 60.0,
        'options': {
            'queue': 'metrics.system.diskusage',
            'routing_key': 'metrics.system.diskusage',
            # 'exchange': 'metrics',            
            'exchange_type': 'direct',
            'priority': 1,

        },
    },
    'publish-netio-60s': {
        'task': 'pylibs.sidecars.metrics.app.publish_netio',
        'schedule': 60.0,
        'options': {
            'queue': 'metrics.system.netio',
            'routing_key': 'metrics.system.netio',
            # 'exchange': 'metrics',         
            'exchange_type': 'direct',
            'priority': 1,
        },
    },
})
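The seven entries above differ only in the metric name, so the same schedule could be generated from a list. This is a minimal sketch, not the code used in this project: `METRICS`, `TASK_MODULE`, `QUEUE_PREFIX` and `build_beat_schedule` are hypothetical names, and the generated entry labels follow the metric name (for example `publish-temps-60s` rather than `publish-temperature-60s`). The labels are only dictionary keys, so the task and queue values stay the same as above.

```python
# Hypothetical names, introduced only for this sketch.
METRICS = ["vmem", "swap", "cputime", "cpustats", "temps", "diskusage", "netio"]
TASK_MODULE = "pylibs.sidecars.metrics.app"
QUEUE_PREFIX = "metrics.system"


def build_beat_schedule(metrics, interval=60.0):
    """Return a beat_schedule dict with one entry per metric name."""
    schedule = {}
    for metric in metrics:
        queue = f"{QUEUE_PREFIX}.{metric}"
        schedule[f"publish-{metric}-{int(interval)}s"] = {
            "task": f"{TASK_MODULE}.publish_{metric}",
            "schedule": interval,
            "options": {
                "queue": queue,
                "routing_key": queue,
                "exchange_type": "direct",
                "priority": 1,
            },
        }
    return schedule


beat_schedule = build_beat_schedule(METRICS)
# Assumed usage, mirroring the call above:
# celery.conf.update(beat_schedule=beat_schedule)
```

Generating the entries keeps the queue name and routing key in step with the task name, which removes one source of copy-and-paste mistakes while debugging the routing.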

Problem

Tasks are not routed to the queues defined in the beat schedule. Instead, they are routed to Celery's auto-generated task queue (or to the queue hardcoded in @app.task(queue=<queue>)).

pi@raspberrypi:~/restart/headunit $ ./dev/rabbitmqadmin list queues
+-----------------------------------------------+----------+
|                     name                      | messages |
+-----------------------------------------------+----------+
| 2aa4c2ac-4f30-348f-9eec-44103f41a86f          | 535      | <-- wrong queue
| celery@raspberrypi.celery.pidbox              | 0        |
| celeryev.4ccf8ffd-cfa4-4143-a0b2-3fc51576d695 | 0        |
| celeryev.7e381597-0755-4c91-84aa-88faee6d8f7c | 0        |
| default                                       | 0        |
| metrics.system                                | 0        |
| metrics.system.cpustats                       | 0        |
| metrics.system.cputime                        | 0        |
| metrics.system.diskusage                      | 0        |
| metrics.system.netio                          | 0        |
| metrics.system.swap                           | 0        |
| metrics.system.temps                          | 0        |
| metrics.system.vmem                           | 0        |
+-----------------------------------------------+----------+
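To re-check the queue depths after each configuration change, the table above can be reduced to the queues that actually hold messages. This is a rough sketch that assumes the two-column table layout shown here; `parse_queue_table` and `SAMPLE` are hypothetical names, and the sample is a shortened copy of the output above.

```python
def parse_queue_table(text):
    """Parse a `rabbitmqadmin list queues` table into {queue_name: message_count}."""
    counts = {}
    for line in text.splitlines():
        cells = [cell.strip() for cell in line.split("|")]
        # Data rows split into: '', name, messages, optional trailing note.
        if len(cells) < 4:
            continue  # border lines contain no '|' separators
        name, messages = cells[1], cells[2]
        if messages.isdigit():  # skips the header row
            counts[name] = int(messages)
    return counts


# Shortened copy of the table above, used only as sample input.
SAMPLE = """\
+-----------------------------------------------+----------+
|                     name                      | messages |
+-----------------------------------------------+----------+
| 2aa4c2ac-4f30-348f-9eec-44103f41a86f          | 535      | <-- wrong queue
| default                                       | 0        |
| metrics.system.vmem                           | 0        |
+-----------------------------------------------+----------+
"""

counts = parse_queue_table(SAMPLE)
# Keep only queues with pending messages.
backlog = {name: n for name, n in counts.items() if n > 0}
print(backlog)
```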

pip freeze

pi@raspberrypi:~/restart/headunit $ pip3 freeze
aiofiles==0.7.0
ajsonrpc==1.2.0
alabaster==0.7.12
amqp==5.0.6
aniso8601==7.0.0
anyio==3.3.2
arduino-sketch==0.2
asgiref==3.4.1
asn1crypto==0.24.0
astroid==2.8.0
asttokens==1.1.13
attrs==21.2.0
automationhat==0.2.0
Babel==2.9.1
backcall==0.2.0
backports.entry-points-selectable==1.1.0
beautifulsoup4==4.7.1
billiard==3.6.4.0
black==21.9b0
blinker==1.4
blinkt==0.1.2
bottle==0.12.19
buttonshim==0.0.2
cached-property==1.5.1
Cap1xxx==0.1.3
celery==5.1.2
certifi==2018.8.24
cfgv==3.3.1
chardet==3.0.4
click==7.1.2
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
colorama==0.3.7
colorzero==1.1
cookies==2.2.1
coverage==6.0.2
cryptography==2.6.1
cupshelpers==1.0
decorator==5.1.0
distlib==0.3.2
dnspython==1.16.0
docker==3.4.1
docker-compose==1.21.0
docker-pycreds==0.3.0
dockerpty==0.4.1
docopt==0.6.2
docutils==0.14
drumhat==0.1.0
email-validator==1.1.3
entrypoints==0.3
envirophat==1.0.0
ExplorerHAT==0.4.2
filelock==3.0.12
flake8==3.9.2
Flask==2.0.1
Flask-GraphQL==2.0.1
Flask-MongoAlchemy==0.7.2
flask-mongoengine==1.0.0
Flask-WTF==0.15.1
fourletterphat==0.1.0
gpiozero==1.6.2
graphene==2.1.9
graphene-mongo==0.2.13
graphql-core==2.3.2
graphql-relay==2.0.1
graphql-server-core==1.2.0
h11==0.12.0
html5lib==1.0.1
identify==2.2.15
idna==3.2
ifaddr==0.1.7
imagesize==1.2.0
importlib-metadata==4.8.1
iniconfig==1.1.1
ipython==7.28.0
iso8601==0.1.16
isort==5.9.3
itsdangerous==2.0.1
jedi==0.18.0
Jinja2==3.0.1
jsonschema==2.6.0
keyring==17.1.1
keyrings.alt==3.1.1
kombu==5.2.1
lazy-object-proxy==1.6.0
librabbitmq==2.0.0
logilab-common==1.4.2
lxml==4.3.2
MarkupSafe==2.0.1
marshmallow==3.13.0
matplotlib-inline==0.1.3
mccabe==0.6.1
microdotphat==0.2.1
MongoAlchemy==0.19
mongoengine==0.23.1
mote==0.0.4
motephat==0.0.3
mypy==0.670
mypy-extensions==0.4.3
nodeenv==1.6.0
numpy==1.16.2
oauthlib==2.1.0
olefile==0.46
packaging==21.0
pantilthat==0.0.7
parso==0.8.2
pathspec==0.9.0
pexpect==4.6.0
pgzero==1.2
phatbeat==0.1.1
pianohat==0.1.0
picamera==1.13
pickleshare==0.7.5
piglow==1.2.5
pigpio==1.78
pika==1.2.0
Pillow==5.4.1
platformdirs==2.3.0
platformio==5.2.0
pluggy==1.0.0
pre-commit==2.15.0
promise==2.3
prompt-toolkit==3.0.20
psutil==5.5.1
py==1.10.0
pycairo==1.16.2
pycodestyle==2.7.0
pycrypto==2.6.1
pycups==1.9.73
pyelftools==0.27
pyFirmata==1.1.0
pyflakes==2.3.1
pygame==1.9.4.post1
Pygments==2.3.1
PyGObject==3.30.4
pyinotify==0.9.6
PyJWT==1.7.0
pylint==2.11.1
pymata4==1.11
pymongo==3.12.0
pyOpenSSL==19.0.0
pyparsing==2.4.7
pyserial==3.5
pysmbc==1.0.15.6
pytest==6.2.5
pytest-cov==3.0.0
pytest-mock-resources==2.1.3
python-apt==1.8.4.3
python-dotenv==0.19.0
python-settings==0.2.2
pytz==2021.3
pyudev==0.22.0
pyxdg==0.25
PyYAML==5.4.1
rainbowhat==0.1.0
regex==2021.9.30
reportlab==3.5.13
requests==2.21.0
requests-oauthlib==1.0.0
responses==0.9.0
roman==2.0.0
RPi.GPIO==0.7.0
RTIMULib==7.2.1
Rx==1.6.1
SCons==4.2.0
scrollphat==0.0.7
scrollphathd==1.2.1
SecretStorage==2.3.1
semantic-version==2.8.5
Send2Trash==1.5.0
sense-hat==2.2.0
simplejson==3.16.0
singledispatch==3.7.0
six==1.12.0
skywriter==0.0.7
sn3218==1.2.7
sniffio==1.2.0
snowballstemmer==2.1.0
soupsieve==1.8
Sphinx==4.2.0
sphinxcontrib-applehelp==1.0.2
sphinxcontrib-devhelp==1.0.2
sphinxcontrib-htmlhelp==2.0.0
sphinxcontrib-jsmath==1.0.1
sphinxcontrib-qthelp==1.0.3
sphinxcontrib-serializinghtml==1.1.5
spidev==3.5
SQLAlchemy==1.4.25
ssh-import-id==5.7
starlette==0.16.0
tabulate==0.8.9
texttable==1.6.0
thonny==3.3.10
toml==0.10.2
tomli==1.2.1
touchphat==0.0.1
traitlets==5.1.0
twython==3.7.0
typed-ast==1.4.3
typing-extensions==3.10.0.2
unicornhathd==0.0.4
urllib3==1.24.1
uvicorn==0.15.0
vine==5.0.0
virtualenv==20.8.0
wcwidth==0.2.5
webencodings==0.5.1
websocket-client==0.53.0
Werkzeug==2.0.1
wrapt==1.12.1
wsproto==1.0.0
WTForms==2.3.3
yapf==0.31.0
zeroconf==0.36.7
zipp==3.5.0

celery report

pi@raspberrypi:~/restart/headunit $ /usr/bin/python3.7 -m celery -A pylibs.sidecars.metrics.app.celery report
/home/pi/restart/headunit/pylibs/sidecars
import name: pylibs.sidecars.metrics.metrics

software -> celery:5.1.2 (sun-harmonics) kombu:5.2.1 py:3.7.3
            billiard:3.6.4.0 py-amqp:5.0.6
platform -> system:Linux arch:32bit, ELF
            kernel version:5.10.60-v7l+ imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:rpc:///

broker_url: 'amqp://guest:********@localhost:5672//'
result_backend: 'rpc:///'
deprecated_settings: None
ENV: 'production'
DEBUG: False
TESTING: False
PROPAGATE_EXCEPTIONS: None
PRESERVE_CONTEXT_ON_EXCEPTION: None
SECRET_KEY: '********'
PERMANENT_SESSION_LIFETIME: datetime.timedelta(days=31)
USE_X_SENDFILE: False
SERVER_NAME: None
APPLICATION_ROOT: '/'
SESSION_COOKIE_NAME: 'session'
SESSION_COOKIE_DOMAIN: None
SESSION_COOKIE_PATH: None
SESSION_COOKIE_HTTPONLY: True
SESSION_COOKIE_SECURE: False
SESSION_COOKIE_SAMESITE: None
SESSION_REFRESH_EACH_REQUEST: True
MAX_CONTENT_LENGTH: None
SEND_FILE_MAX_AGE_DEFAULT: None
TRAP_BAD_REQUEST_ERRORS: None
TRAP_HTTP_EXCEPTIONS: False
EXPLAIN_TEMPLATE_LOADING: False
PREFERRED_URL_SCHEME: 'http'
JSON_AS_ASCII: True
JSON_SORT_KEYS: '********'
JSONIFY_PRETTYPRINT_REGULAR: False
JSONIFY_MIMETYPE: 'application/json'
TEMPLATES_AUTO_RELOAD: None
MAX_COOKIE_SIZE: 4093
MONGODB_SETTINGS: {
    'db': 'static',
    'host': 'mongodb://root:toor@mongo.darkphotonworks-labs.io:27017/static'}
task_create_missing_queues: False
task_queues: 
    (<unbound Queue default -> [default->default]>,
 <unbound Queue metrics.system -> [metrics->metrics.system]>,
 <unbound Queue metrics.system.vmem -> [metrics->metrics.system.vmem]>,
 <unbound Queue metrics.system.swap -> [metrics->metrics.system.swap]>,
 <unbound Queue metrics.system.cputime -> [metrics->metrics.system.cputime]>,
 <unbound Queue metrics.system.cpustats -> [metrics->metrics.system.cpustats]>,
 <unbound Queue metrics.system.temps -> [metrics->metrics.system.temps]>,
 <unbound Queue metrics.system.diskusage -> [metrics->metrics.system.diskusage]>,
 <unbound Queue metrics.system.netio -> [metrics->metrics.system.netio]>)
default_queue: 'metrics.system'
task_default_queue: 'metrics.system'
task_default_exchange: 'metrics'
task_default_exchange_type: 'direct'
task_default_routing_key: '********'
task_routes: {
    'pylibs.sidecars.metrics.app.publish_vmem': {   'queue': 'metrics.system.vmem',
                                                    'routing_key': '********'}}
beat_schedule: {
    'publish-cpustats-60s': {   'options': {   'exchange_type': 'direct',
                                               'priority': 1,
                                               'queue': 'metrics.system.cpustats',
                                               'routing_key': '********'},
                                'schedule': 60.0,
                                'task': 'pylibs.sidecars.metrics.app.publish_cpustats'},
    'publish-cputime-60s': {   'options': {   'exchange_type': 'direct',
                                              'priority': 1,
                                              'queue': 'metrics.system.cputime',
                                              'routing_key': '********'},
                               'schedule': 60.0,
                               'task': 'pylibs.sidecars.metrics.app.publish_cputime'},
    'publish-disk-usage-60s': {   'options': {   'exchange_type': 'direct',
                                                 'priority': 1,
                                                 'queue': 'metrics.system.diskusage',
                                                 'routing_key': '********'},
                                  'schedule': 60.0,
                                  'task': 'pylibs.sidecars.metrics.app.publish_diskusage'},
    'publish-netio-60s': {   'options': {   'exchange_type': 'direct',
                                            'priority': 1,
                                            'queue': 'metrics.system.netio',
                                            'routing_key': '********'},
                             'schedule': 60.0,
                             'task': 'pylibs.sidecars.metrics.app.publish_netio'},
    'publish-swap-60s': {   'options': {   'exchange_type': 'direct',
                                           'priority': 1,
                                           'queue': 'metrics.system.swap',
                                           'routing_key': '********'},
                            'schedule': 60.0,
                            'task': 'pylibs.sidecars.metrics.app.publish_swap'},
    'publish-temperature-60s': {   'options': {   'exchange_type': 'direct',
                                                  'priority': 1,
                                                  'queue': 'metrics.system.temps',
                                                  'routing_key': '********'},
                                   'schedule': 60.0,
                                   'task': 'pylibs.sidecars.metrics.app.publish_temps'},
    'publish-vmem-60s': {   'options': {   'exchange_type': 'direct',
                                           'priority': 1,
                                           'queue': 'metrics.system.vmem',
                                           'routing_key': '********'},
                            'schedule': 60.0,
                            'task': 'pylibs.sidecars.metrics.app.publish_vmem'}}
timezone: 'UTC'
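The report shows `task_routes` with an entry for `publish_vmem` only. Celery also accepts router functions in `task_routes`, so one function could cover every `publish_*` task. The sketch below is untested against this setup and is not a confirmed fix for the problem described here; `route_metrics_task` and `TASK_PREFIX` are hypothetical names, and the queue naming assumes the `metrics.system.<metric>` pattern used throughout this document.

```python
# Hypothetical prefix, taken from the task names in the beat schedule.
TASK_PREFIX = "pylibs.sidecars.metrics.app.publish_"


def route_metrics_task(name, args, kwargs, options, task=None, **kw):
    """Router function using the signature described in Celery's routing guide.

    Maps e.g. '...app.publish_swap' to the 'metrics.system.swap' queue.
    Returning None lets Celery fall back to its remaining routes or defaults.
    """
    if name.startswith(TASK_PREFIX):
        queue = "metrics.system." + name[len(TASK_PREFIX):]
        return {"queue": queue, "routing_key": queue}
    return None


# Assumed registration, not taken from the original configuration:
# celery.conf.task_routes = (route_metrics_task,)
```

Because the function is plain Python, it can be called directly with a task name to confirm the mapping before it is wired into the Celery configuration.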