#!/usr/bin/env python
"""
camcops_server/cc_modules/cc_forms.py
===============================================================================
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
.. _Deform: https://docs.pylonsproject.org/projects/deform/en/latest/
**Forms for use by the web front end.**
*COLANDER NODES, NULLS, AND VALIDATION*
- Surprisingly tricky.
- Nodes must be validly intialized with NO USER-DEFINED PARAMETERS to __init__;
the Deform framework clones them.
- A null appstruct is used to initialize nodes as Forms are created.
Therefore, the "default" value must be acceptable to the underlying type's
serialize() function. Note in particular that "default = None" is not
acceptable to Integer. Having no default is fine, though.
- In general, flexible inheritance is very hard to implement.
- Note that this error:
.. code-block:: none
AttributeError: 'EditTaskFilterSchema' object has no attribute 'typ'
means you have failed to call super().__init__() properly from __init__().
- When creating a schema, its members seem to have to be created in the class
declaration as class properties, not in __init__().
*ACCESSING THE PYRAMID REQUEST IN FORMS AND SCHEMAS*
We often want to be able to access the request for translation purposes, or
sometimes more specialized reasons.
Forms are created dynamically as simple Python objects. So, for a
:class:`deform.form.Form`, just add a ``request`` parameter to the constructor,
and pass it when you create the form. An example is
:class:`camcops_server.cc_modules.cc_forms.DeleteCancelForm`.
For a :class:`colander.Schema` and :class:`colander.SchemaNode`, construction
is separate from binding. The schema nodes are created as part of a schema
class, not a schema instance. The schema is created by the form, and then bound
to a request. Access to the request is therefore via the :func:`after_bind`
callback function, offered by colander, via the ``kw`` parameter or
``self.bindings``. We use ``Binding.REQUEST`` as a standard key for this
dictionary. The bindings are also available in :func:`validator` and similar
functions, as ``self.bindings``.
All forms containing any schema that needs to see the request should have this
sort of ``__init__`` function:
.. code-block:: python
class SomeForm(...):
def __init__(...):
schema = schema_class().bind(request=request)
super().__init__(
schema,
...,
**kwargs
)
The simplest thing, therefore, is for all forms to do this. Some of our forms
use a form superclass that does this via the ``schema_class`` argument (which
is not part of colander, so if you see that, the superclass should do the work
of binding a request).
For translation, throughout there will be ``_ = self.gettext`` or ``_ =
request.gettext``.
Form titles need to be dynamically written via
:class:`cardinal_pythonlib.deform_utils.DynamicDescriptionsForm` or similar.
.. glossary::
cstruct
See `cstruct
<https://docs.pylonsproject.org/projects/deform/en/latest/glossary.html#term-cstruct>`_
in the Deform_ docs.
Colander
See `Colander
<https://docs.pylonsproject.org/projects/deform/en/latest/glossary.html#term-colander>`_
in the Deform_ docs.
field
See `field
<https://docs.pylonsproject.org/projects/deform/en/latest/glossary.html#term-field>`_
in the Deform_ docs.
Peppercorn
See `Peppercorn
<https://docs.pylonsproject.org/projects/deform/en/latest/glossary.html#term-peppercorn>`_
in the Deform_ docs.
pstruct
See `pstruct
<https://docs.pylonsproject.org/projects/deform/en/latest/glossary.html#term-pstruct>`_
in the Deform_ docs.
""" # noqa
from io import BytesIO
import json
import logging
import os
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Tuple,
Type,
TYPE_CHECKING,
Union,
)
from cardinal_pythonlib.colander_utils import (
AllowNoneType,
BooleanNode,
DateSelectorNode,
DateTimeSelectorNode,
DEFAULT_WIDGET_DATE_OPTIONS_FOR_PENDULUM,
DEFAULT_WIDGET_TIME_OPTIONS_FOR_PENDULUM,
get_child_node,
get_values_and_permissible,
HiddenIntegerNode,
HiddenStringNode,
MandatoryEmailNode,
MandatoryStringNode,
OptionalEmailNode,
OptionalIntNode,
OptionalPendulumNode,
OptionalStringNode,
ValidateDangerousOperationNode,
)
from cardinal_pythonlib.deform_utils import (
DynamicDescriptionsForm,
InformativeForm,
)
from cardinal_pythonlib.httpconst import HttpMethod
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
from cardinal_pythonlib.sqlalchemy.orm_query import CountStarSpecializedQuery
# noinspection PyProtectedMember
from colander import (
Boolean,
Date,
drop,
Integer,
Invalid,
Length,
MappingSchema,
null,
OneOf,
Range,
Schema,
SchemaNode,
SchemaType,
SequenceSchema,
Set,
String,
_null,
url,
)
from deform.form import Button
from deform.widget import (
CheckboxChoiceWidget,
CheckedPasswordWidget,
# DateInputWidget,
DateTimeInputWidget,
FormWidget,
HiddenWidget,
MappingWidget,
PasswordWidget,
RadioChoiceWidget,
RichTextWidget,
SelectWidget,
SequenceWidget,
TextAreaWidget,
TextInputWidget,
Widget,
)
from pendulum import Duration
import phonenumbers
import pyotp
import qrcode
import qrcode.image.svg
# import as LITTLE AS POSSIBLE; this is used by lots of modules
# We use some delayed imports here (search for "delayed import")
from camcops_server.cc_modules.cc_baseconstants import (
DEFORM_SUPPORTS_CSP_NONCE,
TEMPLATE_DIR,
)
from camcops_server.cc_modules.cc_constants import (
ConfigParamSite,
DEFAULT_ROWS_PER_PAGE,
MfaMethod,
MINIMUM_PASSWORD_LENGTH,
SEX_OTHER_UNSPECIFIED,
SEX_FEMALE,
SEX_MALE,
StringLengths,
USER_NAME_FOR_SYSTEM,
)
from camcops_server.cc_modules.cc_group import Group
from camcops_server.cc_modules.cc_idnumdef import (
IdNumDefinition,
ID_NUM_VALIDATION_METHOD_CHOICES,
validate_id_number,
)
from camcops_server.cc_modules.cc_ipuse import IpUse
from camcops_server.cc_modules.cc_language import (
DEFAULT_LOCALE,
POSSIBLE_LOCALES,
POSSIBLE_LOCALES_WITH_DESCRIPTIONS,
)
from camcops_server.cc_modules.cc_patient import Patient
from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
from camcops_server.cc_modules.cc_policy import (
TABLET_ID_POLICY_STR,
TokenizedPolicy,
)
from camcops_server.cc_modules.cc_pyramid import FormAction, ViewArg, ViewParam
from camcops_server.cc_modules.cc_task import tablename_to_task_class_dict
from camcops_server.cc_modules.cc_taskschedule import (
TaskSchedule,
TaskScheduleEmailTemplateFormatter,
)
from camcops_server.cc_modules.cc_validators import (
ALPHANUM_UNDERSCORE_CHAR,
validate_anything,
validate_by_char_and_length,
validate_download_filename,
validate_group_name,
validate_hl7_aa,
validate_hl7_id_type,
validate_ip_address,
validate_new_password,
validate_redirect_url,
validate_username,
)
if TYPE_CHECKING:
from deform.field import Field
from camcops_server.cc_modules.cc_request import CamcopsRequest
from camcops_server.cc_modules.cc_task import Task
from camcops_server.cc_modules.cc_user import User
log = BraceStyleAdapter(logging.getLogger(__name__))
ColanderNullType = _null
ValidatorType = Callable[[SchemaNode, Any], None] # called as v(node, value)
# =============================================================================
# Debugging options
# =============================================================================
DEBUG_CSRF_CHECK = False
if DEBUG_CSRF_CHECK:
log.warning("Debugging options enabled!")
# =============================================================================
# Constants
# =============================================================================
DEFORM_ACCORDION_BUG = True
# If you have a sequence containing an accordion (e.g. advanced JSON settings),
# then when you add a new node (e.g. "Add Task schedule") then the newly
# created node's accordion won't open out.
# https://github.com/Pylons/deform/issues/347
[docs]class Binding(object):
"""
Keys used for binding dictionaries with Colander schemas (schemata).
Must match ``kwargs`` of calls to ``bind()`` function of each ``Schema``.
"""
GROUP = "group"
OPEN_ADMIN = "open_admin"
OPEN_WHAT = "open_what"
OPEN_WHEN = "open_when"
OPEN_WHO = "open_who"
REQUEST = "request"
TRACKER_TASKS_ONLY = "tracker_tasks_only"
USER = "user"
[docs]class BootstrapCssClasses(object):
"""
Constants from Bootstrap to control display.
"""
FORM_INLINE = "form-inline"
RADIO_INLINE = "radio-inline"
LIST_INLINE = "list-inline"
CHECKBOX_INLINE = "checkbox-inline"
AUTOCOMPLETE_ATTR = "autocomplete"
[docs]class AutocompleteAttrValues(object):
"""
Some values for the HTML "autocomplete" attribute, as per
https://developer.mozilla.org/en-US/docs/Web/HTML/Attributes/autocomplete.
Not all are used.
"""
BDAY = "bday"
CURRENT_PASSWORD = "current-password"
EMAIL = "email"
FAMILY_NAME = "family-name"
GIVEN_NAME = "given-name"
NEW_PASSWORD = "new-password"
OFF = "off"
ON = "on" # browser decides
STREET_ADDRESS = "stree-address"
USERNAME = "username"
def get_tinymce_options(request: "CamcopsRequest") -> Dict[str, Any]:
return {
"content_css": "static/tinymce/custom_content.css",
"menubar": "false",
"plugins": "link",
"toolbar": (
"undo redo | bold italic underline | link | "
"bullist numlist | "
"alignleft aligncenter alignright alignjustify | "
"outdent indent"
),
"language": request.language_iso_639_1,
}
# =============================================================================
# Common phrases for translation
# =============================================================================
def or_join_description(request: "CamcopsRequest") -> str:
_ = request.gettext
return _("If you specify more than one, they will be joined with OR.")
def change_password_title(request: "CamcopsRequest") -> str:
_ = request.gettext
return _("Change password")
def sex_choices(request: "CamcopsRequest") -> List[Tuple[str, str]]:
_ = request.gettext
return [
(SEX_FEMALE, _("Female (F)")),
(SEX_MALE, _("Male (M)")),
# TRANSLATOR: sex code description
(SEX_OTHER_UNSPECIFIED, _("Other/unspecified (X)")),
]
# =============================================================================
# Deform bug fix: SelectWidget "multiple" attribute
# =============================================================================
SelectWidget = BugfixSelectWidget
# =============================================================================
# Form that handles Content-Security-Policy nonce tags
# =============================================================================
# =============================================================================
# Mixin for Schema/SchemaNode objects for translation
# =============================================================================
GETTEXT_TYPE = Callable[[str], str]
[docs]class RequestAwareMixin(object):
"""
Mixin to add Pyramid request awareness to Schema/SchemaNode objects,
together with some translations and other convenience functions.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
# Stop multiple inheritance complaints
super().__init__(*args, **kwargs)
# noinspection PyUnresolvedReferences
@property
def request(self) -> "CamcopsRequest":
return self.bindings[Binding.REQUEST]
# noinspection PyUnresolvedReferences,PyPropertyDefinition
@property
def gettext(self) -> GETTEXT_TYPE:
return self.request.gettext
@property
def or_join_description(self) -> str:
return or_join_description(self.request)
# =============================================================================
# Translatable version of ValidateDangerousOperationNode
# =============================================================================
[docs]class TranslatableValidateDangerousOperationNode(
ValidateDangerousOperationNode, RequestAwareMixin
):
"""
Translatable version of ValidateDangerousOperationNode.
"""
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw) # calls set_description()
_ = self.gettext
node.title = _("Danger")
user_entry = get_child_node(self, "user_entry")
user_entry.title = _("Validate this dangerous operation")
def set_description(self, target_value: str) -> None:
# Overrides parent version (q.v.).
_ = self.gettext
user_entry = get_child_node(self, "user_entry")
prefix = _("Please enter the following: ")
user_entry.description = prefix + target_value
# =============================================================================
# Translatable version of SequenceWidget
# =============================================================================
# =============================================================================
# Translatable version of OptionalPendulumNode
# =============================================================================
[docs]class TranslatableOptionalPendulumNode(
OptionalPendulumNode, RequestAwareMixin
):
"""
Translates the "Date" and "Time" labels for the widget, via
the request.
.. todo:: TranslatableOptionalPendulumNode not fully implemented
"""
[docs] def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.widget = None # type: Optional[Widget]
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.widget = DateTimeInputWidget(
date_options=DEFAULT_WIDGET_DATE_OPTIONS_FOR_PENDULUM,
time_options=DEFAULT_WIDGET_TIME_OPTIONS_FOR_PENDULUM,
)
# log.debug("TranslatableOptionalPendulumNode.widget: {!r}",
# self.widget.__dict__)
[docs]class TranslatableDateTimeSelectorNode(
DateTimeSelectorNode, RequestAwareMixin
):
"""
Translates the "Date" and "Time" labels for the widget, via
the request.
.. todo:: TranslatableDateTimeSelectorNode not fully implemented
"""
[docs] def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.widget = None # type: Optional[Widget]
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.widget = DateTimeInputWidget()
# log.debug("TranslatableDateTimeSelectorNode.widget: {!r}",
# self.widget.__dict__)
'''
class TranslatableDateSelectorNode(DateSelectorNode,
RequestAwareMixin):
"""
Translates the "Date" and "Time" labels for the widget, via
the request.
.. todo:: TranslatableDateSelectorNode not fully implemented
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.widget = None # type: Optional[Widget]
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.widget = DateInputWidget()
# log.debug("TranslatableDateSelectorNode.widget: {!r}",
# self.widget.__dict__)
'''
# =============================================================================
# CSRF
# =============================================================================
[docs]class CSRFToken(SchemaNode, RequestAwareMixin):
"""
Node to embed a cross-site request forgery (CSRF) prevention token in a
form.
As per https://deformdemo.repoze.org/pyramid_csrf_demo/, modified for a
more recent Colander API.
NOTE that this makes use of colander.SchemaNode.bind; this CLONES the
Schema, and resolves any deferred values by means of the keywords passed to
bind(). Since the Schema is created at module load time, but since we're
asking the Schema to know about the request's CSRF values, this is the only
mechanism
(https://docs.pylonsproject.org/projects/colander/en/latest/api.html#colander.SchemaNode.bind).
From https://deform2000.readthedocs.io/en/latest/basics.html:
"The default of a schema node indicates the value to be serialized if a
value for the schema node is not found in the input data during
serialization. It should be the deserialized representation. If a schema
node does not have a default, it is considered "serialization required"."
"The missing of a schema node indicates the value to be deserialized if a
value for the schema node is not found in the input data during
deserialization. It should be the deserialized representation. If a schema
node does not have a missing value, a colander.Invalid exception will be
raised if the data structure being deserialized does not contain a matching
value."
RNC: Serialized values are always STRINGS.
""" # noqa
schema_type = String
default = ""
missing = ""
title = " "
# ... evaluates to True but won't be visible, if the "hidden" aspect ever
# fails
widget = HiddenWidget()
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
request = self.request
csrf_token = request.session.get_csrf_token()
if DEBUG_CSRF_CHECK:
log.debug("Got CSRF token from session: {!r}", csrf_token)
self.default = csrf_token
def validator(self, node: SchemaNode, value: Any) -> None:
# Deferred validator via method, as per
# https://docs.pylonsproject.org/projects/colander/en/latest/basics.html # noqa
request = self.request
csrf_token = request.session.get_csrf_token() # type: str
matches = value == csrf_token
if DEBUG_CSRF_CHECK:
log.debug(
"Validating CSRF token: form says {!r}, session says "
"{!r}, matches = {}",
value,
csrf_token,
matches,
)
if not matches:
log.warning(
"CSRF token mismatch; remote address {}", request.remote_addr
)
_ = request.gettext
raise Invalid(node, _("Bad CSRF token"))
[docs]class CSRFSchema(Schema, RequestAwareMixin):
"""
Base class for form schemas that use CSRF (XSRF; cross-site request
forgery) tokens.
You can't put the call to ``bind()`` at the end of ``__init__()``, because
``bind()`` calls ``clone()`` with no arguments and ``clone()`` ends up
calling ``__init__()```...
The item name should be one that the ZAP penetration testing tool expects,
or you get:
.. code-block:: none
No known Anti-CSRF token [anticsrf, CSRFToken,
__RequestVerificationToken, csrfmiddlewaretoken, authenticity_token,
OWASP_CSRFTOKEN, anoncsrf, csrf_token, _csrf, _csrfSecret] was found in
the following HTML form: [Form 1: "_charset_" "__formid__"
"deformField1" "deformField2" "deformField3" "deformField4" ].
"""
csrf_token = CSRFToken() # name must match ViewParam.CSRF_TOKEN
# ... name should also be one that ZAP expects, as above
# =============================================================================
# Horizontal forms
# =============================================================================
[docs]def add_css_class(
kwargs: Dict[str, Any], extra_classes: str, param_name: str = "css_class"
) -> None:
"""
Modifies a kwargs dictionary to add a CSS class to the ``css_class``
parameter.
Args:
kwargs: a dictionary
extra_classes: CSS classes to add (as a space-separated string)
param_name: parameter name to modify; by default, "css_class"
"""
if param_name in kwargs:
kwargs[param_name] += " " + extra_classes
else:
kwargs[param_name] = extra_classes
# =============================================================================
# Specialized Form classes
# =============================================================================
# =============================================================================
# Specialized SchemaNode classes used in several contexts
# =============================================================================
# -----------------------------------------------------------------------------
# Task types
# -----------------------------------------------------------------------------
[docs]class OptionalSingleTaskSelector(OptionalStringNode, RequestAwareMixin):
"""
Node to pick one task type.
"""
[docs] def __init__(
self, *args, tracker_tasks_only: bool = False, **kwargs
) -> None:
"""
Args:
tracker_tasks_only: restrict the choices to tasks that offer
trackers.
"""
self.title = "" # for type checker
self.tracker_tasks_only = tracker_tasks_only
self.widget = None # type: Optional[Widget]
self.validator = None # type: Optional[ValidatorType]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Task type")
if Binding.TRACKER_TASKS_ONLY in kw:
self.tracker_tasks_only = kw[Binding.TRACKER_TASKS_ONLY]
values, pv = get_values_and_permissible(
self.get_task_choices(), True, _("[Any]")
)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
def get_task_choices(self) -> List[Tuple[str, str]]:
from camcops_server.cc_modules.cc_task import Task # delayed import
choices = [] # type: List[Tuple[str, str]]
for tc in Task.all_subclasses_by_shortname():
if self.tracker_tasks_only and not tc.provides_trackers:
continue
choices.append((tc.tablename, tc.shortname))
return choices
[docs]class MandatorySingleTaskSelector(MandatoryStringNode, RequestAwareMixin):
"""
Node to pick one task type.
"""
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
self.validator = None # type: Optional[ValidatorType]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Task type")
values, pv = get_values_and_permissible(self.get_task_choices(), False)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
@staticmethod
def get_task_choices() -> List[Tuple[str, str]]:
from camcops_server.cc_modules.cc_task import Task # delayed import
choices = [] # type: List[Tuple[str, str]]
for tc in Task.all_subclasses_by_shortname():
choices.append((tc.tablename, tc.shortname))
return choices
[docs]class MultiTaskSelector(SchemaNode, RequestAwareMixin):
"""
Node to select multiple task types.
"""
schema_type = Set
default = ""
missing = ""
[docs] def __init__(
self,
*args,
tracker_tasks_only: bool = False,
minimum_number: int = 0,
**kwargs,
) -> None:
self.tracker_tasks_only = tracker_tasks_only
self.minimum_number = minimum_number
self.widget = None # type: Optional[Widget]
self.validator = None # type: Optional[ValidatorType]
self.title = "" # for type checker
self.description = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
request = self.request # noqa: F841
self.title = _("Task type(s)")
self.description = (
_("If none are selected, all task types will be offered.")
+ " "
+ self.or_join_description
)
if Binding.TRACKER_TASKS_ONLY in kw:
self.tracker_tasks_only = kw[Binding.TRACKER_TASKS_ONLY]
values, pv = get_values_and_permissible(self.get_task_choices())
self.widget = CheckboxChoiceWidget(values=values, inline=True)
self.validator = Length(min=self.minimum_number)
def get_task_choices(self) -> List[Tuple[str, str]]:
from camcops_server.cc_modules.cc_task import Task # delayed import
choices = [] # type: List[Tuple[str, str]]
for tc in Task.all_subclasses_by_shortname():
if self.tracker_tasks_only and not tc.provides_trackers:
continue
choices.append((tc.tablename, tc.shortname))
return choices
# -----------------------------------------------------------------------------
# Use the task index?
# -----------------------------------------------------------------------------
[docs]class ViaIndexSelector(BooleanNode, RequestAwareMixin):
"""
Node to choose whether we use the server index or not.
Default is true.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, default=True, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Use server index?")
self.label = _("Use server index? (Default is true; much faster.)")
# -----------------------------------------------------------------------------
# ID numbers
# -----------------------------------------------------------------------------
[docs]class MandatoryWhichIdNumSelector(SchemaNode, RequestAwareMixin):
"""
Node to enforce the choice of a single ID number type (e.g. "NHS number"
or "study Blah ID number").
"""
widget = SelectWidget()
[docs] def __init__(self, *args, **kwargs) -> None:
if not hasattr(self, "allow_none"):
# ... allows parameter-free (!) inheritance by
# OptionalWhichIdNumSelector
self.allow_none = False
self.title = "" # for type checker
self.description = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
request = self.request
_ = request.gettext
self.title = _("Identifier")
values = [] # type: List[Tuple[Optional[int], str]]
for iddef in request.idnum_definitions:
values.append((iddef.which_idnum, iddef.description))
values, pv = get_values_and_permissible(
values, self.allow_none, _("[ignore]")
)
# ... can't use None, because SelectWidget() will convert that to
# "None"; can't use colander.null, because that converts to
# "<colander.null>"; use "", which is the default null_value of
# SelectWidget.
self.widget.values = values
self.validator = OneOf(pv)
@staticmethod
def schema_type() -> SchemaType:
return Integer()
[docs]class LinkingIdNumSelector(MandatoryWhichIdNumSelector):
"""
Convenience node: pick a single ID number, with title/description
indicating that it's the ID number to link on.
"""
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw)
_ = self.gettext
self.title = _("Linking ID number")
self.description = _("Which ID number to link on?")
[docs]class MandatoryIdNumValue(SchemaNode, RequestAwareMixin):
"""
Mandatory node to capture an ID number value.
"""
schema_type = Integer
validator = Range(min=0)
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("ID# value")
[docs]class MandatoryIdNumNode(MappingSchema, RequestAwareMixin):
"""
Mandatory node to capture an ID number type and the associated actual
ID number (value).
This is also where we apply ID number validation rules (e.g. NHS number).
"""
which_idnum = (
MandatoryWhichIdNumSelector()
) # must match ViewParam.WHICH_IDNUM
idnum_value = MandatoryIdNumValue() # must match ViewParam.IDNUM_VALUE
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("ID number")
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: Dict[str, int]) -> None:
assert isinstance(value, dict)
req = self.request
_ = req.gettext
which_idnum = value[ViewParam.WHICH_IDNUM]
idnum_value = value[ViewParam.IDNUM_VALUE]
idnum_def = req.get_idnum_definition(which_idnum)
if not idnum_def:
raise Invalid(node, _("Bad ID number type")) # shouldn't happen
method = idnum_def.validation_method
if method:
valid, why_invalid = validate_id_number(req, idnum_value, method)
if not valid:
raise Invalid(node, why_invalid)
[docs]class IdNumSequenceAnyCombination(SequenceSchema, RequestAwareMixin):
"""
Sequence to capture multiple ID numbers (as type/value pairs).
"""
idnum_sequence = MandatoryIdNumNode()
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("ID numbers")
self.widget = TranslatableSequenceWidget(request=self.request)
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: List[Dict[str, int]]) -> None:
assert isinstance(value, list)
list_of_lists = [
(x[ViewParam.WHICH_IDNUM], x[ViewParam.IDNUM_VALUE]) for x in value
]
if len(list_of_lists) != len(set(list_of_lists)):
_ = self.gettext
raise Invalid(
node, _("You have specified duplicate ID definitions")
)
[docs]class IdNumSequenceUniquePerWhichIdnum(SequenceSchema, RequestAwareMixin):
"""
Sequence to capture multiple ID numbers (as type/value pairs) but with only
up to one per ID number type.
"""
idnum_sequence = MandatoryIdNumNode()
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("ID numbers")
self.widget = TranslatableSequenceWidget(request=self.request)
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: List[Dict[str, int]]) -> None:
assert isinstance(value, list)
which_idnums = [x[ViewParam.WHICH_IDNUM] for x in value]
if len(which_idnums) != len(set(which_idnums)):
_ = self.gettext
raise Invalid(
node, _("You have specified >1 value for one ID number type")
)
# -----------------------------------------------------------------------------
# Sex
# -----------------------------------------------------------------------------
[docs]class OptionalSexSelector(OptionalStringNode, RequestAwareMixin):
"""
Optional node to choose sex.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Sex")
choices = sex_choices(self.request)
values, pv = get_values_and_permissible(choices, True, _("Any"))
self.widget = RadioChoiceWidget(values=values, inline=True)
self.validator = OneOf(pv)
[docs]class MandatorySexSelector(MandatoryStringNode, RequestAwareMixin):
"""
Mandatory node to choose sex.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Sex")
choices = sex_choices(self.request)
values, pv = get_values_and_permissible(choices)
self.widget = RadioChoiceWidget(values=values, inline=True)
self.validator = OneOf(pv)
# -----------------------------------------------------------------------------
# Users
# -----------------------------------------------------------------------------
[docs]class MandatoryUserIdSelectorUsersAllowedToSee(SchemaNode, RequestAwareMixin):
"""
Mandatory node to choose a user, from the users that the requesting user
is allowed to see.
"""
schema_type = Integer
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
from camcops_server.cc_modules.cc_user import User # delayed import
_ = self.gettext
self.title = _("User")
request = self.request
dbsession = request.dbsession
user = request.user
if user.superuser:
users = dbsession.query(User).order_by(User.username)
else:
# Users in my groups, or groups I'm allowed to see
my_allowed_group_ids = user.ids_of_groups_user_may_see
users = (
dbsession.query(User)
.join(Group)
.filter(Group.id.in_(my_allowed_group_ids))
.order_by(User.username)
)
values = [] # type: List[Tuple[Optional[int], str]]
for user in users:
values.append((user.id, user.username))
values, pv = get_values_and_permissible(values, False)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
[docs]class OptionalUserNameSelector(OptionalStringNode, RequestAwareMixin):
"""
Optional node to select a username, from all possible users.
"""
title = "User"
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
from camcops_server.cc_modules.cc_user import User # delayed import
_ = self.gettext
self.title = _("User")
request = self.request
dbsession = request.dbsession
values = [] # type: List[Tuple[str, str]]
users = dbsession.query(User).order_by(User.username)
for user in users:
values.append((user.username, user.username))
values, pv = get_values_and_permissible(values, True, _("[ignore]"))
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
[docs]class UsernameNode(SchemaNode, RequestAwareMixin):
"""
Node to enter a username.
"""
schema_type = String
widget = TextInputWidget(
attributes={AUTOCOMPLETE_ATTR: AutocompleteAttrValues.OFF}
)
[docs] def __init__(
self, *args, autocomplete: str = AutocompleteAttrValues.OFF, **kwargs
) -> None:
self.title = "" # for type checker
self.autocomplete = autocomplete
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Username")
# noinspection PyUnresolvedReferences
self.widget.attributes[AUTOCOMPLETE_ATTR] = self.autocomplete
def validator(self, node: SchemaNode, value: str) -> None:
if value == USER_NAME_FOR_SYSTEM:
_ = self.gettext
raise Invalid(
node,
_("Cannot use system username")
+ " "
+ repr(USER_NAME_FOR_SYSTEM),
)
try:
validate_username(value, self.request)
except ValueError as e:
raise Invalid(node, str(e))
[docs]class UserFilterSchema(Schema, RequestAwareMixin):
"""
Schema to filter the list of users
"""
# must match ViewParam.INCLUDE_AUTO_GENERATED
include_auto_generated = BooleanNode()
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
include_auto_generated = get_child_node(self, "include_auto_generated")
include_auto_generated.title = _("Include auto-generated users")
include_auto_generated.label = None
# -----------------------------------------------------------------------------
# Devices
# -----------------------------------------------------------------------------
[docs]class MandatoryDeviceIdSelector(SchemaNode, RequestAwareMixin):
"""
Mandatory node to select a client device ID.
"""
schema_type = Integer
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
from camcops_server.cc_modules.cc_device import (
Device,
) # delayed import
_ = self.gettext
self.title = _("Device")
request = self.request
dbsession = request.dbsession
devices = dbsession.query(Device).order_by(Device.friendly_name)
values = [] # type: List[Tuple[Optional[int], str]]
for device in devices:
values.append((device.id, device.friendly_name))
values, pv = get_values_and_permissible(values, False)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
# -----------------------------------------------------------------------------
# Server PK
# -----------------------------------------------------------------------------
[docs]class ServerPkSelector(OptionalIntNode, RequestAwareMixin):
"""
Optional node to request an integer, marked as a server PK.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Server PK")
# -----------------------------------------------------------------------------
# Dates/times
# -----------------------------------------------------------------------------
[docs]class StartPendulumSelector(
TranslatableOptionalPendulumNode, RequestAwareMixin
):
"""
Optional node to select a start date/time.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw)
_ = self.gettext
self.title = _("Start date/time (local timezone; inclusive)")
[docs]class EndPendulumSelector(TranslatableOptionalPendulumNode, RequestAwareMixin):
"""
Optional node to select an end date/time.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw)
_ = self.gettext
self.title = _("End date/time (local timezone; exclusive)")
[docs]class StartDateTimeSelector(
TranslatableDateTimeSelectorNode, RequestAwareMixin
):
"""
Optional node to select a start date/time (in UTC).
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw)
_ = self.gettext
self.title = _("Start date/time (UTC; inclusive)")
[docs]class EndDateTimeSelector(TranslatableDateTimeSelectorNode, RequestAwareMixin):
"""
Optional node to select an end date/time (in UTC).
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw)
_ = self.gettext
self.title = _("End date/time (UTC; exclusive)")
'''
class StartDateSelector(TranslatableDateSelectorNode,
RequestAwareMixin):
"""
Optional node to select a start date (in UTC).
"""
def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw)
_ = self.gettext
self.title = _("Start date (UTC; inclusive)")
class EndDateSelector(TranslatableDateSelectorNode,
RequestAwareMixin):
"""
Optional node to select an end date (in UTC).
"""
def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw)
_ = self.gettext
self.title = _("End date (UTC; inclusive)")
'''
# -----------------------------------------------------------------------------
# Rows per page
# -----------------------------------------------------------------------------
[docs]class RowsPerPageSelector(SchemaNode, RequestAwareMixin):
"""
Node to select how many rows per page are shown.
"""
_choices = ((10, "10"), (25, "25"), (50, "50"), (100, "100"))
schema_type = Integer
default = DEFAULT_ROWS_PER_PAGE
widget = RadioChoiceWidget(values=_choices)
validator = OneOf(list(x[0] for x in _choices))
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Items to show per page")
# -----------------------------------------------------------------------------
# Groups
# -----------------------------------------------------------------------------
[docs]class MandatoryGroupIdSelectorAllGroups(SchemaNode, RequestAwareMixin):
"""
Offers a picklist of groups from ALL POSSIBLE GROUPS.
Used by superusers: "add user to any group".
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Group")
request = self.request
dbsession = request.dbsession
groups = dbsession.query(Group).order_by(Group.name)
values = [(g.id, g.name) for g in groups]
values, pv = get_values_and_permissible(values)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
@staticmethod
def schema_type() -> SchemaType:
return Integer()
[docs]class MandatoryGroupIdSelectorAdministeredGroups(
SchemaNode, RequestAwareMixin
):
"""
Offers a picklist of groups from GROUPS ADMINISTERED BY REQUESTOR.
Used by groupadmins: "add user to one of my groups".
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Group")
request = self.request
dbsession = request.dbsession
administered_group_ids = request.user.ids_of_groups_user_is_admin_for
groups = dbsession.query(Group).order_by(Group.name)
values = [
(g.id, g.name) for g in groups if g.id in administered_group_ids
]
values, pv = get_values_and_permissible(values)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
@staticmethod
def schema_type() -> SchemaType:
return Integer()
[docs]class MandatoryGroupIdSelectorPatientGroups(SchemaNode, RequestAwareMixin):
"""
Offers a picklist of groups the user can manage patients in.
Used when managing patients: "add patient to one of my groups".
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Group")
request = self.request
dbsession = request.dbsession
group_ids = request.user.ids_of_groups_user_may_manage_patients_in
groups = dbsession.query(Group).order_by(Group.name)
values = [(g.id, g.name) for g in groups if g.id in group_ids]
values, pv = get_values_and_permissible(values)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
@staticmethod
def schema_type() -> SchemaType:
return Integer()
[docs]class MandatoryGroupIdSelectorOtherGroups(SchemaNode, RequestAwareMixin):
"""
Offers a picklist of groups THAT ARE NOT THE SPECIFIED GROUP (as specified
in ``kw[Binding.GROUP]``).
Used by superusers: "which other groups can this group see?"
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Other group")
request = self.request
group = kw[Binding.GROUP] # type: Group # ATYPICAL BINDING
dbsession = request.dbsession
groups = dbsession.query(Group).order_by(Group.name)
values = [(g.id, g.name) for g in groups if g.id != group.id]
values, pv = get_values_and_permissible(values)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
@staticmethod
def schema_type() -> SchemaType:
return Integer()
[docs]class MandatoryGroupIdSelectorUserGroups(SchemaNode, RequestAwareMixin):
"""
Offers a picklist of groups from THOSE THE USER IS A MEMBER OF.
Used for: "which of your groups do you want to upload into?"
"""
[docs] def __init__(self, *args, **kwargs) -> None:
if not hasattr(self, "allow_none"):
# ... allows parameter-free (!) inheritance by
# OptionalGroupIdSelectorUserGroups
self.allow_none = False
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Group")
user = kw[Binding.USER] # type: User # ATYPICAL BINDING
groups = sorted(list(user.groups), key=lambda g: g.name)
values = [(g.id, g.name) for g in groups]
values, pv = get_values_and_permissible(
values, self.allow_none, _("[None]")
)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
@staticmethod
def schema_type() -> SchemaType:
return Integer()
[docs]class OptionalGroupIdSelectorUserGroups(MandatoryGroupIdSelectorUserGroups):
"""
Offers a picklist of groups from THOSE THE USER IS A MEMBER OF.
Used for "which do you want to upload into?". Optional.
"""
default = None
missing = None
[docs] def __init__(self, *args, **kwargs) -> None:
self.allow_none = True
super().__init__(*args, **kwargs)
@staticmethod
def schema_type() -> SchemaType:
return AllowNoneType(Integer())
[docs]class MandatoryGroupIdSelectorAllowedGroups(SchemaNode, RequestAwareMixin):
"""
Offers a picklist of groups from THOSE THE USER IS ALLOWED TO SEE.
Used for task filters.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Group")
request = self.request
dbsession = request.dbsession
user = request.user
if user.superuser:
groups = dbsession.query(Group).order_by(Group.name)
else:
groups = sorted(list(user.groups), key=lambda g: g.name)
values = [(g.id, g.name) for g in groups]
values, pv = get_values_and_permissible(values)
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
@staticmethod
def schema_type() -> SchemaType:
return Integer()
[docs]class GroupsSequenceBase(SequenceSchema, RequestAwareMixin):
"""
Sequence schema to capture zero or more non-duplicate groups.
"""
[docs] def __init__(self, *args, minimum_number: int = 0, **kwargs) -> None:
self.title = "" # for type checker
self.minimum_number = minimum_number
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Groups")
self.widget = TranslatableSequenceWidget(request=self.request)
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: List[int]) -> None:
assert isinstance(value, list)
_ = self.gettext
if len(value) != len(set(value)):
raise Invalid(node, _("You have specified duplicate groups"))
if len(value) < self.minimum_number:
raise Invalid(
node,
_("You must specify at least {} group(s)").format(
self.minimum_number
),
)
[docs]class AllGroupsSequence(GroupsSequenceBase):
"""
Sequence to offer a choice of all possible groups.
Typical use: superuser assigns group memberships to a user.
"""
group_id_sequence = MandatoryGroupIdSelectorAllGroups()
[docs]class AdministeredGroupsSequence(GroupsSequenceBase):
"""
Sequence to offer a choice of the groups administered by the requestor.
Typical use: (non-superuser) group administrator assigns group memberships
to a user.
"""
group_id_sequence = MandatoryGroupIdSelectorAdministeredGroups()
[docs] def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, minimum_number=1, **kwargs)
[docs]class AllOtherGroupsSequence(GroupsSequenceBase):
"""
Sequence to offer a choice of all possible OTHER groups (as determined
relative to the group specified in ``kw[Binding.GROUP]``).
Typical use: superuser assigns group permissions to another group.
"""
group_id_sequence = MandatoryGroupIdSelectorOtherGroups()
[docs]class AllowedGroupsSequence(GroupsSequenceBase):
"""
Sequence to offer a choice of all the groups the user is allowed to see.
"""
group_id_sequence = MandatoryGroupIdSelectorAllowedGroups()
[docs] def __init__(self, *args, **kwargs) -> None:
self.description = "" # for type checker
super().__init__(*args, **kwargs)
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw)
self.description = self.or_join_description
# -----------------------------------------------------------------------------
# Languages (strictly, locales)
# -----------------------------------------------------------------------------
[docs]class LanguageSelector(SchemaNode, RequestAwareMixin):
"""
Node to choose a language code, from those supported by the server.
"""
_choices = POSSIBLE_LOCALES_WITH_DESCRIPTIONS
schema_type = String
default = DEFAULT_LOCALE
missing = DEFAULT_LOCALE
widget = SelectWidget(values=_choices) # intrinsically translated!
validator = OneOf(POSSIBLE_LOCALES)
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Group")
request = self.request # noqa: F841
self.title = _("Language")
# -----------------------------------------------------------------------------
# Validating dangerous operations
# -----------------------------------------------------------------------------
[docs]class HardWorkConfirmationSchema(CSRFSchema):
"""
Schema to make it hard to do something. We require a pattern of yes/no
answers before we will proceed.
"""
confirm_1_t = BooleanNode(default=False)
confirm_2_t = BooleanNode(default=True)
confirm_3_f = BooleanNode(default=True)
confirm_4_t = BooleanNode(default=False)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
confirm_1_t = get_child_node(self, "confirm_1_t")
confirm_1_t.title = _("Really?")
confirm_2_t = get_child_node(self, "confirm_2_t")
# TRANSLATOR: string context described here
confirm_2_t.title = _("Leave ticked to confirm")
confirm_3_f = get_child_node(self, "confirm_3_f")
confirm_3_f.title = _("Please untick to confirm")
confirm_4_t = get_child_node(self, "confirm_4_t")
confirm_4_t.title = _("Be really sure; tick here also to confirm")
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: Any) -> None:
if (
(not value["confirm_1_t"])
or (not value["confirm_2_t"])
or value["confirm_3_f"]
or (not value["confirm_4_t"])
):
_ = self.gettext
raise Invalid(node, _("Not fully confirmed"))
# -----------------------------------------------------------------------------
# URLs
# -----------------------------------------------------------------------------
[docs]class HiddenRedirectionUrlNode(HiddenStringNode, RequestAwareMixin):
"""
Note to encode a hidden URL, for redirection.
"""
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: str) -> None:
if value:
try:
validate_redirect_url(value, self.request)
except ValueError:
_ = self.gettext
raise Invalid(node, _("Invalid redirection URL"))
# -----------------------------------------------------------------------------
# Phone number
# -----------------------------------------------------------------------------
[docs]class PhoneNumberType(String):
[docs] def __init__(self, request: "CamcopsRequest", *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.request = request
# noinspection PyMethodMayBeStatic, PyUnusedLocal
def deserialize(
self, node: SchemaNode, cstruct: Union[str, ColanderNullType, None]
) -> Optional[phonenumbers.PhoneNumber]:
request = self.request # type: CamcopsRequest
_ = request.gettext
err_message = _("Invalid phone number")
# is null when form is empty
if not cstruct:
if not self.allow_empty:
raise Invalid(node, err_message)
return null
cstruct: str
try:
phone_number = phonenumbers.parse(
cstruct, request.config.region_code
)
except phonenumbers.NumberParseException:
raise Invalid(node, err_message)
if not phonenumbers.is_valid_number(phone_number):
# the number may parse but could still be invalid
# (e.g. too few digits)
raise Invalid(node, err_message)
return phone_number
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def serialize(
self,
node: SchemaNode,
appstruct: Union[phonenumbers.PhoneNumber, None, ColanderNullType],
) -> Union[str, ColanderNullType]:
# is None when populated from empty value in the database
if not appstruct:
return null
# appstruct should be well formed here (it would already have failed
# when reading from the database)
return phonenumbers.format_number(
appstruct, phonenumbers.PhoneNumberFormat.E164
)
[docs]class MandatoryPhoneNumberNode(MandatoryStringNode, RequestAwareMixin):
default = None
missing = None
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Phone number")
self.typ = PhoneNumberType(self.request, allow_empty=False)
# =============================================================================
# Login
# =============================================================================
[docs]class LoginSchema(CSRFSchema):
"""
Schema to capture login details.
"""
username = UsernameNode(
autocomplete=AutocompleteAttrValues.USERNAME
) # name must match ViewParam.USERNAME
password = SchemaNode( # name must match ViewParam.PASSWORD
String(),
widget=PasswordWidget(
attributes={
AUTOCOMPLETE_ATTR: AutocompleteAttrValues.CURRENT_PASSWORD
}
),
)
redirect_url = (
HiddenRedirectionUrlNode()
) # name must match ViewParam.REDIRECT_URL
[docs] def __init__(
self, *args, autocomplete_password: bool = True, **kwargs
) -> None:
self.autocomplete_password = autocomplete_password
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
password = get_child_node(self, "password")
password.title = _("Password")
password.widget.attributes[AUTOCOMPLETE_ATTR] = (
AutocompleteAttrValues.CURRENT_PASSWORD
if self.autocomplete_password
else AutocompleteAttrValues.OFF
)
# Suboptimal: autocomplete_password is not applied to the password
# widget, just to the form; see
# http://stackoverflow.com/questions/2530
# Note that e.g. Chrome may ignore this.
# ... fixed 2020-09-29 by applying autocomplete to LoginSchema.password
[docs]class OtpSchema(CSRFSchema):
"""
Schema to capture a one-time password for Multi-factor Authentication.
"""
one_time_password = MandatoryStringNode()
redirect_url = (
HiddenRedirectionUrlNode()
) # name must match ViewParam.REDIRECT_URL
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
one_time_password = get_child_node(self, "one_time_password")
one_time_password.title = _("Enter the six-digit code")
# =============================================================================
# Change password
# =============================================================================
[docs]class MustChangePasswordNode(SchemaNode, RequestAwareMixin):
"""
Boolean node: must the user change their password?
"""
schema_type = Boolean
default = True
missing = True
[docs] def __init__(self, *args, **kwargs) -> None:
self.label = "" # for type checker
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.label = _("User must change password at next login")
self.title = _("Must change password at next login?")
[docs]class OldUserPasswordCheck(SchemaNode, RequestAwareMixin):
"""
Schema to capture an old password (for when a password is being changed).
"""
schema_type = String
widget = PasswordWidget(
attributes={AUTOCOMPLETE_ATTR: AutocompleteAttrValues.CURRENT_PASSWORD}
)
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Old password")
def validator(self, node: SchemaNode, value: str) -> None:
request = self.request
user = request.user
assert user is not None
if not user.is_password_correct(value):
_ = request.gettext
raise Invalid(node, _("Old password incorrect"))
[docs]class NewPasswordNode(SchemaNode, RequestAwareMixin):
"""
Node to enter a new password.
"""
schema_type = String
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("New password")
self.description = _("Type the new password and confirm it")
self.widget = InformationalCheckedPasswordWidget(
self.request,
attributes={
AUTOCOMPLETE_ATTR: AutocompleteAttrValues.NEW_PASSWORD
},
)
def validator(self, node: SchemaNode, value: str) -> None:
try:
validate_new_password(value, self.request)
except ValueError as e:
raise Invalid(node, str(e))
[docs]class ChangeOwnPasswordSchema(CSRFSchema):
"""
Schema to change one's own password.
"""
old_password = OldUserPasswordCheck()
new_password = NewPasswordNode() # name must match ViewParam.NEW_PASSWORD
[docs] def __init__(self, *args, must_differ: bool = True, **kwargs) -> None:
"""
Args:
must_differ:
must the new password be different from the old one?
"""
self.must_differ = must_differ
super().__init__(*args, **kwargs)
def validator(self, node: SchemaNode, value: Dict[str, str]) -> None:
if self.must_differ and value["new_password"] == value["old_password"]:
_ = self.gettext
raise Invalid(node, _("New password must differ from old"))
[docs]class ChangeOtherPasswordSchema(CSRFSchema):
"""
Schema to change another user's password.
"""
user_id = HiddenIntegerNode() # name must match ViewParam.USER_ID
must_change_password = (
MustChangePasswordNode()
) # match ViewParam.MUST_CHANGE_PASSWORD
new_password = NewPasswordNode() # name must match ViewParam.NEW_PASSWORD
[docs]class DisableMfaNode(SchemaNode, RequestAwareMixin):
"""
Boolean node: disable multi-factor authentication
"""
schema_type = Boolean
default = False
missing = False
[docs] def __init__(self, *args, **kwargs) -> None:
self.label = "" # for type checker
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.label = _("Disable multi-factor authentication")
self.title = _("Disable multi-factor authentication?")
[docs]class EditOtherUserMfaSchema(CSRFSchema):
"""
Schema to reset multi-factor authentication for another user.
"""
user_id = HiddenIntegerNode() # name must match ViewParam.USER_ID
disable_mfa = DisableMfaNode() # match ViewParam.DISABLE_MFA
# =============================================================================
# Multi-factor authentication
# =============================================================================
[docs]class MfaSecretNode(OptionalStringNode, RequestAwareMixin):
"""
Node to display the TOTP (authorization app) secret as a QR code and
alphanumeric string.
"""
schema_type = String
# noinspection PyUnusedLocal,PyAttributeOutsideInit
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
self.widget = MfaSecretWidget(self.request)
[docs]class MfaMethodSelector(SchemaNode, RequestAwareMixin):
"""
Node to select type of authentication
"""
schema_type = String
default = MfaMethod.TOTP
missing = MfaMethod.TOTP
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
self.validator = None # type: Optional[ValidatorType]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Authentication type")
request = self.bindings[Binding.REQUEST] # type: CamcopsRequest
all_mfa_choices = [
(
MfaMethod.TOTP,
_("Use an app such as Google Authenticator or Twilio Authy"),
),
(MfaMethod.HOTP_EMAIL, _("Send me a code by email")),
(MfaMethod.HOTP_SMS, _("Send me a code by text message")),
(MfaMethod.NO_MFA, _("Disable multi-factor authentication")),
]
choices = []
for (label, description) in all_mfa_choices:
if label in request.config.mfa_methods:
choices.append((label, description))
values, pv = get_values_and_permissible(choices)
self.widget = RadioChoiceWidget(values=values)
self.validator = OneOf(pv)
[docs]class MfaMethodSchema(CSRFSchema):
"""
Schema to edit Multi-factor Authentication method.
"""
mfa_method = MfaMethodSelector() # must match ViewParam.MFA_METHOD
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
mfa_method = get_child_node(self, "mfa_method")
mfa_method.title = _("How do you wish to authenticate?")
[docs]class MfaTotpSchema(CSRFSchema):
"""
Schema to set up Multi-factor Authentication with authentication app.
"""
mfa_secret_key = MfaSecretNode() # must match ViewParam.MFA_SECRET_KEY
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
mfa_secret_key = get_child_node(self, "mfa_secret_key")
mfa_secret_key.title = _("Follow these steps:")
[docs]class MfaHotpEmailSchema(CSRFSchema):
"""
Schema to change a user's email address for multi-factor authentication.
"""
mfa_secret_key = HiddenStringNode() # must match ViewParam.MFA_SECRET_KEY
email = MandatoryEmailNode() # must match ViewParam.EMAIL
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
[docs]class MfaHotpSmsSchema(CSRFSchema):
"""
Schema to change a user's phone number for multi-factor authentication.
"""
mfa_secret_key = HiddenStringNode() # must match ViewParam.MFA_SECRET_KEY
phone_number = (
MandatoryPhoneNumberNode()
) # must match ViewParam.PHONE_NUMBER
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
phone_number = get_child_node(self, ViewParam.PHONE_NUMBER)
phone_number.description = _(
"Include the country code (e.g. +123) for numbers outside of the "
"'{region_code}' region"
).format(region_code=self.request.config.region_code)
# =============================================================================
# Offer/agree terms
# =============================================================================
[docs]class OfferTermsSchema(CSRFSchema):
"""
Schema to offer terms and ask the user to accept them.
"""
pass
# =============================================================================
# View audit trail
# =============================================================================
[docs]class OptionalIPAddressNode(OptionalStringNode, RequestAwareMixin):
"""
Optional IPv4 or IPv6 address.
"""
def validator(self, node: SchemaNode, value: str) -> None:
try:
validate_ip_address(value, self.request)
except ValueError as e:
raise Invalid(node, e)
[docs]class OptionalAuditSourceNode(OptionalStringNode, RequestAwareMixin):
"""
Optional IPv4 or IPv6 address.
"""
def validator(self, node: SchemaNode, value: str) -> None:
try:
validate_by_char_and_length(
value,
permitted_char_expression=ALPHANUM_UNDERSCORE_CHAR,
min_length=0,
max_length=StringLengths.AUDIT_SOURCE_MAX_LEN,
req=self.request,
)
except ValueError as e:
raise Invalid(node, e)
[docs]class AuditTrailSchema(CSRFSchema):
"""
Schema to filter audit trail entries.
"""
rows_per_page = RowsPerPageSelector() # must match ViewParam.ROWS_PER_PAGE
start_datetime = (
StartPendulumSelector()
) # must match ViewParam.START_DATETIME # noqa
end_datetime = EndPendulumSelector() # must match ViewParam.END_DATETIME
source = OptionalAuditSourceNode() # must match ViewParam.SOURCE # noqa
remote_ip_addr = (
OptionalIPAddressNode()
) # must match ViewParam.REMOTE_IP_ADDR # noqa
username = (
OptionalUserNameSelector()
) # must match ViewParam.USERNAME # noqa
table_name = (
OptionalSingleTaskSelector()
) # must match ViewParam.TABLENAME # noqa
server_pk = ServerPkSelector() # must match ViewParam.SERVER_PK
truncate = BooleanNode(default=True) # must match ViewParam.TRUNCATE
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
source = get_child_node(self, "source")
source.title = _("Source (e.g. webviewer, tablet, console)")
remote_ip_addr = get_child_node(self, "remote_ip_addr")
remote_ip_addr.title = _("Remote IP address")
truncate = get_child_node(self, "truncate")
truncate.title = _("Truncate details for easy viewing")
# =============================================================================
# View export logs
# =============================================================================
[docs]class OptionalExportRecipientNameSelector(
OptionalStringNode, RequestAwareMixin
):
"""
Optional node to pick an export recipient name from those present in the
database.
"""
title = "Export recipient"
[docs] def __init__(self, *args, **kwargs) -> None:
self.validator = None # type: Optional[ValidatorType]
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
from camcops_server.cc_modules.cc_exportrecipient import (
ExportRecipient,
) # delayed import
request = self.request
_ = request.gettext
dbsession = request.dbsession
q = (
dbsession.query(ExportRecipient.recipient_name)
.distinct()
.order_by(ExportRecipient.recipient_name)
)
values = [] # type: List[Tuple[str, str]]
for row in q:
recipient_name = row[0]
values.append((recipient_name, recipient_name))
values, pv = get_values_and_permissible(values, True, _("[Any]"))
self.widget = SelectWidget(values=values)
self.validator = OneOf(pv)
[docs]class ExportedTaskListSchema(CSRFSchema):
"""
Schema to filter HL7 message logs.
"""
rows_per_page = RowsPerPageSelector() # must match ViewParam.ROWS_PER_PAGE
recipient_name = (
OptionalExportRecipientNameSelector()
) # must match ViewParam.RECIPIENT_NAME # noqa
table_name = (
OptionalSingleTaskSelector()
) # must match ViewParam.TABLENAME # noqa
server_pk = ServerPkSelector() # must match ViewParam.SERVER_PK
id = OptionalIntNode() # must match ViewParam.ID # noqa
start_datetime = (
StartDateTimeSelector()
) # must match ViewParam.START_DATETIME # noqa
end_datetime = EndDateTimeSelector() # must match ViewParam.END_DATETIME
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
id_ = get_child_node(self, "id")
id_.title = _("ExportedTask ID")
# =============================================================================
# Task filters
# =============================================================================
[docs]class TextContentsSequence(SequenceSchema, RequestAwareMixin):
"""
Sequence to capture multiple pieces of text (representing text contents
for a task filter).
"""
text_sequence = SchemaNode(
String(), validator=Length(0, StringLengths.FILTER_TEXT_MAX_LEN)
) # BEWARE: fairly unrestricted contents.
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Text contents")
self.description = self.or_join_description
self.widget = TranslatableSequenceWidget(request=self.request)
# Now it'll say "[Add]" Text Sequence because it'll make the string
# "Text Sequence" from the name of text_sequence. Unless we do this:
text_sequence = get_child_node(self, "text_sequence")
# TRANSLATOR: For the task filter form: the text in "Add text"
text_sequence.title = _("text")
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: List[str]) -> None:
assert isinstance(value, list)
if len(value) != len(set(value)):
_ = self.gettext
raise Invalid(node, _("You have specified duplicate text filters"))
[docs]class UploadingUserSequence(SequenceSchema, RequestAwareMixin):
"""
Sequence to capture multiple users (for task filters: "uploaded by one of
the following users...").
"""
user_id_sequence = MandatoryUserIdSelectorUsersAllowedToSee()
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Uploading users")
self.description = self.or_join_description
self.widget = TranslatableSequenceWidget(request=self.request)
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: List[int]) -> None:
assert isinstance(value, list)
if len(value) != len(set(value)):
_ = self.gettext
raise Invalid(node, _("You have specified duplicate users"))
[docs]class DevicesSequence(SequenceSchema, RequestAwareMixin):
"""
Sequence to capture multiple client devices (for task filters: "uploaded by
one of the following devices...").
"""
device_id_sequence = MandatoryDeviceIdSelector()
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Uploading devices")
self.description = self.or_join_description
self.widget = TranslatableSequenceWidget(request=self.request)
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: List[int]) -> None:
assert isinstance(value, list)
if len(value) != len(set(value)):
raise Invalid(node, "You have specified duplicate devices")
[docs]class OptionalPatientNameNode(OptionalStringNode, RequestAwareMixin):
def validator(self, node: SchemaNode, value: str) -> None:
try:
# TODO: Validating human names is hard.
# Decide if validation here is necessary and whether it should
# be configurable.
# validate_human_name(value, self.request)
# Does nothing but better to be explicit
validate_anything(value, self.request)
except ValueError as e:
# Should never happen with validate_anything
raise Invalid(node, str(e))
[docs]class EditTaskFilterWhoSchema(Schema, RequestAwareMixin):
"""
Schema to edit the "who" parts of a task filter.
"""
surname = OptionalPatientNameNode() # must match ViewParam.SURNAME # noqa
forename = (
OptionalPatientNameNode()
) # must match ViewParam.FORENAME # noqa
dob = SchemaNode(Date(), missing=None) # must match ViewParam.DOB
sex = OptionalSexSelector() # must match ViewParam.SEX
id_references = (
IdNumSequenceAnyCombination()
) # must match ViewParam.ID_REFERENCES # noqa
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
surname = get_child_node(self, "surname")
surname.title = _("Surname")
forename = get_child_node(self, "forename")
forename.title = _("Forename")
dob = get_child_node(self, "dob")
dob.title = _("Date of birth")
id_references = get_child_node(self, "id_references")
id_references.description = self.or_join_description
[docs]class EditTaskFilterWhenSchema(Schema):
"""
Schema to edit the "when" parts of a task filter.
"""
start_datetime = (
StartPendulumSelector()
) # must match ViewParam.START_DATETIME # noqa
end_datetime = EndPendulumSelector() # must match ViewParam.END_DATETIME
[docs]class EditTaskFilterWhatSchema(Schema, RequestAwareMixin):
"""
Schema to edit the "what" parts of a task filter.
"""
text_contents = (
TextContentsSequence()
) # must match ViewParam.TEXT_CONTENTS # noqa
complete_only = BooleanNode(
default=False
) # must match ViewParam.COMPLETE_ONLY # noqa
tasks = MultiTaskSelector() # must match ViewParam.TASKS
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
complete_only = get_child_node(self, "complete_only")
only_completed_text = _("Only completed tasks?")
complete_only.title = only_completed_text
complete_only.label = only_completed_text
[docs]class EditTaskFilterAdminSchema(Schema):
"""
Schema to edit the "admin" parts of a task filter.
"""
device_ids = DevicesSequence() # must match ViewParam.DEVICE_IDS
user_ids = UploadingUserSequence() # must match ViewParam.USER_IDS
group_ids = AllowedGroupsSequence() # must match ViewParam.GROUP_IDS
[docs]class EditTaskFilterSchema(CSRFSchema):
"""
Schema to edit a task filter.
"""
who = EditTaskFilterWhoSchema( # must match ViewParam.WHO
widget=MappingWidget(template="mapping_accordion", open=False)
)
what = EditTaskFilterWhatSchema( # must match ViewParam.WHAT
widget=MappingWidget(template="mapping_accordion", open=False)
)
when = EditTaskFilterWhenSchema( # must match ViewParam.WHEN
widget=MappingWidget(template="mapping_accordion", open=False)
)
admin = EditTaskFilterAdminSchema( # must match ViewParam.ADMIN
widget=MappingWidget(template="mapping_accordion", open=False)
)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
# log.debug("EditTaskFilterSchema.after_bind")
# log.debug("{!r}", self.__dict__)
# This is pretty nasty. By the time we get here, the Form class has
# made Field objects, and, I think, called a clone() function on us.
# Objects like "who" are not in our __dict__ any more. Our __dict__
# looks like:
# {
# 'typ': <colander.Mapping object at 0x7fd7989b18d0>,
# 'bindings': {
# 'open_who': True,
# 'open_when': True,
# 'request': ...,
# },
# '_order': 118,
# 'children': [
# <...CSRFToken object at ... (named csrf)>,
# <...EditTaskFilterWhoSchema object at ... (named who)>,
# ...
# ],
# 'title': ''
# }
_ = self.gettext
who = get_child_node(self, "who")
what = get_child_node(self, "what")
when = get_child_node(self, "when")
admin = get_child_node(self, "admin")
who.title = _("Who")
what.title = _("What")
when.title = _("When")
admin.title = _("Administrative criteria")
# log.debug("who = {!r}", who)
# log.debug("who.__dict__ = {!r}", who.__dict__)
who.widget.open = kw[Binding.OPEN_WHO]
what.widget.open = kw[Binding.OPEN_WHAT]
when.widget.open = kw[Binding.OPEN_WHEN]
admin.widget.open = kw[Binding.OPEN_ADMIN]
[docs]class TasksPerPageSchema(CSRFSchema):
"""
Schema to edit the number of rows per page, for the task view.
"""
rows_per_page = RowsPerPageSelector() # must match ViewParam.ROWS_PER_PAGE
[docs]class TasksPerPageForm(InformativeNonceForm):
"""
Form to edit the number of tasks per page, for the task view.
"""
[docs] def __init__(self, request: "CamcopsRequest", **kwargs) -> None:
_ = request.gettext
schema = TasksPerPageSchema().bind(request=request)
super().__init__(
schema,
buttons=[
Button(
name=FormAction.SUBMIT_TASKS_PER_PAGE,
title=_("Set n/page"),
)
],
css_class=BootstrapCssClasses.FORM_INLINE,
**kwargs,
)
[docs]class RefreshTasksSchema(CSRFSchema):
"""
Schema for a "refresh tasks" button.
"""
pass
# =============================================================================
# Trackers
# =============================================================================
[docs]class TaskTrackerOutputTypeSelector(SchemaNode, RequestAwareMixin):
"""
Node to select the output format for a tracker.
"""
# Choices don't require translation
_choices = (
(ViewArg.HTML, "HTML"),
(ViewArg.PDF, "PDF"),
(ViewArg.XML, "XML"),
)
schema_type = String
default = ViewArg.HTML
missing = ViewArg.HTML
widget = RadioChoiceWidget(values=_choices)
validator = OneOf(list(x[0] for x in _choices))
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("View as")
[docs]class ChooseTrackerSchema(CSRFSchema):
"""
Schema to select a tracker or CTV.
"""
which_idnum = (
MandatoryWhichIdNumSelector()
) # must match ViewParam.WHICH_IDNUM # noqa
idnum_value = (
MandatoryIdNumValue()
) # must match ViewParam.IDNUM_VALUE # noqa
start_datetime = (
StartPendulumSelector()
) # must match ViewParam.START_DATETIME # noqa
end_datetime = EndPendulumSelector() # must match ViewParam.END_DATETIME
all_tasks = BooleanNode(default=True) # match ViewParam.ALL_TASKS
tasks = MultiTaskSelector() # must match ViewParam.TASKS
# tracker_tasks_only will be set via the binding
via_index = ViaIndexSelector() # must match ViewParam.VIA_INDEX
viewtype = (
TaskTrackerOutputTypeSelector()
) # must match ViewParam.VIEWTYPE # noqa
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
all_tasks = get_child_node(self, "all_tasks")
text = _("Use all eligible task types?")
all_tasks.title = text
all_tasks.label = text
# =============================================================================
# Reports, which use dynamically created forms
# =============================================================================
[docs]class ReportOutputTypeSelector(SchemaNode, RequestAwareMixin):
"""
Node to select the output format for a report.
"""
schema_type = String
default = ViewArg.HTML
missing = ViewArg.HTML
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
self.validator = None # type: Optional[ValidatorType]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("View as")
choices = self.get_choices()
values, pv = get_values_and_permissible(choices)
self.widget = RadioChoiceWidget(values=choices)
self.validator = OneOf(pv)
def get_choices(self) -> Tuple[Tuple[str, str]]:
_ = self.gettext
# noinspection PyTypeChecker
return (
(ViewArg.HTML, _("HTML")),
(ViewArg.ODS, _("OpenOffice spreadsheet (ODS) file")),
(ViewArg.TSV, _("TSV (tab-separated values)")),
(ViewArg.XLSX, _("XLSX (Microsoft Excel) file")),
)
[docs]class ReportParamSchema(CSRFSchema):
"""
Schema to embed a report type (ID) and output format (view type).
"""
viewtype = ReportOutputTypeSelector() # must match ViewParam.VIEWTYPE
report_id = HiddenStringNode() # must match ViewParam.REPORT_ID
# Specific forms may inherit from this.
[docs]class DateTimeFilteredReportParamSchema(ReportParamSchema):
start_datetime = StartPendulumSelector()
end_datetime = EndPendulumSelector()
# =============================================================================
# View DDL
# =============================================================================
def get_sql_dialect_choices(
request: "CamcopsRequest",
) -> List[Tuple[str, str]]:
_ = request.gettext
return [
# https://docs.sqlalchemy.org/en/latest/dialects/
(SqlaDialectName.MYSQL, "MySQL"),
(SqlaDialectName.MSSQL, "Microsoft SQL Server"),
(SqlaDialectName.ORACLE, "Oracle" + _("[WILL NOT WORK]")),
# ... Oracle doesn't work; SQLAlchemy enforces the Oracle rule of a 30-
# character limit for identifiers, only relaxed to 128 characters in
# Oracle 12.2 (March 2017).
(SqlaDialectName.FIREBIRD, "Firebird"),
(SqlaDialectName.POSTGRES, "PostgreSQL"),
(SqlaDialectName.SQLITE, "SQLite"),
(SqlaDialectName.SYBASE, "Sybase"),
]
[docs]class DatabaseDialectSelector(SchemaNode, RequestAwareMixin):
"""
Node to choice an SQL dialect (for viewing DDL).
"""
schema_type = String
default = SqlaDialectName.MYSQL
missing = SqlaDialectName.MYSQL
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
self.validator = None # type: Optional[ValidatorType]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("SQL dialect to use (not all may be valid)")
choices = get_sql_dialect_choices(self.request)
values, pv = get_values_and_permissible(choices)
self.widget = RadioChoiceWidget(values=values)
self.validator = OneOf(pv)
[docs]class ViewDdlSchema(CSRFSchema):
"""
Schema to choose how to view DDL.
"""
dialect = DatabaseDialectSelector() # must match ViewParam.DIALECT
# =============================================================================
# Add/edit/delete users
# =============================================================================
[docs]class UserGroupPermissionsGroupAdminSchema(CSRFSchema):
"""
Edit group-specific permissions for a user. For group administrators.
"""
# Currently the defaults here will be ignored because we don't use this
# schema to create new UserGroupMembership records. The record will already
# exist by the time we see the forms that use this schema. So the database
# defaults will be used instead.
may_upload = BooleanNode(
default=False
) # match ViewParam.MAY_UPLOAD and User attribute # noqa
may_register_devices = BooleanNode(
default=False
) # match ViewParam.MAY_REGISTER_DEVICES and User attribute # noqa
may_use_webviewer = BooleanNode(
default=False
) # match ViewParam.MAY_USE_WEBVIEWER and User attribute # noqa
may_manage_patients = BooleanNode(
default=False
) # match ViewParam.MAY_MANAGE_PATIENTS # noqa
may_email_patients = BooleanNode(
default=False
) # match ViewParam.MAY_EMAIL_PATIENTS # noqa
view_all_patients_when_unfiltered = BooleanNode(
default=False
) # match ViewParam.VIEW_ALL_PATIENTS_WHEN_UNFILTERED and User attribute # noqa
may_dump_data = BooleanNode(
default=False
) # match ViewParam.MAY_DUMP_DATA and User attribute # noqa
may_run_reports = BooleanNode(
default=False
) # match ViewParam.MAY_RUN_REPORTS and User attribute # noqa
may_add_notes = BooleanNode(
default=False
) # match ViewParam.MAY_ADD_NOTES and User attribute # noqa
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
may_upload = get_child_node(self, "may_upload")
mu_text = _("Permitted to upload from a tablet/device")
may_upload.title = mu_text
may_upload.label = mu_text
may_register_devices = get_child_node(self, "may_register_devices")
mrd_text = _("Permitted to register tablet/client devices")
may_register_devices.title = mrd_text
may_register_devices.label = mrd_text
may_use_webviewer = get_child_node(self, "may_use_webviewer")
ml_text = _("May log in to web front end")
may_use_webviewer.title = ml_text
may_use_webviewer.label = ml_text
may_manage_patients = get_child_node(self, "may_manage_patients")
mmp_text = _("May add, edit or delete patients created on the server")
may_manage_patients.title = mmp_text
may_manage_patients.label = mmp_text
may_email_patients = get_child_node(self, "may_email_patients")
mep_text = _("May send emails to patients created on the server")
may_email_patients.title = mep_text
may_email_patients.label = mep_text
view_all_patients_when_unfiltered = get_child_node(
self, "view_all_patients_when_unfiltered"
)
vap_text = _(
"May view (browse) records from all patients when no patient "
"filter set"
)
view_all_patients_when_unfiltered.title = vap_text
view_all_patients_when_unfiltered.label = vap_text
may_dump_data = get_child_node(self, "may_dump_data")
md_text = _("May perform bulk data dumps")
may_dump_data.title = md_text
may_dump_data.label = md_text
may_run_reports = get_child_node(self, "may_run_reports")
mrr_text = _("May run reports")
may_run_reports.title = mrr_text
may_run_reports.label = mrr_text
may_add_notes = get_child_node(self, "may_add_notes")
man_text = _("May add special notes to tasks")
may_add_notes.title = man_text
may_add_notes.label = man_text
[docs]class UserGroupPermissionsFullSchema(UserGroupPermissionsGroupAdminSchema):
"""
Edit group-specific permissions for a user. For superusers; includes the
option to make the user a groupadmin.
"""
groupadmin = BooleanNode(
default=False
) # match ViewParam.GROUPADMIN and User attribute
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
super().after_bind(node, kw)
_ = self.gettext
groupadmin = get_child_node(self, "groupadmin")
text = _("User is a privileged group administrator for this group")
groupadmin.title = text
groupadmin.label = text
[docs]class EditUserGroupAdminSchema(CSRFSchema):
"""
Schema to edit a user. Version for group administrators.
"""
username = (
UsernameNode()
) # name must match ViewParam.USERNAME and User attribute # noqa
fullname = OptionalStringNode( # name must match ViewParam.FULLNAME and User attribute # noqa
validator=Length(0, StringLengths.FULLNAME_MAX_LEN)
)
email = (
OptionalEmailNode()
) # name must match ViewParam.EMAIL and User attribute # noqa
must_change_password = (
MustChangePasswordNode()
) # match ViewParam.MUST_CHANGE_PASSWORD and User attribute # noqa
language = LanguageSelector() # must match ViewParam.LANGUAGE
group_ids = AdministeredGroupsSequence() # must match ViewParam.GROUP_IDS
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
fullname = get_child_node(self, "fullname")
fullname.title = _("Full name")
email = get_child_node(self, "email")
email.title = _("E-mail address")
[docs]class EditUserFullSchema(EditUserGroupAdminSchema):
"""
Schema to edit a user. Version for superusers; can also make the user a
superuser.
"""
superuser = BooleanNode(
default=False
) # match ViewParam.SUPERUSER and User attribute # noqa
group_ids = AllGroupsSequence() # must match ViewParam.GROUP_IDS
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
superuser = get_child_node(self, "superuser")
text = _("Superuser (CAUTION!)")
superuser.title = text
superuser.label = text
[docs]class AddUserSuperuserSchema(CSRFSchema):
"""
Schema to add a user. Version for superusers.
"""
username = (
UsernameNode()
) # name must match ViewParam.USERNAME and User attribute # noqa
new_password = NewPasswordNode() # name must match ViewParam.NEW_PASSWORD
must_change_password = (
MustChangePasswordNode()
) # match ViewParam.MUST_CHANGE_PASSWORD and User attribute # noqa
group_ids = AllGroupsSequence() # must match ViewParam.GROUP_IDS
[docs]class AddUserGroupadminSchema(AddUserSuperuserSchema):
"""
Schema to add a user. Version for group administrators.
"""
group_ids = AdministeredGroupsSequence() # must match ViewParam.GROUP_IDS
[docs]class SetUserUploadGroupSchema(CSRFSchema):
"""
Schema to choose the group into which a user uploads.
"""
upload_group_id = (
OptionalGroupIdSelectorUserGroups()
) # must match ViewParam.UPLOAD_GROUP_ID # noqa
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
upload_group_id = get_child_node(self, "upload_group_id")
upload_group_id.title = _("Group into which to upload data")
upload_group_id.description = _(
"Pick a group from those to which the user belongs"
)
[docs]class DeleteUserSchema(HardWorkConfirmationSchema):
"""
Schema to delete a user.
"""
user_id = HiddenIntegerNode() # name must match ViewParam.USER_ID
danger = TranslatableValidateDangerousOperationNode()
# =============================================================================
# Add/edit/delete groups
# =============================================================================
[docs]class PolicyNode(MandatoryStringNode, RequestAwareMixin):
"""
Node to capture a CamCOPS ID number policy, and make sure it is
syntactically valid.
"""
def validator(self, node: SchemaNode, value: Any) -> None:
_ = self.gettext
if not isinstance(value, str):
# unlikely!
raise Invalid(node, _("Not a string"))
policy = TokenizedPolicy(value)
if not policy.is_syntactically_valid():
raise Invalid(node, _("Syntactically invalid policy"))
if not policy.is_valid_for_idnums(self.request.valid_which_idnums):
raise Invalid(
node,
_(
"Invalid policy. Have you referred to non-existent ID "
"numbers? Is the policy less restrictive than the "
"tablet’s minimum ID policy?"
)
+ f" [{TABLET_ID_POLICY_STR!r}]",
)
[docs]class GroupNameNode(MandatoryStringNode, RequestAwareMixin):
"""
Node to capture a CamCOPS group name, and check it's valid as a string.
"""
def validator(self, node: SchemaNode, value: str) -> None:
try:
validate_group_name(value, self.request)
except ValueError as e:
raise Invalid(node, str(e))
[docs]class IpUseType(SchemaType):
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def deserialize(
self,
node: SchemaNode,
cstruct: Union[Dict[str, Any], None, ColanderNullType],
) -> Optional[IpUse]:
if cstruct in (None, null):
return None
cstruct: Dict[str, Any] # For type checker
return IpUse(**cstruct)
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def serialize(
self, node: SchemaNode, ip_use: Union[IpUse, None, ColanderNullType]
) -> Union[Dict, ColanderNullType]:
if ip_use in (null, None):
return null
return {
context: getattr(ip_use, context) for context in IpUse.CONTEXTS
}
[docs]class GroupIpUseNode(SchemaNode, RequestAwareMixin):
schema_type = IpUseType
# noinspection PyUnusedLocal,PyAttributeOutsideInit
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
self.widget = GroupIpUseWidget(self.request)
[docs]class EditGroupSchema(CSRFSchema):
"""
Schema to edit a group.
"""
group_id = HiddenIntegerNode() # must match ViewParam.GROUP_ID
name = GroupNameNode() # must match ViewParam.NAME
description = MandatoryStringNode( # must match ViewParam.DESCRIPTION
validator=Length(
StringLengths.GROUP_DESCRIPTION_MIN_LEN,
StringLengths.GROUP_DESCRIPTION_MAX_LEN,
)
)
ip_use = GroupIpUseNode()
group_ids = AllOtherGroupsSequence() # must match ViewParam.GROUP_IDS
upload_policy = PolicyNode() # must match ViewParam.UPLOAD_POLICY
finalize_policy = PolicyNode() # must match ViewParam.FINALIZE_POLICY
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
name = get_child_node(self, "name")
name.title = _("Group name")
ip_use = get_child_node(self, "ip_use")
ip_use.title = _("Group intellectual property settings")
group_ids = get_child_node(self, "group_ids")
group_ids.title = _("Other groups this group may see")
upload_policy = get_child_node(self, "upload_policy")
upload_policy.title = _("Upload policy")
upload_policy.description = _(
"Minimum required patient information to copy data to server"
)
finalize_policy = get_child_node(self, "finalize_policy")
finalize_policy.title = _("Finalize policy")
finalize_policy.description = _(
"Minimum required patient information to clear data off "
"source device"
)
def validator(self, node: SchemaNode, value: Any) -> None:
request = self.bindings[Binding.REQUEST] # type: CamcopsRequest
q = (
CountStarSpecializedQuery(Group, session=request.dbsession)
.filter(Group.id != value[ViewParam.GROUP_ID])
.filter(Group.name == value[ViewParam.NAME])
)
if q.count_star() > 0:
_ = request.gettext
raise Invalid(node, _("Name is used by another group!"))
[docs]class AddGroupSchema(CSRFSchema):
"""
Schema to add a group.
"""
name = GroupNameNode() # name must match ViewParam.NAME
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
name = get_child_node(self, "name")
name.title = _("Group name")
def validator(self, node: SchemaNode, value: Any) -> None:
request = self.bindings[Binding.REQUEST] # type: CamcopsRequest
q = CountStarSpecializedQuery(Group, session=request.dbsession).filter(
Group.name == value[ViewParam.NAME]
)
if q.count_star() > 0:
_ = request.gettext
raise Invalid(node, _("Name is used by another group!"))
[docs]class DeleteGroupSchema(HardWorkConfirmationSchema):
"""
Schema to delete a group.
"""
group_id = HiddenIntegerNode() # name must match ViewParam.GROUP_ID
danger = TranslatableValidateDangerousOperationNode()
# =============================================================================
# Offer research dumps
# =============================================================================
[docs]class DumpTypeSelector(SchemaNode, RequestAwareMixin):
"""
Node to select the filtering method for a data dump.
"""
schema_type = String
default = ViewArg.EVERYTHING
missing = ViewArg.EVERYTHING
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
self.validator = None # type: Optional[ValidatorType]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Dump method")
choices = (
(ViewArg.EVERYTHING, _("Everything")),
(ViewArg.USE_SESSION_FILTER, _("Use the session filter settings")),
(
ViewArg.SPECIFIC_TASKS_GROUPS,
_("Specify tasks/groups manually (see below)"),
),
)
self.widget = RadioChoiceWidget(values=choices)
self.validator = OneOf(list(x[0] for x in choices))
[docs]class DeliveryModeNode(SchemaNode, RequestAwareMixin):
"""
Mode of delivery of data downloads.
"""
schema_type = String
default = ViewArg.EMAIL
missing = ViewArg.EMAIL
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Delivery")
choices = (
(ViewArg.IMMEDIATELY, _("Serve immediately")),
(ViewArg.EMAIL, _("E-mail me")),
(ViewArg.DOWNLOAD, _("Create a file for me to download")),
)
values, pv = get_values_and_permissible(choices)
self.widget = RadioChoiceWidget(values=values)
# noinspection PyUnusedLocal
def validator(self, node: SchemaNode, value: Any) -> None:
request = self.bindings[Binding.REQUEST] # type: CamcopsRequest
_ = request.gettext
if value == ViewArg.IMMEDIATELY:
if not request.config.permit_immediate_downloads:
raise Invalid(
self,
_("Disabled by the system administrator")
+ f" [{ConfigParamSite.PERMIT_IMMEDIATE_DOWNLOADS}]",
)
elif value == ViewArg.EMAIL:
if not request.user.email:
raise Invalid(
self, _("Your user does not have an email address")
)
elif value == ViewArg.DOWNLOAD:
if not request.user_download_dir:
raise Invalid(
self,
_("User downloads not configured by administrator")
+ f" [{ConfigParamSite.USER_DOWNLOAD_DIR}, "
f"{ConfigParamSite.USER_DOWNLOAD_MAX_SPACE_MB}]",
)
else:
raise Invalid(self, _("Bad value"))
[docs]class SqliteSelector(SchemaNode, RequestAwareMixin):
"""
Node to select a way of downloading an SQLite database.
"""
schema_type = String
default = ViewArg.SQLITE
missing = ViewArg.SQLITE
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
self.validator = None # type: Optional[ValidatorType]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Database download method")
choices = (
# https://docs.sqlalchemy.org/en/latest/dialects/
(ViewArg.SQLITE, _("Binary SQLite database")),
(ViewArg.SQL, _("SQL text to create SQLite database")),
)
values, pv = get_values_and_permissible(choices)
self.widget = RadioChoiceWidget(values=values)
self.validator = OneOf(pv)
[docs]class SimplifiedSpreadsheetsNode(SchemaNode, RequestAwareMixin):
"""
Boolean node: simplify basic dump spreadsheets?
"""
schema_type = Boolean
default = True
missing = True
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.label = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Simplify spreadsheets?")
self.label = _("Remove non-essential details?")
[docs]class SortTsvByHeadingsNode(SchemaNode, RequestAwareMixin):
"""
Boolean node: sort TSV files by column name?
"""
schema_type = Boolean
default = False
missing = False
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.label = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Sort columns?")
self.label = _("Sort by heading (column) names within spreadsheets?")
[docs]class IncludeSchemaNode(SchemaNode, RequestAwareMixin):
"""
Boolean node: should INFORMATION_SCHEMA.COLUMNS be included (for
downloads)?
False by default -- adds about 350 kb to an ODS download, for example.
"""
schema_type = Boolean
default = False
missing = False
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.label = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Include column information?")
self.label = _(
"Include details of all columns in the source database?"
)
[docs]class IncludeBlobsNode(SchemaNode, RequestAwareMixin):
"""
Boolean node: should BLOBs be included (for downloads)?
"""
schema_type = Boolean
default = False
missing = False
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.label = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Include BLOBs?")
self.label = _(
"Include binary large objects (BLOBs)? WARNING: may be large"
)
[docs]class PatientIdPerRowNode(SchemaNode, RequestAwareMixin):
"""
Boolean node: should patient ID information, and other cross-referencing
denormalized info, be included per row?
See :ref:`DB_PATIENT_ID_PER_ROW <DB_PATIENT_ID_PER_ROW>`.
"""
schema_type = Boolean
default = True
missing = True
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.label = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Patient ID per row?")
self.label = _(
"Include patient ID numbers and task cross-referencing "
"(denormalized) information per row?"
)
[docs]class OfferDumpManualSchema(Schema, RequestAwareMixin):
"""
Schema to offer the "manual" settings for a data dump (groups, task types).
"""
group_ids = AllowedGroupsSequence() # must match ViewParam.GROUP_IDS
tasks = MultiTaskSelector() # must match ViewParam.TASKS
widget = MappingWidget(template="mapping_accordion", open=False)
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Manual settings")
[docs]class OfferBasicDumpSchema(CSRFSchema):
"""
Schema to choose the settings for a basic (TSV/ZIP) data dump.
"""
dump_method = DumpTypeSelector() # must match ViewParam.DUMP_METHOD
simplified = (
SimplifiedSpreadsheetsNode()
) # must match ViewParam.SIMPLIFIED
sort = SortTsvByHeadingsNode() # must match ViewParam.SORT
include_schema = IncludeSchemaNode() # must match ViewParam.INCLUDE_SCHEMA
manual = OfferDumpManualSchema() # must match ViewParam.MANUAL
viewtype = (
SpreadsheetFormatSelector()
) # must match ViewParam.VIEWTYPE # noqa
delivery_mode = DeliveryModeNode() # must match ViewParam.DELIVERY_MODE
[docs]class OfferSqlDumpSchema(CSRFSchema):
"""
Schema to choose the settings for an SQL data dump.
"""
dump_method = DumpTypeSelector() # must match ViewParam.DUMP_METHOD
sqlite_method = SqliteSelector() # must match ViewParam.SQLITE_METHOD
include_schema = IncludeSchemaNode() # must match ViewParam.INCLUDE_SCHEMA
include_blobs = IncludeBlobsNode() # must match ViewParam.INCLUDE_BLOBS
patient_id_per_row = (
PatientIdPerRowNode()
) # must match ViewParam.PATIENT_ID_PER_ROW # noqa
manual = OfferDumpManualSchema() # must match ViewParam.MANUAL
delivery_mode = DeliveryModeNode() # must match ViewParam.DELIVERY_MODE
# =============================================================================
# Edit server settings
# =============================================================================
[docs]class EditServerSettingsSchema(CSRFSchema):
"""
Schema to edit the global settings for the server.
"""
database_title = SchemaNode( # must match ViewParam.DATABASE_TITLE
String(),
validator=Length(
StringLengths.DATABASE_TITLE_MIN_LEN,
StringLengths.DATABASE_TITLE_MAX_LEN,
),
)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
database_title = get_child_node(self, "database_title")
database_title.title = _("Database friendly title")
# =============================================================================
# Edit ID number definitions
# =============================================================================
[docs]class IdDefinitionDescriptionNode(SchemaNode, RequestAwareMixin):
"""
Node to capture the description of an ID number type.
"""
schema_type = String
validator = Length(1, StringLengths.ID_DESCRIPTOR_MAX_LEN)
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Full description (e.g. “NHS number”)")
[docs]class IdDefinitionShortDescriptionNode(SchemaNode, RequestAwareMixin):
"""
Node to capture the short description of an ID number type.
"""
schema_type = String
validator = Length(1, StringLengths.ID_DESCRIPTOR_MAX_LEN)
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Short description (e.g. “NHS#”)")
self.description = _("Try to keep it very short!")
[docs]class IdValidationMethodNode(OptionalStringNode, RequestAwareMixin):
"""
Node to choose a build-in ID number validation method.
"""
widget = SelectWidget(values=ID_NUM_VALIDATION_METHOD_CHOICES)
validator = OneOf(list(x[0] for x in ID_NUM_VALIDATION_METHOD_CHOICES))
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Validation method")
self.description = _("Built-in CamCOPS ID number validation method")
[docs]class Hl7AssigningAuthorityNode(OptionalStringNode, RequestAwareMixin):
"""
Optional node to capture the name of an HL7 Assigning Authority.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("HL7 Assigning Authority")
self.description = _(
"For HL7 messaging: "
"HL7 Assigning Authority for ID number (unique name of the "
"system/organization/agency/department that creates the data)."
)
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: str) -> None:
try:
validate_hl7_aa(value, self.request)
except ValueError as e:
raise Invalid(node, str(e))
[docs]class Hl7IdTypeNode(OptionalStringNode, RequestAwareMixin):
"""
Optional node to capture the name of an HL7 Identifier Type code.
"""
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("HL7 Identifier Type")
self.description = _(
"For HL7 messaging: "
"HL7 Identifier Type code: ‘a code corresponding to the type "
"of identifier. In some cases, this code may be used as a "
"qualifier to the “Assigning Authority” component.’"
)
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: str) -> None:
try:
validate_hl7_id_type(value, self.request)
except ValueError as e:
raise Invalid(node, str(e))
[docs]class FHIRIdSystemUrlNode(OptionalStringNode, RequestAwareMixin):
"""
Optional node to capture the URL for a FHIR ID system:
- https://www.hl7.org/fhir/datatypes.html#Identifier
- https://www.hl7.org/fhir/datatypes-definitions.html#Identifier.system
"""
validator = url
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("FHIR ID system")
self.description = _("For FHIR exports: URL defining the ID system.")
[docs]class EditIdDefinitionSchema(CSRFSchema):
"""
Schema to edit an ID number definition.
"""
which_idnum = HiddenIntegerNode() # must match ViewParam.WHICH_IDNUM
description = (
IdDefinitionDescriptionNode()
) # must match ViewParam.DESCRIPTION # noqa
short_description = (
IdDefinitionShortDescriptionNode()
) # must match ViewParam.SHORT_DESCRIPTION # noqa
validation_method = (
IdValidationMethodNode()
) # must match ViewParam.VALIDATION_METHOD # noqa
hl7_id_type = Hl7IdTypeNode() # must match ViewParam.HL7_ID_TYPE
hl7_assigning_authority = (
Hl7AssigningAuthorityNode()
) # must match ViewParam.HL7_ASSIGNING_AUTHORITY # noqa
fhir_id_system = (
FHIRIdSystemUrlNode()
) # must match ViewParam.FHIR_ID_SYSTEM # noqa
def validator(self, node: SchemaNode, value: Any) -> None:
request = self.bindings[Binding.REQUEST] # type: CamcopsRequest
_ = request.gettext
qd = (
CountStarSpecializedQuery(
IdNumDefinition, session=request.dbsession
)
.filter(
IdNumDefinition.which_idnum != value[ViewParam.WHICH_IDNUM]
)
.filter(
IdNumDefinition.description == value[ViewParam.DESCRIPTION]
)
)
if qd.count_star() > 0:
raise Invalid(node, _("Description is used by another ID number!"))
qs = (
CountStarSpecializedQuery(
IdNumDefinition, session=request.dbsession
)
.filter(
IdNumDefinition.which_idnum != value[ViewParam.WHICH_IDNUM]
)
.filter(
IdNumDefinition.short_description
== value[ViewParam.SHORT_DESCRIPTION]
)
)
if qs.count_star() > 0:
raise Invalid(
node, _("Short description is used by another ID number!")
)
[docs]class AddIdDefinitionSchema(CSRFSchema):
"""
Schema to add an ID number definition.
"""
which_idnum = SchemaNode( # must match ViewParam.WHICH_IDNUM
Integer(), validator=Range(min=1)
)
description = (
IdDefinitionDescriptionNode()
) # must match ViewParam.DESCRIPTION # noqa
short_description = (
IdDefinitionShortDescriptionNode()
) # must match ViewParam.SHORT_DESCRIPTION # noqa
validation_method = (
IdValidationMethodNode()
) # must match ViewParam.VALIDATION_METHOD # noqa
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
which_idnum = get_child_node(self, "which_idnum")
which_idnum.title = _("Which ID number?")
which_idnum.description = (
"Specify the integer to represent the type of this ID "
"number class (e.g. consecutive numbering from 1)"
)
def validator(self, node: SchemaNode, value: Any) -> None:
request = self.bindings[Binding.REQUEST] # type: CamcopsRequest
_ = request.gettext
qw = CountStarSpecializedQuery(
IdNumDefinition, session=request.dbsession
).filter(IdNumDefinition.which_idnum == value[ViewParam.WHICH_IDNUM])
if qw.count_star() > 0:
raise Invalid(node, _("ID# clashes with another ID number!"))
qd = CountStarSpecializedQuery(
IdNumDefinition, session=request.dbsession
).filter(IdNumDefinition.description == value[ViewParam.DESCRIPTION])
if qd.count_star() > 0:
raise Invalid(node, _("Description is used by another ID number!"))
qs = CountStarSpecializedQuery(
IdNumDefinition, session=request.dbsession
).filter(
IdNumDefinition.short_description
== value[ViewParam.SHORT_DESCRIPTION]
)
if qs.count_star() > 0:
raise Invalid(
node, _("Short description is used by another ID number!")
)
[docs]class DeleteIdDefinitionSchema(HardWorkConfirmationSchema):
"""
Schema to delete an ID number definition.
"""
which_idnum = HiddenIntegerNode() # name must match ViewParam.WHICH_IDNUM
danger = TranslatableValidateDangerousOperationNode()
# =============================================================================
# Special notes
# =============================================================================
[docs]class AddSpecialNoteSchema(CSRFSchema):
"""
Schema to add a special note to a task.
"""
table_name = HiddenStringNode() # must match ViewParam.TABLENAME
server_pk = HiddenIntegerNode() # must match ViewParam.SERVER_PK
note = MandatoryStringNode( # must match ViewParam.NOTE
widget=TextAreaWidget(rows=20, cols=80)
)
danger = TranslatableValidateDangerousOperationNode()
[docs]class DeleteSpecialNoteSchema(CSRFSchema):
"""
Schema to add a special note to a task.
"""
note_id = HiddenIntegerNode() # must match ViewParam.NOTE_ID
danger = TranslatableValidateDangerousOperationNode()
# =============================================================================
# The unusual data manipulation operations
# =============================================================================
[docs]class EraseTaskSchema(HardWorkConfirmationSchema):
"""
Schema to erase a task.
"""
table_name = HiddenStringNode() # must match ViewParam.TABLENAME
server_pk = HiddenIntegerNode() # must match ViewParam.SERVER_PK
danger = TranslatableValidateDangerousOperationNode()
[docs]class DeletePatientChooseSchema(CSRFSchema):
"""
Schema to delete a patient.
"""
which_idnum = (
MandatoryWhichIdNumSelector()
) # must match ViewParam.WHICH_IDNUM # noqa
idnum_value = MandatoryIdNumValue() # must match ViewParam.IDNUM_VALUE
group_id = (
MandatoryGroupIdSelectorAdministeredGroups()
) # must match ViewParam.GROUP_ID # noqa
danger = TranslatableValidateDangerousOperationNode()
[docs]class DeletePatientConfirmSchema(HardWorkConfirmationSchema):
"""
Schema to confirm deletion of a patient.
"""
which_idnum = HiddenIntegerNode() # must match ViewParam.WHICH_IDNUM
idnum_value = HiddenIntegerNode() # must match ViewParam.IDNUM_VALUE
group_id = HiddenIntegerNode() # must match ViewParam.GROUP_ID
danger = TranslatableValidateDangerousOperationNode()
[docs]class DeleteServerCreatedPatientSchema(HardWorkConfirmationSchema):
"""
Schema to delete a patient created on the server.
"""
# name must match ViewParam.SERVER_PK
server_pk = HiddenIntegerNode()
danger = TranslatableValidateDangerousOperationNode()
EDIT_PATIENT_SIMPLE_PARAMS = [
ViewParam.FORENAME,
ViewParam.SURNAME,
ViewParam.DOB,
ViewParam.SEX,
ViewParam.ADDRESS,
ViewParam.EMAIL,
ViewParam.GP,
ViewParam.OTHER,
]
[docs]class TaskScheduleSelector(SchemaNode, RequestAwareMixin):
"""
Drop-down with all available task schedules
"""
widget = SelectWidget()
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None:
self.title = "" # for type checker
self.name = "" # for type checker
self.validator = None # type: Optional[ValidatorType]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
request = self.request
_ = request.gettext
self.title = _("Task schedule")
values = [] # type: List[Tuple[Optional[int], str]]
valid_group_ids = (
request.user.ids_of_groups_user_may_manage_patients_in
)
task_schedules = (
request.dbsession.query(TaskSchedule)
.filter(TaskSchedule.group_id.in_(valid_group_ids))
.order_by(TaskSchedule.name)
)
for task_schedule in task_schedules:
values.append((task_schedule.id, task_schedule.name))
values, pv = get_values_and_permissible(values, add_none=False)
self.widget.values = values
self.validator = OneOf(pv)
@staticmethod
def schema_type() -> SchemaType:
return Integer()
[docs]class JsonType(SchemaType):
"""
Schema type for JsonNode
"""
# noinspection PyMethodMayBeStatic, PyUnusedLocal
def deserialize(
self, node: SchemaNode, cstruct: Union[str, ColanderNullType, None]
) -> Any:
# is null when form is empty
if cstruct in (null, None):
return None
cstruct: str
try:
# Validation happens on the widget class
json_value = json.loads(cstruct)
except json.JSONDecodeError:
return None
return json_value
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def serialize(
self, node: SchemaNode, appstruct: Union[Dict, None, ColanderNullType]
) -> Union[str, ColanderNullType]:
# is null when form is empty (new record)
# is None when populated from empty value in the database
if appstruct in (null, None):
return null
# appstruct should be well formed here (it would already have failed
# when reading from the database)
return json.dumps(appstruct)
[docs]class JsonSettingsNode(SchemaNode, RequestAwareMixin):
"""
Note to edit raw JSON.
"""
schema_type = JsonType
missing = null
# noinspection PyUnusedLocal,PyAttributeOutsideInit
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.widget = JsonWidget(self.request)
self.title = _("Task-specific settings for this patient")
self.description = _(
"ADVANCED. Only applicable to tasks that are configurable on a "
"per-patient basis. Format: JSON object, with settings keyed on "
"task table name."
)
def validator(self, node: SchemaNode, value: Any) -> None:
if value is not None:
# will be None if JSON failed to validate
if not isinstance(value, dict):
_ = self.request.gettext
error_message = _(
"Please enter a valid JSON object (with settings keyed on "
"task table name) or leave blank"
)
raise Invalid(node, error_message)
[docs]class TaskScheduleJsonSchema(Schema):
"""
Schema for the advanced JSON parts of a patient-to-task-schedule mapping.
"""
settings = JsonSettingsNode() # must match ViewParam.SETTINGS
[docs]class TaskScheduleNode(MappingSchema, RequestAwareMixin):
"""
Node to edit settings for a patient-to-task-schedule mapping.
"""
patient_task_schedule_id = (
HiddenIntegerNode()
) # name must match ViewParam.PATIENT_TASK_SCHEDULE_ID
schedule_id = TaskScheduleSelector() # must match ViewParam.SCHEDULE_ID
start_datetime = (
StartPendulumSelector()
) # must match ViewParam.START_DATETIME
if DEFORM_ACCORDION_BUG:
settings = JsonSettingsNode() # must match ViewParam.SETTINGS
else:
advanced = TaskScheduleJsonSchema( # must match ViewParam.ADVANCED
widget=MappingWidget(template="mapping_accordion", open=False)
)
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Task schedule")
start_datetime = get_child_node(self, "start_datetime")
start_datetime.description = _(
"Leave blank for the date the patient first downloads the schedule"
)
if not DEFORM_ACCORDION_BUG:
advanced = get_child_node(self, "advanced")
advanced.title = _("Advanced")
[docs]class TaskScheduleSequence(SequenceSchema, RequestAwareMixin):
"""
Sequence for multiple patient-to-task-schedule mappings.
"""
task_schedule_sequence = TaskScheduleNode()
missing = drop
[docs] def __init__(self, *args: Any, **kwargs: Any) -> None:
self.title = "" # for type checker
self.widget = None # type: Optional[Widget]
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Task Schedules")
self.widget = TranslatableSequenceWidget(request=self.request)
[docs]class EditPatientSchema(CSRFSchema):
"""
Schema to edit a patient.
"""
server_pk = HiddenIntegerNode() # must match ViewParam.SERVER_PK
forename = OptionalPatientNameNode() # must match ViewParam.FORENAME
surname = OptionalPatientNameNode() # must match ViewParam.SURNAME
dob = DateSelectorNode() # must match ViewParam.DOB
sex = MandatorySexSelector() # must match ViewParam.SEX
address = OptionalStringNode() # must match ViewParam.ADDRESS
email = OptionalEmailNode() # must match ViewParam.EMAIL
gp = OptionalStringNode() # must match ViewParam.GP
other = OptionalStringNode() # must match ViewParam.OTHER
id_references = (
IdNumSequenceUniquePerWhichIdnum()
) # must match ViewParam.ID_REFERENCES # noqa
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
dob = get_child_node(self, "dob")
dob.title = _("Date of birth")
gp = get_child_node(self, "gp")
gp.title = _("GP")
def validator(self, node: SchemaNode, value: Any) -> None:
request = self.bindings[Binding.REQUEST] # type: CamcopsRequest
dbsession = request.dbsession
group_id = value[ViewParam.GROUP_ID]
group = Group.get_group_by_id(dbsession, group_id)
testpatient = Patient()
for k in EDIT_PATIENT_SIMPLE_PARAMS:
setattr(testpatient, k, value[k])
testpatient.idnums = []
for idrefdict in value[ViewParam.ID_REFERENCES]:
pidnum = PatientIdNum()
pidnum.which_idnum = idrefdict[ViewParam.WHICH_IDNUM]
pidnum.idnum_value = idrefdict[ViewParam.IDNUM_VALUE]
testpatient.idnums.append(pidnum)
tk_finalize_policy = TokenizedPolicy(group.finalize_policy)
if not testpatient.satisfies_id_policy(tk_finalize_policy):
_ = self.gettext
raise Invalid(
node,
_("Patient would not meet 'finalize' ID policy for group:")
+ f" {group.name}! ["
+ _("That policy is:")
+ f" {group.finalize_policy!r}]",
)
[docs]class DangerousEditPatientSchema(EditPatientSchema):
group_id = HiddenIntegerNode() # must match ViewParam.GROUP_ID
danger = TranslatableValidateDangerousOperationNode()
[docs]class EditServerCreatedPatientSchema(EditPatientSchema):
# Must match ViewParam.GROUP_ID
group_id = MandatoryGroupIdSelectorPatientGroups(insert_before="forename")
task_schedules = (
TaskScheduleSequence()
) # must match ViewParam.TASK_SCHEDULES
[docs]class EmailTemplateNode(OptionalStringNode, RequestAwareMixin):
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
self.description = "" # for type checker
self.formatter = TaskScheduleEmailTemplateFormatter()
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Email template")
self.description = _(
"Template of email to be sent to patients when inviting them to "
"complete the tasks in the schedule. Valid placeholders: {}"
).format(self.formatter.get_valid_parameters_string())
# noinspection PyAttributeOutsideInit
self.widget = RichTextWidget(options=get_tinymce_options(self.request))
def validator(self, node: SchemaNode, value: Any) -> None:
_ = self.gettext
try:
self.formatter.validate(value)
return
except KeyError as e:
error = _("{bad_key} is not a valid placeholder").format(bad_key=e)
except ValueError:
error = _(
"Invalid email template. Is there a missing '{' or '}' ?"
)
raise Invalid(node, error)
[docs]class EmailCcNode(OptionalEmailNode, RequestAwareMixin):
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Email CC")
self.description = _(
"The patient will see these email addresses. Separate multiple "
"addresses with commas."
)
[docs]class EmailBccNode(OptionalEmailNode, RequestAwareMixin):
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Email BCC")
self.description = _(
"The patient will not see these email addresses. Separate "
"multiple addresses with commas."
)
[docs]class EmailFromNode(OptionalEmailNode, RequestAwareMixin):
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _('Email "From" address')
self.description = _(
"You must set this if you want to send emails to your patients"
)
[docs]class TaskScheduleSchema(CSRFSchema):
name = OptionalStringNode()
group_id = (
MandatoryGroupIdSelectorAdministeredGroups()
) # must match ViewParam.GROUP_ID # noqa
email_from = EmailFromNode() # must match ViewParam.EMAIL_FROM
email_cc = EmailCcNode() # must match ViewParam.EMAIL_CC
email_bcc = EmailBccNode() # must match ViewParam.EMAIL_BCC
email_subject = OptionalStringNode()
email_template = EmailTemplateNode()
[docs]class DeleteTaskScheduleSchema(HardWorkConfirmationSchema):
"""
Schema to delete a task schedule.
"""
# name must match ViewParam.SCHEDULE_ID
schedule_id = HiddenIntegerNode()
danger = TranslatableValidateDangerousOperationNode()
[docs]class DurationType(SchemaType):
"""
Custom colander schema type to convert between Pendulum Duration objects
and months, weeks and days.
"""
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def deserialize(
self,
node: SchemaNode,
cstruct: Union[Dict[str, Any], None, ColanderNullType],
) -> Optional[Duration]:
# called when validating the submitted form with the total days
# from DurationWidget.deserialize()
if cstruct in (None, null):
return None
cstruct: Dict[str, Any]
# may be passed invalid values when re-rendering widget with error
# messages
try:
days = int(cstruct.get("days") or "0")
except ValueError:
days = 0
try:
weeks = int(cstruct.get("weeks") or "0")
except ValueError:
weeks = 0
try:
months = int(cstruct.get("months") or "0")
except ValueError:
months = 0
total_days = months * 30 + weeks * 7 + days
return Duration(days=total_days)
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def serialize(
self, node: SchemaNode, duration: Union[Duration, ColanderNullType]
) -> Union[Dict, ColanderNullType]:
if duration is null:
# For new schedule item
return null
duration: Duration
total_days = duration.in_days()
months = total_days // 30
weeks = (total_days % 30) // 7
days = (total_days % 30) % 7
# Existing schedule item
cstruct = {"days": days, "months": months, "weeks": weeks}
return cstruct
[docs]class DurationNode(SchemaNode, RequestAwareMixin):
schema_type = DurationType
# noinspection PyUnusedLocal,PyAttributeOutsideInit
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
self.widget = DurationWidget(self.request)
[docs]class TaskScheduleItemSchema(CSRFSchema):
schedule_id = HiddenIntegerNode() # name must match ViewParam.SCHEDULE_ID
# name must match ViewParam.TABLE_NAME
table_name = MandatorySingleTaskSelector()
# name must match ViewParam.CLINICIAN_CONFIRMATION
clinician_confirmation = BooleanNode(default=False)
due_from = DurationNode() # name must match ViewParam.DUE_FROM
due_within = DurationNode() # name must match ViewParam.DUE_WITHIN
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
due_from = get_child_node(self, "due_from")
due_from.title = _("Due from")
due_from.description = _(
"Time from the start of schedule when the patient may begin this "
"task"
)
due_within = get_child_node(self, "due_within")
due_within.title = _("Due within")
due_within.description = _(
"Time the patient has to complete this task"
)
clinician_confirmation = get_child_node(self, "clinician_confirmation")
clinician_confirmation.title = _("Allow clinician tasks")
clinician_confirmation.label = None
clinician_confirmation.description = _(
"Tick this box to schedule a task that would normally be "
"completed by (or with) a clinician"
)
def validator(self, node: SchemaNode, value: Dict[str, Any]) -> None:
task_class = self._get_task_class(value)
self._validate_clinician_status(node, value, task_class)
self._validate_due_dates(node, value)
self._validate_task_ip_use(node, value, task_class)
# noinspection PyMethodMayBeStatic
def _get_task_class(self, value: Dict[str, Any]) -> Type["Task"]:
return tablename_to_task_class_dict()[value[ViewParam.TABLE_NAME]]
def _validate_clinician_status(
self, node: SchemaNode, value: Dict[str, Any], task_class: Type["Task"]
) -> None:
_ = self.gettext
clinician_confirmation = value[ViewParam.CLINICIAN_CONFIRMATION]
if task_class.has_clinician and not clinician_confirmation:
raise Invalid(
node,
_(
"You have selected the task '{task_name}', which a "
"patient would not normally complete by themselves. "
"If you are sure you want to do this, you must tick "
"'Allow clinician tasks'."
).format(task_name=task_class.shortname),
)
def _validate_due_dates(
self, node: SchemaNode, value: Dict[str, Any]
) -> None:
_ = self.gettext
due_from = value[ViewParam.DUE_FROM]
if due_from.total_days() < 0:
raise Invalid(node, _("'Due from' must be zero or more days"))
due_within = value[ViewParam.DUE_WITHIN]
if due_within.total_days() <= 0:
raise Invalid(node, _("'Due within' must be more than zero days"))
def _validate_task_ip_use(
self, node: SchemaNode, value: Dict[str, Any], task_class: Type["Task"]
) -> None:
_ = self.gettext
if not task_class.prohibits_anything():
return
schedule_id = value[ViewParam.SCHEDULE_ID]
schedule = (
self.request.dbsession.query(TaskSchedule)
.filter(TaskSchedule.id == schedule_id)
.one()
)
if schedule.group.ip_use is None:
raise Invalid(
node,
_(
"The task you have selected prohibits use in certain "
"contexts. The group '{group_name}' has no intellectual "
"property settings. "
"You need to edit the group '{group_name}' to say which "
"contexts it operates in.".format(
group_name=schedule.group.name
)
),
)
# TODO: On the client we say 'to use this task, you must seek
# permission from the copyright holder'. We could do the same but at
# the moment there isn't a way of telling the system that we have done
# so.
if (
task_class.prohibits_commercial
and schedule.group.ip_use.commercial
):
raise Invalid(
node,
_(
"The group '{group_name}' associated with schedule "
"'{schedule_name}' operates in a "
"commercial context but the task you have selected "
"prohibits commercial use."
).format(
group_name=schedule.group.name, schedule_name=schedule.name
),
)
if task_class.prohibits_clinical and schedule.group.ip_use.clinical:
raise Invalid(
node,
_(
"The group '{group_name}' associated with schedule "
"'{schedule_name}' operates in a "
"clinical context but the task you have selected "
"prohibits clinical use."
).format(
group_name=schedule.group.name, schedule_name=schedule.name
),
)
if (
task_class.prohibits_educational
and schedule.group.ip_use.educational
):
raise Invalid(
node,
_(
"The group '{group_name}' associated with schedule "
"'{schedule_name}' operates in an "
"educational context but the task you have selected "
"prohibits educational use."
).format(
group_name=schedule.group.name, schedule_name=schedule.name
),
)
if task_class.prohibits_research and schedule.group.ip_use.research:
raise Invalid(
node,
_(
"The group '{group_name}' associated with schedule "
"'{schedule_name}' operates in a "
"research context but the task you have selected "
"prohibits research use."
).format(
group_name=schedule.group.name, schedule_name=schedule.name
),
)
[docs]class DeleteTaskScheduleItemSchema(HardWorkConfirmationSchema):
"""
Schema to delete a task schedule item.
"""
# name must match ViewParam.SCHEDULE_ITEM_ID
schedule_item_id = HiddenIntegerNode()
danger = TranslatableValidateDangerousOperationNode()
[docs]class ForciblyFinalizeChooseDeviceSchema(CSRFSchema):
"""
Schema to force-finalize records from a device.
"""
device_id = MandatoryDeviceIdSelector() # must match ViewParam.DEVICE_ID
danger = TranslatableValidateDangerousOperationNode()
[docs]class ForciblyFinalizeConfirmSchema(HardWorkConfirmationSchema):
"""
Schema to confirm force-finalizing of a device.
"""
device_id = HiddenIntegerNode() # must match ViewParam.DEVICE_ID
danger = TranslatableValidateDangerousOperationNode()
# =============================================================================
# User downloads
# =============================================================================
[docs]class HiddenDownloadFilenameNode(HiddenStringNode, RequestAwareMixin):
"""
Note to encode a hidden filename.
"""
# noinspection PyMethodMayBeStatic
def validator(self, node: SchemaNode, value: str) -> None:
if value:
try:
validate_download_filename(value, self.request)
except ValueError as e:
raise Invalid(node, str(e))
[docs]class UserDownloadDeleteSchema(CSRFSchema):
"""
Schema to capture details of a file to be deleted.
"""
filename = (
HiddenDownloadFilenameNode()
) # name must match ViewParam.FILENAME # noqa
[docs]class EmailBodyNode(MandatoryStringNode, RequestAwareMixin):
[docs] def __init__(self, *args, **kwargs) -> None:
self.title = "" # for type checker
super().__init__(*args, **kwargs)
# noinspection PyUnusedLocal
def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
_ = self.gettext
self.title = _("Message")
# noinspection PyAttributeOutsideInit
self.widget = RichTextWidget(options=get_tinymce_options(self.request))
[docs]class SendEmailSchema(CSRFSchema):
email = MandatoryEmailNode() # name must match ViewParam.EMAIL
email_cc = HiddenStringNode()
email_bcc = HiddenStringNode()
email_from = HiddenStringNode()
email_subject = MandatoryStringNode()
email_body = EmailBodyNode()