Skip to content

Commit 71262ec

Browse files
changed numeric aggregators to use float64
1 parent 79b3cf6 commit 71262ec

File tree

5 files changed

+63
-31
lines changed

5 files changed

+63
-31
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ require (
99
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
1010
)
1111

12+
require go.uber.org/atomic v1.11.0
13+
1214
retract (
1315
v9.15.1 // This version is used to retract v9.15.0
1416
v9.15.0 // This version was accidentally released. It is identical to 9.15.0-beta.2

go.sum

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,10 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
44
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
55
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
66
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
7+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
78
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
89
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
10+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
11+
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
12+
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
13+
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=

internal/routing/aggregator.go

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import (
55
"fmt"
66
"math"
77
"sync"
8+
89
"sync/atomic"
910

1011
"github.com/redis/go-redis/v9/internal/util"
12+
uberAtomic "go.uber.org/atomic"
1113
)
1214

1315
var (
@@ -214,7 +216,7 @@ func (a *OneSucceededAggregator) Result() (interface{}, error) {
214216
// AggSumAggregator sums numeric replies from all shards.
215217
type AggSumAggregator struct {
216218
err atomic.Value
217-
res int64
219+
res uberAtomic.Float64
218220
}
219221

220222
func (a *AggSumAggregator) Add(result interface{}, err error) error {
@@ -223,12 +225,12 @@ func (a *AggSumAggregator) Add(result interface{}, err error) error {
223225
}
224226

225227
if result != nil {
226-
val, err := toInt64(result)
228+
val, err := toFloat64(result)
227229
if err != nil {
228230
a.err.CompareAndSwap(nil, err)
229231
return err
230232
}
231-
atomic.AddInt64(&a.res, val)
233+
a.res.Add(val)
232234
}
233235

234236
return nil
@@ -277,7 +279,7 @@ func (a *AggSumAggregator) BatchSlice(results []AggregatorResErr) error {
277279
}
278280

279281
func (a *AggSumAggregator) Result() (interface{}, error) {
280-
res, err := atomic.LoadInt64(&a.res), a.err.Load()
282+
res, err := a.res.Load(), a.err.Load()
281283
if err != nil {
282284
return nil, err.(error)
283285
}
@@ -297,13 +299,13 @@ func (a *AggMinAggregator) Add(result interface{}, err error) error {
297299
return nil
298300
}
299301

300-
intVal, e := toInt64(result)
302+
floatVal, e := toFloat64(result)
301303
if e != nil {
302304
a.err.CompareAndSwap(nil, err)
303305
return nil
304306
}
305307

306-
a.res.Value(intVal)
308+
a.res.Value(floatVal)
307309

308310
return nil
309311
}
@@ -337,22 +339,22 @@ func (a *AggMinAggregator) AddWithKey(key string, result interface{}, err error)
337339
}
338340

339341
func (a *AggMinAggregator) BatchSlice(results []AggregatorResErr) error {
340-
min := int64(math.MaxInt64)
342+
min := float64(math.MaxFloat64)
341343

342344
for _, res := range results {
343345
if res.Err != nil {
344346
_ = a.Add(nil, res.Err)
345347
return nil
346348
}
347349

348-
resInt, err := toInt64(res.Result)
350+
floatVal, err := toFloat64(res.Result)
349351
if err != nil {
350352
_ = a.Add(nil, res.Err)
351353
return nil
352354
}
353355

354-
if resInt < min {
355-
min = resInt
356+
if floatVal < min {
357+
min = floatVal
356358
}
357359

358360
}
@@ -385,13 +387,13 @@ func (a *AggMaxAggregator) Add(result interface{}, err error) error {
385387
return nil
386388
}
387389

388-
intVal, e := toInt64(result)
390+
floatVal, e := toFloat64(result)
389391
if e != nil {
390392
a.err.CompareAndSwap(nil, err)
391393
return nil
392394
}
393395

394-
a.res.Value(intVal)
396+
a.res.Value(floatVal)
395397

396398
return nil
397399
}
@@ -650,6 +652,27 @@ func toInt64(val interface{}) (int64, error) {
650652
}
651653
}
652654

655+
func toFloat64(val interface{}) (float64, error) {
656+
if val == nil {
657+
return 0, nil
658+
}
659+
660+
switch v := val.(type) {
661+
case float64:
662+
return v, nil
663+
case int:
664+
return float64(v), nil
665+
case int32:
666+
return float64(v), nil
667+
case int64:
668+
return float64(v), nil
669+
case float32:
670+
return float64(v), nil
671+
default:
672+
return 0, fmt.Errorf("cannot convert %T to float64", val)
673+
}
674+
}
675+
653676
func toBool(val interface{}) (bool, error) {
654677
if val == nil {
655678
return false, nil

internal/util/atomic_max.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
ISC License
44
55
Modified by htemelski-redis
6-
Removed the treshold, adapted it to work with int64
6+
Removed the treshold, adapted it to work with float64
77
*/
88

99
package util
1010

1111
import (
1212
"math"
13-
"sync/atomic"
13+
14+
"go.uber.org/atomic"
1415
)
1516

1617
// AtomicMax is a thread-safe max container
@@ -22,7 +23,7 @@ import (
2223
type AtomicMax struct {
2324

2425
// value is current max
25-
value atomic.Int64
26+
value atomic.Float64
2627
// whether [AtomicMax.Value] has been invoked
2728
// with value equal or greater to threshold
2829
hasValue atomic.Bool
@@ -32,7 +33,7 @@ type AtomicMax struct {
3233
// - if threshold is not used, AtomicMax is initialization-free
3334
func NewAtomicMax() (atomicMax *AtomicMax) {
3435
m := AtomicMax{}
35-
m.value.Store(math.MinInt64)
36+
m.value.Store((-math.MaxFloat64))
3637
return &m
3738
}
3839

@@ -44,14 +45,14 @@ func NewAtomicMax() (atomicMax *AtomicMax) {
4445
// - upon return, Max and Max1 are guaranteed to reflect the invocation
4546
// - the return order of concurrent Value invocations is not guaranteed
4647
// - Thread-safe
47-
func (m *AtomicMax) Value(value int64) (isNewMax bool) {
48-
// math.MinInt64 as max case
48+
func (m *AtomicMax) Value(value float64) (isNewMax bool) {
49+
// -math.MaxFloat64 as max case
4950
var hasValue0 = m.hasValue.Load()
50-
if value == math.MinInt64 {
51+
if value == (-math.MaxFloat64) {
5152
if !hasValue0 {
5253
isNewMax = m.hasValue.CompareAndSwap(false, true)
5354
}
54-
return // math.MinInt64 as max: isNewMax true for first 0 writer
55+
return // -math.MaxFloat64 as max: isNewMax true for first 0 writer
5556
}
5657

5758
// check against present value
@@ -82,7 +83,7 @@ func (m *AtomicMax) Value(value int64) (isNewMax bool) {
8283
// - hasValue true indicates that value reflects a Value invocation
8384
// - hasValue false: value is zero-value
8485
// - Thread-safe
85-
func (m *AtomicMax) Max() (value int64, hasValue bool) {
86+
func (m *AtomicMax) Max() (value float64, hasValue bool) {
8687
if hasValue = m.hasValue.Load(); !hasValue {
8788
return
8889
}
@@ -93,4 +94,4 @@ func (m *AtomicMax) Max() (value int64, hasValue bool) {
9394
// Max1 returns current maximum whether zero-value or set by Value
9495
// - threshold is ignored
9596
// - Thread-safe
96-
func (m *AtomicMax) Max1() (value int64) { return m.value.Load() }
97+
func (m *AtomicMax) Max1() (value float64) { return m.value.Load() }

internal/util/atomic_min.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ Adapted from the modified atomic_max, but with inverted logic
1010

1111
import (
1212
"math"
13-
"sync/atomic"
13+
14+
"go.uber.org/atomic"
1415
)
1516

1617
// AtomicMin is a thread-safe Min container
@@ -21,7 +22,7 @@ import (
2122
type AtomicMin struct {
2223

2324
// value is current Min
24-
value atomic.Int64
25+
value atomic.Float64
2526
// whether [AtomicMin.Value] has been invoked
2627
// with value equal or greater to threshold
2728
hasValue atomic.Bool
@@ -31,7 +32,7 @@ type AtomicMin struct {
3132
// - if threshold is not used, AtomicMin is initialization-free
3233
func NewAtomicMin() (atomicMin *AtomicMin) {
3334
m := AtomicMin{}
34-
m.value.Store(math.MaxInt64)
35+
m.value.Store(math.MaxFloat64)
3536
return &m
3637
}
3738

@@ -43,14 +44,14 @@ func NewAtomicMin() (atomicMin *AtomicMin) {
4344
// - upon return, Min and Min1 are guaranteed to reflect the invocation
4445
// - the return order of concurrent Value invocations is not guaranteed
4546
// - Thread-safe
46-
func (m *AtomicMin) Value(value int64) (isNewMin bool) {
47-
// math.MaxInt64 as Min case
47+
func (m *AtomicMin) Value(value float64) (isNewMin bool) {
48+
// math.MaxFloat64 as Min case
4849
var hasValue0 = m.hasValue.Load()
49-
if value == math.MaxInt64 {
50+
if value == math.MaxFloat64 {
5051
if !hasValue0 {
5152
isNewMin = m.hasValue.CompareAndSwap(false, true)
5253
}
53-
return // math.MaxInt64 as Min: isNewMin true for first 0 writer
54+
return // math.MaxFloat64 as Min: isNewMin true for first 0 writer
5455
}
5556

5657
// check against present value
@@ -81,7 +82,7 @@ func (m *AtomicMin) Value(value int64) (isNewMin bool) {
8182
// - hasValue true indicates that value reflects a Value invocation
8283
// - hasValue false: value is zero-value
8384
// - Thread-safe
84-
func (m *AtomicMin) Min() (value int64, hasValue bool) {
85+
func (m *AtomicMin) Min() (value float64, hasValue bool) {
8586
if hasValue = m.hasValue.Load(); !hasValue {
8687
return
8788
}
@@ -92,4 +93,4 @@ func (m *AtomicMin) Min() (value int64, hasValue bool) {
9293
// Min1 returns current Minimum whether zero-value or set by Value
9394
// - threshold is ignored
9495
// - Thread-safe
95-
func (m *AtomicMin) Min1() (value int64) { return m.value.Load() }
96+
func (m *AtomicMin) Min1() (value float64) { return m.value.Load() }

0 commit comments

Comments
 (0)