Source code for camcops_server.cc_modules.celery



    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <>.


**Celery app.**

Basic steps to set up Celery:

- Our app will be "camcops_server.cc_modules".
- Within that, Celery expects "", in which configuration is set up
  by defining the ``app`` object.
- Also, in ````, we should import that app. (No, scratch that; not
- That makes ``@shared_task`` work in all other modules here.
- Finally, here, we ask Celery to scan ```` to find tasks.


- The ``@shared_task`` decorator doesn't offer all the options that
  ``@app.task`` has. Let's skip ``@shared_task`` and the increased faff that

The difficult part seems to be getting a broker URL in the config.

- If we load the config here, from ````, then if the config uses any
  SQLAlchemy objects, it'll crash because some aren't imported.
- A better way is to delay configuring the app.
- But also, it is very tricky if the config uses SQLAlchemy objects; so it

Note also re logging:

- The log here is configured (at times, at least) by Celery, so uses its log
  settings. At the time of startup, that looks like plain ``print()``

**In general, prefer delayed imports during actual tasks. Otherwise circular
imports are very hard to avoid.**

If using a separate ```` file:

- Import this only after, or the decorators will fail.

- If you see this error from ``camcops_server launch_workers`` when using a
  separate ```` file:

  .. code-block:: none

    [2018-12-26 21:08:01,316: ERROR/MainProcess] Received unregistered task of type 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'.
    The message has been ignored and discarded.

    Did you remember to import the module containing this task?
    Or maybe you're using relative imports?

    Please see
    for more information.

    The full contents of the message body was:
    '[["recipient_email_rnc"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (98b)
    Traceback (most recent call last):
      File "/home/rudolf/dev/venvs/camcops/lib/python3.6/site-packages/celery/worker/consumer/", line 558, in on_task_received
        strategy = strategies[type_]
    KeyError: 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'

  then (1) run with ``--verbose``, which will show you the list of registered
  tasks; (2) note that everything here is absent; (3) insert a "crash" line at
  the top of this file and re-run; (4) note what's importing this file too

General advice:


Task decorator options:

- ``bind``: makes the first argument a ``self`` parameter to manipulate the
  task itself;
- ``acks_late`` (for the decorator) or ``task_acks_late``: see

  - Here I am retrying on failure with exponential backoff, but not using
    ``acks_late`` in addition.

"""  # noqa

from contextlib import contextmanager
import logging
import os
from typing import Any, Dict, TYPE_CHECKING

from cardinal_pythonlib.json.serialize import json_encode, json_decode
from cardinal_pythonlib.logs import BraceStyleAdapter
from celery import Celery, current_task
from kombu.serialization import register

# TODO: Investigate
# "from numpy.random import uniform" leads to uniform ending up in the
# documentation for this file and Sphinx error:
# of camcops_server.cc_modules.celery.uniform:9:
# undefined label: random-quick-start
from numpy import random

# noinspection PyUnresolvedReferences
import camcops_server.cc_modules.cc_all_models  # import side effects (ensure all models registered)  # noqa

    from import Task as CeleryTask
    from camcops_server.cc_modules.cc_export import DownloadOptions
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_taskcollection import TaskCollection

log = BraceStyleAdapter(logging.getLogger(__name__))

# =============================================================================
# Constants
# =============================================================================

CELERY_APP_NAME = "camcops_server.cc_modules"
# CELERY_TASKS_MODULE = "celery_tasks"
# ... look for "" (as opposed to the more common "")



# =============================================================================
# Configuration
# =============================================================================


[docs]def get_celery_settings_dict() -> Dict[str, Any]: """ Returns a dictionary of settings to configure Celery. """ log.debug("Configuring Celery") from camcops_server.cc_modules.cc_config import ( CrontabEntry, get_default_config_from_os_env, ) # delayed import config = get_default_config_from_os_env() # ------------------------------------------------------------------------- # Schedule # ------------------------------------------------------------------------- schedule = {} # type: Dict[str, Any] # ------------------------------------------------------------------------- # User-defined schedule entries # ------------------------------------------------------------------------- for crontab_entry in config.crontab_entries: recipient_name = crontab_entry.content schedule_name = f"export_to_{recipient_name}" log.debug( "Adding regular export job {}: crontab: {}", schedule_name, crontab_entry, ) schedule[schedule_name] = { "task": CELERY_TASK_MODULE_NAME + ".export_to_recipient_backend", "schedule": crontab_entry.get_celery_schedule(), "args": (recipient_name,), } # ------------------------------------------------------------------------- # Housekeeping once per minute # ------------------------------------------------------------------------- housekeeping_crontab = CrontabEntry(minute="*", content="dummy") schedule["housekeeping"] = { "task": CELERY_TASK_MODULE_NAME + ".housekeeping", "schedule": housekeeping_crontab.get_celery_schedule(), } # ------------------------------------------------------------------------- # Final Celery settings # ------------------------------------------------------------------------- return { "beat_schedule": schedule, "broker_url": config.celery_broker_url, "timezone": config.schedule_timezone, "task_annotations": { "camcops_server.cc_modules.celery.export_task_backend": { "rate_limit": config.celery_export_task_rate_limit } }, # "worker_log_color": True, # true by default for consoles anyway }
# ============================================================================= # The Celery app # ============================================================================= celery_app = Celery() celery_app.add_defaults(get_celery_settings_dict()) # celery_app.autodiscover_tasks([CELERY_APP_NAME], # related_name=CELERY_TASKS_MODULE) _ = """ @celery_app.on_configure.connect def _app_on_configure(**kwargs) -> None: log.critical("@celery_app.on_configure: {!r}", kwargs) @celery_app.on_after_configure.connect def _app_on_after_configure(**kwargs) -> None: log.critical("@celery_app.on_after_configure: {!r}", kwargs) """ # ============================================================================= # Test tasks # ============================================================================= @celery_app.task(bind=True) def debug_task(self) -> None: """ Test as follows: .. code-block:: python from camcops_server.cc_modules.celery import * debug_task.delay() and also launch workers with ``camcops_server launch_workers``. For a bound task, the first (``self``) argument is the task instance; see """"self: {self!r}")"Backend: {current_task.backend}") @celery_app.task def debug_task_add(a: float, b: float) -> float: """ Test as follows: .. code-block:: python from camcops_server.cc_modules.celery import * debug_task_add.delay() """ result = a + b"a = {}, b = {} => a + b = {}", a, b, result) return result # ============================================================================= # Exponential backoff # =============================================================================
[docs]def backoff_delay_s(attempts: int) -> float: """ Return a backoff delay, in seconds, given a number of attempts. The delay increases very rapidly with the number of attempts: 1, 2, 4, 8, 16, 32, ... As per """ return 2.0**attempts
[docs]def jittered_delay_s() -> float: """ Returns a retry delay, in seconds, that is jittered. """ return random.uniform(RETRY_MIN_DELAY_S, RETRY_MAX_DELAY_S)
[docs]@contextmanager def retry_backoff_if_raises(self: "CeleryTask") -> None: """ Context manager to retry a Celery task if an exception is raised, using a "backoff" method. """ try: yield except Exception as exc: delay_s = backoff_delay_s(self.request.retries) log.error( "Task failed. Backing off. Will retry after {} s. " "Error was:\n{}", delay_s, exc, ) self.retry(countdown=delay_s, exc=exc)
[docs]@contextmanager def retry_jitter_if_raises(self: "CeleryTask") -> None: """ Context manager to retry a Celery task if an exception is raised, using a "jittered delay" method. """ try: yield except Exception as exc: delay_s = jittered_delay_s() log.error( "Task failed. Will retry after jittered delay: {} s. " "Error was:\n{}", delay_s, exc, ) self.retry(countdown=delay_s, exc=exc)
# ============================================================================= # Controlling tasks # =============================================================================
[docs]def purge_jobs() -> None: """ Purge all jobs from the Celery queue. """"Purging back-end (Celery) jobs") celery_app.control.purge()"... purged.")
# ============================================================================= # Note re request creation and context manager # ============================================================================= # NOTE: # - You MUST use some sort of context manager to handle requests here, because # the normal Pyramid router [which ordinarily called the "finished" callbacks # via request._process_finished_callbacks()] will not be plumbed in. # - For debugging, use the MySQL command # SELECT * FROM information_schema.innodb_locks; # ============================================================================= # Export tasks # ============================================================================= @celery_app.task( bind=True, ignore_result=True, max_retries=MAX_RETRIES, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC, ) def export_task_backend( self: "CeleryTask", recipient_name: str, basetable: str, task_pk: int ) -> None: """ This function exports a single task but does so with only simple (string, integer) information, so it can be called via the Celery task queue. - Calls :func:`camcops_server.cc_modules.cc_export.export_task`. Args: self: the Celery task, :class:`` recipient_name: export recipient name (as per the config file) basetable: name of the task's base table task_pk: server PK of the task """ from camcops_server.cc_modules.cc_export import ( export_task, ) # delayed import from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import from camcops_server.cc_modules.cc_taskfactory import ( task_factory_no_security_checks, ) # delayed import with retry_backoff_if_raises(self): with command_line_request_context() as req: recipient = req.get_export_recipient(recipient_name) task = task_factory_no_security_checks( req.dbsession, basetable, task_pk ) if task is None: log.error( "export_task_backend for recipient {!r}: No task " "found for {} {}", recipient_name, basetable, task_pk, ) return export_task(req, recipient, task) @celery_app.task( bind=True, ignore_result=True, max_retries=MAX_RETRIES, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC, ) def export_to_recipient_backend( self: "CeleryTask", recipient_name: str ) -> None: """ From the backend, exports all pending tasks for a given recipient. - Calls :func:`camcops_server.cc_modules.cc_export.export`. There are two ways of doing this, when we call :func:`camcops_server.cc_modules.cc_export.export`. If we set ``schedule_via_backend=True``, this backend job fires up a whole bunch of other backend jobs, one per task to export. If we set ``schedule_via_backend=False``, our current backend job does all the work. Which is best? - Well, keeping it to one job is a bit simpler, perhaps. - But everything is locked independently so we can do the multi-job version, and we may as well use all the workers available. So my thought was to use ``schedule_via_backend=True``. - However, that led to database deadlocks (multiple processes trying to write a new ExportRecipient). - With some bugfixes to equality checking and a global lock (see :meth:`camcops_server.cc_modules.cc_config.CamcopsConfig.get_master_export_recipient_lockfilename`), we can try again with ``True``. - Yup, works nicely. Args: self: the Celery task, :class:`` recipient_name: export recipient name (as per the config file) """ from camcops_server.cc_modules.cc_export import export # delayed import from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import with retry_backoff_if_raises(self): with command_line_request_context() as req: export( req, recipient_names=[recipient_name], schedule_via_backend=True, ) @celery_app.task( bind=True, ignore_result=True, max_retries=MAX_RETRIES, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC, ) def email_basic_dump( self: "CeleryTask", collection: "TaskCollection", options: "DownloadOptions", ) -> None: """ Send a research dump to the user via e-mail. Args: self: the Celery task, :class:`` collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection` options: :class:`camcops_server.cc_modules.cc_export.DownloadOptions` governing the download """ from camcops_server.cc_modules.cc_export import ( make_exporter, ) # delayed import from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import with retry_backoff_if_raises(self): # Create request for a specific user, so the auditing is correct. with command_line_request_context(user_id=options.user_id) as req: collection.set_request(req) exporter = make_exporter( req=req, collection=collection, options=options ) exporter.send_by_email() @celery_app.task( bind=True, ignore_result=True, max_retries=MAX_RETRIES, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC, ) def create_user_download( self: "CeleryTask", collection: "TaskCollection", options: "DownloadOptions", ) -> None: """ Create a research dump file for the user to download later. Let them know by e-mail. Args: self: the Celery task, :class:`` collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection` options: :class:`camcops_server.cc_modules.cc_export.DownloadOptions` governing the download """ from camcops_server.cc_modules.cc_export import ( make_exporter, ) # delayed import from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import with retry_backoff_if_raises(self): # Create request for a specific user, so the auditing is correct. with command_line_request_context(user_id=options.user_id) as req: collection.set_request(req) exporter = make_exporter( req=req, collection=collection, options=options ) exporter.create_user_download_and_email() # ============================================================================= # Housekeeping # =============================================================================
[docs]def delete_old_user_downloads(req: "CamcopsRequest") -> None: """ Deletes user download files that are past their expiry time. Args: req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` """ from camcops_server.cc_modules.cc_export import ( UserDownloadFile, ) # delayed import now = lifetime = req.user_download_lifetime_duration oldest_allowed = now - lifetime log.debug(f"Deleting any user download files older than {oldest_allowed}") for root, dirs, files in os.walk(req.config.user_download_dir): for f in files: udf = UserDownloadFile(filename=f, directory=root) if udf.older_than(oldest_allowed): udf.delete()
@celery_app.task( bind=False, ignore_result=True, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC ) def housekeeping() -> None: """ Function that is run regularly to do cleanup tasks. (Remember that the ``bind`` parameter to ``@celery_app.task()`` means that the first argument to the function, typically called ``self``, is the Celery task. We don't need it here. See """ from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import from camcops_server.cc_modules.cc_session import ( CamcopsSession, ) # delayed import from camcops_server.cc_modules.cc_user import ( SecurityAccountLockout, SecurityLoginFailure, ) # delayed import log.debug("Housekeeping!") with command_line_request_context() as req: # --------------------------------------------------------------------- # Housekeeping tasks # --------------------------------------------------------------------- # We had a problem with MySQL locking here (two locks open for what # appeared to be a single delete, followed by a lock timeout). Seems to # be working now. CamcopsSession.delete_old_sessions(req) SecurityAccountLockout.delete_old_account_lockouts(req) SecurityLoginFailure.clear_dummy_login_failures_if_necessary(req) delete_old_user_downloads(req)