#!/usr/bin/env python
"""
camcops_server/cc_modules/cc_user.py
===============================================================================
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**CamCOPS users.**
"""
import datetime
import logging
import re
from typing import List, Optional, Set, Tuple, TYPE_CHECKING
import cardinal_pythonlib.crypto as rnc_crypto
from cardinal_pythonlib.datetimefunc import convert_datetime_to_local
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.reprfunc import simple_repr
from cardinal_pythonlib.sqlalchemy.orm_query import (
CountStarSpecializedQuery,
exists_orm,
)
from pendulum import DateTime as Pendulum
import phonenumbers
import pyotp
from sqlalchemy import text
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import relationship, Session as SqlASession, Query
from sqlalchemy.sql import false
from sqlalchemy.sql.expression import and_, exists, not_
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Column, ForeignKey
from sqlalchemy.sql.sqltypes import Boolean, DateTime, Integer
from camcops_server.cc_modules.cc_audit import audit
from camcops_server.cc_modules.cc_constants import (
MfaMethod,
OBSCURE_EMAIL_ASTERISKS,
OBSCURE_PHONE_ASTERISKS,
USER_NAME_FOR_SYSTEM,
)
from camcops_server.cc_modules.cc_group import Group
from camcops_server.cc_modules.cc_membership import UserGroupMembership
from camcops_server.cc_modules.cc_sqla_coltypes import (
Base32ColType,
EmailAddressColType,
FullNameColType,
HashedPasswordColType,
LanguageCodeColType,
MfaMethodColType,
PendulumDateTimeAsIsoTextColType,
PhoneNumberColType,
UserNameCamcopsColType,
)
from camcops_server.cc_modules.cc_sqlalchemy import Base
from camcops_server.cc_modules.cc_text import TERMS_CONDITIONS_UPDATE_DATE
if TYPE_CHECKING:
from camcops_server.cc_modules.cc_patient import Patient
from camcops_server.cc_modules.cc_request import CamcopsRequest
log = BraceStyleAdapter(logging.getLogger(__name__))
# =============================================================================
# Constants
# =============================================================================
# Type alias: a list of user/group membership records.
_TYPE_LUGM = List[UserGroupMembership]
# Pattern for permissible usernames: one or more ASCII letters, digits,
# underscores or hyphens.
VALID_USERNAME_REGEX = "^[A-Za-z0-9_-]+$"
# log2 of the number of bcrypt rounds used when hashing passwords.
BCRYPT_DEFAULT_LOG_ROUNDS = 6
# Default is 12, but it does impact on the tablet upload speed (cost per
# transaction). Time is expected to be proportional to 2^n, i.e. incrementing 1
# increases time by a factor of 2.
# Empirically, on egret:
# 2^12 rounds takes around 400 ms
# 2^8 rounds takes around 30 ms (as expected, 1/16 of the time as for 12)
# we'd like around 8 ms; http://security.stackexchange.com/questions/17207
# ... so we should be using 12 + log(8/400)/log(2) = 6 rounds
# Minimum interval between successive purges of login-failure records that
# belong to nonexistent ("dummy") usernames.
CLEAR_DUMMY_LOGIN_FREQUENCY_DAYS = 7
CLEAR_DUMMY_LOGIN_PERIOD = datetime.timedelta(
days=CLEAR_DUMMY_LOGIN_FREQUENCY_DAYS
)
# =============================================================================
# SecurityAccountLockout
# =============================================================================
# Note that we record login failures for non-existent users, and pretend
# they're locked out (to prevent username discovery that way, by timing)
class SecurityAccountLockout(Base):
    """
    A time-limited "lockout" applied to a username.

    The username need not belong to a real user: lockouts are also recorded
    for nonexistent usernames so that an attacker cannot tell real accounts
    from fake ones.
    """

    __tablename__ = "_security_account_lockouts"

    id = Column("id", Integer, primary_key=True, autoincrement=True)
    username = Column(
        "username",
        UserNameCamcopsColType,
        nullable=False,
        index=True,
        comment="User name (which may be a non-existent user, to prevent "
        "subtle username discovery by careful timing)",
    )
    locked_until = Column(
        "locked_until",
        DateTime,
        nullable=False,
        index=True,
        comment="Account is locked until (UTC)",
    )

    @classmethod
    def delete_old_account_lockouts(cls, req: "CamcopsRequest") -> None:
        """
        Remove every lockout whose expiry time is not in the future.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        session = req.dbsession
        current_time = req.now_utc
        expired = session.query(cls).filter(cls.locked_until <= current_time)
        # Bulk delete; no need to reconcile objects already in the session.
        expired.delete(synchronize_session=False)

    @classmethod
    def is_user_locked_out(cls, req: "CamcopsRequest", username: str) -> bool:
        """
        Report whether an unexpired lockout exists for this username.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        session = req.dbsession
        current_time = req.now_utc
        criteria = (cls.username == username, cls.locked_until > current_time)
        return exists_orm(session, cls, *criteria)

    @classmethod
    def user_locked_out_until(
        cls, req: "CamcopsRequest", username: str
    ) -> Optional[Pendulum]:
        """
        Find the latest expiry time among this username's active lockouts.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username

        Returns:
            the expiry converted to the local timezone, or ``None`` if no
            active lockout exists.
        """
        session = req.dbsession
        current_time = req.now_utc
        latest_expiry_query = session.query(func.max(cls.locked_until)).filter(
            cls.username == username,
            cls.locked_until > current_time,
        )
        # scalar() yields the bare value; first() would yield a 1-tuple.
        latest_expiry_utc = (
            latest_expiry_query.scalar()
        )  # type: Optional[Pendulum]
        if latest_expiry_utc:
            return convert_datetime_to_local(latest_expiry_utc)
        return None

    @classmethod
    def lock_user_out(
        cls, req: "CamcopsRequest", username: str, lockout_minutes: int
    ) -> None:
        """
        Record a new lockout lasting ``lockout_minutes`` from now, and audit
        it.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
            lockout_minutes: number of minutes
        """
        session = req.dbsession
        expiry = req.now_utc + datetime.timedelta(minutes=lockout_minutes)
        # noinspection PyArgumentList
        session.add(cls(username=username, locked_until=expiry))
        audit(
            req, f"Account {username} locked out for {lockout_minutes} minutes"
        )

    @classmethod
    def unlock_user(cls, req: "CamcopsRequest", username: str) -> None:
        """
        Delete all lockouts (expired or not) recorded for this username.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        lockouts = req.dbsession.query(cls).filter(cls.username == username)
        lockouts.delete(synchronize_session=False)
# =============================================================================
# SecurityLoginFailure
# =============================================================================
class SecurityLoginFailure(Base):
    """
    One failed login attempt for a username.

    Enough accumulated failures trigger a lockout; see
    :class:`SecurityAccountLockout`.
    """

    __tablename__ = "_security_login_failures"

    id = Column("id", Integer, primary_key=True, autoincrement=True)
    username = Column(
        "username",
        UserNameCamcopsColType,
        nullable=False,
        index=True,
        comment="User name (which may be a non-existent user, to prevent "
        "subtle username discovery by careful timing)",
    )
    login_failure_at = Column(
        "login_failure_at",
        DateTime,
        nullable=False,
        index=True,
        comment="Login failure occurred at (UTC)",
    )

    @classmethod
    def record_login_failure(
        cls, req: "CamcopsRequest", username: str
    ) -> None:
        """
        Add a failure record, timestamped now, for this username.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        session = req.dbsession
        timestamp = req.now_utc
        # noinspection PyArgumentList
        session.add(cls(username=username, login_failure_at=timestamp))

    @classmethod
    def act_on_login_failure(
        cls, req: "CamcopsRequest", username: str
    ) -> None:
        """
        Audit and record a failed login, then lock the account out whenever
        the failure count reaches an exact multiple of the configured
        threshold. The lockout duration grows with each multiple.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        cfg = req.config
        audit(req, f"Failed login as user: {username}")
        cls.record_login_failure(req, username)
        failure_count = cls.how_many_login_failures(req, username)
        lockouts_due, failures_since_last = divmod(
            failure_count, cfg.lockout_threshold
        )
        if lockouts_due < 1 or failures_since_last != 0:
            # Threshold not (newly) reached; nothing more to do.
            return
        duration_minutes = (
            lockouts_due * cfg.lockout_duration_increment_minutes
        )
        SecurityAccountLockout.lock_user_out(req, username, duration_minutes)

    @classmethod
    def clear_login_failures(
        cls, req: "CamcopsRequest", username: str
    ) -> None:
        """
        Delete every failure record held for this username.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        failures = req.dbsession.query(cls).filter(cls.username == username)
        failures.delete(synchronize_session=False)

    @classmethod
    def how_many_login_failures(
        cls, req: "CamcopsRequest", username: str
    ) -> int:
        """
        Count the failure records currently stored for this username.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        counter = CountStarSpecializedQuery([cls], session=req.dbsession)
        return counter.filter(cls.username == username).count_star()

    @classmethod
    def enable_user(cls, req: "CamcopsRequest", username: str) -> None:
        """
        Remove the user's lockouts and failure records, and audit the
        re-enabling.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            username: the user's username
        """
        SecurityAccountLockout.unlock_user(req, username)
        cls.clear_login_failures(req, username)
        audit(req, f"User {username} re-enabled")

    @classmethod
    def clear_login_failures_for_nonexistent_users(
        cls, req: "CamcopsRequest"
    ) -> None:
        """
        Delete failure records whose username matches no row in the user
        table.

        Login failures are recorded for nonexistent users to mimic the lockout
        seen for real users, i.e. to reduce the potential for username
        discovery.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        session = req.dbsession
        real_usernames = session.query(User.username)
        # NOT IN (subquery); see
        # https://stackoverflow.com/questions/26182027/how-to-use-not-in-clause-in-sqlalchemy-orm-query  # noqa
        orphaned = session.query(cls).filter(
            cls.username.notin_(real_usernames)
        )
        orphaned.delete(synchronize_session=False)

    @classmethod
    def clear_dummy_login_failures_if_necessary(
        cls, req: "CamcopsRequest"
    ) -> None:
        """
        Purge failures for nonexistent users, unless that was last done less
        than :data:`CLEAR_DUMMY_LOGIN_PERIOD` ago. Records the time of the
        purge in the server settings.

        Args:
            req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        now = req.now_utc
        settings = req.server_settings
        last_cleared = (
            settings.get_last_dummy_login_failure_clearance_pendulum()
        )
        cleared_recently = (
            last_cleared is not None
            and now - last_cleared < CLEAR_DUMMY_LOGIN_PERIOD
        )
        if cleared_recently:
            return
        cls.clear_login_failures_for_nonexistent_users(req)
        log.debug("Dummy login failures cleared.")
        settings.last_dummy_login_failure_clearance_at_utc = now
# =============================================================================
# User class
# =============================================================================
[docs]class User(Base):
"""
Class representing a user.
"""
__tablename__ = "_security_users"
# -------------------------------------------------------------------------
# Columns
# -------------------------------------------------------------------------
# Autoincrementing integer primary key.
id = Column(
"id",
Integer,
primary_key=True,
autoincrement=True,
index=True,
comment="User ID",
)
# Login name; mandatory and unique.
username = Column(
"username",
UserNameCamcopsColType,
nullable=False,
index=True,
unique=True,
comment="User name",
) # type: str
fullname = Column("fullname", FullNameColType, comment="User's full name")
email = Column(
"email", EmailAddressColType, comment="User's e-mail address"
)
phone_number = Column(
"phone_number", PhoneNumberColType, comment="User's phone number"
)
# Mandatory; get_system_user() stores an empty string here for the system
# user, which has no password.
hashedpw = Column(
"hashedpw",
HashedPasswordColType,
nullable=False,
comment="Password hash",
)
# Regenerated by set_mfa_method() whenever the MFA method is (re)set.
mfa_secret_key = Column(
"mfa_secret_key",
Base32ColType,
nullable=True,
comment="Secret key used for multi-factor authentication",
)
# Mandatory; the database-side default is "no MFA".
mfa_method = Column(
"mfa_method",
MfaMethodColType,
nullable=False,
server_default=MfaMethod.NO_MFA,
comment="Preferred method of multi-factor authentication",
)
# Mandatory; the database-side default is zero.
hotp_counter = Column(
"hotp_counter",
Integer,
nullable=False,
server_default=text("0"),
comment="Counter used for HOTP authentication",
)
last_login_at_utc = Column(
"last_login_at_utc",
DateTime,
comment="Date/time this user last logged in (UTC)",
)
# Compared against a timezone-naive "now" in
# set_password_change_flag_if_necessary().
last_password_change_utc = Column(
"last_password_change_utc",
DateTime,
comment="Date/time this user last changed their password (UTC)",
)
superuser = Column(
"superuser", Boolean, default=False, comment="Superuser?"
)
must_change_password = Column(
"must_change_password",
Boolean,
default=False,
comment="Must change password at next webview login",
)
when_agreed_terms_of_use = Column(
"when_agreed_terms_of_use",
PendulumDateTimeAsIsoTextColType,
comment="Date/time this user acknowledged the Terms and "
"Conditions of Use (ISO 8601)",
)
upload_group_id = Column(
"upload_group_id",
Integer,
ForeignKey("_security_groups.id"),
comment="ID of the group to which this user uploads at present",
# OK to be NULL in the database, but the user will not be able to
# upload while it is.
)
language = Column(
"language",
LanguageCodeColType,
comment="Language code preferred by this user",
)
auto_generated = Column(
"auto_generated",
Boolean,
nullable=False,
default=False,
comment="Is automatically generated user with random password",
)
# Set to NULL if the referenced patient row is deleted (ondelete="SET NULL").
single_patient_pk = Column(
"single_patient_pk",
Integer,
ForeignKey("patient._pk", ondelete="SET NULL", use_alter=True),
comment="For users locked to a single patient, the server PK of the "
"server-created patient with which they are associated",
)
# -------------------------------------------------------------------------
# Relationships
# -------------------------------------------------------------------------
# Membership records linking this user to groups.
user_group_memberships = relationship(
"UserGroupMembership", back_populates="user"
) # type: _TYPE_LUGM
# The "group" attribute of each membership, exposed as a collection.
groups = association_proxy(
"user_group_memberships", "group"
) # type: List[Group]
# The group referenced by upload_group_id (if any).
upload_group = relationship(
"Group", foreign_keys=[upload_group_id]
) # type: Optional[Group]
# The patient referenced by single_patient_pk (if any).
single_patient = relationship(
"Patient", foreign_keys=[single_patient_pk]
) # type: Optional[Patient]
# -------------------------------------------------------------------------
# __init__
# -------------------------------------------------------------------------
def __init__(self, **kwargs) -> None:
    """
    Construct a user from column keyword arguments.

    ``mfa_method`` defaults to :attr:`MfaMethod.NO_MFA` when the caller does
    not supply it.
    """
    super().__init__(**kwargs)
    # Prevent Python None from being converted to database string 'none':
    # fall back to the explicit "no MFA" value when nothing was passed.
    chosen_method = kwargs.get("mfa_method", MfaMethod.NO_MFA)
    self.mfa_method = chosen_method
# -------------------------------------------------------------------------
# String representations
# -------------------------------------------------------------------------
def __repr__(self) -> str:
    """
    Debugging representation showing the ID, username and full name
    (plus the object's address).
    """
    shown_attributes = ["id", "username", "fullname"]
    return simple_repr(self, shown_attributes, with_addr=True)
# -------------------------------------------------------------------------
# Lookup methods
# -------------------------------------------------------------------------
@classmethod
def get_user_by_id(
    cls, dbsession: SqlASession, user_id: Optional[int]
) -> Optional["User"]:
    """
    Look up a user by integer ID.

    Returns ``None`` if ``user_id`` is ``None`` (no query is issued) or if
    no user has that ID.
    """
    if user_id is not None:
        matching = dbsession.query(cls).filter(cls.id == user_id)
        return matching.first()
    return None
@classmethod
def get_user_by_name(
    cls, dbsession: SqlASession, username: str
) -> Optional["User"]:
    """
    Look up a user by username.

    Returns ``None`` if ``username`` is empty/``None`` (no query is issued)
    or if no user has that name.
    """
    if username:
        matching = dbsession.query(cls).filter(cls.username == username)
        return matching.first()
    return None
@classmethod
def user_exists(cls, req: "CamcopsRequest", username: str) -> bool:
    """
    Report whether a user with this username exists.

    An empty or ``None`` username never exists; the database is not queried
    in that case.
    """
    if username:
        return exists_orm(req.dbsession, cls, cls.username == username)
    return False
@classmethod
def create_superuser(
    cls, req: "CamcopsRequest", username: str, password: str
) -> bool:
    """
    Create a new superuser account and add it to the session.

    Nothing is created if a user with that name already exists. The
    creation and the password change are both audited.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        username: the new superuser's username
        password: the new superuser's password

    Returns:
        ``True`` if the user was created; ``False`` if the username was
        already taken.
    """
    assert username, "Can't create superuser with no name"
    assert (
        username != USER_NAME_FOR_SYSTEM
    ), f"Can't create user with name {USER_NAME_FOR_SYSTEM!r}"
    session = req.dbsession
    if cls.get_user_by_name(session, username):
        # Username already in use; refuse.
        return False
    # noinspection PyArgumentList
    new_user = cls(username=username)
    new_user.superuser = True
    audit(req, "SUPERUSER CREATED: " + new_user.username, from_console=True)
    new_user.set_password(req, password)  # audits the password change too
    new_user.language = req.language  # a reasonable default
    session.add(new_user)
    return True
@classmethod
def get_username_from_id(
    cls, req: "CamcopsRequest", user_id: int
) -> Optional[str]:
    """
    Look up a username from the user's integer ID.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        user_id: the user's integer ID

    Returns:
        the username, or ``None`` if no user has that ID.
    """
    username_query = req.dbsession.query(cls.username).filter(
        cls.id == user_id
    )
    # Use Query.scalar(): it returns the first column of the first row, or
    # None when there is no row. (Calling .first() and then .scalar() on the
    # result fails, because .first() returns a row or None, neither of which
    # has a .scalar() method.)
    return username_query.scalar()
@classmethod
def get_user_from_username_password(
    cls,
    req: "CamcopsRequest",
    username: str,
    password: str,
    take_time_for_nonexistent_user: bool = True,
) -> Optional["User"]:
    """
    Return the user with this username if the password matches; otherwise
    return ``None``.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        username: the username
        password: the password attempt
        take_time_for_nonexistent_user: if ``True`` (the default), an
            unknown username still costs one dummy password hash, so that
            response timing does not reveal which usernames are real.
    """
    candidate = cls.get_user_by_name(req.dbsession, username)
    if candidate is not None:
        return candidate if candidate.is_password_correct(password) else None
    if take_time_for_nonexistent_user:
        # A real user would cost us a bcrypt check; spend comparable time
        # here so fake usernames can't be spotted by timing.
        cls.take_some_time_mimicking_password_encryption()
    return None
@classmethod
def get_system_user(cls, dbsession: SqlASession) -> "User":
    """
    Fetch the user that represents "command-line access", creating it (and
    adding it to the session) if it does not exist yet.

    The system user's name, superuser flag and (empty) password hash are
    (re)applied on every call.
    """
    system_user = cls.get_user_by_name(dbsession, USER_NAME_FOR_SYSTEM)
    if not system_user:
        # noinspection PyArgumentList
        system_user = cls(username=USER_NAME_FOR_SYSTEM)
        dbsession.add(system_user)
    # The column is not nullable, so store an empty hash. No password hashes
    # to '', and the system refuses logon attempts for this user anyway.
    system_user.hashedpw = ""
    system_user.fullname = "CamCOPS system user"
    system_user.superuser = True
    return system_user
# -------------------------------------------------------------------------
# Static methods
# -------------------------------------------------------------------------
@staticmethod
def is_username_permissible(username: str) -> bool:
    """
    Is this a permissible username?

    The whole string must match :data:`VALID_USERNAME_REGEX`.

    Args:
        username: the candidate username

    Returns:
        ``True`` if every character is permitted and the name is non-empty.
    """
    # fullmatch(), not match(): with match(), the pattern's trailing "$"
    # also matches just before a final newline, so "name\n" would pass.
    return bool(re.fullmatch(VALID_USERNAME_REGEX, username))
@staticmethod
def take_some_time_mimicking_password_encryption() -> None:
    """
    Burn roughly the time a real password check would take.

    Used when a login names a nonexistent user: that is detected almost
    instantly, so a throwaway hash is computed (and discarded) to make the
    response time resemble that for a real user.
    """
    throwaway_password = "dummy!"
    rnc_crypto.hash_password(throwaway_password, BCRYPT_DEFAULT_LOG_ROUNDS)
# -------------------------------------------------------------------------
# Authentication: passwords
# -------------------------------------------------------------------------
def set_password(self, req: "CamcopsRequest", new_password: str) -> None:
    """
    Store a hash of ``new_password``, record the time of the change, clear
    the "must change password" flag, and audit the change.
    """
    new_hash = rnc_crypto.hash_password(
        new_password, BCRYPT_DEFAULT_LOG_ROUNDS
    )
    self.hashedpw = new_hash
    self.last_password_change_utc = req.now_utc_no_tzinfo
    self.must_change_password = False
    audit(req, "Password changed for user " + self.username)
def is_password_correct(self, password: str) -> bool:
    """
    Check a password attempt against this user's stored hash.
    """
    stored_hash = self.hashedpw
    return rnc_crypto.is_password_valid(password, stored_hash)
def force_password_change(self) -> None:
    """
    Flag this user as having to change their password at next login.
    """
    self.must_change_password = True
def set_password_change_flag_if_necessary(
    self, req: "CamcopsRequest"
) -> None:
    """
    Apply the password-expiry policy: flag the user for a password change
    if the configured maximum password age has been reached.

    Does nothing if the flag is already set, or if the configured frequency
    is zero or negative (meaning changes are never required). A user with
    no recorded change date is treated as overdue.
    """
    if self.must_change_password:
        # Already flagged; nothing to work out.
        return
    max_age_days = req.config.password_change_frequency_days
    if max_age_days <= 0:
        return
    last_change = self.last_password_change_utc
    # The stored timestamp is offset-naive, so it must be compared with a
    # timezone-free "now".
    overdue = (
        not last_change
        or (req.now_utc_no_tzinfo - last_change).days >= max_age_days
    )
    if overdue:
        self.force_password_change()
# -------------------------------------------------------------------------
# Authentication: multi-factor authentication
# -------------------------------------------------------------------------
def set_mfa_method(self, mfa_method: str) -> None:
    """
    Switch to the given multi-factor authentication (MFA) method.

    As well as storing the method, this generates a fresh secret key and
    resets the HOTP counter to zero. The method must be valid according to
    :meth:`MfaMethod.valid`.
    """
    assert MfaMethod.valid(
        mfa_method
    ), f"Invalid MFA method: {mfa_method!r}"
    self.mfa_method = mfa_method
    # Start afresh: new random base-32 secret, counter back to zero.
    fresh_secret = pyotp.random_base32()
    self.mfa_secret_key = fresh_secret
    self.hotp_counter = 0
def ensure_mfa_info(self) -> None:
    """
    Repair MFA details if any are missing.

    If there is no secret key, or the HOTP counter is ``None``, the MFA
    method is re-set to a cleaned version of the current value (which also
    regenerates the key and resets the counter).
    """
    if self.mfa_secret_key and self.hotp_counter is not None:
        # Nothing missing.
        return
    cleaned_method = MfaMethod.clean(self.mfa_method)
    self.set_mfa_method(cleaned_method)
def verify_one_time_password(self, one_time_password: str) -> bool:
    """
    Determines whether the supplied one-time password is valid for the
    multi-factor authentication (MFA) method currently selected.

    Args:
        one_time_password: the code entered by the user

    Returns:
        ``True`` if the code is valid; ``False`` if it is not, or if the
        current method does not require a second authentication step.

    Raises:
        ValueError: if the method requires a second step but is neither
            TOTP nor one of the HOTP variants.
    """
    mfa_method = self.mfa_method
    if not MfaMethod.requires_second_step(mfa_method):
        return False
    if mfa_method == MfaMethod.TOTP:
        totp = pyotp.TOTP(self.mfa_secret_key)
        return totp.verify(one_time_password)
    elif mfa_method in (MfaMethod.HOTP_EMAIL, MfaMethod.HOTP_SMS):
        hotp = pyotp.HOTP(self.mfa_secret_key)
        # Use pyotp's own verify(), as for TOTP above, rather than comparing
        # with "==": a plain string comparison is not constant-time and so
        # can leak information about the expected code through timing.
        return hotp.verify(one_time_password, self.hotp_counter)
    else:
        raise ValueError(
            f"User.verify_one_time_password(): "
            f"Bad mfa_method = {mfa_method}"
        )
# -------------------------------------------------------------------------
# Authentication: logging in
# -------------------------------------------------------------------------
def login(self, req: "CamcopsRequest") -> None:
    """
    Bookkeeping for a successful login (as determined by the framework).

    In order: wipes the user's recorded login failures, applies the
    password-expiry policy, and records the login time.
    """
    login_time = req.now_utc_no_tzinfo
    self.clear_login_failures(req)
    self.set_password_change_flag_if_necessary(req)
    self.last_login_at_utc = login_time
def clear_login_failures(self, req: "CamcopsRequest") -> None:
    """
    Delete this user's recorded login failures.

    Does nothing if the user has no username.
    """
    if self.username:
        SecurityLoginFailure.clear_login_failures(req, self.username)
def is_locked_out(self, req: "CamcopsRequest") -> bool:
    """
    Report whether this user currently has an active account lockout
    (the consequence of repeated login failures).
    """
    name = self.username
    return SecurityAccountLockout.is_user_locked_out(req, name)
def locked_out_until(self, req: "CamcopsRequest") -> Optional[Pendulum]:
    """
    Report when this user's lockout ends.

    Returns:
        a Pendulum datetime in the local timezone, or ``None`` if the user
        is not locked out.
    """
    name = self.username
    return SecurityAccountLockout.user_locked_out_until(req, name)
def enable(self, req: "CamcopsRequest") -> None:
    """
    Re-enable this user: remove any lockouts and forget past login
    failures.
    """
    name = self.username
    SecurityLoginFailure.enable_user(req, name)
# -------------------------------------------------------------------------
# Details used for authentication
# -------------------------------------------------------------------------
@property
def partial_email(self) -> str:
    """
    The user's e-mail address with most of the "local-part" hidden.

    There is no agreed standard for this. We keep the first and last
    characters of the local part (the text before the ``@``; see
    https://en.wikipedia.org/wiki/Email_address) with asterisks in between,
    and show the domain in full. A single-character local part therefore
    appears twice.
    """
    # Group 1 = local part (at least one character); group 2 = domain.
    local_part, domain = re.search(r"^(.+)@(.*)$", self.email).groups()
    initial = local_part[0]
    final = local_part[-1]
    return f"{initial}{OBSCURE_EMAIL_ASTERISKS}{final}@{domain}"
@property
def raw_phone_number(self) -> str:
    """
    The user's phone number formatted as E164
    (https://en.wikipedia.org/wiki/E.164).
    """
    e164 = phonenumbers.PhoneNumberFormat.E164
    return phonenumbers.format_number(self.phone_number, e164)
@property
def partial_phone_number(self) -> str:
    """
    The user's phone number with all but its last two characters replaced
    by asterisks.

    There is no agreed standard for this either; see
    https://www.karansaini.com/fuzzing-obfuscated-phone-numbers/
    """
    visible_tail = self.raw_phone_number[-2:]
    return f"{OBSCURE_PHONE_ASTERISKS}{visible_tail}"
# -------------------------------------------------------------------------
# Requirements
# -------------------------------------------------------------------------
@property
def must_agree_terms(self) -> bool:
    """
    Does the user still need to agree to the terms/conditions of use?

    True if they have never agreed, or if they last agreed on a date before
    the terms were most recently updated.
    """
    agreed_at = self.when_agreed_terms_of_use
    return (
        agreed_at is None
        or agreed_at.date() < TERMS_CONDITIONS_UPDATE_DATE
    )
def agree_terms(self, req: "CamcopsRequest") -> None:
    """
    Record that the user agreed to the terms/conditions of use at the
    request's current time.
    """
    agreed_at = req.now
    self.when_agreed_terms_of_use = agreed_at
def must_set_mfa_method(self, req: "CamcopsRequest") -> bool:
    """
    Does the user still need to choose a multi-factor authentication
    method?

    True unless the user's current method is one of those listed in the
    current config.
    """
    current_method = self.mfa_method
    approved_methods = req.config.mfa_methods
    return current_method not in approved_methods
# -------------------------------------------------------------------------
# Groups
# -------------------------------------------------------------------------
@property
def group_ids(self) -> List[int]:
    """
    Sorted IDs of every group the user belongs to.
    """
    ids = [group.id for group in self.groups]
    ids.sort()
    return ids
@property
def group_names(self) -> List[str]:
    """
    Sorted names of every group the user belongs to.
    """
    names = [group.name for group in self.groups]
    names.sort()
    return names
def set_group_ids(self, group_ids: List[int]) -> None:
    """
    Set the user's groups to the groups whose integer IDs are in the
    ``group_ids`` list, and remove the user from any other groups.

    Duplicate IDs in ``group_ids`` are ignored, so each group gets at most
    one new membership.

    Args:
        group_ids: IDs of the groups the user should belong to

    Raises:
        AssertionError: if this user is not attached to a database session
    """
    dbsession = SqlASession.object_session(self)
    assert dbsession, (
        "User.set_group_ids() called on a User that's not "
        "yet in a session"
    )
    # De-duplicate while keeping the caller's order; a repeated ID would
    # otherwise create two memberships for the same group.
    wanted_ids = list(dict.fromkeys(group_ids))
    wanted_lookup = set(wanted_ids)

    # Remove groups that no longer apply, noting what the user already has.
    existing_ids = set()
    for membership in self.user_group_memberships:
        existing_ids.add(membership.group_id)
        if membership.group_id not in wanted_lookup:
            dbsession.delete(membership)

    # Add new groups.
    for gid in wanted_ids:
        if gid not in existing_ids:
            self.user_group_memberships.append(
                UserGroupMembership(user_id=self.id, group_id=gid)
            )
@property
def ids_of_groups_user_may_see(self) -> List[int]:
    """
    IDs of the groups whose data the user may see: for each of the user's
    groups, whatever that group reports via
    ``ids_of_groups_group_may_see()``.

    Duplicates are removed; the order of the result is not defined.
    """
    # Collect into a set so that groups reachable by several routes appear
    # only once.
    visible_ids = set().union(
        *(group.ids_of_groups_group_may_see() for group in self.groups)
    )  # type: Set[int]
    # Hand back a list, not a set: SQLAlchemy's in_() operator wants a list
    # (or possibly a tuple).
    return list(visible_ids)
@property
def ids_of_groups_user_may_dump(self) -> List[int]:
    """
    IDs of the groups whose data the user may dump.

    A superuser may dump every group. Anyone else may dump only those
    groups for which their own membership carries the ``may_dump_data``
    permission.

    See also :meth:`groups_user_may_dump`.

    There is **no** "second-hand authority" to dump: if group G1 can "see"
    G2, and user U may dump G1, that does not let U dump G2.
    """
    if not self.superuser:
        return [
            membership.group_id
            for membership in self.user_group_memberships
            if membership.may_dump_data
        ]
    return Group.all_group_ids(dbsession=SqlASession.object_session(self))
@property
def ids_of_groups_user_may_report_on(self) -> List[int]:
    """
    IDs of the groups on which the user may run reports.

    A superuser may report on every group. Anyone else may report only on
    those groups for which their own membership carries the
    ``may_run_reports`` permission.

    There is **no** "second-hand authority" to report: if group G1 can
    "see" G2, and user U may report on G1, that does not let U report on
    G2.
    """
    if not self.superuser:
        return [
            membership.group_id
            for membership in self.user_group_memberships
            if membership.may_run_reports
        ]
    return Group.all_group_ids(dbsession=SqlASession.object_session(self))
@property
def ids_of_groups_user_is_admin_for(self) -> List[int]:
    """
    IDs of the groups the user administers.

    For a superuser that is every group; for anyone else, the groups in
    which their membership has the ``groupadmin`` flag.
    """
    if not self.superuser:
        return [
            membership.group_id
            for membership in self.user_group_memberships
            if membership.groupadmin
        ]
    return Group.all_group_ids(dbsession=SqlASession.object_session(self))
@property
def ids_of_groups_user_may_manage_patients_in(self) -> List[int]:
    """
    IDs of the groups in which the user may add/edit/delete patients.

    For a superuser that is every group; for anyone else, the groups in
    which their membership has either the ``may_manage_patients``
    permission or the ``groupadmin`` flag.
    """
    if not self.superuser:
        return [
            membership.group_id
            for membership in self.user_group_memberships
            if membership.may_manage_patients or membership.groupadmin
        ]
    return Group.all_group_ids(dbsession=SqlASession.object_session(self))
@property
def ids_of_groups_user_may_email_patients_in(self) -> List[int]:
    """
    IDs of the groups whose patients the user may send e-mails to.

    For a superuser that is every group; for anyone else, the groups in
    which their membership has either the ``may_email_patients`` permission
    or the ``groupadmin`` flag.
    """
    if not self.superuser:
        return [
            membership.group_id
            for membership in self.user_group_memberships
            if membership.may_email_patients or membership.groupadmin
        ]
    return Group.all_group_ids(dbsession=SqlASession.object_session(self))
@property
def names_of_groups_user_is_admin_for(self) -> List[str]:
    """
    Names of the groups the user administers.

    For a superuser that is every group; for anyone else, the groups in
    which their membership has the ``groupadmin`` flag.
    """
    if not self.superuser:
        return [
            membership.group.name
            for membership in self.user_group_memberships
            if membership.groupadmin
        ]
    return Group.all_group_names(dbsession=SqlASession.object_session(self))
@property
def names_of_groups_user_is_admin_for_csv(self) -> str:
    """
    The names of the groups the user administers, sorted and joined into a
    single comma-separated string.
    """
    return ", ".join(sorted(self.names_of_groups_user_is_admin_for))
def may_administer_group(self, group_id: int) -> bool:
    """
    Is this user allowed to administer the group with ID ``group_id``?

    Superusers always are; otherwise the ID must appear in
    :attr:`ids_of_groups_user_is_admin_for`.
    """
    return (
        True
        if self.superuser
        else group_id in self.ids_of_groups_user_is_admin_for
    )
def may_manage_patients_in_group(self, group_id: int) -> bool:
    """
    Is this user allowed to manage patients in the group with ID
    ``group_id``?

    Superusers always are; otherwise the ID must appear in
    :attr:`ids_of_groups_user_may_manage_patients_in`.
    """
    return (
        True
        if self.superuser
        else group_id in self.ids_of_groups_user_may_manage_patients_in
    )
def may_email_patients_in_group(self, group_id: int) -> bool:
    """
    Is this user allowed to send emails to patients in the group with ID
    ``group_id``?

    Superusers always are; otherwise the ID must appear in
    :attr:`ids_of_groups_user_may_email_patients_in`.
    """
    return (
        True
        if self.superuser
        else group_id in self.ids_of_groups_user_may_email_patients_in
    )
@property
def groups_user_may_see(self) -> List[Group]:
    """
    :class:`camcops_server.cc_modules.cc_group.Group` objects that the user
    can see, sorted by group name.

    The result is the user's own groups plus every group listed in each of
    those groups' ``can_see_other_groups``, with duplicates removed.

    Less efficient than the group ID version; for visual display (see
    ``view_own_user_info.mako``).
    """
    own_groups = self.groups
    visible = set(own_groups)  # type: Set[Group]
    for own_group in own_groups:  # type: Group
        visible |= set(own_group.can_see_other_groups)
    ordered = list(visible)
    ordered.sort(key=lambda group: group.name)
    return ordered
@property
def groups_user_may_dump(self) -> List[Group]:
    """
    :class:`camcops_server.cc_modules.cc_group.Group` objects for the
    user's memberships that have ``may_dump_data`` set, sorted by group
    name.

    For security notes, see :meth:`ids_of_groups_user_may_dump`.

    Less efficient than the group ID version (see
    :meth:`ids_of_groups_user_may_dump`). This version is for visual
    display (see ``view_own_user_info.mako``).
    """
    dumpable = [
        membership.group
        for membership in self.user_group_memberships
        if membership.may_dump_data
    ]
    dumpable.sort(key=lambda group: group.name)
    return dumpable
@property
def groups_user_may_report_on(self) -> List[Group]:
    """
    :class:`camcops_server.cc_modules.cc_group.Group` objects for the
    user's memberships that have ``may_run_reports`` set, sorted by group
    name.

    For security notes, see :meth:`ids_of_groups_user_may_report_on`.

    Less efficient than the group ID version (see
    :meth:`ids_of_groups_user_may_report_on`). This version is for visual
    display (see ``view_own_user_info.mako``).
    """
    reportable = [
        membership.group
        for membership in self.user_group_memberships
        if membership.may_run_reports
    ]
    reportable.sort(key=lambda group: group.name)
    return reportable
@property
def groups_user_may_upload_into(self) -> List[Group]:
    """
    :class:`camcops_server.cc_modules.cc_group.Group` objects for the
    user's memberships that have ``may_upload`` set, sorted by group name.

    For visual display (see ``view_own_user_info.mako``).
    """
    uploadable = [
        membership.group
        for membership in self.user_group_memberships
        if membership.may_upload
    ]
    uploadable.sort(key=lambda group: group.name)
    return uploadable
@property
def groups_user_may_add_special_notes(self) -> List[Group]:
    """
    :class:`camcops_server.cc_modules.cc_group.Group` objects for the
    user's memberships that have ``may_add_notes`` set, sorted by group
    name.

    For visual display (see ``view_own_user_info.mako``).
    """
    notable = [
        membership.group
        for membership in self.user_group_memberships
        if membership.may_add_notes
    ]
    notable.sort(key=lambda group: group.name)
    return notable
@property
def groups_user_may_see_all_pts_when_unfiltered(self) -> List[Group]:
    """
    :class:`camcops_server.cc_modules.cc_group.Group` objects for the
    user's memberships that have ``view_all_patients_when_unfiltered``
    set, sorted by group name.

    For visual display (see ``view_own_user_info.mako``).
    """
    unrestricted = [
        membership.group
        for membership in self.user_group_memberships
        if membership.view_all_patients_when_unfiltered
    ]
    unrestricted.sort(key=lambda group: group.name)
    return unrestricted
@property
def groups_user_is_admin_for(self) -> List[Group]:
    """
    :class:`camcops_server.cc_modules.cc_group.Group` objects for the
    user's memberships that have ``groupadmin`` set, sorted by group name.

    Less efficient than the group ID version; for visual display (see
    ``view_own_user_info.mako``).
    """
    administered = [
        membership.group
        for membership in self.user_group_memberships
        if membership.groupadmin
    ]
    administered.sort(key=lambda group: group.name)
    return administered
@property
def groups_user_may_manage_patients_in(self) -> List[Group]:
    """
    :class:`camcops_server.cc_modules.cc_group.Group` objects for the
    user's memberships that have ``may_manage_patients`` set, sorted by
    group name.
    """
    # NOTE(review): unlike ids_of_groups_user_may_manage_patients_in, this
    # neither special-cases superusers nor includes groupadmin-only
    # memberships -- confirm that difference is intended.
    manageable = [
        membership.group
        for membership in self.user_group_memberships
        if membership.may_manage_patients
    ]
    manageable.sort(key=lambda group: group.name)
    return manageable
@property
def groups_user_may_email_patients_in(self) -> List[Group]:
    """
    :class:`camcops_server.cc_modules.cc_group.Group` objects for the
    user's memberships that have ``may_email_patients`` set, sorted by
    group name.
    """
    # NOTE(review): unlike ids_of_groups_user_may_email_patients_in, this
    # neither special-cases superusers nor includes groupadmin-only
    # memberships -- confirm that difference is intended.
    emailable = [
        membership.group
        for membership in self.user_group_memberships
        if membership.may_email_patients
    ]
    emailable.sort(key=lambda group: group.name)
    return emailable
@property
def is_a_groupadmin(self) -> bool:
    """
    Does at least one of the user's group memberships have ``groupadmin``
    set? (Superuser status is not considered here.)
    """
    for membership in self.user_group_memberships:
        if membership.groupadmin:
            return True
    return False
@property
def authorized_as_groupadmin(self) -> bool:
    """
    Does the user have group-administrator authority for any group --
    either as a superuser, or via a specific ``groupadmin`` membership
    (see :attr:`is_a_groupadmin`)?
    """
    superuser = self.superuser
    # Only consult the memberships if the user is not a superuser.
    return superuser or self.is_a_groupadmin
def membership_for_group_id(
    self, group_id: int
) -> Optional[UserGroupMembership]:
    """
    Returns the :class:`UserGroupMembership` object relating this user
    to the group identified by ``group_id``.

    Args:
        group_id: ID of the group of interest

    Returns:
        the first of the user's memberships whose ``group_id`` matches, or
        ``None`` if the user has no membership of that group
    """
    for membership in self.user_group_memberships:
        if membership.group_id == group_id:
            return membership
    # Not a member of that group; callers test the result for truthiness.
    return None
def group_ids_nonsuperuser_may_see_when_unfiltered(self) -> List[int]:
    """
    IDs of the groups for which this user's membership has
    ``view_all_patients_when_unfiltered`` set. Superuser status is not
    consulted.
    """
    group_ids = []  # type: List[int]
    for membership in self.user_group_memberships:
        if membership.view_all_patients_when_unfiltered:
            group_ids.append(membership.group_id)
    return group_ids
def may_upload_to_group(self, group_id: int) -> bool:
    """
    Is this user allowed to upload to the group with ID ``group_id``?

    Superusers always are; otherwise the user needs a membership of that
    group with ``may_upload`` set.
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if membership.group_id == group_id and membership.may_upload:
            return True
    return False
# -------------------------------------------------------------------------
# Other permissions
# -------------------------------------------------------------------------
@property
def may_login_as_tablet(self) -> bool:
    """
    Is the user allowed to log in via the client (tablet) API? True if the
    user may upload or may register devices.
    """
    can_upload = self.may_upload
    # Device registration is only checked when uploading is not permitted.
    return can_upload or self.may_register_devices
@property
def may_use_webviewer(self) -> bool:
    """
    Is this user allowed to log in to the web front end?

    Superusers always are; otherwise at least one membership must have
    ``may_use_webviewer`` set.
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if membership.may_use_webviewer:
            return True
    return False
def authorized_to_add_special_note(self, group_id: int) -> bool:
    """
    Is this user allowed to add special notes for the group with ID
    ``group_id``?

    Superusers always are. Otherwise the answer is the ``may_add_notes``
    flag of the user's membership of that group, or ``False`` if there is
    no such membership.
    """
    if self.superuser:
        return True
    membership = self.membership_for_group_id(group_id)
    return membership.may_add_notes if membership else False
def authorized_to_erase_tasks(self, group_id: int) -> bool:
    """
    Is this user allowed to erase tasks for the group with ID
    ``group_id``?

    Superusers always are. Otherwise the answer is the ``groupadmin`` flag
    of the user's membership of that group, or ``False`` if there is no
    such membership.
    """
    if self.superuser:
        return True
    membership = self.membership_for_group_id(group_id)
    return membership.groupadmin if membership else False
@property
def authorized_to_dump(self) -> bool:
    """
    Is the user allowed to dump data for at least one group?

    Superusers always are; otherwise at least one membership must have
    ``may_dump_data`` set.
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if membership.may_dump_data:
            return True
    return False
@property
def authorized_for_reports(self) -> bool:
    """
    Is the user allowed to run reports for at least one group?

    Superusers always are; otherwise at least one membership must have
    ``may_run_reports`` set.
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if membership.may_run_reports:
            return True
    return False
@property
def authorized_to_manage_patients(self) -> bool:
    """
    Is the user allowed to manage patients for at least one group?

    True if :attr:`authorized_as_groupadmin` is true; otherwise at least
    one membership must have ``may_manage_patients`` set.
    """
    if self.authorized_as_groupadmin:
        return True
    for membership in self.user_group_memberships:
        if membership.may_manage_patients:
            return True
    return False
@property
def authorized_to_email_patients(self) -> bool:
    """
    Is the user allowed to send emails to patients for at least one group?

    True if :attr:`authorized_as_groupadmin` is true; otherwise at least
    one membership must have ``may_email_patients`` set.
    """
    if self.authorized_as_groupadmin:
        return True
    for membership in self.user_group_memberships:
        if membership.may_email_patients:
            return True
    return False
@property
def may_view_all_patients_when_unfiltered(self) -> bool:
    """
    May the user view all patients when no filters are applied, for every
    group that the user is a member of?

    Superusers always may. Otherwise every membership must have
    ``view_all_patients_when_unfiltered`` set; a user with no memberships
    therefore also gets ``True``.
    """
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if not membership.view_all_patients_when_unfiltered:
            return False
    return True
@property
def may_view_no_patients_when_unfiltered(self) -> bool:
    """
    Is the user unable to view *any* patients when no filters are applied?

    Never true for superusers. Otherwise true when none of the user's
    memberships has ``view_all_patients_when_unfiltered`` set (which
    includes the case of having no memberships at all).
    """
    if self.superuser:
        return False
    return not any(
        membership.view_all_patients_when_unfiltered
        for membership in self.user_group_memberships
    )
@property
def may_upload(self) -> bool:
    """
    Is this user allowed to upload to the group currently set as their
    upload group? Always ``False`` when no upload group is set.
    """
    upload_group_id = self.upload_group_id
    if upload_group_id is None:
        return False
    return self.may_upload_to_group(upload_group_id)
@property
def may_register_devices(self) -> bool:
    """
    Is this user allowed to register devices?

    You can register a device if your chosen upload group allows you to do
    so. (You have to have a chosen group -- even for superusers -- because
    the tablet wants group ID policies at the moment of registration, so we
    have to know which group.)

    So: ``False`` without an upload group; then ``True`` for superusers;
    otherwise the user needs a membership of the upload group with
    ``may_register_devices`` set.
    """
    upload_group_id = self.upload_group_id
    if upload_group_id is None:
        return False
    if self.superuser:
        return True
    for membership in self.user_group_memberships:
        if (
            membership.group_id == upload_group_id
            and membership.may_register_devices
        ):
            return True
    return False
# -------------------------------------------------------------------------
# Managing other users
# -------------------------------------------------------------------------
def managed_users(self) -> Optional[Query]:
    """
    Return a query for all users managed by this user.

    - A user who is neither a superuser nor a group administrator gets a
      deliberately empty query.
    - A superuser gets every user except the system user, ordered by
      username.
    - A group administrator gets the same, further restricted to
      non-superusers who belong to one of the groups this user administers
      and who are not themselves a group administrator of any group.

    LOGIC SHOULD MATCH :meth:`may_edit_user`.
    """
    dbsession = SqlASession.object_session(self)
    is_superuser = self.superuser

    if not (is_superuser or self.is_a_groupadmin):
        # Intentionally empty query:
        # https://stackoverflow.com/questions/10345327/sqlalchemy-create-an-intentionally-empty-query # noqa
        return dbsession.query(User).filter(false())

    all_real_users = (
        dbsession.query(User)
        .filter(User.username != USER_NAME_FOR_SYSTEM)
        .order_by(User.username)
    )
    if is_superuser:
        return all_real_users

    # LOGIC SHOULD MATCH may_edit_user
    # Restrict to users who are members of groups that I am an admin for.
    my_admin_group_ids = self.ids_of_groups_user_is_admin_for
    # noinspection PyUnresolvedReferences
    ugm2 = UserGroupMembership.__table__.alias("ugm2")
    # Correlated subquery: "this user holds a groupadmin membership".
    # https://stackoverflow.com/questions/14600619/using-not-exists-clause-in-sqlalchemy-orm-query # noqa
    candidate_is_groupadmin = (
        exists()
        .select_from(ugm2)
        .where(and_(ugm2.c.user_id == User.id, ugm2.c.groupadmin))
    )
    return (
        all_real_users.join(User.user_group_memberships)
        # ... no superusers
        .filter(not_(User.superuser))
        # ... user must be a member of one of our groups
        .filter(UserGroupMembership.group_id.in_(my_admin_group_ids))
        # ... no groupadmins
        .filter(~candidate_is_groupadmin)
    )
def may_edit_user(
    self, req: "CamcopsRequest", other: "User"
) -> Tuple[bool, str]:
    """
    May the ``self`` user edit the ``other`` user?

    Nobody may edit the system user. Beyond that, a superuser may edit
    anyone; a non-superuser may not edit superusers or group
    administrators, and must administer at least one group that ``other``
    belongs to.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        other: the user to be edited (potentially)

    Returns:
        tuple: may_edit (bool), reason_why_not (str)

    LOGIC SHOULD MATCH :meth:`managed_users`.
    """
    _ = req.gettext
    permitted = (True, "")

    if other.username == USER_NAME_FOR_SYSTEM:
        return False, _("Nobody may edit the system user")
    if self.superuser:
        return permitted

    # From here on, ``self`` is not a superuser.
    if other.superuser:
        return False, _("You may not edit a superuser")
    if other.is_a_groupadmin:
        return False, _("You may not edit a group administrator")

    my_admin_group_ids = self.ids_of_groups_user_is_admin_for
    for gid in other.group_ids:
        if gid in my_admin_group_ids:
            return permitted
    return (
        False,
        _(
            "You are not a group administrator for any "
            "groups that this user is in"
        ),
    )
# =============================================================================
# Command-line password control
# =============================================================================
def set_password_directly(
    req: "CamcopsRequest", username: str, password: str
) -> bool:
    """
    Set the password of the named user, if that user exists; used from the
    command line.

    On success the user is also enabled and the change is audited as a
    console action.

    Args:
        req: the request, which supplies the database session
        username: name of the user whose password should be set
        password: the new password

    Returns:
        ``True`` if the user was found and updated, ``False`` if no such
        user exists
    """
    user = User.get_user_by_name(req.dbsession, username)
    if not user:
        return False
    user.set_password(req, password)
    user.enable(req)
    message = "Password changed for user " + user.username
    audit(req, message, from_console=True)
    return True