Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update module github.com/ReneKroon/ttlcache/v2 to v3 #37524

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion receiver/googlecloudspannerreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.0

require (
cloud.google.com/go/spanner v1.73.0
github.com/ReneKroon/ttlcache/v2 v2.11.0
github.com/jellydator/ttlcache/v3 v3.3.0
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.118.1-0.20250123125445-24f88da7b583
Expand Down
7 changes: 2 additions & 5 deletions receiver/googlecloudspannerreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package filter // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/filter"

import (
"errors"
"fmt"
"sort"
"sync"
"time"

"github.com/ReneKroon/ttlcache/v2"
"github.com/jellydator/ttlcache/v3"
"go.uber.org/zap"
)

Expand All @@ -19,7 +19,7 @@ type Item struct {
}

type ItemFilter interface {
Filter(source []*Item) ([]*Item, error)
Filter(source []*Item) []*Item
Shutdown() error
TotalLimit() int
LimitByTimestamp() int
Expand All @@ -36,7 +36,8 @@ type itemCardinalityFilter struct {
limitByTimestamp int
itemActivityPeriod time.Duration
logger *zap.Logger
cache *ttlcache.Cache
cache *ttlcache.Cache[string, struct{}]
stopOnce sync.Once
}

type currentLimitByTimestamp struct {
Expand All @@ -58,10 +59,11 @@ func NewItemCardinalityFilter(metricName string, totalLimit int, limitByTimestam
return nil, fmt.Errorf("total limit %q is lower or equal to limit by timestamp %q", totalLimit, limitByTimestamp)
}

cache := ttlcache.NewCache()

cache.SetCacheSizeLimit(totalLimit)
cache.SkipTTLExtensionOnHit(true)
cache := ttlcache.New[string, struct{}](
ttlcache.WithCapacity[string, struct{}](uint64(totalLimit)),
ttlcache.WithDisableTouchOnHit[string, struct{}](),
)
go cache.Start()

return &itemCardinalityFilter{
metricName: metricName,
Expand All @@ -81,72 +83,58 @@ func (f *itemCardinalityFilter) LimitByTimestamp() int {
return f.limitByTimestamp
}

func (f *itemCardinalityFilter) Filter(sourceItems []*Item) ([]*Item, error) {
func (f *itemCardinalityFilter) Filter(sourceItems []*Item) []*Item {
var filteredItems []*Item
groupedItems := groupByTimestamp(sourceItems)
sortedItemKeys := sortedKeys(groupedItems)

for _, key := range sortedItemKeys {
filteredGroupedItems, err := f.filterItems(groupedItems[key])
if err != nil {
return nil, err
}

filteredItems = append(filteredItems, filteredGroupedItems...)
filteredItems = append(filteredItems, f.filterItems(groupedItems[key])...)
}

return filteredItems, nil
return filteredItems
}

func (f *itemCardinalityFilter) filterItems(items []*Item) ([]*Item, error) {
func (f *itemCardinalityFilter) filterItems(items []*Item) []*Item {
limit := currentLimitByTimestamp{
limitByTimestamp: f.limitByTimestamp,
}

var filteredItems []*Item
for _, item := range items {
if included, err := f.includeItem(item, &limit); err != nil {
return nil, err
} else if included {
if f.includeItem(item, &limit) {
filteredItems = append(filteredItems, item)
}
}

return filteredItems, nil
return filteredItems
}

func (f *itemCardinalityFilter) includeItem(item *Item, limit *currentLimitByTimestamp) (bool, error) {
if _, err := f.cache.Get(item.SeriesKey); err == nil {
return true, nil
} else if !errors.Is(err, ttlcache.ErrNotFound) {
return false, err
func (f *itemCardinalityFilter) includeItem(item *Item, limit *currentLimitByTimestamp) bool {
if f.cache.Get(item.SeriesKey) != nil {
return true
}

if !f.canIncludeNewItem(limit.get()) {
f.logger.Debug("Skip item", zap.String("seriesKey", item.SeriesKey), zap.Time("timestamp", item.Timestamp))
return false, nil
return false
}

if err := f.cache.SetWithTTL(item.SeriesKey, struct{}{}, f.itemActivityPeriod); err != nil {
if errors.Is(err, ttlcache.ErrClosed) {
err = fmt.Errorf("set item from cache failed for metric %q because cache has been already closed: %w", f.metricName, err)
}
return false, err
}
_ = f.cache.Set(item.SeriesKey, struct{}{}, f.itemActivityPeriod)

f.logger.Debug("Added item to cache", zap.String("seriesKey", item.SeriesKey), zap.Time("timestamp", item.Timestamp))

limit.dec()

return true, nil
return true
}

func (f *itemCardinalityFilter) canIncludeNewItem(currentLimitByTimestamp int) bool {
return f.cache.Count() < f.totalLimit && currentLimitByTimestamp > 0
return f.cache.Len() < f.totalLimit && currentLimitByTimestamp > 0
}

func (f *itemCardinalityFilter) Shutdown() error {
return f.cache.Close()
f.stopOnce.Do(func() { f.cache.Stop() })
return nil
}

func groupByTimestamp(items []*Item) map[time.Time][]*Item {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package filter

import (
"context"
"runtime"
"testing"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -82,8 +84,8 @@ func TestItemCardinalityFilter_CanIncludeNewItem(t *testing.T) {
defer executeShutdown(t, filterCasted)

for _, key := range testCase.keysAlreadyInCache {
err = filterCasted.cache.Set(key, byte(1))
require.NoError(t, err)
item := filterCasted.cache.Set(key, struct{}{}, time.Duration(0))
assert.NotNil(t, item)
}

assert.Equal(t, testCase.expectedResult, filterCasted.canIncludeNewItem(testCase.limitByTimestamp))
Expand All @@ -93,33 +95,12 @@ func TestItemCardinalityFilter_CanIncludeNewItem(t *testing.T) {

func TestItemCardinalityFilter_Shutdown(t *testing.T) {
logger := zaptest.NewLogger(t)
testCases := map[string]struct {
closeCache bool
expectError bool
}{
"Happy path": {false, false},
"Cache has been closed": {true, true},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
filter, err := NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)
filterCasted := filter.(*itemCardinalityFilter)

if testCase.closeCache {
// Covering case when by some reasons cache is closed
err = filterCasted.cache.Close()
require.NoError(t, err)
}
filter, err := NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)

if testCase.expectError {
require.Error(t, filter.Shutdown())
} else {
require.NoError(t, filter.Shutdown())
}
})
}
// Ensure shutdown is safe to be called multiple times.
require.NoError(t, filter.Shutdown())
require.NoError(t, filter.Shutdown())
}

func TestItemCardinalityFilter_Filter(t *testing.T) {
Expand All @@ -133,24 +114,22 @@ func TestItemCardinalityFilter_Filter(t *testing.T) {
filterCasted := filter.(*itemCardinalityFilter)
defer executeShutdown(t, filterCasted)

filteredItems, err := filter.Filter(items)
require.NoError(t, err)
filteredItems := filter.Filter(items)

// Items with key3 and key6 must be not present in filtered items
assertInitialFiltering(t, expectedFilteredInitialItems(t), filteredItems)

items = additionalTestData(t)
filteredItems, err = filter.Filter(items)
require.NoError(t, err)
filteredItems = filter.Filter(items)

// Cache timeout hasn't been reached, so filtered out all items
assert.Empty(t, filteredItems)

// Doing this to avoid relying on timeouts and sleeps (avoids potentially flaky tests)
syncChannel := make(chan bool, 10)

filterCasted.cache.SetExpirationCallback(func(string, any) {
if filterCasted.cache.Count() > 0 {
filterCasted.cache.OnEviction(func(context.Context, ttlcache.EvictionReason, *ttlcache.Item[string, struct{}]) {
if filterCasted.cache.Len() > 0 {
// Waiting until cache is really empty - all items are expired
return
}
Expand All @@ -159,23 +138,10 @@ func TestItemCardinalityFilter_Filter(t *testing.T) {

<-syncChannel

filterCasted.cache.SetExpirationCallback(nil)

filteredItems, err = filter.Filter(items)
require.NoError(t, err)
filteredItems = filter.Filter(items)

// All entries expired, nothing should be filtered out from items
assertInitialFiltering(t, items, filteredItems)

// Test filtering when cache was closed
filter, err = NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)
require.NoError(t, filter.Shutdown())

filteredItems, err = filter.Filter(items)

require.Error(t, err)
require.Nil(t, filteredItems)
}

func TestItemCardinalityFilter_FilterItems(t *testing.T) {
Expand All @@ -189,29 +155,26 @@ func TestItemCardinalityFilter_FilterItems(t *testing.T) {
filterCasted := filter.(*itemCardinalityFilter)
defer executeShutdown(t, filterCasted)

filteredItems, err := filterCasted.filterItems(items)
require.NoError(t, err)
filteredItems := filterCasted.filterItems(items)

// Items with key1 and key2 must be not present in filtered items
assertInitialFiltering(t, expectedFilteredInitialItemsWithSameTimestamp(t), filteredItems)

// 2 new and 2 existing items must be present in filtered items
filteredItems, err = filterCasted.filterItems(items)
require.NoError(t, err)
filteredItems = filterCasted.filterItems(items)

assert.Len(t, filteredItems, totalLimit)

filteredItems, err = filter.Filter(items)
require.NoError(t, err)
filteredItems = filter.Filter(items)

// Cache timeout hasn't been reached, so no more new items expected
assert.Len(t, filteredItems, totalLimit)

// Doing this to avoid relying on timeouts and sleeps (avoids potentially flaky tests)
syncChannel := make(chan bool, 10)

filterCasted.cache.SetExpirationCallback(func(string, any) {
if filterCasted.cache.Count() > 0 {
filterCasted.cache.OnEviction(func(context.Context, ttlcache.EvictionReason, *ttlcache.Item[string, struct{}]) {
if filterCasted.cache.Len() > 0 {
// Waiting until cache is really empty - all items are expired
return
}
Expand All @@ -220,23 +183,10 @@ func TestItemCardinalityFilter_FilterItems(t *testing.T) {

<-syncChannel

filterCasted.cache.SetExpirationCallback(nil)

filteredItems, err = filter.Filter(items)
require.NoError(t, err)
filteredItems = filter.Filter(items)

// All entries expired, same picture as on first case
assertInitialFiltering(t, expectedFilteredInitialItemsWithSameTimestamp(t), filteredItems)

// Test filtering when cache was closed
filter, err = NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)
require.NoError(t, filter.Shutdown())

filteredItems, err = filter.Filter(items)

require.Error(t, err)
require.Nil(t, filteredItems)
}

func TestItemCardinalityFilter_IncludeItem(t *testing.T) {
Expand All @@ -255,27 +205,15 @@ func TestItemCardinalityFilter_IncludeItem(t *testing.T) {
limitByTimestamp: 1,
}

result, err := filterCasted.includeItem(item1, timestampLimiter)
require.NoError(t, err)
result := filterCasted.includeItem(item1, timestampLimiter)
assert.True(t, result)

// Item already exists in cache
result, err = filterCasted.includeItem(item1, timestampLimiter)
require.NoError(t, err)
result = filterCasted.includeItem(item1, timestampLimiter)
assert.True(t, result)

// Limit by timestamp reached
result, err = filterCasted.includeItem(item2, timestampLimiter)
require.NoError(t, err)
assert.False(t, result)

// Test with closed cache - do not need to execute shutdown in this case
filter, err = NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)
filterCasted = filter.(*itemCardinalityFilter)
require.NoError(t, filterCasted.cache.Close())
result, err = filterCasted.includeItem(item1, timestampLimiter)
require.Error(t, err)
result = filterCasted.includeItem(item2, timestampLimiter)
assert.False(t, result)
}

Expand Down
Loading