@@ -758,13 +758,6 @@ var requestPool = sync.Pool{
758758}
759759
760760func (db * DB ) writeToLSM (b * request ) error {
761- // We should check the length of b.Prts and b.Entries only when badger is not
762- // running in InMemory mode. In InMemory mode, we don't write anything to the
763- // value log and that's why the length of b.Ptrs will always be zero.
764- if ! db .opt .InMemory && len (b .Ptrs ) != len (b .Entries ) {
765- return errors .Errorf ("Ptrs and Entries don't match: %+v" , b )
766- }
767-
768761 for i , entry := range b .Entries {
769762 var err error
770763 if entry .skipVlogAndSetThreshold (db .valueThreshold ()) {
@@ -829,6 +822,7 @@ func (db *DB) writeRequests(reqs []*request) error {
829822 }
830823 count += len (b .Entries )
831824 var i uint64
825+ var err error
832826 for err = db .ensureRoomForWrite (); err == errNoRoom ; err = db .ensureRoomForWrite () {
833827 i ++
834828 if i % 100 == 0 {
@@ -945,7 +939,8 @@ func (db *DB) doWrites(lc *z.Closer) {
945939
946940// batchSet applies a list of badger.Entry. If a request level error occurs it
947941// will be returned.
948- // Check(kv.BatchSet(entries))
942+ //
943+ // Check(kv.BatchSet(entries))
949944func (db * DB ) batchSet (entries []* Entry ) error {
950945 req , err := db .sendToWriteCh (entries )
951946 if err != nil {
@@ -958,9 +953,10 @@ func (db *DB) batchSet(entries []*Entry) error {
958953// batchSetAsync is the asynchronous version of batchSet. It accepts a callback
959954// function which is called when all the sets are complete. If a request level
960955// error occurs, it will be passed back via the callback.
961- // err := kv.BatchSetAsync(entries, func(err error)) {
962- // Check(err)
963- // }
956+ //
957+ // err := kv.BatchSetAsync(entries, func(err error) {
958+ // Check(err)
959+ // })
964960func (db * DB ) batchSetAsync (entries []* Entry , f func (error )) error {
965961 req , err := db .sendToWriteCh (entries )
966962 if err != nil {
@@ -1011,10 +1007,16 @@ func arenaSize(opt Options) int64 {
10111007
10121008// buildL0Table builds a new table from the memtable.
10131009func buildL0Table (ft flushTask , bopts table.Options ) * table.Builder {
1014- iter := ft .mt .sl .NewIterator ()
1010+ var iter y.Iterator
1011+ if ft .itr != nil {
1012+ iter = ft .itr
1013+ } else {
1014+ iter = ft .mt .sl .NewUniIterator (false )
1015+ }
10151016 defer iter .Close ()
1017+
10161018 b := table .NewTableBuilder (bopts )
1017- for iter .SeekToFirst (); iter .Valid (); iter .Next () {
1019+ for iter .Rewind (); iter .Valid (); iter .Next () {
10181020 if len (ft .dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), ft .dropPrefixes ) {
10191021 continue
10201022 }
@@ -1030,16 +1032,13 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
10301032
10311033type flushTask struct {
10321034 mt * memTable
1035+ itr y.Iterator
10331036 dropPrefixes [][]byte
10341037}
10351038
10361039// handleFlushTask must be run serially.
10371040func (db * DB ) handleFlushTask (ft flushTask ) error {
1038- // There can be a scenario, when empty memtable is flushed.
1039- if ft .mt .sl .Empty () {
1040- return nil
1041- }
1042-
1041+ // ft.mt could be nil with ft.itr being the valid field.
10431042 bopts := buildTableOptions (db )
10441043 builder := buildL0Table (ft , bopts )
10451044 defer builder .Close ()
@@ -1075,11 +1074,48 @@ func (db *DB) handleFlushTask(ft flushTask) error {
10751074func (db * DB ) flushMemtable (lc * z.Closer ) error {
10761075 defer lc .Done ()
10771076
1077+ var sz int64
1078+ var itrs []y.Iterator
1079+ var mts []* memTable
1080+ slurp := func () {
1081+ for {
1082+ select {
1083+ case more := <- db .flushChan :
1084+ if more .mt == nil {
1085+ return
1086+ }
1087+ sl := more .mt .sl
1088+ itrs = append (itrs , sl .NewUniIterator (false ))
1089+ mts = append (mts , more .mt )
1090+
1091+ sz += sl .MemSize ()
1092+ if sz > db .opt .MemTableSize {
1093+ return
1094+ }
1095+ default :
1096+ return
1097+ }
1098+ }
1099+ }
1100+
10781101 for ft := range db .flushChan {
10791102 if ft .mt == nil {
10801103 // We close db.flushChan now, instead of sending a nil ft.mt.
10811104 continue
10821105 }
1106+ sz = ft .mt .sl .MemSize ()
1107+ // Reset of itrs, mts etc. is being done below.
1108+ y .AssertTrue (len (itrs ) == 0 && len (mts ) == 0 )
1109+ itrs = append (itrs , ft .mt .sl .NewUniIterator (false ))
1110+ mts = append (mts , ft .mt )
1111+
1112+ // Pick more memtables, so we can really fill up the L0 table.
1113+ slurp ()
1114+
1115+ // db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
1116+ ft .mt = nil
1117+ ft .itr = table .NewMergeIterator (itrs , false )
1118+
10831119 for {
10841120 err := db .handleFlushTask (ft )
10851121 if err == nil {
@@ -1090,9 +1126,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
10901126 // which would arrive here would match db.imm[0], because we acquire a
10911127 // lock over DB when pushing to flushChan.
10921128 // TODO: This logic is dirty AF. Any change and this could easily break.
1093- y .AssertTrue (ft .mt == db .imm [0 ])
1094- db .imm = db .imm [1 :]
1095- ft .mt .DecrRef () // Return memory.
1129+ for _ , mt := range mts {
1130+ y .AssertTrue (mt == db .imm [0 ])
1131+ db .imm = db .imm [1 :]
1132+ mt .DecrRef () // Return memory.
1133+ }
10961134 db .lock .Unlock ()
10971135
10981136 break
@@ -1101,6 +1139,8 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
11011139 db .opt .Errorf ("Failure while flushing memtable to disk: %v. Retrying...\n " , err )
11021140 time .Sleep (time .Second )
11031141 }
1142+ // Reset everything.
1143+ itrs , mts , sz = itrs [:0 ], mts [:0 ], 0
11041144 }
11051145 return nil
11061146}
@@ -1719,16 +1759,16 @@ func (db *DB) dropAll() (func(), error) {
17191759}
17201760
17211761// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
1722- // - Stop accepting new writes.
1723- // - Stop memtable flushes before acquiring lock. Because we're acquring lock here
1724- // and memtable flush stalls for lock, which leads to deadlock
1725- // - Flush out all memtables, skipping over keys with the given prefix, Kp.
1726- // - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
1727- // back after a restart.
1728- // - Stop compaction.
1729- // - Compact L0->L1, skipping over Kp.
1730- // - Compact rest of the levels, Li->Li, picking tables which have Kp.
1731- // - Resume memtable flushes, compactions and writes.
1762+ // - Stop accepting new writes.
1763+ // - Stop memtable flushes before acquiring lock, because we're acquiring the lock here
1764+ // and a memtable flush stalls waiting for that lock, which would lead to a deadlock.
1765+ // - Flush out all memtables, skipping over keys with the given prefix, Kp.
1766+ // - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
1767+ // back after a restart.
1768+ // - Stop compaction.
1769+ // - Compact L0->L1, skipping over Kp.
1770+ // - Compact rest of the levels, Li->Li, picking tables which have Kp.
1771+ // - Resume memtable flushes, compactions and writes.
17321772func (db * DB ) DropPrefix (prefixes ... []byte ) error {
17331773 if len (prefixes ) == 0 {
17341774 return nil
0 commit comments