Source code for camcops_server.cc_modules.cc_dump

"""
camcops_server/cc_modules/cc_dump.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Methods for providing a dump of data from the server to the web user.**

"""

import logging
from typing import (
    Any,
    Dict,
    Generator,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    TYPE_CHECKING,
    Union,
)

from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.sqlalchemy.orm_inspect import (
    gen_columns,
    gen_orm_classes_from_base,
    walk_orm_tree,
)
from sqlalchemy.exc import CompileError
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import Session as SqlASession
from sqlalchemy.sql.schema import Column, MetaData, Table

from camcops_server.cc_modules.cc_blob import Blob
from camcops_server.cc_modules.cc_db import (
    GenericTabletRecordMixin,
    TaskDescendant,
)
from camcops_server.cc_modules.cc_device import Device
from camcops_server.cc_modules.cc_email import Email
from camcops_server.cc_modules.cc_exportmodels import (
    ExportedTask,
    ExportedTaskEmail,
    ExportedTaskFileGroup,
    ExportedTaskHL7Message,
)
from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
from camcops_server.cc_modules.cc_group import Group, group_group_table
from camcops_server.cc_modules.cc_membership import UserGroupMembership
from camcops_server.cc_modules.cc_patient import Patient
from camcops_server.cc_modules.cc_patientidnum import (
    all_extra_id_columns,
    PatientIdNum,
)
from camcops_server.cc_modules.cc_sqla_coltypes import CamcopsColumn
from camcops_server.cc_modules.cc_task import Task
from camcops_server.cc_modules.cc_user import User

if TYPE_CHECKING:
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_summaryelement import ExtraSummaryTable
    from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Constants
# =============================================================================

# Restrict specified tables to certain columns only:
DUMP_ONLY_COLNAMES = {  # mapping of tablename : list_of_column_names
    Device.__tablename__: ["camcops_version", "friendly_name", "id", "name"],
    User.__tablename__: ["fullname", "id", "username"],
}
# Drop specific columns from certain tables:
DUMP_DROP_COLNAMES = {}  # mapping of tablename : list_of_column_names
# List of columns to be skipped regardless of table:
DUMP_SKIP_COLNAMES = [
    # We restrict to current records only, so many of these are irrelevant:
    "_addition_pending",
    "_forcibly_preserved",
    "_manually_erased",
    "_manually_erased_at",
    "_manually_erasing_user_id",
    "_move_off_tablet",
    "_removal_pending",
    "_removing_user_id",
    "_successor_pk",
    "_when_removed_batch_utc",
    "_when_removed_exact",
]
DUMP_SKIP_RELNAMES = [
    # List of *relationship* names to ignore
    "_manually_erasing_user",
    "_removing_user",
]
# List of table names to be skipped at all times:
DUMP_SKIP_TABLES = [
    # We don't have to list all admin tables here; we process the dump starting
    # with tasks, so only things that have ORM relationships to a task might
    # feature. (The Email/ExportedTask* set don't, so this is just caution in
    # case we add a relationship later!)
    Email.__tablename__,
    ExportedTask.__tablename__,
    ExportedTaskEmail.__tablename__,
    ExportedTaskFileGroup.__tablename__,
    ExportedTaskHL7Message.__tablename__,
    ExportRecipient.__tablename__,
    group_group_table.name,
    UserGroupMembership.__tablename__,
]
# Tables for which no relationships will be traversed:
DUMP_SKIP_ALL_RELS_FOR_TABLES = [Group.__tablename__]
FOREIGN_KEY_CONSTRAINTS_IN_DUMP = False
# ... the keys will be present, but should we try to enforce constraints?


# =============================================================================
# Handy place to hold the controlling information
# =============================================================================


