Fix tp error persistence

This commit is contained in:
Dwi Swandhana
2026-02-19 09:24:28 +07:00
parent b5ee2d995e
commit 6d859d1c6a
+173 -11
View File
@@ -12,7 +12,7 @@ import datetime
import traceback
import serial # type: ignore
from flask import Flask, jsonify # type: ignore
from flask import Flask, jsonify, request # type: ignore
from sqlalchemy import create_engine, Column, Integer, String, Boolean, Text # type: ignore
from sqlalchemy import DateTime as SqDateTime # type: ignore
from sqlalchemy import Date as SqDate # type: ignore
@@ -25,6 +25,7 @@ from sqlalchemy.orm import declarative_base, sessionmaker # type: ignore
# Network Configuration
TCP_LISTENER_PORT = 6001 # PC GeneXpert set ke mode Client, konek ke IP:PORT ini
SERVER_HOST = '0.0.0.0' # Listen di semua interface
HTTP_API_PORT = 6002 # Endpoint trigger dari Laravel -> Python
# Mapping Flag ke IP Address GeneXpert
# Pastikan IP ini SESUAI dengan settingan "Server IP" di masing-masing alat (Client Mode)
TARGET_MAPPING = {
@@ -77,6 +78,8 @@ logging.basicConfig(level=logging.INFO, handlers=[log_handler])
app = Flask(__name__)
active_genexpert_connections = {}
connection_lock = threading.Lock()
pending_result_queries = {}
pending_query_lock = threading.Lock()
DEVICE_CONFIGS = [
{
'port': 'COM6', 'baud_rate': 9600, 'device_type': 'vitek', 'alat_name': 'Vitek 1',
@@ -244,6 +247,115 @@ def extract_msg_control_id(hl7_message):
return None
except:
return None
def build_genexpert_result_query(accnumber, msg_control_id):
ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
# Query hasil berbasis accession number di QRD-8.
query_msg = (
f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{ts}||QRY^Q02|{msg_control_id}|P|2.5\r"
f"QRD|{ts}|R|I|{msg_control_id}|||1^RD|{accnumber}|OTH|||T\r"
)
return query_msg
def select_target_genexpert_ip(preferred_ip=None):
with connection_lock:
if preferred_ip:
if preferred_ip in active_genexpert_connections:
return preferred_ip
return None
active_ips = list(active_genexpert_connections.keys())
if len(active_ips) == 1:
return active_ips[0]
return None
def trigger_result_query_to_genexpert(accnumber, register_no, target_ip=None, wait_seconds=20):
resolved_ip = select_target_genexpert_ip(target_ip)
if not resolved_ip:
return {"ok": False, "message": "Koneksi GeneXpert tidak ditemukan atau ambigu. Isi target_ip."}
with connection_lock:
conn = active_genexpert_connections.get(resolved_ip)
if not conn:
return {"ok": False, "message": f"Koneksi GeneXpert {resolved_ip} tidak aktif."}
ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
msg_control_id = f"LISQRY{ts}{int(time.time() * 1000) % 1000:03d}"
query_message = build_genexpert_result_query(accnumber, msg_control_id)
mllp_payload = f"\x0b{query_message}\x1c\r".encode("utf-8")
pending_event = threading.Event()
with pending_query_lock:
pending_result_queries[accnumber] = {
"register_no": register_no,
"target_ip": resolved_ip,
"msg_control_id": msg_control_id,
"requested_at": datetime.datetime.now(),
"event": pending_event,
"status": "requested",
}
try:
conn.sendall(mllp_payload)
logging.info(f"[GENEXPERT-QUERY] Kirim query hasil accnumber={accnumber} ke {resolved_ip}")
print(f"[GENEXPERT-QUERY] Kirim query hasil accnumber={accnumber} ke {resolved_ip}")
except Exception as e:
with pending_query_lock:
pending_result_queries.pop(accnumber, None)
return {"ok": False, "message": f"Gagal kirim query ke GeneXpert {resolved_ip}: {e}"}
if wait_seconds and wait_seconds > 0:
pending_event.wait(wait_seconds)
with pending_query_lock:
state = pending_result_queries.get(accnumber)
if state and state.get("status") == "found":
pending_result_queries.pop(accnumber, None)
return {
"ok": True,
"message": "Hasil ditemukan dan disimpan ke LisPhoenix.",
"target_ip": resolved_ip,
"accnumber": accnumber,
}
# Timeout/hasil belum masuk, biarkan state tetap ada agar response telat tetap bisa diproses.
return {
"ok": True,
"message": "Query terkirim. Menunggu hasil dari GeneXpert.",
"target_ip": resolved_ip,
"accnumber": accnumber,
}
return {
"ok": True,
"message": "Query terkirim.",
"target_ip": resolved_ip,
"accnumber": accnumber,
}
@app.route("/api/genexpert/query-result", methods=["POST"])
def api_query_genexpert_result():
payload = request.get_json(silent=True) or {}
accnumber = str(payload.get("accnumber") or "").strip()
register_no = str(payload.get("register_no") or payload.get("nomor_register") or "").strip()
target_ip = str(payload.get("target_ip") or "").strip() or None
try:
wait_seconds = int(payload.get("wait_seconds", 20))
except Exception:
wait_seconds = 20
if not accnumber:
return jsonify({"ok": False, "message": "Field 'accnumber' wajib diisi."}), 400
if not register_no:
return jsonify({"ok": False, "message": "Field 'register_no' (nomor register pasien) wajib diisi."}), 400
result = trigger_result_query_to_genexpert(
accnumber=accnumber,
register_no=register_no,
target_ip=target_ip,
wait_seconds=wait_seconds,
)
status_code = 200 if result.get("ok") else 409
return jsonify(result), status_code
def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"):
session = SessionLocal()
@@ -325,30 +437,47 @@ def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"):
else:
final_result = "; ".join(results_list)
# 4. Validasi Data Penting (DIMODIFIKASI)
# 4. Validasi Data Penting
if not sample_id:
send_all_orders(conn, device_name, clean_hl7, msg_id)
print("[HL7 Parser] Meminta order")
logging.warning("[HL7 Parser] ORU diterima tanpa accession/sample_id. Data diabaikan.")
print("[HL7 Parser] ORU diterima tanpa accession/sample_id. Data diabaikan.")
return
mapped_no_id = sample_id
mapped_seq_no = patient_id
mapped_alat = device_name
pending_event = None
with pending_query_lock:
pending = pending_result_queries.get(sample_id)
if pending:
mapped_no_id = pending.get("register_no") or sample_id
mapped_seq_no = sample_id
mapped_alat = f"GeneXpert-{pending.get('target_ip')}"
pending["status"] = "found"
pending["response_at"] = datetime.datetime.now()
pending_event = pending.get("event")
logging.info(f"[HL7 Parser] Menyimpan hasil untuk Sample: {sample_id}")
print(f"[HL7 Parser] Menyimpan hasil untuk Sample: {sample_id}")
# 5. Simpan ke Database
new_data = LisPhoenix(
no_id=sample_id,
seq_no=patient_id,
no_id=mapped_no_id,
seq_no=mapped_seq_no,
rnmpas=patient_name,
tgl_data=result_date,
rawdt=hl7_message,
organisme=final_result,
alat=device_name
alat=mapped_alat
)
session.add(new_data)
session.commit()
logging.info(f"[DB] Berhasil simpan ke LisPhoenix: {sample_id}")
print(f"[DB] Berhasil simpan ke LisPhoenix: {sample_id}")
if pending_event:
pending_event.set()
except Exception as e:
logging.error(f"[HL7 Parser] Error menyimpan data: {e}")
@@ -450,15 +579,26 @@ def manage_tcp_server():
logging.critical(f"[TCP-SERVER] Gagal Start: {e}")
print(f"[TCP-SERVER] Gagal Start: {e}")
def run_http_api_server():
logging.info(f"[HTTP-API] Listening di port {HTTP_API_PORT}...")
print(f"[HTTP-API] Listening di port {HTTP_API_PORT}...")
app.run(host=SERVER_HOST, port=HTTP_API_PORT, debug=False, use_reloader=False, threaded=True)
def handle_genexpert_client(conn, addr):
print(f"[GenExpert_TCP] Koneksi baru dari {addr}")
buffer = b""
conn.settimeout(60)
client_ip = addr[0]
with connection_lock:
active_genexpert_connections[client_ip] = conn
logging.info(f"[GenExpert_TCP] Register koneksi aktif {client_ip}")
try:
while True:
try:
data = conn.recv(4096)
if not data:
logging.info(f"[GenExpert_TCP] Client {addr} menutup koneksi.")
break
buffer += data
@@ -519,6 +659,7 @@ def handle_genexpert_client(conn, addr):
lines = clean_hl7.split('\r')
msh_fields = lines[0].split('|')
incoming_control_id = msh_fields[9] if len(msh_fields) > 9 else "UNKNOWN"
msg_id = incoming_control_id
# ==========================================
# SKENARIO 1: ALAT BERTANYA (QUERY / QRY)
# ==========================================
@@ -625,7 +766,7 @@ def handle_genexpert_client(conn, addr):
logging.info(f"[RESULT] Menerima Hasil Lab.")
# 1. Parse dan Simpan Hasil (Panggil fungsi parser Anda)
parse_hl7_result(clean_hl7, msg_id, clean_hl7, device_name=f"GeneXpert-{addr[0]}")
parse_hl7_result(conn, msg_id, clean_hl7, device_name=f"GeneXpert-{addr[0]}")
# 2. Kirim ACK (Terima Kasih)
ack_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
@@ -670,15 +811,31 @@ def handle_genexpert_client(conn, addr):
# Jika pesan lengkap tapi tidak ada MSH (misal cuma EOT doang)
pass
except ConnectionResetError:
logging.warning(f"[GenExpert_TCP] Connection reset by peer: {addr}")
break
except OSError as e:
if getattr(e, "winerror", None) == 10054:
logging.warning(f"[GenExpert_TCP] WinError 10054 dari {addr}")
break
raise
except socket.timeout:
continue
except Exception as e:
print(f"[Loop Error] {e}")
# Jangan break, lanjut terima data
time.sleep(0.1)
logging.exception(f"[Loop Error] Unexpected error from {addr}: {e}")
break
except Exception as e:
logging.error(f"[GenExpert_TCP Error] Koneksi {addr} terputus: {e}")
finally:
conn.close()
with connection_lock:
if active_genexpert_connections.get(client_ip) is conn:
del active_genexpert_connections[client_ip]
try:
conn.close()
except Exception:
pass
logging.info(f"[GenExpert_TCP] Koneksi {addr} ditutup.")
def send_order_via_active_connection(target_ip, hl7_message):
@@ -1562,6 +1719,11 @@ if __name__ == "__main__":
t_tcp.start()
all_threads.append(t_tcp)
# 4. Start Thread HTTP API (Trigger dari Laravel)
t_http = threading.Thread(target=run_http_api_server, name="Manager-HTTP-API", daemon=True)
t_http.start()
all_threads.append(t_http)
# 5. LOOP UTAMA (Keep-Alive & Monitoring)
# Ini sekarang bisa berjalan karena Flask sudah dipindah ke thread
try: