Source code for camcops_server.cc_modules.cc_audit

#!/usr/bin/env python

"""
camcops_server/cc_modules/cc_audit.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Auditing.**

The Big Brother part.

"""

from typing import TYPE_CHECKING

from sqlalchemy.orm import relationship
from sqlalchemy.sql.schema import Column, ForeignKey
from sqlalchemy.sql.sqltypes import DateTime, Integer, UnicodeText

from camcops_server.cc_modules.cc_sqla_coltypes import (
    AuditSourceColType,
    IPAddressColType,
    TableNameColType,
)
from camcops_server.cc_modules.cc_sqlalchemy import Base

if TYPE_CHECKING:
    from camcops_server.cc_modules.cc_request import CamcopsRequest


MAX_AUDIT_STRING_LENGTH = 65000


# =============================================================================
# AuditEntry
# =============================================================================


[docs]class AuditEntry(Base): """ An entry in the audit table. """ __tablename__ = "_security_audit" id = Column( "id", Integer, primary_key=True, autoincrement=True, index=True, comment="Arbitrary primary key", ) when_access_utc = Column( "when_access_utc", DateTime, nullable=False, index=True, comment="Date/time of access (UTC)", ) source = Column( "source", AuditSourceColType, nullable=False, comment="Source (e.g. tablet, webviewer)", ) remote_addr = Column( "remote_addr", IPAddressColType, comment="IP address of the remote computer", ) user_id = Column( "user_id", Integer, ForeignKey("_security_users.id"), comment="ID of user, where applicable", ) user = relationship("User") device_id = Column( "device_id", Integer, ForeignKey("_security_devices.id"), comment="Device ID, where applicable", ) device = relationship("Device") table_name = Column( "table_name", TableNameColType, comment="Table involved, where applicable", ) server_pk = Column( "server_pk", Integer, comment="Server PK (table._pk), where applicable" ) patient_server_pk = Column( "patient_server_pk", Integer, comment="Server PK of the patient (patient._pk) concerned, or " "NULL if not applicable", ) details = Column( "details", UnicodeText, comment="Details of the access" ) # in practice, has 65,535 character limit and isn't Unicode.
# See MAX_AUDIT_STRING_LENGTH above. # ============================================================================= # Audit function # =============================================================================
[docs]def audit( req: "CamcopsRequest", details: str, patient_server_pk: int = None, table: str = None, server_pk: int = None, device_id: int = None, remote_addr: str = None, user_id: int = None, from_console: bool = False, from_dbclient: bool = False, ) -> None: """ Write an entry to the audit log. """ dbsession = req.dbsession if not remote_addr: remote_addr = req.remote_addr if req else None if user_id is None: user_id = req.user_id if from_console: source = "console" elif from_dbclient: source = "tablet" else: source = "webviewer" now = req.now_utc if details and len(details) > MAX_AUDIT_STRING_LENGTH: details = details[:MAX_AUDIT_STRING_LENGTH] # noinspection PyTypeChecker entry = AuditEntry( when_access_utc=now, source=source, remote_addr=remote_addr, user_id=user_id, device_id=device_id, table_name=table, server_pk=server_pk, patient_server_pk=patient_server_pk, details=details, ) dbsession.add(entry)