Align GeneXpert query handling with passive server mode

This commit is contained in:
Dwi Swandhana
2026-04-14 11:02:10 +07:00
parent 676ec49cb2
commit 2b91b18275
+73 -135
View File
@@ -1,6 +1,8 @@
import builtins
from enum import Enum
from logging import config
from logging.handlers import TimedRotatingFileHandler
import os
from queue import Queue
import re
import socket
@@ -30,6 +32,33 @@ formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(threadName)s - %(
log_handler.setFormatter(formatter)
logging.basicConfig(level=logging.INFO, handlers=[log_handler])
THREAD_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "thread_logs")
thread_log_lock = threading.Lock()
def _sanitize_thread_log_name(thread_name):
safe_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", str(thread_name or "main").strip())
return safe_name or "main"
def _write_thread_log(message):
try:
os.makedirs(THREAD_LOG_DIR, exist_ok=True)
thread_name = threading.current_thread().name
safe_thread_name = _sanitize_thread_log_name(thread_name)
log_path = os.path.join(THREAD_LOG_DIR, f"{safe_thread_name}.log")
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with thread_log_lock:
with open(log_path, "a", encoding="utf-8") as fh:
fh.write(f"{timestamp} | {message}\n")
except Exception:
pass
def print(*args, **kwargs):
sep = kwargs.get("sep", " ")
end = kwargs.get("end", "\n")
message = sep.join(str(arg) for arg in args)
_write_thread_log(message)
return builtins.print(*args, **kwargs)
# ==========================================
# 1. KONFIGURASI SISTEM
# ==========================================
@@ -82,7 +111,7 @@ GENEXPERT_TEST_MAPPING = {
GENEXPERT_IP_CAPABILITIES = {
"10.10.120.75": ["MTB-RIF", "MTB-RIF_ULTRA2", "MTB-XDR", "HIV-1_VL", "COV-2 2"],
"10.10.120.74": ["HCV", "HBV"],
"10.10.120.73": ["MTB-RIF"] # Belum ada yang aktif
"10.10.120.73": ["MTB-RIF"]
}
# Default code jika nama tes di database tidak dikenali
DEFAULT_GXP_CODE = "MTB-RIF"
@@ -309,34 +338,11 @@ def get_genexpert_query_orders(ip_addr, hl7_msg):
for order in base_orders:
if str(order.rnoreg or "").strip() != requested_sample_id:
continue
test_code = resolve_genexpert_test_code(order, ip_addr)
if test_code:
return [order]
print(
f"[GENEXPERT] Tidak ada order cocok/didukung untuk sample_id={requested_sample_id} di {ip_addr}"
)
return [order]
print(f"[GENEXPERT] Tidak ada order untuk sample_id={requested_sample_id} di {ip_addr}")
return []
selected_orders = []
selected_codes = set()
max_orders = max(len(supported_codes), 1)
for supported_code in supported_codes:
if len(selected_orders) >= max_orders:
break
for order in base_orders:
if any(existing.urut == order.urut for existing in selected_orders):
continue
test_code = resolve_genexpert_test_code(order, ip_addr)
if test_code != supported_code:
continue
if test_code in selected_codes:
continue
selected_orders.append(order)
selected_codes.add(test_code)
break
return selected_orders
return base_orders[:1]
finally:
session.close()
@@ -354,12 +360,20 @@ def create_genexpert_ack_j01_response(incoming_hl7):
msa = f"MSA|CA|{incoming_control_id}"
return f"{msh}\r{msa}\r"
def create_genexpert_rsp_z02_response(orders, incoming_hl7):
def create_genexpert_ack_r01_response(incoming_hl7):
incoming_control_id = extract_msg_control_id(incoming_hl7) or "UNKNOWN"
resp_control_id = f"ACK{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"
msh = build_genexpert_response_msh("ACK^R01", incoming_hl7, resp_control_id)
msa = f"MSA|CA|{incoming_control_id}"
return f"{msh}\r{msa}\r"
def create_genexpert_rsp_z02_response(orders, incoming_hl7, ip_addr=None):
qpd_segment = extract_segment(incoming_hl7, "QPD")
qpd = parse_genexpert_qpd(qpd_segment)
query_tag = qpd.get("query_tag") or (extract_msg_control_id(incoming_hl7) or "UNKNOWN")
query_name = qpd.get("query_name") or "Z03^HOST QUERY"
resp_control_id = f"RSP{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"
supported_codes = GENEXPERT_IP_CAPABILITIES.get(str(ip_addr or "").strip(), [])
msh = build_genexpert_response_msh("RSP^Z02", incoming_hl7, resp_control_id)
msa = f"MSA|AA|{query_tag}"
@@ -371,15 +385,18 @@ def create_genexpert_rsp_z02_response(orders, incoming_hl7):
for patient_idx, order in enumerate(orders, start=1):
patient_id = str(order.norm or order.rnoreg or "").strip()
sample_id = str(order.rnoreg or "").strip()
assay_name = str(order.tes or "").strip()
test_code = resolve_genexpert_test_code(order) or GENEXPERT_TEST_MAPPING.get(assay_name) or DEFAULT_GXP_CODE
order_ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
segments.append(f"PID|{patient_idx}||{patient_id}")
segments.append(f"ORC|NW|{patient_idx}|||||||{order_ts}")
segments.append(f"OBR|{patient_idx}|||{test_code}|||||||A")
segments.append("TQ1|||||||||R")
segments.append(f"SPM|{patient_idx}|{sample_id}^||ORH|||||||P")
test_codes = supported_codes if supported_codes else []
if not test_codes:
print(f"[GENEXPERT] Tidak ada test_code capability untuk payload ke {ip_addr}")
for order_idx, test_code in enumerate(test_codes, start=1):
segments.append(f"ORC|NW|{order_idx}|||||||{order_ts}")
segments.append(f"OBR|{order_idx}|||{test_code}|||||||A")
segments.append("TQ1|||||||||R")
segments.append(f"SPM|{order_idx}|{sample_id}^||ORH|||||||P")
return "\r".join(segments) + "\r"
@@ -388,14 +405,14 @@ def send_all_orders(conn, ip_addr, hl7_msg, msg_id):
scheduled_orders = []
if not orders:
print(f"[GENEXPERT] Tidak ada order pending untuk {ip_addr}")
rsp = create_genexpert_rsp_z02_response([], hl7_msg)
rsp = create_genexpert_rsp_z02_response([], hl7_msg, ip_addr=ip_addr)
log_genexpert_hl7("OUT", ip_addr, rsp, label="qbp-empty")
log_genexpert_hl7_full("OUT", ip_addr, rsp, label="qbp-empty")
conn.sendall(f"\x0b{rsp}\x1c\r".encode('utf-8'))
return
print(f"[GENEXPERT] Mengirim {len(orders)} order ke {ip_addr}")
rsp = create_genexpert_rsp_z02_response(orders, hl7_msg)
rsp = create_genexpert_rsp_z02_response(orders, hl7_msg, ip_addr=ip_addr)
mllp = f"\x0b{rsp}\x1c\r"
first_accnumber = str(orders[0].rnoreg or "").strip() if orders else ""
log_genexpert_hl7("OUT", ip_addr, rsp, label=f"qbp-order:{first_accnumber}")
@@ -547,6 +564,10 @@ def stop_scheduled_result_query(accnumber, reason="completed"):
return True
def scheduled_result_query_worker(accnumber):
if not GENEXPERT_ENABLE_RESULT_QUERY_SCHEDULER:
stop_scheduled_result_query(accnumber, reason="disabled")
return
while True:
if not get_active_genexpert_ips():
stop_all_scheduled_result_queries(reason="no-active-genexpert")
@@ -691,7 +712,7 @@ def trigger_result_query_to_genexpert(accnumber, register_no, target_ip=None, wa
if not GENEXPERT_ENABLE_RESULT_QUERY_SCHEDULER:
return {
"ok": False,
"message": "Auto query hasil GeneXpert sedang dinonaktifkan sementara.",
"message": "Mode GeneXpert pasif aktif. Host hanya menjawab request dari alat.",
}
active_ips = get_active_genexpert_ips()
@@ -804,6 +825,12 @@ def trigger_result_query_to_genexpert(accnumber, register_no, target_ip=None, wa
@app.route("/api/genexpert/query-result", methods=["POST"])
def api_query_genexpert_result():
if not GENEXPERT_ENABLE_RESULT_QUERY_SCHEDULER:
return jsonify({
"ok": False,
"message": "Mode GeneXpert pasif aktif. Host hanya menjawab request dari alat.",
}), 409
payload = request.get_json(silent=True) or {}
accnumber = str(payload.get("accnumber") or "").strip()
register_no = str(payload.get("register_no") or payload.get("nomor_register") or "").strip()
@@ -1954,102 +1981,13 @@ def handle_genexpert_client(conn, addr):
incoming_control_id = msh_fields[9] if len(msh_fields) > 9 else "UNKNOWN"
msg_id = incoming_control_id
# ==========================================
# SKENARIO 1: ALAT BERTANYA (QUERY / QRY)
# SKENARIO 1: LEGACY QRY TIDAK DIDUKUNG UNTUK GENEXPERT
# ==========================================
if "QRY^" in clean_hl7 or "QRY|" in clean_hl7:
client_ip = addr[0]
logging.info(f"[QUERY] Request dari IP: {client_ip} (Control ID: {incoming_control_id})")
print(f"[QUERY] Request dari IP: {client_ip} (Control ID: {incoming_control_id})")
# 1. Tentukan Kolom Flag mana yang harus dicek berdasarkan IP
# Kita balik mappingnya: IP -> Nama Kolom
target_flag_col = None
for col_name, ip_addr in TARGET_MAPPING.items():
if ip_addr == client_ip:
target_flag_col = col_name
break
if not target_flag_col:
logging.warning(f"[DENIED] IP {client_ip} tidak terdaftar di TARGET_MAPPING. Abaikan.")
print(f"[DENIED] IP {client_ip} tidak terdaftar di TARGET_MAPPING. Abaikan.")
# Kirim jawaban kosong agar alat tidak hang
reply_msg = create_hl7_dsr_response(None, incoming_control_id, "")
log_genexpert_hl7("OUT", client_ip, reply_msg, label="query-denied-empty")
log_genexpert_hl7_full("OUT", client_ip, reply_msg, label="query-denied-empty")
conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8'))
continue # Skip proses selanjutnya
print(f"[TARGET] IP {client_ip} akan mengecek kolom: {target_flag_col}")
# 2. Cari Sample ID di pesan QRY
search_sample_id = None
qrd_line = ""
for line in lines:
if line.startswith("QRD|"):
qrd_line = line
fields = line.split('|')
if len(fields) > 8:
search_sample_id = fields[8]
break
if search_sample_id:
session = SessionLocal()
try:
# 3. Query Database Dinamis (Pakai getattr)
# Kita cari RN yang cocok DAN (Flag Kolom Tersebut False ATAU Null)
# Ambil atribut kolom Paslab berdasarkan nama string (misal: Paslab.flg_gxp3)
flag_attr = getattr(PaslabOrder, target_flag_col, None)
if flag_attr is None:
logging.error(f"Kolom '{target_flag_col}' tidak ditemukan di Model Paslab!")
print(f"Kolom '{target_flag_col}' tidak ditemukan di Model Paslab!")
raise Exception("Invalid Column Name")
print(f"[DB LOOKUP] Mencari {search_sample_id} dimana {target_flag_col} == False")
order = session.query(PaslabOrder).filter(
PaslabOrder.rnoreg == search_sample_id,
(flag_attr == False) | (flag_attr == None)
).first()
# Variabel kontrol kirim
send_order = False
if order:
# (Opsional) Validasi Kapabilitas Mesin di sini jika perlu
# ...
send_order = True
# 4. Kirim Respon
if send_order and order:
print(f"[FOUND] Order ditemukan: {order.nama}. Mengirim ke {client_ip}...")
reply_msg = create_hl7_dsr_response(order, incoming_control_id, qrd_line)
log_genexpert_hl7("OUT", client_ip, reply_msg, label=f"query-order:{search_sample_id}")
log_genexpert_hl7_full("OUT", client_ip, reply_msg, label=f"query-order:{search_sample_id}")
conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8'))
schedule_result_query_for_order(
accnumber=str(order.rnoreg or "").strip(),
register_no=str(order.rnoreg or "").strip(),
target_ip=client_ip,
)
else:
print(f"[NOT FOUND/ALREADY SENT] Tidak ada order baru untuk {search_sample_id} di kolom {target_flag_col}")
# Kirim DSR Kosong (NF)
reply_msg = create_hl7_dsr_response(None, incoming_control_id, qrd_line)
log_genexpert_hl7("OUT", client_ip, reply_msg, label=f"query-empty:{search_sample_id}")
log_genexpert_hl7_full("OUT", client_ip, reply_msg, label=f"query-empty:{search_sample_id}")
conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8'))
except Exception as db_err:
logging.error(f"Database Error: {db_err}")
print(f"Database Error: {db_err}")
session.rollback()
finally:
session.close()
# Selesai tangani QRY, jangan lanjut parsing umum
print(
f"[GENEXPERT] Legacy QRY diterima dari {addr[0]} tetapi diabaikan. "
"Host hanya mendukung QBP^Z01/QBP^Z03 untuk order query."
)
continue
# ==========================================
@@ -2062,10 +2000,10 @@ def handle_genexpert_client(conn, addr):
parse_hl7_result(conn, msg_id, clean_hl7, device_name=f"GeneXpert-{addr[0]}")
# 2. Kirim ACK (Terima Kasih)
ack_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
ack_msg = f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{ack_time}||ACK|{incoming_control_id}|P|2.5\rMSA|AA|{incoming_control_id}\r"
ack_msg = create_genexpert_ack_r01_response(clean_hl7)
full_ack = f"\x0b{ack_msg}\x1c\r"
log_genexpert_hl7("OUT", addr[0], ack_msg, label="oru-ack")
log_genexpert_hl7_full("OUT", addr[0], ack_msg, label="oru-ack")
conn.sendall(full_ack.encode('utf-8'))
print(f"[ACK SENT] Untuk hasil ID {incoming_control_id}")
continue
@@ -2073,7 +2011,7 @@ def handle_genexpert_client(conn, addr):
# ==========================================
# SKENARIO 3: Handle Pesan Apa Adanya
# ==========================================
elif "QBP^Z03" in clean_hl7:
elif "QBP^Z01" in clean_hl7 or "QBP^Z03" in clean_hl7:
print("[GENEXPERT] Alat meminta ORDER")
msg_id = extract_msg_control_id(clean_hl7)
send_all_orders(conn, addr[0], clean_hl7, msg_id)