package websocket

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"api-service/internal/config"
	"api-service/internal/database"
)

// Hub manages all WebSocket clients. It indexes connected clients by pointer,
// ID, static ID, IP address and room, and its Run loop serializes
// registration, unregistration and message broadcasting.
type Hub struct {
	// Configuration.
	config config.WebSocketConfig

	// Client indexes. All of them, together with rooms and the counters
	// below, are guarded by mu.
	clients         map[*Client]bool
	clientsByID     map[string]*Client
	clientsByIP     map[string][]*Client
	clientsByStatic map[string]*Client

	// Rooms: room name -> set of member clients.
	rooms map[string]map[*Client]bool

	// Communication channels. messageQueue is drained by the queue workers,
	// which forward every message into broadcast.
	broadcast    chan WebSocketMessage
	register     chan *Client
	unregister   chan *Client
	messageQueue chan WebSocketMessage

	// Synchronization.
	mu sync.RWMutex

	// Context whose cancellation stops Run and the queue workers.
	ctx    context.Context
	cancel context.CancelFunc

	// External services.
	dbService database.Service

	// Monitoring. activityLog is guarded by activityMu.
	startTime    time.Time
	messageCount int64
	errorCount   int64
	activityLog  []ActivityLog
	activityMu   sync.RWMutex

	// Message handler registry. broadcaster is not set by NewHub.
	messageRegistry *MessageRegistry
	broadcaster     *Broadcaster
}

// ActivityLog is a single entry of the hub's activity log.
type ActivityLog struct {
	Timestamp time.Time `json:"timestamp"`
	Event     string    `json:"event"`
	ClientID  string    `json:"client_id"`
	Details   string    `json:"details"`
}

// DatabaseService defines the interface for the database layer.
//
// NOTE(review): Hub itself stores a database.Service rather than this
// interface; confirm whether this declaration is still needed.
type DatabaseService interface {
	Health() map[string]interface{}
	// ListDBs() []string is currently disabled.
	ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(channel, payload string)) error
	NotifyChange(dbName, channel, payload string) error
	GetDB(name string) (*sql.DB, error)
	GetPrimaryDB(name string) (*sql.DB, error)
}

// Package-level database service instance shared by every Hub.
var (
	dbService database.Service
	once      sync.Once
)

// init creates the package-level database service from config.LoadConfig()
// and panics if database.New returns nil.
func init() {
	once.Do(func() {
		dbService = database.New(config.LoadConfig())
		if dbService == nil {
			panic("Failed to initialize database connection")
		}
	})
}

// NewHub creates a hub with the given configuration, wires it to the
// package-level database service and sets up the database change listeners.
// The returned hub does nothing until Run is started.
func NewHub(config config.WebSocketConfig) *Hub {
	ctx, cancel := context.WithCancel(context.Background())

	hub := &Hub{
		config:          config,
		clients:         make(map[*Client]bool),
		clientsByID:     make(map[string]*Client),
		clientsByIP:     make(map[string][]*Client),
		clientsByStatic: make(map[string]*Client),
		rooms:           make(map[string]map[*Client]bool),
		broadcast:       make(chan WebSocketMessage, 1000),
		register:        make(chan *Client),
		unregister:      make(chan *Client),
		messageQueue:    make(chan WebSocketMessage, config.MessageQueueSize),
		ctx:             ctx,
		cancel:          cancel,
		dbService:       dbService,
		startTime:       time.Now(),
		activityLog:     make([]ActivityLog, 0, config.ActivityLogSize),
		messageRegistry: NewMessageRegistry(),
	}

	// Default message handlers are currently not installed:
	// SetupDefaultHandlers(hub.messageRegistry)

	// Set up database change listeners.
	hub.setupDatabaseListeners()

	return hub
}

// Run starts h.config.QueueWorkers queue workers and then processes
// register, unregister and broadcast events until the hub context is
// cancelled.
func (h *Hub) Run() {
	for i := 0; i < h.config.QueueWorkers; i++ {
		go h.queueWorker(i)
	}

	for {
		select {
		case client := <-h.register:
			h.registerClient(client)
		case client := <-h.unregister:
			h.unregisterClient(client)
		case message := <-h.broadcast:
			// GetStats reads messageCount under mu, so the write must hold
			// mu as well to avoid a data race.
			h.mu.Lock()
			h.messageCount++
			h.mu.Unlock()
			h.broadcastToClients(message)
		case <-h.ctx.Done():
			return
		}
	}
}

