Source code for camcops_server.tasks.kirby_mcq

"""
camcops_server/tasks/kirby.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

"""

import logging
import math
from typing import Dict, List, Optional, Type

import numpy as np
from numpy.linalg.linalg import LinAlgError
from scipy.stats.mstats import gmean
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.sql.sqltypes import Float
import statsmodels.api as sm

# noinspection PyProtectedMember
from statsmodels.discrete.discrete_model import BinaryResultsWrapper
from statsmodels.tools.sm_exceptions import PerfectSeparationError

from camcops_server.cc_modules.cc_constants import CssClass
from camcops_server.cc_modules.cc_db import (
    ancillary_relationship,
    GenericTabletRecordMixin,
    TaskDescendant,
)
from camcops_server.cc_modules.cc_html import answer, tr_qa
from camcops_server.cc_modules.cc_request import CamcopsRequest
from camcops_server.cc_modules.cc_sqlalchemy import Base
from camcops_server.cc_modules.cc_sqla_coltypes import (
    CurrencyColType,
    mapped_bool_column,
)
from camcops_server.cc_modules.cc_summaryelement import SummaryElement
from camcops_server.cc_modules.cc_task import Task, TaskHasPatientMixin

log = logging.getLogger(__name__)


# =============================================================================
# KirbyRewardPair
# =============================================================================


