Source code for camcops_server.cc_modules.cc_patient

#!/usr/bin/env python

"""
camcops_server/cc_modules/cc_patient.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Patients.**

"""

import logging
from typing import (
    Any,
    Dict,
    Generator,
    List,
    Optional,
    Set,
    Tuple,
    TYPE_CHECKING,
    Union,
)
import uuid

from cardinal_pythonlib.classes import classproperty
from cardinal_pythonlib.datetimefunc import (
    coerce_to_pendulum_date,
    format_datetime,
    get_age,
    PotentialDatetimeType,
)
from cardinal_pythonlib.json.typing_helpers import JsonObjectType
from cardinal_pythonlib.logs import BraceStyleAdapter
import cardinal_pythonlib.rnc_web as ws
from fhirclient.models.address import Address
from fhirclient.models.contactpoint import ContactPoint
from fhirclient.models.humanname import HumanName
from fhirclient.models.fhirreference import FHIRReference
from fhirclient.models.identifier import Identifier
from fhirclient.models.patient import Patient as FhirPatient
import hl7
import pendulum
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session as SqlASession
from sqlalchemy.orm.relationships import RelationshipProperty
from sqlalchemy.sql.expression import and_, ClauseElement, select
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.selectable import SelectBase
from sqlalchemy.sql import sqltypes
from sqlalchemy.sql.sqltypes import Integer, UnicodeText

from camcops_server.cc_modules.cc_audit import audit
from camcops_server.cc_modules.cc_constants import (
    DateFormat,
    ERA_NOW,
    FHIRConst as Fc,
    FP_ID_DESC,
    FP_ID_SHORT_DESC,
    FP_ID_NUM,
    SEX_FEMALE,
    SEX_MALE,
    SEX_OTHER_UNSPECIFIED,
    SPREADSHEET_PATIENT_FIELD_PREFIX,
)
from camcops_server.cc_modules.cc_dataclasses import SummarySchemaInfo
from camcops_server.cc_modules.cc_db import (
    GenericTabletRecordMixin,
    PFN_UUID,
    TABLET_ID_FIELD,
)
from camcops_server.cc_modules.cc_fhir import (
    fhir_pk_identifier,
    make_fhir_bundle_entry,
)
from camcops_server.cc_modules.cc_hl7 import make_pid_segment
from camcops_server.cc_modules.cc_html import answer
from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition
from camcops_server.cc_modules.cc_simpleobjects import (
    BarePatientInfo,
    HL7PatientIdentifier,
)
from camcops_server.cc_modules.cc_patientidnum import (
    extra_id_colname,
    PatientIdNum,
)
from camcops_server.cc_modules.cc_proquint import proquint_from_uuid
from camcops_server.cc_modules.cc_report import Report
from camcops_server.cc_modules.cc_simpleobjects import (
    IdNumReference,
    TaskExportOptions,
)
from camcops_server.cc_modules.cc_specialnote import SpecialNote
from camcops_server.cc_modules.cc_sqla_coltypes import (
    CamcopsColumn,
    EmailAddressColType,
    PatientNameColType,
    SexColType,
    UuidColType,
)
from camcops_server.cc_modules.cc_sqlalchemy import Base
from camcops_server.cc_modules.cc_spreadsheet import SpreadsheetPage
from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION_STRING
from camcops_server.cc_modules.cc_xml import (
    XML_COMMENT_SPECIAL_NOTES,
    XmlElement,
)

if TYPE_CHECKING:
    from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
    from camcops_server.cc_modules.cc_group import Group
    from camcops_server.cc_modules.cc_policy import TokenizedPolicy
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_taskschedule import PatientTaskSchedule
    from camcops_server.cc_modules.cc_user import User

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Patient class
# =============================================================================


