diff --git a/plugin/s3spanstore/dedupe_parquet_writer.go b/plugin/s3spanstore/dedupe_parquet_writer.go
index 928ddb9..8cbac7d 100644
--- a/plugin/s3spanstore/dedupe_parquet_writer.go
+++ b/plugin/s3spanstore/dedupe_parquet_writer.go
@@ -16,6 +16,10 @@ type DedupeParquetWriter struct {
 	parquetWriter IParquetWriter
 }
 
+type DeduplicatableRow interface {
+	DedupeKey() string
+}
+
 func NewDedupeParquetWriter(logger hclog.Logger, dedupeDuration time.Duration, dedupeCacheSize int, parquetWriter IParquetWriter) (*DedupeParquetWriter, error) {
 	dedupeCache, err := lru.New(dedupeCacheSize)
 	if err != nil {
@@ -32,12 +36,12 @@ func NewDedupeParquetWriter(logger hclog.Logger, dedupeDuration time.Duration, d
 	return w, nil
 }
 
-func (w *DedupeParquetWriter) Write(ctx context.Context, rowTime time.Time, row interface{}) error {
-	if nextWriteTime, ok := w.dedupeCache.Get(row); !ok || rowTime.After(nextWriteTime.(time.Time)) {
+func (w *DedupeParquetWriter) Write(ctx context.Context, rowTime time.Time, row DeduplicatableRow) error {
+	if nextWriteTime, ok := w.dedupeCache.Get(row.DedupeKey()); !ok || rowTime.After(nextWriteTime.(time.Time)) {
 		if err := w.parquetWriter.Write(ctx, rowTime, row); err != nil {
 			return fmt.Errorf("failed to write row: %w", err)
 		}
-		w.dedupeCache.Add(row, rowTime.Add(w.dedupeDuration))
+		w.dedupeCache.Add(row.DedupeKey(), rowTime.Add(w.dedupeDuration))
 	}
 
 	return nil
diff --git a/plugin/s3spanstore/dedupe_parquet_writer_test.go b/plugin/s3spanstore/dedupe_parquet_writer_test.go
index 40bb014..f8c8ff4 100644
--- a/plugin/s3spanstore/dedupe_parquet_writer_test.go
+++ b/plugin/s3spanstore/dedupe_parquet_writer_test.go
@@ -49,6 +49,10 @@ type operation struct {
 	name string
 }
 
+func (o operation) DedupeKey() string {
+	return o.name
+}
+
 func TestDedupeParquetWriter(t *testing.T) {
 	assert := assert.New(t)
 	ctx := context.Background()
diff --git a/plugin/s3spanstore/operationrecord.go b/plugin/s3spanstore/operationrecord.go
index 6de35a1..ffe448b 100644
--- a/plugin/s3spanstore/operationrecord.go
+++ b/plugin/s3spanstore/operationrecord.go
@@ -1,6 +1,8 @@
 package s3spanstore
 
 import (
+	"fmt"
+
 	"github.com/jaegertracing/jaeger/model"
 )
 
@@ -20,3 +22,7 @@ func NewOperationRecordFromSpan(span *model.Span) (*OperationRecord, error) {
 		ServiceName: span.Process.ServiceName,
 	}, nil
 }
+
+func (w *OperationRecord) DedupeKey() string {
+	return fmt.Sprintf("%s/%s/%s", w.OperationName, w.SpanKind, w.ServiceName)
+}
diff --git a/plugin/s3spanstore/writer.go b/plugin/s3spanstore/writer.go
index f1eefca..3324ecb 100644
--- a/plugin/s3spanstore/writer.go
+++ b/plugin/s3spanstore/writer.go
@@ -32,7 +32,7 @@ type Writer struct {
 	logger hclog.Logger
 
 	spanParquetWriter       IParquetWriter
-	operationsParquetWriter IParquetWriter
+	operationsParquetWriter *DedupeParquetWriter
 }
 
 func EmptyBucket(ctx context.Context, svc S3API, bucketName string) error {
diff --git a/plugin/s3spanstore/writer_test.go b/plugin/s3spanstore/writer_test.go
index fd81d6c..34b92da 100644
--- a/plugin/s3spanstore/writer_test.go
+++ b/plugin/s3spanstore/writer_test.go
@@ -196,6 +196,93 @@ func TestWriteSpan(t *testing.T) {
 	assert.NoError(localFileReader.Close())
 }
 
+type S3PutObject struct {
+	key      string
+	fileName string
+}
+
+func TestWriteSpanTwice(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	mockSvc := mocks.NewMockS3API(ctrl)
+
+	assert := assert.New(t)
+	ctx := context.TODO()
+
+	objects := []*S3PutObject{}
+	defer func() {
+		for _, object := range objects {
+			os.Remove(object.fileName)
+		}
+	}()
+
+	mockSvc.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+		func(_ context.Context, input *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
+			file, err := ioutil.TempFile("", "write-span")
+			assert.NoError(err)
+			objects = append(objects, &S3PutObject{
+				key:      *input.Key,
+				fileName: file.Name(),
+			})
+
+			dat, err := ioutil.ReadAll(input.Body)
+			assert.NoError(err)
+			assert.NoError(ioutil.WriteFile(file.Name(), dat, 0644))
+
+			return &s3.PutObjectOutput{}, nil
+		}).Times(2)
+
+	writer := NewTestWriter(ctx, assert, mockSvc)
+
+	span := NewTestSpan(assert)
+
+	assert.NoError(writer.WriteSpan(ctx, span))
+	assert.NoError(writer.WriteSpan(ctx, span))
+
+	assert.NoError(writer.Close())
+
+	spansFile := ""
+	for _, object := range objects {
+		if strings.HasPrefix(object.key, "/spans") {
+			spansFile = object.fileName
+		}
+	}
+
+	assert.NotEmpty(spansFile)
+
+	localFileReader, err := local.NewLocalFileReader(spansFile)
+	assert.NoError(err)
+	pr, err := reader.NewParquetReader(localFileReader, new(SpanRecord), 1)
+	assert.NoError(err)
+
+	num := int(pr.GetNumRows())
+	assert.Equal(2, num)
+
+	pr.ReadStop()
+	assert.NoError(localFileReader.Close())
+
+	operationsFile := ""
+	for _, object := range objects {
+		if strings.HasPrefix(object.key, "/operations") {
+			operationsFile = object.fileName
+		}
+	}
+
+	assert.NotEmpty(operationsFile)
+
+	localFileReaderOperations, err := local.NewLocalFileReader(operationsFile)
+	assert.NoError(err)
+	prOperations, err := reader.NewParquetReader(localFileReaderOperations, new(OperationRecord), 1)
+	assert.NoError(err)
+
+	numOperations := int(prOperations.GetNumRows())
+	assert.Equal(1, numOperations)
+
+	prOperations.ReadStop()
+	assert.NoError(localFileReaderOperations.Close())
+}
+
 func TestWriteSpanWithTagsAndReferences(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()