Source code for camcops_server.cc_modules.cc_session

"""
camcops_server/cc_modules/cc_session.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Implements sessions for web clients (humans).**

"""

import datetime
import logging
from typing import Any, Optional, TYPE_CHECKING

from cardinal_pythonlib.datetimefunc import (
    format_datetime,
    pendulum_to_utc_datetime_without_tz,
)
from cardinal_pythonlib.reprfunc import simple_repr
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.randomness import create_base64encoded_randomness
from cardinal_pythonlib.sqlalchemy.orm_query import CountStarSpecializedQuery
from pendulum import DateTime as Pendulum
from pyramid.interfaces import ISession
from sqlalchemy.orm import (
    Mapped,
    mapped_column,
    relationship,
    Session as SqlASession,
)
from sqlalchemy.sql.schema import ForeignKey

from camcops_server.cc_modules.cc_constants import DateFormat
from camcops_server.cc_modules.cc_pyramid import CookieKey
from camcops_server.cc_modules.cc_sqla_coltypes import (
    IPAddressColType,
    JsonColType,
    SessionTokenColType,
)
from camcops_server.cc_modules.cc_sqlalchemy import Base, MutableDict
from camcops_server.cc_modules.cc_taskfilter import TaskFilter
from camcops_server.cc_modules.cc_user import User

if TYPE_CHECKING:
    from camcops_server.cc_modules.cc_request import CamcopsRequest
    from camcops_server.cc_modules.cc_tabletsession import TabletSession

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Debugging options
# =============================================================================

DEBUG_CAMCOPS_SESSION_CREATION = False

if DEBUG_CAMCOPS_SESSION_CREATION:
    log.warning("Debugging options enabled!")


# =============================================================================
# Constants
# =============================================================================

DEFAULT_NUMBER_OF_TASKS_TO_VIEW = 25


# =============================================================================
# Security for web sessions
# =============================================================================


