Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
fix: concurrency issues
Browse files Browse the repository at this point in the history
  • Loading branch information
johanneswuerbach committed Jan 30, 2022
1 parent b6e0c2b commit 92769e4
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 58 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ test: ## Run jaeger plugin tests

lint: ## Lint the code
docker run --rm -v $(PWD):/app -w /app golangci/golangci-lint:v1.42.1 golangci-lint run -v

bench: ## Run jaeger plugin benchmarks
docker compose run --rm test go test -benchmem -bench=. ./...
13 changes: 11 additions & 2 deletions plugin/s3spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type S3API interface {
}

const (
PARQUET_CONCURRENCY = 4
PARQUET_CONCURRENCY = 1
)

var (
Expand Down Expand Up @@ -136,15 +136,18 @@ func (w *Writer) closeParquetWriter(parquetWriter *writer.ParquetWriter, parquet

func (w *Writer) rotateParquetWriter(ctx context.Context) error {
w.bufferMutex.Lock()
defer w.bufferMutex.Unlock()

parquetWriteFile := w.parquetWriteFile
parquetWriter := w.parquetWriter

if err := w.createParquetWriter(ctx); err != nil {
w.bufferMutex.Unlock()

return fmt.Errorf("failed to create parquet writer: %w", err)
}

w.bufferMutex.Unlock()

if err := w.closeParquetWriter(parquetWriter, parquetWriteFile); err != nil {
return fmt.Errorf("failed to close previous parquet writer: %w", err)
}
Expand All @@ -160,6 +163,9 @@ func (w *Writer) WriteSpan(ctx context.Context, span *model.Span) error {
return fmt.Errorf("failed to create span record: %w", err)
}

w.bufferMutex.Lock()
defer w.bufferMutex.Unlock()

if err := w.parquetWriter.Write(spanRecord); err != nil {
return fmt.Errorf("failed to write span item: %w", err)
}
Expand All @@ -170,5 +176,8 @@ func (w *Writer) Close() error {
w.ticker.Stop()
w.done <- true

w.bufferMutex.Lock()
defer w.bufferMutex.Unlock()

return w.closeParquetWriter(w.parquetWriter, w.parquetWriteFile)
}
106 changes: 50 additions & 56 deletions plugin/s3spanstore/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestWriteSpan(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockSvc := mocks.NewMockS3API(ctrl)
mockSvc.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&s3.PutObjectOutput{}, nil)

assert := assert.New(t)
func NewTestWriter(ctx context.Context, assert *assert.Assertions, mockSvc *mocks.MockS3API) *Writer {
loggerName := "jaeger-s3"

logLevel := os.Getenv("GRPC_STORAGE_PLUGIN_LOG_LEVEL")
Expand All @@ -38,14 +30,17 @@ func TestWriteSpan(t *testing.T) {
JSONFormat: true,
})

ctx := context.TODO()

writer, err := NewWriter(logger, mockSvc, config.S3{
BucketName: "jaeger-spans",
Prefix: "/spans/",
})

assert.NoError(err)

return writer
}

func NewTestSpan(assert *assert.Assertions) *model.Span {
var span model.Span
assert.NoError(jsonpb.Unmarshal(strings.NewReader(`{
"traceId": "AAAAAAAAAAAAAAAAAAAAEQ==",
Expand All @@ -71,7 +66,25 @@ func TestWriteSpan(t *testing.T) {
]
}`), &span))

assert.NoError(writer.WriteSpan(ctx, &span))
return &span
}

func TestWriteSpan(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockSvc := mocks.NewMockS3API(ctrl)
mockSvc.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&s3.PutObjectOutput{}, nil)

assert := assert.New(t)
ctx := context.TODO()

writer := NewTestWriter(ctx, assert, mockSvc)

span := NewTestSpan(assert)

assert.NoError(writer.WriteSpan(ctx, span))

assert.NoError(writer.Close())

Expand All @@ -98,59 +111,40 @@ func BenchmarkWriteSpan(b *testing.B) {
defer ctrl.Finish()

assert := assert.New(b)
loggerName := "jaeger-s3"

logLevel := os.Getenv("GRPC_STORAGE_PLUGIN_LOG_LEVEL")
if logLevel == "" {
logLevel = hclog.Warn.String()
}

logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.LevelFromString(logLevel),
Name: loggerName,
JSONFormat: true,
})

ctx := context.TODO()

mockSvc := mocks.NewMockS3API(ctrl)
mockSvc.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&s3.PutObjectOutput{}, nil)

writer, err := NewWriter(logger, mockSvc, config.S3{
BucketName: "jaeger-spans",
Prefix: "/spans/",
})
assert.NoError(err)

var span model.Span
assert.NoError(jsonpb.Unmarshal(strings.NewReader(`{
"traceId": "AAAAAAAAAAAAAAAAAAAAEQ==",
"spanId": "AAAAAAAAAAM=",
"operationName": "example-operation-1",
"references": [],
"startTime": "2017-01-26T16:46:31.639875Z",
"duration": "100000ns",
"tags": [],
"process": {
"serviceName": "example-service-1",
"tags": []
},
"logs": [
{
"timestamp": "2017-01-26T16:46:31.639875Z",
"fields": []
},
{
"timestamp": "2017-01-26T16:46:31.639875Z",
"fields": []
}
]
}`), &span))
writer := NewTestWriter(ctx, assert, mockSvc)
span := NewTestSpan(assert)

// run the WriteSpan function b.N times
for n := 0; n < b.N; n++ {
assert.NoError(writer.WriteSpan(ctx, &span))
assert.NoError(writer.WriteSpan(ctx, span))
}
assert.NoError(writer.Close())
}

func BenchmarkWriteSpanParallel(b *testing.B) {
ctrl := gomock.NewController(b)
defer ctrl.Finish()

assert := assert.New(b)
ctx := context.TODO()

mockSvc := mocks.NewMockS3API(ctrl)
mockSvc.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&s3.PutObjectOutput{}, nil)

writer := NewTestWriter(ctx, assert, mockSvc)
defer writer.Close()
span := NewTestSpan(assert)

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
assert.NoError(writer.WriteSpan(ctx, span))
}
})
}

0 comments on commit 92769e4

Please sign in to comment.