255 lines
7.3 KiB
Go
255 lines
7.3 KiB
Go
package websocket
|
|
|
|
import (
|
|
"api-service/pkg/logger"
|
|
"time"
|
|
)
|
|
|
|
// Broadcaster mengelola broadcasting pesan
|
|
type Broadcaster struct {
|
|
hub *Hub
|
|
}
|
|
|
|
// NewBroadcaster membuat broadcaster baru
|
|
func NewBroadcaster(hub *Hub) *Broadcaster {
|
|
return &Broadcaster{hub: hub}
|
|
}
|
|
|
|
// StartServerBroadcasters memulai broadcaster server
|
|
func (b *Broadcaster) StartServerBroadcasters() {
|
|
// Heartbeat broadcaster
|
|
go b.startHeartbeatBroadcaster()
|
|
|
|
// System notification broadcaster
|
|
go b.startSystemNotificationBroadcaster()
|
|
|
|
// Data stream broadcaster
|
|
go b.startDataStreamBroadcaster()
|
|
}
|
|
|
|
// startHeartbeatBroadcaster memulai broadcaster heartbeat
|
|
func (b *Broadcaster) startHeartbeatBroadcaster() {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
b.hub.mu.RLock()
|
|
connectedClients := len(b.hub.clients)
|
|
uniqueIPs := len(b.hub.clientsByIP)
|
|
staticClients := len(b.hub.clientsByStatic)
|
|
b.hub.mu.RUnlock()
|
|
|
|
b.BroadcastMessage("server_heartbeat", map[string]interface{}{
|
|
"message": "Server heartbeat",
|
|
"connected_clients": connectedClients,
|
|
"unique_ips": uniqueIPs,
|
|
"static_clients": staticClients,
|
|
"timestamp": time.Now().Unix(),
|
|
"server_id": b.hub.config.ServerID,
|
|
})
|
|
case <-b.hub.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// startSystemNotificationBroadcaster memulai broadcaster notifikasi sistem
|
|
func (b *Broadcaster) startSystemNotificationBroadcaster() {
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
dbHealth := b.hub.dbService.Health()
|
|
b.BroadcastMessage("system_status", map[string]interface{}{
|
|
"type": "system_notification",
|
|
"database": dbHealth,
|
|
"timestamp": time.Now().Unix(),
|
|
"uptime": time.Since(b.hub.startTime).String(),
|
|
})
|
|
case <-b.hub.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// startDataStreamBroadcaster memulai broadcaster aliran data
|
|
func (b *Broadcaster) startDataStreamBroadcaster() {
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
defer ticker.Stop()
|
|
counter := 0
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
counter++
|
|
b.BroadcastMessage("data_stream", map[string]interface{}{
|
|
"id": counter,
|
|
"value": counter * 10,
|
|
"timestamp": time.Now().Unix(),
|
|
"type": "real_time_data",
|
|
})
|
|
case <-b.hub.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// BroadcastMessage mengirim pesan ke semua klien
|
|
func (b *Broadcaster) BroadcastMessage(messageType string, data interface{}) {
|
|
msg := NewWebSocketMessage(MessageType(messageType), data, "", "")
|
|
|
|
select {
|
|
case b.hub.messageQueue <- msg:
|
|
default:
|
|
// Antrian penuh, abaikan pesan
|
|
}
|
|
}
|
|
|
|
// BroadcastQris godoc
|
|
// @Summary Broadcast a QRIS-related WebSocket message
|
|
// @Description Creates and broadcasts a WebSocket message with the specified type and data for QRIS operations
|
|
// @Tags WebSocket QRIS
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param messageType path string true "Type of the QRIS message to broadcast"
|
|
// @Param data body interface{} true "QRIS data payload for the message"
|
|
// @Success 200 {object} map[string]string "QRIS message successfully queued for broadcast"
|
|
// @Failure 500 {object} map[string]string "Failed to queue QRIS message (queue full)"
|
|
// @Router /ws/broadcast/qris [post]
|
|
func (b *Broadcaster) BroadcastQris(messageType string, data interface{}) {
|
|
msg := NewWebSocketMessage(MessageType(messageType), data, "", "")
|
|
|
|
select {
|
|
case b.hub.messageQueue <- msg:
|
|
default:
|
|
// Antrian penuh, abaikan pesan
|
|
logger.Error("Message queue full, dropping message")
|
|
}
|
|
|
|
// Show posdevice if present
|
|
// if m, ok := data.(map[string]interface{}); ok {
|
|
// fmt.Println("BroadcastQris called with IP display: ", m["posdevice"])
|
|
// }
|
|
|
|
//tabel m_deviceqris
|
|
//kolom posdevice dari nama lokasi jadi ip display
|
|
//kolom ip dari ip simrs
|
|
}
|
|
|
|
// BroadcastCheck godoc
|
|
// @Summary Broadcast a WebSocket message
|
|
// @Description Creates and broadcasts a WebSocket message with the specified type and data
|
|
// @Tags WebSocket QRIS
|
|
// @Accept json
|
|
// @Produce json
|
|
// @Param messageType path string true "Type of the message to broadcast"
|
|
// @Param data body interface{} true "Data payload for the message"
|
|
// @Success 200 {object} map[string]string "Message successfully queued for broadcast"
|
|
// @Failure 500 {object} map[string]string "Failed to queue message (queue full)"
|
|
// @Router /ws/broadcast/check [post]
|
|
func (b *Broadcaster) BroadcastCheck(messageType string, data interface{}) {
|
|
msg := NewWebSocketMessage(MessageType(messageType), data, "", "")
|
|
|
|
select {
|
|
case b.hub.messageQueue <- msg:
|
|
default:
|
|
// Antrian penuh, abaikan pesan
|
|
logger.Error("Message queue full, dropping message")
|
|
}
|
|
|
|
// Show posdevice if present
|
|
// if m, ok := data.(map[string]interface{}); ok {
|
|
// fmt.Println("BroadcastCheck called with IP display: ", m["posdevice"])
|
|
// }
|
|
}
|
|
|
|
// BroadcastToRoom mengirim pesan ke ruangan tertentu
|
|
func (b *Broadcaster) BroadcastToRoom(room string, messageType string, data interface{}) {
|
|
msg := NewWebSocketMessage(
|
|
MessageType(messageType),
|
|
map[string]interface{}{
|
|
"room": room,
|
|
"data": data,
|
|
},
|
|
"",
|
|
room,
|
|
)
|
|
|
|
select {
|
|
case b.hub.messageQueue <- msg:
|
|
default:
|
|
// Antrian penuh, abaikan pesan
|
|
}
|
|
}
|
|
|
|
// SendToClient mengirim pesan ke klien tertentu
|
|
func (b *Broadcaster) SendToClient(clientID string, messageType string, data interface{}) {
|
|
msg := NewWebSocketMessage(MessageType(messageType), data, clientID, "")
|
|
|
|
select {
|
|
case b.hub.messageQueue <- msg:
|
|
default:
|
|
// Antrian penuh, abaikan pesan
|
|
}
|
|
}
|
|
|
|
// SendToClientByStaticID mengirim pesan ke klien berdasarkan ID statis
|
|
func (b *Broadcaster) SendToClientByStaticID(staticID string, messageType string, data interface{}) bool {
|
|
b.hub.mu.RLock()
|
|
client, exists := b.hub.clientsByStatic[staticID]
|
|
b.hub.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return false
|
|
}
|
|
|
|
b.SendToClient(client.ID, messageType, data)
|
|
return true
|
|
}
|
|
|
|
// BroadcastToIP mengirim pesan ke semua klien dari IP tertentu
|
|
func (b *Broadcaster) BroadcastToIP(ipAddress string, messageType string, data interface{}) int {
|
|
b.hub.mu.RLock()
|
|
clients := b.hub.clientsByIP[ipAddress]
|
|
b.hub.mu.RUnlock()
|
|
|
|
count := 0
|
|
for _, client := range clients {
|
|
b.SendToClient(client.ID, messageType, data)
|
|
count++
|
|
}
|
|
|
|
return count
|
|
}
|
|
|
|
// BroadcastClientToClient mengirim pesan dari satu klien ke klien lain
|
|
func (b *Broadcaster) BroadcastClientToClient(fromClientID, toClientID string, messageType string, data interface{}) {
|
|
message := NewWebSocketMessage(MessageType(messageType), map[string]interface{}{
|
|
"from_client_id": fromClientID,
|
|
"data": data,
|
|
}, toClientID, "")
|
|
b.hub.broadcast <- message
|
|
}
|
|
|
|
// BroadcastServerToClient mengirim pesan dari server ke klien tertentu
|
|
func (b *Broadcaster) BroadcastServerToClient(clientID string, messageType string, data interface{}) {
|
|
message := NewWebSocketMessage(MessageType(messageType), data, clientID, "")
|
|
b.hub.broadcast <- message
|
|
}
|
|
|
|
// BroadcastServerToRoom mengirim pesan dari server ke ruangan tertentu
|
|
func (b *Broadcaster) BroadcastServerToRoom(room string, messageType string, data interface{}) {
|
|
message := NewWebSocketMessage(MessageType(messageType), data, "", room)
|
|
b.hub.broadcast <- message
|
|
}
|
|
|
|
// BroadcastServerToAll mengirim pesan dari server ke semua klien
|
|
func (b *Broadcaster) BroadcastServerToAll(messageType string, data interface{}) {
|
|
message := NewWebSocketMessage(MessageType(messageType), data, "", "")
|
|
b.hub.broadcast <- message
|
|
}
|