1
1
package io .harness .cf .client .api ;
2
2
3
+ import static io .harness .cf .client .common .SdkCodes .warnMetricsBufferFull ;
3
4
import static io .harness .cf .client .common .Utils .shutdownExecutorService ;
4
5
import static java .util .concurrent .TimeUnit .SECONDS ;
5
6
@@ -70,6 +71,10 @@ private void transferValueIntoMapAtomicallyAndUpdateTo(
70
71
return newValue ;
71
72
});
72
73
}
74
+
75
+ public boolean containsKey (K key ) {
76
+ return freqMap .containsKey (key );
77
+ }
73
78
}
74
79
75
80
private static final String FEATURE_NAME_ATTRIBUTE = "featureName" ;
@@ -91,55 +96,66 @@ private void transferValueIntoMapAtomicallyAndUpdateTo(
91
96
private static final Target globalTarget =
92
97
Target .builder ().name (GLOBAL_TARGET_NAME ).identifier (GLOBAL_TARGET ).build ();
93
98
99
+ private static final int MAX_SENT_TARGETS_TO_RETAIN = 100_000 ;
100
+ private static final int MAX_FREQ_MAP_TO_RETAIN = 10_000 ;
101
+
102
+ private static final LongAdder evalCounter = new LongAdder ();
103
+ private static final LongAdder metricsEvalsDropped = new LongAdder ();
104
+ private static final LongAdder targetsSeenDropped = new LongAdder ();
94
105
private final Connector connector ;
95
106
private final BaseConfig config ;
96
107
private final FrequencyMap <MetricEvent > frequencyMap ;
97
- private final Set <Target > uniqueTargetSet ;
108
+ private final Set <Target > targetsSeen ;
98
109
99
110
private ScheduledFuture <?> runningTask = null ;
100
111
private final ScheduledExecutorService scheduler = Executors .newScheduledThreadPool (1 );
101
112
102
113
private final LongAdder metricsSent = new LongAdder ();
114
+ private final int maxFreqMapSize ;
103
115
104
116
public MetricsProcessor (
105
117
@ NonNull Connector connector , @ NonNull BaseConfig config , @ NonNull MetricsCallback callback ) {
106
118
this .connector = connector ;
107
119
this .config = config ;
108
120
this .frequencyMap = new FrequencyMap <>();
109
- this .uniqueTargetSet = ConcurrentHashMap .newKeySet ();
121
+ this .targetsSeen = ConcurrentHashMap .newKeySet ();
122
+ this .maxFreqMapSize = clamp (config .getBufferSize (), 2048 , MAX_FREQ_MAP_TO_RETAIN );
110
123
callback .onMetricsReady ();
111
124
}
112
125
126
+ private int clamp (int value , int lower , int higher ) {
127
+ return Math .max (lower , Math .min (higher , value ));
128
+ }
129
+
113
130
@ Deprecated /* The name of this method no longer makes sense since we moved to a map, kept for source compatibility */
114
131
public void pushToQueue (Target target , String featureName , Variation variation ) {
115
132
registerEvaluation (target , featureName , variation );
116
133
}
117
134
118
135
void registerEvaluation (Target target , String featureName , Variation variation ) {
119
136
120
- final int freqMapSize = frequencyMap . size () ;
137
+ Target metricTarget = globalTarget ;
121
138
122
- if (freqMapSize > config .getBufferSize ()) {
123
- if (log .isWarnEnabled ()) {
124
- log .warn (
125
- "Metric frequency map exceeded buffer size ({} > {}), force flushing" ,
126
- freqMapSize ,
127
- config .getBufferSize ());
139
+ if (target != null ) {
140
+ if (!targetsSeen .contains (target ) && targetsSeen .size () + 1 > MAX_SENT_TARGETS_TO_RETAIN ) {
141
+ targetsSeenDropped .increment ();
142
+ } else {
143
+ targetsSeen .add (target );
144
+ if (!config .isGlobalTargetEnabled ()) {
145
+ metricTarget = target ;
146
+ }
128
147
}
129
- // If the map is starting to grow too much then push the events now and reset the counters
130
- scheduler .schedule (this ::runOneIteration , 0 , SECONDS );
131
148
}
132
149
133
- Target metricTarget = globalTarget ;
150
+ final MetricEvent metricsEvent = new MetricEvent ( featureName , metricTarget , variation ) ;
134
151
135
- if (target != null ) {
136
- uniqueTargetSet .add (target );
137
- if (!config .isGlobalTargetEnabled ()) {
138
- metricTarget = target ;
139
- }
152
+ if (!frequencyMap .containsKey (metricsEvent ) && frequencyMap .size () + 1 > maxFreqMapSize ) {
153
+ metricsEvalsDropped .increment ();
154
+ } else {
155
+ frequencyMap .increment (metricsEvent );
140
156
}
141
157
142
- frequencyMap .increment (new MetricEvent ( featureName , metricTarget , variation ) );
158
+ evalCounter .increment ();
143
159
}
144
160
145
161
/** This method sends the metrics data to the analytics server and resets the cache */
@@ -259,15 +275,23 @@ private void addTargetData(Metrics metrics, Target target) {
259
275
260
276
void runOneIteration () {
261
277
Thread .currentThread ().setName ("MetricsThread" );
278
+
279
+ final long droppedEvals = metricsEvalsDropped .sumThenReset ();
280
+ final long droppedTargets = targetsSeenDropped .sumThenReset ();
281
+
282
+ if (droppedEvals > 0 || droppedTargets > 0 ) {
283
+ warnMetricsBufferFull (droppedEvals , droppedTargets );
284
+ }
285
+
262
286
if (log .isDebugEnabled ()) {
263
287
log .debug (
264
288
"Drain metrics queue : frequencyMap size={} uniqueTargetSet size={}" ,
265
289
frequencyMap .size (),
266
- uniqueTargetSet .size ());
290
+ targetsSeen .size ());
267
291
}
268
- sendDataAndResetCache (frequencyMap .drainToMap (), new HashSet <>(uniqueTargetSet ));
292
+ sendDataAndResetCache (frequencyMap .drainToMap (), new HashSet <>(targetsSeen ));
269
293
270
- uniqueTargetSet .clear ();
294
+ targetsSeen .clear ();
271
295
}
272
296
273
297
public void start () {
@@ -328,7 +352,15 @@ long getQueueSize() {
328
352
}
329
353
330
354
long getTargetSetSize () {
331
- return uniqueTargetSet .size ();
355
+ return targetsSeen .size ();
356
+ }
357
+
358
+ long getMetricsEvalsDropped () {
359
+ return metricsEvalsDropped .sum ();
360
+ }
361
+
362
+ long getTargetsSeenDropped () {
363
+ return targetsSeenDropped .sum ();
332
364
}
333
365
334
366
void reset () {
0 commit comments