From 7f341460605fdb62fb451351c59a996ce3582271 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Mon, 20 Jan 2025 18:10:19 +0800 Subject: [PATCH 01/11] expose etcd's transaction as LowLevelTxn interface to kv.Base, simulate the behavior in memkv and leveldb Signed-off-by: MyonKeminta --- errors.toml | 5 + pkg/errs/errno.go | 1 + pkg/storage/kv/etcd_kv.go | 123 +++++++++++++++++++ pkg/storage/kv/kv.go | 133 +++++++++++++++++++- pkg/storage/kv/kv_test.go | 236 ++++++++++++++++++++++++++++++++++++ pkg/storage/kv/levedb_kv.go | 133 +++++++++++++++++++- pkg/storage/kv/mem_kv.go | 121 ++++++++++++++++++ 7 files changed, 746 insertions(+), 6 deletions(-) diff --git a/errors.toml b/errors.toml index 9980a98ab14..a56b187cbac 100644 --- a/errors.toml +++ b/errors.toml @@ -416,6 +416,11 @@ error = ''' etcd transaction failed, conflicted and rolled back ''' +[PD:etcd:ErrEtcdTxnResponse] +error = ''' +etcd transaction returned invalid response: %v +''' + ["PD:etcd:ErrEtcdTxnInternal"] error = ''' internal etcd transaction error occurred diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 834bf4f824e..5692f6cf037 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -390,6 +390,7 @@ var ( ErrEtcdGrantLease = errors.Normalize("etcd lease failed", errors.RFCCodeText("PD:etcd:ErrEtcdGrantLease")) ErrEtcdTxnInternal = errors.Normalize("internal etcd transaction error occurred", errors.RFCCodeText("PD:etcd:ErrEtcdTxnInternal")) ErrEtcdTxnConflict = errors.Normalize("etcd transaction failed, conflicted and rolled back", errors.RFCCodeText("PD:etcd:ErrEtcdTxnConflict")) + ErrEtcdTxnResponse = errors.Normalize("etcd transaction returned invalid response: %v", errors.RFCCodeText("PD:etcd:ErrEtcdTxnResponse")) ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut")) ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete")) ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet")) diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index c25f4d66060..e4b838e3047 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -16,6 +16,7 @@ package kv import ( "context" + "fmt" "path" "strings" "time" @@ -139,6 +140,13 @@ func (kv *etcdKVBase) Remove(key string) error { return nil } +func (kv *etcdKVBase) CreateLowLevelTxn() LowLevelTxn { + return &lowLevelTxnWrapper{ + inner: NewSlowLogTxn(kv.client), + rootPath: kv.rootPath, + } +} + // SlowLogTxn wraps etcd transaction and log slow one. type SlowLogTxn struct { clientv3.Txn @@ -296,3 +304,118 @@ func (txn *etcdTxn) commit() error { } return nil } + +type lowLevelTxnWrapper struct { + inner clientv3.Txn + rootPath string +} + +func (l *lowLevelTxnWrapper) If(conditions ...LowLevelTxnCondition) LowLevelTxn { + cmpList := make([]clientv3.Cmp, 0, len(conditions)) + for _, c := range conditions { + key := strings.Join([]string{l.rootPath, c.Key}, "/") + if c.CmpType == LowLevelCmpExists { + cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), ">", 0)) + } else if c.CmpType == LowLevelCmpNotExists { + cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) + } else { + var cmpOp string + switch c.CmpType { + case LowLevelCmpEqual: + cmpOp = "=" + case LowLevelCmpNotEqual: + cmpOp = "!=" + case LowLevelCmpGreater: + cmpOp = ">" + case LowLevelCmpLess: + cmpOp = "<" + default: + panic(fmt.Sprintf("unknown cmp type %v", c.CmpType)) + } + cmpList = append(cmpList, clientv3.Compare(clientv3.Value(key), cmpOp, c.Value)) + } + } + l.inner = l.inner.If(cmpList...) + return l +} + +func (l *lowLevelTxnWrapper) convertOps(ops []LowLevelTxnOp) []clientv3.Op { + opsList := make([]clientv3.Op, 0, len(ops)) + for _, op := range ops { + key := strings.Join([]string{l.rootPath, op.Key}, "/") + switch op.OpType { + case LowLevelOpPut: + opsList = append(opsList, clientv3.OpPut(key, op.Value)) + case LowLevelOpDelete: + opsList = append(opsList, clientv3.OpDelete(key)) + case LowLevelOpGet: + opsList = append(opsList, clientv3.OpGet(key)) + case LowLevelOpGetRange: + if op.EndKey == "\x00" { + opsList = append(opsList, clientv3.OpGet(key, clientv3.WithPrefix(), clientv3.WithLimit(int64(op.Limit)))) + } else { + endKey := strings.Join([]string{l.rootPath, op.EndKey}, "/") + opsList = append(opsList, clientv3.OpGet(key, clientv3.WithRange(endKey), clientv3.WithLimit(int64(op.Limit)))) + } + default: + panic(fmt.Sprintf("unknown op type %v", op.OpType)) + } + } + return opsList +} + +func (l *lowLevelTxnWrapper) Then(ops ...LowLevelTxnOp) LowLevelTxn { + l.inner = l.inner.Then(l.convertOps(ops)...) + return l +} + +func (l *lowLevelTxnWrapper) Else(ops ...LowLevelTxnOp) LowLevelTxn { + l.inner = l.inner.Else(l.convertOps(ops)...) + return l +} + +func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, error) { + resp, err := l.inner.Commit() + if err != nil { + return LowLevelTxnResult{}, err + } + items := make([]LowLevelTxnResultItem, 0, len(resp.Responses)) + for i, respItem := range resp.Responses { + var resultItem LowLevelTxnResultItem + if put := respItem.GetResponsePut(); put != nil { + // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. + resultItem = LowLevelTxnResultItem{} + if put.PrevKv != nil { + key := strings.TrimPrefix(strings.TrimPrefix(string(put.PrevKv.Key), l.rootPath), "/") + resultItem.KeyValuePairs = []KeyValuePair{{ + Key: key, + Value: string(put.PrevKv.Value), + }} + } + } else if del := respItem.GetResponseDeleteRange(); del != nil { + // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. + resultItem = LowLevelTxnResultItem{} + } else if rangeResp := respItem.GetResponseRange(); rangeResp != nil { + kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs)) + for _, kv := range rangeResp.Kvs { + key := strings.TrimPrefix(strings.TrimPrefix(string(kv.Key), l.rootPath), "/") + kvs = append(kvs, KeyValuePair{ + Key: key, + Value: string(kv.Value), + }) + } + resultItem = LowLevelTxnResultItem{ + KeyValuePairs: kvs, + } + } else { + return LowLevelTxnResult{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs( + fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, respItem), + ) + } + items = append(items, resultItem) + } + return LowLevelTxnResult{ + Succeeded: resp.Succeeded, + Items: items, + }, nil +} diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index a6e870db9c9..48640e743c9 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -16,24 +16,142 @@ package kv import "context" -// Txn bundles multiple operations into a single executable unit. -// It enables kv to atomically apply a set of updates. -type Txn interface { +type BaseReadWrite interface { Save(key, value string) error Remove(key string) error Load(key string) (string, error) LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) } +// Txn bundles multiple operations into a single executable unit. +// It enables kv to atomically apply a set of updates. +type Txn interface { + BaseReadWrite +} + +type LowLevelTxnCmpType int +type LowLevelTxnOpType int + +const ( + LowLevelCmpEqual LowLevelTxnCmpType = iota + LowLevelCmpNotEqual + LowLevelCmpLess + LowLevelCmpGreater + LowLevelCmpExists + LowLevelCmpNotExists +) + +const ( + LowLevelOpPut LowLevelTxnOpType = iota + LowLevelOpDelete + LowLevelOpGet + LowLevelOpGetRange +) + +type LowLevelTxnCondition struct { + Key string + CmpType LowLevelTxnCmpType + Value string +} + +func (c *LowLevelTxnCondition) CheckOnValue(value string, exists bool) bool { + switch c.CmpType { + case LowLevelCmpEqual: + if exists && value == c.Value { + return true + } + case LowLevelCmpNotEqual: + if exists && value != c.Value { + return true + } + case LowLevelCmpLess: + if exists && value < c.Value { + return true + } + case LowLevelCmpGreater: + if exists && value > c.Value { + return true + } + case LowLevelCmpExists: + if exists { + return true + } + case LowLevelCmpNotExists: + if !exists { + return true + } + default: + panic("unreachable") + } + return false +} + +type LowLevelTxnOp struct { + Key string + OpType LowLevelTxnOpType + Value string + // The end key when the OpType is LowLevelOpGetRange. + EndKey string + // The limit of the keys to get when the OpType is LowLevelOpGetRange. + Limit int +} + +type KeyValuePair struct { + Key string + Value string +} + +// LowLevelTxnResultItem represents a single result of a read operation in a LowLevelTxn. +type LowLevelTxnResultItem struct { + KeyValuePairs []KeyValuePair + More bool +} + +// LowLevelTxnResult represents the result of a LowLevelTxn. The results of operations in `Then` or `Else` branches +// will be listed in `Items` in the same order as the operations are added. +// For Put or Delete operations, its corresponding result is the previous value before writing. +type LowLevelTxnResult struct { + Succeeded bool + Items []LowLevelTxnResultItem +} + +// LowLevelTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction +// API. When the backend is etcd, it simply calls etcd's equivalent APIs internally. Otherwise, the +// behavior is simulated. +// Considering that in different backends, the kv pairs may not have equivalent property of etcd's +// version, create-time, etc., the abstracted LowLevelTxn interface does not support comparing on them. +// It only supports checking the value or whether the key exists. +// Avoid reading/writing the same key multiple times in a single transaction, otherwise the behavior +// would be undefined. +type LowLevelTxn interface { + If(conditions ...LowLevelTxnCondition) LowLevelTxn + Then(ops ...LowLevelTxnOp) LowLevelTxn + Else(ops ...LowLevelTxnOp) LowLevelTxn + Commit(ctx context.Context) (LowLevelTxnResult, error) +} + // Base is an abstract interface for load/save pd cluster data. type Base interface { - Txn + BaseReadWrite // RunInTxn runs the user provided function in a Transaction. // If user provided function f returns a non-nil error, then // transaction will not be committed, the same error will be // returned by RunInTxn. // Otherwise, it returns the error occurred during the // transaction. + // + // This is a highly-simplified transaction interface. As + // etcd's transaction API is quite limited, it's hard to use it + // to provide a complete transaction model as how a normal database + // does. So when this API is running on etcd backend, each read on + // `txn` implicitly constructs a condition. + // (ref: https://etcd.io/docs/v3.5/learning/api/#transaction) + // When reading a range using `LoadRange`, for each key found in the + // range there will be a condition constructed. Be aware of the + // possibility of causing phantom read. + // RunInTxn may not suit all use cases. When RunInTxn is found not + // improper to use, consider using CreateLowLevelTxn instead. + // // Note that transaction are not committed until RunInTxn returns nil. // Note: // 1. Load and LoadRange operations provides only stale read. @@ -42,4 +160,11 @@ type Base interface { // 2. Only when storage is etcd, does RunInTxn checks that // values loaded during transaction has not been modified before commit. RunInTxn(ctx context.Context, f func(txn Txn) error) error + + // CreateLowLevelTxn creates a transaction that provides the if-then-else + // API pattern which is the same as how etcd does, makes it possible + // to precisely control how etcd's transaction API is used when the + // backend is etcd. When there's other backend types, the behavior will be + // simulated. + CreateLowLevelTxn() LowLevelTxn } diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index f05561b0c0b..87890042d11 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -38,6 +38,7 @@ func TestEtcd(t *testing.T) { testRange(re, kv) testSaveMultiple(re, kv, 20) testLoadConflict(re, kv) + testLowLevelTxn(re, kv) } func TestLevelDB(t *testing.T) { @@ -49,6 +50,7 @@ func TestLevelDB(t *testing.T) { testReadWrite(re, kv) testRange(re, kv) testSaveMultiple(re, kv, 20) + testLowLevelTxn(re, kv) } func TestMemKV(t *testing.T) { @@ -57,6 +59,7 @@ func TestMemKV(t *testing.T) { testReadWrite(re, kv) testRange(re, kv) testSaveMultiple(re, kv, 20) + testLowLevelTxn(re, kv) } func testReadWrite(re *require.Assertions, kv Base) { @@ -159,3 +162,236 @@ func testLoadConflict(re *require.Assertions, kv Base) { // When other writer exists, loader must error. re.Error(kv.RunInTxn(context.Background(), conflictLoader)) } + +func mustHaveKeys(re *require.Assertions, kv Base, prefix string, expected ...KeyValuePair) { + keys, values, err := kv.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), 0) + re.NoError(err) + re.Equal(len(expected), len(keys)) + for i, key := range keys { + re.Equal(expected[i].Key, key) + re.Equal(expected[i].Value, values[i]) + } +} + +func testLowLevelTxn(re *require.Assertions, kv Base) { + // Test NotExists condition, putting in transaction. + res, err := kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpNotExists, + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-k1", + OpType: LowLevelOpPut, + Value: "v1", + }, + LowLevelTxnOp{ + Key: "txn-k2", + OpType: LowLevelOpPut, + Value: "v2", + }, + ).Else( + LowLevelTxnOp{ + Key: "txn-unexpected", + OpType: LowLevelOpPut, + Value: "unexpected", + }, + ).Commit(context.Background()) + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.Items, 2) + re.Len(res.Items[0].KeyValuePairs, 0) + re.Len(res.Items[1].KeyValuePairs, 0) + + mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) + + // Test Equal condition; reading in transaction. + res, err = kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpEqual, + Value: "v1", + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-k2", + OpType: LowLevelOpGet, + }, + ).Else( + LowLevelTxnOp{ + Key: "txn-unexpected", + OpType: LowLevelOpPut, + Value: "unexpected", + }, + ).Commit(context.Background()) + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.Items, 1) + re.Len(res.Items[0].KeyValuePairs, 1) + re.Equal("v2", res.Items[0].KeyValuePairs[0].Value) + mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) + + // Test NotEqual condition, else branch, reading range in transaction, reading & writing mixed. + res, err = kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpNotEqual, + Value: "v1", + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-unexpected", + OpType: LowLevelOpPut, + Value: "unexpected", + }, + ).Else( + LowLevelTxnOp{ + Key: "txn-k1", + OpType: LowLevelOpGetRange, + EndKey: "txn-k2\x00", + }, + LowLevelTxnOp{ + Key: "txn-k3", + OpType: LowLevelOpPut, + Value: "k3", + }, + ).Commit(context.Background()) + + re.NoError(err) + re.False(res.Succeeded) + re.Len(res.Items, 2) + re.Len(res.Items[0].KeyValuePairs, 2) + re.Equal([]KeyValuePair{{Key: "txn-k1", Value: "v1"}, {Key: "txn-k2", Value: "v2"}}, res.Items[0].KeyValuePairs) + re.Len(res.Items[1].KeyValuePairs, 0) + + mustHaveKeys(re, kv, "txn-", + KeyValuePair{Key: "txn-k1", Value: "v1"}, + KeyValuePair{Key: "txn-k2", Value: "v2"}, + KeyValuePair{Key: "txn-k3", Value: "k3"}) + + // Test Exists condition, deleting, overwriting. + res, err = kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpExists, + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-k1", + OpType: LowLevelOpDelete, + }, + LowLevelTxnOp{ + Key: "txn-k2", + OpType: LowLevelOpPut, + Value: "v22", + }, + // Delete not existing key. + LowLevelTxnOp{ + Key: "txn-k4", + OpType: LowLevelOpDelete, + }, + ).Else( + LowLevelTxnOp{ + Key: "txn-unexpected", + OpType: LowLevelOpPut, + Value: "unexpected", + }, + ).Commit(context.Background()) + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.Items, 3) + for _, item := range res.Items { + re.Len(item.KeyValuePairs, 0) + } + + mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k2", Value: "v22"}, KeyValuePair{Key: "txn-k3", Value: "k3"}) + + // Deleted keys can be regarded as not existing correctly. + res, err = kv.CreateLowLevelTxn().If( + LowLevelTxnCondition{ + Key: "txn-k1", + CmpType: LowLevelCmpNotExists, + }, + ).Then( + LowLevelTxnOp{ + Key: "txn-k2", + OpType: LowLevelOpDelete, + }, + LowLevelTxnOp{ + Key: "txn-k3", + OpType: LowLevelOpDelete, + }, + ).Commit(context.Background()) + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.Items, 2) + for _, item := range res.Items { + re.Len(item.KeyValuePairs, 0) + } + mustHaveKeys(re, kv, "txn-") + + // The following tests only check the correctness of the conditions. + check := func(conditions []LowLevelTxnCondition, shouldSuccess bool) { + + res, err := kv.CreateLowLevelTxn().If(conditions...).Commit(context.Background()) + re.NoError(err) + re.Equal(shouldSuccess, res.Succeeded) + } + + // "txn-k1" doesn't exist at this point. + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpExists}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotExists}}, true) + + err = kv.Save("txn-k1", "v1") + re.NoError(err) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpExists}}, true) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotExists}}, false) + + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}}, true) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotEqual, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v2"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotEqual, Value: "v2"}}, true) + + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v0"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v2"}}, true) + + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v2"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v0"}}, true) + + // Test comparing with not-existing key. + err = kv.Remove("txn-k1") + re.NoError(err) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotEqual, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v1"}}, false) + check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v1"}}, false) + + // Test the conditions are conjunctions. + err = kv.Save("txn-k1", "v1") + re.NoError(err) + err = kv.Save("txn-k2", "v2") + re.NoError(err) + check([]LowLevelTxnCondition{ + {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v2"}, + }, true) + check([]LowLevelTxnCondition{ + {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v0"}, + }, false) + check([]LowLevelTxnCondition{ + {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v2"}, + }, false) + check([]LowLevelTxnCondition{ + {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v0"}, + }, false) +} diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index 5a74c1928e8..3ece847843f 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -16,12 +16,12 @@ package kv import ( "context" + "fmt" + "github.com/pingcap/errors" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" - "github.com/pingcap/errors" - "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/syncutil" ) @@ -80,6 +80,12 @@ func (kv *LevelDBKV) Remove(key string) error { return errors.WithStack(kv.Delete([]byte(key), nil)) } +func (kv *LevelDBKV) CreateLowLevelTxn() LowLevelTxn { + return &levelDBLowLevelTxnSimulator{ + kv: kv, + } +} + // levelDBTxn implements kv.Txn. // It utilizes leveldb.Batch to batch user operations to an atomic execution unit. type levelDBTxn struct { @@ -147,3 +153,126 @@ func (txn *levelDBTxn) commit() error { return txn.kv.Write(txn.batch, nil) } + +type levelDBLowLevelTxnSimulator struct { + kv *LevelDBKV + condition []LowLevelTxnCondition + onSuccessOps []LowLevelTxnOp + onFailureOps []LowLevelTxnOp +} + +func (t *levelDBLowLevelTxnSimulator) If(conditions ...LowLevelTxnCondition) LowLevelTxn { + t.condition = append(t.condition, conditions...) + return t +} + +func (t *levelDBLowLevelTxnSimulator) Then(ops ...LowLevelTxnOp) LowLevelTxn { + t.onSuccessOps = append(t.onSuccessOps, ops...) + return t +} + +func (t *levelDBLowLevelTxnSimulator) Else(ops ...LowLevelTxnOp) LowLevelTxn { + t.onFailureOps = append(t.onFailureOps, ops...) + return t +} + +func (t *levelDBLowLevelTxnSimulator) Commit(_ctx context.Context) (res LowLevelTxnResult, err error) { + txn, err := t.kv.DB.OpenTransaction() + if err != nil { + return LowLevelTxnResult{}, err + } + defer func() { + // Set txn to nil when the function finished normally. + // When the function encounters any error and returns early, the transaction will be discarded here. + if txn != nil { + txn.Discard() + } + }() + + succeeds := true + for _, condition := range t.condition { + value, err := t.kv.DB.Get([]byte(condition.Key), nil) + valueStr := string(value) + exists := true + if err != nil { + if err == leveldb.ErrNotFound { + exists = false + } else { + return res, errors.WithStack(err) + } + } + + if !condition.CheckOnValue(valueStr, exists) { + succeeds = false + break + } + } + + ops := t.onSuccessOps + if !succeeds { + ops = t.onFailureOps + } + + results := make([]LowLevelTxnResultItem, 0, len(ops)) + + for _, operation := range ops { + switch operation.OpType { + case LowLevelOpPut: + err = txn.Put([]byte(operation.Key), []byte(operation.Value), nil) + if err != nil { + return res, errors.WithStack(err) + } + results = append(results, LowLevelTxnResultItem{}) + case LowLevelOpDelete: + err = txn.Delete([]byte(operation.Key), nil) + if err != nil { + return res, errors.WithStack(err) + } + results = append(results, LowLevelTxnResultItem{}) + case LowLevelOpGet: + value, err := txn.Get([]byte(operation.Key), nil) + result := LowLevelTxnResultItem{} + if err != nil { + if err != leveldb.ErrNotFound { + return res, errors.WithStack(err) + } + } else { + result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ + Key: operation.Key, + Value: string(value), + }) + } + results = append(results, result) + case LowLevelOpGetRange: + iter := txn.NewIterator(&util.Range{Start: []byte(operation.Key), Limit: []byte(operation.EndKey)}, nil) + result := LowLevelTxnResultItem{} + count := 0 + for iter.Next() { + if operation.Limit > 0 && count >= operation.Limit { + break + } + result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ + Key: string(iter.Key()), + Value: string(iter.Value()), + }) + count++ + } + iter.Release() + results = append(results, result) + default: + panic(fmt.Sprintf("unknown operation type %v", operation.OpType)) + } + } + + err = txn.Commit() + if err != nil { + return res, errors.WithStack(err) + } + // Avoid being discarded again in the defer block. + txn = nil + + return LowLevelTxnResult{ + Succeeded: succeeds, + Items: results, + }, nil +} diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index cc5dca29851..40d87186805 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -16,6 +16,7 @@ package kv import ( "context" + "fmt" "github.com/google/btree" @@ -52,6 +53,10 @@ func (s *memoryKVItem) Less(than *memoryKVItem) bool { func (kv *memoryKV) Load(key string) (string, error) { kv.RLock() defer kv.RUnlock() + return kv.load(key) +} + +func (kv *memoryKV) load(key string) (string, error) { item, ok := kv.tree.Get(memoryKVItem{key, ""}) if !ok { return "", nil @@ -69,6 +74,10 @@ func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string }) kv.RLock() defer kv.RUnlock() + return kv.loadRange(key, endKey, limit) +} + +func (kv *memoryKV) loadRange(key, endKey string, limit int) ([]string, []string, error) { keys := make([]string, 0, limit) values := make([]string, 0, limit) kv.tree.AscendRange(memoryKVItem{key, ""}, memoryKVItem{endKey, ""}, func(item memoryKVItem) bool { @@ -86,6 +95,10 @@ func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string func (kv *memoryKV) Save(key, value string) error { kv.Lock() defer kv.Unlock() + return kv.save(key, value) +} + +func (kv *memoryKV) save(key, value string) error { kv.tree.ReplaceOrInsert(memoryKVItem{key, value}) return nil } @@ -94,11 +107,20 @@ func (kv *memoryKV) Save(key, value string) error { func (kv *memoryKV) Remove(key string) error { kv.Lock() defer kv.Unlock() + return kv.remove(key) +} +func (kv *memoryKV) remove(key string) error { kv.tree.Delete(memoryKVItem{key, ""}) return nil } +func (kv *memoryKV) CreateLowLevelTxn() LowLevelTxn { + return &memKvLowLevelTxnSimulator{ + kv: kv, + } +} + // memTxn implements kv.Txn. type memTxn struct { kv *memoryKV @@ -198,3 +220,102 @@ func (txn *memTxn) commit() error { } return nil } + +type memKvLowLevelTxnSimulator struct { + kv *memoryKV + conditions []LowLevelTxnCondition + onSuccessOps []LowLevelTxnOp + onFailureOps []LowLevelTxnOp +} + +func (t *memKvLowLevelTxnSimulator) If(conditions ...LowLevelTxnCondition) LowLevelTxn { + t.conditions = append(t.conditions, conditions...) + return t +} + +func (t *memKvLowLevelTxnSimulator) Then(ops ...LowLevelTxnOp) LowLevelTxn { + t.onSuccessOps = append(t.onSuccessOps, ops...) + return t +} + +func (t *memKvLowLevelTxnSimulator) Else(ops ...LowLevelTxnOp) LowLevelTxn { + t.onFailureOps = append(t.onFailureOps, ops...) + return t +} + +func (t *memKvLowLevelTxnSimulator) Commit(_ctx context.Context) (LowLevelTxnResult, error) { + t.kv.Lock() + defer t.kv.Unlock() + + succeeds := true + for _, condition := range t.conditions { + value, err := t.kv.load(condition.Key) + if err != nil { + return LowLevelTxnResult{}, err + } + // There's a convention to represent not-existing key with empty value. + exists := value != "" + + if !condition.CheckOnValue(value, exists) { + succeeds = false + break + } + } + + ops := t.onSuccessOps + if !succeeds { + ops = t.onFailureOps + } + + results := make([]LowLevelTxnResultItem, 0, len(ops)) + + for _, operation := range ops { + switch operation.OpType { + case LowLevelOpPut: + err := t.kv.save(operation.Key, operation.Value) + if err != nil { + panic(fmt.Sprintf("unexpected error when operating memoryKV: %v", err)) + } + results = append(results, LowLevelTxnResultItem{}) + case LowLevelOpDelete: + err := t.kv.remove(operation.Key) + if err != nil { + panic(fmt.Sprintf("unexpected error when operating memoryKV: %v", err)) + } + results = append(results, LowLevelTxnResultItem{}) + case LowLevelOpGet: + value, err := t.kv.load(operation.Key) + if err != nil { + panic(fmt.Sprintf("unexpected error when operating memoryKV: %v", err)) + } + result := LowLevelTxnResultItem{} + if len(value) > 0 { + result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ + Key: operation.Key, + Value: value, + }) + } + results = append(results, result) + case LowLevelOpGetRange: + keys, values, err := t.kv.loadRange(operation.Key, operation.EndKey, operation.Limit) + if err != nil { + panic(fmt.Sprintf("unexpected error when operating memoryKV: %v", err)) + } + result := LowLevelTxnResultItem{} + for i := range keys { + result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ + Key: keys[i], + Value: values[i], + }) + } + results = append(results, result) + default: + panic(fmt.Sprintf("unknown operation type %v", operation.OpType)) + } + } + + return LowLevelTxnResult{ + Succeeded: succeeds, + Items: results, + }, nil +} From 413cc45798aac6698d8f4cd5cc31cabe35bb95de Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 21 Jan 2025 14:22:26 +0800 Subject: [PATCH 02/11] Fix lint and add comments Signed-off-by: MyonKeminta --- errors.toml | 2 +- pkg/storage/kv/etcd_kv.go | 11 +++++-- pkg/storage/kv/kv.go | 52 ++++++++++++++++++++---------- pkg/storage/kv/kv_test.go | 34 ++++++++++---------- pkg/storage/kv/levedb_kv.go | 14 +++++++-- pkg/storage/kv/mem_kv.go | 63 +++++++++++++++++-------------------- 6 files changed, 101 insertions(+), 75 deletions(-) diff --git a/errors.toml b/errors.toml index a56b187cbac..d242174d88a 100644 --- a/errors.toml +++ b/errors.toml @@ -416,7 +416,7 @@ error = ''' etcd transaction failed, conflicted and rolled back ''' -[PD:etcd:ErrEtcdTxnResponse] +["PD:etcd:ErrEtcdTxnResponse"] error = ''' etcd transaction returned invalid response: %v ''' diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index e4b838e3047..59ac1f0d7f7 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -140,6 +140,7 @@ func (kv *etcdKVBase) Remove(key string) error { return nil } +// CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. func (kv *etcdKVBase) CreateLowLevelTxn() LowLevelTxn { return &lowLevelTxnWrapper{ inner: NewSlowLogTxn(kv.client), @@ -310,6 +311,7 @@ type lowLevelTxnWrapper struct { rootPath string } +// If implements LowLevelTxn interface for adding conditions to the transaction. func (l *lowLevelTxnWrapper) If(conditions ...LowLevelTxnCondition) LowLevelTxn { cmpList := make([]clientv3.Cmp, 0, len(conditions)) for _, c := range conditions { @@ -364,16 +366,21 @@ func (l *lowLevelTxnWrapper) convertOps(ops []LowLevelTxnOp) []clientv3.Op { return opsList } +// Then implements LowLevelTxn interface for adding operations that need to be executed when the condition passes to +// the transaction. func (l *lowLevelTxnWrapper) Then(ops ...LowLevelTxnOp) LowLevelTxn { l.inner = l.inner.Then(l.convertOps(ops)...) return l } +// Else implements LowLevelTxn interface for adding operations that need to be executed when the condition doesn't pass +// to the transaction. func (l *lowLevelTxnWrapper) Else(ops ...LowLevelTxnOp) LowLevelTxn { l.inner = l.inner.Else(l.convertOps(ops)...) return l } +// Commit implements LowLevelTxn interface for committing the transaction. func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, error) { resp, err := l.inner.Commit() if err != nil { @@ -415,7 +422,7 @@ func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, er items = append(items, resultItem) } return LowLevelTxnResult{ - Succeeded: resp.Succeeded, - Items: items, + Succeeded: resp.Succeeded, + ResultItems: items, }, nil } diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index 48640e743c9..db619c1d3bc 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -16,22 +16,14 @@ package kv import "context" -type BaseReadWrite interface { - Save(key, value string) error - Remove(key string) error - Load(key string) (string, error) - LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) -} - -// Txn bundles multiple operations into a single executable unit. -// It enables kv to atomically apply a set of updates. -type Txn interface { - BaseReadWrite -} - +// LowLevelTxnCmpType represents the comparison type that is used in the condition of LowLevelTxn. type LowLevelTxnCmpType int + +// LowLevelTxnOpType represents the operation type that is used in the operations (either `Then` branch or `Else` +// branch) of LowLevelTxn. type LowLevelTxnOpType int +// nolint:revive const ( LowLevelCmpEqual LowLevelTxnCmpType = iota LowLevelCmpNotEqual @@ -41,6 +33,7 @@ const ( LowLevelCmpNotExists ) +// nolint:revive const ( LowLevelOpPut LowLevelTxnOpType = iota LowLevelOpDelete @@ -48,12 +41,15 @@ const ( LowLevelOpGetRange ) +// LowLevelTxnCondition represents a condition in a LowLevelTxn. type LowLevelTxnCondition struct { Key string CmpType LowLevelTxnCmpType - Value string + // The value to compare with. It's not used when CmpType is LowLevelCmpExists or LowLevelCmpNotExists. + Value string } +// CheckOnValue checks whether the condition is satisfied on the given value. func (c *LowLevelTxnCondition) CheckOnValue(value string, exists bool) bool { switch c.CmpType { case LowLevelCmpEqual: @@ -86,6 +82,8 @@ func (c *LowLevelTxnCondition) CheckOnValue(value string, exists bool) bool { return false } +// LowLevelTxnOp represents an operation in a LowLevelTxn's `Then` or `Else` branch and will be executed according to +// the result of checking conditions. type LowLevelTxnOp struct { Key string OpType LowLevelTxnOpType @@ -96,6 +94,7 @@ type LowLevelTxnOp struct { Limit int } +// KeyValuePair represents a pair of key and value. type KeyValuePair struct { Key string Value string @@ -104,15 +103,20 @@ type KeyValuePair struct { // LowLevelTxnResultItem represents a single result of a read operation in a LowLevelTxn. type LowLevelTxnResultItem struct { KeyValuePairs []KeyValuePair - More bool } // LowLevelTxnResult represents the result of a LowLevelTxn. The results of operations in `Then` or `Else` branches -// will be listed in `Items` in the same order as the operations are added. +// will be listed in `ResultItems` in the same order as the operations are added. // For Put or Delete operations, its corresponding result is the previous value before writing. type LowLevelTxnResult struct { Succeeded bool - Items []LowLevelTxnResultItem + // The results of each operation in the `Then` branch or the `Else` branch of a transaction, depending on + // whether `Succeeded`. The i-th result belongs to the i-th operation added to the executed branch. + // * For Put or Delete operations, the result is empty. + // * For Get operations, the result contains a key-value pair representing the get result. In case the key + // does not exist, its `KeyValuePairs` field will be empty. + // * For GetRange operations, the result is a list of key-value pairs containing key-value paris that are scanned. + ResultItems []LowLevelTxnResultItem } // LowLevelTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction @@ -130,6 +134,20 @@ type LowLevelTxn interface { Commit(ctx context.Context) (LowLevelTxnResult, error) } +// BaseReadWrite is the API set, shared by Base and Txn interfaces, that provides basic KV read and write operations. +type BaseReadWrite interface { + Save(key, value string) error + Remove(key string) error + Load(key string) (string, error) + LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) +} + +// Txn bundles multiple operations into a single executable unit. +// It enables kv to atomically apply a set of updates. +type Txn interface { + BaseReadWrite +} + // Base is an abstract interface for load/save pd cluster data. type Base interface { BaseReadWrite diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index 87890042d11..51ee02651bc 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -163,6 +163,7 @@ func testLoadConflict(re *require.Assertions, kv Base) { re.Error(kv.RunInTxn(context.Background(), conflictLoader)) } +// nolint:unparam func mustHaveKeys(re *require.Assertions, kv Base, prefix string, expected ...KeyValuePair) { keys, values, err := kv.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), 0) re.NoError(err) @@ -201,9 +202,9 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { re.NoError(err) re.True(res.Succeeded) - re.Len(res.Items, 2) - re.Len(res.Items[0].KeyValuePairs, 0) - re.Len(res.Items[1].KeyValuePairs, 0) + re.Len(res.ResultItems, 2) + re.Empty(res.ResultItems[0].KeyValuePairs) + re.Empty(res.ResultItems[1].KeyValuePairs) mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) @@ -229,9 +230,9 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { re.NoError(err) re.True(res.Succeeded) - re.Len(res.Items, 1) - re.Len(res.Items[0].KeyValuePairs, 1) - re.Equal("v2", res.Items[0].KeyValuePairs[0].Value) + re.Len(res.ResultItems, 1) + re.Len(res.ResultItems[0].KeyValuePairs, 1) + re.Equal("v2", res.ResultItems[0].KeyValuePairs[0].Value) mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) // Test NotEqual condition, else branch, reading range in transaction, reading & writing mixed. @@ -262,10 +263,10 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { re.NoError(err) re.False(res.Succeeded) - re.Len(res.Items, 2) - re.Len(res.Items[0].KeyValuePairs, 2) - re.Equal([]KeyValuePair{{Key: "txn-k1", Value: "v1"}, {Key: "txn-k2", Value: "v2"}}, res.Items[0].KeyValuePairs) - re.Len(res.Items[1].KeyValuePairs, 0) + re.Len(res.ResultItems, 2) + re.Len(res.ResultItems[0].KeyValuePairs, 2) + re.Equal([]KeyValuePair{{Key: "txn-k1", Value: "v1"}, {Key: "txn-k2", Value: "v2"}}, res.ResultItems[0].KeyValuePairs) + re.Empty(res.ResultItems[1].KeyValuePairs) mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, @@ -303,9 +304,9 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { re.NoError(err) re.True(res.Succeeded) - re.Len(res.Items, 3) - for _, item := range res.Items { - re.Len(item.KeyValuePairs, 0) + re.Len(res.ResultItems, 3) + for _, item := range res.ResultItems { + re.Empty(item.KeyValuePairs) } mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k2", Value: "v22"}, KeyValuePair{Key: "txn-k3", Value: "k3"}) @@ -329,15 +330,14 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { re.NoError(err) re.True(res.Succeeded) - re.Len(res.Items, 2) - for _, item := range res.Items { - re.Len(item.KeyValuePairs, 0) + re.Len(res.ResultItems, 2) + for _, item := range res.ResultItems { + re.Empty(item.KeyValuePairs) } mustHaveKeys(re, kv, "txn-") // The following tests only check the correctness of the conditions. check := func(conditions []LowLevelTxnCondition, shouldSuccess bool) { - res, err := kv.CreateLowLevelTxn().If(conditions...).Commit(context.Background()) re.NoError(err) re.Equal(shouldSuccess, res.Succeeded) diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index 3ece847843f..3f3f619c570 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -18,10 +18,11 @@ import ( "context" "fmt" - "github.com/pingcap/errors" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" + "github.com/pingcap/errors" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/syncutil" ) @@ -80,6 +81,7 @@ func (kv *LevelDBKV) Remove(key string) error { return errors.WithStack(kv.Delete([]byte(key), nil)) } +// CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. func (kv *LevelDBKV) CreateLowLevelTxn() LowLevelTxn { return &levelDBLowLevelTxnSimulator{ kv: kv, @@ -161,21 +163,27 @@ type levelDBLowLevelTxnSimulator struct { onFailureOps []LowLevelTxnOp } +// If implements LowLevelTxn interface for adding conditions to the transaction. func (t *levelDBLowLevelTxnSimulator) If(conditions ...LowLevelTxnCondition) LowLevelTxn { t.condition = append(t.condition, conditions...) return t } +// Then implements LowLevelTxn interface for adding operations that need to be executed when the condition passes to +// the transaction. func (t *levelDBLowLevelTxnSimulator) Then(ops ...LowLevelTxnOp) LowLevelTxn { t.onSuccessOps = append(t.onSuccessOps, ops...) return t } +// Else implements LowLevelTxn interface for adding operations that need to be executed when the condition doesn't pass +// to the transaction. func (t *levelDBLowLevelTxnSimulator) Else(ops ...LowLevelTxnOp) LowLevelTxn { t.onFailureOps = append(t.onFailureOps, ops...) return t } +// Commit implements LowLevelTxn interface for committing the transaction. func (t *levelDBLowLevelTxnSimulator) Commit(_ctx context.Context) (res LowLevelTxnResult, err error) { txn, err := t.kv.DB.OpenTransaction() if err != nil { @@ -272,7 +280,7 @@ func (t *levelDBLowLevelTxnSimulator) Commit(_ctx context.Context) (res LowLevel txn = nil return LowLevelTxnResult{ - Succeeded: succeeds, - Items: results, + Succeeded: succeeds, + ResultItems: results, }, nil } diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index 40d87186805..9e402d31e8f 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -53,15 +53,15 @@ func (s *memoryKVItem) Less(than *memoryKVItem) bool { func (kv *memoryKV) Load(key string) (string, error) { kv.RLock() defer kv.RUnlock() - return kv.load(key) + return kv.loadNoLock(key), nil } -func (kv *memoryKV) load(key string) (string, error) { +func (kv *memoryKV) loadNoLock(key string) string { item, ok := kv.tree.Get(memoryKVItem{key, ""}) if !ok { - return "", nil + return "" } - return item.value, nil + return item.value } // LoadRange loads the keys in the range of [key, endKey). @@ -74,10 +74,11 @@ func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string }) kv.RLock() defer kv.RUnlock() - return kv.loadRange(key, endKey, limit) + keys, values := kv.loadRangeNoLock(key, endKey, limit) + return keys, values, nil } -func (kv *memoryKV) loadRange(key, endKey string, limit int) ([]string, []string, error) { +func (kv *memoryKV) loadRangeNoLock(key, endKey string, limit int) ([]string, []string) { keys := make([]string, 0, limit) values := make([]string, 0, limit) kv.tree.AscendRange(memoryKVItem{key, ""}, memoryKVItem{endKey, ""}, func(item memoryKVItem) bool { @@ -88,33 +89,34 @@ func (kv *memoryKV) loadRange(key, endKey string, limit int) ([]string, []string } return true }) - return keys, values, nil + return keys, values } // Save saves the key-value pair. func (kv *memoryKV) Save(key, value string) error { kv.Lock() defer kv.Unlock() - return kv.save(key, value) + kv.saveNoLock(key, value) + return nil } -func (kv *memoryKV) save(key, value string) error { +func (kv *memoryKV) saveNoLock(key, value string) { kv.tree.ReplaceOrInsert(memoryKVItem{key, value}) - return nil } // Remove removes the key. func (kv *memoryKV) Remove(key string) error { kv.Lock() defer kv.Unlock() - return kv.remove(key) + kv.removeNoLock(key) + return nil } -func (kv *memoryKV) remove(key string) error { +func (kv *memoryKV) removeNoLock(key string) { kv.tree.Delete(memoryKVItem{key, ""}) - return nil } +// CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. func (kv *memoryKV) CreateLowLevelTxn() LowLevelTxn { return &memKvLowLevelTxnSimulator{ kv: kv, @@ -228,31 +230,34 @@ type memKvLowLevelTxnSimulator struct { onFailureOps []LowLevelTxnOp } +// If implements LowLevelTxn interface for adding conditions to the transaction. func (t *memKvLowLevelTxnSimulator) If(conditions ...LowLevelTxnCondition) LowLevelTxn { t.conditions = append(t.conditions, conditions...) return t } +// Then implements LowLevelTxn interface for adding operations that need to be executed when the condition passes to +// the transaction. func (t *memKvLowLevelTxnSimulator) Then(ops ...LowLevelTxnOp) LowLevelTxn { t.onSuccessOps = append(t.onSuccessOps, ops...) return t } +// Else implements LowLevelTxn interface for adding operations that need to be executed when the condition doesn't pass +// to the transaction. func (t *memKvLowLevelTxnSimulator) Else(ops ...LowLevelTxnOp) LowLevelTxn { t.onFailureOps = append(t.onFailureOps, ops...) return t } +// Commit implements LowLevelTxn interface for committing the transaction. func (t *memKvLowLevelTxnSimulator) Commit(_ctx context.Context) (LowLevelTxnResult, error) { t.kv.Lock() defer t.kv.Unlock() succeeds := true for _, condition := range t.conditions { - value, err := t.kv.load(condition.Key) - if err != nil { - return LowLevelTxnResult{}, err - } + value := t.kv.loadNoLock(condition.Key) // There's a convention to represent not-existing key with empty value. exists := value != "" @@ -272,22 +277,13 @@ func (t *memKvLowLevelTxnSimulator) Commit(_ctx context.Context) (LowLevelTxnRes for _, operation := range ops { switch operation.OpType { case LowLevelOpPut: - err := t.kv.save(operation.Key, operation.Value) - if err != nil { - panic(fmt.Sprintf("unexpected error when operating memoryKV: %v", err)) - } + t.kv.saveNoLock(operation.Key, operation.Value) results = append(results, LowLevelTxnResultItem{}) case LowLevelOpDelete: - err := t.kv.remove(operation.Key) - if err != nil { - panic(fmt.Sprintf("unexpected error when operating memoryKV: %v", err)) - } + t.kv.removeNoLock(operation.Key) results = append(results, LowLevelTxnResultItem{}) case LowLevelOpGet: - value, err := t.kv.load(operation.Key) - if err != nil { - panic(fmt.Sprintf("unexpected error when operating memoryKV: %v", err)) - } + value := t.kv.loadNoLock(operation.Key) result := LowLevelTxnResultItem{} if len(value) > 0 { result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ @@ -297,10 +293,7 @@ func (t *memKvLowLevelTxnSimulator) Commit(_ctx context.Context) (LowLevelTxnRes } results = append(results, result) case LowLevelOpGetRange: - keys, values, err := t.kv.loadRange(operation.Key, operation.EndKey, operation.Limit) - if err != nil { - panic(fmt.Sprintf("unexpected error when operating memoryKV: %v", err)) - } + keys, values := t.kv.loadRangeNoLock(operation.Key, operation.EndKey, operation.Limit) result := LowLevelTxnResultItem{} for i := range keys { result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ @@ -315,7 +308,7 @@ func (t *memKvLowLevelTxnSimulator) Commit(_ctx context.Context) (LowLevelTxnRes } return LowLevelTxnResult{ - Succeeded: succeeds, - Items: results, + Succeeded: succeeds, + ResultItems: results, }, nil } From 29a8470e60d2853a9c6b422c632fc0682241bd5a Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 21 Jan 2025 15:06:32 +0800 Subject: [PATCH 03/11] fix errors.toml Signed-off-by: MyonKeminta --- errors.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/errors.toml b/errors.toml index d242174d88a..303ef31ea16 100644 --- a/errors.toml +++ b/errors.toml @@ -416,14 +416,14 @@ error = ''' etcd transaction failed, conflicted and rolled back ''' -["PD:etcd:ErrEtcdTxnResponse"] +["PD:etcd:ErrEtcdTxnInternal"] error = ''' -etcd transaction returned invalid response: %v +internal etcd transaction error occurred ''' -["PD:etcd:ErrEtcdTxnInternal"] +["PD:etcd:ErrEtcdTxnResponse"] error = ''' -internal etcd transaction error occurred +etcd transaction returned invalid response: %v ''' ["PD:etcd:ErrEtcdURLMap"] From a93a18186778b43c74945eae225f1118f8b5a577 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 13 Feb 2025 15:37:54 +0800 Subject: [PATCH 04/11] fix typo Signed-off-by: MyonKeminta --- pkg/storage/kv/kv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index db619c1d3bc..5458f6d4320 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -167,7 +167,7 @@ type Base interface { // When reading a range using `LoadRange`, for each key found in the // range there will be a condition constructed. Be aware of the // possibility of causing phantom read. - // RunInTxn may not suit all use cases. When RunInTxn is found not + // RunInTxn may not suit all use cases. When RunInTxn is found // improper to use, consider using CreateLowLevelTxn instead. // // Note that transaction are not committed until RunInTxn returns nil. From 1edfa440ebec70a6b6629faf35a78ed5099a2bfa Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Mon, 17 Feb 2025 18:05:58 +0800 Subject: [PATCH 05/11] Remove implementation to memory kv and leveldb kv Signed-off-by: MyonKeminta --- pkg/storage/kv/kv_test.go | 2 - pkg/storage/kv/levedb_kv.go | 134 +----------------------------------- pkg/storage/kv/mem_kv.go | 95 +------------------------ 3 files changed, 2 insertions(+), 229 deletions(-) diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index 51ee02651bc..9363d4c4aba 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -50,7 +50,6 @@ func TestLevelDB(t *testing.T) { testReadWrite(re, kv) testRange(re, kv) testSaveMultiple(re, kv, 20) - testLowLevelTxn(re, kv) } func TestMemKV(t *testing.T) { @@ -59,7 +58,6 @@ func TestMemKV(t *testing.T) { testReadWrite(re, kv) testRange(re, kv) testSaveMultiple(re, kv, 20) - testLowLevelTxn(re, kv) } func testReadWrite(re *require.Assertions, kv Base) { diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index 3f3f619c570..2a1706de162 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -16,7 +16,6 @@ package kv import ( "context" - "fmt" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/util" @@ -83,9 +82,7 @@ func (kv *LevelDBKV) Remove(key string) error { // CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. func (kv *LevelDBKV) CreateLowLevelTxn() LowLevelTxn { - return &levelDBLowLevelTxnSimulator{ - kv: kv, - } + panic("unimplemented") } // levelDBTxn implements kv.Txn. @@ -155,132 +152,3 @@ func (txn *levelDBTxn) commit() error { return txn.kv.Write(txn.batch, nil) } - -type levelDBLowLevelTxnSimulator struct { - kv *LevelDBKV - condition []LowLevelTxnCondition - onSuccessOps []LowLevelTxnOp - onFailureOps []LowLevelTxnOp -} - -// If implements LowLevelTxn interface for adding conditions to the transaction. -func (t *levelDBLowLevelTxnSimulator) If(conditions ...LowLevelTxnCondition) LowLevelTxn { - t.condition = append(t.condition, conditions...) - return t -} - -// Then implements LowLevelTxn interface for adding operations that need to be executed when the condition passes to -// the transaction. -func (t *levelDBLowLevelTxnSimulator) Then(ops ...LowLevelTxnOp) LowLevelTxn { - t.onSuccessOps = append(t.onSuccessOps, ops...) - return t -} - -// Else implements LowLevelTxn interface for adding operations that need to be executed when the condition doesn't pass -// to the transaction. -func (t *levelDBLowLevelTxnSimulator) Else(ops ...LowLevelTxnOp) LowLevelTxn { - t.onFailureOps = append(t.onFailureOps, ops...) - return t -} - -// Commit implements LowLevelTxn interface for committing the transaction. -func (t *levelDBLowLevelTxnSimulator) Commit(_ctx context.Context) (res LowLevelTxnResult, err error) { - txn, err := t.kv.DB.OpenTransaction() - if err != nil { - return LowLevelTxnResult{}, err - } - defer func() { - // Set txn to nil when the function finished normally. - // When the function encounters any error and returns early, the transaction will be discarded here. - if txn != nil { - txn.Discard() - } - }() - - succeeds := true - for _, condition := range t.condition { - value, err := t.kv.DB.Get([]byte(condition.Key), nil) - valueStr := string(value) - exists := true - if err != nil { - if err == leveldb.ErrNotFound { - exists = false - } else { - return res, errors.WithStack(err) - } - } - - if !condition.CheckOnValue(valueStr, exists) { - succeeds = false - break - } - } - - ops := t.onSuccessOps - if !succeeds { - ops = t.onFailureOps - } - - results := make([]LowLevelTxnResultItem, 0, len(ops)) - - for _, operation := range ops { - switch operation.OpType { - case LowLevelOpPut: - err = txn.Put([]byte(operation.Key), []byte(operation.Value), nil) - if err != nil { - return res, errors.WithStack(err) - } - results = append(results, LowLevelTxnResultItem{}) - case LowLevelOpDelete: - err = txn.Delete([]byte(operation.Key), nil) - if err != nil { - return res, errors.WithStack(err) - } - results = append(results, LowLevelTxnResultItem{}) - case LowLevelOpGet: - value, err := txn.Get([]byte(operation.Key), nil) - result := LowLevelTxnResultItem{} - if err != nil { - if err != leveldb.ErrNotFound { - return res, errors.WithStack(err) - } - } else { - result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ - Key: operation.Key, - Value: string(value), - }) - } - results = append(results, result) - case LowLevelOpGetRange: - iter := txn.NewIterator(&util.Range{Start: []byte(operation.Key), Limit: []byte(operation.EndKey)}, nil) - result := LowLevelTxnResultItem{} - count := 0 - for iter.Next() { - if operation.Limit > 0 && count >= operation.Limit { - break - } - result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ - Key: string(iter.Key()), - Value: string(iter.Value()), - }) - count++ - } - iter.Release() - results = append(results, result) - default: - panic(fmt.Sprintf("unknown operation type %v", operation.OpType)) - } - } - - err = txn.Commit() - if err != nil { - return res, errors.WithStack(err) - } - // Avoid being discarded again in the defer block. - txn = nil - - return LowLevelTxnResult{ - Succeeded: succeeds, - ResultItems: results, - }, nil -} diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index 9e402d31e8f..d783b27fe7d 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -16,7 +16,6 @@ package kv import ( "context" - "fmt" "github.com/google/btree" @@ -118,9 +117,7 @@ func (kv *memoryKV) removeNoLock(key string) { // CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. func (kv *memoryKV) CreateLowLevelTxn() LowLevelTxn { - return &memKvLowLevelTxnSimulator{ - kv: kv, - } + panic("unimplemented") } // memTxn implements kv.Txn. @@ -222,93 +219,3 @@ func (txn *memTxn) commit() error { } return nil } - -type memKvLowLevelTxnSimulator struct { - kv *memoryKV - conditions []LowLevelTxnCondition - onSuccessOps []LowLevelTxnOp - onFailureOps []LowLevelTxnOp -} - -// If implements LowLevelTxn interface for adding conditions to the transaction. -func (t *memKvLowLevelTxnSimulator) If(conditions ...LowLevelTxnCondition) LowLevelTxn { - t.conditions = append(t.conditions, conditions...) - return t -} - -// Then implements LowLevelTxn interface for adding operations that need to be executed when the condition passes to -// the transaction. -func (t *memKvLowLevelTxnSimulator) Then(ops ...LowLevelTxnOp) LowLevelTxn { - t.onSuccessOps = append(t.onSuccessOps, ops...) - return t -} - -// Else implements LowLevelTxn interface for adding operations that need to be executed when the condition doesn't pass -// to the transaction. -func (t *memKvLowLevelTxnSimulator) Else(ops ...LowLevelTxnOp) LowLevelTxn { - t.onFailureOps = append(t.onFailureOps, ops...) - return t -} - -// Commit implements LowLevelTxn interface for committing the transaction. -func (t *memKvLowLevelTxnSimulator) Commit(_ctx context.Context) (LowLevelTxnResult, error) { - t.kv.Lock() - defer t.kv.Unlock() - - succeeds := true - for _, condition := range t.conditions { - value := t.kv.loadNoLock(condition.Key) - // There's a convention to represent not-existing key with empty value. - exists := value != "" - - if !condition.CheckOnValue(value, exists) { - succeeds = false - break - } - } - - ops := t.onSuccessOps - if !succeeds { - ops = t.onFailureOps - } - - results := make([]LowLevelTxnResultItem, 0, len(ops)) - - for _, operation := range ops { - switch operation.OpType { - case LowLevelOpPut: - t.kv.saveNoLock(operation.Key, operation.Value) - results = append(results, LowLevelTxnResultItem{}) - case LowLevelOpDelete: - t.kv.removeNoLock(operation.Key) - results = append(results, LowLevelTxnResultItem{}) - case LowLevelOpGet: - value := t.kv.loadNoLock(operation.Key) - result := LowLevelTxnResultItem{} - if len(value) > 0 { - result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ - Key: operation.Key, - Value: value, - }) - } - results = append(results, result) - case LowLevelOpGetRange: - keys, values := t.kv.loadRangeNoLock(operation.Key, operation.EndKey, operation.Limit) - result := LowLevelTxnResultItem{} - for i := range keys { - result.KeyValuePairs = append(result.KeyValuePairs, KeyValuePair{ - Key: keys[i], - Value: values[i], - }) - } - results = append(results, result) - default: - panic(fmt.Sprintf("unknown operation type %v", operation.OpType)) - } - } - - return LowLevelTxnResult{ - Succeeded: succeeds, - ResultItems: results, - }, nil -} From a927abf63395b55e84d3f7a45081ef695dfe6e9b Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 18 Feb 2025 15:13:24 +0800 Subject: [PATCH 06/11] Renaming Signed-off-by: MyonKeminta --- pkg/storage/kv/etcd_kv.go | 62 +++++++------- pkg/storage/kv/kv.go | 92 ++++++++++---------- pkg/storage/kv/kv_test.go | 164 ++++++++++++++++++------------------ pkg/storage/kv/levedb_kv.go | 4 +- pkg/storage/kv/mem_kv.go | 4 +- 5 files changed, 163 insertions(+), 163 deletions(-) diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index 59ac1f0d7f7..357e85824f7 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -140,9 +140,9 @@ func (kv *etcdKVBase) Remove(key string) error { return nil } -// CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. -func (kv *etcdKVBase) CreateLowLevelTxn() LowLevelTxn { - return &lowLevelTxnWrapper{ +// CreateRawEtcdTxn creates a transaction that provides interface in if-then-else pattern. +func (kv *etcdKVBase) CreateRawEtcdTxn() RawEtcdTxn { + return &rawTxnWrapper{ inner: NewSlowLogTxn(kv.client), rootPath: kv.rootPath, } @@ -306,30 +306,30 @@ func (txn *etcdTxn) commit() error { return nil } -type lowLevelTxnWrapper struct { +type rawTxnWrapper struct { inner clientv3.Txn rootPath string } -// If implements LowLevelTxn interface for adding conditions to the transaction. -func (l *lowLevelTxnWrapper) If(conditions ...LowLevelTxnCondition) LowLevelTxn { +// If implements RawEtcdTxn interface for adding conditions to the transaction. +func (l *rawTxnWrapper) If(conditions ...RawEtcdTxnCondition) RawEtcdTxn { cmpList := make([]clientv3.Cmp, 0, len(conditions)) for _, c := range conditions { key := strings.Join([]string{l.rootPath, c.Key}, "/") - if c.CmpType == LowLevelCmpExists { + if c.CmpType == EtcdTxnCmpExists { cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), ">", 0)) - } else if c.CmpType == LowLevelCmpNotExists { + } else if c.CmpType == EtcdTxnCmpNotExists { cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) } else { var cmpOp string switch c.CmpType { - case LowLevelCmpEqual: + case EtcdTxnCmpEqual: cmpOp = "=" - case LowLevelCmpNotEqual: + case EtcdTxnCmpNotEqual: cmpOp = "!=" - case LowLevelCmpGreater: + case EtcdTxnCmpGreater: cmpOp = ">" - case LowLevelCmpLess: + case EtcdTxnCmpLess: cmpOp = "<" default: panic(fmt.Sprintf("unknown cmp type %v", c.CmpType)) @@ -341,18 +341,18 @@ func (l *lowLevelTxnWrapper) If(conditions ...LowLevelTxnCondition) LowLevelTxn return l } -func (l *lowLevelTxnWrapper) convertOps(ops []LowLevelTxnOp) []clientv3.Op { +func (l *rawTxnWrapper) convertOps(ops []RawEtcdTxnOp) []clientv3.Op { opsList := make([]clientv3.Op, 0, len(ops)) for _, op := range ops { key := strings.Join([]string{l.rootPath, op.Key}, "/") switch op.OpType { - case LowLevelOpPut: + case EtcdTxnOpPut: opsList = append(opsList, clientv3.OpPut(key, op.Value)) - case LowLevelOpDelete: + case EtcdTxnOpDelete: opsList = append(opsList, clientv3.OpDelete(key)) - case LowLevelOpGet: + case EtcdTxnOpGet: opsList = append(opsList, clientv3.OpGet(key)) - case LowLevelOpGetRange: + case EtcdTxnOpGetRange: if op.EndKey == "\x00" { opsList = append(opsList, clientv3.OpGet(key, clientv3.WithPrefix(), clientv3.WithLimit(int64(op.Limit)))) } else { @@ -366,32 +366,32 @@ func (l *lowLevelTxnWrapper) convertOps(ops []LowLevelTxnOp) []clientv3.Op { return opsList } -// Then implements LowLevelTxn interface for adding operations that need to be executed when the condition passes to +// Then implements RawEtcdTxn interface for adding operations that need to be executed when the condition passes to // the transaction. -func (l *lowLevelTxnWrapper) Then(ops ...LowLevelTxnOp) LowLevelTxn { +func (l *rawTxnWrapper) Then(ops ...RawEtcdTxnOp) RawEtcdTxn { l.inner = l.inner.Then(l.convertOps(ops)...) return l } -// Else implements LowLevelTxn interface for adding operations that need to be executed when the condition doesn't pass +// Else implements RawEtcdTxn interface for adding operations that need to be executed when the condition doesn't pass // to the transaction. -func (l *lowLevelTxnWrapper) Else(ops ...LowLevelTxnOp) LowLevelTxn { +func (l *rawTxnWrapper) Else(ops ...RawEtcdTxnOp) RawEtcdTxn { l.inner = l.inner.Else(l.convertOps(ops)...) return l } -// Commit implements LowLevelTxn interface for committing the transaction. -func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, error) { +// Commit implements RawEtcdTxn interface for committing the transaction. +func (l *rawTxnWrapper) Commit() (RawEtcdTxnResult, error) { resp, err := l.inner.Commit() if err != nil { - return LowLevelTxnResult{}, err + return RawEtcdTxnResult{}, err } - items := make([]LowLevelTxnResultItem, 0, len(resp.Responses)) + items := make([]RawEtcdTxnResultItem, 0, len(resp.Responses)) for i, respItem := range resp.Responses { - var resultItem LowLevelTxnResultItem + var resultItem RawEtcdTxnResultItem if put := respItem.GetResponsePut(); put != nil { // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. - resultItem = LowLevelTxnResultItem{} + resultItem = RawEtcdTxnResultItem{} if put.PrevKv != nil { key := strings.TrimPrefix(strings.TrimPrefix(string(put.PrevKv.Key), l.rootPath), "/") resultItem.KeyValuePairs = []KeyValuePair{{ @@ -401,7 +401,7 @@ func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, er } } else if del := respItem.GetResponseDeleteRange(); del != nil { // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. - resultItem = LowLevelTxnResultItem{} + resultItem = RawEtcdTxnResultItem{} } else if rangeResp := respItem.GetResponseRange(); rangeResp != nil { kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs)) for _, kv := range rangeResp.Kvs { @@ -411,17 +411,17 @@ func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, er Value: string(kv.Value), }) } - resultItem = LowLevelTxnResultItem{ + resultItem = RawEtcdTxnResultItem{ KeyValuePairs: kvs, } } else { - return LowLevelTxnResult{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs( + return RawEtcdTxnResult{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs( fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, respItem), ) } items = append(items, resultItem) } - return LowLevelTxnResult{ + return RawEtcdTxnResult{ Succeeded: resp.Succeeded, ResultItems: items, }, nil diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index 5458f6d4320..9550a5aabd6 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -16,63 +16,63 @@ package kv import "context" -// LowLevelTxnCmpType represents the comparison type that is used in the condition of LowLevelTxn. -type LowLevelTxnCmpType int +// EtcdTxnCmpType represents the comparison type that is used in the condition of RawEtcdTxn. +type EtcdTxnCmpType int -// LowLevelTxnOpType represents the operation type that is used in the operations (either `Then` branch or `Else` -// branch) of LowLevelTxn. -type LowLevelTxnOpType int +// EtcdTxnOpType represents the operation type that is used in the operations (either `Then` branch or `Else` +// branch) of RawEtcdTxn. +type EtcdTxnOpType int // nolint:revive const ( - LowLevelCmpEqual LowLevelTxnCmpType = iota - LowLevelCmpNotEqual - LowLevelCmpLess - LowLevelCmpGreater - LowLevelCmpExists - LowLevelCmpNotExists + EtcdTxnCmpEqual EtcdTxnCmpType = iota + EtcdTxnCmpNotEqual + EtcdTxnCmpLess + EtcdTxnCmpGreater + EtcdTxnCmpExists + EtcdTxnCmpNotExists ) // nolint:revive const ( - LowLevelOpPut LowLevelTxnOpType = iota - LowLevelOpDelete - LowLevelOpGet - LowLevelOpGetRange + EtcdTxnOpPut EtcdTxnOpType = iota + EtcdTxnOpDelete + EtcdTxnOpGet + EtcdTxnOpGetRange ) -// LowLevelTxnCondition represents a condition in a LowLevelTxn. -type LowLevelTxnCondition struct { +// RawEtcdTxnCondition represents a condition in a RawEtcdTxn. +type RawEtcdTxnCondition struct { Key string - CmpType LowLevelTxnCmpType - // The value to compare with. It's not used when CmpType is LowLevelCmpExists or LowLevelCmpNotExists. + CmpType EtcdTxnCmpType + // The value to compare with. It's not used when CmpType is EtcdTxnCmpExists or EtcdTxnCmpNotExists. Value string } // CheckOnValue checks whether the condition is satisfied on the given value. -func (c *LowLevelTxnCondition) CheckOnValue(value string, exists bool) bool { +func (c *RawEtcdTxnCondition) CheckOnValue(value string, exists bool) bool { switch c.CmpType { - case LowLevelCmpEqual: + case EtcdTxnCmpEqual: if exists && value == c.Value { return true } - case LowLevelCmpNotEqual: + case EtcdTxnCmpNotEqual: if exists && value != c.Value { return true } - case LowLevelCmpLess: + case EtcdTxnCmpLess: if exists && value < c.Value { return true } - case LowLevelCmpGreater: + case EtcdTxnCmpGreater: if exists && value > c.Value { return true } - case LowLevelCmpExists: + case EtcdTxnCmpExists: if exists { return true } - case LowLevelCmpNotExists: + case EtcdTxnCmpNotExists: if !exists { return true } @@ -82,15 +82,15 @@ func (c *LowLevelTxnCondition) CheckOnValue(value string, exists bool) bool { return false } -// LowLevelTxnOp represents an operation in a LowLevelTxn's `Then` or `Else` branch and will be executed according to +// RawEtcdTxnOp represents an operation in a RawEtcdTxn's `Then` or `Else` branch and will be executed according to // the result of checking conditions. -type LowLevelTxnOp struct { +type RawEtcdTxnOp struct { Key string - OpType LowLevelTxnOpType + OpType EtcdTxnOpType Value string - // The end key when the OpType is LowLevelOpGetRange. + // The end key when the OpType is EtcdTxnOpGetRange. EndKey string - // The limit of the keys to get when the OpType is LowLevelOpGetRange. + // The limit of the keys to get when the OpType is EtcdTxnOpGetRange. Limit int } @@ -100,15 +100,15 @@ type KeyValuePair struct { Value string } -// LowLevelTxnResultItem represents a single result of a read operation in a LowLevelTxn. -type LowLevelTxnResultItem struct { +// RawEtcdTxnResultItem represents a single result of a read operation in a RawEtcdTxn. +type RawEtcdTxnResultItem struct { KeyValuePairs []KeyValuePair } -// LowLevelTxnResult represents the result of a LowLevelTxn. The results of operations in `Then` or `Else` branches +// RawEtcdTxnResult represents the result of a RawEtcdTxn. The results of operations in `Then` or `Else` branches // will be listed in `ResultItems` in the same order as the operations are added. // For Put or Delete operations, its corresponding result is the previous value before writing. -type LowLevelTxnResult struct { +type RawEtcdTxnResult struct { Succeeded bool // The results of each operation in the `Then` branch or the `Else` branch of a transaction, depending on // whether `Succeeded`. The i-th result belongs to the i-th operation added to the executed branch. @@ -116,22 +116,22 @@ type LowLevelTxnResult struct { // * For Get operations, the result contains a key-value pair representing the get result. In case the key // does not exist, its `KeyValuePairs` field will be empty. // * For GetRange operations, the result is a list of key-value pairs containing key-value paris that are scanned. - ResultItems []LowLevelTxnResultItem + ResultItems []RawEtcdTxnResultItem } -// LowLevelTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction +// RawEtcdTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction // API. When the backend is etcd, it simply calls etcd's equivalent APIs internally. Otherwise, the // behavior is simulated. // Considering that in different backends, the kv pairs may not have equivalent property of etcd's -// version, create-time, etc., the abstracted LowLevelTxn interface does not support comparing on them. +// version, create-time, etc., the abstracted RawEtcdTxn interface does not support comparing on them. // It only supports checking the value or whether the key exists. // Avoid reading/writing the same key multiple times in a single transaction, otherwise the behavior // would be undefined. -type LowLevelTxn interface { - If(conditions ...LowLevelTxnCondition) LowLevelTxn - Then(ops ...LowLevelTxnOp) LowLevelTxn - Else(ops ...LowLevelTxnOp) LowLevelTxn - Commit(ctx context.Context) (LowLevelTxnResult, error) +type RawEtcdTxn interface { + If(conditions ...RawEtcdTxnCondition) RawEtcdTxn + Then(ops ...RawEtcdTxnOp) RawEtcdTxn + Else(ops ...RawEtcdTxnOp) RawEtcdTxn + Commit() (RawEtcdTxnResult, error) } // BaseReadWrite is the API set, shared by Base and Txn interfaces, that provides basic KV read and write operations. @@ -168,7 +168,7 @@ type Base interface { // range there will be a condition constructed. Be aware of the // possibility of causing phantom read. // RunInTxn may not suit all use cases. When RunInTxn is found - // improper to use, consider using CreateLowLevelTxn instead. + // improper to use, consider using CreateRawEtcdTxn instead. // // Note that transaction are not committed until RunInTxn returns nil. // Note: @@ -179,10 +179,10 @@ type Base interface { // values loaded during transaction has not been modified before commit. RunInTxn(ctx context.Context, f func(txn Txn) error) error - // CreateLowLevelTxn creates a transaction that provides the if-then-else + // CreateRawEtcdTxn creates a transaction that provides the if-then-else // API pattern which is the same as how etcd does, makes it possible // to precisely control how etcd's transaction API is used when the // backend is etcd. When there's other backend types, the behavior will be // simulated. - CreateLowLevelTxn() LowLevelTxn + CreateRawEtcdTxn() RawEtcdTxn } diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index 9363d4c4aba..01fc387fa68 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -38,7 +38,7 @@ func TestEtcd(t *testing.T) { testRange(re, kv) testSaveMultiple(re, kv, 20) testLoadConflict(re, kv) - testLowLevelTxn(re, kv) + testRawEtcdTxn(re, kv) } func TestLevelDB(t *testing.T) { @@ -172,31 +172,31 @@ func mustHaveKeys(re *require.Assertions, kv Base, prefix string, expected ...Ke } } -func testLowLevelTxn(re *require.Assertions, kv Base) { +func testRawEtcdTxn(re *require.Assertions, kv Base) { // Test NotExists condition, putting in transaction. - res, err := kv.CreateLowLevelTxn().If( - LowLevelTxnCondition{ + res, err := kv.CreateRawEtcdTxn().If( + RawEtcdTxnCondition{ Key: "txn-k1", - CmpType: LowLevelCmpNotExists, + CmpType: EtcdTxnCmpNotExists, }, ).Then( - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k1", - OpType: LowLevelOpPut, + OpType: EtcdTxnOpPut, Value: "v1", }, - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k2", - OpType: LowLevelOpPut, + OpType: EtcdTxnOpPut, Value: "v2", }, ).Else( - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-unexpected", - OpType: LowLevelOpPut, + OpType: EtcdTxnOpPut, Value: "unexpected", }, - ).Commit(context.Background()) + ).Commit() re.NoError(err) re.True(res.Succeeded) @@ -207,24 +207,24 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) // Test Equal condition; reading in transaction. - res, err = kv.CreateLowLevelTxn().If( - LowLevelTxnCondition{ + res, err = kv.CreateRawEtcdTxn().If( + RawEtcdTxnCondition{ Key: "txn-k1", - CmpType: LowLevelCmpEqual, + CmpType: EtcdTxnCmpEqual, Value: "v1", }, ).Then( - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k2", - OpType: LowLevelOpGet, + OpType: EtcdTxnOpGet, }, ).Else( - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-unexpected", - OpType: LowLevelOpPut, + OpType: EtcdTxnOpPut, Value: "unexpected", }, - ).Commit(context.Background()) + ).Commit() re.NoError(err) re.True(res.Succeeded) @@ -234,30 +234,30 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) // Test NotEqual condition, else branch, reading range in transaction, reading & writing mixed. - res, err = kv.CreateLowLevelTxn().If( - LowLevelTxnCondition{ + res, err = kv.CreateRawEtcdTxn().If( + RawEtcdTxnCondition{ Key: "txn-k1", - CmpType: LowLevelCmpNotEqual, + CmpType: EtcdTxnCmpNotEqual, Value: "v1", }, ).Then( - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-unexpected", - OpType: LowLevelOpPut, + OpType: EtcdTxnOpPut, Value: "unexpected", }, ).Else( - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k1", - OpType: LowLevelOpGetRange, + OpType: EtcdTxnOpGetRange, EndKey: "txn-k2\x00", }, - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k3", - OpType: LowLevelOpPut, + OpType: EtcdTxnOpPut, Value: "k3", }, - ).Commit(context.Background()) + ).Commit() re.NoError(err) re.False(res.Succeeded) @@ -272,33 +272,33 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { KeyValuePair{Key: "txn-k3", Value: "k3"}) // Test Exists condition, deleting, overwriting. - res, err = kv.CreateLowLevelTxn().If( - LowLevelTxnCondition{ + res, err = kv.CreateRawEtcdTxn().If( + RawEtcdTxnCondition{ Key: "txn-k1", - CmpType: LowLevelCmpExists, + CmpType: EtcdTxnCmpExists, }, ).Then( - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k1", - OpType: LowLevelOpDelete, + OpType: EtcdTxnOpDelete, }, - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k2", - OpType: LowLevelOpPut, + OpType: EtcdTxnOpPut, Value: "v22", }, // Delete not existing key. - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k4", - OpType: LowLevelOpDelete, + OpType: EtcdTxnOpDelete, }, ).Else( - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-unexpected", - OpType: LowLevelOpPut, + OpType: EtcdTxnOpPut, Value: "unexpected", }, - ).Commit(context.Background()) + ).Commit() re.NoError(err) re.True(res.Succeeded) @@ -310,21 +310,21 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k2", Value: "v22"}, KeyValuePair{Key: "txn-k3", Value: "k3"}) // Deleted keys can be regarded as not existing correctly. - res, err = kv.CreateLowLevelTxn().If( - LowLevelTxnCondition{ + res, err = kv.CreateRawEtcdTxn().If( + RawEtcdTxnCondition{ Key: "txn-k1", - CmpType: LowLevelCmpNotExists, + CmpType: EtcdTxnCmpNotExists, }, ).Then( - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k2", - OpType: LowLevelOpDelete, + OpType: EtcdTxnOpDelete, }, - LowLevelTxnOp{ + RawEtcdTxnOp{ Key: "txn-k3", - OpType: LowLevelOpDelete, + OpType: EtcdTxnOpDelete, }, - ).Commit(context.Background()) + ).Commit() re.NoError(err) re.True(res.Succeeded) @@ -335,61 +335,61 @@ func testLowLevelTxn(re *require.Assertions, kv Base) { mustHaveKeys(re, kv, "txn-") // The following tests only check the correctness of the conditions. - check := func(conditions []LowLevelTxnCondition, shouldSuccess bool) { - res, err := kv.CreateLowLevelTxn().If(conditions...).Commit(context.Background()) + check := func(conditions []RawEtcdTxnCondition, shouldSuccess bool) { + res, err := kv.CreateRawEtcdTxn().If(conditions...).Commit() re.NoError(err) re.Equal(shouldSuccess, res.Succeeded) } // "txn-k1" doesn't exist at this point. - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpExists}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotExists}}, true) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpExists}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotExists}}, true) err = kv.Save("txn-k1", "v1") re.NoError(err) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpExists}}, true) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotExists}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpExists}}, true) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotExists}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}}, true) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotEqual, Value: "v1"}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v2"}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotEqual, Value: "v2"}}, true) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v1"}}, true) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotEqual, Value: "v1"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v2"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotEqual, Value: "v2"}}, true) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v1"}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v0"}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v2"}}, true) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpLess, Value: "v1"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpLess, Value: "v0"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpLess, Value: "v2"}}, true) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v1"}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v2"}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v0"}}, true) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpGreater, Value: "v1"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpGreater, Value: "v2"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpGreater, Value: "v0"}}, true) // Test comparing with not-existing key. err = kv.Remove("txn-k1") re.NoError(err) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpNotEqual, Value: "v1"}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpLess, Value: "v1"}}, false) - check([]LowLevelTxnCondition{{Key: "txn-k1", CmpType: LowLevelCmpGreater, Value: "v1"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v1"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotEqual, Value: "v1"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpLess, Value: "v1"}}, false) + check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpGreater, Value: "v1"}}, false) // Test the conditions are conjunctions. err = kv.Save("txn-k1", "v1") re.NoError(err) err = kv.Save("txn-k2", "v2") re.NoError(err) - check([]LowLevelTxnCondition{ - {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}, - {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v2"}, + check([]RawEtcdTxnCondition{ + {Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: EtcdTxnCmpEqual, Value: "v2"}, }, true) - check([]LowLevelTxnCondition{ - {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v1"}, - {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v0"}, + check([]RawEtcdTxnCondition{ + {Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: EtcdTxnCmpEqual, Value: "v0"}, }, false) - check([]LowLevelTxnCondition{ - {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v0"}, - {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v2"}, + check([]RawEtcdTxnCondition{ + {Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: EtcdTxnCmpEqual, Value: "v2"}, }, false) - check([]LowLevelTxnCondition{ - {Key: "txn-k1", CmpType: LowLevelCmpEqual, Value: "v0"}, - {Key: "txn-k2", CmpType: LowLevelCmpEqual, Value: "v0"}, + check([]RawEtcdTxnCondition{ + {Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: EtcdTxnCmpEqual, Value: "v0"}, }, false) } diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index 2a1706de162..f7f3662542f 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -80,8 +80,8 @@ func (kv *LevelDBKV) Remove(key string) error { return errors.WithStack(kv.Delete([]byte(key), nil)) } -// CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. -func (kv *LevelDBKV) CreateLowLevelTxn() LowLevelTxn { +// CreateRawEtcdTxn creates a transaction that provides interface in if-then-else pattern. +func (kv *LevelDBKV) CreateRawEtcdTxn() RawEtcdTxn { panic("unimplemented") } diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index d783b27fe7d..de87df39552 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -115,8 +115,8 @@ func (kv *memoryKV) removeNoLock(key string) { kv.tree.Delete(memoryKVItem{key, ""}) } -// CreateLowLevelTxn creates a transaction that provides interface in if-then-else pattern. -func (kv *memoryKV) CreateLowLevelTxn() LowLevelTxn { +// CreateRawEtcdTxn creates a transaction that provides interface in if-then-else pattern. +func (kv *memoryKV) CreateRawEtcdTxn() RawEtcdTxn { panic("unimplemented") } From d6bc7307e882c96f7fa1c8674d82f194f9890dd8 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 18 Feb 2025 16:05:48 +0800 Subject: [PATCH 07/11] Adjust comments Signed-off-by: MyonKeminta --- pkg/storage/kv/kv.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index 9550a5aabd6..6167fb12775 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -122,9 +122,6 @@ type RawEtcdTxnResult struct { // RawEtcdTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction // API. When the backend is etcd, it simply calls etcd's equivalent APIs internally. Otherwise, the // behavior is simulated. -// Considering that in different backends, the kv pairs may not have equivalent property of etcd's -// version, create-time, etc., the abstracted RawEtcdTxn interface does not support comparing on them. -// It only supports checking the value or whether the key exists. // Avoid reading/writing the same key multiple times in a single transaction, otherwise the behavior // would be undefined. type RawEtcdTxn interface { @@ -161,14 +158,15 @@ type Base interface { // This is a highly-simplified transaction interface. As // etcd's transaction API is quite limited, it's hard to use it // to provide a complete transaction model as how a normal database - // does. So when this API is running on etcd backend, each read on + // does. When this API is running on etcd backend, each read on // `txn` implicitly constructs a condition. // (ref: https://etcd.io/docs/v3.5/learning/api/#transaction) // When reading a range using `LoadRange`, for each key found in the // range there will be a condition constructed. Be aware of the // possibility of causing phantom read. // RunInTxn may not suit all use cases. When RunInTxn is found - // improper to use, consider using CreateRawEtcdTxn instead. + // improper to use, consider using CreateRawEtcdTxn instead, which + // is available when the backend is etcd. // // Note that transaction are not committed until RunInTxn returns nil. // Note: @@ -180,9 +178,8 @@ type Base interface { RunInTxn(ctx context.Context, f func(txn Txn) error) error // CreateRawEtcdTxn creates a transaction that provides the if-then-else - // API pattern which is the same as how etcd does, makes it possible - // to precisely control how etcd's transaction API is used when the - // backend is etcd. When there's other backend types, the behavior will be - // simulated. + // API pattern when the backend is etcd, makes it possible + // to precisely control how etcd's transaction API is used. When the + // backend is not etcd, it panics. CreateRawEtcdTxn() RawEtcdTxn } From e9cf2a140e1fccb87b3f98682821cf6ef890de39 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 18 Feb 2025 16:08:42 +0800 Subject: [PATCH 08/11] Remove unnecessary changes to mem_kv.go; adjust comments Signed-off-by: MyonKeminta --- pkg/storage/kv/levedb_kv.go | 2 +- pkg/storage/kv/mem_kv.go | 28 ++++++---------------------- 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index f7f3662542f..207bb9bbce5 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -80,7 +80,7 @@ func (kv *LevelDBKV) Remove(key string) error { return errors.WithStack(kv.Delete([]byte(key), nil)) } -// CreateRawEtcdTxn creates a transaction that provides interface in if-then-else pattern. +// CreateRawEtcdTxn implements kv.Base interface. func (kv *LevelDBKV) CreateRawEtcdTxn() RawEtcdTxn { panic("unimplemented") } diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index de87df39552..c69a8b6e9d0 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -52,15 +52,11 @@ func (s *memoryKVItem) Less(than *memoryKVItem) bool { func (kv *memoryKV) Load(key string) (string, error) { kv.RLock() defer kv.RUnlock() - return kv.loadNoLock(key), nil -} - -func (kv *memoryKV) loadNoLock(key string) string { item, ok := kv.tree.Get(memoryKVItem{key, ""}) if !ok { - return "" + return "", nil } - return item.value + return item.value, nil } // LoadRange loads the keys in the range of [key, endKey). @@ -73,11 +69,6 @@ func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string }) kv.RLock() defer kv.RUnlock() - keys, values := kv.loadRangeNoLock(key, endKey, limit) - return keys, values, nil -} - -func (kv *memoryKV) loadRangeNoLock(key, endKey string, limit int) ([]string, []string) { keys := make([]string, 0, limit) values := make([]string, 0, limit) kv.tree.AscendRange(memoryKVItem{key, ""}, memoryKVItem{endKey, ""}, func(item memoryKVItem) bool { @@ -88,34 +79,27 @@ func (kv *memoryKV) loadRangeNoLock(key, endKey string, limit int) ([]string, [] } return true }) - return keys, values + return keys, values, nil } // Save saves the key-value pair. func (kv *memoryKV) Save(key, value string) error { kv.Lock() defer kv.Unlock() - kv.saveNoLock(key, value) - return nil -} - -func (kv *memoryKV) saveNoLock(key, value string) { kv.tree.ReplaceOrInsert(memoryKVItem{key, value}) + return nil } // Remove removes the key. func (kv *memoryKV) Remove(key string) error { kv.Lock() defer kv.Unlock() - kv.removeNoLock(key) - return nil -} -func (kv *memoryKV) removeNoLock(key string) { kv.tree.Delete(memoryKVItem{key, ""}) + return nil } -// CreateRawEtcdTxn creates a transaction that provides interface in if-then-else pattern. +// CreateRawEtcdTxn implements kv.Base interface. func (kv *memoryKV) CreateRawEtcdTxn() RawEtcdTxn { panic("unimplemented") } From 4fa3b5d61f58ce61fb273a047ba80cd267779437 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 18 Feb 2025 16:14:25 +0800 Subject: [PATCH 09/11] Fix lint Signed-off-by: MyonKeminta --- pkg/storage/kv/levedb_kv.go | 2 +- pkg/storage/kv/mem_kv.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index 207bb9bbce5..0a5e89e4c8d 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -81,7 +81,7 @@ func (kv *LevelDBKV) Remove(key string) error { } // CreateRawEtcdTxn implements kv.Base interface. -func (kv *LevelDBKV) CreateRawEtcdTxn() RawEtcdTxn { +func (*LevelDBKV) CreateRawEtcdTxn() RawEtcdTxn { panic("unimplemented") } diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index c69a8b6e9d0..be896ea153c 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -100,7 +100,7 @@ func (kv *memoryKV) Remove(key string) error { } // CreateRawEtcdTxn implements kv.Base interface. -func (kv *memoryKV) CreateRawEtcdTxn() RawEtcdTxn { +func (*memoryKV) CreateRawEtcdTxn() RawEtcdTxn { panic("unimplemented") } From 4f868de57564fc6cc23d48ef2f3168f1a5f61f79 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 18 Feb 2025 17:08:22 +0800 Subject: [PATCH 10/11] Remove useless code; address comments Signed-off-by: MyonKeminta --- pkg/storage/kv/etcd_kv.go | 43 +++++++++++++++-------------------- pkg/storage/kv/kv.go | 47 ++++++--------------------------------- pkg/storage/kv/kv_test.go | 28 +++++++++++------------ 3 files changed, 39 insertions(+), 79 deletions(-) diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index 357e85824f7..ab26b89272f 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -381,48 +381,41 @@ func (l *rawTxnWrapper) Else(ops ...RawEtcdTxnOp) RawEtcdTxn { } // Commit implements RawEtcdTxn interface for committing the transaction. -func (l *rawTxnWrapper) Commit() (RawEtcdTxnResult, error) { +func (l *rawTxnWrapper) Commit() (RawEtcdTxnResponse, error) { resp, err := l.inner.Commit() if err != nil { - return RawEtcdTxnResult{}, err + return RawEtcdTxnResponse{}, err } - items := make([]RawEtcdTxnResultItem, 0, len(resp.Responses)) - for i, respItem := range resp.Responses { - var resultItem RawEtcdTxnResultItem - if put := respItem.GetResponsePut(); put != nil { + items := make([]RawEtcdTxnResponseItem, 0, len(resp.Responses)) + for i, rpcRespItem := range resp.Responses { + var respItem RawEtcdTxnResponseItem + if put := rpcRespItem.GetResponsePut(); put != nil { // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. - resultItem = RawEtcdTxnResultItem{} - if put.PrevKv != nil { - key := strings.TrimPrefix(strings.TrimPrefix(string(put.PrevKv.Key), l.rootPath), "/") - resultItem.KeyValuePairs = []KeyValuePair{{ - Key: key, - Value: string(put.PrevKv.Value), - }} - } - } else if del := respItem.GetResponseDeleteRange(); del != nil { + respItem = RawEtcdTxnResponseItem{} + } else if del := rpcRespItem.GetResponseDeleteRange(); del != nil { // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. - resultItem = RawEtcdTxnResultItem{} - } else if rangeResp := respItem.GetResponseRange(); rangeResp != nil { + respItem = RawEtcdTxnResponseItem{} + } else if rangeResp := rpcRespItem.GetResponseRange(); rangeResp != nil { kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs)) for _, kv := range rangeResp.Kvs { - key := strings.TrimPrefix(strings.TrimPrefix(string(kv.Key), l.rootPath), "/") + key := strings.TrimPrefix(string(kv.Key), l.rootPath+"/") kvs = append(kvs, KeyValuePair{ Key: key, Value: string(kv.Value), }) } - resultItem = RawEtcdTxnResultItem{ + respItem = RawEtcdTxnResponseItem{ KeyValuePairs: kvs, } } else { - return RawEtcdTxnResult{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs( - fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, respItem), + return RawEtcdTxnResponse{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs( + fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, rpcRespItem), ) } - items = append(items, resultItem) + items = append(items, respItem) } - return RawEtcdTxnResult{ - Succeeded: resp.Succeeded, - ResultItems: items, + return RawEtcdTxnResponse{ + Succeeded: resp.Succeeded, + Responses: items, }, nil } diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index 6167fb12775..f6c94a54c00 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -49,39 +49,6 @@ type RawEtcdTxnCondition struct { Value string } -// CheckOnValue checks whether the condition is satisfied on the given value. -func (c *RawEtcdTxnCondition) CheckOnValue(value string, exists bool) bool { - switch c.CmpType { - case EtcdTxnCmpEqual: - if exists && value == c.Value { - return true - } - case EtcdTxnCmpNotEqual: - if exists && value != c.Value { - return true - } - case EtcdTxnCmpLess: - if exists && value < c.Value { - return true - } - case EtcdTxnCmpGreater: - if exists && value > c.Value { - return true - } - case EtcdTxnCmpExists: - if exists { - return true - } - case EtcdTxnCmpNotExists: - if !exists { - return true - } - default: - panic("unreachable") - } - return false -} - // RawEtcdTxnOp represents an operation in a RawEtcdTxn's `Then` or `Else` branch and will be executed according to // the result of checking conditions. type RawEtcdTxnOp struct { @@ -100,15 +67,15 @@ type KeyValuePair struct { Value string } -// RawEtcdTxnResultItem represents a single result of a read operation in a RawEtcdTxn. -type RawEtcdTxnResultItem struct { +// RawEtcdTxnResponseItem represents a single result of a read operation in a RawEtcdTxn. +type RawEtcdTxnResponseItem struct { KeyValuePairs []KeyValuePair } -// RawEtcdTxnResult represents the result of a RawEtcdTxn. The results of operations in `Then` or `Else` branches -// will be listed in `ResultItems` in the same order as the operations are added. +// RawEtcdTxnResponse represents the result of a RawEtcdTxn. The results of operations in `Then` or `Else` branches +// will be listed in `Responses` in the same order as the operations are added. // For Put or Delete operations, its corresponding result is the previous value before writing. -type RawEtcdTxnResult struct { +type RawEtcdTxnResponse struct { Succeeded bool // The results of each operation in the `Then` branch or the `Else` branch of a transaction, depending on // whether `Succeeded`. The i-th result belongs to the i-th operation added to the executed branch. @@ -116,7 +83,7 @@ type RawEtcdTxnResult struct { // * For Get operations, the result contains a key-value pair representing the get result. In case the key // does not exist, its `KeyValuePairs` field will be empty. // * For GetRange operations, the result is a list of key-value pairs containing key-value paris that are scanned. - ResultItems []RawEtcdTxnResultItem + Responses []RawEtcdTxnResponseItem } // RawEtcdTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction @@ -128,7 +95,7 @@ type RawEtcdTxn interface { If(conditions ...RawEtcdTxnCondition) RawEtcdTxn Then(ops ...RawEtcdTxnOp) RawEtcdTxn Else(ops ...RawEtcdTxnOp) RawEtcdTxn - Commit() (RawEtcdTxnResult, error) + Commit() (RawEtcdTxnResponse, error) } // BaseReadWrite is the API set, shared by Base and Txn interfaces, that provides basic KV read and write operations. diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index 01fc387fa68..d0f01ad12da 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -200,9 +200,9 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { re.NoError(err) re.True(res.Succeeded) - re.Len(res.ResultItems, 2) - re.Empty(res.ResultItems[0].KeyValuePairs) - re.Empty(res.ResultItems[1].KeyValuePairs) + re.Len(res.Responses, 2) + re.Empty(res.Responses[0].KeyValuePairs) + re.Empty(res.Responses[1].KeyValuePairs) mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) @@ -228,9 +228,9 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { re.NoError(err) re.True(res.Succeeded) - re.Len(res.ResultItems, 1) - re.Len(res.ResultItems[0].KeyValuePairs, 1) - re.Equal("v2", res.ResultItems[0].KeyValuePairs[0].Value) + re.Len(res.Responses, 1) + re.Len(res.Responses[0].KeyValuePairs, 1) + re.Equal("v2", res.Responses[0].KeyValuePairs[0].Value) mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) // Test NotEqual condition, else branch, reading range in transaction, reading & writing mixed. @@ -261,10 +261,10 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { re.NoError(err) re.False(res.Succeeded) - re.Len(res.ResultItems, 2) - re.Len(res.ResultItems[0].KeyValuePairs, 2) - re.Equal([]KeyValuePair{{Key: "txn-k1", Value: "v1"}, {Key: "txn-k2", Value: "v2"}}, res.ResultItems[0].KeyValuePairs) - re.Empty(res.ResultItems[1].KeyValuePairs) + re.Len(res.Responses, 2) + re.Len(res.Responses[0].KeyValuePairs, 2) + re.Equal([]KeyValuePair{{Key: "txn-k1", Value: "v1"}, {Key: "txn-k2", Value: "v2"}}, res.Responses[0].KeyValuePairs) + re.Empty(res.Responses[1].KeyValuePairs) mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, @@ -302,8 +302,8 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { re.NoError(err) re.True(res.Succeeded) - re.Len(res.ResultItems, 3) - for _, item := range res.ResultItems { + re.Len(res.Responses, 3) + for _, item := range res.Responses { re.Empty(item.KeyValuePairs) } @@ -328,8 +328,8 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { re.NoError(err) re.True(res.Succeeded) - re.Len(res.ResultItems, 2) - for _, item := range res.ResultItems { + re.Len(res.Responses, 2) + for _, item := range res.Responses { re.Empty(item.KeyValuePairs) } mustHaveKeys(re, kv, "txn-") From 13b3106c1e024981ee705b39d54e07b94cae487a Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 18 Feb 2025 17:12:34 +0800 Subject: [PATCH 11/11] Remove mentions to etcd in the new raw transaction interface Signed-off-by: MyonKeminta --- pkg/storage/kv/etcd_kv.go | 58 +++++++------- pkg/storage/kv/kv.go | 76 +++++++++--------- pkg/storage/kv/kv_test.go | 154 ++++++++++++++++++------------------ pkg/storage/kv/levedb_kv.go | 4 +- pkg/storage/kv/mem_kv.go | 4 +- 5 files changed, 148 insertions(+), 148 deletions(-) diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index ab26b89272f..7fe2691f30b 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -140,8 +140,8 @@ func (kv *etcdKVBase) Remove(key string) error { return nil } -// CreateRawEtcdTxn creates a transaction that provides interface in if-then-else pattern. -func (kv *etcdKVBase) CreateRawEtcdTxn() RawEtcdTxn { +// CreateRawTxn creates a transaction that provides interface in if-then-else pattern. +func (kv *etcdKVBase) CreateRawTxn() RawTxn { return &rawTxnWrapper{ inner: NewSlowLogTxn(kv.client), rootPath: kv.rootPath, @@ -311,25 +311,25 @@ type rawTxnWrapper struct { rootPath string } -// If implements RawEtcdTxn interface for adding conditions to the transaction. -func (l *rawTxnWrapper) If(conditions ...RawEtcdTxnCondition) RawEtcdTxn { +// If implements RawTxn interface for adding conditions to the transaction. +func (l *rawTxnWrapper) If(conditions ...RawTxnCondition) RawTxn { cmpList := make([]clientv3.Cmp, 0, len(conditions)) for _, c := range conditions { key := strings.Join([]string{l.rootPath, c.Key}, "/") - if c.CmpType == EtcdTxnCmpExists { + if c.CmpType == RawTxnCmpExists { cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), ">", 0)) - } else if c.CmpType == EtcdTxnCmpNotExists { + } else if c.CmpType == RawTxnCmpNotExists { cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) } else { var cmpOp string switch c.CmpType { - case EtcdTxnCmpEqual: + case RawTxnCmpEqual: cmpOp = "=" - case EtcdTxnCmpNotEqual: + case RawTxnCmpNotEqual: cmpOp = "!=" - case EtcdTxnCmpGreater: + case RawTxnCmpGreater: cmpOp = ">" - case EtcdTxnCmpLess: + case RawTxnCmpLess: cmpOp = "<" default: panic(fmt.Sprintf("unknown cmp type %v", c.CmpType)) @@ -341,18 +341,18 @@ func (l *rawTxnWrapper) If(conditions ...RawEtcdTxnCondition) RawEtcdTxn { return l } -func (l *rawTxnWrapper) convertOps(ops []RawEtcdTxnOp) []clientv3.Op { +func (l *rawTxnWrapper) convertOps(ops []RawTxnOp) []clientv3.Op { opsList := make([]clientv3.Op, 0, len(ops)) for _, op := range ops { key := strings.Join([]string{l.rootPath, op.Key}, "/") switch op.OpType { - case EtcdTxnOpPut: + case RawTxnOpPut: opsList = append(opsList, clientv3.OpPut(key, op.Value)) - case EtcdTxnOpDelete: + case RawTxnOpDelete: opsList = append(opsList, clientv3.OpDelete(key)) - case EtcdTxnOpGet: + case RawTxnOpGet: opsList = append(opsList, clientv3.OpGet(key)) - case EtcdTxnOpGetRange: + case RawTxnOpGetRange: if op.EndKey == "\x00" { opsList = append(opsList, clientv3.OpGet(key, clientv3.WithPrefix(), clientv3.WithLimit(int64(op.Limit)))) } else { @@ -366,35 +366,35 @@ func (l *rawTxnWrapper) convertOps(ops []RawEtcdTxnOp) []clientv3.Op { return opsList } -// Then implements RawEtcdTxn interface for adding operations that need to be executed when the condition passes to +// Then implements RawTxn interface for adding operations that need to be executed when the condition passes to // the transaction. -func (l *rawTxnWrapper) Then(ops ...RawEtcdTxnOp) RawEtcdTxn { +func (l *rawTxnWrapper) Then(ops ...RawTxnOp) RawTxn { l.inner = l.inner.Then(l.convertOps(ops)...) return l } -// Else implements RawEtcdTxn interface for adding operations that need to be executed when the condition doesn't pass +// Else implements RawTxn interface for adding operations that need to be executed when the condition doesn't pass // to the transaction. -func (l *rawTxnWrapper) Else(ops ...RawEtcdTxnOp) RawEtcdTxn { +func (l *rawTxnWrapper) Else(ops ...RawTxnOp) RawTxn { l.inner = l.inner.Else(l.convertOps(ops)...) return l } -// Commit implements RawEtcdTxn interface for committing the transaction. -func (l *rawTxnWrapper) Commit() (RawEtcdTxnResponse, error) { +// Commit implements RawTxn interface for committing the transaction. +func (l *rawTxnWrapper) Commit() (RawTxnResponse, error) { resp, err := l.inner.Commit() if err != nil { - return RawEtcdTxnResponse{}, err + return RawTxnResponse{}, err } - items := make([]RawEtcdTxnResponseItem, 0, len(resp.Responses)) + items := make([]RawTxnResponseItem, 0, len(resp.Responses)) for i, rpcRespItem := range resp.Responses { - var respItem RawEtcdTxnResponseItem + var respItem RawTxnResponseItem if put := rpcRespItem.GetResponsePut(); put != nil { // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. - respItem = RawEtcdTxnResponseItem{} + respItem = RawTxnResponseItem{} } else if del := rpcRespItem.GetResponseDeleteRange(); del != nil { // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. - respItem = RawEtcdTxnResponseItem{} + respItem = RawTxnResponseItem{} } else if rangeResp := rpcRespItem.GetResponseRange(); rangeResp != nil { kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs)) for _, kv := range rangeResp.Kvs { @@ -404,17 +404,17 @@ func (l *rawTxnWrapper) Commit() (RawEtcdTxnResponse, error) { Value: string(kv.Value), }) } - respItem = RawEtcdTxnResponseItem{ + respItem = RawTxnResponseItem{ KeyValuePairs: kvs, } } else { - return RawEtcdTxnResponse{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs( + return RawTxnResponse{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs( fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, rpcRespItem), ) } items = append(items, respItem) } - return RawEtcdTxnResponse{ + return RawTxnResponse{ Succeeded: resp.Succeeded, Responses: items, }, nil diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index f6c94a54c00..2def6fac057 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -16,48 +16,48 @@ package kv import "context" -// EtcdTxnCmpType represents the comparison type that is used in the condition of RawEtcdTxn. -type EtcdTxnCmpType int +// RawTxnCmpType represents the comparison type that is used in the condition of RawTxn. +type RawTxnCmpType int -// EtcdTxnOpType represents the operation type that is used in the operations (either `Then` branch or `Else` -// branch) of RawEtcdTxn. -type EtcdTxnOpType int +// RawTxnOpType represents the operation type that is used in the operations (either `Then` branch or `Else` +// branch) of RawTxn. +type RawTxnOpType int // nolint:revive const ( - EtcdTxnCmpEqual EtcdTxnCmpType = iota - EtcdTxnCmpNotEqual - EtcdTxnCmpLess - EtcdTxnCmpGreater - EtcdTxnCmpExists - EtcdTxnCmpNotExists + RawTxnCmpEqual RawTxnCmpType = iota + RawTxnCmpNotEqual + RawTxnCmpLess + RawTxnCmpGreater + RawTxnCmpExists + RawTxnCmpNotExists ) // nolint:revive const ( - EtcdTxnOpPut EtcdTxnOpType = iota - EtcdTxnOpDelete - EtcdTxnOpGet - EtcdTxnOpGetRange + RawTxnOpPut RawTxnOpType = iota + RawTxnOpDelete + RawTxnOpGet + RawTxnOpGetRange ) -// RawEtcdTxnCondition represents a condition in a RawEtcdTxn. -type RawEtcdTxnCondition struct { +// RawTxnCondition represents a condition in a RawTxn. +type RawTxnCondition struct { Key string - CmpType EtcdTxnCmpType - // The value to compare with. It's not used when CmpType is EtcdTxnCmpExists or EtcdTxnCmpNotExists. + CmpType RawTxnCmpType + // The value to compare with. It's not used when CmpType is RawTxnCmpExists or RawTxnCmpNotExists. Value string } -// RawEtcdTxnOp represents an operation in a RawEtcdTxn's `Then` or `Else` branch and will be executed according to +// RawTxnOp represents an operation in a RawTxn's `Then` or `Else` branch and will be executed according to // the result of checking conditions. -type RawEtcdTxnOp struct { +type RawTxnOp struct { Key string - OpType EtcdTxnOpType + OpType RawTxnOpType Value string - // The end key when the OpType is EtcdTxnOpGetRange. + // The end key when the OpType is RawTxnOpGetRange. EndKey string - // The limit of the keys to get when the OpType is EtcdTxnOpGetRange. + // The limit of the keys to get when the OpType is RawTxnOpGetRange. Limit int } @@ -67,15 +67,15 @@ type KeyValuePair struct { Value string } -// RawEtcdTxnResponseItem represents a single result of a read operation in a RawEtcdTxn. -type RawEtcdTxnResponseItem struct { +// RawTxnResponseItem represents a single result of a read operation in a RawTxn. +type RawTxnResponseItem struct { KeyValuePairs []KeyValuePair } -// RawEtcdTxnResponse represents the result of a RawEtcdTxn. The results of operations in `Then` or `Else` branches +// RawTxnResponse represents the result of a RawTxn. The results of operations in `Then` or `Else` branches // will be listed in `Responses` in the same order as the operations are added. // For Put or Delete operations, its corresponding result is the previous value before writing. -type RawEtcdTxnResponse struct { +type RawTxnResponse struct { Succeeded bool // The results of each operation in the `Then` branch or the `Else` branch of a transaction, depending on // whether `Succeeded`. The i-th result belongs to the i-th operation added to the executed branch. @@ -83,19 +83,19 @@ type RawEtcdTxnResponse struct { // * For Get operations, the result contains a key-value pair representing the get result. In case the key // does not exist, its `KeyValuePairs` field will be empty. // * For GetRange operations, the result is a list of key-value pairs containing key-value paris that are scanned. - Responses []RawEtcdTxnResponseItem + Responses []RawTxnResponseItem } -// RawEtcdTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction +// RawTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction // API. When the backend is etcd, it simply calls etcd's equivalent APIs internally. Otherwise, the // behavior is simulated. // Avoid reading/writing the same key multiple times in a single transaction, otherwise the behavior // would be undefined. -type RawEtcdTxn interface { - If(conditions ...RawEtcdTxnCondition) RawEtcdTxn - Then(ops ...RawEtcdTxnOp) RawEtcdTxn - Else(ops ...RawEtcdTxnOp) RawEtcdTxn - Commit() (RawEtcdTxnResponse, error) +type RawTxn interface { + If(conditions ...RawTxnCondition) RawTxn + Then(ops ...RawTxnOp) RawTxn + Else(ops ...RawTxnOp) RawTxn + Commit() (RawTxnResponse, error) } // BaseReadWrite is the API set, shared by Base and Txn interfaces, that provides basic KV read and write operations. @@ -132,7 +132,7 @@ type Base interface { // range there will be a condition constructed. Be aware of the // possibility of causing phantom read. // RunInTxn may not suit all use cases. When RunInTxn is found - // improper to use, consider using CreateRawEtcdTxn instead, which + // improper to use, consider using CreateRawTxn instead, which // is available when the backend is etcd. // // Note that transaction are not committed until RunInTxn returns nil. @@ -144,9 +144,9 @@ type Base interface { // values loaded during transaction has not been modified before commit. RunInTxn(ctx context.Context, f func(txn Txn) error) error - // CreateRawEtcdTxn creates a transaction that provides the if-then-else + // CreateRawTxn creates a transaction that provides the if-then-else // API pattern when the backend is etcd, makes it possible // to precisely control how etcd's transaction API is used. When the // backend is not etcd, it panics. - CreateRawEtcdTxn() RawEtcdTxn + CreateRawTxn() RawTxn } diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index d0f01ad12da..42f6f833510 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -38,7 +38,7 @@ func TestEtcd(t *testing.T) { testRange(re, kv) testSaveMultiple(re, kv, 20) testLoadConflict(re, kv) - testRawEtcdTxn(re, kv) + testRawTxn(re, kv) } func TestLevelDB(t *testing.T) { @@ -172,28 +172,28 @@ func mustHaveKeys(re *require.Assertions, kv Base, prefix string, expected ...Ke } } -func testRawEtcdTxn(re *require.Assertions, kv Base) { +func testRawTxn(re *require.Assertions, kv Base) { // Test NotExists condition, putting in transaction. - res, err := kv.CreateRawEtcdTxn().If( - RawEtcdTxnCondition{ + res, err := kv.CreateRawTxn().If( + RawTxnCondition{ Key: "txn-k1", - CmpType: EtcdTxnCmpNotExists, + CmpType: RawTxnCmpNotExists, }, ).Then( - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k1", - OpType: EtcdTxnOpPut, + OpType: RawTxnOpPut, Value: "v1", }, - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k2", - OpType: EtcdTxnOpPut, + OpType: RawTxnOpPut, Value: "v2", }, ).Else( - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-unexpected", - OpType: EtcdTxnOpPut, + OpType: RawTxnOpPut, Value: "unexpected", }, ).Commit() @@ -207,21 +207,21 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) // Test Equal condition; reading in transaction. - res, err = kv.CreateRawEtcdTxn().If( - RawEtcdTxnCondition{ + res, err = kv.CreateRawTxn().If( + RawTxnCondition{ Key: "txn-k1", - CmpType: EtcdTxnCmpEqual, + CmpType: RawTxnCmpEqual, Value: "v1", }, ).Then( - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k2", - OpType: EtcdTxnOpGet, + OpType: RawTxnOpGet, }, ).Else( - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-unexpected", - OpType: EtcdTxnOpPut, + OpType: RawTxnOpPut, Value: "unexpected", }, ).Commit() @@ -234,27 +234,27 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) // Test NotEqual condition, else branch, reading range in transaction, reading & writing mixed. - res, err = kv.CreateRawEtcdTxn().If( - RawEtcdTxnCondition{ + res, err = kv.CreateRawTxn().If( + RawTxnCondition{ Key: "txn-k1", - CmpType: EtcdTxnCmpNotEqual, + CmpType: RawTxnCmpNotEqual, Value: "v1", }, ).Then( - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-unexpected", - OpType: EtcdTxnOpPut, + OpType: RawTxnOpPut, Value: "unexpected", }, ).Else( - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k1", - OpType: EtcdTxnOpGetRange, + OpType: RawTxnOpGetRange, EndKey: "txn-k2\x00", }, - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k3", - OpType: EtcdTxnOpPut, + OpType: RawTxnOpPut, Value: "k3", }, ).Commit() @@ -272,30 +272,30 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { KeyValuePair{Key: "txn-k3", Value: "k3"}) // Test Exists condition, deleting, overwriting. - res, err = kv.CreateRawEtcdTxn().If( - RawEtcdTxnCondition{ + res, err = kv.CreateRawTxn().If( + RawTxnCondition{ Key: "txn-k1", - CmpType: EtcdTxnCmpExists, + CmpType: RawTxnCmpExists, }, ).Then( - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k1", - OpType: EtcdTxnOpDelete, + OpType: RawTxnOpDelete, }, - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k2", - OpType: EtcdTxnOpPut, + OpType: RawTxnOpPut, Value: "v22", }, // Delete not existing key. - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k4", - OpType: EtcdTxnOpDelete, + OpType: RawTxnOpDelete, }, ).Else( - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-unexpected", - OpType: EtcdTxnOpPut, + OpType: RawTxnOpPut, Value: "unexpected", }, ).Commit() @@ -310,19 +310,19 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k2", Value: "v22"}, KeyValuePair{Key: "txn-k3", Value: "k3"}) // Deleted keys can be regarded as not existing correctly. - res, err = kv.CreateRawEtcdTxn().If( - RawEtcdTxnCondition{ + res, err = kv.CreateRawTxn().If( + RawTxnCondition{ Key: "txn-k1", - CmpType: EtcdTxnCmpNotExists, + CmpType: RawTxnCmpNotExists, }, ).Then( - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k2", - OpType: EtcdTxnOpDelete, + OpType: RawTxnOpDelete, }, - RawEtcdTxnOp{ + RawTxnOp{ Key: "txn-k3", - OpType: EtcdTxnOpDelete, + OpType: RawTxnOpDelete, }, ).Commit() @@ -335,61 +335,61 @@ func testRawEtcdTxn(re *require.Assertions, kv Base) { mustHaveKeys(re, kv, "txn-") // The following tests only check the correctness of the conditions. - check := func(conditions []RawEtcdTxnCondition, shouldSuccess bool) { - res, err := kv.CreateRawEtcdTxn().If(conditions...).Commit() + check := func(conditions []RawTxnCondition, shouldSuccess bool) { + res, err := kv.CreateRawTxn().If(conditions...).Commit() re.NoError(err) re.Equal(shouldSuccess, res.Succeeded) } // "txn-k1" doesn't exist at this point. - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpExists}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotExists}}, true) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpExists}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotExists}}, true) err = kv.Save("txn-k1", "v1") re.NoError(err) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpExists}}, true) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotExists}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpExists}}, true) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotExists}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v1"}}, true) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotEqual, Value: "v1"}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v2"}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotEqual, Value: "v2"}}, true) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v1"}}, true) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotEqual, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v2"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotEqual, Value: "v2"}}, true) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpLess, Value: "v1"}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpLess, Value: "v0"}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpLess, Value: "v2"}}, true) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpLess, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpLess, Value: "v0"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpLess, Value: "v2"}}, true) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpGreater, Value: "v1"}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpGreater, Value: "v2"}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpGreater, Value: "v0"}}, true) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpGreater, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpGreater, Value: "v2"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpGreater, Value: "v0"}}, true) // Test comparing with not-existing key. err = kv.Remove("txn-k1") re.NoError(err) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v1"}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpNotEqual, Value: "v1"}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpLess, Value: "v1"}}, false) - check([]RawEtcdTxnCondition{{Key: "txn-k1", CmpType: EtcdTxnCmpGreater, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotEqual, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpLess, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpGreater, Value: "v1"}}, false) // Test the conditions are conjunctions. err = kv.Save("txn-k1", "v1") re.NoError(err) err = kv.Save("txn-k2", "v2") re.NoError(err) - check([]RawEtcdTxnCondition{ - {Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v1"}, - {Key: "txn-k2", CmpType: EtcdTxnCmpEqual, Value: "v2"}, + check([]RawTxnCondition{ + {Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: RawTxnCmpEqual, Value: "v2"}, }, true) - check([]RawEtcdTxnCondition{ - {Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v1"}, - {Key: "txn-k2", CmpType: EtcdTxnCmpEqual, Value: "v0"}, + check([]RawTxnCondition{ + {Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: RawTxnCmpEqual, Value: "v0"}, }, false) - check([]RawEtcdTxnCondition{ - {Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v0"}, - {Key: "txn-k2", CmpType: EtcdTxnCmpEqual, Value: "v2"}, + check([]RawTxnCondition{ + {Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: RawTxnCmpEqual, Value: "v2"}, }, false) - check([]RawEtcdTxnCondition{ - {Key: "txn-k1", CmpType: EtcdTxnCmpEqual, Value: "v0"}, - {Key: "txn-k2", CmpType: EtcdTxnCmpEqual, Value: "v0"}, + check([]RawTxnCondition{ + {Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: RawTxnCmpEqual, Value: "v0"}, }, false) } diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index 0a5e89e4c8d..c4b59edac65 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -80,8 +80,8 @@ func (kv *LevelDBKV) Remove(key string) error { return errors.WithStack(kv.Delete([]byte(key), nil)) } -// CreateRawEtcdTxn implements kv.Base interface. -func (*LevelDBKV) CreateRawEtcdTxn() RawEtcdTxn { +// CreateRawTxn implements kv.Base interface. +func (*LevelDBKV) CreateRawTxn() RawTxn { panic("unimplemented") } diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index be896ea153c..b572290307b 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -99,8 +99,8 @@ func (kv *memoryKV) Remove(key string) error { return nil } -// CreateRawEtcdTxn implements kv.Base interface. -func (*memoryKV) CreateRawEtcdTxn() RawEtcdTxn { +// CreateRawTxn implements kv.Base interface. +func (*memoryKV) CreateRawTxn() RawTxn { panic("unimplemented") }