[docs]class KirbyRewardPair(object): """ Represents a pair of rewards: a small immediate reward (SIR) and a large delayed reward (LDR). """
[docs] def __init__( self, sir: int, ldr: int, delay_days: int, chose_ldr: bool = None, currency: str = "£", currency_symbol_first: bool = True, ) -> None: """ Args: sir: amount of the small immediate reward (SIR) ldr: amount of the large delayed reward (LDR) delay_days: delay to the LDR, in days chose_ldr: if result also represented, did the subject choose the LDR? currency: currency symbol currency_symbol_first: symbol before amount? """ self.sir = sir self.ldr = ldr self.delay_days = delay_days self.chose_ldr = chose_ldr self.currency = currency self.currency_symbol_first = currency_symbol_first
[docs] def money(self, amount: int) -> str: """ Returns a currency amount, formatted. """ if self.currency_symbol_first: return f"{self.currency}{amount}" return f"{amount}{self.currency}"
[docs] def sir_string(self, req: CamcopsRequest) -> str: """ Returns a string representing the small immediate reward, e.g. "£10 today". """ _ = req.gettext return _("{money} today").format(money=self.money(self.sir))
[docs] def ldr_string(self, req: CamcopsRequest) -> str: """ Returns a string representing the large delayed reward, e.g. "£50 in 200 days". """ _ = req.gettext return _("{money} in {days} days").format( money=self.money(self.ldr), days=self.delay_days )
[docs] def question(self, req: CamcopsRequest) -> str: """ The question posed for this reward pair. """ _ = req.gettext return _("Would you prefer {sir}, or {ldr}?").format( sir=self.sir_string(req), ldr=self.ldr_string(req) )
[docs] def answer(self, req: CamcopsRequest) -> str: """ Returns the subject's answer, or "?". """ if self.chose_ldr is None: return "?" return self.ldr_string(req) if self.chose_ldr else self.sir_string(req)
[docs] def k_indifference(self) -> float: """ Returns the value of k, the discounting parameter (units: days ^ -1) if the subject is indifferent between the two choices. For calculations see :ref:`kirby_mcq.rst <kirby_mcq>`. """ a1 = self.sir a2 = self.ldr d2 = self.delay_days return (a2 - a1) / (a1 * d2)
[docs] def choice_consistent(self, k: float) -> bool: """ Was the choice consistent with the k value given? - If no choice has been recorded, returns false. - If the k value equals the implied indifference point exactly (meaning that the subject should not care), return true. """ if self.chose_ldr is None: return False k_indiff = self.k_indifference() if math.isclose(k, k_indiff): # Subject is indifferent return True # WARNING: "self.chose_ldr == k < k_indiff" FAILS. # Python will evaluate this to "(self.chose_ldr == k) < k_indiff", and # despite that evaluating to "a bool < an int", that's legal; e.g. # "False < 4" evaluates to True. # Must be bracketed like this: return self.chose_ldr == (k < k_indiff)
# ============================================================================= # KirbyTrial # =============================================================================
[docs]class KirbyTrial(GenericTabletRecordMixin, TaskDescendant, Base): __tablename__ = "kirby_mcq_trials" kirby_mcq_id: Mapped[int] = mapped_column(comment="FK to kirby_mcq") trial: Mapped[int] = mapped_column(comment="Trial number (1-based)") sir: Mapped[Optional[int]] = mapped_column( comment="Small immediate reward" ) ldr: Mapped[Optional[int]] = mapped_column(comment="Large delayed reward") delay_days: Mapped[Optional[int]] = mapped_column(comment="Delay in days") currency: Mapped[Optional[str]] = mapped_column( CurrencyColType, comment="Currency symbol" ) currency_symbol_first: Mapped[Optional[bool]] = mapped_bool_column( "currency_symbol_first", comment="Does the currency symbol come before the amount?", ) chose_ldr: Mapped[Optional[bool]] = mapped_bool_column( "chose_ldr", comment="Did the subject choose the large delayed reward?" )
[docs] def info(self) -> KirbyRewardPair: """ Returns the trial information as a :class:`KirbyRewardPair`. """ return KirbyRewardPair( sir=self.sir, ldr=self.ldr, delay_days=self.delay_days, chose_ldr=self.chose_ldr, currency=self.currency, currency_symbol_first=self.currency_symbol_first, )
[docs] def answered(self) -> bool: """ Has the subject answered this question? """ return self.chose_ldr is not None
# ------------------------------------------------------------------------- # TaskDescendant overrides # ------------------------------------------------------------------------- @classmethod def task_ancestor_class(cls) -> Optional[Type["Task"]]: return Kirby
[docs] def task_ancestor(self) -> Optional["Kirby"]: return Kirby.get_linked( self.kirby_mcq_id, self ) # type: ignore[return-value]
# ============================================================================= # Kirby # =============================================================================
[docs]class Kirby(TaskHasPatientMixin, Task): # type: ignore[misc] """ Server implementation of the Kirby Monetary Choice Questionnaire task. """ __tablename__ = "kirby_mcq" shortname = "KirbyMCQ" EXPECTED_N_TRIALS = 27 # No fields beyond the basics. # Relationships trials = ancillary_relationship( # type: ignore[assignment] parent_class_name="Kirby", ancillary_class_name="KirbyTrial", ancillary_fk_to_parent_attr_name="kirby_mcq_id", ancillary_order_by_attr_name="trial", ) # type: List[KirbyTrial]
[docs] @staticmethod def longname(req: "CamcopsRequest") -> str: _ = req.gettext return _("Kirby et al. 1999 Monetary Choice Questionnaire")
[docs] def is_complete(self) -> bool: if len(self.trials) != self.EXPECTED_N_TRIALS: return False for t in self.trials: if not t.answered(): return False return True
[docs] def all_choice_results(self) -> List[KirbyRewardPair]: """ Returns a list of :class:`KirbyRewardPair` objects, one for each answered question. """ results = [] # type: List[KirbyRewardPair] for t in self.trials: if t.answered(): results.append(t.info()) return results
[docs] @staticmethod def n_choices_consistent(k: float, results: List[KirbyRewardPair]) -> int: """ Returns the number of choices that are consistent with the given k value. """ n_consistent = 0 for pair in results: if pair.choice_consistent(k): n_consistent += 1 return n_consistent
[docs] def k_kirby(self, results: List[KirbyRewardPair]) -> Optional[float]: """ Returns k for a subject as determined using Kirby's (2000) method. See :ref:`kirby_mcq.rst <kirby_mcq>`. """ # 1. For every k value assessed by the questions, establish the degree # of consistency. consistency = {} # type: Dict[float, int] for pair in results: k = pair.k_indifference() if k not in consistency: consistency[k] = self.n_choices_consistent(k, results) if not consistency: return None # 2. Restrict to the results that are equally and maximally consistent. max_consistency = max(consistency.values()) good_k_values = [ k for k, v in consistency.items() if v == max_consistency ] # 3. Take the geometric mean of those good k values. # noinspection PyTypeChecker subject_k = gmean(good_k_values) # type: np.float64 return float(subject_k)
[docs] @staticmethod def k_wileyto(results: List[KirbyRewardPair]) -> Optional[float]: """ Returns k for a subject as determined using Wileyto et al.'s (2004) method. See :ref:`kirby_mcq.rst <kirby_mcq>`. """ if not results: return None n_predictors = 2 n_observations = len(results) x = np.zeros((n_observations, n_predictors)) y = np.zeros(n_observations) for i in range(n_observations): pair = results[i] a1 = pair.sir a2 = pair.ldr d2 = pair.delay_days predictor1 = 1 - (a2 / a1) predictor2 = d2 x[i, 0] = predictor1 x[i, 1] = predictor2 y[i] = int(pair.chose_ldr) # bool to int lr = sm.Logit(y, x) try: result = lr.fit() # type: BinaryResultsWrapper except ( LinAlgError, # e.g. "singular matrix" PerfectSeparationError, ) as e: log.debug(f"sm.Logit error: {e}") return None coeffs = result.params beta1 = coeffs[0] beta2 = coeffs[1] try: k = beta2 / beta1 except ZeroDivisionError: log.warning("Division by zero when calculating k = beta2 / beta1") return None return k
# noinspection PyUnusedLocal
[docs] def get_summaries(self, req: CamcopsRequest) -> List[SummaryElement]: results = self.all_choice_results() return self.standard_task_summary_fields() + [ SummaryElement( name="k_kirby", coltype=Float(), value=self.k_kirby(results), comment="k (days^-1, Kirby 2000 method)", ), SummaryElement( name="k_wileyto", coltype=Float(), value=self.k_wileyto(results), comment="k (days^-1, Wileyto 2004 method)", ), ]
[docs] def get_task_html(self, req: CamcopsRequest) -> str: dp = 6 qlines = [] # type: List[str] for t in self.trials: info = t.info() qlines.append( tr_qa( f"{t.trial}. {info.question(req)} " f"<i>(k<sub>indiff</sub> = " f"{round(info.k_indifference(), dp)})</i>", info.answer(req), ) ) q_a = "\n".join(qlines) results = self.all_choice_results() k_kirby = self.k_kirby(results) if k_kirby is None: inv_k_kirby = None else: inv_k_kirby = int(round(1 / k_kirby)) # round to int # ... you'd think the int() was unnecessary but it is needed k_kirby = round(k_kirby, dp) k_wileyto = self.k_wileyto(results) if k_wileyto is None: inv_k_wileyto = None else: inv_k_wileyto = int(round(1 / k_wileyto)) # round to int k_wileyto = round(k_wileyto, dp) return f""" <div class="{CssClass.SUMMARY}"> <table class="{CssClass.SUMMARY}"> {self.get_is_complete_tr(req)} <tr> <td><i>k</i> (days<sup>–1</sup>, Kirby 2000 method)</td> <td>{answer(k_kirby)} </tr> <tr> <td>1/<i>k</i> (days, Kirby method): time to half value</td> <td>{answer(inv_k_kirby)} </tr> <tr> <td><i>k</i> (days<sup>–1</sup>, Wileyto et al. 2004 method)</td> <td>{answer(k_wileyto)} </tr> <tr> <td>1/<i>k</i> (days, Wileyto method): time to half value</td> <td>{answer(inv_k_wileyto)} </tr> </table> </div> <table class="{CssClass.TASKDETAIL}"> <tr> <th width="75%">Question</th> <th width="25%">Answer</th> </tr> {q_a} </table> """ # noqa