diff --git a/listener/app.py b/listener/app.py index 5cb1375d..dae0b81e 100644 --- a/listener/app.py +++ b/listener/app.py @@ -12,7 +12,7 @@ import datetime import traceback import serial # type: ignore -from flask import Flask, jsonify # type: ignore +from flask import Flask, jsonify, request # type: ignore from sqlalchemy import create_engine, Column, Integer, String, Boolean, Text # type: ignore from sqlalchemy import DateTime as SqDateTime # type: ignore from sqlalchemy import Date as SqDate # type: ignore @@ -25,6 +25,7 @@ from sqlalchemy.orm import declarative_base, sessionmaker # type: ignore # Network Configuration TCP_LISTENER_PORT = 6001 # PC GeneXpert set ke mode Client, konek ke IP:PORT ini SERVER_HOST = '0.0.0.0' # Listen di semua interface +HTTP_API_PORT = 6002 # Endpoint trigger dari Laravel -> Python # Mapping Flag ke IP Address GeneXpert # Pastikan IP ini SESUAI dengan settingan "Server IP" di masing-masing alat (Client Mode) TARGET_MAPPING = { @@ -77,6 +78,8 @@ logging.basicConfig(level=logging.INFO, handlers=[log_handler]) app = Flask(__name__) active_genexpert_connections = {} connection_lock = threading.Lock() +pending_result_queries = {} +pending_query_lock = threading.Lock() DEVICE_CONFIGS = [ { 'port': 'COM6', 'baud_rate': 9600, 'device_type': 'vitek', 'alat_name': 'Vitek 1', @@ -244,6 +247,115 @@ def extract_msg_control_id(hl7_message): return None except: return None + +def build_genexpert_result_query(accnumber, msg_control_id): + ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + # Query hasil berbasis accession number di QRD-8. + query_msg = ( + f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{ts}||QRY^Q02|{msg_control_id}|P|2.5\r" + f"QRD|{ts}|R|I|{msg_control_id}|||1^RD|{accnumber}|OTH|||T\r" + ) + return query_msg + +def select_target_genexpert_ip(preferred_ip=None): + with connection_lock: + if preferred_ip: + if preferred_ip in active_genexpert_connections: + return preferred_ip + return None + + active_ips = list(active_genexpert_connections.keys()) + if len(active_ips) == 1: + return active_ips[0] + return None + +def trigger_result_query_to_genexpert(accnumber, register_no, target_ip=None, wait_seconds=20): + resolved_ip = select_target_genexpert_ip(target_ip) + if not resolved_ip: + return {"ok": False, "message": "Koneksi GeneXpert tidak ditemukan atau ambigu. Isi target_ip."} + + with connection_lock: + conn = active_genexpert_connections.get(resolved_ip) + if not conn: + return {"ok": False, "message": f"Koneksi GeneXpert {resolved_ip} tidak aktif."} + + ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + msg_control_id = f"LISQRY{ts}{int(time.time() * 1000) % 1000:03d}" + query_message = build_genexpert_result_query(accnumber, msg_control_id) + mllp_payload = f"\x0b{query_message}\x1c\r".encode("utf-8") + + pending_event = threading.Event() + with pending_query_lock: + pending_result_queries[accnumber] = { + "register_no": register_no, + "target_ip": resolved_ip, + "msg_control_id": msg_control_id, + "requested_at": datetime.datetime.now(), + "event": pending_event, + "status": "requested", + } + + try: + conn.sendall(mllp_payload) + logging.info(f"[GENEXPERT-QUERY] Kirim query hasil accnumber={accnumber} ke {resolved_ip}") + print(f"[GENEXPERT-QUERY] Kirim query hasil accnumber={accnumber} ke {resolved_ip}") + except Exception as e: + with pending_query_lock: + pending_result_queries.pop(accnumber, None) + return {"ok": False, "message": f"Gagal kirim query ke GeneXpert {resolved_ip}: {e}"} + + if wait_seconds and wait_seconds > 0: + pending_event.wait(wait_seconds) + with pending_query_lock: + state = pending_result_queries.get(accnumber) + if state and state.get("status") == "found": + pending_result_queries.pop(accnumber, None) + return { + "ok": True, + "message": "Hasil ditemukan dan disimpan ke LisPhoenix.", + "target_ip": resolved_ip, + "accnumber": accnumber, + } + # Timeout/hasil belum masuk, biarkan state tetap ada agar response telat tetap bisa diproses. + return { + "ok": True, + "message": "Query terkirim. Menunggu hasil dari GeneXpert.", + "target_ip": resolved_ip, + "accnumber": accnumber, + } + + return { + "ok": True, + "message": "Query terkirim.", + "target_ip": resolved_ip, + "accnumber": accnumber, + } + +@app.route("/api/genexpert/query-result", methods=["POST"]) +def api_query_genexpert_result(): + payload = request.get_json(silent=True) or {} + accnumber = str(payload.get("accnumber") or "").strip() + register_no = str(payload.get("register_no") or payload.get("nomor_register") or "").strip() + target_ip = str(payload.get("target_ip") or "").strip() or None + + try: + wait_seconds = int(payload.get("wait_seconds", 20)) + except Exception: + wait_seconds = 20 + + if not accnumber: + return jsonify({"ok": False, "message": "Field 'accnumber' wajib diisi."}), 400 + if not register_no: + return jsonify({"ok": False, "message": "Field 'register_no' (nomor register pasien) wajib diisi."}), 400 + + result = trigger_result_query_to_genexpert( + accnumber=accnumber, + register_no=register_no, + target_ip=target_ip, + wait_seconds=wait_seconds, + ) + status_code = 200 if result.get("ok") else 409 + return jsonify(result), status_code def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"): session = SessionLocal() @@ -325,30 +437,47 @@ def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"): else: final_result = "; ".join(results_list) - # 4. Validasi Data Penting (DIMODIFIKASI) + # 4. Validasi Data Penting if not sample_id: - send_all_orders(conn, device_name, clean_hl7, msg_id) - print("[HL7 Parser] Meminta order") + logging.warning("[HL7 Parser] ORU diterima tanpa accession/sample_id. Data diabaikan.") + print("[HL7 Parser] ORU diterima tanpa accession/sample_id. Data diabaikan.") return + mapped_no_id = sample_id + mapped_seq_no = patient_id + mapped_alat = device_name + pending_event = None + + with pending_query_lock: + pending = pending_result_queries.get(sample_id) + if pending: + mapped_no_id = pending.get("register_no") or sample_id + mapped_seq_no = sample_id + mapped_alat = f"GeneXpert-{pending.get('target_ip')}" + pending["status"] = "found" + pending["response_at"] = datetime.datetime.now() + pending_event = pending.get("event") + logging.info(f"[HL7 Parser] Menyimpan hasil untuk Sample: {sample_id}") print(f"[HL7 Parser] Menyimpan hasil untuk Sample: {sample_id}") # 5. Simpan ke Database new_data = LisPhoenix( - no_id=sample_id, - seq_no=patient_id, + no_id=mapped_no_id, + seq_no=mapped_seq_no, rnmpas=patient_name, tgl_data=result_date, rawdt=hl7_message, organisme=final_result, - alat=device_name + alat=mapped_alat ) session.add(new_data) session.commit() logging.info(f"[DB] Berhasil simpan ke LisPhoenix: {sample_id}") print(f"[DB] Berhasil simpan ke LisPhoenix: {sample_id}") + if pending_event: + pending_event.set() except Exception as e: logging.error(f"[HL7 Parser] Error menyimpan data: {e}") @@ -450,15 +579,26 @@ def manage_tcp_server(): logging.critical(f"[TCP-SERVER] Gagal Start: {e}") print(f"[TCP-SERVER] Gagal Start: {e}") +def run_http_api_server(): + logging.info(f"[HTTP-API] Listening di port {HTTP_API_PORT}...") + print(f"[HTTP-API] Listening di port {HTTP_API_PORT}...") + app.run(host=SERVER_HOST, port=HTTP_API_PORT, debug=False, use_reloader=False, threaded=True) + def handle_genexpert_client(conn, addr): print(f"[GenExpert_TCP] Koneksi baru dari {addr}") buffer = b"" + conn.settimeout(60) + client_ip = addr[0] + with connection_lock: + active_genexpert_connections[client_ip] = conn + logging.info(f"[GenExpert_TCP] Register koneksi aktif {client_ip}") try: while True: try: data = conn.recv(4096) if not data: + logging.info(f"[GenExpert_TCP] Client {addr} menutup koneksi.") break buffer += data @@ -519,6 +659,7 @@ def handle_genexpert_client(conn, addr): lines = clean_hl7.split('\r') msh_fields = lines[0].split('|') incoming_control_id = msh_fields[9] if len(msh_fields) > 9 else "UNKNOWN" + msg_id = incoming_control_id # ========================================== # SKENARIO 1: ALAT BERTANYA (QUERY / QRY) # ========================================== @@ -625,7 +766,7 @@ def handle_genexpert_client(conn, addr): logging.info(f"[RESULT] Menerima Hasil Lab.") # 1. Parse dan Simpan Hasil (Panggil fungsi parser Anda) - parse_hl7_result(clean_hl7, msg_id, clean_hl7, device_name=f"GeneXpert-{addr[0]}") + parse_hl7_result(conn, msg_id, clean_hl7, device_name=f"GeneXpert-{addr[0]}") # 2. Kirim ACK (Terima Kasih) ack_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S') @@ -670,15 +811,31 @@ def handle_genexpert_client(conn, addr): # Jika pesan lengkap tapi tidak ada MSH (misal cuma EOT doang) pass + except ConnectionResetError: + logging.warning(f"[GenExpert_TCP] Connection reset by peer: {addr}") + break + except OSError as e: + if getattr(e, "winerror", None) == 10054: + logging.warning(f"[GenExpert_TCP] WinError 10054 dari {addr}") + break + raise + except socket.timeout: + continue except Exception as e: print(f"[Loop Error] {e}") - # Jangan break, lanjut terima data - time.sleep(0.1) + logging.exception(f"[Loop Error] Unexpected error from {addr}: {e}") + break except Exception as e: logging.error(f"[GenExpert_TCP Error] Koneksi {addr} terputus: {e}") finally: - conn.close() + with connection_lock: + if active_genexpert_connections.get(client_ip) is conn: + del active_genexpert_connections[client_ip] + try: + conn.close() + except Exception: + pass logging.info(f"[GenExpert_TCP] Koneksi {addr} ditutup.") def send_order_via_active_connection(target_ip, hl7_message): @@ -1562,6 +1719,11 @@ if __name__ == "__main__": t_tcp.start() all_threads.append(t_tcp) + # 4. Start Thread HTTP API (Trigger dari Laravel) + t_http = threading.Thread(target=run_http_api_server, name="Manager-HTTP-API", daemon=True) + t_http.start() + all_threads.append(t_http) + # 5. LOOP UTAMA (Keep-Alive & Monitoring) # Ini sekarang bisa berjalan karena Flask sudah dipindah ke thread try: