Skip to content

Commit d3032e7

Browse files
authored
Merge branch 'master' into v1_13_0
2 parents 328759e + 2c27d94 commit d3032e7

File tree

19 files changed

+967
-100
lines changed

19 files changed

+967
-100
lines changed

.github/workflows/ci.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ jobs:
1010
test:
1111
strategy:
1212
matrix:
13-
go: [ "1.24", "1.23", "1.22" ]
14-
os: [ ubuntu-24.04, ubuntu-22.04 ]
13+
go: [ "1.24", "1.23" ]
14+
os: [ ubuntu-latest, ubuntu-24.04, ubuntu-22.04 ]
1515
name: Tests Go ${{ matrix.go }} on ${{ matrix.os }} # This name is used in main branch protection rules
1616
runs-on: ${{ matrix.os }}
1717

@@ -56,8 +56,9 @@ jobs:
5656
strategy:
5757
matrix:
5858
mysql_version:
59-
- 8.0.40
60-
- 8.4.3
59+
- 8.0.42
60+
- 8.4.5
61+
- 9.3.0
6162
name: Tests with MySQL ${{ matrix.mysql_version }}
6263
runs-on: ubuntu-latest
6364
services:

README.md

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ import (
6262
"github.com/go-mysql-org/go-mysql/replication"
6363
"os"
6464
)
65-
// Create a binlog syncer with a unique server id, the server id must be different from other MySQL's.
65+
// Create a binlog syncer with a unique server id, the server id must be different from other MySQL's.
6666
// flavor is mysql or mariadb
6767
cfg := replication.BinlogSyncerConfig {
6868
ServerID: 100,
@@ -133,13 +133,39 @@ Schema: test
133133
Query: DROP TABLE IF EXISTS `test_replication` /* generated by server */
134134
```
135135

136-
## Canal
136+
### MariaDB 11.4+ compatibility
137+
138+
MariaDB 11.4+ introduced an optimization where events written through transaction or statement cache have `LogPos=0` so they can be copied directly to the binlog without computing the real end position. This optimization improves performance but makes position tracking unreliable for replication clients that need to track LogPos of events inside transactions.
139+
140+
To address this, a `FillZeroLogPos` configuration option is available:
141+
142+
```go
143+
cfg := replication.BinlogSyncerConfig {
144+
ServerID: 100,
145+
Flavor: "mariadb",
146+
Host: "127.0.0.1",
147+
Port: 3306,
148+
User: "root",
149+
Password: "",
150+
// Enable dynamic LogPos calculation for MariaDB 11.4+
151+
FillZeroLogPos: true,
152+
}
153+
```
154+
155+
**Behavior:**
156+
- When `FillZeroLogPos` is `true` and flavor is `mariadb`, the library automatically:
157+
- Adds `BINLOG_SEND_ANNOTATE_ROWS_EVENT` flag to binlog dump commands. This ensures correct position tracking by making the server send `ANNOTATE_ROWS_EVENT` events which are needed for accurate position calculation.
158+
- Calculates LogPos dynamically for events with `LogPos=0` that are not artificial.
159+
- Only works with MariaDB flavor; has no effect with MySQL.
160+
- Should be set to `true` if tracking of LogPos inside transactions is required.
161+
162+
## Canal
137163

138164
Canal is a package that can sync your MySQL into everywhere, like Redis, Elasticsearch.
139165

140-
First, canal will dump your MySQL data then sync changed data using binlog incrementally.
166+
First, canal will dump your MySQL data then sync changed data using binlog incrementally.
141167

142-
You must use ROW format for binlog, full binlog row image is preferred, because we may meet some errors when primary key changed in update for minimal or noblob row image.
168+
You must use ROW format for binlog, full binlog row image is preferred, because we may meet some errors when primary key changed in update for minimal or noblob row image.
143169

144170
A simple example:
145171

@@ -188,9 +214,9 @@ You can see [go-mysql-elasticsearch](https://github.com/go-mysql-org/go-mysql-el
188214

189215
## Client
190216

191-
Client package supports a simple MySQL connection driver which you can use it to communicate with MySQL server.
217+
Client package supports a simple MySQL connection driver which you can use it to communicate with MySQL server.
192218

193-
For an example see [`example_client_test.go`](client/example_client_test.go). You can run this testable example with
219+
For an example see [`example_client_test.go`](client/example_client_test.go). You can run this testable example with
194220
`go test -v ./client -run Example`.
195221

196222
Tested MySQL versions for the client include:
@@ -237,7 +263,7 @@ conn.Execute() / conn.Begin() / etc...
237263

238264
## Server
239265

240-
Server package supplies a framework to implement a simple MySQL server which can handle the packets from the MySQL client.
266+
Server package supplies a framework to implement a simple MySQL server which can handle the packets from the MySQL client.
241267
You can use it to build your own MySQL proxy. The server connection is compatible with MySQL 5.5, 5.6, 5.7, and 8.0 versions,
242268
so that most MySQL clients should be able to connect to the Server without modifications.
243269

@@ -493,7 +519,7 @@ We pass all tests in https://github.com/bradfitz/go-sql-test using go-mysql driv
493519

494520
Logging uses [log/slog](https://pkg.go.dev/log/slog) and by default is sent to standard out.
495521

496-
For the old logging package `github.com/siddontang/go-log/log`, a converting package
522+
For the old logging package `github.com/siddontang/go-log/log`, a converting package
497523
`https://github.com/serprex/slog-siddontang` is available.
498524
## How to migrate to this repo
499525
To change the used package in your repo it's enough to add this `replace` directive to your `go.mod`:

canal/canal.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type Canal struct {
5151
includeTableRegex []*regexp.Regexp
5252
excludeTableRegex []*regexp.Regexp
5353

54-
delay *uint32
54+
delay atomic.Uint32
5555

5656
ctx context.Context
5757
cancel context.CancelFunc
@@ -85,8 +85,6 @@ func NewCanal(cfg *Config) (*Canal, error) {
8585
}
8686
c.master = &masterInfo{logger: c.cfg.Logger}
8787

88-
c.delay = new(uint32)
89-
9088
var err error
9189

9290
if err = c.prepareDumper(); err != nil {
@@ -195,7 +193,7 @@ func (c *Canal) prepareDumper() error {
195193
}
196194

197195
func (c *Canal) GetDelay() uint32 {
198-
return atomic.LoadUint32(c.delay)
196+
return c.delay.Load()
199197
}
200198

201199
// Run will first try to dump all data from MySQL master `mysqldump`,
@@ -468,6 +466,8 @@ func (c *Canal) prepareSyncer() error {
468466
Dialer: c.cfg.Dialer,
469467
Localhost: c.cfg.Localhost,
470468
EventCacheCount: c.cfg.EventCacheCount,
469+
FillZeroLogPos: c.cfg.FillZeroLogPos,
470+
471471
RowsEventDecodeFunc: func(event *replication.RowsEvent, data []byte) error {
472472
pos, err := event.DecodeHeader(data)
473473
if err != nil {

canal/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ type Config struct {
113113
// the default value is 10240.
114114
// if you table contain large columns, you can decrease this value to avoid OOM.
115115
EventCacheCount int
116+
117+
// FillZeroLogPos enables dynamic LogPos calculation for MariaDB.
118+
// When enabled, automatically adds BINLOG_SEND_ANNOTATE_ROWS_EVENT flag
119+
// to ensure correct position calculation in MariaDB 11.4+.
120+
// Only works with MariaDB flavor.
121+
FillZeroLogPos bool
116122
}
117123

118124
func NewConfigWithFile(name string) (*Config, error) {

canal/sync.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package canal
22

33
import (
44
"log/slog"
5-
"sync/atomic"
65
"time"
76

87
"github.com/go-mysql-org/go-mysql/mysql"
@@ -259,7 +258,7 @@ func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
259258
if now >= ev.Header.Timestamp {
260259
newDelay = now - ev.Header.Timestamp
261260
}
262-
atomic.StoreUint32(c.delay, newDelay)
261+
c.delay.Store(newDelay)
263262
}
264263

265264
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {

client/auth.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ func (c *Conn) writeAuthHandshake() error {
222222
c.ccaps&mysql.CLIENT_COMPRESS | c.ccaps&mysql.CLIENT_ZSTD_COMPRESSION_ALGORITHM |
223223
c.ccaps&mysql.CLIENT_LOCAL_FILES
224224

225+
capability &^= c.clientExplicitOffCaps
226+
225227
// To enable TLS / SSL
226228
if c.tlsConfig != nil {
227229
capability |= mysql.CLIENT_SSL

client/conn.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type Conn struct {
4545
capability uint32
4646
// client-set capabilities only
4747
ccaps uint32
48+
// Capability flags explicitly disabled by the client via UnsetCapability()
49+
// These flags are removed from the final advertised capability set during handshake.
50+
clientExplicitOffCaps uint32
4851

4952
attributes map[string]string
5053

@@ -234,14 +237,17 @@ func (c *Conn) Ping() error {
234237
return nil
235238
}
236239

237-
// SetCapability enables the use of a specific capability
240+
// SetCapability marks the specified flag as explicitly enabled by the client.
238241
func (c *Conn) SetCapability(cap uint32) {
239242
c.ccaps |= cap
243+
c.clientExplicitOffCaps &^= cap
240244
}
241245

242-
// UnsetCapability disables the use of a specific capability
246+
// UnsetCapability marks the specified flag as explicitly disabled by the client.
247+
// This disables the flag even if the server supports it.
243248
func (c *Conn) UnsetCapability(cap uint32) {
244-
c.ccaps &= ^cap
249+
c.ccaps &^= cap
250+
c.clientExplicitOffCaps |= cap
245251
}
246252

247253
// HasCapability returns true if the connection has the specific capability

replication/backup.go

Lines changed: 89 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,20 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p mysql.Position, timeout t
2727
}
2828
}
2929

30+
func (b *BinlogSyncer) StartBackupGTID(backupDir string, gset mysql.GTIDSet, timeout time.Duration) error {
31+
err := os.MkdirAll(backupDir, 0o755)
32+
if err != nil {
33+
return errors.Trace(err)
34+
}
35+
if b.cfg.SynchronousEventHandler == nil {
36+
return b.StartBackupWithHandlerAndGTID(gset, timeout, func(filename string) (io.WriteCloser, error) {
37+
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0o644)
38+
})
39+
} else {
40+
return b.StartSynchronousBackupWithGTID(gset, timeout)
41+
}
42+
}
43+
3044
// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
3145
// The process will continue until the timeout is reached or an error occurs.
3246
// This method should not be used together with SynchronousEventHandler.
@@ -54,52 +68,72 @@ func (b *BinlogSyncer) StartBackupWithHandler(p mysql.Position, timeout time.Dur
5468
backupHandler := &BackupEventHandler{
5569
handler: handler,
5670
}
57-
5871
s, err := b.StartSync(p)
5972
if err != nil {
6073
return errors.Trace(err)
6174
}
75+
return processWithHandler(b, s, backupHandler, timeout)
76+
}
6277

63-
defer func() {
64-
if backupHandler.w != nil {
65-
closeErr := backupHandler.w.Close()
66-
if retErr == nil {
67-
retErr = closeErr
68-
}
69-
}
70-
}()
78+
// StartBackupWithHandlerAndGTID starts the backup process for the binary log using the specified GTID set and handler.
79+
// - gset: The GTID set from which to begin the backup.
80+
// - timeout: The maximum duration to wait for new binlog events before stopping the backup process.
81+
// If set to 0, a default very long timeout (30 days) is used instead.
82+
// - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to.
83+
func (b *BinlogSyncer) StartBackupWithHandlerAndGTID(gset mysql.GTIDSet, timeout time.Duration,
84+
handler func(binlogFilename string) (io.WriteCloser, error),
85+
) (retErr error) {
86+
if timeout == 0 {
87+
// a very long timeout here
88+
timeout = 30 * 3600 * 24 * time.Second
89+
}
90+
if b.cfg.SynchronousEventHandler != nil {
91+
return errors.New("StartBackupWithHandlerAndGTID cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackupWithGTID instead.")
92+
}
7193

72-
ctx, cancel := context.WithTimeout(context.Background(), timeout)
73-
defer cancel()
94+
// Force use raw mode
95+
b.parser.SetRawMode(true)
7496

75-
for {
76-
select {
77-
case <-ctx.Done():
78-
return nil
79-
case <-b.ctx.Done():
80-
return nil
81-
case err := <-s.ech:
82-
return errors.Trace(err)
83-
case e := <-s.ch:
84-
err = backupHandler.HandleEvent(e)
85-
if err != nil {
86-
return errors.Trace(err)
87-
}
88-
}
97+
// Set up the backup event handler
98+
backupHandler := &BackupEventHandler{
99+
handler: handler,
100+
}
101+
102+
s, err := b.StartSyncGTID(gset)
103+
if err != nil {
104+
return errors.Trace(err)
89105
}
106+
return processWithHandler(b, s, backupHandler, timeout)
90107
}
91108

92109
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
93110
func (b *BinlogSyncer) StartSynchronousBackup(p mysql.Position, timeout time.Duration) error {
94111
if b.cfg.SynchronousEventHandler == nil {
95112
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup")
96113
}
97-
98114
s, err := b.StartSync(p)
99115
if err != nil {
100116
return errors.Trace(err)
101117
}
102118

119+
return process(b, s, timeout)
120+
}
121+
122+
// StartSynchronousBackupWithGTID starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig with a specified GTID set.
123+
func (b *BinlogSyncer) StartSynchronousBackupWithGTID(gset mysql.GTIDSet, timeout time.Duration) error {
124+
if b.cfg.SynchronousEventHandler == nil {
125+
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackupWithGTID")
126+
}
127+
128+
s, err := b.StartSyncGTID(gset)
129+
if err != nil {
130+
return errors.Trace(err)
131+
}
132+
133+
return process(b, s, timeout)
134+
}
135+
136+
func process(b *BinlogSyncer, s *BinlogStreamer, timeout time.Duration) error {
103137
var ctx context.Context
104138
var cancel context.CancelFunc
105139

@@ -123,6 +157,35 @@ func (b *BinlogSyncer) StartSynchronousBackup(p mysql.Position, timeout time.Dur
123157
}
124158
}
125159

160+
func processWithHandler(b *BinlogSyncer, s *BinlogStreamer, backupHandler *BackupEventHandler, timeout time.Duration) (retErr error) {
161+
defer func() {
162+
if backupHandler.w != nil {
163+
closeErr := backupHandler.w.Close()
164+
if retErr == nil {
165+
retErr = closeErr
166+
}
167+
}
168+
}()
169+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
170+
defer cancel()
171+
172+
for {
173+
select {
174+
case <-ctx.Done():
175+
return nil
176+
case <-b.ctx.Done():
177+
return nil
178+
case err := <-s.ech:
179+
return errors.Trace(err)
180+
case e := <-s.ch:
181+
err := backupHandler.HandleEvent(e)
182+
if err != nil {
183+
return errors.Trace(err)
184+
}
185+
}
186+
}
187+
}
188+
126189
// BackupEventHandler handles writing events for backup
127190
type BackupEventHandler struct {
128191
handler func(binlogFilename string) (io.WriteCloser, error)

0 commit comments

Comments
 (0)