websocket
This commit is contained in:
@@ -14,6 +14,7 @@ import (
|
||||
"api-service/internal/config"
|
||||
|
||||
_ "github.com/jackc/pgx/v5" // Import pgx driver
|
||||
"github.com/lib/pq"
|
||||
_ "gorm.io/driver/postgres" // Import GORM PostgreSQL driver
|
||||
|
||||
_ "github.com/go-sql-driver/mysql" // MySQL driver for database/sql
|
||||
@@ -44,16 +45,22 @@ type Service interface {
|
||||
Close() error
|
||||
ListDBs() []string
|
||||
GetDBType(name string) (DatabaseType, error)
|
||||
// Tambahkan method untuk WebSocket notifications
|
||||
ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(string, string)) error
|
||||
NotifyChange(dbName, channel, payload string) error
|
||||
GetPrimaryDB(name string) (*sql.DB, error) // Helper untuk get primary DB
|
||||
}
|
||||
|
||||
type service struct {
|
||||
sqlDatabases map[string]*sql.DB
|
||||
mongoClients map[string]*mongo.Client
|
||||
readReplicas map[string][]*sql.DB // Read replicas for load balancing
|
||||
configs map[string]config.DatabaseConfig
|
||||
readConfigs map[string][]config.DatabaseConfig
|
||||
mu sync.RWMutex
|
||||
readBalancer map[string]int // Round-robin counter for read replicas
|
||||
sqlDatabases map[string]*sql.DB
|
||||
mongoClients map[string]*mongo.Client
|
||||
readReplicas map[string][]*sql.DB // Read replicas for load balancing
|
||||
configs map[string]config.DatabaseConfig
|
||||
readConfigs map[string][]config.DatabaseConfig
|
||||
mu sync.RWMutex
|
||||
readBalancer map[string]int // Round-robin counter for read replicas
|
||||
listeners map[string]*pq.Listener // Tambahkan untuk tracking listeners
|
||||
listenersMu sync.RWMutex
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -71,6 +78,7 @@ func New(cfg *config.Config) Service {
|
||||
configs: make(map[string]config.DatabaseConfig),
|
||||
readConfigs: make(map[string][]config.DatabaseConfig),
|
||||
readBalancer: make(map[string]int),
|
||||
listeners: make(map[string]*pq.Listener),
|
||||
}
|
||||
|
||||
log.Println("Initializing database service...") // Log when the initialization starts
|
||||
@@ -372,7 +380,7 @@ func (s *service) Health() map[string]map[string]string {
|
||||
if db == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -441,7 +449,7 @@ func (s *service) GetDB(name string) (*sql.DB, error) {
|
||||
return nil, fmt.Errorf("database %s not found", name)
|
||||
}
|
||||
|
||||
log.Printf("Current connection pool state for %s: Open: %d, In Use: %d, Idle: %d",
|
||||
log.Printf("Current connection pool state for %s: Open: %d, In Use: %d, Idle: %d",
|
||||
name, db.Stats().OpenConnections, db.Stats().InUse, db.Stats().Idle)
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
@@ -469,7 +477,7 @@ func (s *service) GetReadDB(name string) (*sql.DB, error) {
|
||||
// Round-robin load balancing
|
||||
s.readBalancer[name] = (s.readBalancer[name] + 1) % len(replicas)
|
||||
selected := replicas[s.readBalancer[name]]
|
||||
|
||||
|
||||
if selected == nil {
|
||||
// Fallback to primary if replica is nil
|
||||
return s.GetDB(name)
|
||||
@@ -569,3 +577,123 @@ func (s *service) Close() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPrimaryDB returns primary database connection
|
||||
func (s *service) GetPrimaryDB(name string) (*sql.DB, error) {
|
||||
return s.GetDB(name)
|
||||
}
|
||||
|
||||
// ListenForChanges implements PostgreSQL LISTEN/NOTIFY for real-time updates
|
||||
func (s *service) ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(string, string)) error {
|
||||
s.mu.RLock()
|
||||
config, exists := s.configs[dbName]
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
return fmt.Errorf("database %s not found", dbName)
|
||||
}
|
||||
|
||||
// Only support PostgreSQL for LISTEN/NOTIFY
|
||||
if DatabaseType(config.Type) != Postgres {
|
||||
return fmt.Errorf("LISTEN/NOTIFY only supported for PostgreSQL databases")
|
||||
}
|
||||
|
||||
// Create connection string for listener
|
||||
connStr := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s",
|
||||
config.Username,
|
||||
config.Password,
|
||||
config.Host,
|
||||
config.Port,
|
||||
config.Database,
|
||||
config.SSLMode,
|
||||
)
|
||||
|
||||
// Create listener
|
||||
listener := pq.NewListener(
|
||||
connStr,
|
||||
10*time.Second,
|
||||
time.Minute,
|
||||
func(ev pq.ListenerEventType, err error) {
|
||||
if err != nil {
|
||||
log.Printf("Database listener (%s) error: %v", dbName, err)
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
// Store listener for cleanup
|
||||
s.listenersMu.Lock()
|
||||
s.listeners[dbName] = listener
|
||||
s.listenersMu.Unlock()
|
||||
|
||||
// Listen to specified channels
|
||||
for _, channel := range channels {
|
||||
err := listener.Listen(channel)
|
||||
if err != nil {
|
||||
listener.Close()
|
||||
return fmt.Errorf("failed to listen to channel %s: %w", channel, err)
|
||||
}
|
||||
log.Printf("Listening to database channel: %s on %s", channel, dbName)
|
||||
}
|
||||
|
||||
// Start listening loop
|
||||
go func() {
|
||||
defer func() {
|
||||
listener.Close()
|
||||
s.listenersMu.Lock()
|
||||
delete(s.listeners, dbName)
|
||||
s.listenersMu.Unlock()
|
||||
log.Printf("Database listener for %s stopped", dbName)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case n := <-listener.Notify:
|
||||
if n != nil {
|
||||
callback(n.Channel, n.Extra)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(90 * time.Second):
|
||||
// Send ping to keep connection alive
|
||||
go func() {
|
||||
if err := listener.Ping(); err != nil {
|
||||
log.Printf("Listener ping failed for %s: %v", dbName, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NotifyChange sends a notification to a PostgreSQL channel
|
||||
func (s *service) NotifyChange(dbName, channel, payload string) error {
|
||||
db, err := s.GetDB(dbName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get database %s: %w", dbName, err)
|
||||
}
|
||||
|
||||
// Check if it's PostgreSQL
|
||||
s.mu.RLock()
|
||||
config, exists := s.configs[dbName]
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !exists {
|
||||
return fmt.Errorf("database %s configuration not found", dbName)
|
||||
}
|
||||
|
||||
if DatabaseType(config.Type) != Postgres {
|
||||
return fmt.Errorf("NOTIFY only supported for PostgreSQL databases")
|
||||
}
|
||||
|
||||
// Execute NOTIFY
|
||||
query := "SELECT pg_notify($1, $2)"
|
||||
_, err = db.Exec(query, channel, payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to send notification: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("Sent notification to channel %s on %s: %s", channel, dbName, payload)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user