diff --git a/listener/app.py b/listener/app.py index 7f8ebd4c..02092249 100644 --- a/listener/app.py +++ b/listener/app.py @@ -99,6 +99,8 @@ DEVICE_CONFIGS = [ ] MYLA_HOST = '10.10.120.89' MYLA_PORT = 8000 +MYLA_INBOUND_HOST = '0.0.0.0' +MYLA_INBOUND_PORT = 8000 MYLA_POLL_INTERVAL_SECONDS = 1 MYLA_CONNECT_RETRY_SECONDS = 5 MYLA_CONTROL_TIMEOUT_SECONDS = 6 @@ -939,34 +941,30 @@ def send_order_to_myla_astm(conn, order, peer_ip): return True def create_myla_hl7_order_message(order, msg_control_id): - timestamp = datetime.datetime.now().astimezone().strftime('%Y%m%d%H%M%S%z') - sample_id = sanitize_astm_field(order.rnoreg, max_len=32) - pid_norm = sanitize_astm_field(order.norm, max_len=32) + timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + sample_id = sanitize_astm_field(order.rnoreg, uppercase=True, max_len=32) or "000000" + pid_norm = sanitize_astm_field(order.norm, uppercase=True, max_len=32) or f"PID{sample_id}" + test_code = sanitize_astm_field(order.tes, uppercase=True, max_len=40) or "ID" + specimen_code = sanitize_astm_field(order.kd_spesimen, uppercase=True, max_len=20) or "URC" - first_name, last_name = split_patient_name(sanitize_astm_field(order.nama, max_len=80)) - pid_name = f"{last_name}^{first_name}" + # Sesuaikan pattern ID seperti contoh vendor (SPE/AWOS + accession). + compact_id = re.sub(r"[^A-Z0-9]", "", sample_id) or "000000" + spm_id = f"SPE{compact_id}"[:30] + obr_order_id = f"AWOS{compact_id}"[:30] - raw_gender = sanitize_astm_field(order.rjenis, uppercase=True, max_len=10) - pid_gender = 'M' if raw_gender.startswith("L") else 'F' - - room = sanitize_astm_field(getattr(order, "ruangan", "") or "RSSA MALANG", uppercase=True, max_len=30) - nama_tes_db = sanitize_astm_field(order.tes, max_len=60) - test_code = sanitize_astm_field(order.tes, uppercase=True, max_len=40) or "GENERAL" - - # MyLA cenderung strict: minta MSH.9 lengkap (message^trigger^structure). msh = ( - f"MSH|^~\\&|LIS|LAB|MYLA|bioMerieux|{timestamp}||OML^O33^OML_O33|{msg_control_id}|P|2.5.1" - f"||||||UNICODE UTF-8" + f"MSH|^~\\&|LIS|LAB|MYLA|BMX|{timestamp}||OML^O33^OML_O33|{msg_control_id}|P|2.5.1" + f"|||NE|AL||UNICODE UTF-8" ) - pid = f"PID|1||{pid_norm}||{pid_name}|||{pid_gender}" - pv1 = f"PV1|1|I|{room}" - - # ORC.9 (Date/Time of Transaction) wajib terisi untuk MYLA. - orc_fields = ["ORC", "NW", sample_id, "", "", "", "", "", "", timestamp] - orc = "|".join(orc_fields) - - obr = f"OBR|1|{sample_id}||{test_code}^{nama_tes_db}^L|||{timestamp}" - return f"{msh}\r{pid}\r{pv1}\r{orc}\r{obr}\r" + pid = f"PID|||{pid_norm}" + pv1 = "PV1||O" + spm = f"SPM|1|{spm_id}||{specimen_code}^{specimen_code}^99BMx|||||||P^^HL70369||||||{timestamp}" + sac = f"SAC|||{spm_id}" + # ORC.9 wajib ada; ikuti contoh MyLA: ORC|NW|||||||| + orc = f"ORC|NW||||||||{timestamp}" + tq1 = "TQ1|||||||||R^^HL7048" + obr = f"OBR|1|{obr_order_id}||{test_code}^{test_code}^99BMx" + return f"{msh}\r{pid}\r{pv1}\r{spm}\r{sac}\r{orc}\r{tq1}\r{obr}\r" def process_myla_hl7_message(conn, hl7_str, peer_ip): incoming_control_id = "" @@ -987,9 +985,9 @@ def process_myla_hl7_message(conn, hl7_str, peer_ip): except Exception: pass - if "ORU^" in message_type: - logging.info(f"[MYLA-HL7] Menerima ORU dari {peer_ip} (ID: {incoming_control_id})") - print(f"[MYLA-HL7] Menerima ORU dari {peer_ip} (ID: {incoming_control_id})") + if "ORU^" in message_type or message_type.startswith("OUL^R22"): + logging.info(f"[MYLA-HL7] Menerima hasil dari {peer_ip} (type={message_type}, ID: {incoming_control_id})") + print(f"[MYLA-HL7] Menerima hasil dari {peer_ip} (type={message_type}, ID: {incoming_control_id})") parse_myla_result(hl7_str, device_name=f"MYLA-{peer_ip}") if incoming_control_id: try: @@ -1000,7 +998,7 @@ def process_myla_hl7_message(conn, hl7_str, peer_ip): ) conn.sendall(f"\x0b{ack_msg}\x1c\r".encode("utf-8")) except Exception as e: - logging.warning(f"[MYLA-HL7] Gagal kirim ACK ORU {incoming_control_id}: {e}") + logging.warning(f"[MYLA-HL7] Gagal kirim ACK hasil {incoming_control_id}: {e}") elif message_type.startswith("ACK") or message_type.startswith("ORL^O34"): for seg in hl7_str.split('\r'): if seg.startswith("MSA|"): @@ -1098,6 +1096,37 @@ def wait_for_myla_hl7_ack(conn, expected_control_id, peer_ip, timeout_seconds=MY return False +def pump_myla_incoming(conn, peer_ip, buffer, wait_seconds=0.2): + """ + Proses pesan masuk dari MyLA saat idle (mis. ORU hasil) pada koneksi client yang sama. + """ + deadline = time.time() + max(0.05, wait_seconds) + while time.time() < deadline: + try: + conn.settimeout(max(0.05, deadline - time.time())) + data = conn.recv(4096) + if not data: + raise ConnectionError("Koneksi ditutup oleh MyLA") + buffer += data + + if b"\x1c\r" not in buffer: + continue + + chunks = buffer.split(b"\x1c\r") + for raw_msg in chunks[:-1]: + clean_msg = raw_msg.replace(b"\x0b", b"") + hl7_str = clean_msg.decode("latin-1", errors="ignore") + if "MSH|" not in hl7_str: + continue + hl7_str = hl7_str[hl7_str.find("MSH|"):] + process_myla_hl7_message(conn, hl7_str, peer_ip) + + buffer = chunks[-1] + except socket.timeout: + break + + return buffer + def send_order_to_myla_hl7(conn, order, peer_ip): msg_control_id = f"MYLA{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}{order.urut}" hl7_message = create_myla_hl7_order_message(order, msg_control_id) @@ -1153,9 +1182,9 @@ def handle_myla_client(conn, addr): incoming_control_id = msh_fields[9] except: pass - # --- PROSES HASIL (ORU) --- - if "ORU^" in hl7_str: - logging.info(f"[MYLA] Menerima hasil (ORU) ID: {incoming_control_id}") + # --- PROSES HASIL (ORU / OUL^R22) --- + if "ORU^" in hl7_str or "OUL^R22" in hl7_str: + logging.info(f"[MYLA] Menerima hasil (ORU/OUL) ID: {incoming_control_id}") parse_myla_result(hl7_str, device_name=f"MYLA-{addr[0]}") # --- PROSES QUERY (QRY/QBP) - JIKA MYLA BERTANYA ORDER --- @@ -1261,6 +1290,7 @@ def start_myla_server(host, port): """ while True: conn = None + incoming_buffer = b"" last_idle_log_at = 0.0 try: logging.info(f"[MYLA-CLIENT] Mencoba konek ke MYLA {host}:{port} ...") @@ -1271,6 +1301,7 @@ def start_myla_server(host, port): print(f"[MYLA-CLIENT] Terhubung ke MYLA {host}:{port}") while True: + incoming_buffer = pump_myla_incoming(conn, host, incoming_buffer, wait_seconds=0.15) pending_orders = get_pending_myla_orders(limit=10) if not pending_orders: now_ts = time.time() @@ -1322,6 +1353,36 @@ def start_myla_server(host, port): time.sleep(MYLA_CONNECT_RETRY_SECONDS) +def start_myla_inbound_server(host, port): + """ + Listener TCP inbound untuk menerima hasil dari connector AI_to_LIS_MyLis (BCI/MyLA). + """ + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + try: + server.bind((host, port)) + server.listen(5) + logging.info(f"[MYLA-INBOUND] Listening di {host}:{port}") + print(f"[MYLA-INBOUND] Listening di {host}:{port}") + + while True: + client_socket, addr = server.accept() + client_thread = threading.Thread( + target=handle_myla_client, + args=(client_socket, addr), + daemon=True + ) + client_thread.start() + except Exception as e: + logging.critical(f"[MYLA-INBOUND] Gagal start listener {host}:{port}: {e}") + print(f"[MYLA-INBOUND] Gagal start listener {host}:{port}: {e}") + finally: + try: + server.close() + except Exception: + pass + # ========================================== # HL7 TCP LISTENER FOR GENEXPERT # ========================================== @@ -2554,16 +2615,26 @@ if __name__ == "__main__": t_tcp.start() all_threads.append(t_tcp) - # 3. Start Thread TCP Client (MyLA Server Connector) + # 3. Start Thread TCP Client (LIS -> MyLA: kirim order) myla_thread = threading.Thread( target=start_myla_server, args=(MYLA_HOST, MYLA_PORT), - name="Manager-TCP-MyLA", + name="Manager-TCP-MyLA-Client", daemon=True ) myla_thread.start() all_threads.append(myla_thread) + # 3b. Start Thread TCP Server Inbound (MyLA/BCI -> LIS: kirim hasil) + myla_inbound_thread = threading.Thread( + target=start_myla_inbound_server, + args=(MYLA_INBOUND_HOST, MYLA_INBOUND_PORT), + name="Manager-TCP-MyLA-Inbound", + daemon=True + ) + myla_inbound_thread.start() + all_threads.append(myla_inbound_thread) + # 4. Start Thread HTTP API (Trigger dari Laravel) t_http = threading.Thread(target=run_http_api_server, name="Manager-HTTP-API", daemon=True) t_http.start()