#!/usr/bin/env python
"""
camcops_server/cc_modules/cc_tracker.py
===============================================================================
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**Trackers, showing numerical information over time, and clinical text views,
showing text that a clinician might care about.**
"""
import logging
from typing import Any, Dict, List, Optional, Set, Tuple, TYPE_CHECKING
from cardinal_pythonlib.datetimefunc import format_datetime
from cardinal_pythonlib.logs import BraceStyleAdapter
from pendulum import DateTime as Pendulum
from pyramid.renderers import render
from camcops_server.cc_modules.cc_audit import audit
from camcops_server.cc_modules.cc_constants import (
CssClass,
CSS_PAGED_MEDIA,
DateFormat,
MatplotlibConstants,
PlotDefaults,
)
from camcops_server.cc_modules.cc_filename import get_export_filename
from camcops_server.cc_modules.cc_plot import matplotlib
from camcops_server.cc_modules.cc_pdf import pdf_from_html
from camcops_server.cc_modules.cc_pyramid import ViewArg, ViewParam
from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions
from camcops_server.cc_modules.cc_task import Task
from camcops_server.cc_modules.cc_taskcollection import (
TaskCollection,
TaskFilter,
TaskSortMethod,
)
from camcops_server.cc_modules.cc_xml import (
get_xml_document,
XmlDataTypes,
XmlElement,
)
import matplotlib.dates # delayed until after the cc_plot import
if TYPE_CHECKING:
from camcops_server.cc_modules.cc_patient import Patient # noqa: F401
from camcops_server.cc_modules.cc_patientidnum import ( # noqa: F401
PatientIdNum,
)
from camcops_server.cc_modules.cc_request import ( # noqa: F401
CamcopsRequest,
)
from camcops_server.cc_modules.cc_trackerhelpers import ( # noqa: F401
TrackerInfo,
)
log = BraceStyleAdapter(logging.getLogger(__name__))
# =============================================================================
# Constants
# =============================================================================
TRACKER_DATEFORMAT = "%Y-%m-%d"
WARNING_NO_PATIENT_FOUND = f"""
<div class="{CssClass.WARNING}">
</div>
"""
WARNING_DENIED_INFORMATION = f"""
<div class="{CssClass.WARNING}">
Other tasks exist for this patient that you do not have access to view.
</div>
"""
DEBUG_TRACKER_TASK_INCLUSION = False # should be False for production system
# =============================================================================
# Helper functions
# =============================================================================
# http://stackoverflow.com/questions/11788195
[docs]def consistency(
req: "CamcopsRequest",
values: List[Any],
servervalue: Any = None,
case_sensitive: bool = True,
) -> Tuple[bool, str]:
"""
Checks for consistency in a set of values (e.g. names, dates of birth).
(ID numbers are done separately via :func:`consistency_idnums`.)
The list of values (with the ``servervalue`` appended, if not ``None``) is
checked to ensure that it contains only one unique value (ignoring ``None``
values or empty ``""`` values).
Returns:
the tuple ``consistent, msg``, where ``consistent`` is a bool and
``msg`` is a descriptive HTML message
"""
if case_sensitive:
vallist = [str(v) if v is not None else v for v in values]
if servervalue is not None:
vallist.append(str(servervalue))
else:
vallist = [str(v).upper() if v is not None else v for v in values]
if servervalue is not None:
vallist.append(str(servervalue).upper())
# Replace "" with None, so we only have a single "not-present" value
vallist = [None if x == "" else x for x in vallist]
unique = list(set(vallist))
_ = req.gettext
if len(unique) == 0:
return True, _("consistent (no values)")
if len(unique) == 1:
return True, f"{_('consistent')} ({unique[0]})"
if len(unique) == 2:
if None in unique:
return (
True,
(
f"{_('consistent')} "
f"({_('all blank or')} {unique[1 - unique.index(None)]})"
),
)
return (
False,
(
f"<b>{_('INCONSISTENT')} "
f"({_('contains values')} {', '.join(unique)})</b>"
),
)
[docs]def consistency_idnums(
req: "CamcopsRequest", idnum_lists: List[List["PatientIdNum"]]
) -> Tuple[bool, str]:
"""
Checks the consistency of a set of :class:`PatientIdNum` objects.
"Are all these records from the same patient?"
Args:
req:
a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
idnum_lists:
a list of lists (one per task/patient instance) of
:class:`PatientIdNum` objects
Returns:
the tuple ``consistent, msg``, where ``consistent`` is a bool and
``msg`` is a descriptive HTML message
"""
# 1. Generate "known", mapping which_idnum -> set of observed non-NULL
# idnum_values
known = {} # type: Dict[int, Set[int]]
for task_idnum_list in idnum_lists:
for idnum in task_idnum_list:
idnum_value = idnum.idnum_value
if idnum_value is not None:
which_idnum = idnum.which_idnum
if which_idnum not in known:
known[which_idnum] = set() # type: Set[int]
known[which_idnum].add(idnum_value)
# 2. For every observed which_idnum, was it observed in all tasks?
present_in_all = {} # type: Dict[int, bool]
for which_idnum in known.keys():
present_for_all_tasks = all(
# "For all tasks..."
(
# "At least one ID number record relates to this which_idnum".
any(
idnum.which_idnum == which_idnum
and idnum.idnum_value is not None
)
for idnum in task_idnum_list
)
for task_idnum_list in idnum_lists
)
present_in_all[which_idnum] = present_for_all_tasks
# 3. Summarize
failures = [] # type: List[str]
successes = [] # type: List[str]
_ = req.gettext
for which_idnum, encountered_values in known.items():
value_str = ", ".join(str(v) for v in sorted(list(encountered_values)))
if len(encountered_values) > 1:
failures.append(
f"idnum{which_idnum} {_('contains values')} {value_str}"
)
else:
if present_in_all[which_idnum]:
successes.append(
f"idnum{which_idnum} {_('consistent')} ({value_str})"
)
else:
successes.append(
f"idnum{which_idnum} {_('all blank or')} {value_str}"
)
if failures:
return (
False,
(
f"<b>{_('INCONSISTENT')} "
f"({'; '.join(failures + successes)})</b>"
),
)
else:
return True, f"{_('consistent')} ({'; '.join(successes)})"
# =============================================================================
# ConsistencyInfo class
# =============================================================================
[docs]class ConsistencyInfo(object):
"""
Represents ID consistency information about a set of tasks.
"""
[docs] def __init__(self, req: "CamcopsRequest", tasklist: List[Task]) -> None:
"""
Initialize values, from a list of task instances.
"""
self.request = req
self.consistent_forename, self.msg_forename = consistency(
req,
[task.get_patient_forename() for task in tasklist],
servervalue=None,
case_sensitive=False,
)
self.consistent_surname, self.msg_surname = consistency(
req,
[task.get_patient_surname() for task in tasklist],
servervalue=None,
case_sensitive=False,
)
self.consistent_dob, self.msg_dob = consistency(
req, [task.get_patient_dob_first11chars() for task in tasklist]
)
self.consistent_sex, self.msg_sex = consistency(
req, [task.get_patient_sex() for task in tasklist]
)
self.consistent_idnums, self.msg_idnums = consistency_idnums(
req, [task.get_patient_idnum_objects() for task in tasklist]
)
self.all_consistent = (
self.consistent_forename
and self.consistent_surname
and self.consistent_dob
and self.consistent_sex
and self.consistent_idnums
)
[docs] def are_all_consistent(self) -> bool:
"""
Is all the ID information consistent?
"""
return self.all_consistent
[docs] def get_description_list(self) -> List[str]:
"""
Textual representation of ID information, indicating consistency or
lack of it.
"""
_ = self.request.gettext
cons = [
f"{_('Forename:')} {self.msg_forename}",
f"{_('Surname:')} {self.msg_surname}",
f"{_('DOB:')} {self.msg_dob}",
f"{_('Sex:')} {self.msg_sex}",
f"{_('ID numbers:')} {self.msg_idnums}",
]
return cons
[docs] def get_xml_root(self) -> XmlElement:
"""
XML tree (as root :class:`camcops_server.cc_modules.cc_xml.XmlElement`)
of consistency information.
"""
branches = [
XmlElement(
name="all_consistent",
value=self.are_all_consistent(),
datatype="boolean",
)
]
for c in self.get_description_list():
branches.append(XmlElement(name="consistency_check", value=c))
return XmlElement(name="_consistency", value=branches)
# =============================================================================
# TrackerCtvCommon class:
# =============================================================================
[docs]class TrackerCtvCommon(object):
"""
Base class for :class:`camcops_server.cc_modules.cc_tracker.Tracker` and
:class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`.
"""
[docs] def __init__(
self,
req: "CamcopsRequest",
taskfilter: TaskFilter,
as_ctv: bool,
via_index: bool = True,
) -> None:
"""
Initialize, fetching applicable tasks.
"""
# Record input variables at this point (for URL regeneration)
self.req = req
self.taskfilter = taskfilter
self.as_ctv = as_ctv
assert taskfilter.tasks_with_patient_only
self.collection = TaskCollection(
req=req,
taskfilter=taskfilter,
sort_method_by_class=TaskSortMethod.CREATION_DATE_ASC,
sort_method_global=TaskSortMethod.CREATION_DATE_ASC,
via_index=via_index,
)
all_tasks = self.collection.all_tasks
if all_tasks:
self.earliest = all_tasks[0].when_created
self.latest = all_tasks[-1].when_created
self.patient = all_tasks[0].patient
else:
self.earliest = None # type: Optional[Pendulum]
self.latest = None # type: Optional[Pendulum]
self.patient = None # type: Optional[Patient]
# Summary information
self.summary = ""
if DEBUG_TRACKER_TASK_INCLUSION:
first = True
for cls in self.taskfilter.task_classes:
if not first:
self.summary += " // "
self.summary += cls.tablename
first = False
task_instances = self.collection.tasks_for_task_class(cls)
if not task_instances:
if DEBUG_TRACKER_TASK_INCLUSION:
self.summary += " (no instances)"
continue
for task in task_instances:
if DEBUG_TRACKER_TASK_INCLUSION:
self.summary += f" / PK {task.pk}"
self.summary += " ~~~ "
self.summary += " — ".join(
[
"; ".join(
[
f"({task.tablename},{task.pk},"
f"{task.get_patient_server_pk()})"
for task in self.collection.tasks_for_task_class(cls)
]
)
for cls in self.taskfilter.task_classes
]
)
# Consistency information
self.consistency_info = ConsistencyInfo(req, all_tasks)
# -------------------------------------------------------------------------
# Required for implementation
# -------------------------------------------------------------------------
[docs] def get_xml(
self,
indent_spaces: int = 4,
eol: str = "\n",
include_comments: bool = False,
) -> str:
"""
Returns an XML representation.
Args:
indent_spaces: number of spaces to indent formatted XML
eol: end-of-line string
include_comments: include comments describing each field?
Returns:
an XML UTF-8 document representing our object.
"""
raise NotImplementedError("implement in subclass")
def _get_html(self) -> str:
"""
Returns an HTML representation.
"""
raise NotImplementedError("implement in subclass")
def _get_pdf_html(self) -> str:
"""
Returns HTML used for making PDFs.
"""
raise NotImplementedError("implement in subclass")
# -------------------------------------------------------------------------
# XML view
# -------------------------------------------------------------------------
def _get_xml(
self,
audit_string: str,
xml_name: str,
indent_spaces: int = 4,
eol: str = "\n",
include_comments: bool = False,
) -> str:
"""
Returns an XML document representing this object.
Args:
audit_string: description used to audit access to this information
xml_name: name of the root XML element
indent_spaces: number of spaces to indent formatted XML
eol: end-of-line string
include_comments: include comments describing each field?
Returns:
an XML UTF-8 document representing the task.
"""
iddef = self.taskfilter.get_only_iddef()
if not iddef:
raise ValueError(
"Tracker/CTV doesn't have a single ID number " "criterion"
)
branches = [
self.consistency_info.get_xml_root(),
XmlElement(
name="_search_criteria",
value=[
XmlElement(
name="task_tablename_list",
value=",".join(self.taskfilter.task_tablename_list),
),
XmlElement(
name=ViewParam.WHICH_IDNUM,
value=iddef.which_idnum,
datatype=XmlDataTypes.INTEGER,
),
XmlElement(
name=ViewParam.IDNUM_VALUE,
value=iddef.idnum_value,
datatype=XmlDataTypes.INTEGER,
),
XmlElement(
name=ViewParam.START_DATETIME,
value=format_datetime(
self.taskfilter.start_datetime, DateFormat.ISO8601
),
datatype=XmlDataTypes.DATETIME,
),
XmlElement(
name=ViewParam.END_DATETIME,
value=format_datetime(
self.taskfilter.end_datetime, DateFormat.ISO8601
),
datatype=XmlDataTypes.DATETIME,
),
],
),
]
options = TaskExportOptions(
xml_include_plain_columns=True,
xml_include_calculated=True,
include_blobs=False,
)
for t in self.collection.all_tasks:
branches.append(t.get_xml_root(self.req, options))
audit(
self.req,
audit_string,
table=t.tablename,
server_pk=t.pk,
patient_server_pk=t.get_patient_server_pk(),
)
tree = XmlElement(name=xml_name, value=branches)
return get_xml_document(
tree,
indent_spaces=indent_spaces,
eol=eol,
include_comments=include_comments,
)
# -------------------------------------------------------------------------
# HTML view
# -------------------------------------------------------------------------
[docs] def get_html(self) -> str:
"""
Get HTML representing this object.
"""
self.req.prepare_for_html_figures()
return self._get_html()
# -------------------------------------------------------------------------
# PDF view
# -------------------------------------------------------------------------
[docs] def get_pdf_html(self) -> str:
"""
Returns HTML to be made into a PDF representing this object.
"""
self.req.prepare_for_pdf_figures()
return self._get_pdf_html()
[docs] def get_pdf(self) -> bytes:
"""
Get PDF representing tracker/CTV.
"""
req = self.req
html = self.get_pdf_html() # main content
if CSS_PAGED_MEDIA:
return pdf_from_html(req, html)
else:
return pdf_from_html(
req,
html=html,
header_html=render(
"wkhtmltopdf_header.mako",
dict(
inner_text=render(
"tracker_ctv_header.mako",
dict(tracker=self),
request=req,
)
),
request=req,
),
footer_html=render(
"wkhtmltopdf_footer.mako",
dict(
inner_text=render(
"tracker_ctv_footer.mako",
dict(tracker=self),
request=req,
)
),
request=req,
),
extra_wkhtmltopdf_options={"orientation": "Portrait"},
)
[docs] def suggested_pdf_filename(self) -> str:
"""
Get suggested filename for tracker/CTV PDF.
"""
cfg = self.req.config
return get_export_filename(
req=self.req,
patient_spec_if_anonymous=cfg.patient_spec_if_anonymous,
patient_spec=cfg.patient_spec,
filename_spec=cfg.ctv_filename_spec
if self.as_ctv
else cfg.tracker_filename_spec, # noqa
filetype=ViewArg.PDF,
is_anonymous=self.patient is None,
surname=self.patient.get_surname() if self.patient else "",
forename=self.patient.get_forename() if self.patient else "",
dob=self.patient.get_dob() if self.patient else None,
sex=self.patient.get_sex() if self.patient else None,
idnum_objects=self.patient.get_idnum_objects()
if self.patient
else None, # noqa
creation_datetime=None,
basetable=None,
serverpk=None,
)
# =============================================================================
# Tracker class
# =============================================================================
[docs]class Tracker(TrackerCtvCommon):
"""
Class representing a numerical tracker.
"""
[docs] def __init__(
self,
req: "CamcopsRequest",
taskfilter: TaskFilter,
via_index: bool = True,
) -> None:
super().__init__(
req=req, taskfilter=taskfilter, as_ctv=False, via_index=via_index
)
[docs] def get_xml(
self,
indent_spaces: int = 4,
eol: str = "\n",
include_comments: bool = False,
) -> str:
return self._get_xml(
audit_string="Tracker XML accessed",
xml_name="tracker",
indent_spaces=indent_spaces,
eol=eol,
include_comments=include_comments,
)
def _get_html(self) -> str:
return render(
"tracker.mako",
dict(tracker=self, viewtype=ViewArg.HTML),
request=self.req,
)
def _get_pdf_html(self) -> str:
return render(
"tracker.mako",
dict(tracker=self, pdf_landscape=False, viewtype=ViewArg.PDF),
request=self.req,
)
# -------------------------------------------------------------------------
# Plotting
# -------------------------------------------------------------------------
[docs] def get_all_plots_for_one_task_html(self, tasks: List[Task]) -> str:
"""
HTML for all plots for a given task type.
"""
html = ""
ntasks = len(tasks)
if ntasks == 0:
return html
if not tasks[0].provides_trackers:
# ask the first of the task instances
return html
alltrackers = [task.get_trackers(self.req) for task in tasks]
datetimes = [task.get_creation_datetime() for task in tasks]
ntrackers = len(alltrackers[0])
# ... number of trackers supplied by the first task (and all tasks)
for tracker in range(ntrackers):
values = [
alltrackers[tasknum][tracker].value
for tasknum in range(ntasks)
]
html += self.get_single_plot_html(
datetimes, values, specimen_tracker=alltrackers[0][tracker]
)
for task in tasks:
audit(
self.req,
"Tracker data accessed",
table=task.tablename,
server_pk=task.pk,
patient_server_pk=task.get_patient_server_pk(),
)
return html
[docs] def get_single_plot_html(
self,
datetimes: List[Pendulum],
values: List[Optional[float]],
specimen_tracker: "TrackerInfo",
) -> str:
"""
HTML for a single figure.
"""
nonblank_values = [x for x in values if x is not None]
# NB DIFFERENT to list(filter(None, values)), which implements the
# test "if x", not "if x is not None" -- thus eliminating zero values!
# We don't want that.
if not nonblank_values:
return ""
plot_label = specimen_tracker.plot_label
axis_label = specimen_tracker.axis_label
axis_min = specimen_tracker.axis_min
axis_max = specimen_tracker.axis_max
axis_ticks = specimen_tracker.axis_ticks
horizontal_lines = specimen_tracker.horizontal_lines
horizontal_labels = specimen_tracker.horizontal_labels
aspect_ratio = specimen_tracker.aspect_ratio
figsize = (
PlotDefaults.FULLWIDTH_PLOT_WIDTH,
(1.0 / float(aspect_ratio)) * PlotDefaults.FULLWIDTH_PLOT_WIDTH,
)
fig = self.req.create_figure(figsize=figsize)
ax = fig.add_subplot(MatplotlibConstants.WHOLE_PANEL)
x = [matplotlib.dates.date2num(t) for t in datetimes]
datelabels = [dt.strftime(TRACKER_DATEFORMAT) for dt in datetimes]
# Plot lines and markers (on top of lines)
ax.plot(
x, # x
values, # y
color=MatplotlibConstants.COLOUR_BLUE, # line colour
linestyle=MatplotlibConstants.LINESTYLE_SOLID,
marker=MatplotlibConstants.MARKER_PLUS, # point shape
markeredgecolor=MatplotlibConstants.COLOUR_RED, # point colour
markerfacecolor=MatplotlibConstants.COLOUR_RED, # point colour
label=None,
zorder=PlotDefaults.ZORDER_DATA_LINES_POINTS,
)
# x axis
ax.set_xlabel("Date/time", fontdict=self.req.fontdict)
ax.set_xticks(x)
ax.set_xticklabels(datelabels, fontdict=self.req.fontdict)
if (
self.earliest is not None
and self.latest is not None
and self.earliest != self.latest
):
xlim = matplotlib.dates.date2num((self.earliest, self.latest))
margin = (2.5 / 95.0) * (xlim[1] - xlim[0])
xlim[0] -= margin
xlim[1] += margin
ax.set_xlim(xlim)
xlim = ax.get_xlim()
fig.autofmt_xdate(rotation=90)
# ... autofmt_xdate must be BEFORE twinx:
# http://stackoverflow.com/questions/8332395
if axis_ticks is not None and len(axis_ticks) > 0:
tick_positions = [m.y for m in axis_ticks]
tick_labels = [m.label for m in axis_ticks]
ax.set_yticks(tick_positions)
ax.set_yticklabels(tick_labels, fontdict=self.req.fontdict)
# y axis
ax.set_ylabel(axis_label, fontdict=self.req.fontdict)
axis_min = (
min(axis_min, min(nonblank_values))
if axis_min is not None
else min(nonblank_values)
)
axis_max = (
max(axis_max, max(nonblank_values))
if axis_max is not None
else max(nonblank_values)
)
# ... the supplied values are stretched if the data are outside them
# ... but min(something, None) is None, so beware
# If we get something with no sense of scale whatsoever, then what
# we do is arbitrary. Matplotlib does its own thing, but we could do:
if axis_min == axis_max:
if axis_min == 0:
axis_min, axis_min = -1.0, 1.0
else:
singlevalue = axis_min
axis_min = 0.9 * singlevalue
axis_max = 1.1 * singlevalue
if axis_min > axis_max:
axis_min, axis_max = axis_max, axis_min
ax.set_ylim(axis_min, axis_max)
# title
ax.set_title(plot_label, fontdict=self.req.fontdict)
# Horizontal lines
stupid_jitter = 0.001
if horizontal_lines is not None:
for y in horizontal_lines:
ax.plot(
xlim, # x
[y, y + stupid_jitter], # y
color=MatplotlibConstants.COLOUR_GREY_50,
linestyle=MatplotlibConstants.LINESTYLE_DOTTED,
zorder=PlotDefaults.ZORDER_PRESET_LINES,
)
# PROBLEM: horizontal lines becoming invisible
# (whether from ax.axhline or plot)
# Horizontal labels
if horizontal_labels is not None:
label_left = xlim[0] + 0.01 * (xlim[1] - xlim[0])
for lab in horizontal_labels:
y = lab.y
l_ = lab.label
va = lab.vertical_alignment.value
ax.text(
label_left, # x
y, # y
l_, # text
verticalalignment=va,
# alpha=0.5,
# ... was "0.5" rather than 0.5, which led to a
# tricky-to-find "TypeError: a float is required" exception
# after switching to Python 3.
# ... and switched to grey colour with zorder on 2020-06-28
# after wkhtmltopdf 0.12.5 had problems rendering
# opacity=0.5 with SVG lines
color=MatplotlibConstants.COLOUR_GREY_50,
fontdict=self.req.fontdict,
zorder=PlotDefaults.ZORDER_PRESET_LABELS,
)
self.req.set_figure_font_sizes(ax)
fig.tight_layout()
# ... stop the labels dropping off
# (only works properly for LEFT labels...)
# http://matplotlib.org/faq/howto_faq.html
# ... tried it - didn't work (internal numbers change fine,
# check the logger, but visually doesn't help)
# - http://stackoverflow.com/questions/9126838
# - http://matplotlib.org/examples/pylab_examples/finance_work2.html
return self.req.get_html_from_pyplot_figure(fig) + "<br>"
# ... extra line break for the PDF rendering
# =============================================================================
# ClinicalTextView class
# =============================================================================
[docs]class ClinicalTextView(TrackerCtvCommon):
"""
Class representing a clinical text view.
"""
[docs] def __init__(
self,
req: "CamcopsRequest",
taskfilter: TaskFilter,
via_index: bool = True,
) -> None:
super().__init__(
req=req, taskfilter=taskfilter, as_ctv=True, via_index=via_index
)
[docs] def get_xml(
self,
indent_spaces: int = 4,
eol: str = "\n",
include_comments: bool = False,
) -> str:
return self._get_xml(
audit_string="Clinical text view XML accessed",
xml_name="ctv",
indent_spaces=indent_spaces,
eol=eol,
include_comments=include_comments,
)
def _get_html(self) -> str:
return render(
"ctv.mako",
dict(tracker=self, viewtype=ViewArg.HTML),
request=self.req,
)
def _get_pdf_html(self) -> str:
return render(
"ctv.mako",
dict(tracker=self, pdf_landscape=False, viewtype=ViewArg.PDF),
request=self.req,
)