Source code for camcops_server.cc_modules.cc_taskfilter

#!/usr/bin/env python

"""
camcops_server/cc_modules/cc_taskfilter.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Representation of filtering criteria for tasks.**

"""

import datetime
from enum import Enum
import logging
from typing import Dict, List, Optional, Type, TYPE_CHECKING, Union

from cardinal_pythonlib.datetimefunc import convert_datetime_to_utc
from cardinal_pythonlib.json.serialize import register_class_for_json
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.reprfunc import auto_repr
from cardinal_pythonlib.sqlalchemy.list_types import (
    IntListType,
    StringListType,
)
from pendulum import DateTime as Pendulum
from sqlalchemy.orm import Query, reconstructor
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.expression import and_, or_
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import Boolean, Date, Integer

from camcops_server.cc_modules.cc_cache import cache_region_static, fkg
from camcops_server.cc_modules.cc_device import Device
from camcops_server.cc_modules.cc_group import Group
from camcops_server.cc_modules.cc_patient import Patient
from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
from camcops_server.cc_modules.cc_sqla_coltypes import (
    PendulumDateTimeAsIsoTextColType,
    IdNumReferenceListColType,
    PatientNameColType,
    SexColType,
)
from camcops_server.cc_modules.cc_sqlalchemy import Base
from camcops_server.cc_modules.cc_task import (
    tablename_to_task_class_dict,
    Task,
)
from camcops_server.cc_modules.cc_taskindex import PatientIdNumIndexEntry
from camcops_server.cc_modules.cc_user import User

if TYPE_CHECKING:
    from sqlalchemy.sql.elements import ColumnElement
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_simpleobjects import IdNumReference

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Sorting helpers
# =============================================================================


