perbaikan config

This commit is contained in:
2025-08-17 07:10:56 +07:00
parent 229782dad8
commit a7d6005649
10 changed files with 658 additions and 545 deletions

View File

@@ -5,12 +5,12 @@ import (
"database/sql"
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
"api-service/internal/config"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
@@ -26,47 +26,25 @@ const (
MongoDB DatabaseType = "mongodb"
)
// DatabaseConfig represents configuration for a single database connection
type DatabaseConfig struct {
Name string
Type DatabaseType
Host string
Port string
Database string
Username string
Password string
Schema string
SSLMode string
Path string // For SQLite
Options string // Additional connection options
}
// Service represents a service that interacts with multiple databases
type Service interface {
// Health returns health status for all databases
Health() map[string]map[string]string
// GetDB returns a specific SQL database connection by name
GetDB(name string) (*sql.DB, error)
// GetMongoClient returns a specific MongoDB client by name
GetMongoClient(name string) (*mongo.Client, error)
// Close terminates all database connections
GetReadDB(name string) (*sql.DB, error) // For read replicas
Close() error
// ListDBs returns list of available database names
ListDBs() []string
// GetDBType returns the type of a specific database
GetDBType(name string) (DatabaseType, error)
}
type service struct {
sqlDatabases map[string]*sql.DB
mongoClients map[string]*mongo.Client
configs map[string]DatabaseConfig
mu sync.RWMutex
sqlDatabases map[string]*sql.DB
mongoClients map[string]*mongo.Client
readReplicas map[string][]*sql.DB // Read replicas for load balancing
configs map[string]config.DatabaseConfig
readConfigs map[string][]config.DatabaseConfig
mu sync.RWMutex
readBalancer map[string]int // Round-robin counter for read replicas
}
var (
@@ -75,21 +53,33 @@ var (
)
// New creates a new database service with multiple connections
func New() Service {
func New(cfg *config.Config) Service {
once.Do(func() {
dbManager = &service{
sqlDatabases: make(map[string]*sql.DB),
mongoClients: make(map[string]*mongo.Client),
configs: make(map[string]DatabaseConfig),
readReplicas: make(map[string][]*sql.DB),
configs: make(map[string]config.DatabaseConfig),
readConfigs: make(map[string][]config.DatabaseConfig),
readBalancer: make(map[string]int),
}
// Load database configurations from environment
configs := loadDatabaseConfigs()
// Load configurations from config
dbManager.loadFromConfig(cfg)
// Initialize all database connections
for _, config := range configs {
if err := dbManager.addDatabase(config); err != nil {
log.Printf("Failed to connect to database %s: %v", config.Name, err)
// Initialize all databases
for name, dbConfig := range dbManager.configs {
if err := dbManager.addDatabase(name, dbConfig); err != nil {
log.Printf("Failed to connect to database %s: %v", name, err)
}
}
// Initialize read replicas
for name, replicaConfigs := range dbManager.readConfigs {
for i, replicaConfig := range replicaConfigs {
if err := dbManager.addReadReplica(name, i, replicaConfig); err != nil {
log.Printf("Failed to connect to read replica %s[%d]: %v", name, i, err)
}
}
}
})
@@ -97,139 +87,95 @@ func New() Service {
return dbManager
}
// loadDatabaseConfigs loads database configurations from environment variables
func loadDatabaseConfigs() []DatabaseConfig {
var configs []DatabaseConfig
// Load configurations from environment
// Format: DB_{NAME}_{PROPERTY}
// Check for DB_ prefixed configurations
envVars := os.Environ()
dbConfigs := make(map[string]map[string]string)
for _, envVar := range envVars {
parts := strings.SplitN(envVar, "=", 2)
if len(parts) != 2 {
continue
}
key := parts[0]
value := parts[1]
if strings.HasPrefix(key, "DB_") {
segments := strings.Split(key, "_")
if len(segments) >= 3 {
dbName := strings.ToLower(segments[1])
property := strings.ToLower(strings.Join(segments[2:], "_"))
if dbConfigs[dbName] == nil {
dbConfigs[dbName] = make(map[string]string)
}
dbConfigs[dbName][property] = value
}
}
}
// Convert map to DatabaseConfig structs
for name, config := range dbConfigs {
dbType := DatabaseType(getEnvFromMap(config, "type", "postgres"))
dbConfig := DatabaseConfig{
Name: name,
Type: dbType,
Host: getEnvFromMap(config, "host", "localhost"),
Port: getEnvFromMap(config, "port", getDefaultPort(dbType)),
Database: getEnvFromMap(config, "database", name),
Username: getEnvFromMap(config, "username", ""),
Password: getEnvFromMap(config, "password", ""),
Schema: getEnvFromMap(config, "schema", ""),
SSLMode: getEnvFromMap(config, "sslmode", "disable"),
Path: getEnvFromMap(config, "path", ""),
Options: getEnvFromMap(config, "options", ""),
}
configs = append(configs, dbConfig)
}
// If no configurations found, use default
if len(configs) == 0 {
configs = []DatabaseConfig{
{
Name: "primary",
Type: Postgres,
Host: getEnv("DB_PRIMARY_HOST", "localhost"),
Port: getEnv("DB_PRIMARY_PORT", "5432"),
Database: getEnv("DB_PRIMARY_DATABASE", "blueprint"),
Username: getEnv("DB_PRIMARY_USERNAME", "postgres"),
Password: getEnv("DB_PRIMARY_PASSWORD", ""),
Schema: getEnv("DB_PRIMARY_SCHEMA", "public"),
SSLMode: getEnv("DB_PRIMARY_SSLMODE", "disable"),
},
}
}
return configs
}
// getEnvFromMap helper function
func getEnvFromMap(config map[string]string, key, defaultValue string) string {
if value, exists := config[key]; exists {
return value
}
return defaultValue
}
// getEnv helper function
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
// getDefaultPort returns default port for database type
func getDefaultPort(dbType DatabaseType) string {
switch dbType {
case Postgres:
return "5432"
case MySQL:
return "3306"
case SQLServer:
return "1433"
case MongoDB:
return "27017"
case SQLite:
return ""
default:
return "5432"
}
}
// addDatabase adds a new database connection
func (s *service) addDatabase(config DatabaseConfig) error {
func (s *service) loadFromConfig(cfg *config.Config) {
s.mu.Lock()
defer s.mu.Unlock()
switch config.Type {
case Postgres:
return s.addPostgres(config)
case MySQL:
return s.addMySQL(config)
case SQLServer:
return s.addSQLServer(config)
case SQLite:
return s.addSQLite(config)
case MongoDB:
return s.addMongoDB(config)
default:
return fmt.Errorf("unsupported database type: %s", config.Type)
// Load primary databases
for name, dbConfig := range cfg.Databases {
s.configs[name] = dbConfig
}
// Load read replicas
for name, replicaConfigs := range cfg.ReadReplicas {
s.readConfigs[name] = replicaConfigs
}
}
// addPostgres adds PostgreSQL connection
func (s *service) addPostgres(config DatabaseConfig) error {
connStr := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=%s",
func (s *service) addDatabase(name string, config config.DatabaseConfig) error {
s.mu.Lock()
defer s.mu.Unlock()
var db *sql.DB
var err error
dbType := DatabaseType(config.Type)
switch dbType {
case Postgres:
db, err = s.openPostgresConnection(config)
case MySQL:
db, err = s.openMySQLConnection(config)
case SQLServer:
db, err = s.openSQLServerConnection(config)
case SQLite:
db, err = s.openSQLiteConnection(config)
case MongoDB:
return s.addMongoDB(name, config)
default:
return fmt.Errorf("unsupported database type: %s", config.Type)
}
if err != nil {
return err
}
return s.configureSQLDB(name, db, config.MaxOpenConns, config.MaxIdleConns, config.ConnMaxLifetime)
}
func (s *service) addReadReplica(name string, index int, config config.DatabaseConfig) error {
s.mu.Lock()
defer s.mu.Unlock()
var db *sql.DB
var err error
dbType := DatabaseType(config.Type)
switch dbType {
case Postgres:
db, err = s.openPostgresConnection(config)
case MySQL:
db, err = s.openMySQLConnection(config)
case SQLServer:
db, err = s.openSQLServerConnection(config)
case SQLite:
db, err = s.openSQLiteConnection(config)
default:
return fmt.Errorf("unsupported database type for read replica: %s", config.Type)
}
if err != nil {
return err
}
if s.readReplicas[name] == nil {
s.readReplicas[name] = make([]*sql.DB, 0)
}
// Ensure we have enough slots
for len(s.readReplicas[name]) <= index {
s.readReplicas[name] = append(s.readReplicas[name], nil)
}
s.readReplicas[name][index] = db
log.Printf("Successfully connected to read replica %s[%d]", name, index)
return nil
}
func (s *service) openPostgresConnection(config config.DatabaseConfig) (*sql.DB, error) {
connStr := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=%s",
config.Username,
config.Password,
config.Host,
@@ -244,15 +190,14 @@ func (s *service) addPostgres(config DatabaseConfig) error {
db, err := sql.Open("pgx", connStr)
if err != nil {
return fmt.Errorf("failed to open PostgreSQL connection: %w", err)
return nil, fmt.Errorf("failed to open PostgreSQL connection: %w", err)
}
return s.configureSQLDB(config.Name, db)
return db, nil
}
// addMySQL adds MySQL connection
func (s *service) addMySQL(config DatabaseConfig) error {
connStr := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true",
func (s *service) openMySQLConnection(config config.DatabaseConfig) (*sql.DB, error) {
connStr := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true",
config.Username,
config.Password,
config.Host,
@@ -262,15 +207,14 @@ func (s *service) addMySQL(config DatabaseConfig) error {
db, err := sql.Open("mysql", connStr)
if err != nil {
return fmt.Errorf("failed to open MySQL connection: %w", err)
return nil, fmt.Errorf("failed to open MySQL connection: %w", err)
}
return s.configureSQLDB(config.Name, db)
return db, nil
}
// addSQLServer adds SQL Server connection
func (s *service) addSQLServer(config DatabaseConfig) error {
connStr := fmt.Sprintf("sqlserver://%s:%s@%s:%s?database=%s",
func (s *service) openSQLServerConnection(config config.DatabaseConfig) (*sql.DB, error) {
connStr := fmt.Sprintf("sqlserver://%s:%s@%s:%d?database=%s",
config.Username,
config.Password,
config.Host,
@@ -280,30 +224,31 @@ func (s *service) addSQLServer(config DatabaseConfig) error {
db, err := sql.Open("sqlserver", connStr)
if err != nil {
return fmt.Errorf("failed to open SQL Server connection: %w", err)
return nil, fmt.Errorf("failed to open SQL Server connection: %w", err)
}
return s.configureSQLDB(config.Name, db)
return db, nil
}
// addSQLite adds SQLite connection
func (s *service) addSQLite(config DatabaseConfig) error {
func (s *service) openSQLiteConnection(config config.DatabaseConfig) (*sql.DB, error) {
dbPath := config.Path
if dbPath == "" {
dbPath = fmt.Sprintf("./data/%s.db", config.Name)
dbPath = fmt.Sprintf("./data/%s.db", config.Database)
}
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return fmt.Errorf("failed to open SQLite connection: %w", err)
return nil, fmt.Errorf("failed to open SQLite connection: %w", err)
}
return s.configureSQLDB(config.Name, db)
return db, nil
}
// addMongoDB adds MongoDB connection
func (s *service) addMongoDB(config DatabaseConfig) error {
uri := fmt.Sprintf("mongodb://%s:%s@%s:%s/%s",
func (s *service) addMongoDB(name string, config config.DatabaseConfig) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
uri := fmt.Sprintf("mongodb://%s:%s@%s:%d/%s",
config.Username,
config.Password,
config.Host,
@@ -311,36 +256,22 @@ func (s *service) addMongoDB(config DatabaseConfig) error {
config.Database,
)
clientOptions := options.Client().ApplyURI(uri)
client, err := mongo.Connect(context.Background(), clientOptions)
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
return fmt.Errorf("failed to connect to MongoDB: %w", err)
}
// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := client.Ping(ctx, nil); err != nil {
client.Disconnect(context.Background())
return fmt.Errorf("failed to ping MongoDB: %w", err)
}
s.mongoClients[config.Name] = client
s.configs[config.Name] = config
log.Printf("Successfully connected to MongoDB: %s", config.Name)
s.mongoClients[name] = client
log.Printf("Successfully connected to MongoDB: %s", name)
return nil
}
// configureSQLDB configures common SQL database settings
func (s *service) configureSQLDB(name string, db *sql.DB) error {
// Configure connection pool
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(25)
db.SetConnMaxLifetime(5 * time.Minute)
func (s *service) configureSQLDB(name string, db *sql.DB, maxOpenConns, maxIdleConns int, connMaxLifetime time.Duration) error {
db.SetMaxOpenConns(maxOpenConns)
db.SetMaxIdleConns(maxIdleConns)
db.SetConnMaxLifetime(connMaxLifetime)
// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
@@ -355,48 +286,7 @@ func (s *service) configureSQLDB(name string, db *sql.DB) error {
return nil
}
// # Example multi-database configuration for different database types
// # PostgreSQL
// DB_TYPE_PRIMARY=postgres
// DB_HOST_PRIMARY=localhost
// DB_PORT_PRIMARY=5432
// DB_NAME_PRIMARY=myapp_postgres
// DB_USER_PRIMARY=postgres
// DB_PASS_PRIMARY=postgres_password
// DB_SCHEMA_PRIMARY=public
// DB_SSLMODE_PRIMARY=disable
// # MySQL
// DB_TYPE_MYSQL=mysql
// DB_HOST_MYSQL=localhost
// DB_PORT_MYSQL=3306
// DB_NAME_MYSQL=myapp_mysql
// DB_USER_MYSQL=root
// DB_PASS_MYSQL=mysql_password
// # SQL Server
// DB_TYPE_SQLSERVER=mssql
// DB_HOST_SQLSERVER=localhost
// DB_PORT_SQLSERVER=1433
// DB_NAME_SQLSERVER=myapp_mssql
// DB_USER_SQLSERVER=sa
// DB_PASS_SQLSERVER=mssql_password
// # MongoDB
// DB_TYPE_MONGODB=mongodb
// DB_HOST_MONGODB=localhost
// DB_PORT_MONGODB=27017
// DB_NAME_MONGODB=myapp_mongo
// DB_USER_MONGODB=mongo_user
// DB_PASS_MONGODB=mongo_password
// # SQLite
// DB_TYPE_SQLITE=sqlite
// DB_PATH_SQLITE=./data/myapp_sqlite.db
// Health checks the health of all database connections by pinging each database.
// It returns a map with database names as keys and their health statistics as values.
func (s *service) Health() map[string]map[string]string {
s.mu.RLock()
defer s.mu.RUnlock()
@@ -410,22 +300,21 @@ func (s *service) Health() map[string]map[string]string {
stats := make(map[string]string)
// Ping the database
err := db.PingContext(ctx)
if err != nil {
stats["status"] = "down"
stats["error"] = fmt.Sprintf("db down: %v", err)
stats["type"] = "sql"
stats["role"] = "primary"
result[name] = stats
continue
}
// Database is up, add more statistics
stats["status"] = "up"
stats["message"] = "It's healthy"
stats["type"] = "sql"
stats["role"] = "primary"
// Get database stats
dbStats := db.Stats()
stats["open_connections"] = strconv.Itoa(dbStats.OpenConnections)
stats["in_use"] = strconv.Itoa(dbStats.InUse)
@@ -435,7 +324,6 @@ func (s *service) Health() map[string]map[string]string {
stats["max_idle_closed"] = strconv.FormatInt(dbStats.MaxIdleClosed, 10)
stats["max_lifetime_closed"] = strconv.FormatInt(dbStats.MaxLifetimeClosed, 10)
// Evaluate stats to provide health messages
if dbStats.OpenConnections > 40 {
stats["message"] = "The database is experiencing heavy load."
}
@@ -455,6 +343,43 @@ func (s *service) Health() map[string]map[string]string {
result[name] = stats
}
// Check read replicas
for name, replicas := range s.readReplicas {
for i, db := range replicas {
if db == nil {
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
replicaName := fmt.Sprintf("%s_replica_%d", name, i)
stats := make(map[string]string)
err := db.PingContext(ctx)
if err != nil {
stats["status"] = "down"
stats["error"] = fmt.Sprintf("read replica down: %v", err)
stats["type"] = "sql"
stats["role"] = "replica"
result[replicaName] = stats
continue
}
stats["status"] = "up"
stats["message"] = "Read replica healthy"
stats["type"] = "sql"
stats["role"] = "replica"
dbStats := db.Stats()
stats["open_connections"] = strconv.Itoa(dbStats.OpenConnections)
stats["in_use"] = strconv.Itoa(dbStats.InUse)
stats["idle"] = strconv.Itoa(dbStats.Idle)
result[replicaName] = stats
}
}
// Check MongoDB connections
for name, client := range s.mongoClients {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@@ -462,7 +387,6 @@ func (s *service) Health() map[string]map[string]string {
stats := make(map[string]string)
// Ping the MongoDB
err := client.Ping(ctx, nil)
if err != nil {
stats["status"] = "down"
@@ -472,7 +396,6 @@ func (s *service) Health() map[string]map[string]string {
continue
}
// MongoDB is up
stats["status"] = "up"
stats["message"] = "It's healthy"
stats["type"] = "mongodb"
@@ -496,6 +419,29 @@ func (s *service) GetDB(name string) (*sql.DB, error) {
return db, nil
}
// GetReadDB returns a read replica connection using round-robin load balancing
func (s *service) GetReadDB(name string) (*sql.DB, error) {
s.mu.RLock()
defer s.mu.RUnlock()
replicas, exists := s.readReplicas[name]
if !exists || len(replicas) == 0 {
// Fallback to primary if no replicas available
return s.GetDB(name)
}
// Round-robin load balancing
s.readBalancer[name] = (s.readBalancer[name] + 1) % len(replicas)
selected := replicas[s.readBalancer[name]]
if selected == nil {
// Fallback to primary if replica is nil
return s.GetDB(name)
}
return selected, nil
}
// GetMongoClient returns a specific MongoDB client by name
func (s *service) GetMongoClient(name string) (*mongo.Client, error) {
s.mu.RLock()
@@ -516,12 +462,10 @@ func (s *service) ListDBs() []string {
names := make([]string, 0, len(s.sqlDatabases)+len(s.mongoClients))
// Add SQL databases
for name := range s.sqlDatabases {
names = append(names, name)
}
// Add MongoDB clients
for name := range s.mongoClients {
names = append(names, name)
}
@@ -539,18 +483,16 @@ func (s *service) GetDBType(name string) (DatabaseType, error) {
return "", fmt.Errorf("database %s not found", name)
}
return config.Type, nil
return DatabaseType(config.Type), nil
}
// Close closes all database connections
// It logs messages indicating disconnection from each database
func (s *service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
var errs []error
// Close SQL databases
for name, db := range s.sqlDatabases {
if err := db.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close database %s: %w", name, err))
@@ -559,7 +501,18 @@ func (s *service) Close() error {
}
}
// Close MongoDB clients
for name, replicas := range s.readReplicas {
for i, db := range replicas {
if db != nil {
if err := db.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close read replica %s[%d]: %w", name, i, err))
} else {
log.Printf("Disconnected from read replica: %s[%d]", name, i)
}
}
}
}
for name, client := range s.mongoClients {
if err := client.Disconnect(context.Background()); err != nil {
errs = append(errs, fmt.Errorf("failed to disconnect MongoDB client %s: %w", name, err))
@@ -570,7 +523,9 @@ func (s *service) Close() error {
s.sqlDatabases = make(map[string]*sql.DB)
s.mongoClients = make(map[string]*mongo.Client)
s.configs = make(map[string]DatabaseConfig)
s.readReplicas = make(map[string][]*sql.DB)
s.configs = make(map[string]config.DatabaseConfig)
s.readConfigs = make(map[string][]config.DatabaseConfig)
if len(errs) > 0 {
return fmt.Errorf("errors closing databases: %v", errs)
@@ -578,3 +533,6 @@ func (s *service) Close() error {
return nil
}
// Import necessary packages