15.2.115. camcops_server.cc_modules.cc_export
camcops_server/cc_modules/cc_export.py
Copyright (C) 2012, University of Cambridge, Department of Psychiatry. Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
CamCOPS is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
Export and research dump functions.
Export design:
WHICH RECORDS TO SEND?
The most powerful mechanism is not to have a sending queue (which would then require careful multi-instance locking), but to have a “sent” log. That way:
A record needs sending if it’s not in the sent log (for an appropriate recipient).
You can add a new recipient and the system will know about the (new) backlog automatically.
You can specify criteria, e.g. don’t upload records before 1/1/2014, and modify that later, and it would catch up with the backlog.
Successes and failures are logged in the same table.
Multiple recipients are handled with ease.
No need to alter database.pl code that receives from tablets.
Can run with a simple cron job.
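The sent-log idea can be sketched with the standard-library sqlite3 module. This is a minimal illustration, not the CamCOPS schema: the table names (task, sent_log), their columns, the start_date criterion and the recipient names are all hypothetical. A record is pending for a recipient exactly when no successful send for that recipient is logged, so a new recipient or a relaxed date criterion exposes the backlog automatically.

```python
import sqlite3
from typing import List

# Hypothetical schema: "task" holds uploaded records; "sent_log" records each
# export attempt per recipient. Successes and failures share one table.
SCHEMA = """
CREATE TABLE task (pk INTEGER PRIMARY KEY, when_created TEXT NOT NULL);
CREATE TABLE sent_log (
    recipient TEXT NOT NULL,
    task_pk INTEGER NOT NULL REFERENCES task(pk),
    success INTEGER NOT NULL
);
"""


def pending_task_pks(conn: sqlite3.Connection, recipient: str,
                     start_date: str = "0000-01-01") -> List[int]:
    """PKs of tasks (on/after start_date) not yet successfully sent."""
    rows = conn.execute(
        """
        SELECT t.pk FROM task AS t
        WHERE t.when_created >= ?
          AND NOT EXISTS (
              SELECT 1 FROM sent_log AS s
              WHERE s.task_pk = t.pk AND s.recipient = ? AND s.success = 1
          )
        ORDER BY t.pk
        """,
        (start_date, recipient),
    )
    return [pk for (pk,) in rows]


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany("INSERT INTO task VALUES (?, ?)",
                 [(1, "2013-12-31"), (2, "2014-01-02"), (3, "2014-03-01")])
conn.execute("INSERT INTO sent_log VALUES ('hl7', 2, 1)")  # success
conn.execute("INSERT INTO sent_log VALUES ('hl7', 3, 0)")  # failed attempt
print(pending_task_pks(conn, "hl7", start_date="2014-01-01"))  # [3]
print(pending_task_pks(conn, "new_recipient"))  # [1, 2, 3]
```

Because a failure is logged with success = 0 rather than omitted, a failed send stays pending and is simply retried on the next cron run.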
LOCKING
Don’t use database locking: https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you
Locking via UNIX lockfiles:
http://pythonhosted.org/lockfile/ (which also works on Windows)
On UNIX, lockfile uses LinkLockFile: https://github.com/smontanaro/pylockfile/blob/master/lockfile/linklockfile.py
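A lockfile can be illustrated with the standard library alone. This is a minimal sketch, not the lockfile package's API: LinkLockFile relies on the atomicity of creating a hard link, whereas this sketch swaps in the equally atomic os.open() with O_CREAT | O_EXCL, which fails if the file already exists. The context manager exclusive_lockfile and its timeout_s/poll_s parameters are hypothetical.

```python
import os
import tempfile
import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def exclusive_lockfile(path: str, timeout_s: float = 10.0,
                       poll_s: float = 0.1) -> Iterator[None]:
    """Hold an exclusive lock for the duration of the block by creating
    ``path`` atomically; other processes wait and retry until timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # Fails with FileExistsError if another holder created it first.
            fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Could not acquire lock {path!r}")
            time.sleep(poll_s)
    try:
        try:
            os.write(fd, str(os.getpid()).encode("ascii"))  # note the owner
        finally:
            os.close(fd)
        yield
    finally:
        os.remove(path)  # release the lock


lock_dir = tempfile.mkdtemp()
lock_path = os.path.join(lock_dir, "recipient_hl7.lock")
with exclusive_lockfile(lock_path):
    print("holding", lock_path)
print(os.path.exists(lock_path))  # False: released on exit
```

One caveat of this design: if a process dies while holding the lock, the file stays behind and blocks everyone else until it is removed. The sketch records the owner's PID but does not check it; real lockfile libraries add stale-lock handling.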
MESSAGE QUEUE AND BACKEND
Thoughts as of 2018-12-22.
See https://www.fullstackpython.com/task-queues.html. Also http://queues.io/; https://stackoverflow.com/questions/731233/activemq-or-rabbitmq-or-zeromq-or.
The “default” is Celery, with celery beat for scheduling, via an AMQP broker like RabbitMQ. Downside: no longer supported under Windows as of Celery 4.
There are immediate bugs when running the demo code with Celery 4.2.1, fixed by setting the environment variable
set FORKED_BY_MULTIPROCESSING=1
before running the worker; see https://github.com/celery/celery/issues/4178 and https://github.com/celery/celery/pull/4078.
Downside: backend is complex; e.g. Erlang dependency of RabbitMQ.
Celery also supports Redis, but Redis doesn’t support Windows directly (except the Windows Subsystem for Linux in Windows 10+).
Another possibility is Dramatiq with APScheduler.
Of note, APScheduler can use an SQLAlchemy database table as its job store, which might be good.
Dramatiq 1.4.0 (2018-11-25) installs cleanly under Windows. Use
pip install --upgrade "dramatiq[rabbitmq, watch]"
(i.e. with double quotes, not the single quotes it suggests, which don’t work under Windows). However, the basic example (https://dramatiq.io/guide.html) fails under Windows; when you fire up
dramatiq count_words
(even with --processes 1 --threads 1) it crashes with an error from ForkingPickler in multiprocessing.reduction, i.e. https://docs.python.org/3/library/multiprocessing.html#windows. It also emits a PermissionError: [WinError 5] Access is denied. This is discussed a bit at https://github.com/Bogdanp/dramatiq/issues/75; https://github.com/Bogdanp/dramatiq/blob/master/docs/source/changelog.rst. The changelog suggests 1.4.0 should work, but it doesn’t.
Worth some thought about ZeroMQ, which is a very different sort of thing. Very cross-platform. Needs work to guard against message loss (i.e. messages are unreliable by default). Dynamic “special socket” style.
Possibly also ActiveMQ.
OK; so speed is not critical but we want message reliability, for it to work under Windows, and decent Python bindings with job scheduling.
OUT: Redis (not Windows easily), ZeroMQ (fast but not by default reliable), ActiveMQ (few Python frameworks?).
REMAINING for message handling: RabbitMQ.
Python options therefore: Celery (but Windows not officially supported from 4+); Dramatiq (but Windows also not very well supported and seems a bit bleeding-edge).
This is looking like a mess from the Windows perspective.
An alternative is just to use the database, of course.
Let’s take a step back and summarize the problem.
Many web threads may upload tasks. This should trigger a prompt export for all push recipients.
Whichever way we schedule a backend task job, it should be as the combination of recipient, basetable, task PK. (That way, if one recipient fails, the others can proceed independently.)
Every job should check that it’s not been completed already (in case of accidental job restarts), i.e. is idempotent as far as we can make it.
How should this interact with the non-push recipients?
We should use the same locking method for push and non-push recipients.
We should make the locking granular and use file locks – for example, for each task/recipient combination (or each whole-database export for a given recipient).
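The job-granularity and idempotency points can be sketched as follows. ExportJob, run_job, the lock-name format and the in-memory already_done set (standing in for the sent log) are all hypothetical; the sketch only shows each job keyed by recipient, base table and task PK, mapped to its own lock name, and doing nothing if it has already completed.

```python
import re
from dataclasses import dataclass
from typing import Callable, Set, Tuple


@dataclass(frozen=True)
class ExportJob:
    """One unit of backend work: a recipient plus a single task row."""
    recipient: str
    basetable: str
    task_pk: int

    def lock_name(self) -> str:
        # One lock per recipient/task combination, so jobs for other
        # recipients (or other tasks) never wait on this one.
        raw = f"{self.recipient}.{self.basetable}.{self.task_pk}.lock"
        return re.sub(r"[^A-Za-z0-9_.-]", "_", raw)  # filesystem-safe


def run_job(job: ExportJob, already_done: Set[Tuple[str, str, int]],
            do_export: Callable[[ExportJob], None]) -> bool:
    """Export once; return False (doing nothing) if already completed.

    In a real deployment the check and the export would both run while
    holding the file lock named by job.lock_name(), so that two workers
    handling an accidentally duplicated job cannot race.
    """
    key = (job.recipient, job.basetable, job.task_pk)
    if key in already_done:
        return False  # idempotent: an accidental restart is harmless
    do_export(job)
    already_done.add(key)
    return True


done: Set[Tuple[str, str, int]] = set()
job = ExportJob("hl7_recipient", "phq9", 42)
print(job.lock_name())  # hl7_recipient.phq9.42.lock
print(run_job(job, done, lambda j: None))  # True
print(run_job(job, done, lambda j: None))  # False: already done
```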
- class camcops_server.cc_modules.cc_export.DownloadOptions(user_id: int, viewtype: str, delivery_mode: str, spreadsheet_simplified: bool = False, spreadsheet_sort_by_heading: bool = False, db_include_blobs: bool = False, db_patient_id_per_row: bool = False, include_information_schema_columns: bool = True, include_summary_schema: bool = True)
Represents options for the process of the user downloading tasks.
- __init__(user_id: int, viewtype: str, delivery_mode: str, spreadsheet_simplified: bool = False, spreadsheet_sort_by_heading: bool = False, db_include_blobs: bool = False, db_patient_id_per_row: bool = False, include_information_schema_columns: bool = True, include_summary_schema: bool = True) → None
- Parameters
user_id – ID of the user creating the request (may be needed to pass to the back-end)
viewtype – file format for receiving data (e.g. XLSX, SQLite)
delivery_mode – method of delivery (e.g. immediate, e-mail)
spreadsheet_sort_by_heading – (For spreadsheets.) Sort columns within each page by heading name?
db_include_blobs – (For database downloads.) Include BLOBs?
db_patient_id_per_row – (For database downloads.) Denormalize by including the patient ID in all rows of patient-related tables?
include_information_schema_columns – Include descriptions of the database source columns?
include_summary_schema – Include descriptions of summary columns and other columns in output spreadsheets?
- class camcops_server.cc_modules.cc_export.OdsExporter(req: CamcopsRequest, collection: TaskCollection, options: camcops_server.cc_modules.cc_export.DownloadOptions)
Converts a set of tasks to an OpenOffice ODS file.
- class camcops_server.cc_modules.cc_export.RExporter(*args, **kwargs)
Converts a set of tasks to an R script.
- __init__(*args, **kwargs) → None
- Parameters
collection – a camcops_server.cc_modules.cc_taskcollection.TaskCollection
options – DownloadOptions governing the download
- class camcops_server.cc_modules.cc_export.SqlExporter(*args, **kwargs)
Converts a set of tasks to the textual SQL needed to create an SQLite file.
- __init__(*args, **kwargs) → None
- Parameters
collection – a camcops_server.cc_modules.cc_taskcollection.TaskCollection
options – DownloadOptions governing the download
- class camcops_server.cc_modules.cc_export.SqliteExporter(req: CamcopsRequest, collection: TaskCollection, options: camcops_server.cc_modules.cc_export.DownloadOptions)
Converts a set of tasks to an SQLite binary file.
- class camcops_server.cc_modules.cc_export.TaskCollectionExporter(req: CamcopsRequest, collection: TaskCollection, options: camcops_server.cc_modules.cc_export.DownloadOptions)
Class to provide tasks for user download.
- __init__(req: CamcopsRequest, collection: TaskCollection, options: camcops_server.cc_modules.cc_export.DownloadOptions)
- Parameters
collection – a camcops_server.cc_modules.cc_taskcollection.TaskCollection
options – DownloadOptions governing the download
- create_user_download_and_email() → None
Creates a user download, and e-mails the user to let them know.
- get_spreadsheet_collection() → camcops_server.cc_modules.cc_spreadsheet.SpreadsheetCollection
Converts the collection of tasks to a collection of spreadsheet-style data. Also audits the request as a basic data dump.
- Returns
a camcops_server.cc_modules.cc_spreadsheet.SpreadsheetCollection object
- immediate_response(req: CamcopsRequest) → pyramid.response.Response
Returns either a Response with the data, or a Response saying how the user will obtain their data later.
- schedule_download() → None
Schedule a background export to a file that the user can download later.
- class camcops_server.cc_modules.cc_export.TsvZipExporter(req: CamcopsRequest, collection: TaskCollection, options: camcops_server.cc_modules.cc_export.DownloadOptions)
Converts a set of tasks to a set of TSV (tab-separated value) files (one per table) in a ZIP file.
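The "one TSV file per table, all inside one ZIP" layout can be sketched with the standard library. This is a hypothetical illustration, not TsvZipExporter's implementation: tables_to_tsv_zip takes plain rows (header first) rather than a TaskCollection, and the table names are made up.

```python
import csv
import io
import zipfile
from typing import Dict, List, Sequence


def tables_to_tsv_zip(tables: Dict[str, List[Sequence[object]]]) -> bytes:
    """Write each table (header row first) as <name>.tsv inside one ZIP."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name, rows in tables.items():
            text = io.StringIO()
            writer = csv.writer(text, delimiter="\t", lineterminator="\n")
            writer.writerows(rows)
            zf.writestr(f"{name}.tsv", text.getvalue())  # stored as UTF-8
    return buffer.getvalue()


data = tables_to_tsv_zip({
    "patient": [["id", "forename"], [1, "Alice"]],
    "phq9": [["patient_id", "total"], [1, 7]],
})
with zipfile.ZipFile(io.BytesIO(data)) as zf:
    print(zf.namelist())  # ['patient.tsv', 'phq9.tsv']
    print(zf.read("phq9.tsv").decode("utf-8"))
```

Using the csv module with delimiter="\t" (rather than joining strings by hand) means values containing tabs, quotes or newlines are quoted consistently.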
- class camcops_server.cc_modules.cc_export.UserDownloadFile(filename: str, directory: str = '', permitted_lifespan_min: float = 0, req: CamcopsRequest = None)
Represents a file that has been generated for the user to download.
Test code:
from camcops_server.cc_modules.cc_export import UserDownloadFile
x = UserDownloadFile("/etc/hosts")
print(x.when_last_modified)  # should match output of: ls -l /etc/hosts
many = UserDownloadFile.from_directory_scan("/etc")
- __init__(filename: str, directory: str = '', permitted_lifespan_min: float = 0, req: CamcopsRequest = None) → None
- Parameters
filename – Filename, either absolute, or if directory is specified, relative to directory.
directory – Directory. If specified, filename must be within it.
Notes:
The Unix ls command shows timestamps in the current timezone. Try TZ=utc ls -l <filename> or TZ="America/New_York" ls -l <filename> to see this.
The underlying timestamp is the time (in seconds) since the Unix “epoch”, which is 00:00:00 UTC on 1 Jan 1970 (https://en.wikipedia.org/wiki/Unix_time).
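The timezone point can be checked from Python. This sketch uses the standard-library datetime module (the class itself uses pendulum types) and a throwaway temporary file: the stored modification time is one instant, counted in seconds from the epoch, and only its display changes between UTC and local time.

```python
import os
import tempfile
from datetime import datetime, timezone

# The epoch itself, as a timezone-aware UTC datetime.
print(datetime.fromtimestamp(0, tz=timezone.utc))  # 1970-01-01 00:00:00+00:00

# A file's mtime is seconds since the epoch; render it in UTC and locally.
with tempfile.NamedTemporaryFile(delete=False) as f:
    path = f.name
mtime = os.stat(path).st_mtime
when_utc = datetime.fromtimestamp(mtime, tz=timezone.utc)
when_local = when_utc.astimezone()  # the system's local timezone
print(when_utc.isoformat())
print(when_local.isoformat())  # same instant, different wall-clock display
os.remove(path)
```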
- property contents: Optional[bytes]
The file contents. May raise OSError if the read fails.
- property delete_form: str
Returns HTML for a form to delete this file.
- property download_url: str
Returns a URL to download this file.
- classmethod from_directory_scan(directory: str, permitted_lifespan_min: float = 0, req: CamcopsRequest = None) → List[UserDownloadFile]
Scans the directory and returns a list of UserDownloadFile objects, one for each file in the directory. For each object, directory is the root directory (our parameter here), and filename is the filename RELATIVE to that.
- Parameters
directory – directory to scan
permitted_lifespan_min – lifespan for each file
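The "root directory plus relative filename" convention can be sketched with os.walk and os.path.relpath. scan_relative is a hypothetical helper returning plain tuples rather than UserDownloadFile objects, and the documentation does not say whether from_directory_scan recurses into subdirectories; this sketch does.

```python
import os
import tempfile
from typing import List, Tuple


def scan_relative(directory: str) -> List[Tuple[str, str]]:
    """(root_directory, relative_filename) for every file under directory."""
    results = []
    for dirpath, _dirnames, filenames in os.walk(directory):
        for name in filenames:
            full = os.path.join(dirpath, name)
            # Store the filename relative to the scanned root.
            results.append((directory, os.path.relpath(full, directory)))
    return sorted(results)


root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "sub"))
for rel in ("a.xlsx", os.path.join("sub", "b.sqlite")):
    with open(os.path.join(root, rel), "w") as f:
        f.write("x")
print([rel for _, rel in scan_relative(root)])  # e.g. ['a.xlsx', 'sub/b.sqlite']
```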
- older_than(when: pendulum.datetime.DateTime) → bool
Was the file created before the specified time?
- property size: Optional[int]
Size of the file, in bytes. Returns None if the file does not exist.
- property size_str: str
Returns a pretty-format string describing the file’s size.
- property time_left: Optional[pendulum.duration.Duration]
Returns the amount of time that this file has left to live before the server will delete it. Returns None if the file does not exist.
- property time_left_str: str
A string version of time_left.
- property when_last_modified: Optional[pendulum.datetime.DateTime]
Returns the file’s modification time, or None if it doesn’t exist. (Creation time is harder! See https://stackoverflow.com/questions/237079/how-to-get-file-creation-modification-date-times-in-python.)
- property when_last_modified_str: str
Returns a formatted string with the file’s modification time.
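The time_left and size_str properties can be approximated as below. This is a sketch under assumptions: it uses standard-library datetime/timedelta instead of pendulum, assumes a file expires at its modification time plus permitted_lifespan_min minutes, and assumes binary (1024-based) size units; the actual formatting CamCOPS uses may differ.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def time_left(when_last_modified: Optional[datetime],
              permitted_lifespan_min: float,
              now: datetime) -> Optional[timedelta]:
    """Time until assumed expiry (negative if overdue); None if no file."""
    if when_last_modified is None:
        return None
    expiry = when_last_modified + timedelta(minutes=permitted_lifespan_min)
    return expiry - now


def size_str(size: Optional[int]) -> str:
    """Human-readable size in binary units; a placeholder if no file."""
    if size is None:
        return "(no file)"
    units = ["B", "KiB", "MiB", "GiB", "TiB"]
    value = float(size)
    i = 0
    while value >= 1024 and i < len(units) - 1:
        value /= 1024
        i += 1
    return f"{size} B" if i == 0 else f"{value:.1f} {units[i]}"


modified = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 12, 45, tzinfo=timezone.utc)
print(time_left(modified, 60, now))  # 0:15:00
print(size_str(1536))  # 1.5 KiB
```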
- class camcops_server.cc_modules.cc_export.XlsxExporter(req: CamcopsRequest, collection: TaskCollection, options: camcops_server.cc_modules.cc_export.DownloadOptions)
Converts a set of tasks to an Excel XLSX file.
- camcops_server.cc_modules.cc_export.export(req: CamcopsRequest, recipient_names: List[str] = None, all_recipients: bool = False, via_index: bool = True, schedule_via_backend: bool = False) → None
Exports all relevant tasks (pending incremental exports, or everything if applicable) for specified export recipients.
Called from the command line, or from camcops_server.cc_modules.celery.export_to_recipient_backend(). Calls export_whole_database() or export_tasks_individually().
- Parameters
recipient_names – list of export recipient names (as per the config file)
all_recipients – use all recipients?
via_index – use the task index (faster)?
schedule_via_backend – schedule jobs via the backend instead?
- camcops_server.cc_modules.cc_export.export_task(req: CamcopsRequest, recipient: camcops_server.cc_modules.cc_exportrecipient.ExportRecipient, task: camcops_server.cc_modules.cc_task.Task) → None
Exports a single task, checking that it remains valid to do so.
Called by export_tasks_individually() directly, or called via camcops_server.cc_modules.celery.export_task_backend() if export_tasks_individually() requested that. Calls camcops_server.cc_modules.cc_exportmodels.ExportedTask.export().
For FHIR, holds a recipient-specific “FHIR” file lock during export.
Always holds a recipient-and-task-specific file lock during export.
- Parameters
recipient – a camcops_server.cc_modules.cc_exportrecipient.ExportRecipient
- camcops_server.cc_modules.cc_export.export_tasks_individually(req: CamcopsRequest, recipient: camcops_server.cc_modules.cc_exportrecipient.ExportRecipient, via_index: bool = True, schedule_via_backend: bool = False) → None
Exports all necessary tasks for a recipient.
Called by export(). Calls export_task() if schedule_via_backend is False. Schedules camcops_server.cc_modules.celery.export_task_backend() if schedule_via_backend is True, which calls export_task() in turn.
- Parameters
recipient – a camcops_server.cc_modules.cc_exportrecipient.ExportRecipient
via_index – use the task index (faster)?
schedule_via_backend – schedule jobs via the backend instead?
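The schedule_via_backend switch can be sketched as a simple branch per task. Everything here is hypothetical: FakeBackendQueue only records what would be scheduled (a real deployment would hand jobs to a worker queue), and export_tasks_individually_sketch takes plain (basetable, task PK) pairs instead of querying the database.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Tuple

Job = Tuple[str, str, int]  # (recipient name, base table, task PK)


@dataclass
class FakeBackendQueue:
    """Stand-in for a task queue; just records what was scheduled."""
    scheduled: List[Job] = field(default_factory=list)

    def enqueue(self, job: Job) -> None:
        self.scheduled.append(job)


def export_tasks_individually_sketch(
        recipient: str,
        pending: List[Tuple[str, int]],
        export_one: Callable[[Job], None],
        queue: FakeBackendQueue,
        schedule_via_backend: bool = False) -> None:
    for basetable, task_pk in pending:
        job = (recipient, basetable, task_pk)
        if schedule_via_backend:
            queue.enqueue(job)  # a worker would later export this one task
        else:
            export_one(job)  # export now, in this process


done: List[Job] = []
q = FakeBackendQueue()
pending = [("phq9", 1), ("gad7", 5)]
export_tasks_individually_sketch("r1", pending, done.append, q)
export_tasks_individually_sketch("r2", pending, done.append, q,
                                 schedule_via_backend=True)
print(done)         # [('r1', 'phq9', 1), ('r1', 'gad7', 5)]
print(q.scheduled)  # [('r2', 'phq9', 1), ('r2', 'gad7', 5)]
```

Scheduling one job per task (rather than one per recipient) keeps a failure on one task from blocking the others.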
- camcops_server.cc_modules.cc_export.export_whole_database(req: CamcopsRequest, recipient: camcops_server.cc_modules.cc_exportrecipient.ExportRecipient, via_index: bool = True) → None
Exports to a database.
Called by export(). Holds a recipient-specific “database” file lock in the process.
- Parameters
recipient – a camcops_server.cc_modules.cc_exportrecipient.ExportRecipient
via_index – use the task index (faster)?
- camcops_server.cc_modules.cc_export.gen_audited_tasks_by_task_class(collection: TaskCollection, audit_descriptions: List[str]) → Generator[camcops_server.cc_modules.cc_task.Task, None, None]
Generates tasks from a collection, across task classes, simultaneously adding to an audit description. Used for user-triggered downloads.
- Parameters
collection – a camcops_server.cc_modules.cc_taskcollection.TaskCollection
audit_descriptions – list of strings to be modified
- Yields
camcops_server.cc_modules.cc_task.Task objects
- camcops_server.cc_modules.cc_export.gen_audited_tasks_for_task_class(collection: TaskCollection, cls: Type[camcops_server.cc_modules.cc_task.Task], audit_descriptions: List[str]) → Generator[camcops_server.cc_modules.cc_task.Task, None, None]
Generates tasks from a collection, for a given task class, simultaneously adding to an audit description. Used for user-triggered downloads.
- Parameters
collection – a camcops_server.cc_modules.cc_taskcollection.TaskCollection
cls – the task class to generate
audit_descriptions – list of strings to be modified
- Yields
camcops_server.cc_modules.cc_task.Task objects
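The "generate while adding to an audit description" pattern relies on the caller passing in a list that the generator mutates as a side effect. gen_audited below is a hypothetical simplification: it yields integer PKs grouped by a class-name string rather than Task objects, and the "name: pk,pk" audit format is invented for illustration.

```python
from typing import Dict, Iterator, List


def gen_audited(tasks_by_class: Dict[str, List[int]],
                audit_descriptions: List[str]) -> Iterator[int]:
    """Yield task PKs class by class, appending one audit line per class."""
    for cls_name, pks in tasks_by_class.items():
        # The caller's list is modified in place as generation proceeds.
        audit_descriptions.append(f"{cls_name}: {','.join(map(str, pks))}")
        yield from pks


audit: List[str] = []
pks = list(gen_audited({"phq9": [3, 7], "gad7": [2]}, audit))
print(pks)    # [3, 7, 2]
print(audit)  # ['phq9: 3,7', 'gad7: 2']
```

Since the audit list only fills up as the generator is consumed, the caller should write the audit entry after iteration has finished.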
- camcops_server.cc_modules.cc_export.get_information_schema_query(req: CamcopsRequest) → sqlalchemy.engine.cursor.CursorResult
Executes a query that fetches the INFORMATION_SCHEMA.COLUMNS information from our source database, and returns the SQLAlchemy result (a CursorResult).
This is not sensitive; there is no data, just structure/comments.
- camcops_server.cc_modules.cc_export.get_information_schema_spreadsheet_page(req: CamcopsRequest, page_name: str = '_camcops_information_schema_columns') → camcops_server.cc_modules.cc_spreadsheet.SpreadsheetPage
Returns the server database’s INFORMATION_SCHEMA.COLUMNS table as a camcops_server.cc_modules.cc_spreadsheet.SpreadsheetPage.
- camcops_server.cc_modules.cc_export.make_exporter(req: CamcopsRequest, collection: TaskCollection, options: camcops_server.cc_modules.cc_export.DownloadOptions) → camcops_server.cc_modules.cc_export.TaskCollectionExporter
- Parameters
collection – a camcops_server.cc_modules.cc_taskcollection.TaskCollection
options – camcops_server.cc_modules.cc_export.DownloadOptions governing the download
- Returns
a TaskCollectionExporter
- Raises
HTTPBadRequest
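A factory like make_exporter() plausibly maps the requested file format to one exporter subclass and rejects anything else. The sketch below shows that dispatch-table pattern under stated assumptions: the viewtype strings, the *Sketch classes and the BadRequest exception (a stand-in for pyramid's HTTPBadRequest) are all hypothetical, not the module's real mapping.

```python
from typing import Dict, Type


class HypotheticalExporter:
    """Placeholder base class standing in for TaskCollectionExporter."""

    def __init__(self, viewtype: str) -> None:
        self.viewtype = viewtype


class XlsxSketch(HypotheticalExporter):
    pass


class OdsSketch(HypotheticalExporter):
    pass


class TsvZipSketch(HypotheticalExporter):
    pass


EXPORTERS: Dict[str, Type[HypotheticalExporter]] = {
    "xlsx": XlsxSketch,
    "ods": OdsSketch,
    "tsv_zip": TsvZipSketch,
}


class BadRequest(ValueError):
    """Stand-in for pyramid's HTTPBadRequest."""


def make_exporter_sketch(viewtype: str) -> HypotheticalExporter:
    try:
        cls = EXPORTERS[viewtype.lower()]
    except KeyError:
        raise BadRequest(f"Unsupported viewtype: {viewtype!r}") from None
    return cls(viewtype)


print(type(make_exporter_sketch("XLSX")).__name__)  # XlsxSketch
```

A dictionary keeps the list of supported formats in one place, so adding an exporter means adding one entry rather than another if/elif branch.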
- camcops_server.cc_modules.cc_export.print_export_queue(req: CamcopsRequest, recipient_names: List[str] = None, all_recipients: bool = False, via_index: bool = True, pretty: bool = False, debug_show_fhir: bool = False, debug_fhir_include_docs: bool = False) → None
Shows tasks that would be exported.
Called from the command line.
- Parameters
recipient_names – list of export recipient names (as per the config file)
all_recipients – use all recipients?
via_index – use the task index (faster)?
pretty – use str(task) not repr(task) (prettier, but slower because it has to query the patient)
debug_show_fhir – Show FHIR output for each task, as JSON?
debug_fhir_include_docs – (If debug_show_fhir.) Include document content? Large!
- camcops_server.cc_modules.cc_export.write_information_schema_to_dst(req: CamcopsRequest, dst_session: sqlalchemy.orm.session.Session, dest_table_name: str = '_camcops_information_schema_columns') → None
Writes the server’s information schema to a separate database session (which will be an SQLite database being created for download).
There must be no open transactions (i.e. please COMMIT before you call this function), since we need to create a table.
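The idea of copying column metadata into a new SQLite file can be sketched with the standard-library sqlite3 module. Assumptions: SQLite has no INFORMATION_SCHEMA, so the sketch reads sqlite_master and PRAGMA table_info as a stand-in for the server's INFORMATION_SCHEMA.COLUMNS; the destination columns (table_name, column_name, column_type) are hypothetical; and the real function works through an SQLAlchemy session. The guard mirrors the documented precondition by refusing to run inside an open transaction.

```python
import sqlite3
from typing import List, Tuple

DEST_TABLE = "_camcops_information_schema_columns"


def source_columns(src: sqlite3.Connection) -> List[Tuple[str, str, str]]:
    """(table, column, declared type) for every table in the source."""
    out = []
    tables = [r[0] for r in src.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]
    for table in tables:
        # PRAGMA table_info rows: (cid, name, type, notnull, default, pk)
        for _cid, name, coltype, *_rest in src.execute(
                f'PRAGMA table_info("{table}")'):
            out.append((table, name, coltype))
    return out


def write_schema_to_dst(src: sqlite3.Connection,
                        dst: sqlite3.Connection) -> None:
    if dst.in_transaction:
        raise RuntimeError("COMMIT before writing the schema table")
    with dst:  # commits on success, rolls back on error
        dst.execute(f"CREATE TABLE {DEST_TABLE} "
                    "(table_name TEXT, column_name TEXT, column_type TEXT)")
        dst.executemany(f"INSERT INTO {DEST_TABLE} VALUES (?, ?, ?)",
                        source_columns(src))


src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE phq9 (id INTEGER PRIMARY KEY, q1 INTEGER)")
src.commit()
dst = sqlite3.connect(":memory:")
write_schema_to_dst(src, dst)
print(dst.execute(f"SELECT * FROM {DEST_TABLE}").fetchall())
# [('phq9', 'id', 'INTEGER'), ('phq9', 'q1', 'INTEGER')]
```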