[docs]class TaskClassSortMethod(Enum): """ Enum to represent ways to sort task types (classes). """ NONE = 0 TABLENAME = 1 SHORTNAME = 2 LONGNAME = 3
[docs]def sort_task_classes_in_place( classlist: List[Type[Task]], sortmethod: TaskClassSortMethod, req: "CamcopsRequest" = None, ) -> None: """ Sort a list of task classes in place. Args: classlist: the list of task classes sortmethod: a :class:`TaskClassSortMethod` enum req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` """ if sortmethod == TaskClassSortMethod.TABLENAME: classlist.sort(key=lambda c: c.tablename) elif sortmethod == TaskClassSortMethod.SHORTNAME: classlist.sort(key=lambda c: c.shortname) elif sortmethod == TaskClassSortMethod.LONGNAME: assert req is not None classlist.sort(key=lambda c: c.longname(req))
# ============================================================================= # Cache task class mapping # ============================================================================= # Function, staticmethod, classmethod? # https://stackoverflow.com/questions/8108688/in-python-when-should-i-use-a-function-instead-of-a-method # noqa # https://stackoverflow.com/questions/11788195/module-function-vs-staticmethod-vs-classmethod-vs-no-decorators-which-idiom-is # noqa # https://stackoverflow.com/questions/15017734/using-static-methods-in-python-best-practice # noqa
[docs]def task_classes_from_table_names( tablenames: List[str], sortmethod: TaskClassSortMethod = TaskClassSortMethod.NONE, ) -> List[Type[Task]]: """ Transforms a list of task base tablenames into a list of task classes, appropriately sorted. Args: tablenames: list of task base table names sortmethod: a :class:`TaskClassSortMethod` enum Returns: a list of task classes, in the order requested Raises: :exc:`KeyError` if a table name is invalid """ assert sortmethod != TaskClassSortMethod.LONGNAME d = tablename_to_task_class_dict() classes = [] # type: List[Type[Task]] for tablename in tablenames: cls = d[tablename] classes.append(cls) sort_task_classes_in_place(classes, sortmethod) return classes
[docs]@cache_region_static.cache_on_arguments(function_key_generator=fkg) def all_tracker_task_classes() -> List[Type[Task]]: """ Returns a list of all task classes that provide tracker information. """ return [ cls for cls in Task.all_subclasses_by_shortname() if cls.provides_trackers ]
# ============================================================================= # Define a filter to apply to tasks # =============================================================================
[docs]class TaskFilter(Base): """ SQLAlchemy ORM object representing task filter criteria. """ __tablename__ = "_task_filters" # Lots of these could be changed into lists; for example, filtering to # multiple devices, multiple users, multiple text patterns. For # AND-joining, there is little clear benefit (one could always AND-join # multiple filters with SQL). For OR-joining, this is more useful. # - surname: use ID numbers instead; not very likely to have >1 surname # - forename: ditto # - DOB: ditto # - sex: just eliminate the filter if you don't care about sex # - task_types: needs a list # - device_id: might as well make it a list # - user_id: might as well make it a list # - group_id: might as well make it a list # - start_datetime: single only # - end_datetime: single only # - text_contents: might as well make it a list # - ID numbers: a list, joined with OR. id = Column( "id", Integer, primary_key=True, autoincrement=True, index=True, comment="Task filter ID (arbitrary integer)", ) # Task type filters task_types = Column( "task_types", StringListType, comment="Task filter: task type(s), as CSV list of table names", ) tasks_offering_trackers_only = Column( "tasks_offering_trackers_only", Boolean, comment="Task filter: restrict to tasks offering trackers only?", ) tasks_with_patient_only = Column( "tasks_with_patient_only", Boolean, comment="Task filter: restrict to tasks with a patient (non-anonymous " "tasks) only?", ) # Patient-related filters surname = Column( "surname", PatientNameColType, comment="Task filter: surname" ) forename = Column( "forename", PatientNameColType, comment="Task filter: forename" ) dob = Column( "dob", Date, comment="Task filter: DOB" ) # type: Optional[datetime.date] sex = Column("sex", SexColType, comment="Task filter: sex") idnum_criteria = Column( # new in v2.0.1 "idnum_criteria", IdNumReferenceListColType, comment="ID filters as JSON; the ID number definitions are joined " "with OR", ) # Other filters device_ids = Column( "device_ids", IntListType, comment="Task filter: source device ID(s), as CSV", ) adding_user_ids = Column( "user_ids", IntListType, comment="Task filter: adding (uploading) user ID(s), as CSV", ) group_ids = Column( "group_ids", IntListType, comment="Task filter: group ID(s), as CSV" ) start_datetime = Column( "start_datetime_iso8601", PendulumDateTimeAsIsoTextColType, comment="Task filter: start date/time (UTC as ISO8601)", ) # type: Union[None, Pendulum, datetime.datetime] end_datetime = Column( "end_datetime_iso8601", PendulumDateTimeAsIsoTextColType, comment="Task filter: end date/time (UTC as ISO8601)", ) # type: Union[None, Pendulum, datetime.datetime] # Implemented on the Python side for indexed lookup: text_contents = Column( "text_contents", StringListType, comment="Task filter: filter text fields", ) # task must contain ALL the strings in AT LEAST ONE of its text columns # Implemented on the Python side for non-indexed lookup: complete_only = Column( "complete_only", Boolean, comment="Task filter: task complete?" ) def __init__(self) -> None: # We need to initialize these explicitly, because if we create an # instance via "x = TaskFilter()", they will be initialized to None, # without any recourse to our database to-and-fro conversion code for # each fieldtype. # (If we load from a database, things will be fine.) self.idnum_criteria = [] # type: List[IdNumReference] self.device_ids = [] # type: List[int] self.adding_user_ids = [] # type: List[int] self.group_ids = [] # type: List[int] self.text_contents = [] # type: List[str] # ANYTHING YOU ADD BELOW HERE MUST ALSO BE IN init_on_load(). # Or call it, of course, but we like to keep on the happy side of the # PyCharm type checker. # Python-only filtering attributes (i.e. not saved to database) self.era = None # type: Optional[str] self.finalized_only = False # used for exports self.must_have_idnum_type = None # type: Optional[int] # Other Python-only attributes self._sort_method = TaskClassSortMethod.NONE self._task_classes = None # type: Optional[List[Type[Task]]]
[docs] @reconstructor def init_on_load(self): """ SQLAlchemy function to recreate after loading from the database. """ self.era = None # type: Optional[str] self.finalized_only = False self.must_have_idnum_type = None # type: Optional[int] self._sort_method = TaskClassSortMethod.NONE self._task_classes = None # type: Optional[List[Type[Task]]]
def __repr__(self) -> str: return auto_repr(self, with_addr=True)
[docs] def set_sort_method(self, sort_method: TaskClassSortMethod) -> None: """ Sets the sorting method for task types. """ assert sort_method != TaskClassSortMethod.LONGNAME, ( "If you want to use that sorting method, you need to save a " "request object, because long task names use translation" ) self._sort_method = sort_method
@property def task_classes(self) -> List[Type[Task]]: """ Return a list of task classes permitted by the filter. Uses caching, since the filter will be called repeatedly. """ if self._task_classes is None: self._task_classes = [] # type: List[Type[Task]] if self.task_types: starting_classes = task_classes_from_table_names( self.task_types ) else: starting_classes = Task.all_subclasses_by_shortname() skip_anonymous_tasks = self.skip_anonymous_tasks() for cls in starting_classes: if ( self.tasks_offering_trackers_only and not cls.provides_trackers ): # Class doesn't provide trackers; skip continue if skip_anonymous_tasks and not cls.has_patient: # Anonymous task; skip continue if self.text_contents and not cls.get_text_filter_columns(): # Text filter and task has no text columns; skip continue self._task_classes.append(cls) sort_task_classes_in_place(self._task_classes, self._sort_method) return self._task_classes
[docs] def skip_anonymous_tasks(self) -> bool: """ Should we skip anonymous tasks? """ return self.tasks_with_patient_only or self.any_patient_filtering()
[docs] def offers_all_task_types(self) -> bool: """ Does this filter offer every single task class? Used for efficiency when using indexes. (Since ignored.) """ if self.tasks_offering_trackers_only: return False if self.skip_anonymous_tasks(): return False if not self.task_types: return True return set(self.task_classes) == set(Task.all_subclasses_by_shortname)
[docs] def offers_all_non_anonymous_task_types(self) -> bool: """ Does this filter offer every single non-anonymous task class? Used for efficiency when using indexes. """ offered_task_classes = self.task_classes for taskclass in Task.all_subclasses_by_shortname(): if taskclass.is_anonymous: continue if taskclass not in offered_task_classes: return False return True
@property def task_tablename_list(self) -> List[str]: """ Returns the base table names for all task types permitted by the filter. """ return [cls.__tablename__ for cls in self.task_classes]
[docs] def any_patient_filtering(self) -> bool: """ Is some sort of patient filtering being applied? """ return ( bool(self.surname) or bool(self.forename) or (self.dob is not None) or bool(self.sex) or bool(self.idnum_criteria) )
[docs] def any_specific_patient_filtering(self) -> bool: """ Are there filters that would restrict to one or a few patients? (Differs from :func:`any_patient_filtering` with respect to sex.) """ return ( bool(self.surname) or bool(self.forename) or self.dob is not None or bool(self.idnum_criteria) )
[docs] def get_only_iddef(self) -> Optional["IdNumReference"]: """ If a single ID number type/value restriction is being applied, return it, as an :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`. Otherwise, return ``None``. """ if len(self.idnum_criteria) != 1: return None return self.idnum_criteria[0]
[docs] def get_group_names(self, req: "CamcopsRequest") -> List[str]: """ Get the names of any groups to which we are restricting. """ groups = ( req.dbsession.query(Group) .filter(Group.id.in_(self.group_ids)) .all() ) # type: List[Group] return [g.name if g and g.name else "" for g in groups]
[docs] def get_user_names(self, req: "CamcopsRequest") -> List[str]: """ Get the usernames of any uploading users to which we are restricting. """ users = ( req.dbsession.query(User) .filter(User.id.in_(self.adding_user_ids)) .all() ) # type: List[User] return [u.username if u and u.username else "" for u in users]
[docs] def get_device_names(self, req: "CamcopsRequest") -> List[str]: """ Get the names of any devices to which we are restricting. """ devices = ( req.dbsession.query(Device) .filter(Device.id.in_(self.device_ids)) .all() ) # type: List[Device] return [d.name if d and d.name else "" for d in devices]
[docs] def clear(self) -> None: """ Clear all parts of the filter. """ self.task_types = [] # type: List[str] self.surname = None self.forename = None self.dob = None # type: Optional[datetime.date] self.sex = None self.idnum_criteria = [] # type: List[IdNumReference] self.device_ids = [] # type: List[int] self.adding_user_ids = [] # type: List[int] self.group_ids = [] # type: List[int] self.start_datetime = ( None ) # type: Union[None, Pendulum, datetime.datetime] self.end_datetime = ( None ) # type: Union[None, Pendulum, datetime.datetime] self.text_contents = [] # type: List[str] self.complete_only = None # type: Optional[bool]
[docs] def dates_inconsistent(self) -> bool: """ Are inconsistent dates specified, such that no tasks should be returned? """ return ( self.start_datetime and self.end_datetime and self.end_datetime < self.start_datetime )
[docs] def filter_query_by_patient(self, q: Query, via_index: bool) -> Query: """ Restricts an query that has *already been joined* to the :class:`camcops_server.cc_modules.cc_patient.Patient` class, according to the patient filtering criteria. Args: q: the starting SQLAlchemy ORM Query via_index: If ``True``, the query relates to a :class:`camcops_server.cc_modules.cc_taskindex.TaskIndexEntry` and we should restrict it according to the :class:`camcops_server.cc_modules.cc_taskindex.PatientIdNumIndexEntry` class. If ``False``, the query relates to a :class:`camcops_server.cc_modules.cc_taskindex.Task` and we should restrict according to :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum`. Returns: a revised Query """ if self.surname: q = q.filter(func.upper(Patient.surname) == self.surname.upper()) if self.forename: q = q.filter(func.upper(Patient.forename) == self.forename.upper()) if self.dob is not None: q = q.filter(Patient.dob == self.dob) if self.sex: q = q.filter(func.upper(Patient.sex) == self.sex.upper()) if self.idnum_criteria: id_filter_parts = [] # type: List[ColumnElement] if via_index: q = q.join(PatientIdNumIndexEntry) # "Specify possible ID number values" for iddef in self.idnum_criteria: id_filter_parts.append( and_( PatientIdNumIndexEntry.which_idnum == iddef.which_idnum, # noqa PatientIdNumIndexEntry.idnum_value == iddef.idnum_value, ) ) # Use OR (disjunction) of the specified values: q = q.filter(or_(*id_filter_parts)) # "Must have a value for a given ID number type" if self.must_have_idnum_type: # noinspection PyComparisonWithNone,PyPep8 q = q.filter( and_( PatientIdNumIndexEntry.which_idnum == self.must_have_idnum_type, # noqa PatientIdNumIndexEntry.idnum_value != None, # noqa: E711 ) ) else: # q = q.join(PatientIdNum) # fails q = q.join(Patient.idnums) # "Specify possible ID number values" for iddef in self.idnum_criteria: id_filter_parts.append( and_( PatientIdNum.which_idnum == iddef.which_idnum, PatientIdNum.idnum_value == iddef.idnum_value, ) ) # Use OR (disjunction) of the specified values: q = q.filter(or_(*id_filter_parts)) # "Must have a value for a given ID number type" if self.must_have_idnum_type: # noinspection PyComparisonWithNone,PyPep8 q = q.filter( and_( PatientIdNum.which_idnum == self.must_have_idnum_type, PatientIdNum.idnum_value != None, # noqa: E711 ) ) return q
@property def start_datetime_utc(self) -> Optional[Pendulum]: if not self.start_datetime: return None return convert_datetime_to_utc(self.start_datetime) @property def end_datetime_utc(self) -> Optional[Pendulum]: if not self.end_datetime: return None return convert_datetime_to_utc(self.end_datetime)
def encode_task_filter(taskfilter: TaskFilter) -> Dict: return { "task_types": taskfilter.task_types, "group_ids": taskfilter.group_ids, } # noinspection PyUnusedLocal def decode_task_filter(d: Dict, cls: Type) -> TaskFilter: taskfilter = TaskFilter() taskfilter.task_types = d["task_types"] taskfilter.group_ids = d["group_ids"] return taskfilter register_class_for_json( cls=TaskFilter, obj_to_dict_fn=encode_task_filter, dict_to_obj_fn=decode_task_filter, )