#!/usr/bin/env python

"""
camcops_server/cc_modules/cc_taskcollection.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Classes to fetch tasks from the database as efficiently as possible.**

"""

from collections import OrderedDict
import datetime
from enum import Enum
import logging
from threading import Thread
from typing import (
    Dict,
    Generator,
    List,
    Optional,
    Tuple,
    Type,
    TYPE_CHECKING,
    Union,
)

from cardinal_pythonlib.json.serialize import (
    register_class_for_json,
    register_enum_for_json,
)
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.reprfunc import auto_repr, auto_str
from cardinal_pythonlib.sort import MINTYPE_SINGLETON, MinType
from kombu.serialization import dumps, loads
from pendulum import DateTime as Pendulum
from sqlalchemy.orm import Query
from sqlalchemy.orm.session import Session as SqlASession
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.expression import and_, exists, or_

from camcops_server.cc_modules.cc_constants import ERA_NOW
from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
from camcops_server.cc_modules.cc_task import (
    tablename_to_task_class_dict,
    Task,
)
from camcops_server.cc_modules.cc_taskfactory import (
    task_query_restricted_to_permitted_users,
)
from camcops_server.cc_modules.cc_taskfilter import TaskFilter
from camcops_server.cc_modules.cc_taskindex import TaskIndexEntry

if TYPE_CHECKING:
    from sqlalchemy.sql.elements import ClauseElement, ColumnElement
    from camcops_server.cc_modules.cc_request import CamcopsRequest

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Debugging options
# =============================================================================

DEBUG_QUERY_TIMING = False

if DEBUG_QUERY_TIMING:
    log.warning("Debugging options enabled!")


# =============================================================================
# Sorting helpers
# =============================================================================


def task_when_created_sorter(
    task: Task,
) -> Union[Tuple[Pendulum, datetime.datetime], MinType]:
    """
    Function to sort tasks by their creation date/time (with upload date/time
    as a tiebreak for consistent ordering).
    """
    # For sorting of tasks
    created = task.when_created
    # noinspection PyProtectedMember
    uploaded = task._when_added_batch_utc
    return MINTYPE_SINGLETON if created is None else (created, uploaded)


@register_enum_for_json
class TaskSortMethod(Enum):
    """
    Enum representing ways to sort tasks.
    """

    NONE = 0
    CREATION_DATE_ASC = 1
    CREATION_DATE_DESC = 2


def sort_tasks_in_place(
    tasklist: List[Task], sortmethod: TaskSortMethod
) -> None:
    """
    Sort a list of tasks, in place, according to ``sortmethod``.

    Args:
        tasklist: the list of tasks
        sortmethod: a :class:`TaskSortMethod` enum
    """
    # Sort?
    if sortmethod == TaskSortMethod.CREATION_DATE_ASC:
        tasklist.sort(key=task_when_created_sorter)
    elif sortmethod == TaskSortMethod.CREATION_DATE_DESC:
        tasklist.sort(key=task_when_created_sorter, reverse=True)
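

# Illustrative usage sketch (editorial addition, not part of the original
# CamCOPS source): sorting a list of already-fetched tasks newest-first with
# the helpers above. The function name is hypothetical.
def _example_sort_newest_first(tasks: List[Task]) -> List[Task]:
    """
    Sorts ``tasks`` in place so the most recently created task comes first,
    then returns the same list.
    """
    sort_tasks_in_place(tasks, TaskSortMethod.CREATION_DATE_DESC)
    return tasks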


# =============================================================================
# Parallel fetch helper
# =============================================================================
# - Why consider a parallel fetch?
#   Because a typical fetch might involve 27ms per query (as seen by Python;
#   less as seen by MySQL) but about 100 queries, for a not-very-large
#   database.
# - Initially UNSUCCESSFUL: even after tweaking pool_size=0 in create_engine()
#   to get round the SQLAlchemy error "QueuePool limit of size 5 overflow 10
#   reached", in the parallel code, a great many queries are launched, but
#   then something goes wrong and others are started but then block -- for
#   ages -- waiting for a spare database connection, or something.
# - Fixed that: I was not explicitly closing the sessions.
# - But then a major conceptual problem: anything to be lazy-loaded (e.g.
#   patient, but also patient ID, special note, BLOB...) will give this sort
#   of error: "DetachedInstanceError: Parent instance <Phq9 at 0x7fe6cce2d278>
#   is not bound to a Session; lazy load operation of attribute 'patient'
#   cannot proceed" -- for obvious reasons. And some of those operations are
#   only required on the final paginated task set, which requires aggregation
#   across all tasks.
#
# HOWEVER, the query time per table drops from ~27ms to 4-8ms if we disable
# eager loading (lazy="joined") of patients from tasks.


class FetchThread(Thread):
    """
    Thread to fetch tasks in parallel.

    CURRENTLY UNUSED.
    """

    def __init__(
        self,
        req: "CamcopsRequest",
        task_class: Type[Task],
        factory: "TaskCollection",
        **kwargs
    ) -> None:
        self.req = req
        self.task_class = task_class
        self.factory = factory
        self.error = False
        name = task_class.__tablename__
        super().__init__(name=name, target=None, **kwargs)

    def run(self) -> None:
        log.debug("Thread starting")
        dbsession = self.req.get_bare_dbsession()
        # noinspection PyBroadException
        try:
            # noinspection PyProtectedMember
            q = self.factory._make_query(dbsession, self.task_class)
            if q:
                tasks = q.all()  # type: List[Task]
                # https://stackoverflow.com/questions/6319207/are-lists-thread-safe  # noqa
                # https://stackoverflow.com/questions/6953351/thread-safety-in-pythons-dictionary  # noqa
                # http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm  # noqa
                # noinspection PyProtectedMember
                self.factory._tasks_by_class[self.task_class] = tasks
                log.debug("Thread finishing with results")
            else:
                log.debug("Thread finishing without results")
        except Exception:
            self.error = True
            log.error("Thread error")
        dbsession.close()


# =============================================================================
# Make a set of tasks, deferring work until things are needed
# =============================================================================


class TaskCollection(object):
    """
    Represent a potential or instantiated call to fetch tasks from the
    database.

    The caller may want them in a giant list (e.g. task viewer, CTVs), or
    split by task class (e.g. trackers).
    """

    def __init__(
        self,
        req: Optional["CamcopsRequest"],
        taskfilter: TaskFilter = None,
        as_dump: bool = False,
        sort_method_by_class: TaskSortMethod = TaskSortMethod.NONE,
        sort_method_global: TaskSortMethod = TaskSortMethod.NONE,
        current_only: bool = True,
        via_index: bool = True,
        export_recipient: "ExportRecipient" = None,
    ) -> None:
        """
        Args:
            req:
                The
                :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
                ``None`` should only be used as a parameter when serializing a
                :class:`TaskCollection` to the back-end.
            taskfilter:
                A :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`
                object that contains any restrictions we may want to apply.
                Must be supplied unless supplying ``export_recipient`` (in
                which case, must not be supplied).
            as_dump:
                Use the "dump" permissions rather than the "view" permissions?
            sort_method_by_class:
                How should we sort tasks within each task class?
            sort_method_global:
                How should we sort tasks overall (across all task types)?
            current_only:
                Restrict to ``_current`` tasks only?
            via_index:
                Use the server's index (faster)? (Not possible with
                ``current_only=False``.)
            export_recipient:
                A
                :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
        """  # noqa
        if via_index and not current_only:
            log.warning("Can't use index for non-current tasks")
            via_index = False

        self._req = req
        self._filter = taskfilter
        self._as_dump = as_dump
        self._sort_method_by_class = sort_method_by_class
        self._sort_method_global = sort_method_global
        self._current_only = current_only
        self._via_index = via_index
        self.export_recipient = export_recipient

        if export_recipient:
            # We create a new filter to reflect the export recipient.
            assert (
                self._filter is None
            ), "Can't supply taskfilter if you supply export_recipient"
            # We can do lots of what we need with a TaskFilter().
            self._filter = TaskFilter()
            if not export_recipient.all_groups:
                self._filter.group_ids = export_recipient.group_ids
            self._filter.task_types = export_recipient.tasks
            self._filter.start_datetime = export_recipient.start_datetime_utc
            self._filter.end_datetime = export_recipient.end_datetime_utc
            self._filter.finalized_only = export_recipient.finalized_only
            self._filter.tasks_with_patient_only = (
                not export_recipient.anonymous_ok()
            )
            self._filter.must_have_idnum_type = export_recipient.primary_idnum
        else:
            assert (
                self._filter
            ), "Must supply taskfilter unless you supply export_recipient"

        self._tasks_by_class = (
            OrderedDict()
        )  # type: Dict[Type[Task], List[Task]]  # noqa
        self._all_tasks = None  # type: Optional[List[Task]]
        self._all_indexes = (
            None
        )  # type: Optional[Union[List[TaskIndexEntry], Query]]  # noqa
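
    # Illustrative construction sketch (editorial addition, not part of the
    # original CamCOPS source), assuming a live CamcopsRequest ``req`` and
    # the TaskFilter attributes referenced elsewhere in this module:
    #
    #     taskfilter = TaskFilter()
    #     taskfilter.group_ids = [1]        # restrict to group 1
    #     taskfilter.complete_only = True   # complete tasks only
    #     collection = TaskCollection(
    #         req=req,
    #         taskfilter=taskfilter,
    #         sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
    #     )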

    def __repr__(self) -> str:
        return auto_repr(self)

    def __str__(self) -> str:
        return auto_str(self)

    # =========================================================================
    # Interface to read
    # =========================================================================

    @property
    def req(self) -> "CamcopsRequest":
        """
        Returns the associated request, or raises :exc:`AssertionError` if
        it's not been set.
        """
        assert (
            self._req is not None
        ), "Must initialize with a request or call set_request() first"
        return self._req

    def set_request(self, req: "CamcopsRequest") -> None:
        """
        Sets the request object manually. Used by Celery back-end tasks.

        Args:
            req:
                a
                :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        self._req = req

    def task_classes(self) -> List[Type[Task]]:
        """
        Return a list of task classes that we want.
        """
        return self._filter.task_classes

    def tasks_for_task_class(self, task_class: Type[Task]) -> List[Task]:
        """
        Returns all appropriate task instances for a specific task type.
        """
        if self._via_index:
            self._ensure_everything_fetched_via_index()
        else:
            self._fetch_task_class(task_class)
        tasklist = self._tasks_by_class.get(task_class, [])
        return tasklist

    @property
    def all_tasks(self) -> List[Task]:
        """
        Returns a list of all appropriate task instances.
        """
        if self._all_tasks is None:
            if self._via_index:
                self._ensure_everything_fetched_via_index()
            else:
                self._fetch_all_tasks_without_index()
        return self._all_tasks

    @property
    def all_tasks_or_indexes_or_query(
        self,
    ) -> Union[List[Task], List[TaskIndexEntry], Query]:
        """
        Returns a list of all appropriate task instances, or index entries,
        or a query returning them.

        - Returning a list of tasks is fine, but the results of this function
          may be paginated (e.g. in the main task view), so the end result may
          be that e.g. 20,000 tasks are fetched and 20 are shown.
        - More efficient is to fetch 20,000 indexes from the single index
          table, and fetch only the 20 tasks we need.
        - More efficient still is to fetch the 20 indexes we need, and then
          their task.
        """
        if not self._via_index:
            return self.all_tasks

        self._build_index_query()  # ensure self._all_indexes is set

        if self._all_tasks is not None:
            # The tasks themselves have been fetched.
            return self._all_tasks

        return self._all_indexes  # indexes or a query to fetch them

    # def forget_task_class(self, task_class: Type[Task]) -> None:
    #     """
    #     Ditch results for a specific task class (for memory efficiency).
    #     """
    #     self._tasks_by_class.pop(task_class, None)
    #     # The "None" option prevents it from raising KeyError if the key
    #     # doesn't exist.
    #     # https://stackoverflow.com/questions/11277432/how-to-remove-a-key-from-a-python-dictionary  # noqa
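
    # Pagination sketch (editorial addition, not part of the original CamCOPS
    # source): when the property above returns a Query, a caller could apply
    # standard SQLAlchemy LIMIT/OFFSET before fetching, e.g.:
    #
    #     result = collection.all_tasks_or_indexes_or_query
    #     if isinstance(result, Query):
    #         page = result.limit(20).offset(0).all()
    #     else:
    #         page = result[:20]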

    def gen_all_tasks_or_indexes(
        self,
    ) -> Generator[Union[Task, TaskIndexEntry], None, None]:
        """
        Generates tasks or index entries.
        """
        tasks_or_indexes_or_query = self.all_tasks_or_indexes_or_query
        if isinstance(tasks_or_indexes_or_query, Query):
            for item in tasks_or_indexes_or_query.all():
                yield item
        else:
            for item in tasks_or_indexes_or_query:
                yield item

    def gen_tasks_by_class(self) -> Generator[Task, None, None]:
        """
        Generates all tasks, class-wise.
        """
        for cls in self.task_classes():
            for task in self.tasks_for_task_class(cls):
                yield task

    def gen_tasks_in_global_order(self) -> Generator[Task, None, None]:
        """
        Generates all tasks, in the global order.
        """
        for task in self.all_tasks:
            yield task
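
    # Iteration sketch (editorial addition, not part of the original CamCOPS
    # source), assuming ``collection`` was built as above:
    #
    #     for task in collection.gen_tasks_in_global_order():
    #         ...  # every task, in the global sort order
    #     for task in collection.gen_tasks_by_class():
    #         ...  # tasks grouped by class, each class sorted internally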

    @property
    def dbsession(self) -> SqlASession:
        """
        Returns the request's database session.
        """
        return self.req.dbsession

    # =========================================================================
    # Internals: fetching Task objects
    # =========================================================================

    def _fetch_all_tasks_without_index(self, parallel: bool = False) -> None:
        """
        Fetch all tasks from the database.
        """
        # AVOID parallel=True; see notes above.
        if DEBUG_QUERY_TIMING:
            start_time = Pendulum.now()

        if parallel:
            # Deprecated parallel fetch
            threads = []  # type: List[FetchThread]
            for task_class in self._filter.task_classes:
                thread = FetchThread(self.req, task_class, self)
                thread.start()
                threads.append(thread)
            for thread in threads:
                thread.join()
                if thread.error:
                    raise ValueError("Multithreaded fetch failed")
        else:
            # Fetch all tasks, classwise.
            for task_class in self._filter.task_classes:
                self._fetch_task_class(task_class)

        if DEBUG_QUERY_TIMING:
            end_time = Pendulum.now()
            # noinspection PyUnboundLocalVariable
            time_taken = end_time - start_time
            log.info("_fetch_all_tasks took {}", time_taken)

        # Build our joint task list
        self._all_tasks = []  # type: List[Task]
        for single_task_list in self._tasks_by_class.values():
            self._all_tasks += single_task_list
        sort_tasks_in_place(self._all_tasks, self._sort_method_global)

    def _fetch_task_class(self, task_class: Type[Task]) -> None:
        """
        Fetch tasks from the database for one task type.
        """
        if task_class in self._tasks_by_class:
            return  # already fetched
        q = self._serial_query(task_class)
        if q is None:
            newtasks = []  # type: List[Task]
        else:
            newtasks = q.all()  # type: List[Task]
            # Apply Python-side filters?
            newtasks = self._filter_through_python(newtasks)
        sort_tasks_in_place(newtasks, self._sort_method_by_class)
        self._tasks_by_class[task_class] = newtasks

    def _serial_query(self, task_class: Type[Task]) -> Optional[Query]:
        """
        Make and return an SQLAlchemy ORM query for a specific task class.
        Returns ``None`` if no tasks would match our criteria.
        """
        dbsession = self.req.dbsession
        return self._make_query(dbsession, task_class)

    def _make_query(
        self, dbsession: SqlASession, task_class: Type[Task]
    ) -> Optional[Query]:
        """
        Make and return an SQLAlchemy ORM query for a specific task class.
        Returns ``None`` if no tasks would match our criteria.
        """
        q = dbsession.query(task_class)

        # Restrict to what the web front end will supply
        # noinspection PyProtectedMember
        if self._current_only:
            # noinspection PyProtectedMember
            q = q.filter(task_class._current == True)  # noqa: E712

        # Restrict to what is PERMITTED
        q = task_query_restricted_to_permitted_users(
            self.req, q, task_class, as_dump=self._as_dump
        )

        # Restrict to what is DESIRED
        if q:
            q = self._task_query_restricted_by_filter(q, task_class)
        if q and self.export_recipient:
            q = self._task_query_restricted_by_export_recipient(q, task_class)

        return q

    def _task_query_restricted_by_filter(
        self, q: Query, cls: Type[Task]
    ) -> Optional[Query]:
        """
        Restricts an SQLAlchemy ORM query for a given task class to those
        tasks that our filter permits.

        THIS IS A KEY SECURITY FUNCTION, since it implements some permissions
        that relate to viewing tasks when unfiltered.

        Args:
            q: the starting SQLAlchemy ORM Query
            cls: the task class

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter
        """
        tf = self._filter  # task filter
        user = self.req.user

        if tf.group_ids:
            permitted_group_ids = tf.group_ids.copy()
        else:
            permitted_group_ids = None  # unrestricted

        if tf.dates_inconsistent():
            return None

        if cls not in tf.task_classes:
            # We don't want this task
            return None

        if not cls.is_anonymous:
            # Not anonymous.
            if not tf.any_specific_patient_filtering():
                # No patient filtering. Permissions depend on user settings.
                if user.may_view_all_patients_when_unfiltered:
                    # May see everything. No restrictions.
                    pass
                elif user.may_view_no_patients_when_unfiltered:
                    # Can't see patient data from any group.
                    # (a) User not permitted to view any patients when
                    # unfiltered, and (b) not filtered to a level that would
                    # reasonably restrict to one or a small number of
                    # patients. Skip the task class.
                    return None
                else:
                    # May see patient data from some, but not all, groups.
                    liberal_group_ids = (
                        user.group_ids_nonsuperuser_may_see_when_unfiltered()
                    )
                    if not permitted_group_ids:  # was unrestricted
                        permitted_group_ids = liberal_group_ids
                    else:  # was restricted; restrict further
                        permitted_group_ids = [
                            gid
                            for gid in permitted_group_ids
                            if gid in liberal_group_ids
                        ]
                        if not permitted_group_ids:
                            return None  # down to zero; no point continuing

            # Patient filtering
            if tf.any_patient_filtering():
                # q = q.join(Patient)  # fails
                q = q.join(
                    cls.patient
                )  # use explicitly configured relationship  # noqa
                q = tf.filter_query_by_patient(q, via_index=False)

        # Patient-independent filtering

        if tf.device_ids:
            # noinspection PyProtectedMember
            q = q.filter(cls._device_id.in_(tf.device_ids))

        if tf.era:
            # noinspection PyProtectedMember
            q = q.filter(cls._era == tf.era)
        if tf.finalized_only:
            q = q.filter(cls._era != ERA_NOW)

        if tf.adding_user_ids:
            # noinspection PyProtectedMember
            q = q.filter(cls._adding_user_id.in_(tf.adding_user_ids))

        if permitted_group_ids:
            # noinspection PyProtectedMember
            q = q.filter(cls._group_id.in_(permitted_group_ids))

        if tf.start_datetime is not None:
            q = q.filter(cls.when_created >= tf.start_datetime)
        if tf.end_datetime is not None:
            q = q.filter(cls.when_created < tf.end_datetime)

        q = self._filter_query_for_text_contents(q, cls)

        return q

    def _task_query_restricted_by_export_recipient(
        self, q: Query, cls: Type[Task]
    ) -> Optional[Query]:
        """
        For exports.

        Filters via our
        :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`,
        except for the bits already implemented via our
        :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`.

        The main job here is for incremental exports: to find tasks that have
        not yet been exported. We look for any tasks not yet exported to a
        recipient of the same name (regardless of ``ExportRecipient.id``,
        which changes when the export recipient is reconfigured).

        Compare :meth:`_index_query_restricted_by_export_recipient`.

        Args:
            q: the starting SQLAlchemy ORM Query
            cls: the task class

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter
        """
        from camcops_server.cc_modules.cc_exportmodels import (
            ExportedTask,
        )  # delayed import

        r = self.export_recipient
        if not r.is_incremental():
            # Full database export; no restrictions
            return q

        # Otherwise, restrict to tasks not yet sent to this recipient.
        # noinspection PyUnresolvedReferences
        q = q.filter(
            # "There is not a successful export record for this
            # task/recipient"
            ~exists()
            .select_from(
                ExportedTask.__table__.join(
                    ExportRecipient.__table__,
                    ExportedTask.recipient_id == ExportRecipient.id,
                )
            )
            .where(
                and_(
                    ExportRecipient.recipient_name == r.recipient_name,
                    ExportedTask.basetable == cls.__tablename__,
                    ExportedTask.task_server_pk == cls._pk,
                    ExportedTask.success == True,  # noqa: E712
                    ExportedTask.cancelled == False,  # noqa: E712
                )
            )
        )
        return q

    def _filter_through_python(self, tasks: List[Task]) -> List[Task]:
        """
        Returns those tasks in the list provided that pass any Python-only
        aspects of our filter (those parts not easily calculable via SQL).

        This applies to the "direct" (and not "via index") routes only.
        With the index, we can do everything via SQL.
        """
        assert not self._via_index
        if not self._has_python_parts_to_filter():
            return tasks
        return [
            t for t in tasks if self._task_matches_python_parts_of_filter(t)
        ]

    def _has_python_parts_to_filter(self) -> bool:
        """
        Does the filter have aspects to it that require some Python thought,
        not just a database query?

        Only applicable to the direct (not "via index") route.
        """
        assert not self._via_index
        return self._filter.complete_only

    def _task_matches_python_parts_of_filter(self, task: Task) -> bool:
        """
        Does the task pass the Python parts of the filter?

        Only applicable to the direct (not "via index") route.
        """
        assert not self._via_index

        # "Is task complete" filter
        if self._filter.complete_only:
            if not task.is_complete():
                return False

        return True

    # =========================================================================
    # Shared between Task and TaskIndexEntry methods
    # =========================================================================

    def _filter_query_for_text_contents(
        self, q: Query, taskclass: Type[Task]
    ) -> Optional[Query]:
        """
        Returns the query, filtered for the "text contents" filter.

        Args:
            q: the starting SQLAlchemy ORM Query
            taskclass: the task class

        Returns:
            a Query, potentially modified.
        """
        tf = self._filter  # task filter

        if not tf.text_contents:
            return q  # unmodified

        # task must contain ALL the strings in AT LEAST ONE text column
        textcols = taskclass.get_text_filter_columns()
        if not textcols:
            # Text filtering requested, but there are no text columns, so
            # by definition the filter must fail.
            return None
        clauses_over_text_phrases = []  # type: List[ColumnElement]
        # ... each e.g. "col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%'"  # noqa
        for textfilter in tf.text_contents:
            tf_lower = textfilter.lower()
            clauses_over_columns = []  # type: List[ColumnElement]
            # ... each e.g. "col1 LIKE '%paracetamol%'"
            for textcol in textcols:
                # Case-insensitive comparison:
                # https://groups.google.com/forum/#!topic/sqlalchemy/331XoToT4lk  # noqa
                # https://bitbucket.org/zzzeek/sqlalchemy/wiki/UsageRecipes/StringComparisonFilter  # noqa
                clauses_over_columns.append(
                    func.lower(textcol).contains(tf_lower, autoescape=True)
                )
            clauses_over_text_phrases.append(or_(*clauses_over_columns))
        return q.filter(and_(*clauses_over_text_phrases))
        # ... thus, e.g.
        # "(col1 LIKE '%paracetamol%' OR col2 LIKE '%paracetamol%') AND
        #  (col1 LIKE '%overdose%' OR col2 LIKE '%overdose%')"

    # =========================================================================
    # Internals: fetching TaskIndexEntry objects
    # =========================================================================

    def _ensure_everything_fetched_via_index(self) -> None:
        """
        Ensure we have all our tasks loaded, using the index.
        """
        self._build_index_query()
        self._fetch_tasks_from_indexes()

    def _build_index_query(self) -> None:
        """
        Creates a Query in :attr:`_all_indexes` that will fetch task indexes.
        If the task filtering requires the tasks to be fetched (i.e. text
        contents), fetch the actual tasks too (and filter them).
        """
        if self._all_indexes is not None:
            return
        self._all_indexes = self._make_index_query()
        if self._filter.text_contents:
            self._fetch_tasks_from_indexes()

    def _fetch_tasks_from_indexes(self) -> None:
        """
        Takes the query that has already been stored in :attr:`_all_indexes`,
        and populates the task attributes, :attr:`_all_tasks` and
        :attr:`_tasks_by_class`.
        """
        if self._all_tasks is not None:
            return
        assert self._all_indexes is not None

        d = tablename_to_task_class_dict()
        dbsession = self.req.dbsession
        self._all_tasks = []  # type: List[Task]

        # Fetch indexes
        if isinstance(self._all_indexes, Query):
            # Query built, but indexes not yet fetched.
            # Replace the query with actual indexes
            self._all_indexes = (
                self._all_indexes.all()
            )  # type: List[TaskIndexEntry]  # noqa
        indexes = self._all_indexes

        # Fetch tasks
        tablenames = set(index.task_table_name for index in indexes)
        for tablename in tablenames:
            # We do this by task class, so we can execute a single query per
            # task type (rather than per task).
            try:
                taskclass = d[tablename]
            except KeyError:
                log.warning("Bad tablename in index: {!r}", tablename)
                continue
            tasklist = self._tasks_by_class.setdefault(taskclass, [])
            task_pks = [
                i.task_pk for i in indexes if i.tablename == tablename
            ]
            # noinspection PyProtectedMember
            qtask = dbsession.query(taskclass).filter(
                taskclass._pk.in_(task_pks)
            )
            qtask = self._filter_query_for_text_contents(qtask, taskclass)
            tasks = qtask.all()  # type: List[Task]
            for task in tasks:
                tasklist.append(task)
                self._all_tasks.append(task)

        # Sort tasks
        for tasklist in self._tasks_by_class.values():
            sort_tasks_in_place(tasklist, self._sort_method_by_class)
        sort_tasks_in_place(self._all_tasks, self._sort_method_global)

    def _make_index_query(self) -> Optional[Query]:
        """
        Make and return an SQLAlchemy ORM query to retrieve indexes.
        Returns ``None`` if no tasks would match our criteria.
        """
        dbsession = self.req.dbsession
        q = dbsession.query(TaskIndexEntry)

        # Restrict to what the web front end will supply
        assert self._current_only, "_current_only must be true to use index"

        # Restrict to what is PERMITTED
        if not self.export_recipient:
            q = task_query_restricted_to_permitted_users(
                self.req, q, TaskIndexEntry, as_dump=self._as_dump
            )

        # Restrict to what is DESIRED
        if q:
            q = self._index_query_restricted_by_filter(q)
        if q and self.export_recipient:
            q = self._index_query_restricted_by_export_recipient(q)

        return q

    def _index_query_restricted_by_filter(self, q: Query) -> Optional[Query]:
        """
        Counterpart to :func:`_task_query_restricted_by_filter`, but for
        indexes.

        THIS IS A KEY SECURITY FUNCTION, since it implements some permissions
        that relate to viewing tasks when unfiltered.

        Args:
            q: the starting SQLAlchemy ORM Query

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter
        """
        tf = self._filter  # task filter
        user = self.req.user

        if tf.group_ids:
            permitted_group_ids = tf.group_ids.copy()
        else:
            permitted_group_ids = None  # unrestricted

        if tf.dates_inconsistent():
            return None

        # Task type filtering

        if tf.skip_anonymous_tasks():
            # noinspection PyPep8
            q = q.filter(TaskIndexEntry.patient_pk != None)  # noqa: E711

        if not tf.offers_all_non_anonymous_task_types():
            permitted_task_tablenames = [
                tc.__tablename__ for tc in tf.task_classes
            ]
            q = q.filter(
                TaskIndexEntry.task_table_name.in_(permitted_task_tablenames)
            )

        # Special rules when we've not filtered for any patients

        if not tf.any_specific_patient_filtering():
            # No patient filtering. Permissions depend on user settings.
            if user.may_view_all_patients_when_unfiltered:
                # May see everything. No restrictions.
                pass
            elif user.may_view_no_patients_when_unfiltered:
                # Can't see patient data from any group.
                # (a) User not permitted to view any patients when
                # unfiltered, and (b) not filtered to a level that would
                # reasonably restrict to one or a small number of
                # patients. Restrict to anonymous tasks.
                # noinspection PyPep8
                q = q.filter(TaskIndexEntry.patient_pk == None)  # noqa: E711
            else:
                # May see patient data from some, but not all, groups.
                # This is a little more complex than the equivalent in
                # _task_query_restricted_by_filter(), because we shouldn't
                # restrict anonymous tasks.
                liberal_group_ids = (
                    user.group_ids_nonsuperuser_may_see_when_unfiltered()
                )
                # noinspection PyPep8
                liberal_or_anon_criteria = [
                    TaskIndexEntry.patient_pk == None  # noqa: E711
                    # ... anonymous OK
                ]  # type: List[ClauseElement]
                for gid in liberal_group_ids:
                    liberal_or_anon_criteria.append(
                        TaskIndexEntry.group_id == gid  # this group OK
                    )
                q = q.filter(or_(*liberal_or_anon_criteria))

        # Patient filtering

        if tf.any_patient_filtering():
            q = q.join(TaskIndexEntry.patient)  # use relationship
            q = tf.filter_query_by_patient(q, via_index=True)

        # Patient-independent filtering

        if tf.device_ids:
            # noinspection PyProtectedMember
            q = q.filter(TaskIndexEntry.device_id.in_(tf.device_ids))

        if tf.era:
            # noinspection PyProtectedMember
            q = q.filter(TaskIndexEntry.era == tf.era)
        if tf.finalized_only:
            q = q.filter(TaskIndexEntry.era != ERA_NOW)

        if tf.adding_user_ids:
            # noinspection PyProtectedMember
            q = q.filter(
                TaskIndexEntry.adding_user_id.in_(tf.adding_user_ids)
            )

        if permitted_group_ids:
            # noinspection PyProtectedMember
            q = q.filter(TaskIndexEntry.group_id.in_(permitted_group_ids))

        if tf.start_datetime is not None:
            q = q.filter(
                TaskIndexEntry.when_created_utc >= tf.start_datetime_utc
            )
        if tf.end_datetime is not None:
            q = q.filter(TaskIndexEntry.when_created_utc < tf.end_datetime_utc)

        # text_contents is managed at the later fetch stage when using indexes

        # But is_complete can be filtered now and in SQL:
        if tf.complete_only:
            # noinspection PyPep8
            q = q.filter(TaskIndexEntry.task_is_complete == True)  # noqa: E712

        # When we use indexes, we embed the global sort criteria in the query.
        if self._sort_method_global == TaskSortMethod.CREATION_DATE_ASC:
            q = q.order_by(
                TaskIndexEntry.when_created_utc.asc(),
                TaskIndexEntry.when_added_batch_utc.asc(),
            )
        elif self._sort_method_global == TaskSortMethod.CREATION_DATE_DESC:
            q = q.order_by(
                TaskIndexEntry.when_created_utc.desc(),
                TaskIndexEntry.when_added_batch_utc.desc(),
            )

        return q

    def _index_query_restricted_by_export_recipient(
        self, q: Query
    ) -> Optional[Query]:
        """
        For exports.

        Filters via our
        :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`,
        except for the bits already implemented via our
        :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`.

        The main job here is for incremental exports: to find tasks that have
        not yet been exported.

        Compare :meth:`_task_query_restricted_by_export_recipient`.

        Args:
            q: the starting SQLAlchemy ORM Query

        Returns:
            the original query, a modified query, or ``None`` if no tasks
            would pass the filter
        """
        from camcops_server.cc_modules.cc_exportmodels import (
            ExportedTask,
        )  # delayed import

        r = self.export_recipient
        if not r.is_incremental():
            # Full database export; no restrictions
            return q

        # Otherwise, restrict to tasks not yet sent to this recipient.
        # Remember: q is a query on TaskIndexEntry.
        # noinspection PyUnresolvedReferences
        q = q.filter(
            # "There is not a successful export record for this
            # task/recipient"
            ~exists()
            .select_from(
                ExportedTask.__table__.join(
                    ExportRecipient.__table__,
                    ExportedTask.recipient_id == ExportRecipient.id,
                )
            )
            .where(
                and_(
                    ExportRecipient.recipient_name == r.recipient_name,
                    ExportedTask.basetable == TaskIndexEntry.task_table_name,
                    # ... don't use ".tablename" as a property doesn't play
                    #     nicely with SQLAlchemy here
                    ExportedTask.task_server_pk == TaskIndexEntry.task_pk,
                    ExportedTask.success == True,  # noqa: E712
                    ExportedTask.cancelled == False,  # noqa: E712
                )
            )
        )
        return q
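

# Illustrative usage sketch (editorial addition, not part of the original
# CamCOPS source): a text-contents search, which is applied by
# _filter_query_for_text_contents() above. Assumes a live CamcopsRequest
# ``req`` and that TaskFilter.text_contents accepts a list of phrases, as
# iterated elsewhere in this module; the function name is hypothetical.
def _example_text_search(req: "CamcopsRequest") -> List[Task]:
    """
    Returns current, permitted tasks containing ALL the given phrases,
    newest first.
    """
    taskfilter = TaskFilter()
    taskfilter.text_contents = ["paracetamol", "overdose"]
    collection = TaskCollection(
        req=req,
        taskfilter=taskfilter,
        sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
    )
    return collection.all_tasks

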
# noinspection PyProtectedMember
def encode_task_collection(coll: TaskCollection) -> Dict:
    """
    Serializes a :class:`TaskCollection`.

    The request is not serialized and must be rebuilt in another way; see e.g.
    :func:`camcops_server.cc_modules.celery.email_basic_dump`.
    """
    return {
        "taskfilter": dumps(coll._filter, serializer="json"),
        "as_dump": coll._as_dump,
        "sort_method_by_class": dumps(
            coll._sort_method_by_class, serializer="json"
        ),
    }


# noinspection PyUnusedLocal
def decode_task_collection(d: Dict, cls: Type) -> TaskCollection:
    """
    Creates a :class:`TaskCollection` from a serialized version.

    The request is not serialized and must be rebuilt in another way; see e.g.
    :func:`camcops_server.cc_modules.celery.email_basic_dump`.
    """
    kwargs = {
        "taskfilter": loads(*reorder_args(*d["taskfilter"])),
        "as_dump": d["as_dump"],
        "sort_method_by_class": loads(
            *reorder_args(*d["sort_method_by_class"])
        ),
    }
    return TaskCollection(req=None, **kwargs)


def reorder_args(
    content_type: str, content_encoding: str, data: str
) -> List[str]:
    """
    kombu's :func:`SerializerRegistry.dumps` returns the data as the last
    element of its tuple, but :func:`SerializerRegistry.loads` takes it as
    the first argument; this reorders accordingly.
    """
    return [data, content_type, content_encoding]
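

# Illustrative sketch (editorial addition, not part of the original CamCOPS
# source): round-tripping a value through kombu's ``dumps``/``loads`` pair,
# using reorder_args() to adapt the argument order, exactly as
# decode_task_collection() does above. The function name is hypothetical.
def _example_kombu_roundtrip() -> Dict:
    """
    Serializes and deserializes a small dictionary via kombu JSON.
    """
    content_type, content_encoding, data = dumps(
        {"hello": "world"}, serializer="json"
    )
    return loads(*reorder_args(content_type, content_encoding, data))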


register_class_for_json(
    cls=TaskCollection,
    obj_to_dict_fn=encode_task_collection,
    dict_to_obj_fn=decode_task_collection,
)