Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/impl/mysql/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ const (
// snapshot rows have been sent so the checkpoint can advance once all snapshot batches
// are acknowledged
messageOperationSnapshotComplete MessageOperation = "snapshot_complete"

// messageOperationXID is an internal sentinel emitted when a transaction commits
// so readMessages can advance the checkpoint to a transaction boundary, ensuring
// we never resume mid-transaction (which would miss the TABLE_MAP_EVENT).
messageOperationXID MessageOperation = "xid"
)

// MessageEvent represents a message from mysql cdc plugin
Expand Down
39 changes: 24 additions & 15 deletions internal/impl/mysql/input_mysql_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,11 @@ func prepSnapshotScannerAndMappers(cols []*sql.ColumnType) (values []any, mapper

func (i *mysqlStreamInput) readMessages(ctx context.Context) error {
var nextTimedBatchChan <-chan time.Time
// latestXIDPos tracks the most recently committed transaction boundary.
// Checkpoints only advance to XID positions so that on restart canal.RunFrom
// always resumes at the start of a new transaction, ensuring TABLE_MAP_EVENTs
// are received before any row events.
var latestXIDPos *position
for {
select {
case <-ctx.Done():
Expand All @@ -788,17 +793,22 @@ func (i *mysqlStreamInput) readMessages(ctx context.Context) error {
return fmt.Errorf("timed flush batch error: %w", err)
}

if err := i.flushBatch(ctx, i.cp, flushedBatch); err != nil {
if err := i.flushBatch(ctx, i.cp, flushedBatch, latestXIDPos); err != nil {
return fmt.Errorf("flushing periodic batch: %w", err)
}
case me := <-i.rawMessageEvents:
if me.Operation == messageOperationXID {
latestXIDPos = me.Position
continue
}

if me.Operation == messageOperationSnapshotComplete {
// Flush any remaining messages before post snapshot checkpoint
flushedBatch, err := i.batchPolicy.Flush(ctx)
if err != nil {
return fmt.Errorf("flushing snapshot completion batch: %w", err)
}
if err := i.flushBatch(ctx, i.cp, flushedBatch); err != nil {
if err := i.flushBatch(ctx, i.cp, flushedBatch, latestXIDPos); err != nil {
return fmt.Errorf("flushing snapshot completion batch: %w", err)
}

Expand Down Expand Up @@ -841,7 +851,7 @@ func (i *mysqlStreamInput) readMessages(ctx context.Context) error {
if err != nil {
return fmt.Errorf("flush batch error: %w", err)
}
if err := i.flushBatch(ctx, i.cp, flushedBatch); err != nil {
if err := i.flushBatch(ctx, i.cp, flushedBatch, latestXIDPos); err != nil {
return fmt.Errorf("flushing batch: %w", err)
}
} else {
Expand All @@ -858,23 +868,13 @@ func (i *mysqlStreamInput) flushBatch(
ctx context.Context,
checkpointer *checkpoint.Capped[*position],
batch service.MessageBatch,
checkpointPos *position,
) error {
if len(batch) == 0 {
return nil
}

lastMsg := batch[len(batch)-1]
strPosition, ok := lastMsg.MetaGet("binlog_position")
var binLogPos *position
if ok {
pos, err := parseBinlogPosition(strPosition)
if err != nil {
return err
}
binLogPos = &pos
}

resolveFn, err := checkpointer.Track(ctx, binLogPos, int64(len(batch)))
resolveFn, err := checkpointer.Track(ctx, checkpointPos, int64(len(batch)))
if err != nil {
return fmt.Errorf("tracking checkpoint for batch: %w", err)
}
Expand Down Expand Up @@ -987,6 +987,15 @@ func (i *mysqlStreamInput) OnRotate(_ *replication.EventHeader, re *replication.
return nil
}

func (i *mysqlStreamInput) OnXID(_ *replication.EventHeader, nextPos gomysql.Position) error {
select {
case i.rawMessageEvents <- MessageEvent{Operation: messageOperationXID, Position: &nextPos}:
case <-i.shutSig.SoftStopChan():
return context.Canceled
}
return nil
}

// OnTableChanged is called when a table is created, altered, renamed, or dropped.
// We invalidate the cached schema so it will be re-extracted on the next row event.
func (i *mysqlStreamInput) OnTableChanged(_ *replication.EventHeader, schema, table string) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/mysql/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ file:
assert.Eventually(t, func() bool {
outBatchMut.Lock()
defer outBatchMut.Unlock()
return len(outBatches) == 1000
return len(outBatches) >= 1000
}, time.Minute*5, time.Millisecond*100)

require.NoError(t, streamOut.StopWithin(time.Second*10))
Expand Down