// queueWorker forwards messages from messageQueue into broadcast until the
// hub context is cancelled. workerID is currently unused.
func (h *Hub) queueWorker(workerID int) {
	for {
		select {
		case message := <-h.messageQueue:
			// The forward must also observe cancellation; otherwise a worker
			// blocked on a full broadcast channel would never exit once Run
			// has returned.
			select {
			case h.broadcast <- message:
			case <-h.ctx.Done():
				return
			}
		case <-h.ctx.Done():
			return
		}
	}
}

// registerClient adds client to every index (main set, ID, static ID, IP and
// room) and records a "client_connected" activity entry.
func (h *Hub) registerClient(client *Client) {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Main client set.
	h.clients[client] = true

	// Index by ID.
	h.clientsByID[client.ID] = client

	// Index by static ID.
	if client.StaticID != "" {
		h.clientsByStatic[client.StaticID] = client
	}

	// Index by IP; append works on a missing (nil) entry.
	h.clientsByIP[client.IPAddress] = append(h.clientsByIP[client.IPAddress], client)

	// Room membership.
	if client.Room != "" {
		if h.rooms[client.Room] == nil {
			h.rooms[client.Room] = make(map[*Client]bool)
		}
		h.rooms[client.Room][client] = true
	}

	h.logActivity("client_connected", client.ID,
		fmt.Sprintf("IP: %s, Static: %s, Room: %s", client.IPAddress, client.StaticID, client.Room))
}

// unregisterClient removes client from every index, closes its Send channel
// and records a "client_disconnected" activity entry. client.cancel is always
// called, but the removal and the log entry happen only if the client is
// currently registered, so duplicate unregister requests are harmless.
func (h *Hub) unregisterClient(client *Client) {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Deferred after Unlock, so it runs first, while mu is still held.
	defer client.cancel()

	if _, ok := h.clients[client]; !ok {
		return
	}

	// Main client set.
	delete(h.clients, client)
	close(client.Send)

	// ID indexes: only drop the entry when it still refers to this client, so
	// a newer connection that reused the same ID is not evicted.
	if h.clientsByID[client.ID] == client {
		delete(h.clientsByID, client.ID)
	}
	if client.StaticID != "" && h.clientsByStatic[client.StaticID] == client {
		delete(h.clientsByStatic, client.StaticID)
	}

	// IP index.
	if ipClients, exists := h.clientsByIP[client.IPAddress]; exists {
		for i, c := range ipClients {
			if c == client {
				last := len(ipClients) - 1
				copy(ipClients[i:], ipClients[i+1:])
				ipClients[last] = nil // do not keep the removed client reachable
				ipClients = ipClients[:last]
				break
			}
		}

		// Drop the IP entry once no client from this IP remains.
		if len(ipClients) == 0 {
			delete(h.clientsByIP, client.IPAddress)
		} else {
			h.clientsByIP[client.IPAddress] = ipClients
		}
	}

	// Room membership.
	if client.Room != "" {
		if room, exists := h.rooms[client.Room]; exists {
			delete(room, client)
			if len(room) == 0 {
				delete(h.rooms, client.Room)
			}
		}
	}

	h.logActivity("client_disconnected", client.ID,
		fmt.Sprintf("IP: %s, Duration: %v", client.IPAddress, time.Since(client.connectedAt)))
}

// broadcastToClients delivers message to its target: the client named by
// message.ClientID if set, otherwise the members of message.Room if set,
// otherwise every connected client. Sends never block; a client whose Send
// channel is full is scheduled for unregistration.
func (h *Hub) broadcastToClients(message WebSocketMessage) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	deliver := func(c *Client) {
		select {
		case c.Send <- message:
		default:
			// Hand the unregister request to Run from a separate goroutine,
			// because this function is itself called from Run. The goroutine
			// gives up when the hub is shut down so it cannot leak.
			go func() {
				select {
				case h.unregister <- c:
				case <-h.ctx.Done():
				}
			}()
		}
	}

	// Message for one specific client.
	if message.ClientID != "" {
		if client, exists := h.clientsByID[message.ClientID]; exists {
			deliver(client)
		}
		return
	}

	// Message for one room.
	if message.Room != "" {
		for client := range h.rooms[message.Room] {
			deliver(client)
		}
		return
	}

	// Broadcast to every client.
	for client := range h.clients {
		deliver(client)
	}
}

// logActivity appends an entry to the activity log and drops the oldest entry
// once the log exceeds h.config.ActivityLogSize.
func (h *Hub) logActivity(event, clientID, details string) {
	h.activityMu.Lock()
	defer h.activityMu.Unlock()

	h.activityLog = append(h.activityLog, ActivityLog{
		Timestamp: time.Now(),
		Event:     event,
		ClientID:  clientID,
		Details:   details,
	})

	// Keep only the most recent entries, as configured.
	if len(h.activityLog) > h.config.ActivityLogSize {
		h.activityLog = h.activityLog[1:]
	}
}

// GetConfig returns the hub configuration.
func (h *Hub) GetConfig() config.WebSocketConfig {
	return h.config
}

// GetMessageRegistry returns the message handler registry.
func (h *Hub) GetMessageRegistry() *MessageRegistry {
	return h.messageRegistry
}

// RegisterChannel returns the channel on which handlers submit new clients
// for registration.
func (h *Hub) RegisterChannel() chan *Client {
	return h.register
}

// GetStats returns a snapshot of hub statistics: client, IP, static-client
// and room counts, message and error counters, uptime, server ID and the
// current Unix timestamp.
func (h *Hub) GetStats() map[string]interface{} {
	h.mu.RLock()
	defer h.mu.RUnlock()

	return map[string]interface{}{
		"connected_clients": len(h.clients),
		"unique_ips":        len(h.clientsByIP),
		"static_clients":    len(h.clientsByStatic),
		"rooms":             len(h.rooms),
		"message_count":     h.messageCount,
		"error_count":       h.errorCount,
		"uptime":            time.Since(h.startTime).String(),
		"server_id":         h.config.ServerID,
		"timestamp":         time.Now().Unix(),
	}
}

// setupDatabaseListeners subscribes to the "data_changes" channel of the
// "simrs_prod" database and routes every notification to
// handleDatabaseChange. The outcome is recorded in the activity log.
func (h *Hub) setupDatabaseListeners() {
	channels := []string{"data_changes"}

	err := h.dbService.ListenForChanges(h.ctx, "simrs_prod", channels, h.handleDatabaseChange)
	if err != nil {
		h.logActivity("database_listener_error", "", fmt.Sprintf("Failed to setup listeners: %v", err))
		return
	}
	h.logActivity("database_listener_started", "", "Database change listeners initialized")
}

// handleDatabaseChange logs a database change notification, decodes its JSON
// payload and broadcasts it to the "data_updates" room. Payloads that are not
// a JSON object are logged and dropped.
func (h *Hub) handleDatabaseChange(channel, payload string) {
	h.logActivity("database_change", "", fmt.Sprintf("Channel: %s, Payload: %s", channel, payload))

	// Parse the payload to determine what changed.
	var changeData map[string]interface{}
	if err := json.Unmarshal([]byte(payload), &changeData); err != nil {
		h.logActivity("database_change_parse_error", "", fmt.Sprintf("Failed to parse payload: %v", err))
		return
	}

	message := WebSocketMessage{
		Type:     DatabaseChangeMessage,
		Data:     changeData,
		ClientID: "",
		Room:     "data_updates", // Broadcast to the data_updates room.
	}

	// Do not block forever if the hub has been shut down (or is not being
	// drained) while the broadcast buffer is full.
	select {
	case h.broadcast <- message:
	case <-h.ctx.Done():
	}
}

// GetDatabaseConnection returns a database connection for the specified
// database.
func (h *Hub) GetDatabaseConnection(dbName string) (*sql.DB, error) {
	return h.dbService.GetDB(dbName)
}

// ExecuteDatabaseQuery runs query with args against dbName and returns one
// map per row, keyed by column name. []byte column values are converted to
// string. The query is bounded by a 30-second timeout derived from the hub
// context. The result is nil when the query yields no rows.
func (h *Hub) ExecuteDatabaseQuery(dbName, query string, args ...interface{}) ([]map[string]interface{}, error) {
	db, err := h.GetDatabaseConnection(dbName)
	if err != nil {
		return nil, fmt.Errorf("failed to get database connection: %w", err)
	}

	ctx, cancel := context.WithTimeout(h.ctx, 30*time.Second)
	defer cancel()

	rows, err := db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to execute query: %w", err)
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return nil, fmt.Errorf("failed to get columns: %w", err)
	}

	// Scan buffers are reused across rows; every value is copied into the
	// row map before the next Scan overwrites it.
	values := make([]interface{}, len(columns))
	valuePtrs := make([]interface{}, len(columns))
	for i := range values {
		valuePtrs[i] = &values[i]
	}

	var results []map[string]interface{}
	for rows.Next() {
		if err := rows.Scan(valuePtrs...); err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}

		row := make(map[string]interface{}, len(columns))
		for i, col := range columns {
			val := values[i]
			if b, ok := val.([]byte); ok {
				val = string(b)
			}
			row[col] = val
		}
		results = append(results, row)
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rows iteration error: %w", err)
	}

	return results, nil
}

// NotifyDatabaseChange sends a notification to the database for real-time
// updates.
func (h *Hub) NotifyDatabaseChange(dbName, channel, payload string) error {
	return h.dbService.NotifyChange(dbName, channel, payload)
}