From 810316a7fa658356836b35a82915150d2a208460 Mon Sep 17 00:00:00 2001 From: Arunprasad Rajkumar Date: Fri, 3 Nov 2023 13:28:06 +0000 Subject: [PATCH] Remove the usage of txn metadata file This commit removes the usage of transaction metadata file. Initially, it was used by the apply process to bypass transactions that were already applied. However, this approach had its challenges. Specifically, in live replay mode, a transaction with numerous statements could fill the UNIX PIPE (an IPC primitive used in replay mode), leading to a potential deadlock. This is because the apply process would be waiting for the transaction metadata file. By eliminating the transaction metadata file, the apply process lets the transaction proceed and decides whether to apply or skip it based on the commit LSN during the commit phase. Signed-off-by: Arunprasad Rajkumar --- src/bin/pgcopydb/ld_apply.c | 274 +++++++++++++------------------- src/bin/pgcopydb/ld_stream.h | 10 +- src/bin/pgcopydb/ld_transform.c | 94 ++--------- 3 files changed, 129 insertions(+), 249 deletions(-) diff --git a/src/bin/pgcopydb/ld_apply.c b/src/bin/pgcopydb/ld_apply.c index 5a41e3050..68f8b5bf6 100644 --- a/src/bin/pgcopydb/ld_apply.c +++ b/src/bin/pgcopydb/ld_apply.c @@ -650,16 +650,31 @@ stream_apply_sql(StreamApplyContext *context, case STREAM_ACTION_SWITCH: { log_debug("SWITCH from %X/%X to %X/%X", - LSN_FORMAT_ARGS(context->previousLSN), + LSN_FORMAT_ARGS(context->switchLSN), LSN_FORMAT_ARGS(metadata->lsn)); - context->previousLSN = metadata->lsn; + context->switchLSN = metadata->lsn; break; } case STREAM_ACTION_BEGIN: { + /* + * Abort the previous transaction, it might be due to + * restart or resuming from a previous run. 
+ */ + if (context->transactionInProgress) + { + if (!pgsql_execute(pgsql, "ROLLBACK")) + { + /* errors have already been logged */ + return false; + } + + context->transactionInProgress = false; + } + if (metadata->lsn == InvalidXLogRecPtr || IS_EMPTY_STRING_BUFFER(metadata->timestamp)) { @@ -667,41 +682,49 @@ stream_apply_sql(StreamApplyContext *context, return false; } + /* + * Sometimes BEGIN won't have a txnCommitLSN, when the txn is + * spread across multiple WAL segments. We call such a txn + * a continuedTxn and allow it to be replayed until we encounter + * a COMMIT message. + * + * The LSN of the COMMIT message determines whether to keep the txn or + * abort it. + */ + context->continuedTxn = metadata->txnCommitLSN == InvalidXLogRecPtr; + /* did we reach the starting LSN positions now? */ if (!context->reachedStartPos) { /* * compare previousLSN with COMMIT LSN to safely include - * complete transactions while skipping already applied changes. + * complete transactions while skipping already applied + * changes. * - * this is particularly useful at the beginnig where BEGIN LSN - * of some transactions could be less than `consistent_point`, - * but COMMIT LSN of those transactions is guaranteed to be - * greater. + * this is particularly useful at the beginning where + * BEGIN LSN of some transactions could be less than + * `consistent_point`, but COMMIT LSN of those transactions + * is guaranteed to be greater. * - * in case of interruption and this is the first transaction to - * be applied, previousLSN should be equal to the last - * transaction's COMMIT LSN or the LSN of non-transaction - * action. Therefore, this condition will still hold true. + * in case of interruption and this is the first + * transaction to be applied, previousLSN should be equal + * to the last transaction's COMMIT LSN or the LSN of + * non-transaction action. Therefore, this condition will + * still hold true. 
*/ - - if (!readTxnCommitLSN(context, metadata)) - { - /* errors have already been logged */ - return false; - } - context->reachedStartPos = context->previousLSN < metadata->txnCommitLSN; } + bool skip = !context->reachedStartPos && !context->continuedTxn; + log_debug("BEGIN %lld LSN %X/%X @%s, previous LSN %X/%X, COMMIT LSN %X/%X %s", (long long) metadata->xid, LSN_FORMAT_ARGS(metadata->lsn), metadata->timestamp, LSN_FORMAT_ARGS(context->previousLSN), LSN_FORMAT_ARGS(metadata->txnCommitLSN), - context->reachedStartPos ? "" : "[skipping]"); + skip ? "[skipping]" : ""); /* * Check if we reached the endpos LSN already. @@ -719,7 +742,7 @@ stream_apply_sql(StreamApplyContext *context, } /* actually skip this one if we didn't reach start pos yet */ - if (!context->reachedStartPos) + if (skip) { return true; } @@ -743,6 +766,7 @@ stream_apply_sql(StreamApplyContext *context, */ bool commitLSNreachesEndPos = context->endpos != InvalidXLogRecPtr && + !context->continuedTxn && context->endpos <= metadata->txnCommitLSN; GUC *settings = @@ -772,8 +796,34 @@ stream_apply_sql(StreamApplyContext *context, case STREAM_ACTION_COMMIT: { + context->reachedStartPos = context->previousLSN < metadata->lsn; + if (!context->reachedStartPos) { + /* + * Abort if we are not yet reachedStartPos and txn is a + * continuedTxn. 
+ */ + if (context->continuedTxn) + { + log_notice("Skip(abort) applied transaction %lld LSN %X/%X " + "@%s, previous LSN %X/%X", + (long long) metadata->xid, + LSN_FORMAT_ARGS(metadata->lsn), + metadata->timestamp, + LSN_FORMAT_ARGS(context->previousLSN)); + + /* Rollback the transaction */ + if (!pgsql_execute(pgsql, "ROLLBACK")) + { + /* errors have already been logged */ + return false; + } + /* Reset the transactionInProgress after abort */ + context->transactionInProgress = false; + context->continuedTxn = false; + } + return true; } @@ -842,7 +892,7 @@ stream_apply_sql(StreamApplyContext *context, case STREAM_ACTION_ENDPOS: { - if (!context->reachedStartPos) + if (!context->reachedStartPos && !context->continuedTxn) { return true; } @@ -851,14 +901,24 @@ stream_apply_sql(StreamApplyContext *context, LSN_FORMAT_ARGS(metadata->lsn), LSN_FORMAT_ARGS(context->previousLSN)); - context->previousLSN = metadata->lsn; + /* + * Don't update previousLSN if we are in a continuedTxn. + * + * Otherwise, during resume a continued txn having an endpos + * will update the previousLSN to endpos's LSN causing the + * current transaction to be applied again. + */ + if (!context->continuedTxn) + { + context->previousLSN = metadata->lsn; + } /* * It could be the current endpos, or the endpos of a previous * run. */ if (context->endpos != InvalidXLogRecPtr && - context->endpos <= context->previousLSN) + context->endpos <= metadata->lsn) { context->reachedEndPos = true; @@ -891,7 +951,7 @@ stream_apply_sql(StreamApplyContext *context, case STREAM_ACTION_KEEPALIVE: { /* did we reach the starting LSN positions now? 
*/ - if (!context->reachedStartPos) + if (!context->reachedStartPos && !context->continuedTxn) { context->reachedStartPos = context->previousLSN < metadata->lsn; @@ -1006,7 +1066,11 @@ stream_apply_sql(StreamApplyContext *context, case STREAM_ACTION_UPDATE: case STREAM_ACTION_DELETE: { - if (!context->reachedStartPos) + /* + * We still allow continuedTxn, COMMIT message determines whether + * to keep the transaction or abort it. + */ + if (!context->reachedStartPos && !context->continuedTxn) { return true; } @@ -1045,7 +1109,11 @@ stream_apply_sql(StreamApplyContext *context, case STREAM_ACTION_EXECUTE: { - if (!context->reachedStartPos) + /* + * We still allow continuedTxn, COMMIT message determines whether + * to keep the transaction or abort it. + */ + if (!context->reachedStartPos && !context->continuedTxn) { return true; } @@ -1109,7 +1177,11 @@ stream_apply_sql(StreamApplyContext *context, case STREAM_ACTION_TRUNCATE: { - if (!context->reachedStartPos) + /* + * We still allow continuedTxn, COMMIT message determines whether + * to keep the transaction or abort it. + */ + if (!context->reachedStartPos && !context->continuedTxn) { return true; } @@ -1290,6 +1362,7 @@ stream_apply_init_context(StreamApplyContext *context, } context->reachedStartPos = false; + context->continuedTxn = false; context->reachedEOF = false; context->connStrings = connStrings; @@ -1309,15 +1382,26 @@ computeSQLFileName(StreamApplyContext *context) { XLogSegNo segno; + uint64_t switchLSN = context->switchLSN; + + /* + * If we haven't switched WAL yet, then we're still at the previousLSN + * position. 
+ */ + if (switchLSN == InvalidXLogRecPtr) + { + switchLSN = context->previousLSN; + } + if (context->WalSegSz == 0) { log_error("Failed to compute the SQL filename for LSN %X/%X " "without context->wal_segment_size", - LSN_FORMAT_ARGS(context->previousLSN)); + LSN_FORMAT_ARGS(switchLSN)); return false; } - XLByteToSeg(context->previousLSN, segno, context->WalSegSz); + XLByteToSeg(switchLSN, segno, context->WalSegSz); XLogFileName(context->wal, context->system.timeline, segno, context->WalSegSz); sformat(context->sqlFileName, sizeof(context->sqlFileName), @@ -1326,7 +1410,7 @@ computeSQLFileName(StreamApplyContext *context) context->wal); log_debug("computeSQLFileName: %X/%X \"%s\"", - LSN_FORMAT_ARGS(context->previousLSN), + LSN_FORMAT_ARGS(switchLSN), context->sqlFileName); return true; @@ -1510,136 +1594,6 @@ parseSQLAction(const char *query, LogicalMessageMetadata *metadata) } -/* - * readTxnCommitLSN ensures metadata has transaction COMMIT LSN by fetching it - * from metadata file if it is not present - */ -bool -readTxnCommitLSN(StreamApplyContext *context, - LogicalMessageMetadata *metadata) -{ - /* if txnCommitLSN is invalid, then fetch it from txn metadata file */ - if (metadata->txnCommitLSN != InvalidXLogRecPtr) - { - return true; - } - - char txnfilename[MAXPGPATH] = { 0 }; - - if (!computeTxnMetadataFilename(metadata->xid, - context->paths.dir, - txnfilename)) - { - /* errors have already been logged */ - return false; - } - - log_debug("stream_apply_sql: BEGIN message without a commit LSN, " - "fetching commit LSN from transaction metadata file \"%s\"", - txnfilename); - - LogicalMessageMetadata txnMetadata = { .xid = metadata->xid }; - - if (!parseTxnMetadataFile(txnfilename, &txnMetadata)) - { - /* errors have already been logged */ - return false; - } - - metadata->txnCommitLSN = txnMetadata.txnCommitLSN; - - return true; -} - - -/* - * parseTxnMetadataFile returns the transaction metadata content for the given - * metadata filename. 
- */ -bool -parseTxnMetadataFile(const char *filename, LogicalMessageMetadata *metadata) -{ - /* store xid as it will be overwritten while parsing metadata */ - uint32_t xid = metadata->xid; - - if (xid == 0) - { - log_error("BUG: parseTxnMetadataFile is called with " - "transaction xid: %lld", (long long) xid); - return false; - } - - /* - * Read the transaction metadata file created by the transform process for - * transactions spanning multiple WAL files. The metadata json file is - * generated upon encountering the COMMIT statement, but it may take some - * time to become available for transformation. Therefore, we retry here. - */ - - ConnectionRetryPolicy retryPolicy = { 0 }; - - int maxT = 900; /* 15 mins */ - int maxSleepTime = 3000; /* 2s */ - int baseSleepTime = 100; /* 100ms */ - - (void) pgsql_set_retry_policy(&retryPolicy, - maxT, - -1, /* unbounded number of attempts */ - maxSleepTime, - baseSleepTime); - - while (!pgsql_retry_policy_expired(&retryPolicy)) - { - if (file_exists(filename)) - { - break; - } - - int sleepTimeMs = - pgsql_compute_connection_retry_sleep_time(&retryPolicy); - - log_debug("parseTxnMetadataFile: waiting for transaction metadata " - "file %s to be created, retrying in %dms", - filename, sleepTimeMs); - - /* we have milliseconds, pg_usleep() wants microseconds */ - (void) pg_usleep(sleepTimeMs * 1000); - } - - char *txnMetadataContent = NULL; - long size = 0L; - - /* we don't want to retry anymore, error out if files still don't exist */ - if (!read_file(filename, &txnMetadataContent, &size)) - { - /* errors have already been logged */ - return false; - } - - JSON_Value *json = json_parse_string(txnMetadataContent); - - if (!parseMessageMetadata(metadata, txnMetadataContent, json, true)) - { - /* errors have already been logged */ - json_value_free(json); - return false; - } - - json_value_free(json); - - if (metadata->txnCommitLSN == InvalidXLogRecPtr || - metadata->xid != xid || - IS_EMPTY_STRING_BUFFER(metadata->timestamp)) 
- { - log_error("Failed to parse metadata for transaction metadata file " - "%s: %s", filename, txnMetadataContent); - return false; - } - - return true; -} - - /* * stream_apply_track_insert_lsn tracks the current pg_current_wal_insert_lsn() * location on the target system right after a COMMIT; of a transaction that diff --git a/src/bin/pgcopydb/ld_stream.h b/src/bin/pgcopydb/ld_stream.h index c5987996c..891dc2641 100644 --- a/src/bin/pgcopydb/ld_stream.h +++ b/src/bin/pgcopydb/ld_stream.h @@ -368,6 +368,7 @@ typedef struct StreamApplyContext uint32_t WalSegSz; /* information about source database */ uint64_t previousLSN; /* register COMMIT LSN progress */ + uint64_t switchLSN; /* LSN of the last WAL SWITCH, used to compute the SQL file name */ LSNTracking *lsnTrackingList; @@ -377,6 +378,7 @@ typedef struct StreamApplyContext uint64_t replay_lsn; /* from the pgcopydb sentinel */ bool reachedStartPos; + bool continuedTxn; bool reachedEndPos; bool reachedEOF; bool transactionInProgress; @@ -610,11 +612,6 @@ bool parseMessage(StreamContext *privateContext, char *message, JSON_Value *json bool streamLogicalTransactionAppendStatement(LogicalTransaction *txn, LogicalTransactionStatement *stmt); -bool computeTxnMetadataFilename(uint32_t xid, - const char *dir, - char *filename); -bool writeTxnMetadataFile(LogicalTransaction *txn, const char *dir); - void FreeLogicalMessage(LogicalMessage *msg); void FreeLogicalTransaction(LogicalTransaction *tx); void FreeLogicalMessageTupleArray(LogicalMessageTupleArray *tupleArray); @@ -670,9 +667,6 @@ bool setupReplicationOrigin(StreamApplyContext *context, bool logSQL); bool computeSQLFileName(StreamApplyContext *context); bool parseSQLAction(const char *query, LogicalMessageMetadata *metadata); -bool readTxnCommitLSN(StreamApplyContext *context, - LogicalMessageMetadata *metadata); -bool parseTxnMetadataFile(const char *filename, LogicalMessageMetadata *metadata); bool stream_apply_track_insert_lsn(StreamApplyContext *context, uint64_t sourceLSN); diff --git 
a/src/bin/pgcopydb/ld_transform.c b/src/bin/pgcopydb/ld_transform.c index 7f4ab88b4..c86ccb272 100644 --- a/src/bin/pgcopydb/ld_transform.c +++ b/src/bin/pgcopydb/ld_transform.c @@ -377,18 +377,6 @@ stream_transform_write_message(StreamContext *privateContext, return false; } - /* - * If we're in a continued transaction, it means that the earlier write - * of this txn's BEGIN statement didn't have the COMMIT LSN. Therefore, - * we need to maintain that LSN as a separate metadata file. This is - * necessary because the COMMIT LSN is required later in the apply - * process. - */ - if (txn->continued && txn->commit) - { - writeTxnMetadataFile(txn, privateContext->paths.dir); - } - (void) FreeLogicalMessage(currentMsg); if (metadata->action == STREAM_ACTION_COMMIT) @@ -1074,9 +1062,19 @@ parseMessage(StreamContext *privateContext, char *message, JSON_Value *json) { if (mesg->isTransaction) { - log_error("Failed to parse BEGIN: " - "transaction already in progress"); - return false; + txn = &(mesg->command.tx); + + log_notice("Ignore incomplete transaction xid %lld " + "at %X/%X", + (long long) txn->xid, + LSN_FORMAT_ARGS(txn->beginLSN)); + + (void) FreeLogicalMessage(mesg); + + /* then prepare a new one, reusing the same memory area */ + LogicalMessage empty = { 0 }; + + *mesg = empty; } mesg->isTransaction = true; @@ -2583,69 +2581,3 @@ LogicalMessageValueEq(LogicalMessageValue *a, LogicalMessageValue *b) /* makes compiler happy */ return false; } - - -/* - * computeTxnMetadataFilename computes the file path for transaction metadata - * based on its transaction id - */ -bool -computeTxnMetadataFilename(uint32_t xid, const char *dir, char *filename) -{ - if (dir == NULL) - { - log_error("BUG: computeTxnMetadataFilename is called with " - "directory: NULL"); - return false; - } - - if (xid == 0) - { - log_error("BUG: computeTxnMetadataFilename is called with " - "transaction xid: %lld", (long long) xid); - return false; - } - - sformat(filename, MAXPGPATH, 
"%s/%lld.json", dir, (long long) xid); - - return true; -} - - -/* - * writeTxnMetadataFile writes the transaction metadata to a file in the given - * directory - */ -bool -writeTxnMetadataFile(LogicalTransaction *txn, const char *dir) -{ - char txnfilename[MAXPGPATH] = { 0 }; - - if (!computeTxnMetadataFilename(txn->xid, dir, txnfilename)) - { - /* errors have already been logged */ - return false; - } - - log_debug("stream_write_commit_metadata_file: writing transaction " - "metadata file \"%s\" with commit lsn %X/%X", - txnfilename, - LSN_FORMAT_ARGS(txn->commitLSN)); - - char contents[BUFSIZE] = { 0 }; - - sformat(contents, BUFSIZE, - "{\"xid\":%lld,\"commit_lsn\":\"%X/%X\",\"timestamp\":\"%s\"}\n", - (long long) txn->xid, - LSN_FORMAT_ARGS(txn->commitLSN), - txn->timestamp); - - /* write the metadata to txnfilename */ - if (!write_file(contents, strlen(contents), txnfilename)) - { - log_error("Failed to write file \"%s\"", txnfilename); - return false; - } - - return true; -}