diff --git a/app/vlinsert/opentelemetry/opentelemetry.go b/app/vlinsert/opentelemetry/opentelemetry.go index ad1db77f8..854ee070b 100644 --- a/app/vlinsert/opentelemetry/opentelemetry.go +++ b/app/vlinsert/opentelemetry/opentelemetry.go @@ -7,7 +7,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/opentelemetry/pb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil" "github.com/VictoriaMetrics/metrics" @@ -75,80 +74,20 @@ var ( ) func pushProtobufRequest(data []byte, lmp insertutil.LogMessageProcessor, msgFields []string, useDefaultStreamFields bool) error { - var req pb.ExportLogsServiceRequest - if err := req.UnmarshalProtobuf(data); err != nil { - errorsTotal.Inc() - return fmt.Errorf("cannot unmarshal request from %d bytes: %w", len(data), err) - } - - var commonFields []logstorage.Field - for _, rl := range req.ResourceLogs { - commonFields = commonFields[:0] - commonFields = appendKeyValues(commonFields, rl.Resource.Attributes, "") - commonFieldsLen := len(commonFields) - for _, sc := range rl.ScopeLogs { - commonFields = pushFieldsFromScopeLogs(&sc, commonFields[:commonFieldsLen], lmp, msgFields, useDefaultStreamFields) - } - } - - return nil -} - -func pushFieldsFromScopeLogs(sc *pb.ScopeLogs, commonFields []logstorage.Field, lmp insertutil.LogMessageProcessor, msgFields []string, useDefaultStreamFields bool) []logstorage.Field { - fields := commonFields - for _, lr := range sc.LogRecords { - fields = fields[:len(commonFields)] - if lr.Body.KeyValueList != nil { - fields = appendKeyValues(fields, lr.Body.KeyValueList.Values, "") - logstorage.RenameField(fields[len(commonFields):], msgFields, "_msg") - } else { - fields = append(fields, logstorage.Field{ - Name: "_msg", - Value: lr.Body.FormatString(true), - }) - } - fields = appendKeyValues(fields, lr.Attributes, "") - if len(lr.TraceID) > 0 { - fields = append(fields, logstorage.Field{ - Name: "trace_id", - Value: lr.TraceID, - }) - } - if len(lr.SpanID) > 0 { - fields = append(fields, logstorage.Field{ - Name: "span_id", - Value: lr.SpanID, - }) - } - fields = append(fields, logstorage.Field{ - Name: "severity", - Value: lr.FormatSeverity(), - }) + push := func(timestamp int64, resource, attributes []logstorage.Field) { + logstorage.RenameField(attributes, msgFields, "_msg") var streamFields []logstorage.Field if useDefaultStreamFields { - streamFields = commonFields + streamFields = resource } - lmp.AddRow(lr.ExtractTimestampNano(), fields, streamFields) - } - return fields -} -func appendKeyValues(fields []logstorage.Field, kvs []*pb.KeyValue, parentField string) []logstorage.Field { - for _, attr := range kvs { - fieldName := attr.Key - if parentField != "" { - fieldName = parentField + "." + fieldName - } + lmp.AddRow(timestamp, attributes, streamFields) + } - if attr.Value.KeyValueList != nil { - fields = appendKeyValues(fields, attr.Value.KeyValueList.Values, fieldName) - } else { - fields = append(fields, logstorage.Field{ - Name: fieldName, - Value: attr.Value.FormatString(true), - }) - } + if err := decodeRequest(data, push); err != nil { + errorsTotal.Inc() + return fmt.Errorf("cannot decode request from %d bytes: %w", len(data), err) } - return fields + return nil } diff --git a/app/vlinsert/opentelemetry/opentelemetry_test.go b/app/vlinsert/opentelemetry/opentelemetry_test.go index 800e4ad22..91142c2d2 100644 --- a/app/vlinsert/opentelemetry/opentelemetry_test.go +++ b/app/vlinsert/opentelemetry/opentelemetry_test.go @@ -8,7 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaLogs/app/vlinsert/insertutil" ) -func TestPushProtoOk(t *testing.T) { +func TestPushProtoOK(t *testing.T) { f := func(src []pb.ResourceLogs, timestampsExpected []int64, resultExpected string) { t.Helper() lr := pb.ExportLogsServiceRequest{ @@ -202,8 +202,58 @@ func TestPushProtoOk(t *testing.T) { `{"_msg":"nested fields","error.type":"document_parsing_exception","error.reason":"failed to parse field [_msg] of type [text]",`+ `"error.caused_by.type":"x_content_parse_exception","error.caused_by.reason":"unexpected end-of-input in VALUE_STRING",`+ `"error.caused_by.caused_by.type":"json_e_o_f_exception","error.caused_by.caused_by.reason":"eof","severity":"Unspecified"}`) + + // decode ArrayValue + f(newLogWithBody(1234, pb.AnyValue{ + ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{{StringValue: ptrTo("foo bar")}}}, + }), + []int64{1234}, + `{"_msg":"[\"foo bar\"]","severity":"Unspecified"}`, + ) + + // decode ArrayValue of ArrayValue + f(newLogWithBody(1234, pb.AnyValue{ + ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{ + {ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{{StringValue: ptrTo("foo")}}}}, + {ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{{StringValue: ptrTo("bar")}}}}, + {ArrayValue: &pb.ArrayValue{Values: []*pb.AnyValue{{StringValue: ptrTo("buz")}}}}, + }}, + }), + []int64{1234}, + `{"_msg":"[[\"foo\"],[\"bar\"],[\"buz\"]]","severity":"Unspecified"}`, + ) + + // decode BytesValue + f(newLogWithBody(1234, pb.AnyValue{ + BytesValue: ptrTo([]byte("foo bar")), + }), + []int64{1234}, + `{"_msg":"Zm9vIGJhcg==","severity":"Unspecified"}`, + ) + + // decode KeyValueList + f(newLogWithBody(1234, pb.AnyValue{ + KeyValueList: &pb.KeyValueList{Values: []*pb.KeyValue{ + {Key: "foo", Value: &pb.AnyValue{StringValue: ptrTo("bar")}}, + {Key: "bar", Value: &pb.AnyValue{StringValue: ptrTo("buz")}}, + }, + }, + }), + []int64{1234}, + `{"foo":"bar","bar":"buz","severity":"Unspecified"}`, + ) } func ptrTo[T any](s T) *T { return &s } + +func newLogWithBody(timestamp uint64, body pb.AnyValue) []pb.ResourceLogs { + return []pb.ResourceLogs{ + { + ScopeLogs: []pb.ScopeLogs{ + {LogRecords: []pb.LogRecord{{TimeUnixNano: timestamp, Body: body}}}, + }, + }, + } +} diff --git a/app/vlinsert/opentelemetry/pb.go b/app/vlinsert/opentelemetry/pb.go new file mode 100644 index 000000000..3c1e20517 --- /dev/null +++ b/app/vlinsert/opentelemetry/pb.go @@ -0,0 +1,589 @@ +package opentelemetry + +import ( + "encoding/base64" + "encoding/hex" + "fmt" + "strconv" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/easyproto" +) + +type handler func(timestamp int64, resource, attributes []logstorage.Field) + +// decodeRequest parses a LogsData protobuf message from src +// and calls the provided handler for each decoded log record. +// +// See the definition of LogsData here: +// https://github.com/open-telemetry/opentelemetry-proto/blob/34d29fe5ad4689b5db0259d3750de2bfa195bc85/opentelemetry/proto/logs/v1/logs.proto#L38 +func decodeRequest(src []byte, handle handler) (err error) { + fb := getFmtBuffer() + defer putFmtBuffer(fb) + fs := getFieldsSlice() + defer putFieldsSlice(fs) + + // message LogsData { + // repeated ResourceLogs resource_logs = 1; + // } + + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in ExportLogsServiceRequest: %s", err) + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot read ResourceLogs data") + } + + fb.reset() + fs.reset() + + fs.s, err = decodeResourceLogs(fb, fs.s, data, handle) + if err != nil { + return fmt.Errorf("cannot decode ResourceLogs: %s", err) + } + } + } + return nil +} + +func decodeResourceLogs(fb *fmtBuffer, fields []logstorage.Field, src []byte, handle handler) (_ []logstorage.Field, err error) { + // message ResourceLogs { + // Resource resource = 1; + // repeated ScopeLogs scope_logs = 2; + // } + + // Decode resource + data, ok, err := findMessageData(src, 1) + if err != nil { + return nil, fmt.Errorf("cannot find Resource in ResourceLogs: %s", err) + } + resource := fields + if ok { + resource, err = decodeResource(fb, resource, data) + if err != nil { + return nil, fmt.Errorf("cannot decode Resource: %s", err) + } + } + + // Decode scope_logs + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return nil, fmt.Errorf("cannot read next ScopeLogs in ResourceLogs: %s", err) + } + switch fc.FieldNum { + case 2: + data, ok := fc.MessageData() + if !ok { + return nil, fmt.Errorf("cannot read ScopeLogs data") + } + + fields, err := decodeScopeLogs(fb, data, resource, handle) + if err != nil { + return nil, fmt.Errorf("cannot decode ScopeLogs: %s", err) + } + resource = fields[:len(resource)] + } + } + + return resource, nil +} + +func decodeResource(f *fmtBuffer, dst []logstorage.Field, src []byte) (_ []logstorage.Field, err error) { + // message Resource { + // repeated KeyValue attributes = 1; + // } + + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return dst, fmt.Errorf("cannot read next field in Resource") + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return dst, fmt.Errorf("cannot read Attributes data") + } + dst, err = decodeKeyValue(f, dst, "", data) + if err != nil { + return dst, fmt.Errorf("cannot decode Attributes: %s", err) + } + } + } + return dst, nil +} + +func decodeScopeLogs(fb *fmtBuffer, src []byte, resource []logstorage.Field, handle handler) (_ []logstorage.Field, err error) { + // message ScopeLogs { + // repeated LogRecord log_records = 2; + // } + + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return nil, fmt.Errorf("cannot read next field in ScopeLogs: %w", err) + } + switch fc.FieldNum { + case 2: + data, ok := fc.MessageData() + if !ok { + return nil, fmt.Errorf("cannot read LogRecord data") + } + + bufOrig := fb.buf + dst := resource + + timestamp, fields, err := decodeLogRecord(fb, data, dst) + if err != nil { + return nil, fmt.Errorf("cannot decode LogRecord: %w", err) + } + + handle(timestamp, resource, fields) + + resource = fields[:len(resource)] + fb.buf = fb.buf[:len(bufOrig)] + } + } + return resource, nil +} + +func decodeLogRecord(f *fmtBuffer, src []byte, dst []logstorage.Field) (int64, []logstorage.Field, error) { + // message LogRecord { + // fixed64 time_unix_nano = 1; + // fixed64 observed_time_unix_nano = 11; + // SeverityNumber severity_number = 2; + // string severity_text = 3; + // AnyValue body = 5; + // repeated KeyValue attributes = 6; + // bytes trace_id = 9; + // bytes span_id = 10; + // } + + fields := dst + var ( + timeUnixNano uint64 + observedTimeUnixNano uint64 + severityText string + severityNumber int32 + ) + + var fc easyproto.FieldContext + for len(src) > 0 { + var err error + src, err = fc.NextField(src) + if err != nil { + return 0, nil, fmt.Errorf("cannot read next field in LogRecord: %w", err) + } + var ok bool + switch fc.FieldNum { + case 1: + timeUnixNano, ok = fc.Fixed64() + if !ok { + return 0, nil, fmt.Errorf("cannot read log record timestamp") + } + case 11: + observedTimeUnixNano, ok = fc.Fixed64() + if !ok { + return 0, nil, fmt.Errorf("cannot read log record observed timestamp") + } + case 2: + severityNumber, ok = fc.Int32() + if !ok { + return 0, nil, fmt.Errorf("cannot read severity number") + } + case 3: + severityText, ok = fc.String() + if !ok { + return 0, nil, fmt.Errorf("cannot read severity string") + } + case 5: + body, ok := fc.MessageData() + if !ok { + return 0, nil, fmt.Errorf("cannot read Body") + } + fields, err = decodeAnyValue(f, fields, body, "") + if err != nil { + return 0, nil, fmt.Errorf("cannot decode Body: %w", err) + } + case 6: + data, ok := fc.MessageData() + if !ok { + return 0, nil, fmt.Errorf("cannot read attributes data") + } + fields, err = decodeKeyValue(f, fields, "", data) + if err != nil { + return 0, nil, fmt.Errorf("cannot decode attributes: %w", err) + } + case 9: + traceID, ok := fc.Bytes() + if !ok { + return 0, nil, fmt.Errorf("cannot read trace id") + } + fields = append(fields, logstorage.Field{ + Name: "trace_id", + Value: f.formatHex(traceID), + }) + case 10: + spanID, ok := fc.Bytes() + if !ok { + return 0, nil, fmt.Errorf("cannot read span id") + } + fields = append(fields, logstorage.Field{ + Name: "span_id", + Value: f.formatHex(spanID), + }) + } + } + + if severityText == "" { + severityText = formatSeverity(severityNumber) + } + fields = append(fields, logstorage.Field{ + Name: "severity", + Value: severityText, + }) + + var timestamp int64 + switch { + case timeUnixNano > 0: + timestamp = int64(timeUnixNano) + case observedTimeUnixNano > 0: + timestamp = int64(observedTimeUnixNano) + default: + timestamp = time.Now().UnixNano() + } + + return timestamp, fields, nil +} + +// https://github.com/open-telemetry/opentelemetry-collector/blob/cd1f7623fe67240e32e74735488c3db111fad47b/pdata/plog/severity_number.go#L41 +var logSeverities = []string{ + "Unspecified", + "Trace", + "Trace2", + "Trace3", + "Trace4", + "Debug", + "Debug2", + "Debug3", + "Debug4", + "Info", + "Info2", + "Info3", + "Info4", + "Warn", + "Warn2", + "Warn3", + "Warn4", + "Error", + "Error2", + "Error3", + "Error4", + "Fatal", + "Fatal2", + "Fatal3", + "Fatal4", +} + +func formatSeverity(severity int32) string { + if severity < 0 || severity >= int32(len(logSeverities)) { + return logSeverities[0] + } + return logSeverities[severity] +} + +func decodeKeyValue(f *fmtBuffer, dst []logstorage.Field, fieldName string, src []byte) (_ []logstorage.Field, err error) { + // message KeyValue { + // string key = 1; + // AnyValue value = 2; + // } + + // Decode key + data, ok, err := findMessageData(src, 1) + if err != nil { + return dst, fmt.Errorf("cannot find Key in KeyValue: %s", err) + } + if !ok { + return dst, fmt.Errorf("key is missing in KeyValue") + } + fieldName = f.formatSubFieldName(fieldName, data) + + // Decode value + data, ok, err = findMessageData(src, 2) + if err != nil { + return dst, fmt.Errorf("cannot find Value in KeyValue: %s", err) + } + if !ok { + // Value is null, skip it. + return dst, nil + } + + dst, err = decodeAnyValue(f, dst, data, fieldName) + if err != nil { + return dst, fmt.Errorf("cannot decode AnyValue: %s", err) + } + return dst, nil +} + +func decodeAnyValue(f *fmtBuffer, dst []logstorage.Field, src []byte, fieldName string) (_ []logstorage.Field, err error) { + // message AnyValue { + // oneof value { + // string string_value = 1; + // bool bool_value = 2; + // int64 int_value = 3; + // double double_value = 4; + // ArrayValue array_value = 5; + // KeyValueList kvlist_value = 6; + // bytes bytes_value = 7; + // } + // } + + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return dst, fmt.Errorf("cannot read next field in AnyValue") + } + switch fc.FieldNum { + case 1: + stringValue, ok := fc.String() + if !ok { + return dst, fmt.Errorf("cannot read StringValue") + } + dst = append(dst, logstorage.Field{ + Name: fieldName, + Value: stringValue, + }) + case 2: + boolValue, ok := fc.Bool() + if !ok { + return dst, fmt.Errorf("cannot read BoolValue") + } + dst = append(dst, logstorage.Field{ + Name: fieldName, + Value: strconv.FormatBool(boolValue), + }) + case 3: + intValue, ok := fc.Int64() + if !ok { + return dst, fmt.Errorf("cannot read IntValue") + } + dst = append(dst, logstorage.Field{ + Name: fieldName, + Value: f.formatInt(intValue), + }) + case 4: + doubleValue, ok := fc.Double() + if !ok { + return dst, fmt.Errorf("cannot read DoubleValue") + } + dst = append(dst, logstorage.Field{ + Name: fieldName, + Value: f.formatFloat(doubleValue), + }) + case 5: + data, ok := fc.MessageData() + if !ok { + return dst, fmt.Errorf("cannot read ArrayValue") + } + + arena := jsonArenaPool.Get() + + // Encode arrays as JSON to match the behavior of /insert/jsonline + arr, err := decodeArrayValueToJSON(arena, data) + if err != nil { + return dst, fmt.Errorf("cannot decode ArrayValue: %s", err) + } + + n := len(f.buf) + f.buf = arr.MarshalTo(f.buf) + jsonArenaPool.Put(arena) + encodedArray := bytesutil.ToUnsafeString(f.buf[n:]) + + dst = append(dst, logstorage.Field{ + Name: fieldName, + Value: encodedArray, + }) + case 6: + data, ok := fc.MessageData() + if !ok { + return dst, fmt.Errorf("cannot read KeyValueList") + } + dst, err = decodeKeyValueList(f, dst, fieldName, data) + if err != nil { + return dst, fmt.Errorf("cannot decode KeyValueList: %s", err) + } + case 7: + bytesValue, ok := fc.Bytes() + if !ok { + return dst, fmt.Errorf("cannot read BytesValue") + } + v := f.formatBase64(bytesValue) + dst = append(dst, logstorage.Field{ + Name: fieldName, + Value: v, + }) + default: + logger.Warnf("unsupported AnyValue type %d, please create an issue: https://github.com/VictoriaMetrics/VictoriaLogs/issues", fc.FieldNum) + } + } + return dst, nil +} + +func decodeKeyValueList(f *fmtBuffer, fields []logstorage.Field, fieldName string, src []byte) (_ []logstorage.Field, err error) { + // message KeyValueList { + // repeated KeyValue values = 1; + // } + + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fields, fmt.Errorf("cannot read next field in KeyValueList") + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return fields, fmt.Errorf("cannot read Value data") + } + fields, err = decodeKeyValue(f, fields, fieldName, data) + if err != nil { + return fields, fmt.Errorf("cannot decode KeyValue: %s", err) + } + } + } + return fields, nil +} + +func findMessageData(src []byte, fieldNum uint32) (data []byte, ok bool, err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return nil, false, fmt.Errorf("cannot read next field: %s", err) + } + + if fc.FieldNum != fieldNum { + continue + } + + data, ok = fc.MessageData() + if !ok { + return nil, false, fmt.Errorf("cannot read field data") + } + return data, true, nil + } + return nil, false, nil +} + +type fieldSlice struct { + s []logstorage.Field +} + +var fieldsSlicePool = sync.Pool{ + New: func() any { + return &fieldSlice{} + }, +} + +func getFieldsSlice() *fieldSlice { + c := fieldsSlicePool.Get().(*fieldSlice) + return c +} + +func putFieldsSlice(c *fieldSlice) { + c.reset() + fieldsSlicePool.Put(c) +} + +func (c *fieldSlice) reset() { + clear(c.s) + c.s = c.s[:0] +} + +type fmtBuffer struct { + buf []byte +} + +var fmtBufferPool = sync.Pool{ + New: func() any { + return &fmtBuffer{} + }, +} + +func getFmtBuffer() *fmtBuffer { + fb := fmtBufferPool.Get().(*fmtBuffer) + return fb +} + +func putFmtBuffer(f *fmtBuffer) { + f.reset() + fmtBufferPool.Put(f) +} + +func (fb *fmtBuffer) reset() { + fb.buf = fb.buf[:0] +} + +func (fb *fmtBuffer) formatInt(v int64) string { + n := len(fb.buf) + fb.buf = strconv.AppendInt(fb.buf, v, 10) + return bytesutil.ToUnsafeString(fb.buf[n:]) +} + +func (fb *fmtBuffer) formatFloat(v float64) string { + n := len(fb.buf) + fb.buf = strconv.AppendFloat(fb.buf, v, 'f', -1, 64) + return bytesutil.ToUnsafeString(fb.buf[n:]) +} + +func (fb *fmtBuffer) formatSubFieldName(prefix string, suffix []byte) string { + if prefix == "" { + // There is no prefix, so just return the suffix as is. + return bytesutil.ToUnsafeString(suffix) + } + + n := len(fb.buf) + fb.buf = append(fb.buf, prefix...) + fb.buf = append(fb.buf, '.') + fb.buf = append(fb.buf, suffix...) + + fieldName := bytesutil.ToUnsafeString(fb.buf[n:]) + return fieldName +} + +func (fb *fmtBuffer) formatHex(src []byte) string { + n := len(fb.buf) + size := hex.EncodedLen(len(src)) + fb.buf = bytesutil.ResizeNoCopyMayOverallocate(fb.buf, len(fb.buf)+size) + + hex.Encode(fb.buf[n:], src) + v := bytesutil.ToUnsafeString(fb.buf[n:]) + return v +} + +func (fb *fmtBuffer) formatBase64(src []byte) string { + n := len(fb.buf) + size := base64.StdEncoding.EncodedLen(len(src)) + fb.buf = bytesutil.ResizeNoCopyMayOverallocate(fb.buf, len(fb.buf)+size) + + base64.StdEncoding.Encode(fb.buf[n:], src) + + v := bytesutil.ToUnsafeString(fb.buf[n:]) + return v +} diff --git a/app/vlinsert/opentelemetry/pb_json.go b/app/vlinsert/opentelemetry/pb_json.go new file mode 100644 index 000000000..4cb2a0540 --- /dev/null +++ b/app/vlinsert/opentelemetry/pb_json.go @@ -0,0 +1,193 @@ +package opentelemetry + +import ( + "encoding/base64" + "fmt" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/easyproto" + "github.com/valyala/fastjson" +) + +// decodeArrayValueToJSON decodes a protobuf ArrayValue message +// into a JSON array represented by fastjson.Value. +func decodeArrayValueToJSON(a *fastjson.Arena, src []byte) (_ *fastjson.Value, err error) { + // message ArrayValue { + // repeated AnyValue values = 1; + // } + + dst := a.NewArray() + + var fc easyproto.FieldContext + for i := 0; len(src) > 0; i++ { + src, err = fc.NextField(src) + if err != nil { + return nil, fmt.Errorf("cannot read next field in ArrayValue") + } + + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return nil, fmt.Errorf("cannot read Value data") + } + + v, err := decodeAnyValueToJSON(a, data) + if err != nil { + return nil, fmt.Errorf("cannot decode AnyValue: %s", err) + } + dst.SetArrayItem(i, v) + } + } + + return dst, nil +} + +func decodeAnyValueToJSON(a *fastjson.Arena, src []byte) (_ *fastjson.Value, err error) { + // message AnyValue { + // oneof value { + // string string_value = 1; + // bool bool_value = 2; + // int64 int_value = 3; + // double double_value = 4; + // ArrayValue array_value = 5; + // KeyValueList kvlist_value = 6; + // bytes bytes_value = 7; + // } + // } + + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return nil, fmt.Errorf("cannot read next field in AnyValue") + } + switch fc.FieldNum { + case 1: + stringValue, ok := fc.String() + if !ok { + return nil, fmt.Errorf("cannot read StringValue") + } + return a.NewString(stringValue), nil + case 2: + boolValue, ok := fc.Bool() + if !ok { + return nil, fmt.Errorf("cannot read BoolValue") + } + if boolValue { + return a.NewTrue(), nil + } else { + return a.NewFalse(), nil + } + case 3: + intValue, ok := fc.Int64() + if !ok { + return nil, fmt.Errorf("cannot read IntValue") + } + return a.NewNumberInt(int(intValue)), nil + case 4: + doubleValue, ok := fc.Double() + if !ok { + return nil, fmt.Errorf("cannot read DoubleValue") + } + return a.NewNumberFloat64(doubleValue), nil + case 5: + data, ok := fc.MessageData() + if !ok { + return nil, fmt.Errorf("cannot read ArrayValue") + } + arr, err := decodeArrayValueToJSON(a, data) + if err != nil { + return nil, fmt.Errorf("cannot decode ArrayValue: %s", err) + } + return arr, nil + case 6: + data, ok := fc.MessageData() + if !ok { + return nil, fmt.Errorf("cannot read KeyValueList") + } + obj, err := decodeKeyValueListToJSON(a, data) + if err != nil { + return nil, fmt.Errorf("cannot decode KeyValueList: %s", err) + } + return obj, nil + case 7: + bytesValue, ok := fc.Bytes() + if !ok { + return nil, fmt.Errorf("cannot read BytesValue") + } + b64 := base64.StdEncoding.EncodeToString(bytesValue) + return a.NewString(b64), nil + default: + logger.Warnf("unsupported AnyValue type %d, please create an issue: https://github.com/VictoriaMetrics/VictoriaLogs/issues", fc.FieldNum) + } + } + return nil, nil +} + +func decodeKeyValueListToJSON(a *fastjson.Arena, src []byte) (_ *fastjson.Value, err error) { + // message KeyValueList { + // repeated KeyValue values = 1; + // } + + dst := a.NewObject() + + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return nil, fmt.Errorf("cannot read next field in KeyValueList") + } + switch fc.FieldNum { + case 1: + data, ok := fc.MessageData() + if !ok { + return nil, fmt.Errorf("cannot read Value data") + } + + if err := decodeKeyValueToJSON(a, dst, data); err != nil { + return nil, fmt.Errorf("cannot decode KeyValue: %s", err) + } + } + } + return dst, nil +} + +func decodeKeyValueToJSON(a *fastjson.Arena, dst *fastjson.Value, src []byte) (err error) { + // message KeyValue { + // string key = 1; + // AnyValue value = 2; + // } + + // Decode key + data, ok, err := findMessageData(src, 1) + if err != nil { + return fmt.Errorf("cannot find Key in KeyValue: %s", err) + } + if !ok { + return fmt.Errorf("key is missing in KeyValue") + } + fieldName := bytesutil.ToUnsafeString(data) + + // Decode value + data, ok, err = findMessageData(src, 2) + if err != nil { + return fmt.Errorf("cannot find Value in KeyValue: %s", err) + } + if !ok { + // Value is null, skip it. + return nil + } + + v, err := decodeAnyValueToJSON(a, data) + if err != nil { + return fmt.Errorf("cannot decode AnyValue: %s", err) + } + + dst.Set(fieldName, v) + + return nil +} + +var jsonArenaPool fastjson.ArenaPool