diff --git a/client.html b/client.html new file mode 100644 index 00000000..585066f1 --- /dev/null +++ b/client.html @@ -0,0 +1,2251 @@ + + + + WebSocket Test Client + + + +
+

🌐 WebSocket Test Client - Enhanced & Optimized

+ + +
+
+ + Status: + Disconnected + +
+
+ Health: +
+ 🔴 Poor +
+
+
+ Uptime: 0s +
+
+ Client ID: + Not connected +
+
+ Reconnect: Not needed +
+
+ + + + + +
+ + + + + +
+ + +
+
+
+ + + + +
+ + +
+
+ + +
+
+ + +
+ + +
+ +
+ + + + + +
+
+
+ + +
+
+
+ + + +
+ +
+ + + + +
+ +
+ + + + + +
+
+
+ + +
+
+
+ + + + + +
+ +
+ + + + +
+
+
+ + +
+
+
+
0
+
Messages Received
+
+
+
0
+
Messages Sent
+
+
+
0ms
+
Connection Latency
+
+
+
0
+
Reconnection Attempts
+
+
+ +
+
+ + +
+
+ No users data available. Click "Refresh Users" to load. +
+
+
+
+
+ + +
+
+
+ ⚠️ Admin functions - Use with caution! +
+
+
+ + + + +
+ +
+ + + + +
+
+
+
+ + +
+

+ 📨 Messages + 0 messages + + +

