Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/scdbserver/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ engine:
"protocol": "SEMI2K",
"field": "FM64"
}
infoschema_cache:
enabled: true
ttl: 10m
1 change: 1 addition & 0 deletions cmd/scdbserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func main() {

log.Info("Starting to connect to database and do bootstrap if necessary...")
storage.InitPasswordValidation(cfg.PasswordCheck)
storage.InitInfoSchemaCache(cfg.InfoSchemaCache.Enabled, cfg.InfoSchemaCache.TTL)
store, err := server.NewDbConnWithBootstrap(&cfg.Storage)
if err != nil {
log.Fatalf("Failed to connect to database and bootstrap it: %v", err)
Expand Down
11 changes: 11 additions & 0 deletions pkg/scdb/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
DefaultProtocol = "https"
DefaultLogLevel = "info"
DefaultEngineClientMode = "HTTP"
DefaultInfoSchemaCacheTTL = 10 * time.Minute // 10 minutes
)

type EngineConfig struct {
Expand All @@ -62,6 +63,11 @@ type SecurityCompromiseConf struct {
RevealGroupCount bool `yaml:"reveal_group_count"`
}

type InfoSchemaCacheConf struct {
Enabled bool `yaml:"enabled"`
TTL time.Duration `yaml:"ttl"`
}

// Config contains bootstrap configuration for SCDB
type Config struct {
// SCDBHost is used as callback url for engine worked in async mode
Expand All @@ -79,6 +85,7 @@ type Config struct {
Engine EngineConfig `yaml:"engine"`
SecurityCompromise SecurityCompromiseConf `yaml:"security_compromise"`
PartyAuth PartyAuthConf `yaml:"party_auth"`
InfoSchemaCache InfoSchemaCacheConf `yaml:"infoschema_cache"`
}

const (
Expand Down Expand Up @@ -164,6 +171,10 @@ func NewDefaultConfig() *Config {
EnableTimestampCheck: true,
ValidityPeriod: 30 * time.Second, // 30s
}
config.InfoSchemaCache = InfoSchemaCacheConf{
Enabled: true,
TTL: DefaultInfoSchemaCacheTTL,
}
return &config
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/scdb/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ security_compromise:
ValidityPeriod: time.Minute * 3,
},
SecurityCompromise: SecurityCompromiseConf{GroupByThreshold: 3},
InfoSchemaCache: InfoSchemaCacheConf{
Enabled: true,
TTL: DefaultInfoSchemaCacheTTL,
},
}
expectedCfg.Engine.SpuRuntimeCfg = strings.ReplaceAll(expectedCfg.Engine.SpuRuntimeCfg, "\t", " ")
r.Equal(expectedCfg, cfg)
Expand Down Expand Up @@ -206,6 +210,10 @@ party_auth:
EnableTimestampCheck: true,
ValidityPeriod: time.Minute * 3,
},
InfoSchemaCache: InfoSchemaCacheConf{
Enabled: true,
TTL: DefaultInfoSchemaCacheTTL,
},
}
expectedCfg.Engine.SpuRuntimeCfg = strings.ReplaceAll(expectedCfg.Engine.SpuRuntimeCfg, "\t", " ")
r.Equal(expectedCfg, cfg)
Expand Down
27 changes: 27 additions & 0 deletions pkg/scdb/executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,41 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
switch x := e.stmt.(type) {
case *ast.CreateDatabaseStmt:
err = e.executeCreateDatabase(x)
if err == nil {
storage.InvalidateInfoSchemaCache(strings.ToLower(x.Name))
}
case *ast.CreateTableStmt:
err = e.executeCreateTable(x)
if err == nil {
dbName := x.Table.Schema.O
if dbName == "" {
dbName = e.ctx.GetSessionVars().CurrentDB
}
storage.InvalidateInfoSchemaCache(dbName)
}
case *ast.DropDatabaseStmt:
err = e.executeDropDatabase(x)
if err == nil {
storage.InvalidateInfoSchemaCache(strings.ToLower(x.Name))
}
case *ast.DropTableStmt:
err = e.executeDropTableOrView(x)
if err == nil && len(x.Tables) > 0 {
dbName := x.Tables[0].Schema.L
if dbName == "" {
dbName = e.ctx.GetSessionVars().CurrentDB
}
storage.InvalidateInfoSchemaCache(dbName)
}
case *ast.CreateViewStmt:
err = e.executeCreateView(x)
if err == nil {
dbName := x.ViewName.Schema.L
if dbName == "" {
dbName = e.ctx.GetSessionVars().CurrentDB
}
storage.InvalidateInfoSchemaCache(dbName)
}
default:
err = fmt.Errorf("ddl.Next: Unsupported statement %v", x)

Expand Down
79 changes: 79 additions & 0 deletions pkg/scdb/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/sethvargo/go-password/password"
"gorm.io/gorm"
Expand Down Expand Up @@ -53,6 +56,30 @@ var EnablePasswordCheck = false
// keep creating order
var allTables = []interface{}{&User{}, &Database{}, &Table{}, &Column{}, &DatabasePriv{}, &TablePriv{}, &ColumnPriv{}}

// InfoSchema cache entry with TTL support
type infoCacheEntry struct {
schema infoschema.InfoSchema
createTime time.Time
}

// InfoSchema cache to avoid frequent database queries
var (
infoSchemaCache sync.Map // map[string]*infoCacheEntry, key is database name
infoCacheHitCount int64 // accessed via atomic operations
infoCacheMissCount int64 // accessed via atomic operations
cacheEnabled bool // whether cache is enabled
cacheTTL time.Duration
)

// InitInfoSchemaCache initializes the InfoSchema cache configuration
func InitInfoSchemaCache(enabled bool, ttl time.Duration) {
cacheEnabled = enabled
cacheTTL = ttl
if ttl <= 0 {
ttl = 10 * time.Minute // default to 10 minutes
}
}

// NeedBootstrap checks if the store is empty
func NeedBootstrap(store *gorm.DB) bool {
for _, tn := range allTables {
Expand Down Expand Up @@ -173,6 +200,32 @@ func FindUserByParty(store *gorm.DB, partyCode string) (*User, error) {
return &user, nil
}

// InvalidateInfoSchemaCache invalidates the InfoSchema cache for a specific database
// If dbName is empty, it invalidates all cached InfoSchemas
func InvalidateInfoSchemaCache(dbName string) {
if dbName == "" {
// Clear all cache by iterating and deleting each key to avoid race conditions
infoSchemaCache.Range(func(key, value interface{}) bool {
infoSchemaCache.Delete(key)
return true
})
} else {
// Clear specific database cache
infoSchemaCache.Delete(dbName)
}
}

// ResetInfoSchemaCacheStats resets cache statistics (for testing)
func ResetInfoSchemaCacheStats() {
atomic.StoreInt64(&infoCacheHitCount, 0)
atomic.StoreInt64(&infoCacheMissCount, 0)
}

// GetInfoSchemaCacheStats returns cache hit and miss statistics
func GetInfoSchemaCacheStats() (hits, misses int64) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这两个func只是为了测试用吗?可以看看是否适合加到metrics里面

return atomic.LoadInt64(&infoCacheHitCount), atomic.LoadInt64(&infoCacheMissCount)
}

func QueryInfoSchema(store *gorm.DB) (result infoschema.InfoSchema, err error) {
callFc := func(tx *gorm.DB) error {
result, err = queryInfoSchema(tx)
Expand Down Expand Up @@ -244,13 +297,39 @@ func queryInfoSchema(store *gorm.DB) (infoschema.InfoSchema, error) {
}

func QueryDBInfoSchema(store *gorm.DB, dbName string) (result infoschema.InfoSchema, err error) {
// Try to get from cache first if enabled
if cacheEnabled {
if cached, ok := infoSchemaCache.Load(dbName); ok {
entry := cached.(*infoCacheEntry)
// Check if cache entry is still valid (not expired)
if time.Since(entry.createTime) < cacheTTL {
atomic.AddInt64(&infoCacheHitCount, 1)
return entry.schema, nil
}
// Cache expired, remove it
infoSchemaCache.Delete(dbName)
}
}

// Cache miss or disabled, query from database
atomic.AddInt64(&infoCacheMissCount, 1)

callFc := func(tx *gorm.DB) error {
result, err = queryDBInfoSchema(tx, dbName)
return err
}
if err := store.Transaction(callFc, &sql.TxOptions{ReadOnly: true}); err != nil {
return nil, fmt.Errorf("queryDBInfoSchema: %v", err)
}

// Store in cache if enabled
if cacheEnabled && result != nil {
infoSchemaCache.Store(dbName, &infoCacheEntry{
schema: result,
createTime: time.Now(),
})
}

return result, nil
}

Expand Down
123 changes: 123 additions & 0 deletions pkg/scdb/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package storage

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"gorm.io/driver/sqlite"
Expand Down Expand Up @@ -107,3 +108,125 @@ func batchInsert(db *gorm.DB, records []interface{}) error {
}
return nil
}

func createTestDB(t *testing.T, db *gorm.DB, dbName string) {
r := require.New(t)
records := []interface{}{
&Database{Db: dbName},
&Table{Db: dbName, Table: "t1", Owner: "root", Host: "%", RefDb: "ref", RefTable: "ref", DBType: 0},
&Column{Db: dbName, TableName: "t1", ColumnName: "c1", Type: "int"},
}
r.NoError(batchInsert(db, records))
}

func TestInfoSchemaCache(t *testing.T) {
r := require.New(t)
db, err := newDbStore()
r.NoError(err)

// Initialize cache with enabled=true and a reasonable TTL
InitInfoSchemaCache(true, 5*time.Minute)
InvalidateInfoSchemaCache("")
ResetInfoSchemaCacheStats()

createTestDB(t, db, "db1")
createTestDB(t, db, "db2")

// Test cache miss and hit
_, err = QueryDBInfoSchema(db, "db1")
r.NoError(err)
hits, misses := GetInfoSchemaCacheStats()
r.Equal(int64(0), hits)
r.Equal(int64(1), misses)

_, err = QueryDBInfoSchema(db, "db1")
r.NoError(err)
hits, misses = GetInfoSchemaCacheStats()
r.Equal(int64(1), hits)
r.Equal(int64(1), misses)

// Test cache invalidation
InvalidateInfoSchemaCache("db1")
_, err = QueryDBInfoSchema(db, "db1")
r.NoError(err)
hits, misses = GetInfoSchemaCacheStats()
r.Equal(int64(1), hits)
r.Equal(int64(2), misses)

// Test multiple databases cache isolation
_, err = QueryDBInfoSchema(db, "db2")
r.NoError(err)
hits, misses = GetInfoSchemaCacheStats()
r.Equal(int64(1), hits)
r.Equal(int64(3), misses)

InvalidateInfoSchemaCache("db2")
_, err = QueryDBInfoSchema(db, "db1")
r.NoError(err)
hits, misses = GetInfoSchemaCacheStats()
r.Equal(int64(2), hits) // db1 still cached
r.Equal(int64(3), misses)
}

func TestInfoSchemaCacheTTL(t *testing.T) {
r := require.New(t)
db, err := newDbStore()
r.NoError(err)

// Initialize cache with a short TTL for testing
InitInfoSchemaCache(true, 100*time.Millisecond)
InvalidateInfoSchemaCache("")
ResetInfoSchemaCacheStats()

createTestDB(t, db, "db1")

// First query - cache miss
_, err = QueryDBInfoSchema(db, "db1")
r.NoError(err)
hits, misses := GetInfoSchemaCacheStats()
r.Equal(int64(0), hits)
r.Equal(int64(1), misses)

// Second query immediately - cache hit
_, err = QueryDBInfoSchema(db, "db1")
r.NoError(err)
hits, misses = GetInfoSchemaCacheStats()
r.Equal(int64(1), hits)
r.Equal(int64(1), misses)

// Wait for cache to expire
time.Sleep(150 * time.Millisecond)

// Query after TTL expired - should be cache miss
_, err = QueryDBInfoSchema(db, "db1")
r.NoError(err)
hits, misses = GetInfoSchemaCacheStats()
r.Equal(int64(1), hits)
r.Equal(int64(2), misses)
}

func TestInfoSchemaCacheDisabled(t *testing.T) {
r := require.New(t)
db, err := newDbStore()
r.NoError(err)

// Disable cache
InitInfoSchemaCache(false, 5*time.Minute)
InvalidateInfoSchemaCache("")
ResetInfoSchemaCacheStats()

createTestDB(t, db, "db1")

// All queries should be cache misses when disabled
_, err = QueryDBInfoSchema(db, "db1")
r.NoError(err)
hits, misses := GetInfoSchemaCacheStats()
r.Equal(int64(0), hits)
r.Equal(int64(1), misses)

_, err = QueryDBInfoSchema(db, "db1")
r.NoError(err)
hits, misses = GetInfoSchemaCacheStats()
r.Equal(int64(0), hits)
r.Equal(int64(2), misses)
}
Loading