Source code for camcops_server.camcops_server

#!/usr/bin/env python

"""
camcops_server/camcops_server.py

===============================================================================

    Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
    Created by Rudolf Cardinal (rnc1001@cam.ac.uk).

    This file is part of CamCOPS.

    CamCOPS is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    CamCOPS is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.

===============================================================================

**Command-line entry point for the CamCOPS server.**

"""

from argparse import ArgumentParser
import linecache
import logging
import os
import pprint
import sys
import tracemalloc
from typing import Dict, List, Optional, Type, TYPE_CHECKING

from cardinal_pythonlib.argparse_func import (
    ShowAllSubparserHelpAction,
    MapType,
    nonnegative_int,
)
from cardinal_pythonlib.debugging import pdb_run
from cardinal_pythonlib.docker import running_under_docker
from cardinal_pythonlib.logs import (
    BraceStyleAdapter,
    main_only_quicksetup_rootlogger,
    print_report_on_all_logs,
    set_level_for_logger_and_its_handlers,
)
from cardinal_pythonlib.process import launch_external_file
from cardinal_pythonlib.sqlalchemy.dialect import (
    ALL_SQLA_DIALECTS,
    SqlaDialectName,
)
from cardinal_pythonlib.wsgi.constants import WsgiEnvVar
from cardinal_pythonlib.wsgi.reverse_proxied_mw import ReverseProxiedConfig
from rich_argparse import (
    ArgumentDefaultsRichHelpFormatter,
    RawDescriptionRichHelpFormatter,
)

from camcops_server.cc_modules.cc_baseconstants import (
    ENVVAR_CONFIG_FILE,
    EXIT_SUCCESS,
    EXIT_FAILURE,
    DOCUMENTATION_URL,
)
from camcops_server.cc_modules.cc_config import (
    CamcopsConfig,
    get_default_config_from_os_env,
    get_demo_apache_config,
    get_demo_config,
    get_demo_supervisor_config,
)
from camcops_server.cc_modules.cc_constants import (
    CAMCOPS_URL,
    DEFAULT_FLOWER_ADDRESS,
    DEFAULT_FLOWER_PORT,
)
from camcops_server.cc_modules.cc_pythonversion import (
    assert_minimum_python_version,
)
from camcops_server.cc_modules.cc_snomed import send_athena_icd_snomed_to_xml
from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION

if TYPE_CHECKING:
    # noinspection PyProtectedMember,PyUnresolvedReferences
    from argparse import _SubParsersAction
    from pyramid.router import Router

log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Check Python version (the shebang is not a guarantee)
# =============================================================================

assert_minimum_python_version()


# =============================================================================
# Debugging options
# =============================================================================

DEBUG_LOG_CONFIG = False
DEBUG_RUN_WITH_PDB = False
DEBUG_MEMORY_ALLOCATION = False

if any([DEBUG_LOG_CONFIG, DEBUG_RUN_WITH_PDB, DEBUG_MEMORY_ALLOCATION]):
    log.warning("Debugging options enabled!")


# =============================================================================
# Simple command-line functions
# =============================================================================