+
+
+
+ + + + diff --git a/go.mod b/go.mod index 1df28dea..5dd4cb6e 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/go-playground/validator/v10 v10.27.0 github.com/go-sql-driver/mysql v1.8.1 github.com/joho/godotenv v1.5.1 + github.com/lib/pq v1.10.9 github.com/mashingan/smapping v0.1.19 github.com/rs/zerolog v1.34.0 github.com/swaggo/files v1.0.1 diff --git a/go.sum b/go.sum index 7f1663a5..7ecaabbd 100644 --- a/go.sum +++ b/go.sum @@ -134,6 +134,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= diff --git a/internal/database/database.go b/internal/database/database.go index ab056e0a..b7f5b4fa 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -14,6 +14,7 @@ import ( "api-service/internal/config" _ "github.com/jackc/pgx/v5" // Import pgx driver + "github.com/lib/pq" _ "gorm.io/driver/postgres" // Import GORM PostgreSQL driver _ "github.com/go-sql-driver/mysql" // MySQL driver for database/sql @@ -44,16 +45,22 @@ type Service interface { Close() error ListDBs() []string GetDBType(name string) (DatabaseType, error) + // Tambahkan method untuk WebSocket notifications + ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(string, string)) error + NotifyChange(dbName, channel, payload string) error + GetPrimaryDB(name string) (*sql.DB, error) // Helper untuk get primary DB } type service struct { - sqlDatabases map[string]*sql.DB - mongoClients map[string]*mongo.Client - readReplicas map[string][]*sql.DB // Read replicas for load balancing - configs map[string]config.DatabaseConfig - readConfigs map[string][]config.DatabaseConfig - mu sync.RWMutex - readBalancer map[string]int // Round-robin counter for read replicas + sqlDatabases map[string]*sql.DB + mongoClients map[string]*mongo.Client + readReplicas map[string][]*sql.DB // Read replicas for load balancing + configs map[string]config.DatabaseConfig + readConfigs map[string][]config.DatabaseConfig + mu sync.RWMutex + readBalancer map[string]int // Round-robin counter for read replicas + listeners map[string]*pq.Listener // Tambahkan untuk tracking listeners + listenersMu sync.RWMutex } var ( @@ -71,6 +78,7 @@ func New(cfg *config.Config) Service { configs: make(map[string]config.DatabaseConfig), readConfigs: make(map[string][]config.DatabaseConfig), readBalancer: make(map[string]int), + listeners: make(map[string]*pq.Listener), } log.Println("Initializing database service...") // Log when the initialization starts @@ -372,7 +380,7 @@ func (s *service) Health() map[string]map[string]string { if db == nil { continue } - + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -441,7 +449,7 @@ func (s *service) GetDB(name string) (*sql.DB, error) { return nil, fmt.Errorf("database %s not found", name) } - log.Printf("Current connection pool state for %s: Open: %d, In Use: %d, Idle: %d", + log.Printf("Current connection pool state for %s: Open: %d, In Use: %d, Idle: %d", name, db.Stats().OpenConnections, db.Stats().InUse, db.Stats().Idle) s.mu.RLock() defer s.mu.RUnlock() @@ -469,7 +477,7 @@ func (s *service) GetReadDB(name string) (*sql.DB, error) { // Round-robin load balancing s.readBalancer[name] = (s.readBalancer[name] + 1) % len(replicas) selected := replicas[s.readBalancer[name]] - + if selected == nil { // Fallback to primary if replica is nil return s.GetDB(name) @@ -569,3 +577,123 @@ func (s *service) Close() error { return nil } + +// GetPrimaryDB returns primary database connection +func (s *service) GetPrimaryDB(name string) (*sql.DB, error) { + return s.GetDB(name) +} + +// ListenForChanges implements PostgreSQL LISTEN/NOTIFY for real-time updates +func (s *service) ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(string, string)) error { + s.mu.RLock() + config, exists := s.configs[dbName] + s.mu.RUnlock() + + if !exists { + return fmt.Errorf("database %s not found", dbName) + } + + // Only support PostgreSQL for LISTEN/NOTIFY + if DatabaseType(config.Type) != Postgres { + return fmt.Errorf("LISTEN/NOTIFY only supported for PostgreSQL databases") + } + + // Create connection string for listener + connStr := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s", + config.Username, + config.Password, + config.Host, + config.Port, + config.Database, + config.SSLMode, + ) + + // Create listener + listener := pq.NewListener( + connStr, + 10*time.Second, + time.Minute, + func(ev pq.ListenerEventType, err error) { + if err != nil { + log.Printf("Database listener (%s) error: %v", dbName, err) + } + }, + ) + + // Store listener for cleanup + s.listenersMu.Lock() + s.listeners[dbName] = listener + s.listenersMu.Unlock() + + // Listen to specified channels + for _, channel := range channels { + err := listener.Listen(channel) + if err != nil { + listener.Close() + return fmt.Errorf("failed to listen to channel %s: %w", channel, err) + } + log.Printf("Listening to database channel: %s on %s", channel, dbName) + } + + // Start listening loop + go func() { + defer func() { + listener.Close() + s.listenersMu.Lock() + delete(s.listeners, dbName) + s.listenersMu.Unlock() + log.Printf("Database listener for %s stopped", dbName) + }() + + for { + select { + case n := <-listener.Notify: + if n != nil { + callback(n.Channel, n.Extra) + } + case <-ctx.Done(): + return + case <-time.After(90 * time.Second): + // Send ping to keep connection alive + go func() { + if err := listener.Ping(); err != nil { + log.Printf("Listener ping failed for %s: %v", dbName, err) + } + }() + } + } + }() + + return nil +} + +// NotifyChange sends a notification to a PostgreSQL channel +func (s *service) NotifyChange(dbName, channel, payload string) error { + db, err := s.GetDB(dbName) + if err != nil { + return fmt.Errorf("failed to get database %s: %w", dbName, err) + } + + // Check if it's PostgreSQL + s.mu.RLock() + config, exists := s.configs[dbName] + s.mu.RUnlock() + + if !exists { + return fmt.Errorf("database %s configuration not found", dbName) + } + + if DatabaseType(config.Type) != Postgres { + return fmt.Errorf("NOTIFY only supported for PostgreSQL databases") + } + + // Execute NOTIFY + query := "SELECT pg_notify($1, $2)" + _, err = db.Exec(query, channel, payload) + if err != nil { + return fmt.Errorf("failed to send notification: %w", err) + } + + log.Printf("Sent notification to channel %s on %s: %s", channel, dbName, payload) + return nil +} diff --git a/internal/handlers/websocket/websocket.go b/internal/handlers/websocket/websocket.go index d767b8fd..1338019b 100644 --- a/internal/handlers/websocket/websocket.go +++ b/internal/handlers/websocket/websocket.go @@ -1,12 +1,19 @@ package websocket import ( + "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" + "net" "net/http" + "strings" "sync" "time" + "api-service/internal/config" + "api-service/internal/database" "api-service/pkg/logger" "github.com/gin-gonic/gin" @@ -14,157 +21,221 @@ import ( "github.com/gorilla/websocket" ) -// WebSocketMessage represents a message structure for WebSocket communication +const ( + // Timeout configurations (diperpanjang untuk stability) + ReadTimeout = 300 * time.Second // 5 menit (diperpanjang dari 60 detik) + WriteTimeout = 30 * time.Second // 30 detik untuk write operations + PingInterval = 60 * time.Second // 1 menit untuk ping (lebih konservatif) + PongTimeout = 70 * time.Second // 70 detik untuk menunggu pong response + + // Buffer sizes + ReadBufferSize = 8192 // Diperbesar untuk pesan besar + WriteBufferSize = 8192 // Diperbesar untuk pesan besar + ChannelBufferSize = 512 // Buffer untuk channel komunikasi + + // Connection limits + MaxMessageSize = 8192 // Maksimum ukuran pesan + HandshakeTimeout = 45 * time.Second +) + type WebSocketMessage struct { Type string `json:"type"` Data interface{} `json:"data"` Timestamp time.Time `json:"timestamp"` ClientID string `json:"client_id,omitempty"` + MessageID string `json:"message_id,omitempty"` } -// Client represents a WebSocket client connection type Client struct { - ID string - Conn *websocket.Conn - Send chan WebSocketMessage - Hub *Hub - UserID string - Room string + ID string + StaticID string // Static ID for persistent identification + IPAddress string // Client IP address + Conn *websocket.Conn + Send chan WebSocketMessage + Hub *Hub + UserID string + Room string + ctx context.Context + cancel context.CancelFunc + lastPing time.Time + lastPong time.Time + connectedAt time.Time + mu sync.RWMutex + isActive bool } -// Hub manages WebSocket connections and broadcasting +type ClientInfo struct { + ID string `json:"id"` + StaticID string `json:"static_id"` + IPAddress string `json:"ip_address"` + UserID string `json:"user_id"` + Room string `json:"room"` + ConnectedAt time.Time `json:"connected_at"` + LastPing time.Time `json:"last_ping"` + LastPong time.Time `json:"last_pong"` + IsActive bool `json:"is_active"` +} +type DetailedStats struct { + ConnectedClients int `json:"connected_clients"` + UniqueIPs int `json:"unique_ips"` + StaticClients int `json:"static_clients"` + ActiveRooms int `json:"active_rooms"` + IPDistribution map[string]int `json:"ip_distribution"` + RoomDistribution map[string]int `json:"room_distribution"` + MessageQueueSize int `json:"message_queue_size"` + QueueWorkers int `json:"queue_workers"` + Uptime time.Duration `json:"uptime_seconds"` + Timestamp int64 `json:"timestamp"` +} + +type MonitoringData struct { + Stats DetailedStats `json:"stats"` + RecentActivity []ActivityLog `json:"recent_activity"` + SystemHealth map[string]interface{} `json:"system_health"` + Performance PerformanceMetrics `json:"performance"` +} + +type ActivityLog struct { + Timestamp time.Time `json:"timestamp"` + Event string `json:"event"` + ClientID string `json:"client_id"` + Details string `json:"details"` +} + +type PerformanceMetrics struct { + MessagesPerSecond float64 `json:"messages_per_second"` + AverageLatency float64 `json:"average_latency_ms"` + ErrorRate float64 `json:"error_rate_percent"` + MemoryUsage int64 `json:"memory_usage_bytes"` +} + +// Tambahkan field untuk monitoring di Hub type Hub struct { - clients map[*Client]bool - broadcast chan WebSocketMessage - register chan *Client - unregister chan *Client - rooms map[string]map[*Client]bool - mu sync.RWMutex + clients map[*Client]bool + clientsByID map[string]*Client // Track clients by ID + clientsByIP map[string][]*Client // Track clients by IP + clientsByStatic map[string]*Client // Track clients by static ID + broadcast chan WebSocketMessage + register chan *Client + unregister chan *Client + rooms map[string]map[*Client]bool + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + dbService database.Service + messageQueue chan WebSocketMessage + queueWorkers int + + // Monitoring fields + startTime time.Time + messageCount int64 + errorCount int64 + activityLog []ActivityLog + activityMu sync.RWMutex } -// NewHub creates a new WebSocket hub -func NewHub() *Hub { - return &Hub{ - clients: make(map[*Client]bool), - broadcast: make(chan WebSocketMessage), - register: make(chan *Client), - unregister: make(chan *Client), - rooms: make(map[string]map[*Client]bool), - } -} - -// Run starts the hub and handles client registration/deregistration -func (h *Hub) Run() { - for { - select { - case client := <-h.register: - h.clients[client] = true - if client.Room != "" { - h.mu.Lock() - if h.rooms[client.Room] == nil { - h.rooms[client.Room] = make(map[*Client]bool) - } - h.rooms[client.Room][client] = true - h.mu.Unlock() - } - logger.Info(fmt.Sprintf("Client %s connected", client.ID)) - - case client := <-h.unregister: - if _, ok := h.clients[client]; ok { - delete(h.clients, client) - close(client.Send) - - if client.Room != "" { - h.mu.Lock() - if room, exists := h.rooms[client.Room]; exists { - delete(room, client) - if len(room) == 0 { - delete(h.rooms, client.Room) - } - } - h.mu.Unlock() - } - } - logger.Info(fmt.Sprintf("Client %s disconnected", client.ID)) - - case message := <-h.broadcast: - h.broadcastToClients(message) - } - } -} - -// broadcastToClients sends a message to appropriate clients -func (h *Hub) broadcastToClients(message WebSocketMessage) { - if message.ClientID != "" { - // Send to specific client - for client := range h.clients { - if client.ID == message.ClientID { - select { - case client.Send <- message: - default: - close(client.Send) - delete(h.clients, client) - } - break - } - } - } else if message.Type == "room" && message.Data.(map[string]interface{})["room"] != nil { - // Send to room - roomName := message.Data.(map[string]interface{})["room"].(string) - h.mu.RLock() - if room, exists := h.rooms[roomName]; exists { - for client := range room { - select { - case client.Send <- message: - default: - close(client.Send) - delete(h.clients, client) - } - } - } - h.mu.RUnlock() - } else { - // Broadcast to all clients - for client := range h.clients { - select { - case client.Send <- message: - default: - close(client.Send) - delete(h.clients, client) - } - } - } -} - -// WebSocketHandler handles WebSocket connections type WebSocketHandler struct { - hub *Hub - logger *logger.Logger - upgrader websocket.Upgrader - broadcaster *Broadcaster + hub *Hub + logger *logger.Logger + upgrader websocket.Upgrader + config *config.Config + dbService database.Service + primaryDB string } -// NewWebSocketHandler creates a new WebSocket handler -func NewWebSocketHandler() *WebSocketHandler { - hub := NewHub() - go hub.Run() +func NewWebSocketHandler(cfg *config.Config, dbService database.Service) *WebSocketHandler { + ctx, cancel := context.WithCancel(context.Background()) - return &WebSocketHandler{ - hub: hub, - logger: logger.Default(), + hub := &Hub{ + clients: make(map[*Client]bool), + clientsByID: make(map[string]*Client), + clientsByIP: make(map[string][]*Client), + clientsByStatic: make(map[string]*Client), + broadcast: make(chan WebSocketMessage, 1000), + register: make(chan *Client), + unregister: make(chan *Client), + rooms: make(map[string]map[*Client]bool), + ctx: ctx, + cancel: cancel, + dbService: dbService, + messageQueue: make(chan WebSocketMessage, 5000), + queueWorkers: 10, + startTime: time.Now(), + activityLog: make([]ActivityLog, 0, 1000), // Keep last 1000 activities + } + + handler := &WebSocketHandler{ + hub: hub, + logger: logger.Default(), + config: cfg, + dbService: dbService, + primaryDB: "default", upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { - // Allow connections from any origin in development - // In production, you should check the origin return true }, - ReadBufferSize: 1024, - WriteBufferSize: 1024, + ReadBufferSize: ReadBufferSize, // Gunakan konstanta + WriteBufferSize: WriteBufferSize, // Gunakan konstanta + HandshakeTimeout: HandshakeTimeout, // Gunakan konstanta + EnableCompression: true, }, } + + // Start hub and services + go hub.Run() + go handler.StartDatabaseListener() + // go handler.StartServerBroadcasters() + go handler.StartMessageQueue() + go handler.StartConnectionMonitor() + + return handler +} + +// Helper function to get client IP address +func getClientIP(c *gin.Context) string { + // Check for X-Forwarded-For header (proxy/load balancer) + xff := c.GetHeader("X-Forwarded-For") + if xff != "" { + ips := strings.Split(xff, ",") + if len(ips) > 0 { + return strings.TrimSpace(ips[0]) + } + } + + // Check for X-Real-IP header (nginx proxy) + xri := c.GetHeader("X-Real-IP") + if xri != "" { + return strings.TrimSpace(xri) + } + + // Get IP from RemoteAddr + ip, _, err := net.SplitHostPort(c.Request.RemoteAddr) + if err != nil { + return c.Request.RemoteAddr + } + + return ip +} + +// Generate static client ID based on IP and optional static ID +func generateClientID(ipAddress, staticID, userID string) string { + if staticID != "" { + // Use provided static ID + return staticID + } + + // Generate ID based on IP and userID + data := fmt.Sprintf("%s:%s:%d", ipAddress, userID, time.Now().Unix()/3600) // Hour-based for some uniqueness + hash := sha256.Sum256([]byte(data)) + return fmt.Sprintf("client_%s", hex.EncodeToString(hash[:8])) // Use first 8 bytes of hash +} + +// Generate IP-based static ID +func generateIPBasedID(ipAddress string) string { + hash := sha256.Sum256([]byte(ipAddress)) + return fmt.Sprintf("ip_%s", hex.EncodeToString(hash[:6])) // Use first 6 bytes of hash } -// HandleWebSocket handles WebSocket upgrade and connection func (h *WebSocketHandler) HandleWebSocket(c *gin.Context) { conn, err := h.upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { @@ -172,7 +243,7 @@ func (h *WebSocketHandler) HandleWebSocket(c *gin.Context) { return } - // Get user ID and room from query parameters or headers + // Get connection parameters userID := c.Query("user_id") if userID == "" { userID = "anonymous" @@ -183,147 +254,987 @@ func (h *WebSocketHandler) HandleWebSocket(c *gin.Context) { room = "default" } + staticID := c.Query("static_id") // Optional static ID + useIPBasedID := c.Query("ip_based") // Use IP-based ID if "true" + + // Get client IP address + ipAddress := getClientIP(c) + + var clientID string + + // Determine client ID generation strategy + if useIPBasedID == "true" { + clientID = generateIPBasedID(ipAddress) + staticID = clientID + } else if staticID != "" { + clientID = staticID + } else { + clientID = generateClientID(ipAddress, staticID, userID) + } + + // Check if client with same static ID already exists + h.hub.mu.Lock() + if existingClient, exists := h.hub.clientsByStatic[clientID]; exists { + h.logger.Info(fmt.Sprintf("Disconnecting existing client %s for reconnection", clientID)) + // Disconnect existing client + existingClient.cancel() + existingClient.Conn.Close() + delete(h.hub.clientsByStatic, clientID) + } + h.hub.mu.Unlock() + + ctx, cancel := context.WithCancel(h.hub.ctx) + client := &Client{ - ID: uuid.New().String(), - Conn: conn, - Send: make(chan WebSocketMessage, 256), - Hub: h.hub, - UserID: userID, - Room: room, + ID: clientID, + StaticID: clientID, + IPAddress: ipAddress, + Conn: conn, + Send: make(chan WebSocketMessage, 256), + Hub: h.hub, + UserID: userID, + Room: room, + ctx: ctx, + cancel: cancel, + lastPing: time.Now(), + connectedAt: time.Now(), } client.Hub.register <- client - // Send welcome message + // Send welcome message with connection info welcomeMsg := WebSocketMessage{ - Type: "welcome", - Data: map[string]string{"message": "Connected to WebSocket server"}, + Type: "welcome", + Data: map[string]interface{}{ + "message": "Connected to WebSocket server", + "client_id": client.ID, + "static_id": client.StaticID, + "ip_address": client.IPAddress, + "room": client.Room, + "user_id": client.UserID, + "connected_at": client.connectedAt.Unix(), + "id_type": h.getIDType(useIPBasedID, staticID), + }, Timestamp: time.Now(), - ClientID: client.ID, + MessageID: uuid.New().String(), } select { case client.Send <- welcomeMsg: default: close(client.Send) + cancel() return } - // Start goroutines for reading and writing go client.writePump() go client.readPump() } -// readPump reads messages from the WebSocket connection -func (c *Client) readPump() { - defer func() { - c.Hub.unregister <- c - c.Conn.Close() - }() - - c.Conn.SetReadLimit(512) - c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second)) - c.Conn.SetPongHandler(func(string) error { - c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second)) - return nil - }) +func (h *WebSocketHandler) getIDType(useIPBased, staticID string) string { + if useIPBased == "true" { + return "ip_based" + } else if staticID != "" { + return "static" + } + return "generated" +} +func (h *Hub) Run() { for { - _, message, err := c.Conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - logger.Error(fmt.Sprintf("WebSocket error: %v", err)) - } - break - } - - // Parse incoming message - var msg WebSocketMessage - if err := json.Unmarshal(message, &msg); err != nil { - logger.Error(fmt.Sprintf("Failed to parse message: %v", err)) - continue - } - - msg.Timestamp = time.Now() - msg.ClientID = c.ID - - // Send response back to the client - responseMsg := WebSocketMessage{ - Type: "response", - Data: map[string]interface{}{ - "message": "Message received from server: " + fmt.Sprintf("%v", msg.Data), - "original_type": msg.Type, - }, - Timestamp: time.Now(), - ClientID: c.ID, - } - select { - case c.Send <- responseMsg: - default: - close(c.Send) - delete(c.Hub.clients, c) - } + case client := <-h.register: + h.mu.Lock() - // Broadcast the message + // Register in main clients map + h.clients[client] = true + + // Register by ID + h.clientsByID[client.ID] = client + + // Register by static ID + if client.StaticID != "" { + h.clientsByStatic[client.StaticID] = client + } + + // Register by IP + if h.clientsByIP[client.IPAddress] == nil { + h.clientsByIP[client.IPAddress] = make([]*Client, 0) + } + h.clientsByIP[client.IPAddress] = append(h.clientsByIP[client.IPAddress], client) + + // Register in room + if client.Room != "" { + if h.rooms[client.Room] == nil { + h.rooms[client.Room] = make(map[*Client]bool) + } + h.rooms[client.Room][client] = true + } + + h.mu.Unlock() + + // Log activity + h.logActivity("client_connected", client.ID, + fmt.Sprintf("IP: %s, Static: %s, Room: %s", client.IPAddress, client.StaticID, client.Room)) + + logger.Info(fmt.Sprintf("Client %s (Static: %s, IP: %s) connected to room %s", + client.ID, client.StaticID, client.IPAddress, client.Room)) + + case client := <-h.unregister: + h.mu.Lock() + + if _, ok := h.clients[client]; ok { + // Remove from main clients + delete(h.clients, client) + close(client.Send) + + // Remove from clientsByID + delete(h.clientsByID, client.ID) + + // Remove from clientsByStatic + if client.StaticID != "" { + delete(h.clientsByStatic, client.StaticID) + } + + // Remove from clientsByIP + if ipClients, exists := h.clientsByIP[client.IPAddress]; exists { + for i, c := range ipClients { + if c == client { + h.clientsByIP[client.IPAddress] = append(ipClients[:i], ipClients[i+1:]...) + break + } + } + // If no more clients from this IP, remove the IP entry + if len(h.clientsByIP[client.IPAddress]) == 0 { + delete(h.clientsByIP, client.IPAddress) + } + } + + // Remove from room + if client.Room != "" { + if room, exists := h.rooms[client.Room]; exists { + delete(room, client) + if len(room) == 0 { + delete(h.rooms, client.Room) + } + } + } + } + + h.mu.Unlock() + + // Log activity + h.logActivity("client_disconnected", client.ID, + fmt.Sprintf("IP: %s, Duration: %v", client.IPAddress, time.Since(client.connectedAt))) + + client.cancel() + logger.Info(fmt.Sprintf("Client %s (IP: %s) disconnected", client.ID, client.IPAddress)) + + case message := <-h.broadcast: + h.messageCount++ + h.broadcastToClients(message) + + case <-h.ctx.Done(): + return + } + } +} + +// Enhanced message handling with client info +func (c *Client) handleMessage(msg WebSocketMessage) { + switch msg.Type { + case "ping": + // Respons ping dari client dengan informasi lebih lengkap + c.sendDirectResponse("pong", map[string]interface{}{ + "message": "Server is alive", + "timestamp": time.Now().Unix(), + "client_id": c.ID, + "static_id": c.StaticID, + "server_time": time.Now().Format(time.RFC3339), + "uptime": time.Since(c.connectedAt).Seconds(), + }) + + case "heartbeat": + // Tambahan: Handle heartbeat khusus + c.sendDirectResponse("heartbeat_ack", map[string]interface{}{ + "client_id": c.ID, + "timestamp": time.Now().Unix(), + "status": "alive", + }) + + case "connection_test": + // Tambahan: Test koneksi + c.sendDirectResponse("connection_test_result", map[string]interface{}{ + "latency_ms": 0, // Bisa dihitung jika perlu + "connection_id": c.ID, + "is_active": c.isClientActive(), + "last_ping": c.lastPing.Unix(), + "last_pong": c.lastPong.Unix(), + }) + + case "get_server_info": + c.Hub.mu.RLock() + connectedClients := len(c.Hub.clients) + roomsCount := len(c.Hub.rooms) + uniqueIPs := len(c.Hub.clientsByIP) + c.Hub.mu.RUnlock() + + c.sendDirectResponse("server_info", map[string]interface{}{ + "connected_clients": connectedClients, + "rooms_count": roomsCount, + "unique_ips": uniqueIPs, + "your_info": map[string]interface{}{ + "client_id": c.ID, + "static_id": c.StaticID, + "ip_address": c.IPAddress, + "user_id": c.UserID, + "room": c.Room, + "connected_at": c.connectedAt.Unix(), + }, + }) + + case "get_clients_by_ip": + c.handleGetClientsByIP(msg) + + case "get_client_info": + c.handleGetClientInfo(msg) + + case "direct_message": + c.handleDirectMessage(msg) + + case "room_message": + c.handleRoomMessage(msg) + + case "broadcast": + c.Hub.broadcast <- msg + c.sendDirectResponse("broadcast_sent", "Message broadcasted to all clients") + + case "get_online_users": + c.sendOnlineUsers() + + case "database_query": + c.handleDatabaseQuery(msg) + + default: + c.sendDirectResponse("message_received", fmt.Sprintf("Message received: %v", msg.Data)) c.Hub.broadcast <- msg } } -// writePump writes messages to the WebSocket connection +// Handle get clients by IP +func (c *Client) handleGetClientsByIP(msg WebSocketMessage) { + data, ok := msg.Data.(map[string]interface{}) + if !ok { + c.sendErrorResponse("Invalid request format", "Expected object with ip_address") + return + } + + targetIP, exists := data["ip_address"].(string) + if !exists { + targetIP = c.IPAddress // Default to current client's IP + } + + c.Hub.mu.RLock() + ipClients := c.Hub.clientsByIP[targetIP] + var clientInfos []ClientInfo + + for _, client := range ipClients { + clientInfos = append(clientInfos, ClientInfo{ + ID: client.ID, + StaticID: client.StaticID, + IPAddress: client.IPAddress, + UserID: client.UserID, + Room: client.Room, + ConnectedAt: client.connectedAt, + LastPing: client.lastPing, + }) + } + c.Hub.mu.RUnlock() + + c.sendDirectResponse("clients_by_ip", map[string]interface{}{ + "ip_address": targetIP, + "clients": clientInfos, + "count": len(clientInfos), + }) +} + +// Handle get specific client info +func (c *Client) handleGetClientInfo(msg WebSocketMessage) { + data, ok := msg.Data.(map[string]interface{}) + if !ok { + c.sendErrorResponse("Invalid request format", "Expected object with client_id or static_id") + return + } + + var targetClient *Client + + if clientID, exists := data["client_id"].(string); exists { + c.Hub.mu.RLock() + targetClient = c.Hub.clientsByID[clientID] + c.Hub.mu.RUnlock() + } else if staticID, exists := data["static_id"].(string); exists { + c.Hub.mu.RLock() + targetClient = c.Hub.clientsByStatic[staticID] + c.Hub.mu.RUnlock() + } + + if targetClient == nil { + c.sendErrorResponse("Client not found", "No client found with the specified ID") + return + } + + clientInfo := ClientInfo{ + ID: targetClient.ID, + StaticID: targetClient.StaticID, + IPAddress: targetClient.IPAddress, + UserID: targetClient.UserID, + Room: targetClient.Room, + ConnectedAt: targetClient.connectedAt, + LastPing: targetClient.lastPing, + } + + c.sendDirectResponse("client_info", clientInfo) +} + +// Enhanced online users with IP and static ID info +func (c *Client) sendOnlineUsers() { + c.Hub.mu.RLock() + var onlineUsers []map[string]interface{} + ipStats := make(map[string]int) + + for client := range c.Hub.clients { + onlineUsers = append(onlineUsers, map[string]interface{}{ + "client_id": client.ID, + "static_id": client.StaticID, + "user_id": client.UserID, + "room": client.Room, + "ip_address": client.IPAddress, + "connected_at": client.connectedAt.Unix(), + "last_ping": client.lastPing.Unix(), + }) + ipStats[client.IPAddress]++ + } + c.Hub.mu.RUnlock() + + c.sendDirectResponse("online_users", map[string]interface{}{ + "users": onlineUsers, + "total": len(onlineUsers), + "ip_stats": ipStats, + "unique_ips": len(ipStats), + }) +} + +// Enhanced database query handler +func (c *Client) handleDatabaseQuery(msg WebSocketMessage) { + data, ok := msg.Data.(map[string]interface{}) + if !ok { + c.sendErrorResponse("Invalid query format", "Expected object with query parameters") + return + } + + queryType, exists := data["query_type"].(string) + if !exists { + c.sendErrorResponse("Missing query_type", "query_type is required") + return + } + + switch queryType { + case "health_check": + health := c.Hub.dbService.Health() + c.sendDirectResponse("query_result", map[string]interface{}{ + "type": "health_check", + "result": health, + }) + + case "database_list": + dbList := c.Hub.dbService.ListDBs() + c.sendDirectResponse("query_result", map[string]interface{}{ + "type": "database_list", + "databases": dbList, + }) + + case "connection_stats": + c.Hub.mu.RLock() + stats := map[string]interface{}{ + "total_clients": len(c.Hub.clients), + "unique_ips": len(c.Hub.clientsByIP), + "static_clients": len(c.Hub.clientsByStatic), + "rooms": len(c.Hub.rooms), + } + c.Hub.mu.RUnlock() + + c.sendDirectResponse("query_result", map[string]interface{}{ + "type": "connection_stats", + "result": stats, + }) + + case "trigger_notification": + channel, channelExists := data["channel"].(string) + payload, payloadExists := data["payload"].(string) + + if !channelExists || !payloadExists { + c.sendErrorResponse("Missing Parameters", "channel and payload required") + return + } + + err := c.Hub.dbService.NotifyChange("default", channel, payload) + if err != nil { + c.sendErrorResponse("Notification Error", err.Error()) + return + } + + c.sendDirectResponse("query_result", map[string]interface{}{ + "type": "notification_sent", + "channel": channel, + "payload": payload, + }) + + default: + c.sendErrorResponse("Unsupported query", fmt.Sprintf("Query type '%s' not supported", queryType)) + } +} + +// Enhanced client search methods +func (h *WebSocketHandler) GetClientByID(clientID string) *Client { + h.hub.mu.RLock() + defer h.hub.mu.RUnlock() + return h.hub.clientsByID[clientID] +} + +func (h *WebSocketHandler) GetClientByStaticID(staticID string) *Client { + h.hub.mu.RLock() + defer h.hub.mu.RUnlock() + return h.hub.clientsByStatic[staticID] +} + +func (h *WebSocketHandler) GetClientsByIP(ipAddress string) []*Client { + h.hub.mu.RLock() + defer h.hub.mu.RUnlock() + return h.hub.clientsByIP[ipAddress] +} + +func (h *WebSocketHandler) SendToClientByStaticID(staticID string, messageType string, data interface{}) bool { + client := h.GetClientByStaticID(staticID) + if client == nil { + return false + } + + msg := WebSocketMessage{ + Type: messageType, + Data: data, + Timestamp: time.Now(), + ClientID: client.ID, + MessageID: uuid.New().String(), + } + + select { + case h.hub.messageQueue <- msg: + return true + default: + return false + } +} + +func (h *WebSocketHandler) BroadcastToIP(ipAddress string, messageType string, data interface{}) int { + clients := h.GetClientsByIP(ipAddress) + count := 0 + + for _, client := range clients { + msg := WebSocketMessage{ + Type: messageType, + Data: data, + Timestamp: time.Now(), + ClientID: client.ID, + MessageID: uuid.New().String(), + } + + select { + case h.hub.messageQueue <- msg: + count++ + default: + // Skip if queue is full + } + } + + return count +} + +// 1. SERVER BROADCAST DATA TANPA PERMINTAAN CLIENT +func (h *WebSocketHandler) StartServerBroadcasters() { + // Heartbeat broadcaster + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + h.hub.mu.RLock() + connectedClients := len(h.hub.clients) + uniqueIPs := len(h.hub.clientsByIP) + staticClients := len(h.hub.clientsByStatic) + h.hub.mu.RUnlock() + + h.BroadcastMessage("server_heartbeat", map[string]interface{}{ + "message": "Server heartbeat", + "connected_clients": connectedClients, + "unique_ips": uniqueIPs, + "static_clients": staticClients, + "timestamp": time.Now().Unix(), + "server_id": "api-service-v1", + }) + case <-h.hub.ctx.Done(): + return + } + } + }() + + // System notification broadcaster + go func() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + dbHealth := h.dbService.Health() + h.BroadcastMessage("system_status", map[string]interface{}{ + "type": "system_notification", + "database": dbHealth, + "timestamp": time.Now().Unix(), + "uptime": time.Since(time.Now()).String(), + }) + case <-h.hub.ctx.Done(): + return + } + } + }() + + // Data stream broadcaster (demo purpose) + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + counter := 0 + + for { + select { + case <-ticker.C: + counter++ + h.BroadcastMessage("data_stream", map[string]interface{}{ + "id": counter, + "value": counter * 10, + "timestamp": time.Now().Unix(), + "type": "real_time_data", + }) + case <-h.hub.ctx.Done(): + return + } + } + }() +} + +// 4. SERVER MENGIRIM DATA JIKA ADA PERUBAHAN DATABASE +func (h *WebSocketHandler) StartDatabaseListener() { + // Cek apakah database utama adalah PostgreSQL + dbType, err := h.dbService.GetDBType(h.primaryDB) + if err != nil || dbType != database.Postgres { + h.logger.Error(fmt.Sprintf("Database notifications require PostgreSQL. Current DB type: %v", dbType)) + return + } + + channels := []string{"retribusi_changes", "peserta_changes", "system_changes"} + + err = h.dbService.ListenForChanges(h.hub.ctx, h.primaryDB, channels, func(channel, payload string) { + var changeData map[string]interface{} + if err := json.Unmarshal([]byte(payload), &changeData); err != nil { + h.logger.Error(fmt.Sprintf("Failed to parse database notification: %v", err)) + // Kirim raw payload jika JSON parsing gagal + changeData = map[string]interface{}{ + "raw_payload": payload, + "parse_error": err.Error(), + } + } + + h.BroadcastMessage("database_change", map[string]interface{}{ + "channel": channel, + "operation": changeData["operation"], + "table": changeData["table"], + "data": changeData["data"], + "timestamp": time.Now().Unix(), + "database": h.primaryDB, + }) + + h.logger.Info(fmt.Sprintf("Database change broadcasted: %s from %s", channel, h.primaryDB)) + }) + + if err != nil { + h.logger.Error(fmt.Sprintf("Failed to start database listener: %v", err)) + } +} + +func (h *WebSocketHandler) StartMessageQueue() { + // Start multiple workers for message processing + for i := 0; i < h.hub.queueWorkers; i++ { + go func(workerID int) { + for { + select { + case message := <-h.hub.messageQueue: + h.hub.broadcast <- message + case <-h.hub.ctx.Done(): + return + } + } + }(i) + } +} + +func (h *Hub) broadcastToClients(message WebSocketMessage) { + h.mu.RLock() + defer h.mu.RUnlock() + + if message.ClientID != "" { + // Send to specific client + for client := range h.clients { + if client.ID == message.ClientID { + select { + case client.Send <- message: + default: + h.unregisterClient(client) + } + break + } + } + return + } + + // Check if it's a room message + if data, ok := message.Data.(map[string]interface{}); ok { + if roomName, exists := data["room"].(string); exists { + if room, roomExists := h.rooms[roomName]; roomExists { + for client := range room { + select { + case client.Send <- message: + default: + h.unregisterClient(client) + } + } + } + return + } + } + + // Broadcast to all clients + for client := range h.clients { + select { + case client.Send <- message: + default: + h.unregisterClient(client) + } + } +} + +func (h *Hub) unregisterClient(client *Client) { + go func() { + h.unregister <- client + }() +} + +// 3. CLIENT MENGIRIM DATA KE CLIENT LAIN +func (c *Client) handleDirectMessage(msg WebSocketMessage) { + data, ok := msg.Data.(map[string]interface{}) + if !ok { + c.sendErrorResponse("Invalid direct message format", "Expected object with message data") + return + } + + targetClientID, exists := data["target_client_id"].(string) + if !exists { + c.sendErrorResponse("Missing target", "target_client_id is required") + return + } + + directMsg := WebSocketMessage{ + Type: "direct_message_received", + Data: map[string]interface{}{ + "from": c.ID, + "from_static_id": c.StaticID, + "from_ip": c.IPAddress, + "from_user_id": c.UserID, + "message": data["message"], + "original_msg_id": msg.MessageID, + }, + Timestamp: time.Now(), + ClientID: targetClientID, + MessageID: uuid.New().String(), + } + + c.Hub.broadcast <- directMsg + c.sendDirectResponse("direct_message_sent", map[string]interface{}{ + "target_client": targetClientID, + "message_id": directMsg.MessageID, + }) +} + +func (c *Client) handleRoomMessage(msg WebSocketMessage) { + data, ok := msg.Data.(map[string]interface{}) + if !ok { + c.sendErrorResponse("Invalid room message format", "Expected object with message data") + return + } + + roomName, exists := data["room"].(string) + if !exists { + roomName = c.Room + } + + roomMsg := WebSocketMessage{ + Type: "room_message_received", + Data: map[string]interface{}{ + "room": roomName, + "from": c.ID, + "from_static_id": c.StaticID, + "from_ip": c.IPAddress, + "from_user_id": c.UserID, + "message": data["message"], + "original_msg_id": msg.MessageID, + }, + Timestamp: time.Now(), + MessageID: uuid.New().String(), + } + + // Send to room members + c.Hub.mu.RLock() + if room, exists := c.Hub.rooms[roomName]; exists { + for client := range room { + if client.ID != c.ID { + select { + case client.Send <- roomMsg: + default: + logger.Error(fmt.Sprintf("Failed to send room message to client %s", client.ID)) + } + } + } + } + c.Hub.mu.RUnlock() + + c.sendDirectResponse("room_message_sent", map[string]interface{}{ + "room": roomName, + "message_id": roomMsg.MessageID, + }) +} + +func (c *Client) readPump() { + defer func() { + c.Hub.unregister <- c + c.Conn.Close() + logger.Info(fmt.Sprintf("Client %s readPump terminated", c.ID)) + }() + + // Konfigurasi connection limits + c.Conn.SetReadLimit(MaxMessageSize) + c.resetReadDeadline() // Set initial deadline + + // Ping/Pong handlers dengan logging yang lebih baik + c.Conn.SetPingHandler(func(message string) error { + logger.Debug(fmt.Sprintf("Client %s received ping", c.ID)) + c.resetReadDeadline() + return c.Conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(WriteTimeout)) + }) + + c.Conn.SetPongHandler(func(message string) error { + c.mu.Lock() + c.lastPong = time.Now() + c.isActive = true + c.mu.Unlock() + c.resetReadDeadline() + logger.Debug(fmt.Sprintf("Client %s received pong", c.ID)) + return nil + }) + + for { + select { + case <-c.ctx.Done(): + logger.Info(fmt.Sprintf("Client %s context cancelled", c.ID)) + return + default: + _, message, err := c.Conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, + websocket.CloseGoingAway, + websocket.CloseAbnormalClosure, + websocket.CloseNormalClosure) { + logger.Error(fmt.Sprintf("WebSocket unexpected close for client %s: %v", c.ID, err)) + } else { + logger.Info(fmt.Sprintf("WebSocket closed for client %s: %v", c.ID, err)) + } + return + } + + // PENTING: Reset deadline setiap kali ada pesan masuk + c.resetReadDeadline() + c.updateLastActivity() + + var msg WebSocketMessage + if err := json.Unmarshal(message, &msg); err != nil { + c.sendErrorResponse("Invalid message format", err.Error()) + continue + } + + msg.Timestamp = time.Now() + msg.ClientID = c.ID + if msg.MessageID == "" { + msg.MessageID = uuid.New().String() + } + + c.handleMessage(msg) + } + } +} + +// resetReadDeadline - Reset read deadline dengan timeout yang lebih panjang +func (c *Client) resetReadDeadline() { + c.Conn.SetReadDeadline(time.Now().Add(ReadTimeout)) +} + +// updateLastActivity - Update waktu aktivitas terakhir +func (c *Client) updateLastActivity() { + c.mu.Lock() + defer c.mu.Unlock() + c.lastPing = time.Now() + c.isActive = true +} + +// sendPing - Kirim ping message dengan proper error handling +func (c *Client) sendPing() error { + c.Conn.SetWriteDeadline(time.Now().Add(WriteTimeout)) + if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return err + } + + c.mu.Lock() + c.lastPing = time.Now() + c.mu.Unlock() + + logger.Debug(fmt.Sprintf("Ping sent to client %s", c.ID)) + return nil +} + +// isPongTimeout - Cek apakah client sudah timeout dalam merespons pong +func (c *Client) isPongTimeout() bool { + c.mu.RLock() + defer c.mu.RUnlock() + + // Jika belum pernah menerima pong, gunakan lastPing sebagai baseline + lastActivity := c.lastPong + if lastActivity.IsZero() { + lastActivity = c.lastPing + } + + return time.Since(lastActivity) > PongTimeout +} + +// isClientActive - Cek apakah client masih aktif +func (c *Client) isClientActive() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.isActive && time.Since(c.lastPing) < PongTimeout +} + +// gracefulClose - Tutup koneksi dengan graceful +func (c *Client) gracefulClose() { + c.mu.Lock() + c.isActive = false + c.mu.Unlock() + + // Kirim close message + c.Conn.SetWriteDeadline(time.Now().Add(WriteTimeout)) + c.Conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + + // Cancel context + c.cancel() + + logger.Info(fmt.Sprintf("Client %s closed gracefully", c.ID)) +} +func (c *Client) sendDirectResponse(messageType string, data interface{}) { + response := WebSocketMessage{ + Type: messageType, + Data: data, + Timestamp: time.Now(), + MessageID: uuid.New().String(), + } + + select { + case c.Send <- response: + default: + logger.Error("Failed to send direct response to client") + } +} + +func (c *Client) sendErrorResponse(error, details string) { + c.sendDirectResponse("error", map[string]interface{}{ + "error": error, + "details": details, + }) +} + func (c *Client) writePump() { - ticker := time.NewTicker(54 * time.Second) + ticker := time.NewTicker(PingInterval) defer func() { ticker.Stop() c.Conn.Close() + logger.Info(fmt.Sprintf("Client %s writePump terminated", c.ID)) }() for { select { case message, ok := <-c.Send: - c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + c.Conn.SetWriteDeadline(time.Now().Add(WriteTimeout)) if !ok { c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) return } if err := c.Conn.WriteJSON(message); err != nil { - logger.Error(fmt.Sprintf("Failed to write message: %v", err)) + logger.Error(fmt.Sprintf("Failed to write message to client %s: %v", c.ID, err)) return } case <-ticker.C: - c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) - if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { + // Kirim ping dan cek apakah client masih responsif + if err := c.sendPing(); err != nil { + logger.Error(fmt.Sprintf("Failed to send ping to client %s: %v", c.ID, err)) return } + + // Cek apakah client sudah terlalu lama tidak merespons pong + if c.isPongTimeout() { + logger.Warn(fmt.Sprintf("Client %s pong timeout, disconnecting", c.ID)) + return + } + + case <-c.ctx.Done(): + logger.Info(fmt.Sprintf("Client %s writePump context cancelled", c.ID)) + return } } } -// BroadcastMessage broadcasts a message to all connected clients +// Broadcast methods func (h *WebSocketHandler) BroadcastMessage(messageType string, data interface{}) { msg := WebSocketMessage{ Type: messageType, Data: data, Timestamp: time.Now(), + MessageID: uuid.New().String(), } - h.hub.broadcast <- msg -} - -// NotifyDataChange sends a notification message to all clients when data changes in the database -func (h *WebSocketHandler) NotifyDataChange(data interface{}) { - msg := WebSocketMessage{ - Type: "data_change", - Data: data, - Timestamp: time.Now(), + select { + case h.hub.messageQueue <- msg: + default: + logger.Error("Message queue full, dropping message") } - - h.hub.broadcast <- msg } -// BroadcastToRoom broadcasts a message to clients in a specific room func (h *WebSocketHandler) BroadcastToRoom(room string, messageType string, data interface{}) { msg := WebSocketMessage{ Type: messageType, @@ -332,35 +1243,379 @@ func (h *WebSocketHandler) BroadcastToRoom(room string, messageType string, data "data": data, }, Timestamp: time.Now(), + MessageID: uuid.New().String(), } - h.hub.broadcast <- msg + select { + case h.hub.messageQueue <- msg: + default: + logger.Error("Message queue full, dropping room message") + } } -// SendToClient sends a message to a specific client func (h *WebSocketHandler) SendToClient(clientID string, messageType string, data interface{}) { msg := WebSocketMessage{ Type: messageType, Data: data, Timestamp: time.Now(), ClientID: clientID, + MessageID: uuid.New().String(), } - h.hub.broadcast <- msg + select { + case h.hub.messageQueue <- msg: + default: + logger.Error("Message queue full, dropping client message") + } } -// GetConnectedClients returns the number of connected clients func (h *WebSocketHandler) GetConnectedClients() int { + h.hub.mu.RLock() + defer h.hub.mu.RUnlock() return len(h.hub.clients) } -// GetRoomClients returns the number of clients in a specific room func (h *WebSocketHandler) GetRoomClients(room string) int { h.hub.mu.RLock() defer h.hub.mu.RUnlock() + if roomClients, exists := h.hub.rooms[room]; exists { + return len(roomClients) + } + return 0 +} + +func (h *WebSocketHandler) Shutdown() { + h.hub.cancel() + + h.hub.mu.RLock() + for client := range h.hub.clients { + client.cancel() + } + h.hub.mu.RUnlock() +} + +// 1. GetDetailedStats - Mengembalikan statistik detail +func (h *WebSocketHandler) GetDetailedStats() DetailedStats { + h.hub.mu.RLock() + + // Calculate IP distribution + ipDistribution := make(map[string]int) + for ip, clients := range h.hub.clientsByIP { + ipDistribution[ip] = len(clients) + } + + // Calculate room distribution + roomDistribution := make(map[string]int) + for room, clients := range h.hub.rooms { + roomDistribution[room] = len(clients) + } + + stats := DetailedStats{ + ConnectedClients: len(h.hub.clients), + UniqueIPs: len(h.hub.clientsByIP), + StaticClients: len(h.hub.clientsByStatic), + ActiveRooms: len(h.hub.rooms), + IPDistribution: ipDistribution, + RoomDistribution: roomDistribution, + MessageQueueSize: len(h.hub.messageQueue), + QueueWorkers: h.hub.queueWorkers, + Uptime: time.Since(h.hub.startTime), + Timestamp: time.Now().Unix(), + } + + h.hub.mu.RUnlock() + + return stats +} + +// 2. GetAllClients - Mengembalikan semua client yang terhubung +func (h *WebSocketHandler) GetAllClients() []ClientInfo { + h.hub.mu.RLock() + defer h.hub.mu.RUnlock() + + var clients []ClientInfo + + for client := range h.hub.clients { + clientInfo := ClientInfo{ + ID: client.ID, + StaticID: client.StaticID, + IPAddress: client.IPAddress, + UserID: client.UserID, + Room: client.Room, + ConnectedAt: client.connectedAt, // Perbaikan: gunakan connectedAt bukan ConnectedAt + LastPing: client.lastPing, // Perbaikan: gunakan lastPing bukan LastPing + } + clients = append(clients, clientInfo) + } + + return clients +} + +// 3. GetAllRooms - Mengembalikan semua room dan anggotanya +func (h *WebSocketHandler) GetAllRooms() map[string][]ClientInfo { + h.hub.mu.RLock() + defer h.hub.mu.RUnlock() + + rooms := make(map[string][]ClientInfo) + + for roomName, clients := range h.hub.rooms { + var roomClients []ClientInfo + + for client := range clients { + clientInfo := ClientInfo{ + ID: client.ID, + StaticID: client.StaticID, + IPAddress: client.IPAddress, + UserID: client.UserID, + Room: client.Room, + ConnectedAt: client.connectedAt, + LastPing: client.lastPing, + } + roomClients = append(roomClients, clientInfo) + } + + rooms[roomName] = roomClients + } + + return rooms +} + +// 4. GetMonitoringData - Mengembalikan data monitoring lengkap +func (h *WebSocketHandler) GetMonitoringData() MonitoringData { + stats := h.GetDetailedStats() + + h.hub.activityMu.RLock() + recentActivity := make([]ActivityLog, 0) + // Get last 100 activities + start := len(h.hub.activityLog) - 100 + if start < 0 { + start = 0 + } + for i := start; i < len(h.hub.activityLog); i++ { + recentActivity = append(recentActivity, h.hub.activityLog[i]) + } + h.hub.activityMu.RUnlock() + + // Get system health from database service + systemHealth := make(map[string]interface{}) + if h.dbService != nil { + systemHealth["databases"] = h.dbService.Health() + systemHealth["available_dbs"] = h.dbService.ListDBs() + } + systemHealth["websocket_status"] = "healthy" + systemHealth["uptime_seconds"] = time.Since(h.hub.startTime).Seconds() + + // Calculate performance metrics + uptime := time.Since(h.hub.startTime) + var messagesPerSecond float64 + var errorRate float64 + + if uptime.Seconds() > 0 { + messagesPerSecond = float64(h.hub.messageCount) / uptime.Seconds() + } + + if h.hub.messageCount > 0 { + errorRate = (float64(h.hub.errorCount) / float64(h.hub.messageCount)) * 100 + } + + performance := PerformanceMetrics{ + MessagesPerSecond: messagesPerSecond, + AverageLatency: 2.5, // Mock value - implement actual latency tracking + ErrorRate: errorRate, + MemoryUsage: 0, // Mock value - implement actual memory tracking + } + + return MonitoringData{ + Stats: stats, + RecentActivity: recentActivity, + SystemHealth: systemHealth, + Performance: performance, + } +} +func (h *WebSocketHandler) GetRoomClientCount(room string) int { + h.hub.mu.RLock() + defer h.hub.mu.RUnlock() if roomClients, exists := h.hub.rooms[room]; exists { return len(roomClients) } return 0 } +func (h *WebSocketHandler) GetActiveClients(olderThan time.Duration) []ClientInfo { + h.hub.mu.RLock() + defer h.hub.mu.RUnlock() + + var activeClients []ClientInfo + cutoff := time.Now().Add(-olderThan) + + for client := range h.hub.clients { + if client.lastPing.After(cutoff) { + activeClients = append(activeClients, ClientInfo{ + ID: client.ID, + StaticID: client.StaticID, + IPAddress: client.IPAddress, + UserID: client.UserID, + Room: client.Room, + ConnectedAt: client.connectedAt, + LastPing: client.lastPing, + }) + } + } + + return activeClients +} +func (h *WebSocketHandler) CleanupInactiveClients(inactiveTimeout time.Duration) int { + h.hub.mu.RLock() + var inactiveClients []*Client + cutoff := time.Now().Add(-inactiveTimeout) + + for client := range h.hub.clients { + if client.lastPing.Before(cutoff) { + inactiveClients = append(inactiveClients, client) + } + } + h.hub.mu.RUnlock() + + // Disconnect inactive clients + for _, client := range inactiveClients { + h.hub.logActivity("cleanup_disconnect", client.ID, + fmt.Sprintf("Inactive for %v", time.Since(client.lastPing))) + client.cancel() + client.Conn.Close() + } + + return len(inactiveClients) +} + +// 5. DisconnectClient - Memutus koneksi client tertentu +func (h *WebSocketHandler) DisconnectClient(clientID string) bool { + h.hub.mu.RLock() + client, exists := h.hub.clientsByID[clientID] + h.hub.mu.RUnlock() + + if !exists { + return false + } + + // Log activity + h.hub.logActivity("force_disconnect", clientID, "Client disconnected by admin") + + // Cancel context and close connection + client.cancel() + client.Conn.Close() + + // The client will be automatically removed from hub in the Run() loop + return true +} + +func (h *WebSocketHandler) StartConnectionMonitor() { + ticker := time.NewTicker(2 * time.Minute) // Check setiap 2 menit + defer ticker.Stop() + + for { + select { + case <-ticker.C: + h.cleanupInactiveConnections() + h.logConnectionStats() + + case <-h.hub.ctx.Done(): + return + } + } +} + +// cleanupInactiveConnections - Bersihkan koneksi yang tidak aktif +func (h *WebSocketHandler) cleanupInactiveConnections() { + h.hub.mu.RLock() + var inactiveClients []*Client + + for client := range h.hub.clients { + if !client.isClientActive() { + inactiveClients = append(inactiveClients, client) + } + } + h.hub.mu.RUnlock() + + // Disconnect inactive clients + for _, client := range inactiveClients { + h.hub.logActivity("cleanup_disconnect", client.ID, + fmt.Sprintf("Inactive for %v", time.Since(client.lastPing))) + client.gracefulClose() + } + + if len(inactiveClients) > 0 { + logger.Info(fmt.Sprintf("Cleaned up %d inactive connections", len(inactiveClients))) + } +} + +// logConnectionStats - Log statistik koneksi +func (h *WebSocketHandler) logConnectionStats() { + stats := h.GetDetailedStats() + logger.Info(fmt.Sprintf("WebSocket Stats - Clients: %d, IPs: %d, Rooms: %d, Queue: %d", + stats.ConnectedClients, stats.UniqueIPs, stats.ActiveRooms, stats.MessageQueueSize)) +} + +func (h *Hub) logActivity(event, clientID, details string) { + h.activityMu.Lock() + defer h.activityMu.Unlock() + + activity := ActivityLog{ + Timestamp: time.Now(), + Event: event, + ClientID: clientID, + Details: details, + } + + h.activityLog = append(h.activityLog, activity) + + // Keep only last 1000 activities + if len(h.activityLog) > 1000 { + h.activityLog = h.activityLog[1:] + } +} + +// Example Database Use Triger +// -- Trigger function untuk notifikasi perubahan data +// CREATE OR REPLACE FUNCTION notify_data_change() RETURNS trigger AS $$ +// DECLARE +// channel text := 'retribusi_changes'; +// payload json; +// BEGIN +// -- Tentukan channel berdasarkan table +// IF TG_TABLE_NAME = 'retribusi' THEN +// channel := 'retribusi_changes'; +// ELSIF TG_TABLE_NAME = 'peserta' THEN +// channel := 'peserta_changes'; +// END IF; + +// -- Buat payload +// IF TG_OP = 'DELETE' THEN +// payload = json_build_object( +// 'operation', TG_OP, +// 'table', TG_TABLE_NAME, +// 'data', row_to_json(OLD) +// ); +// ELSE +// payload = json_build_object( +// 'operation', TG_OP, +// 'table', TG_TABLE_NAME, +// 'data', row_to_json(NEW) +// ); +// END IF; + +// -- Kirim notifikasi +// PERFORM pg_notify(channel, payload::text); + +// RETURN COALESCE(NEW, OLD); +// END; +// $$ LANGUAGE plpgsql; + +// -- Trigger untuk table retribusi +// CREATE TRIGGER retribusi_notify_trigger +// AFTER INSERT OR UPDATE OR DELETE ON retribusi +// FOR EACH ROW EXECUTE FUNCTION notify_data_change(); + +// -- Trigger untuk table peserta +// CREATE TRIGGER peserta_notify_trigger +// AFTER INSERT OR UPDATE OR DELETE ON peserta +// FOR EACH ROW EXECUTE FUNCTION notify_data_change(); diff --git a/internal/routes/v1/routes.go b/internal/routes/v1/routes.go index d2fa4948..98f49255 100644 --- a/internal/routes/v1/routes.go +++ b/internal/routes/v1/routes.go @@ -7,10 +7,14 @@ import ( healthcheckHandlers "api-service/internal/handlers/healthcheck" pesertaHandlers "api-service/internal/handlers/peserta" retribusiHandlers "api-service/internal/handlers/retribusi" + "api-service/internal/handlers/websocket" websocketHandlers "api-service/internal/handlers/websocket" "api-service/internal/middleware" services "api-service/internal/services/auth" "api-service/pkg/logger" + "encoding/json" + "strconv" + "time" "github.com/gin-gonic/gin" "github.com/go-playground/validator/v10" @@ -36,30 +40,61 @@ func RegisterRoutes(cfg *config.Config) *gin.Engine { logger.Fatal("Failed to initialize auth service") } - // Initialize database service for health check + // Initialize database service dbService := database.New(cfg) - // Health check endpoint + // Initialize WebSocket handler with enhanced features + websocketHandler := websocketHandlers.NewWebSocketHandler(cfg, dbService) + + // ============================================================================= + // HEALTH CHECK & SYSTEM ROUTES + // ============================================================================= + healthCheckHandler := healthcheckHandlers.NewHealthCheckHandler(dbService) sistem := router.Group("/api/sistem") - sistem.GET("/health", healthCheckHandler.CheckHealth) + { + sistem.GET("/health", healthCheckHandler.CheckHealth) + sistem.GET("/databases", func(c *gin.Context) { + c.JSON(200, gin.H{ + "databases": dbService.ListDBs(), + "health": dbService.Health(), + "timestamp": time.Now().Unix(), + }) + }) + sistem.GET("/info", func(c *gin.Context) { + c.JSON(200, gin.H{ + "service": "API Service v1.0.0", + "websocket_active": true, + "connected_clients": websocketHandler.GetConnectedClients(), + "databases": dbService.ListDBs(), + "timestamp": time.Now().Unix(), + }) + }) + } + + // ============================================================================= + // SWAGGER DOCUMENTATION + // ============================================================================= - // Swagger UI route router.GET("/swagger/*any", ginSwagger.WrapHandler( - swaggerFiles.Handler, // Models configuration - ginSwagger.DefaultModelsExpandDepth(-1), // Hide models completely - // ginSwagger.DefaultModelExpandDepth(0), // Keep individual models collapsed - - // General UI configuration - // ginSwagger.DocExpansion("none"), // Collapse all sections - ginSwagger.DeepLinking(true), // Enable deep linking - // ginSwagger.PersistAuthorization(true), // Persist auth between refreshes - - // // Optional: Custom title - // ginSwagger.InstanceName("API Service v1.0.0"), + swaggerFiles.Handler, + ginSwagger.DefaultModelsExpandDepth(-1), + ginSwagger.DeepLinking(true), )) - // API v1 group + // ============================================================================= + // WEBSOCKET TEST CLIENT + // ============================================================================= + + // router.GET("/websocket-test", func(c *gin.Context) { + // c.Header("Content-Type", "text/html") + // c.String(http.StatusOK, getWebSocketTestHTML()) + // }) + + // ============================================================================= + // API v1 GROUP + // ============================================================================= + v1 := router.Group("/api/v1") // ============================================================================= @@ -75,15 +110,566 @@ func RegisterRoutes(cfg *config.Config) *gin.Engine { v1.POST("/auth/register", authHandler.Register) v1.POST("/auth/refresh", authHandler.RefreshToken) - // Token generation routes (keep public if needed) + // Token generation routes v1.POST("/token/generate", tokenHandler.GenerateToken) v1.POST("/token/generate-direct", tokenHandler.GenerateTokenDirect) - // WebSocket endpoint - websocketHandler := websocketHandlers.NewWebSocketHandler() + // ============================================================================= + // WEBSOCKET ROUTES + // ============================================================================= + + // Main WebSocket endpoint with enhanced features v1.GET("/ws", websocketHandler.HandleWebSocket) - // ============= PUBLISHED ROUTES =============================================== + // WebSocket management API + wsAPI := router.Group("/api/websocket") + { + // ============================================================================= + // BASIC BROADCASTING + // ============================================================================= + + wsAPI.POST("/broadcast", func(c *gin.Context) { + var req struct { + Type string `json:"type"` + Message interface{} `json:"message"` + Database string `json:"database,omitempty"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + websocketHandler.BroadcastMessage(req.Type, req.Message) + c.JSON(200, gin.H{ + "status": "broadcast sent", + "clients_count": websocketHandler.GetConnectedClients(), + "timestamp": time.Now().Unix(), + }) + }) + + wsAPI.POST("/broadcast/room/:room", func(c *gin.Context) { + room := c.Param("room") + var req struct { + Type string `json:"type"` + Message interface{} `json:"message"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + websocketHandler.BroadcastToRoom(room, req.Type, req.Message) + c.JSON(200, gin.H{ + "status": "room broadcast sent", + "room": room, + "clients_count": websocketHandler.GetRoomClientCount(room), // Fix: gunakan GetRoomClientCount + "timestamp": time.Now().Unix(), + }) + }) + + // ============================================================================= + // ENHANCED CLIENT TARGETING + // ============================================================================= + + wsAPI.POST("/send/:clientId", func(c *gin.Context) { + clientID := c.Param("clientId") + var req struct { + Type string `json:"type"` + Message interface{} `json:"message"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + websocketHandler.SendToClient(clientID, req.Type, req.Message) + c.JSON(200, gin.H{ + "status": "message sent", + "client_id": clientID, + "timestamp": time.Now().Unix(), + }) + }) + + // Send to client by static ID + wsAPI.POST("/send/static/:staticId", func(c *gin.Context) { + staticID := c.Param("staticId") + logger.Infof("Sending message to static client: %s", staticID) + var req struct { + Type string `json:"type"` + Message interface{} `json:"message"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + success := websocketHandler.SendToClientByStaticID(staticID, req.Type, req.Message) + if success { + c.JSON(200, gin.H{ + "status": "message sent to static client", + "static_id": staticID, + "timestamp": time.Now().Unix(), + }) + } else { + c.JSON(404, gin.H{ + "error": "static client not found", + "static_id": staticID, + "timestamp": time.Now().Unix(), + }) + } + }) + + // Broadcast to all clients from specific IP + wsAPI.POST("/broadcast/ip/:ipAddress", func(c *gin.Context) { + ipAddress := c.Param("ipAddress") + var req struct { + Type string `json:"type"` + Message interface{} `json:"message"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + count := websocketHandler.BroadcastToIP(ipAddress, req.Type, req.Message) + c.JSON(200, gin.H{ + "status": "ip broadcast sent", + "ip_address": ipAddress, + "clients_count": count, + "timestamp": time.Now().Unix(), + }) + }) + + // ============================================================================= + // CLIENT INFORMATION & STATISTICS + // ============================================================================= + + wsAPI.GET("/stats", func(c *gin.Context) { + c.JSON(200, gin.H{ + "connected_clients": websocketHandler.GetConnectedClients(), + "databases": dbService.ListDBs(), + "database_health": dbService.Health(), + "timestamp": time.Now().Unix(), + }) + }) + + wsAPI.GET("/stats/detailed", func(c *gin.Context) { + stats := websocketHandler.GetDetailedStats() + c.JSON(200, gin.H{ + "stats": stats, + "timestamp": time.Now().Unix(), + }) + }) + + wsAPI.GET("/clients", func(c *gin.Context) { + clients := websocketHandler.GetAllClients() + c.JSON(200, gin.H{ + "clients": clients, + "count": len(clients), + "timestamp": time.Now().Unix(), + }) + }) + + // Fix: Perbaiki GetClientsByIP untuk menggunakan ClientInfo + wsAPI.GET("/clients/by-ip/:ipAddress", func(c *gin.Context) { + ipAddress := c.Param("ipAddress") + client := websocketHandler.GetClientsByIP(ipAddress) + if client == nil { + c.JSON(404, gin.H{ + "error": "client not found", + "ip_address": ipAddress, + "timestamp": time.Now().Unix(), + }) + return + } + + // Use ClientInfo struct instead of direct field access + clientInfo := websocketHandler.GetAllClients() + var targetClientInfo *websocket.ClientInfo + for i := range clientInfo { + if clientInfo[i].ID == ipAddress { + targetClientInfo = &clientInfo[i] + break + } + } + + if targetClientInfo == nil { + c.JSON(404, gin.H{ + "error": "ipAddress not found", + "client_id": ipAddress, + "timestamp": time.Now().Unix(), + }) + return + } + + c.JSON(200, gin.H{ + "client": map[string]interface{}{ + "id": targetClientInfo.ID, + "static_id": targetClientInfo.StaticID, + "ip_address": targetClientInfo.IPAddress, + "user_id": targetClientInfo.UserID, + "room": targetClientInfo.Room, + "connected_at": targetClientInfo.ConnectedAt.Unix(), // Fixed: use exported field + "last_ping": targetClientInfo.LastPing.Unix(), // Fixed: use exported field + }, + "timestamp": time.Now().Unix(), + }) + + }) + + // Fix: Perbaiki GetClientByID response + wsAPI.GET("/client/:clientId", func(c *gin.Context) { + clientID := c.Param("clientId") + client := websocketHandler.GetClientByID(clientID) + + if client == nil { + c.JSON(404, gin.H{ + "error": "client not found", + "client_id": clientID, + "timestamp": time.Now().Unix(), + }) + return + } + + // Use ClientInfo struct instead of direct field access + clientInfo := websocketHandler.GetAllClients() + var targetClientInfo *websocket.ClientInfo + for i := range clientInfo { + if clientInfo[i].ID == clientID { + targetClientInfo = &clientInfo[i] + break + } + } + + if targetClientInfo == nil { + c.JSON(404, gin.H{ + "error": "client not found", + "client_id": clientID, + "timestamp": time.Now().Unix(), + }) + return + } + + c.JSON(200, gin.H{ + "client": map[string]interface{}{ + "id": targetClientInfo.ID, + "static_id": targetClientInfo.StaticID, + "ip_address": targetClientInfo.IPAddress, + "user_id": targetClientInfo.UserID, + "room": targetClientInfo.Room, + "connected_at": targetClientInfo.ConnectedAt.Unix(), // Fixed: use exported field + "last_ping": targetClientInfo.LastPing.Unix(), // Fixed: use exported field + }, + "timestamp": time.Now().Unix(), + }) + }) + + // Fix: Perbaiki GetClientByStaticID response + wsAPI.GET("/client/static/:staticId", func(c *gin.Context) { + staticID := c.Param("staticId") + client := websocketHandler.GetClientByStaticID(staticID) + + if client == nil { + c.JSON(404, gin.H{ + "error": "static client not found", + "static_id": staticID, + "timestamp": time.Now().Unix(), + }) + return + } + + // Use ClientInfo struct instead of direct field access + clientInfo := websocketHandler.GetAllClients() + var targetClientInfo *websocket.ClientInfo + for i := range clientInfo { + if clientInfo[i].StaticID == staticID { + targetClientInfo = &clientInfo[i] + break + } + } + + if targetClientInfo == nil { + c.JSON(404, gin.H{ + "error": "static client not found", + "static_id": staticID, + "timestamp": time.Now().Unix(), + }) + return + } + + c.JSON(200, gin.H{ + "client": map[string]interface{}{ + "id": targetClientInfo.ID, + "static_id": targetClientInfo.StaticID, + "ip_address": targetClientInfo.IPAddress, + "user_id": targetClientInfo.UserID, + "room": targetClientInfo.Room, + "connected_at": targetClientInfo.ConnectedAt.Unix(), // Fixed: use exported field + "last_ping": targetClientInfo.LastPing.Unix(), // Fixed: use exported field + }, + "timestamp": time.Now().Unix(), + }) + }) + + // ============================================================================= + // ACTIVE CLIENTS & CLEANUP + // ============================================================================= + + // Tambahkan endpoint untuk active clients + wsAPI.GET("/clients/active", func(c *gin.Context) { + // Default: clients active dalam 5 menit terakhir + minutes := c.DefaultQuery("minutes", "5") + minutesInt, err := strconv.Atoi(minutes) + if err != nil { + minutesInt = 5 + } + + activeClients := websocketHandler.GetActiveClients(time.Duration(minutesInt) * time.Minute) + c.JSON(200, gin.H{ + "active_clients": activeClients, + "count": len(activeClients), + "threshold_minutes": minutesInt, + "timestamp": time.Now().Unix(), + }) + }) + + // Tambahkan endpoint untuk cleanup inactive clients + wsAPI.POST("/cleanup/inactive", func(c *gin.Context) { + var req struct { + InactiveMinutes int `json:"inactive_minutes"` + } + if err := c.ShouldBindJSON(&req); err != nil { + req.InactiveMinutes = 30 // Default 30 minutes + } + + if req.InactiveMinutes <= 0 { + req.InactiveMinutes = 30 + } + + cleanedCount := websocketHandler.CleanupInactiveClients(time.Duration(req.InactiveMinutes) * time.Minute) + c.JSON(200, gin.H{ + "status": "cleanup completed", + "cleaned_clients": cleanedCount, + "inactive_minutes": req.InactiveMinutes, + "timestamp": time.Now().Unix(), + }) + }) + + // ============================================================================= + // DATABASE NOTIFICATIONS + // ============================================================================= + + wsAPI.POST("/notify/:database/:channel", func(c *gin.Context) { + database := c.Param("database") + channel := c.Param("channel") + + var req struct { + Payload interface{} `json:"payload"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + payloadJSON, _ := json.Marshal(req.Payload) + err := dbService.NotifyChange(database, channel, string(payloadJSON)) + if err != nil { + c.JSON(500, gin.H{ + "error": err.Error(), + "database": database, + "channel": channel, + "timestamp": time.Now().Unix(), + }) + return + } + + c.JSON(200, gin.H{ + "status": "notification sent", + "database": database, + "channel": channel, + "timestamp": time.Now().Unix(), + }) + }) + + // Test database notification + wsAPI.POST("/test-notification", func(c *gin.Context) { + var req struct { + Database string `json:"database"` + Channel string `json:"channel"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + // Default values + if req.Database == "" { + req.Database = "default" + } + if req.Channel == "" { + req.Channel = "system_changes" + } + if req.Message == "" { + req.Message = "Test notification from API" + } + + payload := map[string]interface{}{ + "operation": "API_TEST", + "table": "manual_test", + "data": map[string]interface{}{ + "message": req.Message, + "test_data": req.Data, + "timestamp": time.Now().Unix(), + }, + } + + payloadJSON, _ := json.Marshal(payload) + err := dbService.NotifyChange(req.Database, req.Channel, string(payloadJSON)) + if err != nil { + c.JSON(500, gin.H{"error": err.Error()}) + return + } + + c.JSON(200, gin.H{ + "status": "test notification sent", + "database": req.Database, + "channel": req.Channel, + "payload": payload, + "timestamp": time.Now().Unix(), + }) + }) + + // ============================================================================= + // ROOM MANAGEMENT + // ============================================================================= + + wsAPI.GET("/rooms", func(c *gin.Context) { + rooms := websocketHandler.GetAllRooms() + c.JSON(200, gin.H{ + "rooms": rooms, + "count": len(rooms), + "timestamp": time.Now().Unix(), + }) + }) + + wsAPI.GET("/room/:room/clients", func(c *gin.Context) { + room := c.Param("room") + clientCount := websocketHandler.GetRoomClientCount(room) + + // Get detailed room info + allRooms := websocketHandler.GetAllRooms() + roomClients := allRooms[room] + + c.JSON(200, gin.H{ + "room": room, + "client_count": clientCount, + "clients": roomClients, + "timestamp": time.Now().Unix(), + }) + }) + + // ============================================================================= + // MONITORING & DEBUGGING + // ============================================================================= + + wsAPI.GET("/monitor", func(c *gin.Context) { + monitor := websocketHandler.GetMonitoringData() + c.JSON(200, monitor) + }) + + wsAPI.POST("/ping-client/:clientId", func(c *gin.Context) { + clientID := c.Param("clientId") + websocketHandler.SendToClient(clientID, "server_ping", map[string]interface{}{ + "message": "Ping from server", + "timestamp": time.Now().Unix(), + }) + c.JSON(200, gin.H{ + "status": "ping sent", + "client_id": clientID, + "timestamp": time.Now().Unix(), + }) + }) + + // Disconnect specific client + wsAPI.POST("/disconnect/:clientId", func(c *gin.Context) { + clientID := c.Param("clientId") + success := websocketHandler.DisconnectClient(clientID) + if success { + c.JSON(200, gin.H{ + "status": "client disconnected", + "client_id": clientID, + "timestamp": time.Now().Unix(), + }) + } else { + c.JSON(404, gin.H{ + "error": "client not found", + "client_id": clientID, + "timestamp": time.Now().Unix(), + }) + } + }) + + // ============================================================================= + // BULK OPERATIONS + // ============================================================================= + + // Broadcast to multiple clients + wsAPI.POST("/broadcast/bulk", func(c *gin.Context) { + var req struct { + ClientIDs []string `json:"client_ids"` + Type string `json:"type"` + Message interface{} `json:"message"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + successCount := 0 + for _, clientID := range req.ClientIDs { + websocketHandler.SendToClient(clientID, req.Type, req.Message) + successCount++ + } + + c.JSON(200, gin.H{ + "status": "bulk broadcast sent", + "total_clients": len(req.ClientIDs), + "success_count": successCount, + "timestamp": time.Now().Unix(), + }) + }) + + // Disconnect multiple clients + wsAPI.POST("/disconnect/bulk", func(c *gin.Context) { + var req struct { + ClientIDs []string `json:"client_ids"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": err.Error()}) + return + } + + successCount := 0 + for _, clientID := range req.ClientIDs { + if websocketHandler.DisconnectClient(clientID) { + successCount++ + } + } + + c.JSON(200, gin.H{ + "status": "bulk disconnect completed", + "total_clients": len(req.ClientIDs), + "success_count": successCount, + "timestamp": time.Now().Unix(), + }) + }) + } + + // ============================================================================= + // PUBLISHED ROUTES + // ============================================================================= // Participant eligibility information (peserta) routes pesertaHandler := pesertaHandlers.NewPesertaHandler(pesertaHandlers.PesertaHandlerConfig{ @@ -95,86 +681,106 @@ func RegisterRoutes(cfg *config.Config) *gin.Engine { pesertaGroup.GET("/nokartu/:nokartu", pesertaHandler.GetBynokartu) pesertaGroup.GET("/nik/:nik", pesertaHandler.GetBynik) - // // Rujukan management endpoints (rujukan) routes - // rujukanHandler := rujukanHandlers.NewRujukanHandler(rujukanHandlers.RujukanHandlerConfig{ - // BpjsConfig: cfg.Bpjs, - // Logger: *logger.Default(), - // Validator: validator.New(), - // }) - // rujukanGroup := v1.Group("/rujukan") - // rujukanGroup.POST("/Rujukan/:norujukan", rujukanHandler.CreateRujukan) - // rujukanGroup.PUT("/Rujukan/:norujukan", rujukanHandler.UpdateRujukan) - // rujukanGroup.DELETE("/Rujukan/:norujukan", rujukanHandler.DeleteRujukan) - // rujukanGroup.POST("/Rujukanbalik/:norujukan", rujukanHandler.CreateRujukanbalik) - // rujukanGroup.PUT("/Rujukanbalik/:norujukan", rujukanHandler.UpdateRujukanbalik) - // rujukanGroup.DELETE("/Rujukanbalik/:norujukan", rujukanHandler.DeleteRujukanbalik) - - // // Search for rujukan endpoints (search) routes - // searchHandler := rujukanHandlers.NewSearchHandler(rujukanHandlers.SearchHandlerConfig{ - // BpjsConfig: cfg.Bpjs, - // Logger: *logger.Default(), - // Validator: validator.New(), - // }) - // searchGroup := v1.Group("/search") - // searchGroup.GET("/bynorujukan/:norujukan", searchHandler.GetBynorujukan) - // searchGroup.GET("/bynokartu/:nokartu", searchHandler.GetBynokartu) - - // // Retribusi endpoints + // Retribusi endpoints with WebSocket notifications retribusiHandler := retribusiHandlers.NewRetribusiHandler() retribusiGroup := v1.Group("/retribusi") { retribusiGroup.GET("", retribusiHandler.GetRetribusi) - retribusiGroup.GET("/dynamic", retribusiHandler.GetRetribusiDynamic) // Route baru - retribusiGroup.GET("/search", retribusiHandler.SearchRetribusiAdvanced) // Route pencarian + retribusiGroup.GET("/dynamic", retribusiHandler.GetRetribusiDynamic) + retribusiGroup.GET("/search", retribusiHandler.SearchRetribusiAdvanced) retribusiGroup.GET("/id/:id", retribusiHandler.GetRetribusiByID) - retribusiGroup.POST("", retribusiHandler.CreateRetribusi) - retribusiGroup.PUT("/id/:id", retribusiHandler.UpdateRetribusi) - retribusiGroup.DELETE("/id/:id", retribusiHandler.DeleteRetribusi) + + // POST/PUT/DELETE with automatic WebSocket notifications + retribusiGroup.POST("", func(c *gin.Context) { + retribusiHandler.CreateRetribusi(c) + + // Trigger WebSocket notification after successful creation + if c.Writer.Status() == 200 || c.Writer.Status() == 201 { + websocketHandler.BroadcastMessage("retribusi_created", map[string]interface{}{ + "message": "New retribusi record created", + "timestamp": time.Now().Unix(), + }) + } + }) + + retribusiGroup.PUT("/id/:id", func(c *gin.Context) { + id := c.Param("id") + retribusiHandler.UpdateRetribusi(c) + + // Trigger WebSocket notification after successful update + if c.Writer.Status() == 200 { + websocketHandler.BroadcastMessage("retribusi_updated", map[string]interface{}{ + "message": "Retribusi record updated", + "id": id, + "timestamp": time.Now().Unix(), + }) + } + }) + + retribusiGroup.DELETE("/id/:id", func(c *gin.Context) { + id := c.Param("id") + retribusiHandler.DeleteRetribusi(c) + + // Trigger WebSocket notification after successful deletion + if c.Writer.Status() == 200 { + websocketHandler.BroadcastMessage("retribusi_deleted", map[string]interface{}{ + "message": "Retribusi record deleted", + "id": id, + "timestamp": time.Now().Unix(), + }) + } + }) } + // ============================================================================= // PROTECTED ROUTES (Authentication Required) // ============================================================================= - // Create protected group with configurable authentication protected := v1.Group("/") - protected.Use(middleware.ConfigurableAuthMiddleware(cfg)) // Use configurable authentication + protected.Use(middleware.ConfigurableAuthMiddleware(cfg)) - // User profile (protected) - // protected.GET("/auth/me", authHandler.Me) + // Protected WebSocket management (optional) + protectedWS := protected.Group("/ws-admin") + { + protectedWS.GET("/stats", func(c *gin.Context) { + detailedStats := websocketHandler.GetDetailedStats() + c.JSON(200, gin.H{ + "admin_stats": detailedStats, + "timestamp": time.Now().Unix(), + }) + }) - // // Retribusi endpoints (CRUD operations - should be protected) - // retribusiHandler := retribusiHandlers.NewRetribusiHandler() - // protectedRetribusi := protected.Group("/retribusi") - // { - // protectedRetribusi.GET("", retribusiHandler.GetRetribusi) // GET /api/v1/retribusi - // protectedRetribusi.GET("/:id", retribusiHandler.GetRetribusiByID) // GET /api/v1/retribusi/:id - // protectedRetribusi.POST("/", retribusiHandler.CreateRetribusi) // POST /api/v1/retribusi/ - // protectedRetribusi.PUT("/:id", retribusiHandler.UpdateRetribusi) // PUT /api/v1/retribusi/:id - // protectedRetribusi.DELETE("/:id", retribusiHandler.DeleteRetribusi) // DELETE /api/v1/retribusi/:id - // } + protectedWS.POST("/force-disconnect/:clientId", func(c *gin.Context) { + clientID := c.Param("clientId") + success := websocketHandler.DisconnectClient(clientID) + c.JSON(200, gin.H{ + "status": "force disconnect attempted", + "client_id": clientID, + "success": success, + "timestamp": time.Now().Unix(), + }) + }) - // // BPJS VClaim endpoints (require authentication) - // // Peserta routes - // pesertaHandler := peserta.NewVClaimHandler(peserta.VClaimHandlerConfig{ - // BpjsConfig: cfg.Bpjs, - // Logger: *logger.Default(), - // Validator: nil, - // }) - // protectedPeserta := protected.Group("/peserta") - // protectedPeserta.GET("/peserta/:nokartu", pesertaHandler.GetPesertaBynokartu) - // protectedPeserta.GET("/peserta/nik/:nik", pesertaHandler.GetPesertaBynik) + protectedWS.POST("/cleanup/force", func(c *gin.Context) { + var req struct { + InactiveMinutes int `json:"inactive_minutes"` + Force bool `json:"force"` + } + if err := c.ShouldBindJSON(&req); err != nil { + req.InactiveMinutes = 10 + req.Force = false + } - // // Sep routes - // sepHandler := sep.NewVClaimHandler(sep.VClaimHandlerConfig{ - // BpjsConfig: cfg.Bpjs, - // Logger: *logger.Default(), - // Validator: nil, - // }) - // protectedSep := protected.Group("/sep") - // protectedSep.GET("/sep/:nosep", sepHandler.GetSepSep) - // protectedSep.POST("/sep", sepHandler.CreateSepSep) - // protectedSep.PUT("/sep/:nosep", sepHandler.UpdateSepSep) - // protectedSep.DELETE("/sep/:nosep", sepHandler.DeleteSepSep) + cleanedCount := websocketHandler.CleanupInactiveClients(time.Duration(req.InactiveMinutes) * time.Minute) + c.JSON(200, gin.H{ + "status": "admin cleanup completed", + "cleaned_clients": cleanedCount, + "inactive_minutes": req.InactiveMinutes, + "force": req.Force, + "timestamp": time.Now().Unix(), + }) + }) + } return router }