Source code for camcops_server.cc_modules.cc_dummy_database

#!/usr/bin/env python

"""
camcops_server/cc_modules/cc_dummy_database.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Functions for dummy database creation for manual testing.**

"""

import logging
import random
from typing import Optional, TYPE_CHECKING

from cardinal_pythonlib.datetimefunc import (
    convert_datetime_to_utc,
    format_datetime,
)
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.nhs import generate_random_nhs_number
from faker import Faker
import pendulum
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.session import sessionmaker
from sqlalchemy.sql.expression import func
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import (
    Boolean,
    Date,
    Float,
    Integer,
    String,
    UnicodeText,
)

from camcops_server.cc_modules.cc_constants import DateFormat
from camcops_server.cc_modules.cc_db import TASK_FREQUENT_AND_FK_FIELDS
from camcops_server.cc_modules.cc_device import Device
from camcops_server.cc_modules.cc_group import Group
from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition
from camcops_server.cc_modules.cc_patient import Patient
from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
from camcops_server.cc_modules.cc_sqla_coltypes import (
    COLATTR_PERMITTED_VALUE_CHECKER,
    PendulumDateTimeAsIsoTextColType,
)

from camcops_server.cc_modules.cc_task import Task
from camcops_server.cc_modules.cc_user import User
from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION


if TYPE_CHECKING:
    from sqlalchemy.orm import Session as SqlASession
    from camcops_server.cc_modules.cc_config import CamcopsConfig
    from camcops_server.cc_modules.cc_db import GenericTabletRecordMixin

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# DummyDataInserter
# =============================================================================


