diff --git a/listener/app.py b/listener/app.py index 95416256..7f8ebd4c 100644 --- a/listener/app.py +++ b/listener/app.py @@ -99,9 +99,11 @@ DEVICE_CONFIGS = [ ] MYLA_HOST = '10.10.120.89' MYLA_PORT = 8000 -MYLA_POLL_INTERVAL_SECONDS = 5 +MYLA_POLL_INTERVAL_SECONDS = 1 MYLA_CONNECT_RETRY_SECONDS = 5 +MYLA_CONTROL_TIMEOUT_SECONDS = 6 MYLA_ACK_TIMEOUT_SECONDS = 8 +MYLA_IDLE_LOG_INTERVAL_SECONDS = 30 # Karakter kontrol standar STX, ETX, ACK, NAK, EOT, ENQ = b'\x02', b'\x03', b'\x06', b'\x15', b'\x04', b'\x05' @@ -671,6 +673,446 @@ def parse_myla_result(hl7_message, device_name="MYLA"): session.rollback() finally: session.close() + +def get_pending_myla_orders(limit=10): + session = SessionLocal() + try: + return session.query(PaslabOrder).filter( + (PaslabOrder.flg_vitek3 == False) | (PaslabOrder.flg_vitek3 == None) + ).order_by(PaslabOrder.urut.asc()).limit(limit).all() + finally: + session.close() + +def mark_myla_order_sent(order_id): + session = SessionLocal() + try: + order = session.query(PaslabOrder).filter(PaslabOrder.urut == order_id).first() + if order: + order.flg_vitek3 = True + session.commit() + return True + return False + except Exception: + session.rollback() + raise + finally: + session.close() + +def create_myla_astm_order_message(order): + """ + ASTM E1394/LIS2-A2 style: + H, P, C, O, R, L + """ + pid = sanitize_astm_field(order.norm, max_len=32) + sid = sanitize_astm_field(order.rnoreg, max_len=32) + + first_name, last_name = split_patient_name(sanitize_astm_field(order.nama, max_len=80)) + first_name = sanitize_astm_field(first_name, uppercase=True, max_len=20) + last_name = sanitize_astm_field(last_name, uppercase=True, max_len=20) + p_name = f"{last_name}^{first_name}" + + sex_raw = sanitize_astm_field(order.rjenis, uppercase=True, max_len=10) + sex = "M" if sex_raw.startswith("L") else "F" + + raw_location = sanitize_astm_field(getattr(order, 'ruangan', "UT"), uppercase=True, max_len=40) + location = re.sub(r"[^A-Z0-9]", "", raw_location)[:10] or "UT" + + diagnosis = sanitize_astm_field(getattr(order, 'diagnosa', "Unspecified"), max_len=60) or "Unspecified" + specimen_type = sanitize_astm_field(order.kd_spesimen, uppercase=True, max_len=20) if order.kd_spesimen else "BLOOD" + specimen_field = f"{specimen_type}^VENA^^BAIK" + test_code = sanitize_astm_field(order.tes, uppercase=True, max_len=40) or "GENERAL" + + head = r"H|\^&|||LIS|||||||||P|1" + + p_rec = [""] * 36 + p_rec[0] = "P" + p_rec[1] = "1" + p_rec[2] = pid + p_rec[3] = pid + p_rec[5] = p_name + p_rec[8] = sex + p_rec[25] = location + pat_str = "|".join(p_rec[:36]) + + com_str = f"C|1|L|{diagnosis}|G" + + o_rec = [""] * 32 + o_rec[0] = "O" + o_rec[1] = "1" + o_rec[2] = sid + o_rec[4] = f"^^^{test_code}" + o_rec[5] = "R" + o_rec[11] = "A" + o_rec[15] = specimen_field + ord_str = "|".join(o_rec[:32]) + + # Placeholder R record agar struktur record lengkap H,P,C,O,R,L. + rcd_str = "R|1|^^^ORDER_STATUS|PENDING|||N|||||F" + term = "L|1|N" + + message_content = f"{head}\r{pat_str}\r{com_str}\r{ord_str}\r{rcd_str}\r{term}\r" + seq = "1" + frame_body = f"{seq}{message_content}\x03" + chk = calculate_astm_checksum(frame_body) + full_frame = f"\x02{frame_body}{chk}\r\n" + return [full_frame.encode("latin-1")] + +def parse_myla_astm_records(raw_message, device_name="MYLA"): + session = SessionLocal() + try: + sample_id = "" + patient_id = "" + patient_name = "" + results = [] + + records = [r for r in raw_message.split('\r') if r.strip()] + for rec in records: + row = rec[1:] if rec and rec[0].isdigit() else rec + fields = row.split('|') + if not fields: + continue + + rtype = fields[0] + if rtype == "P": + if len(fields) > 3: + patient_id = fields[3].strip() + if len(fields) > 5: + patient_name = fields[5].replace("^", " ").strip() + elif rtype == "O": + if len(fields) > 2: + sample_id = fields[2].replace("^", "").strip() + elif rtype == "R": + test_name = fields[2].strip() if len(fields) > 2 else "" + test_value = fields[3].strip() if len(fields) > 3 else "" + if test_name or test_value: + results.append(f"{test_name}: {test_value}".strip(": ")) + + if sample_id and results: + final_result_str = " | ".join(results)[:255] + new_entry = LisPhoenix( + no_id=sample_id, + seq_no=patient_id, + rnmpas=patient_name, + tgl_data=datetime.datetime.now(), + rawdt=raw_message, + organisme=final_result_str, + alat=device_name + ) + session.add(new_entry) + session.commit() + logging.info(f"[MYLA-ASTM] Save DB -> Sample: {sample_id}, Hasil: {final_result_str}") + else: + logging.info("[MYLA-ASTM] ASTM message diterima (tanpa hasil R untuk disimpan).") + except Exception as e: + logging.error(f"[MYLA-ASTM] Error parse/simpan: {e}") + session.rollback() + finally: + session.close() + +def _read_until_lf(conn, timeout_seconds=5): + deadline = time.time() + timeout_seconds + payload = b"" + while time.time() < deadline: + conn.settimeout(max(0.1, deadline - time.time())) + chunk = conn.recv(1) + if not chunk: + break + payload += chunk + if payload.endswith(b"\n"): + break + return payload + +def receive_myla_astm_transmission(conn, peer_ip, first_control=ENQ): + """ + Menerima transmisi ASTM dari server: + ENQ -> ACK, lalu STX frame(s) -> ACK/NAK tiap frame, diakhiri EOT. + """ + if first_control == ENQ: + conn.sendall(ACK) + + assembled = [] + pending_ctrl = first_control if first_control == STX else None + while True: + if pending_ctrl is not None: + ctrl = pending_ctrl + pending_ctrl = None + else: + conn.settimeout(5) + ctrl = conn.recv(1) + if not ctrl: + return + if ctrl == EOT: + break + if ctrl == ENQ: + conn.sendall(ACK) + continue + if ctrl != STX: + continue + + payload = _read_until_lf(conn, timeout_seconds=5) + if not payload: + conn.sendall(NAK) + continue + + frame = ctrl + payload + try: + frame_text = frame.decode("latin-1", errors="ignore") + body_start = frame_text.find("\x02") + etx_pos = frame_text.find("\x03") + if body_start == -1 or etx_pos == -1 or etx_pos <= body_start: + conn.sendall(NAK) + continue + + frame_body = frame_text[body_start + 1:etx_pos + 1] + recv_chk = frame_text[etx_pos + 1:etx_pos + 3].upper() + calc_chk = calculate_astm_checksum(frame_body).upper() + if recv_chk != calc_chk: + logging.warning(f"[MYLA-ASTM] Checksum mismatch dari {peer_ip}: recv={recv_chk}, calc={calc_chk}") + print(f"[MYLA-ASTM] Checksum mismatch dari {peer_ip}: recv={recv_chk}, calc={calc_chk}") + conn.sendall(NAK) + continue + + # Drop seq(1 char) dan ETX + data_part = frame_body[1:-1] + assembled.append(data_part) + conn.sendall(ACK) + except Exception as e: + logging.error(f"[MYLA-ASTM] Gagal parse frame dari {peer_ip}: {e}") + print(f"[MYLA-ASTM] Gagal parse frame dari {peer_ip}: {e}") + conn.sendall(NAK) + + if assembled: + raw_message = "".join(assembled) + parse_myla_astm_records(raw_message, device_name=f"MYLA-{peer_ip}") + +def wait_for_astm_control(conn, expected_controls, peer_ip, timeout_seconds=MYLA_CONTROL_TIMEOUT_SECONDS): + deadline = time.time() + timeout_seconds + while time.time() < deadline: + try: + conn.settimeout(max(0.1, deadline - time.time())) + b = conn.recv(1) + if not b: + return None + if b in expected_controls: + return b + if b == ENQ: + receive_myla_astm_transmission(conn, peer_ip, first_control=ENQ) + elif b == STX: + receive_myla_astm_transmission(conn, peer_ip, first_control=STX) + except socket.timeout: + continue + except Exception as e: + logging.error(f"[MYLA-ASTM] Error wait control: {e}") + print(f"[MYLA-ASTM] Error wait control: {e}") + return None + return None + +def send_order_to_myla_astm(conn, order, peer_ip): + frames = create_myla_astm_order_message(order) + conn.sendall(ENQ) + hs = wait_for_astm_control(conn, {ACK}, peer_ip, timeout_seconds=MYLA_CONTROL_TIMEOUT_SECONDS) + if hs != ACK: + logging.warning(f"[MYLA-ASTM] Handshake gagal untuk rnoreg={order.rnoreg}, respon={hs}") + print(f"[MYLA-ASTM] Handshake gagal untuk rnoreg={order.rnoreg}, respon={hs}") + return False + + for i, frame in enumerate(frames, start=1): + frame_ok = False + for attempt in range(1, 4): + conn.sendall(frame) + resp = wait_for_astm_control(conn, {ACK, NAK}, peer_ip, timeout_seconds=MYLA_CONTROL_TIMEOUT_SECONDS) + if resp == ACK: + frame_ok = True + break + if resp == NAK: + logging.warning(f"[MYLA-ASTM] Frame {i} NAK rnoreg={order.rnoreg}, retry={attempt}") + print(f"[MYLA-ASTM] Frame {i} NAK rnoreg={order.rnoreg}, retry={attempt}") + time.sleep(0.5) + continue + logging.warning(f"[MYLA-ASTM] Frame {i} timeout rnoreg={order.rnoreg}, retry={attempt}") + print(f"[MYLA-ASTM] Frame {i} timeout rnoreg={order.rnoreg}, retry={attempt}") + if not frame_ok: + conn.sendall(EOT) + return False + + conn.sendall(EOT) + return True + +def create_myla_hl7_order_message(order, msg_control_id): + timestamp = datetime.datetime.now().astimezone().strftime('%Y%m%d%H%M%S%z') + sample_id = sanitize_astm_field(order.rnoreg, max_len=32) + pid_norm = sanitize_astm_field(order.norm, max_len=32) + + first_name, last_name = split_patient_name(sanitize_astm_field(order.nama, max_len=80)) + pid_name = f"{last_name}^{first_name}" + + raw_gender = sanitize_astm_field(order.rjenis, uppercase=True, max_len=10) + pid_gender = 'M' if raw_gender.startswith("L") else 'F' + + room = sanitize_astm_field(getattr(order, "ruangan", "") or "RSSA MALANG", uppercase=True, max_len=30) + nama_tes_db = sanitize_astm_field(order.tes, max_len=60) + test_code = sanitize_astm_field(order.tes, uppercase=True, max_len=40) or "GENERAL" + + # MyLA cenderung strict: minta MSH.9 lengkap (message^trigger^structure). + msh = ( + f"MSH|^~\\&|LIS|LAB|MYLA|bioMerieux|{timestamp}||OML^O33^OML_O33|{msg_control_id}|P|2.5.1" + f"||||||UNICODE UTF-8" + ) + pid = f"PID|1||{pid_norm}||{pid_name}|||{pid_gender}" + pv1 = f"PV1|1|I|{room}" + + # ORC.9 (Date/Time of Transaction) wajib terisi untuk MYLA. + orc_fields = ["ORC", "NW", sample_id, "", "", "", "", "", "", timestamp] + orc = "|".join(orc_fields) + + obr = f"OBR|1|{sample_id}||{test_code}^{nama_tes_db}^L|||{timestamp}" + return f"{msh}\r{pid}\r{pv1}\r{orc}\r{obr}\r" + +def process_myla_hl7_message(conn, hl7_str, peer_ip): + incoming_control_id = "" + message_type = "" + ack_code = "" + ack_for_control_id = "" + err_text = "" + orc_control = "" + orc_status = "" + orc_ref = "" + + try: + msh_fields = hl7_str.split('\r')[0].split('|') + if len(msh_fields) > 8: + message_type = msh_fields[8].strip().upper() + if len(msh_fields) > 9: + incoming_control_id = msh_fields[9].strip() + except Exception: + pass + + if "ORU^" in message_type: + logging.info(f"[MYLA-HL7] Menerima ORU dari {peer_ip} (ID: {incoming_control_id})") + print(f"[MYLA-HL7] Menerima ORU dari {peer_ip} (ID: {incoming_control_id})") + parse_myla_result(hl7_str, device_name=f"MYLA-{peer_ip}") + if incoming_control_id: + try: + ack_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + ack_msg = ( + f"MSH|^~\\&|LIS|LAB|MYLA|bioMerieux|{ack_time}||ACK|{incoming_control_id}|P|2.5\r" + f"MSA|AA|{incoming_control_id}\r" + ) + conn.sendall(f"\x0b{ack_msg}\x1c\r".encode("utf-8")) + except Exception as e: + logging.warning(f"[MYLA-HL7] Gagal kirim ACK ORU {incoming_control_id}: {e}") + elif message_type.startswith("ACK") or message_type.startswith("ORL^O34"): + for seg in hl7_str.split('\r'): + if seg.startswith("MSA|"): + parts = seg.split('|') + if len(parts) > 1: + ack_code = parts[1].strip().upper() + if len(parts) > 2: + ack_for_control_id = parts[2].strip() + break + for seg in hl7_str.split('\r'): + if seg.startswith("ERR|"): + err_parts = seg.split('|') + if len(err_parts) > 2: + err_text = err_parts[2].strip() + if len(err_parts) > 3 and err_parts[3]: + err_text = f"{err_text} {err_parts[3].strip()}".strip() + break + for seg in hl7_str.split('\r'): + if seg.startswith("ORC|"): + o = seg.split('|') + if len(o) > 1: + orc_control = o[1].strip().upper() + if len(o) > 5: + orc_status = o[5].strip().upper() + if len(o) > 6: + orc_ref = o[6].strip() + break + + return { + "message_type": message_type, + "control_id": incoming_control_id, + "ack_code": ack_code, + "ack_for_control_id": ack_for_control_id, + "err_text": err_text, + "orc_control": orc_control, + "orc_status": orc_status, + "orc_ref": orc_ref, + } + +def wait_for_myla_hl7_ack(conn, expected_control_id, peer_ip, timeout_seconds=MYLA_ACK_TIMEOUT_SECONDS): + deadline = time.time() + timeout_seconds + buffer = b"" + + while time.time() < deadline: + try: + conn.settimeout(max(0.1, deadline - time.time())) + data = conn.recv(4096) + if not data: + return False + buffer += data + + if b"\x1c\r" not in buffer: + continue + + chunks = buffer.split(b"\x1c\r") + for raw_msg in chunks[:-1]: + clean_msg = raw_msg.replace(b"\x0b", b"") + hl7_str = clean_msg.decode("latin-1", errors="ignore") + if "MSH|" not in hl7_str: + continue + + hl7_str = hl7_str[hl7_str.find("MSH|"):] + parsed = process_myla_hl7_message(conn, hl7_str, peer_ip) + if parsed["message_type"].startswith("ACK") or parsed["message_type"].startswith("ORL^O34"): + ack_for = parsed.get("ack_for_control_id", "") + ack_code = parsed.get("ack_code", "") + err_text = parsed.get("err_text", "") + orc_control = parsed.get("orc_control", "") + orc_status = parsed.get("orc_status", "") + orc_ref = parsed.get("orc_ref", "") + if ack_for == expected_control_id: + logging.info( + f"[MYLA-HL7] Response diterima untuk {expected_control_id} " + f"(type={parsed.get('message_type')}, code={ack_code or '-'}, " + f"orc1={orc_control or '-'}, orc5={orc_status or '-'}, orc6={orc_ref or '-'}, " + f"err={err_text or '-'})" + ) + app_ok = True + if parsed["message_type"].startswith("ORL^O34"): + app_ok = (orc_control == "OK") + return (ack_code in ("AA", "CA")) and app_ok + logging.info( + f"[MYLA-HL7] Response bukan untuk pesan ini " + f"(expected={expected_control_id}, msa2={ack_for or '-'}, " + f"type={parsed.get('message_type')}, code={ack_code or '-'})" + ) + + buffer = chunks[-1] + except socket.timeout: + continue + except Exception as e: + logging.error(f"[MYLA-HL7] Error wait ACK: {e}") + print(f"[MYLA-HL7] Error wait ACK: {e}") + return False + + return False + +def send_order_to_myla_hl7(conn, order, peer_ip): + msg_control_id = f"MYLA{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}{order.urut}" + hl7_message = create_myla_hl7_order_message(order, msg_control_id) + mllp_payload = f"\x0b{hl7_message}\x1c\r".encode("utf-8") + msh_line = hl7_message.split('\r')[0] + logging.info(f"[MYLA-HL7] Kirim rnoreg={order.rnoreg}, MSH={msh_line}") + + conn.sendall(mllp_payload) + ack_ok = wait_for_myla_hl7_ack( + conn, + expected_control_id=msg_control_id, + peer_ip=peer_ip, + timeout_seconds=MYLA_ACK_TIMEOUT_SECONDS + ) + return ack_ok # ========================================== # 4. NETWORK & COMMUNICATION LOGIC # ========================================== @@ -720,7 +1162,7 @@ def handle_myla_client(conn, addr): elif "QRY^" in hl7_str or "QBP^" in hl7_str: client_ip = addr[0] logging.info(f"[MYLA] Menerima Query dari {client_ip} (Control ID: {incoming_control_id})") - + print(f"[MYLA] Menerima Query dari {client_ip} (Control ID: {incoming_control_id})") # Khusus MyLA gunakan flag VITEK3. target_flag_col = "flg_vitek3" @@ -772,14 +1214,17 @@ def handle_myla_client(conn, addr): setattr(order, target_flag_col, True) session.commit() logging.info(f"[MYLA] Order terkirim rnoreg={order.rnoreg}, set {target_flag_col}=TRUE") + print(f"[MYLA] Order terkirim rnoreg={order.rnoreg}, set {target_flag_col}=TRUE") else: reply_msg = create_hl7_dsr_response(None, incoming_control_id, qrd_line) conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8')) logging.info(f"[MYLA] Tidak ada order pending untuk sample={search_sample_id or '-'}") + print(f"[MYLA] Tidak ada order pending untuk sample={search_sample_id or '-'}") except Exception as db_err: session.rollback() logging.error(f"[MYLA] Gagal proses query: {db_err}") + print(f"[MYLA] Gagal proses query: {db_err}") reply_msg = create_hl7_dsr_response(None, incoming_control_id, qrd_line) conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8')) finally: @@ -793,39 +1238,89 @@ def handle_myla_client(conn, addr): full_ack = f"\x0b{ack_msg}\x1c\r" conn.sendall(full_ack.encode('utf-8')) logging.info(f"[MYLA ACK] Terkirim untuk ID {incoming_control_id}") + print(f"[MYLA ACK] Terkirim untuk ID {incoming_control_id}") # Sisakan bagian terakhir di buffer (kalau ada pesan yang terpotong) buffer = messages[-1] except Exception as e: logging.error(f"[MYLA-TCP] Error koneksi {addr}: {e}") + print(f"[MYLA-TCP] Error koneksi {addr}: {e}") finally: conn.close() logging.info(f"[MYLA-TCP] Koneksi {addr} ditutup.") + print(f"[MYLA-TCP] Koneksi {addr} ditutup.") def start_myla_server(host, port): - server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - try: - server.bind((host, port)) - server.listen(5) - logging.info(f"[START] MYLA TCP Server berjalan di {host}:{port}") - print(f"[START] MYLA TCP Server berjalan di {host}:{port}") - - while True: - client_socket, addr = server.accept() - # Jalankan di thread terpisah agar bisa tangani banyak koneksi - client_thread = threading.Thread(target=handle_myla_client, args=(client_socket, addr), daemon=True) - client_thread.start() - except Exception as e: - logging.critical(f"[START] Gagal start MYLA TCP Server di {host}:{port}: {e}") - print(f"[START] Gagal start MYLA TCP Server di {host}:{port}: {e}") - finally: + """ + Listener berperan sebagai TCP client: + - Konek ke MYLA server + - Poll order yang belum terkirim (flg_vitek3 = False/NULL) + - Kirim order via HL7 MLLP + - Tandai terkirim jika ACK diterima + """ + while True: + conn = None + last_idle_log_at = 0.0 try: - server.close() - except Exception: - pass + logging.info(f"[MYLA-CLIENT] Mencoba konek ke MYLA {host}:{port} ...") + print(f"[MYLA-CLIENT] Mencoba konek ke MYLA {host}:{port} ...") + conn = socket.create_connection((host, port), timeout=10) + conn.settimeout(1.0) + logging.info(f"[MYLA-CLIENT] Terhubung ke MYLA {host}:{port}") + print(f"[MYLA-CLIENT] Terhubung ke MYLA {host}:{port}") + + while True: + pending_orders = get_pending_myla_orders(limit=10) + if not pending_orders: + now_ts = time.time() + if (now_ts - last_idle_log_at) >= MYLA_IDLE_LOG_INTERVAL_SECONDS: + logging.info( + "[MYLA-CLIENT] Polling aktif: tidak ada order pending " + "(flg_vitek3 = FALSE/NULL)." + ) + print( + "[MYLA-CLIENT] Polling aktif: tidak ada order pending " + "(flg_vitek3 = FALSE/NULL)." + ) + last_idle_log_at = now_ts + time.sleep(MYLA_POLL_INTERVAL_SECONDS) + continue + + logging.info(f"[MYLA-CLIENT] Ditemukan {len(pending_orders)} order pending untuk MYLA") + print(f"[MYLA-CLIENT] Ditemukan {len(pending_orders)} order pending untuk MYLA") + last_idle_log_at = 0.0 + + for order in pending_orders: + send_ok = send_order_to_myla_hl7(conn, order, host) + if send_ok: + mark_myla_order_sent(order.urut) + logging.info(f"[MYLA-HL7] Order sukses, set flg_vitek3=TRUE rnoreg={order.rnoreg}") + print(f"[MYLA-HL7] Order sukses, set flg_vitek3=TRUE rnoreg={order.rnoreg}") + else: + logging.warning( + f"[MYLA-HL7] Pengiriman gagal/ACK timeout untuk rnoreg={order.rnoreg}, " + f"akan dicoba ulang di polling berikutnya." + ) + print( + f"[MYLA-HL7] Pengiriman gagal/ACK timeout untuk rnoreg={order.rnoreg}, " + f"akan dicoba ulang di polling berikutnya." + ) + break + + time.sleep(1) + + except Exception as e: + logging.error(f"[MYLA-CLIENT] Koneksi/pengiriman error ke {host}:{port}: {e}") + print(f"[MYLA-CLIENT] Koneksi/pengiriman error ke {host}:{port}: {e}") + finally: + if conn: + try: + conn.close() + except Exception: + pass + + time.sleep(MYLA_CONNECT_RETRY_SECONDS) # ========================================== # HL7 TCP LISTENER FOR GENEXPERT @@ -2059,7 +2554,7 @@ if __name__ == "__main__": t_tcp.start() all_threads.append(t_tcp) - # 3. Start Thread TCP Server (MyLA) + # 3. Start Thread TCP Client (MyLA Server Connector) myla_thread = threading.Thread( target=start_myla_server, args=(MYLA_HOST, MYLA_PORT),