remove unused Filter error
dashpole committed Jan 28, 2025
1 parent a9bcdfc commit 26dea3f
Showing 7 changed files with 37 additions and 94 deletions.
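
In short: `ItemFilter.Filter` and its helpers never returned a non-nil error, so the error value is dropped from the signature and every call site loses its error plumbing. A minimal before/after sketch of a call site (hypothetical caller code, not part of this commit; `use` is a placeholder):

    // Before this commit: every caller had to thread an error that was always nil.
    filtered, err := itemFilter.Filter(items)
    if err != nil {
        return nil, err
    }
    use(filtered)

    // After this commit: Filter is infallible and the call collapses to one line.
    filtered := itemFilter.Filter(items)
    use(filtered)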
@@ -19,7 +19,7 @@ type Item struct {
 }
 
 type ItemFilter interface {
-    Filter(source []*Item) ([]*Item, error)
+    Filter(source []*Item) []*Item
     Shutdown() error
     TotalLimit() int
     LimitByTimestamp() int
@@ -83,47 +83,40 @@ func (f *itemCardinalityFilter) LimitByTimestamp() int {
     return f.limitByTimestamp
 }
 
-func (f *itemCardinalityFilter) Filter(sourceItems []*Item) ([]*Item, error) {
+func (f *itemCardinalityFilter) Filter(sourceItems []*Item) []*Item {
     var filteredItems []*Item
     groupedItems := groupByTimestamp(sourceItems)
     sortedItemKeys := sortedKeys(groupedItems)
 
     for _, key := range sortedItemKeys {
-        filteredGroupedItems, err := f.filterItems(groupedItems[key])
-        if err != nil {
-            return nil, err
-        }
-
-        filteredItems = append(filteredItems, filteredGroupedItems...)
+        filteredItems = append(filteredItems, f.filterItems(groupedItems[key])...)
     }
 
-    return filteredItems, nil
+    return filteredItems
 }
 
-func (f *itemCardinalityFilter) filterItems(items []*Item) ([]*Item, error) {
+func (f *itemCardinalityFilter) filterItems(items []*Item) []*Item {
     limit := currentLimitByTimestamp{
         limitByTimestamp: f.limitByTimestamp,
     }
 
     var filteredItems []*Item
     for _, item := range items {
-        if included, err := f.includeItem(item, &limit); err != nil {
-            return nil, err
-        } else if included {
+        if f.includeItem(item, &limit) {
             filteredItems = append(filteredItems, item)
         }
     }
 
-    return filteredItems, nil
+    return filteredItems
 }
 
-func (f *itemCardinalityFilter) includeItem(item *Item, limit *currentLimitByTimestamp) (bool, error) {
+func (f *itemCardinalityFilter) includeItem(item *Item, limit *currentLimitByTimestamp) bool {
     if f.cache.Get(item.SeriesKey) != nil {
-        return true, nil
+        return true
     }
     if !f.canIncludeNewItem(limit.get()) {
        f.logger.Debug("Skip item", zap.String("seriesKey", item.SeriesKey), zap.Time("timestamp", item.Timestamp))
-        return false, nil
+        return false
     }
 
     _ = f.cache.Set(item.SeriesKey, struct{}{}, f.itemActivityPeriod)
@@ -132,7 +125,7 @@ func (f *itemCardinalityFilter) includeItem(item *Item, limit *currentLimitByTim
 
     limit.dec()
 
-    return true, nil
+    return true
 }
 
 func (f *itemCardinalityFilter) canIncludeNewItem(currentLimitByTimestamp int) bool {
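
The removal is safe because no code path in this implementation can fail: the only fallible call, `f.cache.Set`, already had its error discarded with `_ =` before this commit. Since Go interfaces are satisfied structurally, every implementation (`itemCardinalityFilter` here, plus `nopItemCardinalityFilter` and `mockFilter` below) must adopt the new signature in the same commit. A compile-time assertion like the following hypothetical sketch (not part of this commit) is a common way to catch a missed one:

    // Hypothetical compile-time checks: each line fails to build if the named
    // type stops satisfying ItemFilter after the signature change.
    var _ ItemFilter = (*itemCardinalityFilter)(nil)
    var _ ItemFilter = (*nopItemCardinalityFilter)(nil)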
@@ -114,15 +114,13 @@ func TestItemCardinalityFilter_Filter(t *testing.T) {
     filterCasted := filter.(*itemCardinalityFilter)
     defer executeShutdown(t, filterCasted)
 
-    filteredItems, err := filter.Filter(items)
-    require.NoError(t, err)
+    filteredItems := filter.Filter(items)
 
     // Items with key3 and key6 must be not present in filtered items
     assertInitialFiltering(t, expectedFilteredInitialItems(t), filteredItems)
 
     items = additionalTestData(t)
-    filteredItems, err = filter.Filter(items)
-    require.NoError(t, err)
+    filteredItems = filter.Filter(items)
 
     // Cache timeout hasn't been reached, so filtered out all items
     assert.Empty(t, filteredItems)
@@ -142,8 +140,7 @@ func TestItemCardinalityFilter_Filter(t *testing.T) {
 
     filterCasted.cache.OnEviction(nil)
 
-    filteredItems, err = filter.Filter(items)
-    require.NoError(t, err)
+    filteredItems = filter.Filter(items)
 
     // All entries expired, nothing should be filtered out from items
     assertInitialFiltering(t, items, filteredItems)
@@ -160,20 +157,17 @@ func TestItemCardinalityFilter_FilterItems(t *testing.T) {
     filterCasted := filter.(*itemCardinalityFilter)
     defer executeShutdown(t, filterCasted)
 
-    filteredItems, err := filterCasted.filterItems(items)
-    require.NoError(t, err)
+    filteredItems := filterCasted.filterItems(items)
 
     // Items with key1 and key2 must be not present in filtered items
     assertInitialFiltering(t, expectedFilteredInitialItemsWithSameTimestamp(t), filteredItems)
 
     // 2 new and 2 existing items must be present in filtered items
-    filteredItems, err = filterCasted.filterItems(items)
-    require.NoError(t, err)
+    filteredItems = filterCasted.filterItems(items)
 
     assert.Len(t, filteredItems, totalLimit)
 
-    filteredItems, err = filter.Filter(items)
-    require.NoError(t, err)
+    filteredItems = filter.Filter(items)
 
     // Cache timeout hasn't been reached, so no more new items expected
     assert.Len(t, filteredItems, totalLimit)
@@ -193,8 +187,7 @@ func TestItemCardinalityFilter_FilterItems(t *testing.T) {
 
     filterCasted.cache.OnEviction(nil)
 
-    filteredItems, err = filter.Filter(items)
-    require.NoError(t, err)
+    filteredItems = filter.Filter(items)
 
     // All entries expired, same picture as on first case
     assertInitialFiltering(t, expectedFilteredInitialItemsWithSameTimestamp(t), filteredItems)
@@ -216,18 +209,15 @@ func TestItemCardinalityFilter_IncludeItem(t *testing.T) {
         limitByTimestamp: 1,
     }
 
-    result, err := filterCasted.includeItem(item1, timestampLimiter)
-    require.NoError(t, err)
+    result := filterCasted.includeItem(item1, timestampLimiter)
     assert.True(t, result)
 
     // Item already exists in cache
-    result, err = filterCasted.includeItem(item1, timestampLimiter)
-    require.NoError(t, err)
+    result = filterCasted.includeItem(item1, timestampLimiter)
     assert.True(t, result)
 
     // Limit by timestamp reached
-    result, err = filterCasted.includeItem(item2, timestampLimiter)
-    require.NoError(t, err)
+    result = filterCasted.includeItem(item2, timestampLimiter)
     assert.False(t, result)
 }
 
@@ -21,8 +21,8 @@ func NewNopItemFilterResolver() ItemFilterResolver {
     }
 }
 
-func (f *nopItemCardinalityFilter) Filter(sourceItems []*Item) ([]*Item, error) {
-    return sourceItems, nil
+func (f *nopItemCardinalityFilter) Filter(sourceItems []*Item) []*Item {
+    return sourceItems
 }
 
 func (f *nopItemCardinalityFilter) Shutdown() error {
@@ -14,9 +14,8 @@ func TestNopItemCardinalityFilter_Filter(t *testing.T) {
     filter := NewNopItemCardinalityFilter()
     sourceItems := []*Item{{}}
 
-    filteredItems, err := filter.Filter(sourceItems)
+    filteredItems := filter.Filter(sourceItems)
 
-    require.NoError(t, err)
     assert.Equal(t, sourceItems, filteredItems)
 }
 
@@ -21,8 +21,8 @@ type mockFilter struct {
     mock.Mock
 }
 
-func (f *mockFilter) Filter(source []*filter.Item) ([]*filter.Item, error) {
-    return source, nil
+func (f *mockFilter) Filter(source []*filter.Item) []*filter.Item {
+    return source
 }
 
 func (f *mockFilter) Shutdown() error {
@@ -108,10 +108,7 @@ func (b *metricsFromDataPointBuilder) filter(metricName string, dataPoints []*Me
         }
     }
 
-    filteredItems, err := itemFilter.Filter(itemsForFiltering)
-    if err != nil {
-        return nil, err
-    }
+    filteredItems := itemFilter.Filter(itemsForFiltering)
 
     // Creating new slice instead of removing elements from source slice because removing by value is not efficient operation.
     // Need to use such approach for preserving data points order.
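
Note that the builder's `filter` method keeps its own error return: resolving the filter can still fail; it is only the `Filter` call itself that is now infallible. A hypothetical sketch of the surrounding flow (the argument list for `Resolve` is an assumption, not shown in this diff):

    // Resolving the filter can still fail, so the builder's filter keeps its
    // error return even though itemFilter.Filter no longer returns one.
    itemFilter, err := b.filterResolver.Resolve(metricName)
    if err != nil {
        return nil, err
    }
    filteredItems := itemFilter.Filter(itemsForFiltering)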
@@ -36,24 +36,6 @@ func (r *mockItemFilterResolver) Shutdown() error {
     return args.Error(0)
 }
 
-type errorFilter struct{}
-
-func (f errorFilter) Filter(_ []*filter.Item) ([]*filter.Item, error) {
-    return nil, errors.New("error on filter")
-}
-
-func (f errorFilter) Shutdown() error {
-    return nil
-}
-
-func (f errorFilter) TotalLimit() int {
-    return 0
-}
-
-func (f errorFilter) LimitByTimestamp() int {
-    return 0
-}
-
 type testData struct {
     dataPoints           []*MetricsDataPoint
     expectedGroupingKeys []MetricsDataPointKey
@@ -73,46 +55,36 @@ func TestNewMetricsFromDataPointBuilder(t *testing.T) {
 func TestMetricsFromDataPointBuilder_Build(t *testing.T) {
     testCases := map[string]struct {
         metricsDataType pmetric.MetricType
-        expectedError   error
     }{
-        "Gauge":                      {pmetric.MetricTypeGauge, nil},
-        "Sum":                        {pmetric.MetricTypeSum, nil},
-        "Gauge with filtering error": {pmetric.MetricTypeGauge, errors.New("filtering error")},
-        "Sum with filtering error":   {pmetric.MetricTypeSum, errors.New("filtering error")},
+        "Gauge":                      {pmetric.MetricTypeGauge},
+        "Sum":                        {pmetric.MetricTypeSum},
+        "Gauge with filtering error": {pmetric.MetricTypeGauge},
+        "Sum with filtering error":   {pmetric.MetricTypeSum},
     }
 
     for name, testCase := range testCases {
         t.Run(name, func(t *testing.T) {
-            testMetricsFromDataPointBuilderBuild(t, testCase.metricsDataType, testCase.expectedError)
+            testMetricsFromDataPointBuilderBuild(t, testCase.metricsDataType)
         })
     }
 }
 
-func testMetricsFromDataPointBuilderBuild(t *testing.T, metricDataType pmetric.MetricType, expectedError error) {
+func testMetricsFromDataPointBuilderBuild(t *testing.T, metricDataType pmetric.MetricType) {
     filterResolver := &mockItemFilterResolver{}
     dataForTesting := generateTestData(metricDataType)
     builder := &metricsFromDataPointBuilder{filterResolver: filterResolver}
-    defer executeMockedShutdown(t, builder, filterResolver, expectedError)
+    defer executeMockedShutdown(t, builder, filterResolver, nil)
     expectedGroupingKeysByMetricName := make(map[string]MetricsDataPointKey, len(dataForTesting.expectedGroupingKeys))
 
     for _, expectedGroupingKey := range dataForTesting.expectedGroupingKeys {
         expectedGroupingKeysByMetricName[expectedGroupingKey.MetricName] = expectedGroupingKey
     }
 
-    if expectedError != nil {
-        filterResolver.On("Resolve").Return(errorFilter{}, nil)
-    } else {
-        filterResolver.On("Resolve").Return(filter.NewNopItemCardinalityFilter(), nil)
-    }
+    filterResolver.On("Resolve").Return(filter.NewNopItemCardinalityFilter(), nil)
 
     metric, err := builder.Build(dataForTesting.dataPoints)
 
     filterResolver.AssertExpectations(t)
 
-    if expectedError != nil {
-        require.Error(t, err)
-        return
-    }
     require.NoError(t, err)
 
     assert.Equal(t, len(dataForTesting.dataPoints), metric.DataPointCount())
@@ -176,11 +148,7 @@ func TestMetricsFromDataPointBuilder_GroupAndFilter(t *testing.T) {
             defer executeMockedShutdown(t, builder, filterResolver, testCase.expectedError)
             dataForTesting := generateTestData(metricDataType)
 
-            if testCase.expectedError != nil {
-                filterResolver.On("Resolve").Return(errorFilter{}, nil)
-            } else {
-                filterResolver.On("Resolve").Return(filter.NewNopItemCardinalityFilter(), testCase.expectedError)
-            }
+            filterResolver.On("Resolve").Return(filter.NewNopItemCardinalityFilter(), testCase.expectedError)
 
             groupedDataPoints, err := builder.groupAndFilter(dataForTesting.dataPoints)
 
@@ -239,11 +207,7 @@ func TestMetricsFromDataPointBuilder_Filter(t *testing.T) {
             }
             defer executeMockedShutdown(t, builder, filterResolver, testCase.expectedError)
 
-            if testCase.expectedError != nil {
-                filterResolver.On("Resolve").Return(errorFilter{}, testCase.expectedError)
-            } else {
-                filterResolver.On("Resolve").Return(filter.NewNopItemCardinalityFilter(), testCase.expectedError)
-            }
+            filterResolver.On("Resolve").Return(filter.NewNopItemCardinalityFilter(), testCase.expectedError)
 
             filteredDataPoints, err := builder.filter(metricName1, dataForTesting.dataPoints)
 
