"""
camcops_server/cc_modules/cc_db.py
===============================================================================
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**Common database code, e.g. mixins for tables that are uploaded from the
client.**
"""
from collections import OrderedDict
import datetime
import logging
from typing import (
Any,
Callable,
Dict,
Generator,
Iterable,
List,
NoReturn,
Optional,
Sequence,
Set,
Tuple,
Type,
TYPE_CHECKING,
TypeVar,
Union,
)
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.sqlalchemy.orm_inspect import gen_columns
from pendulum import DateTime as Pendulum
from semantic_version import Version
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.orm.relationships import RelationshipProperty
from sqlalchemy.orm import Session as SqlASession
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Column, ForeignKey
from sqlalchemy.sql.sqltypes import Integer
from camcops_server.cc_modules.cc_constants import (
CLIENT_DATE_FIELD,
ERA_NOW,
EXTRA_COMMENT_PREFIX,
EXTRA_TASK_SERVER_PK_FIELD,
EXTRA_TASK_TABLENAME_FIELD,
MOVE_OFF_TABLET_FIELD,
SPREADSHEET_PATIENT_FIELD_PREFIX,
TABLET_ID_FIELD,
)
from camcops_server.cc_modules.cc_dataclasses import SummarySchemaInfo
from camcops_server.cc_modules.cc_sqla_coltypes import (
camcops_column,
COLATTR_BLOB_RELATIONSHIP_ATTR_NAME,
COLATTR_PERMITTED_VALUE_CHECKER,
EraColType,
gen_ancillary_relationships,
gen_camcops_blob_columns,
PendulumDateTimeAsIsoTextColType,
PermittedValueChecker,
RelationshipInfo,
SemanticVersionColType,
TableNameColType,
)
from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions
from camcops_server.cc_modules.cc_spreadsheet import SpreadsheetPage
from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION
from camcops_server.cc_modules.cc_xml import (
make_xml_branches_from_blobs,
make_xml_branches_from_columns,
make_xml_branches_from_summaries,
XML_COMMENT_STORED,
XML_COMMENT_CALCULATED,
XmlElement,
)
if TYPE_CHECKING:
from camcops_server.cc_modules.cc_blob import Blob
from camcops_server.cc_modules.cc_device import Device
from camcops_server.cc_modules.cc_group import Group
from camcops_server.cc_modules.cc_patient import Patient
from camcops_server.cc_modules.cc_request import (
CamcopsRequest,
)
from camcops_server.cc_modules.cc_summaryelement import (
SummaryElement,
)
from camcops_server.cc_modules.cc_task import Task
from camcops_server.cc_modules.cc_user import User
log = BraceStyleAdapter(logging.getLogger(__name__))
# =============================================================================
# Hacks for specific database drivers
# =============================================================================
CRASH_ON_BAD_CONVERSIONS = False # for debugging only!
if CRASH_ON_BAD_CONVERSIONS:
log.error("DANGER: CRASH_ON_BAD_CONVERSIONS set in cc_db.py")
try:
import MySQLdb
import MySQLdb.converters
except ImportError:
MySQLdb = None
try:
import pymysql
import pymysql.converters
except ImportError:
pymysql = None
_SQL_LITERAL_TYPE = Union[int, float, str]
_MYSQL_CONVERSION_DICT_TYPE = Dict[Any, Callable]
_MYSQLDB_PYTHON_TO_DB_TYPE = Callable[
[Any, _MYSQL_CONVERSION_DICT_TYPE], _SQL_LITERAL_TYPE
] # f(o, d) -> s
_MYSQLDB_DB_TO_PYTHON_TYPE = Callable[[_SQL_LITERAL_TYPE], Any] # f(s) -> o
_PYMYSQL_ENCODER_DICT_TYPE = Dict[Type, Callable]
_PYMYSQL_PYTHON_TO_DB_TYPE = Callable[
[Any, Optional[_PYMYSQL_ENCODER_DICT_TYPE]], _SQL_LITERAL_TYPE
] # f(o, mapping) -> s
_PYMYSQL_DB_TO_PYTHON_TYPE = Callable[[_SQL_LITERAL_TYPE], Any]
[docs]def mysqldb_crash_on_bad_conversion(
o: Any, d: _MYSQL_CONVERSION_DICT_TYPE
) -> NoReturn:
"""
Reports a bad conversion and crashes. For debugging only (obviously)!
**Conversions by mysqlclient (MySQLdb)**
As per the help docstring for ``MySQLdb/converters.py``,
- the Python-to-database conversion function has the signature ``f(o, d)``
where ``o`` is the thing to be converted (such as a datetime.datetime)
and ``d`` is the conversion dictionary; it returns an SQL literal value.
- The database-to-Python conversion function has the argument ``f(s)``
where ``s`` is a string; it returns a Python object.
Both types of functions are stored in ``MySQLdb.converters``, which is a
``dict``. The keys named ``FIELD_TYPE.*`` are the database-to-Python
converters; the others are the Python-to-database converters.
**Conversions by pymysql**
Similar (for back compatibility), but not the same.
- ``pymysql.converters.conversions`` is ``pymysql.converters.decoders`` and
contains database-to-Python converters.
- ``pymysql.converters.encoders`` contains Python-to-database converters.
Args:
o: Python object
d: MySQLdb conversion dictionary
Returns:
SQL literal
"""
failmsg = (
f"mysqldb_crash_on_bad_conversion: attempting to convert bad Python "
f"object to database: {o!r}. Conversion dict is {d!r}."
)
log.critical(failmsg)
raise RuntimeError(failmsg)
[docs]def pymysql_crash_on_bad_conversion(
obj: Any, mapping: _PYMYSQL_ENCODER_DICT_TYPE
) -> NoReturn:
"""
See :func:`mysqldb_crash_on_bad_conversion`.
"""
failmsg = (
f"pymysql_crash_on_bad_conversion: attempting to convert bad Python "
f"object to database: {obj!r}. Mapping dict is {mapping!r}."
)
log.critical(failmsg)
raise RuntimeError(failmsg)
# -----------------------------------------------------------------------------
# Pendulum; see https://pypi.org/project/pendulum/ -- but note that it says
# "pymysql.converters.conversions" but should say
# "pymysql.converters.encoders".
# -----------------------------------------------------------------------------
if MySQLdb:
log.debug("Hacking MySQLdb to support pendulum.DateTime")
if CRASH_ON_BAD_CONVERSIONS:
MySQLdb.converters.conversions[Pendulum] = (
mysqldb_crash_on_bad_conversion
)
else:
MySQLdb.converters.conversions[Pendulum] = (
MySQLdb.converters.DateTime2literal
)
if pymysql:
log.debug("Hacking pymysql to support pendulum.DateTime")
if CRASH_ON_BAD_CONVERSIONS:
pymysql.converters.encoders[Pendulum] = pymysql_crash_on_bad_conversion
else:
pymysql.converters.encoders[Pendulum] = (
pymysql.converters.escape_datetime
)
# And also, as per the source code and
# https://stackoverflow.com/questions/59871904/convert-pymysql-query-result-with-mysql-decimal-type-to-python-float # noqa
pymysql.converters.conversions = pymysql.converters.encoders.copy()
pymysql.converters.conversions.update(pymysql.converters.decoders)
# =============================================================================
# Constants
# =============================================================================
T = TypeVar("T")
# Database fieldname constants. Do not change. Used here and in client_api.py
FN_PK = "_pk"
FN_DEVICE_ID = "_device_id"
FN_ERA = "_era"
FN_CURRENT = "_current"
FN_WHEN_ADDED_EXACT = "_when_added_exact"
FN_WHEN_ADDED_BATCH_UTC = "_when_added_batch_utc"
FN_ADDING_USER_ID = "_adding_user_id"
FN_WHEN_REMOVED_EXACT = "_when_removed_exact"
FN_WHEN_REMOVED_BATCH_UTC = "_when_removed_batch_utc"
FN_REMOVING_USER_ID = "_removing_user_id"
FN_PRESERVING_USER_ID = "_preserving_user_id"
FN_FORCIBLY_PRESERVED = "_forcibly_preserved"
FN_PREDECESSOR_PK = "_predecessor_pk"
FN_SUCCESSOR_PK = "_successor_pk"
FN_MANUALLY_ERASED = "_manually_erased"
FN_MANUALLY_ERASED_AT = "_manually_erased_at"
FN_MANUALLY_ERASING_USER_ID = "_manually_erasing_user_id"
FN_CAMCOPS_VERSION = "_camcops_version"
FN_ADDITION_PENDING = "_addition_pending"
FN_REMOVAL_PENDING = "_removal_pending"
FN_GROUP_ID = "_group_id"
# Common fieldnames used by all tasks. Do not change.
TFN_WHEN_CREATED = "when_created"
TFN_WHEN_FIRSTEXIT = "when_firstexit"
TFN_FIRSTEXIT_IS_FINISH = "firstexit_is_finish"
TFN_FIRSTEXIT_IS_ABORT = "firstexit_is_abort"
TFN_EDITING_TIME_S = "editing_time_s"
# Fieldnames for the task patient mixin. Do not change.
TFN_PATIENT_ID = "patient_id"
# Fieldnames for the task clinician mixin. Do not change.
TFN_CLINICIAN_SPECIALTY = "clinician_specialty"
TFN_CLINICIAN_NAME = "clinician_name"
TFN_CLINICIAN_PROFESSIONAL_REGISTRATION = "clinician_professional_registration"
TFN_CLINICIAN_POST = "clinician_post"
TFN_CLINICIAN_SERVICE = "clinician_service"
TFN_CLINICIAN_CONTACT_DETAILS = "clinician_contact_details"
# Fieldnames for the task respondent mixin. Do not change.
TFN_RESPONDENT_NAME = "respondent_name"
TFN_RESPONDENT_RELATIONSHIP = "respondent_relationship"
# Selected field/column names for patients. Do not change.
PFN_UUID = "uuid"
# Column names for task summaries.
SFN_IS_COMPLETE = "is_complete"
SFN_SECONDS_CREATION_TO_FIRST_FINISH = "seconds_from_creation_to_first_finish"
SFN_CAMCOPS_SERVER_VERSION = "camcops_server_version"
RESERVED_FIELDS = ( # fields that tablets can't upload
FN_PK,
FN_DEVICE_ID,
FN_ERA,
FN_CURRENT,
FN_WHEN_ADDED_EXACT,
FN_WHEN_ADDED_BATCH_UTC,
FN_ADDING_USER_ID,
FN_WHEN_REMOVED_EXACT,
FN_WHEN_REMOVED_BATCH_UTC,
FN_REMOVING_USER_ID,
FN_PRESERVING_USER_ID,
FN_FORCIBLY_PRESERVED,
FN_PREDECESSOR_PK,
FN_SUCCESSOR_PK,
FN_MANUALLY_ERASED,
FN_MANUALLY_ERASED_AT,
FN_MANUALLY_ERASING_USER_ID,
FN_CAMCOPS_VERSION,
FN_ADDITION_PENDING,
FN_REMOVAL_PENDING,
FN_GROUP_ID,
) # but more generally: they start with "_"...
assert all(x.startswith("_") for x in RESERVED_FIELDS)
TABLET_STANDARD_FIELDS = RESERVED_FIELDS + (
TABLET_ID_FIELD,
CLIENT_DATE_FIELD, # when_last_modified
MOVE_OFF_TABLET_FIELD,
)
TASK_STANDARD_FIELDS = TABLET_STANDARD_FIELDS + (
# All tasks:
TFN_WHEN_CREATED,
TFN_WHEN_FIRSTEXIT,
TFN_FIRSTEXIT_IS_FINISH,
TFN_FIRSTEXIT_IS_ABORT,
TFN_EDITING_TIME_S,
)
TASK_FREQUENT_AND_FK_FIELDS = TASK_STANDARD_FIELDS + (
# Tasks with a patient:
TFN_PATIENT_ID,
)
TASK_FREQUENT_FIELDS = TASK_FREQUENT_AND_FK_FIELDS + (
# Tasks with a clinician:
TFN_CLINICIAN_SPECIALTY,
TFN_CLINICIAN_NAME,
TFN_CLINICIAN_PROFESSIONAL_REGISTRATION,
TFN_CLINICIAN_POST,
TFN_CLINICIAN_SERVICE,
TFN_CLINICIAN_CONTACT_DETAILS,
# Tasks with a respondent:
TFN_RESPONDENT_NAME,
TFN_RESPONDENT_RELATIONSHIP,
)
REMOVE_COLUMNS_FOR_SIMPLIFIED_SPREADSHEETS = {
# keep this: CLIENT_DATE_FIELD = when_last_modified
# keep this: FN_PK = task PK
# keep this: SFN_IS_COMPLETE = is the task complete
# keep this: SPREADSHEET_PATIENT_FIELD_PREFIX + FN_PK = patient PK
# keep this: TFN_WHEN_CREATED = main creation time
FN_ADDING_USER_ID,
FN_ADDITION_PENDING,
FN_CAMCOPS_VERSION, # debatable; version that captured the original data
FN_CURRENT,
FN_DEVICE_ID,
FN_ERA,
FN_FORCIBLY_PRESERVED,
FN_GROUP_ID,
FN_MANUALLY_ERASED,
FN_MANUALLY_ERASED_AT,
FN_MANUALLY_ERASING_USER_ID,
FN_PREDECESSOR_PK,
FN_PRESERVING_USER_ID,
FN_REMOVAL_PENDING,
FN_REMOVING_USER_ID,
FN_SUCCESSOR_PK,
FN_WHEN_ADDED_BATCH_UTC,
FN_WHEN_ADDED_EXACT,
FN_WHEN_REMOVED_BATCH_UTC,
FN_WHEN_REMOVED_EXACT,
MOVE_OFF_TABLET_FIELD,
SFN_CAMCOPS_SERVER_VERSION, # debatable; version that generated summary information # noqa
SFN_SECONDS_CREATION_TO_FIRST_FINISH,
SPREADSHEET_PATIENT_FIELD_PREFIX + CLIENT_DATE_FIELD,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_ADDING_USER_ID,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_ADDITION_PENDING,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_CAMCOPS_VERSION,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_CURRENT,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_DEVICE_ID,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_ERA,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_FORCIBLY_PRESERVED,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_GROUP_ID,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_MANUALLY_ERASED,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_MANUALLY_ERASED_AT,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_MANUALLY_ERASING_USER_ID,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_PREDECESSOR_PK,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_PRESERVING_USER_ID,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_REMOVAL_PENDING,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_REMOVING_USER_ID,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_SUCCESSOR_PK,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_WHEN_ADDED_BATCH_UTC,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_WHEN_ADDED_EXACT,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_WHEN_REMOVED_BATCH_UTC,
SPREADSHEET_PATIENT_FIELD_PREFIX + FN_WHEN_REMOVED_EXACT,
SPREADSHEET_PATIENT_FIELD_PREFIX + MOVE_OFF_TABLET_FIELD,
SPREADSHEET_PATIENT_FIELD_PREFIX + PFN_UUID,
SPREADSHEET_PATIENT_FIELD_PREFIX + TABLET_ID_FIELD,
TABLET_ID_FIELD,
TFN_EDITING_TIME_S,
TFN_FIRSTEXIT_IS_ABORT,
TFN_FIRSTEXIT_IS_FINISH,
TFN_PATIENT_ID,
TFN_WHEN_FIRSTEXIT,
}
# =============================================================================
# GenericTabletRecordMixin
# =============================================================================
# noinspection PyAttributeOutsideInit
[docs]class GenericTabletRecordMixin(object):
"""
Mixin for all tables that are uploaded from the client, representing the
fields that the server adds at the point of upload.
From the server's perspective, ``_pk`` is the unique primary key.
However, records are defined also in their tablet context, for which an
individual tablet (defined by the combination of ``_device_id`` and
``_era``) sees its own PK, ``id``.
"""
__tablename__ = None # type: str # sorts out some mixin type checking
# -------------------------------------------------------------------------
# On the server side:
# -------------------------------------------------------------------------
# Plain columns
# noinspection PyMethodParameters
_pk: Mapped[int] = mapped_column(
FN_PK,
primary_key=True,
autoincrement=True,
index=True,
comment="(SERVER) Primary key (on the server)",
)
# noinspection PyMethodParameters
_device_id: Mapped[int] = mapped_column(
FN_DEVICE_ID,
ForeignKey("_security_devices.id", use_alter=True),
index=True,
comment="(SERVER) ID of the source tablet device",
)
# noinspection PyMethodParameters
_era: Mapped[str] = mapped_column(
FN_ERA,
EraColType,
index=True,
comment="(SERVER) 'NOW', or when this row was preserved and "
"removed from the source device (UTC ISO 8601)",
)
# ... note that _era is textual so that plain comparison
# with "=" always works, i.e. no NULLs -- for USER comparison too, not
# just in CamCOPS code
# noinspection PyMethodParameters
_current: Mapped[bool] = mapped_column(
FN_CURRENT,
index=True,
comment="(SERVER) Is the row current (1) or not (0)?",
)
# noinspection PyMethodParameters
_when_added_exact: Mapped[Optional[Pendulum]] = mapped_column(
FN_WHEN_ADDED_EXACT,
PendulumDateTimeAsIsoTextColType,
comment="(SERVER) Date/time this row was added (ISO 8601)",
)
# noinspection PyMethodParameters
_when_added_batch_utc: Mapped[Optional[datetime.datetime]] = mapped_column(
FN_WHEN_ADDED_BATCH_UTC,
comment="(SERVER) Date/time of the upload batch that added this "
"row (DATETIME in UTC)",
)
# noinspection PyMethodParameters
_adding_user_id: Mapped[Optional[int]] = mapped_column(
FN_ADDING_USER_ID,
ForeignKey("_security_users.id"),
comment="(SERVER) ID of user that added this row",
)
# noinspection PyMethodParameters
_when_removed_exact: Mapped[Optional[Pendulum]] = mapped_column(
FN_WHEN_REMOVED_EXACT,
PendulumDateTimeAsIsoTextColType,
comment="(SERVER) Date/time this row was removed, i.e. made "
"not current (ISO 8601)",
)
# noinspection PyMethodParameters
_when_removed_batch_utc: Mapped[Optional[datetime.datetime]] = (
mapped_column(
FN_WHEN_REMOVED_BATCH_UTC,
comment="(SERVER) Date/time of the upload batch that removed "
"this row (DATETIME in UTC)",
)
)
_removing_user_id: Mapped[Optional[int]] = mapped_column(
FN_REMOVING_USER_ID,
ForeignKey("_security_users.id"),
comment="(SERVER) ID of user that removed this row",
)
# noinspection PyMethodParameters
_preserving_user_id: Mapped[Optional[int]] = mapped_column(
FN_PRESERVING_USER_ID,
ForeignKey("_security_users.id"),
comment="(SERVER) ID of user that preserved this row",
)
# noinspection PyMethodParameters
_forcibly_preserved: Mapped[Optional[bool]] = mapped_column(
FN_FORCIBLY_PRESERVED,
default=False,
comment="(SERVER) Forcibly preserved by superuser (rather than "
"normally preserved by tablet)?",
)
# noinspection PyMethodParameters
_predecessor_pk: Mapped[Optional[int]] = mapped_column(
FN_PREDECESSOR_PK,
comment="(SERVER) PK of predecessor record, prior to modification",
)
# noinspection PyMethodParameters
_successor_pk: Mapped[Optional[int]] = mapped_column(
FN_SUCCESSOR_PK,
comment="(SERVER) PK of successor record (after modification) "
"or NULL (whilst live, or after deletion)",
)
# noinspection PyMethodParameters
_manually_erased: Mapped[Optional[bool]] = mapped_column(
FN_MANUALLY_ERASED,
default=False,
comment="(SERVER) Record manually erased (content destroyed)?",
)
# noinspection PyMethodParameters
_manually_erased_at: Mapped[Optional[Pendulum]] = mapped_column(
FN_MANUALLY_ERASED_AT,
PendulumDateTimeAsIsoTextColType,
comment="(SERVER) Date/time of manual erasure (ISO 8601)",
)
# noinspection PyMethodParameters
_manually_erasing_user_id: Mapped[Optional[int]] = mapped_column(
FN_MANUALLY_ERASING_USER_ID,
ForeignKey("_security_users.id"),
comment="(SERVER) ID of user that erased this row manually",
)
# noinspection PyMethodParameters
_camcops_version: Mapped[Optional[Version]] = mapped_column(
FN_CAMCOPS_VERSION,
SemanticVersionColType,
default=CAMCOPS_SERVER_VERSION,
comment="(SERVER) CamCOPS version number of the uploading device",
)
# noinspection PyMethodParameters
_addition_pending: Mapped[bool] = mapped_column(
FN_ADDITION_PENDING,
default=False,
comment="(SERVER) Addition pending?",
)
# noinspection PyMethodParameters
_removal_pending: Mapped[Optional[bool]] = mapped_column(
FN_REMOVAL_PENDING,
default=False,
comment="(SERVER) Removal pending?",
)
# noinspection PyMethodParameters
_group_id: Mapped[int] = mapped_column(
FN_GROUP_ID,
ForeignKey("_security_groups.id"),
index=True,
comment="(SERVER) ID of group to which this record belongs",
)
# -------------------------------------------------------------------------
# Fields that *all* client tables have:
# -------------------------------------------------------------------------
# noinspection PyMethodParameters
id: Mapped[int] = mapped_column(
TABLET_ID_FIELD,
index=True,
comment="(TASK) Primary key (task ID) on the tablet device",
)
# noinspection PyMethodParameters
when_last_modified: Mapped[Optional[Pendulum]] = mapped_column(
CLIENT_DATE_FIELD,
PendulumDateTimeAsIsoTextColType,
index=True, # ... as used by database upload script
comment="(STANDARD) Date/time this row was last modified on the "
"source tablet device (ISO 8601)",
)
# noinspection PyMethodParameters
_move_off_tablet: Mapped[Optional[bool]] = mapped_column(
MOVE_OFF_TABLET_FIELD,
default=False,
comment="(SERVER/TABLET) Record-specific preservation pending?",
)
# -------------------------------------------------------------------------
# Relationships
# -------------------------------------------------------------------------
# noinspection PyMethodParameters
@declared_attr
def _device(cls) -> Mapped["Device"]:
return relationship("Device")
# noinspection PyMethodParameters
@declared_attr
def _adding_user(cls) -> Mapped["User"]:
return relationship("User", foreign_keys=[cls._adding_user_id]) # type: ignore[list-item] # noqa: E501
# noinspection PyMethodParameters
@declared_attr
def _removing_user(cls) -> Mapped["User"]:
return relationship("User", foreign_keys=[cls._removing_user_id]) # type: ignore[list-item] # noqa: E501
# noinspection PyMethodParameters
@declared_attr
def _preserving_user(cls) -> Mapped["User"]:
return relationship("User", foreign_keys=[cls._preserving_user_id]) # type: ignore[list-item] # noqa: E501
# noinspection PyMethodParameters
@declared_attr
def _manually_erasing_user(cls) -> Mapped["User"]:
return relationship(
"User", foreign_keys=[cls._manually_erasing_user_id] # type: ignore[list-item] # noqa: E501
)
# noinspection PyMethodParameters
@declared_attr
def _group(cls) -> Mapped["Group"]:
return relationship("Group", foreign_keys=[cls._group_id]) # type: ignore[list-item] # noqa: E501
# -------------------------------------------------------------------------
# Fetching attributes
# -------------------------------------------------------------------------
@property
def pk(self) -> Optional[int]:
"""
Returns the (server) primary key of this record.
"""
return self._pk
@property
def era(self) -> Optional[str]:
"""
Returns the era of this record (a text representation of the date/time
of the point of record finalization, or ``NOW`` if the record is still
present on the client device).
"""
return self._era
@property
def device_id(self) -> Optional[int]:
"""
Returns the client device ID of this record.
"""
return self._device_id
@property
def group_id(self) -> Optional[int]:
"""
Returns the group ID of this record.
"""
return self._group_id
# -------------------------------------------------------------------------
# Other universal properties
# -------------------------------------------------------------------------
[docs] def is_live_on_tablet(self) -> bool:
"""
Is the record live on a tablet (not finalized)?
"""
return self._era == ERA_NOW
[docs] def is_finalized(self) -> bool:
"""
Is the record finalized (no longer available to be edited on the
client device), and therefore (if required) editable on the server?
"""
return not self.is_live_on_tablet()
[docs] def created_on_server(self, req: "CamcopsRequest") -> bool:
"""
Was this record created on the server?
"""
from camcops_server.cc_modules.cc_device import (
Device,
) # delayed import
server_device = Device.get_server_device(req.dbsession)
return self._era == ERA_NOW and self._device_id == server_device.id
# -------------------------------------------------------------------------
# Autoscanning objects and their relationships
# -------------------------------------------------------------------------
def _get_xml_root(
self, req: "CamcopsRequest", options: TaskExportOptions
) -> XmlElement:
"""
Called to create an XML root object for records ancillary to Task
objects. Tasks themselves use a more complex mechanism.
Args:
req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
options: a :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions`
""" # noqa
# "__tablename__" will make the type checker complain, as we're
# defining a function for a mixin that assumes it's mixed in to a
# SQLAlchemy Base-derived class
# noinspection PyUnresolvedReferences
return XmlElement(
name=self.__tablename__,
value=self._get_xml_branches(req=req, options=options),
)
def _get_xml_branches(
self, req: "CamcopsRequest", options: TaskExportOptions
) -> List[XmlElement]:
"""
Gets the values of SQLAlchemy columns as XmlElement objects.
Optionally, find any SQLAlchemy relationships that are relationships
to Blob objects, and include them too.
Used by :func:`_get_xml_root` above, but also by Tasks themselves.
Args:
req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
options: a :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions`
""" # noqa
# log.debug("_get_xml_branches for {!r}", self)
options = options or TaskExportOptions(
xml_include_plain_columns=True,
xml_include_calculated=True,
xml_sort_by_name=True,
)
branches = [] # type: List[XmlElement]
if options.xml_with_header_comments:
branches.append(XML_COMMENT_STORED)
if options.xml_include_plain_columns:
new_branches = make_xml_branches_from_columns(
self, skip_fields=options.xml_skip_fields
)
if options.xml_sort_by_name:
new_branches.sort(key=lambda el: el.name)
branches += new_branches
if options.include_blobs:
new_branches = make_xml_branches_from_blobs(
req, self, skip_fields=options.xml_skip_fields
)
if options.xml_sort_by_name:
new_branches.sort(key=lambda el: el.name)
branches += new_branches
# Calculated
if options.xml_include_calculated:
if options.xml_with_header_comments:
branches.append(XML_COMMENT_CALCULATED)
branches.extend(
make_xml_branches_from_summaries(
self.get_summaries(req),
skip_fields=options.xml_skip_fields,
sort_by_name=options.xml_sort_by_name,
)
)
# log.debug("... branches for {!r}: {!r}", self, branches)
return branches
def _get_core_spreadsheet_page(
self, req: "CamcopsRequest", heading_prefix: str = ""
) -> SpreadsheetPage:
"""
Returns a single-row
:class:`camcops_server.cc_modules.cc_spreadsheet.SpreadsheetPage`, like
an Excel "sheet", representing this record. (It may be combined with
others later to produce a multi-row spreadsheet.)
"""
row = OrderedDict()
for attrname, column in gen_columns(self):
row[heading_prefix + attrname] = getattr(self, attrname)
for s in self.get_summaries(req):
row[heading_prefix + s.name] = s.value
return SpreadsheetPage(name=self.__tablename__, rows=[row])
def _get_core_spreadsheet_schema(
self, table_name: str = "", column_name_prefix: str = ""
) -> Set[SummarySchemaInfo]:
"""
Returns schema information compatible with
:func:`_get_core_spreadsheet_page`.
"""
return set(
SummarySchemaInfo.from_column(
column,
table_name=table_name,
column_name_prefix=column_name_prefix,
)
for _, column in gen_columns(self)
)
# -------------------------------------------------------------------------
# Erasing (overwriting data, not deleting the database records)
# -------------------------------------------------------------------------
[docs] def manually_erase_with_dependants(self, req: "CamcopsRequest") -> None:
"""
Manually erases a standard record and marks it so erased. Iterates
through any dependants and does likewise to them.
The object remains ``_current`` (if it was), as a placeholder, but its
contents are wiped.
WRITES TO THE DATABASE.
"""
if self._manually_erased or self._pk is None or self._era == ERA_NOW:
# ... _manually_erased: don't do it twice
# ... _pk: basic sanity check
# ... _era: don't erase things that are current on the tablet
return
# 1. "Erase my dependants"
for ancillary in self.gen_ancillary_instances_even_noncurrent():
ancillary.manually_erase_with_dependants(req)
for blob in self.gen_blobs_even_noncurrent():
blob.manually_erase_with_dependants(req)
# 2. "Erase me"
erasure_attrs = [] # type: List[str]
for attrname, column in gen_columns(self):
if attrname.startswith("_"): # system field
continue
if not column.nullable: # this should cover FKs
continue
if column.foreign_keys: # ... but to be sure...
continue
erasure_attrs.append(attrname)
for attrname in erasure_attrs:
setattr(self, attrname, None)
self._current = False
self._manually_erased = True
self._manually_erased_at = req.now
self._manually_erasing_user_id = req.user_id
[docs] def delete_with_dependants(self, req: "CamcopsRequest") -> None:
"""
Deletes (completely from the database) this record and any
dependant records.
"""
if self._pk is None:
return
# 1. "Delete my dependants"
for ancillary in self.gen_ancillary_instances_even_noncurrent():
ancillary.delete_with_dependants(req)
for blob in self.gen_blobs_even_noncurrent():
blob.delete_with_dependants(req)
# 2. "Delete me"
dbsession = SqlASession.object_session(self)
dbsession.delete(self)
[docs] def gen_attrname_ancillary_pairs(
self,
) -> Generator[Tuple[str, "GenericTabletRecordMixin"], None, None]:
"""
Iterates through and yields all ``_current`` "ancillary" objects
(typically: records of subtables).
Yields tuples of ``(attrname, related_record)``.
"""
for attrname, rel_prop, rel_cls in gen_ancillary_relationships(self):
if rel_prop.uselist:
ancillaries = getattr(
self, attrname
) # type: List[GenericTabletRecordMixin]
else:
ancillaries = [ # type: ignore[no-redef]
getattr(self, attrname)
] # type: List[GenericTabletRecordMixin]
for ancillary in ancillaries:
if ancillary is None:
continue
yield attrname, ancillary
[docs] def gen_ancillary_instances(
self,
) -> Generator["GenericTabletRecordMixin", None, None]:
"""
Generates all ``_current`` ancillary objects of this object.
"""
for attrname, ancillary in self.gen_attrname_ancillary_pairs():
yield ancillary
[docs] def gen_ancillary_instances_even_noncurrent(
self,
) -> Generator["GenericTabletRecordMixin", None, None]:
"""
Generates all ancillary objects of this object, even non-current
ones.
"""
for lineage_member in self._gen_unique_lineage_objects(
self.gen_ancillary_instances()
):
yield lineage_member
[docs] def gen_blobs(self) -> Generator["Blob", None, None]:
"""
Generate all ``_current`` BLOBs owned by this object.
"""
for id_attrname, column in gen_camcops_blob_columns(self):
relationship_attr = column.info.get(
COLATTR_BLOB_RELATIONSHIP_ATTR_NAME
)
blob = getattr(self, relationship_attr)
if blob is None:
continue
yield blob
[docs] def gen_blobs_even_noncurrent(self) -> Generator["Blob", None, None]:
"""
Generates all BLOBs owned by this object, even non-current ones.
"""
for lineage_member in self._gen_unique_lineage_objects( # type: ignore[assignment] # noqa: E501
self.gen_blobs()
): # type: "Blob"
yield lineage_member
[docs] def get_lineage(self) -> List["GenericTabletRecordMixin"]:
"""
Returns all records that are part of the same "lineage", that is:
- of the same class;
- matching on id/device_id/era;
- including both current and any historical non-current versions.
Will include the "self" object.
"""
dbsession = SqlASession.object_session(self)
cls = self.__class__
q = (
dbsession.query(cls)
.filter(cls.id == self.id)
.filter(cls._device_id == self._device_id)
.filter(cls._era == self._era)
)
return list(q)
@staticmethod
def _gen_unique_lineage_objects(
collection: Iterable["GenericTabletRecordMixin"],
) -> Generator["GenericTabletRecordMixin", None, None]:
"""
Given an iterable of database records, generate all related lineage
objects for each of them (via :meth:`get_lineage`) that are unique by
PK.
"""
seen_pks = set() # type: Set[int]
for item in collection:
if item is None:
continue
for lineage_member in item.get_lineage():
pk = lineage_member.pk
if pk in seen_pks:
continue
seen_pks.add(pk)
yield lineage_member
# -------------------------------------------------------------------------
# Retrieving a linked record by client ID
# -------------------------------------------------------------------------
[docs] @classmethod
def get_linked(
cls, client_id: Optional[int], other: "GenericTabletRecordMixin"
) -> Optional["GenericTabletRecordMixin"]:
"""
Returns a specific linked record, of the class of ``self``, whose
client-side ID is ``client_id``, and which matches ``other`` in terms
of device/era.
"""
if client_id is None:
return None
dbsession = SqlASession.object_session(other)
# noinspection PyPep8
q = (
dbsession.query(cls)
.filter(cls.id == client_id)
.filter(cls._device_id == other._device_id)
.filter(cls._era == other._era)
.filter(cls._current == True) # noqa: E712
)
return q.first()
# -------------------------------------------------------------------------
# History functions for server-side editing
# -------------------------------------------------------------------------
[docs] def set_predecessor(
self, req: "CamcopsRequest", predecessor: "GenericTabletRecordMixin"
) -> None:
"""
Used for some unusual server-side manipulations (e.g. editing patient
details).
Amends this object so the "self" object replaces the predecessor, so:
- "self" becomes current and refers back to "predecessor";
- "predecessor" becomes non-current and refers forward to "self".
"""
assert predecessor._current
# We become new and current, and refer to our predecessor
self._device_id = predecessor._device_id
self._era = predecessor._era
self._current = True
self._when_added_exact = req.now
self._when_added_batch_utc = req.now_utc
self._adding_user_id = req.user_id
if self._era != ERA_NOW:
self._preserving_user_id = req.user_id
self._forcibly_preserved = True
self._predecessor_pk = predecessor._pk
self._camcops_version = predecessor._camcops_version
self._group_id = predecessor._group_id
# Make our predecessor refer to us
if self._pk is None:
req.dbsession.add(self) # ensure we have a PK, part 1
req.dbsession.flush() # ensure we have a PK, part 2
predecessor._set_successor(req, self)
def _set_successor(
self, req: "CamcopsRequest", successor: "GenericTabletRecordMixin"
) -> None:
"""
See :func:`set_predecessor` above.
"""
assert successor._pk is not None
self._current = False
self._when_removed_exact = req.now
self._when_removed_batch_utc = req.now_utc
self._removing_user_id = req.user_id
self._successor_pk = successor._pk
[docs] def mark_as_deleted(self, req: "CamcopsRequest") -> None:
"""
Ends the history chain and marks this record as non-current.
"""
if self._current:
self._when_removed_exact = req.now
self._when_removed_batch_utc = req.now_utc
self._removing_user_id = req.user_id
self._current = False
[docs] def create_fresh(
self, req: "CamcopsRequest", device_id: int, era: str, group_id: int
) -> None:
"""
Used to create a record from scratch.
"""
self._device_id = device_id
self._era = era
self._group_id = group_id
self._current = True
self._when_added_exact = req.now
self._when_added_batch_utc = req.now_utc
self._adding_user_id = req.user_id
[docs] def save_with_next_available_id(
self, req: "CamcopsRequest", device_id: int, era: str = ERA_NOW
) -> None:
"""
Save a record with the next available client pk in sequence.
This is of use when creating patients and ID numbers on the server
to ensure uniqueness, or when fixing up a missing ID number for
a patient created on a device.
"""
cls = self.__class__
saved_ok = False
# MySql doesn't support "select for update" so we have to keep
# trying the next available ID and checking for an integrity
# error in case another user has grabbed it by the time we have
# committed
# noinspection PyProtectedMember
last_id = (
req.dbsession
# func.max(cls.id) + 1 here will do the right thing for
# backends that support select for update (maybe not for no rows)
.query(func.max(cls.id))
.filter(cls._device_id == device_id)
.filter(cls._era == era)
.scalar()
) or 0
next_id = last_id + 1
while not saved_ok:
self.id = next_id
req.dbsession.add(self)
try:
req.dbsession.flush()
saved_ok = True
except IntegrityError:
req.dbsession.rollback()
next_id += 1
# -------------------------------------------------------------------------
# Override this if you provide summaries
# -------------------------------------------------------------------------
# noinspection PyMethodMayBeStatic
[docs] def get_summaries(self, req: "CamcopsRequest") -> List["SummaryElement"]:
"""
Return a list of :class:`SummaryElement` objects, for this database
object (not any dependent classes/tables).
Note that this is implemented on :class:`GenericTabletRecordMixin`,
not :class:`camcops_server.cc_modules.cc_task.Task`, so that ancillary
objects can also provide summaries.
"""
return []
[docs] def get_summary_names(self, req: "CamcopsRequest") -> List[str]:
"""
Returns a list of summary field names.
"""
return [x.name for x in self.get_summaries(req)]
# =============================================================================
# Relationships
# =============================================================================
[docs]def ancillary_relationship(
parent_class_name: str,
ancillary_class_name: str,
ancillary_fk_to_parent_attr_name: str,
ancillary_order_by_attr_name: str = None,
read_only: bool = True,
) -> RelationshipProperty:
"""
Implements a one-to-many relationship, i.e. one parent to many ancillaries.
"""
parent_pk_attr_name = "id" # always
return relationship(
ancillary_class_name,
primaryjoin=(
"and_("
" remote({a}.{fk}) == foreign({p}.{pk}), "
" remote({a}._device_id) == foreign({p}._device_id), "
" remote({a}._era) == foreign({p}._era), "
" remote({a}._current) == True "
")".format(
a=ancillary_class_name,
fk=ancillary_fk_to_parent_attr_name,
p=parent_class_name,
pk=parent_pk_attr_name,
)
),
uselist=True,
order_by="{a}.{f}".format(
a=ancillary_class_name, f=ancillary_order_by_attr_name
),
viewonly=read_only,
info={RelationshipInfo.IS_ANCILLARY: True},
# ... "info" is a user-defined dictionary; see
# https://docs.sqlalchemy.org/en/latest/orm/relationship_api.html#sqlalchemy.orm.relationship.params.info # noqa
# https://docs.sqlalchemy.org/en/latest/orm/internals.html#MapperProperty.info # noqa
)
# =============================================================================
# Field creation assistance
# =============================================================================
# TypeEngineBase = TypeVar('TypeEngineBase', bound=TypeEngine)
[docs]def add_multiple_columns( # type: ignore[no-untyped-def]
cls: Type,
prefix: str,
start: int,
end: int,
coltype=Integer,
# this type fails: Union[Type[TypeEngineBase], TypeEngine]
# ... https://stackoverflow.com/questions/38106227
# ... https://github.com/python/typing/issues/266
colkwargs: Dict[str, Any] = None,
comment_fmt: str = None,
comment_strings: List[str] = None,
minimum: Union[int, float] = None,
maximum: Union[int, float] = None,
pv: Sequence[Any] = None,
suffix: str = "",
) -> None:
"""
Add a sequence of SQLAlchemy columns to a class.
Called via __init_subclass__() on the base class.
Used to make task creation a bit easier.
Args:
cls:
class to which to add columns
prefix:
Fieldname will be ``prefix + str(n) + suffix``, where ``n`` is
defined as below.
suffix:
Optional. See ``prefix``.
start:
Start of range.
end:
End of range. Thus: ``i`` will range from ``0`` to ``(end -
start)`` inclusive; ``n`` will range from ``start`` to ``end``
inclusive.
coltype:
SQLAlchemy column type, in either of these formats: (a)
``Integer`` (of general type ``Type[TypeEngine]``?); (b)
``Integer()`` (of general type ``TypeEngine``).
colkwargs:
SQLAlchemy column arguments, as in
``Column(name, coltype, **colkwargs)``
comment_fmt:
Format string defining field comments. Substitutable
values are:
- ``{n}``: field number (from range).
- ``{s}``: ``comment_strings[i]``, where ``i`` is a zero-based
index as defined as above, or "" if out of range.
comment_strings:
see ``comment_fmt``
minimum:
minimum permitted value, or ``None``
maximum:
maximum permitted value, or ``None``
pv:
list of permitted values, or ``None``
"""
colkwargs = {} if colkwargs is None else colkwargs # type: Dict[str, Any]
comment_strings = comment_strings or []
for n in range(start, end + 1):
nstr = str(n)
i = n - start
colname = prefix + nstr + suffix
if comment_fmt:
s = ""
if 0 <= i < len(comment_strings):
s = comment_strings[i] or ""
colkwargs["comment"] = comment_fmt.format(n=n, s=s)
if minimum is not None or maximum is not None or pv is not None:
colkwargs[COLATTR_PERMITTED_VALUE_CHECKER] = PermittedValueChecker(
minimum=minimum, maximum=maximum, permitted_values=pv
)
setattr(
cls, colname, camcops_column(colname, coltype, **colkwargs)
)
else:
setattr(cls, colname, Column(colname, coltype, **colkwargs))
# =============================================================================
# TaskDescendant
# =============================================================================
[docs]class TaskDescendant(object):
"""
Information mixin for sub-tables that can be traced back to a class. Used
to denormalize the database for export in some circumstances.
Not used for the Blob class, which has no reasonable way of tracing itself
back to a given task if it is used by a task's ancillary tables rather than
a primary task row.
"""
[docs] @classmethod
def task_ancestor_class(cls) -> Optional[Type["Task"]]:
"""
Returns the class of the ancestral task.
If the descendant can descend from lots of types of task (rare; only
applies to :class:`camcops_server.cc_modules.cc_blob.Blob` and
:class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`),
returns ``None``.
"""
raise NotImplementedError
[docs] @classmethod
def task_ancestor_might_have_patient(cls) -> bool:
"""
Does this object have a single task ancestor, that is not anonymous?
"""
taskcls = cls.task_ancestor_class()
if not taskcls:
return True # e.g. Blob, ExtraSummaryTable
return not taskcls.is_anonymous
[docs] def task_ancestor_server_pk(self) -> Optional[int]:
"""
Returns the server PK of the ancestral task.
Note that this is an export-time calculation; the client may update its
task rows without updating its descendant rows (so server PKs change
whilst client IDs don't).
"""
task = self.task_ancestor()
if not task:
return None
return task.pk
[docs] def task_ancestor(self) -> Optional["Task"]:
"""
Returns the specific ancestor task of this object.
"""
raise NotImplementedError
[docs] def task_ancestor_patient(self) -> Optional["Patient"]:
"""
Returns the associated patient, if there is one.
"""
task = self.task_ancestor()
return task.patient if task else None
[docs] @classmethod
def extra_task_xref_columns(cls) -> List[Column]:
"""
Returns extra columns used to cross-reference this
:class:`TaskDescendant` to its ancestor task, in certain export
formats (``DB_PATIENT_ID_PER_ROW``).
"""
return [
Column(
EXTRA_TASK_TABLENAME_FIELD,
TableNameColType,
comment=EXTRA_COMMENT_PREFIX + "Table name of ancestor task",
),
Column(
EXTRA_TASK_SERVER_PK_FIELD,
Integer,
comment=EXTRA_COMMENT_PREFIX + "Server PK of ancestor task",
),
]