Source code for camcops_server.cc_modules.cc_report

#!/usr/bin/env python

"""
camcops_server/cc_modules/cc_report.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**CamCOPS reports.**

"""

import logging
from abc import ABC
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
    Type,
    TYPE_CHECKING,
    Union,
)

from cardinal_pythonlib.classes import all_subclasses, classproperty
from cardinal_pythonlib.datetimefunc import format_datetime
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.pyramid.responses import (
    OdsResponse,
    TsvResponse,
    XlsxResponse,
)
from deform.form import Form
from pyramid.httpexceptions import HTTPBadRequest
from pyramid.renderers import render_to_response
from pyramid.response import Response
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm.query import Query
from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy.sql.expression import and_, column, func, select
from sqlalchemy.sql.selectable import SelectBase

# import as LITTLE AS POSSIBLE; this is used by lots of modules
from camcops_server.cc_modules.cc_constants import (
    DateFormat,
    DEFAULT_ROWS_PER_PAGE,
)
from camcops_server.cc_modules.cc_db import FN_CURRENT, TFN_WHEN_CREATED
from camcops_server.cc_modules.cc_pyramid import (
    CamcopsPage,
    PageUrl,
    ViewArg,
    ViewParam,
)
from camcops_server.cc_modules.cc_spreadsheet import (
    SpreadsheetCollection,
    SpreadsheetPage,
)

if TYPE_CHECKING:
    from camcops_server.cc_modules.cc_forms import (  # noqa: F401
        ReportParamForm,
        ReportParamSchema,
    )
    from camcops_server.cc_modules.cc_request import (  # noqa: F401
        CamcopsRequest,
    )
    from camcops_server.cc_modules.cc_task import Task  # noqa: F401

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Other constants
# =============================================================================


