Skip to content

Commit

Permalink
adapt to breaking changes in ttlcache
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Jan 28, 2025
1 parent 007deca commit 2c0a0b9
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 89 deletions.
2 changes: 1 addition & 1 deletion receiver/googlecloudspannerreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.0

require (
cloud.google.com/go/spanner v1.73.0
github.com/ReneKroon/ttlcache/v3 v3.3.0
github.com/jellydator/ttlcache/v3 v3.3.0
github.com/mitchellh/hashstructure v1.1.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.118.1-0.20250123125445-24f88da7b583
Expand Down
7 changes: 2 additions & 5 deletions receiver/googlecloudspannerreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package filter // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/filter"

import (
"errors"
"fmt"
"sort"
"sync"
"time"

"github.com/ReneKroon/ttlcache/v2"
"github.com/jellydator/ttlcache/v3"
"go.uber.org/zap"
)

Expand All @@ -36,7 +36,8 @@ type itemCardinalityFilter struct {
limitByTimestamp int
itemActivityPeriod time.Duration
logger *zap.Logger
cache *ttlcache.Cache
cache *ttlcache.Cache[string, struct{}]
stopOnce sync.Once
}

type currentLimitByTimestamp struct {
Expand All @@ -58,10 +59,11 @@ func NewItemCardinalityFilter(metricName string, totalLimit int, limitByTimestam
return nil, fmt.Errorf("total limit %q is lower or equal to limit by timestamp %q", totalLimit, limitByTimestamp)
}

cache := ttlcache.NewCache()

cache.SetCacheSizeLimit(totalLimit)
cache.SkipTTLExtensionOnHit(true)
cache := ttlcache.New[string, struct{}](
ttlcache.WithCapacity[string, struct{}](uint64(totalLimit)),
ttlcache.WithDisableTouchOnHit[string, struct{}](),
)
go cache.Start()

return &itemCardinalityFilter{
metricName: metricName,
Expand Down Expand Up @@ -116,23 +118,15 @@ func (f *itemCardinalityFilter) filterItems(items []*Item) ([]*Item, error) {
}

func (f *itemCardinalityFilter) includeItem(item *Item, limit *currentLimitByTimestamp) (bool, error) {
if _, err := f.cache.Get(item.SeriesKey); err == nil {
if f.cache.Get(item.SeriesKey) != nil {
return true, nil
} else if !errors.Is(err, ttlcache.ErrNotFound) {
return false, err
}

if !f.canIncludeNewItem(limit.get()) {
f.logger.Debug("Skip item", zap.String("seriesKey", item.SeriesKey), zap.Time("timestamp", item.Timestamp))
return false, nil
}

if err := f.cache.SetWithTTL(item.SeriesKey, struct{}{}, f.itemActivityPeriod); err != nil {
if errors.Is(err, ttlcache.ErrClosed) {
err = fmt.Errorf("set item from cache failed for metric %q because cache has been already closed: %w", f.metricName, err)
}
return false, err
}
_ = f.cache.Set(item.SeriesKey, struct{}{}, f.itemActivityPeriod)

f.logger.Debug("Added item to cache", zap.String("seriesKey", item.SeriesKey), zap.Time("timestamp", item.Timestamp))

Expand All @@ -142,11 +136,12 @@ func (f *itemCardinalityFilter) includeItem(item *Item, limit *currentLimitByTim
}

func (f *itemCardinalityFilter) canIncludeNewItem(currentLimitByTimestamp int) bool {
return f.cache.Count() < f.totalLimit && currentLimitByTimestamp > 0
return f.cache.Len() < f.totalLimit && currentLimitByTimestamp > 0
}

func (f *itemCardinalityFilter) Shutdown() error {
return f.cache.Close()
f.stopOnce.Do(func() { f.cache.Stop() })
return nil
}

func groupByTimestamp(items []*Item) map[time.Time][]*Item {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package filter

import (
"context"
"runtime"
"testing"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -82,8 +84,8 @@ func TestItemCardinalityFilter_CanIncludeNewItem(t *testing.T) {
defer executeShutdown(t, filterCasted)

for _, key := range testCase.keysAlreadyInCache {
err = filterCasted.cache.Set(key, byte(1))
require.NoError(t, err)
item := filterCasted.cache.Set(key, struct{}{}, time.Duration(0))
assert.NotNil(t, item)
}

assert.Equal(t, testCase.expectedResult, filterCasted.canIncludeNewItem(testCase.limitByTimestamp))
Expand All @@ -93,33 +95,12 @@ func TestItemCardinalityFilter_CanIncludeNewItem(t *testing.T) {

func TestItemCardinalityFilter_Shutdown(t *testing.T) {
logger := zaptest.NewLogger(t)
testCases := map[string]struct {
closeCache bool
expectError bool
}{
"Happy path": {false, false},
"Cache has been closed": {true, true},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
filter, err := NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)
filterCasted := filter.(*itemCardinalityFilter)

if testCase.closeCache {
// Covering case when by some reasons cache is closed
err = filterCasted.cache.Close()
require.NoError(t, err)
}
filter, err := NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)

if testCase.expectError {
require.Error(t, filter.Shutdown())
} else {
require.NoError(t, filter.Shutdown())
}
})
}
// Ensure shutdown is safe to be called multiple times.
require.NoError(t, filter.Shutdown())
require.NoError(t, filter.Shutdown())
}

func TestItemCardinalityFilter_Filter(t *testing.T) {
Expand Down Expand Up @@ -149,8 +130,8 @@ func TestItemCardinalityFilter_Filter(t *testing.T) {
// Doing this to avoid of relying on timeouts and sleeps(avoid potential flaky tests)
syncChannel := make(chan bool, 10)

filterCasted.cache.SetExpirationCallback(func(string, any) {
if filterCasted.cache.Count() > 0 {
filterCasted.cache.OnEviction(func(context.Context, ttlcache.EvictionReason, *ttlcache.Item[string, struct{}]) {
if filterCasted.cache.Len() > 0 {
// Waiting until cache is really empty - all items are expired
return
}
Expand All @@ -159,23 +140,13 @@ func TestItemCardinalityFilter_Filter(t *testing.T) {

<-syncChannel

filterCasted.cache.SetExpirationCallback(nil)
filterCasted.cache.OnEviction(nil)

filteredItems, err = filter.Filter(items)
require.NoError(t, err)

// All entries expired, nothing should be filtered out from items
assertInitialFiltering(t, items, filteredItems)

// Test filtering when cache was closed
filter, err = NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)
require.NoError(t, filter.Shutdown())

filteredItems, err = filter.Filter(items)

require.Error(t, err)
require.Nil(t, filteredItems)
}

func TestItemCardinalityFilter_FilterItems(t *testing.T) {
Expand Down Expand Up @@ -210,8 +181,8 @@ func TestItemCardinalityFilter_FilterItems(t *testing.T) {
// Doing this to avoid of relying on timeouts and sleeps(avoid potential flaky tests)
syncChannel := make(chan bool, 10)

filterCasted.cache.SetExpirationCallback(func(string, any) {
if filterCasted.cache.Count() > 0 {
filterCasted.cache.OnEviction(func(context.Context, ttlcache.EvictionReason, *ttlcache.Item[string, struct{}]) {
if filterCasted.cache.Len() > 0 {
// Waiting until cache is really empty - all items are expired
return
}
Expand All @@ -220,23 +191,13 @@ func TestItemCardinalityFilter_FilterItems(t *testing.T) {

<-syncChannel

filterCasted.cache.SetExpirationCallback(nil)
filterCasted.cache.OnEviction(nil)

filteredItems, err = filter.Filter(items)
require.NoError(t, err)

// All entries expired, same picture as on first case
assertInitialFiltering(t, expectedFilteredInitialItemsWithSameTimestamp(t), filteredItems)

// Test filtering when cache was closed
filter, err = NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)
require.NoError(t, filter.Shutdown())

filteredItems, err = filter.Filter(items)

require.Error(t, err)
require.Nil(t, filteredItems)
}

func TestItemCardinalityFilter_IncludeItem(t *testing.T) {
Expand Down Expand Up @@ -268,15 +229,6 @@ func TestItemCardinalityFilter_IncludeItem(t *testing.T) {
result, err = filterCasted.includeItem(item2, timestampLimiter)
require.NoError(t, err)
assert.False(t, result)

// Test with closed cache - do not need to execute shutdown in this case
filter, err = NewItemCardinalityFilter(metricName, totalLimit, limitByTimestamp, itemActivityPeriod, logger)
require.NoError(t, err)
filterCasted = filter.(*itemCardinalityFilter)
require.NoError(t, filterCasted.cache.Close())
result, err = filterCasted.includeItem(item1, timestampLimiter)
require.Error(t, err)
assert.False(t, result)
}

func TestGroupByTimestamp(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ import (
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/ReneKroon/ttlcache/v2.(*Cache).checkExpirationCallback"))
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/jellydator/ttlcache/v3.(*Cache).checkExpirationCallback"))
}

0 comments on commit 2c0a0b9

Please sign in to comment.