Skip to content

Commit b1ea360

Browse files
authored
chore: use atomic types instead (#1878)
1 parent 5aec88e commit b1ea360

File tree

22 files changed

+202
-211
lines changed

22 files changed

+202
-211
lines changed

badger/cmd/bank.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ var (
6969
numAccounts int
7070
numPrevious int
7171
duration string
72-
stopAll int32
72+
stopAll atomic.Int32
7373
checkStream bool
7474
checkSubscriber bool
7575
verbose bool
@@ -241,7 +241,7 @@ func seekTotal(txn *badger.Txn) ([]account, error) {
241241
if total != expected {
242242
log.Printf("Balance did NOT match up. Expected: %d. Received: %d",
243243
expected, total)
244-
atomic.AddInt32(&stopAll, 1)
244+
stopAll.Add(1)
245245
return accounts, errFailure
246246
}
247247
return accounts, nil
@@ -419,7 +419,7 @@ func runTest(cmd *cobra.Command, args []string) error {
419419

420420
// startTs := time.Now()
421421
endTs := time.Now().Add(dur)
422-
var total, errors, reads uint64
422+
var total, errors, reads atomic.Uint64
423423

424424
var wg sync.WaitGroup
425425
wg.Add(1)
@@ -429,15 +429,15 @@ func runTest(cmd *cobra.Command, args []string) error {
429429
defer ticker.Stop()
430430

431431
for range ticker.C {
432-
if atomic.LoadInt32(&stopAll) > 0 {
432+
if stopAll.Load() > 0 {
433433
// Do not proceed.
434434
return
435435
}
436436
// log.Printf("[%6s] Total: %d. Errors: %d Reads: %d.\n",
437437
// time.Since(startTs).Round(time.Second).String(),
438-
// atomic.LoadUint64(&total),
439-
// atomic.LoadUint64(&errors),
440-
// atomic.LoadUint64(&reads))
438+
// total.Load(),
439+
// errors.Load(),
440+
// reads.Load())
441441
if time.Now().After(endTs) {
442442
return
443443
}
@@ -454,7 +454,7 @@ func runTest(cmd *cobra.Command, args []string) error {
454454
defer ticker.Stop()
455455

456456
for range ticker.C {
457-
if atomic.LoadInt32(&stopAll) > 0 {
457+
if stopAll.Load() > 0 {
458458
// Do not proceed.
459459
return
460460
}
@@ -467,11 +467,11 @@ func runTest(cmd *cobra.Command, args []string) error {
467467
continue
468468
}
469469
err := moveMoney(db, from, to)
470-
atomic.AddUint64(&total, 1)
470+
total.Add(1)
471471
if err == nil && verbose {
472472
log.Printf("Moved $5. %d -> %d\n", from, to)
473473
} else {
474-
atomic.AddUint64(&errors, 1)
474+
errors.Add(1)
475475
}
476476
}
477477
}()
@@ -489,7 +489,7 @@ func runTest(cmd *cobra.Command, args []string) error {
489489
log.Printf("Received stream\n")
490490

491491
// Do not proceed.
492-
if atomic.LoadInt32(&stopAll) > 0 || time.Now().After(endTs) {
492+
if stopAll.Load() > 0 || time.Now().After(endTs) {
493493
return
494494
}
495495

@@ -533,7 +533,7 @@ func runTest(cmd *cobra.Command, args []string) error {
533533
defer ticker.Stop()
534534

535535
for range ticker.C {
536-
if atomic.LoadInt32(&stopAll) > 0 {
536+
if stopAll.Load() > 0 {
537537
// Do not proceed.
538538
return
539539
}
@@ -546,7 +546,7 @@ func runTest(cmd *cobra.Command, args []string) error {
546546
if err != nil {
547547
log.Printf("Error while calculating total: %v", err)
548548
} else {
549-
atomic.AddUint64(&reads, 1)
549+
reads.Add(1)
550550
}
551551
return nil
552552
}))
@@ -586,13 +586,13 @@ func runTest(cmd *cobra.Command, args []string) error {
586586
if err != nil {
587587
log.Printf("Error while calculating subscriber DB total: %v", err)
588588
} else {
589-
atomic.AddUint64(&reads, 1)
589+
reads.Add(1)
590590
}
591591
return nil
592592
}))
593593
}
594594

595-
if atomic.LoadInt32(&stopAll) == 0 {
595+
if stopAll.Load() == 0 {
596596
log.Println("Test OK")
597597
return nil
598598
}

badger/cmd/read_bench.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ This command reads data from existing Badger database randomly using multiple go
4343
}
4444

4545
var (
46-
sizeRead uint64 // will store size read till now
47-
entriesRead uint64 // will store entries read till now
48-
startTime time.Time // start time of read benchmarking
46+
sizeRead atomic.Uint64 // will store size read till now
47+
entriesRead atomic.Uint64 // will store entries read till now
48+
startTime time.Time // start time of read benchmarking
4949

5050
ro = struct {
5151
blockCacheSize int64
@@ -91,8 +91,8 @@ func fullScanDB(db *badger.DB) {
9191
defer it.Close()
9292
for it.Rewind(); it.Valid(); it.Next() {
9393
i := it.Item()
94-
atomic.AddUint64(&entriesRead, 1)
95-
atomic.AddUint64(&sizeRead, uint64(i.EstimatedSize()))
94+
entriesRead.Add(1)
95+
sizeRead.Add(uint64(i.EstimatedSize()))
9696
}
9797
}
9898

@@ -140,8 +140,8 @@ func printStats(c *z.Closer) {
140140
return
141141
case <-t.C:
142142
dur := time.Since(startTime)
143-
sz := atomic.LoadUint64(&sizeRead)
144-
entries := atomic.LoadUint64(&entriesRead)
143+
sz := sizeRead.Load()
144+
entries := entriesRead.Load()
145145
bytesRate := sz / uint64(dur.Seconds())
146146
entriesRate := entries / uint64(dur.Seconds())
147147
fmt.Printf("Time elapsed: %s, bytes read: %s, speed: %s/sec, "+
@@ -160,8 +160,8 @@ func readKeys(db *badger.DB, c *z.Closer, keys [][]byte) {
160160
return
161161
default:
162162
key := keys[r.Int31n(int32(len(keys)))]
163-
atomic.AddUint64(&sizeRead, lookupForKey(db, key))
164-
atomic.AddUint64(&entriesRead, 1)
163+
sizeRead.Add(lookupForKey(db, key))
164+
entriesRead.Add(1)
165165
}
166166
}
167167
}

badger/cmd/write_bench.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ var (
7878
gcDiscardRatio float64
7979
}{}
8080

81-
sizeWritten uint64
82-
gcSuccess uint64
81+
sizeWritten atomic.Uint64
82+
gcSuccess atomic.Uint64
8383
sstCount uint32
8484
vlogCount uint32
8585
files []string
86-
entriesWritten uint64
86+
entriesWritten atomic.Uint64
8787
)
8888

8989
const (
@@ -161,8 +161,8 @@ func writeRandom(db *badger.DB, num uint64) error {
161161
panic(err)
162162
}
163163

164-
atomic.AddUint64(&entriesWritten, 1)
165-
atomic.AddUint64(&sizeWritten, es)
164+
entriesWritten.Add(1)
165+
sizeWritten.Add(es)
166166
}
167167
return batch.Flush()
168168
}
@@ -224,8 +224,8 @@ func writeSorted(db *badger.DB, num uint64) error {
224224
badger.KVToBuffer(kv, kvBuf)
225225

226226
sz += es
227-
atomic.AddUint64(&entriesWritten, 1)
228-
atomic.AddUint64(&sizeWritten, uint64(es))
227+
entriesWritten.Add(1)
228+
sizeWritten.Add(uint64(es))
229229

230230
if sz >= 4<<20 { // 4 MB
231231
writeCh <- kvBuf
@@ -390,8 +390,8 @@ func reportStats(c *z.Closer, db *badger.DB) {
390390
}
391391

392392
dur := time.Since(startTime)
393-
sz := atomic.LoadUint64(&sizeWritten)
394-
entries := atomic.LoadUint64(&entriesWritten)
393+
sz := sizeWritten.Load()
394+
entries := entriesWritten.Load()
395395
bytesRate := sz / uint64(dur.Seconds())
396396
entriesRate := entries / uint64(dur.Seconds())
397397
fmt.Printf("[WRITE] Time elapsed: %s, bytes written: %s, speed: %s/sec, "+
@@ -423,7 +423,7 @@ func runGC(c *z.Closer, db *badger.DB) {
423423
return
424424
case <-t.C:
425425
if err := db.RunValueLogGC(wo.gcDiscardRatio); err == nil {
426-
atomic.AddUint64(&gcSuccess, 1)
426+
gcSuccess.Add(1)
427427
} else {
428428
log.Printf("[GC] Failed due to following err %v", err)
429429
}
@@ -502,8 +502,8 @@ func printReadStats(c *z.Closer, startTime time.Time) {
502502
return
503503
case <-t.C:
504504
dur := time.Since(startTime)
505-
sz := atomic.LoadUint64(&sizeRead)
506-
entries := atomic.LoadUint64(&entriesRead)
505+
sz := sizeRead.Load()
506+
entries := entriesRead.Load()
507507
bytesRate := sz / uint64(dur.Seconds())
508508
entriesRate := entries / uint64(dur.Seconds())
509509
fmt.Printf("[READ] Time elapsed: %s, bytes read: %s, speed: %s/sec, "+

db.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ type DB struct {
112112
flushChan chan flushTask // For flushing memtables.
113113
closeOnce sync.Once // For closing DB only once.
114114

115-
blockWrites int32
116-
isClosed uint32
115+
blockWrites atomic.Int32
116+
isClosed atomic.Uint32
117117

118118
orc *oracle
119119
bannedNamespaces *lockedKeys
@@ -531,16 +531,16 @@ func (db *DB) Close() error {
531531
// IsClosed denotes if the badger DB is closed or not. A DB instance should not
532532
// be used after closing it.
533533
func (db *DB) IsClosed() bool {
534-
return atomic.LoadUint32(&db.isClosed) == 1
534+
return db.isClosed.Load() == 1
535535
}
536536

537537
func (db *DB) close() (err error) {
538538
defer db.allocPool.Release()
539539

540540
db.opt.Debugf("Closing database")
541-
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(atomic.LoadInt64(&db.lc.l0stallsMs)))
541+
db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))
542542

543-
atomic.StoreInt32(&db.blockWrites, 1)
543+
db.blockWrites.Store(1)
544544

545545
if !db.opt.InMemory {
546546
// Stop value GC first.
@@ -626,7 +626,7 @@ func (db *DB) close() (err error) {
626626
db.blockCache.Close()
627627
db.indexCache.Close()
628628

629-
atomic.StoreUint32(&db.isClosed, 1)
629+
db.isClosed.Store(1)
630630
db.threshold.close()
631631

632632
if db.opt.InMemory {
@@ -851,7 +851,7 @@ func (db *DB) writeRequests(reqs []*request) error {
851851
}
852852

853853
func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
854-
if atomic.LoadInt32(&db.blockWrites) == 1 {
854+
if db.blockWrites.Load() == 1 {
855855
return nil, ErrBlockedWrites
856856
}
857857
var count, size int64
@@ -1604,7 +1604,7 @@ func (db *DB) Flatten(workers int) error {
16041604

16051605
func (db *DB) blockWrite() error {
16061606
// Stop accepting new writes.
1607-
if !atomic.CompareAndSwapInt32(&db.blockWrites, 0, 1) {
1607+
if !db.blockWrites.CompareAndSwap(0, 1) {
16081608
return ErrBlockedWrites
16091609
}
16101610

@@ -1619,7 +1619,7 @@ func (db *DB) unblockWrite() {
16191619
go db.doWrites(db.closers.writes)
16201620

16211621
// Resume writes.
1622-
atomic.StoreInt32(&db.blockWrites, 0)
1622+
db.blockWrites.Store(0)
16231623
}
16241624

16251625
func (db *DB) prepareToDrop() (func(), error) {
@@ -1709,7 +1709,7 @@ func (db *DB) dropAll() (func(), error) {
17091709
if err != nil {
17101710
return resume, err
17111711
}
1712-
db.lc.nextFileID = 1
1712+
db.lc.nextFileID.Store(1)
17131713
db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
17141714
db.blockCache.Clear()
17151715
db.indexCache.Clear()
@@ -1906,7 +1906,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
19061906
return err
19071907
case <-ctx.Done():
19081908
c.Done()
1909-
atomic.StoreUint64(s.active, 0)
1909+
s.active.Store(0)
19101910
drain()
19111911
db.pub.deleteSubscriber(s.id)
19121912
// Delete the subscriber to avoid further updates.
@@ -1915,7 +1915,7 @@ func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches
19151915
err := slurp(batch)
19161916
if err != nil {
19171917
c.Done()
1918-
atomic.StoreUint64(s.active, 0)
1918+
s.active.Store(0)
19191919
drain()
19201920
// Delete the subscriber if there is an error by the callback.
19211921
db.pub.deleteSubscriber(s.id)

integration/testgc/main.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type testSuite struct {
2424
sync.Mutex
2525
vals map[uint64]uint64
2626

27-
count uint64 // Not under mutex lock.
27+
count atomic.Uint64 // Not under mutex lock.
2828
}
2929

3030
func encoded(i uint64) []byte {
@@ -39,7 +39,7 @@ func (s *testSuite) write(db *badger.DB) error {
3939
// These keys would be overwritten.
4040
keyi := uint64(rand.Int63n(maxValue))
4141
key := encoded(keyi)
42-
vali := atomic.AddUint64(&s.count, 1)
42+
vali := s.count.Add(1)
4343
val := encoded(vali)
4444
val = append(val, suffix...)
4545
if err := txn.SetEntry(badger.NewEntry(key, val)); err != nil {
@@ -48,7 +48,7 @@ func (s *testSuite) write(db *badger.DB) error {
4848
}
4949
for i := 0; i < 20; i++ {
5050
// These keys would be new and never overwritten.
51-
keyi := atomic.AddUint64(&s.count, 1)
51+
keyi := s.count.Add(1)
5252
if keyi%1000000 == 0 {
5353
log.Printf("Count: %d\n", keyi)
5454
}
@@ -63,7 +63,7 @@ func (s *testSuite) write(db *badger.DB) error {
6363
}
6464

6565
func (s *testSuite) read(db *badger.DB) error {
66-
max := int64(atomic.LoadUint64(&s.count))
66+
max := int64(s.count.Load())
6767
keyi := uint64(rand.Int63n(max))
6868
key := encoded(keyi)
6969

@@ -138,11 +138,9 @@ func main() {
138138
}
139139
}()
140140

141-
s := testSuite{
142-
count: uint64(maxValue),
143-
vals: make(map[uint64]uint64),
144-
}
145-
var numLoops uint64
141+
s := testSuite{vals: make(map[uint64]uint64)}
142+
s.count.Store(uint64(maxValue))
143+
var numLoops atomic.Uint64
146144
ticker := time.NewTicker(5 * time.Second)
147145
for i := 0; i < 10; i++ {
148146
go func() {
@@ -156,7 +154,7 @@ func main() {
156154
log.Fatal(err)
157155
}
158156
}
159-
nl := atomic.AddUint64(&numLoops, 1)
157+
nl := numLoops.Add(1)
160158
select {
161159
case <-closer.HasBeenClosed():
162160
return

0 commit comments

Comments
 (0)