Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #26 from johanneswuerbach/dependencies-prefetch
Browse files Browse the repository at this point in the history
feat: dependencies prefetch
  • Loading branch information
johanneswuerbach authored Mar 1, 2022
2 parents ab79357 + ba3cc81 commit e3e24d3
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 13 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {

logger.Debug("plugin configured")

s3Plugin, err := plugin.NewS3Plugin(logger, s3Svc, configuration.S3, athenaSvc, configuration.Athena)
s3Plugin, err := plugin.NewS3Plugin(ctx, logger, s3Svc, configuration.S3, athenaSvc, configuration.Athena)
if err != nil {
log.Fatalf("unable to create plugin, %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions plugin/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Athena struct {
MaxSpanAge string
DependenciesQueryTTL string
ServicesQueryTTL string
DependenciesPrefetch bool
}

type Configuration struct {
Expand Down
7 changes: 4 additions & 3 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package plugin

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go-v2/service/athena"
Expand All @@ -12,13 +13,13 @@ import (
"github.com/johanneswuerbach/jaeger-s3/plugin/s3spanstore"
)

func NewS3Plugin(logger hclog.Logger, s3Svc *s3.Client, s3Config config.S3, athenaSvc *athena.Client, athenaConfig config.Athena) (*S3Plugin, error) {
spanWriter, err := s3spanstore.NewWriter(logger, s3Svc, s3Config)
func NewS3Plugin(ctx context.Context, logger hclog.Logger, s3Svc *s3.Client, s3Config config.S3, athenaSvc *athena.Client, athenaConfig config.Athena) (*S3Plugin, error) {
spanWriter, err := s3spanstore.NewWriter(ctx, logger, s3Svc, s3Config)
if err != nil {
return nil, fmt.Errorf("failed to create span writer, %v", err)
}

spanReader, err := s3spanstore.NewReader(logger, athenaSvc, athenaConfig)
spanReader, err := s3spanstore.NewReader(ctx, logger, athenaSvc, athenaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create span reader, %v", err)
}
Expand Down
79 changes: 79 additions & 0 deletions plugin/s3spanstore/dependencies_prefetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package s3spanstore

import (
"context"
"math/rand"
"time"

"github.com/hashicorp/go-hclog"
"github.com/jaegertracing/jaeger/model"
)

type DependenciesPrefetch struct {
logger hclog.Logger
reader ReaderWithDependencies
ticker *time.Ticker
enabled bool
done chan bool
ctx context.Context
sleepDuration time.Duration
}

type ReaderWithDependencies interface {
GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error)
}

func NewDependenciesPrefetch(ctx context.Context, logger hclog.Logger, reader ReaderWithDependencies, interval time.Duration, enabled bool) *DependenciesPrefetch {
s1 := rand.NewSource(time.Now().UnixNano())
r1 := rand.New(s1)

return &DependenciesPrefetch{
logger: logger,
reader: reader,
ticker: time.NewTicker(interval),
enabled: enabled,
done: make(chan bool),
ctx: ctx,
sleepDuration: time.Second * time.Duration(r1.Intn(180)),
}
}

func (d *DependenciesPrefetch) Start() {
if !d.enabled {
return
}

go func() {
// Do an initial prefetch
d.prefetchDependencies()

// Schedule background prefetches
for {
select {
case <-d.done:
return
case <-d.ticker.C:
d.prefetchDependencies()
}
}
}()
}

func (d *DependenciesPrefetch) prefetchDependencies() {
// Ensure different readers don't refresh at the same time
time.Sleep(d.sleepDuration)

// GetDependencies to ensure the result is cached
if _, err := d.reader.GetDependencies(d.ctx, time.Now(), time.Hour*24*7); err != nil {
d.logger.Error("failed to get dependencies", err)
}
}

func (d *DependenciesPrefetch) Stop() {
if !d.enabled {
return
}

d.ticker.Stop()
d.done <- true
}
74 changes: 74 additions & 0 deletions plugin/s3spanstore/dependencies_prefetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package s3spanstore

import (
"context"
"os"
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/jaegertracing/jaeger/model"
"github.com/stretchr/testify/assert"
)

func NewTestDependencyPrefetch(ctx context.Context, assert *assert.Assertions, reader ReaderWithDependencies, enabled bool) *DependenciesPrefetch {
loggerName := "jaeger-s3"

logLevel := os.Getenv("GRPC_STORAGE_PLUGIN_LOG_LEVEL")
if logLevel == "" {
logLevel = hclog.Debug.String()
}

logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.LevelFromString(logLevel),
Name: loggerName,
JSONFormat: true,
})

prefetch := NewDependenciesPrefetch(ctx, logger, reader, 100*time.Millisecond, enabled)
prefetch.sleepDuration = time.Millisecond * 1

return prefetch
}

type testReader struct {
called int
}

func (r *testReader) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
r.called++
return nil, nil
}

func TestDependenciesPrefetchEnabled(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()

testReader := &testReader{called: 0}
prefetch := NewTestDependencyPrefetch(ctx, assert, testReader, true)
prefetch.Start()
time.Sleep(10 * time.Millisecond)

assert.Equal(1, testReader.called)

time.Sleep(150 * time.Millisecond)

assert.Equal(2, testReader.called)

prefetch.Stop()
}

func TestDependenciesPrefetchDisabled(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()

testReader := &testReader{called: 0}
prefetch := NewTestDependencyPrefetch(ctx, assert, testReader, false)
prefetch.Start()

time.Sleep(150 * time.Millisecond)

assert.Equal(0, testReader.called)

prefetch.Stop()
}
12 changes: 9 additions & 3 deletions plugin/s3spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type AthenaAPI interface {
StopQueryExecution(ctx context.Context, params *athena.StopQueryExecutionInput, optFns ...func(*athena.Options)) (*athena.StopQueryExecutionOutput, error)
}

func NewReader(logger hclog.Logger, svc AthenaAPI, cfg config.Athena) (*Reader, error) {
func NewReader(ctx context.Context, logger hclog.Logger, svc AthenaAPI, cfg config.Athena) (*Reader, error) {
maxSpanAge, err := time.ParseDuration(cfg.MaxSpanAge)
if err != nil {
return nil, fmt.Errorf("failed to parse max timeframe: %w", err)
Expand All @@ -44,15 +44,20 @@ func NewReader(logger hclog.Logger, svc AthenaAPI, cfg config.Athena) (*Reader,
return nil, fmt.Errorf("failed to parse services query ttl: %w", err)
}

return &Reader{
reader := &Reader{
svc: svc,
cfg: cfg,
logger: logger,
maxSpanAge: maxSpanAge,
dependenciesQueryTTL: dependenciesQueryTTL,
servicesQueryTTL: servicesQueryTTL,
athenaQueryCache: NewAthenaQueryCache(logger, svc, cfg.WorkGroup),
}, nil
}

reader.dependenciesPrefetch = NewDependenciesPrefetch(ctx, logger, reader, dependenciesQueryTTL, cfg.DependenciesPrefetch)
reader.dependenciesPrefetch.Start()

return reader, nil
}

type Reader struct {
Expand All @@ -63,6 +68,7 @@ type Reader struct {
dependenciesQueryTTL time.Duration
servicesQueryTTL time.Duration
athenaQueryCache *AthenaQueryCache
dependenciesPrefetch *DependenciesPrefetch
}

const (
Expand Down
2 changes: 1 addition & 1 deletion plugin/s3spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewTestReader(ctx context.Context, assert *assert.Assertions, mockSvc *mock
JSONFormat: true,
})

reader, err := NewReader(logger, mockSvc, config.Athena{
reader, err := NewReader(ctx, logger, mockSvc, config.Athena{
DatabaseName: "default",
SpansTableName: "jaeger_spans",
OperationsTableName: "jaeger_operations",
Expand Down
4 changes: 1 addition & 3 deletions plugin/s3spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func EmptyBucket(ctx context.Context, svc S3API, bucketName string) error {
return nil
}

func NewWriter(logger hclog.Logger, svc S3API, s3Config config.S3) (*Writer, error) {
func NewWriter(ctx context.Context, logger hclog.Logger, svc S3API, s3Config config.S3) (*Writer, error) {
rand.Seed(time.Now().UnixNano())

bufferDuration := time.Second * 60
Expand All @@ -86,8 +86,6 @@ func NewWriter(logger hclog.Logger, svc S3API, s3Config config.S3) (*Writer, err
operationsDedupeCacheSize = s3Config.OperationsDedupeCacheSize
}

ctx := context.Background()

if s3Config.EmptyBucket {
if err := EmptyBucket(ctx, svc, s3Config.BucketName); err != nil {
return nil, fmt.Errorf("failed to empty s3 bucket: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion plugin/s3spanstore/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewTestWriter(ctx context.Context, assert *assert.Assertions, mockSvc *mock
JSONFormat: true,
})

writer, err := NewWriter(logger, mockSvc, config.S3{
writer, err := NewWriter(ctx, logger, mockSvc, config.S3{
BucketName: "jaeger-spans",
SpansPrefix: "/spans/",
OperationsPrefix: "/operations/",
Expand Down
3 changes: 2 additions & 1 deletion test-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ athena:
outputLocation: s3://jaeger-s3-test-results/
workGroup: jaeger
maxSpanAge: 336h
dependenciesQueryTtl: 6h
dependenciesQueryTtl: 24h
dependenciesPrefetch: true
servicesQueryTtl: 10s

0 comments on commit e3e24d3

Please sign in to comment.