15.2.166. camcops_server.cc_modules.celery

camcops_server/cc_modules/celery.py


Copyright (C) 2012, University of Cambridge, Department of Psychiatry. Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

This file is part of CamCOPS.

CamCOPS is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

CamCOPS is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.


Celery app.

Basic steps to set up Celery:

  • Our app will be “camcops_server.cc_modules”.

  • Within that, Celery expects “celery.py”, in which configuration is set up by defining the app object.

  • Importing that app in __init__.py was initially thought to be needed as well, but it is not necessary.

  • That makes @shared_task work in all other modules here.

  • Finally, here, we ask Celery to scan tasks.py to find tasks.

Modified:

  • The @shared_task decorator doesn’t offer all the options that @app.task has. Let’s skip @shared_task and the increased faff that entails.

The difficult part seems to be getting a broker URL in the config.

  • If we load the config here, from celery.py, then if the config uses any SQLAlchemy objects, it’ll crash because some aren’t imported.

  • A better way is to delay configuring the app.

  • But also, it is very tricky if the config uses SQLAlchemy objects; so it shouldn’t.
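The idea of delaying configuration can be illustrated without Celery at all. The following is a minimal sketch, not CamCOPS code and not Celery's API: LazySettings and load_settings are hypothetical names, and the broker URL is a placeholder. The loader runs only when the settings are first requested, so importing the module has no side effects.

```python
from typing import Any, Callable, Dict, Optional


class LazySettings:
    """Hypothetical holder that defers loading settings until first use."""

    def __init__(self, loader: Callable[[], Dict[str, Any]]) -> None:
        self._loader = loader
        self._settings: Optional[Dict[str, Any]] = None

    def get(self) -> Dict[str, Any]:
        # Run the loader on first access only, then reuse the cached result.
        if self._settings is None:
            self._settings = self._loader()
        return self._settings


load_calls = []  # records each time the loader actually runs


def load_settings() -> Dict[str, Any]:
    # Stand-in for reading a real config file; the URL is a placeholder.
    load_calls.append(1)
    return {"broker_url": "amqp://localhost/"}


settings = LazySettings(load_settings)  # nothing is loaded at this point
```

Because the loader is only a plain function returning a dictionary of strings, nothing in it depends on SQLAlchemy objects having been imported first.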

Note also re logging:

  • The log here is configured (at times, at least) by Celery, so it uses Celery’s log settings. At startup, its output looks like plain print() statements.

In general, prefer delayed imports during actual tasks. Otherwise circular imports are very hard to avoid.
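A minimal sketch of a delayed import, with the standard-library json module standing in for a heavyweight application module. The function export_summary is a hypothetical task body, not a CamCOPS function.

```python
def export_summary(values: list) -> str:
    """Hypothetical task body that defers its import until it is called."""
    # Imported here, not at module level, so that merely importing this
    # module cannot pull in (and cycle back through) the dependency.
    import json

    return json.dumps({"n": len(values), "total": sum(values)})
```

The import cost is paid on the first call only; Python caches the module afterwards, so later calls are cheap.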

If using a separate celery_tasks.py file:

  • Import this only after celery.py, or the decorators will fail.

  • If you see this error from camcops_server launch_workers when using a separate celery_tasks.py file:

    [2018-12-26 21:08:01,316: ERROR/MainProcess] Received unregistered task of type 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'.
    The message has been ignored and discarded.
    
    Did you remember to import the module containing this task?
    Or maybe you're using relative imports?
    
    Please see
    https://docs.celeryq.org/en/latest/internals/protocol.html
    for more information.
    
    The full contents of the message body was:
    '[["recipient_email_rnc"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (98b)
    Traceback (most recent call last):
      File "/home/rudolf/dev/venvs/camcops/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 558, in on_task_received
        strategy = strategies[type_]
    KeyError: 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'
    

    then (1) run with --verbose, which will show you the list of registered tasks; (2) note that everything here is absent; (3) insert a “crash” line at the top of this file and re-run; (4) note what’s importing this file too early.

General advice:

Task decorator options:

camcops_server.cc_modules.celery.backoff_delay_s(attempts: int) → float

Return a backoff delay, in seconds, given a number of attempts.

The delay doubles with each attempt, so it grows very rapidly: 1, 2, 4, 8, 16, 32, …

As per https://blog.balthazar-rouberol.com/celery-best-practices.
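The sequence above is consistent with a power-of-two rule. The following is a sketch under that assumption, not the module's confirmed implementation; backoff_delay_sketch is a hypothetical name, and it assumes the attempt count starts at 0.

```python
def backoff_delay_sketch(attempts: int) -> float:
    """Return 2 ** attempts seconds (assumes attempts starts at 0)."""
    return 2.0 ** attempts


# Delays for the first six attempts.
delays = [backoff_delay_sketch(n) for n in range(6)]
```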

camcops_server.cc_modules.celery.delete_old_user_downloads(req: CamcopsRequest) → None

Deletes user download files that are past their expiry time.

Parameters

req – a camcops_server.cc_modules.cc_request.CamcopsRequest

camcops_server.cc_modules.celery.get_celery_settings_dict() → Dict[str, Any]

Returns a dictionary of settings to configure Celery.

camcops_server.cc_modules.celery.jittered_delay_s() → float

Returns a retry delay, in seconds, that is jittered.
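A jittered delay is typically a random draw between two bounds, so that many failing tasks do not all retry at the same instant. The following is a sketch only: the bounds MIN_DELAY_S and MAX_DELAY_S are hypothetical values, and it uses the standard library's random.uniform, not necessarily what this module does.

```python
import random

MIN_DELAY_S = 5.0   # hypothetical lower bound, in seconds
MAX_DELAY_S = 60.0  # hypothetical upper bound, in seconds


def jittered_delay_sketch(
    min_s: float = MIN_DELAY_S, max_s: float = MAX_DELAY_S
) -> float:
    """Return a random delay, in seconds, between min_s and max_s."""
    return random.uniform(min_s, max_s)
```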

camcops_server.cc_modules.celery.purge_jobs() → None

Purge all jobs from the Celery queue.

camcops_server.cc_modules.celery.retry_backoff_if_raises(self: CeleryTask) → None

Context manager to retry a Celery task if an exception is raised, using a “backoff” method.

camcops_server.cc_modules.celery.retry_jitter_if_raises(self: CeleryTask) → None

Context manager to retry a Celery task if an exception is raised, using a “jittered delay” method.
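The general shape of such a context manager can be sketched as follows. This is an illustration under stated assumptions, not the module's code: FakeTask and RetryRequested are hypothetical stand-ins so that the example runs without Celery, mimicking a bound task's retry(exc=..., countdown=...) method and request.retries counter, and the doubling delay is an assumed backoff rule.

```python
from contextlib import contextmanager
from typing import Iterator, Optional


class RetryRequested(Exception):
    """Hypothetical stand-in for the exception Celery raises on retry."""

    def __init__(self, exc: Optional[BaseException], countdown: float) -> None:
        super().__init__(f"retry in {countdown} s")
        self.exc = exc
        self.countdown = countdown


class FakeRequest:
    """Hypothetical stand-in for a task's request context."""

    def __init__(self, retries: int) -> None:
        self.retries = retries


class FakeTask:
    """Hypothetical stand-in for a bound Celery task."""

    def __init__(self, retries: int = 0) -> None:
        self.request = FakeRequest(retries)

    def retry(self, exc: Optional[BaseException] = None,
              countdown: float = 0.0) -> None:
        # A real task would reschedule itself; here we only signal the request.
        raise RetryRequested(exc, countdown)


@contextmanager
def retry_backoff_sketch(task: FakeTask) -> Iterator[None]:
    """Ask the task to retry, with a doubling delay, if the body raises."""
    try:
        yield
    except Exception as exc:
        # Assumed backoff rule: 2 ** (number of retries so far) seconds.
        task.retry(exc=exc, countdown=2.0 ** task.request.retries)
```

A task body would then wrap its work in "with retry_backoff_sketch(self):", so that any exception raised inside the block becomes a retry request. A jittered variant would differ only in how the countdown is computed.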

camcops_server.cc_modules.celery.uniform(low=0.0, high=1.0, size=None)

Draw samples from a uniform distribution.

Samples are uniformly distributed over the half-open interval [low, high) (includes low, but excludes high). In other words, any value within the given interval is equally likely to be drawn by uniform.

Note

New code should use the uniform method of a default_rng() instance instead; please see the NumPy random quick start.

Parameters
  • low (float or array_like of floats, optional) – Lower boundary of the output interval. All values generated will be greater than or equal to low. The default value is 0.

  • high (float or array_like of floats) – Upper boundary of the output interval. All values generated will be less than or equal to high. The default value is 1.0.

  • size (int or tuple of ints, optional) – Output shape. If the given shape is, e.g., (m, n, k), then m * n * k samples are drawn. If size is None (default), a single value is returned if low and high are both scalars. Otherwise, np.broadcast(low, high).size samples are drawn.

Returns

out – Drawn samples from the parameterized uniform distribution.

Return type

ndarray or scalar

See also

randint

Discrete uniform distribution, yielding integers.

random_integers

Discrete uniform distribution over the closed interval [low, high].

random_sample

Floats uniformly distributed over [0, 1).

random

Alias for random_sample.

rand

Convenience function that accepts dimensions as input, e.g., rand(2,2) would generate a 2-by-2 array of floats, uniformly distributed over [0, 1).

Generator.uniform

which should be used for new code.

Notes

The probability density function of the uniform distribution is

p(x) = \frac{1}{b - a}

anywhere within the interval [a, b), and zero elsewhere.

When high == low, values of low will be returned. If high < low, the results are officially undefined and may eventually raise an error, i.e. do not rely on this function to behave when passed arguments satisfying that inequality condition. The high limit may be included in the returned array of floats due to floating-point rounding in the equation low + (high-low) * random_sample(). For example:

>>> import numpy as np
>>> x = np.float32(5*0.99999999)
>>> x
5.0

Examples

Draw samples from the distribution:

>>> s = np.random.uniform(-1,0,1000)

All values are within the given interval:

>>> np.all(s >= -1)
True
>>> np.all(s < 0)
True

Display the histogram of the samples, along with the probability density function:

>>> import matplotlib.pyplot as plt
>>> count, bins, ignored = plt.hist(s, 15, density=True)
>>> plt.plot(bins, np.ones_like(bins), linewidth=2, color='r')
>>> plt.show()