Commit d49bebe

[MySQL] Extend replication keep alive mechanism. (#314)
* Removed zongji type mappings which are now provided by the Zongji package directly. Added check for tablemap events
* Moved most of the binlog event handling logic to a separate BinlogListener class. Introduced a mechanism to limit the maximum size of the binlog processing queue, thus also limiting memory usage. This maximum processing queue size is configurable
* Updated the BinLogStream to use the new BinLogListener
* Renamed BinlogListener to BinLogListener
* Added changeset
* Simplified BinLogListener stopping mechanism. Cleaned up BinLogStream logs a bit
* Corrected BinLogListener name. Simplified BinLogListener stopping mechanism
* Supply port for binlog listener connections.
* Only set up binlog heartbeat once the listener is fully started up. Added a few more defensive stopped checks to the binlog listener
* Updated changeset
* Changed binlog backpressure mechanism to be based on processing queue memory usage rather than number of events
* Changed binlog backpressure mechanism to be based on processing queue memory usage rather than number of events. Introduced a maximum timeout that the binlog processing queue can be paused before auto-resuming. This is to prevent the replication connection timing out.
* Added optional columns field to SourceEntityDescriptor. Made SourceTable implement SourceEntityDescriptor interface
* Cleanup unused imports
* Ensure column values are preserved when available. Report 0 storage metrics instead of ignoring them. SourceTable. Moved MySQL table detail retrieval logic to utility function.
* Added basic schema change handling for MySQL
* Revert columns field addition to SourceEntityDescriptor
* Added schema change handling for the MySQL binlog replication.
* Include powersync core version in metrics metadata
* Code analysis cleanup
* Merge conflicts
* Fixed parser import
* Fixed mysql->sqlite rows parsing that would filter out columns with null values
* Cleaned up SchemaChange handling in BinLogListener. Improved binlog table filtering. Added extended type definitions for node-sql-parser package
* Added schema change tests. Cleaned up MySQL tests in general and added a few new test utils
* Change binlog event receive log message to debug
* Revert and fix mysql->sqlite row conversion for null value columns
* Added conditional skip of mysql schema test for syntax that does not exist in version 5.7
* Fixed version checking for mysql 5.7 incompatible test
* Fix skip test on mysql 5.7 schema change
* Reverted mysql dev docker compose. Updated to released zongji listener version
* Moved schema change handling to processing queue. Catch parsing errors, and log an error if the DDL query might apply to one of the tables in the sync rules.
* Fixed bug where multiple zongji listeners could be started if multiple schema change events were in the processing queue. Added small timeout to test to prevent rare race condition
* Extended node-sql-parser type definitions. Added util functions to identify the different types of DDL statements
* - Simplified schema change types
  - Added more detections of constraint changes
  - Removed detection of create table statements since they can be detected and reacted to when row events are received for new tables
  - Added multiple extra test cases
* Removed unused constant
* Skip unsupported schema test for MySQL 5.7
* Added error handling for zongji emitted schema errors
* Added changeset
* Typo fixes from PR feedback
* Removed filters from mysql dev docker config
* Added safeguard for gtid splitting when no transactions have been run on the mysql database yet.
* BinLog listener now correctly takes schema into account for replication. TableFilter creation is now internally handled in the BinLog listener. Pause/unpause binlog listening now uses the same stop start functionality used for schema change handling.
* BinLog stream now correctly honors multiple schemas in the sync rules.
* Added tests for multi schema support
* MySQL util fix post merge
* Removed accidentally committed keepalive code in BinLogStream.
* Cleaned up Binlog docs and comments a bit
* Added keepalive support for MySQL replication.
* Removed potentially spammy log entry.
* Increased MySQL keepalive table timestamp granularity. Added tests for keepalive. Moved common test functions to utils
* Added check to skip MySQL keepalive table creation if present and correctly set up.
* Added success check for mysql keepalive
* Reworked MySQL keepalive mechanism by hooking into heartbeat events on the binlog.
* Bucket Keepalives now bump the syncrules keepalive timestamp even if the LSN stayed the same.
* Switched keepalive logs to debug. Added a few more comments for keepalives. Updated to released Zongji package
* Added changeset
* Small cleanup of reverted code
1 parent c0bdbbe commit d49bebe

13 files changed: +251 -143 lines changed


.changeset/wise-elephants-trade.md

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-mysql': minor
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core': patch
+---
+
+- Hooked up the MySQL binlog heartbeat events with the bucket batch keepalive mechanism.
+  Heartbeat events will now update the latest keepalive timestamp in the sync rules.
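
The changeset above describes forwarding binlog heartbeat events to the bucket batch keepalive. As the BinLogListener changes in this commit show, heartbeats are only forwarded when no transaction is open. A minimal sketch of that gating follows, assuming a simplified event shape: `SketchEvent`, `HeartbeatGate`, and the `keepalives` array are hypothetical names used only for illustration, not PowerSync or Zongji types.

```typescript
// Hypothetical, simplified event shape. The real events come from @powersync/mysql-zongji.
type SketchEvent = { kind: 'begin' } | { kind: 'xid' } | { kind: 'heartbeat' };

class HeartbeatGate {
  // Mirrors the isTransactionOpen flag introduced in this commit.
  private isTransactionOpen = false;
  private readonly currentGtid: string;
  // Records every keepalive that would be forwarded to the event handler.
  readonly keepalives: string[] = [];

  constructor(currentGtid: string) {
    this.currentGtid = currentGtid;
  }

  handle(event: SketchEvent): void {
    switch (event.kind) {
      case 'begin':
        // A BEGIN query event opens a transaction.
        this.isTransactionOpen = true;
        break;
      case 'xid':
        // An Xid event marks the commit, so the transaction is closed.
        this.isTransactionOpen = false;
        break;
      case 'heartbeat':
        // Only forward the heartbeat as a keepalive when no transaction is in progress.
        if (!this.isTransactionOpen) {
          this.keepalives.push(this.currentGtid);
        }
        break;
    }
  }
}
```

In this sketch a heartbeat arriving between `begin` and `xid` is dropped, while heartbeats on an idle connection are recorded with the current GTID.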

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 1 addition & 1 deletion
@@ -807,7 +807,7 @@ export class MongoBucketBatch
   }
 
   async keepalive(lsn: string): Promise<boolean> {
-    if (this.last_checkpoint_lsn != null && lsn <= this.last_checkpoint_lsn) {
+    if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
       // No-op
       return false;
     }
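
The comparison change in this hunk means a keepalive whose LSN equals the last checkpoint LSN is no longer a no-op, which matches the commit note that keepalives now bump the timestamp even if the LSN stayed the same. A small sketch of the old and new predicates, assuming LSNs are strings compared with the ordinary string operators as in the hunk; both function names are hypothetical:

```typescript
// Old behaviour: an equal or older LSN is skipped.
function wasKeepaliveNoOp(lastCheckpointLsn: string | null, lsn: string): boolean {
  return lastCheckpointLsn != null && lsn <= lastCheckpointLsn;
}

// New behaviour: only a strictly older LSN is skipped,
// so a repeated LSN still proceeds past the no-op check.
function isKeepaliveNoOp(lastCheckpointLsn: string | null, lsn: string): boolean {
  return lastCheckpointLsn != null && lsn < lastCheckpointLsn;
}
```

With the same LSN on both sides, `wasKeepaliveNoOp` returns `true` and `isKeepaliveNoOp` returns `false`; both agree when there is no checkpoint yet or when the LSN is strictly older or newer.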

modules/module-mysql/package.json

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@
     "@powersync/service-sync-rules": "workspace:*",
     "@powersync/service-types": "workspace:*",
     "@powersync/service-jsonbig": "workspace:*",
-    "@powersync/mysql-zongji": "^0.4.0",
+    "@powersync/mysql-zongji": "^0.5.0",
     "async": "^3.2.4",
     "mysql2": "^3.11.0",
     "node-sql-parser": "^5.3.9",

modules/module-mysql/src/replication/BinLogReplicationJob.ts

Lines changed: 4 additions & 1 deletion
@@ -21,7 +21,9 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
     return this.options.storage.slot_name;
   }
 
-  async keepAlive() {}
+  async keepAlive() {
+    // Keepalives are handled by the binlog heartbeat mechanism
+  }
 
   async replicate() {
     try {
@@ -56,6 +58,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob {
     const connectionManager = this.connectionFactory.create({
       // Pool connections are only used intermittently.
       idleTimeout: 30_000,
+      connectionLimit: 2,
 
       connectAttributes: {
         // https://dev.mysql.com/doc/refman/8.0/en/performance-schema-connection-attribute-tables.html

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 7 additions & 2 deletions
@@ -398,7 +398,6 @@ export class BinLogStream {
     const fromGTID = checkpoint_lsn
       ? common.ReplicatedGTID.fromSerialized(checkpoint_lsn)
       : await common.readExecutedGtid(connection);
-    const binLogPositionState = fromGTID.position;
     connection.release();
 
     if (!this.stopped) {
@@ -409,7 +408,7 @@ export class BinLogStream {
           const binlogListener = new BinLogListener({
             logger: this.logger,
             sourceTables: this.syncRules.getSourceTables(),
-            startPosition: binLogPositionState,
+            startGTID: fromGTID,
             connectionManager: this.connections,
             serverId: serverId,
             eventHandler: binlogEventHandler
@@ -455,6 +454,12 @@ export class BinLogStream {
           tableEntry: tableMap
         });
       },
+      onKeepAlive: async (lsn: string) => {
+        const didCommit = await batch.keepalive(lsn);
+        if (didCommit) {
+          this.oldestUncommittedChange = null;
+        }
+      },
       onCommit: async (lsn: string) => {
         this.metrics.getCounter(ReplicationMetric.TRANSACTIONS_REPLICATED).add(1);
         const didCommit = await batch.commit(lsn, { oldestUncommittedChange: this.oldestUncommittedChange });

modules/module-mysql/src/replication/zongji/BinLogListener.ts

Lines changed: 72 additions & 28 deletions
@@ -29,6 +29,10 @@ import { TablePattern } from '@powersync/service-sync-rules';
 
 const { Parser } = pkg;
 
+/**
+ * Seconds of inactivity after which a keepalive event is sent by the MySQL server.
+ */
+export const KEEPALIVE_INACTIVITY_THRESHOLD = 30;
 export type Row = Record<string, any>;
 
 /**
@@ -65,15 +69,17 @@ export interface BinLogEventHandler {
   onDelete: (rows: Row[], tableMap: TableMapEntry) => Promise<void>;
   onCommit: (lsn: string) => Promise<void>;
   onSchemaChange: (change: SchemaChange) => Promise<void>;
+  onKeepAlive: (lsn: string) => Promise<void>;
 }
 
 export interface BinLogListenerOptions {
   connectionManager: MySQLConnectionManager;
   eventHandler: BinLogEventHandler;
   sourceTables: TablePattern[];
   serverId: number;
-  startPosition: common.BinLogPosition;
+  startGTID: common.ReplicatedGTID;
   logger?: Logger;
+  keepAliveInactivitySeconds?: number;
 }
 
 /**
/**
@@ -85,16 +91,19 @@ export class BinLogListener {
   private connectionManager: MySQLConnectionManager;
   private eventHandler: BinLogEventHandler;
   private binLogPosition: common.BinLogPosition;
-  private currentGTID: common.ReplicatedGTID | null;
+  private currentGTID: common.ReplicatedGTID;
   private logger: Logger;
   private listenerError: Error | null;
   private databaseFilter: { [schema: string]: (table: string) => boolean };
 
+  private isStopped: boolean = false;
+  private isStopping: boolean = false;
+
+  // Flag to indicate if are currently in a transaction that involves multiple row mutation events.
+  private isTransactionOpen = false;
   zongji: ZongJi;
   processingQueue: async.QueueObject<BinLogEvent>;
 
-  isStopped: boolean = false;
-  isStopping: boolean = false;
   /**
    * The combined size in bytes of all the binlog events currently in the processing queue.
    */
@@ -104,8 +113,8 @@ export class BinLogListener {
     this.logger = options.logger ?? defaultLogger;
     this.connectionManager = options.connectionManager;
     this.eventHandler = options.eventHandler;
-    this.binLogPosition = options.startPosition;
-    this.currentGTID = null;
+    this.binLogPosition = options.startGTID.position;
+    this.currentGTID = options.startGTID;
     this.sqlParser = new Parser();
     this.processingQueue = this.createProcessingQueue();
     this.zongji = this.createZongjiListener();
@@ -130,14 +139,13 @@ export class BinLogListener {
       `${isRestart ? 'Restarting' : 'Starting'} BinLog Listener with replica client id:${this.options.serverId}...`
     );
 
-    // Set a heartbeat interval for the Zongji replication connection
-    // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown
-    // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket.
+    // Set a heartbeat interval for the Zongji replication connection, these events are enough to keep the connection
+    // alive for setTimeout to work on the socket.
     // The heartbeat needs to be set before starting the listener, since the replication connection is locked once replicating
     await new Promise((resolve, reject) => {
       this.zongji.connection.query(
         // In nanoseconds, 10^9 = 1s
-        'set @master_heartbeat_period=28*1000000000',
+        `set @master_heartbeat_period=${this.options.keepAliveInactivitySeconds ?? KEEPALIVE_INACTIVITY_THRESHOLD}*1000000000`,
         (error: any, results: any, _fields: any) => {
           if (error) {
             reject(error);
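
The hunk above replaces the hard-coded 28 second heartbeat period with a configurable value that falls back to `KEEPALIVE_INACTIVITY_THRESHOLD`. A minimal sketch of how that query string is assembled, pulled out into a standalone helper for illustration; `heartbeatPeriodQuery` is a hypothetical name and is not part of the commit:

```typescript
// Default taken from the constant added in this commit (seconds of inactivity).
const KEEPALIVE_INACTIVITY_THRESHOLD = 30;

// Builds the session statement that sets the heartbeat period.
// The multiplication by 10^9 is left for the server to evaluate, as in the diff,
// since the period is expressed in nanoseconds.
function heartbeatPeriodQuery(keepAliveInactivitySeconds?: number): string {
  // `??` only falls back on null or undefined, so an explicit 0 is kept as-is.
  const seconds = keepAliveInactivitySeconds ?? KEEPALIVE_INACTIVITY_THRESHOLD;
  return `set @master_heartbeat_period=${seconds}*1000000000`;
}
```

Called without an argument the helper yields the 30 second default; tests can pass a small value to receive heartbeats sooner.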
@@ -158,9 +166,19 @@ export class BinLogListener {
     });
 
     this.zongji.start({
-      // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive
-      // tablemap events always need to be included for the other row events to work
-      includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog', 'query'],
+      // Tablemap events always need to be included for the other row events to work
+      includeEvents: [
+        'tablemap',
+        'writerows',
+        'updaterows',
+        'deleterows',
+        'xid',
+        'rotate',
+        'gtidlog',
+        'query',
+        'heartbeat',
+        'heartbeat_v2'
+      ],
       includeSchema: this.databaseFilter,
       filename: this.binLogPosition.filename,
       position: this.binLogPosition.offset,
@@ -289,19 +307,24 @@ export class BinLogListener {
           this.logger.info(`Processed GTID event: ${this.currentGTID.comparable}`);
           break;
         case zongji_utils.eventIsRotation(evt):
-          const newFile = this.binLogPosition.filename !== evt.binlogName;
+          // The first event when starting replication is a synthetic Rotate event
+          // It describes the last binlog file and position that the replica client processed
           this.binLogPosition.filename = evt.binlogName;
+          this.binLogPosition.offset = evt.nextPosition !== 0 ? evt.nextPosition : evt.position;
           await this.eventHandler.onRotate();
 
+          const newFile = this.binLogPosition.filename !== evt.binlogName;
           if (newFile) {
             this.logger.info(
               `Processed Rotate event. New BinLog file is: ${this.binLogPosition.filename}:${this.binLogPosition.offset}`
             );
           }
+
           break;
         case zongji_utils.eventIsWriteMutation(evt):
           const tableMap = evt.tableMap[evt.tableId];
           await this.eventHandler.onWrite(evt.rows, tableMap);
+          this.binLogPosition.offset = evt.nextPosition;
           this.logger.info(
             `Processed Write event for table [${tableMap.parentSchema}.${tableMap.tableName}]. ${evt.rows.length} row(s) inserted.`
           );
@@ -312,20 +335,33 @@ export class BinLogListener {
             evt.rows.map((row) => row.before),
             evt.tableMap[evt.tableId]
           );
+          this.binLogPosition.offset = evt.nextPosition;
           this.logger.info(
             `Processed Update event for table [${evt.tableMap[evt.tableId].tableName}]. ${evt.rows.length} row(s) updated.`
           );
           break;
         case zongji_utils.eventIsDeleteMutation(evt):
           await this.eventHandler.onDelete(evt.rows, evt.tableMap[evt.tableId]);
+          this.binLogPosition.offset = evt.nextPosition;
           this.logger.info(
             `Processed Delete event for table [${evt.tableMap[evt.tableId].tableName}]. ${evt.rows.length} row(s) deleted.`
           );
           break;
+        case zongji_utils.eventIsHeartbeat(evt):
+        case zongji_utils.eventIsHeartbeat_v2(evt):
+          // Heartbeats are sent by the master to keep the connection alive after a period of inactivity. They are synthetic
+          // so are not written to the binlog. Consequently, they have no effect on the binlog position.
+          // We forward these along with the current GTID to the event handler, but don't want to do this if a transaction is in progress.
+          if (!this.isTransactionOpen) {
+            await this.eventHandler.onKeepAlive(this.currentGTID.comparable);
+          }
+          this.logger.debug(`Processed Heartbeat event. Current GTID is: ${this.currentGTID.comparable}`);
+          break;
         case zongji_utils.eventIsXid(evt):
+          this.isTransactionOpen = false;
           this.binLogPosition.offset = evt.nextPosition;
           const LSN = new common.ReplicatedGTID({
-            raw_gtid: this.currentGTID!.raw,
+            raw_gtid: this.currentGTID.raw,
             position: this.binLogPosition
           }).comparable;
           await this.eventHandler.onCommit(LSN);
@@ -336,43 +372,44 @@ export class BinLogListener {
           break;
       }
 
-      // Update the binlog position after processing the event
-      this.binLogPosition.offset = evt.nextPosition;
       this.queueMemoryUsage -= evt.size;
     };
   }
 
   private async processQueryEvent(event: BinLogQueryEvent): Promise<void> {
     const { query, nextPosition } = event;
 
-    // BEGIN query events mark the start of a transaction before any row events. They are not relevant for schema changes
+    // BEGIN query events mark the start of a transaction before any row events. They are not schema changes so no further parsing is necessary.
     if (query === 'BEGIN') {
+      this.isTransactionOpen = true;
       return;
     }
 
     const schemaChanges = this.toSchemaChanges(query, event.schema);
     if (schemaChanges.length > 0) {
-      // Since handling the schema changes can take a long time, we need to stop the Zongji listener instead of pausing it.
+      // Handling schema changes can take a long time, so we stop the Zongji listener whilst handling them to prevent the listener from timing out.
       await this.stopZongji();
 
       for (const change of schemaChanges) {
         this.logger.info(`Processing schema change ${change.type} for table [${change.schema}.${change.table}]`);
         await this.eventHandler.onSchemaChange(change);
       }
 
-      // DDL queries are auto commited, but do not come with a corresponding Xid event.
-      // This is problematic for DDL queries which result in row events because the checkpoint is not moved on,
-      // so we manually commit here.
-      this.binLogPosition.offset = nextPosition;
-      const LSN = new common.ReplicatedGTID({
-        raw_gtid: this.currentGTID!.raw,
-        position: this.binLogPosition
-      }).comparable;
-      await this.eventHandler.onCommit(LSN);
+      // DDL queries are auto commited, but do not come with a corresponding Xid event, in those cases we trigger a manual commit if we are not already in a transaction.
+      // Some DDL queries include row events, and in those cases will include a Xid event.
+      if (!this.isTransactionOpen) {
+        this.binLogPosition.offset = nextPosition;
+        const LSN = new common.ReplicatedGTID({
+          raw_gtid: this.currentGTID.raw,
+          position: this.binLogPosition
+        }).comparable;
+        await this.eventHandler.onCommit(LSN);
+      }
 
       this.logger.info(`Successfully processed ${schemaChanges.length} schema change(s).`);
 
       // If there are still events in the processing queue, we need to process those before restarting Zongji
+      // This avoids potentially processing the same events again after a restart.
       if (!this.processingQueue.idle()) {
         this.logger.info(`Processing [${this.processingQueue.length()}] events(s) before resuming...`);
         this.processingQueue.drain(async () => {
@@ -381,6 +418,13 @@ export class BinLogListener {
       } else {
         await this.restartZongji();
       }
+    } else if (!this.isTransactionOpen) {
+      this.binLogPosition.offset = nextPosition;
+      const LSN = new common.ReplicatedGTID({
+        raw_gtid: this.currentGTID.raw,
+        position: this.binLogPosition
+      }).comparable;
+      await this.eventHandler.onCommit(LSN);
     }
   }
 

modules/module-mysql/src/replication/zongji/zongji-utils.ts

Lines changed: 11 additions & 1 deletion
@@ -6,7 +6,9 @@ import {
   BinLogTableMapEvent,
   BinLogRowUpdateEvent,
   BinLogXidEvent,
-  BinLogQueryEvent
+  BinLogQueryEvent,
+  BinLogHeartbeatEvent,
+  BinLogHeartbeatEvent_V2
 } from '@powersync/mysql-zongji';
 
 export function eventIsGTIDLog(event: BinLogEvent): event is BinLogGTIDLogEvent {
@@ -21,6 +23,14 @@ export function eventIsXid(event: BinLogEvent): event is BinLogXidEvent {
   return event.getEventName() == 'xid';
 }
 
+export function eventIsHeartbeat(event: BinLogEvent): event is BinLogHeartbeatEvent {
+  return event.getEventName() == 'heartbeat';
+}
+
+export function eventIsHeartbeat_v2(event: BinLogEvent): event is BinLogHeartbeatEvent_V2 {
+  return event.getEventName() == 'heartbeat_v2';
+}
+
 export function eventIsRotation(event: BinLogEvent): event is BinLogRotationEvent {
   return event.getEventName() == 'rotate';
 }
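
The helpers added to zongji-utils.ts are TypeScript type predicates keyed on `getEventName()`. A self-contained sketch of the same pattern follows, assuming a minimal event shape; `SketchBinLogEvent`, `SketchHeartbeatEvent`, `isAnyHeartbeat`, and `makeEvent` are hypothetical names for illustration and do not exist in `@powersync/mysql-zongji`.

```typescript
// Hypothetical minimal event shape. The real BinLogEvent type comes from @powersync/mysql-zongji.
interface SketchBinLogEvent {
  getEventName(): string;
}

interface SketchHeartbeatEvent extends SketchBinLogEvent {
  // Marker field used only in this sketch so the narrowed type is distinguishable.
  readonly isSynthetic?: true;
}

// Type predicate: narrows the event type when either heartbeat name matches.
function isAnyHeartbeat(event: SketchBinLogEvent): event is SketchHeartbeatEvent {
  const name = event.getEventName();
  return name === 'heartbeat' || name === 'heartbeat_v2';
}

// Small factory for building test events with a fixed name.
function makeEvent(name: string): SketchBinLogEvent {
  return { getEventName: () => name };
}
```

The commit keeps two separate guards so each narrows to its own event type; the combined guard here only shows the predicate pattern.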
