first commit
This commit is contained in:
111
internal/handlers/websocket/broadcast.go
Normal file
111
internal/handlers/websocket/broadcast.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package websocket
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// WebSocketBroadcaster defines the interface for broadcasting messages
|
||||
type WebSocketBroadcaster interface {
|
||||
BroadcastMessage(messageType string, data interface{})
|
||||
}
|
||||
|
||||
// Broadcaster handles server-initiated broadcasts to WebSocket clients
|
||||
type Broadcaster struct {
|
||||
handler WebSocketBroadcaster
|
||||
tickers []*time.Ticker
|
||||
quit chan struct{}
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewBroadcaster creates a new Broadcaster instance
|
||||
func NewBroadcaster(handler WebSocketBroadcaster) *Broadcaster {
|
||||
return &Broadcaster{
|
||||
handler: handler,
|
||||
tickers: make([]*time.Ticker, 0),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// StartHeartbeat starts sending periodic heartbeat messages to all clients
|
||||
func (b *Broadcaster) StartHeartbeat(interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
b.tickers = append(b.tickers, ticker)
|
||||
go func() {
|
||||
defer func() {
|
||||
// Remove ticker from slice when done
|
||||
for i, t := range b.tickers {
|
||||
if t == ticker {
|
||||
b.tickers = append(b.tickers[:i], b.tickers[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
b.handler.BroadcastMessage("heartbeat", map[string]interface{}{
|
||||
"message": "Server heartbeat",
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
})
|
||||
case <-b.quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop stops the broadcaster
|
||||
func (b *Broadcaster) Stop() {
|
||||
close(b.quit)
|
||||
for _, ticker := range b.tickers {
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
}
|
||||
}
|
||||
b.tickers = nil
|
||||
}
|
||||
|
||||
// BroadcastNotification sends a notification message to all clients
|
||||
func (b *Broadcaster) BroadcastNotification(title, message, level string) {
|
||||
b.handler.BroadcastMessage("notification", map[string]interface{}{
|
||||
"title": title,
|
||||
"message": message,
|
||||
"level": level,
|
||||
"time": time.Now().Format(time.RFC3339),
|
||||
})
|
||||
}
|
||||
|
||||
// SimulateDataStream simulates streaming data to clients (useful for demos)
|
||||
func (b *Broadcaster) SimulateDataStream() {
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
b.tickers = append(b.tickers, ticker)
|
||||
go func() {
|
||||
defer func() {
|
||||
// Remove ticker from slice when done
|
||||
for i, t := range b.tickers {
|
||||
if t == ticker {
|
||||
b.tickers = append(b.tickers[:i], b.tickers[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
counter := 0
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
counter++
|
||||
b.handler.BroadcastMessage("data_stream", map[string]interface{}{
|
||||
"id": counter,
|
||||
"value": counter * 10,
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
"type": "simulated_data",
|
||||
})
|
||||
case <-b.quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
Reference in New Issue
Block a user