Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 51 additions & 6 deletions app/vlinsert/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package opentelemetry
import (
"fmt"
"net/http"
"sync"
"time"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
Expand Down Expand Up @@ -74,20 +75,38 @@ var (
requestProtobufDuration = metrics.NewSummary(`vl_http_request_duration_seconds{path="/insert/opentelemetry/v1/logs",format="protobuf"}`)
)

var (
exportLogsReqPool = sync.Pool{
Copy link
Member

@vadimalekseev vadimalekseev Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, to optimize OpenTelemetry format parsing we should:

  1. Avoid using unnecessary pointers or consider an arena-style allocator (see example: https://github.com/valyala/fastjson/blob/master/arena.go). Since the OpenTelemetry format is relatively flat, I think avoiding extra pointers is the better option.
  2. Eliminate memory copies during unmarshal – instead, reference the original buffer. This will require refactoring the code to use []byte instead of string.

These two optimizations would significantly reduce allocations and improve performance.

Using sync.Pool here won't help much, as it only reduces allocations by a few percent. If you run into performance issues with OpenTelemetry, let us know – in that case, we should apply the optimizations above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay in responding.

I agree that optimize OpenTelemetry format parsing does not necessarily require the use of sync.Pool. Also, I think that deeply optimizing it in the two ways mentioned above would introduce more complexity.

I believe it is best to stick with the original code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed with the @vadimalekseev recommendations.

@vadimalekseev , could you create a feature request for optimizing the parse performance for logs ingested via OpenTelemetry data ingestion protocol? Then we can start working on this issue according to your recommendations.

New: func() any {
return &pb.ExportLogsServiceRequest{}
},
}
fieldsPool = sync.Pool{
New: func() any {
s := make([]logstorage.Field, 0)
return &s
},
}
)

func pushProtobufRequest(data []byte, lmp insertutil.LogMessageProcessor, msgFields []string, useDefaultStreamFields bool) error {
var req pb.ExportLogsServiceRequest
req := getExportLogsReq()
defer putExportLogsReq(req)

if err := req.UnmarshalProtobuf(data); err != nil {
errorsTotal.Inc()
return fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
}

var commonFields []logstorage.Field
commonFields := getFields()
defer putFields(commonFields)

for _, rl := range req.ResourceLogs {
commonFields = commonFields[:0]
commonFields = appendKeyValues(commonFields, rl.Resource.Attributes, "")
commonFieldsLen := len(commonFields)
*commonFields = (*commonFields)[:0]
*commonFields = appendKeyValues(*commonFields, rl.Resource.Attributes, "")
commonFieldsLen := len(*commonFields)
for _, sc := range rl.ScopeLogs {
commonFields = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp, msgFields, useDefaultStreamFields)
*commonFields = pushFieldsFromScopeLogs(&sc, (*commonFields)[:commonFieldsLen], lmp, msgFields, useDefaultStreamFields)
}
}

Expand Down Expand Up @@ -152,3 +171,29 @@ func appendKeyValues(fields []logstorage.Field, kvs []*pb.KeyValue, parentField
}
return fields
}

func getExportLogsReq() *pb.ExportLogsServiceRequest {
req := exportLogsReqPool.Get().(*pb.ExportLogsServiceRequest)
if req == nil {
return &pb.ExportLogsServiceRequest{}
}
return req
}

func putExportLogsReq(req *pb.ExportLogsServiceRequest) {
req.ResourceLogs = req.ResourceLogs[:0]
exportLogsReqPool.Put(req)
}

func getFields() *[]logstorage.Field {
value := fieldsPool.Get()
if value == nil {
return &[]logstorage.Field{}
}
return value.(*[]logstorage.Field)
}

func putFields(fields *[]logstorage.Field) {
*fields = (*fields)[:0]
fieldsPool.Put(fields)
}