From dcc51ad9709cf6561efc993a8e2a980e1b73f359 Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Mon, 6 Nov 2023 15:33:23 +0100 Subject: [PATCH] Process ENDPOS in transform and replay processes. When the ENDPOS internal message is received from the PIPEs in replay mode, check the current sentinel.endpos value on the source database to make sure the ENDPOS message matches with the current setting, and that being the case stop processing. --- src/bin/pgcopydb/file_utils.c | 24 ++++++++++++++--- src/bin/pgcopydb/ld_replay.c | 30 +++++++++++++++++++++ src/bin/pgcopydb/ld_transform.c | 47 ++++++++++++++++++++++++++++----- 3 files changed, 91 insertions(+), 10 deletions(-) diff --git a/src/bin/pgcopydb/file_utils.c b/src/bin/pgcopydb/file_utils.c index 1febdcc1a..892c73376 100644 --- a/src/bin/pgcopydb/file_utils.c +++ b/src/bin/pgcopydb/file_utils.c @@ -419,6 +419,7 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) { int countFdsReadyToRead, nfds; /* see man select(2) */ fd_set readFileDescriptorSet; + fd_set exceptFileDescriptorSet; context->fd = fileno(stream); nfds = context->fd + 1; @@ -442,19 +443,28 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) FD_ZERO(&readFileDescriptorSet); FD_SET(context->fd, &readFileDescriptorSet); + FD_ZERO(&exceptFileDescriptorSet); + FD_SET(context->fd, &exceptFileDescriptorSet); + countFdsReadyToRead = - select(nfds, &readFileDescriptorSet, NULL, NULL, &timeout); + select(nfds, + &readFileDescriptorSet, NULL, &exceptFileDescriptorSet, + &timeout); if (countFdsReadyToRead == -1) { + log_debug("countFdsReadyToRead == -1"); + if (errno == EINTR || errno == EAGAIN) { + log_debug("received EINTR or EAGAIN"); + if (asked_to_quit) { /* * When asked_to_stop || asked_to_stop_fast still continue * reading through EOF on the input stream, then quit - * normally. Here when + * normally. */ doneReading = true; } @@ -469,6 +479,14 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) } } + if (FD_ISSET(context->fd, &exceptFileDescriptorSet)) + { + log_error("Failed to select on file descriptor %d: " + "an exceptional condition happened", + context->fd); + return false; + } + /* * When asked_to_stop || asked_to_stop_fast still continue reading * through EOF on the input stream, then quit normally. Here when @@ -480,7 +498,7 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) if (asked_to_quit) { doneReading = true; - log_notice("read_from_stream was asked to stop or quit"); + log_notice("read_from_stream was asked to quit"); } continue; diff --git a/src/bin/pgcopydb/ld_replay.c b/src/bin/pgcopydb/ld_replay.c index 988417983..cf30737b9 100644 --- a/src/bin/pgcopydb/ld_replay.c +++ b/src/bin/pgcopydb/ld_replay.c @@ -217,6 +217,36 @@ stream_replay_line(void *ctx, const char *line, bool *stop) break; } + case STREAM_ACTION_ENDPOS: + { + PGSQL src = { 0 }; + char *dsn = context->connStrings->source_pguri; + + if (!pgsql_init(&src, dsn, PGSQL_CONN_SOURCE)) + { + /* errors have already been logged */ + return false; + } + + CopyDBSentinel sentinel = { 0 }; + + if (!pgsql_get_sentinel(&src, &sentinel)) + { + /* errors have already been logged */ + return false; + } + + if (sentinel.endpos <= metadata.lsn) + { + *stop = true; + context->reachedEndPos = true; + + log_info("Replay reached ENDPOS %X/%X", + LSN_FORMAT_ARGS(metadata.lsn)); + } + break; + } + /* skip reporting progress in other cases */ case STREAM_ACTION_BEGIN: case STREAM_ACTION_INSERT: diff --git a/src/bin/pgcopydb/ld_transform.c b/src/bin/pgcopydb/ld_transform.c index 2008a3fbf..a1c51110f 100644 --- a/src/bin/pgcopydb/ld_transform.c +++ b/src/bin/pgcopydb/ld_transform.c @@ -311,6 +311,34 @@ stream_transform_line(void *ctx, const char *line, bool *stop) return false; } } + /* at ENDPOS check that it's the current sentinel value and exit */ + else if (metadata->action == STREAM_ACTION_ENDPOS) + { + PGSQL src = { 0 }; + char *dsn = privateContext->connStrings->source_pguri; + + if (!pgsql_init(&src, dsn, PGSQL_CONN_SOURCE)) + { + /* errors have already been logged */ + return false; + } + + CopyDBSentinel sentinel = { 0 }; + + if (!pgsql_get_sentinel(&src, &sentinel)) + { + /* errors have already been logged */ + return false; + } + + if (sentinel.endpos <= metadata->lsn) + { + *stop = true; + + log_info("Transform process reached ENDPOS %X/%X", + LSN_FORMAT_ARGS(metadata->lsn)); + } + } if (privateContext->endpos != InvalidXLogRecPtr && privateContext->endpos <= metadata->lsn) @@ -1255,8 +1283,11 @@ coalesceLogicalTransactionStatement(LogicalTransaction *txn, { LogicalTransactionStatement *last = txn->last; - LogicalMessageValuesArray *lastValuesArray = &(last->stmt.insert.new.array->values); - LogicalMessageValuesArray *newValuesArray = &(new->stmt.insert.new.array->values); + LogicalMessageValuesArray *lastValuesArray = + &(last->stmt.insert.new.array->values); + + LogicalMessageValuesArray *newValuesArray = + &(new->stmt.insert.new.array->values); int capacity = lastValuesArray->capacity; LogicalMessageValues *array = lastValuesArray->array; @@ -1275,8 +1306,9 @@ coalesceLogicalTransactionStatement(LogicalTransaction *txn, * and potential heap memory fragmentation. */ capacity *= 2; - array = (LogicalMessageValues *) realloc(array, sizeof(LogicalMessageValues) * - capacity); + array = (LogicalMessageValues *) + realloc(array, sizeof(LogicalMessageValues) * capacity); + if (array == NULL) { log_error(ALLOCATION_FAILED_ERROR); @@ -1318,7 +1350,8 @@ canCoalesceLogicalTransactionStatement(LogicalTransaction *txn, LogicalTransactionStatement *last = txn->last; /* TODO: Support UPDATE and DELETE */ - if (last->action != STREAM_ACTION_INSERT || new->action != STREAM_ACTION_INSERT) + if (last->action != STREAM_ACTION_INSERT || + new->action != STREAM_ACTION_INSERT) { return false; } @@ -1327,8 +1360,8 @@ canCoalesceLogicalTransactionStatement(LogicalTransaction *txn, LogicalMessageInsert *newInsert = &new->stmt.insert; /* Last and current statements must target same relation */ - if (!streq(lastInsert->nspname, newInsert->nspname) || !streq(lastInsert->relname, - newInsert->relname)) + if (!streq(lastInsert->nspname, newInsert->nspname) || + !streq(lastInsert->relname, newInsert->relname)) { return false; }