499 lines
13 KiB
Go
499 lines
13 KiB
Go
package database
|
|
|
|
import (
|
|
"antrian-operasi/internal/config"
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"log"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/lib/pq"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
)
|
|
|
|
func (s *service) configureSQLDB(name string, db *sql.DB, config config.DatabaseConfig) error {
|
|
// Set connection pool limits
|
|
db.SetMaxOpenConns(config.MaxOpenConns)
|
|
db.SetMaxIdleConns(config.MaxIdleConns)
|
|
db.SetConnMaxLifetime(config.ConnMaxLifetime)
|
|
db.SetConnMaxIdleTime(config.MaxIdleTime)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), config.Timeout)
|
|
defer cancel()
|
|
|
|
if err := db.PingContext(ctx); err != nil {
|
|
db.Close()
|
|
return fmt.Errorf("failed to ping database: %w", err)
|
|
}
|
|
|
|
s.sqlDatabases[name] = db
|
|
|
|
// PERUBAHAN: Tambahkan pembuatan sqlx.DB dari sql.DB yang sudah ada
|
|
dbType := DatabaseType(config.Type)
|
|
var driverName string
|
|
|
|
switch dbType {
|
|
case Postgres:
|
|
driverName = "pgx"
|
|
case MySQL:
|
|
driverName = "mysql"
|
|
case SQLServer:
|
|
driverName = "sqlserver"
|
|
case SQLite:
|
|
driverName = "sqlite3"
|
|
default:
|
|
return fmt.Errorf("unsupported database type for sqlx: %s", config.Type)
|
|
}
|
|
|
|
// Buat sqlx.DB dari sql.DB yang sudah ada
|
|
sqlxDB := sqlx.NewDb(db, driverName)
|
|
s.sqlxDatabases[name] = sqlxDB
|
|
|
|
log.Printf("Successfully connected to SQL database: %s", name)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Health checks the health of all database connections by pinging each database.
|
|
func (s *service) Health() map[string]map[string]string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
result := make(map[string]map[string]string)
|
|
|
|
// Check SQL databases
|
|
for name, db := range s.sqlDatabases {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
stats := make(map[string]string)
|
|
|
|
err := db.PingContext(ctx)
|
|
if err != nil {
|
|
stats["status"] = "down"
|
|
stats["error"] = fmt.Sprintf("db down: %v", err)
|
|
stats["type"] = "sql"
|
|
stats["role"] = "primary"
|
|
result[name] = stats
|
|
continue
|
|
}
|
|
|
|
stats["status"] = "up"
|
|
stats["message"] = "It's healthy"
|
|
stats["type"] = "sql"
|
|
stats["role"] = "primary"
|
|
|
|
dbStats := db.Stats()
|
|
stats["open_connections"] = strconv.Itoa(dbStats.OpenConnections)
|
|
stats["in_use"] = strconv.Itoa(dbStats.InUse)
|
|
stats["idle"] = strconv.Itoa(dbStats.Idle)
|
|
stats["wait_count"] = strconv.FormatInt(dbStats.WaitCount, 10)
|
|
stats["wait_duration"] = dbStats.WaitDuration.String()
|
|
stats["max_idle_closed"] = strconv.FormatInt(dbStats.MaxIdleClosed, 10)
|
|
stats["max_lifetime_closed"] = strconv.FormatInt(dbStats.MaxLifetimeClosed, 10)
|
|
|
|
if dbStats.OpenConnections > 40 {
|
|
stats["message"] = "The database is experiencing heavy load."
|
|
}
|
|
|
|
if dbStats.WaitCount > 1000 {
|
|
stats["message"] = "The database has a high number of wait events, indicating potential bottlenecks."
|
|
}
|
|
|
|
if dbStats.MaxIdleClosed > int64(dbStats.OpenConnections)/2 {
|
|
stats["message"] = "Many idle connections are being closed, consider revising the connection pool settings."
|
|
}
|
|
|
|
if dbStats.MaxLifetimeClosed > int64(dbStats.OpenConnections)/2 {
|
|
stats["message"] = "Many connections are being closed due to max lifetime, consider increasing max lifetime or revising the connection usage pattern."
|
|
}
|
|
|
|
result[name] = stats
|
|
}
|
|
|
|
// Check read replicas
|
|
for name, replicas := range s.readReplicas {
|
|
for i, db := range replicas {
|
|
if db == nil {
|
|
continue
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
replicaName := fmt.Sprintf("%s_replica_%d", name, i)
|
|
stats := make(map[string]string)
|
|
|
|
err := db.PingContext(ctx)
|
|
if err != nil {
|
|
stats["status"] = "down"
|
|
stats["error"] = fmt.Sprintf("read replica down: %v", err)
|
|
stats["type"] = "sql"
|
|
stats["role"] = "replica"
|
|
result[replicaName] = stats
|
|
continue
|
|
}
|
|
|
|
stats["status"] = "up"
|
|
stats["message"] = "Read replica healthy"
|
|
stats["type"] = "sql"
|
|
stats["role"] = "replica"
|
|
|
|
dbStats := db.Stats()
|
|
stats["open_connections"] = strconv.Itoa(dbStats.OpenConnections)
|
|
stats["in_use"] = strconv.Itoa(dbStats.InUse)
|
|
stats["idle"] = strconv.Itoa(dbStats.Idle)
|
|
|
|
result[replicaName] = stats
|
|
}
|
|
}
|
|
|
|
// Check MongoDB connections
|
|
for name, client := range s.mongoClients {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
stats := make(map[string]string)
|
|
|
|
err := client.Ping(ctx, nil)
|
|
if err != nil {
|
|
stats["status"] = "down"
|
|
stats["error"] = fmt.Sprintf("mongodb down: %v", err)
|
|
stats["type"] = "mongodb"
|
|
result[name] = stats
|
|
continue
|
|
}
|
|
|
|
stats["status"] = "up"
|
|
stats["message"] = "It's healthy"
|
|
stats["type"] = "mongodb"
|
|
|
|
result[name] = stats
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// GetDB returns a specific SQL database connection by name
|
|
func (s *service) GetDB(name string) (*sql.DB, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
db, exists := s.sqlDatabases[name]
|
|
if !exists {
|
|
return nil, fmt.Errorf("database %s not found", name)
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
// PERUBAHAN: Tambahkan metode GetSQLXDB
|
|
// GetSQLXDB returns a specific SQLX database connection by name
|
|
func (s *service) GetSQLXDB(name string) (*sqlx.DB, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
db, exists := s.sqlxDatabases[name]
|
|
if !exists {
|
|
return nil, fmt.Errorf("database %s not found", name)
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
// GetReadDB returns a read replica connection using round-robin load balancing
|
|
func (s *service) GetReadDB(name string) (*sql.DB, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
replicas, exists := s.readReplicas[name]
|
|
if !exists || len(replicas) == 0 {
|
|
// Fallback to primary if no replicas available
|
|
return s.GetDB(name)
|
|
}
|
|
|
|
// Round-robin load balancing
|
|
s.readBalancer[name] = (s.readBalancer[name] + 1) % len(replicas)
|
|
selected := replicas[s.readBalancer[name]]
|
|
|
|
if selected == nil {
|
|
// Fallback to primary if replica is nil
|
|
return s.GetDB(name)
|
|
}
|
|
|
|
return selected, nil
|
|
}
|
|
|
|
// GetMongoClient returns a specific MongoDB client by name
|
|
func (s *service) GetMongoClient(name string) (*mongo.Client, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
client, exists := s.mongoClients[name]
|
|
if !exists {
|
|
return nil, fmt.Errorf("MongoDB client %s not found", name)
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// ListDBs returns list of available database names
|
|
func (s *service) ListDBs() []string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
names := make([]string, 0, len(s.sqlDatabases)+len(s.mongoClients))
|
|
|
|
for name := range s.sqlDatabases {
|
|
names = append(names, name)
|
|
}
|
|
|
|
for name := range s.mongoClients {
|
|
names = append(names, name)
|
|
}
|
|
|
|
return names
|
|
}
|
|
|
|
// GetDBType returns the type of a specific database
|
|
func (s *service) GetDBType(name string) (DatabaseType, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
config, exists := s.configs[name]
|
|
if !exists {
|
|
return "", fmt.Errorf("database %s not found", name)
|
|
}
|
|
|
|
return DatabaseType(config.Type), nil
|
|
}
|
|
|
|
// Close closes all database connections
|
|
func (s *service) Close() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
var errs []error
|
|
|
|
// Close listeners first
|
|
for name, listener := range s.listeners {
|
|
if err := listener.Close(); err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to close listener for %s: %w", name, err))
|
|
}
|
|
}
|
|
|
|
for name, db := range s.sqlDatabases {
|
|
if err := db.Close(); err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to close database %s: %w", name, err))
|
|
} else {
|
|
log.Printf("Disconnected from SQL database: %s", name)
|
|
}
|
|
}
|
|
|
|
for name, replicas := range s.readReplicas {
|
|
for i, db := range replicas {
|
|
if db != nil {
|
|
if err := db.Close(); err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to close read replica %s[%d]: %w", name, i, err))
|
|
} else {
|
|
log.Printf("Disconnected from read replica: %s[%d]", name, i)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for name, client := range s.mongoClients {
|
|
if err := client.Disconnect(context.Background()); err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to disconnect MongoDB client %s: %w", name, err))
|
|
} else {
|
|
log.Printf("Disconnected from MongoDB: %s", name)
|
|
}
|
|
}
|
|
|
|
s.sqlDatabases = make(map[string]*sql.DB)
|
|
s.sqlxDatabases = make(map[string]*sqlx.DB) // Reset map sqlx
|
|
s.mongoClients = make(map[string]*mongo.Client)
|
|
s.readReplicas = make(map[string][]*sql.DB)
|
|
s.configs = make(map[string]config.DatabaseConfig)
|
|
s.readConfigs = make(map[string][]config.DatabaseConfig)
|
|
s.listeners = make(map[string]*pq.Listener)
|
|
|
|
if len(errs) > 0 {
|
|
return fmt.Errorf("errors closing databases: %v", errs)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetPrimaryDB returns primary database connection
|
|
func (s *service) GetPrimaryDB(name string) (*sql.DB, error) {
|
|
return s.GetDB(name)
|
|
}
|
|
|
|
// ExecuteQuery executes a query with parameters and returns rows
|
|
func (s *service) ExecuteQuery(ctx context.Context, dbName string, query string, args ...interface{}) (*sql.Rows, error) {
|
|
db, err := s.GetDB(dbName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get database %s: %w", dbName, err)
|
|
}
|
|
|
|
// Use parameterized queries to prevent SQL injection
|
|
rows, err := db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute query: %w", err)
|
|
}
|
|
|
|
return rows, nil
|
|
}
|
|
|
|
// ExecuteQueryRow executes a query with parameters and returns a single row
|
|
func (s *service) ExecuteQueryRow(ctx context.Context, dbName string, query string, args ...interface{}) *sql.Row {
|
|
db, err := s.GetDB(dbName)
|
|
if err != nil {
|
|
// Return an empty row with error
|
|
row := &sql.Row{}
|
|
return row
|
|
}
|
|
|
|
// Use parameterized queries to prevent SQL injection
|
|
return db.QueryRowContext(ctx, query, args...)
|
|
}
|
|
|
|
// Exec executes a query with parameters and returns the result
|
|
func (s *service) Exec(ctx context.Context, dbName string, query string, args ...interface{}) (sql.Result, error) {
|
|
db, err := s.GetDB(dbName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get database %s: %w", dbName, err)
|
|
}
|
|
|
|
// Use parameterized queries to prevent SQL injection
|
|
result, err := db.ExecContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to execute query: %w", err)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// ListenForChanges implements PostgreSQL LISTEN/NOTIFY for real-time updates
|
|
func (s *service) ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(string, string)) error {
|
|
s.mu.RLock()
|
|
config, exists := s.configs[dbName]
|
|
s.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("database %s not found", dbName)
|
|
}
|
|
|
|
// Only support PostgreSQL for LISTEN/NOTIFY
|
|
if DatabaseType(config.Type) != Postgres {
|
|
return fmt.Errorf("LISTEN/NOTIFY only supported for PostgreSQL databases")
|
|
}
|
|
|
|
// Create connection string for listener
|
|
// Convert timeout to seconds for pq
|
|
connectTimeoutSec := int(config.ConnectTimeout.Seconds())
|
|
|
|
connStr := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s&connect_timeout=%d",
|
|
config.Username,
|
|
config.Password,
|
|
config.Host,
|
|
config.Port,
|
|
config.Database,
|
|
config.SSLMode,
|
|
connectTimeoutSec,
|
|
)
|
|
|
|
// Create listener
|
|
listener := pq.NewListener(
|
|
connStr,
|
|
10*time.Second,
|
|
time.Minute,
|
|
func(ev pq.ListenerEventType, err error) {
|
|
if err != nil {
|
|
log.Printf("Database listener (%s) error: %v", dbName, err)
|
|
}
|
|
},
|
|
)
|
|
|
|
// Store listener for cleanup
|
|
s.listenersMu.Lock()
|
|
s.listeners[dbName] = listener
|
|
s.listenersMu.Unlock()
|
|
|
|
// Listen to specified channels
|
|
for _, channel := range channels {
|
|
err := listener.Listen(channel)
|
|
if err != nil {
|
|
listener.Close()
|
|
return fmt.Errorf("failed to listen to channel %s: %w", channel, err)
|
|
}
|
|
log.Printf("Listening to database channel: %s on %s", channel, dbName)
|
|
}
|
|
|
|
// Start listening loop
|
|
go func() {
|
|
defer func() {
|
|
listener.Close()
|
|
s.listenersMu.Lock()
|
|
delete(s.listeners, dbName)
|
|
s.listenersMu.Unlock()
|
|
log.Printf("Database listener for %s stopped", dbName)
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case n := <-listener.Notify:
|
|
if n != nil {
|
|
callback(n.Channel, n.Extra)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(90 * time.Second):
|
|
// Send ping to keep connection alive
|
|
go func() {
|
|
if err := listener.Ping(); err != nil {
|
|
log.Printf("Listener ping failed for %s: %v", dbName, err)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// NotifyChange sends a notification to a PostgreSQL channel
|
|
func (s *service) NotifyChange(dbName, channel, payload string) error {
|
|
db, err := s.GetDB(dbName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get database %s: %w", dbName, err)
|
|
}
|
|
|
|
// Check if it's PostgreSQL
|
|
s.mu.RLock()
|
|
config, exists := s.configs[dbName]
|
|
s.mu.RUnlock()
|
|
|
|
if !exists {
|
|
return fmt.Errorf("database %s configuration not found", dbName)
|
|
}
|
|
|
|
if DatabaseType(config.Type) != Postgres {
|
|
return fmt.Errorf("NOTIFY only supported for PostgreSQL databases")
|
|
}
|
|
|
|
// Execute NOTIFY with parameterized query to prevent SQL injection
|
|
query := "SELECT pg_notify($1, $2)"
|
|
_, err = db.Exec(query, channel, payload)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send notification: %w", err)
|
|
}
|
|
|
|
log.Printf("Sent notification to channel %s on %s: %s", channel, dbName, payload)
|
|
return nil
|
|
}
|