diff --git a/pkg/mimirpb/compat_rw2_test.go b/pkg/mimirpb/compat_rw2_test.go index 02a8c833d84..4f863fa8c43 100644 --- a/pkg/mimirpb/compat_rw2_test.go +++ b/pkg/mimirpb/compat_rw2_test.go @@ -797,6 +797,50 @@ func TestRW2Unmarshal(t *testing.T) { require.Equal(t, received.Metadata[0].Help, "It's a cool series, but old description.") require.Equal(t, received.Metadata[0].Unit, "megawatts") }) + + t.Run("conflicting metadata, skipDeduplicateMetadata is true, both metadata and their order is preserved", func(t *testing.T) { + writeRequest := &WriteRequest{ + SymbolsRW2: []string{"", "__name__", "my_cool_series", "It's a cool series, but old description.", "It's a cool series, but new description.", "megawatts"}, + TimeseriesRW2: []TimeSeriesRW2{ + { + LabelsRefs: []uint32{1, 2}, + Metadata: MetadataRW2{ + Type: METRIC_TYPE_COUNTER, + HelpRef: 3, + UnitRef: 5, + }, + }, + { + LabelsRefs: []uint32{1, 2}, + Metadata: MetadataRW2{ + Type: METRIC_TYPE_COUNTER, + HelpRef: 4, + UnitRef: 5, + }, + }, + }, + } + data, err := writeRequest.Marshal() + require.NoError(t, err) + + // Unmarshal the data back into Mimir's WriteRequest. 
+ received := PreallocWriteRequest{ + SkipDeduplicateMetadata: true, + } + received.UnmarshalFromRW2 = true + err = received.Unmarshal(data) + require.NoError(t, err) + + require.Len(t, received.Metadata, 2) + require.Equal(t, received.Metadata[0].MetricFamilyName, "my_cool_series") + require.Equal(t, received.Metadata[0].Type, COUNTER) + require.Equal(t, received.Metadata[0].Help, "It's a cool series, but old description.") + require.Equal(t, received.Metadata[0].Unit, "megawatts") + require.Equal(t, received.Metadata[1].MetricFamilyName, "my_cool_series") + require.Equal(t, received.Metadata[1].Type, COUNTER) + require.Equal(t, received.Metadata[1].Help, "It's a cool series, but new description.") + require.Equal(t, received.Metadata[1].Unit, "megawatts") + }) } func makeTestRW2WriteRequest(syms *test.SymbolTableBuilder) *WriteRequest { diff --git a/pkg/mimirpb/custom.go b/pkg/mimirpb/custom.go index 9cd477f300c..e042da838bf 100644 --- a/pkg/mimirpb/custom.go +++ b/pkg/mimirpb/custom.go @@ -452,6 +452,13 @@ type MarshalerWithSize interface { MarshalWithSize(size int) ([]byte, error) } +func metadataSetFromSettings(skipDeduplicateMetadata bool) metadataSet { + if skipDeduplicateMetadata { + return newPassthroughMetadataSet() + } + return newDedupingMetadataSet() +} + // metadataSet is the collection of metadata within a request. // It keeps the order at which metadata is added. Metadata may optionally be deduplicated by family name. type metadataSet interface { @@ -461,6 +468,7 @@ type metadataSet interface { } var _ metadataSet = dedupingMetadataSet{} +var _ metadataSet = &passthroughMetadataSet{} // dedupingMetadataSet is a metadataSet that only stores one metadata per metric family. // Only the first metadata seen for a given family is kept. 
@@ -496,6 +504,28 @@ func (m dedupingMetadataSet) slice() []*MetricMetadata { return result } +type passthroughMetadataSet struct { + metadata []*MetricMetadata +} + +func newPassthroughMetadataSet() *passthroughMetadataSet { + return &passthroughMetadataSet{ + metadata: make([]*MetricMetadata, 0), + } +} + +func (m *passthroughMetadataSet) add(family string, mm MetricMetadata) { + m.metadata = append(m.metadata, &mm) +} + +func (m *passthroughMetadataSet) len() int { + return len(m.metadata) +} + +func (m *passthroughMetadataSet) slice() []*MetricMetadata { + return m.metadata +} + // orderAwareMetricMetadata is a tuple (index, metadata) that knows its own position in a metadata slice. // It's tied to custom logic that unmarshals RW2 metadata into a map, and allows us to // remember the order that metadata arrived in when unmarshalling. diff --git a/pkg/mimirpb/mimir.pb.go b/pkg/mimirpb/mimir.pb.go index f8afd4cc9b7..e7d3c6a4392 100644 --- a/pkg/mimirpb/mimir.pb.go +++ b/pkg/mimirpb/mimir.pb.go @@ -307,6 +307,8 @@ type WriteRequest struct { skipUnmarshalingExemplars bool // Skip normalization of metadata metric names when unmarshalling the request. skipNormalizeMetadataMetricName bool + // Skip deduplication of metric metadata by family name. + skipDeduplicateMetadata bool // Unmarshal from Remote Write 2.0. if rw2symbols is not nil. 
unmarshalFromRW2 bool rw2symbols rw2PagedSymbols @@ -7584,7 +7586,7 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { m.Timeseries = append(m.Timeseries, PreallocTimeseries{}) m.Timeseries[len(m.Timeseries)-1].skipUnmarshalingExemplars = m.skipUnmarshalingExemplars if metadata == nil { - metadata = newDedupingMetadataSet() + metadata = metadataSetFromSettings(m.skipDeduplicateMetadata) } if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex], &m.rw2symbols, metadata, m.skipNormalizeMetadataMetricName); err != nil { return err diff --git a/pkg/mimirpb/mimir.pb.go.expdiff b/pkg/mimirpb/mimir.pb.go.expdiff index d77a28d82ba..589eb878a1d 100644 --- a/pkg/mimirpb/mimir.pb.go.expdiff +++ b/pkg/mimirpb/mimir.pb.go.expdiff @@ -1,5 +1,5 @@ diff --git a/pkg/mimirpb/mimir.pb.go b/pkg/mimirpb/mimir.pb.go -index f8afd4cc9b..dda8609298 100644 +index e7d3c6a439..dda8609298 100644 --- a/pkg/mimirpb/mimir.pb.go +++ b/pkg/mimirpb/mimir.pb.go @@ -14,7 +14,6 @@ import ( @@ -20,7 +20,7 @@ index f8afd4cc9b..dda8609298 100644 Timeseries []PreallocTimeseries `protobuf:"bytes,1,rep,name=timeseries,proto3,customtype=PreallocTimeseries" json:"timeseries"` Source WriteRequest_SourceEnum `protobuf:"varint,2,opt,name=Source,proto3,enum=cortexpb.WriteRequest_SourceEnum" json:"Source,omitempty"` Metadata []*MetricMetadata `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty"` -@@ -302,14 +298,6 @@ type WriteRequest struct { +@@ -302,16 +298,6 @@ type WriteRequest struct { SkipLabelValidation bool `protobuf:"varint,1000,opt,name=skip_label_validation,json=skipLabelValidation,proto3" json:"skip_label_validation,omitempty"` // Skip label count validation. 
SkipLabelCountValidation bool `protobuf:"varint,1001,opt,name=skip_label_count_validation,json=skipLabelCountValidation,proto3" json:"skip_label_count_validation,omitempty"` @@ -29,13 +29,15 @@ index f8afd4cc9b..dda8609298 100644 - skipUnmarshalingExemplars bool - // Skip normalization of metadata metric names when unmarshalling the request. - skipNormalizeMetadataMetricName bool +- // Skip deduplication of metric metadata by family name. +- skipDeduplicateMetadata bool - // Unmarshal from Remote Write 2.0. if rw2symbols is not nil. - unmarshalFromRW2 bool - rw2symbols rw2PagedSymbols } func (m *WriteRequest) Reset() { *m = WriteRequest{} } -@@ -472,11 +460,6 @@ func (m *ErrorDetails) GetSoft() bool { +@@ -474,11 +460,6 @@ func (m *ErrorDetails) GetSoft() bool { return false } @@ -47,7 +49,7 @@ index f8afd4cc9b..dda8609298 100644 type TimeSeries struct { Labels []UnsafeMutableLabel `protobuf:"bytes,1,rep,name=labels,proto3,customtype=UnsafeMutableLabel" json:"labels"` // Sorted by time, oldest sample first. -@@ -489,9 +472,6 @@ type TimeSeries struct { +@@ -491,9 +472,6 @@ type TimeSeries struct { // Zero value means value not set. If you need to use exactly zero value for // the timestamp, use 1 millisecond before or after. 
CreatedTimestamp int64 `protobuf:"varint,6,opt,name=created_timestamp,json=createdTimestamp,proto3" json:"created_timestamp,omitempty"` @@ -57,7 +59,7 @@ index f8afd4cc9b..dda8609298 100644 } func (m *TimeSeries) Reset() { *m = TimeSeries{} } -@@ -5961,25 +5941,19 @@ func (m *TimeSeriesRW2) MarshalToSizedBuffer(dAtA []byte) (int, error) { +@@ -5963,25 +5941,19 @@ func (m *TimeSeriesRW2) MarshalToSizedBuffer(dAtA []byte) (int, error) { } } if len(m.LabelsRefs) > 0 { @@ -88,7 +90,7 @@ index f8afd4cc9b..dda8609298 100644 i = encodeVarintMimir(dAtA, i, uint64(j21)) i-- dAtA[i] = 0xa -@@ -6019,25 +5993,19 @@ func (m *ExemplarRW2) MarshalToSizedBuffer(dAtA []byte) (int, error) { +@@ -6021,25 +5993,19 @@ func (m *ExemplarRW2) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0x11 } if len(m.LabelsRefs) > 0 { @@ -119,7 +121,7 @@ index f8afd4cc9b..dda8609298 100644 i = encodeVarintMimir(dAtA, i, uint64(j23)) i-- dAtA[i] = 0xa -@@ -7385,9 +7353,6 @@ func valueToStringMimir(v interface{}) string { +@@ -7387,9 +7353,6 @@ func valueToStringMimir(v interface{}) string { return fmt.Sprintf("*%v", pv) } func (m *WriteRequest) Unmarshal(dAtA []byte) error { @@ -129,7 +131,7 @@ index f8afd4cc9b..dda8609298 100644 l := len(dAtA) iNdEx := 0 for iNdEx < l { -@@ -7417,9 +7382,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { +@@ -7419,9 +7382,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: @@ -139,7 +141,7 @@ index f8afd4cc9b..dda8609298 100644 if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) } -@@ -7449,8 +7411,7 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { +@@ -7451,8 +7411,7 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { return io.ErrUnexpectedEOF } m.Timeseries = append(m.Timeseries, PreallocTimeseries{}) @@ -149,7 +151,7 @@ index f8afd4cc9b..dda8609298 100644 return err } iNdEx = postIndex -@@ -7474,9 +7435,6 @@ func (m *WriteRequest) Unmarshal(dAtA 
[]byte) error { +@@ -7476,9 +7435,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { } } case 3: @@ -159,7 +161,7 @@ index f8afd4cc9b..dda8609298 100644 if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) } -@@ -7511,9 +7469,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { +@@ -7513,9 +7469,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 4: @@ -169,7 +171,7 @@ index f8afd4cc9b..dda8609298 100644 if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field SymbolsRW2", wireType) } -@@ -7543,16 +7498,9 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { +@@ -7545,16 +7498,9 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } @@ -187,14 +189,14 @@ index f8afd4cc9b..dda8609298 100644 if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field TimeseriesRW2", wireType) } -@@ -7581,12 +7529,8 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { +@@ -7583,12 +7529,8 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Timeseries = append(m.Timeseries, PreallocTimeseries{}) - m.Timeseries[len(m.Timeseries)-1].skipUnmarshalingExemplars = m.skipUnmarshalingExemplars - if metadata == nil { -- metadata = newDedupingMetadataSet() +- metadata = metadataSetFromSettings(m.skipDeduplicateMetadata) - } - if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex], &m.rw2symbols, metadata, m.skipNormalizeMetadataMetricName); err != nil { + m.TimeseriesRW2 = append(m.TimeseriesRW2, TimeSeriesRW2{}) @@ -202,7 +204,7 @@ index f8afd4cc9b..dda8609298 100644 return err } iNdEx = postIndex -@@ -7649,12 +7593,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { +@@ -7651,12 +7593,6 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { if iNdEx > l { return io.ErrUnexpectedEOF } @@ -215,7 +217,7 @@ index f8afd4cc9b..dda8609298 
100644 return nil } func (m *WriteResponse) Unmarshal(dAtA []byte) error { -@@ -7922,11 +7860,9 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error { +@@ -7924,11 +7860,9 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } @@ -230,7 +232,7 @@ index f8afd4cc9b..dda8609298 100644 } iNdEx = postIndex case 4: -@@ -11235,10 +11171,6 @@ func (m *WriteRequestRW2) Unmarshal(dAtA []byte) error { +@@ -11237,10 +11171,6 @@ func (m *WriteRequestRW2) Unmarshal(dAtA []byte) error { return nil } func (m *TimeSeriesRW2) Unmarshal(dAtA []byte) error { @@ -241,7 +243,7 @@ index f8afd4cc9b..dda8609298 100644 l := len(dAtA) iNdEx := 0 for iNdEx < l { -@@ -11269,7 +11201,22 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat +@@ -11271,7 +11201,22 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat switch fieldNum { case 1: if wireType == 0 { @@ -265,7 +267,7 @@ index f8afd4cc9b..dda8609298 100644 } else if wireType == 2 { var packedLen int for shift := uint(0); ; shift += 7 { -@@ -11304,14 +11251,9 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat +@@ -11306,14 +11251,9 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat } } elementCount = count @@ -282,7 +284,7 @@ index f8afd4cc9b..dda8609298 100644 for iNdEx < postIndex { var v uint32 for shift := uint(0); ; shift += 7 { -@@ -11328,27 +11270,7 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat +@@ -11330,27 +11270,7 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat break } } @@ -311,7 +313,7 @@ index f8afd4cc9b..dda8609298 100644 } } else { return fmt.Errorf("proto: wrong wireType = %d for field LabelsRefs", wireType) -@@ -11450,11 +11372,9 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat +@@ -11452,11 +11372,9 @@ func (m *TimeSeries) UnmarshalRW2(dAtA 
[]byte, symbols *rw2PagedSymbols, metadat if postIndex > l { return io.ErrUnexpectedEOF } @@ -326,7 +328,7 @@ index f8afd4cc9b..dda8609298 100644 } iNdEx = postIndex case 5: -@@ -11486,7 +11406,7 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat +@@ -11488,7 +11406,7 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat if postIndex > l { return io.ErrUnexpectedEOF } @@ -335,7 +337,7 @@ index f8afd4cc9b..dda8609298 100644 return err } iNdEx = postIndex -@@ -11531,10 +11451,6 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat +@@ -11533,10 +11451,6 @@ func (m *TimeSeries) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadat return nil } func (m *ExemplarRW2) Unmarshal(dAtA []byte) error { @@ -346,7 +348,7 @@ index f8afd4cc9b..dda8609298 100644 l := len(dAtA) iNdEx := 0 for iNdEx < l { -@@ -11565,7 +11481,22 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { +@@ -11567,7 +11481,22 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { switch fieldNum { case 1: if wireType == 0 { @@ -370,7 +372,7 @@ index f8afd4cc9b..dda8609298 100644 } else if wireType == 2 { var packedLen int for shift := uint(0); ; shift += 7 { -@@ -11600,13 +11531,9 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { +@@ -11602,13 +11531,9 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { } } elementCount = count @@ -386,7 +388,7 @@ index f8afd4cc9b..dda8609298 100644 for iNdEx < postIndex { var v uint32 for shift := uint(0); ; shift += 7 { -@@ -11623,20 +11550,7 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { +@@ -11625,20 +11550,7 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { break } } @@ -408,7 +410,7 @@ index f8afd4cc9b..dda8609298 100644 } } else { return fmt.Errorf("proto: wrong wireType = %d for field 
LabelsRefs", wireType) -@@ -11656,7 +11570,7 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { +@@ -11658,7 +11570,7 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) } @@ -417,7 +419,7 @@ index f8afd4cc9b..dda8609298 100644 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowMimir -@@ -11666,7 +11580,7 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { +@@ -11668,7 +11580,7 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { } b := dAtA[iNdEx] iNdEx++ @@ -426,7 +428,7 @@ index f8afd4cc9b..dda8609298 100644 if b < 0x80 { break } -@@ -11693,16 +11607,6 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { +@@ -11695,16 +11607,6 @@ func (m *Exemplar) UnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols) error { return nil } func (m *MetadataRW2) Unmarshal(dAtA []byte) error { @@ -443,7 +445,7 @@ index f8afd4cc9b..dda8609298 100644 l := len(dAtA) iNdEx := 0 for iNdEx < l { -@@ -11735,7 +11639,7 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata +@@ -11737,7 +11639,7 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) } @@ -452,7 +454,7 @@ index f8afd4cc9b..dda8609298 100644 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowMimir -@@ -11745,7 +11649,7 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata +@@ -11747,7 +11649,7 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata } b := dAtA[iNdEx] iNdEx++ @@ -461,7 +463,7 @@ index f8afd4cc9b..dda8609298 100644 if b < 0x80 { break } -@@ -11754,7 +11658,7 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols 
*rw2PagedSymbols, metadata +@@ -11756,7 +11658,7 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field HelpRef", wireType) } @@ -470,7 +472,7 @@ index f8afd4cc9b..dda8609298 100644 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowMimir -@@ -11764,20 +11668,16 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata +@@ -11766,20 +11668,16 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata } b := dAtA[iNdEx] iNdEx++ @@ -493,7 +495,7 @@ index f8afd4cc9b..dda8609298 100644 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowMimir -@@ -11787,15 +11687,11 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata +@@ -11789,15 +11687,11 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata } b := dAtA[iNdEx] iNdEx++ @@ -510,7 +512,7 @@ index f8afd4cc9b..dda8609298 100644 default: iNdEx = preIndex skippy, err := skipMimir(dAtA[iNdEx:]) -@@ -11815,23 +11711,6 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata +@@ -11817,23 +11711,6 @@ func MetricMetadataUnmarshalRW2(dAtA []byte, symbols *rw2PagedSymbols, metadata if iNdEx > l { return io.ErrUnexpectedEOF } diff --git a/pkg/mimirpb/prealloc_rw2.go b/pkg/mimirpb/prealloc_rw2.go index a7be8c3c0cf..f2a0a8b1fbc 100644 --- a/pkg/mimirpb/prealloc_rw2.go +++ b/pkg/mimirpb/prealloc_rw2.go @@ -29,6 +29,7 @@ func FromWriteRequestToRW2Request(rw1 *WriteRequest, commonSymbols []string, off SkipLabelCountValidation: rw1.SkipLabelCountValidation, skipUnmarshalingExemplars: rw1.skipUnmarshalingExemplars, skipNormalizeMetadataMetricName: rw1.skipNormalizeMetadataMetricName, + skipDeduplicateMetadata: rw1.skipDeduplicateMetadata, } symbols := symbolsTableFromPool() diff --git a/pkg/mimirpb/prealloc_rw2_test.go b/pkg/mimirpb/prealloc_rw2_test.go 
index 0dcb34386bf..96503905a74 100644 --- a/pkg/mimirpb/prealloc_rw2_test.go +++ b/pkg/mimirpb/prealloc_rw2_test.go @@ -19,6 +19,7 @@ func TestWriteRequestRW2Conversion(t *testing.T) { SkipLabelCountValidation: true, skipUnmarshalingExemplars: true, skipNormalizeMetadataMetricName: true, + skipDeduplicateMetadata: true, } rw2, err := FromWriteRequestToRW2Request(req, nil, 0) @@ -28,6 +29,7 @@ func TestWriteRequestRW2Conversion(t *testing.T) { require.True(t, rw2.SkipLabelCountValidation) require.True(t, rw2.skipUnmarshalingExemplars) require.True(t, rw2.skipNormalizeMetadataMetricName) + require.True(t, rw2.skipDeduplicateMetadata) }) t.Run("nil request turns into nil request", func(t *testing.T) { @@ -693,6 +695,7 @@ func TestWriteRequestRW2Conversion_WriteRequestHasChanged(t *testing.T) { "SkipLabelCountValidation", "skipUnmarshalingExemplars", "skipNormalizeMetadataMetricName", + "skipDeduplicateMetadata", "unmarshalFromRW2", "rw2symbols", "BufferHolder", diff --git a/pkg/mimirpb/split.go b/pkg/mimirpb/split.go index e951bc19f7b..0b043731e70 100644 --- a/pkg/mimirpb/split.go +++ b/pkg/mimirpb/split.go @@ -68,6 +68,7 @@ func SplitWriteRequestByMaxMarshalSizeRW2(req *WriteRequest, reqSize, maxSize in SkipLabelValidation: req.SkipLabelValidation, skipUnmarshalingExemplars: req.skipUnmarshalingExemplars, skipNormalizeMetadataMetricName: req.skipNormalizeMetadataMetricName, + skipDeduplicateMetadata: req.skipDeduplicateMetadata, TimeseriesRW2: make([]TimeSeriesRW2, 0, estimatedTimeseriesPerPartialReq), } @@ -132,6 +133,7 @@ func splitTimeseriesByMaxMarshalSize(req *WriteRequest, reqSize, maxSize int) [] Source: req.Source, SkipLabelValidation: req.SkipLabelValidation, skipNormalizeMetadataMetricName: req.skipNormalizeMetadataMetricName, + skipDeduplicateMetadata: req.skipDeduplicateMetadata, } return r, r.Size() @@ -198,6 +200,7 @@ func splitMetadataByMaxMarshalSize(req *WriteRequest, reqSize, maxSize int) []*W SkipLabelValidation: req.SkipLabelValidation, 
skipUnmarshalingExemplars: req.skipUnmarshalingExemplars, skipNormalizeMetadataMetricName: req.skipNormalizeMetadataMetricName, + skipDeduplicateMetadata: req.skipDeduplicateMetadata, } return r, r.Size() } diff --git a/pkg/mimirpb/split_test.go b/pkg/mimirpb/split_test.go index deebe2b1f4e..2bcd267168e 100644 --- a/pkg/mimirpb/split_test.go +++ b/pkg/mimirpb/split_test.go @@ -444,6 +444,7 @@ func TestSplitWriteRequestByMaxMarshalSize_WriteRequestHasChanged(t *testing.T) "SkipLabelCountValidation", "skipUnmarshalingExemplars", "skipNormalizeMetadataMetricName", + "skipDeduplicateMetadata", "unmarshalFromRW2", "rw2symbols", "BufferHolder", diff --git a/pkg/mimirpb/timeseries.go b/pkg/mimirpb/timeseries.go index 95d70337964..2c58fd81e94 100644 --- a/pkg/mimirpb/timeseries.go +++ b/pkg/mimirpb/timeseries.go @@ -76,6 +76,11 @@ type PreallocWriteRequest struct { // SkipNormalizeMetadataMetricName skips normalization of metric name in metadata on unmarshal. E.g., don't remove `_count` suffixes from histograms. // Has no effect on marshalled or existing structs; must be set prior to Unmarshal calls. SkipNormalizeMetadataMetricName bool + // SkipDeduplicateMetadata skips deduplication of RW2 metadata by metric family name. + // Normally deduplication is done because RW2 requests tend to repeat metadata, as it's embedded in timeseries. + // Some applications, like RW1->RW2 translation, might choose to disable it. + // Has no effect on marshalled or existing structs; must be set prior to Unmarshal calls. + SkipDeduplicateMetadata bool } // Unmarshal implements proto.Message. 
@@ -85,6 +90,7 @@ func (p *PreallocWriteRequest) Unmarshal(dAtA []byte) error { p.Timeseries = PreallocTimeseriesSliceFromPool() p.skipUnmarshalingExemplars = p.SkipUnmarshalingExemplars p.skipNormalizeMetadataMetricName = p.SkipNormalizeMetadataMetricName + p.skipDeduplicateMetadata = p.SkipDeduplicateMetadata p.unmarshalFromRW2 = p.UnmarshalFromRW2 p.rw2symbols.offset = p.RW2SymbolOffset p.rw2symbols.commonSymbols = p.RW2CommonSymbols diff --git a/pkg/storage/ingest/version.go b/pkg/storage/ingest/version.go index e4e7dab2c26..67c6cf1d3d1 100644 --- a/pkg/storage/ingest/version.go +++ b/pkg/storage/ingest/version.go @@ -190,6 +190,7 @@ func deserializeRecordContentV2(content []byte, wr *mimirpb.PreallocWriteRequest wr.RW2SymbolOffset = V2RecordSymbolOffset wr.RW2CommonSymbols = V2CommonSymbols wr.SkipNormalizeMetadataMetricName = true + wr.SkipDeduplicateMetadata = true return wr.Unmarshal(content) } diff --git a/pkg/storage/ingest/version_test.go b/pkg/storage/ingest/version_test.go index e5bb872b38e..fb066a77a3f 100644 --- a/pkg/storage/ingest/version_test.go +++ b/pkg/storage/ingest/version_test.go @@ -240,6 +240,55 @@ func TestDeserializeRecordContent(t *testing.T) { }} require.Equal(t, expMetadata, wr.Metadata) }) + + t.Run("v2 preserves conflicting metadata", func(t *testing.T) { + syms := test.NewSymbolTableBuilderWithCommon(nil, V2RecordSymbolOffset, V2CommonSymbols) + reqv2 := &mimirpb.WriteRequestRW2{ + Timeseries: []mimirpb.TimeSeriesRW2{ + { + LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_histogram_seconds_bucket")}, + Metadata: mimirpb.MetadataRW2{ + Type: mimirpb.METRIC_TYPE_HISTOGRAM, + HelpRef: syms.GetSymbol("Help for test_histogram_seconds_bucket"), + UnitRef: syms.GetSymbol("seconds"), + }, + }, + { + LabelsRefs: []uint32{syms.GetSymbol("__name__"), syms.GetSymbol("test_histogram_seconds_bucket")}, + Metadata: mimirpb.MetadataRW2{ + Type: mimirpb.METRIC_TYPE_GAUGE, + HelpRef: syms.GetSymbol("Help for 
test_histogram_seconds_bucket, but different"), + UnitRef: syms.GetSymbol("seconds"), + }, + }, + }, + } + reqv2.Symbols = syms.GetSymbols() + // Symbols should not contain common labels "__name__" and "job" + require.Equal(t, []string{"", "test_histogram_seconds_bucket", "Help for test_histogram_seconds_bucket", "seconds", "Help for test_histogram_seconds_bucket, but different"}, reqv2.Symbols) + v2bytes, err := reqv2.Marshal() + require.NoError(t, err) + + wr := mimirpb.PreallocWriteRequest{} + err = DeserializeRecordContent(v2bytes, &wr, 2) + require.NoError(t, err) + require.Len(t, wr.Timeseries, 2) + expMetadata := []*mimirpb.MetricMetadata{ + { + Type: mimirpb.HISTOGRAM, + MetricFamilyName: "test_histogram_seconds_bucket", + Help: "Help for test_histogram_seconds_bucket", + Unit: "seconds", + }, + { + Type: mimirpb.GAUGE, + MetricFamilyName: "test_histogram_seconds_bucket", + Help: "Help for test_histogram_seconds_bucket, but different", + Unit: "seconds", + }, + } + require.Equal(t, expMetadata, wr.Metadata) + }) } func TestRecordSerializer(t *testing.T) {