Files
websocket-qris/internal/services/websocket/hub.go
2025-10-24 12:33:10 +00:00

445 lines
11 KiB
Go

package websocket
import (
"api-service/internal/config"
"api-service/internal/database"
"context"
"database/sql"
"encoding/json"
"fmt"
"sync"
"time"
)
// Hub manages all WebSocket clients. It owns the client indexes, the room
// membership table, and the channels that Run uses to process registrations,
// unregistrations, and outgoing messages.
type Hub struct {
// Configuration the hub was created with.
config config.WebSocketConfig
// Clients.
// clients is the set of registered clients; the other maps index the same
// clients by Client.ID, by Client.IPAddress (several clients may share one
// address), and by Client.StaticID (only when that ID is non-empty).
clients map[*Client]bool
clientsByID map[string]*Client
clientsByIP map[string][]*Client
clientsByStatic map[string]*Client
// Rooms: room name -> set of clients whose Client.Room equals that name.
rooms map[string]map[*Client]bool
// Communication channels.
// broadcast, register, and unregister are consumed by Run. messageQueue is
// drained by the queue workers, which forward each message to broadcast.
broadcast chan WebSocketMessage
register chan *Client
unregister chan *Client
messageQueue chan WebSocketMessage
// Synchronization: mu is held while the client maps and rooms are read or
// modified.
mu sync.RWMutex
// Context: cancelling ctx stops Run and the queue workers.
ctx context.Context
cancel context.CancelFunc
// External services.
dbService database.Service
// Monitoring.
// startTime is set when the hub is created; messageCount is incremented by
// Run for every message taken from broadcast. activityLog is appended to by
// logActivity while holding activityMu.
// NOTE(review): errorCount is only read (by GetStats) in this file — confirm
// where it is updated.
startTime time.Time
messageCount int64
errorCount int64
activityLog []ActivityLog
activityMu sync.RWMutex
// Message handler registry.
// NOTE(review): broadcaster is not assigned by NewHub in this file — confirm
// where it is initialized before use.
messageRegistry *MessageRegistry
broadcaster *Broadcaster
}
// ActivityLog is one entry in the hub's activity log. Entries are created by
// Hub.logActivity and serialize to JSON with the field names in the tags below.
type ActivityLog struct {
// Timestamp is the time at which the entry was recorded.
Timestamp time.Time `json:"timestamp"`
// Event is a short event name such as "client_connected".
Event string `json:"event"`
// ClientID is the ID of the client involved; it is empty for events that are
// not tied to a client (for example database listener events).
ClientID string `json:"client_id"`
// Details is free-form text describing the event.
Details string `json:"details"`
}
// DatabaseService defines the interface for a database service.
//
// NOTE(review): Hub stores a database.Service rather than this interface, and
// nothing in this file references DatabaseService — confirm whether it is
// still needed. The method notes below are inferred from the signatures only.
type DatabaseService interface {
// Health presumably reports health information for the service — TODO confirm
// the key schema against the implementation.
Health() map[string]interface{}
// ListDBs presumably returns the names of the configured databases.
ListDBs() []string
// ListenForChanges subscribes to the given channels on database dbName and
// passes each notification's channel and payload to callback; ctx presumably
// bounds the subscription's lifetime — verify against the implementation.
ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(channel, payload string)) error
// NotifyChange sends payload on the given channel of database dbName.
NotifyChange(dbName, channel, payload string) error
// GetDB returns the *sql.DB registered under name.
GetDB(name string) (*sql.DB, error)
// GetPrimaryDB returns the *sql.DB for the primary of the named database —
// presumably as opposed to a replica; confirm.
GetPrimaryDB(name string) (*sql.DB, error)
}
// Package-level database service shared by every Hub.
var (
// dbService is assigned in init and copied into each Hub by NewHub.
dbService database.Service
// once guards the assignment of dbService in init.
once sync.Once
)
// Initialize the database connection
func init() {
once.Do(func() {
dbService = database.New(config.LoadConfig())
if dbService == nil {
panic("Failed to initialize database connection")
}
})
}
// NewHub creates a hub for the given configuration.
//
// The returned hub has empty client and room tables, a cancellable context,
// and the package-level database service attached. Database change listeners
// are set up before the hub is returned; the caller is still responsible for
// starting Run.
func NewHub(config config.WebSocketConfig) *Hub {
	hubCtx, stop := context.WithCancel(context.Background())

	h := new(Hub)
	h.config = config

	// Client indexes and room membership.
	h.clients = make(map[*Client]bool)
	h.clientsByID = make(map[string]*Client)
	h.clientsByIP = make(map[string][]*Client)
	h.clientsByStatic = make(map[string]*Client)
	h.rooms = make(map[string]map[*Client]bool)

	// Channels: register/unregister are unbuffered, the others are buffered.
	h.broadcast = make(chan WebSocketMessage, 1000)
	h.register = make(chan *Client)
	h.unregister = make(chan *Client)
	h.messageQueue = make(chan WebSocketMessage, config.MessageQueueSize)

	h.ctx = hubCtx
	h.cancel = stop
	h.dbService = dbService
	h.startTime = time.Now()
	h.activityLog = make([]ActivityLog, 0, config.ActivityLogSize)
	h.messageRegistry = NewMessageRegistry()

	// Default message handler registration is currently disabled:
	// SetupDefaultHandlers(h.messageRegistry)

	h.setupDatabaseListeners()
	return h
}
// Run is the hub's main loop. It starts config.QueueWorkers queue workers and
// then processes register, unregister, and broadcast events one at a time
// until the hub's context is cancelled.
//
// Run blocks; callers normally start it in its own goroutine.
func (h *Hub) Run() {
	// Start queue workers.
	for i := 0; i < h.config.QueueWorkers; i++ {
		go h.queueWorker(i)
	}

	for {
		select {
		case client := <-h.register:
			h.registerClient(client)
		case client := <-h.unregister:
			h.unregisterClient(client)
		case message := <-h.broadcast:
			// GetStats reads messageCount from other goroutines while holding
			// h.mu, so the write must hold the same lock to avoid a data race.
			// The lock is released before broadcasting, which takes its own
			// read lock.
			h.mu.Lock()
			h.messageCount++
			h.mu.Unlock()
			h.broadcastToClients(message)
		case <-h.ctx.Done():
			return
		}
	}
}
// queueWorker forwards messages from messageQueue to the broadcast channel
// until the hub's context is cancelled.
//
// workerID identifies the worker; it is currently unused.
func (h *Hub) queueWorker(workerID int) {
	for {
		select {
		case message := <-h.messageQueue:
			// The forward must also watch for cancellation: once Run has
			// returned nothing drains broadcast, so an unconditional send
			// would block this goroutine forever when the buffer is full.
			select {
			case h.broadcast <- message:
			case <-h.ctx.Done():
				return
			}
		case <-h.ctx.Done():
			return
		}
	}
}
// registerClient adds a client to every hub index: the main client set, the
// ID index, the static-ID index (when the client has one), the per-IP list,
// and its room (when it has one). It then records a "client_connected"
// activity entry. The hub lock is held for the whole operation.
func (h *Hub) registerClient(client *Client) {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Main set and ID index.
	h.clients[client] = true
	h.clientsByID[client.ID] = client

	// Static-ID index, only for clients that carry a static ID.
	if staticID := client.StaticID; staticID != "" {
		h.clientsByStatic[staticID] = client
	}

	// Per-IP list.
	ip := client.IPAddress
	peers := h.clientsByIP[ip]
	if peers == nil {
		peers = make([]*Client, 0)
	}
	h.clientsByIP[ip] = append(peers, client)

	// Room membership, creating the room on first use.
	if name := client.Room; name != "" {
		members := h.rooms[name]
		if members == nil {
			members = make(map[*Client]bool)
			h.rooms[name] = members
		}
		members[client] = true
	}

	details := fmt.Sprintf("IP: %s, Static: %s, Room: %s", client.IPAddress, client.StaticID, client.Room)
	h.logActivity("client_connected", client.ID, details)
}
// unregisterClient removes a client from every hub index, closes its Send
// channel, records a "client_disconnected" activity entry, and cancels the
// client.
//
// The same client can be queued for unregistration more than once (for
// example, broadcastToClients queues it for every message it fails to
// deliver). A request for a client that is no longer registered only cancels
// the client; it does not log a second disconnect.
func (h *Hub) unregisterClient(client *Client) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if _, ok := h.clients[client]; !ok {
		client.cancel()
		return
	}

	// Remove from the main set and stop further sends to the client.
	delete(h.clients, client)
	close(client.Send)

	// Remove from the ID indexes, but only when the entry still refers to this
	// client: a newer connection registered under the same ID overwrites the
	// entry and must not be dropped when the old connection goes away.
	if h.clientsByID[client.ID] == client {
		delete(h.clientsByID, client.ID)
	}
	if client.StaticID != "" && h.clientsByStatic[client.StaticID] == client {
		delete(h.clientsByStatic, client.StaticID)
	}

	// Remove from the per-IP list.
	if ipClients, exists := h.clientsByIP[client.IPAddress]; exists {
		for i, c := range ipClients {
			if c != client {
				continue
			}
			last := len(ipClients) - 1
			copy(ipClients[i:], ipClients[i+1:])
			// Clear the vacated slot so the backing array does not keep the
			// removed client reachable.
			ipClients[last] = nil
			ipClients = ipClients[:last]
			break
		}
		// Drop the IP entry entirely once no client from that address remains.
		if len(ipClients) == 0 {
			delete(h.clientsByIP, client.IPAddress)
		} else {
			h.clientsByIP[client.IPAddress] = ipClients
		}
	}

	// Remove from the room, deleting the room when it becomes empty.
	if client.Room != "" {
		if room, exists := h.rooms[client.Room]; exists {
			delete(room, client)
			if len(room) == 0 {
				delete(h.rooms, client.Room)
			}
		}
	}

	h.logActivity("client_disconnected", client.ID,
		fmt.Sprintf("IP: %s, Duration: %v", client.IPAddress, time.Since(client.connectedAt)))
	client.cancel()
}
// broadcastToClients delivers a message to its target clients while holding
// the hub read lock:
//
//   - message.ClientID set: only the client registered under that ID;
//   - otherwise message.Room set: every client in that room;
//   - otherwise: every registered client.
//
// Delivery never blocks. A client whose Send channel cannot accept the
// message is queued for unregistration instead.
func (h *Hub) broadcastToClients(message WebSocketMessage) {
	h.mu.RLock()
	defer h.mu.RUnlock()

	deliver := func(c *Client) {
		select {
		case c.Send <- message:
		default:
			// unregister is unbuffered and is received by Run, which is the
			// goroutine calling this method, so the request has to be sent
			// from a separate goroutine. It also watches the hub context so
			// that it does not block forever once Run has stopped.
			go func() {
				select {
				case h.unregister <- c:
				case <-h.ctx.Done():
				}
			}()
		}
	}

	switch {
	case message.ClientID != "":
		// Send to one specific client.
		if client, exists := h.clientsByID[message.ClientID]; exists {
			deliver(client)
		}
	case message.Room != "":
		// Send to a room; ranging over a missing room is a no-op.
		for client := range h.rooms[message.Room] {
			deliver(client)
		}
	default:
		// Broadcast to every client.
		for client := range h.clients {
			deliver(client)
		}
	}
}
// logActivity appends a timestamped entry to the hub's activity log and, when
// the log has grown past config.ActivityLogSize, drops the oldest entry.
func (h *Hub) logActivity(event, clientID, details string) {
	h.activityMu.Lock()
	defer h.activityMu.Unlock()

	h.activityLog = append(h.activityLog, ActivityLog{
		Timestamp: time.Now(),
		Event:     event,
		ClientID:  clientID,
		Details:   details,
	})

	// Entries are added one at a time, so trimming one keeps the log bounded.
	if limit := h.config.ActivityLogSize; len(h.activityLog) > limit {
		h.activityLog = h.activityLog[1:]
	}
}
// GetConfig returns the configuration the hub was created with.
func (h *Hub) GetConfig() config.WebSocketConfig {
return h.config
}
// GetMessageRegistry returns the hub's message handler registry.
func (h *Hub) GetMessageRegistry() *MessageRegistry {
return h.messageRegistry
}
// RegisterChannel returns the hub's register channel so that handlers can
// submit new clients. The channel is unbuffered and is received by Run, so a
// send blocks until Run picks the client up.
func (h *Hub) RegisterChannel() chan *Client {
return h.register
}
// GetStats returns a snapshot of hub statistics taken under the hub read
// lock: client, IP, static-client and room counts, message and error
// counters, uptime as a duration string, the configured server ID, and the
// current Unix timestamp.
func (h *Hub) GetStats() map[string]interface{} {
	h.mu.RLock()
	defer h.mu.RUnlock()

	stats := make(map[string]interface{}, 9)

	// Table sizes.
	stats["connected_clients"] = len(h.clients)
	stats["unique_ips"] = len(h.clientsByIP)
	stats["static_clients"] = len(h.clientsByStatic)
	stats["rooms"] = len(h.rooms)

	// Counters.
	stats["message_count"] = h.messageCount
	stats["error_count"] = h.errorCount

	// Server identity and timing.
	stats["uptime"] = time.Since(h.startTime).String()
	stats["server_id"] = h.config.ServerID
	stats["timestamp"] = time.Now().Unix()

	return stats
}
// setupDatabaseListeners subscribes the hub to the "retribusi_changes" and
// "data_changes" channels of the "postgres_satudata" database and routes
// every notification to handleDatabaseChange. The outcome (started or failed)
// is recorded in the activity log; a failure is not returned to the caller.
func (h *Hub) setupDatabaseListeners() {
	const dbName = "postgres_satudata"
	channels := []string{"retribusi_changes", "data_changes"}

	if err := h.dbService.ListenForChanges(h.ctx, dbName, channels, h.handleDatabaseChange); err != nil {
		h.logActivity("database_listener_error", "", fmt.Sprintf("Failed to setup listeners: %v", err))
		return
	}
	h.logActivity("database_listener_started", "", "Database change listeners initialized")
}
// handleDatabaseChange processes one database change notification.
//
// The notification is recorded in the activity log, its payload is decoded as
// a JSON object, and the decoded data is broadcast as a DatabaseChangeMessage
// to the "data_updates" room. A payload that is not valid JSON is logged and
// dropped. The message is also dropped if the hub's context is cancelled
// before the broadcast channel accepts it.
func (h *Hub) handleDatabaseChange(channel, payload string) {
	h.logActivity("database_change", "", fmt.Sprintf("Channel: %s, Payload: %s", channel, payload))

	// Parse the payload to determine what changed.
	var changeData map[string]interface{}
	if err := json.Unmarshal([]byte(payload), &changeData); err != nil {
		h.logActivity("database_change_parse_error", "", fmt.Sprintf("Failed to parse payload: %v", err))
		return
	}

	message := WebSocketMessage{
		Type:     DatabaseChangeMessage,
		Data:     changeData,
		ClientID: "",
		Room:     "data_updates", // deliver only to the data_updates room
	}

	// Watch for cancellation while sending: once Run has stopped nothing
	// drains broadcast, and an unconditional send would block the database
	// listener's callback forever when the buffer is full.
	select {
	case h.broadcast <- message:
	case <-h.ctx.Done():
	}
}
// GetDatabaseConnection returns the *sql.DB that the hub's database service
// has registered under dbName, or the error the service reports for it.
func (h *Hub) GetDatabaseConnection(dbName string) (*sql.DB, error) {
return h.dbService.GetDB(dbName)
}
// ExecuteDatabaseQuery runs query with args against the database registered
// under dbName and returns every row as a map from column name to value.
//
// The query is bound to the hub's context with a 30-second timeout. Values
// scanned as []byte are converted to string; all other values are returned as
// scanned. The result is nil when the query yields no rows. Any failure
// (connection lookup, query, column lookup, scan, iteration) is returned
// wrapped, with a nil result.
func (h *Hub) ExecuteDatabaseQuery(dbName, query string, args ...interface{}) ([]map[string]interface{}, error) {
	conn, err := h.GetDatabaseConnection(dbName)
	if err != nil {
		return nil, fmt.Errorf("failed to get database connection: %w", err)
	}

	queryCtx, release := context.WithTimeout(h.ctx, 30*time.Second)
	defer release()

	rows, err := conn.QueryContext(queryCtx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to execute query: %w", err)
	}
	defer rows.Close()

	names, err := rows.Columns()
	if err != nil {
		return nil, fmt.Errorf("failed to get columns: %w", err)
	}
	width := len(names)

	var out []map[string]interface{}
	for rows.Next() {
		// Scan needs one destination pointer per column; point each at a
		// slot of cells so the values can be read back afterwards.
		cells := make([]interface{}, width)
		targets := make([]interface{}, width)
		for i := range cells {
			targets[i] = &cells[i]
		}
		if err := rows.Scan(targets...); err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}

		record := make(map[string]interface{}, width)
		for i, name := range names {
			if raw, isBytes := cells[i].([]byte); isBytes {
				record[name] = string(raw)
				continue
			}
			record[name] = cells[i]
		}
		out = append(out, record)
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rows iteration error: %w", err)
	}
	return out, nil
}
// NotifyDatabaseChange asks the hub's database service to send payload on the
// given channel of database dbName and returns the service's error, if any.
func (h *Hub) NotifyDatabaseChange(dbName, channel, payload string) error {
return h.dbService.NotifyChange(dbName, channel, payload)
}