This commit is contained in:
Dwi Swandhana
2026-04-14 09:21:30 +07:00
parent 20ca52ca4d
commit 0c17c91f92
3 changed files with 291 additions and 102 deletions
@@ -62,7 +62,7 @@ class DokterController extends Controller
{
protected $lisServiceUrl;
public function __construct() {
$this->lisServiceUrl = env('LIS_SERVICE_URL', 'http://10.10.120.72:5000');
$this->lisServiceUrl = env('LIS_SERVICE_URL', 'http://10.10.120.14:6002');
}
protected static function genSurat($id, $tabel){
if ($tabel == 'dengan kop' OR $tabel == 'PDF'){
@@ -472,8 +472,6 @@ class DokterController extends Controller
$tglverifikasi = $periksa->verifikasi ?? $today;
$cekganda = Periksa::where('noregister', $noregister)->where('nmadendum', $nmadendum)->where('nm_spesimen', 'SPUTUM')->where('id', '!=', $idperiksa)->get();
$statusdraft = 'Proses Analisis Sampel';
KomponenJawaban::where('accnumber', $nofoto)->where('template', $dlp)->delete();
$data = $request->except(['_token', 'periksa_id', 'val01', 'val10', 'acc_number']);
foreach ($data as $key => $value) {
@@ -585,7 +583,6 @@ class DokterController extends Controller
$cekanalis = optional(User::find($analis));
$cekppds3 = optional(User::find($ppds3));
switch ($kerja) {
case 'verifikasi':
$updateData = [
'keterangan' => $keterangan,
@@ -1262,9 +1262,9 @@ class FrontpageController extends Controller
$pesan = $nofoto;
$daftaridtcm = [79,80,81,82,83,84,203, 14,28,55,62,94,104,105,124,138,146];
if (in_array($poli_id, $daftaridtcm)){
$genexpert = true;
} else {
$genexpert = false;
} else {
$genexpert = 1;
}
Paslab::updateOrCreate(
[
+288 -96
View File
@@ -41,6 +41,8 @@ pending_result_queries = {}
pending_query_lock = threading.Lock()
scheduled_result_queries = {}
scheduled_result_query_lock = threading.Lock()
genexpert_query_inflight_by_ip = {}
genexpert_query_inflight_lock = threading.Lock()
# Network Configuration
TCP_LISTENER_PORT = 6001 # PC GeneXpert set ke mode Client, konek ke IP:PORT ini
SERVER_HOST = '0.0.0.0' # Listen di semua interface
@@ -48,6 +50,7 @@ HTTP_API_PORT = 6002 # Endpoint trigger dari Laravel -> Python
GENEXPERT_RESULT_QUERY_INITIAL_DELAY_SECONDS = 60
GENEXPERT_RESULT_QUERY_INTERVAL_SECONDS = 120
GENEXPERT_RESULT_QUERY_MAX_DURATION_SECONDS = 21600
GENEXPERT_RESULT_QUERY_INFLIGHT_TIMEOUT_SECONDS = 45
# Mapping Flag ke IP Address GeneXpert
# Pastikan IP ini SESUAI dengan settingan "Server IP" di masing-masing alat (Client Mode)
TARGET_MAPPING = {
@@ -222,48 +225,135 @@ def get_pending_orders(ip_addr):
finally:
session.close()
def send_all_orders(conn, ip_addr, hl7_msg, msg_id):
orders = get_pending_orders(ip_addr)
def parse_hl7_segments(hl7_message):
return [segment for segment in str(hl7_message or "").split('\r') if segment]
def extract_segment(hl7_message, segment_name):
prefix = f"{segment_name}|"
for segment in parse_hl7_segments(hl7_message):
if segment.startswith(prefix):
return segment
return ""
def parse_genexpert_qpd(qpd_segment):
fields = str(qpd_segment or "").split('|')
query_name = fields[1] if len(fields) > 1 else ""
query_tag = fields[2] if len(fields) > 2 else ""
param_1 = fields[3] if len(fields) > 3 else ""
param_2 = fields[4] if len(fields) > 4 else ""
return {
"query_name": query_name,
"query_tag": query_tag,
"param_1": param_1,
"param_2": param_2,
}
def get_genexpert_query_orders(ip_addr, hl7_msg):
flag = get_flag_by_device(ip_addr)
if not flag:
return []
qpd_segment = extract_segment(hl7_msg, "QPD")
qpd = parse_genexpert_qpd(qpd_segment)
param_1 = str(qpd.get("param_1") or "").strip()
param_2 = str(qpd.get("param_2") or "").strip()
session = SessionLocal()
scheduled_orders = []
try:
if not orders:
print(f"[GENEXPERT] Tidak ada order pending untuk {ip_addr}")
rsp = (
f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}||RSP^Z03|{msg_id}|P|2.5\r"
f"MSA|AA|{msg_id}\r"
f"QAK|NF|{msg_id}\r"
)
conn.sendall(f"\x0b{rsp}\x1c\r".encode('utf-8'))
return
flag_attr = getattr(PaslabOrder, flag, None)
if flag_attr is None:
return []
print(f"[GENEXPERT] Mengirim {len(orders)} order ke {ip_addr}")
query = session.query(PaslabOrder).filter((flag_attr == False) | (flag_attr == None))
for order in orders:
rsp = create_hl7_dsr_response(order, msg_id, "")
print(f"[GENEXPERT] Order ditemukan untuk {ip_addr}")
mllp = f"\x0b{rsp}\x1c\r"
conn.sendall(mllp.encode('utf-8'))
flag = get_flag_by_device(ip_addr)
setattr(order, flag, True)
session.add(order)
scheduled_orders.append({
"accnumber": str(order.rnoreg or "").strip(),
"register_no": str(order.rnoreg or "").strip(),
"target_ip": ip_addr,
})
if param_1.upper() == "ALL":
return query.order_by(PaslabOrder.urut.asc()).all()
session.commit()
for scheduled_order in scheduled_orders:
schedule_result_query_for_order(
accnumber=scheduled_order["accnumber"],
register_no=scheduled_order["register_no"],
target_ip=scheduled_order["target_ip"],
)
requested_sample_id = param_2 or param_1
if requested_sample_id:
return query.filter(PaslabOrder.rnoreg == requested_sample_id).order_by(PaslabOrder.urut.asc()).all()
return query.order_by(PaslabOrder.urut.asc()).all()
finally:
session.close()
def build_genexpert_response_msh(message_code, incoming_hl7, resp_control_id):
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
msh_fields = extract_segment(incoming_hl7, "MSH").split('|')
sender_app = msh_fields[2] if len(msh_fields) > 2 else "GeneXpert"
sender_fac = msh_fields[3] if len(msh_fields) > 3 else ""
return f"MSH|^~\\&|LIS||{sender_app}|{sender_fac}|{timestamp}||{message_code}|{resp_control_id}|P|2.5|||NE|NE"
def create_genexpert_ack_j01_response(incoming_hl7):
incoming_control_id = extract_msg_control_id(incoming_hl7) or "UNKNOWN"
resp_control_id = f"ACK{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"
msh = build_genexpert_response_msh("ACK^J01", incoming_hl7, resp_control_id)
msa = f"MSA|CA|{incoming_control_id}"
return f"{msh}\r{msa}\r"
def create_genexpert_rsp_z02_response(orders, incoming_hl7):
qpd_segment = extract_segment(incoming_hl7, "QPD")
qpd = parse_genexpert_qpd(qpd_segment)
query_tag = qpd.get("query_tag") or (extract_msg_control_id(incoming_hl7) or "UNKNOWN")
query_name = qpd.get("query_name") or "Z03^HOST QUERY"
resp_control_id = f"RSP{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}"
msh = build_genexpert_response_msh("RSP^Z02", incoming_hl7, resp_control_id)
msa = f"MSA|AA|{query_tag}"
qak = f"QAK|{query_tag}|OK|{query_name}"
segments = [msh, msa, qak]
if qpd_segment:
segments.append(qpd_segment)
for patient_idx, order in enumerate(orders, start=1):
patient_id = str(order.norm or order.rnoreg or "").strip()
sample_id = str(order.rnoreg or "").strip()
assay_name = str(order.tes or "").strip()
test_code = GENEXPERT_TEST_MAPPING.get(assay_name, DEFAULT_GXP_CODE)
order_ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
segments.append(f"PID|{patient_idx}||{patient_id}")
segments.append(f"ORC|NW|{patient_idx}|||||||{order_ts}")
segments.append(f"OBR|{patient_idx}|||{test_code}|||||||A")
segments.append("TQ1|||||||||R")
segments.append(f"SPM|{patient_idx}|{sample_id}^||ORH|||||||P")
return "\r".join(segments) + "\r"
def send_all_orders(conn, ip_addr, hl7_msg, msg_id):
orders = get_genexpert_query_orders(ip_addr, hl7_msg)
scheduled_orders = []
if not orders:
print(f"[GENEXPERT] Tidak ada order pending untuk {ip_addr}")
rsp = create_genexpert_rsp_z02_response([], hl7_msg)
log_genexpert_hl7("OUT", ip_addr, rsp, label="qbp-empty")
log_genexpert_hl7_full("OUT", ip_addr, rsp, label="qbp-empty")
conn.sendall(f"\x0b{rsp}\x1c\r".encode('utf-8'))
return
print(f"[GENEXPERT] Mengirim {len(orders)} order ke {ip_addr}")
rsp = create_genexpert_rsp_z02_response(orders, hl7_msg)
mllp = f"\x0b{rsp}\x1c\r"
first_accnumber = str(orders[0].rnoreg or "").strip() if orders else ""
log_genexpert_hl7("OUT", ip_addr, rsp, label=f"qbp-order:{first_accnumber}")
log_genexpert_hl7_full("OUT", ip_addr, rsp, label=f"qbp-order:{first_accnumber}")
conn.sendall(mllp.encode('utf-8'))
for order in orders:
print(f"[GENEXPERT] Order ditawarkan ke {ip_addr}: {order.rnoreg}")
scheduled_orders.append({
"accnumber": str(order.rnoreg or "").strip(),
"register_no": str(order.rnoreg or "").strip(),
"target_ip": ip_addr,
})
for scheduled_order in scheduled_orders:
schedule_result_query_for_order(
accnumber=scheduled_order["accnumber"],
register_no=scheduled_order["register_no"],
target_ip=scheduled_order["target_ip"],
)
def extract_msg_control_id(hl7_message):
try:
segments = hl7_message.split('\r')
@@ -284,6 +374,38 @@ def extract_message_type(hl7_message):
except:
return ""
def build_hl7_preview(hl7_message, max_segments=4):
try:
segments = [segment.strip() for segment in str(hl7_message or "").split('\r') if segment.strip()]
preview = " | ".join(segments[:max_segments])
return preview[:800]
except Exception:
return str(hl7_message or "")[:800]
def log_genexpert_hl7(direction, ip_addr, hl7_message, label=""):
message_type = extract_message_type(hl7_message) or "UNKNOWN"
control_id = extract_msg_control_id(hl7_message) or "UNKNOWN"
suffix = f", label={label}" if label else ""
preview = build_hl7_preview(hl7_message)
logging.info(
f"[GENEXPERT-HL7-{direction}] ip={ip_addr}, type={message_type}, control_id={control_id}{suffix}, payload={preview}"
)
print(
f"[GENEXPERT-HL7-{direction}] ip={ip_addr}, type={message_type}, control_id={control_id}{suffix}, payload={preview}"
)
def log_genexpert_hl7_full(direction, ip_addr, hl7_message, label=""):
message_type = extract_message_type(hl7_message) or "UNKNOWN"
control_id = extract_msg_control_id(hl7_message) or "UNKNOWN"
suffix = f", label={label}" if label else ""
payload = str(hl7_message or "").replace("\r", "\\r\n")
logging.info(
f"[GENEXPERT-HL7-{direction}-FULL] ip={ip_addr}, type={message_type}, control_id={control_id}{suffix}, payload={payload}"
)
print(
f"[GENEXPERT-HL7-{direction}-FULL] ip={ip_addr}, type={message_type}, control_id={control_id}{suffix}, payload={payload}"
)
def build_genexpert_result_query(accnumber, msg_control_id):
ts = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
# Query hasil berbasis accession number di QRD-8.
@@ -297,6 +419,51 @@ def get_active_genexpert_ips():
with connection_lock:
return list(active_genexpert_connections.keys())
def clear_genexpert_inflight_for_ip(ip_addr, reason="cleared"):
ip_addr = str(ip_addr or "").strip()
if not ip_addr:
return False
with genexpert_query_inflight_lock:
state = genexpert_query_inflight_by_ip.pop(ip_addr, None)
if state:
logging.info(
f"[GENEXPERT-QUERY] Clear inflight ip={ip_addr}, reason={reason}, accnumber={state.get('accnumber')}"
)
print(
f"[GENEXPERT-QUERY] Clear inflight ip={ip_addr}, reason={reason}, accnumber={state.get('accnumber')}"
)
return True
return False
def stop_all_scheduled_result_queries(reason="no-active-genexpert"):
with scheduled_result_query_lock:
if not scheduled_result_queries:
return 0
stopped_accnumbers = list(scheduled_result_queries.keys())
for accnumber in stopped_accnumbers:
state = scheduled_result_queries.get(accnumber)
if not state:
continue
state["status"] = reason
stop_event = state.get("stop_event")
if stop_event:
stop_event.set()
scheduled_result_queries.clear()
logging.info(
f"[GENEXPERT-SCHEDULER] Stop semua jadwal, reason={reason}, total={len(stopped_accnumbers)}"
)
print(
f"[GENEXPERT-SCHEDULER] Stop semua jadwal, reason={reason}, total={len(stopped_accnumbers)}"
)
with genexpert_query_inflight_lock:
genexpert_query_inflight_by_ip.clear()
return len(stopped_accnumbers)
def stop_scheduled_result_query(accnumber, reason="completed"):
accnumber = str(accnumber or "").strip()
if not accnumber:
@@ -318,6 +485,10 @@ def stop_scheduled_result_query(accnumber, reason="completed"):
def scheduled_result_query_worker(accnumber):
while True:
if not get_active_genexpert_ips():
stop_all_scheduled_result_queries(reason="no-active-genexpert")
return
with scheduled_result_query_lock:
state = scheduled_result_queries.get(accnumber)
if not state:
@@ -368,10 +539,6 @@ def scheduled_result_query_worker(accnumber):
state["last_requested_at"] = datetime.datetime.now()
attempt_no = state["attempt"]
logging.info(
f"[GENEXPERT-SCHEDULER] Trigger query hasil accnumber={accnumber}, "
f"register_no={register_no}, target_ip={target_ip}, attempt={attempt_no}"
)
print(
f"[GENEXPERT-SCHEDULER] Trigger query hasil accnumber={accnumber}, "
f"register_no={register_no}, target_ip={target_ip}, attempt={attempt_no}"
@@ -389,6 +556,10 @@ def scheduled_result_query_worker(accnumber):
if not state:
return
state["last_result"] = result
if result.get("ok"):
state["last_successful_query_at"] = datetime.datetime.now()
else:
state["last_error"] = result.get("message")
def schedule_result_query_for_order(
accnumber,
@@ -403,7 +574,6 @@ def schedule_result_query_for_order(
target_ip = str(target_ip or "").strip() or None
if not accnumber:
logging.warning("[GENEXPERT-SCHEDULER] Jadwal query hasil dilewati karena accnumber kosong.")
print("[GENEXPERT-SCHEDULER] Jadwal query hasil dilewati karena accnumber kosong.")
return False
@@ -416,7 +586,6 @@ def schedule_result_query_for_order(
existing["interval_seconds"] = interval_seconds
existing["max_duration_seconds"] = max_duration_seconds
existing["status"] = "active"
logging.info(f"[GENEXPERT-SCHEDULER] Jadwal sudah aktif untuk accnumber={accnumber}")
print(f"[GENEXPERT-SCHEDULER] Jadwal sudah aktif untuk accnumber={accnumber}")
return True
@@ -441,12 +610,6 @@ def schedule_result_query_for_order(
}
worker.start()
logging.info(
f"[GENEXPERT-SCHEDULER] Jadwal dibuat accnumber={accnumber}, "
f"register_no={register_no}, target_ip={target_ip}, "
f"initial_delay={initial_delay_seconds}s, interval={interval_seconds}s, "
f"max_duration={max_duration_seconds}s"
)
print(
f"[GENEXPERT-SCHEDULER] Jadwal dibuat accnumber={accnumber}, "
f"register_no={register_no}, target_ip={target_ip}, "
@@ -486,17 +649,48 @@ def trigger_result_query_to_genexpert(accnumber, register_no, target_ip=None, wa
sent_ips = []
failed_ips = []
for ip in target_ips:
now = datetime.datetime.now()
with genexpert_query_inflight_lock:
inflight = genexpert_query_inflight_by_ip.get(ip)
if inflight:
inflight_age = max(
(now - inflight.get("requested_at", now)).total_seconds(),
0
)
if inflight_age < GENEXPERT_RESULT_QUERY_INFLIGHT_TIMEOUT_SECONDS:
failed_ips.append({
"ip": ip,
"error": f"query-inflight:{inflight.get('accnumber')}",
})
print(
f"[GENEXPERT-QUERY] Skip query accnumber={accnumber} ke {ip} "
f"karena masih menunggu accnumber={inflight.get('accnumber')} "
f"({int(inflight_age)}s)"
)
continue
genexpert_query_inflight_by_ip.pop(ip, None)
with connection_lock:
conn = active_genexpert_connections.get(ip)
if not conn:
failed_ips.append({"ip": ip, "error": "connection-not-active"})
continue
try:
log_genexpert_hl7("OUT", ip, query_message, label=f"result-query:{accnumber}")
log_genexpert_hl7_full("OUT", ip, query_message, label=f"result-query:{accnumber}")
conn.sendall(mllp_payload)
with genexpert_query_inflight_lock:
genexpert_query_inflight_by_ip[ip] = {
"accnumber": accnumber,
"register_no": register_no,
"requested_at": now,
"msg_control_id": msg_control_id,
}
sent_ips.append(ip)
print(f"[GENEXPERT-QUERY] Kirim query hasil accnumber={accnumber} ke {ip}")
except Exception as e:
failed_ips.append({"ip": ip, "error": str(e)})
clear_genexpert_inflight_for_ip(ip, reason="send-failed")
if not sent_ips:
with pending_query_lock:
@@ -679,8 +873,22 @@ def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"):
pending["source_ip"] = source_ip
pending["response_at"] = datetime.datetime.now()
pending_event = pending.get("event")
clear_genexpert_inflight_for_ip(source_ip, reason="result-received")
source_ip = ""
try:
source_ip = str(conn.getpeername()[0]).strip()
except Exception:
source_ip = str(device_name).replace("GeneXpert-", "").replace(",", " ").strip()
if source_ip:
flag_name = get_flag_by_device(source_ip)
if flag_name:
order = session.query(PaslabOrder).filter(PaslabOrder.rnoreg == sample_id).first()
if order:
setattr(order, flag_name, True)
print(f"[GENEXPERT] Hasil diterima, set {flag_name}=TRUE untuk {sample_id}")
logging.info(f"[HL7 Parser] Menyimpan hasil untuk Sample: {sample_id}")
print(f"[HL7 Parser] Menyimpan hasil untuk Sample: {sample_id}")
# 5. Simpan ke Database
@@ -696,7 +904,6 @@ def parse_hl7_result(conn, msg_id, hl7_message, device_name="GeneXpert"):
session.add(new_data)
session.commit()
logging.info(f"[DB] Berhasil simpan ke LisPhoenix: {sample_id}")
print(f"[DB] Berhasil simpan ke LisPhoenix: {sample_id}")
stop_scheduled_result_query(sample_id, reason="result-received")
if pending_event:
@@ -849,7 +1056,6 @@ def parse_myla_result(hl7_message, device_name="MYLA"):
sample_id = f"ERR_MYLA_{datetime.datetime.now().strftime('%H%M%S')}"
final_result_str = f"[NO_ID] {final_result_str}"
logging.info(f"[MYLA] Save DB -> Sample: {sample_id}, Hasil: {final_result_str}")
new_entry = LisPhoenix(
no_id=sample_id,
@@ -995,9 +1201,8 @@ def parse_myla_astm_records(raw_message, device_name="MYLA"):
)
session.add(new_entry)
session.commit()
logging.info(f"[MYLA-ASTM] Save DB -> Sample: {sample_id}, Hasil: {final_result_str}")
else:
logging.info("[MYLA-ASTM] ASTM message diterima (tanpa hasil R untuk disimpan).")
print("[MYLA-ASTM] ASTM message diterima (tanpa hasil R untuk disimpan).")
except Exception as e:
logging.error(f"[MYLA-ASTM] Error parse/simpan: {e}")
session.rollback()
@@ -1179,7 +1384,6 @@ def process_myla_hl7_message(conn, hl7_str, peer_ip):
pass
if "ORU^" in message_type or message_type.startswith("OUL^R22"):
logging.info(f"[MYLA-HL7] Menerima hasil dari {peer_ip} (type={message_type}, ID: {incoming_control_id})")
print(f"[MYLA-HL7] Menerima hasil dari {peer_ip} (type={message_type}, ID: {incoming_control_id})")
parse_myla_result(hl7_str, device_name=f"MYLA-{peer_ip}")
if incoming_control_id:
@@ -1325,7 +1529,7 @@ def send_order_to_myla_hl7(conn, order, peer_ip):
hl7_message = create_myla_hl7_order_message(order, msg_control_id)
mllp_payload = f"\x0b{hl7_message}\x1c\r".encode("utf-8")
msh_line = hl7_message.split('\r')[0]
logging.info(f"[MYLA-HL7] Kirim rnoreg={order.rnoreg}, MSH={msh_line}")
print(f"[MYLA-HL7] Kirim rnoreg={order.rnoreg}, MSH={msh_line}")
conn.sendall(mllp_payload)
ack_ok = wait_for_myla_hl7_ack(
@@ -1343,7 +1547,7 @@ def handle_myla_client(conn, addr):
TCP Handler untuk bioMérieux MYLA.
Hanya menerima HL7 berbungkus MLLP (\x0b ... \x1c\r).
"""
logging.info(f"[MYLA-TCP] Koneksi baru dari {addr}")
print(f"[MYLA-TCP] Koneksi baru dari {addr}")
buffer = b""
try:
@@ -1377,13 +1581,12 @@ def handle_myla_client(conn, addr):
# --- PROSES HASIL (ORU / OUL^R22) ---
if "ORU^" in hl7_str or "OUL^R22" in hl7_str:
logging.info(f"[MYLA] Menerima hasil (ORU/OUL) ID: {incoming_control_id}")
print(f"[MYLA] Menerima hasil (ORU/OUL) ID: {incoming_control_id}")
parse_myla_result(hl7_str, device_name=f"MYLA-{addr[0]}")
# --- PROSES QUERY (QRY/QBP) - JIKA MYLA BERTANYA ORDER ---
elif "QRY^" in hl7_str or "QBP^" in hl7_str:
client_ip = addr[0]
logging.info(f"[MYLA] Menerima Query dari {client_ip} (Control ID: {incoming_control_id})")
print(f"[MYLA] Menerima Query dari {client_ip} (Control ID: {incoming_control_id})")
# Khusus MyLA gunakan flag VITEK3.
target_flag_col = "flg_vitek3"
@@ -1435,12 +1638,10 @@ def handle_myla_client(conn, addr):
setattr(order, target_flag_col, True)
session.commit()
logging.info(f"[MYLA] Order terkirim rnoreg={order.rnoreg}, set {target_flag_col}=TRUE")
print(f"[MYLA] Order terkirim rnoreg={order.rnoreg}, set {target_flag_col}=TRUE")
else:
reply_msg = create_hl7_dsr_response(None, incoming_control_id, qrd_line)
conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8'))
logging.info(f"[MYLA] Tidak ada order pending untuk sample={search_sample_id or '-'}")
print(f"[MYLA] Tidak ada order pending untuk sample={search_sample_id or '-'}")
except Exception as db_err:
@@ -1459,7 +1660,6 @@ def handle_myla_client(conn, addr):
ack_msg = f"MSH|^~\\&|LIS|LAB|MYLA|bioMerieux|{ack_time}||ACK|{incoming_control_id}|P|2.5\rMSA|AA|{incoming_control_id}\r"
full_ack = f"\x0b{ack_msg}\x1c\r"
conn.sendall(full_ack.encode('utf-8'))
logging.info(f"[MYLA ACK] Terkirim untuk ID {incoming_control_id}")
print(f"[MYLA ACK] Terkirim untuk ID {incoming_control_id}")
# Sisakan bagian terakhir di buffer (kalau ada pesan yang terpotong)
@@ -1486,11 +1686,9 @@ def start_myla_server(host, port):
incoming_buffer = b""
last_idle_log_at = 0.0
try:
logging.info(f"[MYLA-CLIENT] Mencoba konek ke MYLA {host}:{port} ...")
print(f"[MYLA-CLIENT] Mencoba konek ke MYLA {host}:{port} ...")
conn = socket.create_connection((host, port), timeout=10)
conn.settimeout(1.0)
logging.info(f"[MYLA-CLIENT] Terhubung ke MYLA {host}:{port}")
print(f"[MYLA-CLIENT] Terhubung ke MYLA {host}:{port}")
while True:
@@ -1499,10 +1697,6 @@ def start_myla_server(host, port):
if not pending_orders:
now_ts = time.time()
if (now_ts - last_idle_log_at) >= MYLA_IDLE_LOG_INTERVAL_SECONDS:
logging.info(
"[MYLA-CLIENT] Polling aktif: tidak ada order pending "
"(flg_vitek3 = FALSE/NULL)."
)
print(
"[MYLA-CLIENT] Polling aktif: tidak ada order pending "
"(flg_vitek3 = FALSE/NULL)."
@@ -1511,7 +1705,6 @@ def start_myla_server(host, port):
time.sleep(MYLA_POLL_INTERVAL_SECONDS)
continue
logging.info(f"[MYLA-CLIENT] Ditemukan {len(pending_orders)} order pending untuk MYLA")
print(f"[MYLA-CLIENT] Ditemukan {len(pending_orders)} order pending untuk MYLA")
last_idle_log_at = 0.0
@@ -1519,7 +1712,6 @@ def start_myla_server(host, port):
send_ok = send_order_to_myla_hl7(conn, order, host)
if send_ok:
mark_myla_order_sent(order.urut)
logging.info(f"[MYLA-HL7] Order sukses, set flg_vitek3=TRUE rnoreg={order.rnoreg}")
print(f"[MYLA-HL7] Order sukses, set flg_vitek3=TRUE rnoreg={order.rnoreg}")
else:
logging.warning(
@@ -1556,7 +1748,6 @@ def start_myla_inbound_server(host, port):
try:
server.bind((host, port))
server.listen(5)
logging.info(f"[MYLA-INBOUND] Listening di {host}:{port}")
print(f"[MYLA-INBOUND] Listening di {host}:{port}")
while True:
@@ -1589,7 +1780,6 @@ def manage_tcp_server():
try:
server.bind((SERVER_HOST, TCP_LISTENER_PORT))
server.listen(5) # Bisa antri 5 koneksi
logging.info(f"[TCP-SERVER] Listening GeneXpert di port {TCP_LISTENER_PORT}...")
print(f"[TCP-SERVER] Listening GeneXpert di port {TCP_LISTENER_PORT}...")
while True:
# Accept koneksi baru (Blocking, tapi aman karena di thread sendiri)
@@ -1608,7 +1798,6 @@ def manage_tcp_server():
print(f"[TCP-SERVER] Gagal Start: {e}")
def run_http_api_server():
logging.info(f"[HTTP-API] Listening di port {HTTP_API_PORT}...")
print(f"[HTTP-API] Listening di port {HTTP_API_PORT}...")
app.run(host=SERVER_HOST, port=HTTP_API_PORT, debug=False, use_reloader=False, threaded=True)
@@ -1619,14 +1808,14 @@ def handle_genexpert_client(conn, addr):
client_ip = addr[0]
with connection_lock:
active_genexpert_connections[client_ip] = conn
logging.info(f"[GenExpert_TCP] Register koneksi aktif {client_ip}")
print(f"[GenExpert_TCP] Register koneksi aktif {client_ip}")
try:
while True:
try:
data = conn.recv(4096)
if not data:
logging.info(f"[GenExpert_TCP] Client {addr} menutup koneksi.")
print(f"[GenExpert_TCP] Client {addr} menutup koneksi.")
break
buffer += data
@@ -1684,6 +1873,7 @@ def handle_genexpert_client(conn, addr):
if "MSH|" in temp_str:
msh_index = temp_str.find("MSH|")
clean_hl7 = temp_str[msh_index:]
log_genexpert_hl7("IN", addr[0], clean_hl7)
lines = clean_hl7.split('\r')
msh_fields = lines[0].split('|')
incoming_control_id = msh_fields[9] if len(msh_fields) > 9 else "UNKNOWN"
@@ -1709,10 +1899,11 @@ def handle_genexpert_client(conn, addr):
print(f"[DENIED] IP {client_ip} tidak terdaftar di TARGET_MAPPING. Abaikan.")
# Kirim jawaban kosong agar alat tidak hang
reply_msg = create_hl7_dsr_response(None, incoming_control_id, "")
log_genexpert_hl7("OUT", client_ip, reply_msg, label="query-denied-empty")
log_genexpert_hl7_full("OUT", client_ip, reply_msg, label="query-denied-empty")
conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8'))
continue # Skip proses selanjutnya
logging.info(f"[TARGET] IP {client_ip} akan mengecek kolom: {target_flag_col}")
print(f"[TARGET] IP {client_ip} akan mengecek kolom: {target_flag_col}")
# 2. Cari Sample ID di pesan QRY
@@ -1740,7 +1931,6 @@ def handle_genexpert_client(conn, addr):
print(f"Kolom '{target_flag_col}' tidak ditemukan di Model Paslab!")
raise Exception("Invalid Column Name")
logging.info(f"[DB LOOKUP] Mencari {search_sample_id} dimana {target_flag_col} == False")
print(f"[DB LOOKUP] Mencari {search_sample_id} dimana {target_flag_col} == False")
order = session.query(PaslabOrder).filter(
@@ -1758,18 +1948,12 @@ def handle_genexpert_client(conn, addr):
# 4. Kirim Respon
if send_order and order:
logging.info(f"[FOUND] Order ditemukan: {order.nama}. Mengirim ke {client_ip}...")
print(f"[FOUND] Order ditemukan: {order.nama}. Mengirim ke {client_ip}...")
reply_msg = create_hl7_dsr_response(order, incoming_control_id, qrd_line)
log_genexpert_hl7("OUT", client_ip, reply_msg, label=f"query-order:{search_sample_id}")
log_genexpert_hl7_full("OUT", client_ip, reply_msg, label=f"query-order:{search_sample_id}")
conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8'))
# 5. UPDATE FLAG DINAMIS (PENTING)
# Set kolom yang sesuai (misal flg_gxp3) menjadi True
setattr(order, target_flag_col, True)
session.commit()
logging.info(f"[UPDATE] {target_flag_col} diset TRUE untuk {search_sample_id}")
print(f"[UPDATE] {target_flag_col} diset TRUE untuk {search_sample_id}")
schedule_result_query_for_order(
accnumber=str(order.rnoreg or "").strip(),
register_no=str(order.rnoreg or "").strip(),
@@ -1777,10 +1961,11 @@ def handle_genexpert_client(conn, addr):
)
else:
logging.warning(f"[NOT FOUND/ALREADY SENT] Tidak ada order baru untuk {search_sample_id} di kolom {target_flag_col}")
print(f"[NOT FOUND/ALREADY SENT] Tidak ada order baru untuk {search_sample_id} di kolom {target_flag_col}")
# Kirim DSR Kosong (NF)
reply_msg = create_hl7_dsr_response(None, incoming_control_id, qrd_line)
log_genexpert_hl7("OUT", client_ip, reply_msg, label=f"query-empty:{search_sample_id}")
log_genexpert_hl7_full("OUT", client_ip, reply_msg, label=f"query-empty:{search_sample_id}")
conn.sendall(f"\x0b{reply_msg}\x1c\r".encode('utf-8'))
except Exception as db_err:
@@ -1796,7 +1981,7 @@ def handle_genexpert_client(conn, addr):
# SKENARIO 2: ALAT KIRIM HASIL (RESULT / ORU)
# ==========================================
elif "ORU^" in clean_hl7:
logging.info(f"[RESULT] Menerima Hasil Lab.")
print(f"[RESULT] Menerima Hasil Lab.")
# 1. Parse dan Simpan Hasil (Panggil fungsi parser Anda)
parse_hl7_result(conn, msg_id, clean_hl7, device_name=f"GeneXpert-{addr[0]}")
@@ -1805,8 +1990,8 @@ def handle_genexpert_client(conn, addr):
ack_time = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
ack_msg = f"MSH|^~\\&|LIS|LAB|GeneXpert|Cepheid|{ack_time}||ACK|{incoming_control_id}|P|2.5\rMSA|AA|{incoming_control_id}\r"
full_ack = f"\x0b{ack_msg}\x1c\r"
log_genexpert_hl7("OUT", addr[0], ack_msg, label="oru-ack")
conn.sendall(full_ack.encode('utf-8'))
logging.info(f"[ACK SENT] Untuk hasil ID {incoming_control_id}")
print(f"[ACK SENT] Untuk hasil ID {incoming_control_id}")
continue
@@ -1820,7 +2005,11 @@ def handle_genexpert_client(conn, addr):
continue
elif "QCN^J01" in clean_hl7:
logging.info(f"[GENEXPERT] Menerima konfirmasi query dari {addr[0]}.")
clear_genexpert_inflight_for_ip(addr[0], reason="query-confirmation")
ack_msg = create_genexpert_ack_j01_response(clean_hl7)
log_genexpert_hl7("OUT", addr[0], ack_msg, label="qcn-ack")
log_genexpert_hl7_full("OUT", addr[0], ack_msg, label="qcn-ack")
conn.sendall(f"\x0b{ack_msg}\x1c\r".encode('utf-8'))
print(f"[GENEXPERT] Menerima konfirmasi query dari {addr[0]}.")
continue
@@ -1841,6 +2030,7 @@ def handle_genexpert_client(conn, addr):
# Kirim ACK format MLLP
full_ack = f"\x0b{ack_msg}\x1c\r"
log_genexpert_hl7("OUT", addr[0], ack_msg, label="generic-ack")
conn.sendall(full_ack.encode('utf-8'))
print(f"[ACK] Terkirim untuk ID {msg_control_id}")
except Exception as e:
@@ -1870,6 +2060,10 @@ def handle_genexpert_client(conn, addr):
with connection_lock:
if active_genexpert_connections.get(client_ip) is conn:
del active_genexpert_connections[client_ip]
remaining_connections = len(active_genexpert_connections)
clear_genexpert_inflight_for_ip(client_ip, reason="connection-closed")
if remaining_connections == 0:
stop_all_scheduled_result_queries(reason="no-active-genexpert")
try:
conn.close()
except Exception:
@@ -1882,9 +2076,9 @@ def send_order_via_active_connection(target_ip, hl7_message):
conn = active_genexpert_connections.get(target_ip)
if not conn:
logging.warning(f"Gagal kirim Order: GeneXpert dengan IP {target_ip} BELUM TERKONEKSI ke Python.")
print(f"Gagal kirim Order: GeneXpert dengan IP {target_ip} BELUM TERKONEKSI ke Python.")
return False # Indikasi gagal
logging.warning(f"Gagal kirim Order: GeneXpert dengan IP {target_ip} BELUM TERKONEKSI ke Listener.")
print(f"Gagal kirim Order: GeneXpert dengan IP {target_ip} BELUM TERKONEKSI ke Listener.")
return False
try:
# Bungkus pesan dengan MLLP (Minimal Lower Layer Protocol) standard HL7
@@ -1892,13 +2086,14 @@ def send_order_via_active_connection(target_ip, hl7_message):
mllp_msg = f"\x0b{hl7_message}\x1c\r"
logging.info(f"Mengirim Order ke {target_ip}...")
print(f"Mengirim Order ke {target_ip}...")
conn.sendall(mllp_msg.encode('utf-8'))
# Opsi: Jika ingin menunggu ACK balasan untuk Order
# Namun hati-hati ini bisa blocking jika alat lambat
# ack = conn.recv(1024)
# logging.info(f"Dapat ACK Order dari {target_ip}: {ack}")
ack = conn.recv(1024)
logging.info(f"Dapat ACK Order dari {target_ip}: {ack}")
print(f"Dapat ACK Order dari {target_ip}: {ack}")
return True
except Exception as e:
logging.error(f"Error mengirim ke {target_ip}: {e}")
@@ -2011,7 +2206,6 @@ def parse_and_save_vitek_result(raw_data, port_name="VITEK"):
if existing_data:
# === SKENARIO PESAN KEDUA (UPDATE) ===
logging.info(f"[{port_name}] UPDATE Data (Tahap 2) -> ID: {sample_id}, Pasien: {patient_name}")
print(f"[{port_name}] UPDATE Data -> ID: {sample_id} (Hasil Lengkap)")
new_entry = LisPhoenix(
no_id=sample_id,
@@ -2026,7 +2220,6 @@ def parse_and_save_vitek_result(raw_data, port_name="VITEK"):
else:
# === SKENARIO PESAN PERTAMA (INSERT) ===
logging.info(f"[{port_name}] INSERT Data Baru (Tahap 1) -> ID: {sample_id}, Pasien: {patient_name}")
print(f"[{port_name}] INSERT Data -> ID: {sample_id} (Identifikasi Awal)")
new_entry = LisPhoenix(
@@ -2461,7 +2654,6 @@ def parse_and_save_bd_result(raw_data, port_name="BD Bactec"):
if specimen_type:
final_res_string += f" ({specimen_type})"
logging.info(f"[{port_name}] Save DB -> ID: {sample_id}, Pasien: {patient_name}, Hasil: {final_res_string}")
print(f"[{port_name}] Save DB -> ID: {sample_id}, Pasien: {patient_name}, Hasil: {final_res_string}")
new_entry = LisPhoenix(
no_id=sample_id,
@@ -2640,6 +2832,7 @@ def manage_bd_port(config):
# WAJIB: Kirim ACK agar alat lanjut kirim baris berikutnya
ser.write(b'\x06')
# logging.debug(f"[{port_name}] Frame diterima, ACK dikirim.")
print(f"[{port_name}] Frame diterima, ACK dikirim.")
except Exception as e:
logging.error(f"Error decode frame: {e}")
print(f"Error decode frame: {e}")
@@ -2784,7 +2977,6 @@ def manage_serial_port(config):
elif device_type in ['bd_mgit', 'bd_bactec', 'bd']:
manage_bd_port(config)
else:
logging.error(f"Tipe alat tidak diketahui: '{device_type}' untuk port {config.get('port')}. Thread dihentikan.")
print(f"Tipe alat tidak diketahui: '{device_type}' untuk port {config.get('port')}. Thread dihentikan.")
# ==========================================