initiate repo
This commit is contained in:
@@ -0,0 +1,187 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"antrian-operasi/internal/config"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/lib/pq"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
)
|
||||
|
||||
type DatabaseType string
|
||||
|
||||
const (
|
||||
Postgres DatabaseType = "postgres"
|
||||
MySQL DatabaseType = "mysql"
|
||||
SQLServer DatabaseType = "sqlserver"
|
||||
SQLite DatabaseType = "sqlite"
|
||||
MongoDB DatabaseType = "mongodb"
|
||||
)
|
||||
|
||||
type Service interface {
|
||||
Health() map[string]map[string]string
|
||||
GetDB(name string) (*sql.DB, error)
|
||||
GetSQLXDB(name string) (*sqlx.DB, error) // Tambahkan metode ini
|
||||
GetMongoClient(name string) (*mongo.Client, error)
|
||||
GetReadDB(name string) (*sql.DB, error)
|
||||
Close() error
|
||||
ListDBs() []string
|
||||
GetDBType(name string) (DatabaseType, error)
|
||||
ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(string, string)) error
|
||||
NotifyChange(dbName, channel, payload string) error
|
||||
GetPrimaryDB(name string) (*sql.DB, error)
|
||||
ExecuteQuery(ctx context.Context, dbName string, query string, args ...interface{}) (*sql.Rows, error)
|
||||
ExecuteQueryRow(ctx context.Context, dbName string, query string, args ...interface{}) *sql.Row
|
||||
Exec(ctx context.Context, dbName string, query string, args ...interface{}) (sql.Result, error)
|
||||
}
|
||||
|
||||
type service struct {
|
||||
sqlDatabases map[string]*sql.DB
|
||||
sqlxDatabases map[string]*sqlx.DB // Tambahkan map untuk sqlx.DB
|
||||
mongoClients map[string]*mongo.Client
|
||||
readReplicas map[string][]*sql.DB
|
||||
configs map[string]config.DatabaseConfig
|
||||
readConfigs map[string][]config.DatabaseConfig
|
||||
mu sync.RWMutex
|
||||
readBalancer map[string]int
|
||||
listeners map[string]*pq.Listener
|
||||
listenersMu sync.RWMutex
|
||||
}
|
||||
|
||||
var (
|
||||
dbManager *service
|
||||
once sync.Once
|
||||
)
|
||||
|
||||
func (s *service) loadFromConfig(cfg *config.Config) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Load primary databases
|
||||
for name, dbConfig := range cfg.Databases {
|
||||
fmt.Printf("db name : %s", name)
|
||||
s.configs[name] = dbConfig
|
||||
}
|
||||
|
||||
// Load read replicas
|
||||
for name, replicaConfigs := range cfg.ReadReplicas {
|
||||
s.readConfigs[name] = replicaConfigs
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) addDatabase(name string, config config.DatabaseConfig) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
var db *sql.DB
|
||||
var err error
|
||||
|
||||
dbType := DatabaseType(config.Type)
|
||||
log.Printf("Database type: %s", dbType)
|
||||
|
||||
switch dbType {
|
||||
case Postgres:
|
||||
db, err = s.openPostgresConnection(config)
|
||||
case MySQL:
|
||||
db, err = s.openMySQLConnection(config)
|
||||
case SQLServer:
|
||||
db, err = s.openSQLServerConnection(config)
|
||||
case SQLite:
|
||||
db, err = s.openSQLiteConnection(config)
|
||||
case MongoDB:
|
||||
return s.addMongoDB(name, config)
|
||||
default:
|
||||
return fmt.Errorf("unsupported database type: %s", config.Type)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Printf("❌ Error connecting to database %s: %v", name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("✅ Successfully connected to database: %s", name)
|
||||
return s.configureSQLDB(name, db, config)
|
||||
}
|
||||
|
||||
func (s *service) addReadReplica(name string, index int, config config.DatabaseConfig) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
var db *sql.DB
|
||||
var err error
|
||||
|
||||
dbType := DatabaseType(config.Type)
|
||||
|
||||
switch dbType {
|
||||
case Postgres:
|
||||
db, err = s.openPostgresConnection(config)
|
||||
case MySQL:
|
||||
db, err = s.openMySQLConnection(config)
|
||||
case SQLServer:
|
||||
db, err = s.openSQLServerConnection(config)
|
||||
case SQLite:
|
||||
db, err = s.openSQLiteConnection(config)
|
||||
default:
|
||||
return fmt.Errorf("unsupported database type for read replica: %s", config.Type)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.readReplicas[name] == nil {
|
||||
s.readReplicas[name] = make([]*sql.DB, 0)
|
||||
}
|
||||
|
||||
// Ensure we have enough slots
|
||||
for len(s.readReplicas[name]) <= index {
|
||||
s.readReplicas[name] = append(s.readReplicas[name], nil)
|
||||
}
|
||||
|
||||
s.readReplicas[name][index] = db
|
||||
log.Printf("Successfully connected to read replica %s[%d]", name, index)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func New(cfg *config.Config) Service {
|
||||
once.Do(func() {
|
||||
dbManager = &service{
|
||||
sqlDatabases: make(map[string]*sql.DB),
|
||||
sqlxDatabases: make(map[string]*sqlx.DB), // Inisialisasi map sqlx
|
||||
mongoClients: make(map[string]*mongo.Client),
|
||||
readReplicas: make(map[string][]*sql.DB),
|
||||
configs: make(map[string]config.DatabaseConfig),
|
||||
readConfigs: make(map[string][]config.DatabaseConfig),
|
||||
readBalancer: make(map[string]int),
|
||||
listeners: make(map[string]*pq.Listener),
|
||||
}
|
||||
|
||||
log.Println("Initializing database service...")
|
||||
dbManager.loadFromConfig(cfg)
|
||||
fmt.Printf("%#v\n", dbManager.configs)
|
||||
|
||||
// Initialize all databases
|
||||
for name, dbConfig := range dbManager.configs {
|
||||
if err := dbManager.addDatabase(name, dbConfig); err != nil {
|
||||
log.Printf("Failed to connect to database %s: %v", name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize read replicas
|
||||
for name, replicaConfigs := range dbManager.readConfigs {
|
||||
for i, replicaConfig := range replicaConfigs {
|
||||
if err := dbManager.addReadReplica(name, i, replicaConfig); err != nil {
|
||||
log.Printf("Failed to connect to read replica %s[%d]: %v", name, i, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return dbManager
|
||||
}
|
||||
Reference in New Issue
Block a user