[docs]class DumpController(object): """ A controller class that manages the copying (dumping) of information from our database to another SQLAlchemy :class:`Engine`/:class:`Session`. """
[docs] def __init__( self, dst_engine: Engine, dst_session: SqlASession, export_options: "TaskExportOptions", req: "CamcopsRequest", ) -> None: """ Args: dst_engine: destination SQLAlchemy Engine dst_session: destination SQLAlchemy Session export_options: :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions` req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` """ # noqa self.dst_engine = dst_engine self.dst_session = dst_session self.export_options = export_options self.req = req # We start with blank metadata. self.dst_metadata = MetaData() # Tables we are inserting into the destination database: self.dst_tables = {} # type: Dict[str, Table] # ... note that creating a Table() for a given SQLAlchemy metadata is # permitted only once, so we add to self.dst_tables as soon # as we create that. # Tables we've created: self.tablenames_created = set() # type: Set[str] # Tables we've processed, though we may ignore them: self.tablenames_seen = set() # type: Set[str] # ORM objects we've visited: self.instances_seen = set() # type: Set[object] if export_options.db_make_all_tables_even_empty: self._create_all_dest_tables()
def _create_all_dest_tables(self) -> None: """ Creates all tables in the destination database, even ones that may not be used. """ log.debug("Creating all destination tables...") for table in self.gen_all_dest_tables(): self._create_dest_table(table) log.debug("... all destination tables created.")
[docs] def gen_all_dest_tables(self) -> Generator[Table, None, None]: """ Generates all destination tables. """ tablenames_seen = set() # type: Set[str] for cls in gen_orm_classes_from_base( GenericTabletRecordMixin ): # type: Type[GenericTabletRecordMixin] instance = cls() for table in self.gen_all_dest_tables_for_obj(instance): if table.name in tablenames_seen: continue tablenames_seen.add(table.name) yield table
[docs] def gen_all_dest_tables_for_obj( self, src_obj: object ) -> Generator[Table, None, None]: """ Generates all destination tables for an object. """ # Main table yield self.get_dest_table_for_src_object(src_obj) # Additional tables if isinstance(src_obj, Task): add_extra_id_cols = ( self.export_options.db_patient_id_in_each_row and not src_obj.is_anonymous ) estables = src_obj.get_all_summary_tables(self.req) for est in estables: yield self.get_dest_table_for_est( est, add_extra_id_cols=add_extra_id_cols )
[docs] def gen_all_dest_columns( self, ) -> Generator[Union[Column, CamcopsColumn], None, None]: """ Generates all destination columns. """ for table in self.gen_all_dest_tables(): for col in table.columns: yield col
[docs] def consider_object(self, src_obj: object) -> None: """ Think about an SQLAlchemy ORM object. If it comes from a table we want dumped, add this object to the dump. """ # noinspection PyUnresolvedReferences src_table = src_obj.__table__ # type: Table src_tablename = src_table.name if src_tablename not in self.tablenames_seen: # If we encounter a table we've not seen, offer our "table decider" # the opportunity to add it to the metadata and create the table. self._add_dump_table_for_src_object(src_obj) # If this table is going into the destination, copy the object # (and maybe remove columns from it, or add columns to it). if src_tablename in self.dst_tables and not self._dump_skip_table( src_tablename ): self._copy_object_to_dump(src_obj)
@staticmethod def _merits_extra_id_num_columns( obj: object, ) -> Tuple[bool, Optional[Patient]]: """ Is the source object one that would support the addition of extra ID number information if the export option ``DB_PATIENT_ID_PER_ROW`` is set? If so, return the relevant patient. Args: obj: an SQLAlchemy ORM object Returns: tuple: ``(merits, patient)``, where ``merits`` is a ``bool`` (does it merit this?) and ``patient`` is a relevant :class:`camcops_server.cc_modules.cc_patient.Patient``, if found. It is also guaranteed that if a patient is returned, ``merits`` is ``True`` (but not guaranteed that if ``merits`` is true, that ``patient`` is not ``None``). """ if not isinstance(obj, GenericTabletRecordMixin): # Must be data that originated from the client. return False, None if isinstance(obj, PatientIdNum): # PatientIdNum already has this info. return False, None if isinstance(obj, Patient): return True, obj if isinstance(obj, Task): if obj.is_anonymous: # Anonymous tasks don't. return False, None return True, obj.patient if isinstance(obj, TaskDescendant): merits = obj.task_ancestor_might_have_patient() patient = obj.task_ancestor_patient() return merits, patient log.warning( f"_merits_extra_id_num_columns_if_requested: don't know " f"how to handle {obj!r}" ) return False, None
[docs] def get_dest_table_for_src_object(self, src_obj: object) -> Table: """ Produces the destination table for the source object. Args: src_obj: An SQLAlchemy ORM object. It will *not* be a :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`; those are handled instead by :meth:`_get_or_insert_summary_table`. Returns: an SQLAlchemy :class:`Table` """ # noinspection PyUnresolvedReferences src_table = src_obj.__table__ # type: Table tablename = src_table.name # Don't create it twice in the SQLAlchemy metadata. if tablename in self.dst_tables: return self.dst_tables[tablename] # Copy columns, dropping any we don't want, and dropping FK constraints dst_columns = [] # type: List[Column] for src_column in src_table.columns: # log.debug("trying {!r}", src_column.name) if self._dump_skip_column(tablename, src_column.name): # log.debug("... skipping {!r}", src_column.name) continue # You can't add the source column directly; you get # "sqlalchemy.exc.ArgumentError: Column object 'ccc' already # assigned to Table 'ttt'" copied_column = src_column.copy() copied_column.comment = src_column.comment # ... see SQLAlchemy trivial bug: # https://bitbucket.org/zzzeek/sqlalchemy/issues/4087/columncopy-doesnt-copy-comment-attribute # noqa if FOREIGN_KEY_CONSTRAINTS_IN_DUMP: copied_column.foreign_keys = set( fk.copy() for fk in src_column.foreign_keys ) log.warning( "NOT WORKING: foreign key commands not being " "emitted" ) # but # https://docs.sqlalchemy.org/en/latest/core/constraints.html # works fine under SQLite, even if the other table hasn't been # created yet. Does the table to which the FK refer have to be # in the metadata already? # That's quite possible, but I've not checked. # Would need to iterate through tables in dependency order, # like merge_db() does. else: # Probably blank already, as the copy() command only copies # non-constraint-bound ForeignKey objects, but to be sure: copied_column.foreign_keys = set() # ... type is: Set[ForeignKey] # if src_column.foreign_keys: # log.debug("Column {}, FKs {!r} -> {!r}", src_column.name, # src_column.foreign_keys, # copied_column.foreign_keys) dst_columns.append(copied_column) # Add extra columns? if self.export_options.db_include_summaries: if isinstance(src_obj, GenericTabletRecordMixin): for summary_element in src_obj.get_summaries(self.req): dst_columns.append( CamcopsColumn( summary_element.name, summary_element.coltype, exempt_from_anonymisation=True, comment=summary_element.decorated_comment, ) ) if self.export_options.db_patient_id_in_each_row: merits, _ = self._merits_extra_id_num_columns(src_obj) if merits: dst_columns.extend(all_extra_id_columns(self.req)) if isinstance(src_obj, TaskDescendant): dst_columns += src_obj.extra_task_xref_columns() dst_table = Table(tablename, self.dst_metadata, *dst_columns) # ... that modifies the metadata, so: self.dst_tables[tablename] = dst_table return dst_table
[docs] def get_dest_table_for_est( self, est: "ExtraSummaryTable", add_extra_id_cols: bool = False ) -> Table: """ Add an additional summary table to the dump, if it's not there already. Return the table (from the destination database). Args: est: a :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable` add_extra_id_cols: Add extra ID columns, for the ``DB_PATIENT_ID_PER_ROW`` export option? """ tablename = est.tablename if tablename in self.dst_tables: return self.dst_tables[tablename] columns = est.columns.copy() if add_extra_id_cols: columns.extend(all_extra_id_columns(self.req)) columns.extend(est.extra_task_xref_columns()) table = Table(tablename, self.dst_metadata, *columns) # ... that modifies the metadata, so: self.dst_tables[tablename] = table return table
def _add_dump_table_for_src_object(self, src_obj: object) -> None: """ - Mark the object's table as seen. - If we want it, add it to the metadata and execute a CREATE TABLE command. - We may translate the table en route. Args: src_obj: An SQLAlchemy ORM object. It will *not* be a :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`; those are handled instead by :meth:`_get_or_insert_summary_table`. """ # noinspection PyUnresolvedReferences src_table = src_obj.__table__ # type: Table tablename = src_table.name self.tablenames_seen.add(tablename) # Skip the table? if self._dump_skip_table(tablename): return # Get the table definition dst_table = self.get_dest_table_for_src_object(src_obj) # Create it self._create_dest_table(dst_table) def _create_dest_table(self, dst_table: Table) -> None: """ Creates a table in the destination database. """ tablename = dst_table.name if tablename in self.tablenames_created: return # don't create it twice # Create the table # log.debug("Adding table {!r} to dump output", tablename) # You have to use an engine, not a session, to create tables (or you # get "AttributeError: 'Session' object has no attribute # '_run_visitor'"). # However, you have to commit the session, or you get # "sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) # database is locked", since a session is also being used. self.dst_session.commit() dst_table.create(self.dst_engine) self.tablenames_created.add(tablename) def _copy_object_to_dump(self, src_obj: object) -> None: """ Copy the SQLAlchemy ORM object to the dump. """ # noinspection PyUnresolvedReferences src_table = src_obj.__table__ # type: Table adding_extra_ids = False patient = None # type: Optional[Patient] if self.export_options.db_patient_id_in_each_row: adding_extra_ids, patient = self._merits_extra_id_num_columns( src_obj ) # 1. Insert row for this object, potentially adding and removing # columns. tablename = src_table.name dst_table = self.dst_tables[tablename] assert dst_table.name == tablename row = {} # type: Dict[str, Any] # Copy columns, skipping any we don't want for attrname, column in gen_columns(src_obj): if self._dump_skip_column(tablename, column.name): continue row[column.name] = getattr(src_obj, attrname) # Any other columns to add for this table? if isinstance(src_obj, GenericTabletRecordMixin): if self.export_options.db_include_summaries: for summary_element in src_obj.get_summaries(self.req): row[summary_element.name] = summary_element.value if adding_extra_ids: if patient: patient.add_extra_idnum_info_to_row(row) if isinstance(src_obj, TaskDescendant): src_obj.add_extra_task_xref_info_to_row(row) try: self.dst_session.execute(dst_table.insert(row)) except CompileError: log.critical("\ndst_table:\n{}\nrow:\n{}", dst_table, row) raise # 2. If required, add extra tables/rows that this task wants to # offer (usually tables whose rows don't have a 1:1 correspondence # to the task or its ancillary objects). if isinstance(src_obj, Task): estables = src_obj.get_all_summary_tables(self.req) # ... includes SNOMED for est in estables: dst_summary_table = self._get_or_insert_summary_table( est, add_extra_id_cols=adding_extra_ids ) for row in est.rows: if patient: patient.add_extra_idnum_info_to_row(row) if adding_extra_ids: est.add_extra_task_xref_info_to_row(row) try: self.dst_session.execute(dst_summary_table.insert(row)) except CompileError: log.critical( "\ndst_summary_table:\n{}\nrow:\n{}", dst_table, row, ) raise def _get_or_insert_summary_table( self, est: "ExtraSummaryTable", add_extra_id_cols: bool = False ) -> Table: """ Add an additional summary table to the dump, if it's not there already. Return the table (from the destination database). Args: est: a :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable` add_extra_id_cols: Add extra ID columns, for the ``DB_PATIENT_ID_PER_ROW`` export option? """ tablename = est.tablename if tablename not in self.tablenames_created: table = self.get_dest_table_for_est( est, add_extra_id_cols=add_extra_id_cols ) self._create_dest_table(table) return self.dst_tables[tablename] def _dump_skip_table(self, tablename: str) -> bool: """ Should we skip this table (omit it from the dump)? """ if ( not self.export_options.include_blobs and tablename == Blob.__tablename__ ): return True if tablename in DUMP_SKIP_TABLES: return True return False @staticmethod def _dump_skip_column(tablename: str, columnname: str) -> bool: """ Should we skip this column (omit it from the dump)? """ if columnname in DUMP_SKIP_COLNAMES: return True if ( tablename in DUMP_ONLY_COLNAMES and columnname not in DUMP_ONLY_COLNAMES[tablename] ): return True if ( tablename in DUMP_DROP_COLNAMES and columnname in DUMP_DROP_COLNAMES[tablename] ): return True return False
# ============================================================================= # Copying stuff to a dump # =============================================================================
[docs]def copy_tasks_and_summaries( tasks: Iterable[Task], dst_engine: Engine, dst_session: SqlASession, export_options: "TaskExportOptions", req: "CamcopsRequest", ) -> None: """ Copy a set of tasks, and their associated related information (found by walking the SQLAlchemy ORM tree), to the dump. Args: tasks: tasks to copy dst_engine: destination SQLAlchemy Engine dst_session: destination SQLAlchemy Session export_options: :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions` req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` """ # noqa # How best to create the structure that's required? # # https://stackoverflow.com/questions/21770829/sqlalchemy-copy-schema-and-data-of-subquery-to-another-database # noqa # https://stackoverflow.com/questions/40155340/sqlalchemy-reflect-and-copy-only-subset-of-existing-schema # noqa # # - Should we attempt to copy the MetaData object? That seems extremely # laborious, since every ORM class is tied to it. Moreover, # MetaData.tables is an immutabledict, so we're not going to be editing # anything. Even if we cloned the MetaData, that's not going to give us # ORM classes to walk. # - Shall we operate at a lower level? That seems sensible. # - Given that... we don't need to translate the PKs at all, unlike # merge_db. # - Let's not create FK constraints explicitly. Most are not achievable # anyway (e.g. linking on device/era; omission of BLOBs). controller = DumpController( dst_engine=dst_engine, dst_session=dst_session, export_options=export_options, req=req, ) # We walk through all the objects. log.debug("Starting to copy tasks...") for startobj in tasks: log.debug("Processing task: {!r}", startobj) for src_obj in walk_orm_tree( startobj, seen=controller.instances_seen, skip_relationships_always=DUMP_SKIP_RELNAMES, skip_all_relationships_for_tablenames=DUMP_SKIP_ALL_RELS_FOR_TABLES, # noqa skip_all_objects_for_tablenames=DUMP_SKIP_TABLES, ): controller.consider_object(src_obj) log.debug("... finished copying tasks.")