diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 6218916ce5..d6176265a4 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -726,7 +726,7 @@ func runRule( infoOptions = append( infoOptions, info.WithLabelSetFunc(func() []labelpb.ZLabelSet { - return tsdbStore.LabelSet() + return labelpb.ZLabelSetsFromPromLabels(tsdbStore.LabelSet()...) }), info.WithStoreInfoFunc(func() (*infopb.StoreInfo, error) { if httpProbe.IsReady() { diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 8b89e0c1a1..e16691d152 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -5,7 +5,6 @@ package receive import ( "context" - "fmt" "os" "path" "path/filepath" @@ -217,7 +216,7 @@ func (l *localClient) Matches(matchers []*labels.Matcher) bool { } func (l *localClient) LabelSets() []labels.Labels { - return labelpb.ZLabelSetsToPromLabelSets(l.store.LabelSet()...) + return l.store.LabelSet() } func (l *localClient) TimeRange() (mint int64, maxt int64) { @@ -233,7 +232,9 @@ func (l *localClient) TSDBInfos() []infopb.TSDBInfo { mint, maxt := l.store.TimeRange() return []infopb.TSDBInfo{ { - Labels: labelsets[0], + Labels: labelpb.ZLabelSet{ + Labels: labelpb.ZLabelsFromPromLabels(labelsets[0]), + }, MinTime: mint, MaxTime: maxt, }, @@ -241,11 +242,7 @@ func (l *localClient) TSDBInfos() []infopb.TSDBInfo { } func (l *localClient) String() string { - mint, maxt := l.store.TimeRange() - return fmt.Sprintf( - "LabelSets: %v MinTime: %d MaxTime: %d", - labelpb.PromLabelSetsToString(l.LabelSets()), mint, maxt, - ) + return l.store.String() } func (l *localClient) Addr() (string, bool) { diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index 1ab909fd5f..b1fa4ec508 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -16,7 +16,6 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/store" - "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/testutil/custom" ) @@ -835,7 +834,7 @@ func setupSetsOfExpectedAndActualStoreClientLabelSets( testStore := store.TSDBStore{} testStore.SetExtLset(expectedExternalLabelSets[i]) - expectedClientLabelSets := labelpb.ZLabelSetsToPromLabelSets(testStore.LabelSet()...) + expectedClientLabelSets := testStore.LabelSet() setOfExpectedClientLabelSets = append(setOfExpectedClientLabelSets, expectedClientLabelSets) actualClientLabelSets := actualStoreClients[i].LabelSets() diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go index 2829bec439..838989cb27 100644 --- a/pkg/store/labelpb/label.go +++ b/pkg/store/labelpb/label.go @@ -4,38 +4,16 @@ // Package containing proto and JSON serializable Labels and ZLabels (no copy) structs used to // identify series. This package expose no-copy converters to Prometheus labels.Labels. +//go:build !stringlabels && !dedupelabels + package labelpb import ( - "encoding/json" - "fmt" - "io" - "sort" - "strings" "unsafe" - "github.com/cespare/xxhash/v2" - "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" - "go4.org/intern" -) - -var ( - ErrOutOfOrderLabels = errors.New("out of order labels") - ErrEmptyLabels = errors.New("label set contains a label with empty name or value") - ErrDuplicateLabels = errors.New("label set contains duplicate label names") - - sep = []byte{'\xff'} ) -func noAllocString(buf []byte) string { - return *(*string)(unsafe.Pointer(&buf)) -} - -func noAllocBytes(buf string) []byte { - return *(*[]byte)(unsafe.Pointer(&buf)) -} - // ZLabelsFromPromLabels converts Prometheus labels to slice of labelpb.ZLabel in type unsafe manner. // It reuses the same memory. Caller should abort using passed labels.Labels. func ZLabelsFromPromLabels(lset labels.Labels) []ZLabel { @@ -50,39 +28,6 @@ func ZLabelsToPromLabels(lset []ZLabel) labels.Labels { return *(*labels.Labels)(unsafe.Pointer(&lset)) } -// ReAllocAndInternZLabelsStrings re-allocates all underlying bytes for string, detaching it from bigger memory pool. -// If `intern` is set to true, the method will use interning, i.e. reuse already allocated strings, to make the reallocation -// method more efficient. -// -// This is primarily intended to be used before labels are written into TSDB which can hold label strings in the memory long term. -func ReAllocZLabelsStrings(lset *[]ZLabel, intern bool) { - if intern { - for j, l := range *lset { - (*lset)[j].Name = detachAndInternLabelString(l.Name) - (*lset)[j].Value = detachAndInternLabelString(l.Value) - } - return - } - - for j, l := range *lset { - (*lset)[j].Name = string(noAllocBytes(l.Name)) - (*lset)[j].Value = string(noAllocBytes(l.Value)) - } -} - -// internLabelString is a helper method to intern a label string or, -// if the string was previously interned, it returns the existing -// reference and asserts it to a string. -func internLabelString(s string) string { - return intern.GetByString(s).Get().(string) -} - -// detachAndInternLabelString reallocates the label string to detach it -// from a bigger memory pool and interns the string. -func detachAndInternLabelString(s string) string { - return internLabelString(string(noAllocBytes(s))) -} - // ZLabelSetsToPromLabelSets converts slice of labelpb.ZLabelSet to slice of Prometheus labels. func ZLabelSetsToPromLabelSets(lss ...ZLabelSet) []labels.Labels { res := make([]labels.Labels, 0, len(lss)) @@ -110,318 +55,3 @@ func ZLabelSetsFromPromLabels(lss ...labels.Labels) []ZLabelSet { return sets } - -// ZLabel is a Label (also easily transformable to Prometheus labels.Labels) that can be unmarshalled from protobuf -// reusing the same memory address for string bytes. -// NOTE: While unmarshalling it uses exactly same bytes that were allocated for protobuf. This mean that *whole* protobuf -// bytes will be not GC-ed as long as ZLabels are referenced somewhere. Use it carefully, only for short living -// protobuf message processing. -type ZLabel Label - -func (m *ZLabel) MarshalTo(data []byte) (int, error) { - f := Label(*m) - return f.MarshalTo(data) -} - -func (m *ZLabel) MarshalToSizedBuffer(data []byte) (int, error) { - f := Label(*m) - return f.MarshalToSizedBuffer(data) -} - -// Unmarshal unmarshalls gRPC protobuf into ZLabel struct. ZLabel string is directly using bytes passed in `data`. -// To use it add (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel" to proto field tag. -// NOTE: This exists in internal Google protobuf implementation, but not in open source one: https://news.ycombinator.com/item?id=23588882 -func (m *ZLabel) Unmarshal(data []byte) error { - l := len(data) - - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTypes - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: ZLabel: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: ZLabel: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTypes - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthTypes - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthTypes - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Name = noAllocString(data[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTypes - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthTypes - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthTypes - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Value = noAllocString(data[iNdEx:postIndex]) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipTypes(data[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthTypes - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthTypes - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} - -func (m *ZLabel) UnmarshalJSON(entry []byte) error { - f := Label(*m) - if err := json.Unmarshal(entry, &f); err != nil { - return errors.Wrapf(err, "labels: label field unmarshal: %v", string(entry)) - } - *m = ZLabel(f) - return nil -} - -func (m *ZLabel) Marshal() ([]byte, error) { - f := Label(*m) - return f.Marshal() -} - -func (m *ZLabel) MarshalJSON() ([]byte, error) { - return json.Marshal(Label(*m)) -} - -// Size implements proto.Sizer. -func (m *ZLabel) Size() (n int) { - f := Label(*m) - return f.Size() -} - -// Equal implements proto.Equaler. -func (m *ZLabel) Equal(other ZLabel) bool { - return m.Name == other.Name && m.Value == other.Value -} - -// Compare implements proto.Comparer. -func (m *ZLabel) Compare(other ZLabel) int { - if c := strings.Compare(m.Name, other.Name); c != 0 { - return c - } - return strings.Compare(m.Value, other.Value) -} - -// ExtendSortedLabels extend given labels by extend in labels format. -// The type conversion is done safely, which means we don't modify extend labels underlying array. -// -// In case of existing labels already present in given label set, it will be overwritten by external one. -func ExtendSortedLabels(lset, extend labels.Labels) labels.Labels { - if extend.IsEmpty() { - return lset.Copy() - } - b := labels.NewBuilder(lset) - extend.Range(func(l labels.Label) { - b.Set(l.Name, l.Value) - }) - return b.Labels() -} - -func PromLabelSetsToString(lsets []labels.Labels) string { - s := []string{} - for _, ls := range lsets { - s = append(s, ls.String()) - } - sort.Strings(s) - return strings.Join(s, ",") -} - -func (m *ZLabelSet) UnmarshalJSON(entry []byte) error { - lbls := labels.Labels{} - if err := lbls.UnmarshalJSON(entry); err != nil { - return errors.Wrapf(err, "labels: labels field unmarshal: %v", string(entry)) - } - m.Labels = ZLabelsFromPromLabels(lbls) - return nil -} - -func (m *ZLabelSet) MarshalJSON() ([]byte, error) { - return m.PromLabels().MarshalJSON() -} - -// PromLabels return Prometheus labels.Labels without extra allocation. -func (m *ZLabelSet) PromLabels() labels.Labels { - return ZLabelsToPromLabels(m.Labels) -} - -// DeepCopy copies labels and each label's string to separate bytes. -func DeepCopy(lbls []ZLabel) []ZLabel { - ret := make([]ZLabel, len(lbls)) - for i := range lbls { - ret[i].Name = string(noAllocBytes(lbls[i].Name)) - ret[i].Value = string(noAllocBytes(lbls[i].Value)) - } - return ret -} - -// HashWithPrefix returns a hash for the given prefix and labels. -func HashWithPrefix(prefix string, lbls []ZLabel) uint64 { - // Use xxhash.Sum64(b) for fast path as it's faster. - b := make([]byte, 0, 1024) - b = append(b, prefix...) - b = append(b, sep[0]) - - for i, v := range lbls { - if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { - // If labels entry is 1KB allocate do not allocate whole entry. - h := xxhash.New() - _, _ = h.Write(b) - for _, v := range lbls[i:] { - _, _ = h.WriteString(v.Name) - _, _ = h.Write(sep) - _, _ = h.WriteString(v.Value) - _, _ = h.Write(sep) - } - return h.Sum64() - } - b = append(b, v.Name...) - b = append(b, sep[0]) - b = append(b, v.Value...) - b = append(b, sep[0]) - } - return xxhash.Sum64(b) -} - -// ValidateLabels validates label names and values (checks for empty -// names and values, out of order labels and duplicate label names) -// Returns appropriate error if validation fails on a label. -func ValidateLabels(lbls []ZLabel) error { - if len(lbls) == 0 { - return ErrEmptyLabels - } - - // Check first label. - l0 := lbls[0] - if l0.Name == "" || l0.Value == "" { - return ErrEmptyLabels - } - - // Iterate over the rest, check each for empty / duplicates and - // check lexicographical (alphabetically) ordering. - for _, l := range lbls[1:] { - if l.Name == "" || l.Value == "" { - return ErrEmptyLabels - } - - if l.Name == l0.Name { - return ErrDuplicateLabels - } - - if l.Name < l0.Name { - return ErrOutOfOrderLabels - } - l0 = l - } - - return nil -} - -// ZLabelSets is a sortable list of ZLabelSet. It assumes the label pairs in each ZLabelSet element are already sorted. -type ZLabelSets []ZLabelSet - -func (z ZLabelSets) Len() int { return len(z) } - -func (z ZLabelSets) Swap(i, j int) { z[i], z[j] = z[j], z[i] } - -func (z ZLabelSets) Less(i, j int) bool { - l := 0 - r := 0 - var result int - lenI, lenJ := len(z[i].Labels), len(z[j].Labels) - for l < lenI && r < lenJ { - result = z[i].Labels[l].Compare(z[j].Labels[r]) - if result == 0 { - l++ - r++ - continue - } - return result < 0 - } - - return l == lenI -} diff --git a/pkg/store/labelpb/label_common.go b/pkg/store/labelpb/label_common.go new file mode 100644 index 0000000000..4ecd7cf7a2 --- /dev/null +++ b/pkg/store/labelpb/label_common.go @@ -0,0 +1,411 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Package containing proto and JSON serializable Labels and ZLabels (no copy) structs used to +// identify series. This package expose no-copy converters to Prometheus labels.Labels. + +package labelpb + +import ( + "encoding/json" + "fmt" + "io" + "sort" + "strings" + "unsafe" + + "github.com/cespare/xxhash/v2" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + "go4.org/intern" +) + +var ( + ErrOutOfOrderLabels = errors.New("out of order labels") + ErrEmptyLabels = errors.New("label set contains a label with empty name or value") + ErrDuplicateLabels = errors.New("label set contains duplicate label names") + + sep = []byte{'\xff'} +) + +func noAllocString(buf []byte) string { + return *(*string)(unsafe.Pointer(&buf)) +} + +func noAllocBytes(buf string) []byte { + return *(*[]byte)(unsafe.Pointer(&buf)) +} + +// ReAllocAndInternZLabelsStrings re-allocates all underlying bytes for string, detaching it from bigger memory pool. +// If `intern` is set to true, the method will use interning, i.e. reuse already allocated strings, to make the reallocation +// method more efficient. +// +// This is primarily intended to be used before labels are written into TSDB which can hold label strings in the memory long term. +func ReAllocZLabelsStrings(lset *[]ZLabel, intern bool) { + if intern { + for j, l := range *lset { + (*lset)[j].Name = detachAndInternLabelString(l.Name) + (*lset)[j].Value = detachAndInternLabelString(l.Value) + } + return + } + + for j, l := range *lset { + (*lset)[j].Name = string(noAllocBytes(l.Name)) + (*lset)[j].Value = string(noAllocBytes(l.Value)) + } +} + +// internLabelString is a helper method to intern a label string or, +// if the string was previously interned, it returns the existing +// reference and asserts it to a string. +func internLabelString(s string) string { + return intern.GetByString(s).Get().(string) +} + +// detachAndInternLabelString reallocates the label string to detach it +// from a bigger memory pool and interns the string. +func detachAndInternLabelString(s string) string { + return internLabelString(string(noAllocBytes(s))) +} + +// ZLabel is a Label (also easily transformable to Prometheus labels.Labels) that can be unmarshalled from protobuf +// reusing the same memory address for string bytes. +// NOTE: While unmarshalling it uses exactly same bytes that were allocated for protobuf. This mean that *whole* protobuf +// bytes will be not GC-ed as long as ZLabels are referenced somewhere. Use it carefully, only for short living +// protobuf message processing. +type ZLabel Label + +func (m *ZLabel) MarshalTo(data []byte) (int, error) { + f := Label(*m) + return f.MarshalTo(data) +} + +func (m *ZLabel) MarshalToSizedBuffer(data []byte) (int, error) { + f := Label(*m) + return f.MarshalToSizedBuffer(data) +} + +// Unmarshal unmarshalls gRPC protobuf into ZLabel struct. ZLabel string is directly using bytes passed in `data`. +// To use it add (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel" to proto field tag. +// NOTE: This exists in internal Google protobuf implementation, but not in open source one: https://news.ycombinator.com/item?id=23588882 +func (m *ZLabel) Unmarshal(data []byte) error { + l := len(data) + + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ZLabel: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ZLabel: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = noAllocString(data[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = noAllocString(data[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} + +func (m *ZLabel) UnmarshalJSON(entry []byte) error { + f := Label(*m) + if err := json.Unmarshal(entry, &f); err != nil { + return errors.Wrapf(err, "labels: label field unmarshal: %v", string(entry)) + } + *m = ZLabel(f) + return nil +} + +func (m *ZLabel) Marshal() ([]byte, error) { + f := Label(*m) + return f.Marshal() +} + +func (m *ZLabel) MarshalJSON() ([]byte, error) { + return json.Marshal(Label(*m)) +} + +// Size implements proto.Sizer. +func (m *ZLabel) Size() (n int) { + f := Label(*m) + return f.Size() +} + +// Equal implements proto.Equaler. +func (m *ZLabel) Equal(other ZLabel) bool { + return m.Name == other.Name && m.Value == other.Value +} + +// Compare implements proto.Comparer. +func (m *ZLabel) Compare(other ZLabel) int { + if c := strings.Compare(m.Name, other.Name); c != 0 { + return c + } + return strings.Compare(m.Value, other.Value) +} + +// ExtendSortedLabels extend given labels by extend in labels format. +// The type conversion is done safely, which means we don't modify extend labels underlying array. +// +// In case of existing labels already present in given label set, it will be overwritten by external one. +func ExtendSortedLabels(lset, extend labels.Labels) labels.Labels { + if extend.IsEmpty() { + return lset.Copy() + } + b := labels.NewBuilder(lset) + extend.Range(func(l labels.Label) { + b.Set(l.Name, l.Value) + }) + return b.Labels() +} + +func PromLabelSetsToString(lsets []labels.Labels) string { + s := []string{} + for _, ls := range lsets { + s = append(s, ls.String()) + } + sort.Strings(s) + return strings.Join(s, ",") +} + +func (m *ZLabelSet) UnmarshalJSON(entry []byte) error { + lbls := labels.Labels{} + if err := lbls.UnmarshalJSON(entry); err != nil { + return errors.Wrapf(err, "labels: labels field unmarshal: %v", string(entry)) + } + m.Labels = ZLabelsFromPromLabels(lbls) + return nil +} + +func (m *ZLabelSet) MarshalJSON() ([]byte, error) { + return m.PromLabels().MarshalJSON() +} + +// PromLabels return Prometheus labels.Labels without extra allocation. +func (m *ZLabelSet) PromLabels() labels.Labels { + return ZLabelsToPromLabels(m.Labels) +} + +// DeepCopy copies labels and each label's string to separate bytes. +func DeepCopy(lbls []ZLabel) []ZLabel { + ret := make([]ZLabel, len(lbls)) + for i := range lbls { + ret[i].Name = string(noAllocBytes(lbls[i].Name)) + ret[i].Value = string(noAllocBytes(lbls[i].Value)) + } + return ret +} + +// HashWithPrefix returns a hash for the given prefix and labels. +func HashWithPrefix(prefix string, lbls []ZLabel) uint64 { + // Use xxhash.Sum64(b) for fast path as it's faster. + b := make([]byte, 0, 1024) + b = append(b, prefix...) + b = append(b, sep[0]) + + for i, v := range lbls { + if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) { + // If labels entry is 1KB allocate do not allocate whole entry. + h := xxhash.New() + _, _ = h.Write(b) + for _, v := range lbls[i:] { + _, _ = h.WriteString(v.Name) + _, _ = h.Write(sep) + _, _ = h.WriteString(v.Value) + _, _ = h.Write(sep) + } + return h.Sum64() + } + b = append(b, v.Name...) + b = append(b, sep[0]) + b = append(b, v.Value...) + b = append(b, sep[0]) + } + return xxhash.Sum64(b) +} + +// ValidateLabels validates label names and values (checks for empty +// names and values, out of order labels and duplicate label names) +// Returns appropriate error if validation fails on a label. +func ValidateLabels(lbls []ZLabel) error { + if len(lbls) == 0 { + return ErrEmptyLabels + } + + // Check first label. + l0 := lbls[0] + if l0.Name == "" || l0.Value == "" { + return ErrEmptyLabels + } + + // Iterate over the rest, check each for empty / duplicates and + // check lexicographical (alphabetically) ordering. + for _, l := range lbls[1:] { + if l.Name == "" || l.Value == "" { + return ErrEmptyLabels + } + + if l.Name == l0.Name { + return ErrDuplicateLabels + } + + if l.Name < l0.Name { + return ErrOutOfOrderLabels + } + l0 = l + } + + return nil +} + +// ZLabelSets is a sortable list of ZLabelSet. It assumes the label pairs in each ZLabelSet element are already sorted. +type ZLabelSets []ZLabelSet + +func (z ZLabelSets) Len() int { return len(z) } + +func (z ZLabelSets) Swap(i, j int) { z[i], z[j] = z[j], z[i] } + +func (z ZLabelSets) Less(i, j int) bool { + l := 0 + r := 0 + var result int + lenI, lenJ := len(z[i].Labels), len(z[j].Labels) + for l < lenI && r < lenJ { + result = z[i].Labels[l].Compare(z[j].Labels[r]) + if result == 0 { + l++ + r++ + continue + } + return result < 0 + } + + return l == lenI +} + +// Compare compares the two label sets. +// The result will be 0 if a==b, <0 if a < b, and >0 if a > b. +func Compare(a, b []ZLabel) int { + l := len(a) + if len(b) < l { + l = len(b) + } + + for i := 0; i < l; i++ { + if a[i].Name != b[i].Name { + if a[i].Name < b[i].Name { + return -1 + } + return 1 + } + if a[i].Value != b[i].Value { + if a[i].Value < b[i].Value { + return -1 + } + return 1 + } + } + // If all labels so far were in common, the set with fewer labels comes first. + return len(a) - len(b) +} diff --git a/pkg/store/labelpb/label_stringlabels.go b/pkg/store/labelpb/label_stringlabels.go new file mode 100644 index 0000000000..f919555b44 --- /dev/null +++ b/pkg/store/labelpb/label_stringlabels.go @@ -0,0 +1,64 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Package containing proto and JSON serializable Labels and ZLabels (no copy) structs used to +// identify series. This package expose no-copy converters to Prometheus labels.Labels. + +//go:build stringlabels + +package labelpb + +import ( + "github.com/prometheus/prometheus/model/labels" +) + +// ZLabelsFromPromLabels converts Prometheus labels to slice of labelpb.ZLabel. +// It reuses the same memory. Caller should abort using passed labels.Labels. +func ZLabelsFromPromLabels(lset labels.Labels) []ZLabel { + zlabels := make([]ZLabel, 0, lset.Len()) + lset.Range(func(l labels.Label) { + zlabels = append(zlabels, ZLabel{ + Name: l.Name, + Value: l.Value, + }) + }) + return zlabels +} + +// ZLabelsToPromLabels convert slice of labelpb.ZLabel to Prometheus labels. +func ZLabelsToPromLabels(lset []ZLabel) labels.Labels { + builder := labels.NewScratchBuilder(len(lset)) + for _, l := range lset { + builder.Add(l.Name, l.Value) + } + builder.Sort() + return builder.Labels() +} + +// ZLabelSetsToPromLabelSets converts slice of labelpb.ZLabelSet to slice of Prometheus labels. +func ZLabelSetsToPromLabelSets(lss ...ZLabelSet) []labels.Labels { + res := make([]labels.Labels, 0, len(lss)) + for _, ls := range lss { + res = append(res, ls.PromLabels()) + } + return res +} + +// ZLabelSetsFromPromLabels converts []labels.labels to []labelpb.ZLabelSet. +func ZLabelSetsFromPromLabels(lss ...labels.Labels) []ZLabelSet { + sets := make([]ZLabelSet, 0, len(lss)) + for _, ls := range lss { + set := ZLabelSet{ + Labels: make([]ZLabel, 0, ls.Len()), + } + ls.Range(func(lbl labels.Label) { + set.Labels = append(set.Labels, ZLabel{ + Name: lbl.Name, + Value: lbl.Value, + }) + }) + sets = append(sets, set) + } + + return sets +} diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 4442cf8fdb..a42528ae6b 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -181,10 +181,7 @@ func NewProxyResponseLoserTree(seriesSets ...respSet) *losertree.Tree[*storepb.S return true } if a.GetSeries() != nil && b.GetSeries() != nil { - iLbls := labelpb.ZLabelsToPromLabels(a.GetSeries().Labels) - jLbls := labelpb.ZLabelsToPromLabels(b.GetSeries().Labels) - - return labels.Compare(iLbls, jLbls) < 0 + return labelpb.Compare(a.GetSeries().Labels, b.GetSeries().Labels) < 0 } else if a.GetSeries() == nil && b.GetSeries() != nil { return true } else if a.GetSeries() != nil && b.GetSeries() == nil { @@ -204,14 +201,6 @@ func (l *lazyRespSet) StoreID() string { return l.storeName } -func (l *lazyRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.storeLabelSets) -} - -func (l *lazyRespSet) StoreLabels() map[string]struct{} { - return l.storeLabels -} - // lazyRespSet is a lazy storepb.SeriesSet that buffers // everything as fast as possible while at the same it permits // reading response-by-response. It blocks if there is no data @@ -447,12 +436,10 @@ func newAsyncRespSet( logger log.Logger, emptyStreamResponses prometheus.Counter, ) (respSet, error) { - var ( span opentracing.Span cancel context.CancelFunc ) - storeID, storeAddr, isLocalStore := storeInfo(st) seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{ "target": storeAddr, @@ -474,7 +461,7 @@ func newAsyncRespSet( cl, err := st.Series(seriesCtx, req) if err != nil { - err = errors.Wrapf(err, "fetch series for %s %s", storeID, st) + err = errors.Wrapf(err, "fetch series for %s", storeID) span.SetTag("err", err.Error()) span.Finish() @@ -755,20 +742,10 @@ func (l *eagerRespSet) StoreID() string { return l.storeName } -func (l *eagerRespSet) Labelset() string { - return labelpb.PromLabelSetsToString(l.storeLabelSets) -} - -func (l *eagerRespSet) StoreLabels() map[string]struct{} { - return l.storeLabels -} - type respSet interface { Close() At() *storepb.SeriesResponse Next() bool StoreID() string - Labelset() string - StoreLabels() map[string]struct{} Empty() bool } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index dd0ac551ff..8d19eaab70 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -502,6 +502,7 @@ func TestProxyStore_Series(t *testing.T) { ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, MinTime: 1, MaxTime: 300, + Name: labels.FromStrings("ext", "1").String(), }, }, req: &storepb.SeriesRequest{ @@ -511,7 +512,7 @@ func TestProxyStore_Series(t *testing.T) { PartialResponseDisabled: true, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, }, - expectedErr: errors.New("fetch series for {ext=\"1\"} : error!"), + expectedErr: errors.New("fetch series for {ext=\"1\"}: error!"), }, { title: "storeAPI available for time range; available series for ext=1 external label matcher; allowed by store debug matcher", @@ -1804,6 +1805,18 @@ func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) { } } +var buffersPool = &sync.Pool{New: func() interface{} { + b := make([]byte, 0, initialBufSize) + return &b +}} + +func assertSeriesMatchShard(t *testing.T, set []storepb.Series, info *storepb.ShardInfo) { + matcher := info.Matcher(buffersPool) + for _, series := range set { + testutil.Assert(t, matcher.MatchesZLabels(series.Labels)) + } +} + func TestStoreMatches(t *testing.T) { t.Parallel() diff --git a/pkg/store/storepb/shard_info.go b/pkg/store/storepb/shard_info.go index 28d559b49a..4ce529358f 100644 --- a/pkg/store/storepb/shard_info.go +++ b/pkg/store/storepb/shard_info.go @@ -45,7 +45,7 @@ func (s *ShardMatcher) MatchesZLabels(zLabels []labelpb.ZLabel) bool { *s.buf = (*s.buf)[:0] for _, lbl := range zLabels { - if shardByLabel(s.shardingLabelset, lbl, s.by) { + if shardByLabel(s.shardingLabelset, lbl.Name, s.by) { *s.buf = append(*s.buf, lbl.Name...) *s.buf = append(*s.buf, sep[0]) *s.buf = append(*s.buf, lbl.Value...) @@ -57,12 +57,32 @@ func (s *ShardMatcher) MatchesZLabels(zLabels []labelpb.ZLabel) bool { return hash%uint64(s.totalShards) == uint64(s.shardIndex) } -func (s *ShardMatcher) MatchesLabels(lbls labels.Labels) bool { - return s.MatchesZLabels(labelpb.ZLabelsFromPromLabels(lbls)) +type LabelsIter interface { + Range(func(labels.Label)) } -func shardByLabel(labelSet map[string]struct{}, zlabel labelpb.ZLabel, groupingBy bool) bool { - _, shardHasLabel := labelSet[zlabel.Name] +func (s *ShardMatcher) MatchesLabels(lbls LabelsIter) bool { + // Match all series when query is not sharded + if s == nil || !s.isSharded { + return true + } + + *s.buf = (*s.buf)[:0] + lbls.Range(func(lbl labels.Label) { + if shardByLabel(s.shardingLabelset, lbl.Name, s.by) { + *s.buf = append(*s.buf, lbl.Name...) + *s.buf = append(*s.buf, sep[0]) + *s.buf = append(*s.buf, lbl.Value...) + *s.buf = append(*s.buf, sep[0]) + } + }) + + hash := xxhash.Sum64(*s.buf) + return hash%uint64(s.totalShards) == uint64(s.shardIndex) +} + +func shardByLabel(labelSet map[string]struct{}, labelName string, groupingBy bool) bool { + _, shardHasLabel := labelSet[labelName] if groupingBy && shardHasLabel { return true } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index d1d88cb92c..e860e6285c 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -64,14 +64,18 @@ func WithMatcherCacheInstance(cache storecache.MatchersCache) TSDBStoreOption { // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. type TSDBStore struct { - logger log.Logger - db TSDBReader - component component.StoreAPI + logger log.Logger + db TSDBReader + component component.StoreAPI + + extLset labels.Labels + extLsetString string + extLabelsSlice []labels.Label + buffers sync.Pool maxBytesPerFrame int matcherCache storecache.MatchersCache - extLset labels.Labels startStoreFilterUpdate bool storeFilter filter.StoreFilter mtx sync.RWMutex @@ -109,10 +113,13 @@ func NewTSDBStore( } st := &TSDBStore{ - logger: logger, - db: db, - component: component, - extLset: extLset, + logger: logger, + db: db, + component: component, + extLset: extLset, + extLsetString: extLset.String(), + extLabelsSlice: labelsToSlice(extLset), + maxBytesPerFrame: RemoteReadFrameLimit, storeFilter: filter.AllowAllStoreFilter{}, close: func() {}, @@ -167,6 +174,8 @@ func (s *TSDBStore) SetExtLset(extLset labels.Labels) { defer s.mtx.Unlock() s.extLset = extLset + s.extLsetString = extLset.String() + s.extLabelsSlice = labelsToSlice(extLset) } func (s *TSDBStore) getExtLset() labels.Labels { @@ -176,14 +185,21 @@ func (s *TSDBStore) getExtLset() labels.Labels { return s.extLset } -func (s *TSDBStore) LabelSet() []labelpb.ZLabelSet { - labels := labelpb.ZLabelSetsFromPromLabels(s.getExtLset()) - labelSets := []labelpb.ZLabelSet{} - if len(labels) > 0 { - labelSets = append(labelSets, labels...) +func (s *TSDBStore) getExtLabelsSlice() []labels.Label { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return s.extLabelsSlice +} + +func (s *TSDBStore) LabelSet() []labels.Labels { + return []labels.Labels{ + s.getExtLset(), } +} - return labelSets +func (s *TSDBStore) String() string { + return s.extLsetString } func (s *TSDBStore) TSDBInfos() []infopb.TSDBInfo { @@ -196,7 +212,7 @@ func (s *TSDBStore) TSDBInfos() []infopb.TSDBInfo { return []infopb.TSDBInfo{ { Labels: labelpb.ZLabelSet{ - Labels: labels[0].Labels, + Labels: labelpb.ZLabelsFromPromLabels(labels[0]), }, MinTime: mint, MaxTime: maxt, @@ -208,7 +224,7 @@ func (s *TSDBStore) TimeRange() (int64, int64) { var minTime int64 = math.MinInt64 startTime, err := s.db.StartTime() if err == nil { - // Since we always use tsdb.DB implementation, + // Since we always use tsdb.DB implementation, // StartTime should never return error. minTime = startTime } @@ -293,22 +309,30 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser hasher := hashPool.Get().(hash.Hash64) defer hashPool.Put(hasher) - extLsetToRemove := map[string]struct{}{} + var ( + withoutLabels = make(map[string]struct{}) + lblsScratch []labelpb.ZLabel + ) for _, lbl := range r.WithoutReplicaLabels { - extLsetToRemove[lbl] = struct{}{} + withoutLabels[lbl] = struct{}{} } - finalExtLset := rmLabels(s.extLset.Copy(), extLsetToRemove) - // Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. for set.Next() { series := set.At() - completeLabelset := labelpb.ExtendSortedLabels(rmLabels(series.Labels(), extLsetToRemove), finalExtLset) - if !shardMatcher.MatchesLabels(completeLabelset) { + lbls := buildSeriesLabels( + lblsScratch, + series.Labels(), + s.getExtLabelsSlice(), + withoutLabels, + ) + if !shardMatcher.MatchesZLabels(lbls) { continue } - storeSeries := storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(completeLabelset)} + lblsCopy := make([]labelpb.ZLabel, len(lbls)) + copy(lblsCopy, lbls) + storeSeries := storepb.Series{Labels: lblsCopy} if r.SkipChunks { if err := srv.Send(storepb.NewSeriesResponse(&storeSeries)); err != nil { return status.Error(codes.Aborted, err.Error()) @@ -372,9 +396,46 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser return status.Error(codes.Aborted, err.Error()) } } + return srv.Flush() } +func buildSeriesLabels(lbls []labelpb.ZLabel, seriesLabels labels.Labels, extLabels []labels.Label, withoutLabels map[string]struct{}) []labelpb.ZLabel { + lbls = lbls[:0] + iExt := 0 + seriesLabels.Range(func(intLbl labels.Label) { + extLoop: + for iExt < len(extLabels) { + cmp := strings.Compare(extLabels[iExt].Name, intLbl.Name) + if cmp < 1 { + extLbl := extLabels[iExt] + if _, ok := withoutLabels[extLbl.Name]; !ok { + lbls = append(lbls, labelpb.ZLabel{Name: extLbl.Name, Value: extLbl.Value}) + } + iExt++ + } + // If the current internal label is identical to the external label, move over to + // the next internal label to avoid duplicates. + // Otherwise, move over to adding internal labels. + switch cmp { + case 0: + return + case 1: + break extLoop + } + } + if _, ok := withoutLabels[intLbl.Name]; !ok { + lbls = append(lbls, labelpb.ZLabel{Name: intLbl.Name, Value: intLbl.Value}) + } + }) + for _, l := range extLabels[iExt:] { + if _, ok := withoutLabels[l.Name]; !ok { + lbls = append(lbls, labelpb.ZLabel{Name: l.Name, Value: l.Value}) + } + } + return lbls +} + // LabelNames returns all known label names constrained with the given matchers. func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, @@ -494,3 +555,11 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return &storepb.LabelValuesResponse{Values: values}, nil } + +func labelsToSlice(lset labels.Labels) []labels.Label { + r := make([]labels.Label, 0, lset.Len()) + lset.Range(func(l labels.Label) { + r = append(r, l) + }) + return r +} diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index cd94fa76e7..a3526eae3b 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -75,29 +75,17 @@ func TestTSDBStore_Series(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - db, err := e2eutil.NewTSDB() - defer func() { testutil.Ok(t, db.Close()) }() - testutil.Ok(t, err) - - tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) - - appender := db.Appender(context.Background()) - - for i := 1; i <= 3; i++ { - _, err = appender.Append(0, labels.FromStrings("a", "1"), int64(i), float64(i)) - testutil.Ok(t, err) - } - err = appender.Commit() - testutil.Ok(t, err) - for _, tc := range []struct { - title string + name string + externalLabels labels.Labels + series []labels.Labels req *storepb.SeriesRequest expectedSeries []rawSeries expectedError string }{ { - title: "total match series", + name: "total match series", + series: []labels.Labels{labels.FromStrings("a", "1")}, req: &storepb.SeriesRequest{ MinTime: 1, MaxTime: 3, @@ -113,7 +101,8 @@ func TestTSDBStore_Series(t *testing.T) { }, }, { - title: "partially match time range series", + name: "partially match time range series", + series: []labels.Labels{labels.FromStrings("a", "1")}, req: &storepb.SeriesRequest{ MinTime: 1, MaxTime: 2, @@ -129,7 +118,8 @@ func TestTSDBStore_Series(t *testing.T) { }, }, { - title: "don't match time range series", + name: "dont't match time range series", + series: []labels.Labels{labels.FromStrings("a", "1")}, req: &storepb.SeriesRequest{ MinTime: 4, MaxTime: 6, @@ -140,7 +130,8 @@ func TestTSDBStore_Series(t *testing.T) { expectedSeries: []rawSeries{}, }, { - title: "only match external label", + name: "only match external label", + series: []labels.Labels{labels.FromStrings("a", "1")}, req: &storepb.SeriesRequest{ MinTime: 1, MaxTime: 3, @@ -151,7 +142,8 @@ func TestTSDBStore_Series(t *testing.T) { expectedError: "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", }, { - title: "don't match labels", + name: "dont't match labels", + series: []labels.Labels{labels.FromStrings("a", "1")}, req: &storepb.SeriesRequest{ MinTime: 1, MaxTime: 3, @@ -162,7 +154,8 @@ func TestTSDBStore_Series(t *testing.T) { expectedSeries: []rawSeries{}, }, { - title: "no chunk", + name: "no chunk", + series: []labels.Labels{labels.FromStrings("a", "1")}, req: &storepb.SeriesRequest{ MinTime: 1, MaxTime: 3, @@ -177,20 +170,122 @@ func TestTSDBStore_Series(t *testing.T) { }, }, }, + { + name: "sharding with ext label at beginning of labelset", + series: []labels.Labels{ + labels.FromStrings("y", "1", "z", "1"), + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "y", Value: ".+"}, + }, + SkipChunks: true, + ShardInfo: &storepb.ShardInfo{ + ShardIndex: 0, + TotalShards: 2, + }, + WithoutReplicaLabels: []string{"ext1"}, + }, + expectedSeries: []rawSeries{ + {lset: labels.FromStrings("region", "eu-west", "y", "1", "z", "1")}, + }, + }, + { + name: "sharding with ext label in middle of labelset", + series: []labels.Labels{ + labels.FromStrings("a", "2", "z", "1"), + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "a", Value: ".+"}, + }, + SkipChunks: true, + ShardInfo: &storepb.ShardInfo{ + ShardIndex: 1, + TotalShards: 2, + }, + WithoutReplicaLabels: []string{"ext1"}, + }, + expectedSeries: []rawSeries{ + {lset: labels.FromStrings("a", "2", "region", "eu-west", "z", "1")}, + }, + }, + { + name: "sharding with ext label at end of labelset", + series: []labels.Labels{ + labels.FromStrings("a", "1", "b", "1"), + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "a", Value: ".+"}, + }, + SkipChunks: true, + ShardInfo: &storepb.ShardInfo{ + ShardIndex: 0, + TotalShards: 2, + }, + WithoutReplicaLabels: []string{"ext1"}, + }, + expectedSeries: []rawSeries{ + {lset: labels.FromStrings("a", "1", "b", "1", "region", "eu-west")}, + }, + }, + { + name: "label present both as internal and external, external label preserved", + series: []labels.Labels{ + labels.FromStrings("a", "2", "z", "1", "region", "eu-west-internal"), + }, + externalLabels: labels.FromStrings("a", "ext"), + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "z", Value: ".+"}, + }, + SkipChunks: true, + }, + expectedSeries: []rawSeries{ + {lset: labels.FromStrings("a", "ext", "region", "eu-west", "z", "1")}, + }, + }, } { - if ok := t.Run(tc.title, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { + db, err := e2eutil.NewTSDB() + defer func() { testutil.Ok(t, db.Close()) }() + testutil.Ok(t, err) + + extLset := labelpb.ExtendSortedLabels(labels.FromStrings("region", "eu-west"), tc.externalLabels) + tsdbStore := NewTSDBStore(nil, db, component.Rule, extLset) + + appender := db.Appender(context.Background()) + + for _, s := range tc.series { + for i := 1; i <= 3; i++ { + _, err = appender.Append(0, s, int64(i), float64(i)) + testutil.Ok(t, err) + } + } + err = appender.Commit() + testutil.Ok(t, err) + srv := newStoreSeriesServer(ctx) - err := tsdbStore.Series(tc.req, srv) + err = tsdbStore.Series(tc.req, srv) if len(tc.expectedError) > 0 { testutil.NotOk(t, err) testutil.Equals(t, tc.expectedError, err.Error()) - } else { - testutil.Ok(t, err) - seriesEquals(t, tc.expectedSeries, srv.SeriesSet) + return } - }); !ok { - return - } + + testutil.Ok(t, err) + seriesEquals(t, tc.expectedSeries, srv.SeriesSet) + assertSeriesMatchShard(t, srv.SeriesSet, tc.req.ShardInfo) + }) } }