"""
camcops_server/tasks/cardinal_expdetthreshold.py
===============================================================================
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
"""
import math
import logging
from typing import List, Optional, Tuple, Type
from cardinal_pythonlib.maths_numpy import inv_logistic, logistic
import cardinal_pythonlib.rnc_web as ws
from matplotlib.figure import Figure
import numpy as np
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import Float, Integer, Text, UnicodeText
from camcops_server.cc_modules.cc_constants import (
CssClass,
MatplotlibConstants,
PlotDefaults,
)
from camcops_server.cc_modules.cc_db import (
ancillary_relationship,
GenericTabletRecordMixin,
TaskDescendant,
)
from camcops_server.cc_modules.cc_html import get_yes_no_none, tr_qa
from camcops_server.cc_modules.cc_request import CamcopsRequest
from camcops_server.cc_modules.cc_sqla_coltypes import (
CamcopsColumn,
PendulumDateTimeAsIsoTextColType,
)
from camcops_server.cc_modules.cc_sqlalchemy import Base
from camcops_server.cc_modules.cc_task import Task, TaskHasPatientMixin
from camcops_server.cc_modules.cc_text import SS
# Module-level logger.
log = logging.getLogger(__name__)

# Modality codes, as stored in the task's "modality" column.
MODALITY_AUDITORY = 0
MODALITY_VISUAL = 1

# Number of decimal places used when displaying floating-point values.
DP = 3

# Detection probabilities (besides 0.5) for which the corresponding intensity
# is reported and marked on the fitted-curve plot.
LOWER_MARKER = 0.25
UPPER_MARKER = 0.75

# Shared database column comment describing the fitted logistic function.
EQUATION_COMMENT = "; ".join(
    (
        "logits: L(X) = intercept + slope * X",
        "probability: P = 1 / (1 + exp(-intercept - slope * X))",
    )
)
# =============================================================================
# CardinalExpDetThreshold
# =============================================================================
class CardinalExpDetThresholdTrial(
    GenericTabletRecordMixin, TaskDescendant, Base
):
    """
    One trial record belonging to a :class:`CardinalExpDetThreshold` task.

    Rows live in the ``cardinal_expdetthreshold_trials`` table and refer to
    their parent task through ``cardinal_expdetthreshold_id``.
    """

    __tablename__ = "cardinal_expdetthreshold_trials"

    cardinal_expdetthreshold_id = Column(
        "cardinal_expdetthreshold_id",
        Integer,
        nullable=False,
        comment="FK to CardinalExpDetThreshold",
    )
    trial = Column(
        "trial", Integer, nullable=False, comment="Trial number (0-based)"
    )

    # Results
    trial_ignoring_catch_trials = Column(
        "trial_ignoring_catch_trials",
        Integer,
        comment="Trial number, ignoring catch trials (0-based)",
    )
    target_presented = Column(
        "target_presented", Integer, comment="Target presented? (0 no, 1 yes)"
    )
    target_time = Column(
        "target_time",
        PendulumDateTimeAsIsoTextColType,
        comment="Target presentation time (ISO-8601)",
    )
    intensity = Column(
        "intensity", Float, comment="Target intensity (0.0-1.0)"
    )
    choice_time = Column(
        "choice_time",
        PendulumDateTimeAsIsoTextColType,
        comment="Time choice offered (ISO-8601)",
    )
    responded = Column(
        "responded", Integer, comment="Responded? (0 no, 1 yes)"
    )
    response_time = Column(
        "response_time",
        PendulumDateTimeAsIsoTextColType,
        comment="Time of response (ISO-8601)",
    )
    response_latency_ms = Column(
        "response_latency_ms", Integer, comment="Response latency (ms)"
    )
    yes = Column(
        "yes", Integer, comment="Subject chose YES? (0 didn't, 1 did)"
    )
    no = Column("no", Integer, comment="Subject chose NO? (0 didn't, 1 did)")
    caught_out_reset = Column(
        "caught_out_reset",
        Integer,
        comment="Caught out on catch trial, thus reset? (0 no, 1 yes)",
    )
    trial_num_in_calculation_sequence = Column(
        "trial_num_in_calculation_sequence",
        Integer,
        comment="Trial number as used for threshold calculation",
    )

    @classmethod
    def get_html_table_header(cls) -> str:
        """
        Return the opening ``<table>`` tag plus the header row for a table of
        trials.

        The table is left open: the caller appends rows from
        :meth:`get_html_table_row` and then closes it with ``</table>``.
        """
        return f"""
            <table class="{CssClass.EXTRADETAIL}">
                <tr>
                    <th>Trial# (0-based)</th>
                    <th>Trial# (ignoring catch trials) (0-based)</th>
                    <th>Target presented?</th>
                    <th>Target time</th>
                    <th>Intensity</th>
                    <th>Choice time</th>
                    <th>Responded?</th>
                    <th>Response time</th>
                    <th>Response latency (ms)</th>
                    <th>Yes?</th>
                    <th>No?</th>
                    <th>Caught out (and reset)?</th>
                    <th>Trial# in calculation sequence</th>
                </tr>
        """

    def get_html_table_row(self) -> str:
        """
        Return a single HTML table row (``<tr>...</tr>``) for this trial.

        The 13 cells are in the same order as the columns produced by
        :meth:`get_html_table_header`.
        """
        # The row must be closed with </tr>; closing it with </th> would
        # leave every row unterminated and produce malformed HTML.
        return ("<tr>" + "<td>{}</td>" * 13 + "</tr>").format(
            self.trial,
            self.trial_ignoring_catch_trials,
            self.target_presented,
            self.target_time,
            ws.number_to_dp(self.intensity, DP),
            self.choice_time,
            self.responded,
            self.response_time,
            self.response_latency_ms,
            self.yes,
            self.no,
            ws.webify(self.caught_out_reset),
            ws.webify(self.trial_num_in_calculation_sequence),
        )

    # -------------------------------------------------------------------------
    # TaskDescendant overrides
    # -------------------------------------------------------------------------

    @classmethod
    def task_ancestor_class(cls) -> Optional[Type["Task"]]:
        """
        Return the task class that owns records of this type.
        """
        return CardinalExpDetThreshold

    def task_ancestor(self) -> Optional["CardinalExpDetThreshold"]:
        """
        Return the parent task instance for this trial, looked up via
        ``CardinalExpDetThreshold.get_linked`` using this trial's
        ``cardinal_expdetthreshold_id``.
        """
        return CardinalExpDetThreshold.get_linked(
            self.cardinal_expdetthreshold_id, self
        )
