diff --git a/listener/app.py b/listener/app.py index be75d59a..e2ab4fb6 100644 --- a/listener/app.py +++ b/listener/app.py @@ -184,6 +184,91 @@ Base.metadata.create_all(bind=engine) # ========================================== # 3. HL7 HELPER FUNCTIONS # ========================================== +def get_flag_by_device(ip_addr): + for flag, ip in TARGET_MAPPING.items(): + if ip == ip_addr: + return flag + return None + +def get_pending_orders(ip_addr): + flag = get_flag_by_device(ip_addr) + if not flag: + return [] + + session = SessionLocal() + try: + q = session.query(PaslabOrder).filter(getattr(PaslabOrder, flag) == False) + return q.all() + finally: + session.close() + +def build_rsp_for_order(order, msg_id): + ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + + return ( + f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{ts}||RSP^Z03|{msg_id}|P|2.5\r" + f"MSA|AA|{msg_id}\r" + f"QAK|OK|{msg_id}\r" + f"QPD|Z01^REQUEST TEST ORDERS|{order.rnoreg}|ALL\r" + f"ORC|NW|{order.urut}\r" + f"OBR|1|{order.urut}|{order.rnoreg}|^{order.kd_spesimen}^{order.nm_spesimen}\r" + ) + +def send_all_orders(conn, ip_addr, hl7_msg, msg_id): + orders = get_pending_orders(ip_addr) + session = SessionLocal() + + if not orders: + print(f"[GENEXPERT] Tidak ada order pending untuk {ip_addr}") + rsp = ( + f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}||RSP^Z03|{msg_id}|P|2.5\r" + f"MSA|AA|{msg_id}\r" + f"QAK|NF|{msg_id}\r" + ) + conn.sendall(f"\x0b{rsp}\x1c\r".encode('utf-8')) + return + + print(f"[GENEXPERT] Mengirim {len(orders)} order ke {ip_addr}") + + for order in orders: + rsp = build_rsp_for_order(order, msg_id) + mllp = f"\x0b{rsp}\x1c\r" + conn.sendall(mllp.encode('utf-8')) + flag = get_flag_by_device(ip_addr) + setattr(order, flag, True) + session.add(order) + + session.commit() + session.close() + +def extract_msg_control_id(hl7_message): + try: + segments = hl7_message.split('\r') + msh = segments[0].split('|') + if len(msh) > 9: + return msh[9].strip() + return None + except: + return None + +def extract_requested_sample_id(hl7_message): + try: + for seg in hl7_message.split('\r'): + if seg.startswith('QPD|'): + fields = seg.split('|') + if len(fields) > 2: + return fields[2].strip() + return None + except: + return None + +def find_order_from_db(sample_id): + session = SessionLocal() + try: + order = session.query(PaslabOrder).filter(PaslabOrder.rnoreg == sample_id).first() + return order + finally: + session.close() def create_hl7_orm_message(order): timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') @@ -222,8 +307,9 @@ def create_hl7_orm_message(order): return f"{msh}\r{pid}\r{orc}\r{obr}\r" -def parse_hl7_result(hl7_message, device_name="GeneXpert"): +def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"): session = SessionLocal() + clean_hl7 = hl7_message try: # 1. Bersihkan dan Split Pesan # HL7 dipisahkan oleh \r (Carriage Return) @@ -303,13 +389,9 @@ def parse_hl7_result(hl7_message, device_name="GeneXpert"): # 4. Validasi Data Penting (DIMODIFIKASI) if not sample_id: - # JANGAN RETURN, tapi buat ID palsu agar tetap bisa masuk database - timestamp_err = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - sample_id = f"ERR_{timestamp_err}" - logging.warning(f"[HL7 Parser] Sample ID kosong. Disimpan sebagai {sample_id} untuk audit.") - print(f"[HL7 Parser] Sample ID kosong. Disimpan sebagai {sample_id} untuk audit.") - # Tambahkan penanda di hasil agar Anda tahu ini error - final_result = f"[PARSE FAIL] {final_result}" + send_all_orders(conn, device_name, clean_hl7, msg_id) + print("[HL7 Parser] Meminta order") + return logging.info(f"[HL7 Parser] Menyimpan hasil untuk Sample: {sample_id}") print(f"[HL7 Parser] Menyimpan hasil untuk Sample: {sample_id}") @@ -337,6 +419,46 @@ def parse_hl7_result(hl7_message, device_name="GeneXpert"): finally: session.close() +def send_order_response(conn, hl7_msg, msg_id=None): + if not msg_id: + msg_id = extract_msg_control_id(hl7_msg) + + # ambil sample_id yang diminta GeneXpert + sample_id = extract_requested_sample_id(hl7_msg) + + print(f"[GENEXPERT] Request ORDER untuk sample_id: {sample_id}") + + # cari order di database + order = find_order_from_db(sample_id) + + if not order: + print(f"[GENEXPERT] Tidak ditemukan ORDER untuk {sample_id}. Balas NOT FOUND.") + rsp = ( + f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}||RSP^Z03|{msg_id}|P|2.5\r" + f"MSA|AE|{msg_id}\r" + f"QAK|NF|{msg_id}\r" + f"QPD|Z01^REQUEST TEST ORDERS|{msg_id}|ALL\r" + ) + mllp = f"\x0b{rsp}\x1c\r" + conn.sendall(mllp.encode('utf-8')) + return + + response_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + + rsp = ( + f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{response_time}||RSP^Z03|{msg_id}|P|2.5\r" + f"MSA|AA|{msg_id}\r" + f"QAK|OK|{msg_id}\r" + f"QPD|Z01^REQUEST TEST ORDERS|{sample_id}|ALL\r" + f"ORC|NW|{order.urut}\r" + f"OBR|1|{order.urut}|{order.rnoreg}|^{order.kd_spesimen}^{order.nm_spesimen}\r" + ) + + mllp = f"\x0b{rsp}\x1c\r" + conn.sendall(mllp.encode('utf-8')) + print(f"[GENEXPERT] >> RSP^Z03 terkirim untuk ORDER {order.urut}") + + # ========================================== # 4. NETWORK & COMMUNICATION LOGIC # ========================================== @@ -361,23 +483,23 @@ def handle_tcp_client(client_socket, addr): # decode, parsing ASTM, save to DB try: msg = data.decode('latin-1', errors='ignore') - logging.info(f"[TCP] Data dari {addr}: {msg[:50]}...") - print(f"[TCP] Data dari {addr}: {msg[:50]}...") - # Panggil fungsi parser GeneXpert Anda disini - parse_hl7_result(msg, device_name=addr[0] if addr else "Unknown") + logging.info(f"[TCP_HANDLING] Data dari {addr}: {msg[:50]}...") + print(f"[TCP_HANDLING] Data dari {addr}: {msg[:50]}...") + msg_id = extract_msg_control_id(msg) + parse_hl7_result(client_socket, msg_id, msg, device_name=addr[0] if addr else "Unknown") # Kirim ACK ASTM jika perlu (\x06) client_socket.send(b'\x06') except Exception as e: - logging.error(f"[TCP] Error processing data: {e}") - print(f"[TCP] Error processing data: {e}") + logging.error(f"[TCP_HANDLING] Error processing data: {e}") + print(f"[TCP_HANDLING] Error processing data: {e}") except Exception as e: - logging.error(f"[TCP] Koneksi Error {addr}: {e}") - print(f"[TCP] Koneksi Error {addr}: {e}") + logging.error(f"[TCP_HANDLING] Koneksi Error {addr}: {e}") + print(f"[TCP_HANDLING] Koneksi Error {addr}: {e}") finally: client_socket.close() - logging.info(f"[TCP] Koneksi ditutup {addr}") - print(f"[TCP] Koneksi ditutup {addr}") + logging.info(f"[TCP_HANDLING] Koneksi ditutup {addr}") + print(f"[TCP_HANDLING] Koneksi ditutup {addr}") def manage_tcp_server(): """Thread Server Utama untuk GeneXpert""" @@ -419,7 +541,7 @@ def send_mllp_message(sock, hl7_msg): sock.sendall(mllp_msg) def handle_genexpert_client(conn, addr): - print(f"[TCP] Koneksi baru dari {addr}") + print(f"[GenExpert_TCP] Koneksi baru dari {addr}") buffer = b"" try: @@ -484,12 +606,17 @@ def handle_genexpert_client(conn, addr): if "MSH|" in temp_str: msh_index = temp_str.find("MSH|") clean_hl7 = temp_str[msh_index:] - + if "QBP^Z03" in clean_hl7: + print("[GENEXPERT] Alat meminta ORDER") + msg_id = extract_msg_control_id(clean_hl7) + send_all_orders(conn, addr[0], clean_hl7, msg_id) + continue + # Logging sample data (50 karakter) - print(f"[TCP] Pesan Lengkap Diterima: {clean_hl7[:50]}...") + print(f"[GenExpert_TCP] Pesan Lengkap Diterima: {clean_hl7[:50]}...") # Panggil Parser - parse_hl7_result(clean_hl7, device_name=f"GeneXpert-{addr[0]}") + parse_hl7_result(conn, msg_id, clean_hl7, device_name=f"GeneXpert-{addr[0]}, ") # --- KIRIM ACK --- try: @@ -516,10 +643,10 @@ def handle_genexpert_client(conn, addr): time.sleep(0.1) except Exception as e: - logging.error(f"[TCP Error] Koneksi {addr} terputus: {e}") + logging.error(f"[GenExpert_TCP Error] Koneksi {addr} terputus: {e}") finally: conn.close() - logging.info(f"[TCP] Koneksi {addr} ditutup.") + logging.info(f"[GenExpert_TCP] Koneksi {addr} ditutup.") def run_tcp_server(): """Loop utama TCP Server"""