Skip to content

Commit 6fc3663

Browse files
committed
add support for optimistic alignment to PeriodSnapshot.accumulator
1 parent 28cdeac commit 6fc3663

File tree

2 files changed

+48
-5
lines changed

2 files changed

+48
-5
lines changed

core/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala

+32
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,34 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
9393
accumulator.add(fiveSecondsSeven) shouldBe empty // second 0:35
9494
}
9595

96+
"not align snapshot when optimistic tick alignment is false" in {
97+
// When the kamon.metric.optimistic-tick-alignment is false
98+
// If accumulating over 11 seconds, the snapshots should be generated at 00:00:11, 00:00:22, 00:00:33 and so on.
99+
// Thus the snapshot next tick is never aligned
100+
applyConfig("kamon.metric.optimistic-tick-alignment = no")
101+
102+
val accumulator = newAccumulator(11, 0)
103+
// as the first add snapshot.to determines the first ticker use zero second
104+
// to send snapshot at the seconds that are multiples of the duration
105+
accumulator.add(zeroSecond) shouldBe empty // second 0:0
106+
accumulator.add(fiveSecondsOne) shouldBe empty // second 0:5
107+
accumulator.add(fiveSecondsTwo) shouldBe empty // second 0:10
108+
109+
val s15 = accumulator.add(fiveSecondsThree).value // second 0:15
110+
s15.from shouldBe(zeroSecond.from)
111+
s15.to shouldBe(fiveSecondsThree.to)
112+
113+
accumulator.add(fiveSecondsFour) shouldBe empty // second 0:20
114+
val s25 = accumulator.add(fiveSecondsFive).value // second 0:25
115+
s25.from shouldBe(fiveSecondsFour.from)
116+
s25.to shouldBe(fiveSecondsFive.to)
117+
118+
accumulator.add(fiveSecondsSix) shouldBe empty // second 0:30
119+
}
120+
96121
"do best effort to align when snapshots themselves are not aligned" in {
122+
applyConfig("kamon.metric.optimistic-tick-alignment = yes")
123+
97124
val accumulator = newAccumulator(30, 0)
98125
accumulator.add(tenSecondsOne) shouldBe empty // second 0:13
99126
accumulator.add(tenSecondsTwo) shouldBe empty // second 0:23
@@ -139,6 +166,8 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
139166
}
140167

141168
"produce a snapshot when enough data has been accumulated" in {
169+
applyConfig("kamon.metric.optimistic-tick-alignment = yes")
170+
142171
val accumulator = newAccumulator(15, 1)
143172
accumulator.add(fiveSecondsOne) shouldBe empty
144173
accumulator.add(fiveSecondsTwo) shouldBe empty
@@ -167,6 +196,8 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
167196
val alignedZeroTime = Clock.nextAlignedInstant(Kamon.clock().instant(), Duration.ofSeconds(60)).minusSeconds(60)
168197
val unAlignedZeroTime = alignedZeroTime.plusSeconds(3)
169198

199+
val zeroSecond = createPeriodSnapshot(alignedZeroTime, alignedZeroTime, 13)
200+
170201
// Aligned snapshots, every 5 seconds from second 00.
171202
val fiveSecondsOne = createPeriodSnapshot(alignedZeroTime, alignedZeroTime.plusSeconds(5), 22)
172203
val fiveSecondsTwo = createPeriodSnapshot(alignedZeroTime.plusSeconds(5), alignedZeroTime.plusSeconds(10), 33)
@@ -221,6 +252,7 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
221252
)
222253

223254
override protected def beforeAll(): Unit = {
255+
applyConfig("kamon.metric.optimistic-tick-alignment = yes")
224256
applyConfig("kamon.metric.tick-interval = 10 seconds")
225257
}
226258
}

core/kamon-core/src/main/scala/kamon/metric/PeriodSnapshot.scala

+16-5
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ object PeriodSnapshot {
9797
private var _accumulatingFrom: Option[Instant] = None
9898

9999
def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = {
100-
101100
// Initialize the next tick based on incoming snapshots.
102-
if(_nextTick == Instant.EPOCH)
103-
_nextTick = Clock.nextAlignedInstant(periodSnapshot.to, period)
101+
if (_nextTick == Instant.EPOCH) {
102+
_nextTick = nextInstant(periodSnapshot.to)
103+
}
104104

105105
// short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the
106106
// snapshots have a longer period than the duration).
@@ -116,10 +116,9 @@ object PeriodSnapshot {
116116

117117
for(from <- _accumulatingFrom if isAroundNextTick(periodSnapshot.to)) yield {
118118
val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true)
119-
_nextTick = Clock.nextAlignedInstant(_nextTick, period)
119+
_nextTick = nextInstant(_nextTick)
120120
_accumulatingFrom = None
121121
clearAccumulatedData()
122-
123122
accumulatedPeriodSnapshot
124123
}
125124
}
@@ -133,6 +132,18 @@ object PeriodSnapshot {
133132
Duration.between(instant, _nextTick.minus(margin)).toMillis() <= 0
134133
}
135134

135+
private def nextInstant(from: Instant): Instant = {
136+
if (isOptimisticAlignmentEnabled()) {
137+
Clock.nextAlignedInstant(from, period)
138+
} else {
139+
Instant.ofEpochMilli(from.toEpochMilli + period.toMillis)
140+
}
141+
}
142+
143+
private def isOptimisticAlignmentEnabled(): Boolean = {
144+
Kamon.config().getBoolean("kamon.metric.optimistic-tick-alignment")
145+
}
146+
136147
private def isSameDurationAsTickInterval(): Boolean = {
137148
Kamon.config().getDuration("kamon.metric.tick-interval") == period
138149
}

0 commit comments

Comments
 (0)