[docs]def generate_token(num_bytes: int = 16) -> str: """ Make a new session token that's not in use. It doesn't matter if it's already in use by a session with a different ID, because the ID/token pair is unique. (Removing that constraint gets rid of an in-principle-but-rare locking problem.) """ # http://stackoverflow.com/questions/817882/unique-session-id-in-python return create_base64encoded_randomness(num_bytes)
# ============================================================================= # Session class # =============================================================================
[docs]class CamcopsSession(Base): """ Class representing an HTTPS session. """ __tablename__ = "_security_webviewer_sessions" # no TEXT fields here; this is a performance-critical table id: Mapped[int] = mapped_column( primary_key=True, autoincrement=True, index=True, comment="Session ID (internal number for insertion speed)", ) token: Mapped[Optional[str]] = mapped_column( SessionTokenColType, comment="Token (base 64 encoded random number)", ) ip_address: Mapped[Optional[str]] = mapped_column( IPAddressColType, comment="IP address of user" ) user_id: Mapped[Optional[int]] = mapped_column( ForeignKey("_security_users.id", ondelete="CASCADE"), # https://docs.sqlalchemy.org/en/latest/core/constraints.html#on-update-and-on-delete # noqa comment="User ID", ) last_activity_utc: Mapped[Optional[datetime.datetime]] = mapped_column( comment="Date/time of last activity (UTC)", ) number_to_view: Mapped[Optional[int]] = mapped_column( comment="Number of records to view" ) task_filter_id: Mapped[Optional[int]] = mapped_column( ForeignKey("_task_filters.id"), comment="Task filter ID", ) is_api_session: Mapped[Optional[bool]] = mapped_column( default=False, comment="This session is using the client API (not a human browsing).", ) form_state: Mapped[Optional[Any]] = mapped_column( "form_state", MutableDict.as_mutable(JsonColType), comment=( "Any state that needs to be saved temporarily during " "wizard-style form submission" ), ) user = relationship("User", lazy="joined", foreign_keys=[user_id]) task_filter = relationship( "TaskFilter", foreign_keys=[task_filter_id], cascade="all, delete-orphan", single_parent=True, ) # ... "save-update, merge" is the default. We are adding "delete", which # means that when this CamcopsSession is deleted, the corresponding # TaskFilter will be deleted as well. See # https://docs.sqlalchemy.org/en/latest/orm/cascades.html#delete # ... 2020-09-22: changed to "all, delete-orphan" and single_parent=True # https://docs.sqlalchemy.org/en/13/orm/cascades.html#cascade-delete-orphan # https://docs.sqlalchemy.org/en/13/errors.html#error-bbf0 # ------------------------------------------------------------------------- # Basic info # ------------------------------------------------------------------------- def __repr__(self) -> str: return simple_repr( self, [ "id", "token", "ip_address", "user_id", "last_activity_utc_iso", "user", ], with_addr=True, ) @property def last_activity_utc_iso(self) -> str: """ Returns a formatted version of the date/time at which the last activity took place for this session. """ return format_datetime(self.last_activity_utc, DateFormat.ISO8601) # ------------------------------------------------------------------------- # Creating sessions # -------------------------------------------------------------------------
[docs] @classmethod def get_session_using_cookies( cls, req: "CamcopsRequest" ) -> "CamcopsSession": """ Makes, or retrieves, a new :class:`camcops_server.cc_modules.cc_session.CamcopsSession` for this Pyramid Request. The session is found using the ID/token information in the request's cookies. """ pyramid_session = req.session # type: ISession # noinspection PyArgumentList session_id_str = pyramid_session.get(CookieKey.SESSION_ID, "") # noinspection PyArgumentList session_token = pyramid_session.get(CookieKey.SESSION_TOKEN, "") return cls.get_session(req, session_id_str, session_token)
[docs] @classmethod def get_session_for_tablet(cls, ts: "TabletSession") -> "CamcopsSession": """ For a given :class:`camcops_server.cc_modules.cc_tabletsession.TabletSession` (used by tablet client devices), returns a corresponding :class:`camcops_server.cc_modules.cc_session.CamcopsSession`. This also performs user authorization. User authentication is via the :class:`camcops_server.cc_modules.cc_session.CamcopsSession`. """ session = cls.get_session( req=ts.req, session_id_str=ts.session_id, # type: ignore[arg-type] session_token=ts.session_token, ) if not session.user: session._login_from_ts(ts) elif session.user and session.user.username != ts.username: # We found a session, and it's associated with a user, but with # the wrong user. This is unlikely to happen! # Wipe the old one: req = ts.req session.logout() # Create a fresh session. session = cls.get_session( req=req, session_id_str=None, session_token=None ) session._login_from_ts(ts) return session
def _login_from_ts(self, ts: "TabletSession") -> None: """ Used by :meth:`get_session_for_tablet` to log in using information provided by a :class:`camcops_server.cc_modules.cc_tabletsession.TabletSession`. """ if DEBUG_CAMCOPS_SESSION_CREATION: log.debug( "Considering login from tablet (with username: {!r}", ts.username, ) self.is_api_session = True if ts.username: user = User.get_user_from_username_password( ts.req, ts.username, ts.password ) if DEBUG_CAMCOPS_SESSION_CREATION: log.debug("... looked up User: {!r}", user) if user: # Successful login of sorts, ALTHOUGH the user may be # severely restricted (if they can neither register nor # upload). However, effecting a "login" here means that the # error messages can become more helpful! self.login(user) if DEBUG_CAMCOPS_SESSION_CREATION: log.debug("... final session user: {!r}", self.user)
[docs] @classmethod def get_session( cls, req: "CamcopsRequest", session_id_str: Optional[str], session_token: Optional[str], ) -> "CamcopsSession": """ Retrieves, or makes, a new :class:`camcops_server.cc_modules.cc_session.CamcopsSession` for this Pyramid Request, given a specific ``session_id`` and ``session_token``. """ if DEBUG_CAMCOPS_SESSION_CREATION: log.debug( "CamcopsSession.get_session: session_id_str={!r}, " "session_token={!r}", session_id_str, session_token, ) # --------------------------------------------------------------------- # Starting variables # --------------------------------------------------------------------- try: session_id = int(session_id_str) except (TypeError, ValueError): session_id = None dbsession = req.dbsession ip_addr = req.remote_addr now = req.now_utc # --------------------------------------------------------------------- # Fetch or create # --------------------------------------------------------------------- if session_id and session_token: oldest_permitted = cls.get_oldest_last_activity_allowed(req) query = ( dbsession.query(cls) .filter(cls.id == session_id) .filter(cls.token == session_token) .filter(cls.last_activity_utc >= oldest_permitted) ) if req.config.session_check_user_ip: # Binding the session to the IP address can cause problems if # the IP address changes before the session times out. A load # balancer may cause this. query = query.filter(cls.ip_address == ip_addr) candidate = query.first() # type: Optional[CamcopsSession] if DEBUG_CAMCOPS_SESSION_CREATION: if candidate is None: log.debug("Session not found in database") else: if DEBUG_CAMCOPS_SESSION_CREATION: log.debug("Session ID and/or session token is missing.") candidate = None found = candidate is not None if found: candidate.last_activity_utc = now if DEBUG_CAMCOPS_SESSION_CREATION: log.debug("Committing for last_activity_utc") dbsession.commit() # avoid holding a lock, 2019-03-21 ccsession = candidate else: new_http_session = cls(ip_addr=ip_addr, last_activity_utc=now) dbsession.add(new_http_session) if DEBUG_CAMCOPS_SESSION_CREATION: log.debug( "Creating new CamcopsSession: {!r}", new_http_session ) # But we DO NOT FLUSH and we DO NOT SET THE COOKIES YET, because # we might hot-swap the session. # See complete_request_add_cookies(). ccsession = new_http_session return ccsession
[docs] @classmethod def get_oldest_last_activity_allowed( cls, req: "CamcopsRequest" ) -> Pendulum: """ What is the latest time that the last activity (for a session) could have occurred, before the session would have timed out? Calculated as ``now - session_timeout``. """ cfg = req.config now = req.now_utc oldest_last_activity_allowed = now - cfg.session_timeout return oldest_last_activity_allowed
[docs] @classmethod def delete_old_sessions(cls, req: "CamcopsRequest") -> None: """ Delete all expired sessions. """ oldest_last_activity_allowed = cls.get_oldest_last_activity_allowed( req ) dbsession = req.dbsession log.debug("Deleting expired sessions") dbsession.query(cls).filter( cls.last_activity_utc < oldest_last_activity_allowed ).delete(synchronize_session=False) # 2020-09-22: The cascade-delete to TaskFilter (see above) isn't # working, even without synchronize_session=False, and even after # adding delete-orphan and single_parent=True. So: subquery_active_taskfilter_ids = dbsession.query(cls.task_filter_id) dbsession.query(TaskFilter).filter( TaskFilter.id.notin_(subquery_active_taskfilter_ids) ).delete(synchronize_session=False)
@classmethod def n_sessions_active_since( cls, req: "CamcopsRequest", when: Pendulum ) -> int: when_utc = pendulum_to_utc_datetime_without_tz(when) q = CountStarSpecializedQuery(cls, session=req.dbsession).filter( # type: ignore[arg-type] # noqa: E501 cls.last_activity_utc >= when_utc ) return q.count_star() def __init__( self, ip_addr: str = None, last_activity_utc: Pendulum = None, **kwargs: Any ): """ Args: ip_addr: client IP address last_activity_utc: date/time of last activity that occurred """ super().__init__(**kwargs) self.token = generate_token() self.ip_address = ip_addr self.last_activity_utc = last_activity_utc # ------------------------------------------------------------------------- # User info and login/logout # ------------------------------------------------------------------------- @property def username(self) -> Optional[str]: """ Returns the user's username, or ``None``. """ if self.user: return self.user.username return None
[docs] def logout(self) -> None: """ Log out, wiping session details. """ self.user_id = None self.token = "" # so there's no way this token can be re-used
[docs] def login(self, user: User) -> None: """ Log in. Associates the user with the session and makes a new token. 2021-05-01: If this is an API session, we don't interfere with other sessions. But if it is a human logging in, we log out any other non-API sessions from the same user (per security recommendations: one session per authenticated user -- with exceptions that we make for API sessions). """ if DEBUG_CAMCOPS_SESSION_CREATION: log.debug( "Session {} login: username={!r}", self.id, user.username ) self.user = user # will set our user_id FK self.token = generate_token() # fresh token: https://www.owasp.org/index.php/Session_fixation if not self.is_api_session: # Log out any other sessions from the same user. # NOTE that "self" may not have been flushed to the database yet, # so self.id may be None. dbsession = SqlASession.object_session(self) assert dbsession, "No dbsession for a logged-in CamcopsSession" query = ( dbsession.query(CamcopsSession).filter( CamcopsSession.user_id == user.id ) # ... "same user" .filter(CamcopsSession.is_api_session == False) # noqa: E712 # ... "human webviewer sessions" .filter(CamcopsSession.id != self.id) # ... "not this session". # If we have an ID, this will find sessions with a different # ID. If we don't have an ID, that will equate to # "CamcopsSession.id != None", which will translate in SQL to # "id IS NOT NULL", as per # https://docs.sqlalchemy.org/en/14/core/sqlelement.html#sqlalchemy.sql.expression.ColumnElement.__ne__ # noqa ) query.delete(synchronize_session=False)
# ------------------------------------------------------------------------- # Filters # -------------------------------------------------------------------------
[docs] def get_task_filter(self) -> TaskFilter: """ Returns the :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter` in use for this session. """ if not self.task_filter: dbsession = SqlASession.object_session(self) assert dbsession, ( "CamcopsSession.get_task_filter() called on a CamcopsSession " "that's not yet in a database session" ) self.task_filter = TaskFilter() dbsession.add(self.task_filter) return self.task_filter