[docs]class PlainReportType(object): """ Simple class to hold the results of a plain report. """
[docs] def __init__( self, rows: Sequence[Sequence[Any]], column_names: Sequence[str] ) -> None: self.rows = rows self.column_names = column_names
# ============================================================================= # Report class # =============================================================================
[docs]class Report(object): """ Abstract base class representing a report. If you are writing a report, you must override these attributes: - :meth:`report_id` - :meth:`report_title` - One combination of: - (simplest) :meth:`get_query` OR :meth:`get_rows_colnames` - (for multi-page results) :meth:`render_html` and :meth:`get_spreadsheet_pages` - (manual control) all ``render_*`` functions See the explanations of each. """ template_name = "report.mako" # ------------------------------------------------------------------------- # Attributes that must be provided # ------------------------------------------------------------------------- # noinspection PyMethodParameters @classproperty def report_id(cls) -> str: """ Returns a identifying string, unique to this report, used in the HTML report selector. """ raise NotImplementedError("implement in subclass")
[docs] @classmethod def title(cls, req: "CamcopsRequest") -> str: """ Descriptive title for display purposes. """ raise NotImplementedError("implement in subclass")
# noinspection PyMethodParameters @classproperty def superuser_only(cls) -> bool: """ If ``True`` (the default), only superusers may run the report. You must explicitly override this property to permit others. """ return True
[docs] @classmethod def get_http_query_keys(cls) -> List[str]: """ Returns the keys used for the HTTP GET query. They include details of: - which report? - how to view it? - pagination options - report-specific configuration details from :func:`get_specific_http_query_keys`. """ return [ ViewParam.REPORT_ID, ViewParam.VIEWTYPE, ViewParam.ROWS_PER_PAGE, ViewParam.PAGE, ] + cls.get_specific_http_query_keys()
[docs] @classmethod def get_specific_http_query_keys(cls) -> List[str]: """ Additional HTTP GET query keys used by this report. Override to add custom ones. """ return []
[docs] def get_query( self, req: "CamcopsRequest" ) -> Union[None, SelectBase, Query]: """ Overriding this function is one way of providing a report. (The other is :func:`get_rows_colnames`.) To override this function, return the SQLAlchemy Base :class:`Select` statement or the SQLAlchemy ORM :class:`Query` to execute the report. Parameters are passed in via the request. """ return None
[docs] def get_rows_colnames( self, req: "CamcopsRequest" ) -> Optional[PlainReportType]: """ Overriding this function is one way of providing a report. (The other is :func:`get_query`.) To override this function, return a :class:`PlainReportType` with column names and row content. """ return None
[docs] @staticmethod def get_paramform_schema_class() -> Type["ReportParamSchema"]: """ Returns the class used as the Colander schema for the form that configures the report. By default, this is a simple form that just offers a choice of output format, but you can provide a more extensive one (an example being in :class:`camcops_server.tasks.diagnosis.DiagnosisFinderReportBase`. """ from camcops_server.cc_modules.cc_forms import ( ReportParamSchema, ) # delayed import return ReportParamSchema
[docs] def get_form(self, req: "CamcopsRequest") -> Form: """ Returns a Colander form to configure the report. The default usually suffices, and it will use the schema specified in :func:`get_paramform_schema_class`. """ from camcops_server.cc_modules.cc_forms import ( # noqa: F811 ReportParamForm, ) # delayed import schema_class = self.get_paramform_schema_class() return ReportParamForm(request=req, schema_class=schema_class)
[docs] @staticmethod def get_test_querydict() -> Dict[str, Any]: """ What this function returns is used as the specimen Colander ``appstruct`` for unit tests. The default is an empty dictionary. """ return {}
[docs] @staticmethod def add_task_report_filters(wheres: List[ColumnElement]) -> None: """ Adds any restrictions required to a list of SQLAlchemy Core ``WHERE`` clauses. Override this (or provide additional filters and call this) to provide global filters to queries used to create reports. Used by :class:`DateTimeFilteredReportMixin`, etc. The presumption is that the thing being filtered is an instance of :class:`camcops_server.cc_modules.cc_task.Task`. Args: wheres: list of SQL ``WHERE`` conditions, each represented as an SQLAlchemy :class:`ColumnElement`. This list is modifed in place. The caller will need to apply the final list to the query. """ # noinspection PyPep8 wheres.append(column(FN_CURRENT) == True) # noqa: E712
# ------------------------------------------------------------------------- # Common functionality: classmethods # -------------------------------------------------------------------------
[docs] @classmethod def all_subclasses(cls) -> List[Type["Report"]]: """ Get all report subclasses, except those not implementing their ``report_id`` property. Optionally, sort by their title. """ # noinspection PyTypeChecker classes = all_subclasses(cls) # type: List[Type["Report"]] instantiated_report_classes = [] # type: List[Type["Report"]] for reportcls in classes: if reportcls.__name__ == "TestReport": continue try: _ = reportcls.report_id instantiated_report_classes.append(reportcls) except NotImplementedError: # This is a subclass of Report, but it's still an abstract # class; skip it. pass return instantiated_report_classes
# ------------------------------------------------------------------------- # Common functionality: default Response # -------------------------------------------------------------------------
[docs] def get_response(self, req: "CamcopsRequest") -> Response: """ Return the report content itself, as an HTTP :class:`Response`. """ # Check the basic parameters report_id = req.get_str_param(ViewParam.REPORT_ID) if report_id != self.report_id: raise HTTPBadRequest("Error - request directed to wrong report!") # viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.HTML, # lower=True) # ... NO; for a Deform radio button, the request contains parameters # like # ('__start__', 'viewtype:rename'), # ('deformField2', 'tsv'), # ('__end__', 'viewtype:rename') # ... so we need to ask the appstruct instead. # This is a bit different from how we manage trackers/CTVs, where we # recode the appstruct to a URL. # # viewtype = appstruct.get(ViewParam.VIEWTYPE) # type: str # # Ah, no... that fails with pagination of reports. Let's redirect # things to the HTTP query, as for trackers/audit! viewtype = req.get_str_param( ViewParam.VIEWTYPE, ViewArg.HTML, lower=True ) # Run the report (which may take additional parameters from the # request) # Serve the result if viewtype == ViewArg.HTML: return self.render_html(req=req) if viewtype == ViewArg.ODS: return self.render_ods(req=req) if viewtype == ViewArg.TSV: return self.render_tsv(req=req) if viewtype == ViewArg.XLSX: return self.render_xlsx(req=req) raise HTTPBadRequest("Bad viewtype")
def render_html(self, req: "CamcopsRequest") -> Response: rows_per_page = req.get_int_param( ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE ) page_num = req.get_int_param(ViewParam.PAGE, 1) plain_report = self._get_plain_report(req) page = CamcopsPage( collection=plain_report.rows, page=page_num, items_per_page=rows_per_page, url_maker=PageUrl(req), request=req, ) return self.render_single_page_html( req=req, column_names=plain_report.column_names, page=page ) def render_tsv(self, req: "CamcopsRequest") -> TsvResponse: filename = self.get_filename(req, ViewArg.TSV) # By default there is only one page. If there are more, # we only output the first page = self.get_spreadsheet_pages(req)[0] return TsvResponse(body=page.get_tsv(), filename=filename) def render_xlsx(self, req: "CamcopsRequest") -> XlsxResponse: filename = self.get_filename(req, ViewArg.XLSX) tsvcoll = self.get_spreadsheet_collection(req) content = tsvcoll.as_xlsx() return XlsxResponse(body=content, filename=filename) def render_ods(self, req: "CamcopsRequest") -> OdsResponse: filename = self.get_filename(req, ViewArg.ODS) tsvcoll = self.get_spreadsheet_collection(req) content = tsvcoll.as_ods() return OdsResponse(body=content, filename=filename) def get_spreadsheet_collection( self, req: "CamcopsRequest" ) -> SpreadsheetCollection: coll = SpreadsheetCollection() coll.add_pages(self.get_spreadsheet_pages(req)) return coll def get_spreadsheet_pages( self, req: "CamcopsRequest" ) -> List[SpreadsheetPage]: plain_report = self._get_plain_report(req) page = self.get_spreadsheet_page( name=self.title(req), column_names=plain_report.column_names, rows=plain_report.rows, ) return [page] @staticmethod def get_spreadsheet_page( name: str, column_names: Sequence[str], rows: Sequence[Sequence[Any]] ) -> SpreadsheetPage: keyed_rows = [dict(zip(column_names, r)) for r in rows] page = SpreadsheetPage(name=name, rows=keyed_rows) return page def get_filename(self, req: "CamcopsRequest", viewtype: str) -> str: extension_dict = { ViewArg.ODS: "ods", ViewArg.TSV: "tsv", ViewArg.XLSX: "xlsx", } if viewtype not in extension_dict: raise HTTPBadRequest("Unsupported viewtype") extension = extension_dict.get(viewtype) return ( "CamCOPS_" + self.report_id + "_" + format_datetime(req.now, DateFormat.FILENAME) + "." + extension )
[docs] def render_single_page_html( self, req: "CamcopsRequest", column_names: Sequence[str], page: CamcopsPage, ) -> Response: """ Converts a paginated report into an HTML response. If you wish, you can override this for more report customization. """ return render_to_response( self.template_name, dict( title=self.title(req), page=page, column_names=column_names, report_id=self.report_id, ), request=req, )
def _get_plain_report(self, req: "CamcopsRequest") -> PlainReportType: """ Uses :meth:`get_query`, or if absent, :meth:`get_rows_colnames`, to fetch data. Returns a "single-page" type report, in the form of a :class:`PlainReportType`. """ statement = self.get_query(req) if statement is not None: rp = req.dbsession.execute(statement) # type: CursorResult column_names = rp.keys() rows = rp.fetchall() plain_report = PlainReportType( rows=rows, column_names=column_names ) else: plain_report = self.get_rows_colnames(req) if plain_report is None: raise NotImplementedError( "Report did not implement either of get_query()" " or get_rows_colnames()" ) return plain_report
[docs]class PercentageSummaryReportMixin(object): """ Mixin to be used with :class:`Report`. """ @classproperty def task_class(self) -> Type["Task"]: raise NotImplementedError("implement in subclass")
[docs] def get_percentage_summaries( self, req: "CamcopsRequest", column_dict: Dict[str, str], num_answers: int, cell_format: str = "{}", min_answer: int = 0, ) -> List[List[str]]: """ Provides a summary of each question, x% of people said each response. """ rows = [] for column_name, question in column_dict.items(): """ e.g. SELECT COUNT(col) FROM perinatal_poem WHERE col IS NOT NULL """ wheres = [column(column_name).isnot(None)] # noinspection PyUnresolvedReferences self.add_task_report_filters(wheres) # noinspection PyUnresolvedReferences total_query = ( select([func.count(column_name)]) .select_from(self.task_class.__table__) .where(and_(*wheres)) ) total_responses = req.dbsession.execute(total_query).fetchone()[0] row = [question] + [total_responses] + [""] * num_answers """ e.g. SELECT total_responses,col, ((100 * COUNT(col)) / total_responses) FROM perinatal_poem WHERE col is not NULL GROUP BY col """ # noinspection PyUnresolvedReferences query = ( select( [ column(column_name), ((100 * func.count(column_name)) / total_responses), ] ) .select_from(self.task_class.__table__) .where(and_(*wheres)) .group_by(column_name) ) # row output is: # 0 1 2 3 # +----------+-----------------+--------------+--------------+---- # | question | total responses | % 1st answer | % 2nd answer | ... # +----------+-----------------+--------------+--------------+---- for result in req.dbsession.execute(query): col = 2 + (result[0] - min_answer) row[col] = cell_format.format(result[1]) rows.append(row) return rows
class DateTimeFilteredReportMixin(object): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.start_datetime = None # type: Optional[str] self.end_datetime = None # type: Optional[str] @staticmethod def get_paramform_schema_class() -> Type["ReportParamSchema"]: from camcops_server.cc_modules.cc_forms import ( DateTimeFilteredReportParamSchema, ) # delayed import return DateTimeFilteredReportParamSchema @classmethod def get_specific_http_query_keys(cls) -> List[str]: # noinspection PyUnresolvedReferences return super().get_specific_http_query_keys() + [ ViewParam.START_DATETIME, ViewParam.END_DATETIME, ] def get_response(self, req: "CamcopsRequest") -> Response: self.start_datetime = format_datetime( req.get_datetime_param(ViewParam.START_DATETIME), DateFormat.ERA ) self.end_datetime = format_datetime( req.get_datetime_param(ViewParam.END_DATETIME), DateFormat.ERA ) # noinspection PyUnresolvedReferences return super().get_response(req) def add_task_report_filters(self, wheres: List[ColumnElement]) -> None: """ Adds any restrictions required to a list of SQLAlchemy Core ``WHERE`` clauses. See :meth:`Report.add_task_report_filters`. Args: wheres: list of SQL ``WHERE`` conditions, each represented as an SQLAlchemy :class:`ColumnElement`. This list is modifed in place. The caller will need to apply the final list to the query. """ # noinspection PyUnresolvedReferences super().add_task_report_filters(wheres) if self.start_datetime is not None: wheres.append(column(TFN_WHEN_CREATED) >= self.start_datetime) if self.end_datetime is not None: wheres.append(column(TFN_WHEN_CREATED) < self.end_datetime)
[docs]class ScoreDetails(object): """ Represents a type of score whose progress we want to track over time. """
[docs] def __init__( self, name: str, scorefunc: Callable[["Task"], Union[None, int, float]], minimum: int, maximum: int, higher_score_is_better: bool = False, ) -> None: """ Args: name: human-friendly name of this score scorefunc: function that can be called with a task instance as its sole parameter and which will return a numerical score (or ``None``) minimum: minimum possible value of this score (for display purposes) maximum: maximum possible value of this score (for display purposes) higher_score_is_better: is a higher score a better thing? """ self.name = name self.scorefunc = scorefunc self.minimum = minimum self.maximum = maximum self.higher_score_is_better = higher_score_is_better
[docs] def calculate_improvement( self, first_score: float, latest_score: float ) -> float: """ Improvement is positive. So if higher scores are better, returns ``latest - first``; otherwise returns ``first - latest``. """ if self.higher_score_is_better: return latest_score - first_score else: return first_score - latest_score
[docs]class AverageScoreReport(DateTimeFilteredReportMixin, Report, ABC): """ Used by MAAS, CORE-10 and PBQ to report average scores and progress """ template_name = "average_score_report.mako"
[docs] def __init__(self, *args, via_index: bool = True, **kwargs) -> None: """ Args: via_index: set this to ``False`` for unit test when you don't want to have to build a dummy task index. """ super().__init__(*args, **kwargs) self.via_index = via_index
# noinspection PyMethodParameters @classproperty def superuser_only(cls) -> bool: return False # noinspection PyMethodParameters @classproperty def task_class(cls) -> Type["Task"]: raise NotImplementedError("Report did not implement task_class") # noinspection PyMethodParameters @classmethod def scoretypes(cls, req: "CamcopsRequest") -> List[ScoreDetails]: raise NotImplementedError("Report did not implement 'scoretypes'")
[docs] @staticmethod def no_data_value() -> Any: """ The value used for a "no data" cell. The only reason this is accessible outside this class is for unit testing. """ return ""
def render_html(self, req: "CamcopsRequest") -> Response: pages = self.get_spreadsheet_pages(req) return render_to_response( self.template_name, dict( title=self.title(req), mainpage=pages[0], datepage=pages[1], report_id=self.report_id, ), request=req, )
[docs] def get_spreadsheet_pages( self, req: "CamcopsRequest" ) -> List[SpreadsheetPage]: """ We use an SQLAlchemy ORM, rather than Core, method. Why? - "Patient equality" is complex (e.g. same patient_id on same device, or a shared ID number, etc.) -- simplicity via Patient.__eq__. - Facilities "is task complete?" checks, and use of Python calculations. """ _ = req.gettext from camcops_server.cc_modules.cc_taskcollection import ( TaskCollection, task_when_created_sorter, ) # delayed import from camcops_server.cc_modules.cc_taskfilter import ( TaskFilter, ) # delayed import # Which tasks? taskfilter = TaskFilter() taskfilter.task_types = [self.task_class.__tablename__] taskfilter.start_datetime = self.start_datetime taskfilter.end_datetime = self.end_datetime taskfilter.complete_only = True # Get tasks collection = TaskCollection( req=req, taskfilter=taskfilter, current_only=True, via_index=self.via_index, ) all_tasks = collection.all_tasks # Get all distinct patients patients = set(t.patient for t in all_tasks) # log.debug("all_tasks: {}", all_tasks) # log.debug("patients: {}", [str(p) for p in patients]) scoretypes = self.scoretypes(req) n_scoretypes = len(scoretypes) # Sum first/last/progress scores by patient sum_first_by_score = [0] * n_scoretypes sum_last_by_score = [0] * n_scoretypes sum_improvement_by_score = [0] * n_scoretypes n_first = 0 n_last = 0 # also n_progress for patient in patients: # Find tasks for this patient patient_tasks = [t for t in all_tasks if t.patient == patient] assert patient_tasks, f"No tasks for patient {patient}" # log.debug("For patient {}, tasks: {}", patient, patient_tasks) # Find first and last task (last may be absent) patient_tasks.sort(key=task_when_created_sorter) first = patient_tasks[0] n_first += 1 if len(patient_tasks) > 1: last = patient_tasks[-1] n_last += 1 else: last = None # Obtain first/last scores and progress for scoreidx, scoretype in enumerate(scoretypes): firstscore = scoretype.scorefunc(first) # Scores should not be None, because all tasks are complete. sum_first_by_score[scoreidx] += firstscore if last: lastscore = scoretype.scorefunc(last) sum_last_by_score[scoreidx] += lastscore improvement = scoretype.calculate_improvement( firstscore, lastscore ) sum_improvement_by_score[scoreidx] += improvement # Format output column_names = [ _("Number of initial records"), _("Number of latest subsequent records"), ] row = [n_first, n_last] no_data = self.no_data_value() for scoreidx, scoretype in enumerate(scoretypes): # Calculations if n_first == 0: avg_first = no_data else: avg_first = sum_first_by_score[scoreidx] / n_first if n_last == 0: avg_last = no_data avg_improvement = no_data else: avg_last = sum_last_by_score[scoreidx] / n_last avg_improvement = sum_improvement_by_score[scoreidx] / n_last # Columns and row data column_names += [ f"{scoretype.name} ({scoretype.minimum}{scoretype.maximum}): " f"{_('First')}", f"{scoretype.name} ({scoretype.minimum}{scoretype.maximum}): " f"{_('Latest')}", f"{scoretype.name}: {_('Improvement')}", ] row += [avg_first, avg_last, avg_improvement] # Create and return report mainpage = self.get_spreadsheet_page( name=self.title(req), column_names=column_names, rows=[row] ) datepage = self.get_spreadsheet_page( name=_("Date filters"), column_names=[_("Start date"), _("End date")], rows=[[str(self.start_datetime), str(self.end_datetime)]], ) return [mainpage, datepage]
# ============================================================================= # Report framework # =============================================================================
[docs]def get_all_report_classes(req: "CamcopsRequest") -> List[Type["Report"]]: """ Returns all :class:`Report` (sub)classes, i.e. all report types. """ classes = Report.all_subclasses() classes.sort(key=lambda c: c.title(req)) return classes
[docs]def get_report_instance(report_id: str) -> Optional[Report]: """ Creates an instance of a :class:`Report`, given its ID (name), or return ``None`` if the ID is invalid. """ if not report_id: return None for cls in Report.all_subclasses(): if cls.report_id == report_id: return cls() return None