# noinspection HttpUrlsUsage
"""
camcops_server/cc_modules/cc_hl7.py
===============================================================================
Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
This file is part of CamCOPS.
CamCOPS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
CamCOPS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
===============================================================================
**Core HL7 functions, e.g. to build HL7 v2 messages.**
General HL7 sources:
- https://python-hl7.readthedocs.org/en/latest/
- http://www.interfaceware.com/manual/v3gen_python_library_details.html
- http://www.interfaceware.com/hl7_video_vault.html#how
- http://www.interfaceware.com/hl7-standard/hl7-segments.html
- https://www.hl7.org/special/committees/vocab/v26_appendix_a.pdf
- https://www.ncbi.nlm.nih.gov/pmc/articles/PMC130066/
To consider
- batched messages (HL7 batching protocol);
https://docs.oracle.com/cd/E23943_01/user.1111/e23486/app_hl7batching.htm
- note: DG1 segment = diagnosis
Basic HL7 message structure:
- can package into HL7 2.X message as encapsulated PDF;
https://www.hl7standards.com/blog/2007/11/27/pdf-attachment-in-hl7-message/
- message ORU^R01
https://www.corepointhealth.com/resource-center/hl7-resources/hl7-messages
- MESSAGES: http://www.interfaceware.com/hl7-standard/hl7-messages.html
- OBX segment = observation/result segment;
https://www.corepointhealth.com/resource-center/hl7-resources/hl7-obx-segment;
http://www.interfaceware.com/hl7-standard/hl7-segment-OBX.html
- SEGMENTS:
https://www.corepointhealth.com/resource-center/hl7-resources/hl7-segments
- ED field (= encapsulated data);
http://www.interfaceware.com/hl7-standard/hl7-fields.html
- base-64 encoding
We can then add an option for structure (XML), HTML, PDF export.
"""
import base64
import logging
import socket
from types import TracebackType
from typing import List, Optional, Tuple, Type, TYPE_CHECKING, Union
from cardinal_pythonlib.datetimefunc import format_datetime
from cardinal_pythonlib.logs import BraceStyleAdapter
import hl7
from pendulum import Date, DateTime as Pendulum
from camcops_server.cc_modules.cc_constants import DateFormat, FileType
from camcops_server.cc_modules.cc_simpleobjects import HL7PatientIdentifier
if TYPE_CHECKING:
from camcops_server.cc_modules.cc_request import CamcopsRequest
from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions
from camcops_server.cc_modules.cc_task import Task
log = BraceStyleAdapter(logging.getLogger(__name__))
# =============================================================================
# Constants
# =============================================================================
# STRUCTURE OF HL7 MESSAGES
# MESSAGE = list of segments, separated by carriage returns
SEGMENT_SEPARATOR = "\r"
# SEGMENT = list of fields (= composites), separated by pipes
FIELD_SEPARATOR = "|"
# FIELD (= COMPOSITE) = string, or list of components separated by carets
COMPONENT_SEPARATOR = "^"
# Component = string, or lists of subcomponents separated by ampersands
SUBCOMPONENT_SEPARATOR = "&"
# Subcomponents must be primitive data types (i.e. strings).
# ... http://www.interfaceware.com/blog/hl7-composites/
REPETITION_SEPARATOR = "~"
ESCAPE_CHARACTER = "\\"
# Fields are specified in terms of DATA TYPES:
# http://www.corepointhealth.com/resource-center/hl7-resources/hl7-data-types
# Some of those are COMPOSITE TYPES:
# http://amisha.pragmaticdata.com/~gunther/oldhtml/composites.html#COMPOSITES
# =============================================================================
# HL7 helper functions
# =============================================================================
[docs]def get_mod11_checkdigit(strnum: str) -> str:
# noinspection HttpUrlsUsage
"""
Input: string containing integer. Output: MOD11 check digit (string).
See:
- http://www.mexi.be/documents/hl7/ch200025.htm
- https://stackoverflow.com/questions/7006109
- http://www.pgrocer.net/Cis51/mod11.html
"""
total = 0
multiplier = 2 # 2 for units digit, increases to 7, then resets to 2
try:
for i in reversed(range(len(strnum))):
total += int(strnum[i]) * multiplier
multiplier += 1
if multiplier == 8:
multiplier = 2
c = str(11 - (total % 11))
if c == "11":
c = "0"
elif c == "10":
c = "X"
return c
except (TypeError, ValueError):
# garbage in...
return ""
[docs]def make_msh_segment(
message_datetime: Pendulum, message_control_id: str
) -> hl7.Segment:
"""
Creates an HL7 message header (MSH) segment.
- MSH: https://www.hl7.org/documentcenter/public/wg/conf/HL7MSH.htm
- We're making an ORU^R01 message = unsolicited result.
- ORU = Observational Report - Unsolicited
- ORU^R01 = Unsolicited transmission of an observation message
- https://www.corepointhealth.com/resource-center/hl7-resources/hl7-oru-message
- https://www.hl7kit.com/joomla/index.php/hl7resources/examples/107-orur01
""" # noqa
segment_id = "MSH"
encoding_characters = (
COMPONENT_SEPARATOR
+ REPETITION_SEPARATOR
+ ESCAPE_CHARACTER
+ SUBCOMPONENT_SEPARATOR
)
sending_application = "CamCOPS"
sending_facility = ""
receiving_application = ""
receiving_facility = ""
date_time_of_message = format_datetime(
message_datetime, DateFormat.HL7_DATETIME
)
security = ""
message_type = hl7.Field(
COMPONENT_SEPARATOR,
[
"ORU", # message type ID = Observ result/unsolicited
"R01", # trigger event ID = ORU/ACK - Unsolicited transmission
# of an observation message
],
)
processing_id = "P" # production (processing mode: current)
version_id = "2.3" # HL7 version
sequence_number = ""
continuation_pointer = ""
accept_acknowledgement_type = ""
application_acknowledgement_type = "AL" # always
country_code = ""
character_set = "UNICODE UTF-8"
# http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages
principal_language_of_message = ""
fields = [
segment_id,
# field separator inserted automatically; HL7 standard considers it a
# field but the python-hl7 processor doesn't when it parses
encoding_characters,
sending_application,
sending_facility,
receiving_application,
receiving_facility,
date_time_of_message,
security,
message_type,
message_control_id,
processing_id,
version_id,
sequence_number,
continuation_pointer,
accept_acknowledgement_type,
application_acknowledgement_type,
country_code,
character_set,
principal_language_of_message,
]
segment = hl7.Segment(FIELD_SEPARATOR, fields)
return segment
[docs]def make_pid_segment(
forename: str,
surname: str,
dob: Date,
sex: str,
address: str,
patient_id_list: List[HL7PatientIdentifier] = None,
) -> hl7.Segment:
"""
Creates an HL7 patient identification (PID) segment.
- https://www.corepointhealth.com/resource-center/hl7-resources/hl7-pid-segment
- https://www.hl7.org/documentcenter/public/wg/conf/Msgadt.pdf (s5.4.8)
- ID numbers...
https://www.cdc.gov/vaccines/programs/iis/technical-guidance/downloads/hl7guide-1-4-2012-08.pdf
""" # noqa
patient_id_list = patient_id_list or [] # type: List[HL7PatientIdentifier]
segment_id = "PID"
set_id = ""
# External ID
patient_external_id = ""
# ... this one is deprecated
# http://www.j4jayant.com/articles/hl7/16-patient-id
# Internal ID
internal_id_element_list = []
for i in range(len(patient_id_list)):
if not patient_id_list[i].pid:
continue
ptidentifier = patient_id_list[i]
pid = ptidentifier.pid
check_digit = get_mod11_checkdigit(pid)
check_digit_scheme = "M11" # Mod 11 algorithm
type_id = patient_id_list[i].id_type
assigning_authority = patient_id_list[i].assigning_authority
# Now, as per Table 4.6 "Extended composite ID" of
# hl7guide-1-4-2012-08.pdf:
internal_id_element = hl7.Field(
COMPONENT_SEPARATOR,
[
pid,
check_digit,
check_digit_scheme,
assigning_authority,
type_id, # length "2..5" meaning 2-5
],
)
internal_id_element_list.append(internal_id_element)
patient_internal_id = hl7.Field(
REPETITION_SEPARATOR, internal_id_element_list
)
# Alternate ID
alternate_patient_id = ""
# ... this one is deprecated
# http://www.j4jayant.com/articles/hl7/16-patient-id
patient_name = hl7.Field(
COMPONENT_SEPARATOR,
[
forename, # surname
surname, # forename
"", # middle initial/name
"", # suffix (e.g. Jr, III)
"", # prefix (e.g. Dr)
"", # degree (e.g. MD)
],
)
mothers_maiden_name = ""
date_of_birth = format_datetime(dob, DateFormat.HL7_DATE)
alias = ""
race = ""
country_code = ""
home_phone_number = ""
business_phone_number = ""
language = ""
marital_status = ""
religion = ""
account_number = ""
social_security_number = ""
drivers_license_number = ""
mother_identifier = ""
ethnic_group = ""
birthplace = ""
birth_order = ""
citizenship = ""
veterans_military_status = ""
fields = [
segment_id,
set_id, # PID.1
patient_external_id, # PID.2
patient_internal_id, # known as "PID-3" or "PID.3"
alternate_patient_id, # PID.4
patient_name,
mothers_maiden_name,
date_of_birth,
sex,
alias,
race,
address,
country_code,
home_phone_number,
business_phone_number,
language,
marital_status,
religion,
account_number,
social_security_number,
drivers_license_number,
mother_identifier,
ethnic_group,
birthplace,
birth_order,
citizenship,
veterans_military_status,
]
segment = hl7.Segment(FIELD_SEPARATOR, fields)
return segment
# noinspection PyUnusedLocal
[docs]def make_obr_segment(task: "Task") -> hl7.Segment:
# noinspection HttpUrlsUsage
"""
Creates an HL7 observation request (OBR) segment.
- http://hl7reference.com/HL7%20Specifications%20ORM-ORU.PDF
- Required in ORU^R01 message:
- https://www.corepointhealth.com/resource-center/hl7-resources/hl7-oru-message
- https://www.corepointhealth.com/resource-center/hl7-resources/hl7-obr-segment
""" # noqa
segment_id = "OBR"
set_id = "1"
placer_order_number = "CamCOPS"
filler_order_number = "CamCOPS"
universal_service_id = hl7.Field(
COMPONENT_SEPARATOR,
["CamCOPS", "CamCOPS psychiatric/cognitive assessment"],
)
# unused below here, apparently
priority = ""
requested_date_time = ""
observation_date_time = ""
observation_end_date_time = ""
collection_volume = ""
collector_identifier = ""
specimen_action_code = ""
danger_code = ""
relevant_clinical_information = ""
specimen_received_date_time = ""
ordering_provider = ""
order_callback_phone_number = ""
placer_field_1 = ""
placer_field_2 = ""
filler_field_1 = ""
filler_field_2 = ""
results_report_status_change_date_time = ""
charge_to_practice = ""
diagnostic_service_section_id = ""
result_status = ""
parent_result = ""
quantity_timing = ""
result_copies_to = ""
parent = ""
transportation_mode = ""
reason_for_study = ""
principal_result_interpreter = ""
assistant_result_interpreter = ""
technician = ""
transcriptionist = ""
scheduled_date_time = ""
number_of_sample_containers = ""
transport_logistics_of_collected_samples = ""
collectors_comment = ""
transport_arrangement_responsibility = ""
transport_arranged = ""
escort_required = ""
planned_patient_transport_comment = ""
fields = [
segment_id,
set_id,
placer_order_number,
filler_order_number,
universal_service_id,
priority,
requested_date_time,
observation_date_time,
observation_end_date_time,
collection_volume,
collector_identifier,
specimen_action_code,
danger_code,
relevant_clinical_information,
specimen_received_date_time,
ordering_provider,
order_callback_phone_number,
placer_field_1,
placer_field_2,
filler_field_1,
filler_field_2,
results_report_status_change_date_time,
charge_to_practice,
diagnostic_service_section_id,
result_status,
parent_result,
quantity_timing,
result_copies_to,
parent,
transportation_mode,
reason_for_study,
principal_result_interpreter,
assistant_result_interpreter,
technician,
transcriptionist,
scheduled_date_time,
number_of_sample_containers,
transport_logistics_of_collected_samples,
collectors_comment,
transport_arrangement_responsibility,
transport_arranged,
escort_required,
planned_patient_transport_comment,
]
segment = hl7.Segment(FIELD_SEPARATOR, fields)
return segment
[docs]def make_obx_segment(
req: "CamcopsRequest",
task: "Task",
task_format: str,
observation_identifier: str,
observation_datetime: Pendulum,
responsible_observer: str,
export_options: "TaskExportOptions",
) -> hl7.Segment:
# noinspection HttpUrlsUsage
"""
Creates an HL7 observation result (OBX) segment.
- http://www.hl7standards.com/blog/2006/10/18/how-do-i-send-a-binary-file-inside-of-an-hl7-message
- http://www.hl7standards.com/blog/2007/11/27/pdf-attachment-in-hl7-message/
- http://www.hl7standards.com/blog/2006/12/01/sending-images-or-formatted-documents-via-hl7-messaging/
- https://www.hl7.org/documentcenter/public/wg/ca/HL7ClmAttIG.PDF
- type of data:
https://www.hl7.org/implement/standards/fhir/v2/0191/index.html
- subtype of data:
https://www.hl7.org/implement/standards/fhir/v2/0291/index.html
""" # noqa
segment_id = "OBX"
set_id = str(1)
source_application = "CamCOPS"
if task_format == FileType.PDF:
value_type = "ED" # Encapsulated data (ED) field
observation_value = hl7.Field(
COMPONENT_SEPARATOR,
[
source_application,
"Application", # type of data
"PDF", # data subtype
"Base64", # base 64 encoding
base64.standard_b64encode(task.get_pdf(req)), # data
],
)
elif task_format == FileType.HTML:
value_type = "ED" # Encapsulated data (ED) field
observation_value = hl7.Field(
COMPONENT_SEPARATOR,
[
source_application,
"TEXT", # type of data
"HTML", # data subtype
"A", # no encoding (see table 0299), but need to escape
escape_hl7_text(task.get_html(req)), # data
],
)
elif task_format == FileType.XML:
value_type = "ED" # Encapsulated data (ED) field
observation_value = hl7.Field(
COMPONENT_SEPARATOR,
[
source_application,
"TEXT", # type of data
"XML", # data subtype
"A", # no encoding (see table 0299), but need to escape
escape_hl7_text(
task.get_xml(
req, indent_spaces=0, eol="", options=export_options
)
), # data
],
)
else:
raise AssertionError(
f"make_obx_segment: invalid task_format: {task_format}"
)
observation_sub_id = ""
units = ""
reference_range = ""
abnormal_flags = ""
probability = ""
nature_of_abnormal_test = ""
observation_result_status = ""
date_of_last_observation_normal_values = ""
user_defined_access_checks = ""
date_and_time_of_observation = format_datetime(
observation_datetime, DateFormat.HL7_DATETIME
)
producer_id = ""
observation_method = ""
equipment_instance_identifier = ""
date_time_of_analysis = ""
fields = [
segment_id,
set_id,
value_type,
observation_identifier,
observation_sub_id,
observation_value,
units,
reference_range,
abnormal_flags,
probability,
nature_of_abnormal_test,
observation_result_status,
date_of_last_observation_normal_values,
user_defined_access_checks,
date_and_time_of_observation,
producer_id,
responsible_observer,
observation_method,
equipment_instance_identifier,
date_time_of_analysis,
]
segment = hl7.Segment(FIELD_SEPARATOR, fields)
return segment
[docs]def make_dg1_segment(
set_id: int,
diagnosis_datetime: Pendulum,
coding_system: str,
diagnosis_identifier: str,
diagnosis_text: str,
alternate_coding_system: str = "",
alternate_diagnosis_identifier: str = "",
alternate_diagnosis_text: str = "",
diagnosis_type: str = "F",
diagnosis_classification: str = "D",
confidential_indicator: str = "N",
clinician_id_number: Union[str, int] = None,
clinician_surname: str = "",
clinician_forename: str = "",
clinician_middle_name_or_initial: str = "",
clinician_suffix: str = "",
clinician_prefix: str = "",
clinician_degree: str = "",
clinician_source_table: str = "",
clinician_assigning_authority: str = "",
clinician_name_type_code: str = "",
clinician_identifier_type_code: str = "",
clinician_assigning_facility: str = "",
attestation_datetime: Pendulum = None,
) -> hl7.Segment:
# noinspection HttpUrlsUsage
"""
Creates an HL7 diagnosis (DG1) segment.
Args:
.. code-block:: none
set_id: Diagnosis sequence number, starting with 1 (use higher numbers
for >1 diagnosis).
diagnosis_datetime: Date/time diagnosis was made.
coding_system: E.g. "I9C" for ICD9-CM; "I10" for ICD10.
diagnosis_identifier: Code.
diagnosis_text: Text.
alternate_coding_system: Optional alternate coding system.
alternate_diagnosis_identifier: Optional alternate code.
alternate_diagnosis_text: Optional alternate text.
diagnosis_type: A admitting, W working, F final.
diagnosis_classification: C consultation, D diagnosis, M medication,
O other, R radiological scheduling, S sign and symptom,
T tissue diagnosis, I invasive procedure not classified elsewhere.
confidential_indicator: Y yes, N no
clinician_id_number: } Diagnosing clinician.
clinician_surname: }
clinician_forename: }
clinician_middle_name_or_initial: }
clinician_suffix: }
clinician_prefix: }
clinician_degree: }
clinician_source_table: }
clinician_assigning_authority: }
clinician_name_type_code: }
clinician_identifier_type_code: }
clinician_assigning_facility: }
attestation_datetime: Date/time the diagnosis was attested.
- http://www.mexi.be/documents/hl7/ch600012.htm
- https://www.hl7.org/special/committees/vocab/V26_Appendix_A.pdf
"""
segment_id = "DG1"
try:
int(set_id)
set_id = str(set_id) # type: ignore[assignment]
except Exception:
raise AssertionError("make_dg1_segment: set_id invalid")
diagnosis_coding_method = ""
diagnosis_code = hl7.Field(
COMPONENT_SEPARATOR,
[
diagnosis_identifier,
diagnosis_text,
coding_system,
alternate_diagnosis_identifier,
alternate_diagnosis_text,
alternate_coding_system,
],
)
diagnosis_description = ""
diagnosis_datetime = format_datetime(
diagnosis_datetime, DateFormat.HL7_DATETIME
)
if diagnosis_type not in ("A", "W", "F"):
raise AssertionError("make_dg1_segment: diagnosis_type invalid")
major_diagnostic_category = ""
diagnostic_related_group = ""
drg_approval_indicator = ""
drg_grouper_review_code = ""
outlier_type = ""
outlier_days = ""
outlier_cost = ""
grouper_version_and_type = ""
diagnosis_priority = ""
try:
clinician_id_number = (
str(int(clinician_id_number))
if clinician_id_number is not None
else ""
)
except Exception:
raise AssertionError(
"make_dg1_segment: diagnosing_clinician_id_number" " invalid"
)
if clinician_id_number:
clinician_id_check_digit = get_mod11_checkdigit(clinician_id_number)
clinician_checkdigit_scheme = "M11" # Mod 11 algorithm
else:
clinician_id_check_digit = ""
clinician_checkdigit_scheme = ""
diagnosing_clinician = hl7.Field(
COMPONENT_SEPARATOR,
[
clinician_id_number,
clinician_surname or "",
clinician_forename or "",
clinician_middle_name_or_initial or "",
clinician_suffix or "",
clinician_prefix or "",
clinician_degree or "",
clinician_source_table or "",
clinician_assigning_authority or "",
clinician_name_type_code or "",
clinician_id_check_digit or "",
clinician_checkdigit_scheme or "",
clinician_identifier_type_code or "",
clinician_assigning_facility or "",
],
)
if diagnosis_classification not in (
"C",
"D",
"M",
"O",
"R",
"S",
"T",
"I",
):
raise AssertionError(
"make_dg1_segment: diagnosis_classification invalid"
)
if confidential_indicator not in ("Y", "N"):
raise AssertionError(
"make_dg1_segment: confidential_indicator invalid"
)
attestation_datetime = (
format_datetime(attestation_datetime, DateFormat.HL7_DATETIME)
if attestation_datetime
else ""
)
fields = [
segment_id,
set_id,
diagnosis_coding_method,
diagnosis_code,
diagnosis_description,
diagnosis_datetime,
diagnosis_type,
major_diagnostic_category,
diagnostic_related_group,
drg_approval_indicator,
drg_grouper_review_code,
outlier_type,
outlier_days,
outlier_cost,
grouper_version_and_type,
diagnosis_priority,
diagnosing_clinician,
diagnosis_classification,
confidential_indicator,
attestation_datetime,
]
segment = hl7.Segment(FIELD_SEPARATOR, fields)
return segment
[docs]def escape_hl7_text(s: str) -> str:
# noinspection HttpUrlsUsage
"""
Escapes HL7 special characters.
- http://www.mexi.be/documents/hl7/ch200034.htm
- http://www.mexi.be/documents/hl7/ch200071.htm
"""
esc_escape = ESCAPE_CHARACTER + ESCAPE_CHARACTER + ESCAPE_CHARACTER
esc_fieldsep = ESCAPE_CHARACTER + "F" + ESCAPE_CHARACTER
esc_componentsep = ESCAPE_CHARACTER + "S" + ESCAPE_CHARACTER
esc_subcomponentsep = ESCAPE_CHARACTER + "T" + ESCAPE_CHARACTER
esc_repetitionsep = ESCAPE_CHARACTER + "R" + ESCAPE_CHARACTER
# Linebreaks:
# http://www.healthintersections.com.au/?p=344
# https://groups.google.com/forum/#!topic/ensemble-in-healthcare/wP2DWMeFrPA # noqa
# http://www.hermetechnz.com/documentation/sqlschema/index.html?hl7_escape_rules.htm # noqa
esc_linebreak = ESCAPE_CHARACTER + ".br" + ESCAPE_CHARACTER
s = s.replace(ESCAPE_CHARACTER, esc_escape) # this one first!
s = s.replace(FIELD_SEPARATOR, esc_fieldsep)
s = s.replace(COMPONENT_SEPARATOR, esc_componentsep)
s = s.replace(SUBCOMPONENT_SEPARATOR, esc_subcomponentsep)
s = s.replace(REPETITION_SEPARATOR, esc_repetitionsep)
s = s.replace("\n", esc_linebreak)
return s
[docs]def msg_is_successful_ack(msg: hl7.Message) -> Tuple[bool, Optional[str]]:
# noinspection HttpUrlsUsage
"""
Checks whether msg represents a successful acknowledgement message.
- http://hl7reference.com/HL7%20Specifications%20ORM-ORU.PDF
"""
if msg is None:
return False, "Reply is None"
# Get segments (MSH, MSA)
if len(msg) != 2:
return False, f"Reply doesn't have 2 segments (has {len(msg)})"
msh_segment = msg[0]
msa_segment = msg[1]
# Check MSH segment
if len(msh_segment) < 9:
return (
False,
f"First (MSH) segment has <9 fields (has {len(msh_segment)})",
)
msh_segment_id = msh_segment[0]
msh_message_type = msh_segment[8]
if msh_segment_id != ["MSH"]:
return (
False,
f"First (MSH) segment ID is not 'MSH' (is {msh_segment_id})",
)
if msh_message_type != ["ACK"]:
return (
False,
f"MSH message type is not 'ACK' (is {msh_message_type})",
)
# Check MSA segment
if len(msa_segment) < 2:
return (
False,
f"Second (MSA) segment has <2 fields (has {len(msa_segment)})",
)
msa_segment_id = msa_segment[0]
msa_acknowledgment_code = msa_segment[1]
if msa_segment_id != ["MSA"]:
return (
False,
f"Second (MSA) segment ID is not 'MSA' (is {msa_segment_id})",
)
if msa_acknowledgment_code != ["AA"]:
# AA for success, AE for error
return (
False,
(
f"MSA acknowledgement code is not 'AA' "
f"(is {msa_acknowledgment_code})"
),
)
return True, None
# =============================================================================
# MLLPTimeoutClient
# =============================================================================
# Modification of MLLPClient from python-hl7, to allow timeouts and failure.
SB = "\x0b" # <SB>, vertical tab
EB = "\x1c" # <EB>, file separator
CR = "\x0d" # <CR>, \r
FF = "\x0c" # <FF>, new page form feed
RECV_BUFFER = 4096
[docs]class MLLPTimeoutClient(object):
"""
Class for MLLP TCP/IP transmission that implements timeouts.
"""
[docs] def __init__(self, host: str, port: int, timeout_ms: int = None) -> None:
"""Creates MLLP client and opens socket."""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
timeout_s = (
float(timeout_ms) / float(1000) if timeout_ms is not None else None
)
self.socket.settimeout(timeout_s)
self.socket.connect((host, port))
self.encoding = "utf-8"
def __enter__(self) -> "MLLPTimeoutClient":
"""
For use with "with" statement.
"""
return self
# noinspection PyUnusedLocal
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""
For use with "with" statement.
"""
self.close()
return None
[docs] def close(self) -> None:
"""
Release the socket connection.
"""
self.socket.close()
[docs] def send_message(
self, message: Union[str, hl7.Message]
) -> Tuple[bool, Optional[str]]:
"""
Wraps a string or :class:`hl7.Message` in a MLLP container
and sends the message to the server.
Returns ``success, ack_msg``.
"""
if isinstance(message, hl7.Message):
message = str(message)
# wrap in MLLP message container
data = SB + message + CR + EB + CR
# ... the CR immediately after the message is my addition, because
# HL7 Inspector otherwise says: "Warning: last segment have no segment
# termination char 0x0d !" (sic).
return self.send(data.encode(self.encoding))
[docs] def send(self, data: bytes) -> Tuple[bool, Optional[str]]:
"""
Low-level, direct access to the ``socket.send`` function (data must be
already wrapped in an MLLP container). Blocks until the server
returns.
Returns ``success, ack_msg``.
"""
# upload the data
self.socket.send(data)
# wait for the ACK/NACK
try:
ack_msg = self.socket.recv(RECV_BUFFER).decode(self.encoding)
return True, ack_msg
except socket.timeout:
return False, None