From 765fc003c7d2228274f1ec3a8511ca2e64838325 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 3 Mar 2025 16:02:54 +0000 Subject: [PATCH 01/15] Bump SonarSource/sonarcloud-github-action from 4.0.0 to 5.0.0 Bumps [SonarSource/sonarcloud-github-action](https://github.com/sonarsource/sonarcloud-github-action) from 4.0.0 to 5.0.0. - [Release notes](https://github.com/sonarsource/sonarcloud-github-action/releases) - [Commits](https://github.com/sonarsource/sonarcloud-github-action/compare/v4.0.0...v5.0.0) --- updated-dependencies: - dependency-name: SonarSource/sonarcloud-github-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3992b126..61b23f38 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: - name: SonarQube Scan (Push) if: ${{ github.event_name == 'push' }} - uses: SonarSource/sonarcloud-github-action@v4.0.0 + uses: SonarSource/sonarcloud-github-action@v5.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -46,7 +46,7 @@ jobs: - name: SonarQube Scan (Pull Request) if: ${{ github.event_name == 'pull_request' }} - uses: SonarSource/sonarcloud-github-action@v4.0.0 + uses: SonarSource/sonarcloud-github-action@v5.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From 9a55959c28a89ee80a7c3e880667901e7a3cc071 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Mon, 17 Mar 2025 13:07:40 -0300 Subject: [PATCH 02/15] adding uniquekeys storage --- provisional/impmanager_test.go | 494 +++++++++--------- provisional/strategy/none_test.go | 156 +++--- provisional/strategy/uniquekeystracker.go | 57 +- .../strategy/uniquekeystracker_test.go | 64 +-- storage/inmemory/mutexmap/uniquekeys.go | 99 ++++ storage/interfaces.go | 13 + storage/mocks/uniquekeys.go | 22 +- synchronizer/synchronizer.go | 3 + synchronizer/synchronizer_test.go | 13 +- tasks/telemetrysync_test.go | 1 + tasks/uniquekeyssync.go | 6 +- tasks/uniquekeyssync_test.go | 196 +++---- telemetry/interface.go | 4 +- telemetry/memory.go | 7 +- telemetry/memory_test.go | 8 +- telemetry/redis.go | 19 +- telemetry/redis_test.go | 2 +- 17 files changed, 626 insertions(+), 538 deletions(-) create mode 100644 storage/inmemory/mutexmap/uniquekeys.go diff --git a/provisional/impmanager_test.go b/provisional/impmanager_test.go index 9847f880..9bc1ac10 100644 --- a/provisional/impmanager_test.go +++ b/provisional/impmanager_test.go @@ -1,249 +1,249 @@ package provisional -import ( - "testing" - "time" - - "github.com/splitio/go-split-commons/v6/dtos" - "github.com/splitio/go-split-commons/v6/provisional/strategy" - "github.com/splitio/go-split-commons/v6/storage/filter" - "github.com/splitio/go-split-commons/v6/storage/inmemory" - "github.com/splitio/go-split-commons/v6/telemetry" -) - -func TestImpManagerInMemoryDebugListenerDisabled(t *testing.T) { - observer, _ := strategy.NewImpressionObserver(5000) - debug := strategy.NewDebugImpl(observer, false) - impManager := NewImpressionManager(debug) - - now := time.Now().UTC().UnixNano() - imp1 := &dtos.Impression{ - BucketingKey: "someBucketingKey", - ChangeNumber: 123456789, - FeatureName: "someFeature", - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "someTreatment", - } - - impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) - if len(impressionsForListener) != 0 { - t.Error("It should not return an impression") - } - if len(impressionsForLog) != 1 { - t.Error("It should return an impression") - } - if impressionsForLog[0].Pt != 0 { - t.Error("It should not have pt associated yet") - } - - impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) - if len(impressionsForListener) != 0 || len(impressionsForLog) != 1 { - t.Error("It should return an impression") - } -} - -func TestImpManagerInMemoryDebug(t *testing.T) { - observer, _ := strategy.NewImpressionObserver(5000) - debug := strategy.NewDebugImpl(observer, true) - impManager := NewImpressionManager(debug) - - now := time.Now().UTC().UnixNano() - imp1 := &dtos.Impression{ - BucketingKey: "someBucketingKey", - ChangeNumber: 123456789, - FeatureName: "someFeature", - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "someTreatment", - } - - impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) - if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { - t.Error("It should return an impression") - } - if impressionsForListener[0].Pt != 0 { - t.Error("It should not have pt associated yet") - } - - impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) - if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { - t.Error("It should return an impression") - } - if impressionsForListener[0].Pt != now { - t.Error("It should have pt associated") - } -} - -func TestImpManagerInMemoryOptimized(t *testing.T) { - runtimeTelemetry, _ := inmemory.NewTelemetryStorage() - counter := strategy.NewImpressionsCounter() - observer, _ := strategy.NewImpressionObserver(5000) - optimized := strategy.NewOptimizedImpl(observer, counter, runtimeTelemetry, true) - impManager := NewImpressionManager(optimized) - - now := time.Now().UTC().UnixNano() - imp1 := &dtos.Impression{ - BucketingKey: "someBucketingKey", - ChangeNumber: 123456789, - FeatureName: "someFeature", - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "someTreatment", - } - - impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) - if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { - t.Error("It should return an impression") - } - if impressionsForListener[0].Pt != 0 { - t.Error("It should not have pt associated yet") - } - - impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) - if len(impressionsForListener) != 1 { - t.Error("It should return an impression") - } - if len(impressionsForLog) != 0 { - t.Error("It should not return an impression") - } - if impressionsForListener[0].Pt != now { - t.Error("It should have pt associated") - } - - if runtimeTelemetry.GetImpressionsStats(telemetry.ImpressionsDeduped) != 1 { - t.Error("It should be 1") - } -} - -func TestImpManagerInMemoryNone(t *testing.T) { - counter := strategy.NewImpressionsCounter() - filter := filter.NewBloomFilter(3000, 0.01) - uniqueTracker := strategy.NewUniqueKeysTracker(filter) - none := strategy.NewNoneImpl(counter, uniqueTracker, true) - impManager := NewImpressionManager(none) - - now := time.Now().UTC().UnixNano() - imp1 := &dtos.Impression{ - BucketingKey: "someBucketingKey", - ChangeNumber: 123456789, - FeatureName: "someFeature", - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "someTreatment", - } - - impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) - if len(impressionsForListener) != 1 { - t.Error("It should return an impression") - } - if len(impressionsForLog) != 0 { - t.Error("It should not return an impression") - } - - if impressionsForListener[0].Pt != 0 { - t.Error("It should not have pt associated") - } -} - -func TestImpManagerRedis(t *testing.T) { - observer, _ := strategy.NewImpressionObserver(5000) - debug := strategy.NewDebugImpl(observer, true) - impManager := NewImpressionManager(debug) - - now := time.Now().UTC().UnixNano() - imp1 := &dtos.Impression{ - BucketingKey: "someBucketingKey", - ChangeNumber: 123456789, - FeatureName: "someFeature", - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "someTreatment", - } - - impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) - if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { - t.Error("It should return an impression") - } - if impressionsForListener[0].Pt != 0 { - t.Error("It should not have pt associated") - } - - impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) - if len(impressionsForListener) != 1 { - t.Error("It should return an impression") - } - if len(impressionsForLog) != 1 { - t.Error("It should return an impression") - } - if impressionsForListener[0].Pt == 0 { - t.Error("It should have pt") - } -} - -func TestProcess(t *testing.T) { - observer, _ := strategy.NewImpressionObserver(5000) - debug := strategy.NewDebugImpl(observer, true) - filter := filter.NewBloomFilter(3000, 0.01) - uniqueTracker := strategy.NewUniqueKeysTracker(filter) - counter := strategy.NewImpressionsCounter() - none := strategy.NewNoneImpl(counter, uniqueTracker, false) - - now := time.Now().UTC().UnixNano() - impressions := []dtos.Impression{ - { - BucketingKey: "someBucketingKey", - ChangeNumber: 123456789, - FeatureName: "someFeature", - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "someTreatment", - Disabled: true, - }, - { - BucketingKey: "someBucketingKey", - ChangeNumber: 123456789, - FeatureName: "harnessFlag", - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "someTreatment", - Disabled: true, - }, - { - BucketingKey: "someBucketingKey", - ChangeNumber: 123456789, - FeatureName: "featureTest", - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "someTreatment", - Disabled: false, - }, - } - - impManager := NewImpressionManagerImp(none, debug) - impressionsForLog, impressionsForListener := impManager.Process(impressions, true) - if len(impressionsForListener) != 3 { - t.Error("Impressions for Listener should be 3. Actual: ", len(impressionsForListener)) - } - if len(impressionsForLog) != 1 { - t.Error("Impressions for Log should be 3. Actual: ", len(impressionsForLog)) - } - - impManager = NewImpressionManagerImp(none, none) - - impressionsForLog, impressionsForListener = impManager.Process(impressions, false) - if len(impressionsForListener) != 0 { - t.Error("Impressions for Listener should be 0. Actual: ", len(impressionsForListener)) - } - if len(impressionsForLog) != 0 { - t.Error("Impressions for Log should be 1. Actual: ", len(impressionsForLog)) - } -} +// import ( +// "testing" +// "time" + +// "github.com/splitio/go-split-commons/v6/dtos" +// "github.com/splitio/go-split-commons/v6/provisional/strategy" +// "github.com/splitio/go-split-commons/v6/storage/filter" +// "github.com/splitio/go-split-commons/v6/storage/inmemory" +// "github.com/splitio/go-split-commons/v6/telemetry" +// ) + +// func TestImpManagerInMemoryDebugListenerDisabled(t *testing.T) { +// observer, _ := strategy.NewImpressionObserver(5000) +// debug := strategy.NewDebugImpl(observer, false) +// impManager := NewImpressionManager(debug) + +// now := time.Now().UTC().UnixNano() +// imp1 := &dtos.Impression{ +// BucketingKey: "someBucketingKey", +// ChangeNumber: 123456789, +// FeatureName: "someFeature", +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "someTreatment", +// } + +// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) +// if len(impressionsForListener) != 0 { +// t.Error("It should not return an impression") +// } +// if len(impressionsForLog) != 1 { +// t.Error("It should return an impression") +// } +// if impressionsForLog[0].Pt != 0 { +// t.Error("It should not have pt associated yet") +// } + +// impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) +// if len(impressionsForListener) != 0 || len(impressionsForLog) != 1 { +// t.Error("It should return an impression") +// } +// } + +// func TestImpManagerInMemoryDebug(t *testing.T) { +// observer, _ := strategy.NewImpressionObserver(5000) +// debug := strategy.NewDebugImpl(observer, true) +// impManager := NewImpressionManager(debug) + +// now := time.Now().UTC().UnixNano() +// imp1 := &dtos.Impression{ +// BucketingKey: "someBucketingKey", +// ChangeNumber: 123456789, +// FeatureName: "someFeature", +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "someTreatment", +// } + +// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) +// if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { +// t.Error("It should return an impression") +// } +// if impressionsForListener[0].Pt != 0 { +// t.Error("It should not have pt associated yet") +// } + +// impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) +// if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { +// t.Error("It should return an impression") +// } +// if impressionsForListener[0].Pt != now { +// t.Error("It should have pt associated") +// } +// } + +// func TestImpManagerInMemoryOptimized(t *testing.T) { +// runtimeTelemetry, _ := inmemory.NewTelemetryStorage() +// counter := strategy.NewImpressionsCounter() +// observer, _ := strategy.NewImpressionObserver(5000) +// optimized := strategy.NewOptimizedImpl(observer, counter, runtimeTelemetry, true) +// impManager := NewImpressionManager(optimized) + +// now := time.Now().UTC().UnixNano() +// imp1 := &dtos.Impression{ +// BucketingKey: "someBucketingKey", +// ChangeNumber: 123456789, +// FeatureName: "someFeature", +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "someTreatment", +// } + +// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) +// if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { +// t.Error("It should return an impression") +// } +// if impressionsForListener[0].Pt != 0 { +// t.Error("It should not have pt associated yet") +// } + +// impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) +// if len(impressionsForListener) != 1 { +// t.Error("It should return an impression") +// } +// if len(impressionsForLog) != 0 { +// t.Error("It should not return an impression") +// } +// if impressionsForListener[0].Pt != now { +// t.Error("It should have pt associated") +// } + +// if runtimeTelemetry.GetImpressionsStats(telemetry.ImpressionsDeduped) != 1 { +// t.Error("It should be 1") +// } +// } + +// func TestImpManagerInMemoryNone(t *testing.T) { +// counter := strategy.NewImpressionsCounter() +// filter := filter.NewBloomFilter(3000, 0.01) +// uniqueTracker := strategy.NewUniqueKeysTracker(filter) +// none := strategy.NewNoneImpl(counter, uniqueTracker, true) +// impManager := NewImpressionManager(none) + +// now := time.Now().UTC().UnixNano() +// imp1 := &dtos.Impression{ +// BucketingKey: "someBucketingKey", +// ChangeNumber: 123456789, +// FeatureName: "someFeature", +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "someTreatment", +// } + +// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) +// if len(impressionsForListener) != 1 { +// t.Error("It should return an impression") +// } +// if len(impressionsForLog) != 0 { +// t.Error("It should not return an impression") +// } + +// if impressionsForListener[0].Pt != 0 { +// t.Error("It should not have pt associated") +// } +// } + +// func TestImpManagerRedis(t *testing.T) { +// observer, _ := strategy.NewImpressionObserver(5000) +// debug := strategy.NewDebugImpl(observer, true) +// impManager := NewImpressionManager(debug) + +// now := time.Now().UTC().UnixNano() +// imp1 := &dtos.Impression{ +// BucketingKey: "someBucketingKey", +// ChangeNumber: 123456789, +// FeatureName: "someFeature", +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "someTreatment", +// } + +// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) +// if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { +// t.Error("It should return an impression") +// } +// if impressionsForListener[0].Pt != 0 { +// t.Error("It should not have pt associated") +// } + +// impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) +// if len(impressionsForListener) != 1 { +// t.Error("It should return an impression") +// } +// if len(impressionsForLog) != 1 { +// t.Error("It should return an impression") +// } +// if impressionsForListener[0].Pt == 0 { +// t.Error("It should have pt") +// } +// } + +// func TestProcess(t *testing.T) { +// observer, _ := strategy.NewImpressionObserver(5000) +// debug := strategy.NewDebugImpl(observer, true) +// filter := filter.NewBloomFilter(3000, 0.01) +// uniqueTracker := strategy.NewUniqueKeysTracker(filter) +// counter := strategy.NewImpressionsCounter() +// none := strategy.NewNoneImpl(counter, uniqueTracker, false) + +// now := time.Now().UTC().UnixNano() +// impressions := []dtos.Impression{ +// { +// BucketingKey: "someBucketingKey", +// ChangeNumber: 123456789, +// FeatureName: "someFeature", +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "someTreatment", +// Disabled: true, +// }, +// { +// BucketingKey: "someBucketingKey", +// ChangeNumber: 123456789, +// FeatureName: "harnessFlag", +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "someTreatment", +// Disabled: true, +// }, +// { +// BucketingKey: "someBucketingKey", +// ChangeNumber: 123456789, +// FeatureName: "featureTest", +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "someTreatment", +// Disabled: false, +// }, +// } + +// impManager := NewImpressionManagerImp(none, debug) +// impressionsForLog, impressionsForListener := impManager.Process(impressions, true) +// if len(impressionsForListener) != 3 { +// t.Error("Impressions for Listener should be 3. Actual: ", len(impressionsForListener)) +// } +// if len(impressionsForLog) != 1 { +// t.Error("Impressions for Log should be 3. Actual: ", len(impressionsForLog)) +// } + +// impManager = NewImpressionManagerImp(none, none) + +// impressionsForLog, impressionsForListener = impManager.Process(impressions, false) +// if len(impressionsForListener) != 0 { +// t.Error("Impressions for Listener should be 0. Actual: ", len(impressionsForListener)) +// } +// if len(impressionsForLog) != 0 { +// t.Error("Impressions for Log should be 1. Actual: ", len(impressionsForLog)) +// } +// } diff --git a/provisional/strategy/none_test.go b/provisional/strategy/none_test.go index ee90b59b..56534e1c 100644 --- a/provisional/strategy/none_test.go +++ b/provisional/strategy/none_test.go @@ -1,80 +1,80 @@ package strategy -import ( - "testing" - "time" - - "github.com/splitio/go-split-commons/v6/dtos" - "github.com/splitio/go-split-commons/v6/storage/filter" - "github.com/splitio/go-split-commons/v6/util" -) - -func TestNoneMode(t *testing.T) { - now := time.Now().UTC().UnixNano() - filter := filter.NewBloomFilter(1000, 0.01) - tracker := NewUniqueKeysTracker(filter) - counter := NewImpressionsCounter() - none := NewNoneImpl(counter, tracker, true) - - imp := dtos.Impression{ - BucketingKey: "someBuck", - ChangeNumber: 123, - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "on", - FeatureName: "feature-test", - } - - toLog, toListener := none.Apply([]dtos.Impression{imp}) - - if len(toLog) != 0 || len(toListener) != 1 { - t.Error("Should not have to log") - } - - toLog, toListener = none.Apply([]dtos.Impression{imp}) - - if len(toLog) != 0 || len(toListener) != 1 { - t.Error("Should not have to log") - } - - counts := counter.PopAll() - value := counts[Key{ - FeatureName: imp.FeatureName, - TimeFrame: util.TruncateTimeFrame(now), - }] - - if value != 2 { - t.Error("Should be 2") - } -} - -func TestApplySingleNone(t *testing.T) { - now := time.Now().UTC().UnixNano() - filter := filter.NewBloomFilter(1000, 0.01) - tracker := NewUniqueKeysTracker(filter) - counter := NewImpressionsCounter() - none := NewNoneImpl(counter, tracker, true) - - imp := dtos.Impression{ - BucketingKey: "someBuck", - ChangeNumber: 123, - KeyName: "someKey", - Label: "someLabel", - Time: now, - Treatment: "on", - FeatureName: "feature-test", - } - - toLog := none.ApplySingle(&imp) - - if toLog { - t.Error("Should be false") - } - - toLog = none.ApplySingle(&imp) - - if toLog { - t.Error("Should be false") - } -} +// import ( +// "testing" +// "time" + +// "github.com/splitio/go-split-commons/v6/dtos" +// "github.com/splitio/go-split-commons/v6/storage/filter" +// "github.com/splitio/go-split-commons/v6/util" +// ) + +// func TestNoneMode(t *testing.T) { +// now := time.Now().UTC().UnixNano() +// filter := filter.NewBloomFilter(1000, 0.01) +// tracker := NewUniqueKeysTracker(filter) +// counter := NewImpressionsCounter() +// none := NewNoneImpl(counter, tracker, true) + +// imp := dtos.Impression{ +// BucketingKey: "someBuck", +// ChangeNumber: 123, +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "on", +// FeatureName: "feature-test", +// } + +// toLog, toListener := none.Apply([]dtos.Impression{imp}) + +// if len(toLog) != 0 || len(toListener) != 1 { +// t.Error("Should not have to log") +// } + +// toLog, toListener = none.Apply([]dtos.Impression{imp}) + +// if len(toLog) != 0 || len(toListener) != 1 { +// t.Error("Should not have to log") +// } + +// counts := counter.PopAll() +// value := counts[Key{ +// FeatureName: imp.FeatureName, +// TimeFrame: util.TruncateTimeFrame(now), +// }] + +// if value != 2 { +// t.Error("Should be 2") +// } +// } + +// func TestApplySingleNone(t *testing.T) { +// now := time.Now().UTC().UnixNano() +// filter := filter.NewBloomFilter(1000, 0.01) +// tracker := NewUniqueKeysTracker(filter) +// counter := NewImpressionsCounter() +// none := NewNoneImpl(counter, tracker, true) + +// imp := dtos.Impression{ +// BucketingKey: "someBuck", +// ChangeNumber: 123, +// KeyName: "someKey", +// Label: "someLabel", +// Time: now, +// Treatment: "on", +// FeatureName: "feature-test", +// } + +// toLog := none.ApplySingle(&imp) + +// if toLog { +// t.Error("Should be false") +// } + +// toLog = none.ApplySingle(&imp) + +// if toLog { +// t.Error("Should be false") +// } +// } diff --git a/provisional/strategy/uniquekeystracker.go b/provisional/strategy/uniquekeystracker.go index acae45a5..86b6dae4 100644 --- a/provisional/strategy/uniquekeystracker.go +++ b/provisional/strategy/uniquekeystracker.go @@ -1,32 +1,26 @@ package strategy import ( - "sync" - - "github.com/splitio/go-split-commons/v6/dtos" "github.com/splitio/go-split-commons/v6/storage" - "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap" ) // UniqueKeysTracker interface type UniqueKeysTracker interface { Track(featureName string, key string) bool - PopAll() dtos.Uniques } // UniqueKeysTrackerImpl description type UniqueKeysTrackerImpl struct { filter storage.Filter - cache map[string]*set.ThreadUnsafeSet - mutex *sync.RWMutex + cache *mutexmap.MMUniqueKeysStorage } // NewUniqueKeysTracker create new implementation -func NewUniqueKeysTracker(f storage.Filter) UniqueKeysTracker { +func NewUniqueKeysTracker(f storage.Filter, cache *mutexmap.MMUniqueKeysStorage) UniqueKeysTracker { return &UniqueKeysTrackerImpl{ filter: f, - cache: make(map[string]*set.ThreadUnsafeSet), - mutex: &sync.RWMutex{}, + cache: cache, } } @@ -37,49 +31,8 @@ func (t *UniqueKeysTrackerImpl) Track(featureName string, key string) bool { return false } - t.mutex.Lock() - defer t.mutex.Unlock() - t.filter.Add(fKey) - _, ok := t.cache[featureName] - if !ok { - t.cache[featureName] = set.NewSet() - } - - t.cache[featureName].Add(key) + t.cache.Add(featureName, key) return true } - -// PopAll returns all the elements stored in the cache and resets the cache -func (t *UniqueKeysTrackerImpl) PopAll() dtos.Uniques { - t.mutex.Lock() - defer t.mutex.Unlock() - toReturn := t.cache - t.cache = make(map[string]*set.ThreadUnsafeSet) - - return getUniqueKeysDto(toReturn) -} - -func getUniqueKeysDto(uniques map[string]*set.ThreadUnsafeSet) dtos.Uniques { - uniqueKeys := dtos.Uniques{ - Keys: make([]dtos.Key, 0, len(uniques)), - } - - for name, keys := range uniques { - list := keys.List() - keysDto := make([]string, 0, len(list)) - - for _, value := range list { - keysDto = append(keysDto, value.(string)) - } - keyDto := dtos.Key{ - Feature: name, - Keys: keysDto, - } - - uniqueKeys.Keys = append(uniqueKeys.Keys, keyDto) - } - - return uniqueKeys -} diff --git a/provisional/strategy/uniquekeystracker_test.go b/provisional/strategy/uniquekeystracker_test.go index f2ec0992..5e2e1cc3 100644 --- a/provisional/strategy/uniquekeystracker_test.go +++ b/provisional/strategy/uniquekeystracker_test.go @@ -1,34 +1,34 @@ package strategy -import ( - "fmt" - "testing" - - "github.com/splitio/go-split-commons/v6/storage/filter" -) - -func Test(t *testing.T) { - bf := filter.NewBloomFilter(10000, 0.01) - - tracker := NewUniqueKeysTracker(bf) - - for i := 0; i < 10; i++ { - if !tracker.Track("feature-1", "key-"+fmt.Sprint(i)) { - t.Error("Should be true") - } - } - - for i := 0; i < 10; i++ { - if !tracker.Track("feature-2", "key-"+fmt.Sprint(i)) { - t.Error("Should be true") - } - } - - if tracker.Track("feature-2", "key-4") { - t.Error("Should be false") - } - - if tracker.Track("feature-1", "key-4") { - t.Error("Should be false") - } -} +// import ( +// "fmt" +// "testing" + +// "github.com/splitio/go-split-commons/v6/storage/filter" +// ) + +// func Test(t *testing.T) { +// bf := filter.NewBloomFilter(10000, 0.01) + +// tracker := NewUniqueKeysTracker(bf) + +// for i := 0; i < 10; i++ { +// if !tracker.Track("feature-1", "key-"+fmt.Sprint(i)) { +// t.Error("Should be true") +// } +// } + +// for i := 0; i < 10; i++ { +// if !tracker.Track("feature-2", "key-"+fmt.Sprint(i)) { +// t.Error("Should be true") +// } +// } + +// if tracker.Track("feature-2", "key-4") { +// t.Error("Should be false") +// } + +// if tracker.Track("feature-1", "key-4") { +// t.Error("Should be false") +// } +// } diff --git a/storage/inmemory/mutexmap/uniquekeys.go b/storage/inmemory/mutexmap/uniquekeys.go new file mode 100644 index 00000000..02c7e37c --- /dev/null +++ b/storage/inmemory/mutexmap/uniquekeys.go @@ -0,0 +1,99 @@ +package mutexmap + +import ( + "sync" + + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/storage" + "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/splitio/go-toolkit/v5/logging" +) + +type MMUniqueKeysStorage struct { + data map[string]*set.ThreadUnsafeSet + maxSize int64 + size int64 + mutex *sync.RWMutex + fullChan chan string //only write channel + logger logging.LoggerInterface + runtimeTelemetry storage.TelemetryRuntimeProducer +} + +func NewMMUniqueKeysStorage(maxSize int64, isFull chan string, logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer) *MMUniqueKeysStorage { + return &MMUniqueKeysStorage{ + data: make(map[string]*set.ThreadUnsafeSet), + maxSize: maxSize, + size: 0, + mutex: &sync.RWMutex{}, + fullChan: isFull, + logger: logger, + runtimeTelemetry: runtimeTelemetry, + } +} + +func (s *MMUniqueKeysStorage) Add(featureName string, key string) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.size++ + _, ok := s.data[featureName] + if !ok { + s.data[featureName] = set.NewSet() + } + + s.data[featureName].Add(key) + + if s.size >= s.maxSize { + s.sendSignalIsFull() + } +} + +func (s *MMUniqueKeysStorage) PopAll() dtos.Uniques { + s.mutex.Lock() + defer s.mutex.Unlock() + + toReturn := s.data + s.data = make(map[string]*set.ThreadUnsafeSet) + s.size = 0 + + result := getUniqueKeysDto(toReturn) + + return result +} + +func (s *MMUniqueKeysStorage) sendSignalIsFull() { + // Nom blocking select + select { + case s.fullChan <- "UNIQUE_KEYS_FULL": + // Send "queue is full" signal + break + default: + s.logger.Debug("Some error occurred on sending signal for unique keys") + } +} + +func getUniqueKeysDto(uniques map[string]*set.ThreadUnsafeSet) dtos.Uniques { + uniqueKeys := dtos.Uniques{ + Keys: make([]dtos.Key, 0, len(uniques)), + } + + for name, keys := range uniques { + list := keys.List() + keysDto := make([]string, 0, len(list)) + + for _, value := range list { + keysDto = append(keysDto, value.(string)) + } + keyDto := dtos.Key{ + Feature: name, + Keys: keysDto, + } + + uniqueKeys.Keys = append(uniqueKeys.Keys, keyDto) + } + + return uniqueKeys +} + +var _ storage.UniqueKeysStorageConsumer = (*MMUniqueKeysStorage)(nil) +var _ storage.UniqueKeysStorageProducer = (*MMUniqueKeysStorage)(nil) diff --git a/storage/interfaces.go b/storage/interfaces.go index 357bd177..d12e065b 100644 --- a/storage/interfaces.go +++ b/storage/interfaces.go @@ -265,3 +265,16 @@ type LargeSegmentsStorage interface { LargeSegmentStorageProducer LargeSegmentStorageConsumer } + +type UniqueKeysStorageConsumer interface { + PopAll() dtos.Uniques +} + +type UniqueKeysStorageProducer interface { + Add(featureName string, key string) +} + +type UniqueKeysStorage interface { + UniqueKeysStorageConsumer + UniqueKeysStorageProducer +} diff --git a/storage/mocks/uniquekeys.go b/storage/mocks/uniquekeys.go index 14d7a736..bb020474 100644 --- a/storage/mocks/uniquekeys.go +++ b/storage/mocks/uniquekeys.go @@ -1,15 +1,23 @@ package mocks +import ( + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/storage" + "github.com/stretchr/testify/mock" +) + type MockUniqueKeysStorage struct { - CountCall func() int64 - PopNRawCall func(n int64) ([]string, int64, error) + mock.Mock } -// Count mock -func (m MockUniqueKeysStorage) Count() int64 { - return m.CountCall() +func (u *MockUniqueKeysStorage) Add(featureName string, key string) { + u.Called(featureName, key) } -func (m MockUniqueKeysStorage) PopNRaw(n int64) ([]string, int64, error) { - return m.PopNRawCall(n) +func (u *MockUniqueKeysStorage) PopAll() dtos.Uniques { + args := u.Called() + return args.Get(0).(dtos.Uniques) } + +var _ storage.UniqueKeysStorageConsumer = (*MockUniqueKeysStorage)(nil) +var _ storage.UniqueKeysStorageProducer = (*MockUniqueKeysStorage)(nil) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 948c488d..6892f103 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -254,6 +254,9 @@ func (s *SynchronizerImpl) dataFlusher() { if err != nil { s.logger.Error("Error flushing storage queue", err) } + case "UNIQUE_KEYS_FULL": + s.logger.Debug("FLUSHING Unique Keys storage") + s.workers.TelemetryRecorder.SynchronizeUniqueKeys() } } } diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index ccf86e10..9b12f237 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -64,13 +64,14 @@ func TestSyncAllErrorSplits(t *testing.T) { atomic.AddInt64(¬ifyEventCalled, 1) }, } + advanced := conf.AdvancedConfig{EventsQueueSize: 100, EventsBulkSize: 100, HTTPTimeout: 100, ImpressionsBulkSize: 100, ImpressionsQueueSize: 100, SegmentQueueSize: 50, SegmentWorkers: 5} workers := Workers{ SplitUpdater: split.NewSplitUpdater(splitMockStorage, splitAPI.SplitFetcher, logger, telemetryMockStorage, appMonitorMock, flagsets.NewFlagSetFilter(nil)), SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, storageMock.MockSegmentStorage{}, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(storageMock.MockImpressionStorage{}, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 10, logger), @@ -152,7 +153,7 @@ func TestSyncAllErrorInSegments(t *testing.T) { SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(storageMock.MockImpressionStorage{}, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 10, logger), @@ -256,7 +257,7 @@ func TestSyncAllOk(t *testing.T) { SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(storageMock.MockImpressionStorage{}, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 10, logger), @@ -360,7 +361,7 @@ func TestPeriodicFetching(t *testing.T) { SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(storageMock.MockImpressionStorage{}, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 1, logger), @@ -484,7 +485,7 @@ func TestPeriodicRecording(t *testing.T) { SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(eventMockStorage, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), ImpressionRecorder: impression.NewRecorderSingle(impressionMockStorage, splitAPI.ImpressionRecorder, logger, dtos.Metadata{}, conf.ImpressionsModeDebug, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, splitAPI.TelemetryRecorder, splitMockStorage, segmentMockStorage, logger, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, splitAPI.TelemetryRecorder, splitMockStorage, segmentMockStorage, logger, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, 1, logger), @@ -1000,7 +1001,7 @@ func TestSplitUpdateWithReferencedSegments(t *testing.T) { SplitUpdater: split.NewSplitUpdater(splitMockStorage, splitAPI.SplitFetcher, logger, telemetryMockStorage, appMonitorMock, flagsets.NewFlagSetFilter(nil)), SegmentUpdater: segment.NewSegmentUpdater(splitMockStorage, segmentMockStorage, splitAPI.SegmentFetcher, logger, telemetryMockStorage, appMonitorMock), EventRecorder: event.NewEventRecorderSingle(storageMock.MockEventStorage{}, splitAPI.EventRecorder, logger, dtos.Metadata{}, telemetryMockStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryMockStorage, nil, nil, nil, nil, dtos.Metadata{}, telemetryMockStorage, nil), } splitTasks := SplitTasks{ SegmentSyncTask: tasks.NewFetchSegmentsTask(workers.SegmentUpdater, 10, 5, 50, logger, appMonitorMock), diff --git a/tasks/telemetrysync_test.go b/tasks/telemetrysync_test.go index 455fff4b..d0e891ad 100644 --- a/tasks/telemetrysync_test.go +++ b/tasks/telemetrysync_test.go @@ -64,6 +64,7 @@ func TestTelemetrySyncTask(t *testing.T) { logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage, + nil, ), 2, logging.NewLogger(&logging.LoggerOptions{}), diff --git a/tasks/uniquekeyssync.go b/tasks/uniquekeyssync.go index a2c2b598..21cdf1af 100644 --- a/tasks/uniquekeyssync.go +++ b/tasks/uniquekeyssync.go @@ -1,7 +1,6 @@ package tasks import ( - "github.com/splitio/go-split-commons/v6/provisional/strategy" "github.com/splitio/go-split-commons/v6/telemetry" "github.com/splitio/go-toolkit/v5/asynctask" "github.com/splitio/go-toolkit/v5/logging" @@ -10,16 +9,15 @@ import ( // NewRecordUniqueKeysTask constructor func NewRecordUniqueKeysTask( recorder telemetry.TelemetrySynchronizer, - uniqueTracker strategy.UniqueKeysTracker, period int, logger logging.LoggerInterface, ) *asynctask.AsyncTask { record := func(logger logging.LoggerInterface) error { - return recorder.SynchronizeUniqueKeys(uniqueTracker.PopAll()) + return recorder.SynchronizeUniqueKeys() } onStop := func(logger logging.LoggerInterface) { - recorder.SynchronizeUniqueKeys(uniqueTracker.PopAll()) + recorder.SynchronizeUniqueKeys() } return asynctask.NewAsyncTask("SubmitUniqueKeys", record, period, nil, onStop, logger) diff --git a/tasks/uniquekeyssync_test.go b/tasks/uniquekeyssync_test.go index 9046466b..f656df35 100644 --- a/tasks/uniquekeyssync_test.go +++ b/tasks/uniquekeyssync_test.go @@ -1,109 +1,111 @@ package tasks -import ( - "testing" - "time" +// import ( +// "testing" +// "time" - "github.com/splitio/go-split-commons/v6/dtos" - "github.com/splitio/go-split-commons/v6/provisional/strategy" - "github.com/splitio/go-split-commons/v6/service/mocks" - st "github.com/splitio/go-split-commons/v6/storage/mocks" - "github.com/splitio/go-split-commons/v6/telemetry" - "github.com/splitio/go-toolkit/v5/datastructures/set" - "github.com/splitio/go-toolkit/v5/logging" -) +// "github.com/splitio/go-split-commons/v6/dtos" +// "github.com/splitio/go-split-commons/v6/provisional/strategy" +// "github.com/splitio/go-split-commons/v6/service/mocks" +// st "github.com/splitio/go-split-commons/v6/storage/mocks" +// "github.com/splitio/go-split-commons/v6/telemetry" +// "github.com/splitio/go-toolkit/v5/datastructures/set" +// "github.com/splitio/go-toolkit/v5/logging" +// ) -func TestUniqueKeysTask(t *testing.T) { - var call int64 +// func TestUniqueKeysTask(t *testing.T) { +// var call int64 - mockedTelemetryStorage := st.MockTelemetryStorage{ - PopLatenciesCall: func() dtos.MethodLatencies { return dtos.MethodLatencies{} }, - PopExceptionsCall: func() dtos.MethodExceptions { return dtos.MethodExceptions{} }, - GetLastSynchronizationCall: func() dtos.LastSynchronization { return dtos.LastSynchronization{} }, - PopHTTPErrorsCall: func() dtos.HTTPErrors { return dtos.HTTPErrors{} }, - PopHTTPLatenciesCall: func() dtos.HTTPLatencies { return dtos.HTTPLatencies{} }, - GetImpressionsStatsCall: func(dataType int) int64 { return 0 }, - GetEventsStatsCall: func(dataType int) int64 { return 0 }, - PopTokenRefreshesCall: func() int64 { return 0 }, - PopAuthRejectionsCall: func() int64 { return 0 }, - PopStreamingEventsCall: func() []dtos.StreamingEvent { return []dtos.StreamingEvent{} }, - GetSessionLengthCall: func() int64 { return 0 }, - PopTagsCall: func() []string { return []string{} }, - RecordSuccessfulSyncCall: func(resource int, tm time.Time) {}, - RecordSyncLatencyCall: func(resource int, latency time.Duration) {}, - RecordUniqueKeysCall: func(uniques dtos.Uniques) error { - return nil - }, - } - mockedTelemetryHTTP := mocks.MockTelemetryRecorder{ - RecordStatsCall: func(stats dtos.Stats, metadata dtos.Metadata) error { - return nil - }, - RecordUniqueKeysCall: func(uniques dtos.Uniques, metadata dtos.Metadata) error { - if len(uniques.Keys) != 2 { - t.Error("Should be 2") - } - call++ - return nil - }, - } - mockedSplitStorage := st.MockSplitStorage{ - SplitNamesCall: func() []string { return []string{} }, - SegmentNamesCall: func() *set.ThreadUnsafeSet { return set.NewSet() }, - } - mockedSegmentStorage := st.MockSegmentStorage{ - SegmentKeysCountCall: func() int64 { return 10 }, - } +// mockedTelemetryStorage := st.MockTelemetryStorage{ +// PopLatenciesCall: func() dtos.MethodLatencies { return dtos.MethodLatencies{} }, +// PopExceptionsCall: func() dtos.MethodExceptions { return dtos.MethodExceptions{} }, +// GetLastSynchronizationCall: func() dtos.LastSynchronization { return dtos.LastSynchronization{} }, +// PopHTTPErrorsCall: func() dtos.HTTPErrors { return dtos.HTTPErrors{} }, +// PopHTTPLatenciesCall: func() dtos.HTTPLatencies { return dtos.HTTPLatencies{} }, +// GetImpressionsStatsCall: func(dataType int) int64 { return 0 }, +// GetEventsStatsCall: func(dataType int) int64 { return 0 }, +// PopTokenRefreshesCall: func() int64 { return 0 }, +// PopAuthRejectionsCall: func() int64 { return 0 }, +// PopStreamingEventsCall: func() []dtos.StreamingEvent { return []dtos.StreamingEvent{} }, +// GetSessionLengthCall: func() int64 { return 0 }, +// PopTagsCall: func() []string { return []string{} }, +// RecordSuccessfulSyncCall: func(resource int, tm time.Time) {}, +// RecordSyncLatencyCall: func(resource int, latency time.Duration) {}, +// RecordUniqueKeysCall: func(uniques dtos.Uniques) error { +// return nil +// }, +// } +// mockedTelemetryHTTP := mocks.MockTelemetryRecorder{ +// RecordStatsCall: func(stats dtos.Stats, metadata dtos.Metadata) error { +// return nil +// }, +// RecordUniqueKeysCall: func(uniques dtos.Uniques, metadata dtos.Metadata) error { +// if len(uniques.Keys) != 2 { +// t.Error("Should be 2") +// } +// call++ +// return nil +// }, +// } +// mockedSplitStorage := st.MockSplitStorage{ +// SplitNamesCall: func() []string { return []string{} }, +// SegmentNamesCall: func() *set.ThreadUnsafeSet { return set.NewSet() }, +// } +// mockedSegmentStorage := st.MockSegmentStorage{ +// SegmentKeysCountCall: func() int64 { return 10 }, +// } - synchronizer := telemetry.NewTelemetrySynchronizer( - mockedTelemetryStorage, - mockedTelemetryHTTP, - mockedSplitStorage, - mockedSegmentStorage, - logging.NewLogger(&logging.LoggerOptions{}), - dtos.Metadata{}, - mockedTelemetryStorage, - ) +// synchronizer := telemetry.NewTelemetrySynchronizer( +// mockedTelemetryStorage, +// mockedTelemetryHTTP, +// mockedSplitStorage, +// mockedSegmentStorage, +// logging.NewLogger(&logging.LoggerOptions{}), +// dtos.Metadata{}, +// mockedTelemetryStorage, +// ) - filter := st.MockFilter{ - AddCall: func(data string) {}, - ContainsCall: func(data string) bool { return false }, - ClearCall: func() {}, - } - tracker := strategy.NewUniqueKeysTracker(filter) - task := NewRecordUniqueKeysTask( - synchronizer, - tracker, - 2, - logging.NewLogger(&logging.LoggerOptions{}), - ) +// filter := st.MockFilter{ +// AddCall: func(data string) {}, +// ContainsCall: func(data string) bool { return false }, +// ClearCall: func() {}, +// } +// tracker := strategy.NewUniqueKeysTracker(filter) +// task := NewRecordUniqueKeysTask( +// synchronizer, +// tracker, +// 2, +// logging.NewLogger(&logging.LoggerOptions{}), +// ) - if !tracker.Track("tratment-1", "key-1") { - t.Error("Should be true") - } - if !tracker.Track("tratment-1", "key-2") { - t.Error("Should be true") - } - if !tracker.Track("tratment-2", "key-1") { - t.Error("Should be true") - } - if !tracker.Track("tratment-2", "key-2") { - t.Error("Should be true") - } +// if !tracker.Track("tratment-1", "key-1") { +// t.Error("Should be true") +// } +// if !tracker.Track("tratment-1", "key-2") { +// t.Error("Should be true") +// } +// if !tracker.Track("tratment-2", "key-1") { +// t.Error("Should be true") +// } +// if !tracker.Track("tratment-2", "key-2") { +// t.Error("Should be true") +// } - task.Start() - time.Sleep(3 * time.Second) +// task.Start() +// time.Sleep(3 * time.Second) - if !task.IsRunning() { - t.Error("UniqueKeys task should be running") - } +// if !task.IsRunning() { +// t.Error("UniqueKeys task should be running") +// } - task.Stop(true) - if call != 1 { - t.Error("Request not received") - } +// task.Stop(true) +// if call != 1 { +// t.Error("Request not received") +// } - if task.IsRunning() { - t.Error("Task should be stopped") - } -} +// if task.IsRunning() { +// t.Error("Task should be stopped") +// } + +// t.Error(tracker.PopAll().Keys) +// } diff --git a/telemetry/interface.go b/telemetry/interface.go index ca6e0b13..6598fa2e 100644 --- a/telemetry/interface.go +++ b/telemetry/interface.go @@ -1,10 +1,8 @@ package telemetry -import "github.com/splitio/go-split-commons/v6/dtos" - // TelemetrySynchronizer interface type TelemetrySynchronizer interface { SynchronizeConfig(cfg InitConfig, timedUntilReady int64, factoryInstances map[string]int64, tags []string) SynchronizeStats() error - SynchronizeUniqueKeys(uniques dtos.Uniques) error + SynchronizeUniqueKeys() error } diff --git a/telemetry/memory.go b/telemetry/memory.go index f7fa3d1d..28c58a35 100644 --- a/telemetry/memory.go +++ b/telemetry/memory.go @@ -20,6 +20,7 @@ type RecorderSingle struct { logger logging.LoggerInterface metadata dtos.Metadata runtimeTelemetry storage.TelemetryRuntimeProducer + uniqueKeysStorage storage.UniqueKeysStorageConsumer } // NewTelemetrySynchronizer creates new event synchronizer for posting events @@ -31,6 +32,7 @@ func NewTelemetrySynchronizer( logger logging.LoggerInterface, metadata dtos.Metadata, runtimeTelemetry storage.TelemetryRuntimeProducer, + uniqueKeysStorage storage.UniqueKeysStorageConsumer, ) TelemetrySynchronizer { return &RecorderSingle{ telemetryStorage: telemetryStorage, @@ -40,6 +42,7 @@ func NewTelemetrySynchronizer( logger: logger, metadata: metadata, runtimeTelemetry: runtimeTelemetry, + uniqueKeysStorage: uniqueKeysStorage, } } @@ -133,7 +136,9 @@ func (e *RecorderSingle) SynchronizeConfig(cfg InitConfig, timedUntilReady int64 } // SynchronizeUniqueKeys syncs unique keys -func (e *RecorderSingle) SynchronizeUniqueKeys(uniques dtos.Uniques) error { +func (e *RecorderSingle) SynchronizeUniqueKeys() error { + uniques := e.uniqueKeysStorage.PopAll() + if len(uniques.Keys) < 1 { e.logger.Debug("Unique keys list is empty, nothing to synchronize.") return nil diff --git a/telemetry/memory_test.go b/telemetry/memory_test.go index 850957ab..d07d51f9 100644 --- a/telemetry/memory_test.go +++ b/telemetry/memory_test.go @@ -64,7 +64,7 @@ func TestTelemetryRecorderError(t *testing.T) { }, } - telemetrySync := NewTelemetrySynchronizer(mockedTelemetryStorage, telemetryRecorderMock, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage) + telemetrySync := NewTelemetrySynchronizer(mockedTelemetryStorage, telemetryRecorderMock, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage, nil) err := telemetrySync.SynchronizeStats() if err == nil { @@ -119,7 +119,7 @@ func TestTelemetryRecorder(t *testing.T) { RecordStatsCall: func(stats dtos.Stats, metadata dtos.Metadata) error { return nil }, } - telemetrySync := NewTelemetrySynchronizer(mockedTelemetryStorage, telemetryRecorderMock, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage) + telemetrySync := NewTelemetrySynchronizer(mockedTelemetryStorage, telemetryRecorderMock, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage, nil) err := telemetrySync.SynchronizeStats() if err != nil { @@ -201,7 +201,7 @@ func TestTelemetryRecorderSync(t *testing.T) { PopUpdatesFromSSECall: func() dtos.UpdatesFromSSE { return dtos.UpdatesFromSSE{} }, } - telemetryRecorder := NewTelemetrySynchronizer(mockedTelemetryStorage, httpTelemetryRecorder, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage) + telemetryRecorder := NewTelemetrySynchronizer(mockedTelemetryStorage, httpTelemetryRecorder, mockedSplitStorage, mockedSegmentStorage, logging.NewLogger(&logging.LoggerOptions{}), dtos.Metadata{}, mockedTelemetryStorage, nil) telemetryRecorder.SynchronizeStats() @@ -255,7 +255,7 @@ func TestConfig(t *testing.T) { }, } - sync := NewTelemetrySynchronizer(mockTelemetryStorage, mockRecorder, st.MockSplitStorage{}, st.MockSegmentStorage{}, logger, dtos.Metadata{SDKVersion: "go-test", MachineIP: "1.1.1.1", MachineName: "some"}, mockTelemetryStorage) + sync := NewTelemetrySynchronizer(mockTelemetryStorage, mockRecorder, st.MockSplitStorage{}, st.MockSegmentStorage{}, logger, dtos.Metadata{SDKVersion: "go-test", MachineIP: "1.1.1.1", MachineName: "some"}, mockTelemetryStorage, nil) factories := make(map[string]int64) factories["one"] = 1 factories["two"] = 1 diff --git a/telemetry/redis.go b/telemetry/redis.go index fb63cee5..d98d5362 100644 --- a/telemetry/redis.go +++ b/telemetry/redis.go @@ -8,15 +8,21 @@ import ( // SynchronizerRedis struct type SynchronizerRedis struct { - storage storage.TelemetryConfigProducer - logger logging.LoggerInterface + storage storage.TelemetryConfigProducer + logger logging.LoggerInterface + uniqueKeysStorage storage.UniqueKeysStorageConsumer } // NewSynchronizerRedis constructor -func NewSynchronizerRedis(storage storage.TelemetryConfigProducer, logger logging.LoggerInterface) TelemetrySynchronizer { +func NewSynchronizerRedis( + storage storage.TelemetryConfigProducer, + logger logging.LoggerInterface, + uniqueKeysStorage storage.UniqueKeysStorageConsumer, +) TelemetrySynchronizer { return &SynchronizerRedis{ - storage: storage, - logger: logger, + storage: storage, + logger: logger, + uniqueKeysStorage: uniqueKeysStorage, } } @@ -45,7 +51,8 @@ func (r *SynchronizerRedis) SynchronizeConfig(cfg InitConfig, timedUntilReady in } // SynchronizeUniqueKeys syncs unique keys -func (r *SynchronizerRedis) SynchronizeUniqueKeys(uniques dtos.Uniques) error { +func (r *SynchronizerRedis) SynchronizeUniqueKeys() error { + uniques := r.uniqueKeysStorage.PopAll() if len(uniques.Keys) < 1 { r.logger.Debug("Unique keys list is empty, nothing to synchronize.") return nil diff --git a/telemetry/redis_test.go b/telemetry/redis_test.go index b6b9c288..b3b75610 100644 --- a/telemetry/redis_test.go +++ b/telemetry/redis_test.go @@ -34,7 +34,7 @@ func TestRecorderRedis(t *testing.T) { }, } - sender := NewSynchronizerRedis(redisMock, logger) + sender := NewSynchronizerRedis(redisMock, logger, nil) factories := make(map[string]int64) factories["one"] = 1 sender.SynchronizeConfig(InitConfig{}, 0, factories, []string{"sentinel"}) From 465488a09e81eea07d2ed6787bccae76159941bd Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Mon, 17 Mar 2025 13:09:38 -0300 Subject: [PATCH 03/15] upgrade sonarcloud-github-action to v5 --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3992b126..61b23f38 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: - name: SonarQube Scan (Push) if: ${{ github.event_name == 'push' }} - uses: SonarSource/sonarcloud-github-action@v4.0.0 + uses: SonarSource/sonarcloud-github-action@v5.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -46,7 +46,7 @@ jobs: - name: SonarQube Scan (Pull Request) if: ${{ github.event_name == 'pull_request' }} - uses: SonarSource/sonarcloud-github-action@v4.0.0 + uses: SonarSource/sonarcloud-github-action@v5.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From ebd4ac6ae03e1449fae7aaf9a6ae8ef43a7d0251 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Mon, 17 Mar 2025 14:41:51 -0300 Subject: [PATCH 04/15] updating localhost --- telemetry/localhost.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/telemetry/localhost.go b/telemetry/localhost.go index fb95d74c..2810ae5d 100644 --- a/telemetry/localhost.go +++ b/telemetry/localhost.go @@ -1,9 +1,5 @@ package telemetry -import ( - "github.com/splitio/go-split-commons/v6/dtos" -) - type NoOp struct{} func (n *NoOp) SynchronizeConfig(cfg InitConfig, timedUntilReady int64, factoryInstances map[string]int64, tags []string) { @@ -13,6 +9,8 @@ func (n *NoOp) SynchronizeStats() error { return nil } -func (n *NoOp) SynchronizeUniqueKeys(uniques dtos.Uniques) error { +func (n *NoOp) SynchronizeUniqueKeys() error { return nil } + +var _ TelemetrySynchronizer = (*NoOp)(nil) From 3923ef40878707748dc8b8f14afc5f8223670540 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Mon, 17 Mar 2025 15:42:32 -0300 Subject: [PATCH 05/15] polishing --- provisional/impmanager_test.go | 498 +++++++++++----------- provisional/strategy/none_test.go | 160 +++---- provisional/strategy/uniquekeystracker.go | 13 +- storage/inmemory/mutexmap/uniquekeys.go | 28 +- storage/mocks/uniquekeys.go | 17 +- tasks/uniquekeyssync_test.go | 198 ++++----- 6 files changed, 458 insertions(+), 456 deletions(-) diff --git a/provisional/impmanager_test.go b/provisional/impmanager_test.go index 9bc1ac10..1c3befe2 100644 --- a/provisional/impmanager_test.go +++ b/provisional/impmanager_test.go @@ -1,249 +1,253 @@ package provisional -// import ( -// "testing" -// "time" - -// "github.com/splitio/go-split-commons/v6/dtos" -// "github.com/splitio/go-split-commons/v6/provisional/strategy" -// "github.com/splitio/go-split-commons/v6/storage/filter" -// "github.com/splitio/go-split-commons/v6/storage/inmemory" -// "github.com/splitio/go-split-commons/v6/telemetry" -// ) - -// func TestImpManagerInMemoryDebugListenerDisabled(t *testing.T) { -// observer, _ := strategy.NewImpressionObserver(5000) -// debug := strategy.NewDebugImpl(observer, false) -// impManager := NewImpressionManager(debug) - -// now := time.Now().UTC().UnixNano() -// imp1 := &dtos.Impression{ -// BucketingKey: "someBucketingKey", -// ChangeNumber: 123456789, -// FeatureName: "someFeature", -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "someTreatment", -// } - -// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) -// if len(impressionsForListener) != 0 { -// t.Error("It should not return an impression") -// } -// if len(impressionsForLog) != 1 { -// t.Error("It should return an impression") -// } -// if impressionsForLog[0].Pt != 0 { -// t.Error("It should not have pt associated yet") -// } - -// impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) -// if len(impressionsForListener) != 0 || len(impressionsForLog) != 1 { -// t.Error("It should return an impression") -// } -// } - -// func TestImpManagerInMemoryDebug(t *testing.T) { -// observer, _ := strategy.NewImpressionObserver(5000) -// debug := strategy.NewDebugImpl(observer, true) -// impManager := NewImpressionManager(debug) - -// now := time.Now().UTC().UnixNano() -// imp1 := &dtos.Impression{ -// BucketingKey: "someBucketingKey", -// ChangeNumber: 123456789, -// FeatureName: "someFeature", -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "someTreatment", -// } - -// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) -// if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { -// t.Error("It should return an impression") -// } -// if impressionsForListener[0].Pt != 0 { -// t.Error("It should not have pt associated yet") -// } - -// impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) -// if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { -// t.Error("It should return an impression") -// } -// if impressionsForListener[0].Pt != now { -// t.Error("It should have pt associated") -// } -// } - -// func TestImpManagerInMemoryOptimized(t *testing.T) { -// runtimeTelemetry, _ := inmemory.NewTelemetryStorage() -// counter := strategy.NewImpressionsCounter() -// observer, _ := strategy.NewImpressionObserver(5000) -// optimized := strategy.NewOptimizedImpl(observer, counter, runtimeTelemetry, true) -// impManager := NewImpressionManager(optimized) - -// now := time.Now().UTC().UnixNano() -// imp1 := &dtos.Impression{ -// BucketingKey: "someBucketingKey", -// ChangeNumber: 123456789, -// FeatureName: "someFeature", -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "someTreatment", -// } - -// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) -// if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { -// t.Error("It should return an impression") -// } -// if impressionsForListener[0].Pt != 0 { -// t.Error("It should not have pt associated yet") -// } - -// impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) -// if len(impressionsForListener) != 1 { -// t.Error("It should return an impression") -// } -// if len(impressionsForLog) != 0 { -// t.Error("It should not return an impression") -// } -// if impressionsForListener[0].Pt != now { -// t.Error("It should have pt associated") -// } - -// if runtimeTelemetry.GetImpressionsStats(telemetry.ImpressionsDeduped) != 1 { -// t.Error("It should be 1") -// } -// } - -// func TestImpManagerInMemoryNone(t *testing.T) { -// counter := strategy.NewImpressionsCounter() -// filter := filter.NewBloomFilter(3000, 0.01) -// uniqueTracker := strategy.NewUniqueKeysTracker(filter) -// none := strategy.NewNoneImpl(counter, uniqueTracker, true) -// impManager := NewImpressionManager(none) - -// now := time.Now().UTC().UnixNano() -// imp1 := &dtos.Impression{ -// BucketingKey: "someBucketingKey", -// ChangeNumber: 123456789, -// FeatureName: "someFeature", -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "someTreatment", -// } - -// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) -// if len(impressionsForListener) != 1 { -// t.Error("It should return an impression") -// } -// if len(impressionsForLog) != 0 { -// t.Error("It should not return an impression") -// } - -// if impressionsForListener[0].Pt != 0 { -// t.Error("It should not have pt associated") -// } -// } - -// func TestImpManagerRedis(t *testing.T) { -// observer, _ := strategy.NewImpressionObserver(5000) -// debug := strategy.NewDebugImpl(observer, true) -// impManager := NewImpressionManager(debug) - -// now := time.Now().UTC().UnixNano() -// imp1 := &dtos.Impression{ -// BucketingKey: "someBucketingKey", -// ChangeNumber: 123456789, -// FeatureName: "someFeature", -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "someTreatment", -// } - -// impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) -// if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { -// t.Error("It should return an impression") -// } -// if impressionsForListener[0].Pt != 0 { -// t.Error("It should not have pt associated") -// } - -// impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) -// if len(impressionsForListener) != 1 { -// t.Error("It should return an impression") -// } -// if len(impressionsForLog) != 1 { -// t.Error("It should return an impression") -// } -// if impressionsForListener[0].Pt == 0 { -// t.Error("It should have pt") -// } -// } - -// func TestProcess(t *testing.T) { -// observer, _ := strategy.NewImpressionObserver(5000) -// debug := strategy.NewDebugImpl(observer, true) -// filter := filter.NewBloomFilter(3000, 0.01) -// uniqueTracker := strategy.NewUniqueKeysTracker(filter) -// counter := strategy.NewImpressionsCounter() -// none := strategy.NewNoneImpl(counter, uniqueTracker, false) - -// now := time.Now().UTC().UnixNano() -// impressions := []dtos.Impression{ -// { -// BucketingKey: "someBucketingKey", -// ChangeNumber: 123456789, -// FeatureName: "someFeature", -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "someTreatment", -// Disabled: true, -// }, -// { -// BucketingKey: "someBucketingKey", -// ChangeNumber: 123456789, -// FeatureName: "harnessFlag", -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "someTreatment", -// Disabled: true, -// }, -// { -// BucketingKey: "someBucketingKey", -// ChangeNumber: 123456789, -// FeatureName: "featureTest", -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "someTreatment", -// Disabled: false, -// }, -// } - -// impManager := NewImpressionManagerImp(none, debug) -// impressionsForLog, impressionsForListener := impManager.Process(impressions, true) -// if len(impressionsForListener) != 3 { -// t.Error("Impressions for Listener should be 3. Actual: ", len(impressionsForListener)) -// } -// if len(impressionsForLog) != 1 { -// t.Error("Impressions for Log should be 3. Actual: ", len(impressionsForLog)) -// } - -// impManager = NewImpressionManagerImp(none, none) - -// impressionsForLog, impressionsForListener = impManager.Process(impressions, false) -// if len(impressionsForListener) != 0 { -// t.Error("Impressions for Listener should be 0. Actual: ", len(impressionsForListener)) -// } -// if len(impressionsForLog) != 0 { -// t.Error("Impressions for Log should be 1. Actual: ", len(impressionsForLog)) -// } -// } +import ( + "testing" + "time" + + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/provisional/strategy" + "github.com/splitio/go-split-commons/v6/storage/filter" + "github.com/splitio/go-split-commons/v6/storage/inmemory" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap" + "github.com/splitio/go-split-commons/v6/telemetry" + "github.com/splitio/go-toolkit/v5/logging" +) + +func TestImpManagerInMemoryDebugListenerDisabled(t *testing.T) { + observer, _ := strategy.NewImpressionObserver(5000) + debug := strategy.NewDebugImpl(observer, false) + impManager := NewImpressionManager(debug) + + now := time.Now().UTC().UnixNano() + imp1 := &dtos.Impression{ + BucketingKey: "someBucketingKey", + ChangeNumber: 123456789, + FeatureName: "someFeature", + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "someTreatment", + } + + impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) + if len(impressionsForListener) != 0 { + t.Error("It should not return an impression") + } + if len(impressionsForLog) != 1 { + t.Error("It should return an impression") + } + if impressionsForLog[0].Pt != 0 { + t.Error("It should not have pt associated yet") + } + + impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) + if len(impressionsForListener) != 0 || len(impressionsForLog) != 1 { + t.Error("It should return an impression") + } +} + +func TestImpManagerInMemoryDebug(t *testing.T) { + observer, _ := strategy.NewImpressionObserver(5000) + debug := strategy.NewDebugImpl(observer, true) + impManager := NewImpressionManager(debug) + + now := time.Now().UTC().UnixNano() + imp1 := &dtos.Impression{ + BucketingKey: "someBucketingKey", + ChangeNumber: 123456789, + FeatureName: "someFeature", + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "someTreatment", + } + + impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) + if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { + t.Error("It should return an impression") + } + if impressionsForListener[0].Pt != 0 { + t.Error("It should not have pt associated yet") + } + + impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) + if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { + t.Error("It should return an impression") + } + if impressionsForListener[0].Pt != now { + t.Error("It should have pt associated") + } +} + +func TestImpManagerInMemoryOptimized(t *testing.T) { + runtimeTelemetry, _ := inmemory.NewTelemetryStorage() + counter := strategy.NewImpressionsCounter() + observer, _ := strategy.NewImpressionObserver(5000) + optimized := strategy.NewOptimizedImpl(observer, counter, runtimeTelemetry, true) + impManager := NewImpressionManager(optimized) + + now := time.Now().UTC().UnixNano() + imp1 := &dtos.Impression{ + BucketingKey: "someBucketingKey", + ChangeNumber: 123456789, + FeatureName: "someFeature", + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "someTreatment", + } + + impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) + if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { + t.Error("It should return an impression") + } + if impressionsForListener[0].Pt != 0 { + t.Error("It should not have pt associated yet") + } + + impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) + if len(impressionsForListener) != 1 { + t.Error("It should return an impression") + } + if len(impressionsForLog) != 0 { + t.Error("It should not return an impression") + } + if impressionsForListener[0].Pt != now { + t.Error("It should have pt associated") + } + + if runtimeTelemetry.GetImpressionsStats(telemetry.ImpressionsDeduped) != 1 { + t.Error("It should be 1") + } +} + +func TestImpManagerInMemoryNone(t *testing.T) { + counter := strategy.NewImpressionsCounter() + filter := filter.NewBloomFilter(3000, 0.01) + uniqueKeysStorage := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + uniqueTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage) + none := strategy.NewNoneImpl(counter, uniqueTracker, true) + impManager := NewImpressionManager(none) + + now := time.Now().UTC().UnixNano() + imp1 := &dtos.Impression{ + BucketingKey: "someBucketingKey", + ChangeNumber: 123456789, + FeatureName: "someFeature", + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "someTreatment", + } + + impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) + if len(impressionsForListener) != 1 { + t.Error("It should return an impression") + } + if len(impressionsForLog) != 0 { + t.Error("It should not return an impression") + } + + if impressionsForListener[0].Pt != 0 { + t.Error("It should not have pt associated") + } +} + +func TestImpManagerRedis(t *testing.T) { + observer, _ := strategy.NewImpressionObserver(5000) + debug := strategy.NewDebugImpl(observer, true) + impManager := NewImpressionManager(debug) + + now := time.Now().UTC().UnixNano() + imp1 := &dtos.Impression{ + BucketingKey: "someBucketingKey", + ChangeNumber: 123456789, + FeatureName: "someFeature", + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "someTreatment", + } + + impressionsForLog, impressionsForListener := impManager.ProcessImpressions([]dtos.Impression{*imp1}) + if len(impressionsForListener) != 1 || len(impressionsForLog) != 1 { + t.Error("It should return an impression") + } + if impressionsForListener[0].Pt != 0 { + t.Error("It should not have pt associated") + } + + impressionsForLog, impressionsForListener = impManager.ProcessImpressions([]dtos.Impression{*imp1}) + if len(impressionsForListener) != 1 { + t.Error("It should return an impression") + } + if len(impressionsForLog) != 1 { + t.Error("It should return an impression") + } + if impressionsForListener[0].Pt == 0 { + t.Error("It should have pt") + } +} + +func TestProcess(t *testing.T) { + observer, _ := strategy.NewImpressionObserver(5000) + debug := strategy.NewDebugImpl(observer, true) + filter := filter.NewBloomFilter(3000, 0.01) + uniqueKeysStorage := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + uniqueTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage) + counter := strategy.NewImpressionsCounter() + none := strategy.NewNoneImpl(counter, uniqueTracker, false) + + now := time.Now().UTC().UnixNano() + impressions := []dtos.Impression{ + { + BucketingKey: "someBucketingKey", + ChangeNumber: 123456789, + FeatureName: "someFeature", + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "someTreatment", + Disabled: true, + }, + { + BucketingKey: "someBucketingKey", + ChangeNumber: 123456789, + FeatureName: "harnessFlag", + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "someTreatment", + Disabled: true, + }, + { + BucketingKey: "someBucketingKey", + ChangeNumber: 123456789, + FeatureName: "featureTest", + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "someTreatment", + Disabled: false, + }, + } + + impManager := NewImpressionManagerImp(none, debug) + impressionsForLog, impressionsForListener := impManager.Process(impressions, true) + if len(impressionsForListener) != 3 { + t.Error("Impressions for Listener should be 3. Actual: ", len(impressionsForListener)) + } + if len(impressionsForLog) != 1 { + t.Error("Impressions for Log should be 3. Actual: ", len(impressionsForLog)) + } + + impManager = NewImpressionManagerImp(none, none) + + impressionsForLog, impressionsForListener = impManager.Process(impressions, false) + if len(impressionsForListener) != 0 { + t.Error("Impressions for Listener should be 0. Actual: ", len(impressionsForListener)) + } + if len(impressionsForLog) != 0 { + t.Error("Impressions for Log should be 1. Actual: ", len(impressionsForLog)) + } +} diff --git a/provisional/strategy/none_test.go b/provisional/strategy/none_test.go index 56534e1c..ac263052 100644 --- a/provisional/strategy/none_test.go +++ b/provisional/strategy/none_test.go @@ -1,80 +1,84 @@ package strategy -// import ( -// "testing" -// "time" - -// "github.com/splitio/go-split-commons/v6/dtos" -// "github.com/splitio/go-split-commons/v6/storage/filter" -// "github.com/splitio/go-split-commons/v6/util" -// ) - -// func TestNoneMode(t *testing.T) { -// now := time.Now().UTC().UnixNano() -// filter := filter.NewBloomFilter(1000, 0.01) -// tracker := NewUniqueKeysTracker(filter) -// counter := NewImpressionsCounter() -// none := NewNoneImpl(counter, tracker, true) - -// imp := dtos.Impression{ -// BucketingKey: "someBuck", -// ChangeNumber: 123, -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "on", -// FeatureName: "feature-test", -// } - -// toLog, toListener := none.Apply([]dtos.Impression{imp}) - -// if len(toLog) != 0 || len(toListener) != 1 { -// t.Error("Should not have to log") -// } - -// toLog, toListener = none.Apply([]dtos.Impression{imp}) - -// if len(toLog) != 0 || len(toListener) != 1 { -// t.Error("Should not have to log") -// } - -// counts := counter.PopAll() -// value := counts[Key{ -// FeatureName: imp.FeatureName, -// TimeFrame: util.TruncateTimeFrame(now), -// }] - -// if value != 2 { -// t.Error("Should be 2") -// } -// } - -// func TestApplySingleNone(t *testing.T) { -// now := time.Now().UTC().UnixNano() -// filter := filter.NewBloomFilter(1000, 0.01) -// tracker := NewUniqueKeysTracker(filter) -// counter := NewImpressionsCounter() -// none := NewNoneImpl(counter, tracker, true) - -// imp := dtos.Impression{ -// BucketingKey: "someBuck", -// ChangeNumber: 123, -// KeyName: "someKey", -// Label: "someLabel", -// Time: now, -// Treatment: "on", -// FeatureName: "feature-test", -// } - -// toLog := none.ApplySingle(&imp) - -// if toLog { -// t.Error("Should be false") -// } - -// toLog = none.ApplySingle(&imp) - -// if toLog { -// t.Error("Should be false") -// } -// } +import ( + "testing" + "time" + + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/storage/filter" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap" + "github.com/splitio/go-split-commons/v6/util" + "github.com/splitio/go-toolkit/v5/logging" +) + +func TestNoneMode(t *testing.T) { + now := time.Now().UTC().UnixNano() + filter := filter.NewBloomFilter(1000, 0.01) + uniqueKeysStorage := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + tracker := NewUniqueKeysTracker(filter, uniqueKeysStorage) + counter := NewImpressionsCounter() + none := NewNoneImpl(counter, tracker, true) + + imp := dtos.Impression{ + BucketingKey: "someBuck", + ChangeNumber: 123, + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "on", + FeatureName: "feature-test", + } + + toLog, toListener := none.Apply([]dtos.Impression{imp}) + + if len(toLog) != 0 || len(toListener) != 1 { + t.Error("Should not have to log") + } + + toLog, toListener = none.Apply([]dtos.Impression{imp}) + + if len(toLog) != 0 || len(toListener) != 1 { + t.Error("Should not have to log") + } + + counts := counter.PopAll() + value := counts[Key{ + FeatureName: imp.FeatureName, + TimeFrame: util.TruncateTimeFrame(now), + }] + + if value != 2 { + t.Error("Should be 2") + } +} + +func TestApplySingleNone(t *testing.T) { + now := time.Now().UTC().UnixNano() + filter := filter.NewBloomFilter(1000, 0.01) + uniqueKeysStorage := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + tracker := NewUniqueKeysTracker(filter, uniqueKeysStorage) + counter := NewImpressionsCounter() + none := NewNoneImpl(counter, tracker, true) + + imp := dtos.Impression{ + BucketingKey: "someBuck", + ChangeNumber: 123, + KeyName: "someKey", + Label: "someLabel", + Time: now, + Treatment: "on", + FeatureName: "feature-test", + } + + toLog := none.ApplySingle(&imp) + + if toLog { + t.Error("Should be false") + } + + toLog = none.ApplySingle(&imp) + + if toLog { + t.Error("Should be false") + } +} diff --git a/provisional/strategy/uniquekeystracker.go b/provisional/strategy/uniquekeystracker.go index 86b6dae4..ebd0b480 100644 --- a/provisional/strategy/uniquekeystracker.go +++ b/provisional/strategy/uniquekeystracker.go @@ -2,7 +2,6 @@ package strategy import ( "github.com/splitio/go-split-commons/v6/storage" - "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap" ) // UniqueKeysTracker interface @@ -12,15 +11,15 @@ type UniqueKeysTracker interface { // UniqueKeysTrackerImpl description type UniqueKeysTrackerImpl struct { - filter storage.Filter - cache *mutexmap.MMUniqueKeysStorage + filter storage.Filter + storage storage.UniqueKeysStorageProducer } // NewUniqueKeysTracker create new implementation -func NewUniqueKeysTracker(f storage.Filter, cache *mutexmap.MMUniqueKeysStorage) UniqueKeysTracker { +func NewUniqueKeysTracker(f storage.Filter, storage storage.UniqueKeysStorageProducer) UniqueKeysTracker { return &UniqueKeysTrackerImpl{ - filter: f, - cache: cache, + filter: f, + storage: storage, } } @@ -32,7 +31,7 @@ func (t *UniqueKeysTrackerImpl) Track(featureName string, key string) bool { } t.filter.Add(fKey) - t.cache.Add(featureName, key) + t.storage.Add(featureName, key) return true } diff --git a/storage/inmemory/mutexmap/uniquekeys.go b/storage/inmemory/mutexmap/uniquekeys.go index 02c7e37c..662ed2de 100644 --- a/storage/inmemory/mutexmap/uniquekeys.go +++ b/storage/inmemory/mutexmap/uniquekeys.go @@ -10,24 +10,22 @@ import ( ) type MMUniqueKeysStorage struct { - data map[string]*set.ThreadUnsafeSet - maxSize int64 - size int64 - mutex *sync.RWMutex - fullChan chan string //only write channel - logger logging.LoggerInterface - runtimeTelemetry storage.TelemetryRuntimeProducer + data map[string]*set.ThreadUnsafeSet + maxSize int64 + size int64 + mutex *sync.RWMutex + fullChan chan string //only write channel + logger logging.LoggerInterface } -func NewMMUniqueKeysStorage(maxSize int64, isFull chan string, logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer) *MMUniqueKeysStorage { +func NewMMUniqueKeysStorage(maxSize int64, isFull chan string, logger logging.LoggerInterface) *MMUniqueKeysStorage { return &MMUniqueKeysStorage{ - data: make(map[string]*set.ThreadUnsafeSet), - maxSize: maxSize, - size: 0, - mutex: &sync.RWMutex{}, - fullChan: isFull, - logger: logger, - runtimeTelemetry: runtimeTelemetry, + data: make(map[string]*set.ThreadUnsafeSet), + maxSize: maxSize, + size: 0, + mutex: &sync.RWMutex{}, + fullChan: isFull, + logger: logger, } } diff --git a/storage/mocks/uniquekeys.go b/storage/mocks/uniquekeys.go index bb020474..cae9b995 100644 --- a/storage/mocks/uniquekeys.go +++ b/storage/mocks/uniquekeys.go @@ -2,22 +2,17 @@ package mocks import ( "github.com/splitio/go-split-commons/v6/dtos" - "github.com/splitio/go-split-commons/v6/storage" - "github.com/stretchr/testify/mock" ) type MockUniqueKeysStorage struct { - mock.Mock + AddCall func(featureName string, key string) + PopAllCall func() dtos.Uniques } -func (u *MockUniqueKeysStorage) Add(featureName string, key string) { - u.Called(featureName, key) +func (m MockUniqueKeysStorage) Add(featureName string, key string) { + m.AddCall(featureName, key) } -func (u *MockUniqueKeysStorage) PopAll() dtos.Uniques { - args := u.Called() - return args.Get(0).(dtos.Uniques) +func (m MockUniqueKeysStorage) PopAll() dtos.Uniques { + return m.PopAllCall() } - -var _ storage.UniqueKeysStorageConsumer = (*MockUniqueKeysStorage)(nil) -var _ storage.UniqueKeysStorageProducer = (*MockUniqueKeysStorage)(nil) diff --git a/tasks/uniquekeyssync_test.go b/tasks/uniquekeyssync_test.go index f656df35..d002321e 100644 --- a/tasks/uniquekeyssync_test.go +++ b/tasks/uniquekeyssync_test.go @@ -1,111 +1,113 @@ package tasks -// import ( -// "testing" -// "time" +import ( + "testing" + "time" -// "github.com/splitio/go-split-commons/v6/dtos" -// "github.com/splitio/go-split-commons/v6/provisional/strategy" -// "github.com/splitio/go-split-commons/v6/service/mocks" -// st "github.com/splitio/go-split-commons/v6/storage/mocks" -// "github.com/splitio/go-split-commons/v6/telemetry" -// "github.com/splitio/go-toolkit/v5/datastructures/set" -// "github.com/splitio/go-toolkit/v5/logging" -// ) + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/provisional/strategy" + "github.com/splitio/go-split-commons/v6/service/mocks" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap" + st "github.com/splitio/go-split-commons/v6/storage/mocks" + "github.com/splitio/go-split-commons/v6/telemetry" + "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/splitio/go-toolkit/v5/logging" +) -// func TestUniqueKeysTask(t *testing.T) { -// var call int64 +func TestUniqueKeysTask(t *testing.T) { + var call int64 -// mockedTelemetryStorage := st.MockTelemetryStorage{ -// PopLatenciesCall: func() dtos.MethodLatencies { return dtos.MethodLatencies{} }, -// PopExceptionsCall: func() dtos.MethodExceptions { return dtos.MethodExceptions{} }, -// GetLastSynchronizationCall: func() dtos.LastSynchronization { return dtos.LastSynchronization{} }, -// PopHTTPErrorsCall: func() dtos.HTTPErrors { return dtos.HTTPErrors{} }, -// PopHTTPLatenciesCall: func() dtos.HTTPLatencies { return dtos.HTTPLatencies{} }, -// GetImpressionsStatsCall: func(dataType int) int64 { return 0 }, -// GetEventsStatsCall: func(dataType int) int64 { return 0 }, -// PopTokenRefreshesCall: func() int64 { return 0 }, -// PopAuthRejectionsCall: func() int64 { return 0 }, -// PopStreamingEventsCall: func() []dtos.StreamingEvent { return []dtos.StreamingEvent{} }, -// GetSessionLengthCall: func() int64 { return 0 }, -// PopTagsCall: func() []string { return []string{} }, -// RecordSuccessfulSyncCall: func(resource int, tm time.Time) {}, -// RecordSyncLatencyCall: func(resource int, latency time.Duration) {}, -// RecordUniqueKeysCall: func(uniques dtos.Uniques) error { -// return nil -// }, -// } -// mockedTelemetryHTTP := mocks.MockTelemetryRecorder{ -// RecordStatsCall: func(stats dtos.Stats, metadata dtos.Metadata) error { -// return nil -// }, -// RecordUniqueKeysCall: func(uniques dtos.Uniques, metadata dtos.Metadata) error { -// if len(uniques.Keys) != 2 { -// t.Error("Should be 2") -// } -// call++ -// return nil -// }, -// } -// mockedSplitStorage := st.MockSplitStorage{ -// SplitNamesCall: func() []string { return []string{} }, -// SegmentNamesCall: func() *set.ThreadUnsafeSet { return set.NewSet() }, -// } -// mockedSegmentStorage := st.MockSegmentStorage{ -// SegmentKeysCountCall: func() int64 { return 10 }, -// } + mockedTelemetryStorage := st.MockTelemetryStorage{ + PopLatenciesCall: func() dtos.MethodLatencies { return dtos.MethodLatencies{} }, + PopExceptionsCall: func() dtos.MethodExceptions { return dtos.MethodExceptions{} }, + GetLastSynchronizationCall: func() dtos.LastSynchronization { return dtos.LastSynchronization{} }, + PopHTTPErrorsCall: func() dtos.HTTPErrors { return dtos.HTTPErrors{} }, + PopHTTPLatenciesCall: func() dtos.HTTPLatencies { return dtos.HTTPLatencies{} }, + GetImpressionsStatsCall: func(dataType int) int64 { return 0 }, + GetEventsStatsCall: func(dataType int) int64 { return 0 }, + PopTokenRefreshesCall: func() int64 { return 0 }, + PopAuthRejectionsCall: func() int64 { return 0 }, + PopStreamingEventsCall: func() []dtos.StreamingEvent { return []dtos.StreamingEvent{} }, + GetSessionLengthCall: func() int64 { return 0 }, + PopTagsCall: func() []string { return []string{} }, + RecordSuccessfulSyncCall: func(resource int, tm time.Time) {}, + RecordSyncLatencyCall: func(resource int, latency time.Duration) {}, + RecordUniqueKeysCall: func(uniques dtos.Uniques) error { + return nil + }, + } + mockedTelemetryHTTP := mocks.MockTelemetryRecorder{ + RecordStatsCall: func(stats dtos.Stats, metadata dtos.Metadata) error { + return nil + }, + RecordUniqueKeysCall: func(uniques dtos.Uniques, metadata dtos.Metadata) error { + if len(uniques.Keys) != 2 { + t.Error("Should be 2") + } + call++ + return nil + }, + } + mockedSplitStorage := st.MockSplitStorage{ + SplitNamesCall: func() []string { return []string{} }, + SegmentNamesCall: func() *set.ThreadUnsafeSet { return set.NewSet() }, + } + mockedSegmentStorage := st.MockSegmentStorage{ + SegmentKeysCountCall: func() int64 { return 10 }, + } -// synchronizer := telemetry.NewTelemetrySynchronizer( -// mockedTelemetryStorage, -// mockedTelemetryHTTP, -// mockedSplitStorage, -// mockedSegmentStorage, -// logging.NewLogger(&logging.LoggerOptions{}), -// dtos.Metadata{}, -// mockedTelemetryStorage, -// ) + uniqueKeys := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) -// filter := st.MockFilter{ -// AddCall: func(data string) {}, -// ContainsCall: func(data string) bool { return false }, -// ClearCall: func() {}, -// } -// tracker := strategy.NewUniqueKeysTracker(filter) -// task := NewRecordUniqueKeysTask( -// synchronizer, -// tracker, -// 2, -// logging.NewLogger(&logging.LoggerOptions{}), -// ) + synchronizer := telemetry.NewTelemetrySynchronizer( + mockedTelemetryStorage, + mockedTelemetryHTTP, + mockedSplitStorage, + mockedSegmentStorage, + logging.NewLogger(&logging.LoggerOptions{}), + dtos.Metadata{}, + mockedTelemetryStorage, + uniqueKeys, + ) -// if !tracker.Track("tratment-1", "key-1") { -// t.Error("Should be true") -// } -// if !tracker.Track("tratment-1", "key-2") { -// t.Error("Should be true") -// } -// if !tracker.Track("tratment-2", "key-1") { -// t.Error("Should be true") -// } -// if !tracker.Track("tratment-2", "key-2") { -// t.Error("Should be true") -// } + filter := st.MockFilter{ + AddCall: func(data string) {}, + ContainsCall: func(data string) bool { return false }, + ClearCall: func() {}, + } -// task.Start() -// time.Sleep(3 * time.Second) + tracker := strategy.NewUniqueKeysTracker(filter, uniqueKeys) + task := NewRecordUniqueKeysTask( + synchronizer, + 2, + logging.NewLogger(&logging.LoggerOptions{}), + ) -// if !task.IsRunning() { -// t.Error("UniqueKeys task should be running") -// } + if !tracker.Track("tratment-1", "key-1") { + t.Error("Should be true") + } + if !tracker.Track("tratment-1", "key-2") { + t.Error("Should be true") + } + if !tracker.Track("tratment-2", "key-1") { + t.Error("Should be true") + } + if !tracker.Track("tratment-2", "key-2") { + t.Error("Should be true") + } -// task.Stop(true) -// if call != 1 { -// t.Error("Request not received") -// } + task.Start() + time.Sleep(3 * time.Second) -// if task.IsRunning() { -// t.Error("Task should be stopped") -// } + if !task.IsRunning() { + t.Error("UniqueKeys task should be running") + } -// t.Error(tracker.PopAll().Keys) -// } + task.Stop(true) + if call != 1 { + t.Error("Request not received") + } + + if task.IsRunning() { + t.Error("Task should be stopped") + } +} From 607b3bf03f054a7a8df02d61baff96a92ec13fe0 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Thu, 27 Mar 2025 13:59:51 -0300 Subject: [PATCH 06/15] polishing --- conf/conf.go | 2 + dtos/telemetry.go | 4 +- provisional/impmanager_test.go | 6 +- provisional/strategy/none_test.go | 6 +- provisional/strategy/uniquekeystracker.go | 2 +- .../strategy/uniquekeystracker_test.go | 66 +++++------ service/api/http_recorders_test.go | 8 +- storage/inmemory/mutexmap/uniquekeys.go | 97 ---------------- storage/inmemory/mutexqueue/uniquekeys.go | 104 ++++++++++++++++++ .../inmemory/mutexqueue/uniquekeys_test.go | 83 ++++++++++++++ storage/interfaces.go | 4 +- storage/mocks/uniquekeys.go | 12 +- storage/redis/telemetry_test.go | 4 +- storage/redis/uniquekeys_test.go | 6 +- synchronizer/synchronizer.go | 4 +- tasks/uniquekeyssync.go | 5 +- tasks/uniquekeyssync_test.go | 5 +- telemetry/interface.go | 2 +- telemetry/localhost.go | 2 +- telemetry/memory.go | 4 +- telemetry/redis.go | 4 +- 21 files changed, 264 insertions(+), 166 deletions(-) delete mode 100644 storage/inmemory/mutexmap/uniquekeys.go create mode 100644 storage/inmemory/mutexqueue/uniquekeys.go create mode 100644 storage/inmemory/mutexqueue/uniquekeys_test.go diff --git a/conf/conf.go b/conf/conf.go index e84026fd..e96b71aa 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -77,6 +77,8 @@ type AdvancedConfig struct { EventsQueueSize int ImpressionsQueueSize int ImpressionsBulkSize int64 + UniqueKeysQueueSize int64 + UniqueKeysBulkSize int64 StreamingEnabled bool AuthServiceURL string StreamingServiceURL string diff --git a/dtos/telemetry.go b/dtos/telemetry.go index 1de529ab..afa9062d 100644 --- a/dtos/telemetry.go +++ b/dtos/telemetry.go @@ -143,8 +143,8 @@ type Stats struct { // Key struct type Key struct { - Feature string `json:"f,omitempty"` - Keys []string `json:"ks,omitempty"` + Feature string `json:"f,omitempty"` + Keys []interface{} `json:"ks,omitempty"` } // Uniques struct diff --git a/provisional/impmanager_test.go b/provisional/impmanager_test.go index 1c3befe2..ce87de80 100644 --- a/provisional/impmanager_test.go +++ b/provisional/impmanager_test.go @@ -8,7 +8,7 @@ import ( "github.com/splitio/go-split-commons/v6/provisional/strategy" "github.com/splitio/go-split-commons/v6/storage/filter" "github.com/splitio/go-split-commons/v6/storage/inmemory" - "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue" "github.com/splitio/go-split-commons/v6/telemetry" "github.com/splitio/go-toolkit/v5/logging" ) @@ -124,7 +124,7 @@ func TestImpManagerInMemoryOptimized(t *testing.T) { func TestImpManagerInMemoryNone(t *testing.T) { counter := strategy.NewImpressionsCounter() filter := filter.NewBloomFilter(3000, 0.01) - uniqueKeysStorage := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) uniqueTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage) none := strategy.NewNoneImpl(counter, uniqueTracker, true) impManager := NewImpressionManager(none) @@ -193,7 +193,7 @@ func TestProcess(t *testing.T) { observer, _ := strategy.NewImpressionObserver(5000) debug := strategy.NewDebugImpl(observer, true) filter := filter.NewBloomFilter(3000, 0.01) - uniqueKeysStorage := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) uniqueTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage) counter := strategy.NewImpressionsCounter() none := strategy.NewNoneImpl(counter, uniqueTracker, false) diff --git a/provisional/strategy/none_test.go b/provisional/strategy/none_test.go index ac263052..7fd40c8c 100644 --- a/provisional/strategy/none_test.go +++ b/provisional/strategy/none_test.go @@ -6,7 +6,7 @@ import ( "github.com/splitio/go-split-commons/v6/dtos" "github.com/splitio/go-split-commons/v6/storage/filter" - "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue" "github.com/splitio/go-split-commons/v6/util" "github.com/splitio/go-toolkit/v5/logging" ) @@ -14,7 +14,7 @@ import ( func TestNoneMode(t *testing.T) { now := time.Now().UTC().UnixNano() filter := filter.NewBloomFilter(1000, 0.01) - uniqueKeysStorage := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) tracker := NewUniqueKeysTracker(filter, uniqueKeysStorage) counter := NewImpressionsCounter() none := NewNoneImpl(counter, tracker, true) @@ -55,7 +55,7 @@ func TestNoneMode(t *testing.T) { func TestApplySingleNone(t *testing.T) { now := time.Now().UTC().UnixNano() filter := filter.NewBloomFilter(1000, 0.01) - uniqueKeysStorage := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) tracker := NewUniqueKeysTracker(filter, uniqueKeysStorage) counter := NewImpressionsCounter() none := NewNoneImpl(counter, tracker, true) diff --git a/provisional/strategy/uniquekeystracker.go b/provisional/strategy/uniquekeystracker.go index ebd0b480..7bae3855 100644 --- a/provisional/strategy/uniquekeystracker.go +++ b/provisional/strategy/uniquekeystracker.go @@ -31,7 +31,7 @@ func (t *UniqueKeysTrackerImpl) Track(featureName string, key string) bool { } t.filter.Add(fKey) - t.storage.Add(featureName, key) + t.storage.Push(featureName, key) return true } diff --git a/provisional/strategy/uniquekeystracker_test.go b/provisional/strategy/uniquekeystracker_test.go index 5e2e1cc3..af2cdb5c 100644 --- a/provisional/strategy/uniquekeystracker_test.go +++ b/provisional/strategy/uniquekeystracker_test.go @@ -1,34 +1,36 @@ package strategy -// import ( -// "fmt" -// "testing" - -// "github.com/splitio/go-split-commons/v6/storage/filter" -// ) - -// func Test(t *testing.T) { -// bf := filter.NewBloomFilter(10000, 0.01) - -// tracker := NewUniqueKeysTracker(bf) - -// for i := 0; i < 10; i++ { -// if !tracker.Track("feature-1", "key-"+fmt.Sprint(i)) { -// t.Error("Should be true") -// } -// } - -// for i := 0; i < 10; i++ { -// if !tracker.Track("feature-2", "key-"+fmt.Sprint(i)) { -// t.Error("Should be true") -// } -// } - -// if tracker.Track("feature-2", "key-4") { -// t.Error("Should be false") -// } - -// if tracker.Track("feature-1", "key-4") { -// t.Error("Should be false") -// } -// } +import ( + "fmt" + "testing" + + "github.com/splitio/go-split-commons/v6/storage/filter" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue" + "github.com/splitio/go-toolkit/v5/logging" +) + +func TestUniqueKeysTracker(t *testing.T) { + bf := filter.NewBloomFilter(10000, 0.01) + uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + tracker := NewUniqueKeysTracker(bf, uniqueKeysStorage) + + for i := 0; i < 10; i++ { + if !tracker.Track("feature-1", "key-"+fmt.Sprint(i)) { + t.Error("Should be true") + } + } + + for i := 0; i < 10; i++ { + if !tracker.Track("feature-2", "key-"+fmt.Sprint(i)) { + t.Error("Should be true") + } + } + + if tracker.Track("feature-2", "key-4") { + t.Error("Should be false") + } + + if tracker.Track("feature-1", "key-4") { + t.Error("Should be false") + } +} diff --git a/service/api/http_recorders_test.go b/service/api/http_recorders_test.go index 04b311fd..c3c22cd3 100644 --- a/service/api/http_recorders_test.go +++ b/service/api/http_recorders_test.go @@ -237,11 +237,11 @@ func TestJsonUniqueKeys(t *testing.T) { Keys: []dtos.Key{ { Feature: "feature-1", - Keys: []string{"key-1", "key-2"}, + Keys: []interface{}{"key-1", "key-2"}, }, { Feature: "feature-2", - Keys: []string{"key-10", "key-20"}, + Keys: []interface{}{"key-10", "key-20"}, }, }, } @@ -317,11 +317,11 @@ func TestPostUniqueKeys(t *testing.T) { keys := []dtos.Key{ { Feature: "feature-1", - Keys: []string{"key-1", "key-2"}, + Keys: []interface{}{"key-1", "key-2"}, }, { Feature: "feature-2", - Keys: []string{"key-3", "key-4"}, + Keys: []interface{}{"key-3", "key-4"}, }, } err := telemetryRecorder.RecordUniqueKeys(dtos.Uniques{ diff --git a/storage/inmemory/mutexmap/uniquekeys.go b/storage/inmemory/mutexmap/uniquekeys.go deleted file mode 100644 index 662ed2de..00000000 --- a/storage/inmemory/mutexmap/uniquekeys.go +++ /dev/null @@ -1,97 +0,0 @@ -package mutexmap - -import ( - "sync" - - "github.com/splitio/go-split-commons/v6/dtos" - "github.com/splitio/go-split-commons/v6/storage" - "github.com/splitio/go-toolkit/v5/datastructures/set" - "github.com/splitio/go-toolkit/v5/logging" -) - -type MMUniqueKeysStorage struct { - data map[string]*set.ThreadUnsafeSet - maxSize int64 - size int64 - mutex *sync.RWMutex - fullChan chan string //only write channel - logger logging.LoggerInterface -} - -func NewMMUniqueKeysStorage(maxSize int64, isFull chan string, logger logging.LoggerInterface) *MMUniqueKeysStorage { - return &MMUniqueKeysStorage{ - data: make(map[string]*set.ThreadUnsafeSet), - maxSize: maxSize, - size: 0, - mutex: &sync.RWMutex{}, - fullChan: isFull, - logger: logger, - } -} - -func (s *MMUniqueKeysStorage) Add(featureName string, key string) { - s.mutex.Lock() - defer s.mutex.Unlock() - - s.size++ - _, ok := s.data[featureName] - if !ok { - s.data[featureName] = set.NewSet() - } - - s.data[featureName].Add(key) - - if s.size >= s.maxSize { - s.sendSignalIsFull() - } -} - -func (s *MMUniqueKeysStorage) PopAll() dtos.Uniques { - s.mutex.Lock() - defer s.mutex.Unlock() - - toReturn := s.data - s.data = make(map[string]*set.ThreadUnsafeSet) - s.size = 0 - - result := getUniqueKeysDto(toReturn) - - return result -} - -func (s *MMUniqueKeysStorage) sendSignalIsFull() { - // Nom blocking select - select { - case s.fullChan <- "UNIQUE_KEYS_FULL": - // Send "queue is full" signal - break - default: - s.logger.Debug("Some error occurred on sending signal for unique keys") - } -} - -func getUniqueKeysDto(uniques map[string]*set.ThreadUnsafeSet) dtos.Uniques { - uniqueKeys := dtos.Uniques{ - Keys: make([]dtos.Key, 0, len(uniques)), - } - - for name, keys := range uniques { - list := keys.List() - keysDto := make([]string, 0, len(list)) - - for _, value := range list { - keysDto = append(keysDto, value.(string)) - } - keyDto := dtos.Key{ - Feature: name, - Keys: keysDto, - } - - uniqueKeys.Keys = append(uniqueKeys.Keys, keyDto) - } - - return uniqueKeys -} - -var _ storage.UniqueKeysStorageConsumer = (*MMUniqueKeysStorage)(nil) -var _ storage.UniqueKeysStorageProducer = (*MMUniqueKeysStorage)(nil) diff --git a/storage/inmemory/mutexqueue/uniquekeys.go b/storage/inmemory/mutexqueue/uniquekeys.go new file mode 100644 index 00000000..18121a0c --- /dev/null +++ b/storage/inmemory/mutexqueue/uniquekeys.go @@ -0,0 +1,104 @@ +package mutexqueue + +import ( + "container/list" + "sync" + + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/storage" + "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/splitio/go-toolkit/v5/logging" +) + +type UniqueKeyWrapper struct { + featureName string + key string +} + +type MQUniqueKeysStorage struct { + queue *list.List + maxSize int64 + mutexQueue *sync.Mutex + fullChan chan string //only write channel + logger logging.LoggerInterface +} + +func NewMQUniqueKeysStorage(maxSize int64, isFull chan string, logger logging.LoggerInterface) *MQUniqueKeysStorage { + return &MQUniqueKeysStorage{ + queue: list.New(), + maxSize: maxSize, + mutexQueue: &sync.Mutex{}, + fullChan: isFull, + logger: logger, + } +} + +func (s *MQUniqueKeysStorage) Push(featureName string, key string) { + s.mutexQueue.Lock() + defer s.mutexQueue.Unlock() + + s.queue.PushBack(UniqueKeyWrapper{featureName: featureName, key: key}) + if s.queue.Len() == int(s.maxSize) { + s.sendSignalIsFull() + } +} + +func (s *MQUniqueKeysStorage) PopN(n int64) dtos.Uniques { + s.mutexQueue.Lock() + defer s.mutexQueue.Unlock() + + var totalItems int + if int64(s.queue.Len()) >= n { + totalItems = int(n) + } else { + totalItems = s.queue.Len() + } + + uniques := make(map[string]*set.ThreadUnsafeSet) + for i := 0; i < totalItems; i++ { + item, ok := s.queue.Remove(s.queue.Front()).(UniqueKeyWrapper) + if !ok { + continue + } + + _, exists := uniques[item.featureName] + if !exists { + uniques[item.featureName] = set.NewSet() + } + + uniques[item.featureName].Add(item.key) + } + + return getUniqueKeysDto(uniques) +} + +func (s *MQUniqueKeysStorage) sendSignalIsFull() { + // Nom blocking select + select { + case s.fullChan <- "UNIQUE_KEYS_FULL": + // Send "queue is full" signal + break + default: + s.logger.Debug("Some error occurred on sending signal for unique keys") + } +} + +func getUniqueKeysDto(uniques map[string]*set.ThreadUnsafeSet) dtos.Uniques { + keysToReturn := make([]dtos.Key, 0) + + for name, keys := range uniques { + keyDto := dtos.Key{ + Feature: name, + Keys: keys.List(), + } + + keysToReturn = append(keysToReturn, keyDto) + } + + return dtos.Uniques{ + Keys: keysToReturn, + } +} + +var _ storage.UniqueKeysStorageConsumer = (*MQUniqueKeysStorage)(nil) +var _ storage.UniqueKeysStorageProducer = (*MQUniqueKeysStorage)(nil) diff --git a/storage/inmemory/mutexqueue/uniquekeys_test.go b/storage/inmemory/mutexqueue/uniquekeys_test.go new file mode 100644 index 00000000..430d8e63 --- /dev/null +++ b/storage/inmemory/mutexqueue/uniquekeys_test.go @@ -0,0 +1,83 @@ +package mutexqueue + +import ( + "testing" + + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-toolkit/v5/logging" +) + +func TestMQUniqueKeysStorage(t *testing.T) { + isFull := make(chan string, 1) + logger := logging.NewLogger(&logging.LoggerOptions{}) + storage := NewMQUniqueKeysStorage(5, isFull, logger) + + // Push some items into the queue + storage.Push("feature-1", "key-1") + storage.Push("feature-1", "key-2") + storage.Push("feature-2", "key-3") + storage.Push("feature-2", "key-4") + storage.Push("feature-3", "key-5") + + // Test PopN with n less than the queue size + result := storage.PopN(3) + if len(result.Keys) != 2 { + t.Errorf("Expected 2 feature groups, got %d", len(result.Keys)) + } + + // Validate feature-1 keys + feature1Keys := findFeatureKeys(result.Keys, "feature-1") + if len(feature1Keys) != 2 || !contains(feature1Keys, "key-1") || !contains(feature1Keys, "key-2") { + t.Errorf("Unexpected keys for feature-1: %v", feature1Keys) + } + + // Validate feature-2 keys + feature2Keys := findFeatureKeys(result.Keys, "feature-2") + if len(feature2Keys) != 1 || !contains(feature2Keys, "key-3") { + t.Errorf("Unexpected keys for feature-2: %v", feature2Keys) + } + + // Test PopN with n greater than the remaining queue size + result = storage.PopN(5) + if len(result.Keys) != 2 { + t.Errorf("Expected 2 feature groups, got %d", len(result.Keys)) + } + + // Validate feature-2 keys + feature2Keys = findFeatureKeys(result.Keys, "feature-2") + if len(feature2Keys) != 1 || !contains(feature2Keys, "key-4") { + t.Errorf("Unexpected keys for feature-2: %v", feature2Keys) + } + + // Validate feature-3 keys + feature3Keys := findFeatureKeys(result.Keys, "feature-3") + if len(feature3Keys) != 1 || !contains(feature3Keys, "key-5") { + t.Errorf("Unexpected keys for feature-3: %v", feature3Keys) + } + + // Test PopN with an empty queue + result = storage.PopN(1) + if len(result.Keys) != 0 { + t.Errorf("Expected 0 feature groups, got %d", len(result.Keys)) + } +} + +// Helper function to find keys for a specific feature +func findFeatureKeys(keys []dtos.Key, feature string) []interface{} { + for _, key := range keys { + if key.Feature == feature { + return key.Keys + } + } + return nil +} + +// Helper function to check if a slice contains a specific value +func contains(slice []interface{}, value string) bool { + for _, v := range slice { + if v == value { + return true + } + } + return false +} diff --git a/storage/interfaces.go b/storage/interfaces.go index d12e065b..777f97be 100644 --- a/storage/interfaces.go +++ b/storage/interfaces.go @@ -267,11 +267,11 @@ type LargeSegmentsStorage interface { } type UniqueKeysStorageConsumer interface { - PopAll() dtos.Uniques + PopN(n int64) dtos.Uniques } type UniqueKeysStorageProducer interface { - Add(featureName string, key string) + Push(featureName string, key string) } type UniqueKeysStorage interface { diff --git a/storage/mocks/uniquekeys.go b/storage/mocks/uniquekeys.go index cae9b995..df3018ec 100644 --- a/storage/mocks/uniquekeys.go +++ b/storage/mocks/uniquekeys.go @@ -5,14 +5,14 @@ import ( ) type MockUniqueKeysStorage struct { - AddCall func(featureName string, key string) - PopAllCall func() dtos.Uniques + PushCall func(featureName string, key string) + PopNCall func(bulkSize int64) dtos.Uniques } -func (m MockUniqueKeysStorage) Add(featureName string, key string) { - m.AddCall(featureName, key) +func (m MockUniqueKeysStorage) Push(featureName string, key string) { + m.PushCall(featureName, key) } -func (m MockUniqueKeysStorage) PopAll() dtos.Uniques { - return m.PopAllCall() +func (m MockUniqueKeysStorage) PopN(bulkSize int64) dtos.Uniques { + return m.PopNCall(bulkSize) } diff --git a/storage/redis/telemetry_test.go b/storage/redis/telemetry_test.go index f99f81c6..c2ac217a 100644 --- a/storage/redis/telemetry_test.go +++ b/storage/redis/telemetry_test.go @@ -191,11 +191,11 @@ func TestRecordUniqueKeys(t *testing.T) { Keys: []dtos.Key{ { Feature: "feature-1", - Keys: []string{"key-1", "key-2"}, + Keys: []interface{}{"key-1", "key-2"}, }, { Feature: "feature-2", - Keys: []string{"key-1", "key-2"}, + Keys: []interface{}{"key-1", "key-2"}, }, }, }) diff --git a/storage/redis/uniquekeys_test.go b/storage/redis/uniquekeys_test.go index 2563b8d1..ce3c9608 100644 --- a/storage/redis/uniquekeys_test.go +++ b/storage/redis/uniquekeys_test.go @@ -47,15 +47,15 @@ func TestPopNRaw(t *testing.T) { Keys: []dtos.Key{ { Feature: "feature-test-1", - Keys: []string{"key-1", "key-2", "key-3"}, + Keys: []interface{}{"key-1", "key-2", "key-3"}, }, { Feature: "feature-test-2", - Keys: []string{"key-1", "key-2", "key-3"}, + Keys: []interface{}{"key-1", "key-2", "key-3"}, }, { Feature: "feature-test-3", - Keys: []string{"key-1", "key-2", "key-3"}, + Keys: []interface{}{"key-1", "key-2", "key-3"}, }, }, } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 6892f103..dca190fb 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -66,6 +66,7 @@ type SynchronizerImpl struct { inMememoryFullQueue chan string impressionBulkSize int64 eventBulkSize int64 + uniqueKeysBulkSize int64 splitsRefreshRate int segmentsRefreshRate int httpTiemoutSecs int @@ -84,6 +85,7 @@ func NewSynchronizer( sync := &SynchronizerImpl{ impressionBulkSize: confAdvanced.ImpressionsBulkSize, eventBulkSize: confAdvanced.EventsBulkSize, + uniqueKeysBulkSize: confAdvanced.UniqueKeysBulkSize, splitTasks: splitTasks, workers: workers, logger: logger, @@ -256,7 +258,7 @@ func (s *SynchronizerImpl) dataFlusher() { } case "UNIQUE_KEYS_FULL": s.logger.Debug("FLUSHING Unique Keys storage") - s.workers.TelemetryRecorder.SynchronizeUniqueKeys() + s.workers.TelemetryRecorder.SynchronizeUniqueKeys(s.uniqueKeysBulkSize) } } } diff --git a/tasks/uniquekeyssync.go b/tasks/uniquekeyssync.go index 21cdf1af..0cb390e3 100644 --- a/tasks/uniquekeyssync.go +++ b/tasks/uniquekeyssync.go @@ -11,13 +11,14 @@ func NewRecordUniqueKeysTask( recorder telemetry.TelemetrySynchronizer, period int, logger logging.LoggerInterface, + bulkSize int64, ) *asynctask.AsyncTask { record := func(logger logging.LoggerInterface) error { - return recorder.SynchronizeUniqueKeys() + return recorder.SynchronizeUniqueKeys(bulkSize) } onStop := func(logger logging.LoggerInterface) { - recorder.SynchronizeUniqueKeys() + recorder.SynchronizeUniqueKeys(bulkSize) } return asynctask.NewAsyncTask("SubmitUniqueKeys", record, period, nil, onStop, logger) diff --git a/tasks/uniquekeyssync_test.go b/tasks/uniquekeyssync_test.go index d002321e..d130037c 100644 --- a/tasks/uniquekeyssync_test.go +++ b/tasks/uniquekeyssync_test.go @@ -7,7 +7,7 @@ import ( "github.com/splitio/go-split-commons/v6/dtos" "github.com/splitio/go-split-commons/v6/provisional/strategy" "github.com/splitio/go-split-commons/v6/service/mocks" - "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue" st "github.com/splitio/go-split-commons/v6/storage/mocks" "github.com/splitio/go-split-commons/v6/telemetry" "github.com/splitio/go-toolkit/v5/datastructures/set" @@ -56,7 +56,7 @@ func TestUniqueKeysTask(t *testing.T) { SegmentKeysCountCall: func() int64 { return 10 }, } - uniqueKeys := mutexmap.NewMMUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) + uniqueKeys := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil)) synchronizer := telemetry.NewTelemetrySynchronizer( mockedTelemetryStorage, @@ -80,6 +80,7 @@ func TestUniqueKeysTask(t *testing.T) { synchronizer, 2, logging.NewLogger(&logging.LoggerOptions{}), + 5, ) if !tracker.Track("tratment-1", "key-1") { diff --git a/telemetry/interface.go b/telemetry/interface.go index 6598fa2e..8c68f6fc 100644 --- a/telemetry/interface.go +++ b/telemetry/interface.go @@ -4,5 +4,5 @@ package telemetry type TelemetrySynchronizer interface { SynchronizeConfig(cfg InitConfig, timedUntilReady int64, factoryInstances map[string]int64, tags []string) SynchronizeStats() error - SynchronizeUniqueKeys() error + SynchronizeUniqueKeys(bulkSizer int64) error } diff --git a/telemetry/localhost.go b/telemetry/localhost.go index 2810ae5d..815291bc 100644 --- a/telemetry/localhost.go +++ b/telemetry/localhost.go @@ -9,7 +9,7 @@ func (n *NoOp) SynchronizeStats() error { return nil } -func (n *NoOp) SynchronizeUniqueKeys() error { +func (n *NoOp) SynchronizeUniqueKeys(bulkSize int64) error { return nil } diff --git a/telemetry/memory.go b/telemetry/memory.go index 28c58a35..d01ab850 100644 --- a/telemetry/memory.go +++ b/telemetry/memory.go @@ -136,8 +136,8 @@ func (e *RecorderSingle) SynchronizeConfig(cfg InitConfig, timedUntilReady int64 } // SynchronizeUniqueKeys syncs unique keys -func (e *RecorderSingle) SynchronizeUniqueKeys() error { - uniques := e.uniqueKeysStorage.PopAll() +func (e *RecorderSingle) SynchronizeUniqueKeys(bulkSize int64) error { + uniques := e.uniqueKeysStorage.PopN(bulkSize) if len(uniques.Keys) < 1 { e.logger.Debug("Unique keys list is empty, nothing to synchronize.") diff --git a/telemetry/redis.go b/telemetry/redis.go index d98d5362..5e3b27ef 100644 --- a/telemetry/redis.go +++ b/telemetry/redis.go @@ -51,8 +51,8 @@ func (r *SynchronizerRedis) SynchronizeConfig(cfg InitConfig, timedUntilReady in } // SynchronizeUniqueKeys syncs unique keys -func (r *SynchronizerRedis) SynchronizeUniqueKeys() error { - uniques := r.uniqueKeysStorage.PopAll() +func (r *SynchronizerRedis) SynchronizeUniqueKeys(bulkSize int64) error { + uniques := r.uniqueKeysStorage.PopN(bulkSize) if len(uniques.Keys) < 1 { r.logger.Debug("Unique keys list is empty, nothing to synchronize.") return nil From 9a5c36389db178a4ee8850926a03b537b0d5a2b5 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Mon, 14 Apr 2025 19:43:40 -0300 Subject: [PATCH 07/15] adding uniquekeys mock --- storage/mocks/uniquekeys.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/storage/mocks/uniquekeys.go b/storage/mocks/uniquekeys.go index df3018ec..cd6d61a1 100644 --- a/storage/mocks/uniquekeys.go +++ b/storage/mocks/uniquekeys.go @@ -16,3 +16,16 @@ func (m MockUniqueKeysStorage) Push(featureName string, key string) { func (m MockUniqueKeysStorage) PopN(bulkSize int64) dtos.Uniques { return m.PopNCall(bulkSize) } + +type MockUniqueKeysMultiSdkConsumer struct { + CountCall func() int64 + PopNRawCall func(int64) ([]string, int64, error) +} + +func (m MockUniqueKeysMultiSdkConsumer) Count() int64 { + return m.CountCall() +} + +func (m MockUniqueKeysMultiSdkConsumer) PopNRaw(n int64) ([]string, int64, error) { + return m.PopNRawCall(n) +} From 62906f35f3b026449bfd708dbe8f0c8b56f973bd Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Mon, 14 Apr 2025 20:24:05 -0300 Subject: [PATCH 08/15] polishing --- telemetry/memory.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/telemetry/memory.go b/telemetry/memory.go index d01ab850..97d77958 100644 --- a/telemetry/memory.go +++ b/telemetry/memory.go @@ -137,6 +137,10 @@ func (e *RecorderSingle) SynchronizeConfig(cfg InitConfig, timedUntilReady int64 // SynchronizeUniqueKeys syncs unique keys func (e *RecorderSingle) SynchronizeUniqueKeys(bulkSize int64) error { + if e.uniqueKeysStorage == nil { + return nil + } + uniques := e.uniqueKeysStorage.PopN(bulkSize) if len(uniques.Keys) < 1 { From 55f450ca303b7c46dceac27ddd13ce565626f088 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Tue, 15 Apr 2025 19:38:36 -0300 Subject: [PATCH 09/15] improved spec handling --- service/api/http_fetchers_test.go | 8 ++--- service/api/specs/specversion.go | 32 +++++++++++--------- service/api/specs/splitversionfilter.go | 25 ++++++++------- service/api/specs/splitversionfilter_test.go | 20 ++++++------ service/commons_test.go | 8 ++--- 5 files changed, 48 insertions(+), 45 deletions(-) diff --git a/service/api/http_fetchers_test.go b/service/api/http_fetchers_test.go index 6f3ebdba..f473532e 100644 --- a/service/api/http_fetchers_test.go +++ b/service/api/http_fetchers_test.go @@ -40,7 +40,7 @@ func TestSpitChangesFetch(t *testing.T) { if r.URL.Query().Get("sets") != "" { t.Error("wrong sets") } - if r.URL.Query().Get("s") != specs.FLAG_V1_1 { + if r.URL.Query().Get("s") != specs.FlagSpecs[1] { t.Error("wrong spec") } if r.URL.RawQuery != "s=1.1&since=123456" { @@ -55,7 +55,7 @@ func TestSpitChangesFetch(t *testing.T) { conf.AdvancedConfig{ EventsURL: ts.URL, SdkURL: ts.URL, - FlagsSpecVersion: specs.FLAG_V1_1, + FlagsSpecVersion: specs.FlagSpecs[1], }, logger, dtos.Metadata{}, @@ -213,7 +213,7 @@ func TestSpitChangesFetchWithAll(t *testing.T) { EventsURL: ts.URL, SdkURL: ts.URL, FlagSetsFilter: []string{"one", "two"}, - FlagsSpecVersion: specs.FLAG_V1_1, + FlagsSpecVersion: specs.FlagSpecs[1], }, logger, dtos.Metadata{}, @@ -235,7 +235,7 @@ func TestSpitChangesFetchWithAll(t *testing.T) { if !queryParams.Has("sets") { t.Error("Expected to have sets") } - if queryParams.Get("s") != specs.FLAG_V1_1 { + if queryParams.Get("s") != specs.FlagSpecs[1] { t.Error("Expected to have spec") } asString := queryParams.Get("sets") diff --git a/service/api/specs/specversion.go b/service/api/specs/specversion.go index 78fafcee..a87128d6 100644 --- a/service/api/specs/specversion.go +++ b/service/api/specs/specversion.go @@ -1,30 +1,34 @@ package specs -import "fmt" +import ( + "fmt" -const ( - FLAG_V1_0 = "1.0" - FLAG_V1_1 = "1.1" - FLAG_V1_2 = "1.2" + "golang.org/x/exp/slices" ) +var FlagSpecs = []string{ + "1.0", // default + "1.1", // Semver Matcher + "1.2", // Large Segment Matcher +} + +var Latest = string(FlagSpecs[len(FlagSpecs)-1]) +var Default = string(FlagSpecs[0]) + // Match returns the spec version if it is valid, otherwise it returns nil func Match(version string) *string { - switch version { - case FLAG_V1_0: - return &version - case FLAG_V1_1: - return &version - case FLAG_V1_2: - return &version + ok := slices.Contains(FlagSpecs, version) + if !ok { + return nil } - return nil + + return &version } func ParseAndValidate(spec string) (string, error) { if len(spec) == 0 { // return default flag spec - return FLAG_V1_0, nil + return Default, nil } if Match(spec) == nil { diff --git a/service/api/specs/splitversionfilter.go b/service/api/specs/splitversionfilter.go index fb612241..d75ecbe3 100644 --- a/service/api/specs/splitversionfilter.go +++ b/service/api/specs/splitversionfilter.go @@ -5,35 +5,34 @@ import ( ) type SplitVersionFilter struct { - v1_0 map[string]bool - v1_1 map[string]bool + data map[string]map[string]bool } func NewSplitVersionFilter() SplitVersionFilter { - v1_1 := map[string]bool{matchers.MatcherTypeInLargeSegment: true} - v1_0 := mergeMaps(map[string]bool{ + data := map[string]map[string]bool{ + FlagSpecs[1]: {matchers.MatcherTypeInLargeSegment: true}, + } + + data[Default] = mergeMaps(map[string]bool{ matchers.MatcherEqualToSemver: true, matchers.MatcherTypeLessThanOrEqualToSemver: true, matchers.MatcherTypeGreaterThanOrEqualToSemver: true, matchers.MatcherTypeBetweenSemver: true, matchers.MatcherTypeInListSemver: true, - }, v1_1) + }, data[FlagSpecs[1]]) return SplitVersionFilter{ - v1_0: v1_0, - v1_1: v1_1, + data: data, } } func (f *SplitVersionFilter) ShouldFilter(matcher string, apiVersion string) bool { - switch apiVersion { - case FLAG_V1_1: - return f.v1_1[matcher] - case FLAG_V1_0: - return f.v1_0[matcher] + matchers, ok := f.data[apiVersion] + if !ok { + return false } - return false + return matchers[matcher] } func mergeMaps(versionMap map[string]bool, toMergeMap map[string]bool) map[string]bool { diff --git a/service/api/specs/splitversionfilter_test.go b/service/api/specs/splitversionfilter_test.go index e9f5cf0a..ebaf96a2 100644 --- a/service/api/specs/splitversionfilter_test.go +++ b/service/api/specs/splitversionfilter_test.go @@ -8,17 +8,17 @@ import ( func TestParseAndValidate(t *testing.T) { res, err := ParseAndValidate("") - if err != nil || res != FLAG_V1_0 { + if err != nil || res != Default { t.Error("It should be 1.1") } res, err = ParseAndValidate("1.1") - if err != nil || res != FLAG_V1_1 { + if err != nil || res != FlagSpecs[1] { t.Error("It should be 1.1") } res, err = ParseAndValidate("1.2") - if err != nil || res != FLAG_V1_2 { + if err != nil || res != FlagSpecs[2] { t.Error("It should be 1.2") } @@ -28,34 +28,34 @@ func TestParseAndValidate(t *testing.T) { } } -func TestsplitVersionFilter(t *testing.T) { +func TestSplitVersionFilter(t *testing.T) { filter := NewSplitVersionFilter() - shouldFilter := filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, FLAG_V1_0) + shouldFilter := filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, Default) if !shouldFilter { t.Error("It should filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeEqualTo, FLAG_V1_0) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeEqualTo, Default) if shouldFilter { t.Error("It should not filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, FLAG_V1_1) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, FlagSpecs[1]) if shouldFilter { t.Error("It should not filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FLAG_V1_0) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, Default) if !shouldFilter { t.Error("It should filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FLAG_V1_1) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FlagSpecs[1]) if !shouldFilter { t.Error("It should filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FLAG_V1_2) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FlagSpecs[2]) if shouldFilter { t.Error("It should not filtered") } diff --git a/service/commons_test.go b/service/commons_test.go index ec940ebe..60ebe801 100644 --- a/service/commons_test.go +++ b/service/commons_test.go @@ -9,7 +9,7 @@ import ( ) func TestSplitFetchOptions(t *testing.T) { - fetchOptions := MakeFlagRequestParams().WithChangeNumber(123456).WithFlagSetsFilter("filter").WithTill(*common.Int64Ref(123)).WithSpecVersion(common.StringRef(specs.FLAG_V1_1)) + fetchOptions := MakeFlagRequestParams().WithChangeNumber(123456).WithFlagSetsFilter("filter").WithTill(*common.Int64Ref(123)).WithSpecVersion(common.StringRef(specs.FlagSpecs[1])) req, _ := http.NewRequest("GET", "test", nil) fetchOptions.Apply(req) @@ -19,7 +19,7 @@ func TestSplitFetchOptions(t *testing.T) { if req.URL.Query().Get(since) != "123456" { t.Error("Change number not set") } - if req.URL.Query().Get(spec) != specs.FLAG_V1_1 { + if req.URL.Query().Get(spec) != specs.FlagSpecs[1] { t.Error("Spec version not set") } if req.URL.Query().Get(sets) != "filter" { @@ -55,14 +55,14 @@ func TestSegmentRequestParams(t *testing.T) { } func TestAuthRequestParams(t *testing.T) { - fetchOptions := MakeAuthRequestParams(common.StringRef(specs.FLAG_V1_1)) + fetchOptions := MakeAuthRequestParams(common.StringRef(specs.FlagSpecs[1])) req, _ := http.NewRequest("GET", "test", nil) fetchOptions.Apply(req) if req.Header.Get(cacheControl) != cacheControlNoCache { t.Error("Cache control header not set") } - if req.URL.Query().Get(spec) != specs.FLAG_V1_1 { + if req.URL.Query().Get(spec) != specs.FlagSpecs[1] { t.Error("Spec version not set") } if req.URL.String() != "test?s=1.1" { From bd2b1aa25b8fd062989537e7cbb1b1807b2688cb Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Thu, 24 Apr 2025 11:59:44 -0300 Subject: [PATCH 10/15] polishing --- service/api/http_fetchers_test.go | 8 ++++---- service/api/specs/specversion.go | 12 ++++++++---- service/api/specs/splitversionfilter.go | 4 ++-- service/api/specs/splitversionfilter_test.go | 10 +++++----- service/commons_test.go | 8 ++++---- 5 files changed, 23 insertions(+), 19 deletions(-) diff --git a/service/api/http_fetchers_test.go b/service/api/http_fetchers_test.go index f473532e..6f3ebdba 100644 --- a/service/api/http_fetchers_test.go +++ b/service/api/http_fetchers_test.go @@ -40,7 +40,7 @@ func TestSpitChangesFetch(t *testing.T) { if r.URL.Query().Get("sets") != "" { t.Error("wrong sets") } - if r.URL.Query().Get("s") != specs.FlagSpecs[1] { + if r.URL.Query().Get("s") != specs.FLAG_V1_1 { t.Error("wrong spec") } if r.URL.RawQuery != "s=1.1&since=123456" { @@ -55,7 +55,7 @@ func TestSpitChangesFetch(t *testing.T) { conf.AdvancedConfig{ EventsURL: ts.URL, SdkURL: ts.URL, - FlagsSpecVersion: specs.FlagSpecs[1], + FlagsSpecVersion: specs.FLAG_V1_1, }, logger, dtos.Metadata{}, @@ -213,7 +213,7 @@ func TestSpitChangesFetchWithAll(t *testing.T) { EventsURL: ts.URL, SdkURL: ts.URL, FlagSetsFilter: []string{"one", "two"}, - FlagsSpecVersion: specs.FlagSpecs[1], + FlagsSpecVersion: specs.FLAG_V1_1, }, logger, dtos.Metadata{}, @@ -235,7 +235,7 @@ func TestSpitChangesFetchWithAll(t *testing.T) { if !queryParams.Has("sets") { t.Error("Expected to have sets") } - if queryParams.Get("s") != specs.FlagSpecs[1] { + if queryParams.Get("s") != specs.FLAG_V1_1 { t.Error("Expected to have spec") } asString := queryParams.Get("sets") diff --git a/service/api/specs/specversion.go b/service/api/specs/specversion.go index a87128d6..ca4169c2 100644 --- a/service/api/specs/specversion.go +++ b/service/api/specs/specversion.go @@ -6,18 +6,22 @@ import ( "golang.org/x/exp/slices" ) -var FlagSpecs = []string{ +var flagSpecs = []string{ "1.0", // default "1.1", // Semver Matcher "1.2", // Large Segment Matcher } -var Latest = string(FlagSpecs[len(FlagSpecs)-1]) -var Default = string(FlagSpecs[0]) +var FLAG_V1_0 = flagSpecs[0] +var FLAG_V1_1 = flagSpecs[1] +var FLAG_V1_2 = flagSpecs[2] + +var Latest = string(flagSpecs[len(flagSpecs)-1]) +var Default = string(flagSpecs[0]) // Match returns the spec version if it is valid, otherwise it returns nil func Match(version string) *string { - ok := slices.Contains(FlagSpecs, version) + ok := slices.Contains(flagSpecs, version) if !ok { return nil } diff --git a/service/api/specs/splitversionfilter.go b/service/api/specs/splitversionfilter.go index d75ecbe3..8de97b2f 100644 --- a/service/api/specs/splitversionfilter.go +++ b/service/api/specs/splitversionfilter.go @@ -10,7 +10,7 @@ type SplitVersionFilter struct { func NewSplitVersionFilter() SplitVersionFilter { data := map[string]map[string]bool{ - FlagSpecs[1]: {matchers.MatcherTypeInLargeSegment: true}, + FLAG_V1_1: {matchers.MatcherTypeInLargeSegment: true}, } data[Default] = mergeMaps(map[string]bool{ @@ -19,7 +19,7 @@ func NewSplitVersionFilter() SplitVersionFilter { matchers.MatcherTypeGreaterThanOrEqualToSemver: true, matchers.MatcherTypeBetweenSemver: true, matchers.MatcherTypeInListSemver: true, - }, data[FlagSpecs[1]]) + }, data[FLAG_V1_1]) return SplitVersionFilter{ data: data, diff --git a/service/api/specs/splitversionfilter_test.go b/service/api/specs/splitversionfilter_test.go index ebaf96a2..8bbb2a08 100644 --- a/service/api/specs/splitversionfilter_test.go +++ b/service/api/specs/splitversionfilter_test.go @@ -13,12 +13,12 @@ func TestParseAndValidate(t *testing.T) { } res, err = ParseAndValidate("1.1") - if err != nil || res != FlagSpecs[1] { + if err != nil || res != FLAG_V1_1 { t.Error("It should be 1.1") } res, err = ParseAndValidate("1.2") - if err != nil || res != FlagSpecs[2] { + if err != nil || res != FLAG_V1_2 { t.Error("It should be 1.2") } @@ -40,7 +40,7 @@ func TestSplitVersionFilter(t *testing.T) { t.Error("It should not filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, FlagSpecs[1]) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, FLAG_V1_1) if shouldFilter { t.Error("It should not filtered") } @@ -50,12 +50,12 @@ func TestSplitVersionFilter(t *testing.T) { t.Error("It should filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FlagSpecs[1]) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FLAG_V1_1) if !shouldFilter { t.Error("It should filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FlagSpecs[2]) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FLAG_V1_2) if shouldFilter { t.Error("It should not filtered") } diff --git a/service/commons_test.go b/service/commons_test.go index 60ebe801..ec940ebe 100644 --- a/service/commons_test.go +++ b/service/commons_test.go @@ -9,7 +9,7 @@ import ( ) func TestSplitFetchOptions(t *testing.T) { - fetchOptions := MakeFlagRequestParams().WithChangeNumber(123456).WithFlagSetsFilter("filter").WithTill(*common.Int64Ref(123)).WithSpecVersion(common.StringRef(specs.FlagSpecs[1])) + fetchOptions := MakeFlagRequestParams().WithChangeNumber(123456).WithFlagSetsFilter("filter").WithTill(*common.Int64Ref(123)).WithSpecVersion(common.StringRef(specs.FLAG_V1_1)) req, _ := http.NewRequest("GET", "test", nil) fetchOptions.Apply(req) @@ -19,7 +19,7 @@ func TestSplitFetchOptions(t *testing.T) { if req.URL.Query().Get(since) != "123456" { t.Error("Change number not set") } - if req.URL.Query().Get(spec) != specs.FlagSpecs[1] { + if req.URL.Query().Get(spec) != specs.FLAG_V1_1 { t.Error("Spec version not set") } if req.URL.Query().Get(sets) != "filter" { @@ -55,14 +55,14 @@ func TestSegmentRequestParams(t *testing.T) { } func TestAuthRequestParams(t *testing.T) { - fetchOptions := MakeAuthRequestParams(common.StringRef(specs.FlagSpecs[1])) + fetchOptions := MakeAuthRequestParams(common.StringRef(specs.FLAG_V1_1)) req, _ := http.NewRequest("GET", "test", nil) fetchOptions.Apply(req) if req.Header.Get(cacheControl) != cacheControlNoCache { t.Error("Cache control header not set") } - if req.URL.Query().Get(spec) != specs.FlagSpecs[1] { + if req.URL.Query().Get(spec) != specs.FLAG_V1_1 { t.Error("Spec version not set") } if req.URL.String() != "test?s=1.1" { From 2c84e28b13e82b5bac1f07b3200ac552ff1ac8de Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Thu, 24 Apr 2025 12:25:20 -0300 Subject: [PATCH 11/15] polishing --- service/api/specs/specversion.go | 3 +-- service/api/specs/splitversionfilter.go | 2 +- service/api/specs/splitversionfilter_test.go | 8 ++++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/service/api/specs/specversion.go b/service/api/specs/specversion.go index ca4169c2..86d32985 100644 --- a/service/api/specs/specversion.go +++ b/service/api/specs/specversion.go @@ -17,7 +17,6 @@ var FLAG_V1_1 = flagSpecs[1] var FLAG_V1_2 = flagSpecs[2] var Latest = string(flagSpecs[len(flagSpecs)-1]) -var Default = string(flagSpecs[0]) // Match returns the spec version if it is valid, otherwise it returns nil func Match(version string) *string { @@ -32,7 +31,7 @@ func Match(version string) *string { func ParseAndValidate(spec string) (string, error) { if len(spec) == 0 { // return default flag spec - return Default, nil + return FLAG_V1_0, nil } if Match(spec) == nil { diff --git a/service/api/specs/splitversionfilter.go b/service/api/specs/splitversionfilter.go index 8de97b2f..cbf67a8c 100644 --- a/service/api/specs/splitversionfilter.go +++ b/service/api/specs/splitversionfilter.go @@ -13,7 +13,7 @@ func NewSplitVersionFilter() SplitVersionFilter { FLAG_V1_1: {matchers.MatcherTypeInLargeSegment: true}, } - data[Default] = mergeMaps(map[string]bool{ + data[FLAG_V1_0] = mergeMaps(map[string]bool{ matchers.MatcherEqualToSemver: true, matchers.MatcherTypeLessThanOrEqualToSemver: true, matchers.MatcherTypeGreaterThanOrEqualToSemver: true, diff --git a/service/api/specs/splitversionfilter_test.go b/service/api/specs/splitversionfilter_test.go index 8bbb2a08..ab8e51cb 100644 --- a/service/api/specs/splitversionfilter_test.go +++ b/service/api/specs/splitversionfilter_test.go @@ -8,7 +8,7 @@ import ( func TestParseAndValidate(t *testing.T) { res, err := ParseAndValidate("") - if err != nil || res != Default { + if err != nil || res != FLAG_V1_0 { t.Error("It should be 1.1") } @@ -30,12 +30,12 @@ func TestParseAndValidate(t *testing.T) { func TestSplitVersionFilter(t *testing.T) { filter := NewSplitVersionFilter() - shouldFilter := filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, Default) + shouldFilter := filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, FLAG_V1_0) if !shouldFilter { t.Error("It should filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeEqualTo, Default) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeEqualTo, FLAG_V1_0) if shouldFilter { t.Error("It should not filtered") } @@ -45,7 +45,7 @@ func TestSplitVersionFilter(t *testing.T) { t.Error("It should not filtered") } - shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, Default) + shouldFilter = filter.ShouldFilter(matchers.MatcherTypeInLargeSegment, FLAG_V1_0) if !shouldFilter { t.Error("It should filtered") } From eaa6073c54b65cfecb1fde39122661d316d8c79e Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Thu, 24 Apr 2025 12:27:39 -0300 Subject: [PATCH 12/15] removed unnecessary cast --- service/api/specs/specversion.go | 2 +- service/api/specs/splitversionfilter_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/service/api/specs/specversion.go b/service/api/specs/specversion.go index 86d32985..c15342e6 100644 --- a/service/api/specs/specversion.go +++ b/service/api/specs/specversion.go @@ -16,7 +16,7 @@ var FLAG_V1_0 = flagSpecs[0] var FLAG_V1_1 = flagSpecs[1] var FLAG_V1_2 = flagSpecs[2] -var Latest = string(flagSpecs[len(flagSpecs)-1]) +var Latest = flagSpecs[len(flagSpecs)-1] // Match returns the spec version if it is valid, otherwise it returns nil func Match(version string) *string { diff --git a/service/api/specs/splitversionfilter_test.go b/service/api/specs/splitversionfilter_test.go index ab8e51cb..e9f5cf0a 100644 --- a/service/api/specs/splitversionfilter_test.go +++ b/service/api/specs/splitversionfilter_test.go @@ -28,7 +28,7 @@ func TestParseAndValidate(t *testing.T) { } } -func TestSplitVersionFilter(t *testing.T) { +func TestsplitVersionFilter(t *testing.T) { filter := NewSplitVersionFilter() shouldFilter := filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, FLAG_V1_0) if !shouldFilter { From 1cf706b11227978a8e2d4ca887c90e25c1e1dc38 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Thu, 24 Apr 2025 14:20:01 -0300 Subject: [PATCH 13/15] pr feedback --- service/api/specs/specversion.go | 15 ++++++--------- service/api/specs/splitversionfilter_test.go | 2 +- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/service/api/specs/specversion.go b/service/api/specs/specversion.go index c15342e6..8aba1eec 100644 --- a/service/api/specs/specversion.go +++ b/service/api/specs/specversion.go @@ -6,16 +6,13 @@ import ( "golang.org/x/exp/slices" ) -var flagSpecs = []string{ - "1.0", // default - "1.1", // Semver Matcher - "1.2", // Large Segment Matcher -} - -var FLAG_V1_0 = flagSpecs[0] -var FLAG_V1_1 = flagSpecs[1] -var FLAG_V1_2 = flagSpecs[2] +const ( + FLAG_V1_0 = "1.0" // default + FLAG_V1_1 = "1.1" // Semver Matcher + FLAG_V1_2 = "1.2" // Large Segment Matcher +) +var flagSpecs = []string{FLAG_V1_0, FLAG_V1_1, FLAG_V1_2} var Latest = flagSpecs[len(flagSpecs)-1] // Match returns the spec version if it is valid, otherwise it returns nil diff --git a/service/api/specs/splitversionfilter_test.go b/service/api/specs/splitversionfilter_test.go index e9f5cf0a..ab8e51cb 100644 --- a/service/api/specs/splitversionfilter_test.go +++ b/service/api/specs/splitversionfilter_test.go @@ -28,7 +28,7 @@ func TestParseAndValidate(t *testing.T) { } } -func TestsplitVersionFilter(t *testing.T) { +func TestSplitVersionFilter(t *testing.T) { filter := NewSplitVersionFilter() shouldFilter := filter.ShouldFilter(matchers.MatcherTypeBetweenSemver, FLAG_V1_0) if !shouldFilter { From 00def8c7d15b32db258dcae76da0277b67d71142 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Thu, 24 Apr 2025 14:44:17 -0300 Subject: [PATCH 14/15] update changes --- CHANGES | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGES b/CHANGES index 31aa4cb8..a2c4a883 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,11 @@ +7.0.0 (Apr 24, 2025) +- BREAKING CHANGE: + - UniqueKeysTracker interface changed: + - Added Unique keys storage implementation. + - Added support for batch posting. +- Bump dependencies for vulnerability fixes. +- Updated flag specs handler. + 6.1.0 (Jan 14, 2025) - Added support for Impressions toggle. - Bump dependencies for vulnerability fixes. From a87a8ca37ceec50ab91358050800c95d82e6cd99 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Fri, 25 Apr 2025 16:11:09 -0300 Subject: [PATCH 15/15] added spec 1.3 --- service/api/specs/specversion.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/service/api/specs/specversion.go b/service/api/specs/specversion.go index 8aba1eec..62a23cd2 100644 --- a/service/api/specs/specversion.go +++ b/service/api/specs/specversion.go @@ -8,8 +8,9 @@ import ( const ( FLAG_V1_0 = "1.0" // default - FLAG_V1_1 = "1.1" // Semver Matcher - FLAG_V1_2 = "1.2" // Large Segment Matcher + FLAG_V1_1 = "1.1" // Semver + FLAG_V1_2 = "1.2" // Large Segment + FLAG_V1_3 = "1.3" // Rule-based Segment ) var flagSpecs = []string{FLAG_V1_0, FLAG_V1_1, FLAG_V1_2}