"""
camcops_server/tasks/kirby.py
===============================================================================
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
"""
import logging
import math
from typing import Dict, List, Optional, Type
import numpy as np
from numpy.linalg.linalg import LinAlgError
from scipy.stats.mstats import gmean
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql.sqltypes import Float
import statsmodels.api as sm
# noinspection PyProtectedMember
from statsmodels.discrete.discrete_model import BinaryResultsWrapper
from statsmodels.tools.sm_exceptions import PerfectSeparationError
from camcops_server.cc_modules.cc_constants import CssClass
from camcops_server.cc_modules.cc_db import (
ancillary_relationship,
GenericTabletRecordMixin,
TaskDescendant,
)
from camcops_server.cc_modules.cc_html import answer, tr_qa
from camcops_server.cc_modules.cc_request import CamcopsRequest
from camcops_server.cc_modules.cc_sqlalchemy import Base
from camcops_server.cc_modules.cc_sqla_coltypes import (
CurrencyColType,
mapped_bool_column,
)
from camcops_server.cc_modules.cc_summaryelement import SummaryElement
from camcops_server.cc_modules.cc_task import Task, TaskHasPatientMixin
log = logging.getLogger(__name__)
# =============================================================================
# KirbyRewardPair
# =============================================================================
class KirbyRewardPair(object):
    """
    Represents a pair of rewards: a small immediate reward (SIR) and a large
    delayed reward (LDR), optionally together with the subject's choice
    between them.
    """

    def __init__(
        self,
        sir: int,
        ldr: int,
        delay_days: int,
        chose_ldr: Optional[bool] = None,
        currency: str = "£",
        currency_symbol_first: bool = True,
    ) -> None:
        """
        Args:
            sir: amount of the small immediate reward (SIR)
            ldr: amount of the large delayed reward (LDR)
            delay_days: delay to the LDR, in days
            chose_ldr: if result also represented, did the subject choose the
                LDR? ``None`` means that no choice has been recorded.
            currency: currency symbol
            currency_symbol_first: symbol before amount?
        """
        self.sir = sir
        self.ldr = ldr
        self.delay_days = delay_days
        self.chose_ldr = chose_ldr
        self.currency = currency
        self.currency_symbol_first = currency_symbol_first

    def __repr__(self) -> str:
        """
        Debugging representation showing all stored attributes.
        """
        return (
            f"{type(self).__name__}("
            f"sir={self.sir!r}, "
            f"ldr={self.ldr!r}, "
            f"delay_days={self.delay_days!r}, "
            f"chose_ldr={self.chose_ldr!r}, "
            f"currency={self.currency!r}, "
            f"currency_symbol_first={self.currency_symbol_first!r})"
        )

    def money(self, amount: int) -> str:
        """
        Returns a currency amount, formatted with the currency symbol either
        before or after the amount (according to ``currency_symbol_first``).
        """
        if self.currency_symbol_first:
            return f"{self.currency}{amount}"
        return f"{amount}{self.currency}"

    def sir_string(self, req: "CamcopsRequest") -> str:
        """
        Returns a string representing the small immediate reward, e.g.
        "£10 today". The template is passed through ``req.gettext``.
        """
        _ = req.gettext
        return _("{money} today").format(money=self.money(self.sir))

    def ldr_string(self, req: "CamcopsRequest") -> str:
        """
        Returns a string representing the large delayed reward, e.g.
        "£50 in 200 days". The template is passed through ``req.gettext``.
        """
        _ = req.gettext
        return _("{money} in {days} days").format(
            money=self.money(self.ldr), days=self.delay_days
        )

    def question(self, req: "CamcopsRequest") -> str:
        """
        The question posed for this reward pair.
        """
        _ = req.gettext
        return _("Would you prefer {sir}, or {ldr}?").format(
            sir=self.sir_string(req), ldr=self.ldr_string(req)
        )

    def answer(self, req: "CamcopsRequest") -> str:
        """
        Returns the subject's answer (the string for the chosen reward), or
        "?" if no choice has been recorded.
        """
        if self.chose_ldr is None:
            return "?"
        return self.ldr_string(req) if self.chose_ldr else self.sir_string(req)

    def k_indifference(self) -> float:
        """
        Returns the value of k, the discounting parameter (units: days ^ -1)
        if the subject is indifferent between the two choices.

        For calculations see :ref:`kirby_mcq.rst <kirby_mcq>`.

        Raises:
            ZeroDivisionError: if the SIR amount or the delay is zero (the
                denominator is ``sir * delay_days``).
        """
        a1 = self.sir
        a2 = self.ldr
        d2 = self.delay_days
        return (a2 - a1) / (a1 * d2)

    def choice_consistent(self, k: float) -> bool:
        """
        Was the choice consistent with the k value given?

        - If no choice has been recorded, returns false.
        - If the k value equals the implied indifference point exactly (meaning
          that the subject should not care), return true.
        - Otherwise, choosing the LDR is consistent if and only if k is below
          the indifference value.
        """
        if self.chose_ldr is None:
            return False
        k_indiff = self.k_indifference()
        if math.isclose(k, k_indiff):
            # Subject is indifferent
            return True
        # WARNING: "self.chose_ldr == k < k_indiff" FAILS.
        # That is a chained comparison, which Python evaluates as
        # "(self.chose_ldr == k) and (k < k_indiff)" -- not the intended
        # "self.chose_ldr == (k < k_indiff)".
        # Must be bracketed like this:
        return self.chose_ldr == (k < k_indiff)
