# Source code for camcops_server.cc_modules.cc_sqla_coltypes

"""
camcops_server/cc_modules/cc_sqla_coltypes.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**SQLAlchemy column types used by CamCOPS.**

Note these built-in SQLAlchemy types
(https://docs.sqlalchemy.org/en/latest/core/type_basics.html#generic-types):

    =============== ===========================================================
    SQLAlchemy type Comment
    =============== ===========================================================
    BigInteger      MySQL: -9,223,372,036,854,775,808 to
                    9,223,372,036,854,775,807 (64-bit)
                    (compare NHS number: up to 9,999,999,999)
    Boolean
    Date
    DateTime
    Enum
    Float
    Integer         MySQL: -2,147,483,648 to 2,147,483,647 (32-bit)
    Interval        For ``datetime.timedelta``
    LargeBinary     Under MySQL, maps to ``BLOB``
    MatchType       For the return type of the ``MATCH`` operator
    Numeric         For fixed-precision numbers like ``NUMERIC`` or ``DECIMAL``
    PickleType
    SchemaType
    SmallInteger
    String          ``VARCHAR``
    Text            Variably sized string type.
                    (Under MySQL, renders as ``TEXT``.)
    Time
    Unicode         Implies that the underlying column explicitly supports
                    Unicode
    UnicodeText     Variably sized version of Unicode
                    (Under MySQL, renders as ``TEXT`` too.)
    =============== ===========================================================

Not supported across all platforms:

    =============== ===========================================================
    SQL type        Comment
    =============== ===========================================================
    BIGINT UNSIGNED MySQL: 0 to 18,446,744,073,709,551,615 (64-bit).
                    Use ``sqlalchemy.dialects.mysql.BIGINT(unsigned=True)``.
    INT UNSIGNED    MySQL: 0 to 4,294,967,295 (32-bit).
                    Use ``sqlalchemy.dialects.mysql.INTEGER(unsigned=True)``.
    =============== ===========================================================

Other MySQL sizes:

    =============== ===========================================================
    MySQL type      Comment
    =============== ===========================================================
    TINYBLOB        2^8 bytes = 256 bytes
    BLOB            2^16 bytes = 64 KiB
    MEDIUMBLOB      2^24 bytes = 16 MiB
    LONGBLOB        2^32 bytes = 4 GiB
    TINYTEXT        255 (2^8 - 1) bytes
    TEXT            65,535 bytes (2^16 - 1) = 64 KiB
    MEDIUMTEXT      16,777,215 (2^24 - 1) bytes = 16 MiB
    LONGTEXT        4,294,967,295 (2^32 - 1) bytes = 4 GiB
    =============== ===========================================================

See https://stackoverflow.com/questions/13932750/tinytext-text-mediumtext-and-longtext-maximum-storage-sizes.

Also notes:

- Columns may need their character set specified explicitly under MySQL:
  https://stackoverflow.com/questions/2108824/mysql-incorrect-string-value-error-when-save-unicode-string-in-django

"""  # noqa

# =============================================================================
# Imports
# =============================================================================

import json
import logging
from typing import (
    Any,
    Generator,
    List,
    NoReturn,
    Optional,
    Sequence,
    Tuple,
    Type,
    TYPE_CHECKING,
    Union,
)
import uuid

from cardinal_pythonlib.datetimefunc import (
    coerce_to_pendulum,
    convert_datetime_to_utc,
    duration_from_iso,
    duration_to_iso,
    PotentialDatetimeType,
)
from cardinal_pythonlib.lists import chunks
from cardinal_pythonlib.logs import BraceStyleAdapter
from cardinal_pythonlib.reprfunc import auto_repr
from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
from cardinal_pythonlib.sqlalchemy.orm_inspect import (
    gen_columns,
    gen_relationships,
)
from cardinal_pythonlib.sqlalchemy.sqlfunc import (
    fail_unknown_dialect,
    fetch_processed_single_clause,
)
from isodate.isoerror import ISO8601Error
from pendulum import DateTime as Pendulum, Duration
from pendulum.parsing.exceptions import ParserError
import phonenumbers
from semantic_version import Version
from sqlalchemy.dialects import mysql
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import mapped_column, MappedColumn
from sqlalchemy.orm.relationships import RelationshipProperty
from sqlalchemy.sql.elements import conv
from sqlalchemy.sql.expression import text
from sqlalchemy.sql.functions import FunctionElement
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.sqltypes import (
    Boolean,
    CHAR,
    DateTime,
    LargeBinary,
    String,
    Text,
    Unicode,
    UnicodeText,
)
from sqlalchemy.sql.type_api import TypeDecorator

from camcops_server.cc_modules.cc_constants import PV, StringLengths
from camcops_server.cc_modules.cc_simpleobjects import IdNumReference
from camcops_server.cc_modules.cc_sqlalchemy import (
    LONG_COLUMN_NAME_WARNING_LIMIT,
)
from camcops_server.cc_modules.cc_version import make_version

if TYPE_CHECKING:
    from sqlalchemy.sql.elements import ClauseElement
    from sqlalchemy.sql.compiler import SQLCompiler
    from sqlalchemy.sql.type_api import _CT, ColumnElement, OperatorType
    from camcops_server.cc_modules.cc_db import (
        GenericTabletRecordMixin,
    )

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Debugging options
# =============================================================================

# Per-type verbose logging flags for the custom column types in this module.
# Each flag makes the corresponding type log its bind/result conversions (see
# e.g. the "if DEBUG_..." guards in the TypeDecorator subclasses below).
# All should be False in production.
DEBUG_DATETIME_AS_ISO_TEXT = False
DEBUG_DURATION_AS_ISO_TEXT = False
DEBUG_IDNUMDEF_LIST = False
DEBUG_INT_LIST_COLTYPE = False
DEBUG_SEMANTIC_VERSION = False
DEBUG_STRING_LIST_COLTYPE = False

# Warn loudly at import time if any debugging flag has been left switched on.
if any(
    [
        DEBUG_DATETIME_AS_ISO_TEXT,
        DEBUG_DURATION_AS_ISO_TEXT,
        DEBUG_SEMANTIC_VERSION,
        DEBUG_IDNUMDEF_LIST,
        DEBUG_INT_LIST_COLTYPE,
        DEBUG_STRING_LIST_COLTYPE,
    ]
):
    log.warning("Debugging options enabled!")


# =============================================================================
# Constants
# =============================================================================


class RelationshipInfo(object):
    """
    Keys for the ``info`` (user-defined) dictionary parameter passed to
    SQLAlchemy ``relationship`` calls; see
    https://docs.sqlalchemy.org/en/latest/orm/relationship_api.html#sqlalchemy.orm.relationship.
    """

    IS_ANCILLARY = "is_ancillary"
    IS_BLOB = "is_blob"
# =============================================================================
# Simple derivative column types
# =============================================================================
# If you insert something too long into a VARCHAR, it just gets truncated.

AuditSourceColType = String(length=StringLengths.AUDIT_SOURCE_MAX_LEN)

# BigIntUnsigned = Integer().with_variant(mysql.BIGINT(unsigned=True), 'mysql')
# ... partly because Alembic breaks on variants (Aug 2017), and partly because
# it's nonstandard and unnecessary, changed all BigIntUnsigned to
# BigInteger (2017-08-25).

