Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
fix: prevent empty parquet files
Browse files Browse the repository at this point in the history
  • Loading branch information
johanneswuerbach committed Jan 30, 2022
1 parent 92769e4 commit 1cc0ddf
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions plugin/s3spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ func NewWriter(logger hclog.Logger, svc S3API, s3Config config.S3) (*Writer, err

ctx := context.Background()

if err := w.createParquetWriter(ctx); err != nil {
return nil, fmt.Errorf("failed to create parquet writer: %w", err)
}

go func() {
for {
select {
Expand All @@ -93,20 +89,23 @@ func NewWriter(logger hclog.Logger, svc S3API, s3Config config.S3) (*Writer, err
return w, nil
}

func (w *Writer) createParquetWriter(ctx context.Context) error {
func (w *Writer) ensureParquetWriter(ctx context.Context) error {
if w.parquetWriter != nil {
return nil
}

writeFile, err := s3v2.NewS3FileWriterWithClient(ctx, w.svc, w.bucketName, w.parquetKey(), nil)
if err != nil {
return fmt.Errorf("failed to create parquet s3 client: %w", err)
}

w.parquetWriteFile = writeFile

parquetWriter, err := writer.NewParquetWriter(writeFile, new(SpanRecord), PARQUET_CONCURRENCY)
if err != nil {
w.parquetWriteFile.Close()
w.parquetWriteFile = nil
return fmt.Errorf("failed to create parquet writer: %w", err)
}

w.parquetWriter = parquetWriter

return nil
Expand Down Expand Up @@ -139,12 +138,8 @@ func (w *Writer) rotateParquetWriter(ctx context.Context) error {

parquetWriteFile := w.parquetWriteFile
parquetWriter := w.parquetWriter

if err := w.createParquetWriter(ctx); err != nil {
w.bufferMutex.Unlock()

return fmt.Errorf("failed to create parquet writer: %w", err)
}
w.parquetWriteFile = nil
w.parquetWriter = nil

w.bufferMutex.Unlock()

Expand All @@ -166,6 +161,10 @@ func (w *Writer) WriteSpan(ctx context.Context, span *model.Span) error {
w.bufferMutex.Lock()
defer w.bufferMutex.Unlock()

if err := w.ensureParquetWriter(ctx); err != nil {
return fmt.Errorf("failed to ensure parquet writer: %w", err)
}

if err := w.parquetWriter.Write(spanRecord); err != nil {
return fmt.Errorf("failed to write span item: %w", err)
}
Expand Down

0 comments on commit 1cc0ddf

Please sign in to comment.