This repository has been archived by the owner on Aug 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 266
/
Copy pathmem_table.go
129 lines (109 loc) · 2.89 KB
/
mem_table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"sort"
"sync"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/hash"
)
// memTable is an in-memory buffer of chunks, keyed by address, bounded by a
// byte budget (maxData). Its contents are serialized into a table by write.
type memTable struct {
	// chunks maps each buffered chunk's address to its (uncompressed) data.
	chunks map[addr][]byte
	// order holds one record per buffered chunk, appended by addChunk with
	// rec.order set to its insertion position. write temporarily re-sorts it
	// by prefix for haver.hasMany and then restores insertion order.
	order []hasRecord // Must maintain the invariant that these are sorted by rec.order
	// maxData is the cap on totalData; totalData is the sum of len(data) over
	// all buffered chunks.
	maxData, totalData uint64
	// snapper is handed to newTableWriter in write; presumably the snappy
	// compressor used for chunk data — confirm against newTableWriter.
	snapper snappyEncoder
}
// newMemTable returns an empty memTable that will accept at most
// memTableSize bytes of uncompressed chunk data before addChunk starts
// refusing chunks.
func newMemTable(memTableSize uint64) *memTable {
	mt := &memTable{maxData: memTableSize}
	mt.chunks = make(map[addr][]byte)
	return mt
}
// addChunk buffers data under address h.
//
// It returns true when the chunk is held by the table afterwards, including
// the case where h was already buffered (the new data is then ignored; the
// first write wins). It returns false, leaving the table unchanged, when
// accepting data would push the buffered byte total past maxData.
//
// It panics if data is empty: zero-length chunks cannot be stored.
func (mt *memTable) addChunk(h addr, data []byte) bool {
	if len(data) == 0 {
		panic("NBS blocks cannot be zero length")
	}
	if _, ok := mt.chunks[h]; ok {
		return true
	}
	dataLen := uint64(len(data))
	// Compare against the remaining budget rather than computing
	// totalData+dataLen, which could wrap around when maxData is close to the
	// uint64 limit. totalData never exceeds maxData through this method, but
	// guard the subtraction anyway so it cannot underflow.
	if mt.totalData > mt.maxData || dataLen > mt.maxData-mt.totalData {
		return false
	}
	mt.totalData += dataLen
	mt.chunks[h] = data
	// h is this call's own copy of the address, so the record may safely keep
	// a pointer to it. Fields, positionally: address, its prefix, insertion
	// position (rec.order, which mt.order must stay sorted by), has flag.
	mt.order = append(mt.order, hasRecord{
		&h,
		h.Prefix(),
		len(mt.order),
		false,
	})
	return true
}
// count reports how many distinct chunks are buffered (one order record is
// appended per accepted chunk).
func (mt *memTable) count() uint32 {
	n := len(mt.order)
	return uint32(n)
}
// uncompressedLen reports the total number of data bytes accepted by
// addChunk so far.
func (mt *memTable) uncompressedLen() uint64 {
	total := mt.totalData
	return total
}
// has reports whether a chunk with address h is buffered in the table.
func (mt *memTable) has(h addr) (has bool) {
	if _, found := mt.chunks[h]; found {
		return true
	}
	return false
}
// hasMany sets has = true on every record in addrs that is not already
// marked and whose address is buffered here. Records already marked are left
// untouched. It returns true if at least one record is still unmarked after
// the pass.
func (mt *memTable) hasMany(addrs []hasRecord) (remaining bool) {
	for i := range addrs {
		rec := &addrs[i]
		switch {
		case rec.has:
			// Already satisfied elsewhere; nothing to do.
		case mt.has(*rec.a):
			rec.has = true
		default:
			remaining = true
		}
	}
	return remaining
}
// get returns the buffered data for h, or nil if h is not in the table.
// stats is not used here; presumably it is accepted so memTable matches the
// signature of other chunk readers — verify against the interface.
func (mt *memTable) get(h addr, stats *Stats) []byte {
	if data, ok := mt.chunks[h]; ok {
		return data
	}
	return nil
}
// getMany sends, on foundChunks, a chunk for every request in reqs whose
// address is buffered here. It returns true if any request could not be
// satisfied. wg and stats are not used.
//
// NOTE(review): reqs is not modified, so callers cannot tell from reqs which
// requests were satisfied here; confirm that downstream readers will not
// re-send those chunks.
func (mt *memTable) getMany(reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, stats *Stats) (remaining bool) {
	for _, req := range reqs {
		key := *req.a
		data := mt.chunks[key]
		if data == nil {
			remaining = true
			continue
		}
		c := chunks.NewChunkWithHash(hash.Hash(key), data)
		foundChunks <- &c
	}
	return remaining
}
// extract sends one extractRecord per buffered chunk on out, in mt.order
// order. It does not close out.
func (mt *memTable) extract(out chan<- extractRecord) {
	for i := range mt.order {
		h := *mt.order[i].a
		out <- extractRecord{a: h, data: mt.chunks[h]}
	}
}
// write serializes the buffered chunks into a new table.
//
// If haver is non-nil, haver.hasMany is run over mt.order first; records it
// marks (presumably chunks haver already stores) are skipped. Those has flags
// remain set on mt.order afterwards. Remaining chunks are written in
// insertion order.
//
// It returns the table name produced by the table writer, the table bytes (a
// prefix of a buffer sized by maxTableSize), and the number of chunks
// written. Stats are sampled only when at least one chunk was written.
func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []byte, count uint32) {
	buff := make([]byte, maxTableSize(uint64(len(mt.order)), mt.totalData))
	tw := newTableWriter(buff, mt.snapper)

	if haver != nil {
		// hasMany() requires addresses sorted by prefix. Afterwards, restore
		// insertion order so chunks are written in the order they were added
		// and the invariant on mt.order holds again.
		sort.Sort(hasRecordByPrefix(mt.order))
		haver.hasMany(mt.order)
		sort.Sort(hasRecordByOrder(mt.order))
	}

	for i := range mt.order {
		rec := mt.order[i]
		if rec.has {
			continue
		}
		tw.addChunk(*rec.a, mt.chunks[*rec.a])
		count++
	}

	tableSize, name := tw.finish()
	if count > 0 {
		stats.BytesPerPersist.Sample(uint64(tableSize))
		stats.CompressedChunkBytesPerPersist.Sample(uint64(tw.totalCompressedData))
		stats.UncompressedChunkBytesPerPersist.Sample(uint64(tw.totalUncompressedData))
		stats.ChunksPerPersist.Sample(uint64(count))
	}
	return name, buff[:tableSize], count
}