Base32ColType = String(length=StringLengths.BASE32_MAX_LEN)
CharColType = String(length=1)
CharsetColType = String(length=StringLengths.CHARSET_MAX_LEN)
CurrencyColType = Unicode(length=StringLengths.CURRENCY_MAX_LEN)

DatabaseTitleColType = Unicode(length=StringLengths.DATABASE_TITLE_MAX_LEN)
DeviceNameColType = String(length=StringLengths.DEVICE_NAME_MAX_LEN)
DiagnosticCodeColType = String(length=StringLengths.DIAGNOSTIC_CODE_MAX_LEN)

EmailAddressColType = Unicode(length=StringLengths.EMAIL_ADDRESS_MAX_LEN)
EraColType = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
ExportRecipientNameColType = String(
    length=StringLengths.EXPORT_RECIPIENT_NAME_MAX_LEN
)
ExportTransmissionMethodColType = String(
    length=StringLengths.SENDING_FORMAT_MAX_LEN
)

FilterTextColType = Unicode(length=StringLengths.FILTER_TEXT_MAX_LEN)
FileSpecColType = Unicode(length=StringLengths.FILESPEC_MAX_LEN)
FullNameColType = Unicode(length=StringLengths.FULLNAME_MAX_LEN)

GroupDescriptionColType = Unicode(
    length=StringLengths.GROUP_DESCRIPTION_MAX_LEN
)
GroupNameColType = Unicode(length=StringLengths.GROUP_NAME_MAX_LEN)

HashedPasswordColType = String(length=StringLengths.HASHED_PW_MAX_LEN)
# ... You might think that we must ensure case-SENSITIVE comparison on this
# field. That would require the option collation='utf8mb4_bin' to String(),
# for MySQL. However, that is MySQL-specific, and SQLAlchemy currently (Oct
# 2017) doesn't support database-specific *per-column* collations. SQLite
# accepts COLLATE commands but chokes on 'utf8mb4_bin'. Now, the hashed
# password from bcrypt() is case-sensitive. HOWEVER, the important thing is
# that we always retrieve the string from the database and do a case-sensitive
# comparison in Python (see calls to is_password_valid()). So the database
# collation doesn't matter. So we don't set it.
# See further notes in cc_sqlalchemy.py
HL7AssigningAuthorityType = String(length=StringLengths.HL7_AA_MAX_LEN)
HL7IdTypeType = String(length=StringLengths.HL7_ID_TYPE_MAX_LEN)
HostnameColType = String(length=StringLengths.HOSTNAME_MAX_LEN)

IdDescriptorColType = Unicode(length=StringLengths.ID_DESCRIPTOR_MAX_LEN)
IdPolicyColType = String(length=StringLengths.ID_POLICY_MAX_LEN)
# IntUnsigned = Integer().with_variant(mysql.INTEGER(unsigned=True), 'mysql')
IPAddressColType = String(length=StringLengths.IP_ADDRESS_MAX_LEN)
# This is a plain string.
# See also e.g. http://sqlalchemy-utils.readthedocs.io/en/latest/_modules/sqlalchemy_utils/types/ip_address.html  # noqa

LanguageCodeColType = String(length=StringLengths.LANGUAGE_CODE_MAX_LEN)

# Large BLOB:
# https://stackoverflow.com/questions/43791725/sqlalchemy-how-to-make-a-longblob-column-in-mysql  # noqa
# One of these:
# noinspection PyTypeChecker
LongBlob = LargeBinary().with_variant(mysql.LONGBLOB, "mysql")
# LongBlob = LargeBinary(length=LONGBLOB_LONGTEXT_MAX_LEN)  # doesn't translate to SQL Server  # noqa

# noinspection PyTypeChecker
LongText = UnicodeText().with_variant(mysql.LONGTEXT, "mysql")
# LongText = UnicodeText(length=LONGBLOB_LONGTEXT_MAX_LEN)  # doesn't translate to SQL Server  # noqa

MfaMethodColType = String(length=StringLengths.MFA_METHOD_MAX_LEN)
MimeTypeColType = String(length=StringLengths.MIMETYPE_MAX_LEN)

PatientNameColType = Unicode(length=StringLengths.PATIENT_NAME_MAX_LEN)

Rfc2822DateColType = String(length=StringLengths.RFC_2822_DATE_MAX_LEN)

SessionTokenColType = String(length=StringLengths.SESSION_TOKEN_MAX_LEN)
SexColType = String(length=1)
SummaryCategoryColType = String(
    length=StringLengths.TASK_SUMMARY_TEXT_FIELD_DEFAULT_MAX_LEN
)  # ... pretty generic

TableNameColType = String(length=StringLengths.TABLENAME_MAX_LEN)

UrlColType = String(length=StringLengths.URL_MAX_LEN)
UserNameCamcopsColType = String(length=StringLengths.USERNAME_CAMCOPS_MAX_LEN)
UserNameExternalColType = String(
    length=StringLengths.USERNAME_EXTERNAL_MAX_LEN
)


# =============================================================================
# Helper operations for PendulumDateTimeAsIsoTextColType
# =============================================================================
# Database string format is e.g.
#   2013-07-24T20:04:07.123456+01:00
#   2013-07-24T20:04:07.123+01:00
#   0        1         2         3    } position in string; 1-based
#   12345678901234567890123456789012  }
#
# So: rightmost 6 characters are time zone; rest is date/time.
#     leftmost 23 characters are time up to millisecond precision.
#     overall length is typically 29 (milliseconds) or 32 (microseconds)

_TZ_LEN = 6  # length of the timezone part of the ISO8601 string
_UTC_TZ_LITERAL = "'+00:00'"
_SQLITE_DATETIME_FMT_FOR_PYTHON = "'%Y-%m-%d %H:%M:%f'"

_MYSQL_DATETIME_LEN = 19
_SQLSERVER_DATETIME_LEN = 19
_SQLSERVER_DATETIME2_LEN = 27


# -----------------------------------------------------------------------------
# isotzdatetime_to_utcdatetime
# -----------------------------------------------------------------------------

# noinspection PyPep8Naming
class isotzdatetime_to_utcdatetime(FunctionElement):
    """
    Used as an SQL operation by :class:`PendulumDateTimeAsIsoTextColType`.

    Creates an SQL expression wrapping a field containing our ISO-8601 text,
    making a ``DATETIME`` out of it, in the UTC timezone.

    Implemented for different SQL dialects.
    """

    type = DateTime()
    name = "isotzdatetime_to_utcdatetime"
    inherit_cache = False
