#!/usr/bin/env python



    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal.

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.


**ExportRecipientInfo class.**

The purpose of this is to capture information without using an SQLAlchemy
class. The :class:`camcops_server.cc_modules.cc_config.CamcopsConfig` class
uses this, as it needs to be readable in the absence of a database connection


import configparser
import datetime
import logging
from typing import List, NoReturn, Optional, TYPE_CHECKING

from cardinal_pythonlib.configfiles import (
from cardinal_pythonlib.datetimefunc import (
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.reprfunc import simple_repr

from camcops_server.cc_modules.cc_constants import (
from camcops_server.cc_modules.cc_filename import (

    from camcops_server.cc_modules.cc_config import CamcopsConfig
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_task import Task

# Module-level logger. The log calls in this module use "{}"-style
# placeholders, which is why the standard logger is wrapped in an adapter.
log = BraceStyleAdapter(logging.getLogger(__name__))

# =============================================================================
# Constants
# =============================================================================

# Used to detect more than one address in a single e-mail header value.
COMMA = ","

class ExportTransmissionMethod(object):
    """
    Namespace of string constants naming the supported export transmission
    methods.

    The public attributes are enumerated (in definition order) elsewhere in
    this module, so every public attribute here must be a method name.
    """

    DATABASE = "database"
    EMAIL = "email"
    FHIR = "fhir"
    FILE = "file"
    HL7 = "hl7"
    REDCAP = "redcap"
# Transmission methods that do not support "push" exports (exports on receipt
# of a new task).
NO_PUSH_METHODS = [
    ExportTransmissionMethod.DATABASE,
    # ... because these are large and it would probably be silly to export a
    # whole database whenever a new task arrives. (Is there also a locking
    # problem? Can't remember right now, 2021-11-08.)
]

# The values of all public (non-underscore) attributes of
# ExportTransmissionMethod.
ALL_TRANSMISSION_METHODS = [
    value
    for name, value in vars(ExportTransmissionMethod).items()
    if not name.startswith("_")
]

# Task formats that an export recipient may request.
ALL_TASK_FORMATS = [FileType.HTML, FileType.PDF, FileType.XML]
class InvalidExportRecipient(ValueError):
    """
    Raised when an export recipient definition is invalid.
    """

    def __init__(self, recipient_name: str, msg: str) -> None:
        """
        Args:
            recipient_name: name of the offending export recipient
            msg: description of what is wrong with it
        """
        full_message = f"For export recipient [{recipient_name}]: {msg}"
        super().__init__(full_message)
# Internal shorthand:
_Invalid = InvalidExportRecipient


class _Missing(_Invalid):
    """
    Raised when a required config parameter is absent.
    """

    def __init__(self, recipient_name: str, paramname: str) -> None:
        """
        Args:
            recipient_name: name of the offending export recipient
            paramname: name of the config parameter that is missing
        """
        problem = f"Missing parameter {paramname}"
        super().__init__(recipient_name, problem)


# =============================================================================
# ExportRecipientInfo class
# =============================================================================
class ExportRecipientInfo(object):
    """
    Class representing an export recipient, that is not an SQLAlchemy ORM
    object.

    This has an unfortunate close relationship with
    :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
    (q.v.).

    Full details of parameters are in the docs for the config file.
    """

    IGNORE_FOR_EQ_ATTRNAMES = [
        # Attribute names to ignore for equality comparison
        "email_host_password",
        "fhir_app_secret",
        "fhir_launch_token",
        "redcap_api_key",
    ]

    def __init__(self, other: "ExportRecipientInfo" = None) -> None:
        """
        Initializes with default values, optionally copying attributes from
        ``other``.

        Args:
            other: optional object to copy attribute values from; only
                attributes that ``other`` actually has are copied
        """
        cd = ConfigDefaults()

        self.recipient_name = ""

        # How to export

        self.transmission_method = ExportTransmissionMethod.EMAIL
        self.push = cd.PUSH
        self.task_format = cd.TASK_FORMAT
        self.xml_field_comments = cd.XML_FIELD_COMMENTS

        # What to export

        self.all_groups = cd.ALL_GROUPS
        # group_names is not in the database; see group_ids
        self.group_names = []  # type: List[str]
        self.group_ids = []  # type: List[int]
        self.tasks = []  # type: List[str]
        self.start_datetime_utc = None  # type: Optional[datetime.datetime]
        self.end_datetime_utc = None  # type: Optional[datetime.datetime]
        self.finalized_only = cd.FINALIZED_ONLY
        self.include_anonymous = cd.INCLUDE_ANONYMOUS
        self.primary_idnum = None  # type: Optional[int]
        self.require_idnum_mandatory = (
            cd.REQUIRE_PRIMARY_IDNUM_MANDATORY_IN_POLICY
        )

        # Database

        self.db_url = ""
        self.db_echo = cd.DB_ECHO
        self.db_include_blobs = cd.DB_INCLUDE_BLOBS
        self.db_add_summaries = cd.DB_ADD_SUMMARIES
        self.db_patient_id_per_row = cd.DB_PATIENT_ID_PER_ROW

        # Email

        self.email_host = ""
        self.email_port = cd.EMAIL_PORT
        self.email_use_tls = cd.EMAIL_USE_TLS
        self.email_host_username = ""
        self.email_host_password = ""  # not in database for security
        self.email_from = ""
        self.email_sender = ""
        self.email_reply_to = ""
        self.email_to = ""  # CSV list
        self.email_cc = ""  # CSV list
        self.email_bcc = ""  # CSV list
        self.email_patient_spec = ""
        self.email_patient_spec_if_anonymous = cd.PATIENT_SPEC_IF_ANONYMOUS
        self.email_subject = ""
        self.email_body_as_html = cd.EMAIL_BODY_IS_HTML
        self.email_body = ""
        self.email_keep_message = cd.EMAIL_KEEP_MESSAGE

        # HL7

        self.hl7_host = ""
        self.hl7_port = cd.HL7_PORT
        self.hl7_ping_first = cd.HL7_PING_FIRST
        self.hl7_network_timeout_ms = cd.HL7_NETWORK_TIMEOUT_MS
        self.hl7_keep_message = cd.HL7_KEEP_MESSAGE
        self.hl7_keep_reply = cd.HL7_KEEP_REPLY
        self.hl7_debug_divert_to_file = cd.HL7_DEBUG_DIVERT_TO_FILE
        self.hl7_debug_treat_diverted_as_sent = (
            cd.HL7_DEBUG_TREAT_DIVERTED_AS_SENT
        )

        # File

        self.file_patient_spec = ""
        self.file_patient_spec_if_anonymous = cd.PATIENT_SPEC_IF_ANONYMOUS
        self.file_filename_spec = ""
        self.file_make_directory = cd.FILE_MAKE_DIRECTORY
        self.file_overwrite_files = cd.FILE_OVERWRITE_FILES
        self.file_export_rio_metadata = cd.FILE_EXPORT_RIO_METADATA
        self.file_script_after_export = ""

        # File/RiO

        self.rio_idnum = None  # type: Optional[int]
        self.rio_uploading_user = ""
        self.rio_document_type = ""

        # REDCap

        self.redcap_api_key = ""  # not in database for security
        self.redcap_api_url = ""
        self.redcap_fieldmap_filename = ""

        # FHIR

        self.fhir_app_id = ""
        self.fhir_api_url = ""
        self.fhir_app_secret = ""  # not in database for security
        self.fhir_launch_token = ""  # not in database for security
        self.fhir_concurrent = False

        # Copy from other?

        if other is not None:
            assert isinstance(other, ExportRecipientInfo)
            for attrname in self.get_attrnames():
                # Note that both "self" and "other" may be an ExportRecipient
                # rather than an ExportRecipientInfo.
                if hasattr(other, attrname):
                    setattr(self, attrname, getattr(other, attrname))

    def get_attrnames(self) -> List[str]:
        """
        Returns the sorted names of all instance attributes that do not
        start with an underscore.
        """
        return sorted(key for key in self.__dict__ if not key.startswith("_"))

    def get_eq_attrnames(self) -> List[str]:
        """
        Returns attribute names to use for equality comparison: everything
        from :meth:`get_attrnames` except ``IGNORE_FOR_EQ_ATTRNAMES``.
        """
        return [
            x
            for x in self.get_attrnames()
            if x not in self.IGNORE_FOR_EQ_ATTRNAMES
        ]

    def __repr__(self):
        return simple_repr(self, self.get_attrnames())

    def __str__(self) -> str:
        return repr(self.recipient_name)

    def __eq__(self, other: "ExportRecipientInfo") -> bool:
        """
        Does this object equal another -- meaning "sufficiently equal that we
        can use the same one, rather than making a new database copy"?

        Attributes named in ``IGNORE_FOR_EQ_ATTRNAMES`` are not compared. The
        first mismatch found is logged at debug level.
        """
        for attrname in self.get_eq_attrnames():
            selfattr = getattr(self, attrname)
            otherattr = getattr(other, attrname)
            if selfattr != otherattr:
                log.debug(
                    "{}: For {!r}, new export recipient mismatches "
                    "previous copy on {}: {!r} != {!r}",
                    self.__class__.__name__,
                    self.recipient_name,
                    attrname,
                    selfattr,
                    otherattr,
                )
                return False
        return True

    @classmethod
    def create_dummy_recipient(cls) -> "ExportRecipientInfo":
        """
        Creates and returns a dummy :class:`ExportRecipientInfo` that exports
        XML files to a directory under ``/tmp``.
        """
        d = cls()

        d.recipient_name = "_dummy_export_recipient_"
        d.current = True

        d.transmission_method = ExportTransmissionMethod.FILE

        d.all_groups = True
        d.primary_idnum = 1
        d.require_idnum_mandatory = False
        d.finalized_only = False
        d.task_format = FileType.XML

        # File
        d.include_anonymous = True
        d.file_patient_spec_if_anonymous = "anonymous"
        d.file_patient_spec = "{surname}_{forename}_{idshortdesc1}{idnum1}"
        d.file_filename_spec = (
            "/tmp/camcops_debug_testing/"
            "TestCamCOPS_{patient}_{created}_{tasktype}-{serverpk}"
            ".{filetype}"
        )
        d.file_overwrite_files = False
        d.file_make_directory = True

        return d

    @classmethod
    def read_from_config(
        cls,
        cfg: "CamcopsConfig",
        parser: configparser.ConfigParser,
        recipient_name: str,
    ) -> "ExportRecipientInfo":
        """
        Reads from the config file and writes this instance's attributes.

        Args:
            cfg:
                a :class:`camcops_server.cc_modules.cc_config.CamcopsConfig`
            parser:
                configparser INI file object
            recipient_name:
                name of recipient and of INI file section

        Returns:
            an :class:`ExportRecipient` object, which is **not** currently in
            a database session

        Raises:
            :exc:`InvalidExportRecipient` if the transmission method is
            missing/invalid or the database-independent validation fails
        """
        assert recipient_name
        log.debug("Loading export config for recipient {!r}", recipient_name)

        section = CONFIG_RECIPIENT_PREFIX + recipient_name
        cps = ConfigParamSite
        cpr = ConfigParamExportRecipient
        cd = ConfigDefaults()
        r = cls()  # type: ExportRecipientInfo

        # Thin wrappers so that each read below names only the parameter.

        def _get_str(paramname: str, default: str = None) -> Optional[str]:
            return get_config_parameter(
                parser, section, paramname, str, default
            )

        def _get_bool(paramname: str, default: bool) -> bool:
            return get_config_parameter_boolean(
                parser, section, paramname, default
            )

        def _get_int(paramname: str, default: int = None) -> Optional[int]:
            return get_config_parameter(
                parser, section, paramname, int, default
            )

        def _get_multiline(paramname: str) -> List[str]:
            return get_config_parameter_multiline(
                parser, section, paramname, []
            )

        def _get_site_str(
            paramname: str, default: str = None
        ) -> Optional[str]:
            return get_config_parameter(
                parser, CONFIG_FILE_SITE_SECTION, paramname, str, default
            )

        def _get_utc_datetime(paramname: str) -> Optional[datetime.datetime]:
            # Absent/empty parameter means "no limit".
            raw = _get_str(paramname)
            if not raw:
                return None
            return pendulum_to_utc_datetime_without_tz(
                coerce_to_pendulum(raw, assume_local=False)
            )

        def _make_email_csv_list(paramname: str) -> str:
            return ", ".join(_get_multiline(paramname))

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Identity
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        r.recipient_name = recipient_name

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # How to export
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        r.transmission_method = _get_str(cpr.TRANSMISSION_METHOD)
        r.transmission_method = str(r.transmission_method).lower()
        # Check this one immediately, since we use it in conditions below
        if r.transmission_method not in ALL_TRANSMISSION_METHODS:
            raise _Invalid(
                r.recipient_name,
                f"Missing/invalid "
                f"{cpr.TRANSMISSION_METHOD}: "
                f"{r.transmission_method}",
            )
        r.push = _get_bool(cpr.PUSH, cd.PUSH)
        r.task_format = _get_str(cpr.TASK_FORMAT, cd.TASK_FORMAT)
        r.xml_field_comments = _get_bool(
            cpr.XML_FIELD_COMMENTS, cd.XML_FIELD_COMMENTS
        )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # What to export
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        r.all_groups = _get_bool(cpr.ALL_GROUPS, cd.ALL_GROUPS)
        r.group_names = _get_multiline(cpr.GROUPS)
        r.group_ids = []
        # ... read later by validate_db_dependent()
        r.tasks = sorted([x.lower() for x in _get_multiline(cpr.TASKS)])
        r.start_datetime_utc = _get_utc_datetime(cpr.START_DATETIME_UTC)
        r.end_datetime_utc = _get_utc_datetime(cpr.END_DATETIME_UTC)
        r.finalized_only = _get_bool(cpr.FINALIZED_ONLY, cd.FINALIZED_ONLY)
        r.include_anonymous = _get_bool(
            cpr.INCLUDE_ANONYMOUS, cd.INCLUDE_ANONYMOUS
        )
        r.primary_idnum = _get_int(cpr.PRIMARY_IDNUM)
        r.require_idnum_mandatory = _get_bool(
            cpr.REQUIRE_PRIMARY_IDNUM_MANDATORY_IN_POLICY,
            cd.REQUIRE_PRIMARY_IDNUM_MANDATORY_IN_POLICY,
        )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Database
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if r.transmission_method == ExportTransmissionMethod.DATABASE:
            r.db_url = _get_str(cpr.DB_URL)
            r.db_echo = _get_bool(cpr.DB_ECHO, cd.DB_ECHO)
            r.db_include_blobs = _get_bool(
                cpr.DB_INCLUDE_BLOBS, cd.DB_INCLUDE_BLOBS
            )
            r.db_add_summaries = _get_bool(
                cpr.DB_ADD_SUMMARIES, cd.DB_ADD_SUMMARIES
            )
            r.db_patient_id_per_row = _get_bool(
                cpr.DB_PATIENT_ID_PER_ROW, cd.DB_PATIENT_ID_PER_ROW
            )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Email
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if r.transmission_method == ExportTransmissionMethod.EMAIL:
            # Server settings come from the main config object, not from the
            # recipient's own section.
            r.email_host = cfg.email_host
            r.email_port = cfg.email_port
            r.email_use_tls = cfg.email_use_tls
            r.email_host_username = cfg.email_host_username
            r.email_host_password = cfg.email_host_password

            # # Read from password safe using 'pass'
            # from subprocess import run, PIPE
            # output = run(
            #     ["pass", "dept-of-psychiatry/Hermes"], stdout=PIPE)
            # r.email_host_password = (
            #     output.stdout.decode("utf-8").split()[0])

            r.email_from = _get_site_str(cps.EMAIL_FROM, "")
            r.email_sender = _get_site_str(cps.EMAIL_SENDER, "")
            r.email_reply_to = _get_site_str(cps.EMAIL_REPLY_TO, "")
            r.email_to = _make_email_csv_list(cpr.EMAIL_TO)
            r.email_cc = _make_email_csv_list(cpr.EMAIL_CC)
            r.email_bcc = _make_email_csv_list(cpr.EMAIL_BCC)
            r.email_patient_spec_if_anonymous = _get_str(
                cpr.EMAIL_PATIENT_SPEC_IF_ANONYMOUS, ""
            )
            r.email_patient_spec = _get_str(cpr.EMAIL_PATIENT_SPEC, "")
            r.email_subject = _get_str(cpr.EMAIL_SUBJECT, "")
            r.email_body_as_html = _get_bool(
                cpr.EMAIL_BODY_IS_HTML, cd.EMAIL_BODY_IS_HTML
            )
            r.email_body = _get_str(cpr.EMAIL_BODY, "")
            r.email_keep_message = _get_bool(
                cpr.EMAIL_KEEP_MESSAGE, cd.EMAIL_KEEP_MESSAGE
            )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # HL7
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if r.transmission_method == ExportTransmissionMethod.HL7:
            r.hl7_host = _get_str(cpr.HL7_HOST)
            r.hl7_port = _get_int(cpr.HL7_PORT, cd.HL7_PORT)
            r.hl7_ping_first = _get_bool(
                cpr.HL7_PING_FIRST, cd.HL7_PING_FIRST
            )
            r.hl7_network_timeout_ms = _get_int(
                cpr.HL7_NETWORK_TIMEOUT_MS, cd.HL7_NETWORK_TIMEOUT_MS
            )
            r.hl7_keep_message = _get_bool(
                cpr.HL7_KEEP_MESSAGE, cd.HL7_KEEP_MESSAGE
            )
            r.hl7_keep_reply = _get_bool(
                cpr.HL7_KEEP_REPLY, cd.HL7_KEEP_REPLY
            )
            r.hl7_debug_divert_to_file = _get_bool(
                cpr.HL7_DEBUG_DIVERT_TO_FILE, cd.HL7_DEBUG_DIVERT_TO_FILE
            )
            r.hl7_debug_treat_diverted_as_sent = _get_bool(
                cpr.HL7_DEBUG_TREAT_DIVERTED_AS_SENT,
                cd.HL7_DEBUG_TREAT_DIVERTED_AS_SENT,
            )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # File
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if r._need_file_name():
            r.file_patient_spec = _get_str(cpr.FILE_PATIENT_SPEC)
            r.file_patient_spec_if_anonymous = _get_str(
                cpr.FILE_PATIENT_SPEC_IF_ANONYMOUS,
                cd.FILE_PATIENT_SPEC_IF_ANONYMOUS,
            )
            r.file_filename_spec = _get_str(cpr.FILE_FILENAME_SPEC)

        if r._need_file_disk_options():
            r.file_make_directory = _get_bool(
                cpr.FILE_MAKE_DIRECTORY, cd.FILE_MAKE_DIRECTORY
            )
            r.file_overwrite_files = _get_bool(
                cpr.FILE_OVERWRITE_FILES, cd.FILE_OVERWRITE_FILES
            )

        if r.transmission_method == ExportTransmissionMethod.FILE:
            r.file_export_rio_metadata = _get_bool(
                cpr.FILE_EXPORT_RIO_METADATA, cd.FILE_EXPORT_RIO_METADATA
            )
            r.file_script_after_export = _get_str(
                cpr.FILE_SCRIPT_AFTER_EXPORT
            )

        if r._need_rio_metadata_options():
            # RiO metadata
            r.rio_idnum = _get_int(cpr.RIO_IDNUM)
            r.rio_uploading_user = _get_str(cpr.RIO_UPLOADING_USER)
            r.rio_document_type = _get_str(cpr.RIO_DOCUMENT_TYPE)

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # REDCap
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if r.transmission_method == ExportTransmissionMethod.REDCAP:
            r.redcap_api_url = _get_str(cpr.REDCAP_API_URL)
            r.redcap_api_key = _get_str(cpr.REDCAP_API_KEY)
            r.redcap_fieldmap_filename = _get_str(
                cpr.REDCAP_FIELDMAP_FILENAME
            )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # FHIR
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if r.transmission_method == ExportTransmissionMethod.FHIR:
            r.fhir_api_url = _get_str(cpr.FHIR_API_URL)
            r.fhir_app_id = _get_str(
                cpr.FHIR_APP_ID, CAMCOPS_DEFAULT_FHIR_APP_ID
            )
            r.fhir_app_secret = _get_str(cpr.FHIR_APP_SECRET)
            r.fhir_launch_token = _get_str(cpr.FHIR_LAUNCH_TOKEN)
            r.fhir_concurrent = _get_bool(cpr.FHIR_CONCURRENT, False)

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Validate the basics and return
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        r.validate_db_independent()
        return r

    @classmethod
    def report_error(cls, msg: str) -> None:
        """
        Report an error to the log.
        """
        log.error("{}: {}", cls.__name__, msg)

    def valid(self, req: "CamcopsRequest") -> bool:
        """
        Is this definition valid? Failures are logged, not raised.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        try:
            self.validate(req)
            return True
        except InvalidExportRecipient as e:
            self.report_error(str(e))
            return False

    def validate(self, req: "CamcopsRequest") -> None:
        """
        Validates all aspects.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            :exc:`InvalidExportRecipient` if invalid
        """
        self.validate_db_independent()
        self.validate_db_dependent(req)

    def validate_db_independent(self) -> None:
        """
        Validates the database-independent aspects of the
        :class:`ExportRecipient`, or raises :exc:`InvalidExportRecipient`.
        """
        # noinspection PyUnresolvedReferences
        import camcops_server.cc_modules.cc_all_models  # import side effects (ensure all models registered)  # noqa
        from camcops_server.cc_modules.cc_task import (
            all_task_tablenames,
        )  # delayed import

        def fail_invalid(msg: str) -> NoReturn:
            raise _Invalid(self.recipient_name, msg)

        def fail_missing(paramname: str) -> NoReturn:
            raise _Missing(self.recipient_name, paramname)

        cpr = ConfigParamExportRecipient
        cps = ConfigParamSite

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Export type
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if self.transmission_method not in ALL_TRANSMISSION_METHODS:
            fail_invalid(
                f"Missing/invalid {cpr.TRANSMISSION_METHOD}: "
                f"{self.transmission_method}"
            )
        if self.push and self.transmission_method in NO_PUSH_METHODS:
            fail_invalid(
                f"Push notifications not supported for these "
                f"transmission methods: {NO_PUSH_METHODS!r}"
            )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # What to export
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if not self.all_groups and not self.group_names:
            fail_invalid(f"Missing group names (from {cpr.GROUPS})")

        all_basetables = all_task_tablenames()
        for basetable in self.tasks:
            if basetable not in all_basetables:
                fail_invalid(f"Task {basetable!r} doesn't exist")

        if (
            self.transmission_method == ExportTransmissionMethod.HL7
            and not self.primary_idnum
        ):
            fail_invalid(
                f"Must specify {cpr.PRIMARY_IDNUM} with "
                f"{cpr.TRANSMISSION_METHOD} = {ExportTransmissionMethod.HL7}"
            )

        if not self.task_format or self.task_format not in ALL_TASK_FORMATS:
            fail_invalid(
                f"Missing/invalid {cpr.TASK_FORMAT}: {self.task_format}"
            )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Database
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if self.transmission_method == ExportTransmissionMethod.DATABASE:
            if not self.db_url:
                fail_missing(cpr.DB_URL)

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Email
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if self.transmission_method == ExportTransmissionMethod.EMAIL:
            if not self.email_host:
                # You can't send an e-mail without knowing which server to
                # send it to.
                fail_missing(cps.EMAIL_HOST)
            # Username is *not* required by all servers!
            if not self.email_from:
                # From is mandatory in all e-mails.
                # (Sender and Reply-To are optional.)
                fail_missing(cps.EMAIL_FROM)
            if COMMA in self.email_from:
                # RFC 5322 permits multiple addresses in From, but Python
                # sendmail doesn't.
                fail_invalid(
                    f"Only a single 'From:' address permitted; was "
                    f"{self.email_from!r}"
                )
            if not any([self.email_to, self.email_cc, self.email_bcc]):
                # At least one destination is required (obviously).
                fail_invalid(
                    f"Must specify some of: {cpr.EMAIL_TO}, {cpr.EMAIL_CC}, "
                    f"{cpr.EMAIL_BCC}"
                )
            if COMMA in self.email_sender:
                # RFC 5322 permits multiple addresses in From and Reply-To,
                # but only one in Sender.
                fail_invalid(
                    f"Only a single 'Sender:' address permitted; was "
                    f"{self.email_sender!r}"
                )
            if not self.email_subject:
                # A subject is not obligatory for e-mails in general, but we
                # will require one for e-mails sent from CamCOPS.
                fail_missing(cpr.EMAIL_SUBJECT)

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # HL7
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if self.transmission_method == ExportTransmissionMethod.HL7:
            if not self.hl7_debug_divert_to_file:
                if not self.hl7_host:
                    fail_missing(cpr.HL7_HOST)
                if not self.hl7_port or self.hl7_port <= 0:
                    fail_invalid(
                        f"Missing/invalid {cpr.HL7_PORT}: {self.hl7_port}"
                    )
            # (A missing primary_idnum for HL7 has already been rejected in
            # the "What to export" section above.)
            if self.include_anonymous:
                fail_invalid("Can't include anonymous tasks for HL7")

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # File
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if self._need_file_name():
            # Filename options
            if not self.file_patient_spec_if_anonymous:
                fail_missing(cpr.FILE_PATIENT_SPEC_IF_ANONYMOUS)
            if not self.file_patient_spec:
                fail_missing(cpr.FILE_PATIENT_SPEC)
            if not self.file_filename_spec:
                fail_missing(cpr.FILE_FILENAME_SPEC)

        if self._need_rio_metadata_options():
            # RiO metadata
            if (
                not self.rio_uploading_user
                or " " in self.rio_uploading_user
                or len(self.rio_uploading_user) > RIO_MAX_USER_LEN
            ):
                fail_invalid(
                    f"Missing/invalid {cpr.RIO_UPLOADING_USER}: "
                    f"{self.rio_uploading_user} (must be present, contain no "
                    f"spaces, and max length {RIO_MAX_USER_LEN})"
                )
            if not self.rio_document_type:
                fail_missing(cpr.RIO_DOCUMENT_TYPE)

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # REDCap
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # This section previously tested for HL7 (a copy/paste slip), so the
        # REDCap checks never ran for REDCap recipients.
        if self.transmission_method == ExportTransmissionMethod.REDCAP:
            if not self.primary_idnum:
                fail_missing(cpr.PRIMARY_IDNUM)
            if self.include_anonymous:
                fail_invalid("Can't include anonymous tasks for REDCap")

    def validate_db_dependent(self, req: "CamcopsRequest") -> None:
        """
        Validates the database-dependent aspects of the
        :class:`ExportRecipient`, or raises :exc:`InvalidExportRecipient`.

        :meth:`validate_db_independent` should have been called first; this
        function presumes that those checks have been passed.

        As a side effect, rebuilds ``self.group_ids`` (sorted) from
        ``self.group_names``.

        Args:
            req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

        Raises:
            :exc:`InvalidExportRecipient` if invalid (including when a named
            group does not exist)
        """
        from camcops_server.cc_modules.cc_group import Group  # delayed import

        def fail_invalid(msg: str) -> NoReturn:
            raise _Invalid(self.recipient_name, msg)

        dbsession = req.dbsession
        valid_which_idnums = req.valid_which_idnums
        cpr = ConfigParamExportRecipient

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # Set group IDs from group names
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        self.group_ids = []  # type: List[int]
        for groupname in self.group_names:
            group = Group.get_group_by_name(dbsession, groupname)
            if not group:
                # Raise our own exception type (still a ValueError subclass)
                # so that valid() reports this rather than letting it escape.
                fail_invalid(f"No such group: {groupname!r}")
            # NOTE(review): the appended value was lost from the text this
            # was recovered from; "group.id" is a reconstruction -- confirm.
            self.group_ids.append(group.id)
        self.group_ids.sort()

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # What to export
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if self.all_groups:
            groups = Group.get_all_groups(dbsession)
        else:
            groups = []  # type: List[Group]
            for gid in self.group_ids:
                group = Group.get_group_by_id(dbsession, gid)
                if not group:
                    fail_invalid(f"Invalid group ID: {gid}")
                groups.append(group)

        if self.primary_idnum:
            if self.primary_idnum not in valid_which_idnums:
                fail_invalid(
                    f"Invalid {cpr.PRIMARY_IDNUM}: {self.primary_idnum}"
                )

            if self.require_idnum_mandatory:
                # (a) ID number must be mandatory in finalized records
                for group in groups:
                    finalize_policy = group.tokenized_finalize_policy()
                    if not finalize_policy.is_idnum_mandatory_in_policy(
                        which_idnum=self.primary_idnum,
                        valid_idnums=valid_which_idnums,
                    ):
                        fail_invalid(
                            f"primary_idnum ({self.primary_idnum}) must be "
                            f"mandatory in finalizing policy, but is not for "
                            f"group {group}"
                        )
                    if not self.finalized_only:
                        # (b) ID number must also be mandatory in uploaded,
                        # non-finalized records
                        upload_policy = group.tokenized_upload_policy()
                        if not upload_policy.is_idnum_mandatory_in_policy(
                            which_idnum=self.primary_idnum,
                            valid_idnums=valid_which_idnums,
                        ):
                            fail_invalid(
                                f"primary_idnum ({self.primary_idnum}) must "
                                f"be mandatory in upload policy (because "
                                f"{cpr.FINALIZED_ONLY} is false), but is not "
                                f"for group {group}"
                            )

        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        # File
        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        if self._need_file_name():
            # Filename options
            if not patient_spec_for_filename_is_valid(
                patient_spec=self.file_patient_spec,
                valid_which_idnums=valid_which_idnums,
            ):
                fail_invalid(
                    f"Invalid {cpr.FILE_PATIENT_SPEC}: "
                    f"{self.file_patient_spec}"
                )
            if not filename_spec_is_valid(
                filename_spec=self.file_filename_spec,
                valid_which_idnums=valid_which_idnums,
            ):
                fail_invalid(
                    f"Invalid {cpr.FILE_FILENAME_SPEC}: "
                    f"{self.file_filename_spec}"
                )

        if self._need_rio_metadata_options():
            # RiO metadata
            if self.rio_idnum not in valid_which_idnums:
                fail_invalid(
                    f"Invalid ID number type for "
                    f"{cpr.RIO_IDNUM}: {self.rio_idnum}"
                )

    def _need_file_name(self) -> bool:
        """
        Do we need to know about filenames?
        """
        return (
            self.transmission_method == ExportTransmissionMethod.FILE
            or (
                self.transmission_method == ExportTransmissionMethod.HL7
                and self.hl7_debug_divert_to_file
            )
            or self.transmission_method == ExportTransmissionMethod.EMAIL
        )

    def _need_file_disk_options(self) -> bool:
        """
        Do we need to know about how to write to disk (e.g. overwrite, make
        directories)?
        """
        return self.transmission_method == ExportTransmissionMethod.FILE or (
            self.transmission_method == ExportTransmissionMethod.HL7
            and self.hl7_debug_divert_to_file
        )

    def _need_rio_metadata_options(self) -> bool:
        """
        Do we need to know about RiO metadata?
        """
        return (
            self.transmission_method == ExportTransmissionMethod.FILE
            and self.file_export_rio_metadata
        )

    def using_db(self) -> bool:
        """
        Is the recipient a database?
        """
        return self.transmission_method == ExportTransmissionMethod.DATABASE

    def using_email(self) -> bool:
        """
        Is the recipient an e-mail system?
        """
        return self.transmission_method == ExportTransmissionMethod.EMAIL

    def using_file(self) -> bool:
        """
        Is the recipient a filestore?
        """
        return self.transmission_method == ExportTransmissionMethod.FILE

    def using_hl7(self) -> bool:
        """
        Is the recipient an HL7 v2 recipient?
        """
        return self.transmission_method == ExportTransmissionMethod.HL7

    def using_fhir(self) -> bool:
        """
        Is the recipient a FHIR recipient?
        """
        return self.transmission_method == ExportTransmissionMethod.FHIR

    def anonymous_ok(self) -> bool:
        """
        Does this recipient permit/want anonymous tasks?
        """
        return self.include_anonymous and not (
            # Methods that require patient identification:
            self.using_hl7()
            or self.using_fhir()
        )

    def is_incremental(self) -> bool:
        """
        Is this an incremental export? (That's the norm, except for database
        exports.)
        """
        return not self.using_db()

    @staticmethod
    def get_hl7_id_type(req: "CamcopsRequest", which_idnum: int) -> str:
        """
        Get the HL7 ID type for a specific CamCOPS ID number type, or an empty
        string if the ID number definition or its HL7 ID type is unavailable.
        """
        iddef = req.get_idnum_definition(which_idnum)
        return (iddef.hl7_id_type or "") if iddef else ""

    @staticmethod
    def get_hl7_id_aa(req: "CamcopsRequest", which_idnum: int) -> str:
        """
        Get the HL7 Assigning Authority for a specific CamCOPS ID number type,
        or an empty string if the ID number definition or its assigning
        authority is unavailable.
        """
        iddef = req.get_idnum_definition(which_idnum)
        return (iddef.hl7_assigning_authority or "") if iddef else ""

    def _get_processed_spec(
        self,
        req: "CamcopsRequest",
        task: "Task",
        patient_spec_if_anonymous: str,
        patient_spec: str,
        spec: str,
        treat_as_filename: bool,
        override_task_format: str = "",
    ) -> str:
        """
        Returns a string built from a specification, filled in with details
        of the task and its patient.

        Args:
            req:
                a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            task:
                a :class:`camcops_server.cc_modules.cc_task.Task`
            patient_spec_if_anonymous:
                patient specification to be used for anonymous tasks
            patient_spec:
                patient specification to be used for patient-identifiable tasks
            spec:
                specification to use to create the string (may include
                patient information from the patient specification)
            treat_as_filename:
                convert the resulting string to be a safe filename
            override_task_format:
                format to use to override the default (typically to force an
                extension e.g. for HL7 debugging)

        Returns:
            a processed string specification (e.g. a filename; an e-mail
            subject)
        """
        return get_export_filename(
            req=req,
            patient_spec_if_anonymous=patient_spec_if_anonymous,
            patient_spec=patient_spec,
            filename_spec=spec,
            filetype=(
                override_task_format
                if override_task_format
                else self.task_format
            ),
            is_anonymous=task.is_anonymous,
            surname=task.get_patient_surname(),
            forename=task.get_patient_forename(),
            dob=task.get_patient_dob(),
            sex=task.get_patient_sex(),
            idnum_objects=task.get_patient_idnum_objects(),
            creation_datetime=task.get_creation_datetime(),
            basetable=task.tablename,
            # NOTE(review): an argument was lost here in the text this was
            # recovered from (it read "basetable=task.tablename,,"). The
            # "{serverpk}" placeholder used by create_dummy_recipient()
            # suggests it was the server PK -- confirm name and source.
            serverpk=task.pk,
            skip_conversion_to_safe_filename=not treat_as_filename,
        )

    def get_filename(
        self,
        req: "CamcopsRequest",
        task: "Task",
        override_task_format: str = "",
    ) -> str:
        """
        Get the export filename, for file transfers.
        """
        return self._get_processed_spec(
            req=req,
            task=task,
            patient_spec_if_anonymous=self.file_patient_spec_if_anonymous,
            patient_spec=self.file_patient_spec,
            spec=self.file_filename_spec,
            treat_as_filename=True,
            override_task_format=override_task_format,
        )

    def get_email_subject(self, req: "CamcopsRequest", task: "Task") -> str:
        """
        Gets a substituted e-mail subject.
        """
        return self._get_processed_spec(
            req=req,
            task=task,
            patient_spec_if_anonymous=self.email_patient_spec_if_anonymous,
            patient_spec=self.email_patient_spec,
            spec=self.email_subject,
            treat_as_filename=False,
        )

    def get_email_body(self, req: "CamcopsRequest", task: "Task") -> str:
        """
        Gets a substituted e-mail body.
        """
        return self._get_processed_spec(
            req=req,
            task=task,
            patient_spec_if_anonymous=self.email_patient_spec_if_anonymous,
            patient_spec=self.email_patient_spec,
            spec=self.email_body,
            treat_as_filename=False,
        )