diff --git a/errors.toml b/errors.toml index 9980a98ab14..303ef31ea16 100644 --- a/errors.toml +++ b/errors.toml @@ -421,6 +421,11 @@ error = ''' internal etcd transaction error occurred ''' +["PD:etcd:ErrEtcdTxnResponse"] +error = ''' +etcd transaction returned invalid response: %v +''' + ["PD:etcd:ErrEtcdURLMap"] error = ''' etcd url map error diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 571d90b8651..7cc6bc97f81 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -390,6 +390,7 @@ var ( ErrEtcdGrantLease = errors.Normalize("etcd lease failed", errors.RFCCodeText("PD:etcd:ErrEtcdGrantLease")) ErrEtcdTxnInternal = errors.Normalize("internal etcd transaction error occurred", errors.RFCCodeText("PD:etcd:ErrEtcdTxnInternal")) ErrEtcdTxnConflict = errors.Normalize("etcd transaction failed, conflicted and rolled back", errors.RFCCodeText("PD:etcd:ErrEtcdTxnConflict")) + ErrEtcdTxnResponse = errors.Normalize("etcd transaction returned invalid response: %v", errors.RFCCodeText("PD:etcd:ErrEtcdTxnResponse")) ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut")) ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete")) ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet")) diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index c25f4d66060..7fe2691f30b 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -16,6 +16,7 @@ package kv import ( "context" + "fmt" "path" "strings" "time" @@ -139,6 +140,14 @@ func (kv *etcdKVBase) Remove(key string) error { return nil } +// CreateRawTxn creates a transaction that provides interface in if-then-else pattern. +func (kv *etcdKVBase) CreateRawTxn() RawTxn { + return &rawTxnWrapper{ + inner: NewSlowLogTxn(kv.client), + rootPath: kv.rootPath, + } +} + // SlowLogTxn wraps etcd transaction and log slow one. type SlowLogTxn struct { clientv3.Txn @@ -296,3 +305,117 @@ func (txn *etcdTxn) commit() error { } return nil } + +type rawTxnWrapper struct { + inner clientv3.Txn + rootPath string +} + +// If implements RawTxn interface for adding conditions to the transaction. +func (l *rawTxnWrapper) If(conditions ...RawTxnCondition) RawTxn { + cmpList := make([]clientv3.Cmp, 0, len(conditions)) + for _, c := range conditions { + key := strings.Join([]string{l.rootPath, c.Key}, "/") + if c.CmpType == RawTxnCmpExists { + cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), ">", 0)) + } else if c.CmpType == RawTxnCmpNotExists { + cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), "=", 0)) + } else { + var cmpOp string + switch c.CmpType { + case RawTxnCmpEqual: + cmpOp = "=" + case RawTxnCmpNotEqual: + cmpOp = "!=" + case RawTxnCmpGreater: + cmpOp = ">" + case RawTxnCmpLess: + cmpOp = "<" + default: + panic(fmt.Sprintf("unknown cmp type %v", c.CmpType)) + } + cmpList = append(cmpList, clientv3.Compare(clientv3.Value(key), cmpOp, c.Value)) + } + } + l.inner = l.inner.If(cmpList...) + return l +} + +func (l *rawTxnWrapper) convertOps(ops []RawTxnOp) []clientv3.Op { + opsList := make([]clientv3.Op, 0, len(ops)) + for _, op := range ops { + key := strings.Join([]string{l.rootPath, op.Key}, "/") + switch op.OpType { + case RawTxnOpPut: + opsList = append(opsList, clientv3.OpPut(key, op.Value)) + case RawTxnOpDelete: + opsList = append(opsList, clientv3.OpDelete(key)) + case RawTxnOpGet: + opsList = append(opsList, clientv3.OpGet(key)) + case RawTxnOpGetRange: + if op.EndKey == "\x00" { + opsList = append(opsList, clientv3.OpGet(key, clientv3.WithPrefix(), clientv3.WithLimit(int64(op.Limit)))) + } else { + endKey := strings.Join([]string{l.rootPath, op.EndKey}, "/") + opsList = append(opsList, clientv3.OpGet(key, clientv3.WithRange(endKey), clientv3.WithLimit(int64(op.Limit)))) + } + default: + panic(fmt.Sprintf("unknown op type %v", op.OpType)) + } + } + return opsList +} + +// Then implements RawTxn interface for adding operations that need to be executed when the condition passes to +// the transaction. +func (l *rawTxnWrapper) Then(ops ...RawTxnOp) RawTxn { + l.inner = l.inner.Then(l.convertOps(ops)...) + return l +} + +// Else implements RawTxn interface for adding operations that need to be executed when the condition doesn't pass +// to the transaction. +func (l *rawTxnWrapper) Else(ops ...RawTxnOp) RawTxn { + l.inner = l.inner.Else(l.convertOps(ops)...) + return l +} + +// Commit implements RawTxn interface for committing the transaction. +func (l *rawTxnWrapper) Commit() (RawTxnResponse, error) { + resp, err := l.inner.Commit() + if err != nil { + return RawTxnResponse{}, err + } + items := make([]RawTxnResponseItem, 0, len(resp.Responses)) + for i, rpcRespItem := range resp.Responses { + var respItem RawTxnResponseItem + if put := rpcRespItem.GetResponsePut(); put != nil { + // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. + respItem = RawTxnResponseItem{} + } else if del := rpcRespItem.GetResponseDeleteRange(); del != nil { + // Put and delete operations of etcd's transaction won't return any previous data. Skip handling it. + respItem = RawTxnResponseItem{} + } else if rangeResp := rpcRespItem.GetResponseRange(); rangeResp != nil { + kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs)) + for _, kv := range rangeResp.Kvs { + key := strings.TrimPrefix(string(kv.Key), l.rootPath+"/") + kvs = append(kvs, KeyValuePair{ + Key: key, + Value: string(kv.Value), + }) + } + respItem = RawTxnResponseItem{ + KeyValuePairs: kvs, + } + } else { + return RawTxnResponse{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs( + fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, rpcRespItem), + ) + } + items = append(items, respItem) + } + return RawTxnResponse{ + Succeeded: resp.Succeeded, + Responses: items, + }, nil +} diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go index a6e870db9c9..2def6fac057 100644 --- a/pkg/storage/kv/kv.go +++ b/pkg/storage/kv/kv.go @@ -16,24 +16,125 @@ package kv import "context" -// Txn bundles multiple operations into a single executable unit. -// It enables kv to atomically apply a set of updates. -type Txn interface { +// RawTxnCmpType represents the comparison type that is used in the condition of RawTxn. +type RawTxnCmpType int + +// RawTxnOpType represents the operation type that is used in the operations (either `Then` branch or `Else` +// branch) of RawTxn. +type RawTxnOpType int + +// nolint:revive +const ( + RawTxnCmpEqual RawTxnCmpType = iota + RawTxnCmpNotEqual + RawTxnCmpLess + RawTxnCmpGreater + RawTxnCmpExists + RawTxnCmpNotExists +) + +// nolint:revive +const ( + RawTxnOpPut RawTxnOpType = iota + RawTxnOpDelete + RawTxnOpGet + RawTxnOpGetRange +) + +// RawTxnCondition represents a condition in a RawTxn. +type RawTxnCondition struct { + Key string + CmpType RawTxnCmpType + // The value to compare with. It's not used when CmpType is RawTxnCmpExists or RawTxnCmpNotExists. + Value string +} + +// RawTxnOp represents an operation in a RawTxn's `Then` or `Else` branch and will be executed according to +// the result of checking conditions. +type RawTxnOp struct { + Key string + OpType RawTxnOpType + Value string + // The end key when the OpType is RawTxnOpGetRange. + EndKey string + // The limit of the keys to get when the OpType is RawTxnOpGetRange. + Limit int +} + +// KeyValuePair represents a pair of key and value. +type KeyValuePair struct { + Key string + Value string +} + +// RawTxnResponseItem represents a single result of a read operation in a RawTxn. +type RawTxnResponseItem struct { + KeyValuePairs []KeyValuePair +} + +// RawTxnResponse represents the result of a RawTxn. The results of operations in `Then` or `Else` branches +// will be listed in `Responses` in the same order as the operations are added. +// For Put or Delete operations, its corresponding result is the previous value before writing. +type RawTxnResponse struct { + Succeeded bool + // The results of each operation in the `Then` branch or the `Else` branch of a transaction, depending on + // whether `Succeeded`. The i-th result belongs to the i-th operation added to the executed branch. + // * For Put or Delete operations, the result is empty. + // * For Get operations, the result contains a key-value pair representing the get result. In case the key + // does not exist, its `KeyValuePairs` field will be empty. + // * For GetRange operations, the result is a list of key-value pairs containing key-value paris that are scanned. + Responses []RawTxnResponseItem +} + +// RawTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction +// API. When the backend is etcd, it simply calls etcd's equivalent APIs internally. Otherwise, the +// behavior is simulated. +// Avoid reading/writing the same key multiple times in a single transaction, otherwise the behavior +// would be undefined. +type RawTxn interface { + If(conditions ...RawTxnCondition) RawTxn + Then(ops ...RawTxnOp) RawTxn + Else(ops ...RawTxnOp) RawTxn + Commit() (RawTxnResponse, error) +} + +// BaseReadWrite is the API set, shared by Base and Txn interfaces, that provides basic KV read and write operations. +type BaseReadWrite interface { Save(key, value string) error Remove(key string) error Load(key string) (string, error) LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) } +// Txn bundles multiple operations into a single executable unit. +// It enables kv to atomically apply a set of updates. +type Txn interface { + BaseReadWrite +} + // Base is an abstract interface for load/save pd cluster data. type Base interface { - Txn + BaseReadWrite // RunInTxn runs the user provided function in a Transaction. // If user provided function f returns a non-nil error, then // transaction will not be committed, the same error will be // returned by RunInTxn. // Otherwise, it returns the error occurred during the // transaction. + // + // This is a highly-simplified transaction interface. As + // etcd's transaction API is quite limited, it's hard to use it + // to provide a complete transaction model as how a normal database + // does. When this API is running on etcd backend, each read on + // `txn` implicitly constructs a condition. + // (ref: https://etcd.io/docs/v3.5/learning/api/#transaction) + // When reading a range using `LoadRange`, for each key found in the + // range there will be a condition constructed. Be aware of the + // possibility of causing phantom read. + // RunInTxn may not suit all use cases. When RunInTxn is found + // improper to use, consider using CreateRawTxn instead, which + // is available when the backend is etcd. + // // Note that transaction are not committed until RunInTxn returns nil. // Note: // 1. Load and LoadRange operations provides only stale read. @@ -42,4 +143,10 @@ type Base interface { // 2. Only when storage is etcd, does RunInTxn checks that // values loaded during transaction has not been modified before commit. RunInTxn(ctx context.Context, f func(txn Txn) error) error + + // CreateRawTxn creates a transaction that provides the if-then-else + // API pattern when the backend is etcd, makes it possible + // to precisely control how etcd's transaction API is used. When the + // backend is not etcd, it panics. + CreateRawTxn() RawTxn } diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index f05561b0c0b..42f6f833510 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -38,6 +38,7 @@ func TestEtcd(t *testing.T) { testRange(re, kv) testSaveMultiple(re, kv, 20) testLoadConflict(re, kv) + testRawTxn(re, kv) } func TestLevelDB(t *testing.T) { @@ -159,3 +160,236 @@ func testLoadConflict(re *require.Assertions, kv Base) { // When other writer exists, loader must error. re.Error(kv.RunInTxn(context.Background(), conflictLoader)) } + +// nolint:unparam +func mustHaveKeys(re *require.Assertions, kv Base, prefix string, expected ...KeyValuePair) { + keys, values, err := kv.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), 0) + re.NoError(err) + re.Equal(len(expected), len(keys)) + for i, key := range keys { + re.Equal(expected[i].Key, key) + re.Equal(expected[i].Value, values[i]) + } +} + +func testRawTxn(re *require.Assertions, kv Base) { + // Test NotExists condition, putting in transaction. + res, err := kv.CreateRawTxn().If( + RawTxnCondition{ + Key: "txn-k1", + CmpType: RawTxnCmpNotExists, + }, + ).Then( + RawTxnOp{ + Key: "txn-k1", + OpType: RawTxnOpPut, + Value: "v1", + }, + RawTxnOp{ + Key: "txn-k2", + OpType: RawTxnOpPut, + Value: "v2", + }, + ).Else( + RawTxnOp{ + Key: "txn-unexpected", + OpType: RawTxnOpPut, + Value: "unexpected", + }, + ).Commit() + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.Responses, 2) + re.Empty(res.Responses[0].KeyValuePairs) + re.Empty(res.Responses[1].KeyValuePairs) + + mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) + + // Test Equal condition; reading in transaction. + res, err = kv.CreateRawTxn().If( + RawTxnCondition{ + Key: "txn-k1", + CmpType: RawTxnCmpEqual, + Value: "v1", + }, + ).Then( + RawTxnOp{ + Key: "txn-k2", + OpType: RawTxnOpGet, + }, + ).Else( + RawTxnOp{ + Key: "txn-unexpected", + OpType: RawTxnOpPut, + Value: "unexpected", + }, + ).Commit() + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.Responses, 1) + re.Len(res.Responses[0].KeyValuePairs, 1) + re.Equal("v2", res.Responses[0].KeyValuePairs[0].Value) + mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k1", Value: "v1"}, KeyValuePair{Key: "txn-k2", Value: "v2"}) + + // Test NotEqual condition, else branch, reading range in transaction, reading & writing mixed. + res, err = kv.CreateRawTxn().If( + RawTxnCondition{ + Key: "txn-k1", + CmpType: RawTxnCmpNotEqual, + Value: "v1", + }, + ).Then( + RawTxnOp{ + Key: "txn-unexpected", + OpType: RawTxnOpPut, + Value: "unexpected", + }, + ).Else( + RawTxnOp{ + Key: "txn-k1", + OpType: RawTxnOpGetRange, + EndKey: "txn-k2\x00", + }, + RawTxnOp{ + Key: "txn-k3", + OpType: RawTxnOpPut, + Value: "k3", + }, + ).Commit() + + re.NoError(err) + re.False(res.Succeeded) + re.Len(res.Responses, 2) + re.Len(res.Responses[0].KeyValuePairs, 2) + re.Equal([]KeyValuePair{{Key: "txn-k1", Value: "v1"}, {Key: "txn-k2", Value: "v2"}}, res.Responses[0].KeyValuePairs) + re.Empty(res.Responses[1].KeyValuePairs) + + mustHaveKeys(re, kv, "txn-", + KeyValuePair{Key: "txn-k1", Value: "v1"}, + KeyValuePair{Key: "txn-k2", Value: "v2"}, + KeyValuePair{Key: "txn-k3", Value: "k3"}) + + // Test Exists condition, deleting, overwriting. + res, err = kv.CreateRawTxn().If( + RawTxnCondition{ + Key: "txn-k1", + CmpType: RawTxnCmpExists, + }, + ).Then( + RawTxnOp{ + Key: "txn-k1", + OpType: RawTxnOpDelete, + }, + RawTxnOp{ + Key: "txn-k2", + OpType: RawTxnOpPut, + Value: "v22", + }, + // Delete not existing key. + RawTxnOp{ + Key: "txn-k4", + OpType: RawTxnOpDelete, + }, + ).Else( + RawTxnOp{ + Key: "txn-unexpected", + OpType: RawTxnOpPut, + Value: "unexpected", + }, + ).Commit() + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.Responses, 3) + for _, item := range res.Responses { + re.Empty(item.KeyValuePairs) + } + + mustHaveKeys(re, kv, "txn-", KeyValuePair{Key: "txn-k2", Value: "v22"}, KeyValuePair{Key: "txn-k3", Value: "k3"}) + + // Deleted keys can be regarded as not existing correctly. + res, err = kv.CreateRawTxn().If( + RawTxnCondition{ + Key: "txn-k1", + CmpType: RawTxnCmpNotExists, + }, + ).Then( + RawTxnOp{ + Key: "txn-k2", + OpType: RawTxnOpDelete, + }, + RawTxnOp{ + Key: "txn-k3", + OpType: RawTxnOpDelete, + }, + ).Commit() + + re.NoError(err) + re.True(res.Succeeded) + re.Len(res.Responses, 2) + for _, item := range res.Responses { + re.Empty(item.KeyValuePairs) + } + mustHaveKeys(re, kv, "txn-") + + // The following tests only check the correctness of the conditions. + check := func(conditions []RawTxnCondition, shouldSuccess bool) { + res, err := kv.CreateRawTxn().If(conditions...).Commit() + re.NoError(err) + re.Equal(shouldSuccess, res.Succeeded) + } + + // "txn-k1" doesn't exist at this point. + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpExists}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotExists}}, true) + + err = kv.Save("txn-k1", "v1") + re.NoError(err) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpExists}}, true) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotExists}}, false) + + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v1"}}, true) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotEqual, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v2"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotEqual, Value: "v2"}}, true) + + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpLess, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpLess, Value: "v0"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpLess, Value: "v2"}}, true) + + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpGreater, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpGreater, Value: "v2"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpGreater, Value: "v0"}}, true) + + // Test comparing with not-existing key. + err = kv.Remove("txn-k1") + re.NoError(err) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpNotEqual, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpLess, Value: "v1"}}, false) + check([]RawTxnCondition{{Key: "txn-k1", CmpType: RawTxnCmpGreater, Value: "v1"}}, false) + + // Test the conditions are conjunctions. + err = kv.Save("txn-k1", "v1") + re.NoError(err) + err = kv.Save("txn-k2", "v2") + re.NoError(err) + check([]RawTxnCondition{ + {Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: RawTxnCmpEqual, Value: "v2"}, + }, true) + check([]RawTxnCondition{ + {Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v1"}, + {Key: "txn-k2", CmpType: RawTxnCmpEqual, Value: "v0"}, + }, false) + check([]RawTxnCondition{ + {Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: RawTxnCmpEqual, Value: "v2"}, + }, false) + check([]RawTxnCondition{ + {Key: "txn-k1", CmpType: RawTxnCmpEqual, Value: "v0"}, + {Key: "txn-k2", CmpType: RawTxnCmpEqual, Value: "v0"}, + }, false) +} diff --git a/pkg/storage/kv/levedb_kv.go b/pkg/storage/kv/levedb_kv.go index 5a74c1928e8..c4b59edac65 100644 --- a/pkg/storage/kv/levedb_kv.go +++ b/pkg/storage/kv/levedb_kv.go @@ -80,6 +80,11 @@ func (kv *LevelDBKV) Remove(key string) error { return errors.WithStack(kv.Delete([]byte(key), nil)) } +// CreateRawTxn implements kv.Base interface. +func (*LevelDBKV) CreateRawTxn() RawTxn { + panic("unimplemented") +} + // levelDBTxn implements kv.Txn. // It utilizes leveldb.Batch to batch user operations to an atomic execution unit. type levelDBTxn struct { diff --git a/pkg/storage/kv/mem_kv.go b/pkg/storage/kv/mem_kv.go index cc5dca29851..b572290307b 100644 --- a/pkg/storage/kv/mem_kv.go +++ b/pkg/storage/kv/mem_kv.go @@ -99,6 +99,11 @@ func (kv *memoryKV) Remove(key string) error { return nil } +// CreateRawTxn implements kv.Base interface. +func (*memoryKV) CreateRawTxn() RawTxn { + panic("unimplemented") +} + // memTxn implements kv.Txn. type memTxn struct { kv *memoryKV