[docs]class Patient(GenericTabletRecordMixin, Base): """ Class representing a patient. """ __tablename__ = "patient" id = Column( TABLET_ID_FIELD, Integer, nullable=False, comment="Primary key (patient ID) on the source tablet device" # client PK ) uuid = CamcopsColumn( PFN_UUID, UuidColType, comment="UUID", default=uuid.uuid4, # generates a random UUID ) # type: Optional[uuid.UUID] forename = CamcopsColumn( "forename", PatientNameColType, index=True, identifies_patient=True, include_in_anon_staging_db=True, comment="Forename", ) # type: Optional[str] surname = CamcopsColumn( "surname", PatientNameColType, index=True, identifies_patient=True, include_in_anon_staging_db=True, comment="Surname", ) # type: Optional[str] dob = CamcopsColumn( "dob", sqltypes.Date, # verified: merge_db handles this correctly index=True, identifies_patient=True, include_in_anon_staging_db=True, comment="Date of birth" # ... e.g. "2013-02-04" ) sex = CamcopsColumn( "sex", SexColType, index=True, include_in_anon_staging_db=True, comment="Sex (M, F, X)", ) address = CamcopsColumn( "address", UnicodeText, identifies_patient=True, comment="Address" ) email = CamcopsColumn( "email", EmailAddressColType, identifies_patient=True, comment="Patient's e-mail address", ) gp = CamcopsColumn( "gp", UnicodeText, identifies_patient=True, comment="General practitioner (GP)", ) other = CamcopsColumn( "other", UnicodeText, identifies_patient=True, comment="Other details" ) idnums = relationship( # https://docs.sqlalchemy.org/en/latest/orm/join_conditions.html#relationship-custom-foreign # https://docs.sqlalchemy.org/en/latest/orm/relationship_api.html#sqlalchemy.orm.relationship # noqa # https://docs.sqlalchemy.org/en/latest/orm/join_conditions.html#relationship-primaryjoin # noqa "PatientIdNum", primaryjoin=( "and_(" " remote(PatientIdNum.patient_id) == foreign(Patient.id), " " remote(PatientIdNum._device_id) == foreign(Patient._device_id), " " remote(PatientIdNum._era) == foreign(Patient._era), " " remote(PatientIdNum._current) == True " ")" ), uselist=True, viewonly=True, # Profiling results 2019-10-14 exporting 4185 phq9 records with # unique patients to xlsx (task-patient relationship "selectin") # lazy="select" : 35.3s # lazy="joined" : 27.3s # lazy="subquery": 15.2s (31.0s when task-patient also subquery) # lazy="selectin": 26.4s # See also patient relationship on Task class (cc_task.py) lazy="subquery", ) # type: List[PatientIdNum] task_schedules = relationship( "PatientTaskSchedule", back_populates="patient", cascade="all, delete" ) # type: List[PatientTaskSchedule] # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # THE FOLLOWING ARE DEFUNCT, AND THE SERVER WORKS AROUND OLD TABLETS IN # THE UPLOAD API. # # idnum1 = Column("idnum1", BigInteger, comment="ID number 1") # idnum2 = Column("idnum2", BigInteger, comment="ID number 2") # idnum3 = Column("idnum3", BigInteger, comment="ID number 3") # idnum4 = Column("idnum4", BigInteger, comment="ID number 4") # idnum5 = Column("idnum5", BigInteger, comment="ID number 5") # idnum6 = Column("idnum6", BigInteger, comment="ID number 6") # idnum7 = Column("idnum7", BigInteger, comment="ID number 7") # idnum8 = Column("idnum8", BigInteger, comment="ID number 8") # # iddesc1 = Column("iddesc1", IdDescriptorColType, comment="ID description 1") # noqa # iddesc2 = Column("iddesc2", IdDescriptorColType, comment="ID description 2") # noqa # iddesc3 = Column("iddesc3", IdDescriptorColType, comment="ID description 3") # noqa # iddesc4 = Column("iddesc4", IdDescriptorColType, comment="ID description 4") # noqa # iddesc5 = Column("iddesc5", IdDescriptorColType, comment="ID description 5") # noqa # iddesc6 = Column("iddesc6", IdDescriptorColType, comment="ID description 6") # noqa # iddesc7 = Column("iddesc7", IdDescriptorColType, comment="ID description 7") # noqa # iddesc8 = Column("iddesc8", IdDescriptorColType, comment="ID description 8") # noqa # # idshortdesc1 = Column("idshortdesc1", IdDescriptorColType, comment="ID short description 1") # noqa # idshortdesc2 = Column("idshortdesc2", IdDescriptorColType, comment="ID short description 2") # noqa # idshortdesc3 = Column("idshortdesc3", IdDescriptorColType, comment="ID short description 3") # noqa # idshortdesc4 = Column("idshortdesc4", IdDescriptorColType, comment="ID short description 4") # noqa # idshortdesc5 = Column("idshortdesc5", IdDescriptorColType, comment="ID short description 5") # noqa # idshortdesc6 = Column("idshortdesc6", IdDescriptorColType, comment="ID short description 6") # noqa # idshortdesc7 = Column("idshortdesc7", IdDescriptorColType, comment="ID short description 7") # noqa # idshortdesc8 = Column("idshortdesc8", IdDescriptorColType, comment="ID short description 8") # noqa # # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ------------------------------------------------------------------------- # Relationships # ------------------------------------------------------------------------- # noinspection PyMethodParameters @declared_attr def special_notes(cls) -> RelationshipProperty: """ Relationship to all :class:`SpecialNote` objects associated with this patient. """ # The SpecialNote also allows a link to patients, not just tasks, # like this: return relationship( SpecialNote, primaryjoin=( "and_(" " remote(SpecialNote.basetable) == literal({repr_patient_tablename}), " # noqa " remote(SpecialNote.task_id) == foreign(Patient.id), " " remote(SpecialNote.device_id) == foreign(Patient._device_id), " # noqa " remote(SpecialNote.era) == foreign(Patient._era), " " not_(SpecialNote.hidden)" ")".format(repr_patient_tablename=repr(cls.__tablename__)) ), uselist=True, order_by="SpecialNote.note_at", viewonly=True, # for now! ) # ------------------------------------------------------------------------- # Patient-fetching classmethods # -------------------------------------------------------------------------
[docs] @classmethod def get_patients_by_idnum( cls, dbsession: SqlASession, which_idnum: int, idnum_value: int, group_id: int = None, current_only: bool = True, ) -> List["Patient"]: """ Get all patients matching the specified ID number. Args: dbsession: a :class:`sqlalchemy.orm.session.Session` which_idnum: which ID number type? idnum_value: actual value of the ID number group_id: optional group ID to restrict to current_only: restrict to ``_current`` patients? Returns: list of all matching patients """ if not which_idnum or which_idnum < 1: return [] if idnum_value is None: return [] q = dbsession.query(cls).join(cls.idnums) # ... the join pre-restricts to current ID numbers # https://docs.sqlalchemy.org/en/latest/orm/join_conditions.html#using-custom-operators-in-join-conditions # noqa q = q.filter(PatientIdNum.which_idnum == which_idnum) q = q.filter(PatientIdNum.idnum_value == idnum_value) if group_id is not None: q = q.filter(Patient._group_id == group_id) if current_only: q = q.filter(cls._current == True) # noqa: E712 patients = q.all() # type: List[Patient] return patients
[docs] @classmethod def get_patient_by_pk( cls, dbsession: SqlASession, server_pk: int ) -> Optional["Patient"]: """ Fetch a patient by the server PK. """ return dbsession.query(cls).filter(cls._pk == server_pk).first()
[docs] @classmethod def get_patient_by_id_device_era( cls, dbsession: SqlASession, client_id: int, device_id: int, era: str ) -> Optional["Patient"]: """ Fetch a patient by the client ID, device ID, and era. """ return ( dbsession.query(cls) .filter(cls.id == client_id) .filter(cls._device_id == device_id) .filter(cls._era == era) .first() )
# ------------------------------------------------------------------------- # String representations # ------------------------------------------------------------------------- def __str__(self) -> str: """ A plain string version, without the need for a request object. Example: .. code-block:: none SMITH, BOB (M, 1 Jan 1950, idnum1=123, idnum2=456) """ return "{sf} ({sex}, {dob}, {ids})".format( sf=self.get_surname_forename_upper(), sex=self.sex, dob=self.get_dob_str(), ids=", ".join(str(i) for i in self.get_idnum_objects()), )
[docs] def prettystr(self, req: "CamcopsRequest") -> str: """ A prettified string version. Args: req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` Example: .. code-block:: none SMITH, BOB (M, 1 Jan 1950, RiO# 123, NHS# 456) """ return "{sf} ({sex}, {dob}, {ids})".format( sf=self.get_surname_forename_upper(), sex=self.sex, dob=self.get_dob_str(), ids=", ".join(i.prettystr(req) for i in self.get_idnum_objects()), )
[docs] def get_letter_style_identifiers(self, req: "CamcopsRequest") -> str: """ Our best guess at the kind of text you'd put in a clinical letter to say "it's about this patient". Example: .. code-block:: none Bob Smith (1 Jan 1950, RiO number 123, NHS number 456) """ return "{fs} ({dob}, {ids})".format( fs=self.get_forename_surname(), dob=self.get_dob_str(), ids=", ".join( i.full_prettystr(req) for i in self.get_idnum_objects() ), )
# ------------------------------------------------------------------------- # Equality # ------------------------------------------------------------------------- def __eq__(self, other: "Patient") -> bool: """ Is this patient the same as another? .. code-block:: python from camcops_server.cc_modules.cc_patient import Patient p1 = Patient(id=1, _device_id=1, _era="NOW") print(p1 == p1) # True p2 = Patient(id=1, _device_id=1, _era="NOW") print(p1 == p2) # True p3 = Patient(id=1, _device_id=2, _era="NOW") print(p1 == p3) # False s = set([p1, p2, p3]) # contains two patients IMPERFECT in that it doesn't use intermediate patients to link identity (e.g. P1 has RiO#=3, P2 has RiO#=3, NHS#=5, P3 has NHS#=5; they are all the same by inference but P1 and P3 will not compare equal). """ # Same object? # log.debug("self={}, other={}", self, other) if self is other: # log.debug("... same object; equal") return True # Same device/era/patient ID (client PK)? Test int before str for speed if ( self.id == other.id and self._device_id == other._device_id and self._era == other._era and self.id is not None and self._device_id is not None and self._era is not None ): # log.debug("... same device/era/id; equal") return True # Shared ID number? for sid in self.idnums: if sid in other.idnums: # log.debug("... share idnum {}; equal", sid) return True # Otherwise... # log.debug("... unequal") return False def __hash__(self) -> int: """ To put objects into a set, they must be hashable. See https://docs.python.org/3/glossary.html#term-hashable. If two objects are equal (via :func:`__eq__`) they must provide the same hash value (but two objects with the same hash are not necessarily equal). """ return 0 # all objects have the same hash; "use __eq__() instead" # ------------------------------------------------------------------------- # ID numbers # -------------------------------------------------------------------------
[docs] def get_idnum_objects(self) -> List[PatientIdNum]: """ Returns all :class:`PatientIdNum` objects for the patient. These are SQLAlchemy ORM objects. """ return self.idnums
[docs] def get_idnum_references(self) -> List[IdNumReference]: """ Returns all :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference` objects for the patient. These are simple which_idnum/idnum_value pairs. """ idnums = self.idnums # type: List[PatientIdNum] return [ x.get_idnum_reference() for x in idnums if x.is_superficially_valid() ]
[docs] def get_idnum_raw_values_only(self) -> List[int]: """ Get all plain ID number values (ignoring which ID number type they represent) for the patient. """ idnums = self.idnums # type: List[PatientIdNum] return [x.idnum_value for x in idnums if x.is_superficially_valid()]
[docs] def get_idnum_object(self, which_idnum: int) -> Optional[PatientIdNum]: """ Gets the PatientIdNum object for a specified which_idnum, or None. """ idnums = self.idnums # type: List[PatientIdNum] for x in idnums: if x.which_idnum == which_idnum: return x return None
[docs] def has_idnum_type(self, which_idnum: int) -> bool: """ Does the patient have an ID number of the specified type? """ return self.get_idnum_object(which_idnum) is not None
[docs] def get_idnum_value(self, which_idnum: int) -> Optional[int]: """ Get value of a specific ID number, if present. """ idobj = self.get_idnum_object(which_idnum) return idobj.idnum_value if idobj else None
[docs] def set_idnum_value( self, req: "CamcopsRequest", which_idnum: int, idnum_value: int ) -> None: """ Sets an ID number value. """ dbsession = req.dbsession ccsession = req.camcops_session idnums = self.idnums # type: List[PatientIdNum] for idobj in idnums: if idobj.which_idnum == which_idnum: idobj.idnum_value = idnum_value return # Otherwise, make a new one: newid = PatientIdNum() newid.patient_id = self.id newid._device_id = self._device_id newid._era = self._era newid._current = True newid._when_added_exact = req.now_era_format newid._when_added_batch_utc = req.now_utc newid._adding_user_id = ccsession.user_id newid._camcops_version = CAMCOPS_SERVER_VERSION_STRING dbsession.add(newid) self.idnums.append(newid)
[docs] def get_iddesc( self, req: "CamcopsRequest", which_idnum: int ) -> Optional[str]: """ Get value of a specific ID description, if present. """ idobj = self.get_idnum_object(which_idnum) return idobj.description(req) if idobj else None
[docs] def get_idshortdesc( self, req: "CamcopsRequest", which_idnum: int ) -> Optional[str]: """ Get value of a specific ID short description, if present. """ idobj = self.get_idnum_object(which_idnum) return idobj.short_description(req) if idobj else None
[docs] def add_extra_idnum_info_to_row(self, row: Dict[str, Any]) -> None: """ For the ``DB_PATIENT_ID_PER_ROW`` export option. Adds additional ID number info to a row. Args: row: future database row, as a dictionary """ for idobj in self.idnums: which_idnum = idobj.which_idnum fieldname = extra_id_colname(which_idnum) row[fieldname] = idobj.idnum_value
# ------------------------------------------------------------------------- # Group # ------------------------------------------------------------------------- @property def group(self) -> Optional["Group"]: """ Returns the :class:`camcops_server.cc_modules.cc_group.Group` to which this patient's record belongs. """ return self._group # ------------------------------------------------------------------------- # Policies # -------------------------------------------------------------------------
[docs] def satisfies_upload_id_policy(self) -> bool: """ Does the patient satisfy the uploading ID policy? """ group = self._group # type: Optional[Group] if not group: return False return self.satisfies_id_policy(group.tokenized_upload_policy())
[docs] def satisfies_finalize_id_policy(self) -> bool: """ Does the patient satisfy the finalizing ID policy? """ group = self._group # type: Optional[Group] if not group: return False return self.satisfies_id_policy(group.tokenized_finalize_policy())
[docs] def satisfies_id_policy(self, policy: "TokenizedPolicy") -> bool: """ Does the patient satisfy a particular ID policy? """ return policy.satisfies_id_policy(self.get_bare_ptinfo())
# ------------------------------------------------------------------------- # Name, DOB/age, sex, address, etc. # -------------------------------------------------------------------------
[docs] def get_surname(self) -> str: """ Get surname (in upper case) or "". """ return self.surname.upper() if self.surname else ""
[docs] def get_forename(self) -> str: """ Get forename (in upper case) or "". """ return self.forename.upper() if self.forename else ""
[docs] def get_forename_surname(self) -> str: """ Get "Forename Surname" as a string, using "(UNKNOWN)" for missing details. """ f = self.forename or "(UNKNOWN)" s = self.surname or "(UNKNOWN)" return f"{f} {s}"
[docs] def get_surname_forename_upper(self) -> str: """ Get "SURNAME, FORENAME", using "(UNKNOWN)" for missing details. """ s = self.surname.upper() if self.surname else "(UNKNOWN)" f = self.forename.upper() if self.forename else "(UNKNOWN)" return f"{s}, {f}"
[docs] def get_dob_html(self, req: "CamcopsRequest", longform: bool) -> str: """ HTML fragment for date of birth. """ _ = req.gettext if longform: dob = answer( format_datetime(self.dob, DateFormat.LONG_DATE, default=None) ) dobtext = _("Date of birth:") return f"<br>{dobtext} {dob}" else: dobtext = _("DOB:") dob = format_datetime(self.dob, DateFormat.SHORT_DATE) return f"{dobtext} {dob}."
[docs] def get_age( self, req: "CamcopsRequest", default: str = "" ) -> Union[int, str]: """ Age (in whole years) today, or default. """ now = req.now return self.get_age_at(now, default=default)
[docs] def get_dob(self) -> Optional[pendulum.Date]: """ Date of birth, as a a timezone-naive date. """ dob = self.dob if not dob: return None return coerce_to_pendulum_date(dob)
[docs] def get_dob_str(self) -> Optional[str]: """ Date of birth, as a string. """ dob_dt = self.get_dob() if dob_dt is None: return None return format_datetime(dob_dt, DateFormat.SHORT_DATE)
[docs] def get_age_at( self, when: PotentialDatetimeType, default: str = "" ) -> Union[int, str]: """ Age (in whole years) at a particular date, or default. """ return get_age(self.dob, when, default=default)
[docs] def is_female(self) -> bool: """ Is sex 'F'? """ return self.sex == SEX_FEMALE
[docs] def is_male(self) -> bool: """ Is sex 'M'? """ return self.sex == SEX_MALE
[docs] def get_sex(self) -> str: """ Return sex or "". """ return self.sex or ""
[docs] def get_sex_verbose(self, default: str = "sex unknown") -> str: """ Returns HTML-safe version of sex, or default. """ return default if not self.sex else ws.webify(self.sex)
[docs] def get_address(self) -> Optional[str]: """ Returns address (NOT necessarily web-safe). """ address = self.address # type: Optional[str] return address or ""
[docs] def get_email(self) -> Optional[str]: """ Returns email address """ email = self.email # type: Optional[str] return email or ""
# ------------------------------------------------------------------------- # Other representations # -------------------------------------------------------------------------
[docs] def get_xml_root( self, req: "CamcopsRequest", options: TaskExportOptions = None ) -> XmlElement: """ Get root of XML tree, as an :class:`camcops_server.cc_modules.cc_xml.XmlElement`. Args: req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` options: a :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions` """ # noqa # No point in skipping old ID columns (1-8) now; they're gone. branches = self._get_xml_branches(req, options=options) # Now add new-style IDs: pidnum_branches = [] # type: List[XmlElement] pidnum_options = TaskExportOptions( xml_include_plain_columns=True, xml_with_header_comments=False ) for pidnum in self.idnums: # type: PatientIdNum pidnum_branches.append( pidnum._get_xml_root(req, options=pidnum_options) ) branches.append(XmlElement(name="idnums", value=pidnum_branches)) # Special notes branches.append(XML_COMMENT_SPECIAL_NOTES) special_notes = self.special_notes # type: List[SpecialNote] for sn in special_notes: branches.append(sn.get_xml_root()) return XmlElement(name=self.__tablename__, value=branches)
[docs] def get_spreadsheet_page(self, req: "CamcopsRequest") -> SpreadsheetPage: """ Get a :class:`camcops_server.cc_modules.cc_spreadsheet.SpreadsheetPage` for the patient. """ # 1. Our core fields. page = self._get_core_spreadsheet_page( req, heading_prefix=SPREADSHEET_PATIENT_FIELD_PREFIX ) # 2. ID number details # We can't just iterate through the ID numbers; we have to iterate # through all possible ID numbers. for iddef in req.idnum_definitions: n = iddef.which_idnum nstr = str(n) shortdesc = iddef.short_description longdesc = iddef.description idnum_value = next( ( idnum.idnum_value for idnum in self.idnums if idnum.which_idnum == n and idnum.is_superficially_valid() ), None, ) page.add_or_set_value( heading=SPREADSHEET_PATIENT_FIELD_PREFIX + FP_ID_NUM + nstr, value=idnum_value, ) page.add_or_set_value( heading=SPREADSHEET_PATIENT_FIELD_PREFIX + FP_ID_DESC + nstr, value=longdesc, ) page.add_or_set_value( heading=( SPREADSHEET_PATIENT_FIELD_PREFIX + FP_ID_SHORT_DESC + nstr ), value=shortdesc, ) return page
[docs] def get_spreadsheet_schema_elements( self, req: "CamcopsRequest", table_name: str = "" ) -> Set[SummarySchemaInfo]: """ Follows :func:`get_spreadsheet_page`, but retrieving schema information. """ # 1. Core fields items = self._get_core_spreadsheet_schema( table_name=table_name, column_name_prefix=SPREADSHEET_PATIENT_FIELD_PREFIX, ) # 2. ID number details table_name = table_name or self.__tablename__ for iddef in req.idnum_definitions: n = iddef.which_idnum nstr = str(n) comment_suffix = f" [ID#{n}]" items.add( SummarySchemaInfo( table_name=table_name, source=SummarySchemaInfo.SSV_DB, column_name=( SPREADSHEET_PATIENT_FIELD_PREFIX + FP_ID_NUM + nstr ), data_type=str(PatientIdNum.idnum_value.type), comment=PatientIdNum.idnum_value.comment + comment_suffix, ) ) items.add( SummarySchemaInfo( table_name=table_name, source=SummarySchemaInfo.SSV_DB, column_name=( SPREADSHEET_PATIENT_FIELD_PREFIX + FP_ID_DESC + nstr ), data_type=str(IdNumDefinition.description.type), comment=IdNumDefinition.description.comment + comment_suffix, ) ) items.add( SummarySchemaInfo( table_name=table_name, source=SummarySchemaInfo.SSV_DB, column_name=( SPREADSHEET_PATIENT_FIELD_PREFIX + FP_ID_SHORT_DESC + nstr ), data_type=str(IdNumDefinition.short_description.type), comment=( IdNumDefinition.short_description.comment + comment_suffix ), ) ) return items
[docs] def get_bare_ptinfo(self) -> BarePatientInfo: """ Get basic identifying information, as a :class:`camcops_server.cc_modules.cc_simpleobjects.BarePatientInfo` object. """ return BarePatientInfo( forename=self.forename, surname=self.surname, sex=self.sex, dob=self.dob, address=self.address, email=self.email, gp=self.gp, otherdetails=self.other, idnum_definitions=self.get_idnum_references(), )
[docs] def get_hl7_pid_segment( self, req: "CamcopsRequest", recipient: "ExportRecipient" ) -> hl7.Segment: """ Get HL7 patient identifier (PID) segment. Args: req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` recipient: a :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient` Returns: a :class:`hl7.Segment` object """ # noqa # Put the primary one first: patient_id_tuple_list = [ HL7PatientIdentifier( pid=str(self.get_idnum_value(recipient.primary_idnum)), id_type=recipient.get_hl7_id_type( req, recipient.primary_idnum ), assigning_authority=recipient.get_hl7_id_aa( req, recipient.primary_idnum ), ) ] # Then the rest: for idobj in self.idnums: which_idnum = idobj.which_idnum if which_idnum == recipient.primary_idnum: continue idnum_value = idobj.idnum_value if idnum_value is None: continue patient_id_tuple_list.append( HL7PatientIdentifier( pid=str(idnum_value), id_type=recipient.get_hl7_id_type(req, which_idnum), assigning_authority=recipient.get_hl7_id_aa( req, which_idnum ), ) ) return make_pid_segment( forename=self.get_surname(), surname=self.get_forename(), dob=self.get_dob(), sex=self.get_sex(), address=self.get_address(), patient_id_list=patient_id_tuple_list, )
# ------------------------------------------------------------------------- # FHIR # -------------------------------------------------------------------------
[docs] def get_fhir_bundle_entry( self, req: "CamcopsRequest", recipient: "ExportRecipient" ) -> Dict[str, Any]: """ Returns a dictionary, suitable for serializing to JSON, that encapsulates patient identity information in a FHIR bundle. See https://www.hl7.org/fhir/patient.html. """ # The JSON objects we will build up: patient_dict = {} # type: JsonObjectType # Name if self.forename or self.surname: name_dict = {} # type: JsonObjectType if self.forename: name_dict[Fc.NAME_GIVEN] = [self.forename] if self.surname: name_dict[Fc.NAME_FAMILY] = self.surname patient_dict[Fc.NAME] = [HumanName(jsondict=name_dict).as_json()] # DOB if self.dob: patient_dict[Fc.BIRTHDATE] = format_datetime( self.dob, DateFormat.FILENAME_DATE_ONLY ) # Sex/gender (should always be present, per client minimum ID policy) if self.sex: gender_lookup = { SEX_FEMALE: Fc.GENDER_FEMALE, SEX_MALE: Fc.GENDER_MALE, SEX_OTHER_UNSPECIFIED: Fc.GENDER_OTHER, } patient_dict[Fc.GENDER] = gender_lookup.get( self.sex, Fc.GENDER_UNKNOWN ) # Address if self.address: patient_dict[Fc.ADDRESS] = [ Address(jsondict={Fc.ADDRESS_TEXT: self.address}).as_json() ] # Email if self.email: patient_dict[Fc.TELECOM] = [ ContactPoint( jsondict={ Fc.SYSTEM: Fc.TELECOM_SYSTEM_EMAIL, Fc.VALUE: self.email, } ).as_json() ] # General practitioner (GP): via # fhirclient.models.fhirreference.FHIRReference; too structured. # ID numbers go here: return make_fhir_bundle_entry( resource_type_url=Fc.RESOURCE_TYPE_PATIENT, identifier=self.get_fhir_identifier(req, recipient), resource=FhirPatient(jsondict=patient_dict).as_json(), )
[docs] def get_fhir_identifier( self, req: "CamcopsRequest", recipient: "ExportRecipient" ) -> Identifier: """ Returns a FHIR identifier for this patient, as a :class:`fhirclient.models.identifier.Identifier` object. This pairs a URL to our CamCOPS server indicating the ID number type (as the "system") with the actual ID number (as the "value"). For debugging situations, it falls back to a default identifier (using the PK on our CamCOPS server). """ which_idnum = recipient.primary_idnum try: # For real exports, the fact that the patient does have an ID # number of the right type will have been pre-verified. if which_idnum is None: raise AttributeError idnum_object = self.get_idnum_object(which_idnum) idnum_value = idnum_object.idnum_value # may raise AttributeError iddef = req.get_idnum_definition(which_idnum) idnum_url = iddef.effective_fhir_id_system(req) return Identifier( jsondict={Fc.SYSTEM: idnum_url, Fc.VALUE: str(idnum_value)} ) except AttributeError: # We are probably in a debugging/drafting situation. Fall back to # a default identifier. return fhir_pk_identifier( req, self.__tablename__, self.pk, Fc.CAMCOPS_VALUE_PATIENT_WITHIN_TASK, )
[docs] def get_fhir_subject_ref( self, req: "CamcopsRequest", recipient: "ExportRecipient" ) -> Dict: """ Returns a FHIRReference (in JSON dict format) used to refer to this patient as a "subject" of some other entry (like a questionnaire). """ return FHIRReference( jsondict={ Fc.TYPE: Fc.RESOURCE_TYPE_PATIENT, Fc.IDENTIFIER: self.get_fhir_identifier( req, recipient ).as_json(), } ).as_json()
# ------------------------------------------------------------------------- # Database status # -------------------------------------------------------------------------
[docs] def is_preserved(self) -> bool: """ Is the patient record preserved and erased from the tablet? """ return self._pk is not None and self._era != ERA_NOW
# ------------------------------------------------------------------------- # Audit # -------------------------------------------------------------------------
[docs] def audit( self, req: "CamcopsRequest", details: str, from_console: bool = False ) -> None: """ Audits an action to this patient. """ audit( req, details, patient_server_pk=self._pk, table=Patient.__tablename__, server_pk=self._pk, from_console=from_console, )
# ------------------------------------------------------------------------- # Special notes # -------------------------------------------------------------------------
[docs] def apply_special_note( self, req: "CamcopsRequest", note: str, audit_msg: str = "Special note applied manually", ) -> None: """ Manually applies a special note to a patient. WRITES TO DATABASE. """ sn = SpecialNote() sn.basetable = self.__tablename__ sn.task_id = self.id # patient ID, in this case sn.device_id = self._device_id sn.era = self._era sn.note_at = req.now sn.user_id = req.user_id sn.note = note req.dbsession.add(sn) self.special_notes.append(sn) self.audit(req, audit_msg)
# HL7 deletion of corresponding tasks is done in camcops_server.py # ------------------------------------------------------------------------- # Deletion # -------------------------------------------------------------------------
[docs] def gen_patient_idnums_even_noncurrent( self, ) -> Generator[PatientIdNum, None, None]: """ Generates all :class:`PatientIdNum` objects, including non-current ones. """ for lineage_member in self._gen_unique_lineage_objects( self.idnums ): # type: PatientIdNum # noqa yield lineage_member
[docs] def delete_with_dependants(self, req: "CamcopsRequest") -> None: """ Delete the patient with all its dependent objects. """ if self._pk is None: return for pidnum in self.gen_patient_idnums_even_noncurrent(): req.dbsession.delete(pidnum) super().delete_with_dependants(req)
# ------------------------------------------------------------------------- # Permissions # -------------------------------------------------------------------------
[docs] def user_may_view(self, user: "User") -> bool: """ May this user inspect patient details directly? """ return self._group_id in user.ids_of_groups_user_may_see
[docs] def user_may_edit(self, req: "CamcopsRequest") -> bool: """ Does the current user have permission to edit this patient? """ if self.created_on_server(req): # Anyone in the group with the right permission return req.user.may_manage_patients_in_group(self._group_id) # Finalized patient: Need to be group administrator return req.user.may_administer_group(self._group_id)
# -------------------------------------------------------------------------- # UUID # -------------------------------------------------------------------------- @property def uuid_as_proquint(self) -> Optional[str]: # Convert integer into pronounceable quintuplets (proquint) # https://arxiv.org/html/0901.4016 if self.uuid is None: return None return proquint_from_uuid(self.uuid)
# ============================================================================= # Validate candidate patient info for upload # =============================================================================
[docs]def is_candidate_patient_valid_for_group( ptinfo: BarePatientInfo, group: "Group", finalizing: bool ) -> Tuple[bool, str]: """ Is the specified patient acceptable to upload into this group? Checks: - group upload or finalize policy .. todo:: is_candidate_patient_valid: check against predefined patients, if the group wants Args: ptinfo: a :class:`camcops_server.cc_modules.cc_simpleobjects.BarePatientInfo` representing the patient info to check group: the :class:`camcops_server.cc_modules.cc_group.Group` into which this patient will be uploaded, if allowed finalizing: finalizing, rather than uploading? Returns: tuple: valid, reason """ if not group: return False, "Nonexistent group" if finalizing: if not group.tokenized_finalize_policy().satisfies_id_policy(ptinfo): return False, "Fails finalizing ID policy" else: if not group.tokenized_upload_policy().satisfies_id_policy(ptinfo): return False, "Fails upload ID policy" # todo: add checks against prevalidated patients here return True, ""
[docs]def is_candidate_patient_valid_for_restricted_user( req: "CamcopsRequest", ptinfo: BarePatientInfo ) -> Tuple[bool, str]: """ Is the specified patient OK to be uploaded by this user? Performs a check for restricted (single-patient) users; if true, ensures that the identifiers all match the expected patient. Args: req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` ptinfo: a :class:`camcops_server.cc_modules.cc_simpleobjects.BarePatientInfo` representing the patient info to check Returns: tuple: valid, reason """ user = req.user if not user.auto_generated: # Not a restricted user; no problem. return True, "" server_patient = user.single_patient if not server_patient: return ( False, ( f"Restricted user {user.username} does not have associated " f"patient details" ), ) server_ptinfo = server_patient.get_bare_ptinfo() if ptinfo != server_ptinfo: return False, f"Should be {server_ptinfo}" return True, ""
# ============================================================================= # Reports # =============================================================================
[docs]class DistinctPatientReport(Report): """ Report to show distinct patients. """ # noinspection PyMethodParameters @classproperty def report_id(cls) -> str: return "patient_distinct" @classmethod def title(cls, req: "CamcopsRequest") -> str: _ = req.gettext return _( "(Server) Patients, distinct by name, sex, DOB, all ID " "numbers" ) # noinspection PyMethodParameters @classproperty def superuser_only(cls) -> bool: return False # noinspection PyProtectedMember
[docs] def get_query(self, req: "CamcopsRequest") -> SelectBase: select_fields = [ Patient.surname.label("surname"), Patient.forename.label("forename"), Patient.dob.label("dob"), Patient.sex.label("sex"), ] # noinspection PyUnresolvedReferences select_from = Patient.__table__ wheres = [ Patient._current == True # noqa: E712 ] # type: List[ClauseElement] if not req.user.superuser: # Restrict to accessible groups group_ids = req.user.ids_of_groups_user_may_report_on wheres.append(Patient._group_id.in_(group_ids)) for iddef in req.idnum_definitions: n = iddef.which_idnum desc = iddef.short_description # noinspection PyUnresolvedReferences aliased_table = PatientIdNum.__table__.alias(f"i{n}") select_fields.append(aliased_table.c.idnum_value.label(desc)) select_from = select_from.outerjoin( aliased_table, and_( aliased_table.c.patient_id == Patient.id, aliased_table.c._device_id == Patient._device_id, aliased_table.c._era == Patient._era, # Note: the following are part of the JOIN, not the WHERE: # (or failure to match a row will wipe out the Patient from # the OUTER JOIN): aliased_table.c._current == True, # noqa: E712 aliased_table.c.which_idnum == n, ), ) # nopep8 order_by = [ Patient.surname, Patient.forename, Patient.dob, Patient.sex, ] query = ( select(select_fields) .select_from(select_from) .where(and_(*wheres)) .order_by(*order_by) .distinct() ) return query