Source code for camcops_server.cc_modules.cc_exportmodels

#!/usr/bin/env python

"""
camcops_server/cc_modules/cc_exportmodels.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Define models for export functions (e.g. HL7, file-based export).**

"""

import logging
import os
import posixpath
import socket
import subprocess
import sys
from typing import Generator, List, Optional, Tuple, TYPE_CHECKING

from cardinal_pythonlib.datetimefunc import (
    get_now_utc_datetime,
    get_now_utc_pendulum,
)
from cardinal_pythonlib.email.sendmail import (
    CONTENT_TYPE_HTML,
    CONTENT_TYPE_TEXT,
)
from cardinal_pythonlib.fileops import mkdir_p
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.network import ping
from cardinal_pythonlib.sqlalchemy.list_types import StringListType
from cardinal_pythonlib.sqlalchemy.orm_query import bool_from_exists_clause
import hl7
from pendulum import DateTime as Pendulum
from sqlalchemy.orm import reconstructor, relationship, Session as SqlASession
from sqlalchemy.sql.schema import Column, ForeignKey
from sqlalchemy.sql.sqltypes import (
    BigInteger,
    Boolean,
    DateTime,
    Integer,
    Text,
    UnicodeText,
)

from camcops_server.cc_modules.cc_constants import (
    ConfigParamExportRecipient,
    FileType,
    UTF8,
)
from camcops_server.cc_modules.cc_email import Email
from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
from camcops_server.cc_modules.cc_exportrecipientinfo import (
    ExportTransmissionMethod,
)
from camcops_server.cc_modules.cc_fhir import (
    FhirExportException,
    FhirTaskExporter,
)
from camcops_server.cc_modules.cc_filename import change_filename_ext
from camcops_server.cc_modules.cc_hl7 import (
    make_msh_segment,
    MLLPTimeoutClient,
    msg_is_successful_ack,
    SEGMENT_SEPARATOR,
)
from camcops_server.cc_modules.cc_redcap import (
    RedcapExportException,
    RedcapTaskExporter,
)
from camcops_server.cc_modules.cc_sqla_coltypes import (
    LongText,
    TableNameColType,
)
from camcops_server.cc_modules.cc_sqlalchemy import Base
from camcops_server.cc_modules.cc_taskcollection import (
    TaskCollection,
    TaskSortMethod,
)
from camcops_server.cc_modules.cc_taskfactory import (
    task_factory_no_security_checks,
)

if TYPE_CHECKING:
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_task import Task

# Module-level logger. Log calls in this module use str.format()-style
# placeholders ("{}", "{!r}"), hence the BraceStyleAdapter wrapper.
log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Constants
# =============================================================================

# CR+LF line terminator; substituted for "\n" when RiO metadata files are
# written by ExportedTaskFileGroup.export_task().
DOS_NEWLINE = "\r\n"


# =============================================================================
# Create task collections for export
# =============================================================================


def get_collection_for_export(
    req: "CamcopsRequest",
    recipient: ExportRecipient,
    via_index: bool = True,
    debug: bool = False,
) -> TaskCollection:
    """
    Builds the task collection that should be exported to a given recipient:
    the tasks the recipient wants, minus (for incremental exports) any that
    have already been sent.

    "Already sent" is judged by recipient *name*: a task counts as sent if it
    went to an export recipient of the same name, even if other settings of
    that recipient have since changed.

    Args:
        req:
            a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient:
            an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
        via_index:
            use the task index (faster)?
        debug:
            report details?

    Returns:
        a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`,
        restricted to current tasks and sorted by creation date (ascending)
        within each task class

    """  # noqa
    if not via_index:
        log.debug("Task index disabled for get_collection_for_export()")

    tasks_for_recipient = TaskCollection(
        req=req,
        sort_method_by_class=TaskSortMethod.CREATION_DATE_ASC,
        current_only=True,
        via_index=via_index,
        export_recipient=recipient,
    )

    if debug:
        log.debug(
            "get_collection_for_export(): recipient={!r}, " "collection={!r}",
            recipient,
            tasks_for_recipient,
        )

    return tasks_for_recipient
def gen_exportedtasks(
    collection: TaskCollection,
) -> Generator["ExportedTask", None, None]:
    """
    Walks a task collection and, for every task in it, creates an
    :class:`ExportedTask` record, adds that record to the collection's
    database session, and yields it.

    Args:
        collection:
            a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`;
            it must have an export recipient

    Yields:
        :class:`ExportedTask` objects

    """  # noqa
    session = collection.dbsession
    export_recipient = collection.export_recipient
    assert (
        export_recipient is not None
    ), "TaskCollection has no export_recipient"

    for each_task in collection.gen_tasks_by_class():
        exported = ExportedTask(export_recipient, each_task)
        session.add(exported)
        yield exported
