Source code for camcops_server.tasks.kirby_mcq

#!/usr/bin/env python

"""
camcops_server/tasks/kirby.py

===============================================================================

    Copyright (C) 2012-2019 Rudolf Cardinal (rudolf@pobox.com).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <http://www.gnu.org/licenses/>.

===============================================================================

"""

import logging
import math
from typing import Dict, List, Optional, Type

import numpy as np
from numpy.linalg.linalg import LinAlgError
from scipy.stats.mstats import gmean
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import Float, Integer
import statsmodels.api as sm
# noinspection PyProtectedMember
from statsmodels.discrete.discrete_model import BinaryResultsWrapper
from statsmodels.tools.sm_exceptions import PerfectSeparationError

from camcops_server.cc_modules.cc_constants import CssClass
from camcops_server.cc_modules.cc_db import (
    ancillary_relationship,
    GenericTabletRecordMixin,
    TaskDescendant,
)
from camcops_server.cc_modules.cc_html import (
    answer,
    tr_qa,
)
from camcops_server.cc_modules.cc_request import CamcopsRequest
from camcops_server.cc_modules.cc_sqlalchemy import Base
from camcops_server.cc_modules.cc_sqla_coltypes import (
    BoolColumn,
    CurrencyColType,
)
from camcops_server.cc_modules.cc_summaryelement import SummaryElement
from camcops_server.cc_modules.cc_task import Task, TaskHasPatientMixin

log = logging.getLogger(__name__)


# =============================================================================
# KirbyRewardPair
# =============================================================================

