// Files
//
// 220 lines
// 6.1 KiB
// Go

package websocket
import (
"time"
)
// Broadcaster manages message broadcasting on behalf of a Hub.
type Broadcaster struct {
// hub is the Hub whose channels and client registries the methods below use.
hub *Hub
}
// NewBroadcaster creates a new Broadcaster bound to the given hub.
// The hub pointer is stored as-is; no validation is performed.
func NewBroadcaster(hub *Hub) *Broadcaster {
	b := new(Broadcaster)
	b.hub = hub
	return b
}
// StartServerBroadcasters starts the server-side broadcasters, each in its own
// goroutine. Only the system notification broadcaster is currently launched;
// the heartbeat and data stream launches are commented out.
func (b *Broadcaster) StartServerBroadcasters() {
// Heartbeat broadcaster (currently disabled)
// go b.startHeartbeatBroadcaster()
// System notification broadcaster
go b.startSystemNotificationBroadcaster()
// Data stream broadcaster (currently disabled)
// go b.startDataStreamBroadcaster()
}
// startHeartbeatBroadcaster runs the heartbeat loop. Every 30 seconds it
// broadcasts a "server_heartbeat" message carrying the sizes of the hub's
// client registries, the current Unix timestamp and the configured server ID.
// The loop exits when the hub's context is done.
func (b *Broadcaster) startHeartbeatBroadcaster() {
	heartbeat := time.NewTicker(30 * time.Second)
	defer heartbeat.Stop()

	for {
		select {
		case <-b.hub.ctx.Done():
			return

		case <-heartbeat.C:
			// Take the registry sizes under the read lock, then release it
			// before enqueueing the message.
			b.hub.mu.RLock()
			total := len(b.hub.clients)
			ipCount := len(b.hub.clientsByIP)
			staticCount := len(b.hub.clientsByStatic)
			b.hub.mu.RUnlock()

			payload := map[string]interface{}{
				"message":           "Server heartbeat",
				"connected_clients": total,
				"unique_ips":        ipCount,
				"static_clients":    staticCount,
				"timestamp":         time.Now().Unix(),
				"server_id":         b.hub.config.ServerID,
			}
			b.BroadcastMessage("server_heartbeat", payload)
		}
	}
}
// startSystemNotificationBroadcaster runs the system notification loop. Every
// five minutes it broadcasts a "system_status" message containing the result
// of the hub's database health check, the current Unix timestamp and the time
// elapsed since the hub's start time. The loop exits when the hub's context
// is done.
func (b *Broadcaster) startSystemNotificationBroadcaster() {
	statusTick := time.NewTicker(5 * time.Minute)
	defer statusTick.Stop()

	for {
		select {
		case <-b.hub.ctx.Done():
			return

		case <-statusTick.C:
			// The health check is evaluated first, matching the order in
			// which the payload fields are listed.
			status := map[string]interface{}{
				"type":      "system_notification",
				"database":  b.hub.dbService.Health(),
				"timestamp": time.Now().Unix(),
				"uptime":    time.Since(b.hub.startTime).String(),
			}
			b.BroadcastMessage("system_status", status)
		}
	}
}
// startDataStreamBroadcaster runs the data stream loop. Every 10 seconds it
// broadcasts a "data_stream" message whose "id" is a sequence number starting
// at 1 and whose "value" is ten times that number. The loop exits when the
// hub's context is done.
func (b *Broadcaster) startDataStreamBroadcaster() {
	streamTick := time.NewTicker(10 * time.Second)
	defer streamTick.Stop()

	// seq advances once per completed iteration; the only iterations that
	// complete are those triggered by a tick, since cancellation returns.
	for seq := 1; ; seq++ {
		select {
		case <-b.hub.ctx.Done():
			return

		case <-streamTick.C:
			sample := map[string]interface{}{
				"id":        seq,
				"value":     seq * 10,
				"timestamp": time.Now().Unix(),
				"type":      "real_time_data",
			}
			b.BroadcastMessage("data_stream", sample)
		}
	}
}
// BroadcastMessage sends a message intended for all clients. The message is
// built with empty client and room targets and offered to the hub's message
// queue without blocking; if the queue cannot accept it immediately, the
// message is silently dropped.
func (b *Broadcaster) BroadcastMessage(messageType string, data interface{}) {
	const noTarget = ""
	outgoing := NewWebSocketMessage(MessageType(messageType), data, noTarget, noTarget)

	select {
	case b.hub.messageQueue <- outgoing:
		// Enqueued.
	default:
		// Queue is full: drop the message.
	}
}
// BroadcastToRoom sends a message to a specific room. The payload is wrapped
// in a map with "room" and "data" keys, and the room name is also passed as
// the message's room target. The message is offered to the hub's message
// queue without blocking; if the queue cannot accept it immediately, the
// message is silently dropped.
func (b *Broadcaster) BroadcastToRoom(room string, messageType string, data interface{}) {
	envelope := map[string]interface{}{
		"room": room,
		"data": data,
	}
	outgoing := NewWebSocketMessage(MessageType(messageType), envelope, "", room)

	select {
	case b.hub.messageQueue <- outgoing:
		// Enqueued.
	default:
		// Queue is full: drop the message.
	}
}
// SendToClient sends a message to a specific client. The message is built
// with clientID as its client target and an empty room, then offered to the
// hub's message queue without blocking; if the queue cannot accept it
// immediately, the message is silently dropped.
func (b *Broadcaster) SendToClient(clientID string, messageType string, data interface{}) {
	const noRoom = ""
	outgoing := NewWebSocketMessage(MessageType(messageType), data, clientID, noRoom)

	select {
	case b.hub.messageQueue <- outgoing:
		// Enqueued.
	default:
		// Queue is full: drop the message.
	}
}
// SendToClientByStaticID sends a message to the client registered under the
// given static ID. It returns false when no client is registered under that
// ID, and true once the message has been handed to SendToClient. A true
// result does not guarantee delivery, because SendToClient drops the message
// when the queue is full.
func (b *Broadcaster) SendToClientByStaticID(staticID string, messageType string, data interface{}) bool {
	// Look the client up under the read lock; release it before sending.
	b.hub.mu.RLock()
	target, found := b.hub.clientsByStatic[staticID]
	b.hub.mu.RUnlock()

	if found {
		b.SendToClient(target.ID, messageType, data)
	}
	return found
}
// BroadcastToIP sends a message to every client registered under the given IP
// address and returns the number of clients a send was attempted for. The
// count reflects attempts, not deliveries: SendToClient drops the message
// when the hub's queue is full. An unknown IP address yields 0.
func (b *Broadcaster) BroadcastToIP(ipAddress string, messageType string, data interface{}) int {
	// Snapshot the client IDs while the read lock is held. Iterating the
	// registry's collection after releasing the lock would race with
	// concurrent registration changes.
	b.hub.mu.RLock()
	clients := b.hub.clientsByIP[ipAddress]
	ids := make([]string, 0, len(clients))
	for _, client := range clients {
		ids = append(ids, client.ID)
	}
	b.hub.mu.RUnlock()

	for _, id := range ids {
		b.SendToClient(id, messageType, data)
	}
	return len(ids)
}
// BroadcastClientToClient sends a message from one client to another. The
// payload is wrapped in a map with "from_client_id" and "data" keys and
// addressed to toClientID. The send on the hub's broadcast channel is a plain
// channel send, so it blocks until the send can proceed.
func (b *Broadcaster) BroadcastClientToClient(fromClientID, toClientID string, messageType string, data interface{}) {
	envelope := map[string]interface{}{
		"from_client_id": fromClientID,
		"data":           data,
	}
	outgoing := NewWebSocketMessage(MessageType(messageType), envelope, toClientID, "")
	b.hub.broadcast <- outgoing
}
// BroadcastServerToClient sends a message from the server to a specific
// client. The message carries clientID as its client target and an empty
// room. The send on the hub's broadcast channel is a plain channel send, so
// it blocks until the send can proceed.
func (b *Broadcaster) BroadcastServerToClient(clientID string, messageType string, data interface{}) {
	const noRoom = ""
	outgoing := NewWebSocketMessage(MessageType(messageType), data, clientID, noRoom)
	b.hub.broadcast <- outgoing
}
// BroadcastServerToRoom sends a message from the server to a specific room.
// The message carries an empty client target and room as its room target;
// unlike BroadcastToRoom, the payload is not wrapped. The send on the hub's
// broadcast channel is a plain channel send, so it blocks until the send can
// proceed.
func (b *Broadcaster) BroadcastServerToRoom(room string, messageType string, data interface{}) {
	const noClient = ""
	outgoing := NewWebSocketMessage(MessageType(messageType), data, noClient, room)
	b.hub.broadcast <- outgoing
}
// BroadcastServerToAll sends a message from the server intended for all
// clients. The message is built with empty client and room targets. The send
// on the hub's broadcast channel is a plain channel send, so it blocks until
// the send can proceed.
func (b *Broadcaster) BroadcastServerToAll(messageType string, data interface{}) {
	const noTarget = ""
	outgoing := NewWebSocketMessage(MessageType(messageType), data, noTarget, noTarget)
	b.hub.broadcast <- outgoing
}
// CustomBroadcastClientToClient sends a message from one client to another,
// attaching the sender's ID under "from_client_id" without adding unnecessary
// nesting.
//
// If the message payload is a map[string]interface{}, its entries are copied
// into a fresh map and "from_client_id" is added alongside them (overriding
// any existing entry with that key). The caller's map is never modified, and
// a nil map payload is handled like an empty one. Any other payload is
// wrapped once as {"data": payload, "from_client_id": from}.
//
// The send on the hub's broadcast channel is a plain channel send, so it
// blocks until the send can proceed.
func (b *Broadcaster) CustomBroadcastClientToClient(from, to, msgType string, data interface{}) {
	msg := NewWebSocketMessage(MessageType(msgType), data, to, "")

	if m, ok := msg.Data.(map[string]interface{}); ok {
		// Copy rather than write into m: m aliases the caller's map, and
		// writing to it would leak the extra key back to the caller (and
		// panic if the map is nil).
		payload := make(map[string]interface{}, len(m)+1)
		for k, v := range m {
			payload[k] = v
		}
		payload["from_client_id"] = from
		msg.Data = payload
	} else {
		msg.Data = map[string]interface{}{
			"data":           msg.Data,
			"from_client_id": from,
		}
	}

	// Hand the message to the hub for delivery.
	b.hub.broadcast <- msg
}