15.2.160. camcops_server.cc_modules.celery
camcops_server/cc_modules/celery.py
Copyright (C) 2012, University of Cambridge, Department of Psychiatry. Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
CamCOPS is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
Celery app.
Basic steps to set up Celery:
- Our app will be “camcops_server.cc_modules”.
- Within that, Celery expects “celery.py”, in which configuration is set up by defining the app object.
- Also, in __init__.py, we should import that app. (No, scratch that; not necessary.)
- That makes @shared_task work in all other modules here.
- Finally, here, we ask Celery to scan tasks.py to find tasks.
Modified:
- The @shared_task decorator doesn’t offer all the options that @app.task has. Let’s skip @shared_task and the increased faff that entails.
The difficult part seems to be getting a broker URL in the config.
- If we load the config here, from celery.py, then if the config uses any SQLAlchemy objects, it’ll crash because some aren’t imported.
- A better way is to delay configuring the app.
- But also, it is very tricky if the config uses SQLAlchemy objects; so it shouldn’t.
Note also re logging:
- The log here is configured (at times, at least) by Celery, so uses its log settings. At the time of startup, that looks like plain print() statements.
In general, prefer delayed imports during actual tasks. Otherwise circular imports are very hard to avoid.
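A minimal sketch of the delayed-import pattern, not taken from the CamCOPS source. The function name is hypothetical, and the standard-library json module stands in for a heavyweight application module (for example, one that pulls in SQLAlchemy models). The point is only that the import runs when the task body is called, not when the module defining the task is first loaded.

```python
def export_summary_task(payload: dict) -> str:
    """Hypothetical task body; the name is illustrative only."""
    # Delayed import: executed on the first call, so merely importing the
    # module that defines this function cannot trigger a circular import.
    import json  # stand-in for a heavy application module

    return json.dumps(payload, sort_keys=True)


result = export_summary_task({"b": 2, "a": 1})
print(result)
```

Python caches modules after the first import, so repeating the import on every call costs little.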
If using a separate celery_tasks.py file:
- Import this only after celery.py, or the decorators will fail.
- If you see this error from camcops_server launch_workers when using a separate celery_tasks.py file:

    [2018-12-26 21:08:01,316: ERROR/MainProcess] Received unregistered task of type 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'.
    The message has been ignored and discarded.

    Did you remember to import the module containing this task?
    Or maybe you're using relative imports?

    Please see
    https://docs.celeryq.org/en/latest/internals/protocol.html
    for more information.

    The full contents of the message body was:
    '[["recipient_email_rnc"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (98b)
    Traceback (most recent call last):
      File "/home/rudolf/dev/venvs/camcops/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 558, in on_task_received
        strategy = strategies[type_]
    KeyError: 'camcops_server.cc_modules.celery_tasks.export_to_recipient_backend'

  then (1) run with --verbose, which will show you the list of registered tasks; (2) note that everything here is absent; (3) insert a “crash” line at the top of this file and re-run; (4) note what’s importing this file too early.
General advice:
- Task decorator options: https://docs.celeryproject.org/en/latest/reference/celery.app.task.html
  - bind: makes the first argument a self parameter to manipulate the task itself; https://docs.celeryproject.org/en/latest/userguide/tasks.html#example
  - acks_late (for the decorator) or task_acks_late: see
    - https://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-task_acks_late
    - https://docs.celeryproject.org/en/latest/faq.html#faq-acks-late-vs-retry
    - Here I am retrying on failure with exponential backoff, but not using acks_late in addition.
- camcops_server.cc_modules.celery.backoff_delay_s(attempts: int) → float
Return a backoff delay, in seconds, given a number of attempts.
The delay increases very rapidly with the number of attempts: 1, 2, 4, 8, 16, 32, …
As per https://blog.balthazar-rouberol.com/celery-best-practices.
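The series above is consistent with a delay of 2 raised to the power of the attempt count. The sketch below is an illustration under that assumption, with attempts counted from zero; it is not copied from the CamCOPS source, and the function name carries a _sketch suffix to make that clear.

```python
def backoff_delay_s_sketch(attempts: int) -> float:
    """Illustrative exponential backoff: 1, 2, 4, 8, ... seconds.

    Assumes ``attempts`` starts at 0 for the first retry.
    """
    return 2.0 ** attempts


delays = [backoff_delay_s_sketch(n) for n in range(6)]
print(delays)
```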
- camcops_server.cc_modules.celery.delete_old_user_downloads(req: CamcopsRequest) → None
Deletes user download files that are past their expiry time.
- Parameters
- camcops_server.cc_modules.celery.get_celery_settings_dict() → Dict[str, Any]
Returns a dictionary of settings to configure Celery.
- camcops_server.cc_modules.celery.jittered_delay_s() → float
Returns a retry delay, in seconds, that is jittered.
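One plausible shape for a jittered delay is a uniform random draw between a lower and an upper bound, so that many failing tasks do not all retry at the same instant. In the sketch below the two bounds are hypothetical values chosen for illustration, and the standard-library random.uniform is swapped in for the NumPy uniform documented on this page; neither detail is confirmed by the source.

```python
import random

# Hypothetical bounds, in seconds; the real values are not given in the docs.
MIN_DELAY_S = 3.0
MAX_DELAY_S = 15.0


def jittered_delay_s_sketch(low: float = MIN_DELAY_S,
                            high: float = MAX_DELAY_S) -> float:
    """Illustrative jittered delay: a uniform draw from [low, high]."""
    return random.uniform(low, high)


samples = [jittered_delay_s_sketch() for _ in range(1000)]
print(min(samples), max(samples))
```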
- camcops_server.cc_modules.celery.retry_backoff_if_raises(self: CeleryTask) → None
Context manager to retry a Celery task if an exception is raised, using a “backoff” method.
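The sketch below shows how such a context manager might be put together. It is an assumption-laden illustration, not the CamCOPS implementation: StubTask, StubRequest and RetryRequested are hypothetical stand-ins for a bound Celery task, its request object and Celery's Retry exception, so that the example runs without a broker. With a real bound task, self.request.retries and self.retry(exc=..., countdown=...) would play the same roles.

```python
from contextlib import contextmanager
from typing import Iterator


class RetryRequested(Exception):
    """Hypothetical stand-in for the exception Celery raises on retry."""

    def __init__(self, countdown: float) -> None:
        super().__init__(f"retry in {countdown} s")
        self.countdown = countdown


class StubRequest:
    """Hypothetical stand-in for a task's request (retry counter only)."""

    def __init__(self, retries: int) -> None:
        self.retries = retries


class StubTask:
    """Hypothetical stand-in for a bound Celery task."""

    def __init__(self, retries: int) -> None:
        self.request = StubRequest(retries)

    def retry(self, exc: Exception, countdown: float) -> None:
        # A real task would reschedule itself; the stub just signals it.
        raise RetryRequested(countdown) from exc


@contextmanager
def retry_backoff_if_raises_sketch(task: StubTask) -> Iterator[None]:
    """If the wrapped block raises, ask the task to retry after a delay
    that doubles with each retry already made (assumed: 2 ** retries)."""
    try:
        yield
    except Exception as exc:
        task.retry(exc=exc, countdown=2.0 ** task.request.retries)


def run_failing_task(retries: int) -> float:
    """Run a block that always fails; return the requested countdown."""
    task = StubTask(retries)
    try:
        with retry_backoff_if_raises_sketch(task):
            raise ValueError("simulated task failure")
    except RetryRequested as retry:
        return retry.countdown
    return -1.0  # not reached: the block above always raises


print(run_failing_task(0), run_failing_task(3))
```

Wrapping the task body in a context manager keeps the retry policy in one place, so each task only needs a single with statement.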
- camcops_server.cc_modules.celery.retry_jitter_if_raises(self: CeleryTask) → None
Context manager to retry a Celery task if an exception is raised, using a “jittered delay” method.
- camcops_server.cc_modules.celery.uniform(low=0.0, high=1.0, size=None)
Draw samples from a uniform distribution.
Samples are uniformly distributed over the half-open interval [low, high) (includes low, but excludes high). In other words, any value within the given interval is equally likely to be drawn by uniform.
Note
New code should use the uniform method of a default_rng() instance instead; please see the random-quick-start.
- Parameters
low (float or array_like of floats, optional) – Lower boundary of the output interval. All values generated will be greater than or equal to low. The default value is 0.
high (float or array_like of floats) – Upper boundary of the output interval. All values generated will be less than or equal to high. The default value is 1.0.
size (int or tuple of ints, optional) – Output shape. If the given shape is, e.g., (m, n, k), then m * n * k samples are drawn. If size is None (default), a single value is returned if low and high are both scalars. Otherwise, np.broadcast(low, high).size samples are drawn.
- Returns
out – Drawn samples from the parameterized uniform distribution.
- Return type
ndarray or scalar
See also
randint – Discrete uniform distribution, yielding integers.
random_integers – Discrete uniform distribution over the closed interval [low, high].
random_sample – Floats uniformly distributed over [0, 1).
random – Alias for random_sample.
rand – Convenience function that accepts dimensions as input, e.g., rand(2,2) would generate a 2-by-2 array of floats, uniformly distributed over [0, 1).
Generator.uniform – which should be used for new code.
Notes
The probability density function of the uniform distribution is
$$p(x) = \frac{1}{b - a}$$
anywhere within the interval [a, b), and zero elsewhere.
When high == low, values of low will be returned. If high < low, the results are officially undefined and may eventually raise an error, i.e. do not rely on this function to behave when passed arguments satisfying that inequality condition. The high limit may be included in the returned array of floats due to floating-point rounding in the equation low + (high-low) * random_sample(). For example:
>>> x = np.float32(5*0.99999999)
>>> x
5.0
Examples
Draw samples from the distribution:
>>> s = np.random.uniform(-1,0,1000)
All values are within the given interval:
>>> np.all(s >= -1)
True
>>> np.all(s < 0)
True
Display the histogram of the samples, along with the probability density function:
>>> import matplotlib.pyplot as plt
>>> count, bins, ignored = plt.hist(s, 15, density=True)
>>> plt.plot(bins, np.ones_like(bins), linewidth=2, color='r')
>>> plt.show()