package websocket import ( "api-service/internal/config" "context" "encoding/json" "fmt" "sync" "time" "github.com/google/uuid" "github.com/gorilla/websocket" ) // Client mewakili koneksi klien WebSocket type Client struct { // Identifikasi ID string StaticID string UserID string Room string // Koneksi IPAddress string Conn *websocket.Conn Send chan WebSocketMessage // Hub Hub *Hub // Context ctx context.Context cancel context.CancelFunc // Status lastPing time.Time lastPong time.Time connectedAt time.Time isActive bool // Sinkronisasi mu sync.RWMutex } // NewClient membuat klien baru func NewClient( id, staticID, userID, room, ipAddress string, conn *websocket.Conn, hub *Hub, config config.WebSocketConfig, ) *Client { ctx, cancel := context.WithCancel(hub.ctx) return &Client{ ID: id, StaticID: staticID, UserID: userID, Room: room, IPAddress: ipAddress, Conn: conn, Send: make(chan WebSocketMessage, config.ChannelBufferSize), Hub: hub, ctx: ctx, cancel: cancel, lastPing: time.Now(), connectedAt: time.Now(), isActive: true, } } // readPup menangani pembacaan pesan dari klien func (c *Client) readPump() { defer func() { c.Hub.unregister <- c c.Conn.Close() }() // Konfigurasi koneksi c.Conn.SetReadLimit(int64(c.Hub.config.MaxMessageSize)) c.resetReadDeadline() // Setup ping/pong handlers c.Conn.SetPingHandler(func(message string) error { c.resetReadDeadline() return c.Conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(c.Hub.config.WriteTimeout)) }) c.Conn.SetPongHandler(func(message string) error { c.mu.Lock() c.lastPong = time.Now() c.isActive = true c.mu.Unlock() c.resetReadDeadline() return nil }) for { select { case <-c.ctx.Done(): return default: _, message, err := c.Conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { c.Hub.errorCount++ } return } // Reset deadline setiap kali ada pesan masuk c.resetReadDeadline() c.updateLastActivity() // Parse pesan var msg WebSocketMessage if err := json.Unmarshal(message, &msg); err != nil { c.sendErrorResponse("Invalid message format", err.Error()) continue } // Set metadata pesan msg.Timestamp = time.Now() msg.ClientID = c.ID if msg.MessageID == "" { msg.MessageID = uuid.New().String() } // Proses pesan menggunakan registry c.handleMessage(msg) } } } // writePump menangani pengiriman pesan ke klien func (c *Client) writePump() { ticker := time.NewTicker(c.Hub.config.PingInterval) defer func() { ticker.Stop() c.Conn.Close() }() for { select { case message, ok := <-c.Send: c.Conn.SetWriteDeadline(time.Now().Add(c.Hub.config.WriteTimeout)) if !ok { c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) return } if err := c.Conn.WriteJSON(message); err != nil { c.Hub.errorCount++ return } case <-ticker.C: // Kirim ping dan periksa respons if err := c.sendPing(); err != nil { c.Hub.errorCount++ return } // Periksa timeout pong if c.isPongTimeout() { return } case <-c.ctx.Done(): return } } } // handleMessage memproses pesan masuk func (c *Client) handleMessage(msg WebSocketMessage) { // Dapatkan handler dari registry handler, exists := c.Hub.messageRegistry.GetHandler(msg.Type) if !exists { // Handler tidak ditemukan, gunakan handler default c.handleDefaultMessage(msg) return } // Jalankan handler if err := handler.HandleMessage(c, msg); err != nil { c.sendErrorResponse("Error handling message", err.Error()) c.Hub.errorCount++ } } // handleDefaultMessage menangani pesan tanpa handler khusus func (c *Client) handleDefaultMessage(msg WebSocketMessage) { switch msg.Type { case "broadcast": c.Hub.broadcast <- msg c.sendDirectResponse("broadcast_sent", "Message broadcasted to all clients") case "direct_message": c.handleDirectMessage(msg) case "ping": c.handlePing(msg) case "pong": // Pong sudah ditangani di level koneksi break case "heartbeat": c.handleHeartbeat(msg) case "connection_test": c.handleConnectionTest(msg) case "get_online_users": c.sendOnlineUsers() case "join_room": c.handleJoinRoom(msg) case "leave_room": c.handleLeaveRoom(msg) case "get_room_info": c.handleGetRoomInfo(msg) case "database_query", "db_query": c.handleDatabaseQuery(msg) case "db_insert": c.handleDatabaseInsert(msg) case "db_custom_query": c.handleDatabaseCustomQuery(msg) case "get_stats": c.handleGetStats(msg) case "get_server_stats": c.handleGetServerStats(msg) case "get_system_health": c.handleGetSystemHealth(msg) case "admin_kick_client": c.handleAdminKickClient(msg) case "admin_kill_server": c.handleAdminKillServer(msg) case "admin_clear_logs": c.handleAdminClearLogs(msg) default: c.sendDirectResponse("message_received", fmt.Sprintf("Message received: %v", msg.Data)) c.Hub.broadcast <- msg } } // Metode helper lainnya... func (c *Client) resetReadDeadline() { c.Conn.SetReadDeadline(time.Now().Add(c.Hub.config.ReadTimeout)) } func (c *Client) updateLastActivity() { c.mu.Lock() defer c.mu.Unlock() c.lastPing = time.Now() c.isActive = true } func (c *Client) sendPing() error { c.Conn.SetWriteDeadline(time.Now().Add(c.Hub.config.WriteTimeout)) if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { return err } c.mu.Lock() c.lastPing = time.Now() c.mu.Unlock() return nil } func (c *Client) isPongTimeout() bool { c.mu.RLock() defer c.mu.RUnlock() lastActivity := c.lastPong if lastActivity.IsZero() { lastActivity = c.lastPing } return time.Since(lastActivity) > c.Hub.config.PongTimeout } func (c *Client) isClientActive() bool { c.mu.RLock() defer c.mu.RUnlock() return c.isActive && time.Since(c.lastPing) < c.Hub.config.PongTimeout } func (c *Client) sendDirectResponse(messageType string, data interface{}) { response := NewWebSocketMessage(MessageType(messageType), data, c.ID, c.Room) select { case c.Send <- response: default: // Channel penuh, abaikan pesan } } func (c *Client) sendErrorResponse(error, details string) { c.sendDirectResponse("error", map[string]interface{}{ "error": error, "details": details, }) } func (c *Client) gracefulClose() { c.mu.Lock() c.isActive = false c.mu.Unlock() // Kirim pesan close c.Conn.SetWriteDeadline(time.Now().Add(c.Hub.config.WriteTimeout)) c.Conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) // Batalkan context c.cancel() } // ReadPump mengekspos metode readPump untuk handler func (c *Client) ReadPump() { c.readPump() } // WritePump mengekspos metode writePump untuk handler func (c *Client) WritePump() { c.writePump() } // handleDirectMessage menangani pesan direct message func (c *Client) handleDirectMessage(msg WebSocketMessage) { data, ok := msg.Data.(map[string]interface{}) if !ok { c.sendErrorResponse("Invalid direct message format", "Data must be an object") return } targetClientID, ok := data["target_client_id"].(string) if !ok || targetClientID == "" { c.sendErrorResponse("Invalid target client ID", "target_client_id is required") return } message, ok := data["message"].(string) if !ok || message == "" { c.sendErrorResponse("Invalid message content", "message is required") return } // Buat pesan direct message directMsg := NewWebSocketMessage(DirectMessage, map[string]interface{}{ "from_client_id": c.ID, "from_user_id": c.UserID, "message": message, "timestamp": msg.Timestamp, }, targetClientID, "") // Kirim ke target client select { case c.Hub.broadcast <- directMsg: c.sendDirectResponse("direct_message_sent", map[string]interface{}{ "target_client_id": targetClientID, "message": message, }) default: c.sendErrorResponse("Failed to send direct message", "Message queue is full") } } // handlePing menangani ping message func (c *Client) handlePing(msg WebSocketMessage) { // Kirim pong response pongMsg := NewWebSocketMessage(PongMessage, map[string]interface{}{ "timestamp": msg.Timestamp.Unix(), "client_id": c.ID, }, c.ID, "") select { case c.Send <- pongMsg: default: // Channel penuh, abaikan } } // handleHeartbeat menangani heartbeat message func (c *Client) handleHeartbeat(msg WebSocketMessage) { // Kirim heartbeat acknowledgment heartbeatAck := NewWebSocketMessage(MessageType("heartbeat_ack"), map[string]interface{}{ "timestamp": time.Now().Unix(), "client_uptime": time.Since(c.connectedAt).Seconds(), "server_uptime": time.Since(c.Hub.startTime).Seconds(), "client_id": c.ID, }, c.ID, "") select { case c.Send <- heartbeatAck: default: // Channel penuh, abaikan } } // handleConnectionTest menangani connection test func (c *Client) handleConnectionTest(msg WebSocketMessage) { // Kirim connection test result testResult := NewWebSocketMessage(MessageType("connection_test_result"), map[string]interface{}{ "timestamp": time.Now().Unix(), "client_id": c.ID, "connection_status": "healthy", "latency_ms": 0, // Could be calculated if ping timestamp is provided "uptime_seconds": time.Since(c.connectedAt).Seconds(), }, c.ID, "") select { case c.Send <- testResult: default: // Channel penuh, abaikan } } // handleJoinRoom menangani join room func (c *Client) handleJoinRoom(msg WebSocketMessage) { data, ok := msg.Data.(map[string]interface{}) if !ok { c.sendErrorResponse("Invalid join room format", "Data must be an object") return } roomName, ok := data["room"].(string) if !ok || roomName == "" { c.sendErrorResponse("Invalid room name", "room is required") return } // Update client room oldRoom := c.Room c.Room = roomName // Update hub room mapping c.Hub.mu.Lock() if oldRoom != "" { if roomClients, exists := c.Hub.rooms[oldRoom]; exists { delete(roomClients, c) if len(roomClients) == 0 { delete(c.Hub.rooms, oldRoom) } } } if c.Hub.rooms[roomName] == nil { c.Hub.rooms[roomName] = make(map[*Client]bool) } c.Hub.rooms[roomName][c] = true c.Hub.mu.Unlock() // Log activity c.Hub.logActivity("client_join_room", c.ID, fmt.Sprintf("Room: %s", roomName)) // Kirim response c.sendDirectResponse("room_joined", map[string]interface{}{ "room": roomName, "previous_room": oldRoom, }) } // handleLeaveRoom menangani leave room func (c *Client) handleLeaveRoom(msg WebSocketMessage) { oldRoom := c.Room if oldRoom == "" { c.sendErrorResponse("Not in any room", "Client is not currently in a room") return } // Update hub room mapping c.Hub.mu.Lock() if roomClients, exists := c.Hub.rooms[oldRoom]; exists { delete(roomClients, c) if len(roomClients) == 0 { delete(c.Hub.rooms, oldRoom) } } c.Hub.mu.Unlock() // Clear client room c.Room = "" // Log activity c.Hub.logActivity("client_leave_room", c.ID, fmt.Sprintf("Room: %s", oldRoom)) // Kirim response c.sendDirectResponse("room_left", map[string]interface{}{ "room": oldRoom, }) } // handleGetRoomInfo menangani get room info func (c *Client) handleGetRoomInfo(msg WebSocketMessage) { c.Hub.mu.RLock() defer c.Hub.mu.RUnlock() roomInfo := make(map[string]interface{}) // Info ruangan saat ini if c.Room != "" { if roomClients, exists := c.Hub.rooms[c.Room]; exists { clientIDs := make([]string, 0, len(roomClients)) for client := range roomClients { clientIDs = append(clientIDs, client.ID) } roomInfo["current_room"] = map[string]interface{}{ "name": c.Room, "client_count": len(roomClients), "clients": clientIDs, } } } // Info semua ruangan allRooms := make(map[string]int) for roomName, clients := range c.Hub.rooms { allRooms[roomName] = len(clients) } roomInfo["all_rooms"] = allRooms roomInfo["total_rooms"] = len(c.Hub.rooms) c.sendDirectResponse("room_info", roomInfo) } // handleDatabaseInsert menangani database insert func (c *Client) handleDatabaseInsert(msg WebSocketMessage) { data, ok := msg.Data.(map[string]interface{}) if !ok { c.sendErrorResponse("Invalid database insert format", "Data must be an object") return } table, ok := data["table"].(string) if !ok || table == "" { c.sendErrorResponse("Invalid table name", "table is required") return } insertData, ok := data["data"].(map[string]interface{}) if !ok { c.sendErrorResponse("Invalid insert data", "data must be an object") return } // For now, just acknowledge the insert request // In a real implementation, you would perform the actual database insert c.sendDirectResponse("db_insert_result", map[string]interface{}{ "table": table, "status": "acknowledged", "message": "Insert request received (not implemented)", "data": insertData, // Use the variable to avoid unused error }) } // handleDatabaseCustomQuery menangani custom database query func (c *Client) handleDatabaseCustomQuery(msg WebSocketMessage) { data, ok := msg.Data.(map[string]interface{}) if !ok { c.sendErrorResponse("Invalid database query format", "Data must be an object") return } database, ok := data["database"].(string) if !ok || database == "" { database = "default" } query, ok := data["query"].(string) if !ok || query == "" { c.sendErrorResponse("Invalid query", "query is required") return } // For now, just acknowledge the query request // In a real implementation, you would execute the query c.sendDirectResponse("db_query_result", map[string]interface{}{ "database": database, "query": query, "status": "acknowledged", "message": "Query request received (not implemented)", }) } // handleGetStats menangani get stats func (c *Client) handleGetStats(msg WebSocketMessage) { stats := c.Hub.GetStats() c.sendDirectResponse("stats", stats) } // handleGetServerStats menangani get server stats func (c *Client) handleGetServerStats(msg WebSocketMessage) { // Create monitoring manager instance monitoringManager := NewMonitoringManager(c.Hub) detailedStats := monitoringManager.GetDetailedStats() c.sendDirectResponse("server_stats", detailedStats) } // handleGetSystemHealth menangani get system health func (c *Client) handleGetSystemHealth(msg WebSocketMessage) { systemHealth := make(map[string]interface{}) if c.Hub.dbService != nil { systemHealth["databases"] = c.Hub.dbService.Health() systemHealth["available_dbs"] = c.Hub.dbService.ListDBs() } systemHealth["websocket_status"] = "healthy" systemHealth["uptime_seconds"] = time.Since(c.Hub.startTime).Seconds() c.sendDirectResponse("system_health", systemHealth) } // handleAdminKickClient menangani admin kick client func (c *Client) handleAdminKickClient(msg WebSocketMessage) { // This should be protected by authentication, but for now just acknowledge data, ok := msg.Data.(map[string]interface{}) if !ok { c.sendErrorResponse("Invalid admin command format", "Data must be an object") return } targetClientID, ok := data["client_id"].(string) if !ok || targetClientID == "" { c.sendErrorResponse("Invalid target client ID", "client_id is required") return } // For now, just acknowledge the kick request // In a real implementation, you would check admin permissions and kick the client c.sendDirectResponse("admin_command_result", map[string]interface{}{ "command": "kick_client", "target_client_id": targetClientID, "status": "acknowledged", "message": "Kick request received (not implemented)", }) } // handleAdminKillServer menangani admin kill server func (c *Client) handleAdminKillServer(msg WebSocketMessage) { // This should be protected by authentication, but for now just acknowledge c.sendDirectResponse("admin_command_result", map[string]interface{}{ "command": "kill_server", "status": "acknowledged", "message": "Kill server request received (not implemented - would require admin auth)", }) } // handleAdminClearLogs menangani admin clear logs func (c *Client) handleAdminClearLogs(msg WebSocketMessage) { // This should be protected by authentication, but for now just acknowledge c.sendDirectResponse("admin_command_result", map[string]interface{}{ "command": "clear_logs", "status": "acknowledged", "message": "Clear logs request received (not implemented - would require admin auth)", }) } // Implementasi metode lainnya... func (c *Client) handleDatabaseQuery(msg WebSocketMessage) { // Implementasi yang sama seperti sebelumnya data, ok := msg.Data.(map[string]interface{}) if !ok { c.sendErrorResponse("Invalid database query format", "Data must be an object") return } table, ok := data["table"].(string) if !ok || table == "" { c.sendErrorResponse("Invalid table name", "table is required") return } // For now, just acknowledge the query request c.sendDirectResponse("db_query_result", map[string]interface{}{ "table": table, "status": "acknowledged", "message": "Query request received (not implemented)", }) } func (c *Client) sendOnlineUsers() { c.Hub.mu.RLock() defer c.Hub.mu.RUnlock() users := make([]map[string]interface{}, 0, len(c.Hub.clients)) for client := range c.Hub.clients { user := map[string]interface{}{ "id": client.ID, "static_id": client.StaticID, "user_id": client.UserID, "room": client.Room, "ip_address": client.IPAddress, "connected_at": client.connectedAt, "last_ping": client.lastPing, "last_pong": client.lastPong, "is_active": client.isClientActive(), } users = append(users, user) } response := NewWebSocketMessage(OnlineUsersMessage, map[string]interface{}{ "users": users, "count": len(users), }, c.ID, "") select { case c.Send <- response: default: // Channel penuh, abaikan } }