This commit is contained in:
Dwi Swandhana
2026-02-06 05:12:07 +07:00
parent 539d0a8b50
commit 6acae8ddc5
+10 -291
View File
@@ -106,7 +106,7 @@ active_serial_ports = {}
order_queues = {config['port']: Queue() for config in DEVICE_CONFIGS if config['protocol'] == 'serial'}
# ==========================================
# 2. DATABASE MODEL (SESUAIKAN)
# 2. DATABASE MODEL
# ==========================================
DATABASE_URL = "postgresql://lismikro:[email protected]:5002/lismikro"
engine = create_engine(DATABASE_URL, pool_recycle=3600)
@@ -206,18 +206,6 @@ def get_pending_orders(ip_addr):
finally:
session.close()
def build_rsp_for_order(order, msg_id):
ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
return (
f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{ts}||RSP^Z03|{msg_id}|P|2.5\r"
f"MSA|AA|{msg_id}\r"
f"QAK|OK|{msg_id}\r"
f"QPD|Z01^REQUEST TEST ORDERS|{order.rnoreg}|ALL\r"
f"ORC|NW|{order.urut}\r"
f"OBR|1|{order.urut}|{order.rnoreg}|^{order.kd_spesimen}^{order.nm_spesimen}\r"
)
def send_all_orders(conn, ip_addr, hl7_msg, msg_id):
orders = get_pending_orders(ip_addr)
session = SessionLocal()
@@ -235,7 +223,9 @@ def send_all_orders(conn, ip_addr, hl7_msg, msg_id):
print(f"[GENEXPERT] Mengirim {len(orders)} order ke {ip_addr}")
for order in orders:
rsp = build_rsp_for_order(order, msg_id)
rsp = create_hl7_dsr_response(order, msg_id, "")
print(f"[GENEXPERT] Order ditemukan untuk {ip_addr}")
mllp = f"\x0b{rsp}\x1c\r"
conn.sendall(mllp.encode('utf-8'))
flag = get_flag_by_device(ip_addr)
@@ -255,62 +245,6 @@ def extract_msg_control_id(hl7_message):
except:
return None
def extract_requested_sample_id(hl7_message):
try:
for seg in hl7_message.split('\r'):
if seg.startswith('QPD|'):
fields = seg.split('|')
if len(fields) > 2:
return fields[2].strip()
return None
except:
return None
def find_order_from_db(sample_id):
session = SessionLocal()
try:
order = session.query(PaslabOrder).filter(PaslabOrder.rnoreg == sample_id).first()
return order
finally:
session.close()
def create_hl7_orm_message(order):
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
# 1. Mapping Sample ID (Barcode)
# Gunakan 'rnoreg' atau 'urut' tergantung mana yang ditempel di tabung sampel
sample_id = str(order.rnoreg)
# 2. Mapping Gender (rjenis)
# GeneXpert butuh 'M', 'F', atau 'O'
raw_gender = str(order.rjenis).upper()
if 'LAKI' in raw_gender or raw_gender == 'L':
pid_gender = 'M'
elif 'PEREMPUAN' in raw_gender or raw_gender == 'P':
pid_gender = 'F'
else:
pid_gender = 'O'
# 3. Mapping Nama & NORM
pid_nama = order.nama if order.nama else "No Name"
pid_norm = order.norm if order.norm else ""
# 4. Mapping Test Code
# Cek kolom 'tes'. Jika mengandung kata 'TB', set kode jadi MTB.
# Kode ini HARUS SAMA dengan "Host Test Code" di alat.
test_code = GENEXPERT_TEST_MAPPING.get(order.kd_spesimen, DEFAULT_GXP_CODE)
test_name = order.tes if order.tes else "UNKNOWN TEST"
# Jika Anda punya tes lain (misal HIV), tambahkan if/else di sini berdasarkan order.tes
# --- Susun Pesan HL7 ---
msh = f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{timestamp}||ORM^O01|{sample_id}|P|2.3"
pid = f"PID|1||{pid_norm}||{pid_nama}|||{pid_gender}"
orc = f"ORC|NW|{sample_id}"
obr = f"OBR|1|{sample_id}||{test_code}^{test_name}^L|||{timestamp}"
return f"{msh}\r{pid}\r{orc}\r{obr}\r"
def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"):
session = SessionLocal()
clean_hl7 = hl7_message
@@ -423,45 +357,6 @@ def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"):
finally:
session.close()
def send_order_response(conn, hl7_msg, msg_id=None):
if not msg_id:
msg_id = extract_msg_control_id(hl7_msg)
# ambil sample_id yang diminta GeneXpert
sample_id = extract_requested_sample_id(hl7_msg)
print(f"[GENEXPERT] Request ORDER untuk sample_id: {sample_id}")
# cari order di database
order = find_order_from_db(sample_id)
if not order:
print(f"[GENEXPERT] Tidak ditemukan ORDER untuk {sample_id}. Balas NOT FOUND.")
rsp = (
f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}||RSP^Z03|{msg_id}|P|2.5\r"
f"MSA|AE|{msg_id}\r"
f"QAK|NF|{msg_id}\r"
f"QPD|Z01^REQUEST TEST ORDERS|{msg_id}|ALL\r"
)
mllp = f"\x0b{rsp}\x1c\r"
conn.sendall(mllp.encode('utf-8'))
return
response_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
rsp = (
f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{response_time}||RSP^Z03|{msg_id}|P|2.5\r"
f"MSA|AA|{msg_id}\r"
f"QAK|OK|{msg_id}\r"
f"QPD|Z01^REQUEST TEST ORDERS|{sample_id}|ALL\r"
f"ORC|NW|{order.urut}\r"
f"OBR|1|{order.urut}|{order.rnoreg}|^{order.kd_spesimen}^{order.nm_spesimen}\r"
)
mllp = f"\x0b{rsp}\x1c\r"
conn.sendall(mllp.encode('utf-8'))
print(f"[GENEXPERT] >> RSP^Z03 terkirim untuk ORDER {order.urut}")
def create_hl7_dsr_response(order, msg_control_id, qrd_segment):
"""
Membuat pesan balasan DSR^Q03 (Data Response) untuk GeneXpert.
@@ -527,41 +422,6 @@ def create_hl7_dsr_response(order, msg_control_id, qrd_segment):
# ==========================================
# HL7 TCP LISTENER FOR GENEXPERT
# ==========================================
# ==========================================
# TAMBAHAN: FUNGSI TCP LISTENER (GENEXPERT)
# ==========================================
def handle_tcp_client(client_socket, addr):
"""Menangani satu koneksi client GeneXpert"""
logging.info(f"[TCP] Koneksi diterima dari {addr}")
print(f"[TCP] Koneksi diterima dari {addr}")
try:
# Loop baca data dari client ini
while True:
data = client_socket.recv(4096)
if not data:
break
# --- PROSES DATA GENEXPERT DISINI ---
# decode, parsing ASTM, save to DB
try:
msg = data.decode('latin-1', errors='ignore')
logging.info(f"[TCP_HANDLING] Data dari {addr}: {msg[:50]}...")
print(f"[TCP_HANDLING] Data dari {addr}: {msg[:50]}...")
msg_id = extract_msg_control_id(msg)
parse_hl7_result(client_socket, msg_id, msg, device_name=addr[0] if addr else "Unknown")
# Kirim ACK ASTM jika perlu (\x06)
client_socket.send(b'\x06')
except Exception as e:
logging.error(f"[TCP_HANDLING] Error processing data: {e}")
print(f"[TCP_HANDLING] Error processing data: {e}")
except Exception as e:
logging.error(f"[TCP_HANDLING] Koneksi Error {addr}: {e}")
print(f"[TCP_HANDLING] Koneksi Error {addr}: {e}")
finally:
client_socket.close()
logging.info(f"[TCP_HANDLING] Koneksi ditutup {addr}")
print(f"[TCP_HANDLING] Koneksi ditutup {addr}")
def manage_tcp_server():
"""Thread Server Utama untuk GeneXpert"""
@@ -580,7 +440,7 @@ def manage_tcp_server():
# Buat thread kecil untuk handle client tersebut (agar server bisa terima client lain)
client_thread = threading.Thread(
target=handle_tcp_client,
target=handle_genexpert_client,
args=(client_sock, addr),
daemon=True
)
@@ -590,13 +450,6 @@ def manage_tcp_server():
logging.critical(f"[TCP-SERVER] Gagal Start: {e}")
print(f"[TCP-SERVER] Gagal Start: {e}")
def send_mllp_message(sock, hl7_msg):
"""Membungkus pesan HL7 dengan MLLP (Minimal Lower Layer Protocol)"""
# Start Block: 0x0B (<VT>)
# End Block: 0x1C 0x0D (<FS><CR>)
mllp_msg = b'\x0b' + hl7_msg.encode('utf-8') + b'\x1c\r'
sock.sendall(mllp_msg)
def handle_genexpert_client(conn, addr):
print(f"[GenExpert_TCP] Koneksi baru dari {addr}")
buffer = b""
@@ -825,26 +678,6 @@ def handle_genexpert_client(conn, addr):
conn.close()
logging.info(f"[GenExpert_TCP] Koneksi {addr} ditutup.")
def run_tcp_server():
"""Loop utama TCP Server"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server.bind((SERVER_HOST, TCP_LISTENER_PORT))
server.listen(5) # Backlog 5 koneksi
logging.info(f"TCP Listener AKTIF di port {TCP_LISTENER_PORT}. Menunggu koneksi GeneXpert...")
print(f"TCP Listener AKTIF di port {TCP_LISTENER_PORT}. Menunggu koneksi GeneXpert...")
while True:
conn, addr = server.accept()
# Buat thread baru untuk setiap alat yg konek
t = threading.Thread(target=handle_genexpert_client, args=(conn, addr), daemon=True)
t.start()
except Exception as e:
logging.critical(f"Gagal menjalankan TCP Server: {e}")
print(f"Gagal menjalankan TCP Server: {e}")
def send_order_via_active_connection(target_ip, hl7_message):
conn = None
with connection_lock:
@@ -878,35 +711,6 @@ def send_order_via_active_connection(target_ip, hl7_message):
del active_genexpert_connections[target_ip]
return False
def broadcast_order_to_all_machines(hl7_message, order_id):
connected_ips = []
with connection_lock:
connected_ips = list(active_genexpert_connections.keys())
if not connected_ips:
logging.warning(f"Order {order_id} GAGAL dikirim: Tidak ada GeneXpert yang terkoneksi saat ini.")
print(f"Order {order_id} GAGAL dikirim: Tidak ada GeneXpert yang terkoneksi saat ini.")
return False
success_count = 0
logging.info(f"Memulai broadcast Order {order_id} ke {len(connected_ips)} alat...")
print(f"Memulai broadcast Order {order_id} ke {len(connected_ips)} alat...")
for target_ip in connected_ips:
# Panggil fungsi kirim tunggal yang sudah kita buat sebelumnya
# Note: Fungsi send_order_via_active_connection ada di kode sebelumnya
if send_order_via_active_connection(target_ip, hl7_message):
logging.info(f" -> Sukses kirim ke {target_ip}")
print(f" -> Sukses kirim ke {target_ip}")
success_count += 1
else:
logging.error(f" -> Gagal kirim ke {target_ip}")
print(f" -> Gagal kirim ke {target_ip}")
# Logika bisnis: Dianggap sukses jika minimal terkirim ke SATU alat
if success_count > 0:
return True
else:
return False
# ==========================================
# VITEK PARSER
# ==========================================
@@ -1678,94 +1482,9 @@ def manage_bd_port(config):
time.sleep(5)
# ==========================================
# 5. ORDER POLLER (BROADCASTER)
# 5. Serial Manager
# ==========================================
def order_poller(stop_event):
"""Looping cek DB dan kirim order ke SEMUA alat"""
logging.info("Order Poller Berjalan...")
print("Order Poller Berjalan...")
while not stop_event.is_set():
session = SessionLocal()
try:
# 1. Ambil order yang belum dikirim (flag_genexpert = False)
orders = session.query(PaslabOrder).filter(
(PaslabOrder.flg_gxp1 == False) |
(PaslabOrder.flg_gxp2 == False) |
(PaslabOrder.flg_gxp3 == False)
).all()
# Update daftar koneksi aktif (Thread safe)
current_connections = {}
with connection_lock:
current_connections = active_genexpert_connections.copy()
if not orders:
time.sleep(5)
continue
for order in orders:
# Generate pesan HL7 sekali untuk order ini
hl7_msg = create_hl7_orm_message(order)
# --- LOGIKA KIRIM BERDASARKAN FLAG ---
# 1. Cek Target GXP 1
if order.flg_gxp1 == False:
target_ip = TARGET_MAPPING['flg_gxp1'] # 10.10.123.73
if target_ip in current_connections:
# Ada koneksi dari alat 1, kirim!
conn = current_connections[target_ip]
try:
send_mllp_message(conn, hl7_msg)
order.flg_gxp1 = True # Update Flag
logging.info(f"[SENT] Order {order.rnoreg} dikirim ke GXP-1 ({target_ip})")
print(f"[SENT] Order {order.rnoreg} dikirim ke GXP-1 ({target_ip})")
except Exception as e:
logging.error(f"[FAIL] Gagal kirim ke GXP-1: {e}")
print(f"[FAIL] Gagal kirim ke GXP-1: {e}")
else:
# Alat 1 belum connect, biarkan False (pending)
pass
# 2. Cek Target GXP 2
if order.flg_gxp2 == False:
target_ip = TARGET_MAPPING['flg_gxp2'] # 10.10.123.74
if target_ip in current_connections:
conn = current_connections[target_ip]
try:
send_mllp_message(conn, hl7_msg)
order.flg_gxp2 = True
logging.info(f"[SENT] Order {order.rnoreg} dikirim ke GXP-2 ({target_ip})")
print(f"[SENT] Order {order.rnoreg} dikirim ke GXP-2 ({target_ip})")
except Exception as e:
logging.error(f"[FAIL] Gagal kirim ke GXP-2: {e}")
print(f"[FAIL] Gagal kirim ke GXP-2: {e}")
# 3. Cek Target GXP 3
if order.flg_gxp3 == False:
target_ip = TARGET_MAPPING['flg_gxp3'] # 10.10.120.75
if target_ip in current_connections:
conn = current_connections[target_ip]
try:
send_mllp_message(conn, hl7_msg)
order.flg_gxp3 = True
logging.info(f"[SENT] Order {order.rnoreg} dikirim ke GXP-3 ({target_ip})")
print(f"[SENT] Order {order.rnoreg} dikirim ke GXP-3 ({target_ip})")
except Exception as e:
logging.error(f"[FAIL] Gagal kirim ke GXP-3: {e}")
print(f"[FAIL] Gagal kirim ke GXP-3: {e}")
# Commit perubahan flag ke database
session.commit()
except Exception as e:
logging.error(f"Error Poller: {e}")
print(f"Error Poller: {e}")
session.rollback()
finally:
session.close()
time.sleep(5) # Cek DB setiap 5 detik
def manage_serial_port(config):
"""Fungsi router yang memilih manajer yang tepat berdasarkan tipe alat."""
device_type = config.get('device_type')
@@ -1778,7 +1497,7 @@ def manage_serial_port(config):
print(f"Tipe alat tidak diketahui: '{device_type}' untuk port {config.get('port')}. Thread dihentikan.")
# ==========================================
# 7. MAIN EXECUTION
# 6. MAIN EXECUTION
# ==========================================
if __name__ == "__main__":
print("--- MEMULAI LIS INTERFACE SYSTEM ---")
@@ -1787,9 +1506,9 @@ if __name__ == "__main__":
all_threads = []
stop_event = threading.Event()
# 1. Start Thread Order Poller (Pengecek Order Baru di DB)
t_poller = threading.Thread(target=order_poller, args=(stop_event,), name="OrderPoller", daemon=True)
t_poller.start()
all_threads.append(t_poller)
#t_poller = threading.Thread(target=order_poller, args=(stop_event,), name="OrderPoller", daemon=True)
#t_poller.start()
#all_threads.append(t_poller)
# 2. Start Thread Serial Manager (Vitek & BD)
for config in DEVICE_CONFIGS: