Skip to content

Commit

Permalink
Merge branch 'master' into dynamically_enable_router_client
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Feb 19, 2025
2 parents b2630f0 + b757cbe commit fb83211
Show file tree
Hide file tree
Showing 14 changed files with 521 additions and 44 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ error = '''
internal etcd transaction error occurred
'''

["PD:etcd:ErrEtcdTxnResponse"]
error = '''
etcd transaction returned invalid response: %v
'''

["PD:etcd:ErrEtcdURLMap"]
error = '''
etcd url map error
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20241104061623-bce95733dad7
github.com/pingcap/tidb-dashboard v0.0.0-20250219061340-d62018124ae2
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/common v0.55.0
github.com/sasha-s/go-deadlock v0.3.5
Expand Down Expand Up @@ -144,7 +144,7 @@ require (
github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.15 // indirect
github.com/mattn/go-sqlite3 v1.14.24 // indirect
github.com/minio/sio v0.3.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down Expand Up @@ -214,8 +214,8 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/datatypes v1.1.0 // indirect
gorm.io/driver/mysql v1.4.5 // indirect
gorm.io/driver/sqlite v1.4.3 // indirect
gorm.io/gorm v1.24.3 // indirect
gorm.io/driver/sqlite v1.5.7 // indirect
gorm.io/gorm v1.25.12 // indirect
moul.io/zapgorm2 v1.1.0 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)
17 changes: 8 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI=
github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/microsoft/go-mssqldb v0.17.0 h1:Fto83dMZPnYv1Zwx5vHHxpNraeEaUlQ/hhHLgZiaenE=
github.com/microsoft/go-mssqldb v0.17.0/go.mod h1:OkoNGhGEs8EZqchVTtochlXruEhEOaO4S0d2sB5aeGQ=
github.com/minio/sio v0.3.0 h1:syEFBewzOMOYVzSTFpp1MqpSZk8rUNbz8VIIc+PNzus=
Expand Down Expand Up @@ -399,8 +399,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tidb-dashboard v0.0.0-20241104061623-bce95733dad7 h1:qQnXeY2585WkQaPVc7p4i1MWUECKCbkbw5/QcrK5Ahg=
github.com/pingcap/tidb-dashboard v0.0.0-20241104061623-bce95733dad7/go.mod h1:AT9vfeojwr/GGCHTURXtA8yZBE9AW8LdIo02/eYdfHU=
github.com/pingcap/tidb-dashboard v0.0.0-20250219061340-d62018124ae2 h1:XYKnwxMFz3tOTdNP1ZUoMIS951Ol8MTbq6sKVsg8lFE=
github.com/pingcap/tidb-dashboard v0.0.0-20250219061340-d62018124ae2/go.mod h1:1FZhe+luYtDAevex4G6OGCbLLiJM3E/IN6Plrmh1zo0=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4=
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down Expand Up @@ -797,15 +797,14 @@ gorm.io/driver/mysql v1.4.5 h1:u1lytId4+o9dDaNcPCFzNv7h6wvmc92UjNk3z8enSBU=
gorm.io/driver/mysql v1.4.5/go.mod h1:SxzItlnT1cb6e1e4ZRpgJN2VYtcqJgqnHxWr4wsP8oc=
gorm.io/driver/postgres v1.4.5 h1:mTeXTTtHAgnS9PgmhN2YeUbazYpLhUI1doLnw42XUZc=
gorm.io/driver/postgres v1.4.5/go.mod h1:GKNQYSJ14qvWkvPwXljMGehpKrhlDNsqYRr5HnYGncg=
gorm.io/driver/sqlite v1.4.3 h1:HBBcZSDnWi5BW3B3rwvVTc510KGkBkexlOg0QrmLUuU=
gorm.io/driver/sqlite v1.4.3/go.mod h1:0Aq3iPO+v9ZKbcdiz8gLWRw5VOPcBOPUQJFLq5e2ecI=
gorm.io/driver/sqlite v1.5.7 h1:8NvsrhP0ifM7LX9G4zPB97NwovUakUxc+2V2uuf3Z1I=
gorm.io/driver/sqlite v1.5.7/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
gorm.io/driver/sqlserver v1.4.1 h1:t4r4r6Jam5E6ejqP7N82qAJIJAht27EGT41HyPfXRw0=
gorm.io/driver/sqlserver v1.4.1/go.mod h1:DJ4P+MeZbc5rvY58PnmN1Lnyvb5gw5NPzGshHDnJLig=
gorm.io/gorm v1.21.9/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
gorm.io/gorm v1.24.0/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA=
gorm.io/gorm v1.24.3 h1:WL2ifUmzR/SLp85CSURAfybcHnGZ+yLSGSxgYXlFBHg=
gorm.io/gorm v1.24.3/go.mod h1:DVrVomtaYTbqs7gB/x2uVvqnXzv0nqjB396B8cG4dBA=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
gotest.tools/gotestsum v1.7.0 h1:RwpqwwFKBAa2h+F6pMEGpE707Edld0etUD3GhqqhDNc=
gotest.tools/gotestsum v1.7.0/go.mod h1:V1m4Jw3eBerhI/A6qCxUE07RnCg7ACkKj9BYcAm09V8=
gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0=
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ var (
ErrEtcdGrantLease = errors.Normalize("etcd lease failed", errors.RFCCodeText("PD:etcd:ErrEtcdGrantLease"))
ErrEtcdTxnInternal = errors.Normalize("internal etcd transaction error occurred", errors.RFCCodeText("PD:etcd:ErrEtcdTxnInternal"))
ErrEtcdTxnConflict = errors.Normalize("etcd transaction failed, conflicted and rolled back", errors.RFCCodeText("PD:etcd:ErrEtcdTxnConflict"))
ErrEtcdTxnResponse = errors.Normalize("etcd transaction returned invalid response: %v", errors.RFCCodeText("PD:etcd:ErrEtcdTxnResponse"))
ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut"))
ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete"))
ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet"))
Expand Down
123 changes: 123 additions & 0 deletions pkg/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"fmt"
"path"
"strings"
"time"
Expand Down Expand Up @@ -139,6 +140,14 @@ func (kv *etcdKVBase) Remove(key string) error {
return nil
}

// CreateRawTxn creates a transaction that provides interface in if-then-else pattern.
func (kv *etcdKVBase) CreateRawTxn() RawTxn {
return &rawTxnWrapper{
inner: NewSlowLogTxn(kv.client),
rootPath: kv.rootPath,
}
}

// SlowLogTxn wraps etcd transaction and log slow one.
type SlowLogTxn struct {
clientv3.Txn
Expand Down Expand Up @@ -296,3 +305,117 @@ func (txn *etcdTxn) commit() error {
}
return nil
}

type rawTxnWrapper struct {
inner clientv3.Txn
rootPath string
}

// If implements RawTxn interface for adding conditions to the transaction.
func (l *rawTxnWrapper) If(conditions ...RawTxnCondition) RawTxn {
cmpList := make([]clientv3.Cmp, 0, len(conditions))
for _, c := range conditions {
key := strings.Join([]string{l.rootPath, c.Key}, "/")
if c.CmpType == RawTxnCmpExists {
cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), ">", 0))
} else if c.CmpType == RawTxnCmpNotExists {
cmpList = append(cmpList, clientv3.Compare(clientv3.CreateRevision(key), "=", 0))
} else {
var cmpOp string
switch c.CmpType {
case RawTxnCmpEqual:
cmpOp = "="
case RawTxnCmpNotEqual:
cmpOp = "!="
case RawTxnCmpGreater:
cmpOp = ">"
case RawTxnCmpLess:
cmpOp = "<"
default:
panic(fmt.Sprintf("unknown cmp type %v", c.CmpType))
}
cmpList = append(cmpList, clientv3.Compare(clientv3.Value(key), cmpOp, c.Value))
}
}
l.inner = l.inner.If(cmpList...)
return l
}

func (l *rawTxnWrapper) convertOps(ops []RawTxnOp) []clientv3.Op {
opsList := make([]clientv3.Op, 0, len(ops))
for _, op := range ops {
key := strings.Join([]string{l.rootPath, op.Key}, "/")
switch op.OpType {
case RawTxnOpPut:
opsList = append(opsList, clientv3.OpPut(key, op.Value))
case RawTxnOpDelete:
opsList = append(opsList, clientv3.OpDelete(key))
case RawTxnOpGet:
opsList = append(opsList, clientv3.OpGet(key))
case RawTxnOpGetRange:
if op.EndKey == "\x00" {
opsList = append(opsList, clientv3.OpGet(key, clientv3.WithPrefix(), clientv3.WithLimit(int64(op.Limit))))
} else {
endKey := strings.Join([]string{l.rootPath, op.EndKey}, "/")
opsList = append(opsList, clientv3.OpGet(key, clientv3.WithRange(endKey), clientv3.WithLimit(int64(op.Limit))))
}
default:
panic(fmt.Sprintf("unknown op type %v", op.OpType))
}
}
return opsList
}

// Then implements RawTxn interface for adding operations that need to be executed when the condition passes to
// the transaction.
func (l *rawTxnWrapper) Then(ops ...RawTxnOp) RawTxn {
l.inner = l.inner.Then(l.convertOps(ops)...)
return l
}

// Else implements RawTxn interface for adding operations that need to be executed when the condition doesn't pass
// to the transaction.
func (l *rawTxnWrapper) Else(ops ...RawTxnOp) RawTxn {
l.inner = l.inner.Else(l.convertOps(ops)...)
return l
}

// Commit implements RawTxn interface for committing the transaction.
func (l *rawTxnWrapper) Commit() (RawTxnResponse, error) {
resp, err := l.inner.Commit()
if err != nil {
return RawTxnResponse{}, err
}
items := make([]RawTxnResponseItem, 0, len(resp.Responses))
for i, rpcRespItem := range resp.Responses {
var respItem RawTxnResponseItem
if put := rpcRespItem.GetResponsePut(); put != nil {
// Put and delete operations of etcd's transaction won't return any previous data. Skip handling it.
respItem = RawTxnResponseItem{}
} else if del := rpcRespItem.GetResponseDeleteRange(); del != nil {
// Put and delete operations of etcd's transaction won't return any previous data. Skip handling it.
respItem = RawTxnResponseItem{}
} else if rangeResp := rpcRespItem.GetResponseRange(); rangeResp != nil {
kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs))
for _, kv := range rangeResp.Kvs {
key := strings.TrimPrefix(string(kv.Key), l.rootPath+"/")
kvs = append(kvs, KeyValuePair{
Key: key,
Value: string(kv.Value),
})
}
respItem = RawTxnResponseItem{
KeyValuePairs: kvs,
}
} else {
return RawTxnResponse{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs(
fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, rpcRespItem),
)
}
items = append(items, respItem)
}
return RawTxnResponse{
Succeeded: resp.Succeeded,
Responses: items,
}, nil
}
115 changes: 111 additions & 4 deletions pkg/storage/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,125 @@ package kv

import "context"

// Txn bundles multiple operations into a single executable unit.
// It enables kv to atomically apply a set of updates.
type Txn interface {
// RawTxnCmpType represents the comparison type that is used in the condition of RawTxn.
type RawTxnCmpType int

// RawTxnOpType represents the operation type that is used in the operations (either `Then` branch or `Else`
// branch) of RawTxn.
type RawTxnOpType int

// nolint:revive
const (
RawTxnCmpEqual RawTxnCmpType = iota
RawTxnCmpNotEqual
RawTxnCmpLess
RawTxnCmpGreater
RawTxnCmpExists
RawTxnCmpNotExists
)

// nolint:revive
const (
RawTxnOpPut RawTxnOpType = iota
RawTxnOpDelete
RawTxnOpGet
RawTxnOpGetRange
)

// RawTxnCondition represents a condition in a RawTxn.
type RawTxnCondition struct {
Key string
CmpType RawTxnCmpType
// The value to compare with. It's not used when CmpType is RawTxnCmpExists or RawTxnCmpNotExists.
Value string
}

// RawTxnOp represents an operation in a RawTxn's `Then` or `Else` branch and will be executed according to
// the result of checking conditions.
type RawTxnOp struct {
Key string
OpType RawTxnOpType
Value string
// The end key when the OpType is RawTxnOpGetRange.
EndKey string
// The limit of the keys to get when the OpType is RawTxnOpGetRange.
Limit int
}

// KeyValuePair represents a pair of key and value.
type KeyValuePair struct {
Key string
Value string
}

// RawTxnResponseItem represents a single result of a read operation in a RawTxn.
type RawTxnResponseItem struct {
KeyValuePairs []KeyValuePair
}

// RawTxnResponse represents the result of a RawTxn. The results of operations in `Then` or `Else` branches
// will be listed in `Responses` in the same order as the operations are added.
// For Put or Delete operations, its corresponding result is the previous value before writing.
type RawTxnResponse struct {
Succeeded bool
// The results of each operation in the `Then` branch or the `Else` branch of a transaction, depending on
// whether `Succeeded`. The i-th result belongs to the i-th operation added to the executed branch.
// * For Put or Delete operations, the result is empty.
// * For Get operations, the result contains a key-value pair representing the get result. In case the key
// does not exist, its `KeyValuePairs` field will be empty.
// * For GetRange operations, the result is a list of key-value pairs containing key-value paris that are scanned.
Responses []RawTxnResponseItem
}

// RawTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction
// API. When the backend is etcd, it simply calls etcd's equivalent APIs internally. Otherwise, the
// behavior is simulated.
// Avoid reading/writing the same key multiple times in a single transaction, otherwise the behavior
// would be undefined.
type RawTxn interface {
If(conditions ...RawTxnCondition) RawTxn
Then(ops ...RawTxnOp) RawTxn
Else(ops ...RawTxnOp) RawTxn
Commit() (RawTxnResponse, error)
}

// BaseReadWrite is the API set, shared by Base and Txn interfaces, that provides basic KV read and write operations.
type BaseReadWrite interface {
Save(key, value string) error
Remove(key string) error
Load(key string) (string, error)
LoadRange(key, endKey string, limit int) (keys []string, values []string, err error)
}

// Txn bundles multiple operations into a single executable unit.
// It enables kv to atomically apply a set of updates.
type Txn interface {
BaseReadWrite
}

// Base is an abstract interface for load/save pd cluster data.
type Base interface {
Txn
BaseReadWrite
// RunInTxn runs the user provided function in a Transaction.
// If user provided function f returns a non-nil error, then
// transaction will not be committed, the same error will be
// returned by RunInTxn.
// Otherwise, it returns the error occurred during the
// transaction.
//
// This is a highly-simplified transaction interface. As
// etcd's transaction API is quite limited, it's hard to use it
// to provide a complete transaction model as how a normal database
// does. When this API is running on etcd backend, each read on
// `txn` implicitly constructs a condition.
// (ref: https://etcd.io/docs/v3.5/learning/api/#transaction)
// When reading a range using `LoadRange`, for each key found in the
// range there will be a condition constructed. Be aware of the
// possibility of causing phantom read.
// RunInTxn may not suit all use cases. When RunInTxn is found
// improper to use, consider using CreateRawTxn instead, which
// is available when the backend is etcd.
//
// Note that transaction are not committed until RunInTxn returns nil.
// Note:
// 1. Load and LoadRange operations provides only stale read.
Expand All @@ -42,4 +143,10 @@ type Base interface {
// 2. Only when storage is etcd, does RunInTxn checks that
// values loaded during transaction has not been modified before commit.
RunInTxn(ctx context.Context, f func(txn Txn) error) error

// CreateRawTxn creates a transaction that provides the if-then-else
// API pattern when the backend is etcd, makes it possible
// to precisely control how etcd's transaction API is used. When the
// backend is not etcd, it panics.
CreateRawTxn() RawTxn
}
Loading

0 comments on commit fb83211

Please sign in to comment.