Files
antrean-anjungan/internal/handlers/websocket/websocket.go
2025-09-21 17:20:14 +07:00

1622 lines
42 KiB
Go

package websocket
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"sync"
"time"
"api-service/internal/config"
"api-service/internal/database"
"api-service/pkg/logger"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
// Tunables for WebSocket connections: I/O deadlines, keep-alive cadence,
// buffer sizes and message limits.
const (
// Timeout configuration (lengthened for stability).
ReadTimeout = 300 * time.Second // 5 minutes (extended from 60 seconds)
WriteTimeout = 30 * time.Second // 30 seconds for write operations
PingInterval = 60 * time.Second // 1 minute between pings (more conservative)
PongTimeout = 70 * time.Second // 70 seconds to wait for a pong response
// Buffer sizes.
ReadBufferSize = 8192 // Enlarged for big messages
WriteBufferSize = 8192 // Enlarged for big messages
ChannelBufferSize = 512 // Buffer for communication channels
// Connection limits.
MaxMessageSize = 8192 // Maximum message size (passed to Conn.SetReadLimit in readPump)
HandshakeTimeout = 45 * time.Second // Upgrade handshake timeout given to the upgrader
)
// WebSocketMessage is the JSON envelope exchanged with clients and passed
// through the hub's channels.
type WebSocketMessage struct {
// Type selects the handler on inbound messages (see Client.handleMessage)
// and names the event on outbound ones.
Type string `json:"type"`
// Data is the free-form payload; handlers type-assert it as needed.
Data interface{} `json:"data"`
// Timestamp is set by the server when the message is created or received.
Timestamp time.Time `json:"timestamp"`
// ClientID, when non-empty on a message given to the hub, makes the hub
// deliver it only to the client with that ID. readPump also stamps every
// inbound message with the sender's ID.
ClientID string `json:"client_id,omitempty"`
// MessageID identifies the message; a UUID is generated when absent.
MessageID string `json:"message_id,omitempty"`
}
// Client is one live WebSocket connection together with its identity,
// routing information and liveness bookkeeping.
type Client struct {
ID string // Connection identifier; HandleWebSocket sets it equal to StaticID
StaticID string // Static ID for persistent identification
IPAddress string // Client IP address
Conn *websocket.Conn // Underlying gorilla/websocket connection
Send chan WebSocketMessage // Outbound queue drained by writePump; closed by Hub.Run on unregister
Hub *Hub // Hub this client is registered with
UserID string // "user_id" query parameter, or "anonymous"
Room string // "room" query parameter, or "default"
ctx context.Context // Derived from the hub context; cancelled when the client goes away
cancel context.CancelFunc // Cancels ctx
lastPing time.Time // Updated on every inbound message and every ping sent (guarded by mu)
lastPong time.Time // Updated by the pong handler (guarded by mu)
connectedAt time.Time // Time the connection was accepted
mu sync.RWMutex // Guards lastPing, lastPong and isActive
isActive bool // Set on inbound activity/pong, cleared by gracefulClose (guarded by mu)
}
// ClientInfo is the JSON-serialisable snapshot of a Client that is returned
// to callers and to other clients.
type ClientInfo struct {
ID string `json:"id"`
StaticID string `json:"static_id"`
IPAddress string `json:"ip_address"`
UserID string `json:"user_id"`
Room string `json:"room"`
ConnectedAt time.Time `json:"connected_at"`
LastPing time.Time `json:"last_ping"`
LastPong time.Time `json:"last_pong"`
IsActive bool `json:"is_active"`
}
// DetailedStats is a point-in-time summary of the hub's connections, rooms
// and message queue, as produced by GetDetailedStats.
type DetailedStats struct {
ConnectedClients int `json:"connected_clients"` // Number of registered clients
UniqueIPs int `json:"unique_ips"` // Number of distinct client IP addresses
StaticClients int `json:"static_clients"` // Number of entries in the static-ID index
ActiveRooms int `json:"active_rooms"` // Number of rooms with at least one member
IPDistribution map[string]int `json:"ip_distribution"` // Client count per IP address
RoomDistribution map[string]int `json:"room_distribution"` // Client count per room
MessageQueueSize int `json:"message_queue_size"` // Messages currently waiting in the queue
QueueWorkers int `json:"queue_workers"` // Configured number of queue workers
// NOTE(review): time.Duration is encoded by encoding/json as an integer
// number of nanoseconds, although the JSON key says "uptime_seconds" —
// confirm what consumers expect.
Uptime time.Duration `json:"uptime_seconds"`
Timestamp int64 `json:"timestamp"` // Unix seconds at which the snapshot was taken
}
// MonitoringData bundles everything returned by GetMonitoringData:
// connection statistics, recent activity, health information and
// performance figures.
type MonitoringData struct {
Stats DetailedStats `json:"stats"`
RecentActivity []ActivityLog `json:"recent_activity"`
SystemHealth map[string]interface{} `json:"system_health"`
Performance PerformanceMetrics `json:"performance"`
}
// ActivityLog is a single entry in the hub's activity history
// (for example "client_connected" or "client_disconnected").
type ActivityLog struct {
Timestamp time.Time `json:"timestamp"` // When the event was recorded
Event string `json:"event"` // Event name
ClientID string `json:"client_id"` // Client the event refers to
Details string `json:"details"` // Free-form description
}
// PerformanceMetrics holds the throughput and error figures reported
// through MonitoringData. Units follow the JSON key names.
type PerformanceMetrics struct {
MessagesPerSecond float64 `json:"messages_per_second"`
AverageLatency float64 `json:"average_latency_ms"`
ErrorRate float64 `json:"error_rate_percent"`
MemoryUsage int64 `json:"memory_usage_bytes"`
}
// Hub owns the set of connected clients and their lookup indexes, and routes
// messages between them. The maps are guarded by mu; registration and
// unregistration are serialised through the register/unregister channels
// handled in Run.
type Hub struct {
clients map[*Client]bool // All registered clients
clientsByID map[string]*Client // Track clients by ID
clientsByIP map[string][]*Client // Track clients by IP
clientsByStatic map[string]*Client // Track clients by static ID
broadcast chan WebSocketMessage // Messages to be routed by Run
register chan *Client // Clients waiting to be registered
unregister chan *Client // Clients waiting to be removed
rooms map[string]map[*Client]bool // Room name -> members
mu sync.RWMutex // Guards clients, the clientsBy* indexes and rooms
ctx context.Context // Cancelled on shutdown; parent of every client context
cancel context.CancelFunc // Cancels ctx
dbService database.Service // Database service used by client queries
messageQueue chan WebSocketMessage // Server-originated messages awaiting a queue worker
queueWorkers int // Number of workers draining messageQueue
// Monitoring fields
startTime time.Time // Time the hub was created
messageCount int64 // Incremented in Run for every message taken from broadcast
errorCount int64 // Error counter read when computing the error rate
activityLog []ActivityLog // Recent activity entries
activityMu sync.RWMutex // Guards activityLog
}
// WebSocketHandler is the entry point used by the HTTP layer: it upgrades
// requests to WebSocket connections and exposes the hub's broadcast and
// inspection helpers.
type WebSocketHandler struct {
hub *Hub // Hub that tracks clients and routes messages
logger *logger.Logger // Logger used by handler methods
upgrader websocket.Upgrader // Performs the HTTP -> WebSocket upgrade
config *config.Config // Application configuration passed to the constructor
dbService database.Service // Database service used for health checks and notifications
primaryDB string // Name of the database used for change notifications
}
// NewWebSocketHandler creates a handler with a fresh hub and starts the
// background goroutines it depends on: the hub loop, the database change
// listener, the message-queue workers and the connection monitor.
//
// The upgrader accepts every Origin, so origin checks must be enforced
// elsewhere if they are required.
func NewWebSocketHandler(cfg *config.Config, dbService database.Service) *WebSocketHandler {
	hubCtx, stopHub := context.WithCancel(context.Background())

	upgrader := websocket.Upgrader{
		CheckOrigin:       func(*http.Request) bool { return true },
		ReadBufferSize:    ReadBufferSize,
		WriteBufferSize:   WriteBufferSize,
		HandshakeTimeout:  HandshakeTimeout,
		EnableCompression: true,
	}

	h := &WebSocketHandler{
		hub: &Hub{
			clients:         make(map[*Client]bool),
			clientsByID:     make(map[string]*Client),
			clientsByIP:     make(map[string][]*Client),
			clientsByStatic: make(map[string]*Client),
			broadcast:       make(chan WebSocketMessage, 1000),
			register:        make(chan *Client),
			unregister:      make(chan *Client),
			rooms:           make(map[string]map[*Client]bool),
			ctx:             hubCtx,
			cancel:          stopHub,
			dbService:       dbService,
			messageQueue:    make(chan WebSocketMessage, 5000),
			queueWorkers:    10,
			startTime:       time.Now(),
			activityLog:     make([]ActivityLog, 0, 1000), // room for the last 1000 activities
		},
		logger:    logger.Default(),
		config:    cfg,
		dbService: dbService,
		primaryDB: "default",
		upgrader:  upgrader,
	}

	// Background services; the server broadcasters are intentionally not started.
	go h.hub.Run()
	go h.StartDatabaseListener()
	// go h.StartServerBroadcasters()
	go h.StartMessageQueue()
	go h.StartConnectionMonitor()

	return h
}
// getClientIP returns the best-effort client address for the request.
//
// Resolution order:
//  1. the first entry of X-Forwarded-For (proxy / load balancer),
//  2. X-Real-IP (nginx proxy),
//  3. the host part of the connection's RemoteAddr.
//
// Blank header values are skipped instead of being returned as an empty
// address. Both headers are supplied by the peer, so the result is only
// trustworthy when a trusted proxy overwrites them.
func getClientIP(c *gin.Context) string {
	if xff := c.GetHeader("X-Forwarded-For"); xff != "" {
		// strings.Split always yields at least one element, so index 0 is safe.
		if first := strings.TrimSpace(strings.Split(xff, ",")[0]); first != "" {
			return first
		}
	}

	if xri := strings.TrimSpace(c.GetHeader("X-Real-IP")); xri != "" {
		return xri
	}

	host, _, err := net.SplitHostPort(c.Request.RemoteAddr)
	if err != nil {
		// RemoteAddr without a port: use it verbatim.
		return c.Request.RemoteAddr
	}
	return host
}
// generateClientID returns staticID unchanged when one is supplied.
// Otherwise it derives "client_" followed by 16 hex characters from the IP
// address, the user ID and the current hour, so the same IP/user pair gets
// the same ID within one clock hour.
func generateClientID(ipAddress, staticID, userID string) string {
	if staticID != "" {
		return staticID
	}

	hourBucket := time.Now().Unix() / 3600
	seed := fmt.Sprintf("%s:%s:%d", ipAddress, userID, hourBucket)
	digest := sha256.Sum256([]byte(seed))

	// Only the first 8 bytes of the digest are used.
	shortHex := hex.EncodeToString(digest[:8])
	return fmt.Sprintf("client_%s", shortHex)
}
// generateIPBasedID derives a stable identifier from an IP address:
// "ip_" followed by the hex encoding of the first 6 bytes of its SHA-256.
func generateIPBasedID(ipAddress string) string {
	digest := sha256.Sum256([]byte(ipAddress))
	shortHex := hex.EncodeToString(digest[:6])
	return fmt.Sprintf("ip_%s", shortHex)
}
// HandleWebSocket upgrades the request to a WebSocket connection, derives the
// client's identity from the query string and registers it with the hub.
//
// Query parameters:
//   - user_id:   user identifier (default "anonymous")
//   - room:      room to join (default "default")
//   - static_id: optional caller-chosen persistent ID
//   - ip_based:  "true" to derive the ID from the client IP
//
// A client already registered under the same ID is disconnected first.
// The welcome message is queued before registration so that it is always the
// first message the client receives, and registration gives up (closing the
// connection) if the hub has already been shut down instead of blocking
// forever on the register channel.
func (h *WebSocketHandler) HandleWebSocket(c *gin.Context) {
	conn, err := h.upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		h.logger.Error(fmt.Sprintf("Failed to upgrade connection: %v", err))
		return
	}

	// Connection parameters.
	userID := c.Query("user_id")
	if userID == "" {
		userID = "anonymous"
	}
	room := c.Query("room")
	if room == "" {
		room = "default"
	}
	staticID := c.Query("static_id")    // optional static ID
	useIPBasedID := c.Query("ip_based") // use IP-based ID if "true"
	ipAddress := getClientIP(c)

	// Choose the ID generation strategy.
	var clientID string
	switch {
	case useIPBasedID == "true":
		clientID = generateIPBasedID(ipAddress)
		staticID = clientID
	case staticID != "":
		clientID = staticID
	default:
		clientID = generateClientID(ipAddress, staticID, userID)
	}

	// A reconnecting client replaces the previous connection with the same ID.
	h.hub.mu.Lock()
	if existingClient, exists := h.hub.clientsByStatic[clientID]; exists {
		h.logger.Info(fmt.Sprintf("Disconnecting existing client %s for reconnection", clientID))
		existingClient.cancel()
		existingClient.Conn.Close()
		delete(h.hub.clientsByStatic, clientID)
	}
	h.hub.mu.Unlock()

	ctx, cancel := context.WithCancel(h.hub.ctx)
	now := time.Now()
	client := &Client{
		ID:          clientID,
		StaticID:    clientID,
		IPAddress:   ipAddress,
		Conn:        conn,
		Send:        make(chan WebSocketMessage, 256),
		Hub:         h.hub,
		UserID:      userID,
		Room:        room,
		ctx:         ctx,
		cancel:      cancel,
		lastPing:    now,
		connectedAt: now,
	}

	// Send is freshly created and buffered, so this cannot block. Queuing it
	// before registration keeps hub broadcasts from overtaking the welcome.
	client.Send <- WebSocketMessage{
		Type: "welcome",
		Data: map[string]interface{}{
			"message":      "Connected to WebSocket server",
			"client_id":    client.ID,
			"static_id":    client.StaticID,
			"ip_address":   client.IPAddress,
			"room":         client.Room,
			"user_id":      client.UserID,
			"connected_at": client.connectedAt.Unix(),
			"id_type":      h.getIDType(useIPBasedID, staticID),
		},
		Timestamp: time.Now(),
		MessageID: uuid.New().String(),
	}

	select {
	case h.hub.register <- client:
	case <-h.hub.ctx.Done():
		// Hub loop is gone; nobody will ever receive from register.
		cancel()
		conn.Close()
		return
	}

	go client.writePump()
	go client.readPump()
}
// getIDType names the strategy that produced a client's ID:
// "ip_based" when useIPBased is "true", "static" when a static ID was
// supplied, and "generated" otherwise.
func (h *WebSocketHandler) getIDType(useIPBased, staticID string) string {
	switch {
	case useIPBased == "true":
		return "ip_based"
	case staticID != "":
		return "static"
	default:
		return "generated"
	}
}
// Run is the hub's event loop. It serialises client registration and
// removal, routes broadcast messages, and returns when the hub context is
// cancelled.
//
// On unregister, the ID and static-ID indexes are only cleared when they
// still point at the departing client. A reconnecting client reuses the same
// ID, so the old connection's late unregister must not remove the entries of
// the connection that replaced it.
func (h *Hub) Run() {
	for {
		select {
		case client := <-h.register:
			h.mu.Lock()
			h.clients[client] = true
			h.clientsByID[client.ID] = client
			if client.StaticID != "" {
				h.clientsByStatic[client.StaticID] = client
			}
			h.clientsByIP[client.IPAddress] = append(h.clientsByIP[client.IPAddress], client)
			if client.Room != "" {
				if h.rooms[client.Room] == nil {
					h.rooms[client.Room] = make(map[*Client]bool)
				}
				h.rooms[client.Room][client] = true
			}
			h.mu.Unlock()

			h.logActivity("client_connected", client.ID,
				fmt.Sprintf("IP: %s, Static: %s, Room: %s", client.IPAddress, client.StaticID, client.Room))
			logger.Info(fmt.Sprintf("Client %s (Static: %s, IP: %s) connected to room %s",
				client.ID, client.StaticID, client.IPAddress, client.Room))

		case client := <-h.unregister:
			h.mu.Lock()
			// The membership check makes repeated unregisters harmless and
			// guarantees Send is closed exactly once.
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.Send)

				if h.clientsByID[client.ID] == client {
					delete(h.clientsByID, client.ID)
				}
				if client.StaticID != "" && h.clientsByStatic[client.StaticID] == client {
					delete(h.clientsByStatic, client.StaticID)
				}

				if ipClients, exists := h.clientsByIP[client.IPAddress]; exists {
					// Build a fresh slice rather than shifting in place so
					// slices handed out earlier are not modified.
					remaining := make([]*Client, 0, len(ipClients))
					for _, other := range ipClients {
						if other != client {
							remaining = append(remaining, other)
						}
					}
					if len(remaining) == 0 {
						delete(h.clientsByIP, client.IPAddress)
					} else {
						h.clientsByIP[client.IPAddress] = remaining
					}
				}

				if client.Room != "" {
					if room, exists := h.rooms[client.Room]; exists {
						delete(room, client)
						if len(room) == 0 {
							delete(h.rooms, client.Room)
						}
					}
				}
			}
			h.mu.Unlock()

			h.logActivity("client_disconnected", client.ID,
				fmt.Sprintf("IP: %s, Duration: %v", client.IPAddress, time.Since(client.connectedAt)))
			client.cancel()
			logger.Info(fmt.Sprintf("Client %s (IP: %s) disconnected", client.ID, client.IPAddress))

		case message := <-h.broadcast:
			h.messageCount++
			h.broadcastToClients(message)

		case <-h.ctx.Done():
			return
		}
	}
}
// handleMessage dispatches one inbound client message by its Type.
//
// Messages forwarded to the hub for broadcasting have their ClientID cleared
// first: readPump stamps every inbound message with the sender's ID, and the
// hub treats a non-empty ClientID as "deliver only to this client", so
// leaving it set would echo the message back to the sender instead of
// broadcasting it.
func (c *Client) handleMessage(msg WebSocketMessage) {
	switch msg.Type {
	case "ping":
		// Answer the client's ping with extended information.
		c.sendDirectResponse("pong", map[string]interface{}{
			"message":     "Server is alive",
			"timestamp":   time.Now().Unix(),
			"client_id":   c.ID,
			"static_id":   c.StaticID,
			"server_time": time.Now().Format(time.RFC3339),
			"uptime":      time.Since(c.connectedAt).Seconds(),
		})

	case "heartbeat":
		c.sendDirectResponse("heartbeat_ack", map[string]interface{}{
			"client_id": c.ID,
			"timestamp": time.Now().Unix(),
			"status":    "alive",
		})

	case "connection_test":
		// lastPing/lastPong are written by other goroutines under c.mu.
		c.mu.RLock()
		lastPing, lastPong := c.lastPing, c.lastPong
		c.mu.RUnlock()
		c.sendDirectResponse("connection_test_result", map[string]interface{}{
			"latency_ms":    0, // can be measured if needed
			"connection_id": c.ID,
			"is_active":     c.isClientActive(),
			"last_ping":     lastPing.Unix(),
			"last_pong":     lastPong.Unix(),
		})

	case "get_server_info":
		c.Hub.mu.RLock()
		connectedClients := len(c.Hub.clients)
		roomsCount := len(c.Hub.rooms)
		uniqueIPs := len(c.Hub.clientsByIP)
		c.Hub.mu.RUnlock()
		c.sendDirectResponse("server_info", map[string]interface{}{
			"connected_clients": connectedClients,
			"rooms_count":       roomsCount,
			"unique_ips":        uniqueIPs,
			"your_info": map[string]interface{}{
				"client_id":    c.ID,
				"static_id":    c.StaticID,
				"ip_address":   c.IPAddress,
				"user_id":      c.UserID,
				"room":         c.Room,
				"connected_at": c.connectedAt.Unix(),
			},
		})

	case "get_clients_by_ip":
		c.handleGetClientsByIP(msg)
	case "get_client_info":
		c.handleGetClientInfo(msg)
	case "direct_message":
		c.handleDirectMessage(msg)
	case "room_message":
		c.handleRoomMessage(msg)

	case "broadcast":
		outbound := msg
		outbound.ClientID = "" // empty target => hub fans out
		c.Hub.broadcast <- outbound
		c.sendDirectResponse("broadcast_sent", "Message broadcasted to all clients")

	case "get_online_users":
		c.sendOnlineUsers()
	case "database_query":
		c.handleDatabaseQuery(msg)

	default:
		// Unknown types are acknowledged and relayed to the other clients.
		c.sendDirectResponse("message_received", fmt.Sprintf("Message received: %v", msg.Data))
		outbound := msg
		outbound.ClientID = ""
		c.Hub.broadcast <- outbound
	}
}
// handleGetClientsByIP answers a "get_clients_by_ip" request with the clients
// connected from data["ip_address"], defaulting to the requester's own IP
// when the field is missing or not a string.
//
// Each client's liveness fields are copied under that client's mutex because
// they are updated concurrently by its read and write pumps. The "clients"
// field is always a JSON array, never null.
func (c *Client) handleGetClientsByIP(msg WebSocketMessage) {
	data, ok := msg.Data.(map[string]interface{})
	if !ok {
		c.sendErrorResponse("Invalid request format", "Expected object with ip_address")
		return
	}

	targetIP, exists := data["ip_address"].(string)
	if !exists {
		targetIP = c.IPAddress
	}

	c.Hub.mu.RLock()
	ipClients := c.Hub.clientsByIP[targetIP]
	clientInfos := make([]ClientInfo, 0, len(ipClients))
	for _, client := range ipClients {
		client.mu.RLock()
		lastPing, lastPong, active := client.lastPing, client.lastPong, client.isActive
		client.mu.RUnlock()

		clientInfos = append(clientInfos, ClientInfo{
			ID:          client.ID,
			StaticID:    client.StaticID,
			IPAddress:   client.IPAddress,
			UserID:      client.UserID,
			Room:        client.Room,
			ConnectedAt: client.connectedAt,
			LastPing:    lastPing,
			LastPong:    lastPong,
			IsActive:    active,
		})
	}
	c.Hub.mu.RUnlock()

	c.sendDirectResponse("clients_by_ip", map[string]interface{}{
		"ip_address": targetIP,
		"clients":    clientInfos,
		"count":      len(clientInfos),
	})
}
// handleGetClientInfo answers a "get_client_info" request. The target is
// looked up by data["client_id"] when that is a string, otherwise by
// data["static_id"]; an error response is sent when no client matches.
//
// The target's liveness fields are copied under its mutex because they are
// updated concurrently by its read and write pumps.
func (c *Client) handleGetClientInfo(msg WebSocketMessage) {
	data, ok := msg.Data.(map[string]interface{})
	if !ok {
		c.sendErrorResponse("Invalid request format", "Expected object with client_id or static_id")
		return
	}

	var targetClient *Client
	if clientID, exists := data["client_id"].(string); exists {
		c.Hub.mu.RLock()
		targetClient = c.Hub.clientsByID[clientID]
		c.Hub.mu.RUnlock()
	} else if staticID, exists := data["static_id"].(string); exists {
		c.Hub.mu.RLock()
		targetClient = c.Hub.clientsByStatic[staticID]
		c.Hub.mu.RUnlock()
	}

	if targetClient == nil {
		c.sendErrorResponse("Client not found", "No client found with the specified ID")
		return
	}

	targetClient.mu.RLock()
	lastPing, lastPong, active := targetClient.lastPing, targetClient.lastPong, targetClient.isActive
	targetClient.mu.RUnlock()

	c.sendDirectResponse("client_info", ClientInfo{
		ID:          targetClient.ID,
		StaticID:    targetClient.StaticID,
		IPAddress:   targetClient.IPAddress,
		UserID:      targetClient.UserID,
		Room:        targetClient.Room,
		ConnectedAt: targetClient.connectedAt,
		LastPing:    lastPing,
		LastPong:    lastPong,
		IsActive:    active,
	})
}
// sendOnlineUsers replies with an "online_users" message listing every
// registered client together with per-IP connection counts.
//
// Each client's lastPing is read under that client's mutex because it is
// updated concurrently by the client's own goroutines.
func (c *Client) sendOnlineUsers() {
	c.Hub.mu.RLock()
	onlineUsers := make([]map[string]interface{}, 0, len(c.Hub.clients))
	ipStats := make(map[string]int)
	for client := range c.Hub.clients {
		client.mu.RLock()
		lastPing := client.lastPing
		client.mu.RUnlock()

		onlineUsers = append(onlineUsers, map[string]interface{}{
			"client_id":    client.ID,
			"static_id":    client.StaticID,
			"user_id":      client.UserID,
			"room":         client.Room,
			"ip_address":   client.IPAddress,
			"connected_at": client.connectedAt.Unix(),
			"last_ping":    lastPing.Unix(),
		})
		ipStats[client.IPAddress]++
	}
	c.Hub.mu.RUnlock()

	c.sendDirectResponse("online_users", map[string]interface{}{
		"users":      onlineUsers,
		"total":      len(onlineUsers),
		"ip_stats":   ipStats,
		"unique_ips": len(ipStats),
	})
}
// handleDatabaseQuery serves "database_query" requests. data["query_type"]
// selects the operation:
//
//   - "health_check":         database health report
//   - "database_list":        configured database names
//   - "connection_stats":     hub connection counters (no database access)
//   - "trigger_notification": publish data["payload"] on data["channel"]
//
// Operations that need the database answer with an error response when no
// database service is configured, rather than panicking on a nil service.
func (c *Client) handleDatabaseQuery(msg WebSocketMessage) {
	data, ok := msg.Data.(map[string]interface{})
	if !ok {
		c.sendErrorResponse("Invalid query format", "Expected object with query parameters")
		return
	}

	queryType, exists := data["query_type"].(string)
	if !exists {
		c.sendErrorResponse("Missing query_type", "query_type is required")
		return
	}

	db := c.Hub.dbService
	switch queryType {
	case "health_check", "database_list", "trigger_notification":
		if db == nil {
			c.sendErrorResponse("Database unavailable", "database service is not configured")
			return
		}
	}

	switch queryType {
	case "health_check":
		c.sendDirectResponse("query_result", map[string]interface{}{
			"type":   "health_check",
			"result": db.Health(),
		})

	case "database_list":
		c.sendDirectResponse("query_result", map[string]interface{}{
			"type":      "database_list",
			"databases": db.ListDBs(),
		})

	case "connection_stats":
		c.Hub.mu.RLock()
		stats := map[string]interface{}{
			"total_clients":  len(c.Hub.clients),
			"unique_ips":     len(c.Hub.clientsByIP),
			"static_clients": len(c.Hub.clientsByStatic),
			"rooms":          len(c.Hub.rooms),
		}
		c.Hub.mu.RUnlock()
		c.sendDirectResponse("query_result", map[string]interface{}{
			"type":   "connection_stats",
			"result": stats,
		})

	case "trigger_notification":
		channel, channelExists := data["channel"].(string)
		payload, payloadExists := data["payload"].(string)
		if !channelExists || !payloadExists {
			c.sendErrorResponse("Missing Parameters", "channel and payload required")
			return
		}
		if err := db.NotifyChange("default", channel, payload); err != nil {
			c.sendErrorResponse("Notification Error", err.Error())
			return
		}
		c.sendDirectResponse("query_result", map[string]interface{}{
			"type":    "notification_sent",
			"channel": channel,
			"payload": payload,
		})

	default:
		c.sendErrorResponse("Unsupported query", fmt.Sprintf("Query type '%s' not supported", queryType))
	}
}
// GetClientByID returns the client registered under clientID, or nil when
// there is none.
func (h *WebSocketHandler) GetClientByID(clientID string) *Client {
	hub := h.hub
	hub.mu.RLock()
	found := hub.clientsByID[clientID]
	hub.mu.RUnlock()
	return found
}
// GetClientByStaticID returns the client registered under staticID, or nil
// when there is none.
func (h *WebSocketHandler) GetClientByStaticID(staticID string) *Client {
	hub := h.hub
	hub.mu.RLock()
	found := hub.clientsByStatic[staticID]
	hub.mu.RUnlock()
	return found
}
// GetClientsByIP returns the clients currently connected from ipAddress, or
// nil when there are none.
//
// The result is a copy: callers iterate it after the hub lock has been
// released, so it must not share storage with the hub's own index, which
// keeps changing as clients connect and disconnect.
func (h *WebSocketHandler) GetClientsByIP(ipAddress string) []*Client {
	h.hub.mu.RLock()
	defer h.hub.mu.RUnlock()

	current := h.hub.clientsByIP[ipAddress]
	if len(current) == 0 {
		return nil
	}
	snapshot := make([]*Client, len(current))
	copy(snapshot, current)
	return snapshot
}
// SendToClientByStaticID queues a message addressed to the client registered
// under staticID. It reports false when no such client exists or when the
// message queue is full, and true once the message has been queued.
func (h *WebSocketHandler) SendToClientByStaticID(staticID string, messageType string, data interface{}) bool {
	target := h.GetClientByStaticID(staticID)
	if target == nil {
		return false
	}

	outbound := WebSocketMessage{
		Type:      messageType,
		Data:      data,
		Timestamp: time.Now(),
		ClientID:  target.ID,
		MessageID: uuid.New().String(),
	}

	// Non-blocking enqueue: a full queue drops the message.
	queued := false
	select {
	case h.hub.messageQueue <- outbound:
		queued = true
	default:
	}
	return queued
}
// BroadcastToIP queues one addressed message for every client connected from
// ipAddress and returns how many messages were queued. Clients are skipped
// when the message queue is full.
func (h *WebSocketHandler) BroadcastToIP(ipAddress string, messageType string, data interface{}) int {
	queued := 0
	for _, target := range h.GetClientsByIP(ipAddress) {
		outbound := WebSocketMessage{
			Type:      messageType,
			Data:      data,
			Timestamp: time.Now(),
			ClientID:  target.ID,
			MessageID: uuid.New().String(),
		}
		select {
		case h.hub.messageQueue <- outbound:
			queued++
		default:
			// Queue full: skip this client.
		}
	}
	return queued
}
// StartServerBroadcasters starts the server-initiated broadcasts (data pushed
// without a client request). Three goroutines are launched, each stopping
// when the hub context is cancelled:
//
//   - "server_heartbeat" every 30 seconds with connection counters,
//   - "system_status" every 5 minutes with database health and hub uptime,
//   - "data_stream" every 10 seconds with a demo counter.
//
// The reported uptime is measured from the hub's start time, and the
// database section is omitted when no database service is configured.
func (h *WebSocketHandler) StartServerBroadcasters() {
	// Heartbeat broadcaster.
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				h.hub.mu.RLock()
				connectedClients := len(h.hub.clients)
				uniqueIPs := len(h.hub.clientsByIP)
				staticClients := len(h.hub.clientsByStatic)
				h.hub.mu.RUnlock()
				h.BroadcastMessage("server_heartbeat", map[string]interface{}{
					"message":           "Server heartbeat",
					"connected_clients": connectedClients,
					"unique_ips":        uniqueIPs,
					"static_clients":    staticClients,
					"timestamp":         time.Now().Unix(),
					"server_id":         "api-service-v1",
				})
			case <-h.hub.ctx.Done():
				return
			}
		}
	}()

	// System notification broadcaster.
	go func() {
		ticker := time.NewTicker(5 * time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				status := map[string]interface{}{
					"type":      "system_notification",
					"timestamp": time.Now().Unix(),
					"uptime":    time.Since(h.hub.startTime).String(),
				}
				if h.dbService != nil {
					status["database"] = h.dbService.Health()
				}
				h.BroadcastMessage("system_status", status)
			case <-h.hub.ctx.Done():
				return
			}
		}
	}()

	// Data stream broadcaster (demo purpose).
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		counter := 0
		for {
			select {
			case <-ticker.C:
				counter++
				h.BroadcastMessage("data_stream", map[string]interface{}{
					"id":        counter,
					"value":     counter * 10,
					"timestamp": time.Now().Unix(),
					"type":      "real_time_data",
				})
			case <-h.hub.ctx.Done():
				return
			}
		}
	}()
}
// StartDatabaseListener pushes database changes to clients: it subscribes to
// the notification channels of the primary database and broadcasts every
// notification as a "database_change" message.
//
// It returns without listening when no database service is configured, when
// the database type cannot be determined, or when the primary database is
// not PostgreSQL. A payload that is not valid JSON is still broadcast, with
// "raw_payload" and "parse_error" fields carrying the original text and the
// parse failure.
func (h *WebSocketHandler) StartDatabaseListener() {
	if h.dbService == nil {
		h.logger.Error("Database listener not started: database service is not configured")
		return
	}

	// Notifications are only supported on PostgreSQL.
	dbType, err := h.dbService.GetDBType(h.primaryDB)
	if err != nil {
		h.logger.Error(fmt.Sprintf("Failed to determine type of database %s: %v", h.primaryDB, err))
		return
	}
	if dbType != database.Postgres {
		h.logger.Error(fmt.Sprintf("Database notifications require PostgreSQL. Current DB type: %v", dbType))
		return
	}

	channels := []string{"retribusi_changes", "peserta_changes", "system_changes"}
	err = h.dbService.ListenForChanges(h.hub.ctx, h.primaryDB, channels, func(channel, payload string) {
		event := map[string]interface{}{
			"channel":   channel,
			"timestamp": time.Now().Unix(),
			"database":  h.primaryDB,
		}

		var changeData map[string]interface{}
		if err := json.Unmarshal([]byte(payload), &changeData); err != nil {
			h.logger.Error(fmt.Sprintf("Failed to parse database notification: %v", err))
			// Forward the raw payload so the notification is not lost.
			changeData = nil
			event["raw_payload"] = payload
			event["parse_error"] = err.Error()
		}
		// Indexing a nil map yields nil, matching the missing-field case.
		event["operation"] = changeData["operation"]
		event["table"] = changeData["table"]
		event["data"] = changeData["data"]

		h.BroadcastMessage("database_change", event)
		h.logger.Info(fmt.Sprintf("Database change broadcasted: %s from %s", channel, h.primaryDB))
	})
	if err != nil {
		h.logger.Error(fmt.Sprintf("Failed to start database listener: %v", err))
	}
}
// StartMessageQueue launches the configured number of workers that move
// messages from the hub's message queue onto its broadcast channel.
//
// Each worker exits when the hub context is cancelled, including while it is
// waiting to hand a message to the broadcast channel; otherwise a worker
// could block forever once the hub loop has stopped receiving.
func (h *WebSocketHandler) StartMessageQueue() {
	for i := 0; i < h.hub.queueWorkers; i++ {
		go func() {
			for {
				select {
				case message := <-h.hub.messageQueue:
					select {
					case h.hub.broadcast <- message:
					case <-h.hub.ctx.Done():
						return
					}
				case <-h.hub.ctx.Done():
					return
				}
			}
		}()
	}
}
// broadcastToClients routes one message while holding the hub read lock:
//
//   - a non-empty ClientID delivers to the first registered client found
//     with that ID;
//   - otherwise, a map payload with a string "room" entry delivers to the
//     members of that room (nobody if the room does not exist);
//   - otherwise the message goes to every client.
//
// A client whose Send buffer is full is scheduled for unregistration instead
// of blocking the hub.
func (h *Hub) broadcastToClients(message WebSocketMessage) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	deliver := func(target *Client) {
		select {
		case target.Send <- message:
		default:
			h.unregisterClient(target)
		}
	}

	// Addressed message.
	if message.ClientID != "" {
		for candidate := range h.clients {
			if candidate.ID != message.ClientID {
				continue
			}
			deliver(candidate)
			break
		}
		return
	}

	// Room message.
	if payload, isMap := message.Data.(map[string]interface{}); isMap {
		if roomName, hasRoom := payload["room"].(string); hasRoom {
			// Ranging over a missing room's nil map is a no-op.
			for member := range h.rooms[roomName] {
				deliver(member)
			}
			return
		}
	}

	// Everyone.
	for candidate := range h.clients {
		deliver(candidate)
	}
}
// unregisterClient asks the hub loop to remove client without blocking the
// caller. The request is made from a separate goroutine because callers hold
// the hub lock and may be running on the hub loop itself.
//
// The goroutine gives up when the hub context is cancelled, so it does not
// leak once the hub loop has stopped receiving.
func (h *Hub) unregisterClient(client *Client) {
	go func() {
		select {
		case h.unregister <- client:
		case <-h.ctx.Done():
		}
	}()
}
// handleDirectMessage relays a client-to-client message. The payload must be
// an object with a string "target_client_id"; its "message" entry is wrapped
// with the sender's details and handed to the hub addressed to the target.
// The sender then receives a "direct_message_sent" confirmation.
func (c *Client) handleDirectMessage(msg WebSocketMessage) {
	payload, isMap := msg.Data.(map[string]interface{})
	if !isMap {
		c.sendErrorResponse("Invalid direct message format", "Expected object with message data")
		return
	}

	target, hasTarget := payload["target_client_id"].(string)
	if !hasTarget {
		c.sendErrorResponse("Missing target", "target_client_id is required")
		return
	}

	envelope := map[string]interface{}{
		"from":            c.ID,
		"from_static_id":  c.StaticID,
		"from_ip":         c.IPAddress,
		"from_user_id":    c.UserID,
		"message":         payload["message"],
		"original_msg_id": msg.MessageID,
	}
	outbound := WebSocketMessage{
		Type:      "direct_message_received",
		Data:      envelope,
		Timestamp: time.Now(),
		ClientID:  target,
		MessageID: uuid.New().String(),
	}
	c.Hub.broadcast <- outbound

	confirmation := map[string]interface{}{
		"target_client": target,
		"message_id":    outbound.MessageID,
	}
	c.sendDirectResponse("direct_message_sent", confirmation)
}
// handleRoomMessage delivers a message to every other member of a room. The
// room is data["room"] when that is a string, otherwise the sender's own
// room. Members whose Send buffer is full are skipped with an error log, and
// the sender receives a "room_message_sent" confirmation.
func (c *Client) handleRoomMessage(msg WebSocketMessage) {
	payload, isMap := msg.Data.(map[string]interface{})
	if !isMap {
		c.sendErrorResponse("Invalid room message format", "Expected object with message data")
		return
	}

	roomName, hasRoom := payload["room"].(string)
	if !hasRoom {
		roomName = c.Room
	}

	outbound := WebSocketMessage{
		Type: "room_message_received",
		Data: map[string]interface{}{
			"room":            roomName,
			"from":            c.ID,
			"from_static_id":  c.StaticID,
			"from_ip":         c.IPAddress,
			"from_user_id":    c.UserID,
			"message":         payload["message"],
			"original_msg_id": msg.MessageID,
		},
		Timestamp: time.Now(),
		MessageID: uuid.New().String(),
	}

	// Fan out under the hub read lock; ranging over a missing room is a no-op.
	c.Hub.mu.RLock()
	for member := range c.Hub.rooms[roomName] {
		if member.ID == c.ID {
			continue
		}
		select {
		case member.Send <- outbound:
		default:
			logger.Error(fmt.Sprintf("Failed to send room message to client %s", member.ID))
		}
	}
	c.Hub.mu.RUnlock()

	c.sendDirectResponse("room_message_sent", map[string]interface{}{
		"room":       roomName,
		"message_id": outbound.MessageID,
	})
}
// readPump reads messages from the connection until it fails or the client
// context is cancelled, dispatching each decoded message to handleMessage.
// On exit it unregisters the client from the hub and closes the connection.
//
// The unregister request is abandoned if the hub has been shut down; the hub
// loop no longer receives at that point, and an unconditional send would
// block this goroutine forever and leave the connection open.
func (c *Client) readPump() {
	defer func() {
		select {
		case c.Hub.unregister <- c:
		case <-c.Hub.ctx.Done():
		}
		c.Conn.Close()
		logger.Info(fmt.Sprintf("Client %s readPump terminated", c.ID))
	}()

	// Connection limits and initial deadline.
	c.Conn.SetReadLimit(MaxMessageSize)
	c.resetReadDeadline()

	// Any ping or pong from the peer counts as activity and extends the
	// read deadline.
	c.Conn.SetPingHandler(func(message string) error {
		logger.Debug(fmt.Sprintf("Client %s received ping", c.ID))
		c.resetReadDeadline()
		return c.Conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(WriteTimeout))
	})
	c.Conn.SetPongHandler(func(message string) error {
		c.mu.Lock()
		c.lastPong = time.Now()
		c.isActive = true
		c.mu.Unlock()
		c.resetReadDeadline()
		logger.Debug(fmt.Sprintf("Client %s received pong", c.ID))
		return nil
	})

	for {
		if c.ctx.Err() != nil {
			logger.Info(fmt.Sprintf("Client %s context cancelled", c.ID))
			return
		}

		_, message, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err,
				websocket.CloseGoingAway,
				websocket.CloseAbnormalClosure,
				websocket.CloseNormalClosure) {
				logger.Error(fmt.Sprintf("WebSocket unexpected close for client %s: %v", c.ID, err))
			} else {
				logger.Info(fmt.Sprintf("WebSocket closed for client %s: %v", c.ID, err))
			}
			return
		}

		// Important: extend the deadline on every inbound message.
		c.resetReadDeadline()
		c.updateLastActivity()

		var msg WebSocketMessage
		if err := json.Unmarshal(message, &msg); err != nil {
			c.sendErrorResponse("Invalid message format", err.Error())
			continue
		}

		// Server-side stamps override whatever the client supplied.
		msg.Timestamp = time.Now()
		msg.ClientID = c.ID
		if msg.MessageID == "" {
			msg.MessageID = uuid.New().String()
		}
		c.handleMessage(msg)
	}
}
// resetReadDeadline pushes the connection's read deadline ReadTimeout into
// the future. The error returned by SetReadDeadline is ignored.
func (c *Client) resetReadDeadline() {
	deadline := time.Now().Add(ReadTimeout)
	_ = c.Conn.SetReadDeadline(deadline)
}
// updateLastActivity records inbound activity: it sets lastPing to the
// current time and marks the client active, under the client mutex.
func (c *Client) updateLastActivity() {
	now := time.Now()

	c.mu.Lock()
	c.lastPing = now
	c.isActive = true
	c.mu.Unlock()
}
// sendPing writes a WebSocket ping frame within WriteTimeout. On success it
// records the send time in lastPing; on failure it returns the write error
// and leaves lastPing untouched.
func (c *Client) sendPing() error {
	c.Conn.SetWriteDeadline(time.Now().Add(WriteTimeout))

	err := c.Conn.WriteMessage(websocket.PingMessage, nil)
	if err != nil {
		return err
	}

	sentAt := time.Now()
	c.mu.Lock()
	c.lastPing = sentAt
	c.mu.Unlock()

	logger.Debug(fmt.Sprintf("Ping sent to client %s", c.ID))
	return nil
}
// isPongTimeout reports whether more than PongTimeout has elapsed since the
// last pong. When no pong has been received yet, lastPing is used as the
// baseline instead.
func (c *Client) isPongTimeout() bool {
	c.mu.RLock()
	baseline := c.lastPong
	if baseline.IsZero() {
		baseline = c.lastPing
	}
	c.mu.RUnlock()

	return time.Since(baseline) > PongTimeout
}
// isClientActive reports whether the client is flagged active and its
// lastPing is less than PongTimeout old.
func (c *Client) isClientActive() bool {
	c.mu.RLock()
	active, last := c.isActive, c.lastPing
	c.mu.RUnlock()

	if !active {
		return false
	}
	return time.Since(last) < PongTimeout
}
// gracefulClose marks the client inactive, sends a normal-closure close
// frame within WriteTimeout, and cancels the client context. Errors from the
// write are ignored.
func (c *Client) gracefulClose() {
	c.mu.Lock()
	c.isActive = false
	c.mu.Unlock()

	// Tell the peer the connection is closing normally.
	closeFrame := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
	c.Conn.SetWriteDeadline(time.Now().Add(WriteTimeout))
	c.Conn.WriteMessage(websocket.CloseMessage, closeFrame)

	c.cancel()
	logger.Info(fmt.Sprintf("Client %s closed gracefully", c.ID))
}
// sendDirectResponse queues a reply of the given type on this client's Send
// channel without blocking. The reply is dropped, with an error log, when
// the buffer is full.
//
// The hub closes Send when it unregisters the client, which can happen while
// the read pump is still handling a message. Sending on a closed channel
// panics, and an unrecovered panic in a client goroutine would take the
// whole process down, so that case is recovered and treated as a dropped
// reply.
func (c *Client) sendDirectResponse(messageType string, data interface{}) {
	response := WebSocketMessage{
		Type:      messageType,
		Data:      data,
		Timestamp: time.Now(),
		MessageID: uuid.New().String(),
	}

	defer func() {
		if r := recover(); r != nil {
			logger.Error("Failed to send direct response to client")
		}
	}()

	select {
	case c.Send <- response:
	default:
		logger.Error("Failed to send direct response to client")
	}
}
// sendErrorResponse sends an "error" reply carrying a short error title and
// a longer description.
func (c *Client) sendErrorResponse(errMsg, details string) {
	body := map[string]interface{}{
		"error":   errMsg,
		"details": details,
	}
	c.sendDirectResponse("error", body)
}
// writePump writes to the connection: it sends queued messages from Send as
// JSON, sends a ping every PingInterval, and stops on a write error, a pong
// timeout, a closed Send channel (after writing a close frame) or context
// cancellation. The connection is closed on exit.
func (c *Client) writePump() {
	pingTicker := time.NewTicker(PingInterval)
	defer func() {
		pingTicker.Stop()
		c.Conn.Close()
		logger.Info(fmt.Sprintf("Client %s writePump terminated", c.ID))
	}()

	for {
		select {
		case <-c.ctx.Done():
			logger.Info(fmt.Sprintf("Client %s writePump context cancelled", c.ID))
			return

		case <-pingTicker.C:
			// Send a ping, then check whether the client is still responsive.
			if err := c.sendPing(); err != nil {
				logger.Error(fmt.Sprintf("Failed to send ping to client %s: %v", c.ID, err))
				return
			}
			if c.isPongTimeout() {
				logger.Warn(fmt.Sprintf("Client %s pong timeout, disconnecting", c.ID))
				return
			}

		case outbound, open := <-c.Send:
			c.Conn.SetWriteDeadline(time.Now().Add(WriteTimeout))
			if !open {
				// The hub closed the channel: say goodbye and stop.
				c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			err := c.Conn.WriteJSON(outbound)
			if err != nil {
				logger.Error(fmt.Sprintf("Failed to write message to client %s: %v", c.ID, err))
				return
			}
		}
	}
}
// BroadcastMessage queues an unaddressed message for the hub. The message is
// dropped with an error log when the queue is full.
func (h *WebSocketHandler) BroadcastMessage(messageType string, data interface{}) {
	outbound := WebSocketMessage{
		Type:      messageType,
		Data:      data,
		Timestamp: time.Now(),
		MessageID: uuid.New().String(),
	}

	select {
	case h.hub.messageQueue <- outbound:
		return
	default:
	}
	logger.Error("Message queue full, dropping message")
}
// BroadcastToRoom queues a message for the members of room. The payload is
// wrapped as {"room": room, "data": data}; the hub uses the "room" entry to
// pick the recipients. The message is dropped with an error log when the
// queue is full.
func (h *WebSocketHandler) BroadcastToRoom(room string, messageType string, data interface{}) {
	wrapped := map[string]interface{}{
		"room": room,
		"data": data,
	}
	outbound := WebSocketMessage{
		Type:      messageType,
		Data:      wrapped,
		Timestamp: time.Now(),
		MessageID: uuid.New().String(),
	}

	select {
	case h.hub.messageQueue <- outbound:
		return
	default:
	}
	logger.Error("Message queue full, dropping room message")
}
// SendToClient queues a message addressed to the client with the given ID.
// The message is dropped with an error log when the queue is full.
func (h *WebSocketHandler) SendToClient(clientID string, messageType string, data interface{}) {
	outbound := WebSocketMessage{
		Type:      messageType,
		Data:      data,
		Timestamp: time.Now(),
		ClientID:  clientID,
		MessageID: uuid.New().String(),
	}

	select {
	case h.hub.messageQueue <- outbound:
		return
	default:
	}
	logger.Error("Message queue full, dropping client message")
}
// GetConnectedClients returns the number of clients currently registered
// with the hub.
func (h *WebSocketHandler) GetConnectedClients() int {
	hub := h.hub
	hub.mu.RLock()
	total := len(hub.clients)
	hub.mu.RUnlock()
	return total
}
// GetRoomClients returns the number of clients in room, or 0 when the room
// does not exist.
func (h *WebSocketHandler) GetRoomClients(room string) int {
	hub := h.hub
	hub.mu.RLock()
	// len of a missing room's nil map is 0.
	members := len(hub.rooms[room])
	hub.mu.RUnlock()
	return members
}
// Shutdown cancels the hub context and then the context of every registered
// client.
func (h *WebSocketHandler) Shutdown() {
	hub := h.hub
	hub.cancel()

	hub.mu.RLock()
	defer hub.mu.RUnlock()
	for registered := range hub.clients {
		registered.cancel()
	}
}
// GetDetailedStats returns a snapshot of the hub's counters: client, IP,
// static-ID and room totals, the per-IP and per-room distribution, queue
// depth, worker count and uptime.
func (h *WebSocketHandler) GetDetailedStats() DetailedStats {
	hub := h.hub
	hub.mu.RLock()
	defer hub.mu.RUnlock()

	perIP := make(map[string]int)
	for addr, members := range hub.clientsByIP {
		perIP[addr] = len(members)
	}

	perRoom := make(map[string]int)
	for name, members := range hub.rooms {
		perRoom[name] = len(members)
	}

	return DetailedStats{
		ConnectedClients: len(hub.clients),
		UniqueIPs:        len(hub.clientsByIP),
		StaticClients:    len(hub.clientsByStatic),
		ActiveRooms:      len(hub.rooms),
		IPDistribution:   perIP,
		RoomDistribution: perRoom,
		MessageQueueSize: len(hub.messageQueue),
		QueueWorkers:     hub.queueWorkers,
		Uptime:           time.Since(hub.startTime),
		Timestamp:        time.Now().Unix(),
	}
}
// GetAllClients returns a snapshot of every connected client.
//
// Each client's liveness fields are copied under that client's mutex because
// they are updated concurrently by its read and write pumps. The result is
// never nil, so it encodes as a JSON array even when no client is connected.
func (h *WebSocketHandler) GetAllClients() []ClientInfo {
	h.hub.mu.RLock()
	defer h.hub.mu.RUnlock()

	clients := make([]ClientInfo, 0, len(h.hub.clients))
	for client := range h.hub.clients {
		client.mu.RLock()
		lastPing, lastPong, active := client.lastPing, client.lastPong, client.isActive
		client.mu.RUnlock()

		clients = append(clients, ClientInfo{
			ID:          client.ID,
			StaticID:    client.StaticID,
			IPAddress:   client.IPAddress,
			UserID:      client.UserID,
			Room:        client.Room,
			ConnectedAt: client.connectedAt,
			LastPing:    lastPing,
			LastPong:    lastPong,
			IsActive:    active,
		})
	}
	return clients
}
// GetAllRooms returns every room together with a snapshot of its members.
//
// Each member's liveness fields are copied under that client's mutex because
// they are updated concurrently by its read and write pumps.
func (h *WebSocketHandler) GetAllRooms() map[string][]ClientInfo {
	h.hub.mu.RLock()
	defer h.hub.mu.RUnlock()

	rooms := make(map[string][]ClientInfo, len(h.hub.rooms))
	for roomName, members := range h.hub.rooms {
		roomClients := make([]ClientInfo, 0, len(members))
		for client := range members {
			client.mu.RLock()
			lastPing, lastPong, active := client.lastPing, client.lastPong, client.isActive
			client.mu.RUnlock()

			roomClients = append(roomClients, ClientInfo{
				ID:          client.ID,
				StaticID:    client.StaticID,
				IPAddress:   client.IPAddress,
				UserID:      client.UserID,
				Room:        client.Room,
				ConnectedAt: client.connectedAt,
				LastPing:    lastPing,
				LastPong:    lastPong,
				IsActive:    active,
			})
		}
		rooms[roomName] = roomClients
	}
	return rooms
}
// GetMonitoringData assembles the full monitoring payload: the detailed hub
// statistics, the most recent activity log entries (at most the last 100),
// a system health map, and derived performance metrics.
//
// The health map always carries "websocket_status" and "uptime_seconds"; the
// "databases" and "available_dbs" entries are added only when a database
// service is configured. AverageLatency and MemoryUsage are placeholder
// values, not measurements.
func (h *WebSocketHandler) GetMonitoringData() MonitoringData {
	stats := h.GetDetailedStats()

	// Copy the tail of the activity log while holding its read lock so the
	// returned slice does not alias the hub's backing array.
	const maxRecent = 100
	h.hub.activityMu.RLock()
	tail := h.hub.activityLog
	if extra := len(tail) - maxRecent; extra > 0 {
		tail = tail[extra:]
	}
	recentActivity := make([]ActivityLog, len(tail))
	copy(recentActivity, tail)
	h.hub.activityMu.RUnlock()

	// System health, including database status when a service is available.
	systemHealth := map[string]interface{}{}
	if h.dbService != nil {
		systemHealth["databases"] = h.dbService.Health()
		systemHealth["available_dbs"] = h.dbService.ListDBs()
	}
	systemHealth["websocket_status"] = "healthy"
	systemHealth["uptime_seconds"] = time.Since(h.hub.startTime).Seconds()

	// Throughput and error rate derived from the hub's running counters.
	uptime := time.Since(h.hub.startTime)
	var messagesPerSecond, errorRate float64
	if secs := uptime.Seconds(); secs > 0 {
		messagesPerSecond = float64(h.hub.messageCount) / secs
	}
	if h.hub.messageCount > 0 {
		errorRate = (float64(h.hub.errorCount) / float64(h.hub.messageCount)) * 100
	}

	return MonitoringData{
		Stats:          stats,
		RecentActivity: recentActivity,
		SystemHealth:   systemHealth,
		Performance: PerformanceMetrics{
			MessagesPerSecond: messagesPerSecond,
			AverageLatency:    2.5, // Mock value - implement actual latency tracking
			ErrorRate:         errorRate,
			MemoryUsage:       0, // Mock value - implement actual memory tracking
		},
	}
}
// GetRoomClientCount returns how many clients are registered in the given
// room; a room the hub does not know about counts as 0.
func (h *WebSocketHandler) GetRoomClientCount(room string) int {
	h.hub.mu.RLock()
	// A missing key yields the zero value, whose length is 0.
	count := len(h.hub.rooms[room])
	h.hub.mu.RUnlock()

	return count
}
// GetActiveClients returns a ClientInfo snapshot for every registered client
// whose last ping is more recent than now minus olderThan (despite its name,
// olderThan is the size of the recency window).
//
// Every ClientInfo field is populated, including LastPong and IsActive. The
// returned slice is never nil, so "no active clients" encodes as a JSON array
// ("[]") rather than "null".
func (h *WebSocketHandler) GetActiveClients(olderThan time.Duration) []ClientInfo {
	cutoff := time.Now().Add(-olderThan)

	// Copy the client pointers first so the hub lock is not held while the
	// per-client locks are taken.
	h.hub.mu.RLock()
	snapshot := make([]*Client, 0, len(h.hub.clients))
	for client := range h.hub.clients {
		snapshot = append(snapshot, client)
	}
	h.hub.mu.RUnlock()

	activeClients := make([]ClientInfo, 0, len(snapshot))
	for _, client := range snapshot {
		// NOTE(review): assumes Client.mu is the lock writers hold when they
		// update lastPing/lastPong/isActive — confirm against the ping/pong
		// handlers.
		client.mu.RLock()
		if client.lastPing.After(cutoff) {
			activeClients = append(activeClients, ClientInfo{
				ID:          client.ID,
				StaticID:    client.StaticID,
				IPAddress:   client.IPAddress,
				UserID:      client.UserID,
				Room:        client.Room,
				ConnectedAt: client.connectedAt,
				LastPing:    client.lastPing,
				LastPong:    client.lastPong,
				IsActive:    client.isActive,
			})
		}
		client.mu.RUnlock()
	}
	return activeClients
}
// CleanupInactiveClients disconnects every registered client whose last ping
// is older than inactiveTimeout and returns how many clients were selected.
//
// Selection happens under the hub's read lock; the disconnects (activity log
// entry, context cancellation, connection close) run after the lock has been
// released.
func (h *WebSocketHandler) CleanupInactiveClients(inactiveTimeout time.Duration) int {
	var stale []*Client

	h.hub.mu.RLock()
	threshold := time.Now().Add(-inactiveTimeout)
	for c := range h.hub.clients {
		if !c.lastPing.Before(threshold) {
			continue
		}
		stale = append(stale, c)
	}
	h.hub.mu.RUnlock()

	// Disconnect the stale clients outside the hub lock.
	for i := range stale {
		c := stale[i]
		idle := time.Since(c.lastPing)
		h.hub.logActivity("cleanup_disconnect", c.ID, fmt.Sprintf("Inactive for %v", idle))
		c.cancel()
		c.Conn.Close()
	}

	return len(stale)
}
// DisconnectClient forcibly disconnects the client with the given ID.
// It returns false when the hub has no client registered under that ID, and
// true after the disconnect has been initiated.
func (h *WebSocketHandler) DisconnectClient(clientID string) bool {
	// Look the client up under the read lock only; the disconnect itself runs
	// without holding the hub lock.
	h.hub.mu.RLock()
	target, found := h.hub.clientsByID[clientID]
	h.hub.mu.RUnlock()

	if !found {
		return false
	}

	h.hub.logActivity("force_disconnect", clientID, "Client disconnected by admin")

	// Cancel the client's context and close its connection. Removal from the
	// hub's maps is not done here; per the original note it happens in the
	// hub's Run() loop.
	target.cancel()
	target.Conn.Close()

	return true
}
// StartConnectionMonitor runs the periodic connection maintenance loop: every
// two minutes it cleans up inactive connections and logs connection stats.
// It blocks until the hub's context is done, so callers presumably run it in
// its own goroutine.
func (h *WebSocketHandler) StartConnectionMonitor() {
	const checkInterval = 2 * time.Minute

	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()

	done := h.hub.ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			h.cleanupInactiveConnections()
			h.logConnectionStats()
		}
	}
}
// cleanupInactiveConnections closes every registered client that
// isClientActive reports as inactive. Candidates are collected under the
// hub's read lock; each one is then logged to the activity log and closed via
// gracefulClose after the lock is released. A summary line is logged only
// when at least one connection was cleaned up.
func (h *WebSocketHandler) cleanupInactiveConnections() {
	var stale []*Client

	h.hub.mu.RLock()
	for c := range h.hub.clients {
		if c.isClientActive() {
			continue
		}
		stale = append(stale, c)
	}
	h.hub.mu.RUnlock()

	// Disconnect the inactive clients outside the hub lock.
	for _, c := range stale {
		detail := fmt.Sprintf("Inactive for %v", time.Since(c.lastPing))
		h.hub.logActivity("cleanup_disconnect", c.ID, detail)
		c.gracefulClose()
	}

	if n := len(stale); n > 0 {
		logger.Info(fmt.Sprintf("Cleaned up %d inactive connections", n))
	}
}
// logConnectionStats writes a one-line summary of the current hub statistics
// (clients, unique IPs, rooms, and message queue size) to the info log.
func (h *WebSocketHandler) logConnectionStats() {
	s := h.GetDetailedStats()

	summary := fmt.Sprintf(
		"WebSocket Stats - Clients: %d, IPs: %d, Rooms: %d, Queue: %d",
		s.ConnectedClients, s.UniqueIPs, s.ActiveRooms, s.MessageQueueSize,
	)
	logger.Info(summary)
}
// logActivity appends an entry, stamped with the current time, to the hub's
// activity log. When the log grows beyond 1000 entries the oldest entry is
// dropped. The activity lock is held for the whole operation.
func (h *Hub) logActivity(event, clientID, details string) {
	const maxEntries = 1000

	h.activityMu.Lock()
	defer h.activityMu.Unlock()

	h.activityLog = append(h.activityLog, ActivityLog{
		Timestamp: time.Now(),
		Event:     event,
		ClientID:  clientID,
		Details:   details,
	})

	// Entries are added one at a time, so trimming a single element from the
	// front is enough to keep the log at the cap.
	if len(h.activityLog) > maxEntries {
		h.activityLog = h.activityLog[1:]
	}
}
// Example: PostgreSQL trigger that publishes data changes via pg_notify
// -- Trigger function that sends a notification whenever data changes
// CREATE OR REPLACE FUNCTION notify_data_change() RETURNS trigger AS $$
// DECLARE
// channel text := 'retribusi_changes';
// payload json;
// BEGIN
// -- Tentukan channel berdasarkan table
// IF TG_TABLE_NAME = 'retribusi' THEN
// channel := 'retribusi_changes';
// ELSIF TG_TABLE_NAME = 'peserta' THEN
// channel := 'peserta_changes';
// END IF;
// -- Buat payload
// IF TG_OP = 'DELETE' THEN
// payload = json_build_object(
// 'operation', TG_OP,
// 'table', TG_TABLE_NAME,
// 'data', row_to_json(OLD)
// );
// ELSE
// payload = json_build_object(
// 'operation', TG_OP,
// 'table', TG_TABLE_NAME,
// 'data', row_to_json(NEW)
// );
// END IF;
// -- Kirim notifikasi
// PERFORM pg_notify(channel, payload::text);
// RETURN COALESCE(NEW, OLD);
// END;
// $$ LANGUAGE plpgsql;
// -- Trigger untuk table retribusi
// CREATE TRIGGER retribusi_notify_trigger
// AFTER INSERT OR UPDATE OR DELETE ON retribusi
// FOR EACH ROW EXECUTE FUNCTION notify_data_change();
// -- Trigger untuk table peserta
// CREATE TRIGGER peserta_notify_trigger
// AFTER INSERT OR UPDATE OR DELETE ON peserta
// FOR EACH ROW EXECUTE FUNCTION notify_data_change();