Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 9 additions & 70 deletions app/vlinsert/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/metrics"

Expand Down Expand Up @@ -75,80 +74,20 @@ var (
)

func pushProtobufRequest(data []byte, lmp insertutil.LogMessageProcessor, msgFields []string, useDefaultStreamFields bool) error {
var req pb.ExportLogsServiceRequest
if err := req.UnmarshalProtobuf(data); err != nil {
errorsTotal.Inc()
return fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err)
}

var commonFields []logstorage.Field
for _, rl := range req.ResourceLogs {
commonFields = commonFields[:0]
commonFields = appendKeyValues(commonFields, rl.Resource.Attributes, "")
commonFieldsLen := len(commonFields)
for _, sc := range rl.ScopeLogs {
commonFields = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp, msgFields, useDefaultStreamFields)
}
}

return nil
}

func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutil.LogMessageProcessor, msgFields []string, useDefaultStreamFields bool) []logstorage.Field {
fields := commonFields
for _, lr := range sc.LogRecords {
fields = fields[:len(commonFields)]
if lr.Body.KeyValueList != nil {
fields = appendKeyValues(fields, lr.Body.KeyValueList.Values, "")
logstorage.RenameField(fields[len(commonFields):], msgFields, "_msg")
} else {
fields = append(fields, logstorage.Field{
Name: "_msg",
Value: lr.Body.FormatString(true),
})
}
fields = appendKeyValues(fields, lr.Attributes, "")
if len(lr.TraceID) > 0 {
fields = append(fields, logstorage.Field{
Name: "trace_id",
Value: lr.TraceID,
})
}
if len(lr.SpanID) > 0 {
fields = append(fields, logstorage.Field{
Name: "span_id",
Value: lr.SpanID,
})
}
fields = append(fields, logstorage.Field{
Name: "severity",
Value: lr.FormatSeverity(),
})
push := func(timestamp int64, resource, attributes []logstorage.Field) {
logstorage.RenameField(attributes, msgFields, "_msg")

var streamFields []logstorage.Field
if useDefaultStreamFields {
streamFields = commonFields
streamFields = resource
}
lmp.AddRow(lr.ExtractTimestampNano(), fields, streamFields)
}
return fields
}

func appendKeyValues(fields []logstorage.Field, kvs []*pb.KeyValue, parentField string) []logstorage.Field {
for _, attr := range kvs {
fieldName := attr.Key
if parentField != "" {
fieldName = parentField + "." + fieldName
}
lmp.AddRow(timestamp, attributes, streamFields)
}

if attr.Value.KeyValueList != nil {
fields = appendKeyValues(fields, attr.Value.KeyValueList.Values, fieldName)
} else {
fields = append(fields, logstorage.Field{
Name: fieldName,
Value: attr.Value.FormatString(true),
})
}
if err := decodeRequest(data, push); err != nil {
errorsTotal.Inc()
return fmt.Errorf("cannot decode request from %d bytes: %w", len(data), err)
}
return fields
return nil
}
52 changes: 51 additions & 1 deletion app/vlinsert/opentelemetry/opentelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/insertutil"
)

func TestPushProtoOk(t *testing.T) {
func TestPushProtoOK(t *testing.T) {
f := func(src []pb.ResourceLogs, timestampsExpected []int64, resultExpected string) {
t.Helper()
lr := pb.ExportLogsServiceRequest{
Expand Down Expand Up @@ -202,8 +202,58 @@ func TestPushProtoOk(t *testing.T) {
`{"_msg":"nested fields","error.type":"document_parsing_exception","error.reason":"failed to parse field [_msg] of type [text]",`+
`"error.caused_by.type":"x_content_parse_exception","error.caused_by.reason":"unexpected end-of-input in VALUE_STRING",`+
`"error.caused_by.caused_by.type":"json_e_o_f_exception","error.caused_by.caused_by.reason":"eof","severity":"Unspecified"}`)

// decode ArrayValue
f(newLogWithBody(1234, pb.AnyValue{
ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{{StringValue: ptrTo("foo bar")}}},
}),
[]int64{1234},
`{"_msg":"[\"foo bar\"]","severity":"Unspecified"}`,
)

// decode ArrayValue of ArrayValue
f(newLogWithBody(1234, pb.AnyValue{
ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{
{ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{{StringValue: ptrTo("foo")}}}},
{ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{{StringValue: ptrTo("bar")}}}},
{ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{{StringValue: ptrTo("buz")}}}},
}},
}),
[]int64{1234},
`{"_msg":"[[\"foo\"],[\"bar\"],[\"buz\"]]","severity":"Unspecified"}`,
)

// decode BytesValue
f(newLogWithBody(1234, pb.AnyValue{
BytesValue: ptrTo([]byte("foo bar")),
}),
[]int64{1234},
`{"_msg":"Zm9vIGJhcg==","severity":"Unspecified"}`,
)

// decode KeyValueList
f(newLogWithBody(1234, pb.AnyValue{
KeyValueList: &pb.KeyValueList{Values: []*pb.KeyValue{
{Key: "foo", Value: &pb.AnyValue{StringValue: ptrTo("bar")}},
{Key: "bar", Value: &pb.AnyValue{StringValue: ptrTo("buz")}},
},
},
}),
[]int64{1234},
`{"foo":"bar","bar":"buz","severity":"Unspecified"}`,
)
}

func ptrTo[T any](s T) *T {
return &s
}

func newLogWithBody(timestamp uint64, body pb.AnyValue) []pb.ResourceLogs {
return []pb.ResourceLogs{
{
ScopeLogs: []pb.ScopeLogs{
{LogRecords: []pb.LogRecord{{TimeUnixNano: timestamp, Body: body}}},
},
},
}
}
Loading