#!/usr/bin/env python
"""
camcops_server/cc_modules/cc_request.py
===============================================================================
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**Implements a Pyramid Request object customized for CamCOPS.**
"""
import collections
from contextlib import contextmanager
import datetime
import gettext
import logging
import os
import re
import secrets
from typing import (
Any,
Dict,
Generator,
List,
Optional,
Set,
Tuple,
TYPE_CHECKING,
Union,
)
import urllib.parse
from cardinal_pythonlib.datetimefunc import (
coerce_to_pendulum,
coerce_to_pendulum_date,
convert_datetime_to_utc,
format_datetime,
pendulum_to_utc_datetime_without_tz,
)
from cardinal_pythonlib.fileops import get_directory_contents_size, mkdir_p
from cardinal_pythonlib.httpconst import HttpMethod
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.plot import (
png_img_html_from_pyplot_figure,
svg_html_from_pyplot_figure,
)
import cardinal_pythonlib.rnc_web as ws
from cardinal_pythonlib.wsgi.constants import WsgiEnvVar
import lockfile
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure
from matplotlib.font_manager import FontProperties
from pendulum import Date, DateTime as Pendulum, Duration
from pendulum.parsing.exceptions import ParserError
from pyramid.config import Configurator
from pyramid.decorator import reify
from pyramid.httpexceptions import HTTPBadRequest, HTTPException
from pyramid.interfaces import ISession
from pyramid.request import Request
from pyramid.response import Response
from pyramid.testing import DummyRequest
from sqlalchemy.engine.base import Engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import Session as SqlASession
from webob.multidict import MultiDict
# Note: everything uder the sun imports this file, so keep the intra-package
# imports as minimal as possible.
from camcops_server.cc_modules.cc_baseconstants import (
DOCUMENTATION_URL,
TRANSLATIONS_DIR,
)
from camcops_server.cc_modules.cc_config import (
CamcopsConfig,
get_config,
get_config_filename_from_os_env,
)
from camcops_server.cc_modules.cc_constants import (
CSS_PAGED_MEDIA,
DateFormat,
PlotDefaults,
USE_SVG_IN_HTML,
)
from camcops_server.cc_modules.cc_idnumdef import (
get_idnum_definitions,
IdNumDefinition,
validate_id_number,
)
from camcops_server.cc_modules.cc_language import (
DEFAULT_LOCALE,
GETTEXT_DOMAIN,
POSSIBLE_LOCALES,
)
# noinspection PyUnresolvedReferences
import camcops_server.cc_modules.cc_plot # import side effects (configure matplotlib) # noqa
from camcops_server.cc_modules.cc_pyramid import (
camcops_add_mako_renderer,
CamcopsAuthenticationPolicy,
CamcopsAuthorizationPolicy,
CookieKey,
get_session_factory,
icon_html,
icon_text,
icons_text,
Permission,
RouteCollection,
Routes,
STATIC_CAMCOPS_PACKAGE_PATH,
)
from camcops_server.cc_modules.cc_response import camcops_response_factory
from camcops_server.cc_modules.cc_serversettings import (
get_server_settings,
ServerSettings,
)
from camcops_server.cc_modules.cc_string import (
all_extra_strings_as_dicts,
APPSTRING_TASKNAME,
MISSING_LOCALE,
)
from camcops_server.cc_modules.cc_tabletsession import TabletSession
from camcops_server.cc_modules.cc_text import SS, server_string
from camcops_server.cc_modules.cc_user import User
from camcops_server.cc_modules.cc_validators import (
STRING_VALIDATOR_TYPE,
validate_alphanum_underscore,
validate_redirect_url,
)
if TYPE_CHECKING:
from matplotlib.axis import Axis
from matplotlib.axes import Axes
from matplotlib.text import Text
from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
from camcops_server.cc_modules.cc_exportrecipientinfo import (
ExportRecipientInfo,
)
from camcops_server.cc_modules.cc_session import CamcopsSession
from camcops_server.cc_modules.cc_snomed import SnomedConcept
log = BraceStyleAdapter(logging.getLogger(__name__))
# =============================================================================
# Debugging options
# =============================================================================
DEBUG_ADD_ROUTES = False
DEBUG_AUTHORIZATION = False
DEBUG_CAMCOPS_SESSION = False
DEBUG_DBSESSION_MANAGEMENT = False
DEBUG_GETTEXT = False
DEBUG_REQUEST_CREATION = False
DEBUG_TABLET_SESSION = False
if any(
[
DEBUG_ADD_ROUTES,
DEBUG_AUTHORIZATION,
DEBUG_CAMCOPS_SESSION,
DEBUG_DBSESSION_MANAGEMENT,
DEBUG_GETTEXT,
DEBUG_REQUEST_CREATION,
DEBUG_TABLET_SESSION,
]
):
log.warning("Debugging options enabled!")
# =============================================================================
# Constants
# =============================================================================
TRUE_STRINGS_LOWER_CASE = ["true", "t", "1", "yes", "y"]
FALSE_STRINGS_LOWER_CASE = ["false", "f", "0", "no", "n"]
# =============================================================================
# Modified Request interface, for type checking
# =============================================================================
# https://docs.pylonsproject.org/projects/pyramid_cookbook/en/latest/auth/user_object.html
# https://rollbar.com/blog/using-pyramid-request-factory-to-write-less-code/
#
# ... everything with reify=True is cached, so if we ask for something
# more than once, we keep getting the same thing
# ... https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request.set_property # noqa
[docs]class CamcopsRequest(Request):
"""
The CamcopsRequest is an object central to all HTTP requests. It is the
main thing passed all around the server, and embodies what we need to know
about the client request -- including user information, ways of accessing
the database, and so on.
It reads its config (on first demand) from the config file specified in
``os.environ[ENVVAR_CONFIG_FILE]``.
"""
[docs] def __init__(self, *args, **kwargs):
"""
This is called as the Pyramid request factory; see
``config.set_request_factory(CamcopsRequest)``
What's the best way of handling the database client?
- With Titanium, we were constrained not to use cookies. With Qt, we
have the option.
- But are cookies a good idea?
Probably not; they are somewhat overcomplicated for this.
See also
- https://softwareengineering.stackexchange.com/questions/141019/
- https://stackoverflow.com/questions/6068113/do-sessions-really-violate-restfulness
- Let's continue to avoid cookies.
- We don't have to cache any information (we still send username/
password details with each request, and that is RESTful) but it
does save authentication time to do so on calls after the first.
- What we could try to do is:
- look up a session here, at Request creation time;
- add a new session if there wasn't one;
- but allow the database API code to replace that session (BEFORE
it's saved to the database and gains its PK) with another,
determined by the content.
- This gives one more database hit, but avoids the bcrypt time.
""" # noqa
super().__init__(*args, **kwargs)
self.use_svg = False # use SVG (not just PNG) for graphics
self.provide_png_fallback_for_svg = (
True # for SVG: provide PNG fallback image?
)
self.add_response_callback(complete_request_add_cookies)
self._camcops_session = None # type: Optional[CamcopsSession]
self._debugging_db_session = (
None
) # type: Optional[SqlASession] # for unit testing only
self._debugging_user = (
None
) # type: Optional[User] # for unit testing only
self._pending_export_push_requests = (
[]
) # type: List[Tuple[str, str, int]]
self._cached_sstring = {} # type: Dict[SS, str]
# Don't make the _camcops_session yet; it will want a Registry, and
# we may not have one yet; see command_line_request().
if DEBUG_REQUEST_CREATION:
log.debug(
"CamcopsRequest.__init__: args={!r}, kwargs={!r}", args, kwargs
)
# -------------------------------------------------------------------------
# HTTP nonce
# -------------------------------------------------------------------------
@reify
def nonce(self) -> str:
"""
Return a nonce that is generated at random for each request, but
remains constant for that request (because we use ``@reify``).
See https://content-security-policy.com/examples/allow-inline-style/.
And for how to make one:
https://stackoverflow.com/questions/5590170/what-is-the-standard-method-for-generating-a-nonce-in-python
""" # noqa
return secrets.token_urlsafe()
# -------------------------------------------------------------------------
# CamcopsSession
# -------------------------------------------------------------------------
@property
def camcops_session(self) -> "CamcopsSession":
"""
Returns the
:class:`camcops_server.cc_modules.cc_session.CamcopsSession` for this
request (q.v.).
Contrast:
.. code-block:: none
ccsession = request.camcops_session # type: CamcopsSession
pyramid_session = request.session # type: ISession
"""
if self._camcops_session is None:
from camcops_server.cc_modules.cc_session import (
CamcopsSession,
) # delayed import
self._camcops_session = CamcopsSession.get_session_using_cookies(
self
)
if DEBUG_CAMCOPS_SESSION:
log.debug("{!r}", self._camcops_session)
return self._camcops_session
[docs] def replace_camcops_session(self, ccsession: "CamcopsSession") -> None:
"""
Replaces any existing
:class:`camcops_server.cc_modules.cc_session.CamcopsSession` with a new
one.
Rationale:
We may have created a new HTTP session because the request had no
cookies (added to the DB session but not yet saved), but we might
then enter the database/tablet upload API and find session details,
not from the cookies, but from the POST data. At that point, we
want to replace the session in the Request, without committing the
first one to disk.
"""
if self._camcops_session is not None:
self.dbsession.expunge(self._camcops_session)
self._camcops_session = ccsession
[docs] def complete_request_add_cookies(self) -> None:
"""
Finializes the response by adding session cookies.
We do this late so that we can hot-swap the session if we're using the
database/tablet API rather than a human web browser.
Response callbacks are called in the order
first-to-most-recently-added. See
:class:`pyramid.request.CallbackMethodsMixin`.
That looks like we can add a callback in the process of running a
callback. And when we add a cookie to a Pyramid session, that sets a
callback. Let's give it a go...
"""
# 2019-03-21: If we've not used a CamcopsSession (e.g. for serving
# a static view), do we care?
if self._camcops_session is None:
return
dbsession = self.dbsession
dbsession.flush() # sets the PK for ccsession, if it wasn't set
# Write the details back to the Pyramid session (will be persisted
# via the Response automatically):
pyramid_session = self.session # type: ISession
ccsession = self.camcops_session
pyramid_session[CookieKey.SESSION_ID] = str(ccsession.id)
pyramid_session[CookieKey.SESSION_TOKEN] = ccsession.token
# ... should cause the ISession to add a callback to add cookies,
# which will be called immediately after this one.
# -------------------------------------------------------------------------
# Config
# -------------------------------------------------------------------------
@reify
def config_filename(self) -> str:
"""
Gets the CamCOPS config filename in use, from the config file specified
in ``os.environ[ENVVAR_CONFIG_FILE]``.
"""
return get_config_filename_from_os_env()
@reify
def config(self) -> CamcopsConfig:
"""
Return an instance of
:class:`camcops_server/cc_modules/cc_config.CamcopsConfig` for the
request.
Access it as ``request.config``, with no brackets.
"""
config = get_config(config_filename=self.config_filename)
return config
# -------------------------------------------------------------------------
# Database
# -------------------------------------------------------------------------
@reify
def engine(self) -> Engine:
"""
Returns the SQLAlchemy :class:`Engine` for the request.
"""
cfg = self.config
return cfg.get_sqla_engine()
@reify
def dbsession(self) -> SqlASession:
"""
Return an SQLAlchemy session for the relevant request.
The use of ``@reify`` makes this elegant. If and only if a view wants a
database, it can say
.. code-block:: python
dbsession = request.dbsession
and if it requests that, the cleanup callbacks (COMMIT or ROLLBACK) get
installed.
"""
# log.debug("CamcopsRequest.dbsession: caller stack:\n{}",
# "\n".join(get_caller_stack_info()))
_dbsession = self.get_bare_dbsession()
def end_sqlalchemy_session(req: Request) -> None:
# noinspection PyProtectedMember
req._finish_dbsession()
# - For command-line pseudo-requests, add_finished_callback is no use,
# because that's called by the Pyramid routing framework.
# - So how do we autocommit a command-line session?
# - Hooking into CamcopsRequest.__del__ did not work: called, yes, but
# object state (e.g. newly inserted User objects) went wrong (e.g.
# the objects had been blanked somehow, or that's what the INSERT
# statements looked like).
# - Use a context manager instead; see below.
self.add_finished_callback(end_sqlalchemy_session)
if DEBUG_DBSESSION_MANAGEMENT:
log.debug(
"Returning SQLAlchemy session as " "CamcopsRequest.dbsession"
)
return _dbsession
def _finish_dbsession(self) -> None:
"""
A database session has finished. COMMIT or ROLLBACK, depending on how
things went.
"""
# Do NOT roll back "if req.exception is not None"; that includes
# all sorts of exceptions like HTTPFound, HTTPForbidden, etc.
# See also
# - https://docs.pylonsproject.org/projects/pyramid_cookbook/en/latest/pylons/exceptions.html # noqa
# But they are neatly subclasses of HTTPException, and isinstance()
# deals with None, so:
session = self.dbsession
if self.exception is not None and not isinstance(
self.exception, HTTPException
):
log.critical(
"Request raised exception that wasn't an "
"HTTPException; rolling back; exception was: {!r}",
self.exception,
)
session.rollback()
else:
if DEBUG_DBSESSION_MANAGEMENT:
log.debug("Committing to database")
session.commit()
if self._pending_export_push_requests:
self._process_pending_export_push_requests()
if DEBUG_DBSESSION_MANAGEMENT:
log.debug("Closing SQLAlchemy session")
session.close()
[docs] def get_bare_dbsession(self) -> SqlASession:
"""
Returns a bare SQLAlchemy session for the request.
See :func:`dbsession`, the more commonly used wrapper function.
"""
if self._debugging_db_session:
log.debug("Request is using debugging SQLAlchemy session")
return self._debugging_db_session
if DEBUG_DBSESSION_MANAGEMENT:
log.debug("Making SQLAlchemy session")
engine = self.engine
maker = sessionmaker(bind=engine)
session = maker() # type: SqlASession
return session
# -------------------------------------------------------------------------
# TabletSession
# -------------------------------------------------------------------------
@reify
def tabletsession(self) -> TabletSession:
"""
Request a
:class:`camcops_server.cc_modules.cc_tabletsession.TabletSession`,
which is an information structure geared to client (tablet) database
accesses.
If we're using this interface, we also want to ensure we're using
the :class:`camcops_server.cc_modules.cc_session.CamcopsSession` for
the information provided by the tablet in the POST request, not
anything already loaded/reset via cookies.
"""
from camcops_server.cc_modules.cc_session import (
CamcopsSession,
) # delayed import
ts = TabletSession(self)
new_cc_session = CamcopsSession.get_session_for_tablet(ts)
# ... does login
self.replace_camcops_session(new_cc_session)
ts.set_session_id_token(new_cc_session.id, new_cc_session.token)
if DEBUG_TABLET_SESSION:
log.debug("CamcopsRequest: {!r}", self)
log.debug("CamcopsRequest.tabletsession: {!r}", ts)
log.debug(
"CamcopsRequest.camcops_session: {!r}", self._camcops_session
)
return ts
# -------------------------------------------------------------------------
# Date/time
# -------------------------------------------------------------------------
@reify
def now(self) -> Pendulum:
"""
Returns the time of the request as an Pendulum object.
(Reified, so a request only ever has one time.)
Exposed as a property.
"""
return Pendulum.now()
@reify
def now_utc(self) -> Pendulum:
"""
Returns the time of the request as a UTC Pendulum.
"""
p = self.now # type: Pendulum
return convert_datetime_to_utc(p)
@reify
def now_utc_no_tzinfo(self) -> datetime.datetime:
"""
Returns the time of the request as a datetime in UTC with no timezone
information attached. For when you want to compare to something similar
without getting the error "TypeError: can't compare offset-naive and
offset-aware datetimes".
"""
p = self.now # type: Pendulum
return pendulum_to_utc_datetime_without_tz(p)
@reify
def now_era_format(self) -> str:
"""
Returns the request time in an ISO-8601 format suitable for use as a
CamCOPS ``era``.
"""
return format_datetime(self.now_utc, DateFormat.ERA)
@property
def today(self) -> Date:
"""
Returns today's date.
"""
# noinspection PyTypeChecker
return self.now.date()
# -------------------------------------------------------------------------
# Logos, static files, and other institution-specific stuff
# -------------------------------------------------------------------------
@property
def url_local_institution(self) -> str:
"""
Returns the local institution's home URL.
"""
return self.config.local_institution_url
@property
def url_camcops_favicon(self) -> str:
"""
Returns a URL to the favicon (see
https://en.wikipedia.org/wiki/Favicon) from within the CamCOPS static
files.
"""
# Cope with reverse proxies, etc.
# https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request.static_url # noqa
return self.static_url(
STATIC_CAMCOPS_PACKAGE_PATH + "favicon_camcops.png"
)
@property
def url_camcops_logo(self) -> str:
"""
Returns a URL to the CamCOPS logo from within our static files.
Returns:
"""
return self.static_url(
STATIC_CAMCOPS_PACKAGE_PATH + "logo_camcops.png"
)
@property
def url_local_logo(self) -> str:
"""
Returns a URL to the local institution's logo, from somewhere on our
server.
"""
return self.static_url(STATIC_CAMCOPS_PACKAGE_PATH + "logo_local.png")
@property
def url_camcops_docs(self) -> str:
"""
Returns the URL to the CamCOPS documentation.
"""
return DOCUMENTATION_URL
# -------------------------------------------------------------------------
# Icons
# -------------------------------------------------------------------------
[docs] @staticmethod
def icon(
icon: str,
alt: str,
url: str = None,
extra_classes: List[str] = None,
extra_styles: List[str] = None,
escape_alt: bool = True,
) -> str:
"""
Instantiates a Bootstrap icon, usually with a hyperlink. Returns
rendered HTML.
Args:
icon:
Icon name, without ".svg" extension (or "bi-" prefix!).
alt:
Alternative text for image.
url:
Optional URL of hyperlink.
extra_classes:
Optional extra CSS classes for the icon.
extra_styles:
Optional extra CSS styles for the icon (each looks like:
"color: blue").
escape_alt:
HTML-escape the alt text? Default is True.
"""
return icon_html(
icon=icon,
alt=alt,
url=url,
extra_classes=extra_classes,
extra_styles=extra_styles,
escape_alt=escape_alt,
)
[docs] @staticmethod
def icon_text(
icon: str,
text: str,
url: str = None,
alt: str = None,
extra_icon_classes: List[str] = None,
extra_icon_styles: List[str] = None,
extra_a_classes: List[str] = None,
extra_a_styles: List[str] = None,
escape_alt: bool = True,
escape_text: bool = True,
hyperlink_together: bool = False,
) -> str:
"""
Provide an icon and accompanying text. Usually, both are hyperlinked
(to the same destination URL). Returns rendered HTML.
Args:
icon:
Icon name, without ".svg" extension.
url:
Optional URL of hyperlink.
alt:
Alternative text for image. Will default to the main text.
text:
Main text to display.
extra_icon_classes:
Optional extra CSS classes for the icon.
extra_icon_styles:
Optional extra CSS styles for the icon (each looks like:
"color: blue").
extra_a_classes:
Optional extra CSS classes for the <a> element.
extra_a_styles:
Optional extra CSS styles for the <a> element.
escape_alt:
HTML-escape the alt text?
escape_text:
HTML-escape the main text?
hyperlink_together:
Hyperlink the image and text as one (rather than separately and
adjacent to each other)?
"""
return icon_text(
icon=icon,
text=text,
url=url,
alt=alt,
extra_icon_classes=extra_icon_classes,
extra_icon_styles=extra_icon_styles,
extra_a_classes=extra_a_classes,
extra_a_styles=extra_a_styles,
escape_alt=escape_alt,
escape_text=escape_text,
hyperlink_together=hyperlink_together,
)
[docs] @staticmethod
def icons_text(
icons: List[str],
text: str,
url: str = None,
alt: str = None,
extra_icon_classes: List[str] = None,
extra_icon_styles: List[str] = None,
extra_a_classes: List[str] = None,
extra_a_styles: List[str] = None,
escape_alt: bool = True,
escape_text: bool = True,
hyperlink_together: bool = False,
) -> str:
"""
Multiple-icon version of :meth:``icon_text``.
"""
return icons_text(
icons=icons,
text=text,
url=url,
alt=alt,
extra_icon_classes=extra_icon_classes,
extra_icon_styles=extra_icon_styles,
extra_a_classes=extra_a_classes,
extra_a_styles=extra_a_styles,
escape_alt=escape_alt,
escape_text=escape_text,
hyperlink_together=hyperlink_together,
)
# -------------------------------------------------------------------------
# Low-level HTTP information
# -------------------------------------------------------------------------
@reify
def remote_port(self) -> Optional[int]:
"""
What port number is the client using?
The ``remote_port`` variable is an optional WSGI extra provided by some
frameworks, such as mod_wsgi.
The WSGI spec:
- https://www.python.org/dev/peps/pep-0333/
The CGI spec:
- https://en.wikipedia.org/wiki/Common_Gateway_Interface
The Pyramid Request object:
- https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request
- ... note: that includes ``remote_addr``, but not ``remote_port``.
""" # noqa
try:
return int(self.environ.get("REMOTE_PORT", ""))
except (TypeError, ValueError):
return None
# -------------------------------------------------------------------------
# HTTP request convenience functions
# -------------------------------------------------------------------------
[docs] def has_param(self, key: str) -> bool:
"""
Is the parameter in the request?
Args:
key: the parameter's name
"""
return key in self.params
[docs] def get_str_param(
self,
key: str,
default: str = None,
lower: bool = False,
upper: bool = False,
validator: STRING_VALIDATOR_TYPE = validate_alphanum_underscore,
) -> Optional[str]:
"""
Returns an HTTP parameter from the request (GET or POST). If it does
not exist, or is blank, return ``default``. If it fails the validator,
raise :exc:`pyramid.httpexceptions.HTTPBadRequest`.
Args:
key: the parameter's name
default: the value to return if the parameter is not found
lower: convert to lower case?
upper: convert to upper case?
validator: validator function
Returns:
the parameter's (string) contents, or ``default``
"""
# HTTP parameters are always strings at heart
if key not in self.params: # missing from request?
return default
value = self.params.get(key)
if not value: # blank, e.g. "source=" in URL?
return default
assert isinstance(value, str) # ... or we wouldn't have got here
if lower:
value = value.lower()
elif upper:
value = value.upper()
try:
validator(value, self)
return value
except ValueError as e:
raise HTTPBadRequest(f"Bad {key!r} parameter: {e}")
[docs] def get_str_list_param(
self,
key: str,
lower: bool = False,
upper: bool = False,
validator: STRING_VALIDATOR_TYPE = validate_alphanum_underscore,
) -> List[str]:
"""
Returns a list of HTTP parameter values from the request. Ensures all
have been validated.
Args:
key: the parameter's name
lower: convert to lower case?
upper: convert to upper case?
validator: validator function
Returns:
a list of string values
"""
values = self.params.getall(key)
if lower:
values = [x.lower() for x in values]
elif upper:
values = [x.upper() for x in values]
try:
for v in values:
validator(v, self)
except ValueError as e:
raise HTTPBadRequest(
f"Parameter {key!r} contains a bad value: {e}"
)
return values
[docs] def get_int_param(self, key: str, default: int = None) -> Optional[int]:
"""
Returns an integer parameter from the HTTP request.
Args:
key: the parameter's name
default: the value to return if the parameter is not found or is
not a valid integer
Returns:
an integer, or ``default``
"""
try:
return int(self.params[key])
except (KeyError, TypeError, ValueError):
return default
[docs] def get_int_list_param(self, key: str) -> List[int]:
"""
Returns a list of integer parameter values from the HTTP request.
Args:
key: the parameter's name
Returns:
a list of integer values
"""
values = self.params.getall(key)
try:
return [int(x) for x in values]
except (KeyError, TypeError, ValueError):
return []
[docs] def get_bool_param(self, key: str, default: bool) -> bool:
"""
Returns a boolean parameter from the HTTP request.
Args:
key: the parameter's name
default: the value to return if the parameter is not found or is
not a valid boolean value
Returns:
an integer, or ``default``
Valid "true" and "false" values (case-insensitive): see
``TRUE_STRINGS_LOWER_CASE``, ``FALSE_STRINGS_LOWER_CASE``.
"""
try:
param_str = self.params[key].lower()
if param_str in TRUE_STRINGS_LOWER_CASE:
return True
elif param_str in FALSE_STRINGS_LOWER_CASE:
return False
else:
return default
except (AttributeError, KeyError, TypeError, ValueError):
return default
[docs] def get_date_param(self, key: str) -> Optional[Date]:
"""
Returns a date parameter from the HTTP request. If it is missing or
looks bad, return ``None``.
Args:
key: the parameter's name
Returns:
a :class:`pendulum.Date`, or ``None``
"""
try:
return coerce_to_pendulum_date(self.params[key])
except (KeyError, ParserError, TypeError, ValueError):
return None
[docs] def get_datetime_param(self, key: str) -> Optional[Pendulum]:
"""
Returns a datetime parameter from the HTTP request. If it is missing or
looks bad, return ``None``.
Args:
key: the parameter's name
Returns:
a :class:`pendulum.DateTime`, or ``None``
"""
try:
return coerce_to_pendulum(self.params[key])
except (KeyError, ParserError, TypeError, ValueError):
return None
[docs] def get_redirect_url_param(
self, key: str, default: str = None
) -> Optional[str]:
"""
Returns a redirection URL parameter from the HTTP request, validating
it. (The validation process does not allow all types of URLs!)
If it was missing, return ``default``. If it was bad, raise
:exc:`pyramid.httpexceptions.HTTPBadRequest`.
Args:
key:
the parameter's name
default:
the value to return if the parameter is not found, or is
invalid
Returns:
a URL string, or ``default``
"""
return self.get_str_param(
key, default=default, validator=validate_redirect_url
)
# -------------------------------------------------------------------------
# Routing
# -------------------------------------------------------------------------
[docs] def route_url_params(
self, route_name: str, paramdict: Dict[str, Any]
) -> str:
"""
Provides a simplified interface to :func:`Request.route_url` when you
have parameters to pass.
It does two things:
(1) convert all params to their ``str()`` form;
(2) allow you to pass parameters more easily using a string
parameter name.
The normal Pyramid Request use is:
.. code-block:: python
Request.route_url(route_name, param1=value1, param2=value2)
where "param1" is the literal name of the parameter, but here we can do
.. code-block:: python
CamcopsRequest.route_url_params(route_name, {
PARAM1_NAME: value1_not_necessarily_str,
PARAM2_NAME: value2
})
"""
strparamdict = {k: str(v) for k, v in paramdict.items()}
return self.route_url(route_name, **strparamdict)
# -------------------------------------------------------------------------
# Strings
# -------------------------------------------------------------------------
@reify
def _all_extra_strings(self) -> Dict[str, Dict[str, Dict[str, str]]]:
"""
Returns all CamCOPS "extra strings" (from XML files) in the format
used by :func:`camcops_server.cc_string.all_extra_strings_as_dicts`.
"""
return all_extra_strings_as_dicts(self.config_filename)
[docs] def xstring(
self,
taskname: str,
stringname: str,
default: str = None,
provide_default_if_none: bool = True,
language: str = None,
) -> Optional[str]:
"""
Looks up a string from one of the optional extra XML string files.
Args:
taskname: task name (top-level key)
stringname: string name within task (second-level key)
default: default to return if the string is not found
provide_default_if_none: if ``True`` and ``default is None``,
return a helpful missing-string message in the style
"string x.y not found"
language: language code to use, e.g. ``en-GB``; if ``None`` is
passed, the default behaviour is to look up the current
language for this request (see :meth:`language`).
Returns:
the "extra string"
"""
# For speed, calculate default only if needed:
allstrings = self._all_extra_strings
if taskname in allstrings:
taskstrings = allstrings[taskname]
if stringname in taskstrings:
langversions = taskstrings[stringname]
if language is None:
language = self.language
if language: # Specific language requested
# 1. Requested language, e.g. "en-GB"
if language in langversions:
return langversions[language]
# 2. Same language, different country, e.g. "en-US"
shortlang = language[:2] # e.g. "en"
for key in langversions.keys():
if key.startswith(shortlang):
return langversions[shortlang]
# 3. Default language
if DEFAULT_LOCALE in langversions:
return langversions[DEFAULT_LOCALE]
# 4. Strings with no language specified in the XML
if MISSING_LOCALE in langversions:
return langversions[MISSING_LOCALE]
# Not found
if default is None and provide_default_if_none:
default = (
f"EXTRA_STRING_NOT_FOUND({taskname}.{stringname}[{language}])"
)
return default
[docs] def wxstring(
self,
taskname: str,
stringname: str,
default: str = None,
provide_default_if_none: bool = True,
language: str = None,
) -> Optional[str]:
"""
Returns a web-safe version of an :func:`xstring` (q.v.).
"""
value = self.xstring(
taskname,
stringname,
default,
provide_default_if_none=provide_default_if_none,
language=language,
)
if value is None and not provide_default_if_none:
return None
return ws.webify(value)
[docs] def wappstring(
self,
stringname: str,
default: str = None,
provide_default_if_none: bool = True,
language: str = None,
) -> Optional[str]:
"""
Returns a web-safe version of an appstring (an app-wide extra string).
This uses the XML file shared between the client and the server.
"""
value = self.xstring(
APPSTRING_TASKNAME,
stringname,
default,
provide_default_if_none=provide_default_if_none,
language=language,
)
if value is None and not provide_default_if_none:
return None
return ws.webify(value)
@reify
def language(self) -> str:
"""
Returns the language code selected by the current user, or if none is
selected (or the user isn't logged in) the server's default language.
Returns:
str: a language code of the form ``en-GB``
"""
if self.user is not None:
language = self.user.language
if language in POSSIBLE_LOCALES:
return language
# Fallback to default
return self.config.language
@reify
def language_iso_639_1(self) -> str:
"""
Returns the language code selected by the current user, or if none is
selected (or the user isn't logged in) the server's default language.
This assumes all the possible supported languages start with a
two-letter primary language tag, which currently they do.
Returns:
str: a two-letter language code of the form ``en``
"""
return self.language[:2]
[docs] def gettext(self, message: str) -> str:
"""
Returns a version of ``msg`` translated into the current language.
This is used for server-only strings.
The ``gettext()`` function is normally aliased to ``_()`` for
auto-translation tools to read the souce code.
"""
lang = self.language
# We can't work out if the string is missing; gettext falls back to
# the source message.
if lang == DEFAULT_LOCALE:
translated = message
else:
try:
translator = gettext.translation(
domain=GETTEXT_DOMAIN,
localedir=TRANSLATIONS_DIR,
languages=[lang],
)
translated = translator.gettext(message)
except OSError: # e.g. translation file not found
log.warning(f"Failed to find translation files for {lang}")
translated = message
if DEBUG_GETTEXT:
return f"[{message}→{lang}→{translated}]"
else:
return translated
[docs] def wgettext(self, message: str) -> str:
"""
A web-safe version of :func:`gettext`.
"""
return ws.webify(self.gettext(message))
[docs] def sstring(self, which_string: SS) -> str:
"""
Returns a translated server string via a lookup mechanism.
Args:
which_string:
which string? A :class:`camcops_server.cc_modules.cc_text.SS`
enumeration value
Returns:
str: the string
"""
try:
result = self._cached_sstring[which_string]
except KeyError:
result = server_string(self, which_string)
self._cached_sstring[which_string] = result
return result
[docs] def wsstring(self, which_string: SS) -> str:
"""
Returns a web-safe version of a translated server string via a lookup
mechanism.
Args:
which_string:
which string? A :class:`camcops_server.cc_modules.cc_text.SS`
enumeration value
Returns:
str: the string
"""
return ws.webify(self.sstring(which_string))
# -------------------------------------------------------------------------
# PNG versus SVG output, so tasks don't have to care (for e.g. PDF/web)
# -------------------------------------------------------------------------
# ... wkhtmltopdf can cope with SVGs
[docs] def switch_output_to_png(self) -> None:
"""
Switch server (for this request) to producing figures in PNG format.
"""
self.use_svg = False
[docs] def switch_output_to_svg(self, provide_png_fallback: bool = True) -> None:
"""
Switch server (for this request) to producing figures in SVG format.
Args:
provide_png_fallback:
Offer a PNG fallback option/
"""
self.use_svg = True
self.provide_png_fallback_for_svg = provide_png_fallback
@reify
def fontdict(self) -> Dict[str, Any]:
"""
Returns a font dictionary for use with Matplotlib plotting.
**matplotlib font handling and fontdict parameter**
- https://stackoverflow.com/questions/3899980
- https://matplotlib.org/users/customizing.html
- matplotlib/font_manager.py
- Note that the default TrueType font is "DejaVu Sans"; see
:class:`matplotlib.font_manager.FontManager`
- Example sequence:
- CamCOPS does e.g. ``ax.set_xlabel("Date/time",
fontdict=self.req.fontdict)``
- matplotlib.axes.Axes.set_xlabel:
https://matplotlib.org/api/_as_gen/matplotlib.axes.Axes.set_xlabel.html
- matplotlib.axes.Axes.text documentation, explaining the fontdict
parameter:
https://matplotlib.org/api/_as_gen/matplotlib.axes.Axes.text.html
- What's created is probably a :class:`matplotlib.text.Text` object,
whose ``update()`` function is called with the dictionary. Via its
superclass :class:`matplotlib.artist.Artist` and its ``update()``
function, this sets attributes on the Text object. Ultimately,
without having explored this in too much depth, it's probably the
``self._fontproperties`` object of Text that holds this info.
- That is an instance of
:class:`matplotlib.font_manager.FontProperties`.
**Linux fonts**
Anyway, the main things are (1) that the relevant fonts need to be
installed, and (2) that the default is DejaVu Sans.
- Linux fonts are installed in ``/usr/share/fonts``, and TrueType fonts
within ``/usr/share/fonts/truetype``.
- Use ``fc-match`` to see the font mappings being used.
- Use ``fc-list`` to list available fonts.
- Use ``fc-cache`` to rebuild the font cache.
- Files in ``/etc/fonts/conf.avail/`` do some thinking.
**Problems with pixellated fonts in PDFs made via wkhtmltopdf**
- See also https://github.com/wkhtmltopdf/wkhtmltopdf/issues/2193,
about pixellated fonts via wkhtmltopdf (which was our problem for a
subset of the fonts in trackers, on 2020-06-28, using wkhtmltopd
0.12.5 with patched Qt).
- When you get pixellated fonts in a PDF, look also at the embedded
font list in the PDF (e.g. in Okular: File -> Properties -> Fonts).
- Matplotlib helpfully puts the text (rendered as lines in SVG) as
comments.
- As a debugging sequence, we can manually trim the "pdfhtml" output
down to just the SVG file. Still has problems. Yet there's no text
in it; the text is made of pure SVG lines. And Chrome renders it
perfectly. As does Firefox.
- The rendering bug goes away entirely if you delete the opacity
styling throughout the SVG:
.. code-block:: none
<g style="opacity:0.5;" transform=...>
^^^^^^^^^^^^^^^^^^^^
this
- So, simple fix:
- rather than opacity (alpha) 0.5 and on top...
- 50% grey colour and on the bottom.
""" # noqa
fontsize = self.config.plot_fontsize
return dict(
family="sans-serif",
# ... serif, sans-serif, cursive, fantasy, monospace
style="normal", # normal (roman), italic, oblique
variant="normal", # normal, small-caps
weight="normal",
# ... normal [=400], bold [=700], bolder [relative to current],
# lighter [relative], 100, 200, 300, ..., 900
size=fontsize, # in pt (default 12)
)
@reify
def fontprops(self) -> FontProperties:
"""
Return a :class:`matplotlib.font_manager.FontProperties` object for
use with Matplotlib plotting.
"""
return FontProperties(**self.fontdict)
# -------------------------------------------------------------------------
# Convenience functions for user information
# -------------------------------------------------------------------------
@property
def user(self) -> Optional["User"]:
"""
Returns the :class:`camcops_server.cc_modules.cc_user.User` for the
current request.
"""
return self._debugging_user or self.camcops_session.user
@property
def user_id(self) -> Optional[int]:
"""
Returns the integer user ID for the current request.
"""
if self._debugging_user:
return self._debugging_user.id
return self.camcops_session.user_id
# -------------------------------------------------------------------------
# ID number definitions
# -------------------------------------------------------------------------
@reify
def idnum_definitions(self) -> List[IdNumDefinition]:
"""
Returns all
:class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` objects.
"""
return get_idnum_definitions(self.dbsession) # no longer cached
@reify
def valid_which_idnums(self) -> List[int]:
"""
Returns the ``which_idnum`` values for all ID number definitions.
"""
return [iddef.which_idnum for iddef in self.idnum_definitions]
# ... pre-sorted
[docs] def get_idnum_definition(
self, which_idnum: int
) -> Optional[IdNumDefinition]:
"""
Retrieves an
:class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` for the
specified ``which_idnum`` value.
"""
return next(
(
iddef
for iddef in self.idnum_definitions
if iddef.which_idnum == which_idnum
),
None,
)
[docs] def get_id_desc(
self, which_idnum: int, default: str = None
) -> Optional[str]:
"""
Get the server's ID description for the specified ``which_idnum``
value.
"""
return next(
(
iddef.description
for iddef in self.idnum_definitions
if iddef.which_idnum == which_idnum
),
default,
)
[docs] def get_id_shortdesc(
self, which_idnum: int, default: str = None
) -> Optional[str]:
"""
Get the server's short ID description for the specified ``which_idnum``
value.
"""
return next(
(
iddef.short_description
for iddef in self.idnum_definitions
if iddef.which_idnum == which_idnum
),
default,
)
[docs] def is_idnum_valid(
self, which_idnum: int, idnum_value: Optional[int]
) -> bool:
"""
Does the ID number pass any extended validation checks?
Args:
which_idnum: which ID number type is this?
idnum_value: ID number value
Returns:
bool: valid?
"""
idnumdef = self.get_idnum_definition(which_idnum)
if not idnumdef:
return False
valid, _ = validate_id_number(
self, idnum_value, idnumdef.validation_method
)
return valid
[docs] def why_idnum_invalid(
self, which_idnum: int, idnum_value: Optional[int]
) -> str:
"""
Why does the ID number fail any extended validation checks?
Args:
which_idnum: which ID number type is this?
idnum_value: ID number value
Returns:
str: why invalid? (Human-readable string.)
"""
idnumdef = self.get_idnum_definition(which_idnum)
if not idnumdef:
_ = self.gettext
return _("Can't fetch ID number definition")
_, why = validate_id_number(
self, idnum_value, idnumdef.validation_method
)
return why
# -------------------------------------------------------------------------
# Server settings
# -------------------------------------------------------------------------
@reify
def server_settings(self) -> ServerSettings:
"""
Return the
:class:`camcops_server.cc_modules.cc_serversettings.ServerSettings` for
the server.
"""
return get_server_settings(self)
@reify
def database_title(self) -> str:
"""
Return the database friendly title for the server.
"""
ss = self.server_settings
return ss.database_title or ""
[docs] def set_database_title(self, title: str) -> None:
"""
Sets the database friendly title for the server.
"""
ss = self.server_settings
ss.database_title = title
# -------------------------------------------------------------------------
# SNOMED-CT
# -------------------------------------------------------------------------
@reify
def snomed_supported(self) -> bool:
"""
Is SNOMED-CT supported for CamCOPS tasks?
"""
return bool(self.config.get_task_snomed_concepts())
[docs] def snomed(self, lookup: str) -> "SnomedConcept":
"""
Fetches a SNOMED-CT concept for a CamCOPS task.
Args:
lookup: a CamCOPS SNOMED lookup string
Returns:
a :class:`camcops_server.cc_modules.cc_snomed.SnomedConcept`
Raises:
:exc:`KeyError`, if the lookup cannot be found (e.g. UK data not
installed)
"""
concepts = self.config.get_task_snomed_concepts()
assert concepts, "No SNOMED-CT data available for CamCOPS tasks"
return concepts[lookup]
@reify
def icd9cm_snomed_supported(self) -> bool:
"""
Is SNOMED-CT supported for ICD-9-CM codes?
"""
return bool(self.config.get_icd9cm_snomed_concepts())
[docs] def icd9cm_snomed(self, code: str) -> List["SnomedConcept"]:
"""
Fetches a SNOMED-CT concept for an ICD-9-CM code
Args:
code: an ICD-9-CM code
Returns:
a :class:`camcops_server.cc_modules.cc_snomed.SnomedConcept`
Raises:
:exc:`KeyError`, if the lookup cannot be found (e.g. data not
installed)
"""
concepts = self.config.get_icd9cm_snomed_concepts()
assert concepts, "No SNOMED-CT data available for ICD-9-CM"
return concepts[code]
@reify
def icd10_snomed_supported(self) -> bool:
"""
Is SNOMED-CT supported for ICD-10 codes?
"""
return bool(self.config.get_icd9cm_snomed_concepts())
[docs] def icd10_snomed(self, code: str) -> List["SnomedConcept"]:
"""
Fetches a SNOMED-CT concept for an ICD-10 code
Args:
code: an ICD-10 code
Returns:
a :class:`camcops_server.cc_modules.cc_snomed.SnomedConcept`
Raises:
:exc:`KeyError`, if the lookup cannot be found (e.g. data not
installed)
"""
concepts = self.config.get_icd10_snomed_concepts()
assert concepts, "No SNOMED-CT data available for ICD-10"
return concepts[code]
# -------------------------------------------------------------------------
# Export recipients
# -------------------------------------------------------------------------
[docs] def get_export_recipients(
self,
recipient_names: List[str] = None,
all_recipients: bool = False,
all_push_recipients: bool = False,
save: bool = True,
database_versions: bool = True,
) -> List[Union["ExportRecipient", "ExportRecipientInfo"]]:
"""
Returns a list of export recipients, with some filtering if desired.
Validates them against the database.
- If ``all_recipients``, return all.
- Otherwise, if ``all_push_recipients``, return all "push" recipients.
- Otherwise, return all named in ``recipient_names``.
- If any are invalid, raise an error.
- If any are duplicate, raise an error.
- Holds a global export file lock for some database access relating to
export recipient records.
Args:
all_recipients: use all recipients?
all_push_recipients: use all "push" recipients?
recipient_names: recipient names
save: save any freshly created recipient records to the DB?
database_versions: return ExportRecipient objects that are attached
to a database session (rather than ExportRecipientInfo objects
that aren't)?
Returns:
list: of :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
Raises:
- :exc:`ValueError` if a name is invalid
- :exc:`ValueError` if a name is duplicated
- :exc:`camcops_server.cc_modules.cc_exportrecipient.InvalidExportRecipient`
if an export recipient configuration is invalid
""" # noqa
# Delayed imports
from camcops_server.cc_modules.cc_exportrecipient import (
ExportRecipient,
) # delayed import
# Check parameters
recipient_names = recipient_names or [] # type: List[str]
if save and not database_versions:
raise AssertionError("Can't save unless taking database versions")
# Start with ExportRecipientInfo objects:
recipientinfolist = self.config.get_all_export_recipient_info()
# Restrict
if not all_recipients:
if all_push_recipients:
recipientinfolist = [r for r in recipientinfolist if r.push]
else:
# Specified by name
duplicates = [
name
for name, count in collections.Counter(
recipient_names
).items()
if count > 1
]
if duplicates:
raise ValueError(
f"Duplicate export recipients "
f"specified: {duplicates!r}"
)
valid_names = set(r.recipient_name for r in recipientinfolist)
bad_names = [
name for name in recipient_names if name not in valid_names
]
if bad_names:
raise ValueError(
f"Bad export recipients specified: {bad_names!r}. "
f"Valid recipients are: {valid_names!r}"
)
recipientinfolist = [
r
for r in recipientinfolist
if r.recipient_name in recipient_names
]
# Complete validation
for r in recipientinfolist:
r.validate(self)
# Does the caller want them as ExportRecipientInfo objects
if not database_versions:
return recipientinfolist
# Convert to SQLAlchemy ORM ExportRecipient objects:
recipients = [
ExportRecipient(x) for x in recipientinfolist
] # type: List[ExportRecipient]
final_recipients = [] # type: List[ExportRecipient]
dbsession = self.dbsession
def process_final_recipients(_save: bool) -> None:
for r in recipients:
other = ExportRecipient.get_existing_matching_recipient(
dbsession, r
)
if other:
# This other one matches, and is already in the database.
# Use it. But first...
for (
attrname
) in (
ExportRecipient.RECOPY_EACH_TIME_FROM_CONFIG_ATTRNAMES
):
setattr(other, attrname, getattr(r, attrname))
# OK.
final_recipients.append(other)
else:
# Our new object doesn't match. Use (+/- save) it.
if save:
log.debug(
"Creating new ExportRecipient record in database"
)
dbsession.add(r)
r.current = True
final_recipients.append(r)
if save:
lockfilename = (
self.config.get_master_export_recipient_lockfilename()
)
with lockfile.FileLock(
lockfilename, timeout=None
): # waits forever if necessary
process_final_recipients(_save=True)
else:
process_final_recipients(_save=False)
# OK
return final_recipients
[docs] def get_export_recipient(
self, recipient_name: str, save: bool = True
) -> "ExportRecipient":
"""
Returns a single validated export recipient, given its name.
Args:
recipient_name: recipient name
save: save any freshly created recipient records to the DB?
Returns:
list: of :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
Raises:
- :exc:`ValueError` if a name is invalid
- :exc:`camcops_server.cc_modules.cc_exportrecipient.InvalidExportRecipient`
if an export recipient configuration is invalid
""" # noqa
recipients = self.get_export_recipients([recipient_name], save=save)
assert len(recipients) == 1
return recipients[0]
@reify
def all_push_recipients(self) -> List["ExportRecipient"]:
"""
Cached for speed (will potentially be called for multiple tables in
a bulk upload).
"""
return self.get_export_recipients(
all_push_recipients=True,
save=False,
database_versions=True, # we need group ID info somehow
)
[docs] def add_export_push_request(
self, recipient_name: str, basetable: str, task_pk: int
) -> None:
"""
Adds a request to push a task to an export recipient.
The reason we use this slightly convoluted approach is because
otherwise, it's very easy to generate a backend request for a new task
before it's actually been committed (so the backend finds no task).
Args:
recipient_name: name of the recipient
basetable: name of the task's base table
task_pk: server PK of the task
"""
self._pending_export_push_requests.append(
(recipient_name, basetable, task_pk)
)
def _process_pending_export_push_requests(self) -> None:
"""
Sends pending export push requests to the backend.
Called after the COMMIT.
"""
from camcops_server.cc_modules.celery import (
export_task_backend,
) # delayed import
for (
recipient_name,
basetable,
task_pk,
) in self._pending_export_push_requests:
log.info(
"Submitting background job to export task {}.{} to {}",
basetable,
task_pk,
recipient_name,
)
export_task_backend.delay(
recipient_name=recipient_name,
basetable=basetable,
task_pk=task_pk,
)
# -------------------------------------------------------------------------
# User downloads
# -------------------------------------------------------------------------
@property
def user_download_dir(self) -> str:
"""
The directory in which this user's downloads should be/are stored, or a
blank string if user downloads are not available. Also ensures it
exists.
"""
if self.config.user_download_max_space_mb <= 0:
return ""
basedir = self.config.user_download_dir
if not basedir:
return ""
user_id = self.user_id
if user_id is None:
return ""
userdir = os.path.join(basedir, str(user_id))
mkdir_p(userdir)
return userdir
@property
def user_download_bytes_permitted(self) -> int:
"""
Amount of space the user is permitted.
"""
if not self.user_download_dir:
return 0
return self.config.user_download_max_space_mb * 1024 * 1024
@reify
def user_download_bytes_used(self) -> int:
"""
Returns the disk space used by this user.
"""
download_dir = self.user_download_dir
if not download_dir:
return 0
return get_directory_contents_size(download_dir)
@property
def user_download_bytes_available(self) -> int:
"""
Returns the available space for this user in their download area.
"""
permitted = self.user_download_bytes_permitted
used = self.user_download_bytes_used
available = permitted - used
return available
@property
def user_download_lifetime_duration(self) -> Duration:
"""
Returns the lifetime of user download objects.
"""
return Duration(minutes=self.config.user_download_file_lifetime_min)
# noinspection PyUnusedLocal
[docs]def complete_request_add_cookies(
req: CamcopsRequest, response: Response
) -> None:
"""
Finializes the response by adding session cookies.
See :meth:`CamcopsRequest.complete_request_add_cookies`.
"""
req.complete_request_add_cookies()
# =============================================================================
# Configurator
# =============================================================================
[docs]@contextmanager
def camcops_pyramid_configurator_context(
debug_toolbar: bool = False, static_cache_duration_s: int = 0
) -> Configurator:
"""
Context manager to create a Pyramid configuration context, for making
(for example) a WSGI server or a debugging request. That means setting up
things like:
- the authentication and authorization policies
- our request and session factories
- our Mako renderer
- our routes and views
Args:
debug_toolbar:
Add the Pyramid debug toolbar?
static_cache_duration_s:
Lifetime (in seconds) for the HTTP cache-control setting for
static content.
Returns:
a :class:`Configurator` object
Note this includes settings that transcend the config file.
Most things should be in the config file. This enables us to run multiple
configs (e.g. multiple CamCOPS databases) through the same process.
However, some things we need to know right now, to make the WSGI app.
Here, OS environment variables and command-line switches are appropriate.
"""
# -------------------------------------------------------------------------
# 1. Base app
# -------------------------------------------------------------------------
settings = { # Settings that can't be set directly?
"debug_authorization": DEBUG_AUTHORIZATION,
# ... see https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/security.html#debugging-view-authorization-failures # noqa
}
with Configurator(settings=settings) as config:
# ---------------------------------------------------------------------
# Authentication; authorizaion (permissions)
# ---------------------------------------------------------------------
authentication_policy = CamcopsAuthenticationPolicy()
config.set_authentication_policy(authentication_policy)
# Let's not use ACLAuthorizationPolicy, which checks an access control
# list for a resource hierarchy of objects, but instead:
authorization_policy = CamcopsAuthorizationPolicy()
config.set_authorization_policy(authorization_policy)
config.set_default_permission(Permission.HAPPY)
# ... applies to all SUBSEQUENT view configuration registrations
# ---------------------------------------------------------------------
# Factories
# ---------------------------------------------------------------------
config.set_request_factory(CamcopsRequest)
# ... for request attributes: config, database, etc.
config.set_session_factory(get_session_factory())
# ... for request.session
config.set_response_factory(camcops_response_factory)
# ---------------------------------------------------------------------
# Renderers
# ---------------------------------------------------------------------
camcops_add_mako_renderer(config, extension=".mako")
# deform_bootstrap.includeme(config)
# ---------------------------------------------------------------------
# Routes and accompanying views
# ---------------------------------------------------------------------
# Add static views
# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/assets.html#serving-static-assets # noqa
# Hmm. We cannot fail to set up a static file route, because otherwise
# we can't provide URLs to them.
static_filepath = STATIC_CAMCOPS_PACKAGE_PATH
static_name = RouteCollection.STATIC.route
log.debug(
"... including static files from {!r} at Pyramid static "
"name {!r}",
static_filepath,
static_name,
)
# ... does the name needs to start with "/" or the pattern "static/"
# will override the later "deform_static"? Not sure.
# We were doing this:
# config.add_static_view(name=static_name, path=static_filepath)
# But now we need to (a) add the
# "cache_max_age=static_cache_duration_s" argument, and (b) set the
# HTTP header 'Cache-Control: no-cache="Set-Cookie, Set-Cookie2"',
# for the ZAP penetration tester:
# ... https://cheatsheetseries.owasp.org/cheatsheets/Session_Management_Cheat_Sheet.html#web-content-caching # noqa
# We can do the former, but not the latter, via add_static_view(),
# because it sends its keyword arguments to add_route(), not the view
# creation. So, alternatives ways...
# - from https://github.com/Pylons/pyramid/issues/1486
# - and https://stackoverflow.com/questions/24854300/
# - to https://github.com/Pylons/pyramid/pull/2021
# - to https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-derivers # noqa
config.add_static_view(
name=static_name,
path=static_filepath,
cache_max_age=static_cache_duration_s,
)
# Add all the routes:
for pr in RouteCollection.all_routes():
if DEBUG_ADD_ROUTES:
suffix = (
f", pregenerator={pr.pregenerator}"
if pr.pregenerator
else ""
)
log.info("Adding route: {} -> {}{}", pr.route, pr.path, suffix)
config.add_route(pr.route, pr.path, pregenerator=pr.pregenerator)
# See also:
# https://stackoverflow.com/questions/19184612/how-to-ensure-urls-generated-by-pyramids-route-url-and-route-path-are-valid # noqa
# Routes added EARLIER have priority. So add this AFTER our custom
# bugfix:
config.add_static_view(
name="/deform_static",
path="deform:static/",
cache_max_age=static_cache_duration_s,
)
# Most views are using @view_config() which calls add_view().
# Scan for @view_config decorators, to map views to routes:
# https://docs.pylonsproject.org/projects/venusian/en/latest/api.html
config.scan(
"camcops_server.cc_modules", ignore=[re.compile("_tests$").search]
)
# ---------------------------------------------------------------------
# Add tweens (inner to outer)
# ---------------------------------------------------------------------
# We will use implicit positioning:
# - https://www.slideshare.net/aconrad/alex-conrad-pyramid-tweens-ploneconf-2011 # noqa
# config.add_tween('camcops_server.camcops.http_session_tween_factory')
# ---------------------------------------------------------------------
# Debug toolbar
# ---------------------------------------------------------------------
if debug_toolbar:
log.debug("Enabling Pyramid debug toolbar")
config.include("pyramid_debugtoolbar") # BEWARE! SIDE EFFECTS
# ... Will trigger an import that hooks events into all
# SQLAlchemy queries. There's a bug somewhere relating to that;
# see notes below relating to the "mergedb" function.
config.add_route(
RouteCollection.DEBUG_TOOLBAR.route,
RouteCollection.DEBUG_TOOLBAR.path,
)
yield config
# =============================================================================
# Debugging requests
# =============================================================================
[docs]def make_post_body_from_dict(
d: Dict[str, str], encoding: str = "utf8"
) -> bytes:
"""
Makes an HTTP POST body from a dictionary.
For debugging HTTP requests.
It mimics how the tablet operates.
"""
# https://docs.pylonsproject.org/projects/pyramid-cookbook/en/latest/testing/testing_post_curl.html # noqa
txt = urllib.parse.urlencode(query=d)
# ... this encoding mimics how the tablet operates
body = txt.encode(encoding)
return body
[docs]class CamcopsDummyRequest(CamcopsRequest, DummyRequest):
"""
Request class that allows manual manipulation of GET/POST parameters
for debugging.
It reads its config (on first demand) from the config file specified in
``os.environ[ENVVAR_CONFIG_FILE]``.
Notes:
- The important base class is :class:`webob.request.BaseRequest`.
- ``self.params`` is a :class:`NestedMultiDict` (see
``webob/multidict.py``); these are intrinsically read-only.
- ``self.params`` is also a read-only property. When read, it combines
data from ``self.GET`` and ``self.POST``.
- What we do here is to manipulate the underlying GET/POST data.
"""
_CACHE_KEY = "webob._parsed_query_vars"
_QUERY_STRING_KEY = "QUERY_STRING"
# def __init__(self, *args, **kwargs) -> None:
# super().__init__(*args, **kwargs)
# # Just a technique worth noting:
# #
# # self._original_params_property = CamcopsRequest.params # type: property # noqa
# # self._original_params = self._original_params_property.fget(self) # type: NestedMultiDict # noqa
# # self._fake_params = self._original_params.copy() # type: MultiDict
# # if params:
# # self._fake_params.update(params)
#
# @property
# def params(self):
# log.debug(repr(self._fake_params))
# return self._fake_params
# # Returning the member object allows clients to call
# # dummyreq.params.update(...)
#
# @params.setter
# def params(self, value):
# self._fake_params = value
[docs] def set_method_get(self) -> None:
"""
Sets the fictional request method to GET.
"""
self.method = HttpMethod.GET
[docs] def set_method_post(self) -> None:
"""
Sets the fictional request method to POST.
"""
self.method = HttpMethod.POST
[docs] def clear_get_params(self) -> None:
"""
Clear all GET parameters.
"""
env = self.environ
if self._CACHE_KEY in env:
del env[self._CACHE_KEY]
env[self._QUERY_STRING_KEY] = ""
[docs] def add_get_params(
self, d: Dict[str, str], set_method_get: bool = True
) -> None:
"""
Add GET parameters.
Args:
d: dictionary of ``{parameter: value}`` pairs.
set_method_get: also set the request's method to GET?
"""
if not d:
return
# webob.request.BaseRequest.GET reads from self.environ['QUERY_STRING']
paramdict = self.GET.copy() # type: MultiDict
paramdict.update(d)
env = self.environ
# Delete the cached version.
if self._CACHE_KEY in env:
del env[self._CACHE_KEY]
# Write the new version
env[self._QUERY_STRING_KEY] = urllib.parse.urlencode(query=paramdict)
if set_method_get:
self.set_method_get()
[docs] def set_get_params(
self, d: Dict[str, str], set_method_get: bool = True
) -> None:
"""
Clear any GET parameters, and then set them to new values.
See :func:`add_get_params`.
"""
self.clear_get_params()
self.add_get_params(d, set_method_get=set_method_get)
[docs] def set_post_body(self, body: bytes, set_method_post: bool = True) -> None:
"""
Sets the fake POST body.
Args:
body: the body to set
set_method_post: also set the request's method to POST?
"""
log.debug("Applying fake POST body: {!r}", body)
self.body = body
self.content_length = len(body)
if set_method_post:
self.set_method_post()
[docs] def fake_request_post_from_dict(
self,
d: Dict[str, str],
encoding: str = "utf8",
set_method_post: bool = True,
) -> None:
"""
Sets the request's POST body according to a dictionary.
Args:
d: dictionary of ``{parameter: value}`` pairs.
encoding: character encoding to use
set_method_post: also set the request's method to POST?
"""
# webob.request.BaseRequest.POST reads from 'body' (indirectly).
body = make_post_body_from_dict(d, encoding=encoding)
self.set_post_body(body, set_method_post=set_method_post)
_ = """
# A demonstration of the manipulation of superclass properties:
class Test(object):
def __init__(self):
self.a = 3
@property
def b(self):
return 4
class Derived(Test):
def __init__(self):
super().__init__()
self._superclass_b = super().b
self._b = 4
@property
def b(self):
print("Superclass b: {}".format(self._superclass_b.fget(self)))
print("Self _b: {}".format(self._b))
return self._b
@b.setter
def b(self, value):
self._b = value
x = Test()
x.a # 3
x.a = 5
x.a # 5
x.b # 4
x.b = 6 # can't set attribute
y = Derived()
y.a # 3
y.a = 5
y.a # 5
y.b # 4
y.b = 6
y.b # 6
"""
[docs]def get_core_debugging_request() -> CamcopsDummyRequest:
"""
Returns a basic :class:`CamcopsDummyRequest`.
It reads its config (on first demand) from the config file specified in
``os.environ[ENVVAR_CONFIG_FILE]``.
"""
with camcops_pyramid_configurator_context(debug_toolbar=False) as pyr_cfg:
req = CamcopsDummyRequest(
environ={
# In URL sequence:
WsgiEnvVar.WSGI_URL_SCHEME: "http",
WsgiEnvVar.SERVER_NAME: "127.0.0.1",
WsgiEnvVar.SERVER_PORT: "8000",
WsgiEnvVar.SCRIPT_NAME: "",
WsgiEnvVar.PATH_INFO: "/",
} # environ parameter: goes to pyramid.testing.DummyRequest.__init__ # noqa
)
# ... must pass an actual dict to the "environ" parameter; os.environ
# itself isn't OK ("TypeError: WSGI environ must be a dict; you passed
# environ({'key1': 'value1', ...})
# Being a CamcopsRequest, this object will read a config file from
# os.environ[ENVVAR_CONFIG_FILE] -- not the environ dictionary above --
# when needed. That means we can now rewrite some of these URL
# components to give a valid external URL, if the config has the right
# information.
cfg = req.config
req.environ[WsgiEnvVar.WSGI_URL_SCHEME] = cfg.external_url_scheme
req.environ[WsgiEnvVar.SERVER_NAME] = cfg.external_server_name
req.environ[WsgiEnvVar.SERVER_PORT] = cfg.external_server_port
req.environ[WsgiEnvVar.SCRIPT_NAME] = cfg.external_script_name
# PATH_INFO remains "/"
req.registry = pyr_cfg.registry
pyr_cfg.begin(request=req)
return req
[docs]def get_command_line_request(user_id: int = None) -> CamcopsRequest:
"""
Creates a dummy CamcopsRequest for use on the command line.
By default, it does so for the system user. Optionally, you can specify a
user by their ID number.
- Presupposes that ``os.environ[ENVVAR_CONFIG_FILE]`` has been set, as it
is in :func:`camcops_server.camcops.main`.
**WARNING:** this does not provide a COMMIT/ROLLBACK context. If you use
this directly, you must manage that yourself. Consider using
:func:`command_line_request_context` instead.
"""
log.debug(f"Creating command-line pseudo-request (user_id={user_id})")
req = get_core_debugging_request()
# If we proceed with an out-of-date database, we will have problems, and
# those problems may not be immediately apparent, which is bad. So:
req.config.assert_database_ok()
# Ensure we have a user
if user_id is None:
req._debugging_user = User.get_system_user(req.dbsession)
else:
req._debugging_user = User.get_user_by_id(req.dbsession, user_id)
log.debug(
"Command-line request: external URL is {}", req.route_url(Routes.HOME)
)
return req
[docs]@contextmanager
def command_line_request_context(
user_id: int = None,
) -> Generator[CamcopsRequest, None, None]:
"""
Request objects are ubiquitous, and allow code to refer to the HTTP
request, config, HTTP session, database session, and so on. Here we make
a special sort of request for use from the command line, and provide it
as a context manager that will COMMIT the database afterwards (because the
normal method, via the Pyramid router, is unavailable).
"""
req = get_command_line_request(user_id=user_id)
yield req
# noinspection PyProtectedMember
req._finish_dbsession()
[docs]def get_unittest_request(
dbsession: SqlASession, params: Dict[str, Any] = None
) -> CamcopsDummyRequest:
"""
Creates a :class:`CamcopsDummyRequest` for use by unit tests.
- Points to an existing database (e.g. SQLite in-memory database).
- Presupposes that ``os.environ[ENVVAR_CONFIG_FILE]`` has been set, as it
is in :func:`camcops_server.camcops.main`.
"""
log.debug("Creating unit testing pseudo-request")
req = get_core_debugging_request()
req.set_get_params(params)
req._debugging_db_session = dbsession
user = User()
user.superuser = True
req._debugging_user = user
return req