From 6acae8ddc5a9579768e615c4eb991186c76ce284 Mon Sep 17 00:00:00 2001 From: Dwi Swandhana Date: Fri, 6 Feb 2026 05:12:07 +0700 Subject: [PATCH] update --- listener/app.py | 301 ++---------------------------------------------- 1 file changed, 10 insertions(+), 291 deletions(-) diff --git a/listener/app.py b/listener/app.py index 8286affb..0c3cec5a 100644 --- a/listener/app.py +++ b/listener/app.py @@ -106,7 +106,7 @@ active_serial_ports = {} order_queues = {config['port']: Queue() for config in DEVICE_CONFIGS if config['protocol'] == 'serial'} # ========================================== -# 2. DATABASE MODEL (SESUAIKAN) +# 2. DATABASE MODEL # ========================================== DATABASE_URL = "postgresql://lismikro:lismikro@10.10.123.193:5002/lismikro" engine = create_engine(DATABASE_URL, pool_recycle=3600) @@ -206,18 +206,6 @@ def get_pending_orders(ip_addr): finally: session.close() -def build_rsp_for_order(order, msg_id): - ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - - return ( - f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{ts}||RSP^Z03|{msg_id}|P|2.5\r" - f"MSA|AA|{msg_id}\r" - f"QAK|OK|{msg_id}\r" - f"QPD|Z01^REQUEST TEST ORDERS|{order.rnoreg}|ALL\r" - f"ORC|NW|{order.urut}\r" - f"OBR|1|{order.urut}|{order.rnoreg}|^{order.kd_spesimen}^{order.nm_spesimen}\r" - ) - def send_all_orders(conn, ip_addr, hl7_msg, msg_id): orders = get_pending_orders(ip_addr) session = SessionLocal() @@ -235,7 +223,9 @@ def send_all_orders(conn, ip_addr, hl7_msg, msg_id): print(f"[GENEXPERT] Mengirim {len(orders)} order ke {ip_addr}") for order in orders: - rsp = build_rsp_for_order(order, msg_id) + rsp = create_hl7_dsr_response(order, msg_id, "") + print(f"[GENEXPERT] Order ditemukan untuk {ip_addr}") + mllp = f"\x0b{rsp}\x1c\r" conn.sendall(mllp.encode('utf-8')) flag = get_flag_by_device(ip_addr) @@ -255,62 +245,6 @@ def extract_msg_control_id(hl7_message): except: return None -def extract_requested_sample_id(hl7_message): - try: - for seg in hl7_message.split('\r'): - if seg.startswith('QPD|'): - fields = seg.split('|') - if len(fields) > 2: - return fields[2].strip() - return None - except: - return None - -def find_order_from_db(sample_id): - session = SessionLocal() - try: - order = session.query(PaslabOrder).filter(PaslabOrder.rnoreg == sample_id).first() - return order - finally: - session.close() - -def create_hl7_orm_message(order): - timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - - # 1. Mapping Sample ID (Barcode) - # Gunakan 'rnoreg' atau 'urut' tergantung mana yang ditempel di tabung sampel - sample_id = str(order.rnoreg) - - # 2. Mapping Gender (rjenis) - # GeneXpert butuh 'M', 'F', atau 'O' - raw_gender = str(order.rjenis).upper() - if 'LAKI' in raw_gender or raw_gender == 'L': - pid_gender = 'M' - elif 'PEREMPUAN' in raw_gender or raw_gender == 'P': - pid_gender = 'F' - else: - pid_gender = 'O' - - # 3. Mapping Nama & NORM - pid_nama = order.nama if order.nama else "No Name" - pid_norm = order.norm if order.norm else "" - - # 4. Mapping Test Code - # Cek kolom 'tes'. Jika mengandung kata 'TB', set kode jadi MTB. - # Kode ini HARUS SAMA dengan "Host Test Code" di alat. - test_code = GENEXPERT_TEST_MAPPING.get(order.kd_spesimen, DEFAULT_GXP_CODE) - test_name = order.tes if order.tes else "UNKNOWN TEST" - - # Jika Anda punya tes lain (misal HIV), tambahkan if/else di sini berdasarkan order.tes - - # --- Susun Pesan HL7 --- - msh = f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{timestamp}||ORM^O01|{sample_id}|P|2.3" - pid = f"PID|1||{pid_norm}||{pid_nama}|||{pid_gender}" - orc = f"ORC|NW|{sample_id}" - obr = f"OBR|1|{sample_id}||{test_code}^{test_name}^L|||{timestamp}" - - return f"{msh}\r{pid}\r{orc}\r{obr}\r" - def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"): session = SessionLocal() clean_hl7 = hl7_message @@ -423,45 +357,6 @@ def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"): finally: session.close() -def send_order_response(conn, hl7_msg, msg_id=None): - if not msg_id: - msg_id = extract_msg_control_id(hl7_msg) - - # ambil sample_id yang diminta GeneXpert - sample_id = extract_requested_sample_id(hl7_msg) - - print(f"[GENEXPERT] Request ORDER untuk sample_id: {sample_id}") - - # cari order di database - order = find_order_from_db(sample_id) - - if not order: - print(f"[GENEXPERT] Tidak ditemukan ORDER untuk {sample_id}. Balas NOT FOUND.") - rsp = ( - f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}||RSP^Z03|{msg_id}|P|2.5\r" - f"MSA|AE|{msg_id}\r" - f"QAK|NF|{msg_id}\r" - f"QPD|Z01^REQUEST TEST ORDERS|{msg_id}|ALL\r" - ) - mllp = f"\x0b{rsp}\x1c\r" - conn.sendall(mllp.encode('utf-8')) - return - - response_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S') - - rsp = ( - f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{response_time}||RSP^Z03|{msg_id}|P|2.5\r" - f"MSA|AA|{msg_id}\r" - f"QAK|OK|{msg_id}\r" - f"QPD|Z01^REQUEST TEST ORDERS|{sample_id}|ALL\r" - f"ORC|NW|{order.urut}\r" - f"OBR|1|{order.urut}|{order.rnoreg}|^{order.kd_spesimen}^{order.nm_spesimen}\r" - ) - - mllp = f"\x0b{rsp}\x1c\r" - conn.sendall(mllp.encode('utf-8')) - print(f"[GENEXPERT] >> RSP^Z03 terkirim untuk ORDER {order.urut}") - def create_hl7_dsr_response(order, msg_control_id, qrd_segment): """ Membuat pesan balasan DSR^Q03 (Data Response) untuk GeneXpert. @@ -527,41 +422,6 @@ def create_hl7_dsr_response(order, msg_control_id, qrd_segment): # ========================================== # HL7 TCP LISTENER FOR GENEXPERT # ========================================== -# ========================================== -# TAMBAHAN: FUNGSI TCP LISTENER (GENEXPERT) -# ========================================== -def handle_tcp_client(client_socket, addr): - """Menangani satu koneksi client GeneXpert""" - logging.info(f"[TCP] Koneksi diterima dari {addr}") - print(f"[TCP] Koneksi diterima dari {addr}") - try: - # Loop baca data dari client ini - while True: - data = client_socket.recv(4096) - if not data: - break - - # --- PROSES DATA GENEXPERT DISINI --- - # decode, parsing ASTM, save to DB - try: - msg = data.decode('latin-1', errors='ignore') - logging.info(f"[TCP_HANDLING] Data dari {addr}: {msg[:50]}...") - print(f"[TCP_HANDLING] Data dari {addr}: {msg[:50]}...") - msg_id = extract_msg_control_id(msg) - parse_hl7_result(client_socket, msg_id, msg, device_name=addr[0] if addr else "Unknown") - - # Kirim ACK ASTM jika perlu (\x06) - client_socket.send(b'\x06') - except Exception as e: - logging.error(f"[TCP_HANDLING] Error processing data: {e}") - print(f"[TCP_HANDLING] Error processing data: {e}") - except Exception as e: - logging.error(f"[TCP_HANDLING] Koneksi Error {addr}: {e}") - print(f"[TCP_HANDLING] Koneksi Error {addr}: {e}") - finally: - client_socket.close() - logging.info(f"[TCP_HANDLING] Koneksi ditutup {addr}") - print(f"[TCP_HANDLING] Koneksi ditutup {addr}") def manage_tcp_server(): """Thread Server Utama untuk GeneXpert""" @@ -580,7 +440,7 @@ def manage_tcp_server(): # Buat thread kecil untuk handle client tersebut (agar server bisa terima client lain) client_thread = threading.Thread( - target=handle_tcp_client, + target=handle_genexpert_client, args=(client_sock, addr), daemon=True ) @@ -590,13 +450,6 @@ def manage_tcp_server(): logging.critical(f"[TCP-SERVER] Gagal Start: {e}") print(f"[TCP-SERVER] Gagal Start: {e}") -def send_mllp_message(sock, hl7_msg): - """Membungkus pesan HL7 dengan MLLP (Minimal Lower Layer Protocol)""" - # Start Block: 0x0B () - # End Block: 0x1C 0x0D () - mllp_msg = b'\x0b' + hl7_msg.encode('utf-8') + b'\x1c\r' - sock.sendall(mllp_msg) - def handle_genexpert_client(conn, addr): print(f"[GenExpert_TCP] Koneksi baru dari {addr}") buffer = b"" @@ -825,26 +678,6 @@ def handle_genexpert_client(conn, addr): conn.close() logging.info(f"[GenExpert_TCP] Koneksi {addr} ditutup.") -def run_tcp_server(): - """Loop utama TCP Server""" - server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - try: - server.bind((SERVER_HOST, TCP_LISTENER_PORT)) - server.listen(5) # Backlog 5 koneksi - logging.info(f"TCP Listener AKTIF di port {TCP_LISTENER_PORT}. Menunggu koneksi GeneXpert...") - print(f"TCP Listener AKTIF di port {TCP_LISTENER_PORT}. Menunggu koneksi GeneXpert...") - while True: - conn, addr = server.accept() - # Buat thread baru untuk setiap alat yg konek - t = threading.Thread(target=handle_genexpert_client, args=(conn, addr), daemon=True) - t.start() - - except Exception as e: - logging.critical(f"Gagal menjalankan TCP Server: {e}") - print(f"Gagal menjalankan TCP Server: {e}") - def send_order_via_active_connection(target_ip, hl7_message): conn = None with connection_lock: @@ -878,35 +711,6 @@ def send_order_via_active_connection(target_ip, hl7_message): del active_genexpert_connections[target_ip] return False -def broadcast_order_to_all_machines(hl7_message, order_id): - connected_ips = [] - with connection_lock: - connected_ips = list(active_genexpert_connections.keys()) - - if not connected_ips: - logging.warning(f"Order {order_id} GAGAL dikirim: Tidak ada GeneXpert yang terkoneksi saat ini.") - print(f"Order {order_id} GAGAL dikirim: Tidak ada GeneXpert yang terkoneksi saat ini.") - return False - - success_count = 0 - logging.info(f"Memulai broadcast Order {order_id} ke {len(connected_ips)} alat...") - print(f"Memulai broadcast Order {order_id} ke {len(connected_ips)} alat...") - for target_ip in connected_ips: - # Panggil fungsi kirim tunggal yang sudah kita buat sebelumnya - # Note: Fungsi send_order_via_active_connection ada di kode sebelumnya - if send_order_via_active_connection(target_ip, hl7_message): - logging.info(f" -> Sukses kirim ke {target_ip}") - print(f" -> Sukses kirim ke {target_ip}") - success_count += 1 - else: - logging.error(f" -> Gagal kirim ke {target_ip}") - print(f" -> Gagal kirim ke {target_ip}") - - # Logika bisnis: Dianggap sukses jika minimal terkirim ke SATU alat - if success_count > 0: - return True - else: - return False # ========================================== # VITEK PARSER # ========================================== @@ -1678,94 +1482,9 @@ def manage_bd_port(config): time.sleep(5) # ========================================== -# 5. ORDER POLLER (BROADCASTER) +# 5. Serial Manager # ========================================== -def order_poller(stop_event): - """Looping cek DB dan kirim order ke SEMUA alat""" - logging.info("Order Poller Berjalan...") - print("Order Poller Berjalan...") - while not stop_event.is_set(): - session = SessionLocal() - try: - # 1. Ambil order yang belum dikirim (flag_genexpert = False) - - orders = session.query(PaslabOrder).filter( - (PaslabOrder.flg_gxp1 == False) | - (PaslabOrder.flg_gxp2 == False) | - (PaslabOrder.flg_gxp3 == False) - ).all() - # Update daftar koneksi aktif (Thread safe) - current_connections = {} - with connection_lock: - current_connections = active_genexpert_connections.copy() - - if not orders: - time.sleep(5) - continue - for order in orders: - # Generate pesan HL7 sekali untuk order ini - hl7_msg = create_hl7_orm_message(order) - - # --- LOGIKA KIRIM BERDASARKAN FLAG --- - - # 1. Cek Target GXP 1 - if order.flg_gxp1 == False: - target_ip = TARGET_MAPPING['flg_gxp1'] # 10.10.123.73 - if target_ip in current_connections: - # Ada koneksi dari alat 1, kirim! - conn = current_connections[target_ip] - try: - send_mllp_message(conn, hl7_msg) - order.flg_gxp1 = True # Update Flag - logging.info(f"[SENT] Order {order.rnoreg} dikirim ke GXP-1 ({target_ip})") - print(f"[SENT] Order {order.rnoreg} dikirim ke GXP-1 ({target_ip})") - except Exception as e: - logging.error(f"[FAIL] Gagal kirim ke GXP-1: {e}") - print(f"[FAIL] Gagal kirim ke GXP-1: {e}") - else: - # Alat 1 belum connect, biarkan False (pending) - pass - - # 2. Cek Target GXP 2 - if order.flg_gxp2 == False: - target_ip = TARGET_MAPPING['flg_gxp2'] # 10.10.123.74 - if target_ip in current_connections: - conn = current_connections[target_ip] - try: - send_mllp_message(conn, hl7_msg) - order.flg_gxp2 = True - logging.info(f"[SENT] Order {order.rnoreg} dikirim ke GXP-2 ({target_ip})") - print(f"[SENT] Order {order.rnoreg} dikirim ke GXP-2 ({target_ip})") - except Exception as e: - logging.error(f"[FAIL] Gagal kirim ke GXP-2: {e}") - print(f"[FAIL] Gagal kirim ke GXP-2: {e}") - - # 3. Cek Target GXP 3 - if order.flg_gxp3 == False: - target_ip = TARGET_MAPPING['flg_gxp3'] # 10.10.120.75 - if target_ip in current_connections: - conn = current_connections[target_ip] - try: - send_mllp_message(conn, hl7_msg) - order.flg_gxp3 = True - logging.info(f"[SENT] Order {order.rnoreg} dikirim ke GXP-3 ({target_ip})") - print(f"[SENT] Order {order.rnoreg} dikirim ke GXP-3 ({target_ip})") - except Exception as e: - logging.error(f"[FAIL] Gagal kirim ke GXP-3: {e}") - print(f"[FAIL] Gagal kirim ke GXP-3: {e}") - - # Commit perubahan flag ke database - session.commit() - except Exception as e: - logging.error(f"Error Poller: {e}") - print(f"Error Poller: {e}") - session.rollback() - finally: - session.close() - - time.sleep(5) # Cek DB setiap 5 detik - def manage_serial_port(config): """Fungsi router yang memilih manajer yang tepat berdasarkan tipe alat.""" device_type = config.get('device_type') @@ -1778,7 +1497,7 @@ def manage_serial_port(config): print(f"Tipe alat tidak diketahui: '{device_type}' untuk port {config.get('port')}. Thread dihentikan.") # ========================================== -# 7. MAIN EXECUTION +# 6. MAIN EXECUTION # ========================================== if __name__ == "__main__": print("--- MEMULAI LIS INTERFACE SYSTEM ---") @@ -1787,9 +1506,9 @@ if __name__ == "__main__": all_threads = [] stop_event = threading.Event() # 1. Start Thread Order Poller (Pengecek Order Baru di DB) - t_poller = threading.Thread(target=order_poller, args=(stop_event,), name="OrderPoller", daemon=True) - t_poller.start() - all_threads.append(t_poller) + #t_poller = threading.Thread(target=order_poller, args=(stop_event,), name="OrderPoller", daemon=True) + #t_poller.start() + #all_threads.append(t_poller) # 2. Start Thread Serial Manager (Vitek & BD) for config in DEVICE_CONFIGS: