diff --git a/listener/app.py b/listener/app.py index 8cb8b012..85ff4ff7 100644 --- a/listener/app.py +++ b/listener/app.py @@ -1,6 +1,8 @@ +import builtins from enum import Enum from logging import config from logging.handlers import TimedRotatingFileHandler +import os from queue import Queue import re import socket @@ -30,6 +32,33 @@ formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(threadName)s - %( log_handler.setFormatter(formatter) logging.basicConfig(level=logging.INFO, handlers=[log_handler]) +THREAD_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "thread_logs") +thread_log_lock = threading.Lock() + +def _sanitize_thread_log_name(thread_name): + safe_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", str(thread_name or "main").strip()) + return safe_name or "main" + +def _write_thread_log(message): + try: + os.makedirs(THREAD_LOG_DIR, exist_ok=True) + thread_name = threading.current_thread().name + safe_thread_name = _sanitize_thread_log_name(thread_name) + log_path = os.path.join(THREAD_LOG_DIR, f"{safe_thread_name}.log") + timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + with thread_log_lock: + with open(log_path, "a", encoding="utf-8") as fh: + fh.write(f"{timestamp} | {message}\n") + except Exception: + pass + +def print(*args, **kwargs): + sep = kwargs.get("sep", " ") + end = kwargs.get("end", "\n") + message = sep.join(str(arg) for arg in args) + _write_thread_log(message) + return builtins.print(*args, **kwargs) + # ========================================== # 1. KONFIGURASI SISTEM # ========================================== @@ -82,7 +111,7 @@ GENEXPERT_TEST_MAPPING = { GENEXPERT_IP_CAPABILITIES = { "10.10.120.75": ["MTB-RIF", "MTB-RIF_ULTRA2", "MTB-XDR", "HIV-1_VL", "COV-2 2"], "10.10.120.74": ["HCV", "HBV"], - "10.10.120.73": ["MTB-RIF"] # Belum ada yang aktif + "10.10.120.73": ["MTB-RIF"] } # Default code jika nama tes di database tidak dikenali DEFAULT_GXP_CODE = "MTB-RIF" @@ -309,34 +338,11 @@ def get_genexpert_query_orders(ip_addr, hl7_msg): for order in base_orders: if str(order.rnoreg or "").strip() != requested_sample_id: continue - test_code = resolve_genexpert_test_code(order, ip_addr) - if test_code: - return [order] - print( - f"[GENEXPERT] Tidak ada order cocok/didukung untuk sample_id={requested_sample_id} di {ip_addr}" - ) + return [order] + print(f"[GENEXPERT] Tidak ada order untuk sample_id={requested_sample_id} di {ip_addr}") return [] - selected_orders = [] - selected_codes = set() - max_orders = max(len(supported_codes), 1) - - for supported_code in supported_codes: - if len(selected_orders) >= max_orders: - break - for order in base_orders: - if any(existing.urut == order.urut for existing in selected_orders): - continue - test_code = resolve_genexpert_test_code(order, ip_addr) - if test_code != supported_code: - continue - if test_code in selected_codes: - continue - selected_orders.append(order) - selected_codes.add(test_code) - break - - return selected_orders + return base_orders[:1] finally: session.close() @@ -354,12 +360,20 @@ def create_genexpert_ack_j01_response(incoming_hl7): msa = f"MSA|CA|{incoming_control_id}" return f"{msh}\r{msa}\r" -def create_genexpert_rsp_z02_response(orders, incoming_hl7): +def create_genexpert_ack_r01_response(incoming_hl7): + incoming_control_id = extract_msg_control_id(incoming_hl7) or "UNKNOWN" + resp_control_id = f"ACK{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}" + msh = build_genexpert_response_msh("ACK^R01", incoming_hl7, resp_control_id) + msa = f"MSA|CA|{incoming_control_id}" + return f"{msh}\r{msa}\r" + +def create_genexpert_rsp_z02_response(orders, incoming_hl7, ip_addr=None): qpd_segment = extract_segment(incoming_hl7, "QPD") qpd = parse_genexpert_qpd(qpd_segment) query_tag = qpd.get("query_tag") or (extract_msg_control_id(incoming_hl7) or "UNKNOWN") query_name = qpd.get("query_name") or "Z03^HOST QUERY" resp_control_id = f"RSP{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}" + supported_codes = GENEXPERT_IP_CAPABILITIES.get(str(ip_addr or "").strip(), []) msh = build_genexpert_response_msh("RSP^Z02", incoming_hl7, resp_control_id) msa = f"MSA|AA|{query_tag}" @@ -371,15 +385,18 @@ def create_genexpert_rsp_z02_response(orders, incoming_hl7): for patient_idx, order in enumerate(orders, start=1): patient_id = str(order.norm or order.rnoreg or "").strip() sample_id = str(order.rnoreg or "").strip() - assay_name = str(order.tes or "").strip() - test_code = resolve_genexpert_test_code(order) or GENEXPERT_TEST_MAPPING.get(assay_name) or DEFAULT_GXP_CODE order_ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - segments.append(f"PID|{patient_idx}||{patient_id}") - segments.append(f"ORC|NW|{patient_idx}|||||||{order_ts}") - segments.append(f"OBR|{patient_idx}|||{test_code}|||||||A") - segments.append("TQ1|||||||||R") - segments.append(f"SPM|{patient_idx}|{sample_id}^||ORH|||||||P") + + test_codes = supported_codes if supported_codes else [] + if not test_codes: + print(f"[GENEXPERT] Tidak ada test_code capability untuk payload ke {ip_addr}") + + for order_idx, test_code in enumerate(test_codes, start=1): + segments.append(f"ORC|NW|{order_idx}|||||||{order_ts}") + segments.append(f"OBR|{order_idx}|||{test_code}|||||||A") + segments.append("TQ1|||||||||R") + segments.append(f"SPM|{order_idx}|{sample_id}^||ORH|||||||P") return "\r".join(segments) + "\r" @@ -388,14 +405,14 @@ def send_all_orders(conn, ip_addr, hl7_msg, msg_id): scheduled_orders = [] if not orders: print(f"[GENEXPERT] Tidak ada order pending untuk {ip_addr}") - rsp = create_genexpert_rsp_z02_response([], hl7_msg) + rsp = create_genexpert_rsp_z02_response([], hl7_msg, ip_addr=ip_addr) log_genexpert_hl7("OUT", ip_addr, rsp, label="qbp-empty") log_genexpert_hl7_full("OUT", ip_addr, rsp, label="qbp-empty") conn.sendall(f"\x0b{rsp}\x1c\r".encode('utf-8')) return print(f"[GENEXPERT] Mengirim {len(orders)} order ke {ip_addr}") - rsp = create_genexpert_rsp_z02_response(orders, hl7_msg) + rsp = create_genexpert_rsp_z02_response(orders, hl7_msg, ip_addr=ip_addr) mllp = f"\x0b{rsp}\x1c\r" first_accnumber = str(orders[0].rnoreg or "").strip() if orders else "" log_genexpert_hl7("OUT", ip_addr, rsp, label=f"qbp-order:{first_accnumber}") @@ -547,6 +564,10 @@ def stop_scheduled_result_query(accnumber, reason="completed"): return True def scheduled_result_query_worker(accnumber): + if not GENEXPERT_ENABLE_RESULT_QUERY_SCHEDULER: + stop_scheduled_result_query(accnumber, reason="disabled") + return + while True: if not get_active_genexpert_ips(): stop_all_scheduled_result_queries(reason="no-active-genexpert") @@ -691,7 +712,7 @@ def trigger_result_query_to_genexpert(accnumber, register_no, target_ip=None, wa if not GENEXPERT_ENABLE_RESULT_QUERY_SCHEDULER: return { "ok": False, - "message": "Auto query hasil GeneXpert sedang dinonaktifkan sementara.", + "message": "Mode GeneXpert pasif aktif. Host hanya menjawab request dari alat.", } active_ips = get_active_genexpert_ips() @@ -804,6 +825,12 @@ def trigger_result_query_to_genexpert(accnumber, register_no, target_ip=None, wa @app.route("/api/genexpert/query-result", methods=["POST"]) def api_query_genexpert_result(): + if not GENEXPERT_ENABLE_RESULT_QUERY_SCHEDULER: + return jsonify({ + "ok": False, + "message": "Mode GeneXpert pasif aktif. Host hanya menjawab request dari alat.", + }), 409 + payload = request.get_json(silent=True) or {} accnumber = str(payload.get("accnumber") or "").strip() register_no = str(payload.get("register_no") or payload.get("nomor_register") or "").strip() @@ -1954,102 +1981,13 @@ def handle_genexpert_client(conn, addr): incoming_control_id = msh_fields[9] if len(msh_fields) > 9 else "UNKNOWN" msg_id = incoming_control_id # ========================================== - # SKENARIO 1: ALAT BERTANYA (QUERY / QRY) + # SKENARIO 1: LEGACY QRY TIDAK DIDUKUNG UNTUK GENEXPERT # ========================================== if "QRY^" in clean_hl7 or "QRY|" in clean_hl7: - client_ip = addr[0] - logging.info(f"[QUERY] Request dari IP: {client_ip} (Control ID: {incoming_control_id})") - print(f"[QUERY] Request dari IP: {client_ip} (Control ID: {incoming_control_id})") - - # 1. Tentukan Kolom Flag mana yang harus dicek berdasarkan IP - # Kita balik mappingnya: IP -> Nama Kolom - target_flag_col = None - for col_name, ip_addr in TARGET_MAPPING.items(): - if ip_addr == client_ip: - target_flag_col = col_name - break - - if not target_flag_col: - logging.warning(f"[DENIED] IP {client_ip} tidak terdaftar di TARGET_MAPPING. Abaikan.") - print(f"[DENIED] IP {client_ip} tidak terdaftar di TARGET_MAPPING. Abaikan.") - # Kirim jawaban kosong agar alat tidak hang - reply_msg = create_hl7_dsr_response(None, incoming_control_id, "") - log_genexpert_hl7("OUT", client_ip, reply_msg, label="query-denied-empty") - log_genexpert_hl7_full("OUT", client_ip, reply_msg, label="query-denied-empty") - conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8')) - continue # Skip proses selanjutnya - - print(f"[TARGET] IP {client_ip} akan mengecek kolom: {target_flag_col}") - - # 2. Cari Sample ID di pesan QRY - search_sample_id = None - qrd_line = "" - for line in lines: - if line.startswith("QRD|"): - qrd_line = line - fields = line.split('|') - if len(fields) > 8: - search_sample_id = fields[8] - break - - if search_sample_id: - session = SessionLocal() - try: - # 3. Query Database Dinamis (Pakai getattr) - # Kita cari RN yang cocok DAN (Flag Kolom Tersebut False ATAU Null) - - # Ambil atribut kolom Paslab berdasarkan nama string (misal: Paslab.flg_gxp3) - flag_attr = getattr(PaslabOrder, target_flag_col, None) - - if flag_attr is None: - logging.error(f"Kolom '{target_flag_col}' tidak ditemukan di Model Paslab!") - print(f"Kolom '{target_flag_col}' tidak ditemukan di Model Paslab!") - raise Exception("Invalid Column Name") - - print(f"[DB LOOKUP] Mencari {search_sample_id} dimana {target_flag_col} == False") - - order = session.query(PaslabOrder).filter( - PaslabOrder.rnoreg == search_sample_id, - (flag_attr == False) | (flag_attr == None) - ).first() - - # Variabel kontrol kirim - send_order = False - - if order: - # (Opsional) Validasi Kapabilitas Mesin di sini jika perlu - # ... - send_order = True - - # 4. Kirim Respon - if send_order and order: - print(f"[FOUND] Order ditemukan: {order.nama}. Mengirim ke {client_ip}...") - - reply_msg = create_hl7_dsr_response(order, incoming_control_id, qrd_line) - log_genexpert_hl7("OUT", client_ip, reply_msg, label=f"query-order:{search_sample_id}") - log_genexpert_hl7_full("OUT", client_ip, reply_msg, label=f"query-order:{search_sample_id}") - conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8')) - schedule_result_query_for_order( - accnumber=str(order.rnoreg or "").strip(), - register_no=str(order.rnoreg or "").strip(), - target_ip=client_ip, - ) - - else: - print(f"[NOT FOUND/ALREADY SENT] Tidak ada order baru untuk {search_sample_id} di kolom {target_flag_col}") - # Kirim DSR Kosong (NF) - reply_msg = create_hl7_dsr_response(None, incoming_control_id, qrd_line) - log_genexpert_hl7("OUT", client_ip, reply_msg, label=f"query-empty:{search_sample_id}") - log_genexpert_hl7_full("OUT", client_ip, reply_msg, label=f"query-empty:{search_sample_id}") - conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8')) - - except Exception as db_err: - logging.error(f"Database Error: {db_err}") - print(f"Database Error: {db_err}") - session.rollback() - finally: - session.close() - # Selesai tangani QRY, jangan lanjut parsing umum + print( + f"[GENEXPERT] Legacy QRY diterima dari {addr[0]} tetapi diabaikan. " + "Host hanya mendukung QBP^Z01/QBP^Z03 untuk order query." + ) continue # ========================================== @@ -2062,10 +2000,10 @@ def handle_genexpert_client(conn, addr): parse_hl7_result(conn, msg_id, clean_hl7, device_name=f"GeneXpert-{addr[0]}") # 2. Kirim ACK (Terima Kasih) - ack_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - ack_msg = f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{ack_time}||ACK|{incoming_control_id}|P|2.5\rMSA|AA|{incoming_control_id}\r" + ack_msg = create_genexpert_ack_r01_response(clean_hl7) full_ack = f"\x0b{ack_msg}\x1c\r" log_genexpert_hl7("OUT", addr[0], ack_msg, label="oru-ack") + log_genexpert_hl7_full("OUT", addr[0], ack_msg, label="oru-ack") conn.sendall(full_ack.encode('utf-8')) print(f"[ACK SENT] Untuk hasil ID {incoming_control_id}") continue @@ -2073,7 +2011,7 @@ def handle_genexpert_client(conn, addr): # ========================================== # SKENARIO 3: Handle Pesan Apa Adanya # ========================================== - elif "QBP^Z03" in clean_hl7: + elif "QBP^Z01" in clean_hl7 or "QBP^Z03" in clean_hl7: print("[GENEXPERT] Alat meminta ORDER") msg_id = extract_msg_control_id(clean_hl7) send_all_orders(conn, addr[0], clean_hl7, msg_id)