package database import ( "antrian-operasi/internal/config" "context" "database/sql" "fmt" "log" "sync" "github.com/jmoiron/sqlx" "github.com/lib/pq" "go.mongodb.org/mongo-driver/mongo" ) type DatabaseType string const ( Postgres DatabaseType = "postgres" MySQL DatabaseType = "mysql" SQLServer DatabaseType = "sqlserver" SQLite DatabaseType = "sqlite" MongoDB DatabaseType = "mongodb" ) type Service interface { Health() map[string]map[string]string GetDB(name string) (*sql.DB, error) GetSQLXDB(name string) (*sqlx.DB, error) // Tambahkan metode ini GetMongoClient(name string) (*mongo.Client, error) GetReadDB(name string) (*sql.DB, error) Close() error ListDBs() []string GetDBType(name string) (DatabaseType, error) ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(string, string)) error NotifyChange(dbName, channel, payload string) error GetPrimaryDB(name string) (*sql.DB, error) ExecuteQuery(ctx context.Context, dbName string, query string, args ...interface{}) (*sql.Rows, error) ExecuteQueryRow(ctx context.Context, dbName string, query string, args ...interface{}) *sql.Row Exec(ctx context.Context, dbName string, query string, args ...interface{}) (sql.Result, error) } type service struct { sqlDatabases map[string]*sql.DB sqlxDatabases map[string]*sqlx.DB // Tambahkan map untuk sqlx.DB mongoClients map[string]*mongo.Client readReplicas map[string][]*sql.DB configs map[string]config.DatabaseConfig readConfigs map[string][]config.DatabaseConfig mu sync.RWMutex readBalancer map[string]int listeners map[string]*pq.Listener listenersMu sync.RWMutex } var ( dbManager *service once sync.Once ) func (s *service) loadFromConfig(cfg *config.Config) { s.mu.Lock() defer s.mu.Unlock() // Load primary databases for name, dbConfig := range cfg.Databases { fmt.Printf("db name : %s", name) s.configs[name] = dbConfig } // Load read replicas for name, replicaConfigs := range cfg.ReadReplicas { s.readConfigs[name] = replicaConfigs } } func (s *service) addDatabase(name string, config config.DatabaseConfig) error { s.mu.Lock() defer s.mu.Unlock() var db *sql.DB var err error dbType := DatabaseType(config.Type) log.Printf("Database type: %s", dbType) switch dbType { case Postgres: db, err = s.openPostgresConnection(config) case MySQL: db, err = s.openMySQLConnection(config) case SQLServer: db, err = s.openSQLServerConnection(config) case SQLite: db, err = s.openSQLiteConnection(config) case MongoDB: return s.addMongoDB(name, config) default: return fmt.Errorf("unsupported database type: %s", config.Type) } if err != nil { log.Printf("❌ Error connecting to database %s: %v", name, err) return err } log.Printf("✅ Successfully connected to database: %s", name) return s.configureSQLDB(name, db, config) } func (s *service) addReadReplica(name string, index int, config config.DatabaseConfig) error { s.mu.Lock() defer s.mu.Unlock() var db *sql.DB var err error dbType := DatabaseType(config.Type) switch dbType { case Postgres: db, err = s.openPostgresConnection(config) case MySQL: db, err = s.openMySQLConnection(config) case SQLServer: db, err = s.openSQLServerConnection(config) case SQLite: db, err = s.openSQLiteConnection(config) default: return fmt.Errorf("unsupported database type for read replica: %s", config.Type) } if err != nil { return err } if s.readReplicas[name] == nil { s.readReplicas[name] = make([]*sql.DB, 0) } // Ensure we have enough slots for len(s.readReplicas[name]) <= index { s.readReplicas[name] = append(s.readReplicas[name], nil) } s.readReplicas[name][index] = db log.Printf("Successfully connected to read replica %s[%d]", name, index) return nil } func New(cfg *config.Config) Service { once.Do(func() { dbManager = &service{ sqlDatabases: make(map[string]*sql.DB), sqlxDatabases: make(map[string]*sqlx.DB), // Inisialisasi map sqlx mongoClients: make(map[string]*mongo.Client), readReplicas: make(map[string][]*sql.DB), configs: make(map[string]config.DatabaseConfig), readConfigs: make(map[string][]config.DatabaseConfig), readBalancer: make(map[string]int), listeners: make(map[string]*pq.Listener), } log.Println("Initializing database service...") dbManager.loadFromConfig(cfg) fmt.Printf("%#v\n", dbManager.configs) // Initialize all databases for name, dbConfig := range dbManager.configs { if err := dbManager.addDatabase(name, dbConfig); err != nil { log.Printf("Failed to connect to database %s: %v", name, err) } } // Initialize read replicas for name, replicaConfigs := range dbManager.readConfigs { for i, replicaConfig := range replicaConfigs { if err := dbManager.addReadReplica(name, i, replicaConfig); err != nil { log.Printf("Failed to connect to read replica %s[%d]: %v", name, i, err) } } } }) return dbManager }