15.2.178. camcops_server.cc_modules.celery

camcops_server/cc_modules/celery.py


Copyright (C) 2012, University of Cambridge, Department of Psychiatry. Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

This file is part of CamCOPS.

CamCOPS is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

CamCOPS is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.


Celery app.

Basic steps to set up Celery:

  • Our app will be “camcops_server.cc_modules”.

  • Within that, Celery expects “celery.py”, in which configuration is set up by defining the app object.

  • It might seem that __init__.py should also import that app, but that turns out not to be necessary.

  • That makes @shared_task work in all other modules here.

  • Finally, here, we ask Celery to scan tasks.py to find tasks.

Modified:

  • The @shared_task decorator doesn’t offer all the options that @app.task has. Let’s skip @shared_task and the increased faff that entails.

The difficult part seems to be getting a broker URL in the config.

  • If we load the config here, from celery.py, and the config uses any SQLAlchemy objects, it will crash, because some of those objects have not been imported yet.

  • A better way is to delay configuring the app.

  • But also, it is very tricky if the config uses SQLAlchemy objects; so it shouldn’t.
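
The idea of delaying configuration can be illustrated without Celery itself. The following is a minimal sketch, not the CamCOPS implementation: LazySettings and get_settings_dict are hypothetical names, and the broker URL is a placeholder. The point is only that the loader runs on first access, not when the module is imported. (Celery's own app.add_defaults() is documented as accepting a callable that is evaluated lazily in a similar spirit.)

```python
from typing import Any, Callable, Dict, Optional


def get_settings_dict() -> Dict[str, Any]:
    """Hypothetical loader; a real one would read the server config file."""
    return {
        "broker_url": "amqp://",  # assumed placeholder broker URL
        "timezone": "UTC",
    }


class LazySettings:
    """Calls its loader on first access rather than at import time."""

    def __init__(self, loader: Callable[[], Dict[str, Any]]) -> None:
        self._loader = loader
        self._cache: Optional[Dict[str, Any]] = None

    @property
    def loaded(self) -> bool:
        return self._cache is not None

    def get(self, key: str) -> Any:
        if self._cache is None:
            # The (potentially import-heavy) config load happens only here.
            self._cache = self._loader()
        return self._cache[key]


settings = LazySettings(get_settings_dict)  # cheap: nothing is loaded yet
```

Creating the settings object costs nothing at import time; the config is read only when something first asks for a value such as the broker URL.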

Note also re logging:

  • The log here is configured (at times, at least) by Celery, so it uses Celery’s log settings. At startup, its output looks like plain print() statements.

In general, prefer delayed imports during actual tasks. Otherwise circular imports are very hard to avoid.
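
A delayed import simply moves the import statement into the task body. The sketch below uses a hypothetical function name, with the standard-library json module standing in for a heavyweight application module:

```python
def export_to_recipient(recipient_name: str) -> str:
    """Hypothetical task body; the heavy import is deferred until call time."""
    # Delayed import: this runs when the task executes, not when this module
    # is loaded, so loading this module cannot start an import cycle via it.
    import json  # stand-in for a heavyweight application module

    return json.dumps({"recipient": recipient_name})
```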

If using a separate celery_tasks.py file:

  • Import this only after celery.py, or the decorators will fail.

  • If you see this error from camcops_server launch_workers when using a separate celery_tasks.py file:

    [2018-12-26 21:08:01,316: ERROR/MainProcess] Received unregistered task of type 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'.
    The message has been ignored and discarded.
    
    Did you remember to import the module containing this task?
    Or maybe you're using relative imports?
    
    Please see
    https://docs.celeryq.org/en/latest/internals/protocol.html
    for more information.
    
    The full contents of the message body was:
    '[["recipient_email_rnc"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (98b)
    Traceback (most recent call last):
      File "/home/rudolf/dev/venvs/camcops/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 558, in on_task_received
        strategy = strategies[type_]
    KeyError: 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'
    

    then (1) run with --verbose, which will show you the list of registered tasks; (2) note that everything here is absent; (3) insert a “crash” line at the top of this file and re-run; (4) note what’s importing this file too early.
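
    For step (3), one possible form of the “crash” line is sketched below. The helper name who_imported_me is hypothetical; the traceback calls are from the standard library.

```python
import traceback
from typing import List


def who_imported_me() -> List[str]:
    """Return the current call stack as formatted lines, outermost first."""
    return traceback.format_stack()


# Placed at the very top of the module under suspicion, either of these
# reveals the import chain that reached it:
#     traceback.print_stack()              # log the chain and carry on
#     raise RuntimeError("imported here")  # the "crash" line: stop at once
```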

General advice:

Task decorator options:

camcops_server.cc_modules.celery.backoff_delay_s(attempts: int) → float

Return a backoff delay, in seconds, given a number of attempts.

The delay doubles with each attempt: 1, 2, 4, 8, 16, 32, …

As per https://blog.balthazar-rouberol.com/celery-best-practices.
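
A minimal sketch that reproduces the sequence above, assuming the attempt count starts at zero (the name backoff_delay_sketch is hypothetical, and the real function may differ in detail):

```python
def backoff_delay_sketch(attempts: int) -> float:
    """Exponential backoff: 1, 2, 4, 8, ... s for attempts 0, 1, 2, 3, ..."""
    return 2.0 ** attempts
```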

camcops_server.cc_modules.celery.delete_old_user_downloads(req: CamcopsRequest) → None

Deletes user download files that are past their expiry time.

Parameters

req – a camcops_server.cc_modules.cc_request.CamcopsRequest
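
The general shape of such a clean-up can be sketched with the standard library alone. This is not the CamCOPS code: the function name, the explicit directory and lifetime parameters, and the use of file modification time as the measure of age are all assumptions made for illustration.

```python
import os
import time
from typing import List


def delete_expired_files(directory: str, lifetime_s: float) -> List[str]:
    """Delete regular files in `directory` last modified more than
    `lifetime_s` seconds ago; return the deleted filenames, sorted."""
    cutoff = time.time() - lifetime_s
    # Materialise the listing first, so we never delete while iterating.
    with os.scandir(directory) as it:
        entries = list(it)
    deleted = []
    for entry in entries:
        if entry.is_file() and entry.stat().st_mtime < cutoff:
            os.remove(entry.path)
            deleted.append(entry.name)
    return sorted(deleted)
```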

camcops_server.cc_modules.celery.get_celery_settings_dict() → Dict[str, Any]

Returns a dictionary of settings to configure Celery.

camcops_server.cc_modules.celery.jittered_delay_s() → float

Returns a retry delay, in seconds, that is jittered.
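
One common way to jitter a delay is to draw it uniformly from a fixed range, so that many failing tasks do not all retry at the same instant. The sketch below assumes that approach; the bounds and the function name are hypothetical, not values taken from CamCOPS.

```python
import random

JITTER_MIN_S = 5.0   # hypothetical lower bound, in seconds
JITTER_MAX_S = 15.0  # hypothetical upper bound, in seconds


def jittered_delay_sketch(min_s: float = JITTER_MIN_S,
                          max_s: float = JITTER_MAX_S) -> float:
    """Return a delay drawn uniformly at random from [min_s, max_s]."""
    return random.uniform(min_s, max_s)
```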

camcops_server.cc_modules.celery.purge_jobs() → None

Purge all jobs from the Celery queue.

camcops_server.cc_modules.celery.retry_backoff_if_raises(self: CeleryTask) → None

Context manager to retry a Celery task if an exception is raised, using a “backoff” method.

camcops_server.cc_modules.celery.retry_jitter_if_raises(self: CeleryTask) → None

Context manager to retry a Celery task if an exception is raised, using a “jittered delay” method.
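
Both context managers follow the same pattern: run the body, and if it raises, ask the task to retry after a computed delay. The sketch below shows that pattern so that it runs without Celery installed. FakeTask and RetryRequested are hypothetical stand-ins; they mimic only the two things the sketch relies on from a bound Celery task, namely self.request.retries and self.retry(exc=..., countdown=...). The single helper retry_if_raises takes the delay function as a parameter, which is an illustrative simplification.

```python
from contextlib import contextmanager
from types import SimpleNamespace
from typing import Callable, Iterator, Optional


class RetryRequested(Exception):
    """Stand-in for the exception that Celery's Task.retry() raises."""


class FakeTask:
    """Minimal stand-in exposing the two task attributes the sketch uses."""

    def __init__(self, retries: int) -> None:
        self.request = SimpleNamespace(retries=retries)
        self.last_countdown: Optional[float] = None

    def retry(self, exc: Exception, countdown: float) -> None:
        self.last_countdown = countdown  # record the requested delay
        raise RetryRequested() from exc


@contextmanager
def retry_if_raises(task: FakeTask,
                    delay_s: Callable[[int], float]) -> Iterator[None]:
    """If the body raises, request a retry after delay_s(retries so far)."""
    try:
        yield
    except Exception as exc:
        task.retry(exc=exc, countdown=delay_s(task.request.retries))


task = FakeTask(retries=3)
try:
    # Backoff variant: the delay doubles with each previous retry.
    with retry_if_raises(task, lambda attempts: 2.0 ** attempts):
        raise ValueError("transient failure")
except RetryRequested:
    pass  # in Celery, the worker would catch this and reschedule the task
```

A jittered variant would pass a delay function that ignores the attempt count and returns a random value instead.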