# noinspection PyUnusedLocal
@compiles(isotzdatetime_to_utcdatetime)
def isotzdatetime_to_utcdatetime_default(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> None:
    """
    Default implementation for :class:`isotzdatetime_to_utcdatetime`: fail.
    """
    fail_unknown_dialect(compiler, "perform isotzdatetime_to_utcdatetime")
# noinspection PyUnusedLocal
@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.MYSQL)
def isotzdatetime_to_utcdatetime_mysql(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    Implementation of :class:`isotzdatetime_to_utcdatetime` for MySQL.

    For format, see
    https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_date-format

    Note the use of "%i" for minutes.

    Things after ``func.`` get passed to the database engine as literal SQL
    functions; https://docs.sqlalchemy.org/en/latest/core/tutorial.html
    """  # noqa
    x = fetch_processed_single_clause(element, compiler)

    # Let's do this in a clear way:
    date_time_part = f"LEFT({x}, LENGTH({x}) - {_TZ_LEN})"
    # ... drop the rightmost 6 chars (the timezone component)
    fmt = compiler.process(text("'%Y-%m-%dT%H:%i:%S.%f'"))
    # ... the text() part deals with the necessary escaping of % for the DBAPI
    the_date_time = f"STR_TO_DATE({date_time_part}, {fmt})"
    # ... STR_TO_DATE() returns a DATETIME if the string contains both date and
    #     time components.
    old_timezone = f"RIGHT({x}, {_TZ_LEN})"
    result_utc = (
        f"CONVERT_TZ({the_date_time}, {old_timezone}, {_UTC_TZ_LITERAL})"
    )
    # log.debug(result_utc)
    return result_utc
# noinspection PyUnusedLocal
@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLITE)
def isotzdatetime_to_utcdatetime_sqlite(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    Implementation of :class:`isotzdatetime_to_utcdatetime` for SQLite.

    - https://sqlite.org/lang_corefunc.html#substr
    - https://sqlite.org/lang_datefunc.html
    - https://www.sqlite.org/lang_expr.html

    Get an SQL expression for the timezone adjustment in hours.
    Note that if a time is 12:00+01:00, that means e.g. midday BST, which
    is 11:00+00:00 or 11:00 UTC. So you SUBTRACT the displayed timezone from
    the time, which I've always thought is a bit odd.

    Ha! Was busy implementing this, but SQLite is magic; if there's a
    timezone at the end, ``STRFTIME()`` will convert it to UTC automatically!

    Moreover, the format is the OUTPUT format that a Python datetime will
    recognize, so no 'T'.

    The output format is like this: ``2018-06-01 00:00:00.000``. Note that
    SQLite provides millisecond precision only (in general and via the ``%f``
    argument to ``STRFTIME``).

    See also SQLAlchemy's DATETIME support for SQLite:

    - https://docs.sqlalchemy.org/en/13/dialects/sqlite.html?highlight=sqlite#sqlalchemy.dialects.sqlite.DATETIME

    ... but that doesn't support timezones, so that doesn't help us.

    One further problem -- see
    :class:`camcops_server.tasks.core10.Core10ReportDateRangeTests` -- is that
    comparisons are done by SQLite as text, so e.g.

    .. code-block:: sql

        SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000000';
        -- 0, false
        SELECT '2018-06-01 00:00:00.000' >= '2018-06-01 00:00:00.000';
        -- 1, true

    and therefore we need to ensure either that the SQLite side gets
    translated to 6dp, or the bind param gets translated to 3dp. I don't think
    we can always have control over the bind parameter. So we append '000' to
    the SQLite side.
    """  # noqa
    x = fetch_processed_single_clause(element, compiler)
    fmt = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
    result = f"(STRFTIME({fmt}, {x}) || '000')"
    # log.debug(result)
    return result
# noinspection PyUnusedLocal
@compiles(isotzdatetime_to_utcdatetime, SqlaDialectName.SQLSERVER)
def isotzdatetime_to_utcdatetime_sqlserver(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    Implementation of :class:`isotzdatetime_to_utcdatetime` for SQL Server.

    **Converting strings to DATETIME values**

    - ``CAST()``: Part of ANSI SQL.
    - ``CONVERT()``: Not part of ANSI SQL; has some extra formatting options.

    Both methods work:

    .. code-block:: sql

        SELECT CAST('2001-01-31T21:30:49.123' AS DATETIME) AS via_cast,
               CONVERT(DATETIME, '2001-01-31T21:30:49.123') AS via_convert;

    ... fine on SQL Server 2005, with milliseconds in both cases.
    However, going beyond milliseconds doesn't fail gracefully, it causes an
    error (e.g. "...21:30.49.123456") both for CAST and CONVERT.

    The ``DATETIME2`` format accepts greater precision, but requires SQL
    Server 2008 or higher. Then this works:

    .. code-block:: sql

        SELECT CAST('2001-01-31T21:30:49.123456' AS DATETIME2) AS via_cast,
               CONVERT(DATETIME2, '2001-01-31T21:30:49.123456') AS via_convert;

    So as not to be too optimistic: ``CAST(x AS DATETIME2)`` ignores
    (silently) any timezone information in the string. So does
    ``CONVERT(DATETIME2, x, {0 or 1})``.

    **Converting between time zones**

    NO TIME ZONE SUPPORT in SQL Server 2005. e.g.
    https://stackoverflow.com/questions/3200827/how-to-convert-timezones-in-sql-server-2005.

    .. code-block:: none

        TODATETIMEOFFSET(expression, time_zone):
            expression: something that evaluates to a DATETIME2 value
            time_zone: integer minutes, or string hours/minutes e.g. "+13:00"
            -> produces a DATETIMEOFFSET value

    Available from SQL Server 2008
    (https://docs.microsoft.com/en-us/sql/t-sql/functions/todatetimeoffset-transact-sql).

    .. code-block:: none

        SWITCHOFFSET
            -> converts one DATETIMEOFFSET value to another, preserving its
               UTC time, but changing the displayed (local) time zone.

    ... however, is that unnecessary? We want a plain ``DATETIME2`` in UTC,
    and conversion to UTC is automatically achieved by
    ``CONVERT(DATETIME2, some_datetimeoffset, 1)``

    ... https://stackoverflow.com/questions/4953903/how-can-i-convert-a-sql-server-2008-datetimeoffset-to-a-datetime

    ... but not by ``CAST(some_datetimeoffset AS DATETIME2)``, and not by
    ``CONVERT(DATETIME2, some_datetimeoffset, 0)``

    ... and styles 0 and 1 are the only ones permissible from SQL Server 2012
    and up (empirically, and documented for the reverse direction at
    https://docs.microsoft.com/en-us/sql/t-sql/functions/cast-and-convert-transact-sql?view=sql-server-2017)

    ... this is not properly documented re UTC conversion, as far as I can
    see. Let's use ``SWITCHOFFSET -> CAST`` to be explicit and clear.

    ``AT TIME ZONE``: From SQL Server 2016 only.
    https://docs.microsoft.com/en-us/sql/t-sql/queries/at-time-zone-transact-sql?view=sql-server-2017

    **Therefore**

    - We need to require SQL Server 2008 or higher.
    - Therefore we can use the ``DATETIME2`` type.
    - Note that ``LEN()``, not ``LENGTH()``, is ANSI SQL; SQL Server only
      supports ``LEN``.

    **Example (tested on SQL Server 2014)**

    .. code-block:: sql

        DECLARE @source AS VARCHAR(100) = '2001-01-31T21:30:49.123456+07:00';
        SELECT CAST(
            SWITCHOFFSET(
                TODATETIMEOFFSET(
                    CAST(LEFT(@source, LEN(@source) - 6) AS DATETIME2),
                    RIGHT(@source, 6)
                ),
                '+00:00'
            )
            AS DATETIME2
        )  -- 2001-01-31 14:30:49.1234560
    """  # noqa
    x = fetch_processed_single_clause(element, compiler)
    date_time_part = f"LEFT({x}, LEN({x}) - {_TZ_LEN})"  # a VARCHAR
    old_timezone = f"RIGHT({x}, {_TZ_LEN})"  # a VARCHAR
    date_time_no_tz = f"CAST({date_time_part} AS DATETIME2)"  # a DATETIME2
    date_time_offset_with_old_tz = (
        f"TODATETIMEOFFSET({date_time_no_tz}, {old_timezone})"
        # ... a DATETIMEOFFSET
    )
    date_time_offset_with_utc_tz = (
        f"SWITCHOFFSET({date_time_offset_with_old_tz}, {_UTC_TZ_LITERAL})"
        # ... a DATETIMEOFFSET in UTC
    )
    result_utc = f"CAST({date_time_offset_with_utc_tz} AS DATETIME2)"
    # log.debug(result_utc)
    return result_utc
# -----------------------------------------------------------------------------
# unknown_field_to_utcdatetime
# -----------------------------------------------------------------------------

# noinspection PyPep8Naming
class unknown_field_to_utcdatetime(FunctionElement):
    """
    Used as an SQL operation by :class:`PendulumDateTimeAsIsoTextColType`.

    Creates an SQL expression wrapping a field containing something unknown,
    which might be a ``DATETIME`` or an ISO-formatted field, and making a
    ``DATETIME`` out of it, in the UTC timezone.

    Implemented for different SQL dialects.
    """

    type = DateTime()
    name = "unknown_field_to_utcdatetime"
    inherit_cache = False
# noinspection PyUnusedLocal
@compiles(unknown_field_to_utcdatetime)
def unknown_field_to_utcdatetime_default(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> None:
    """
    Default implementation for :class:`unknown_field_to_utcdatetime`: fail.
    """
    fail_unknown_dialect(compiler, "perform unknown_field_to_utcdatetime")
# noinspection PyUnusedLocal
@compiles(unknown_field_to_utcdatetime, SqlaDialectName.MYSQL)
def unknown_field_to_utcdatetime_mysql(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    Implementation of :class:`unknown_field_to_utcdatetime` for MySQL.

    If it's the length of a plain ``DATETIME`` e.g. ``2013-05-30 00:00:00``
    (19), leave it as a ``DATETIME``; otherwise convert ISO -> ``DATETIME``.
    """
    x = fetch_processed_single_clause(element, compiler)
    converted = isotzdatetime_to_utcdatetime_mysql(element, compiler, **kw)
    result = f"IF(LENGTH({x}) = {_MYSQL_DATETIME_LEN}, {x}, {converted})"
    # log.debug(result)
    return result
# noinspection PyUnusedLocal
@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLITE)
def unknown_field_to_utcdatetime_sqlite(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    Implementation of :class:`unknown_field_to_utcdatetime` for SQLite.
    """
    x = fetch_processed_single_clause(element, compiler)
    fmt = compiler.process(text(_SQLITE_DATETIME_FMT_FOR_PYTHON))
    result = f"STRFTIME({fmt}, {x})"
    # log.debug(result)
    return result
# noinspection PyUnusedLocal
@compiles(unknown_field_to_utcdatetime, SqlaDialectName.SQLSERVER)
def unknown_field_to_utcdatetime_sqlserver(
    element: "ClauseElement", compiler: "SQLCompiler", **kw: Any
) -> str:
    """
    Implementation of :class:`unknown_field_to_utcdatetime` for SQL Server.

    We should cope also with the possibility of a ``DATETIME2`` field, not
    just ``DATETIME``. It seems consistent that ``LEN(DATETIME2) = 27``, with
    precision tenth of a microsecond, e.g. ``2001-01-31 21:30:49.1234567``
    (27).

    So, if it looks like a ``DATETIME`` or a ``DATETIME2``, then we leave it
    alone; otherwise we put it through our ISO-to-datetime function.

    Importantly, note that neither ``_SQLSERVER_DATETIME_LEN`` nor
    ``_SQLSERVER_DATETIME2_LEN`` are the length of any of our ISO strings.
    """
    x = fetch_processed_single_clause(element, compiler)
    # https://stackoverflow.com/questions/5487892/sql-server-case-when-or-then-else-end-the-or-is-not-supported  # noqa
    converted = isotzdatetime_to_utcdatetime_sqlserver(element, compiler, **kw)
    result = (
        f"CASE WHEN LEN({x}) IN "
        f"({_SQLSERVER_DATETIME_LEN}, {_SQLSERVER_DATETIME2_LEN}) THEN {x} "
        f"ELSE {converted} "
        f"END"
    )
    # log.debug(result)
    return result
# =============================================================================
# Custom date/time field as ISO-8601 text including timezone, using
# pendulum.DateTime on the Python side.
# =============================================================================


class PendulumDateTimeAsIsoTextColType(TypeDecorator):
    """
    Stores date/time values as ISO-8601, in a specific format.
    Uses Pendulum on the Python side.
    """

    impl = String(length=StringLengths.ISO8601_DATETIME_STRING_MAX_LEN)
    # ... underlying SQL type

    cache_ok = False

    _coltype_name = "PendulumDateTimeAsIsoTextColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return Pendulum

    @staticmethod
    def pendulum_to_isostring(x: PotentialDatetimeType) -> Optional[str]:
        """
        From a Python datetime to an ISO-formatted string in our particular
        format.
        """
        # https://docs.python.org/3.4/library/datetime.html#strftime-strptime-behavior  # noqa
        x = coerce_to_pendulum(x)
        try:
            mainpart = x.strftime(
                "%Y-%m-%dT%H:%M:%S.%f"
            )  # microsecond accuracy
            timezone = x.strftime("%z")  # won't have the colon in
            return mainpart + timezone[:-2] + ":" + timezone[-2:]
        except AttributeError:
            # e.g. x was None, so coerce_to_pendulum() gave us None back
            return None

    @staticmethod
    def isostring_to_pendulum(x: Optional[str]) -> Optional[Pendulum]:
        """
        From an ISO-formatted string to a Python Pendulum, with timezone.
        """
        try:
            return coerce_to_pendulum(x)
        except (ParserError, ValueError):
            log.warning("Bad ISO date/time string: {!r}", x)
            return None

    def process_bind_param(
        self, value: Optional[Pendulum], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.
        """
        retval = self.pendulum_to_isostring(value)
        if DEBUG_DATETIME_AS_ISO_TEXT:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[Pendulum], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert literals on the way from Python to the database.
        """
        retval = self.pendulum_to_isostring(value)
        if DEBUG_DATETIME_AS_ISO_TEXT:
            log.debug(
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[Pendulum]:
        """
        Convert things on the way from the database to Python.
        """
        retval = self.isostring_to_pendulum(value)
        if DEBUG_DATETIME_AS_ISO_TEXT:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    # noinspection PyPep8Naming
    class comparator_factory(TypeDecorator.Comparator):
        """
        Process SQL for when we are comparing our column, in the database,
        to something else.

        We make this dialect-independent by calling functions like

        .. code-block:: none

            unknown_field_to_utcdatetime
            isotzdatetime_to_utcdatetime

        ... which we then specialize for specific dialects.

        This function itself does not appear to be able to access any
        information about the dialect.
        """

        def operate(
            self, op: "OperatorType", *other: Any, **kwargs: Any
        ) -> "ColumnElement[_CT]":
            assert len(other) == 1
            assert not kwargs
            other = other[0]
            try:
                processed_other = convert_datetime_to_utc(
                    coerce_to_pendulum(other)
                )
                # - If you try to call a dialect-specialized FunctionElement,
                #   it processes the clause to "?" (meaning "attach bind
                #   parameter here"); it's not the value itself.
                # - For our SQLite "milliseconds only" comparator problem (see
                #   above), we can't do very much here without knowing the
                #   dialect. So we make the SQLite side look like it has
                #   microseconds by appending "000"...
            except (AttributeError, ParserError, TypeError, ValueError):
                # OK. At this point, "other" could be a plain DATETIME field,
                # or a PendulumDateTimeAsIsoTextColType field (or potentially
                # something else that we don't really care about). If it's a
                # DATETIME, then we assume it is already in UTC.
                processed_other = unknown_field_to_utcdatetime(other)  # type: ignore[assignment]  # noqa: E501
            if DEBUG_DATETIME_AS_ISO_TEXT:
                log.debug(
                    "operate(self={!r}, op={!r}, other={!r})", self, op, other
                )
                log.debug("self.expr = {!r}", self.expr)
                log.debug("processed_other = {!r}", processed_other)
                # traceback.print_stack()
            return op(isotzdatetime_to_utcdatetime(self.expr), processed_other)

        def reverse_operate(
            self, op: "OperatorType", *other: Any, **kwargs: Any
        ) -> NoReturn:
            assert False, "I don't think this is ever being called"
# =============================================================================
# Custom duration field as ISO-8601 text, using pendulum.Duration on the
# Python side.
# =============================================================================


class PendulumDurationAsIsoTextColType(TypeDecorator):
    """
    Stores time durations as ISO-8601, in a specific format.
    Uses :class:`pendulum.Duration` on the Python side.
    """

    impl = String(length=StringLengths.ISO8601_DURATION_STRING_MAX_LEN)
    # ... underlying SQL type

    cache_ok = False

    _coltype_name = "PendulumDurationAsIsoTextColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return Duration

    @staticmethod
    def pendulum_duration_to_isostring(x: Optional[Duration]) -> Optional[str]:
        """
        From a :class:`pendulum.Duration` (or ``None``) an ISO-formatted
        string in our particular format (or ``NULL``).
        """
        if x is None:
            return None
        return duration_to_iso(
            x, permit_years_months=True, minus_sign_at_front=True
        )

    @staticmethod
    def isostring_to_pendulum_duration(x: Optional[str]) -> Optional[Duration]:
        """
        From an ISO-formatted string to a Python Pendulum, with timezone.
        """
        if not x:  # None (NULL) or blank string
            return None
        try:
            return duration_from_iso(x)
        except (ISO8601Error, ValueError):
            log.warning("Bad ISO duration string: {!r}", x)
            return None

    def process_bind_param(
        self, value: Optional[Duration], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.
        """
        retval = self.pendulum_duration_to_isostring(value)
        if DEBUG_DURATION_AS_ISO_TEXT:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[Duration], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert literals on the way from Python to the database.
        """
        retval = self.pendulum_duration_to_isostring(value)
        if DEBUG_DURATION_AS_ISO_TEXT:
            log.debug(
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[Duration]:
        """
        Convert things on the way from the database to Python.
        """
        retval = self.isostring_to_pendulum_duration(value)
        if DEBUG_DURATION_AS_ISO_TEXT:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval
# No comparator_factory; we do not use SQL to compare ISO durations.


# =============================================================================
# Semantic version column type
# =============================================================================


class SemanticVersionColType(TypeDecorator):
    """
    Stores semantic versions in the database.
    Uses :class:`semantic_version.Version` on the Python side.
    """

    impl = String(length=147)  # https://github.com/mojombo/semver/issues/79
    # ... underlying SQL type

    cache_ok = False

    _coltype_name = "SemanticVersionColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return Version

    def process_bind_param(
        self, value: Optional[Version], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database.
        """
        retval = str(value) if value is not None else None
        if DEBUG_SEMANTIC_VERSION:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[Version], dialect: Dialect
    ) -> Optional[str]:
        """
        Convert literals on the way from Python to the database.
        """
        retval = str(value) if value is not None else None
        if DEBUG_SEMANTIC_VERSION:
            log.debug(
                # Bug fix: this format string previously ended "-> !r"
                # (missing braces), so retval was passed but never shown.
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[Version]:
        """
        Convert things on the way from the database to Python.
        """
        if value is None:
            retval = None
        else:
            # Here we do some slightly fancier conversion to deal with all
            # sorts of potential rubbish coming in, so we get a properly
            # ordered Version out:
            retval = make_version(value)
        if DEBUG_SEMANTIC_VERSION:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval
''' # noinspection PyPep8Naming class comparator_factory(TypeDecorator.Comparator): """ Process SQL for when we are comparing our column, in the database, to something else. See https://docs.sqlalchemy.org/en/13/core/type_api.html#sqlalchemy.types.TypeEngine.comparator_factory. .. warning:: I'm not sure this is either (a) correct or (b) used; it may produce a string comparison of e.g. ``14.0.0`` versus ``2.0.0``, which will be alphabetical and therefore wrong. Disabled on 2019-04-28. """ # noqa def operate(self, op, *other, **kwargs): assert len(other) == 1 assert not kwargs other = other[0] if isinstance(other, Version): processed_other = str(Version) else: processed_other = other return op(self.expr, processed_other) def reverse_operate(self, op, *other, **kwargs): assert False, "I don't think this is ever being called" '''
# ============================================================================= # IdNumReferenceListColType # =============================================================================
class IdNumReferenceListColType(TypeDecorator):
    """
    Stores a list of
    :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
    objects.

    On the database side, uses a comma-separated list of integers, pairing
    each ``which_idnum`` with its ``idnum_value``.
    """

    impl = Text()
    # Consistency fix: the other custom types in this module declare
    # cache_ok = False; without it, SQLAlchemy 1.4+ warns about caching.
    cache_ok = False

    _coltype_name = "IdNumReferenceListColType"

    @property
    def python_type(self) -> type:
        """
        The Python type of the object.
        """
        return list

    @staticmethod
    def _idnumdef_list_to_dbstr(
        idnumdef_list: Optional[List[IdNumReference]],
    ) -> str:
        """
        Converts an optional list of
        :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
        objects to a CSV string suitable for storing in the database.
        An empty/absent list becomes the empty string.
        """
        if not idnumdef_list:
            return ""
        elements = []  # type: List[int]
        for idnumdef in idnumdef_list:
            elements.append(idnumdef.which_idnum)
            elements.append(idnumdef.idnum_value)
        return ",".join(str(x) for x in elements)

    @staticmethod
    def _dbstr_to_idnumdef_list(dbstr: Optional[str]) -> List[IdNumReference]:
        """
        Converts a CSV string (from the database) to a list of
        :class:`camcops_server.cc_modules.cc_simpleobjects.IdNumReference`
        objects. Malformed input (None, non-numeric, odd-length, or
        negative values) yields an empty list rather than raising.
        """
        idnumdef_list = []  # type: List[IdNumReference]
        try:
            intlist = [int(numstr) for numstr in dbstr.split(",")]
        except (AttributeError, TypeError, ValueError):
            return []
        length = len(intlist)
        if length == 0 or length % 2 != 0:  # enforce pairs
            return []
        for which_idnum, idnum_value in chunks(intlist, n=2):
            if which_idnum < 0 or idnum_value < 0:
                # enforce non-negative integers
                return []
            idnumdef_list.append(
                IdNumReference(
                    which_idnum=which_idnum, idnum_value=idnum_value
                )
            )
        return idnumdef_list

    def process_bind_param(
        self, value: Optional[List[IdNumReference]], dialect: Dialect
    ) -> str:
        """
        Convert parameters on the way from Python to the database.
        """
        retval = self._idnumdef_list_to_dbstr(value)
        if DEBUG_IDNUMDEF_LIST:
            log.debug(
                "{}.process_bind_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_literal_param(
        self, value: Optional[List[IdNumReference]], dialect: Dialect
    ) -> str:
        """
        Convert literals on the way from Python to the database.
        """
        retval = self._idnumdef_list_to_dbstr(value)
        if DEBUG_IDNUMDEF_LIST:
            log.debug(
                "{}.process_literal_param("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                # ^ BUGFIX: the final placeholder previously read "!r"
                # (missing braces), so the converted value was never shown.
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> List[IdNumReference]:
        """
        Convert things on the way from the database to Python.
        """
        retval = self._dbstr_to_idnumdef_list(value)
        if DEBUG_IDNUMDEF_LIST:
            log.debug(
                "{}.process_result_value("
                "self={!r}, value={!r}, dialect={!r}) -> {!r}",
                self._coltype_name,
                self,
                value,
                dialect,
                retval,
            )
        return retval
# ============================================================================= # UUID column type # =============================================================================
class UuidColType(TypeDecorator):
    """
    Stores UUIDs as 32-character hex strings.
    """

    # Based on:
    # https://docs.sqlalchemy.org/en/13/core/custom_types.html#backend-agnostic-guid-type  # noqa: E501
    # which will use postgresql UUID if relevant; not doing that here.

    impl = CHAR(32)
    cache_ok = False

    @property
    def python_type(self) -> type:
        # NOTE(review): process_result_value returns uuid.UUID, so str here
        # looks inconsistent -- confirm str is intended.
        return str

    def process_bind_param(
        self, value: uuid.UUID, dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database:
        a UUID (or None) becomes a zero-padded 32-digit hex string.
        """
        if value is None:
            return None
        return f"{value.int:032x}"  # same as "%.32x" % value.int

    def process_result_value(
        self, value: Optional[str], dialect: Dialect
    ) -> Optional[uuid.UUID]:
        """
        Convert things on the way from the database to Python:
        a hex string (or None) becomes a uuid.UUID.
        """
        return None if value is None else uuid.UUID(value)
# ============================================================================= # JSON column type # =============================================================================
class JsonColType(TypeDecorator):
    """
    Stores arbitrary JSON-serializable values as JSON text.
    """

    # Unlike
    # https://docs.sqlalchemy.org/en/13/core/type_basics.html#sqlalchemy.types.JSON
    # this does not use a vendor-specific JSON type.

    impl = UnicodeText
    cache_ok = False

    @property
    def python_type(self) -> type:
        return str

    def process_bind_param(
        self, value: Any, dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database:
        serialize to a JSON string (None stays None).
        """
        return None if value is None else json.dumps(value)

    def process_result_value(self, value: str, dialect: Dialect) -> Any:
        """
        Convert things on the way from the database to Python:
        deserialize from a JSON string (None stays None).
        """
        return None if value is None else json.loads(value)
# ============================================================================= # Phone number column type # =============================================================================
class PhoneNumberColType(TypeDecorator):
    """
    Stores phone numbers as E164-formatted strings, using the
    ``phonenumbers`` library on the Python side.
    """

    impl = Unicode(length=StringLengths.PHONE_NUMBER_MAX_LEN)
    cache_ok = False

    @property
    def python_type(self) -> type:
        return str

    def process_bind_param(
        self, value: Any, dialect: Dialect
    ) -> Optional[str]:
        """
        Convert parameters on the way from Python to the database:
        format the number as E164 text (None stays None).
        """
        if value is None:
            return None
        return phonenumbers.format_number(
            value, phonenumbers.PhoneNumberFormat.E164
        )

    def process_result_value(self, value: str, dialect: Dialect) -> Any:
        """
        Convert things on the way from the database to Python.
        Stored as E164, so no region needs to be supplied when parsing.
        Empty/None values become None.
        """
        return phonenumbers.parse(value, None) if value else None
# ============================================================================= # PermittedValueChecker: used by camcops_column # =============================================================================
class PermittedValueChecker(object):
    """
    Represents permitted values (in columns belonging to CamCOPS tasks), and
    checks a value against them.
    """

    def __init__(
        self,
        not_null: bool = False,
        minimum: Optional[Union[int, float]] = None,
        maximum: Optional[Union[int, float]] = None,
        permitted_values: Optional[Sequence[Any]] = None,
    ) -> None:
        """
        Args:
            not_null: must the value not be NULL?
            minimum: if specified, a numeric minimum value (inclusive)
            maximum: if specified, a numeric maximum value (inclusive)
            permitted_values: if specified, a list of permitted values

        (Annotations fixed to explicit ``Optional``; the previous
        ``Union[int, float] = None`` form is the implicit-Optional style
        disallowed by PEP 484.)
        """
        self.not_null = not_null
        self.minimum = minimum
        self.maximum = maximum
        self.permitted_values = permitted_values

    def is_ok(self, value: Any) -> bool:
        """
        Does the value pass our tests?
        """
        if value is None:
            return not self.not_null
            # If not_null is True, then the value is not OK; return False.
            # If not_null is False, then a null value passes all other tests.
        if (
            self.permitted_values is not None
            and value not in self.permitted_values
        ):
            return False
        if self.minimum is not None and value < self.minimum:
            return False
        if self.maximum is not None and value > self.maximum:
            return False
        return True

    def failure_msg(self, value: Any) -> str:
        """
        Why does the value not pass our tests? Empty string if it passes.
        """
        if value is None:
            if self.not_null:
                return "value is None and NULL values are not permitted"
            else:
                return ""  # value is OK
        if (
            self.permitted_values is not None
            and value not in self.permitted_values
        ):
            return (
                f"value {value!r} not in permitted values "
                f"{self.permitted_values!r}"
            )
        if self.minimum is not None and value < self.minimum:
            return f"value {value!r} less than minimum of {self.minimum!r}"
        if self.maximum is not None and value > self.maximum:
            return f"value {value!r} more than maximum of {self.maximum!r}"
        return ""

    def __repr__(self) -> str:
        return auto_repr(self)

    def permitted_values_inc_minmax(self) -> Tuple:
        """
        Returns permitted values, either specified directly or via a
        minimum/maximum.
        """
        if self.permitted_values:
            return tuple(self.permitted_values)
        # Take a punt that integer minima/maxima mean that only integers are
        # permitted...
        if isinstance(self.minimum, int) and isinstance(self.maximum, int):
            return tuple(range(self.minimum, self.maximum + 1))
        return ()

    def permitted_values_csv(self) -> str:
        """
        Returns a CSV representation of the permitted values.

        Primarily used for CRIS data dictionaries.
        """
        return ",".join(str(x) for x in self.permitted_values_inc_minmax())
# Specific instances, to reduce object duplication and magic numbers: MIN_ZERO_CHECKER = PermittedValueChecker(minimum=0) BIT_CHECKER = PermittedValueChecker(permitted_values=PV.BIT) ZERO_TO_ONE_CHECKER = PermittedValueChecker(minimum=0, maximum=1) ZERO_TO_TWO_CHECKER = PermittedValueChecker(minimum=0, maximum=2) ZERO_TO_THREE_CHECKER = PermittedValueChecker(minimum=0, maximum=3) ZERO_TO_FOUR_CHECKER = PermittedValueChecker(minimum=0, maximum=4) ZERO_TO_FIVE_CHECKER = PermittedValueChecker(minimum=0, maximum=5) ZERO_TO_SIX_CHECKER = PermittedValueChecker(minimum=0, maximum=6) ZERO_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=0, maximum=7) ZERO_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=0, maximum=8) ZERO_TO_NINE_CHECKER = PermittedValueChecker(minimum=0, maximum=9) ZERO_TO_10_CHECKER = PermittedValueChecker(minimum=0, maximum=10) ZERO_TO_100_CHECKER = PermittedValueChecker(minimum=0, maximum=100) ONE_TO_TWO_CHECKER = PermittedValueChecker(minimum=1, maximum=2) ONE_TO_THREE_CHECKER = PermittedValueChecker(minimum=1, maximum=3) ONE_TO_FOUR_CHECKER = PermittedValueChecker(minimum=1, maximum=4) ONE_TO_FIVE_CHECKER = PermittedValueChecker(minimum=1, maximum=5) ONE_TO_SIX_CHECKER = PermittedValueChecker(minimum=1, maximum=6) ONE_TO_SEVEN_CHECKER = PermittedValueChecker(minimum=1, maximum=7) ONE_TO_EIGHT_CHECKER = PermittedValueChecker(minimum=1, maximum=8) ONE_TO_NINE_CHECKER = PermittedValueChecker(minimum=1, maximum=9) # ============================================================================= # camcops_column: provides extra functions over Column. 
# ============================================================================= # Column attributes COLATTR_BLOB_RELATIONSHIP_ATTR_NAME = "blob_relationship_attr_name" COLATTR_EXEMPT_FROM_ANONYMISATION = "exempt_from_anonymisation" COLATTR_IDENTIFIES_PATIENT = "identifies_patient" COLATTR_INCLUDE_IN_ANON_STAGING_DB = "include_in_anon_staging_db" COLATTR_IS_BLOB_ID_FIELD = "is_blob_id_field" COLATTR_IS_CAMCOPS_COLUMN = "is_camcops_column" COLATTR_PERMITTED_VALUE_CHECKER = "permitted_value_checker"
def camcops_column(
    *args: Any,
    include_in_anon_staging_db: bool = False,
    exempt_from_anonymisation: bool = False,
    identifies_patient: bool = False,
    is_blob_id_field: bool = False,
    blob_relationship_attr_name: str = "",
    permitted_value_checker: Optional[PermittedValueChecker] = None,
    **kwargs: Any,
) -> Column[Any]:
    """
    Creates a Column with CamCOPS metadata stored in its ``info`` dict.

    Args:
        *args:
            Arguments to the :class:`Column` constructor.
        include_in_anon_staging_db:
            Ensure this is marked for inclusion in data dictionaries for an
            anonymisation staging database.
        exempt_from_anonymisation:
            If true: though this field might be text, it is guaranteed not
            to contain identifiers (e.g. it might contain only predefined
            disease severity descriptions) and does not require
            anonymisation.
        identifies_patient:
            If true: contains a patient identifier (e.g. name).
        is_blob_id_field:
            If true: this field contains a reference (client FK) to the
            BLOB table.
        blob_relationship_attr_name:
            For BLOB ID fields: the name of the associated relationship
            attribute (which, when accessed, yields the BLOB itself) in
            the owning class/object.
        permitted_value_checker:
            If specified, a :class:`PermittedValueChecker` that allows soft
            constraints to be specified on the field's contents. (That is,
            no constraints are specified at the database level, but we can
            moan if incorrect data are present.)
            (Annotation fixed to explicit ``Optional``; implicit Optional
            is disallowed by PEP 484.)
        **kwargs:
            Arguments to the :class:`Column` constructor.
    """
    if is_blob_id_field:
        assert blob_relationship_attr_name, (
            "If specifying a BLOB ID field, must give the attribute name "
            "of the relationship too"
        )
    info = {
        COLATTR_IS_CAMCOPS_COLUMN: True,
        COLATTR_INCLUDE_IN_ANON_STAGING_DB: include_in_anon_staging_db,
        COLATTR_EXEMPT_FROM_ANONYMISATION: exempt_from_anonymisation,
        COLATTR_IDENTIFIES_PATIENT: identifies_patient,
        COLATTR_IS_BLOB_ID_FIELD: is_blob_id_field,
        COLATTR_BLOB_RELATIONSHIP_ATTR_NAME: blob_relationship_attr_name,
        COLATTR_PERMITTED_VALUE_CHECKER: permitted_value_checker,
    }
    return Column(*args, info=info, **kwargs)
def mapped_camcops_column(  # type: ignore[no-untyped-def]
    *args,
    include_in_anon_staging_db: bool = False,
    exempt_from_anonymisation: bool = False,
    identifies_patient: bool = False,
    is_blob_id_field: bool = False,
    blob_relationship_attr_name: str = "",
    permitted_value_checker: Optional[PermittedValueChecker] = None,
    **kwargs,
) -> MappedColumn[Any]:
    """
    As :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` but
    returns a python typing-compatible MappedColumn.

    Args:
        *args:
            Arguments to the :class:`Column` constructor.
        include_in_anon_staging_db:
            Ensure this is marked for inclusion in data dictionaries for an
            anonymisation staging database.
        exempt_from_anonymisation:
            If true: though this field might be text, it is guaranteed not
            to contain identifiers (e.g. it might contain only predefined
            disease severity descriptions) and does not require
            anonymisation.
        identifies_patient:
            If true: contains a patient identifier (e.g. name).
        is_blob_id_field:
            If true: this field contains a reference (client FK) to the
            BLOB table.
        blob_relationship_attr_name:
            For BLOB ID fields: the name of the associated relationship
            attribute (which, when accessed, yields the BLOB itself) in
            the owning class/object.
        permitted_value_checker:
            If specified, a :class:`PermittedValueChecker` that allows soft
            constraints to be specified on the field's contents. (That is,
            no constraints are specified at the database level, but we can
            moan if incorrect data are present.)
            (Annotation fixed to explicit ``Optional``; implicit Optional
            is disallowed by PEP 484.)
        **kwargs:
            Arguments to the :class:`Column` constructor.
    """
    if is_blob_id_field:
        assert blob_relationship_attr_name, (
            "If specifying a BLOB ID field, must give the attribute name "
            "of the relationship too"
        )
    info = {
        COLATTR_IS_CAMCOPS_COLUMN: True,
        COLATTR_INCLUDE_IN_ANON_STAGING_DB: include_in_anon_staging_db,
        COLATTR_EXEMPT_FROM_ANONYMISATION: exempt_from_anonymisation,
        COLATTR_IDENTIFIES_PATIENT: identifies_patient,
        COLATTR_IS_BLOB_ID_FIELD: is_blob_id_field,
        COLATTR_BLOB_RELATIONSHIP_ATTR_NAME: blob_relationship_attr_name,
        COLATTR_PERMITTED_VALUE_CHECKER: permitted_value_checker,
    }
    return mapped_column(*args, info=info, **kwargs)
# ============================================================================= # Operate on Column/MappedColumn properties # =============================================================================
def gen_columns_matching_attrnames(  # type: ignore[no-untyped-def]
    obj, attrnames: List[str]
) -> Generator[Tuple[str, Column], None, None]:
    """
    Find columns of an SQLAlchemy ORM object whose attribute names match a
    list.

    Args:
        obj: SQLAlchemy ORM object to inspect
        attrnames: attribute names

    Yields:
        ``attrname, column`` tuples
    """
    yield from (
        (name, col)
        for name, col in gen_columns(obj)
        if name in attrnames
    )
def gen_camcops_columns(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[Tuple[str, Column], None, None]:
    """
    Finds all columns of an object that are
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`
    columns (i.e. carry the CamCOPS marker in their ``info`` dict).

    Args:
        obj: SQLAlchemy ORM object to inspect

    Yields:
        ``attrname, column`` tuples
    """
    yield from (
        (name, col)
        for name, col in gen_columns(obj)
        if col.info.get(COLATTR_IS_CAMCOPS_COLUMN, False)
    )
def gen_camcops_blob_columns(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[Tuple[str, Column], None, None]:
    """
    Finds all columns of an object that are
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`
    columns referencing the BLOB table. Logs a warning for any BLOB column
    whose attribute name differs from its SQL column name.

    Args:
        obj: SQLAlchemy ORM object to inspect

    Yields:
        ``attrname, column`` tuples
    """
    for attrname, column in gen_camcops_columns(obj):
        if not column.info.get(COLATTR_IS_BLOB_ID_FIELD, False):
            continue
        if attrname != column.name:
            log.warning(
                "BLOB field where attribute name {!r} != SQL "
                "column name {!r}",
                attrname,
                column.name,
            )
        yield attrname, column
def get_column_attr_names(obj) -> List[str]:  # type: ignore[no-untyped-def]
    """
    Get a list of column attribute names from an SQLAlchemy ORM object.
    """
    return [name for name, _col in gen_columns(obj)]
def get_camcops_column_attr_names(obj) -> List[str]:  # type: ignore[no-untyped-def] # noqa: E501
    """
    Get a list of
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` column
    attribute names from an SQLAlchemy ORM object.
    """
    return [name for name, _col in gen_camcops_columns(obj)]
def get_camcops_blob_column_attr_names(obj) -> List[str]:  # type: ignore[no-untyped-def] # noqa: E501
    """
    Get a list of
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column` BLOB
    column attribute names from an SQLAlchemy ORM object.
    """
    return [name for name, _col in gen_camcops_blob_columns(obj)]
def permitted_value_failure_msgs(obj) -> List[str]:  # type: ignore[no-untyped-def] # noqa: E501
    """
    Checks a SQLAlchemy ORM object instance against its permitted value
    checks (via its
    :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`
    columns), if it has any.

    Returns a list of failure messages (empty list means all OK).

    If you just want to know whether it passes, a quicker way is via
    :func:`permitted_values_ok`.
    """
    msgs = []  # type: List[str]
    for attrname, col in gen_camcops_columns(obj):
        checker = col.info.get(
            COLATTR_PERMITTED_VALUE_CHECKER
        )  # type: Optional[PermittedValueChecker]
        if checker is None:
            continue
        msg = checker.failure_msg(getattr(obj, attrname))
        if msg:
            msgs.append(f"Invalid value for {attrname}: {msg}")
    return msgs
def permitted_values_ok(obj) -> bool:  # type: ignore[no-untyped-def]
    """
    Checks whether an instance passes its permitted value checks, if it has
    any. (Short-circuits on the first failure.)

    If you want to know why it failed, see
    :func:`permitted_value_failure_msgs`.
    """
    for attrname, col in gen_camcops_columns(obj):
        checker = col.info.get(
            COLATTR_PERMITTED_VALUE_CHECKER
        )  # type: Optional[PermittedValueChecker]
        if checker is not None and not checker.is_ok(getattr(obj, attrname)):
            return False
    return True
def gen_ancillary_relationships(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[
    Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
    None,
    None,
]:
    """
    For an SQLAlchemy ORM object, yields tuples of ``attrname,
    relationship_property, related_class`` for all relationships that are
    marked as a CamCOPS ancillary relationship.
    """
    yield from (
        (attrname, rel_prop, related_class)
        for attrname, rel_prop, related_class in gen_relationships(obj)
        if rel_prop.info.get(RelationshipInfo.IS_ANCILLARY, None) is True
    )
def gen_blob_relationships(  # type: ignore[no-untyped-def]
    obj,
) -> Generator[
    Tuple[str, RelationshipProperty, Type["GenericTabletRecordMixin"]],
    None,
    None,
]:
    """
    For an SQLAlchemy ORM object, yields tuples of ``attrname,
    relationship_property, related_class`` for all relationships that are
    marked as a CamCOPS BLOB relationship.
    """
    yield from (
        (attrname, rel_prop, related_class)
        for attrname, rel_prop, related_class in gen_relationships(obj)
        if rel_prop.info.get(RelationshipInfo.IS_BLOB, None) is True
    )
# =============================================================================
# Specializations of camcops_column to save typing
# =============================================================================


def bool_column(name: str, *args: Any, **kwargs: Any) -> Column[bool]:
    """
    Returns a camcops_column storing a Boolean, with a (0, 1) soft
    constraint attached.
    """
    bool_type = _get_bool_column_args(name, kwargs)
    return camcops_column(name, bool_type, *args, **kwargs)


def mapped_bool_column(
    name: str, *args: Any, **kwargs: Any
) -> MappedColumn[bool]:
    """
    As :func:`bool_column`, but returns a typing-compatible MappedColumn.
    """
    bool_type = _get_bool_column_args(name, kwargs)
    return mapped_camcops_column(name, bool_type, *args, **kwargs)


def _get_bool_column_args(name: str, kwargs: dict[str, Any]) -> Boolean:
    """
    Shared helper for the bool column factories. Pops ``constraint_name``
    from ``kwargs``, installs the BIT (0/1) permitted-value checker into
    ``kwargs``, and returns the Boolean type object to use.
    """
    constraint_name = kwargs.pop(
        "constraint_name", None
    )  # type: Optional[str]
    # The "name" parameter to Boolean() specifies the name of the (0, 1)
    # constraint; see help for ``conv``.
    constraint_name_conv = conv(constraint_name) if constraint_name else None
    bool_type = Boolean(name=constraint_name_conv)
    kwargs[COLATTR_PERMITTED_VALUE_CHECKER] = BIT_CHECKER
    if not constraint_name and len(name) >= LONG_COLUMN_NAME_WARNING_LIMIT:
        log.warning(
            "bool_column with long column name and no constraint name: {!r}",
            name,
        )
    return bool_type