Skip to content

Commit

Permalink
Fix: 修复并发 bug
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx committed Aug 8, 2021
1 parent 1ccde6c commit 1421c75
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
27 changes: 27 additions & 0 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mandodb

import (
"os"
"sync"

"github.com/chenjiandongx/mandodb/pkg/sortedlist"
)
Expand Down Expand Up @@ -35,6 +36,7 @@ type Desc struct {
}

type segmentList struct {
mut sync.Mutex
head Segment
lst sortedlist.List
}
Expand All @@ -44,6 +46,9 @@ func newSegmentList() *segmentList {
}

func (sl *segmentList) Get(start, end int64) []Segment {
sl.mut.Lock()
defer sl.mut.Unlock()

segs := make([]Segment, 0)

iter := sl.lst.All()
Expand Down Expand Up @@ -80,10 +85,16 @@ func (sl *segmentList) Choose(seg Segment, start, end int64) bool {
}

func (sl *segmentList) Add(segment Segment) {
sl.mut.Lock()
defer sl.mut.Unlock()

sl.lst.Add(segment.MinTs(), segment)
}

func (sl *segmentList) Remove(segment Segment) error {
sl.mut.Lock()
defer sl.mut.Unlock()

if err := segment.Close(); err != nil {
return err
}
Expand All @@ -96,6 +107,22 @@ func (sl *segmentList) Remove(segment Segment) error {
return nil
}

func (sl *segmentList) Replace(pre, nxt Segment) error {
sl.mut.Lock()
defer sl.mut.Unlock()

if err := pre.Close(); err != nil {
return err
}

if err := pre.Cleanup(); err != nil {
return err
}

sl.lst.Add(pre.MinTs(), nxt)
return nil
}

const metricName = "__name__"

func isFileExist(path string) bool {
Expand Down
5 changes: 3 additions & 2 deletions tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,7 @@ func (tsdb *TSDB) getHeadPartition() (Segment, error) {
return
}

tsdb.segs.Remove(head)
tsdb.segs.Add(newDiskSegment(mf, dn, head.MinTs(), head.MaxTs()))
tsdb.segs.Replace(head, newDiskSegment(mf, dn, head.MinTs(), head.MaxTs()))
logger.Infof("write file %s take: %v", fname, time.Since(t0))
}()

Expand Down Expand Up @@ -353,6 +352,7 @@ func (tsdb *TSDB) mergeQuerySeriesResult(ret ...LabelSet) []map[string]string {
func (tsdb *TSDB) QueryLabelValues(label string, start, end int64) []string {
tmp := make(map[string]struct{})
for _, segment := range tsdb.segs.Get(start, end) {
segment = segment.Load()
values := segment.QueryLabelValues(label)
for i := 0; i < len(values); i++ {
tmp[values[i]] = struct{}{}
Expand Down Expand Up @@ -406,6 +406,7 @@ func (tsdb *TSDB) removeExpires() {
}

func (tsdb *TSDB) loadFiles() {
mkdir(globalOpts.dataPath)
err := filepath.Walk(globalOpts.dataPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("failed to read the dir: %s, err: %v", path, err)
Expand Down
10 changes: 5 additions & 5 deletions tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func genPoints(ts int64, node, dc int) []*Row {
}

func TestTSDB_QueryRange(t *testing.T) {
tmpdir := "/tmp/tsdb"
tmpdir := "/tmp/tsdb1"

store := OpenTSDB(WithDataPath(tmpdir), WithLoggerConfig(&logger.Options{
Stdout: true,
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestTSDB_QueryRange(t *testing.T) {
}

func TestTSDB_QuerySeries(t *testing.T) {
tmpdir := "/tmp/tsdb"
tmpdir := "/tmp/tsdb2"

store := OpenTSDB(WithDataPath(tmpdir))
defer store.Close()
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestTSDB_QuerySeries(t *testing.T) {
}

func TestTSDB_QueryLabelValues(t *testing.T) {
tmpdir := "/tmp/tsdb"
tmpdir := "/tmp/tsdb3"

store := OpenTSDB(WithDataPath(tmpdir))
defer store.Close()
Expand All @@ -142,6 +142,6 @@ func TestTSDB_QueryLabelValues(t *testing.T) {

time.Sleep(time.Millisecond * 20)

ret := store.QueryLabelValues("idc", start, start+120)
assert.Equal(t, ret, []string{"0", "1", "2"})
ret := store.QueryLabelValues("node", start, start+120)
assert.Equal(t, ret, []string{"vm0", "vm1", "vm2"})
}

0 comments on commit 1421c75

Please sign in to comment.