[docs]class DummyDataInserter: """ Class to insert random data (within constraints) to tasks and other objects. It does not touch an actual database, so its methods can be used for free-floating items. """ DEFAULT_MIN_FLOAT = 0 DEFAULT_MAX_FLOAT = 1000 DEFAULT_MIN_INTEGER = 0 DEFAULT_MAX_INTEGER = 1000
[docs] def __init__(self) -> None: self.faker = Faker("en_GB")
@staticmethod def column_is_q_field(column: Column) -> bool: if column.name.startswith("_"): return False if column.name in TASK_FREQUENT_AND_FK_FIELDS: # It's that or TASK_FREQUENT_FIELDS. return False return True
[docs] def fill_in_task_fields(self, task: Task) -> None: """ Inserts random data into a task (within any known constraints). """ # noinspection PyUnresolvedReferences for column in task.__table__.columns: if not self.column_is_q_field(column): continue if isinstance(column.type, Integer): self.set_integer_field(task, column) continue if isinstance(column.type, Float): self.set_float_field(task, column) continue if isinstance(column.type, Boolean): self.set_bool_field(task, column) continue if isinstance(column.type, Date): self.set_date_field(task, column) continue if isinstance(column.type, PendulumDateTimeAsIsoTextColType): self.set_datetime_field(task, column) continue if isinstance(column.type, UnicodeText): self.set_unicode_text_field(task, column) if isinstance(column.type, String): # covers String, Text, UnicodeText self.set_string_field(task, column)
def set_integer_field(self, task: Task, column: Column) -> None: setattr(task, column.name, self.get_valid_integer_for_field(column)) def set_float_field(self, task: Task, column: Column) -> None: setattr(task, column.name, self.get_valid_float_for_field(column)) def set_bool_field(self, task: Task, column: Column) -> None: setattr(task, column.name, self.faker.random.choice([False, True])) def set_date_field(self, task: Task, column: Column) -> None: setattr(task, column.name, self.faker.date_object()) def set_datetime_field(self, task: Task, column: Column) -> None: setattr(task, column.name, self.faker.date_time()) def set_unicode_text_field(self, task: Task, column: Column) -> None: setattr(task, column.name, self.faker.text()) def set_string_field(self, task: Task, column: Column) -> None: setattr(task, column.name, self.get_valid_string_for_field(column)) def get_valid_integer_for_field(self, column: Column) -> int: min_value = self.DEFAULT_MIN_INTEGER max_value = self.DEFAULT_MAX_INTEGER value_checker = getattr(column, COLATTR_PERMITTED_VALUE_CHECKER, None) if value_checker is not None: if value_checker.permitted_values is not None: return self.faker.random.choice(value_checker.permitted_values) if value_checker.minimum is not None: min_value = value_checker.minimum if value_checker.maximum is not None: max_value = value_checker.maximum return self.faker.random.randint(min_value, max_value) def get_valid_float_for_field(self, column: Column) -> float: min_value = self.DEFAULT_MIN_FLOAT max_value = self.DEFAULT_MAX_FLOAT value_checker = getattr(column, COLATTR_PERMITTED_VALUE_CHECKER, None) if value_checker is not None: if value_checker.permitted_values is not None: return self.faker.random.choice(value_checker.permitted_values) if value_checker.minimum is not None: min_value = value_checker.minimum if value_checker.maximum is not None: max_value = value_checker.maximum return self.faker.random.uniform(min_value, max_value) def get_valid_string_for_field(self, column: Column) -> str: value_checker = getattr(column, COLATTR_PERMITTED_VALUE_CHECKER, None) if value_checker is not None: if value_checker.permitted_values is not None: return self.faker.random.choice(value_checker.permitted_values) text = self.faker.text() if column.type.length is None: return text return text[: column.type.length]
# ============================================================================= # DummyDataFactory # =============================================================================
[docs]class DummyDataFactory(DummyDataInserter): """ Factory to insert random data (within constraints) to tasks and other objects in a dummy database. Unlike its parent, this concerns itself with an actual data. """ FIRST_PATIENT_ID = 10001 NUM_PATIENTS = 5
[docs] def __init__(self, cfg: "CamcopsConfig") -> None: super().__init__() engine = cfg.get_sqla_engine() self.dbsession = sessionmaker()(bind=engine) # type: SqlASession self.era_time = pendulum.now() self.era_time_utc = convert_datetime_to_utc(self.era_time) self.era = format_datetime(self.era_time, DateFormat.ISO8601) self.group = None # type: Optional[Group] self.user = None # type: Optional[User] self.device = None # type: Optional[Device] self.nhs_iddef = None # type: Optional[IdNumDefinition]
def add_data(self) -> None: # noinspection PyTypeChecker next_id = self.next_id(Group.id) self.group = Group() self.group.name = f"dummygroup{next_id}" self.group.description = "Dummy group" self.group.upload_policy = "sex AND anyidnum" self.group.finalize_policy = "sex AND idnum1001" self.dbsession.add(self.group) self.dbsession.commit() # sets PK fields self.user = User.get_system_user(self.dbsession) self.user.upload_group_id = self.group.id self.device = self.get_device(self.dbsession) self.dbsession.commit() self.nhs_iddef = IdNumDefinition( which_idnum=1001, description="NHS number (TEST)", short_description="NHS#", hl7_assigning_authority="NHS", hl7_id_type="NHSN", ) self.dbsession.add(self.nhs_iddef) try: self.dbsession.commit() except IntegrityError: self.dbsession.rollback() for patient_id in range( self.FIRST_PATIENT_ID, self.FIRST_PATIENT_ID + self.NUM_PATIENTS ): Faker.seed(patient_id) self.add_patient(patient_id) log.info(f"Adding tasks for patient {patient_id}") Faker.seed() self.add_tasks(patient_id) # noinspection PyMethodMayBeStatic def get_device(self, dbsession: "SqlASession") -> "Device": dummy_device_name = "dummy_device" device = Device.get_device_by_name(dbsession, dummy_device_name) if device is None: device = Device() device.name = dummy_device_name device.friendly_name = "Dummy tablet device" device.registered_by_user = User.get_system_user(dbsession) device.when_registered_utc = pendulum.DateTime.utcnow() device.camcops_version = CAMCOPS_SERVER_VERSION dbsession.add(device) dbsession.flush() # So that we can use the PK elsewhere return device def add_patient(self, patient_id: int) -> Patient: log.info(f"Adding patient {patient_id}") patient = Patient() patient.id = patient_id self.apply_standard_db_fields(patient) patient.sex = self.faker.random.choices( ["M", "F", "X"], weights=[49.8, 49.8, 0.4] )[0] if patient.sex == "M": patient.forename = self.faker.first_name_male() elif patient.sex == "F": patient.forename = self.faker.first_name_female() else: patient.forename = self.faker.first_name()[:1] patient.surname = self.faker.last_name() # Faker date_of_birth calculates from the current time so gives # different results on different days. By fixing the dates we get # consistent results but our population ages over time. patient.dob = self.faker.date_between_dates( date_start=pendulum.date(1900, 1, 1), date_end=pendulum.date(2020, 1, 1), ) self.dbsession.add(patient) self.add_patient_idnum(patient_id) self.dbsession.commit() return patient # noinspection PyTypeChecker def add_patient_idnum(self, patient_id: int) -> None: next_id = self.next_id(PatientIdNum.id) patient_idnum = PatientIdNum() patient_idnum.id = next_id self.apply_standard_db_fields(patient_idnum) patient_idnum.patient_id = patient_id patient_idnum.which_idnum = self.nhs_iddef.which_idnum # Always create the same NHS number for each patient. # Uses a different random object to faker. # Restores the master RNG state afterwards. old_random_state = random.getstate() random.seed(patient_id) patient_idnum.idnum_value = generate_random_nhs_number() random.setstate(old_random_state) self.dbsession.add(patient_idnum) def add_tasks(self, patient_id: int): for cls in Task.all_subclasses_by_tablename(): task = cls() task.id = self.next_id(cls.id) self.apply_standard_task_fields(task) if task.has_patient: task.patient_id = patient_id self.fill_in_task_fields(task) self.dbsession.add(task) self.dbsession.commit() def next_id(self, column: Column) -> int: max_id = self.dbsession.query(func.max(column)).scalar() if max_id is None: return 1 return max_id + 1
[docs] def apply_standard_task_fields(self, task: Task) -> None: """ Writes some default values to an SQLAlchemy ORM object representing a task. """ self.apply_standard_db_fields(task) task.when_created = self.era_time
[docs] def apply_standard_db_fields( self, obj: "GenericTabletRecordMixin" ) -> None: """ Writes some default values to an SQLAlchemy ORM object representing a record uploaded from a client (tablet) device. """ obj._device_id = self.device.id obj._era = self.era obj._group_id = self.group.id obj._current = True obj._adding_user_id = self.user.id obj._when_added_batch_utc = self.era_time_utc