# =============================================================================
# KirbyTrial
# =============================================================================
class KirbyTrial(GenericTabletRecordMixin, TaskDescendant, Base):
    """
    A single trial (one SIR-versus-LDR question and the subject's choice),
    stored in the ``kirby_mcq_trials`` table and linked to its parent
    :class:`Kirby` task via ``kirby_mcq_id``.
    """

    __tablename__ = "kirby_mcq_trials"

    # Link to the parent task, and position of this trial within it
    kirby_mcq_id: Mapped[int] = mapped_column(comment="FK to kirby_mcq")
    trial: Mapped[int] = mapped_column(comment="Trial number (1-based)")

    # The choice offered
    sir: Mapped[Optional[int]] = mapped_column(
        comment="Small immediate reward"
    )
    ldr: Mapped[Optional[int]] = mapped_column(comment="Large delayed reward")
    delay_days: Mapped[Optional[int]] = mapped_column(comment="Delay in days")

    # How monetary amounts are displayed
    currency: Mapped[Optional[str]] = mapped_column(
        CurrencyColType, comment="Currency symbol"
    )
    currency_symbol_first: Mapped[Optional[bool]] = mapped_bool_column(
        "currency_symbol_first",
        comment="Does the currency symbol come before the amount?",
    )

    # The subject's response (NULL if not yet answered)
    chose_ldr: Mapped[Optional[bool]] = mapped_bool_column(
        "chose_ldr", comment="Did the subject choose the large delayed reward?"
    )

    def info(self) -> KirbyRewardPair:
        """
        Packages this trial's stored values (rewards, delay, choice and
        currency formatting) into a :class:`KirbyRewardPair`.
        """
        pair = KirbyRewardPair(
            sir=self.sir,
            ldr=self.ldr,
            delay_days=self.delay_days,
            chose_ldr=self.chose_ldr,
            currency=self.currency,
            currency_symbol_first=self.currency_symbol_first,
        )
        return pair

    def answered(self) -> bool:
        """
        Has a choice been recorded for this trial?
        """
        return self.chose_ldr is not None

    # -------------------------------------------------------------------------
    # TaskDescendant overrides
    # -------------------------------------------------------------------------

    @classmethod
    def task_ancestor_class(cls) -> Optional[Type["Task"]]:
        """
        The task class that owns records of this type: :class:`Kirby`.
        """
        return Kirby

    def task_ancestor(self) -> Optional["Kirby"]:
        """
        Looks up the owning :class:`Kirby` task via ``Kirby.get_linked``,
        using this trial's ``kirby_mcq_id``.
        """
        return Kirby.get_linked(
            self.kirby_mcq_id, self
        )  # type: ignore[return-value]