[docs]def launch_manual() -> None: """ Use the operating system "launch something" tool to show the CamCOPS documentation. """ launch_external_file(DOCUMENTATION_URL)
# ============================================================================= # Stub command-line functions requiring more substantial imports # ============================================================================= # ----------------------------------------------------------------------------- # Database # ----------------------------------------------------------------------------- def _upgrade_database_to_head( show_sql_only: bool, reindex: bool = False ) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects from camcops_server.cc_modules.cc_alembic import ( upgrade_database_to_head, ) # delayed import upgrade_database_to_head(show_sql_only=show_sql_only) if reindex and not show_sql_only: cfg = get_default_config_from_os_env() core.reindex(cfg) def _upgrade_database_to_revision( revision: str, show_sql_only: bool = False ) -> None: import camcops_server.camcops_server_core as core # noqa: F401 # ... delayed import; import side effects from camcops_server.cc_modules.cc_alembic import ( upgrade_database_to_revision, ) # delayed import upgrade_database_to_revision( revision=revision, show_sql_only=show_sql_only ) def _downgrade_database_to_revision( revision: str, show_sql_only: bool = False, confirm_downgrade_db: bool = False, ) -> None: import camcops_server.camcops_server_core as core # noqa: F401 # ... delayed import; import side effects from camcops_server.cc_modules.cc_alembic import ( downgrade_database_to_revision, ) # delayed import downgrade_database_to_revision( revision=revision, show_sql_only=show_sql_only, confirm_downgrade_db=confirm_downgrade_db, ) def _add_dummy_data( cfg: "CamcopsConfig", confirm_add_dummy_data: bool = False ) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.add_dummy_data(cfg, confirm_add_dummy_data=confirm_add_dummy_data) def _create_database_from_scratch(cfg: "CamcopsConfig") -> None: import camcops_server.camcops_server_core as core # noqa: F401 # ... delayed import; import side effects from camcops_server.cc_modules.cc_alembic import ( create_database_from_scratch, ) # delayed import create_database_from_scratch(cfg=cfg) def _print_database_title() -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.print_database_title() def _show_database_schema( schemastem: str, make_image: bool = False, java: str = None, plantuml: str = None, height_width_limit: int = None, java_memory_limit_mb: int = None, ) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.show_database_schema( schemastem=schemastem, make_image=make_image, java=java, plantuml=plantuml, height_width_limit=height_width_limit, java_memory_limit_mb=java_memory_limit_mb, ) def _merge_camcops_db( src: str, echo: bool, report_every: int, dummy_run: bool, info_only: bool, # skip_export_logs: bool, # skip_audit_logs: bool, default_group_id: Optional[int], default_group_name: Optional[str], groupnum_map: Dict[int, int], whichidnum_map: Dict[int, int], ) -> None: import camcops_server.camcops_server_core as core # noqa: F401 # ... delayed import; import side effects from camcops_server.cc_modules.merge_db import ( merge_camcops_db, ) # delayed import merge_camcops_db( src=src, echo=echo, report_every=report_every, dummy_run=dummy_run, info_only=info_only, # skip_export_logs=skip_export_logs, # skip_audit_logs=skip_audit_logs, default_group_id=default_group_id, default_group_name=default_group_name, groupnum_map=groupnum_map, whichidnum_map=whichidnum_map, ) def _get_all_ddl(dialect_name: str = SqlaDialectName.MYSQL) -> str: import camcops_server.camcops_server_core as core # noqa: F401 # ... delayed import; import side effects from camcops_server.cc_modules.cc_sqlalchemy import ( get_all_ddl, ) # delayed import return get_all_ddl(dialect_name=dialect_name) def _reindex(cfg: CamcopsConfig) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.reindex(cfg=cfg) def _check_index(cfg: CamcopsConfig, show_all_bad: bool = False) -> bool: import camcops_server.camcops_server_core as core # ... delayed import; import side effects return core.check_index(cfg=cfg, show_all_bad=show_all_bad) # ----------------------------------------------------------------------------- # Users # ----------------------------------------------------------------------------- def _make_superuser(username: str = None, password: str = None) -> bool: import camcops_server.camcops_server_core as core # ... delayed import; import side effects return core.make_superuser(username=username, password=password) def _reset_password(username: str = None) -> bool: import camcops_server.camcops_server_core as core # ... delayed import; import side effects return core.reset_password(username=username) def _enable_user_cli(username: str = None) -> bool: import camcops_server.camcops_server_core as core # ... delayed import; import side effects return core.enable_user_cli(username=username) # ----------------------------------------------------------------------------- # Export # ----------------------------------------------------------------------------- def _cmd_export( recipient_names: List[str] = None, all_recipients: bool = False, via_index: bool = True, schedule_via_backend: bool = False, ) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects return core.cmd_export( recipient_names=recipient_names, all_recipients=all_recipients, via_index=via_index, schedule_via_backend=schedule_via_backend, ) def _cmd_show_export_queue( recipient_names: List[str] = None, all_recipients: bool = False, via_index: bool = True, pretty: bool = False, debug_show_fhir: bool = False, debug_fhir_include_docs: bool = False, ) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.cmd_show_export_queue( recipient_names=recipient_names, all_recipients=all_recipients, via_index=via_index, pretty=pretty, debug_show_fhir=debug_show_fhir, debug_fhir_include_docs=debug_fhir_include_docs, ) def _cmd_crate_dd(filename: str, recipient_name: str) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.make_data_dictionary( filename=filename, recipient_name=recipient_name, cris=False ) def _cmd_cris_dd(filename: str, recipient_name: str) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.make_data_dictionary( filename=filename, recipient_name=recipient_name, cris=True ) # ----------------------------------------------------------------------------- # Web server # -----------------------------------------------------------------------------
[docs]def make_wsgi_app_from_config() -> "Router": """ Reads the config file and creates a WSGI application. """ import camcops_server.camcops_server_core as core # ... delayed import; import side effects cfg = get_default_config_from_os_env() reverse_proxied_config = ReverseProxiedConfig( trusted_proxy_headers=cfg.trusted_proxy_headers, http_host=cfg.proxy_http_host, remote_addr=cfg.proxy_remote_addr, script_name=( cfg.proxy_script_name or os.environ.get(WsgiEnvVar.SCRIPT_NAME, "") ), server_port=cfg.proxy_server_port, server_name=cfg.proxy_server_name, url_scheme=cfg.proxy_url_scheme, rewrite_path_info=cfg.proxy_rewrite_path_info, ) return core.make_wsgi_app( debug_toolbar=cfg.debug_toolbar, reverse_proxied_config=reverse_proxied_config, debug_reverse_proxy=cfg.debug_reverse_proxy, show_requests=cfg.show_requests, show_request_immediately=cfg.show_request_immediately, show_response=cfg.show_response, show_timing=cfg.show_timing, static_cache_duration_s=cfg.static_cache_duration_s, )
def _test_serve_pyramid() -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects application = make_wsgi_app_from_config() cfg = get_default_config_from_os_env() core.test_serve_pyramid( application=application, host=cfg.host, port=cfg.port ) def _serve_cherrypy() -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects application = make_wsgi_app_from_config() cfg = get_default_config_from_os_env() core.serve_cherrypy( application=application, host=cfg.host, port=cfg.port, unix_domain_socket_filename=cfg.unix_domain_socket, threads_start=cfg.cherrypy_threads_start, threads_max=cfg.cherrypy_threads_max, server_name=cfg.cherrypy_server_name, log_screen=cfg.cherrypy_root_path, ssl_certificate=cfg.ssl_certificate, ssl_private_key=cfg.ssl_private_key, root_path=cfg.cherrypy_root_path, ) def _serve_gunicorn() -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects application = make_wsgi_app_from_config() cfg = get_default_config_from_os_env() core.serve_gunicorn( application=application, host=cfg.host, port=cfg.port, unix_domain_socket_filename=cfg.unix_domain_socket, num_workers=cfg.gunicorn_num_workers, ssl_certificate=cfg.ssl_certificate, ssl_private_key=cfg.ssl_private_key, reload=cfg.gunicorn_debug_reload, timeout_s=cfg.gunicorn_timeout_s, debug_show_gunicorn_options=cfg.debug_show_gunicorn_options, ) # ----------------------------------------------------------------------------- # Celery etc. # ----------------------------------------------------------------------------- def _launch_celery_workers(verbose: bool = False) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.launch_celery_workers(verbose=verbose) def _launch_celery_beat(verbose: bool = False) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.launch_celery_beat(verbose=verbose) def _launch_celery_flower( address: str = DEFAULT_FLOWER_ADDRESS, port: int = DEFAULT_FLOWER_PORT ) -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.launch_celery_flower(address=address, port=port) def _housekeeping() -> None: from camcops_server.cc_modules.celery import housekeeping # delayed import housekeeping() def _purge_jobs() -> None: from camcops_server.cc_modules.celery import purge_jobs # delayed import purge_jobs() # ----------------------------------------------------------------------------- # Testing and development # ----------------------------------------------------------------------------- def _dev_cli() -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.dev_cli() def _list_tasks() -> None: import camcops_server.camcops_server_core as core # ... delayed import; import side effects core.print_tasklist() # ============================================================================= # Command-line processor # ============================================================================= _REQNAMED = "required named arguments" # noinspection PyShadowingBuiltins
[docs]def add_sub( sp: "_SubParsersAction", cmd: str, config_mandatory: Optional[bool] = False, description: str = None, help: str = None, ) -> ArgumentParser: """ Adds (and returns) a subparser to an ArgumentParser. Args: sp: the ``_SubParsersAction`` object from a call to :func:`argparse.ArgumentParser.add_subparsers`. cmd: the command for the subparser (e.g. ``docs`` to make the command ``camcops docs``). config_mandatory: Does this subcommand require a CamCOPS config file? ``None`` = don't ask for config. ``False`` = ask for it, but not mandatory as a command-line argument. ``True`` = mandatory as a command-line argument. description: Used for the description in the detailed help, e.g. "camcops docs --help". Defaults to the value of the ``help`` argument. help: Used for this subparser's contribution to the main help summary, i.e. ``camcops --help``. Returns: the subparser """ if description is None: description = help subparser = sp.add_parser( cmd, help=help, description=description, formatter_class=ArgumentDefaultsRichHelpFormatter, ) # type: ArgumentParser # This needs to be in the top-level parser and the sub-parsers (it does not # appear in the subparsers just because it's in the top-level parser, which # sounds like an argparse bug given its help, but there you go). subparser.add_argument( "-v", "--verbose", action="store_true", help="Be verbose" ) # Config file handling if config_mandatory: # True cfg_help = "Configuration file" else: # None, False cfg_help = ( f"Configuration file (if not specified, the environment" f" variable {ENVVAR_CONFIG_FILE} is checked)" ) if config_mandatory is None: # None pass elif config_mandatory: # True g = subparser.add_argument_group(_REQNAMED) # https://stackoverflow.com/questions/24180527/argparse-required-arguments-listed-under-optional-arguments # noqa g.add_argument("--config", required=True, help=cfg_help) else: # False subparser.add_argument("--config", help=cfg_help) return subparser
# noinspection PyShadowingBuiltins
[docs]def add_req_named( sp: ArgumentParser, switch: str, help: str, action: str = None, type: Type = None, ) -> None: """ Adds a required but named argument. This is a bit unconventional; for example, making the ``--config`` option mandatory even though ``--`` is usually a prefix for optional arguments. Args: sp: the :class:`ArgumentParser` to add to switch: passed to :func:`add_argument` help: passed to :func:`add_argument` action: passed to :func:`add_argument` type: passed to :func:`add_argument` """ # noinspection PyProtectedMember reqgroup = next( (g for g in sp._action_groups if g.title == _REQNAMED), None ) if not reqgroup: reqgroup = sp.add_argument_group(_REQNAMED) kwargs = dict(required=True, help=help) if action: assert type is None, "Don't specify action and type" kwargs["action"] = action elif type: kwargs["type"] = type reqgroup.add_argument(switch, **kwargs)
[docs]def camcops_main() -> int: """ Primary command-line entry point. Parse command-line arguments and act. Note that we can't easily use delayed imports to speed up the help output, because the help system has function calls embedded into it. """ # Fetch command-line options. # ------------------------------------------------------------------------- # Base parser # ------------------------------------------------------------------------- # noinspection PyTypeChecker parser = ArgumentParser( description=( f"CamCOPS server, created by Rudolf Cardinal; version " f"{CAMCOPS_SERVER_VERSION}.\n" f"Use 'camcops_server <COMMAND> --help' for more detail on each " f"command." ), formatter_class=RawDescriptionRichHelpFormatter, # add_help=False # only do this if manually overriding the method ) # ------------------------------------------------------------------------- # Common arguments # ------------------------------------------------------------------------- # noinspection PyTypeChecker parser.add_argument( "--allhelp", action=ShowAllSubparserHelpAction, help="show help for all commands and exit", ) parser.add_argument( "--version", action="version", version=f"CamCOPS {CAMCOPS_SERVER_VERSION}", ) parser.add_argument( "-v", "--verbose", action="store_true", help="Be verbose" ) parser.add_argument( "--no_log", action="store_true", help="Disable log (stderr) entirely." ) # ------------------------------------------------------------------------- # Subcommand subparser # ------------------------------------------------------------------------- subparsers = parser.add_subparsers( title="commands", description="Valid CamCOPS commands are as follows.", help="Specify one command.", dest="command", # sorts out the help for the command being mandatory # https://stackoverflow.com/questions/23349349/argparse-with-required-subparser # noqa ) # type: _SubParsersAction subparsers.required = True # requires a command # You can't use "add_subparsers" more than once. # Subparser groups seem not yet to be supported: # https://bugs.python.org/issue9341 # https://bugs.python.org/issue14037 # ------------------------------------------------------------------------- # "Getting started" commands # ------------------------------------------------------------------------- # Launch documentation docs_parser = add_sub( subparsers, "docs", config_mandatory=None, help="Launch the main documentation (CamCOPS manual)", ) docs_parser.set_defaults(func=lambda args: launch_manual()) # Print demo CamCOPS config democonfig_parser = add_sub( subparsers, "demo_camcops_config", config_mandatory=None, help="Print a demo CamCOPS config file", ) democonfig_parser.add_argument( "--docker", action="store_true", help="Use settings for Docker" ) democonfig_parser.set_defaults( func=lambda args: print_demo_camcops_config(docker=args.docker) ) # Print demo supervisor config demosupervisorconf_parser = add_sub( subparsers, "demo_supervisor_config", config_mandatory=None, help="Print a demo 'supervisor' config file for CamCOPS", ) demosupervisorconf_parser.set_defaults( func=lambda args: print_demo_supervisor_config() ) # Print demo Apache config section demoapacheconf_parser = add_sub( subparsers, "demo_apache_config", config_mandatory=None, help="Print a demo Apache config file section for CamCOPS", ) demoapacheconf_parser.set_defaults( func=lambda args: print_demo_apache_config(rootpath=args.path) ) demoapacheconf_parser.add_argument( "--path", default="", help=( "Path where CamCOPS should appear on the server " "e.g. 'camcops' for https://example.com/camcops" ), ) # ------------------------------------------------------------------------- # Database commands # ------------------------------------------------------------------------- # Upgrade database upgradedb_parser = add_sub( subparsers, "upgrade_db", config_mandatory=True, help="Upgrade database to most recent version (via Alembic)", ) upgradedb_parser.add_argument( "--show_sql_only", action="store_true", help="Show SQL only (to stdout); don't execute it", ) upgradedb_parser.add_argument( "--no_reindex", action="store_true", help="Don't recreate the task index", ) upgradedb_parser.set_defaults( func=lambda args: _upgrade_database_to_head( show_sql_only=args.show_sql_only, reindex=not args.no_reindex ) ) # Developer: upgrade database to a specific revision dev_upgrade_db_parser = add_sub( subparsers, "dev_upgrade_db", config_mandatory=True, help="(DEVELOPER OPTION ONLY.) Upgrade a database to " "a specific revision.", ) dev_upgrade_db_parser.add_argument( "--destination_db_revision", type=str, required=True, help="The target database revision", ) dev_upgrade_db_parser.add_argument( "--show_sql_only", action="store_true", help="Show SQL only (to stdout); don't execute it", ) dev_upgrade_db_parser.set_defaults( func=lambda args: _upgrade_database_to_revision( revision=args.destination_db_revision, show_sql_only=args.show_sql_only, ) ) # Developer: downgrade database dev_downgrade_parser = add_sub( subparsers, "dev_downgrade_db", config_mandatory=True, help="(DEVELOPER OPTION ONLY.) Downgrades a database to " "a specific revision. May DESTROY DATA.", ) dev_downgrade_parser.add_argument( "--destination_db_revision", type=str, required=True, help="The target database revision", ) dev_downgrade_parser.add_argument( "--confirm_downgrade_db", action="store_true", help="Must specify this too, as a safety measure", ) dev_downgrade_parser.add_argument( "--show_sql_only", action="store_true", help="Show SQL only (to stdout); don't execute it", ) dev_downgrade_parser.set_defaults( func=lambda args: _downgrade_database_to_revision( revision=args.destination_db_revision, show_sql_only=args.show_sql_only, confirm_downgrade_db=args.confirm_downgrade_db, ) ) # Developer: create dummy database dummy_database_parser = add_sub( subparsers, "dev_add_dummy_data", config_mandatory=True, help="(DEVELOPER OPTION ONLY.) Populates the database with " "a set of dummy patients and tasks for testing.", ) dummy_database_parser.add_argument( "--confirm_add_dummy_data", action="store_true", help="Must specify this too, as a safety measure", ) dummy_database_parser.set_defaults( func=lambda args: _add_dummy_data( cfg=get_default_config_from_os_env(), confirm_add_dummy_data=args.confirm_add_dummy_data, ) ) # Show database title showdbtitle_parser = add_sub( subparsers, "show_db_title", help="Show database title" ) showdbtitle_parser.set_defaults(func=lambda args: _print_database_title()) # Show database schema showdbschema_parser = add_sub( subparsers, "show_db_schema", help="Show the database schema as PlantUML +/- PNG", ) showdbschema_parser.add_argument( "--schemastem", default="schema", help="Stem for output filenames (for schema diagrams); " "'.plantuml' and '.png' are appended", ) showdbschema_parser.add_argument( "--make_image", action="store_true", help="Create a PNG image (impractically large!)", ) showdbschema_parser.add_argument( "--java", default="java", help="Java executable (for schema diagrams)" ) showdbschema_parser.add_argument( "--plantuml", default="plantuml.jar", help="PlantUML Java .jar file (for schema diagrams)", ) showdbschema_parser.add_argument( "--height_width_limit", type=int, default=20000, help="Maximum image height/width in pixels", ) showdbschema_parser.add_argument( "--java_memory_limit_mb", type=int, default=2048, help="Java virtual machine memory limit in Mb", ) showdbschema_parser.set_defaults( func=lambda args: _show_database_schema( schemastem=args.schemastem, make_image=args.make_image, java=args.java, plantuml=args.plantuml, height_width_limit=args.height_width_limit, java_memory_limit_mb=args.java_memory_limit_mb, ) ) # Merge in data fom another database # noinspection PyTypeChecker int_int_mapper = MapType( from_type=nonnegative_int, to_type=nonnegative_int ) mergedb_parser = add_sub( subparsers, "merge_db", config_mandatory=True, help="Merge in data from an old or recent CamCOPS database", ) mergedb_parser.add_argument( "--report_every", type=int, default=10000, help="Report progress every n rows", ) mergedb_parser.add_argument( "--echo", action="store_true", help="Echo SQL to source database" ) mergedb_parser.add_argument( "--dummy_run", action="store_true", help="Perform a dummy run only; do not alter destination database", ) mergedb_parser.add_argument( "--info_only", action="store_true", help="Show table information only; don't do any work", ) # mergedb_parser.add_argument( # "--skip_export_logs", action="store_true", # help="Skip the export log tables") # mergedb_parser.add_argument( # "--skip_audit_logs", action="store_true", # help="Skip the audit log table") mergedb_parser.add_argument( "--default_group_id", type=int, default=None, help="Default group ID (integer) to apply to old records without one. " "If none is specified, a new group will be created for such " "records.", ) mergedb_parser.add_argument( "--default_group_name", type=str, default=None, help="If default_group_id is not specified, use this group name. The " "group will be looked up if it exists, and created if not.", ) add_req_named( mergedb_parser, "--src", help="Source database (specified as an SQLAlchemy URL). The contents " "of this database will be merged into the database specified " "in the config file.", ) # noinspection PyTypeChecker add_req_named( mergedb_parser, "--whichidnum_map", type=int_int_mapper, help="Map to convert ID number types, in the format " "'from_a:to_a,from_b:to_b,...', where all values are integers.", ) # noinspection PyTypeChecker add_req_named( mergedb_parser, "--groupnum_map", type=int_int_mapper, help="Map to convert group numbers, in the format " "'from_a:to_a,from_b:to_b,...', where all values are integers.", ) mergedb_parser.set_defaults( func=lambda args: _merge_camcops_db( src=args.src, echo=args.echo, report_every=args.report_every, dummy_run=args.dummy_run, info_only=args.info_only, # skip_export_logs=args.skip_export_logs, # skip_audit_logs=args.skip_audit_logs, default_group_id=args.default_group_id, default_group_name=args.default_group_name, whichidnum_map=args.whichidnum_map, groupnum_map=args.groupnum_map, ) ) # WATCH OUT. There appears to be a bug somewhere in the way that the # Pyramid debug toolbar registers itself with SQLAlchemy (see # pyramid_debugtoolbar/panels/sqla.py; look for "before_cursor_execute" # and "after_cursor_execute". Somehow, some connections (but not all) seem # to get this event registered twice. The upshot is that the sequence can # lead to an attempt to double-delete the debug toolbar's timer: # # _before_cursor_execute: <sqlalchemy.engine.base.Connection object at 0x7f5c1fa7c630>, 'SHOW CREATE TABLE `_hl7_run_log`', () # noqa # _before_cursor_execute: <sqlalchemy.engine.base.Connection object at 0x7f5c1fa7c630>, 'SHOW CREATE TABLE `_hl7_run_log`', () # noqa # ^^^ this is the problem: event called twice # _after_cursor_execute: <sqlalchemy.engine.base.Connection object at 0x7f5c1fa7c630>, 'SHOW CREATE TABLE `_hl7_run_log`', () # noqa # _after_cursor_execute: <sqlalchemy.engine.base.Connection object at 0x7f5c1fa7c630>, 'SHOW CREATE TABLE `_hl7_run_log`', () # noqa # ^^^ and this is where the problem becomes evident # Traceback (most recent call last): # ... # File "/home/rudolf/dev/venvs/camcops/lib/python3.5/site-packages/pyramid_debugtoolbar/panels/sqla.py", line 51, in _after_cursor_execute # noqa # delattr(conn, 'pdtb_start_timer') # AttributeError: pdtb_start_timer # # So the simplest thing is only to register the debug toolbar for stuff # that might need it... # Create database createdb_parser = add_sub( subparsers, "create_db", config_mandatory=True, help="Create CamCOPS database from scratch (AVOID; use the upgrade " "facility instead)", ) add_req_named( createdb_parser, "--confirm_create_db", action="store_true", help="Must specify this too, as a safety measure", ) createdb_parser.set_defaults( func=lambda args: _create_database_from_scratch( cfg=get_default_config_from_os_env() ) ) # Print database schema ddl_parser = add_sub( subparsers, "ddl", help="Print database schema (data definition language; DDL)", ) ddl_parser.add_argument( "--dialect", type=str, default=SqlaDialectName.MYSQL, help=f"SQL dialect (options: {', '.join(sorted(ALL_SQLA_DIALECTS))})", ) ddl_parser.set_defaults( func=lambda args: print(_get_all_ddl(dialect_name=args.dialect)) ) # Rebuild server indexes reindex_parser = add_sub(subparsers, "reindex", help="Recreate task index") reindex_parser.set_defaults( func=lambda args: _reindex(cfg=get_default_config_from_os_env()) ) check_index_parser = add_sub( subparsers, "check_index", help="Check index validity (exit code 0 for OK, 1 for bad)", ) check_index_parser.add_argument( "--show_all_bad", action="store_true", help="Show all bad index entries (rather than stopping at the first)", ) check_index_parser.set_defaults( func=lambda args: _check_index( cfg=get_default_config_from_os_env(), show_all_bad=args.show_all_bad, ) ) # ------------------------------------------------------------------------- # User commands # ------------------------------------------------------------------------- # Make superuser superuser_parser = add_sub( subparsers, "make_superuser", help="Make superuser, or give superuser status to an existing user", ) superuser_parser.add_argument( "--username", help="Username of superuser to create/promote (if omitted, you will " "be asked to type it in)", ) superuser_parser.add_argument( "--password", help="Password of superuser to create (if omitted, you will " "be asked to type it in)", ) superuser_parser.set_defaults( func=lambda args: _make_superuser( username=args.username, password=args.password ) ) # Reset a user's password password_parser = add_sub( subparsers, "reset_password", help="Reset a user's password" ) password_parser.add_argument( "--username", help="Username to change password for (if omitted, you will be asked " "to type it in)", ) password_parser.set_defaults( func=lambda args: _reset_password(username=args.username) ) # Re-enable a locked account enableuser_parser = add_sub( subparsers, "enable_user", help="Re-enable a locked user account" ) enableuser_parser.add_argument( "--username", help="Username to enable (if omitted, you will be asked " "to type it in)", ) enableuser_parser.set_defaults( func=lambda args: _enable_user_cli(username=args.username) ) # ------------------------------------------------------------------------- # Export options # ------------------------------------------------------------------------- def _add_export_options(sp: ArgumentParser) -> None: sp.add_argument( "--recipients", type=str, nargs="*", help="Export recipients (as named in config file)", ) sp.add_argument( "--all_recipients", action="store_true", help="Use all recipients" ) sp.add_argument( "--disable_task_index", action="store_true", help="Disable use of the task index (for debugging only)", ) # Export data export_parser = add_sub( subparsers, "export", help="Trigger pending exports" ) _add_export_options(export_parser) export_parser.add_argument( "--schedule_via_backend", action="store_true", help="Export tasks as a background job", ) export_parser.set_defaults( func=lambda args: _cmd_export( recipient_names=args.recipients, all_recipients=args.all_recipients, via_index=not args.disable_task_index, schedule_via_backend=args.schedule_via_backend, ) ) # Show export queue show_export_queue_parser = add_sub( subparsers, "show_export_queue", help="View outbound export queue (without sending)", ) _add_export_options(show_export_queue_parser) show_export_queue_parser.add_argument( "--pretty", action="store_true", help="Pretty (but slower) formatting for tasks", ) show_export_queue_parser.add_argument( "--debug_show_fhir", action="store_true", help="Show FHIR output for tasks", ) show_export_queue_parser.add_argument( "--debug_fhir_include_docs", action="store_true", help="(If --debug_show_fhir) Included FHIR documents? Large.", ) show_export_queue_parser.set_defaults( func=lambda args: _cmd_show_export_queue( recipient_names=args.recipients, all_recipients=args.all_recipients, via_index=not args.disable_task_index, pretty=args.pretty, debug_show_fhir=args.debug_show_fhir, debug_fhir_include_docs=args.debug_fhir_include_docs, ) ) # Make CRATE data dictionary crate_dd_parser = add_sub( subparsers, "crate_dd", help="Make draft data dictionary for CRATE anonymisation tool", ) crate_dd_parser.add_argument( "--filename", type=str, required=True, help="Output filename (data dictionary to write)", ) crate_dd_parser.add_argument( "--recipient", type=str, required=True, help="Export recipient (as named in config file)", ) crate_dd_parser.set_defaults( func=lambda args: _cmd_crate_dd( filename=args.filename, recipient_name=args.recipient ) ) # Make CRIS data dictionary cris_dd_parser = add_sub( subparsers, "cris_dd", help="Make draft data dictionary for CRIS anonymisation tool", ) cris_dd_parser.add_argument( "--filename", type=str, required=True, help="Filename of data dictionary to write", ) cris_dd_parser.add_argument( "--recipient", type=str, required=True, help="Export recipient (as named in config file)", ) cris_dd_parser.set_defaults( func=lambda args: _cmd_cris_dd( filename=args.filename, recipient_name=args.recipient ) ) # ------------------------------------------------------------------------- # Web server options # ------------------------------------------------------------------------- # Serve via CherryPy serve_cp_parser = add_sub( subparsers, "serve_cherrypy", help="Start web server via CherryPy" ) serve_cp_parser.set_defaults(func=lambda args: _serve_cherrypy()) # Serve via Gunicorn serve_gu_parser = add_sub( subparsers, "serve_gunicorn", help="Start web server via Gunicorn (not available under Windows)", ) serve_gu_parser.set_defaults(func=lambda args: _serve_gunicorn()) # Serve via the Pyramid test server serve_pyr_parser = add_sub( subparsers, "serve_pyramid", help="Start test web server via Pyramid (single-thread, " "single-process, HTTP-only; for development use only)", ) serve_pyr_parser.set_defaults(func=lambda args: _test_serve_pyramid()) # ------------------------------------------------------------------------- # Preprocessing options # ------------------------------------------------------------------------- athena_icd_snomed_to_xml_parser = add_sub( subparsers, "convert_athena_icd_snomed_to_xml", help="Fetch SNOMED-CT codes for ICD-9-CM and ICD-10 from the Athena " "OHDSI data set (https://athena.ohdsi.org/) and write them to " "the CamCOPS XML format", ) athena_icd_snomed_to_xml_parser.add_argument( "--athena_concept_tsv_filename", type=str, required=True, help="Path to CONCEPT.csv file from Athena download", ) athena_icd_snomed_to_xml_parser.add_argument( "--athena_concept_relationship_tsv_filename", type=str, required=True, help="Path to CONCEPT_RELATIONSHIP.csv file from Athena download", ) athena_icd_snomed_to_xml_parser.add_argument( "--icd9_xml_filename", type=str, required=True, help="Filename of ICD-9-CM/SNOMED-CT XML file to write", ) athena_icd_snomed_to_xml_parser.add_argument( "--icd10_xml_filename", type=str, required=True, help="Filename of ICD-10/SNOMED-CT XML file to write", ) athena_icd_snomed_to_xml_parser.set_defaults( func=lambda args: send_athena_icd_snomed_to_xml( athena_concept_tsv_filename=args.athena_concept_tsv_filename, athena_concept_relationship_tsv_filename=args.athena_concept_relationship_tsv_filename, # noqa icd9_xml_filename=args.icd9_xml_filename, icd10_xml_filename=args.icd10_xml_filename, ) ) # ------------------------------------------------------------------------- # Celery options # ------------------------------------------------------------------------- # Launch Celery workers celery_worker_parser = add_sub( subparsers, "launch_workers", help="Launch Celery workers, for background processing", ) celery_worker_parser.set_defaults( func=lambda args: _launch_celery_workers(verbose=args.verbose) ) # Launch Celery Bear celery_beat_parser = add_sub( subparsers, "launch_scheduler", help="Launch Celery Beat scheduler, to schedule background jobs", ) celery_beat_parser.set_defaults( func=lambda args: _launch_celery_beat(verbose=args.verbose) ) # Launch Celery Flower monitor celery_flower_parser = add_sub( subparsers, "launch_monitor", help="Launch Celery Flower monitor, to monitor background jobs", ) celery_flower_parser.add_argument( "--address", type=str, default=DEFAULT_FLOWER_ADDRESS, help="Address to use for Flower", ) celery_flower_parser.add_argument( "--port", type=int, default=DEFAULT_FLOWER_PORT, help="Port to use for Flower", ) celery_flower_parser.set_defaults( func=lambda args: _launch_celery_flower( address=args.address, port=args.port ) ) # Housekeeping task housekeeping_parser = add_sub( subparsers, "housekeeping", help="Run housekeeping tasks (remove stale sessions, etc.)", ) housekeeping_parser.set_defaults(func=lambda args: _housekeeping()) # Purge Celery tasks purge_jobs_parser = add_sub( subparsers, "purge_jobs", help="Purge any outstanding background (back-end, worker) jobs", ) purge_jobs_parser.set_defaults(func=lambda args: _purge_jobs()) # ------------------------------------------------------------------------- # Test options # ------------------------------------------------------------------------- # Launch a Python command line dev_cli_parser = add_sub( subparsers, "dev_cli", help="Developer command-line interface, with config loaded as " "'config'.", ) dev_cli_parser.set_defaults(func=lambda args: _dev_cli()) # Show tasklist list_tasks_parser = add_sub( subparsers, "list_tasks", help="List supported tasks.", ) list_tasks_parser.set_defaults(func=lambda args: _list_tasks()) # ------------------------------------------------------------------------- # OK; parser built; now parse the arguments # ------------------------------------------------------------------------- progargs = parser.parse_args() # Initial log level (overridden later by config file but helpful for start) if progargs.no_log: loglevel = logging.CRITICAL + 1 elif progargs.verbose: loglevel = logging.DEBUG else: loglevel = logging.INFO main_only_quicksetup_rootlogger( level=loglevel, with_process_id=True, with_thread_id=True ) rootlogger = logging.getLogger() set_level_for_logger_and_its_handlers(rootlogger, loglevel) # Say hello log.info( f""" # ============================================================================= # CamCOPS server version {CAMCOPS_SERVER_VERSION} # Created by Rudolf Cardinal. See {CAMCOPS_URL} # ============================================================================= """ ) log.debug( """ # ----------------------------------------------------------------------------- # Python interpreter: {interpreter!r} # This program: {thisprog!r} # Command-line arguments: {progargs} # ----------------------------------------------------------------------------- """, interpreter=sys.executable, thisprog=__file__, progargs=pprint.pformat(vars(progargs)), ) if DEBUG_LOG_CONFIG: print_report_on_all_logs() # Finalize the config filename; ensure it's in the environment variable if hasattr(progargs, "config") and progargs.config: # We want the the config filename in the environment from now on: os.environ[ENVVAR_CONFIG_FILE] = progargs.config cfg_name = os.environ.get(ENVVAR_CONFIG_FILE, None) log.debug("Using configuration file: {!r}", cfg_name) # Call the subparser function for the chosen command if progargs.func is None: raise NotImplementedError("Command-line function not implemented!") success = progargs.func(progargs) # type: Optional[bool] if success is None or success is True: return EXIT_SUCCESS else: return EXIT_FAILURE
# ============================================================================= # Command-line entry point # =============================================================================
[docs]def display_top( snapshot: tracemalloc.Snapshot, key_type: str = "lineno", limit: int = 10, short_filename: bool = False, ) -> None: """ Display e.g. lines of code allocating the most memory. Args: snapshot: a :class:`tracemalloc.Snapshot` object key_type: thing to group by limit: show the top *n* short_filename: make source code filenames shorter? """ # Modified from https://docs.python.org/3/library/tracemalloc.html print("Calculating memory allocation...") snapshot = snapshot.filter_traces( ( tracemalloc.Filter(False, "<frozen importlib._bootstrap>"), tracemalloc.Filter( False, "<frozen importlib._bootstrap_external>" ), tracemalloc.Filter(False, "<unknown>"), ) ) top_stats = snapshot.statistics(key_type) print(f"Top {limit} lines") for index, stat in enumerate(top_stats[:limit], 1): frame = stat.traceback[0] if short_filename: # replace "/path/to/module/file.py" with "module/file.py" filename = os.sep.join(frame.filename.split(os.sep)[-2:]) else: filename = frame.filename print( f"#{index}: {filename}:{frame.lineno}: " f"{stat.size / 1024:.1f} KiB" ) line = linecache.getline(frame.filename, frame.lineno).strip() if line: print(f" {line}") other = top_stats[limit:] if other: size = sum(stat.size for stat in other) print(f"{len(other)} other: {size / 1024:.1f} KiB") total = sum(stat.size for stat in top_stats) print(f"Total allocated size: {total / 1024:.1f} KiB")
[docs]def main() -> None: """ Command-line entry point. Calls :func:`camcops_main`. """ if DEBUG_MEMORY_ALLOCATION: tracemalloc.start() if DEBUG_RUN_WITH_PDB: retval = pdb_run(camcops_main) else: retval = camcops_main() if DEBUG_MEMORY_ALLOCATION: # https://docs.python.org/3/library/tracemalloc.html snapshot = tracemalloc.take_snapshot() display_top(snapshot, key_type="lineno", limit=20) sys.exit(retval)
if __name__ == "__main__": main()