"""
camcops_server/cc_modules/cc_sqla_coltypes.py
===============================================================================
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**SQLAlchemy column types used by CamCOPS.**
Note these built-in SQLAlchemy types
(https://docs.sqlalchemy.org/en/latest/core/type_basics.html#generic-types):
=============== ===========================================================
SQLAlchemy type Comment
=============== ===========================================================
BigInteger MySQL: -9,223,372,036,854,775,808 to
9,223,372,036,854,775,807 (64-bit)
(compare NHS number: up to 9,999,999,999)
Boolean
Date
DateTime
Enum
Float
Integer MySQL: -2,147,483,648 to 2,147,483,647 (32-bit)
Interval For ``datetime.timedelta``
LargeBinary Under MySQL, maps to ``BLOB``
MatchType For the return type of the ``MATCH`` operator
Numeric For fixed-precision numbers like ``NUMERIC`` or ``DECIMAL``
PickleType
SchemaType
SmallInteger
String ``VARCHAR``
Text Variably sized string type.
(Under MySQL, renders as ``TEXT``.)
Time
Unicode Implies that the underlying column explicitly supports
Unicode
UnicodeText Variably sized version of Unicode
(Under MySQL, renders as ``TEXT`` too.)
=============== ===========================================================
Not supported across all platforms:
=============== ===========================================================
SQL type Comment
=============== ===========================================================
BIGINT UNSIGNED MySQL: 0 to 18,446,744,073,709,551,615 (64-bit).
Use ``sqlalchemy.dialects.mysql.BIGINT(unsigned=True)``.
INT UNSIGNED MySQL: 0 to 4,294,967,295 (32-bit).
Use ``sqlalchemy.dialects.mysql.INTEGER(unsigned=True)``.
=============== ===========================================================
Other MySQL sizes:
=============== ===========================================================
MySQL type Comment
=============== ===========================================================
TINYBLOB 2^8 bytes = 256 bytes
BLOB 2^16 bytes = 64 KiB
MEDIUMBLOB 2^24 bytes = 16 MiB
LONGBLOB 2^32 bytes = 4 GiB
TINYTEXT 255 (2^8 - 1) bytes
TEXT 65,535 bytes (2^16 - 1) = 64 KiB
MEDIUMTEXT 16,777,215 (2^24 - 1) bytes = 16 MiB
LONGTEXT 4,294,967,295 (2^32 - 1) bytes = 4 GiB
=============== ===========================================================
See https://stackoverflow.com/questions/13932750/tinytext-text-mediumtext-and-longtext-maximum-storage-sizes.
Also notes:
- Columns may need their character set specified explicitly under MySQL:
https://stackoverflow.com/questions/2108824/mysql-incorrect-string-value-error-when-save-unicode-string-in-django
""" # noqa
# =============================================================================
# Imports
# =============================================================================
import json
import logging
from typing import (
Any,
Generator,
List,
NoReturn,
Optional,
Sequence,
Tuple,
Type,
TYPE_CHECKING,
Union,
)
import uuid
from cardinal_pythonlib.datetimefunc import (
coerce_to_pendulum,
convert_datetime_to_utc,
duration_from_iso,
duration_to_iso,
PotentialDatetimeType,
)
from cardinal_pythonlib.lists import chunks
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.reprfunc import auto_repr
from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
from cardinal_pythonlib.sqlalchemy.orm_inspect import (
gen_columns,
gen_relationships,
)
from cardinal_pythonlib.sqlalchemy.sqlfunc import (
fail_unknown_dialect,
fetch_processed_single_clause,
)
from isodate.isoerror import ISO8601Error
from pendulum import DateTime as Pendulum, Duration
from pendulum.parsing.exceptions import ParserError
import phonenumbers
from semantic_version import Version
from sqlalchemy.dialects import mysql
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import mapped_column, MappedColumn
from sqlalchemy.orm.relationships import RelationshipProperty
from sqlalchemy.sql.elements import conv
from sqlalchemy.sql.expression import text
from sqlalchemy.sql.functions import FunctionElement
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import (
Boolean,
CHAR,
DateTime,
LargeBinary,
String,
Text,
Unicode,
UnicodeText,
)
from sqlalchemy.sql.type_api import TypeDecorator
from camcops_server.cc_modules.cc_constants import PV, StringLengths
from camcops_server.cc_modules.cc_simpleobjects import IdNumReference
from camcops_server.cc_modules.cc_sqlalchemy import (
LONG_COLUMN_NAME_WARNING_LIMIT,
)
from camcops_server.cc_modules.cc_version import make_version
if TYPE_CHECKING:
from sqlalchemy.sql.elements import ClauseElement
from sqlalchemy.sql.compiler import SQLCompiler
from sqlalchemy.sql.type_api import _CT, ColumnElement, OperatorType
from camcops_server.cc_modules.cc_db import (
GenericTabletRecordMixin,
)
log = BraceStyleAdapter(logging.getLogger(__name__))
# =============================================================================
# Debugging options
# =============================================================================
DEBUG_DATETIME_AS_ISO_TEXT = False
DEBUG_DURATION_AS_ISO_TEXT = False
DEBUG_IDNUMDEF_LIST = False
DEBUG_INT_LIST_COLTYPE = False
DEBUG_SEMANTIC_VERSION = False
DEBUG_STRING_LIST_COLTYPE = False
if any(
[
DEBUG_DATETIME_AS_ISO_TEXT,
DEBUG_DURATION_AS_ISO_TEXT,
DEBUG_SEMANTIC_VERSION,
DEBUG_IDNUMDEF_LIST,
DEBUG_INT_LIST_COLTYPE,
DEBUG_STRING_LIST_COLTYPE,
]
):
log.warning("Debugging options enabled!")
# =============================================================================
# Constants
# =============================================================================
[docs]class RelationshipInfo(object):
"""
Used as keys the ``info`` (user-defined) dictionary parameter to SQLAlchemy
``relationship`` calls; see
https://docs.sqlalchemy.org/en/latest/orm/relationship_api.html#sqlalchemy.orm.relationship.
"""
IS_ANCILLARY = "is_ancillary"
IS_BLOB = "is_blob"
# =============================================================================
# Simple derivative column types
# =============================================================================
# If you insert something too long into a VARCHAR, it just gets truncated.
AuditSourceColType = String(length=StringLengths.AUDIT_SOURCE_MAX_LEN)
# BigIntUnsigned = Integer().with_variant(mysql.BIGINT(unsigned=True), 'mysql')
# ... partly because Alembic breaks on variants (Aug 2017), and partly because
# it's nonstandard and unnecessary, changed all BigIntUnsigned to
# BigInteger (2017-08-25).
Base32ColType = String(length=StringLengths.BASE32_MAX_LEN)
CharColType = String(length=1)
CharsetColType = String(length=StringLengths.CHARSET_MAX_LEN)
CurrencyColType = Unicode(length=StringLengths.CURRENCY_MAX_LEN)
DatabaseTitleColType = Unicode(length=StringLengths.DATABASE_TITLE_MAX_LEN)
DeviceNameColType = String(length=StringLengths.DEVICE_NAME_MAX_LEN)
DiagnosticCodeColType = String(length=StringLengths.DIAGNOSTIC_CODE_MAX_LEN)
EmailAddressColType = Unicode(length=StringLengths.EMAIL_ADDRESS_MAX_LEN)
EraColType = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
ExportRecipientNameColType = String(
length=StringLengths.EXPORT_RECIPIENT_NAME_MAX_LEN
)
ExportTransmissionMethodColType = String(
length=StringLengths.SENDING_FORMAT_MAX_LEN
)
FilterTextColType = Unicode(length=StringLengths.FILTER_TEXT_MAX_LEN)
FileSpecColType = Unicode(length=StringLengths.FILESPEC_MAX_LEN)
FullNameColType = Unicode(length=StringLengths.FULLNAME_MAX_LEN)
GroupDescriptionColType = Unicode(
length=StringLengths.GROUP_DESCRIPTION_MAX_LEN
)
GroupNameColType = Unicode(length=StringLengths.GROUP_NAME_MAX_LEN)
HashedPasswordColType = String(length=StringLengths.HASHED_PW_MAX_LEN)
# ... You might think that we must ensure case-SENSITIVE comparison on this
# field. That would require the option collation='utf8mb4_bin' to String(),
# for MySQL. However, that is MySQL-specific, and SQLAlchemy currently (Oct
# 2017) doesn't support database-specific *per-column* collations. SQLite
# accepts COLLATE commands but chokes on 'utf8mb4_bin'. Now, the hashed
# password from bcrypt() is case-sensitive. HOWEVER, the important thing is
# that we always retrieve the string from the database and do a case-sensitive
# comparison in Python (see calls to is_password_valid()). So the database
# collation doesn't matter. So we don't set it.
# See further notes in cc_sqlalchemy.py
HL7AssigningAuthorityType = String(length=StringLengths.HL7_AA_MAX_LEN)
HL7IdTypeType = String(length=StringLengths.HL7_ID_TYPE_MAX_LEN)
HostnameColType = String(length=StringLengths.HOSTNAME_MAX_LEN)
IdDescriptorColType = Unicode(length=StringLengths.ID_DESCRIPTOR_MAX_LEN)
IdPolicyColType = String(length=StringLengths.ID_POLICY_MAX_LEN)
# IntUnsigned = Integer().with_variant(mysql.INTEGER(unsigned=True), 'mysql')
IPAddressColType = String(length=StringLengths.IP_ADDRESS_MAX_LEN)
# This is a plain string.
# See also e.g. http://sqlalchemy-utils.readthedocs.io/en/latest/_modules/sqlalchemy_utils/types/ip_address.html # noqa
LanguageCodeColType = String(length=StringLengths.LANGUAGE_CODE_MAX_LEN)
# Large BLOB:
# https://stackoverflow.com/questions/43791725/sqlalchemy-how-to-make-a-longblob-column-in-mysql # noqa
# One of these:
# noinspection PyTypeChecker
LongBlob = LargeBinary().with_variant(mysql.LONGBLOB, "mysql")
# LongBlob = LargeBinary(length=LONGBLOB_LONGTEXT_MAX_LEN) # doesn't translate to SQL Server # noqa
# noinspection PyTypeChecker
LongText = UnicodeText().with_variant(mysql.LONGTEXT, "mysql")
# LongText = UnicodeText(length=LONGBLOB_LONGTEXT_MAX_LEN) # doesn't translate to SQL Server # noqa
MfaMethodColType = String(length=StringLengths.MFA_METHOD_MAX_LEN)
MimeTypeColType = String(length=StringLengths.MIMETYPE_MAX_LEN)
PatientNameColType = Unicode(length=StringLengths.PATIENT_NAME_MAX_LEN)
Rfc2822DateColType = String(length=StringLengths.RFC_2822_DATE_MAX_LEN)
SessionTokenColType = String(length=StringLengths.SESSION_TOKEN_MAX_LEN)
SexColType = String(length=1)
SummaryCategoryColType = String(
length=StringLengths.TASK_SUMMARY_TEXT_FIELD_DEFAULT_MAX_LEN
)
# ... pretty generic
TableNameColType = String(length=StringLengths.TABLENAME_MAX_LEN)
UrlColType = String(length=StringLengths.URL_MAX_LEN)
UserNameCamcopsColType = String(length=StringLengths.USERNAME_CAMCOPS_MAX_LEN)
UserNameExternalColType = String(
length=StringLengths.USERNAME_EXTERNAL_MAX_LEN
)
# =============================================================================
# Helper operations for PendulumDateTimeAsIsoTextColType
# =============================================================================
# Database string format is e.g.
# 2013-07-24T20:04:07.123456+01:00
# 2013-07-24T20:04:07.123+01:00
# 0 1 2 3 } position in string; 1-based
# 12345678901234567890123456789012 }
#
# So: rightmost 6 characters are time zone; rest is date/time.
# leftmost 23 characters are time up to millisecond precision.
# overall length is typically 29 (milliseconds) or 32 (microseconds)
_TZ_LEN = 6 # length of the timezone part of the ISO8601 string
_UTC_TZ_LITERAL = "'+00:00'"
_SQLITE_DATETIME_FMT_FOR_PYTHON = "'%Y-%m-%d %H:%M:%f'"
_MYSQL_DATETIME_LEN = 19
_SQLSERVER_DATETIME_LEN = 19
_SQLSERVER_DATETIME2_LEN = 27
# -----------------------------------------------------------------------------
# isotzdatetime_to_utcdatetime
# -----------------------------------------------------------------------------
# noinspection PyPep8Naming
[docs]class isotzdatetime_to_utcdatetime(FunctionElement):
"""
Used as an SQL operation by :class:`PendulumDateTimeAsIsoTextColType`.
Creates an SQL expression wrapping a field containing our ISO-8601 text,
making a ``DATETIME`` out of it, in the UTC timezone.
Implemented for different SQL dialects.
"""
type = DateTime()
name = "isotzdatetime_to_utcdatetime"
inherit_cache = False
# noinspection PyUnusedLocal
[docs]@compiles(isotzdatetime_to_utcdatetime)
def isotzdatetime_to_utcdatetime_default(
element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> None:
"""
Default implementation for :class:`isotzdatetime_to_utcdatetime`: fail.
"""
fail_unknown_dialect(compiler, "perform isotzdatetime_to_utcdatetime")
# noinspection PyUnusedLocal
[docs]@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.MYSQL)
def isotzdatetime_to_utcdatetime_mysql(
element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
"""
Implementation of :class:`isotzdatetime_to_utcdatetime` for MySQL.
For format, see
https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_date-format
Note the use of "%i" for minutes.
Things after ``func.`` get passed to the database engine as literal SQL
functions; https://docs.sqlalchemy.org/en/latest/core/tutorial.html
"""
x = fetch_processed_single_clause(element, compiler)
# Let's do this in a clear way:
date_time_part = f"LEFT({x}, LENGTH({x}) - {_TZ_LEN})"
# ... drop the rightmost 6 chars (the timezone component)
fmt = compiler.process(text("'%Y-%m-%dT%H:%i:%S.%f'"))
# ... the text() part deals with the necessary escaping of % for the DBAPI
the_date_time = f"STR_TO_DATE({date_time_part}, {fmt})"
# ... STR_TO_DATE() returns a DATETIME if the string contains both date and
# time components.
old_timezone = f"RIGHT({x}, {_TZ_LEN})"
result_utc = (
f"CONVERT_TZ({the_date_time}, {old_timezone}, {_UTC_TZ_LITERAL})"
)
# log.debug(result_utc)
return result_utc
# noinspection PyUnusedLocal
[docs]@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLITE)
def isotzdatetime_to_utcdatetime_sqlite(
element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
"""
Implementation of :class:`isotzdatetime_to_utcdatetime` for SQLite.
- https://sqlite.org/lang_corefunc.html#substr
- https://sqlite.org/lang_datefunc.html
- https://www.sqlite.org/lang_expr.html
Get an SQL expression for the timezone adjustment in hours.
Note that if a time is 12:00+01:00, that means e.g. midday BST, which
is 11:00+00:00 or 11:00 UTC. So you SUBTRACT the displayed timezone from
the time, which I've always thought is a bit odd.
Ha! Was busy implementing this, but SQLite is magic; if there's a
timezone at the end, ``STRFTIME()`` will convert it to UTC automatically!
Moreover, the format is the OUTPUT format that a Python datetime will
recognize, so no 'T'.
The output format is like this: ``2018-06-01 00:00:00.000``. Note that
SQLite provides millisecond precision only (in general and via the ``%f``
argument to ``STRFTIME``).
See also SQLAlchemy's DATETIME support for SQLite:
- https://docs.sqlalchemy.org/en/13/dialects/sqlite.html?highlight=sqlite#sqlalchemy.dialects.sqlite.DATETIME
... but that doesn't support timezones, so that doesn't help us.
One further problem -- see
:class:`camcops_server.tasks.core10.Core10ReportDateRangeTests` -- is that
comparisons are done by SQLite as text, so e.g.
.. code-block:: sql
SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000000'; -- 0, false
SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000'; -- 1, true
and therefore we need to ensure either that the SQLite side gets translated
to 6dp, or the bind param gets translated to 3dp. I don't think we can
always have control over the bind parameter. So we append '000' to the
SQLite side.
""" # noqa
x = fetch_processed_single_clause(element, compiler)
fmt = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
result = f"(STRFTIME({fmt}, {x}) || '000')"
# log.debug(result)
return result
# noinspection PyUnusedLocal
[docs]@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLSERVER)
def isotzdatetime_to_utcdatetime_sqlserver(
element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
"""
Implementation of :class:`isotzdatetime_to_utcdatetime` for SQL Server.
**Converting strings to DATETIME values**
- ``CAST()``: Part of ANSI SQL.
- ``CONVERT()``: Not part of ANSI SQL; has some extra formatting options.
Both methods work:
.. code-block:: sql
SELECT CAST('2001-01-31T21:30:49.123' AS DATETIME) AS via_cast,
CONVERT(DATETIME, '2001-01-31T21:30:49.123') AS via_convert;
... fine on SQL Server 2005, with milliseconds in both cases.
However, going beyond milliseconds doesn't fail gracefully, it causes an
error (e.g. "...21:30.49.123456") both for CAST and CONVERT.
The ``DATETIME2`` format accepts greater precision, but requires SQL Server
2008 or higher. Then this works:
.. code-block:: sql
SELECT CAST('2001-01-31T21:30:49.123456' AS DATETIME2) AS via_cast,
CONVERT(DATETIME2, '2001-01-31T21:30:49.123456') AS via_convert;
So as not to be too optimistic: ``CAST(x AS DATETIME2)`` ignores (silently)
any timezone information in the string. So does ``CONVERT(DATETIME2, x, {0
or 1})``.
**Converting between time zones**
NO TIME ZONE SUPPORT in SQL Server 2005.
e.g. https://stackoverflow.com/questions/3200827/how-to-convert-timezones-in-sql-server-2005.
.. code-block:: none
TODATETIMEOFFSET(expression, time_zone):
expression: something that evaluates to a DATETIME2 value
time_zone: integer minutes, or string hours/minutes e.g. "+13.00"
-> produces a DATETIMEOFFSET value
Available from SQL Server 2008
(https://docs.microsoft.com/en-us/sql/t-sql/functions/todatetimeoffset-transact-sql).
.. code-block:: none
SWITCHOFFSET
-> converts one DATETIMEOFFSET value to another, preserving its UTC
time, but changing the displayed (local) time zone.
... however, is that unnecessary? We want a plain ``DATETIME2`` in UTC, and
.conversion to UTC is automatically achieved by ``CONVERT(DATETIME2,
.some_datetimeoffset, 1)``
... https://stackoverflow.com/questions/4953903/how-can-i-convert-a-sql-server-2008-datetimeoffset-to-a-datetime
... but not by ``CAST(some_datetimeoffset AS DATETIME2)``, and not by
``CONVERT(DATETIME2, some_datetimeoffset, 0)``
... and styles 0 and 1 are the only ones permissible from SQL Server 2012
and up (empirically, and documented for the reverse direction at
https://docs.microsoft.com/en-us/sql/t-sql/functions/cast-and-convert-transact-sql?view=sql-server-2017)
... this is not properly documented re UTC conversion, as far as I can
see. Let's use ``SWITCHOFFSET -> CAST`` to be explicit and clear.
``AT TIME ZONE``: From SQL Server 2016 only.
https://docs.microsoft.com/en-us/sql/t-sql/queries/at-time-zone-transact-sql?view=sql-server-2017
**Therefore**
- We need to require SQL Server 2008 or higher.
- Therefore we can use the ``DATETIME2`` type.
- Note that ``LEN()``, not ``LENGTH()``, is ANSI SQL; SQL Server only
supports ``LEN``.
**Example (tested on SQL Server 2014)**
.. code-block:: sql
DECLARE @source AS VARCHAR(100) = '2001-01-31T21:30:49.123456+07:00';
SELECT CAST(
SWITCHOFFSET(
TODATETIMEOFFSET(
CAST(LEFT(@source, LEN(@source) - 6) AS DATETIME2),
RIGHT(@source, 6)
),
'+00:00'
)
AS DATETIME2
) -- 2001-01-31 14:30:49.1234560
""" # noqa
x = fetch_processed_single_clause(element, compiler)
date_time_part = f"LEFT({x}, LEN({x}) - {_TZ_LEN})" # a VARCHAR
old_timezone = f"RIGHT({x}, {_TZ_LEN})" # a VARCHAR
date_time_no_tz = f"CAST({date_time_part} AS DATETIME2)" # a DATETIME2
date_time_offset_with_old_tz = (
f"TODATETIMEOFFSET({date_time_no_tz}, {old_timezone})"
# a DATETIMEOFFSET
)
date_time_offset_with_utc_tz = (
f"SWITCHOFFSET({date_time_offset_with_old_tz}, {_UTC_TZ_LITERAL})"
# a DATETIMEOFFSET in UTC
)
result_utc = f"CAST({date_time_offset_with_utc_tz} AS DATETIME2)"
# log.debug(result_utc)
return result_utc
# -----------------------------------------------------------------------------
# unknown_field_to_utcdatetime
# -----------------------------------------------------------------------------
# noinspection PyPep8Naming
[docs]class unknown_field_to_utcdatetime(FunctionElement):
"""
Used as an SQL operation by :class:`PendulumDateTimeAsIsoTextColType`.
Creates an SQL expression wrapping a field containing something unknown,
which might be a ``DATETIME`` or an ISO-formatted field, and
making a ``DATETIME`` out of it, in the UTC timezone.
Implemented for different SQL dialects.
"""
type = DateTime()
name = "unknown_field_to_utcdatetime"
inherit_cache = False
# noinspection PyUnusedLocal
[docs]@compiles(unknown_field_to_utcdatetime)
def unknown_field_to_utcdatetime_default(
element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> None:
"""
Default implementation for :class:`unknown_field_to_utcdatetime`: fail.
"""
fail_unknown_dialect(compiler, "perform unknown_field_to_utcdatetime")
# noinspection PyUnusedLocal
[docs]@compiles(unknown_field_to_utcdatetime, SqlaDialectName.MYSQL)
def unknown_field_to_utcdatetime_mysql(
element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
"""
Implementation of :class:`unknown_field_to_utcdatetime` for MySQL.
If it's the length of a plain ``DATETIME`` e.g. ``2013-05-30 00:00:00``
(19), leave it as a ``DATETIME``; otherwise convert ISO -> ``DATETIME``.
"""
x = fetch_processed_single_clause(element, compiler)
converted = isotzdatetime_to_utcdatetime_mysql(element, compiler, **kw)
result = f"IF(LENGTH({x}) = {_MYSQL_DATETIME_LEN}, {x}, {converted})"
# log.debug(result)
return result
# noinspection PyUnusedLocal
[docs]@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLITE)
def unknown_field_to_utcdatetime_sqlite(
element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
"""
Implementation of :class:`unknown_field_to_utcdatetime` for SQLite.
"""
x = fetch_processed_single_clause(element, compiler)
fmt = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
result = f"STRFTIME({fmt}, {x})"
# log.debug(result)
return result
# noinspection PyUnusedLocal
[docs]@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLSERVER)
def unknown_field_to_utcdatetime_sqlserver(
element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
"""
Implementation of :class:`unknown_field_to_utcdatetime` for SQL Server.
We should cope also with the possibility of a ``DATETIME2`` field, not just
``DATETIME``. It seems consistent that ``LEN(DATETIME2) = 27``, with
precision tenth of a microsecond, e.g. ``2001-01-31 21:30:49.1234567``
(27).
So, if it looks like a ``DATETIME`` or a ``DATETIME2``, then we leave it
alone; otherwise we put it through our ISO-to-datetime function.
Importantly, note that neither ``_SQLSERVER_DATETIME_LEN`` nor
``_SQLSERVER_DATETIME2_LEN`` are the length of any of our ISO strings.
"""
x = fetch_processed_single_clause(element, compiler)
# https://stackoverflow.com/questions/5487892/sql-server-case-when-or-then-else-end-the-or-is-not-supported # noqa
converted = isotzdatetime_to_utcdatetime_sqlserver(element, compiler, **kw)
result = (
f"CASE WHEN LEN({x}) IN "
f"({_SQLSERVER_DATETIME_LEN}, {_SQLSERVER_DATETIME2_LEN}) THEN {x} "
f"ELSE {converted} "
f"END"
)
# log.debug(result)
return result
# =============================================================================
# Custom date/time field as ISO-8601 text including timezone, using
# pendulum.DateTime on the Python side.
# =============================================================================
[docs]class PendulumDateTimeAsIsoTextColType(TypeDecorator):
"""
Stores date/time values as ISO-8601, in a specific format.
Uses Pendulum on the Python side.
"""
impl = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
# ... underlying SQL type
cache_ok = False
_coltype_name = "PendulumDateTimeAsIsoTextColType"
@property
def python_type(self) -> type:
"""
The Python type of the object.
"""
return Pendulum
[docs] @staticmethod
def pendulum_to_isostring(x: PotentialDatetimeType) -> Optional[str]:
"""
From a Python datetime to an ISO-formatted string in our particular
format.
"""
# https://docs.python.org/3.4/library/datetime.html#strftime-strptime-behavior # noqa
x = coerce_to_pendulum(x)
try:
mainpart = x.strftime(
"%Y-%m-%dT%H:%M:%S.%f"
) # microsecond accuracy
timezone = x.strftime("%z") # won't have the colon in
return mainpart + timezone[:-2] + ":" + timezone[-2:]
except AttributeError:
return None
[docs] @staticmethod
def isostring_to_pendulum(x: Optional[str]) -> Optional[Pendulum]:
"""
From an ISO-formatted string to a Python Pendulum, with timezone.
"""
try:
return coerce_to_pendulum(x)
except (ParserError, ValueError):
log.warning("Bad ISO date/time string: {!r}", x)
return None
[docs] def process_bind_param(
self, value: Optional[Pendulum], dialect: Dialect
) -> Optional[str]:
"""
Convert parameters on the way from Python to the database.
"""
retval = self.pendulum_to_isostring(value)
if DEBUG_DATETIME_AS_ISO_TEXT:
log.debug(
"{}.process_bind_param("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
[docs] def process_literal_param(
self, value: Optional[Pendulum], dialect: Dialect
) -> Optional[str]:
"""
Convert literals on the way from Python to the database.
"""
retval = self.pendulum_to_isostring(value)
if DEBUG_DATETIME_AS_ISO_TEXT:
log.debug(
"{}.process_literal_param("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
[docs] def process_result_value(
self, value: Optional[str], dialect: Dialect
) -> Optional[Pendulum]:
"""
Convert things on the way from the database to Python.
"""
retval = self.isostring_to_pendulum(value)
if DEBUG_DATETIME_AS_ISO_TEXT:
log.debug(
"{}.process_result_value("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
# noinspection PyPep8Naming
[docs] class comparator_factory(TypeDecorator.Comparator):
"""
Process SQL for when we are comparing our column, in the database,
to something else.
We make this dialect-independent by calling functions like
.. code-block:: none
unknown_field_to_utcdatetime
isotzdatetime_to_utcdatetime
... which we then specialize for specific dialects.
This function itself does not appear to be able to access any
information about the dialect.
"""
[docs] def operate(
self, op: "OperatorType", *other: Any, **kwargs: Any
) -> "ColumnElement[_CT]":
assert len(other) == 1
assert not kwargs
other = other[0]
try:
processed_other = convert_datetime_to_utc(
coerce_to_pendulum(other)
)
# - If you try to call a dialect-specialized FunctionElement,
# it processes the clause to "?" (meaning "attach bind
# parameter here"); it's not the value itself.
# - For our SQLite "milliseconds only" comparator problem (see
# above), we can't do very much here without knowing the
# dialect. So we make the SQLite side look like it has
# microseconds by appending "000"...
except (AttributeError, ParserError, TypeError, ValueError):
# OK. At this point, "other" could be a plain DATETIME field,
# or a PendulumDateTimeAsIsoTextColType field (or potentially
# something else that we don't really care about). If it's a
# DATETIME, then we assume it is already in UTC.
processed_other = unknown_field_to_utcdatetime(other) # type: ignore[assignment] # noqa: E501
if DEBUG_DATETIME_AS_ISO_TEXT:
log.debug(
"operate(self={!r}, op={!r}, other={!r})", self, op, other
)
log.debug("self.expr = {!r}", self.expr)
log.debug("processed_other = {!r}", processed_other)
# traceback.print_stack()
return op(isotzdatetime_to_utcdatetime(self.expr), processed_other)
[docs] def reverse_operate(
self, op: "OperatorType", *other: Any, **kwargs: Any
) -> NoReturn:
assert False, "I don't think this is ever being called"
# =============================================================================
# Custom duration field as ISO-8601 text, using pendulum.Duration on the Python
# side.
# =============================================================================
[docs]class PendulumDurationAsIsoTextColType(TypeDecorator):
"""
Stores time durations as ISO-8601, in a specific format.
Uses :class:`pendulum.Duration` on the Python side.
"""
impl = String(length=StringLengths.ISO8601_DURATION_STRING_MAX_LEN)
# ... underlying SQL type
cache_ok = False
_coltype_name = "PendulumDurationAsIsoTextColType"
@property
def python_type(self) -> type:
"""
The Python type of the object.
"""
return Duration
[docs] @staticmethod
def pendulum_duration_to_isostring(x: Optional[Duration]) -> Optional[str]:
"""
From a :class:`pendulum.Duration` (or ``None``) an ISO-formatted string
in our particular format (or ``NULL``).
"""
if x is None:
return None
return duration_to_iso(
x, permit_years_months=True, minus_sign_at_front=True
)
[docs] @staticmethod
def isostring_to_pendulum_duration(x: Optional[str]) -> Optional[Duration]:
"""
From an ISO-formatted string to a Python Pendulum, with timezone.
"""
if not x: # None (NULL) or blank string
return None
try:
return duration_from_iso(x)
except (ISO8601Error, ValueError):
log.warning("Bad ISO duration string: {!r}", x)
return None
[docs] def process_bind_param(
self, value: Optional[Duration], dialect: Dialect
) -> Optional[str]:
"""
Convert parameters on the way from Python to the database.
"""
retval = self.pendulum_duration_to_isostring(value)
if DEBUG_DURATION_AS_ISO_TEXT:
log.debug(
"{}.process_bind_param("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
[docs] def process_literal_param(
self, value: Optional[Duration], dialect: Dialect
) -> Optional[str]:
"""
Convert literals on the way from Python to the database.
"""
retval = self.pendulum_duration_to_isostring(value)
if DEBUG_DURATION_AS_ISO_TEXT:
log.debug(
"{}.process_literal_param("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
[docs] def process_result_value(
self, value: Optional[str], dialect: Dialect
) -> Optional[Duration]:
"""
Convert things on the way from the database to Python.
"""
retval = self.isostring_to_pendulum_duration(value)
if DEBUG_DURATION_AS_ISO_TEXT:
log.debug(
"{}.process_result_value("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
# No comparator_factory; we do not use SQL to compare ISO durations.
# =============================================================================
# Semantic version column type
# =============================================================================
[docs]class SemanticVersionColType(TypeDecorator):
"""
Stores semantic versions in the database.
Uses :class:`semantic_version.Version` on the Python side.
"""
impl = String(length=147) # https://github.com/mojombo/semver/issues/79
cache_ok = False
_coltype_name = "SemanticVersionColType"
@property
def python_type(self) -> type:
"""
The Python type of the object.
"""
return Version
[docs] def process_bind_param(
self, value: Optional[Version], dialect: Dialect
) -> Optional[str]:
"""
Convert parameters on the way from Python to the database.
"""
retval = str(value) if value is not None else None
if DEBUG_SEMANTIC_VERSION:
log.debug(
"{}.process_bind_param("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
[docs] def process_literal_param(
self, value: Optional[Version], dialect: Dialect
) -> Optional[str]:
"""
Convert literals on the way from Python to the database.
"""
retval = str(value) if value is not None else None
if DEBUG_SEMANTIC_VERSION:
log.debug(
"{}.process_literal_param("
"self={!r}, value={!r}, dialect={!r}) -> !r",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
[docs] def process_result_value(
self, value: Optional[str], dialect: Dialect
) -> Optional[Version]:
"""
Convert things on the way from the database to Python.
"""
if value is None:
retval = None
else:
# Here we do some slightly fancier conversion to deal with all
# sorts of potential rubbish coming in, so we get a properly
# ordered Version out:
retval = make_version(value)
if DEBUG_SEMANTIC_VERSION:
log.debug(
"{}.process_result_value("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
'''
# noinspection PyPep8Naming
class comparator_factory(TypeDecorator.Comparator):
"""
Process SQL for when we are comparing our column, in the database,
to something else.
See https://docs.sqlalchemy.org/en/13/core/type_api.html#sqlalchemy.types.TypeEngine.comparator_factory.
.. warning::
I'm not sure this is either (a) correct or (b) used; it may
produce a string comparison of e.g. ``14.0.0`` versus ``2.0.0``,
which will be alphabetical and therefore wrong.
Disabled on 2019-04-28.
""" # noqa
def operate(self, op, *other, **kwargs):
assert len(other) == 1
assert not kwargs
other = other[0]
if isinstance(other, Version):
processed_other = str(Version)
else:
processed_other = other
return op(self.expr, processed_other)
def reverse_operate(self, op, *other, **kwargs):
assert False, "I don't think this is ever being called"
'''
# =============================================================================
# IdNumReferenceListColType
# =============================================================================
[docs]class IdNumReferenceListColType(TypeDecorator):
"""
Stores a list of IdNumReference objects.
On the database side, uses a comma-separated list of integers.
"""
impl = Text()
_coltype_name = "IdNumReferenceListColType"
@property
def python_type(self) -> type:
"""
The Python type of the object.
"""
return list
@staticmethod
def _idnumdef_list_to_dbstr(
idnumdef_list: Optional[List[IdNumReference]],
) -> str:
"""
Converts an optional list of
:class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
objects to a CSV string suitable for storing in the database.
"""
if not idnumdef_list:
return ""
elements = [] # type: List[int]
for idnumdef in idnumdef_list:
elements.append(idnumdef.which_idnum)
elements.append(idnumdef.idnum_value)
return ",".join(str(x) for x in elements)
@staticmethod
def _dbstr_to_idnumdef_list(dbstr: Optional[str]) -> List[IdNumReference]:
"""
Converts a CSV string (from the database) to a list of
:class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
objects.
"""
idnumdef_list = [] # type: List[IdNumReference]
try:
intlist = [int(numstr) for numstr in dbstr.split(",")]
except (AttributeError, TypeError, ValueError):
return []
length = len(intlist)
if length == 0 or length % 2 != 0: # enforce pairs
return []
for which_idnum, idnum_value in chunks(intlist, n=2):
if which_idnum < 0 or idnum_value < 0: # enforce positive integers
return []
idnumdef_list.append(
IdNumReference(
which_idnum=which_idnum, idnum_value=idnum_value
)
)
return idnumdef_list
[docs] def process_bind_param(
self, value: Optional[List[IdNumReference]], dialect: Dialect
) -> str:
"""
Convert parameters on the way from Python to the database.
"""
retval = self._idnumdef_list_to_dbstr(value)
if DEBUG_IDNUMDEF_LIST:
log.debug(
"{}.process_bind_param("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
[docs] def process_literal_param(
self, value: Optional[List[IdNumReference]], dialect: Dialect
) -> str:
"""
Convert literals on the way from Python to the database.
"""
retval = self._idnumdef_list_to_dbstr(value)
if DEBUG_IDNUMDEF_LIST:
log.debug(
"{}.process_literal_param("
"self={!r}, value={!r}, dialect={!r}) -> !r",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
[docs] def process_result_value(
self, value: Optional[str], dialect: Dialect
) -> List[IdNumReference]:
"""
Convert things on the way from the database to Python.
"""
retval = self._dbstr_to_idnumdef_list(value)
if DEBUG_IDNUMDEF_LIST:
log.debug(
"{}.process_result_value("
"self={!r}, value={!r}, dialect={!r}) -> {!r}",
self._coltype_name,
self,
value,
dialect,
retval,
)
return retval
# =============================================================================
# UUID column type
# =============================================================================
[docs]class UuidColType(TypeDecorator):
# Based on:
# https://docs.sqlalchemy.org/en/13/core/custom_types.html#backend-agnostic-guid-type # noqa: E501
# which will use postgresql UUID if relevant, not doing that here
impl = CHAR(32)
cache_ok = False
@property
def python_type(self) -> type:
return str
[docs] def process_bind_param(
self, value: uuid.UUID, dialect: Dialect
) -> Optional[str]:
"""
Convert parameters on the way from Python to the database.
"""
if value is None:
return None
return "%.32x" % value.int
[docs] def process_result_value(
self, value: Optional[str], dialect: Dialect
) -> Optional[uuid.UUID]:
"""
Convert things on the way from the database to Python.
"""
if value is None:
return None
return uuid.UUID(value)
# =============================================================================
# JSON column type
# =============================================================================
[docs]class JsonColType(TypeDecorator):
# Unlike
# https://docs.sqlalchemy.org/en/13/core/type_basics.html#sqlalchemy.types.JSON
# does not use vendor-specific JSON type
impl = UnicodeText
cache_ok = False
@property
def python_type(self) -> type:
return str
[docs] def process_bind_param(
self, value: Any, dialect: Dialect
) -> Optional[str]:
"""
Convert parameters on the way from Python to the database.
"""
if value is None:
return None
return json.dumps(value)
[docs] def process_result_value(self, value: str, dialect: Dialect) -> Any:
"""
Convert things on the way from the database to Python.
"""
if value is None:
return None
return json.loads(value)
# =============================================================================
# Phone number column type
# =============================================================================
[docs]class PhoneNumberColType(TypeDecorator):
impl = Unicode(length=StringLengths.PHONE_NUMBER_MAX_LEN)
cache_ok = False
@property
def python_type(self) -> type:
return str
[docs] def process_bind_param(
self, value: Any, dialect: Dialect
) -> Optional[str]:
"""
Convert parameters on the way from Python to the database.
"""
if value is None:
return None
return phonenumbers.format_number(
value, phonenumbers.PhoneNumberFormat.E164
)
[docs] def process_result_value(self, value: str, dialect: Dialect) -> Any:
"""
Convert things on the way from the database to Python.
"""
if not value:
return None
# Should be stored as E164 so no need to pass a region
return phonenumbers.parse(value, None)
# =============================================================================
# PermittedValueChecker: used by camcops_column
# =============================================================================
[docs]class PermittedValueChecker(object):
"""
Represents permitted values (in columns belonging to CamCOPS tasks), and
checks a value against them.
"""
[docs] def __init__(
self,
not_null: bool = False,
minimum: Union[int, float] = None,
maximum: Union[int, float] = None,
permitted_values: Sequence[Any] = None,
) -> None:
"""
Args:
not_null: must the value not be NULL?
minimum: if specified, a numeric minimum value
maximum: if specified, a numeric maximum value
permitted_values: if specified, a list of permitted values
"""
self.not_null = not_null
self.minimum = minimum
self.maximum = maximum
self.permitted_values = permitted_values
[docs] def is_ok(self, value: Any) -> bool:
"""
Does the value pass our tests?
"""
if value is None:
return not self.not_null
# If not_null is True, then the value is not OK; return False.
# If not_null is False, then a null value passes all other tests.
if (
self.permitted_values is not None
and value not in self.permitted_values
):
return False
if self.minimum is not None and value < self.minimum:
return False
if self.maximum is not None and value > self.maximum:
return False
return True
[docs] def failure_msg(self, value: Any) -> str:
"""
Why does the value not pass our tests?
"""
if value is None:
if self.not_null:
return "value is None and NULL values are not permitted"
else:
return "" # value is OK
if (
self.permitted_values is not None
and value not in self.permitted_values
):
return (
f"value {value!r} not in permitted values "
f"{self.permitted_values!r}"
)
if self.minimum is not None and value < self.minimum:
return f"value {value!r} less than minimum of {self.minimum!r}"
if self.maximum is not None and value > self.maximum:
return f"value {value!r} more than maximum of {self.maximum!r}"
return ""
def __repr__(self) -> str:
return auto_repr(self)
[docs] def permitted_values_inc_minmax(self) -> Tuple:
"""
Returns permitted values, either specified directly or via a
minimum/maximum.
"""
if self.permitted_values:
return tuple(self.permitted_values)
# Take a punt that integer minima/maxima mean that only integers are
# permitted...
if isinstance(self.minimum, int) and isinstance(self.maximum, int):
return tuple(range(self.minimum, self.maximum + 1))
return ()
[docs] def permitted_values_csv(self) -> str:
"""
Returns a CSV representation of the permitted values.
Primarily used for CRIS data dictionaries.
"""
return ",".join(str(x) for x in self.permitted_values_inc_minmax())
# Specific instances, to reduce object duplication and magic numbers:
MIN_ZERO_CHECKER = PermittedValueChecker(minimum=0)
BIT_CHECKER = PermittedValueChecker(permitted_values=PV.BIT)
ZERO_TO_ONE_CHECKER = PermittedValueChecker(minimum=0, maximum=1)
ZERO_TO_TWO_CHECKER = PermittedValueChecker(minimum=0, maximum=2)
ZERO_TO_THREE_CHECKER = PermittedValueChecker(minimum=0, maximum=3)
ZERO_TO_FOUR_CHECKER = PermittedValueChecker(minimum=0, maximum=4)
ZERO_TO_FIVE_CHECKER = PermittedValueChecker(minimum=0, maximum=5)
ZERO_TO_SIX_CHECKER = PermittedValueChecker(minimum=0, maximum=6)
ZERO_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=0, maximum=7)
ZERO_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=0, maximum=8)
ZERO_TO_NINE_CHECKER = PermittedValueChecker(minimum=0, maximum=9)
ZERO_TO_10_CHECKER = PermittedValueChecker(minimum=0, maximum=10)
ZERO_TO_100_CHECKER = PermittedValueChecker(minimum=0, maximum=100)
ONE_TO_TWO_CHECKER = PermittedValueChecker(minimum=1, maximum=2)
ONE_TO_THREE_CHECKER = PermittedValueChecker(minimum=1, maximum=3)
ONE_TO_FOUR_CHECKER = PermittedValueChecker(minimum=1, maximum=4)
ONE_TO_FIVE_CHECKER = PermittedValueChecker(minimum=1, maximum=5)
ONE_TO_SIX_CHECKER = PermittedValueChecker(minimum=1, maximum=6)
ONE_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=1, maximum=7)
ONE_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=1, maximum=8)
ONE_TO_NINE_CHECKER = PermittedValueChecker(minimum=1, maximum=9)
# =============================================================================
# camcops_column: provides extra functions over Column.
# =============================================================================
# Column attributes
COLATTR_BLOB_RELATIONSHIP_ATTR_NAME = "blob_relationship_attr_name"
COLATTR_EXEMPT_FROM_ANONYMISATION = "exempt_from_anonymisation"
COLATTR_IDENTIFIES_PATIENT = "identifies_patient"
COLATTR_INCLUDE_IN_ANON_STAGING_DB = "include_in_anon_staging_db"
COLATTR_IS_BLOB_ID_FIELD = "is_blob_id_field"
COLATTR_IS_CAMCOPS_COLUMN = "is_camcops_column"
COLATTR_PERMITTED_VALUE_CHECKER = "permitted_value_checker"
[docs]def camcops_column(
*args: Any,
include_in_anon_staging_db: bool = False,
exempt_from_anonymisation: bool = False,
identifies_patient: bool = False,
is_blob_id_field: bool = False,
blob_relationship_attr_name: str = "",
permitted_value_checker: PermittedValueChecker = None,
**kwargs: Any,
) -> Column[Any]:
"""
Args:
*args:
Arguments to the :class:`Column` constructor.
include_in_anon_staging_db:
Ensure this is marked for inclusion in data dictionaries for an
anonymisation staging database.
exempt_from_anonymisation:
If true: though this field might be text, it is guaranteed not
to contain identifiers (e.g. it might contain only predefined
disease severity descriptions) and does not require
anonymisation.
identifies_patient:
If true: contains a patient identifier (e.g. name).
is_blob_id_field:
If true: this field contains a reference (client FK) to the
BLOB table.
blob_relationship_attr_name:
For BLOB ID fields: the name of the associated relationship
attribute (which, when accessed, yields the BLOB itself) in
the owning class/object.
permitted_value_checker:
If specified, a :class:`PermittedValueChecker` that allows
soft constraints to be specified on the field's contents. (That
is, no constraints are specified at the database level, but we
can moan if incorrect data are present.)
**kwargs:
Arguments to the :class:`Column` constructor.
"""
if is_blob_id_field:
assert blob_relationship_attr_name, (
"If specifying a BLOB ID field, must give the attribute name "
"of the relationship too"
)
info = {
COLATTR_IS_CAMCOPS_COLUMN: True,
COLATTR_INCLUDE_IN_ANON_STAGING_DB: include_in_anon_staging_db,
COLATTR_EXEMPT_FROM_ANONYMISATION: exempt_from_anonymisation,
COLATTR_IDENTIFIES_PATIENT: identifies_patient,
COLATTR_IS_BLOB_ID_FIELD: is_blob_id_field,
COLATTR_BLOB_RELATIONSHIP_ATTR_NAME: blob_relationship_attr_name,
COLATTR_PERMITTED_VALUE_CHECKER: permitted_value_checker,
}
return Column(*args, info=info, **kwargs)
[docs]def mapped_camcops_column( # type: ignore[no-untyped-def]
*args,
include_in_anon_staging_db: bool = False,
exempt_from_anonymisation: bool = False,
identifies_patient: bool = False,
is_blob_id_field: bool = False,
blob_relationship_attr_name: str = "",
permitted_value_checker: PermittedValueChecker = None,
**kwargs,
) -> MappedColumn[Any]:
"""
As :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` but
returns a python typing-compatible MappedColumn.
Args:
*args:
Arguments to the :class:`Column` constructor.
include_in_anon_staging_db:
Ensure this is marked for inclusion in data dictionaries for an
anonymisation staging database.
exempt_from_anonymisation:
If true: though this field might be text, it is guaranteed not
to contain identifiers (e.g. it might contain only predefined
disease severity descriptions) and does not require
anonymisation.
identifies_patient:
If true: contains a patient identifier (e.g. name).
is_blob_id_field:
If true: this field contains a reference (client FK) to the
BLOB table.
blob_relationship_attr_name:
For BLOB ID fields: the name of the associated relationship
attribute (which, when accessed, yields the BLOB itself) in
the owning class/object.
permitted_value_checker:
If specified, a :class:`PermittedValueChecker` that allows
soft constraints to be specified on the field's contents. (That
is, no constraints are specified at the database level, but we
can moan if incorrect data are present.)
**kwargs:
Arguments to the :class:`Column` constructor.
"""
if is_blob_id_field:
assert blob_relationship_attr_name, (
"If specifying a BLOB ID field, must give the attribute name "
"of the relationship too"
)
info = {
COLATTR_IS_CAMCOPS_COLUMN: True,
COLATTR_INCLUDE_IN_ANON_STAGING_DB: include_in_anon_staging_db,
COLATTR_EXEMPT_FROM_ANONYMISATION: exempt_from_anonymisation,
COLATTR_IDENTIFIES_PATIENT: identifies_patient,
COLATTR_IS_BLOB_ID_FIELD: is_blob_id_field,
COLATTR_BLOB_RELATIONSHIP_ATTR_NAME: blob_relationship_attr_name,
COLATTR_PERMITTED_VALUE_CHECKER: permitted_value_checker,
}
return mapped_column(*args, info=info, **kwargs)
# =============================================================================
# Operate on Column/MappedColumn properties
# =============================================================================
[docs]def gen_columns_matching_attrnames( # type: ignore[no-untyped-def]
obj, attrnames: List[str]
) -> Generator[Tuple[str, Column], None, None]:
"""
Find columns of an SQLAlchemy ORM object whose attribute names match a
list.
Args:
obj: SQLAlchemy ORM object to inspect
attrnames: attribute names
Yields:
``attrname, column`` tuples
"""
for attrname, column in gen_columns(obj):
if attrname in attrnames:
yield attrname, column
[docs]def gen_camcops_columns( # type: ignore[no-untyped-def]
obj,
) -> Generator[Tuple[str, Column], None, None]:
"""
Finds all columns of an object that are
:func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` columns.
Args:
obj: SQLAlchemy ORM object to inspect
Yields:
``attrname, column`` tuples
"""
for attrname, column in gen_columns(obj):
if column.info.get(COLATTR_IS_CAMCOPS_COLUMN, False):
yield attrname, column
[docs]def gen_camcops_blob_columns( # type: ignore[no-untyped-def]
obj,
) -> Generator[Tuple[str, Column], None, None]:
"""
Finds all columns of an object that are
:func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` columns
referencing the BLOB table.
Args:
obj: SQLAlchemy ORM object to inspect
Yields:
``attrname, column`` tuples
"""
for attrname, column in gen_camcops_columns(obj):
if column.info.get(COLATTR_IS_BLOB_ID_FIELD, False):
if attrname != column.name:
log.warning(
"BLOB field where attribute name {!r} != SQL "
"column name {!r}",
attrname,
column.name,
)
yield attrname, column
[docs]def get_column_attr_names(obj) -> List[str]: # type: ignore[no-untyped-def]
"""
Get a list of column attribute names from an SQLAlchemy ORM object.
"""
return [attrname for attrname, _ in gen_columns(obj)]
[docs]def get_camcops_column_attr_names(obj) -> List[str]: # type: ignore[no-untyped-def] # noqa: E501
"""
Get a list of
:func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` column
attribute names from an SQLAlchemy ORM object.
"""
return [attrname for attrname, _ in gen_camcops_columns(obj)]
[docs]def get_camcops_blob_column_attr_names(obj) -> List[str]: # type: ignore[no-untyped-def] # noqa: E501
"""
Get a list of
:func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` BLOB
column attribute names from an SQLAlchemy ORM object.
"""
return [attrname for attrname, _ in gen_camcops_blob_columns(obj)]
[docs]def permitted_value_failure_msgs(obj) -> List[str]: # type: ignore[no-untyped-def] # noqa: E501
"""
Checks a SQLAlchemy ORM object instance against its permitted value checks
(via its :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`
columns), if it has any.
Returns a list of failure messages (empty list means all OK).
If you just want to know whether it passes, a quicker way is via
:func:`permitted_values_ok`.
"""
failure_msgs = []
for attrname, camcops_column_ in gen_camcops_columns(obj):
pv_checker = camcops_column_.info.get(
COLATTR_PERMITTED_VALUE_CHECKER
) # type: Optional[PermittedValueChecker]
if pv_checker is None:
continue
value = getattr(obj, attrname)
failure_msg = pv_checker.failure_msg(value)
if failure_msg:
failure_msgs.append(f"Invalid value for {attrname}: {failure_msg}")
return failure_msgs
[docs]def permitted_values_ok(obj) -> bool: # type: ignore[no-untyped-def]
"""
Checks whether an instance passes its permitted value checks, if it has
any.
If you want to know why it failed, see
:func:`permitted_value_failure_msgs`.
"""
for attrname, camcops_column_ in gen_camcops_columns(obj):
pv_checker = camcops_column_.info.get(
COLATTR_PERMITTED_VALUE_CHECKER
) # type: Optional[PermittedValueChecker]
if pv_checker is None:
continue
value = getattr(obj, attrname)
if not pv_checker.is_ok(value):
return False
return True
[docs]def gen_ancillary_relationships( # type: ignore[no-untyped-def]
obj,
) -> Generator[
Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
None,
None,
]:
"""
For an SQLAlchemy ORM object, yields tuples of ``attrname,
relationship_property, related_class`` for all relationships that are
marked as a CamCOPS ancillary relationship.
"""
for attrname, rel_prop, related_class in gen_relationships(obj):
if rel_prop.info.get(RelationshipInfo.IS_ANCILLARY, None) is True:
yield attrname, rel_prop, related_class
[docs]def gen_blob_relationships( # type: ignore[no-untyped-def]
obj,
) -> Generator[
Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
None,
None,
]:
"""
For an SQLAlchemy ORM object, yields tuples of ``attrname,
relationship_property, related_class`` for all relationships that are
marked as a CamCOPS BLOB relationship.
"""
for attrname, rel_prop, related_class in gen_relationships(obj):
if rel_prop.info.get(RelationshipInfo.IS_BLOB, None) is True:
yield attrname, rel_prop, related_class
# =============================================================================
# Specializations of camcops_column to save typing
# =============================================================================
def bool_column(name: str, *args: Any, **kwargs: Any) -> Column[bool]:
type_arg = _get_bool_column_args(name, kwargs)
return camcops_column(name, type_arg, *args, **kwargs)
def mapped_bool_column(
name: str, *args: Any, **kwargs: Any
) -> MappedColumn[bool]:
type_arg = _get_bool_column_args(name, kwargs)
return mapped_camcops_column(name, type_arg, *args, **kwargs)
def _get_bool_column_args(name: str, kwargs: dict[str, Any]) -> Boolean:
constraint_name = kwargs.pop(
"constraint_name", None
) # type: Optional[str]
if constraint_name:
constraint_name_conv = conv(constraint_name)
# ... see help for ``conv``
else:
constraint_name_conv = None
type_arg = Boolean(name=constraint_name_conv)
# The "name" parameter to Boolean() specifies the name of the
# (0, 1) constraint.
kwargs[COLATTR_PERMITTED_VALUE_CHECKER] = BIT_CHECKER
if not constraint_name and len(name) >= LONG_COLUMN_NAME_WARNING_LIMIT:
log.warning(
"bool_column with long column name and no constraint name: {!r}",
name,
)
return type_arg