class CardinalExpDetThreshold(TaskHasPatientMixin, Task):
    """
    Server implementation of the Cardinal_ExpDetThreshold task.
    """

    __tablename__ = "cardinal_expdetthreshold"
    shortname = "Cardinal_ExpDetThreshold"
    use_landscape_for_pdf = True

    # Config
    modality = Column(
        "modality", Integer, comment="Modality (0 auditory, 1 visual)"
    )
    target_number = Column(
        "target_number",
        Integer,
        comment="Target number (within available targets of that modality)",
    )
    background_filename = CamcopsColumn(
        "background_filename",
        Text,
        exempt_from_anonymisation=True,
        comment="Filename of media used for background",
    )
    target_filename = CamcopsColumn(
        "target_filename",
        Text,
        exempt_from_anonymisation=True,
        comment="Filename of media used for target",
    )
    visual_target_duration_s = Column(
        "visual_target_duration_s", Float, comment="Visual target duration (s)"
    )
    background_intensity = Column(
        "background_intensity",
        Float,
        comment="Intensity of background (0.0-1.0)",
    )
    start_intensity_min = Column(
        "start_intensity_min",
        Float,
        comment="Minimum starting intensity (0.0-1.0)",
    )
    start_intensity_max = Column(
        "start_intensity_max",
        Float,
        comment="Maximum starting intensity (0.0-1.0)",
    )
    initial_large_intensity_step = Column(
        "initial_large_intensity_step",
        Float,
        comment="Initial, large, intensity step (0.0-1.0)",
    )
    main_small_intensity_step = Column(
        "main_small_intensity_step",
        Float,
        comment="Main, small, intensity step (0.0-1.0)",
    )
    num_trials_in_main_sequence = Column(
        "num_trials_in_main_sequence",
        Integer,
        comment="Number of trials required in main sequence",
    )
    p_catch_trial = Column(
        "p_catch_trial", Float, comment="Probability of catch trial"
    )
    prompt = CamcopsColumn(
        "prompt",
        UnicodeText,
        exempt_from_anonymisation=True,
        comment="Prompt given to subject",
    )
    iti_s = Column("iti_s", Float, comment="Intertrial interval (s)")

    # Results
    finished = Column(
        "finished",
        Integer,
        comment="Subject finished successfully (0 no, 1 yes)",
    )
    intercept = Column("intercept", Float, comment=EQUATION_COMMENT)
    slope = Column("slope", Float, comment=EQUATION_COMMENT)
    k = Column("k", Float, comment=EQUATION_COMMENT + "; k = slope")
    theta = Column(
        "theta",
        Float,
        comment=EQUATION_COMMENT + "; theta = -intercept/k = -intercept/slope",
    )

    # Relationships
    trials = ancillary_relationship(
        parent_class_name="CardinalExpDetThreshold",
        ancillary_class_name="CardinalExpDetThresholdTrial",
        ancillary_fk_to_parent_attr_name="cardinal_expdetthreshold_id",
        ancillary_order_by_attr_name="trial",
    )  # type: List[CardinalExpDetThresholdTrial]

    @staticmethod
    def longname(req: "CamcopsRequest") -> str:
        """
        Return the task's long name, passed through the request's
        ``gettext`` function.
        """
        _ = req.gettext
        return _(
            "Cardinal RN – Threshold determination for "
            "Expectation–Detection task"
        )

    def is_complete(self) -> bool:
        """
        Is the task complete? True if the ``finished`` flag is truthy.
        """
        return bool(self.finished)

    def _get_figures(
        self, req: CamcopsRequest
    ) -> Tuple[Figure, Optional[Figure]]:
        """
        Create and return figures. Returns ``trialfig, fitfig``.

        ``trialfig`` plots intensity against trial number, with a different
        marker/colour for each category of trial. ``fitfig`` plots the fitted
        logistic curve together with the (jittered) responses from the trials
        used in the threshold calculation.

        ``fitfig`` is ``None`` if there are no trials, or if ``k`` or
        ``theta`` is ``None``. If both are present but no trial belongs to
        the calculation sequence, ``fitfig`` is returned as an empty figure.
        """
        trialarray = self.trials

        # Constants
        jitter_step = 0.02
        dp_to_consider_same_for_jitter = 3
        y_extra_space = 0.1
        x_extra_space = 0.02
        figsize = (
            PlotDefaults.FULLWIDTH_PLOT_WIDTH / 2,
            PlotDefaults.FULLWIDTH_PLOT_WIDTH / 2,
        )

        # Figure and axes
        trialfig = req.create_figure(figsize=figsize)
        trialax = trialfig.add_subplot(MatplotlibConstants.WHOLE_PANEL)
        fitfig = None  # type: Optional[Figure]

        # Anything to do?
        if not trialarray:
            return trialfig, fitfig

        # Data
        notcalc_detected_x = []
        notcalc_detected_y = []
        notcalc_missed_x = []
        notcalc_missed_y = []
        calc_detected_x = []
        calc_detected_y = []
        calc_missed_x = []
        calc_missed_y = []
        catch_detected_x = []
        catch_detected_y = []
        catch_missed_x = []
        catch_missed_y = []
        all_x = []
        all_y = []
        for t in trialarray:
            x = t.trial
            y = t.intensity
            all_x.append(x)
            all_y.append(y)
            if t.trial_num_in_calculation_sequence is not None:
                # Trial contributes to the threshold calculation ("scored")
                if t.yes:
                    calc_detected_x.append(x)
                    calc_detected_y.append(y)
                else:
                    calc_missed_x.append(x)
                    calc_missed_y.append(y)
            elif t.target_presented:
                # Target trial that is not part of the calculation
                if t.yes:
                    notcalc_detected_x.append(x)
                    notcalc_detected_y.append(y)
                else:
                    notcalc_missed_x.append(x)
                    notcalc_missed_y.append(y)
            else:  # catch trial
                if t.yes:
                    catch_detected_x.append(x)
                    catch_detected_y.append(y)
                else:
                    catch_missed_x.append(x)
                    catch_missed_y.append(y)

        # Create trialfig plots
        trialax.plot(
            all_x,
            all_y,
            marker=MatplotlibConstants.MARKER_NONE,
            color=MatplotlibConstants.COLOUR_GREY_50,
            linestyle=MatplotlibConstants.LINESTYLE_SOLID,
            label=None,
        )
        trialax.plot(
            notcalc_missed_x,
            notcalc_missed_y,
            marker=MatplotlibConstants.MARKER_CIRCLE,
            color=MatplotlibConstants.COLOUR_BLACK,
            linestyle=MatplotlibConstants.LINESTYLE_NONE,
            label="miss",
        )
        trialax.plot(
            notcalc_detected_x,
            notcalc_detected_y,
            marker=MatplotlibConstants.MARKER_PLUS,
            color=MatplotlibConstants.COLOUR_BLACK,
            linestyle=MatplotlibConstants.LINESTYLE_NONE,
            label="hit",
        )
        trialax.plot(
            calc_missed_x,
            calc_missed_y,
            marker=MatplotlibConstants.MARKER_CIRCLE,
            color=MatplotlibConstants.COLOUR_RED,
            linestyle=MatplotlibConstants.LINESTYLE_NONE,
            label="miss, scored",
        )
        trialax.plot(
            calc_detected_x,
            calc_detected_y,
            marker=MatplotlibConstants.MARKER_PLUS,
            color=MatplotlibConstants.COLOUR_BLUE,
            linestyle=MatplotlibConstants.LINESTYLE_NONE,
            label="hit, scored",
        )
        trialax.plot(
            catch_missed_x,
            catch_missed_y,
            marker=MatplotlibConstants.MARKER_CIRCLE,
            color=MatplotlibConstants.COLOUR_GREEN,
            linestyle=MatplotlibConstants.LINESTYLE_NONE,
            label="CR",
        )
        trialax.plot(
            catch_detected_x,
            catch_detected_y,
            marker=MatplotlibConstants.MARKER_STAR,
            color=MatplotlibConstants.COLOUR_GREEN,
            linestyle=MatplotlibConstants.LINESTYLE_NONE,
            label="FA",
        )
        leg = trialax.legend(
            numpoints=1,
            fancybox=True,  # for set_alpha (below)
            loc="best",  # bbox_to_anchor=(0.75, 1.05)
            labelspacing=0,
            handletextpad=0,
            prop=req.fontprops,
        )
        leg.get_frame().set_alpha(0.5)
        trialax.set_xlabel("Trial number (0-based)", fontdict=req.fontdict)
        trialax.set_ylabel("Intensity", fontdict=req.fontdict)
        trialax.set_ylim(0 - y_extra_space, 1 + y_extra_space)
        trialax.set_xlim(-0.5, len(trialarray) - 0.5)
        req.set_figure_font_sizes(trialax)

        # Anything to do for fitfig?
        if self.k is None or self.theta is None:
            return trialfig, fitfig

        # Create fitfig
        fitfig = req.create_figure(figsize=figsize)
        fitax = fitfig.add_subplot(MatplotlibConstants.WHOLE_PANEL)
        detected_x = []
        detected_x_approx = []
        detected_y = []
        missed_x = []
        missed_x_approx = []
        missed_y = []
        all_x = []
        for t in trialarray:
            if t.trial_num_in_calculation_sequence is not None:
                all_x.append(t.intensity)
                # Intensities that agree to this many decimal places are
                # treated as "the same" and stacked vertically (jittered) so
                # that overlapping points stay visible.
                approx_x = f"{t.intensity:.{dp_to_consider_same_for_jitter}f}"
                if t.yes:
                    # Hits start at y = 1 and stack downwards.
                    detected_y.append(
                        1 - detected_x_approx.count(approx_x) * jitter_step
                    )
                    detected_x.append(t.intensity)
                    detected_x_approx.append(approx_x)
                else:
                    # Misses start at y = 0 and stack upwards.
                    missed_y.append(
                        0 + missed_x_approx.count(approx_x) * jitter_step
                    )
                    missed_x.append(t.intensity)
                    missed_x_approx.append(approx_x)

        # Again, anything to do for fitfig?
        if not all_x:
            return trialfig, fitfig

        fit_x = np.arange(0.0 - x_extra_space, 1.0 + x_extra_space, 0.001)
        fit_y = logistic(fit_x, self.k, self.theta)
        fitax.plot(
            fit_x,
            fit_y,
            color=MatplotlibConstants.COLOUR_GREEN,
            linestyle=MatplotlibConstants.LINESTYLE_SOLID,
        )
        fitax.plot(
            missed_x,
            missed_y,
            marker=MatplotlibConstants.MARKER_CIRCLE,
            color=MatplotlibConstants.COLOUR_RED,
            linestyle=MatplotlibConstants.LINESTYLE_NONE,
        )
        fitax.plot(
            detected_x,
            detected_y,
            marker=MatplotlibConstants.MARKER_PLUS,
            color=MatplotlibConstants.COLOUR_BLUE,
            linestyle=MatplotlibConstants.LINESTYLE_NONE,
        )
        fitax.set_ylim(0 - y_extra_space, 1 + y_extra_space)
        fitax.set_xlim(
            np.amin(all_x) - x_extra_space, np.amax(all_x) + x_extra_space
        )

        # Dotted guide lines from each axis to the curve at the marker
        # probabilities.
        marker_points = []
        for y in (LOWER_MARKER, 0.5, UPPER_MARKER):
            x = inv_logistic(y, self.k, self.theta)
            marker_points.append((x, y))
        for p in marker_points:
            fitax.plot(
                [p[0], p[0]],  # x
                [-1, p[1]],  # y
                color=MatplotlibConstants.COLOUR_GREY_50,
                linestyle=MatplotlibConstants.LINESTYLE_DOTTED,
            )
            fitax.plot(
                [-1, p[0]],  # x
                [p[1], p[1]],  # y
                color=MatplotlibConstants.COLOUR_GREY_50,
                linestyle=MatplotlibConstants.LINESTYLE_DOTTED,
            )
        fitax.set_xlabel("Intensity", fontdict=req.fontdict)
        fitax.set_ylabel(
            "Detected? (0=no, 1=yes; jittered)", fontdict=req.fontdict
        )
        req.set_figure_font_sizes(fitax)

        # Done
        return trialfig, fitfig

    def get_trial_html(self, req: CamcopsRequest) -> str:
        """
        Return HTML for the table of trials and, if the task is complete, the
        two figures from :meth:`_get_figures`.

        Note re plotting markers without lines:

        .. code-block:: python

            import matplotlib.pyplot as plt

            fig, ax = plt.subplots()
            ax.plot([1, 2], [1, 2], marker="+", color="r", linestyle="-")
            ax.plot([1, 2], [2, 1], marker="o", color="b", linestyle="None")
            fig.savefig("test.png")
            # ... the "absent" line does NOT "cut" the red one.

        Args:
            req:
                the request; used to create the figures and convert them to
                HTML

        Returns:
            an HTML fragment: the trial table, followed (only when
            :meth:`is_complete` is true) by a borderless table holding the
            figures
        """
        # Header, one row per trial, then close the table.
        html = (
            CardinalExpDetThresholdTrial.get_html_table_header()
            + "".join(t.get_html_table_row() for t in self.trials)
            + """</table>"""
        )

        # Don't add figures if we're incomplete
        if not self.is_complete():
            return html

        # Add figures
        trialfig, fitfig = self._get_figures(req)
        # NOTE(review): fitfig can be None here (no trials, or k/theta not
        # set); this relies on req.get_html_from_pyplot_figure() tolerating
        # None -- confirm.
        html += f"""
            <table class="{CssClass.NOBORDER}">
                <tr>
                    <td class="{CssClass.NOBORDERPHOTO}">
                        {req.get_html_from_pyplot_figure(trialfig)}
                    </td>
                    <td class="{CssClass.NOBORDERPHOTO}">
                        {req.get_html_from_pyplot_figure(fitfig)}
                    </td>
                </tr>
            </table>
        """
        return html

    def logistic_x_from_p(self, p: Optional[float]) -> Optional[float]:
        """
        Return the intensity ``X`` at which the fitted logistic function
        gives probability ``p``.

        Inverts ``P = 1 / (1 + exp(-intercept - slope * X))``, giving
        ``X = (ln(p / (1 - p)) - intercept) / slope``.

        Args:
            p: target probability; only values strictly between 0 and 1 have
                a solution

        Returns:
            the intensity, or ``None`` if it cannot be calculated: ``p``,
            ``intercept`` or ``slope`` is ``None``; ``p`` is not strictly
            between 0 and 1; or ``slope`` is zero
        """
        try:
            return (math.log(p / (1 - p)) - self.intercept) / self.slope
        except (TypeError, ValueError, ZeroDivisionError):
            # TypeError: one of the operands is None.
            # ValueError: log of a non-positive number (p <= 0 or p > 1).
            # ZeroDivisionError: p == 1 (division by 1 - p) or slope == 0.
            return None

    def get_task_html(self, req: CamcopsRequest) -> str:
        """
        Return the HTML for this task: completeness summary, explanation,
        configuration table, results table, and then the per-trial detail
        from :meth:`get_trial_html`.
        """
        if self.modality == MODALITY_AUDITORY:
            modality = req.sstring(SS.AUDITORY)
        elif self.modality == MODALITY_VISUAL:
            modality = req.sstring(SS.VISUAL)
        else:
            # Unknown or unset modality code
            modality = None
        h = f"""
            <div class="{CssClass.SUMMARY}">
                <table class="{CssClass.SUMMARY}">
                    {self.get_is_complete_tr(req)}
                </table>
            </div>
            <div class="{CssClass.EXPLANATION}">
                The ExpDet-Threshold task measures visual and auditory
                thresholds for stimuli on a noisy background, using a
                single-interval up/down method. It is intended as a prequel to
                the Expectation–Detection task.
            </div>
            <table class="{CssClass.TASKCONFIG}">
                <tr>
                    <th width="50%">Configuration variable</th>
                    <th width="50%">Value</th>
                </tr>
        """
        h += tr_qa("Modality", modality)
        h += tr_qa("Target number", self.target_number)
        h += tr_qa("Background filename", ws.webify(self.background_filename))
        h += tr_qa("Background intensity", self.background_intensity)
        h += tr_qa("Target filename", ws.webify(self.target_filename))
        h += tr_qa(
            "(For visual targets) Target duration (s)",
            self.visual_target_duration_s,
        )
        h += tr_qa("Start intensity (minimum)", self.start_intensity_min)
        h += tr_qa("Start intensity (maximum)", self.start_intensity_max)
        h += tr_qa(
            "Initial (large) intensity step", self.initial_large_intensity_step
        )
        h += tr_qa(
            "Main (small) intensity step", self.main_small_intensity_step
        )
        h += tr_qa(
            "Number of trials in main sequence",
            self.num_trials_in_main_sequence,
        )
        h += tr_qa("Probability of a catch trial", self.p_catch_trial)
        h += tr_qa("Prompt", self.prompt)
        h += tr_qa("Intertrial interval (ITI) (s)", self.iti_s)
        h += f"""
            </table>
            <table class="{CssClass.TASKDETAIL}">
                <tr><th width="50%">Measure</th><th width="50%">Value</th></tr>
        """
        h += tr_qa("Finished?", get_yes_no_none(req, self.finished))
        h += tr_qa("Logistic intercept", ws.number_to_dp(self.intercept, DP))
        h += tr_qa("Logistic slope", ws.number_to_dp(self.slope, DP))
        h += tr_qa("Logistic k (= slope)", ws.number_to_dp(self.k, DP))
        h += tr_qa(
            "Logistic theta (= –intercept/slope)",
            ws.number_to_dp(self.theta, DP),
        )
        h += tr_qa(
            f"Intensity for {100 * LOWER_MARKER}% detection",
            ws.number_to_dp(self.logistic_x_from_p(LOWER_MARKER), DP),
        )
        # theta is the intensity at which the logistic gives P = 0.5.
        h += tr_qa(
            "Intensity for 50% detection", ws.number_to_dp(self.theta, DP)
        )
        h += tr_qa(
            f"Intensity for {100 * UPPER_MARKER}% detection",
            ws.number_to_dp(self.logistic_x_from_p(UPPER_MARKER), DP),
        )
        h += """
            </table>
        """
        h += self.get_trial_html(req)
        return h