Skip to content

Commit

Permalink
feat(factory): embrace the factory pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
grandwizard28 committed Jan 14, 2025
1 parent 50a06d1 commit e35d1c2
Show file tree
Hide file tree
Showing 43 changed files with 718 additions and 451 deletions.
24 changes: 12 additions & 12 deletions ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,22 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {

// NewServer creates and initializes Server
func NewServer(serverOptions *ServerOptions, config signoz.Config, signoz *signoz.SigNoz) (*Server, error) {
modelDao, err := dao.InitDao(signoz.SqlStore.Provider().SqlxDB())
modelDao, err := dao.InitDao(signoz.SQLStore.SQLxDB())
if err != nil {
return nil, err
}

baseexplorer.InitWithDB(signoz.SqlStore.Provider().SqlxDB())
preferences.InitDB(signoz.SqlStore.Provider().SqlxDB())
dashboards.InitDB(signoz.SqlStore.Provider().SqlxDB())
baseexplorer.InitWithDB(signoz.SQLStore.SQLxDB())
preferences.InitDB(signoz.SQLStore.SQLxDB())
dashboards.InitDB(signoz.SQLStore.SQLxDB())

gatewayProxy, err := gateway.NewProxy(serverOptions.GatewayUrl, gateway.RoutePrefix)
if err != nil {
return nil, err
}

// initiate license manager
lm, err := licensepkg.StartManager("sqlite", signoz.SqlStore.Provider().SqlxDB())
lm, err := licensepkg.StartManager("sqlite", signoz.SQLStore.SQLxDB())
if err != nil {
return nil, err
}
Expand All @@ -140,7 +140,7 @@ func NewServer(serverOptions *ServerOptions, config signoz.Config, signoz *signo
if storage == "clickhouse" {
zap.L().Info("Using ClickHouse as datastore ...")
qb := db.NewDataConnector(
signoz.SqlStore.Provider().SqlxDB(),
signoz.SQLStore.SQLxDB(),
serverOptions.PromConfigPath,
lm,
serverOptions.MaxIdleConns,
Expand Down Expand Up @@ -176,7 +176,7 @@ func NewServer(serverOptions *ServerOptions, config signoz.Config, signoz *signo
rm, err := makeRulesManager(serverOptions.PromConfigPath,
baseconst.GetAlertManagerApiPrefix(),
serverOptions.RuleRepoURL,
signoz.SqlStore.Provider().SqlxDB(),
signoz.SQLStore.SQLxDB(),
reader,
c,
serverOptions.DisableRules,
Expand All @@ -197,16 +197,16 @@ func NewServer(serverOptions *ServerOptions, config signoz.Config, signoz *signo
}()

// initiate opamp
_ = opAmpModel.InitDB(signoz.SqlStore.Provider().SqlxDB())
_ = opAmpModel.InitDB(signoz.SQLStore.SQLxDB())

integrationsController, err := integrations.NewController(signoz.SqlStore.Provider().SqlxDB())
integrationsController, err := integrations.NewController(signoz.SQLStore.SQLxDB())
if err != nil {
return nil, fmt.Errorf(
"couldn't create integrations controller: %w", err,
)
}

cloudIntegrationsController, err := cloudintegrations.NewController(signoz.SqlStore.Provider().SqlxDB())
cloudIntegrationsController, err := cloudintegrations.NewController(signoz.SQLStore.SQLxDB())
if err != nil {
return nil, fmt.Errorf(
"couldn't create cloud provider integrations controller: %w", err,
Expand All @@ -215,15 +215,15 @@ func NewServer(serverOptions *ServerOptions, config signoz.Config, signoz *signo

// ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
signoz.SqlStore.Provider().SqlxDB(), integrationsController.GetPipelinesForInstalledIntegrations,
signoz.SQLStore.SQLxDB(), integrationsController.GetPipelinesForInstalledIntegrations,
)
if err != nil {
return nil, err
}

// initiate agent config handler
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: signoz.SqlStore.Provider().SqlxDB(),
DB: signoz.SQLStore.SQLxDB(),
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
})
if err != nil {
Expand Down
20 changes: 19 additions & 1 deletion ee/query-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"go.signoz.io/signoz/pkg/config"
signozconfig "go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/config/provider/envprovider"
"go.signoz.io/signoz/pkg/instrumentation"
"go.signoz.io/signoz/pkg/query-service/auth"
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/version"
"go.signoz.io/signoz/pkg/signoz"
pkgversion "go.signoz.io/signoz/pkg/version"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

Expand Down Expand Up @@ -143,7 +145,23 @@ func main() {
zap.L().Fatal("Failed to create config", zap.Error(err))
}

signoz, err := signoz.New(config)
instrumentation, err := instrumentation.New(context.Background(), pkgversion.Build{}, instrumentation.Config{
Logs: instrumentation.LogsConfig{
Enabled: false,
Level: zapcore.InfoLevel,
},
Traces: instrumentation.TracesConfig{
Enabled: false,
},
Metrics: instrumentation.MetricsConfig{
Enabled: false,
},
})
if err != nil {
zap.L().Fatal("Failed to create instrumentation", zap.Error(err))
}

signoz, err := signoz.New(context.Background(), instrumentation, config, signoz.NewProviderFactories())
if err != nil {
zap.L().Fatal("Failed to create signoz struct", zap.Error(err))
}
Expand Down
17 changes: 5 additions & 12 deletions pkg/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import (
"time"

go_cache "github.com/patrickmn/go-cache"
"go.signoz.io/signoz/pkg/config"
"go.signoz.io/signoz/pkg/factory"
)

// Config satisfies the confmap.Config interface
var _ config.Config = (*Config)(nil)

type Memory struct {
TTL time.Duration `mapstructure:"ttl"`
CleanupInterval time.Duration `mapstructure:"cleanupInterval"`
Expand All @@ -28,11 +25,11 @@ type Config struct {
Redis Redis `mapstructure:"redis"`
}

func NewConfigFactory() config.ConfigFactory {
return config.NewConfigFactory(newConfig)
func NewConfigFactory() factory.ConfigFactory {
return factory.NewConfigFactory(factory.MustNewName("cache"), newConfig)
}

func newConfig() config.Config {
func newConfig() factory.Config {
return &Config{
Provider: "memory",
Memory: Memory{
Expand All @@ -49,10 +46,6 @@ func newConfig() config.Config {

}

func (c *Config) Key() string {
return "cache"
}

func (c *Config) Validate() error {
func (c Config) Validate() error {
return nil
}
101 changes: 101 additions & 0 deletions pkg/cache/provider/memory/memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package memory

import (
"context"
"fmt"
"reflect"
"time"

gocache "github.com/patrickmn/go-cache"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/factory"
)

type memory struct {
cc *gocache.Cache
}

func NewFactory() factory.ProviderFactory[cache.Cache, cache.Config] {
return factory.NewProviderFactory(factory.MustNewName("memory"), New)
}

func New(ctx context.Context, settings factory.ProviderSettings, config cache.Config) (cache.Cache, error) {
return &memory{cc: gocache.New(config.Memory.TTL, config.Memory.CleanupInterval)}, nil
}

// Connect does nothing
func (c *memory) Connect(_ context.Context) error {
return nil
}

// Store stores the data in the cache
func (c *memory) Store(_ context.Context, cacheKey string, data cache.CacheableEntity, ttl time.Duration) error {
// check if the data being passed is a pointer and is not nil
rv := reflect.ValueOf(data)
if rv.Kind() != reflect.Pointer || rv.IsNil() {
return cache.WrapCacheableEntityErrors(reflect.TypeOf(data), "inmemory")
}

c.cc.Set(cacheKey, data, ttl)
return nil
}

// Retrieve retrieves the data from the cache
func (c *memory) Retrieve(_ context.Context, cacheKey string, dest cache.CacheableEntity, allowExpired bool) (cache.RetrieveStatus, error) {
// check if the destination being passed is a pointer and is not nil
dstv := reflect.ValueOf(dest)
if dstv.Kind() != reflect.Pointer || dstv.IsNil() {
return cache.RetrieveStatusError, cache.WrapCacheableEntityErrors(reflect.TypeOf(dest), "inmemory")
}

// check if the destination value is settable
if !dstv.Elem().CanSet() {
return cache.RetrieveStatusError, fmt.Errorf("destination value is not settable, %s", dstv.Elem())
}

data, found := c.cc.Get(cacheKey)
if !found {
return cache.RetrieveStatusKeyMiss, nil
}

// check the type compatbility between the src and dest
srcv := reflect.ValueOf(data)
if !srcv.Type().AssignableTo(dstv.Type()) {
return cache.RetrieveStatusError, fmt.Errorf("src type is not assignable to dst type")
}

// set the value to from src to dest
dstv.Elem().Set(srcv.Elem())
return cache.RetrieveStatusHit, nil
}

// SetTTL sets the TTL for the cache entry
func (c *memory) SetTTL(_ context.Context, cacheKey string, ttl time.Duration) {
item, found := c.cc.Get(cacheKey)
if !found {
return
}
c.cc.Replace(cacheKey, item, ttl)
}

// Remove removes the cache entry
func (c *memory) Remove(_ context.Context, cacheKey string) {
c.cc.Delete(cacheKey)
}

// BulkRemove removes the cache entries
func (c *memory) BulkRemove(_ context.Context, cacheKeys []string) {
for _, cacheKey := range cacheKeys {
c.cc.Delete(cacheKey)
}
}

// Close does nothing
func (c *memory) Close(_ context.Context) error {
return nil
}

// Configuration returns the cache configuration
func (c *memory) Configuration() *cache.Memory {
return nil
}
Loading

0 comments on commit e35d1c2

Please sign in to comment.