[docs]class KirbyRewardPair(object): """ Represents a pair of rewards: a small immediate reward (SIR) and a large delayed reward (LDR). """ def __init__(self, sir: int, ldr: int, delay_days: int, chose_ldr: bool = None, currency: str = "£", currency_symbol_first: bool = True) -> None: """ Args: sir: amount of the small immediate reward (SIR) ldr: amount of the large delayed reward (LDR) delay_days: delay to the LDR, in days chose_ldr: if result also represented, did the subject choose the LDR? currency: currency symbol currency_symbol_first: symbol before amount? """ self.sir = sir self.ldr = ldr self.delay_days = delay_days self.chose_ldr = chose_ldr self.currency = currency self.currency_symbol_first = currency_symbol_first
[docs] def money(self, amount: int) -> str: """ Returns a currency amount, formatted. """ if self.currency_symbol_first: return f"{self.currency}{amount}" return f"{amount}{self.currency}"
[docs] def sir_string(self, req: CamcopsRequest) -> str: """ Returns a string representing the small immediate reward, e.g. "£10 today". """ _ = req.gettext return _("{money} today").format(money=self.money(self.sir))
[docs] def ldr_string(self, req: CamcopsRequest) -> str: """ Returns a string representing the large delayed reward, e.g. "£50 in 200 days". """ _ = req.gettext return _("{money} in {days} days").format( money=self.money(self.ldr), days=self.delay_days, )
[docs] def question(self, req: CamcopsRequest) -> str: """ The question posed for this reward pair. """ _ = req.gettext return _("Would you prefer {sir}, or {ldr}?").format( sir=self.sir_string(req), ldr=self.ldr_string(req), )
[docs] def answer(self, req: CamcopsRequest) -> str: """ Returns the subject's answer, or "?". """ if self.chose_ldr is None: return "?" return self.ldr_string(req) if self.chose_ldr else self.sir_string(req)
[docs] def k_indifference(self) -> float: """ Returns the value of k, the discounting parameter (units: days ^ -1) if the subject is indifferent between the two choices. For calculations see :ref:`kirby_mcq.rst <kirby_mcq>`. """ a1 = self.sir a2 = self.ldr d2 = self.delay_days return (a2 - a1) / (a1 * d2)
[docs] def choice_consistent(self, k: float) -> bool: """ Was the choice consistent with the k value given? - If no choice has been recorded, returns false. - If the k value equals the implied indifference point exactly (meaning that the subject should not care), return true. """ if self.chose_ldr is None: return False k_indiff = self.k_indifference() if math.isclose(k, k_indiff): # Subject is indifferent return True # WARNING: "self.chose_ldr == k < k_indiff" FAILS. # Python will evaluate this to "(self.chose_ldr == k) < k_indiff", and # despite that evaluating to "a bool < an int", that's legal; e.g. # "False < 4" evaluates to True. # Must be bracketed like this: return self.chose_ldr == (k < k_indiff)
# ============================================================================= # KirbyTrial # =============================================================================
[docs]class KirbyTrial(GenericTabletRecordMixin, TaskDescendant, Base): __tablename__ = "kirby_mcq_trials" kirby_mcq_id = Column( "kirby_mcq_id", Integer, nullable=False, comment="FK to kirby_mcq" ) trial = Column( "trial", Integer, nullable=False, comment="Trial number (1-based)" ) sir = Column( "sir", Integer, comment="Small immediate reward" ) ldr = Column( "ldr", Integer, comment="Large delayed reward" ) delay_days = Column( "delay_days", Integer, comment="Delay in days" ) currency = Column( "currency", CurrencyColType, comment="Currency symbol" ) currency_symbol_first = BoolColumn( "currency_symbol_first", comment="Does the currency symbol come before the amount?" ) chose_ldr = BoolColumn( "chose_ldr", comment="Did the subject choose the large delayed reward?" )
[docs] def info(self) -> KirbyRewardPair: """ Returns the trial information as a :class:`KirbyRewardPair`. """ return KirbyRewardPair( sir=self.sir, ldr=self.ldr, delay_days=self.delay_days, chose_ldr=self.chose_ldr, currency=self.currency, currency_symbol_first=self.currency_symbol_first, )
[docs] def answered(self) -> bool: """ Has the subject answered this question? """ return self.chose_ldr is not None
# ------------------------------------------------------------------------- # TaskDescendant overrides # -------------------------------------------------------------------------
[docs] @classmethod def task_ancestor_class(cls) -> Optional[Type["Task"]]: return Kirby
[docs] def task_ancestor(self) -> Optional["Kirby"]: return Kirby.get_linked(self.kirby_mcq_id, self)
# ============================================================================= # Kirby # =============================================================================
[docs]class Kirby(TaskHasPatientMixin, Task): """ Server implementation of the Kirby Monetary Choice Questionnaire task. """ __tablename__ = "kirby_mcq" shortname = "KirbyMCQ" EXPECTED_N_TRIALS = 27 # No fields beyond the basics. # Relationships trials = ancillary_relationship( parent_class_name="Kirby", ancillary_class_name="KirbyTrial", ancillary_fk_to_parent_attr_name="kirby_mcq_id", ancillary_order_by_attr_name="trial" ) # type: List[KirbyTrial]
[docs] @staticmethod def longname(req: "CamcopsRequest") -> str: _ = req.gettext return _("Kirby et al. 1999 Monetary Choice Questionnaire")
[docs] def is_complete(self) -> bool: if len(self.trials) != self.EXPECTED_N_TRIALS: return False for t in self.trials: if not t.answered(): return False return True
[docs] def all_choice_results(self) -> List[KirbyRewardPair]: """ Returns a list of :class:`KirbyRewardPair` objects, one for each answered question. """ results = [] # type: List[KirbyRewardPair] for t in self.trials: if t.answered(): results.append(t.info()) return results
[docs] @staticmethod def n_choices_consistent(k: float, results: List[KirbyRewardPair]) -> int: """ Returns the number of choices that are consistent with the given k value. """ n_consistent = 0 for pair in results: if pair.choice_consistent(k): n_consistent += 1 return n_consistent
[docs] def k_kirby(self, results: List[KirbyRewardPair]) -> Optional[float]: """ Returns k for a subject as determined using Kirby's (2000) method. See :ref:`kirby_mcq.rst <kirby_mcq>`. """ # 1. For every k value assessed by the questions, establish the degree # of consistency. consistency = {} # type: Dict[float, int] for pair in results: k = pair.k_indifference() if k not in consistency: consistency[k] = self.n_choices_consistent(k, results) if not consistency: return None # 2. Restrict to the results that are equally and maximally consistent. max_consistency = max(consistency.values()) good_k_values = [k for k, v in consistency.items() if v == max_consistency] # 3. Take the geometric mean of those good k values. # noinspection PyTypeChecker subject_k = gmean(good_k_values) # type: np.float64 return float(subject_k)
[docs] @staticmethod def k_wileyto(results: List[KirbyRewardPair]) -> Optional[float]: """ Returns k for a subject as determined using Wileyto et al.'s (2004) method. See :ref:`kirby_mcq.rst <kirby_mcq>`. """ if not results: return None n_predictors = 2 n_observations = len(results) x = np.zeros((n_observations, n_predictors)) y = np.zeros(n_observations) for i in range(n_observations): pair = results[i] a1 = pair.sir a2 = pair.ldr d2 = pair.delay_days predictor1 = 1 - (a2 / a1) predictor2 = d2 x[i, 0] = predictor1 x[i, 1] = predictor2 y[i] = int(pair.chose_ldr) # bool to int lr = sm.Logit(y, x) try: result = lr.fit() # type: BinaryResultsWrapper except (LinAlgError, # e.g. "singular matrix" PerfectSeparationError) as e: log.debug(f"sm.Logit error: {e}") return None coeffs = result.params beta1 = coeffs[0] beta2 = coeffs[1] try: k = beta2 / beta1 except ZeroDivisionError: log.warning("Division by zero when calculating k = beta2 / beta1") return None return k
# noinspection PyUnusedLocal
[docs] def get_summaries(self, req: CamcopsRequest) -> List[SummaryElement]: results = self.all_choice_results() return self.standard_task_summary_fields() + [ SummaryElement( name="k_kirby", coltype=Float(), value=self.k_kirby(results), comment="k (days^-1, Kirby 2000 method)"), SummaryElement( name="k_wileyto", coltype=Float(), value=self.k_wileyto(results), comment="k (days^-1, Wileyto 2004 method)"), ]
[docs] def get_task_html(self, req: CamcopsRequest) -> str: dp = 6 qlines = [] # type: List[str] for t in self.trials: info = t.info() qlines.append( tr_qa(f"{t.trial}. {info.question(req)} " f"<i>(k<sub>indiff</sub> = " f"{round(info.k_indifference(), dp)})</i>", info.answer(req)) ) q_a = "\n".join(qlines) results = self.all_choice_results() k_kirby = self.k_kirby(results) if k_kirby is None: inv_k_kirby = None else: inv_k_kirby = int(round(1 / k_kirby)) # round to int # ... you'd think the int() was unnecessary but it is needed k_kirby = round(k_kirby, dp) k_wileyto = self.k_wileyto(results) if k_wileyto is None: inv_k_wileyto = None else: inv_k_wileyto = int(round(1 / k_wileyto)) # round to int k_wileyto = round(k_wileyto, dp) return f""" <div class="{CssClass.SUMMARY}"> <table class="{CssClass.SUMMARY}"> {self.get_is_complete_tr(req)} <tr> <td><i>k</i> (days<sup>–1</sup>, Kirby 2000 method)</td> <td>{answer(k_kirby)} </tr> <tr> <td>1/<i>k</i> (days, Kirby method): time to half value</td> <td>{answer(inv_k_kirby)} </tr> <tr> <td><i>k</i> (days<sup>–1</sup>, Wileyto et al. 2004 method)</td> <td>{answer(k_wileyto)} </tr> <tr> <td>1/<i>k</i> (days, Wileyto method): time to half value</td> <td>{answer(inv_k_wileyto)} </tr> </table> </div> <table class="{CssClass.TASKDETAIL}"> <tr> <th width="75%">Question</th> <th width="25%">Answer</th> </tr> {q_a} </table> """ # noqa