Update Penggunaan Gorm
This commit is contained in:
@@ -12,15 +12,17 @@ import (
|
||||
"time"
|
||||
|
||||
"api-service/internal/config"
|
||||
"api-service/internal/utils/validation"
|
||||
|
||||
_ "github.com/jackc/pgx/v5" // Import pgx driver
|
||||
"github.com/lib/pq"
|
||||
_ "gorm.io/driver/postgres" // Import GORM PostgreSQL driver
|
||||
"gorm.io/driver/mysql"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/driver/sqlserver"
|
||||
"gorm.io/gorm"
|
||||
|
||||
_ "github.com/go-sql-driver/mysql" // MySQL driver for database/sql
|
||||
_ "gorm.io/driver/mysql" // GORM MySQL driver
|
||||
_ "gorm.io/driver/sqlserver" // GORM SQL Server driver
|
||||
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
@@ -40,6 +42,7 @@ const (
|
||||
type Service interface {
|
||||
Health() map[string]map[string]string
|
||||
GetDB(name string) (*sql.DB, error)
|
||||
GetGormDB(name string) (*gorm.DB, error) // New method for GORM
|
||||
GetMongoClient(name string) (*mongo.Client, error)
|
||||
GetReadDB(name string) (*sql.DB, error) // For read replicas
|
||||
Close() error
|
||||
@@ -49,18 +52,21 @@ type Service interface {
|
||||
ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(string, string)) error
|
||||
NotifyChange(dbName, channel, payload string) error
|
||||
GetPrimaryDB(name string) (*sql.DB, error) // Helper untuk get primary DB
|
||||
GetSanitizer() *validation.InputSanitizer // Get sanitizer instance
|
||||
}
|
||||
|
||||
type service struct {
|
||||
sqlDatabases map[string]*sql.DB
|
||||
mongoClients map[string]*mongo.Client
|
||||
readReplicas map[string][]*sql.DB // Read replicas for load balancing
|
||||
configs map[string]config.DatabaseConfig
|
||||
readConfigs map[string][]config.DatabaseConfig
|
||||
mu sync.RWMutex
|
||||
readBalancer map[string]int // Round-robin counter for read replicas
|
||||
listeners map[string]*pq.Listener // Tambahkan untuk tracking listeners
|
||||
listenersMu sync.RWMutex
|
||||
sqlDatabases map[string]*sql.DB
|
||||
gormDatabases map[string]*gorm.DB // New field for GORM connections
|
||||
mongoClients map[string]*mongo.Client
|
||||
readReplicas map[string][]*sql.DB // Read replicas for load balancing
|
||||
configs map[string]config.DatabaseConfig
|
||||
readConfigs map[string][]config.DatabaseConfig
|
||||
mu sync.RWMutex
|
||||
readBalancer map[string]int // Round-robin counter for read replicas
|
||||
listeners map[string]*pq.Listener // Tambahkan untuk tracking listeners
|
||||
listenersMu sync.RWMutex
|
||||
sanitizer *validation.InputSanitizer // Input sanitizer for security
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -72,13 +78,15 @@ var (
|
||||
func New(cfg *config.Config) Service {
|
||||
once.Do(func() {
|
||||
dbManager = &service{
|
||||
sqlDatabases: make(map[string]*sql.DB),
|
||||
mongoClients: make(map[string]*mongo.Client),
|
||||
readReplicas: make(map[string][]*sql.DB),
|
||||
configs: make(map[string]config.DatabaseConfig),
|
||||
readConfigs: make(map[string][]config.DatabaseConfig),
|
||||
readBalancer: make(map[string]int),
|
||||
listeners: make(map[string]*pq.Listener),
|
||||
sqlDatabases: make(map[string]*sql.DB),
|
||||
gormDatabases: make(map[string]*gorm.DB),
|
||||
mongoClients: make(map[string]*mongo.Client),
|
||||
readReplicas: make(map[string][]*sql.DB),
|
||||
configs: make(map[string]config.DatabaseConfig),
|
||||
readConfigs: make(map[string][]config.DatabaseConfig),
|
||||
readBalancer: make(map[string]int),
|
||||
listeners: make(map[string]*pq.Listener),
|
||||
sanitizer: validation.NewInputSanitizer(1000), // Initialize sanitizer with max length 1000
|
||||
}
|
||||
|
||||
log.Println("Initializing database service...") // Log when the initialization starts
|
||||
@@ -161,7 +169,13 @@ func (s *service) addDatabase(name string, config config.DatabaseConfig) error {
|
||||
}
|
||||
|
||||
log.Printf("✅ Successfully connected to database: %s", name)
|
||||
return s.configureSQLDB(name, db, config.MaxOpenConns, config.MaxIdleConns, config.ConnMaxLifetime)
|
||||
err = s.configureSQLDB(name, db, config.MaxOpenConns, config.MaxIdleConns, config.ConnMaxLifetime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize GORM for SQL databases
|
||||
return s.configureGormDB(name, db, config)
|
||||
}
|
||||
|
||||
func (s *service) addReadReplica(name string, index int, config config.DatabaseConfig) error {
|
||||
@@ -317,6 +331,41 @@ func (s *service) configureSQLDB(name string, db *sql.DB, maxOpenConns, maxIdleC
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *service) configureGormDB(name string, db *sql.DB, config config.DatabaseConfig) error {
|
||||
var gormDB *gorm.DB
|
||||
var err error
|
||||
|
||||
dbType := DatabaseType(config.Type)
|
||||
|
||||
switch dbType {
|
||||
case Postgres:
|
||||
gormDB, err = gorm.Open(postgres.New(postgres.Config{
|
||||
Conn: db,
|
||||
}), &gorm.Config{})
|
||||
case MySQL:
|
||||
gormDB, err = gorm.Open(mysql.New(mysql.Config{
|
||||
Conn: db,
|
||||
}), &gorm.Config{})
|
||||
case SQLServer:
|
||||
gormDB, err = gorm.Open(sqlserver.New(sqlserver.Config{
|
||||
Conn: db,
|
||||
}), &gorm.Config{})
|
||||
case SQLite:
|
||||
gormDB, err = gorm.Open(sqlite.Open(config.Path), &gorm.Config{})
|
||||
default:
|
||||
return fmt.Errorf("unsupported database type for GORM: %s", config.Type)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize GORM for %s: %w", name, err)
|
||||
}
|
||||
|
||||
s.gormDatabases[name] = gormDB
|
||||
log.Printf("Successfully initialized GORM for database: %s", name)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Health checks the health of all database connections by pinging each database.
|
||||
func (s *service) Health() map[string]map[string]string {
|
||||
s.mu.RLock()
|
||||
@@ -486,6 +535,19 @@ func (s *service) GetReadDB(name string) (*sql.DB, error) {
|
||||
return selected, nil
|
||||
}
|
||||
|
||||
// GetGormDB returns a specific GORM database connection by name
|
||||
func (s *service) GetGormDB(name string) (*gorm.DB, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
gormDB, exists := s.gormDatabases[name]
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("GORM database %s not found", name)
|
||||
}
|
||||
|
||||
return gormDB, nil
|
||||
}
|
||||
|
||||
// GetMongoClient returns a specific MongoDB client by name
|
||||
func (s *service) GetMongoClient(name string) (*mongo.Client, error) {
|
||||
s.mu.RLock()
|
||||
@@ -583,6 +645,11 @@ func (s *service) GetPrimaryDB(name string) (*sql.DB, error) {
|
||||
return s.GetDB(name)
|
||||
}
|
||||
|
||||
// GetSanitizer returns the input sanitizer instance
|
||||
func (s *service) GetSanitizer() *validation.InputSanitizer {
|
||||
return s.sanitizer
|
||||
}
|
||||
|
||||
// ListenForChanges implements PostgreSQL LISTEN/NOTIFY for real-time updates
|
||||
func (s *service) ListenForChanges(ctx context.Context, dbName string, channels []string, callback func(string, string)) error {
|
||||
s.mu.RLock()
|
||||
|
||||
Reference in New Issue
Block a user