Source code for camcops_server.cc_modules.celery

#!/usr/bin/env python

"""
camcops_server/cc_modules/celery.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Celery app.**

Basic steps to set up Celery:

- Our app will be "camcops_server.cc_modules".
- Within that, Celery expects "celery.py", in which configuration is set up
  by defining the ``app`` object.
- Also, in ``__init__.py``, we should import that app. (No, scratch that; not
  necessary.)
- That makes ``@shared_task`` work in all other modules here.
- Finally, here, we ask Celery to scan ``tasks.py`` to find tasks.

Modified:

- The ``@shared_task`` decorator doesn't offer all the options that
  ``@app.task`` has. Let's skip ``@shared_task`` and the increased faff that
  entails.

The difficult part seems to be getting a broker URL in the config.

- If we load the config here, from ``celery.py``, then if the config uses any
  SQLAlchemy objects, it'll crash because some aren't imported.
- A better way is to delay configuring the app.
- But also, it is very tricky if the config uses SQLAlchemy objects; so it
  shouldn't.

Note also re logging:

- The log here is configured (at times, at least) by Celery, so uses its log
  settings. At the time of startup, that looks like plain ``print()``
  statements.

**In general, prefer delayed imports during actual tasks. Otherwise circular
imports are very hard to avoid.**

If using a separate ``celery_tasks.py`` file:

- Import this only after celery.py, or the decorators will fail.

- If you see this error from ``camcops_server launch_workers`` when using a
  separate ``celery_tasks.py`` file:

  .. code-block:: none

    [2018-12-26 21:08:01,316: ERROR/MainProcess] Received unregistered task of type 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'.
    The message has been ignored and discarded.

    Did you remember to import the module containing this task?
    Or maybe you're using relative imports?

    Please see
    https://docs.celeryq.org/en/latest/internals/protocol.html
    for more information.

    The full contents of the message body was:
    '[["recipient_email_rnc"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (98b)
    Traceback (most recent call last):
      File "/home/rudolf/dev/venvs/camcops/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 558, in on_task_received
        strategy = strategies[type_]
    KeyError: 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'

  then (1) run with ``--verbose``, which will show you the list of registered
  tasks; (2) note that everything here is absent; (3) insert a "crash" line at
  the top of this file and re-run; (4) note what's importing this file too
  early.

General advice:

- https://medium.com/@taylorhughes/three-quick-tips-from-two-years-with-celery-c05ff9d7f9eb

Task decorator options:

- https://docs.celeryproject.org/en/latest/reference/celery.app.task.html
- ``bind``: makes the first argument a ``self`` parameter to manipulate the
  task itself;
  https://docs.celeryproject.org/en/latest/userguide/tasks.html#example
- ``acks_late`` (for the decorator) or ``task_acks_late``: see

  - https://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_acks_late
  - https://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry
  - Here I am retrying on failure with exponential backoff, but not using
    ``acks_late`` in addition.

"""  # noqa

from contextlib import contextmanager
import logging
import os
from typing import Any, Dict, TYPE_CHECKING

from cardinal_pythonlib.json.serialize import json_encode, json_decode
from cardinal_pythonlib.logs import BraceStyleAdapter
from celery import Celery, current_task
from kombu.serialization import register

# TODO: Investigate
# "from numpy.random import uniform" leads to uniform ending up in the
# documentation for this file and Sphinx error:
# celery.py:docstring of camcops_server.cc_modules.celery.uniform:9:
# undefined label: random-quick-start
from numpy import random

# noinspection PyUnresolvedReferences
import camcops_server.cc_modules.cc_all_models  # import side effects (ensure all models registered)  # noqa

if TYPE_CHECKING:
    from celery.app.task import Task as CeleryTask
    from camcops_server.cc_modules.cc_export import DownloadOptions
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_taskcollection import TaskCollection

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Constants
# =============================================================================

