572 lines
15 KiB
Go
572 lines
15 KiB
Go
package database
|
|
|
|
import (
	"context"
	"database/sql"
	"fmt"
	"log" // Import runtime package
	// Import debug package
	"net/url"
	"strconv"
	"sync"
	"time"

	"api-service/internal/config"

	_ "github.com/jackc/pgx/v5" // Import pgx driver
	_ "gorm.io/driver/postgres" // Import GORM PostgreSQL driver

	_ "github.com/go-sql-driver/mysql" // MySQL driver for database/sql
	_ "gorm.io/driver/mysql"           // GORM MySQL driver
	_ "gorm.io/driver/sqlserver"       // GORM SQL Server driver

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)
|
|
|
|
// DatabaseType names the backend a configured connection talks to. The Type
// field of a database configuration is converted to this type when the
// service decides how to open the connection.
type DatabaseType string

// Backends recognised by the service.
const (
	// Postgres selects the PostgreSQL connection path.
	Postgres DatabaseType = "postgres"
	// MySQL selects the MySQL connection path.
	MySQL DatabaseType = "mysql"
	// SQLServer selects the Microsoft SQL Server connection path.
	SQLServer DatabaseType = "sqlserver"
	// SQLite selects the file-backed SQLite connection path.
	SQLite DatabaseType = "sqlite"
	// MongoDB selects the MongoDB client path.
	MongoDB DatabaseType = "mongodb"
)
|
|
|
|
// Service represents a service that interacts with multiple databases
|
|
type Service interface {
|
|
Health() map[string]map[string]string
|
|
GetDB(name string) (*sql.DB, error)
|
|
GetMongoClient(name string) (*mongo.Client, error)
|
|
GetReadDB(name string) (*sql.DB, error) // For read replicas
|
|
Close() error
|
|
ListDBs() []string
|
|
GetDBType(name string) (DatabaseType, error)
|
|
}
|
|
|
|
type service struct {
|
|
sqlDatabases map[string]*sql.DB
|
|
mongoClients map[string]*mongo.Client
|
|
readReplicas map[string][]*sql.DB // Read replicas for load balancing
|
|
configs map[string]config.DatabaseConfig
|
|
readConfigs map[string][]config.DatabaseConfig
|
|
mu sync.RWMutex
|
|
readBalancer map[string]int // Round-robin counter for read replicas
|
|
}
|
|
|
|
var (
|
|
dbManager *service
|
|
once sync.Once
|
|
)
|
|
|
|
// New creates a new database service with multiple connections
|
|
func New(cfg *config.Config) Service {
|
|
once.Do(func() {
|
|
dbManager = &service{
|
|
sqlDatabases: make(map[string]*sql.DB),
|
|
mongoClients: make(map[string]*mongo.Client),
|
|
readReplicas: make(map[string][]*sql.DB),
|
|
configs: make(map[string]config.DatabaseConfig),
|
|
readConfigs: make(map[string][]config.DatabaseConfig),
|
|
readBalancer: make(map[string]int),
|
|
}
|
|
|
|
log.Println("Initializing database service...") // Log when the initialization starts
|
|
// log.Printf("Current Goroutine ID: %d", runtime.NumGoroutine()) // Log the number of goroutines
|
|
// log.Printf("Stack Trace: %s", debug.Stack()) // Log the stack trace
|
|
dbManager.loadFromConfig(cfg)
|
|
|
|
// Initialize all databases
|
|
for name, dbConfig := range dbManager.configs {
|
|
if err := dbManager.addDatabase(name, dbConfig); err != nil {
|
|
log.Printf("Failed to connect to database %s: %v", name, err)
|
|
}
|
|
}
|
|
|
|
// Initialize read replicas
|
|
for name, replicaConfigs := range dbManager.readConfigs {
|
|
for i, replicaConfig := range replicaConfigs {
|
|
if err := dbManager.addReadReplica(name, i, replicaConfig); err != nil {
|
|
log.Printf("Failed to connect to read replica %s[%d]: %v", name, i, err)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
|
|
return dbManager
|
|
}
|
|
|
|
func (s *service) loadFromConfig(cfg *config.Config) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
// Load primary databases
|
|
for name, dbConfig := range cfg.Databases {
|
|
s.configs[name] = dbConfig
|
|
}
|
|
|
|
// Load read replicas
|
|
for name, replicaConfigs := range cfg.ReadReplicas {
|
|
s.readConfigs[name] = replicaConfigs
|
|
}
|
|
}
|
|
|
|
func (s *service) addDatabase(name string, config config.DatabaseConfig) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
log.Printf("=== Database Connection Debug ===")
|
|
log.Printf("Database: %s", name)
|
|
log.Printf("Type: %s", config.Type)
|
|
log.Printf("Host: %s", config.Host)
|
|
log.Printf("Port: %d", config.Port)
|
|
log.Printf("Database: %s", config.Database)
|
|
log.Printf("Username: %s", config.Username)
|
|
log.Printf("SSLMode: %s", config.SSLMode)
|
|
|
|
var db *sql.DB
|
|
var err error
|
|
|
|
dbType := DatabaseType(config.Type)
|
|
|
|
switch dbType {
|
|
case Postgres:
|
|
db, err = s.openPostgresConnection(config)
|
|
case MySQL:
|
|
db, err = s.openMySQLConnection(config)
|
|
case SQLServer:
|
|
db, err = s.openSQLServerConnection(config)
|
|
case SQLite:
|
|
db, err = s.openSQLiteConnection(config)
|
|
case MongoDB:
|
|
return s.addMongoDB(name, config)
|
|
default:
|
|
return fmt.Errorf("unsupported database type: %s", config.Type)
|
|
}
|
|
|
|
if err != nil {
|
|
log.Printf("❌ Error connecting to database %s: %v", name, err)
|
|
log.Printf(" Database: %s@%s:%d/%s", config.Username, config.Host, config.Port, config.Database)
|
|
return err
|
|
}
|
|
|
|
log.Printf("✅ Successfully connected to database: %s", name)
|
|
return s.configureSQLDB(name, db, config.MaxOpenConns, config.MaxIdleConns, config.ConnMaxLifetime)
|
|
}
|
|
|
|
func (s *service) addReadReplica(name string, index int, config config.DatabaseConfig) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
var db *sql.DB
|
|
var err error
|
|
|
|
dbType := DatabaseType(config.Type)
|
|
|
|
switch dbType {
|
|
case Postgres:
|
|
db, err = s.openPostgresConnection(config)
|
|
case MySQL:
|
|
db, err = s.openMySQLConnection(config)
|
|
case SQLServer:
|
|
db, err = s.openSQLServerConnection(config)
|
|
case SQLite:
|
|
db, err = s.openSQLiteConnection(config)
|
|
default:
|
|
return fmt.Errorf("unsupported database type for read replica: %s", config.Type)
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if s.readReplicas[name] == nil {
|
|
s.readReplicas[name] = make([]*sql.DB, 0)
|
|
}
|
|
|
|
// Ensure we have enough slots
|
|
for len(s.readReplicas[name]) <= index {
|
|
s.readReplicas[name] = append(s.readReplicas[name], nil)
|
|
}
|
|
|
|
s.readReplicas[name][index] = db
|
|
log.Printf("Successfully connected to read replica %s[%d]", name, index)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) openPostgresConnection(config config.DatabaseConfig) (*sql.DB, error) {
|
|
connStr := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s",
|
|
config.Username,
|
|
config.Password,
|
|
config.Host,
|
|
config.Port,
|
|
config.Database,
|
|
config.SSLMode,
|
|
)
|
|
|
|
if config.Schema != "" {
|
|
connStr += "&search_path=" + config.Schema
|
|
}
|
|
|
|
db, err := sql.Open("pgx", connStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open PostgreSQL connection: %w", err)
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
func (s *service) openMySQLConnection(config config.DatabaseConfig) (*sql.DB, error) {
|
|
connStr := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true",
|
|
config.Username,
|
|
config.Password,
|
|
config.Host,
|
|
config.Port,
|
|
config.Database,
|
|
)
|
|
|
|
db, err := sql.Open("mysql", connStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open MySQL connection: %w", err)
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
func (s *service) openSQLServerConnection(config config.DatabaseConfig) (*sql.DB, error) {
|
|
connStr := fmt.Sprintf("sqlserver://%s:%s@%s:%d?database=%s",
|
|
config.Username,
|
|
config.Password,
|
|
config.Host,
|
|
config.Port,
|
|
config.Database,
|
|
)
|
|
|
|
db, err := sql.Open("sqlserver", connStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open SQL Server connection: %w", err)
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
func (s *service) openSQLiteConnection(config config.DatabaseConfig) (*sql.DB, error) {
|
|
dbPath := config.Path
|
|
if dbPath == "" {
|
|
dbPath = fmt.Sprintf("./data/%s.db", config.Database)
|
|
}
|
|
|
|
db, err := sql.Open("sqlite3", dbPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open SQLite connection: %w", err)
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
func (s *service) addMongoDB(name string, config config.DatabaseConfig) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
uri := fmt.Sprintf("mongodb://%s:%s@%s:%d/%s",
|
|
config.Username,
|
|
config.Password,
|
|
config.Host,
|
|
config.Port,
|
|
config.Database,
|
|
)
|
|
|
|
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to MongoDB: %w", err)
|
|
}
|
|
|
|
s.mongoClients[name] = client
|
|
log.Printf("Successfully connected to MongoDB: %s", name)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) configureSQLDB(name string, db *sql.DB, maxOpenConns, maxIdleConns int, connMaxLifetime time.Duration) error {
|
|
db.SetMaxOpenConns(maxOpenConns)
|
|
db.SetMaxIdleConns(maxIdleConns)
|
|
db.SetConnMaxLifetime(connMaxLifetime)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := db.PingContext(ctx); err != nil {
|
|
db.Close()
|
|
return fmt.Errorf("failed to ping database: %w", err)
|
|
}
|
|
|
|
s.sqlDatabases[name] = db
|
|
log.Printf("Successfully connected to SQL database: %s", name)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Health checks the health of all database connections by pinging each database.
|
|
func (s *service) Health() map[string]map[string]string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
result := make(map[string]map[string]string)
|
|
|
|
// Check SQL databases
|
|
for name, db := range s.sqlDatabases {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
stats := make(map[string]string)
|
|
|
|
err := db.PingContext(ctx)
|
|
if err != nil {
|
|
stats["status"] = "down"
|
|
stats["error"] = fmt.Sprintf("db down: %v", err)
|
|
stats["type"] = "sql"
|
|
stats["role"] = "primary"
|
|
result[name] = stats
|
|
continue
|
|
}
|
|
|
|
stats["status"] = "up"
|
|
stats["message"] = "It's healthy"
|
|
stats["type"] = "sql"
|
|
stats["role"] = "primary"
|
|
|
|
dbStats := db.Stats()
|
|
stats["open_connections"] = strconv.Itoa(dbStats.OpenConnections)
|
|
stats["in_use"] = strconv.Itoa(dbStats.InUse)
|
|
stats["idle"] = strconv.Itoa(dbStats.Idle)
|
|
stats["wait_count"] = strconv.FormatInt(dbStats.WaitCount, 10)
|
|
stats["wait_duration"] = dbStats.WaitDuration.String()
|
|
stats["max_idle_closed"] = strconv.FormatInt(dbStats.MaxIdleClosed, 10)
|
|
stats["max_lifetime_closed"] = strconv.FormatInt(dbStats.MaxLifetimeClosed, 10)
|
|
|
|
if dbStats.OpenConnections > 40 {
|
|
stats["message"] = "The database is experiencing heavy load."
|
|
}
|
|
|
|
if dbStats.WaitCount > 1000 {
|
|
stats["message"] = "The database has a high number of wait events, indicating potential bottlenecks."
|
|
}
|
|
|
|
if dbStats.MaxIdleClosed > int64(dbStats.OpenConnections)/2 {
|
|
stats["message"] = "Many idle connections are being closed, consider revising the connection pool settings."
|
|
}
|
|
|
|
if dbStats.MaxLifetimeClosed > int64(dbStats.OpenConnections)/2 {
|
|
stats["message"] = "Many connections are being closed due to max lifetime, consider increasing max lifetime or revising the connection usage pattern."
|
|
}
|
|
|
|
result[name] = stats
|
|
}
|
|
|
|
// Check read replicas
|
|
for name, replicas := range s.readReplicas {
|
|
for i, db := range replicas {
|
|
if db == nil {
|
|
continue
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
replicaName := fmt.Sprintf("%s_replica_%d", name, i)
|
|
stats := make(map[string]string)
|
|
|
|
err := db.PingContext(ctx)
|
|
if err != nil {
|
|
stats["status"] = "down"
|
|
stats["error"] = fmt.Sprintf("read replica down: %v", err)
|
|
stats["type"] = "sql"
|
|
stats["role"] = "replica"
|
|
result[replicaName] = stats
|
|
continue
|
|
}
|
|
|
|
stats["status"] = "up"
|
|
stats["message"] = "Read replica healthy"
|
|
stats["type"] = "sql"
|
|
stats["role"] = "replica"
|
|
|
|
dbStats := db.Stats()
|
|
stats["open_connections"] = strconv.Itoa(dbStats.OpenConnections)
|
|
stats["in_use"] = strconv.Itoa(dbStats.InUse)
|
|
stats["idle"] = strconv.Itoa(dbStats.Idle)
|
|
|
|
result[replicaName] = stats
|
|
}
|
|
}
|
|
|
|
// Check MongoDB connections
|
|
for name, client := range s.mongoClients {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
defer cancel()
|
|
|
|
stats := make(map[string]string)
|
|
|
|
err := client.Ping(ctx, nil)
|
|
if err != nil {
|
|
stats["status"] = "down"
|
|
stats["error"] = fmt.Sprintf("mongodb down: %v", err)
|
|
stats["type"] = "mongodb"
|
|
result[name] = stats
|
|
continue
|
|
}
|
|
|
|
stats["status"] = "up"
|
|
stats["message"] = "It's healthy"
|
|
stats["type"] = "mongodb"
|
|
|
|
result[name] = stats
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// GetDB returns a specific SQL database connection by name
|
|
func (s *service) GetDB(name string) (*sql.DB, error) {
|
|
log.Printf("Attempting to get database connection for: %s", name)
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
db, exists := s.sqlDatabases[name]
|
|
if !exists {
|
|
log.Printf("Error: database %s not found", name) // Log the error
|
|
return nil, fmt.Errorf("database %s not found", name)
|
|
}
|
|
|
|
log.Printf("Current connection pool state for %s: Open: %d, In Use: %d, Idle: %d",
|
|
name, db.Stats().OpenConnections, db.Stats().InUse, db.Stats().Idle)
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
// db, exists := s.sqlDatabases[name]
|
|
// if !exists {
|
|
// log.Printf("Error: database %s not found", name) // Log the error
|
|
// return nil, fmt.Errorf("database %s not found", name)
|
|
// }
|
|
|
|
return db, nil
|
|
}
|
|
|
|
// GetReadDB returns a read replica connection using round-robin load balancing
|
|
func (s *service) GetReadDB(name string) (*sql.DB, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
replicas, exists := s.readReplicas[name]
|
|
if !exists || len(replicas) == 0 {
|
|
// Fallback to primary if no replicas available
|
|
return s.GetDB(name)
|
|
}
|
|
|
|
// Round-robin load balancing
|
|
s.readBalancer[name] = (s.readBalancer[name] + 1) % len(replicas)
|
|
selected := replicas[s.readBalancer[name]]
|
|
|
|
if selected == nil {
|
|
// Fallback to primary if replica is nil
|
|
return s.GetDB(name)
|
|
}
|
|
|
|
return selected, nil
|
|
}
|
|
|
|
// GetMongoClient returns a specific MongoDB client by name
|
|
func (s *service) GetMongoClient(name string) (*mongo.Client, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
client, exists := s.mongoClients[name]
|
|
if !exists {
|
|
return nil, fmt.Errorf("MongoDB client %s not found", name)
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// ListDBs returns list of available database names
|
|
func (s *service) ListDBs() []string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
names := make([]string, 0, len(s.sqlDatabases)+len(s.mongoClients))
|
|
|
|
for name := range s.sqlDatabases {
|
|
names = append(names, name)
|
|
}
|
|
|
|
for name := range s.mongoClients {
|
|
names = append(names, name)
|
|
}
|
|
|
|
return names
|
|
}
|
|
|
|
// GetDBType returns the type of a specific database
|
|
func (s *service) GetDBType(name string) (DatabaseType, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
config, exists := s.configs[name]
|
|
if !exists {
|
|
return "", fmt.Errorf("database %s not found", name)
|
|
}
|
|
|
|
return DatabaseType(config.Type), nil
|
|
}
|
|
|
|
// Close closes all database connections
|
|
func (s *service) Close() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
var errs []error
|
|
|
|
for name, db := range s.sqlDatabases {
|
|
if err := db.Close(); err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to close database %s: %w", name, err))
|
|
} else {
|
|
log.Printf("Disconnected from SQL database: %s", name)
|
|
}
|
|
}
|
|
|
|
for name, replicas := range s.readReplicas {
|
|
for i, db := range replicas {
|
|
if db != nil {
|
|
if err := db.Close(); err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to close read replica %s[%d]: %w", name, i, err))
|
|
} else {
|
|
log.Printf("Disconnected from read replica: %s[%d]", name, i)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for name, client := range s.mongoClients {
|
|
if err := client.Disconnect(context.Background()); err != nil {
|
|
errs = append(errs, fmt.Errorf("failed to disconnect MongoDB client %s: %w", name, err))
|
|
} else {
|
|
log.Printf("Disconnected from MongoDB: %s", name)
|
|
}
|
|
}
|
|
|
|
s.sqlDatabases = make(map[string]*sql.DB)
|
|
s.mongoClients = make(map[string]*mongo.Client)
|
|
s.readReplicas = make(map[string][]*sql.DB)
|
|
s.configs = make(map[string]config.DatabaseConfig)
|
|
s.readConfigs = make(map[string][]config.DatabaseConfig)
|
|
|
|
if len(errs) > 0 {
|
|
return fmt.Errorf("errors closing databases: %v", errs)
|
|
}
|
|
|
|
return nil
|
|
}
|