def gen_tasks_having_exportedtasks(
    collection: TaskCollection,
) -> Generator["Task", None, None]:
    """
    Yields the tasks of a collection one by one, writing an export log entry
    (an :class:`ExportedTask`) for each as it goes. Used for database exports.

    Each export record is marked as successful only once the consumer has
    resumed the generator after receiving the corresponding task.

    Args:
        collection:
            a :class:`camcops_server.cc_modules.cc_taskcollection.TaskCollection`

    Yields:
        :class:`camcops_server.cc_modules.cc_task.Task` objects

    """  # noqa
    for export_record in gen_exportedtasks(collection):
        yield export_record.task
        # Only reached when the caller asks for the next item.
        export_record.succeed()

# =============================================================================
# ExportedTask class
# =============================================================================

class ExportedTask(Base):
    """
    Class representing an attempt to export a task (as part of a
    :class:`ExportRun`) to a specific
    :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`.
    """

    __tablename__ = "_exported_tasks"

    id = Column(
        "id",
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    recipient_id = Column(
        "recipient_id",
        BigInteger,
        ForeignKey(ExportRecipient.id),
        nullable=False,
        comment=f"FK to {ExportRecipient.__tablename__}.{ExportRecipient.id.name}",  # noqa
    )
    basetable = Column(
        "basetable",
        TableNameColType,
        nullable=False,
        index=True,
        comment="Base table of task concerned",
    )
    task_server_pk = Column(
        "task_server_pk",
        Integer,
        nullable=False,
        index=True,
        comment="Server PK of task in basetable (_pk field)",
    )
    start_at_utc = Column(
        "start_at_utc",
        DateTime,
        nullable=False,
        index=True,
        comment="Time export was started (UTC)",
    )
    finish_at_utc = Column(
        "finish_at_utc", DateTime, comment="Time export was finished (UTC)"
    )
    success = Column(
        "success",
        Boolean,
        default=False,
        nullable=False,
        comment="Task exported successfully?",
    )
    failure_reasons = Column(
        "failure_reasons", StringListType, comment="Reasons for failure"
    )
    cancelled = Column(
        "cancelled",
        Boolean,
        default=False,
        nullable=False,
        comment="Export subsequently cancelled/invalidated (may trigger resend)",  # noqa
    )
    cancelled_at_utc = Column(
        "cancelled_at_utc",
        DateTime,
        comment="Time export was cancelled at (UTC)",
    )

    recipient = relationship(ExportRecipient)

    emails = relationship("ExportedTaskEmail", back_populates="exported_task")
    fhir_exports = relationship(
        "ExportedTaskFhir", back_populates="exported_task"
    )
    filegroups = relationship(
        "ExportedTaskFileGroup", back_populates="exported_task"
    )
    hl7_messages = relationship(
        "ExportedTaskHL7Message", back_populates="exported_task"
    )
    redcap_exports = relationship(
        "ExportedTaskRedcap", back_populates="exported_task"
    )

    def __init__(
        self,
        recipient: Optional[ExportRecipient] = None,
        task: Optional["Task"] = None,
        basetable: Optional[str] = None,
        task_server_pk: Optional[int] = None,
        *args,
        **kwargs,
    ) -> None:
        """
        Can initialize with a task, or a basetable/task_server_pk combination.

        Args:
            recipient:
                an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task` object
            basetable:
                base table name of the task
            task_server_pk:
                server PK of the task

        (However, we must also support a no-parameter constructor, not least
        for our :func:`merge_db` function.)
        """  # noqa
        super().__init__(*args, **kwargs)
        self.recipient = recipient
        self.start_at_utc = get_now_utc_datetime()
        if task:
            assert (
                not basetable
            ) and task_server_pk is None, (
                "Task specified; mustn't specify basetable/task_server_pk"
            )
            self.basetable = task.tablename
            self.task_server_pk = task.pk
            self._task = task
        else:
            self.basetable = basetable
            self.task_server_pk = task_server_pk
            self._task = None  # type: Optional[Task]

    @reconstructor
    def init_on_load(self) -> None:
        """
        Called when SQLAlchemy recreates an object; see
        https://docs.sqlalchemy.org/en/latest/orm/constructors.html.

        ``__init__()`` is not run in that situation, so the non-column
        attribute ``_task`` is set up here instead.
        """
        self._task = None  # type: Optional[Task]

    @property
    def task(self) -> Optional["Task"]:
        """
        Returns the associated task, fetching it from the database (via our
        ``basetable`` and ``task_server_pk``) on first use.

        Returns ``None``, after logging a warning, if the task factory raises
        :exc:`KeyError` for that table/PK combination.
        """
        if self._task is None:
            dbsession = SqlASession.object_session(self)
            try:
                self._task = task_factory_no_security_checks(
                    dbsession, self.basetable, self.task_server_pk
                )
            except KeyError:
                log.warning(
                    "Failed to retrieve task for basetable={!r}, " "PK={!r}",
                    self.basetable,
                    self.task_server_pk,
                )
                self._task = None
        return self._task

    def succeed(self) -> None:
        """
        Register success.
        """
        self.success = True
        self.finish()

    def abort(self, msg: str) -> None:
        """
        Record failure, and why.

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        Args:
            msg: why
        """
        self.success = False
        log.error("Task export failed: {}", msg)
        self._add_failure_reason(msg)
        self.finish()

    def _add_failure_reason(self, msg: str) -> None:
        """
        Writes to our ``failure_reasons`` list in a way that (a) obviates the
        need to create an empty list via ``__init__()``, and (b) will
        definitely mark it as dirty, so it gets saved to the database.

        See :class:`cardinal_pythonlib.sqlalchemy.list_types.StringListType`.

        Args:
            msg: the message
        """
        if self.failure_reasons is None:
            self.failure_reasons = [msg]
        else:
            # Do not use .append(); that won't mark the record as dirty.
            # Don't use "+="; similarly, that calls list.__iadd__(), not
            # InstrumentedAttribute.__set__().
            # noinspection PyAugmentAssignment
            self.failure_reasons = self.failure_reasons + [msg]

    def finish(self) -> None:
        """
        Records the finish time.
        """
        self.finish_at_utc = get_now_utc_datetime()

    def export(self, req: "CamcopsRequest") -> None:
        """
        Performs an export of the specific task, dispatching on the
        recipient's transmission method.

        If the task itself cannot be retrieved, the export is recorded as
        failed and nothing is transmitted.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the recipient's transmission method is not one
                of the known methods
        """
        dbsession = req.dbsession
        recipient = self.recipient
        transmission_method = recipient.transmission_method

        task = self.task
        if task is None:
            # The per-method exporters all dereference the task; without
            # this guard they would fail with an AttributeError rather than
            # leaving a failure record.
            self.abort(
                f"Task not found: basetable={self.basetable!r}, "
                f"PK={self.task_server_pk!r}"
            )
            return

        log.info("Exporting task {!r} to recipient {}", task, recipient)

        if transmission_method == ExportTransmissionMethod.EMAIL:
            email = ExportedTaskEmail(self)
            dbsession.add(email)
            email.export_task(req)

        elif transmission_method == ExportTransmissionMethod.FHIR:
            efhir = ExportedTaskFhir(self)
            dbsession.add(efhir)
            # Flush before exporting, unlike the other branches.
            # NOTE(review): presumably so that efhir.id is populated before
            # the FHIR exporter uses it -- confirm.
            dbsession.flush()
            efhir.export_task(req)

        elif transmission_method == ExportTransmissionMethod.FILE:
            efg = ExportedTaskFileGroup(self)
            dbsession.add(efg)
            efg.export_task(req)

        elif transmission_method == ExportTransmissionMethod.HL7:
            ehl7 = ExportedTaskHL7Message(self)
            if ehl7.valid():
                dbsession.add(ehl7)
                ehl7.export_task(req)
            else:
                self.abort("Task not valid for HL7 export")

        elif transmission_method == ExportTransmissionMethod.REDCAP:
            eredcap = ExportedTaskRedcap(self)
            dbsession.add(eredcap)
            eredcap.export_task(req)

        else:
            raise AssertionError("Bug: bad transmission_method")

    @property
    def filegroup(self) -> "ExportedTaskFileGroup":
        """
        Returns a :class:`ExportedTaskFileGroup`, creating it if necessary.
        """
        if self.filegroups:
            # noinspection PyUnresolvedReferences
            filegroup = self.filegroups[0]  # type: ExportedTaskFileGroup
        else:
            filegroup = ExportedTaskFileGroup(self)
            # noinspection PyUnresolvedReferences
            self.filegroups.append(filegroup)
        return filegroup

    def export_file(
        self,
        filename: str,
        text: Optional[str] = None,
        binary: Optional[bytes] = None,
        text_encoding: str = UTF8,
    ) -> bool:
        """
        Exports a file, via our (possibly newly created) file group.

        Args:
            filename: name of the file to write
            text: text contents (specify this XOR ``binary``)
            binary: binary contents (specify this XOR ``text``)
            text_encoding: encoding to use when writing text

        Returns:
            was it exported?
        """
        filegroup = self.filegroup
        return filegroup.export_file(
            filename=filename,
            text=text,
            binary=binary,
            text_encoding=text_encoding,
        )

    def cancel(self) -> None:
        """
        Marks the task export as cancelled/invalidated.

        May trigger a resend (which is the point).
        """
        self.cancelled = True
        self.cancelled_at_utc = get_now_utc_datetime()

    @classmethod
    def task_already_exported(
        cls,
        dbsession: SqlASession,
        recipient_name: str,
        basetable: str,
        task_pk: int,
    ) -> bool:
        """
        Has the specified task already been successfully exported (and that
        export not subsequently cancelled)?

        Args:
            dbsession: a :class:`sqlalchemy.orm.session.Session`
            recipient_name: name of the export recipient
            basetable: name of the task's base table
            task_pk: server PK of the task

        Returns:
            does a successful, non-cancelled export record exist for this
            task and a recipient of this name?

        """
        exists_q = (
            dbsession.query(cls)
            .join(cls.recipient)
            .filter(ExportRecipient.recipient_name == recipient_name)
            .filter(cls.basetable == basetable)
            .filter(cls.task_server_pk == task_pk)
            .filter(cls.success == True)  # noqa: E712
            .filter(cls.cancelled == False)  # noqa: E712
            .exists()
        )
        return bool_from_exists_clause(dbsession, exists_q)

# =============================================================================
# HL7 export
# =============================================================================

class ExportedTaskHL7Message(Base):
    """
    Represents an individual HL7 message.
    """

    __tablename__ = "_exported_task_hl7msg"

    id = Column(
        "id",
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id = Column(
        "exported_task_id",
        BigInteger,
        ForeignKey(ExportedTask.id),
        nullable=False,
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )
    sent_at_utc = Column(
        "sent_at_utc", DateTime, comment="Time message was sent at (UTC)"
    )
    reply_at_utc = Column(
        "reply_at_utc", DateTime, comment="Time message was replied to (UTC)"
    )
    success = Column(
        "success",
        Boolean,
        comment="Message sent successfully and acknowledged by HL7 server",
    )
    failure_reason = Column(
        "failure_reason", Text, comment="Reason for failure"
    )
    message = Column("message", LongText, comment="Message body, if kept")
    reply = Column("reply", Text, comment="Server's reply, if kept")

    exported_task = relationship(ExportedTask)

    def __init__(
        self, exported_task: Optional[ExportedTask] = None, *args, **kwargs
    ) -> None:
        """
        Must support parameter-free construction, not least for
        :func:`merge_db`.

        Args:
            exported_task: the :class:`ExportedTask` this message belongs to
        """
        super().__init__(*args, **kwargs)
        self.exported_task = exported_task

        self._hl7_msg = None  # type: Optional[hl7.Message]

    @reconstructor
    def init_on_load(self) -> None:
        """
        Called when SQLAlchemy recreates an object; see
        https://docs.sqlalchemy.org/en/latest/orm/constructors.html.
        """
        self._hl7_msg = None

    @staticmethod
    def task_acceptable_for_hl7(
        recipient: ExportRecipient, task: "Task"
    ) -> bool:
        """
        Is the task valid for HL7 export? (For example, anonymous tasks and
        tasks missing key ID information may not be.)

        Args:
            recipient:
                an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task` object
                (a false/missing task is treated as not acceptable)

        Returns:
            bool: valid?

        """  # noqa
        if not task:
            return False
        if task.is_anonymous:
            return False  # Cannot send anonymous tasks via HL7
        patient = task.patient
        if not patient:
            return False
        if not recipient.primary_idnum:
            return False  # required for HL7
        if not patient.has_idnum_type(recipient.primary_idnum):
            return False
        return True

    def valid(self) -> bool:
        """
        Checks for internal validity; returns a bool.
        """
        exported_task = self.exported_task
        task = exported_task.task
        recipient = exported_task.recipient
        return self.task_acceptable_for_hl7(recipient, task)

    def succeed(self, now: Optional[Pendulum] = None) -> None:
        """
        Record that we succeeded, and so did our associated task export.

        Args:
            now: time to record as the sending time; defaults to the current
                UTC time
        """
        now = now or get_now_utc_datetime()
        self.success = True
        self.sent_at_utc = now
        self.exported_task.succeed()

    def abort(self, msg: str, diverted_not_sent: bool = False) -> None:
        """
        Record that we failed, and so did our associated task export.

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        The detailed ``msg`` is stored on this HL7 message record; the parent
        :class:`ExportedTask` receives a generic reason only.

        Args:
            msg:
                reason for failure
            diverted_not_sent:
                deliberately diverted (and not counted as sent) rather than a
                sending failure?
        """
        self.success = False
        self.failure_reason = msg
        self.exported_task.abort(
            "HL7 message deliberately not sent; diverted to file"
            if diverted_not_sent
            else "HL7 sending failed"
        )

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Exports the task itself to an HL7 message.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        if not self.valid():
            self.abort(
                "Unsuitable for HL7; should have been filtered out earlier"
            )
            return
        self.make_hl7_message(req)
        recipient = self.exported_task.recipient
        if recipient.hl7_debug_divert_to_file:
            self.divert_to_file(req)
        else:
            # Proper HL7 message
            self.transmit_hl7()

    def divert_to_file(self, req: "CamcopsRequest") -> None:
        """
        Write an HL7 message to a file. For debugging.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        exported_task = self.exported_task
        recipient = exported_task.recipient
        filename = recipient.get_filename(
            req, exported_task.task, override_task_format="hl7"
        )
        now_utc = get_now_utc_pendulum()
        log.info("Diverting HL7 message to file {!r}", filename)
        written = exported_task.export_file(
            filename=filename, text=str(self._hl7_msg)
        )
        if not written:
            # The file group has already recorded the failure against the
            # parent ExportedTask.
            return
        if recipient.hl7_debug_treat_diverted_as_sent:
            # succeed() records now_utc as sent_at_utc.
            self.succeed(now_utc)
        else:
            self.abort(
                "Exported to file as requested but not sent via HL7",
                diverted_not_sent=True,
            )

    def make_hl7_message(self, req: "CamcopsRequest") -> None:
        """
        Makes an HL7 message and stores it in ``self._hl7_msg``.

        May also store it in ``self.message`` (which is saved to the
        database), if we're saving HL7 messages.

        See

        - https://python-hl7.readthedocs.org/en/latest/index.html

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        task = self.exported_task.task
        recipient = self.exported_task.recipient

        # ---------------------------------------------------------------------
        # Parts
        # ---------------------------------------------------------------------
        # NOTE(review): self.id is a database-generated PK; if this object
        # has not been flushed by this point, the control ID will be the
        # string "None" -- confirm that a flush has happened by now.
        msh_segment = make_msh_segment(
            message_datetime=req.now, message_control_id=str(self.id)
        )
        pid_segment = task.get_patient_hl7_pid_segment(req, recipient)
        other_segments = task.get_hl7_data_segments(req, recipient)

        # ---------------------------------------------------------------------
        # Whole message
        # ---------------------------------------------------------------------
        segments = [msh_segment, pid_segment] + other_segments
        self._hl7_msg = hl7.Message(SEGMENT_SEPARATOR, segments)
        if recipient.hl7_keep_message:
            self.message = str(self._hl7_msg)

    def transmit_hl7(self) -> None:
        """
        Sends the HL7 message over TCP/IP.

        - Default MLLP/HL7 port is 2575
        - MLLP = minimum lower layer protocol

          - https://www.cleo.com/support/byproduct/lexicom/usersguide/mllp_configuration.htm
          - https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml?search=hl7
          - Essentially just a TCP socket with a minimal wrapper:
            https://stackoverflow.com/questions/11126918

        - https://python-hl7.readthedocs.org/en/latest/api.html; however,
          we've modified that

        Every failure path calls :meth:`abort`; success (a valid ACK from the
        server) calls :meth:`succeed`.
        """  # noqa
        recipient = self.exported_task.recipient

        if recipient.hl7_ping_first:
            pinged = self.ping_hl7_server(recipient)
            if not pinged:
                self.abort("Could not ping HL7 host")
                return

        try:
            log.info(
                "Sending HL7 message to {}:{}",
                recipient.hl7_host,
                recipient.hl7_port,
            )
            with MLLPTimeoutClient(
                recipient.hl7_host,
                recipient.hl7_port,
                recipient.hl7_network_timeout_ms,
            ) as client:
                server_replied, reply = client.send_message(self._hl7_msg)
        except socket.timeout:
            self.abort("Failed to send message via MLLP: timeout")
            return
        except Exception as e:
            self.abort(f"Failed to send message via MLLP: {e}")
            return

        if not server_replied:
            self.abort("No response from server")
            return

        self.reply_at_utc = get_now_utc_datetime()
        if recipient.hl7_keep_reply:
            self.reply = reply

        try:
            replymsg = hl7.parse(reply)
        except Exception as e:
            self.abort(f"Malformed reply: {e}")
            return

        success, failure_reason = msg_is_successful_ack(replymsg)
        if success:
            self.succeed()
        else:
            self.abort(failure_reason)

    @staticmethod
    def ping_hl7_server(recipient: ExportRecipient) -> bool:
        # noinspection HttpUrlsUsage
        """
        Performs a TCP/IP ping on our HL7 server; returns success.

        (No HL7 PING method yet. Proposal is
        http://hl7tsc.org/wiki/index.php?title=FTSD-ConCalls-20081028
        So use TCP/IP ping.)

        The ping timeout is the recipient's HL7 network timeout, converted
        from milliseconds to whole seconds, with a minimum of one second.

        Args:
            recipient:
                an :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`

        Returns:
            bool: success

        """  # noqa
        # max(), not min(): min() capped the timeout at 1 s regardless of the
        # configured value and produced 0 s for sub-second settings.
        timeout_s = max(recipient.hl7_network_timeout_ms // 1000, 1)
        if ping(hostname=recipient.hl7_host, timeout_s=timeout_s):
            return True
        else:
            log.error("Failed to ping {!r}", recipient.hl7_host)
            return False

# =============================================================================
# File export
# =============================================================================

class ExportedTaskFileGroup(Base):
    """
    Represents a small set of files exported in relation to a single task.
    """

    __tablename__ = "_exported_task_filegroup"

    id = Column(
        "id",
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id = Column(
        "exported_task_id",
        BigInteger,
        ForeignKey(ExportedTask.id),
        nullable=False,
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )
    filenames = Column(
        "filenames", StringListType, comment="List of filenames exported"
    )
    script_called = Column(
        "script_called",
        Boolean,
        default=False,
        nullable=False,
        comment=(
            f"Was the {ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} "
            f"script called?"
        ),
    )
    script_retcode = Column(
        "script_retcode",
        Integer,
        comment=(
            f"Return code from the "
            f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script"
        ),
    )
    script_stdout = Column(
        "script_stdout",
        UnicodeText,
        comment=(
            f"stdout from the "
            f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script"
        ),
    )
    script_stderr = Column(
        "script_stderr",
        UnicodeText,
        comment=(
            f"stderr from the "
            f"{ConfigParamExportRecipient.FILE_SCRIPT_AFTER_EXPORT} script"
        ),
    )

    exported_task = relationship(ExportedTask)

    def __init__(self, exported_task: Optional[ExportedTask] = None) -> None:
        """
        Args:
            exported_task: :class:`ExportedTask` object
        """
        self.exported_task = exported_task

    def export_file(
        self,
        filename: str,
        text: Optional[str] = None,
        binary: Optional[bytes] = None,
        text_encoding: str = UTF8,
    ) -> bool:
        """
        Exports the file.

        On any failure (file exists and overwriting is disallowed; directory
        cannot be created; file cannot be written), the failure is recorded
        via :meth:`abort` and ``False`` is returned.

        Args:
            filename: name of the file to write (made absolute before use)
            text: text contents (specify this XOR ``binary``)
            binary: binary contents (specify this XOR ``text``)
            text_encoding: encoding to use when writing text

        Returns:
            bool: was it exported?
        """
        # Exactly one of text/binary must be non-empty.
        assert bool(text) != bool(binary), "Specify text XOR binary"
        exported_task = self.exported_task
        filename = os.path.abspath(filename)
        directory = os.path.dirname(filename)
        recipient = exported_task.recipient

        if not recipient.file_overwrite_files and os.path.isfile(filename):
            self.abort(f"File already exists: {filename!r}")
            return False

        if recipient.file_make_directory:
            try:
                mkdir_p(directory)
            except Exception as e:
                self.abort(f"Couldn't make directory {directory!r}: {e}")
                return False

        try:
            log.debug("Writing to {!r}", filename)
            if text:
                with open(filename, mode="w", encoding=text_encoding) as f:
                    f.write(text)
            else:
                with open(filename, mode="wb") as f:
                    f.write(binary)
        except Exception as e:
            self.abort(f"Failed to open or write file {filename!r}: {e}")
            return False

        self.note_exported_file(filename)
        return True

    def note_exported_file(self, *filenames: str) -> None:
        """
        Records a filename that has been exported, or several.

        Args:
            *filenames: filenames
        """
        if self.filenames is None:
            self.filenames = list(filenames)
        else:
            # Reassign rather than mutate in place, so that SQLAlchemy marks
            # the attribute as dirty (as for ExportedTask's failure reasons).
            # noinspection PyAugmentAssignment,PyTypeChecker
            self.filenames = self.filenames + list(filenames)

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Exports the task itself to a file (and, if the recipient wants it, a
        RiO metadata file alongside), then runs the recipient's post-export
        script if there is one.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the recipient's task format is not PDF, HTML,
                or XML
        """
        exported_task = self.exported_task
        task = exported_task.task
        recipient = exported_task.recipient
        task_format = recipient.task_format
        task_filename = recipient.get_filename(req, task)
        rio_metadata_filename = change_filename_ext(
            task_filename, ".metadata"
        ).replace(" ", "")
        # ... in case we use it. No spaces in its filename.

        # Before we calculate the PDF, etc., we can pre-check for existing
        # files.
        if not recipient.file_overwrite_files:
            target_filenames = [task_filename]
            if recipient.file_export_rio_metadata:
                target_filenames.append(rio_metadata_filename)
            for fname in target_filenames:
                if os.path.isfile(os.path.abspath(fname)):
                    self.abort(f"File already exists: {fname!r}")
                    return

        # Export task
        if task_format == FileType.PDF:
            binary = task.get_pdf(req)
            text = None
        elif task_format == FileType.HTML:
            binary = None
            text = task.get_html(req)
        elif task_format == FileType.XML:
            binary = None
            text = task.get_xml(req)
        else:
            raise AssertionError("Unknown task_format")
        written = self.export_file(
            task_filename, text=text, binary=binary, text_encoding=UTF8
        )
        if not written:
            return

        # RiO metadata too?
        if recipient.file_export_rio_metadata:

            metadata = task.get_rio_metadata(
                req,
                recipient.rio_idnum,
                recipient.rio_uploading_user,
                recipient.rio_document_type,
            )
            # We're going to write in binary mode, to get the newlines right.
            # One way is:
            #     with codecs.open(filename, mode="w", encoding="ascii") as f:
            #         f.write(metadata.replace("\n", DOS_NEWLINE))
            # Here's another.
            metadata = metadata.replace("\n", DOS_NEWLINE)
            # ... Servelec say CR = "\r", but DOS is \r\n.
            metadata_binary = metadata.encode("ascii")
            # UTF-8 is NOT supported by RiO for metadata.
            written_metadata = self.export_file(
                rio_metadata_filename, binary=metadata_binary
            )
            if not written_metadata:
                return

        self.finish_run_script_if_necessary()

    def succeed(self) -> None:
        """
        Register success.
        """
        self.exported_task.succeed()

    def abort(self, msg: str) -> None:
        """
        Record failure, and why.

        (Called ``abort`` not ``fail`` because PyCharm has a bug relating to
        functions named ``fail``:
        https://stackoverflow.com/questions/21954959/pycharm-unreachable-code.)

        Args:
            msg: why
        """
        self.exported_task.abort(msg)

    def finish_run_script_if_necessary(self) -> None:
        """
        Completes the file export by running the external script, if required.

        The script is invoked with the exported filenames as its arguments.
        Its output and return code are stored; the export is marked as failed
        only if running the script (or decoding its output) raises an
        exception. The return code itself is not checked here.
        """
        recipient = self.exported_task.recipient
        if self.filenames and recipient.file_script_after_export:
            args = [recipient.file_script_after_export] + self.filenames
            try:
                encoding = sys.getdefaultencoding()
                p = subprocess.Popen(
                    args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                )
                out, err = p.communicate()
                self.script_called = True
                self.script_stdout = out.decode(encoding)
                self.script_stderr = err.decode(encoding)
                self.script_retcode = p.returncode
            except Exception as e:
                self.script_called = False
                self.script_stdout = ""
                self.script_stderr = str(e)
                self.abort("Failed to run script")
                return
        self.succeed()

# =============================================================================
# E-mail export
# =============================================================================

class ExportedTaskEmail(Base):
    """
    Database record of one attempt to export a task by e-mail; links the
    :class:`ExportedTask` to the e-mail that carried it.
    """

    __tablename__ = "_exported_task_email"

    id = Column(
        "id",
        BigInteger,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id = Column(
        "exported_task_id",
        BigInteger,
        ForeignKey(ExportedTask.id),
        nullable=False,
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )
    email_id = Column(
        "email_id",
        BigInteger,
        ForeignKey(Email.id),
        comment=f"FK to {Email.__tablename__}.{Email.id.name}",
    )

    exported_task = relationship(ExportedTask)
    email = relationship(Email)

    def __init__(self, exported_task: ExportedTask = None) -> None:
        """
        Args:
            exported_task: the :class:`ExportedTask` being e-mailed
        """
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Renders the task in the recipient's chosen format, attaches it to a
        new e-mail built from the recipient's settings, sends that e-mail,
        and records success or failure on the parent :class:`ExportedTask`.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            AssertionError: if the recipient's task format is not PDF, HTML,
                or XML
        """
        parent = self.exported_task
        task = parent.task
        recipient = parent.recipient
        task_format = recipient.task_format
        # Strip any directory part: we don't want a full path for e-mail!
        attachment_name = os.path.basename(recipient.get_filename(req, task))
        encoding = "utf8"

        # Render the task as bytes.
        if task_format == FileType.PDF:
            payload = task.get_pdf(req)
        elif task_format == FileType.HTML:
            payload = task.get_html(req).encode(encoding)
        elif task_format == FileType.XML:
            payload = task.get_xml(req).encode(encoding)
        else:
            raise AssertionError("Unknown task_format")
        attachments = [
            (attachment_name, payload)
        ]  # type: List[Tuple[str, bytes]]

        # Build the message.
        message = Email(
            from_addr=recipient.email_from,
            # date: automatic
            sender=recipient.email_sender,
            reply_to=recipient.email_reply_to,
            to=recipient.email_to,
            cc=recipient.email_cc,
            bcc=recipient.email_bcc,
            subject=recipient.get_email_subject(req, task),
            body=recipient.get_email_body(req, task),
            content_type=(
                CONTENT_TYPE_HTML
                if recipient.email_body_as_html
                else CONTENT_TYPE_TEXT
            ),
            charset=encoding,
            attachments_binary=attachments,
            save_msg_string=recipient.email_keep_message,
        )
        self.email = message

        # Send it, and note the outcome.
        message.send(
            host=recipient.email_host,
            username=recipient.email_host_username,
            password=recipient.email_host_password,
            port=recipient.email_port,
            use_tls=recipient.email_use_tls,
        )
        if not message.sent:
            parent.abort("Failed to send e-mail")
            return
        parent.succeed()

# =============================================================================
# REDCap export
# =============================================================================

class ExportedTaskRedcap(Base):
    """
    Database record of one attempt to export a task to REDCap.
    """

    __tablename__ = "_exported_task_redcap"

    id = Column(
        "id",
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id = Column(
        "exported_task_id",
        BigInteger,
        ForeignKey(ExportedTask.id),
        nullable=False,
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )

    exported_task = relationship(ExportedTask)

    # The REDCap identifiers below are kept purely as an audit trail.
    redcap_record_id = Column(
        "redcap_record_id",
        UnicodeText,
        comment=(
            "ID of the (patient) record on the REDCap instance where "
            "this task has been exported"
        ),
    )
    redcap_instrument_name = Column(
        "redcap_instrument_name",
        UnicodeText,
        comment=(
            "The name of the REDCap instrument name (form) where this "
            "task has been exported"
        ),
    )
    redcap_instance_id = Column(
        "redcap_instance_id",
        Integer,
        comment=(
            "1-based index of this particular task within the patient "
            "record. Increments on every repeat attempt."
        ),
    )

    def __init__(self, exported_task: ExportedTask = None) -> None:
        """
        Args:
            exported_task: the :class:`ExportedTask` being sent to REDCap
        """
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Hands this record to a :class:`RedcapTaskExporter` and records the
        outcome on the parent :class:`ExportedTask`: success if the exporter
        returns normally, failure (with the exception text as the reason) if
        it raises :exc:`RedcapExportException`.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        parent = self.exported_task
        redcap_exporter = RedcapTaskExporter()

        try:
            redcap_exporter.export_task(req, self)
            parent.succeed()
        except RedcapExportException as redcap_error:
            parent.abort(str(redcap_error))

# =============================================================================
# FHIR export
# =============================================================================

class ExportedTaskFhir(Base):
    """
    Database record of one attempt to export a task to a FHIR server.
    """

    __tablename__ = "_exported_task_fhir"

    id = Column(
        "id",
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_id = Column(
        "exported_task_id",
        BigInteger,
        ForeignKey(ExportedTask.id),
        nullable=False,
        comment=f"FK to {ExportedTask.__tablename__}.{ExportedTask.id.name}",
    )

    exported_task = relationship(ExportedTask)
    entries = relationship(
        "ExportedTaskFhirEntry", back_populates="exported_task_fhir"
    )

    def __init__(self, exported_task: ExportedTask = None) -> None:
        """
        Args:
            exported_task: the :class:`ExportedTask` being sent via FHIR
        """
        self.exported_task = exported_task

    def export_task(self, req: "CamcopsRequest") -> None:
        """
        Creates a :class:`FhirTaskExporter` for this record, runs it, and
        records the outcome on the parent :class:`ExportedTask`: success if
        it completes, failure (with the exception text as the reason) if
        constructing or running the exporter raises
        :exc:`FhirExportException`.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        parent = self.exported_task

        try:
            fhir_exporter = FhirTaskExporter(req, self)
            fhir_exporter.export_task()
            parent.succeed()
        except FhirExportException as fhir_error:
            parent.abort(str(fhir_error))
class ExportedTaskFhirEntry(Base):
    """
    One resource-level result from a FHIR export: details of the Patients,
    Questionnaires, and QuestionnaireResponses exported to a FHIR server for
    a single task.
    """

    __tablename__ = "_exported_task_fhir_entry"

    id = Column(
        "id",
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="Arbitrary primary key",
    )
    exported_task_fhir_id = Column(
        "exported_task_fhir_id",
        Integer,
        ForeignKey(ExportedTaskFhir.id),
        nullable=False,
        comment=(
            f"FK to {ExportedTaskFhir.__tablename__}."
            f"{ExportedTaskFhir.id.name}"
        ),
    )
    etag = Column(
        "etag", UnicodeText, comment="The ETag for the resource (if relevant)"
    )
    last_modified = Column(
        "last_modified", DateTime, comment="Server's date/time modified."
    )
    location = Column(
        "location",
        UnicodeText,
        comment="The location (if the operation returns a location).",
    )
    status = Column(
        "status", UnicodeText, comment="Status response code (text optional)."
    )
    # TODO: outcome?

    exported_task_fhir = relationship(ExportedTaskFhir)

    @property
    def location_url(self) -> str:
        """
        Full URL of the exported resource, formed by joining the recipient's
        FHIR API URL and the location returned by the server, so that we can
        hyperlink to the resource.

        Returns an empty string if there is no location, or if the chain of
        objects leading to the recipient's FHIR API URL is incomplete.
        """
        relative_location = self.location
        if not relative_location:
            return ""

        try:
            fhir_export = self.exported_task_fhir
            base_url = fhir_export.exported_task.recipient.fhir_api_url
        except AttributeError:
            return ""

        # Avoid urllib.parse.urljoin; it does complex (and for our purposes
        # wrong) things. See
        # https://stackoverflow.com/questions/10893374/python-confusions-with-urljoin
        return posixpath.join(base_url, relative_location)