CELERY_APP_NAME = "camcops_server.cc_modules"
# CELERY_TASKS_MODULE = "celery_tasks"
# ... look for "celery_tasks.py" (as opposed to the more common "tasks.py")

CELERY_TASK_MODULE_NAME = CELERY_APP_NAME + ".celery"

CELERY_SOFT_TIME_LIMIT_SEC = 300.0
MAX_RETRIES = 10
RETRY_MIN_DELAY_S = 5.0
RETRY_MAX_DELAY_S = 60.0


# =============================================================================
# Configuration
# =============================================================================

register(
    "json",
    json_encode,
    json_decode,
    content_type="application/json",
    content_encoding="utf-8",
)


[docs]def get_celery_settings_dict() -> Dict[str, Any]: """ Returns a dictionary of settings to configure Celery. """ log.debug("Configuring Celery") from camcops_server.cc_modules.cc_config import ( CrontabEntry, get_default_config_from_os_env, ) # delayed import config = get_default_config_from_os_env() # ------------------------------------------------------------------------- # Schedule # ------------------------------------------------------------------------- schedule = {} # type: Dict[str, Any] # ------------------------------------------------------------------------- # User-defined schedule entries # ------------------------------------------------------------------------- for crontab_entry in config.crontab_entries: recipient_name = crontab_entry.content schedule_name = f"export_to_{recipient_name}" log.debug( "Adding regular export job {}: crontab: {}", schedule_name, crontab_entry, ) schedule[schedule_name] = { "task": CELERY_TASK_MODULE_NAME + ".export_to_recipient_backend", "schedule": crontab_entry.get_celery_schedule(), "args": (recipient_name,), } # ------------------------------------------------------------------------- # Housekeeping once per minute # ------------------------------------------------------------------------- housekeeping_crontab = CrontabEntry(minute="*", content="dummy") schedule["housekeeping"] = { "task": CELERY_TASK_MODULE_NAME + ".housekeeping", "schedule": housekeeping_crontab.get_celery_schedule(), } # ------------------------------------------------------------------------- # Final Celery settings # ------------------------------------------------------------------------- return { "beat_schedule": schedule, "broker_url": config.celery_broker_url, "timezone": config.schedule_timezone, "task_annotations": { "camcops_server.cc_modules.celery.export_task_backend": { "rate_limit": config.celery_export_task_rate_limit } }, # "worker_log_color": True, # true by default for consoles anyway }
# ============================================================================= # The Celery app # ============================================================================= celery_app = Celery() celery_app.add_defaults(get_celery_settings_dict()) # celery_app.autodiscover_tasks([CELERY_APP_NAME], # related_name=CELERY_TASKS_MODULE) _ = """ @celery_app.on_configure.connect def _app_on_configure(**kwargs) -> None: log.critical("@celery_app.on_configure: {!r}", kwargs) @celery_app.on_after_configure.connect def _app_on_after_configure(**kwargs) -> None: log.critical("@celery_app.on_after_configure: {!r}", kwargs) """ # ============================================================================= # Test tasks # ============================================================================= @celery_app.task(bind=True) def debug_task(self) -> None: """ Test as follows: .. code-block:: python from camcops_server.cc_modules.celery import * debug_task.delay() and also launch workers with ``camcops_server launch_workers``. For a bound task, the first (``self``) argument is the task instance; see https://docs.celeryproject.org/en/latest/userguide/tasks.html#bound-tasks """ log.info(f"self: {self!r}") log.info(f"Backend: {current_task.backend}") @celery_app.task def debug_task_add(a: float, b: float) -> float: """ Test as follows: .. code-block:: python from camcops_server.cc_modules.celery import * debug_task_add.delay() """ result = a + b log.info("a = {}, b = {} => a + b = {}", a, b, result) return result # ============================================================================= # Exponential backoff # =============================================================================
[docs]def backoff_delay_s(attempts: int) -> float: """ Return a backoff delay, in seconds, given a number of attempts. The delay increases very rapidly with the number of attempts: 1, 2, 4, 8, 16, 32, ... As per https://blog.balthazar-rouberol.com/celery-best-practices. """ return 2.0**attempts
[docs]def jittered_delay_s() -> float: """ Returns a retry delay, in seconds, that is jittered. """ return random.uniform(RETRY_MIN_DELAY_S, RETRY_MAX_DELAY_S)
[docs]@contextmanager def retry_backoff_if_raises(self: "CeleryTask") -> None: """ Context manager to retry a Celery task if an exception is raised, using a "backoff" method. """ try: yield except Exception as exc: delay_s = backoff_delay_s(self.request.retries) log.error( "Task failed. Backing off. Will retry after {} s. " "Error was:\n{}", delay_s, exc, ) self.retry(countdown=delay_s, exc=exc)
[docs]@contextmanager def retry_jitter_if_raises(self: "CeleryTask") -> None: """ Context manager to retry a Celery task if an exception is raised, using a "jittered delay" method. """ try: yield except Exception as exc: delay_s = jittered_delay_s() log.error( "Task failed. Will retry after jittered delay: {} s. " "Error was:\n{}", delay_s, exc, ) self.retry(countdown=delay_s, exc=exc)
# ============================================================================= # Controlling tasks # =============================================================================
[docs]def purge_jobs() -> None: """ Purge all jobs from the Celery queue. """ log.info("Purging back-end (Celery) jobs") celery_app.control.purge() log.info("... purged.")
# ============================================================================= # Note re request creation and context manager # ============================================================================= # NOTE: # - You MUST use some sort of context manager to handle requests here, because # the normal Pyramid router [which ordinarily called the "finished" callbacks # via request._process_finished_callbacks()] will not be plumbed in. # - For debugging, use the MySQL command # SELECT * FROM information_schema.innodb_locks; # ============================================================================= # Export tasks # ============================================================================= @celery_app.task( bind=True, ignore_result=True, max_retries=MAX_RETRIES, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC, ) def export_task_backend( self: "CeleryTask", recipient_name: str, basetable: str, task_pk: int ) -> None: """ This function exports a single task but does so with only simple (string, integer) information, so it can be called via the Celery task queue. - Calls :func:`camcops_server.cc_modules.cc_export.export_task`. Args: self: the Celery task, :class:`celery.app.task.Task` recipient_name: export recipient name (as per the config file) basetable: name of the task's base table task_pk: server PK of the task """ from camcops_server.cc_modules.cc_export import ( export_task, ) # delayed import from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import from camcops_server.cc_modules.cc_taskfactory import ( task_factory_no_security_checks, ) # delayed import with retry_backoff_if_raises(self): with command_line_request_context() as req: recipient = req.get_export_recipient(recipient_name) task = task_factory_no_security_checks( req.dbsession, basetable, task_pk ) if task is None: log.error( "export_task_backend for recipient {!r}: No task " "found for {} {}", recipient_name, basetable, task_pk, ) return export_task(req, recipient, task) @celery_app.task( bind=True, ignore_result=True, max_retries=MAX_RETRIES, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC, ) def export_to_recipient_backend( self: "CeleryTask", recipient_name: str ) -> None: """ From the backend, exports all pending tasks for a given recipient. - Calls :func:`camcops_server.cc_modules.cc_export.export`. There are two ways of doing this, when we call :func:`camcops_server.cc_modules.cc_export.export`. If we set ``schedule_via_backend=True``, this backend job fires up a whole bunch of other backend jobs, one per task to export. If we set ``schedule_via_backend=False``, our current backend job does all the work. Which is best? - Well, keeping it to one job is a bit simpler, perhaps. - But everything is locked independently so we can do the multi-job version, and we may as well use all the workers available. So my thought was to use ``schedule_via_backend=True``. - However, that led to database deadlocks (multiple processes trying to write a new ExportRecipient). - With some bugfixes to equality checking and a global lock (see :meth:`camcops_server.cc_modules.cc_config.CamcopsConfig.get_master_export_recipient_lockfilename`), we can try again with ``True``. - Yup, works nicely. Args: self: the Celery task, :class:`celery.app.task.Task` recipient_name: export recipient name (as per the config file) """ from camcops_server.cc_modules.cc_export import export # delayed import from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import with retry_backoff_if_raises(self): with command_line_request_context() as req: export( req, recipient_names=[recipient_name], schedule_via_backend=True, ) @celery_app.task( bind=True, ignore_result=True, max_retries=MAX_RETRIES, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC, ) def email_basic_dump( self: "CeleryTask", collection: "TaskCollection", options: "DownloadOptions", ) -> None: """ Send a research dump to the user via e-mail. Args: self: the Celery task, :class:`celery.app.task.Task` collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection` options: :class:`camcops_server.cc_modules.cc_export.DownloadOptions` governing the download """ from camcops_server.cc_modules.cc_export import ( make_exporter, ) # delayed import from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import with retry_backoff_if_raises(self): # Create request for a specific user, so the auditing is correct. with command_line_request_context(user_id=options.user_id) as req: collection.set_request(req) exporter = make_exporter( req=req, collection=collection, options=options ) exporter.send_by_email() @celery_app.task( bind=True, ignore_result=True, max_retries=MAX_RETRIES, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC, ) def create_user_download( self: "CeleryTask", collection: "TaskCollection", options: "DownloadOptions", ) -> None: """ Create a research dump file for the user to download later. Let them know by e-mail. Args: self: the Celery task, :class:`celery.app.task.Task` collection: a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection` options: :class:`camcops_server.cc_modules.cc_export.DownloadOptions` governing the download """ from camcops_server.cc_modules.cc_export import ( make_exporter, ) # delayed import from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import with retry_backoff_if_raises(self): # Create request for a specific user, so the auditing is correct. with command_line_request_context(user_id=options.user_id) as req: collection.set_request(req) exporter = make_exporter( req=req, collection=collection, options=options ) exporter.create_user_download_and_email() # ============================================================================= # Housekeeping # =============================================================================
[docs]def delete_old_user_downloads(req: "CamcopsRequest") -> None: """ Deletes user download files that are past their expiry time. Args: req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` """ from camcops_server.cc_modules.cc_export import ( UserDownloadFile, ) # delayed import now = req.now lifetime = req.user_download_lifetime_duration oldest_allowed = now - lifetime log.debug(f"Deleting any user download files older than {oldest_allowed}") for root, dirs, files in os.walk(req.config.user_download_dir): for f in files: udf = UserDownloadFile(filename=f, directory=root) if udf.older_than(oldest_allowed): udf.delete()
@celery_app.task( bind=False, ignore_result=True, soft_time_limit=CELERY_SOFT_TIME_LIMIT_SEC ) def housekeeping() -> None: """ Function that is run regularly to do cleanup tasks. (Remember that the ``bind`` parameter to ``@celery_app.task()`` means that the first argument to the function, typically called ``self``, is the Celery task. We don't need it here. See https://docs.celeryproject.org/en/latest/userguide/tasks.html#bound-tasks.) """ from camcops_server.cc_modules.cc_request import ( command_line_request_context, ) # delayed import from camcops_server.cc_modules.cc_session import ( CamcopsSession, ) # delayed import from camcops_server.cc_modules.cc_user import ( SecurityAccountLockout, SecurityLoginFailure, ) # delayed import log.debug("Housekeeping!") with command_line_request_context() as req: # --------------------------------------------------------------------- # Housekeeping tasks # --------------------------------------------------------------------- # We had a problem with MySQL locking here (two locks open for what # appeared to be a single delete, followed by a lock timeout). Seems to # be working now. CamcopsSession.delete_old_sessions(req) SecurityAccountLockout.delete_old_account_lockouts(req) SecurityLoginFailure.clear_dummy_login_failures_if_necessary(req) delete_old_user_downloads(req)