# =============================================================================
# Kirby
# =============================================================================
class Kirby(TaskHasPatientMixin, Task):  # type: ignore[misc]
    """
    Server implementation of the Kirby Monetary Choice Questionnaire task.
    """

    __tablename__ = "kirby_mcq"
    shortname = "KirbyMCQ"

    # Number of trials that must be present (and answered) for the task to
    # count as complete.
    EXPECTED_N_TRIALS = 27

    # No fields beyond the basics.

    # Relationships
    trials = ancillary_relationship(  # type: ignore[assignment]
        parent_class_name="Kirby",
        ancillary_class_name="KirbyTrial",
        ancillary_fk_to_parent_attr_name="kirby_mcq_id",
        ancillary_order_by_attr_name="trial",
    )  # type: List[KirbyTrial]

    @staticmethod
    def longname(req: "CamcopsRequest") -> str:
        """
        Returns the task's long name, passed through ``req.gettext``.
        """
        _ = req.gettext
        return _("Kirby et al. 1999 Monetary Choice Questionnaire")

    def is_complete(self) -> bool:
        """
        The task is complete if it has exactly ``EXPECTED_N_TRIALS`` trials
        and every one of them has been answered.
        """
        if len(self.trials) != self.EXPECTED_N_TRIALS:
            return False
        return all(t.answered() for t in self.trials)

    def all_choice_results(self) -> List[KirbyRewardPair]:
        """
        Returns a list of :class:`KirbyRewardPair` objects, one for each
        answered question.
        """
        return [t.info() for t in self.trials if t.answered()]

    @staticmethod
    def n_choices_consistent(k: float, results: List[KirbyRewardPair]) -> int:
        """
        Returns the number of choices that are consistent with the given k
        value.
        """
        return sum(1 for pair in results if pair.choice_consistent(k))

    def k_kirby(self, results: List[KirbyRewardPair]) -> Optional[float]:
        """
        Returns k for a subject as determined using Kirby's (2000) method.
        See :ref:`kirby_mcq.rst <kirby_mcq>`.

        Returns ``None`` if there are no results.
        """
        # 1. For every k value assessed by the questions, establish the degree
        #    of consistency. dict.fromkeys() de-duplicates the k values while
        #    preserving the order in which they were first encountered.
        distinct_k = dict.fromkeys(pair.k_indifference() for pair in results)
        consistency = {
            k: self.n_choices_consistent(k, results) for k in distinct_k
        }  # type: Dict[float, int]
        if not consistency:
            return None

        # 2. Restrict to the results that are equally and maximally consistent.
        max_consistency = max(consistency.values())
        good_k_values = [
            k for k, v in consistency.items() if v == max_consistency
        ]

        # 3. Take the geometric mean of those good k values.
        # noinspection PyTypeChecker
        subject_k = gmean(good_k_values)  # type: np.float64
        return float(subject_k)

    @staticmethod
    def k_wileyto(results: List[KirbyRewardPair]) -> Optional[float]:
        """
        Returns k for a subject as determined using Wileyto et al.'s (2004)
        method. See :ref:`kirby_mcq.rst <kirby_mcq>`.

        A logistic regression (without intercept) of "chose LDR" is fitted on
        two predictors, ``1 - (LDR / SIR)`` and the delay in days; k is the
        ratio of the second coefficient to the first.

        Returns ``None`` if there are no results, if the regression fails
        (singular matrix or perfect separation), or if the fitted coefficients
        do not give a finite ratio.
        """
        if not results:
            return None
        n_predictors = 2
        n_observations = len(results)
        x = np.zeros((n_observations, n_predictors))
        y = np.zeros(n_observations)
        for i, pair in enumerate(results):
            x[i, 0] = 1 - (pair.ldr / pair.sir)  # predictor 1
            x[i, 1] = pair.delay_days  # predictor 2
            y[i] = int(pair.chose_ldr)  # bool to int
        lr = sm.Logit(y, x)
        try:
            result = lr.fit()  # type: BinaryResultsWrapper
        except (
            LinAlgError,  # e.g. "singular matrix"
            PerfectSeparationError,
        ) as e:
            log.debug(f"sm.Logit error: {e}")
            return None
        coeffs = result.params
        # Convert to plain Python floats. NumPy floating-point division does
        # not raise ZeroDivisionError (it yields inf/nan and a RuntimeWarning),
        # so the zero-denominator case must be tested for explicitly.
        beta1 = float(coeffs[0])
        beta2 = float(coeffs[1])
        if beta1 == 0:
            log.warning("Division by zero when calculating k = beta2 / beta1")
            return None
        if not (math.isfinite(beta1) and math.isfinite(beta2)):
            log.warning("Non-finite coefficients when calculating k")
            return None
        return beta2 / beta1

    @staticmethod
    def _inverse_rounded(k: Optional[float]) -> Optional[int]:
        """
        Returns 1/k rounded to the nearest integer, or ``None`` if k is
        ``None``, zero, or not finite (or if 1/k is not finite), so that
        the caller never has to handle an arithmetic exception.
        """
        if k is None or k == 0 or not math.isfinite(k):
            return None
        inverse = 1 / k
        if not math.isfinite(inverse):
            return None
        # ... you'd think the int() was unnecessary but it is needed
        return int(round(inverse))  # round to int

    # noinspection PyUnusedLocal
    def get_summaries(self, req: CamcopsRequest) -> List[SummaryElement]:
        """
        Returns the standard task summary fields plus the two k estimates
        (Kirby 2000 and Wileyto 2004 methods).
        """
        results = self.all_choice_results()
        return self.standard_task_summary_fields() + [
            SummaryElement(
                name="k_kirby",
                coltype=Float(),
                value=self.k_kirby(results),
                comment="k (days^-1, Kirby 2000 method)",
            ),
            SummaryElement(
                name="k_wileyto",
                coltype=Float(),
                value=self.k_wileyto(results),
                comment="k (days^-1, Wileyto 2004 method)",
            ),
        ]

    def get_task_html(self, req: CamcopsRequest) -> str:
        """
        Returns the HTML for this task: a summary table (completeness, k and
        1/k by both methods) followed by a table of every question with its
        indifference k and the subject's answer.
        """
        dp = 6  # decimal places for displayed k values

        # One table row per trial
        qlines = []  # type: List[str]
        for t in self.trials:
            info = t.info()
            qlines.append(
                tr_qa(
                    f"{t.trial}. {info.question(req)} "
                    f"<i>(k<sub>indiff</sub> = "
                    f"{round(info.k_indifference(), dp)})</i>",
                    info.answer(req),
                )
            )
        q_a = "\n".join(qlines)

        # Summary statistics. The inverse is calculated from the unrounded k.
        results = self.all_choice_results()

        k_kirby = self.k_kirby(results)
        inv_k_kirby = self._inverse_rounded(k_kirby)
        if k_kirby is not None:
            k_kirby = round(k_kirby, dp)

        k_wileyto = self.k_wileyto(results)
        inv_k_wileyto = self._inverse_rounded(k_wileyto)
        if k_wileyto is not None:
            k_wileyto = round(k_wileyto, dp)

        return f"""
            <div class="{CssClass.SUMMARY}">
                <table class="{CssClass.SUMMARY}">
                    {self.get_is_complete_tr(req)}
                    <tr>
                        <td><i>k</i> (days<sup>–1</sup>, Kirby 2000 method)</td>
                        <td>{answer(k_kirby)}</td>
                    </tr>
                    <tr>
                        <td>1/<i>k</i> (days, Kirby method): time to half value</td>
                        <td>{answer(inv_k_kirby)}</td>
                    </tr>
                    <tr>
                        <td><i>k</i> (days<sup>–1</sup>, Wileyto et al. 2004 method)</td>
                        <td>{answer(k_wileyto)}</td>
                    </tr>
                    <tr>
                        <td>1/<i>k</i> (days, Wileyto method): time to half value</td>
                        <td>{answer(inv_k_wileyto)}</td>
                    </tr>
                </table>
            </div>
            <table class="{CssClass.TASKDETAIL}">
                <tr>
                    <th width="75%">Question</th>
                    <th width="25%">Answer</th>
                </tr>
                {q_a}
            </table>
        """  # noqa