package websocket

import (
	"sync"
	"time"
)

// WebSocketBroadcaster defines the interface for broadcasting messages.
type WebSocketBroadcaster interface {
	// BroadcastMessage is called with a message type tag and an arbitrary
	// payload for every broadcast issued by a Broadcaster.
	BroadcastMessage(messageType string, data interface{})
}

// Broadcaster handles server-initiated broadcasts to WebSocket clients.
//
// Create one with NewBroadcaster. Periodic broadcasts run on background
// goroutines that exit once Stop is called. The Broadcaster's own state is
// protected by a mutex; the handler's BroadcastMessage is invoked from those
// background goroutines, so it must tolerate concurrent calls if more than
// one periodic broadcast is started.
type Broadcaster struct {
	handler WebSocketBroadcaster // receives every broadcast
	tickers []*time.Ticker       // tickers of running periodic broadcasts; guarded by mu
	quit    chan struct{}        // closed by Stop to tell background goroutines to exit
	mu      sync.Mutex           // guards tickers and the closing of quit
}

// NewBroadcaster creates a new Broadcaster instance that forwards all
// broadcasts to handler.
func NewBroadcaster(handler WebSocketBroadcaster) *Broadcaster {
	return &Broadcaster{
		handler: handler,
		quit:    make(chan struct{}),
	}
}

// StartHeartbeat starts sending a "heartbeat" message to the handler every
// interval until Stop is called. Each message carries a fixed text and an
// RFC 3339 timestamp.
//
// interval must be positive; time.NewTicker panics otherwise. Calling
// StartHeartbeat after Stop does nothing.
func (b *Broadcaster) StartHeartbeat(interval time.Duration) {
	b.startTicker(interval, func() {
		b.handler.BroadcastMessage("heartbeat", map[string]interface{}{
			"message":   "Server heartbeat",
			"timestamp": time.Now().Format(time.RFC3339),
		})
	})
}

// Stop stops the broadcaster: it signals every background goroutine to exit
// and stops all outstanding tickers.
//
// Stop is idempotent; calls after the first are no-ops instead of panicking
// on a second close of the quit channel.
func (b *Broadcaster) Stop() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.stopped() {
		return
	}
	close(b.quit)

	for _, ticker := range b.tickers {
		if ticker != nil {
			ticker.Stop()
		}
	}
	b.tickers = nil
}

// BroadcastNotification sends a single "notification" message to the handler
// containing title, message, level and the current time in RFC 3339 format.
// It runs synchronously on the caller's goroutine.
func (b *Broadcaster) BroadcastNotification(title, message, level string) {
	b.handler.BroadcastMessage("notification", map[string]interface{}{
		"title":   title,
		"message": message,
		"level":   level,
		"time":    time.Now().Format(time.RFC3339),
	})
}

// SimulateDataStream simulates streaming data to clients (useful for demos).
// Every 100ms it sends a "data_stream" message whose "id" is a counter that
// starts at 1 and whose "value" is ten times that counter, until Stop is
// called. Calling SimulateDataStream after Stop does nothing.
func (b *Broadcaster) SimulateDataStream() {
	// counter is only touched by the single goroutine that runs the callback.
	counter := 0
	b.startTicker(100*time.Millisecond, func() {
		counter++
		b.handler.BroadcastMessage("data_stream", map[string]interface{}{
			"id":        counter,
			"value":     counter * 10,
			"timestamp": time.Now().Format(time.RFC3339),
			"type":      "simulated_data",
		})
	})
}

// startTicker registers a new ticker and runs onTick on a background
// goroutine each time it fires, until the broadcaster is stopped.
func (b *Broadcaster) startTicker(interval time.Duration, onTick func()) {
	ticker := time.NewTicker(interval)

	b.mu.Lock()
	if b.stopped() {
		// Nothing would ever consume this ticker; release it right away.
		b.mu.Unlock()
		ticker.Stop()
		return
	}
	b.tickers = append(b.tickers, ticker)
	b.mu.Unlock()

	go func() {
		defer b.releaseTicker(ticker)
		for {
			select {
			case <-ticker.C:
				onTick()
			case <-b.quit:
				return
			}
		}
	}()
}

// releaseTicker stops ticker and removes it from the registered set.
func (b *Broadcaster) releaseTicker(ticker *time.Ticker) {
	ticker.Stop()

	b.mu.Lock()
	defer b.mu.Unlock()
	for i, t := range b.tickers {
		if t == ticker {
			b.tickers = append(b.tickers[:i], b.tickers[i+1:]...)
			return
		}
	}
}

// stopped reports whether quit has been closed. Callers must hold b.mu.
func (b *Broadcaster) stopped() bool {
	select {
	case <-b.quit:
		return true
	default:
		return false
	}
}