From 927c29d91dcec357dd3ca53031c7302b0f65a0cc Mon Sep 17 00:00:00 2001 From: Dimitri Fontaine Date: Mon, 30 Oct 2023 14:39:37 +0100 Subject: [PATCH] Update pgcopydb sentinel in the main follow process. To avoid infinite looping when endpos has been reached it's important to update our endpos value to the sentinel's one even in the main follow process. In passing, review some error messages. Also refrain from stopping early from reading data from a PIPE when a terminating signal is received, we should finish reading as per the comments in the code. --- src/bin/pgcopydb/file_utils.c | 9 ++++-- src/bin/pgcopydb/follow.c | 53 +++++++++++++++++++++++++++-------- src/bin/pgcopydb/ld_apply.c | 7 +++-- src/bin/pgcopydb/ld_stream.h | 4 ++- 4 files changed, 56 insertions(+), 17 deletions(-) diff --git a/src/bin/pgcopydb/file_utils.c b/src/bin/pgcopydb/file_utils.c index 055c31930..1febdcc1a 100644 --- a/src/bin/pgcopydb/file_utils.c +++ b/src/bin/pgcopydb/file_utils.c @@ -449,8 +449,13 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) { if (errno == EINTR || errno == EAGAIN) { - if (asked_to_stop || asked_to_stop_fast || asked_to_quit) + if (asked_to_quit) { + /* + * When asked_to_stop || asked_to_stop_fast still continue + * reading through EOF on the input stream, then quit + * normally. Here when + */ doneReading = true; } @@ -472,7 +477,7 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) */ if (countFdsReadyToRead == 0) { - if (asked_to_stop || asked_to_stop_fast || asked_to_quit) + if (asked_to_quit) { doneReading = true; log_notice("read_from_stream was asked to stop or quit"); diff --git a/src/bin/pgcopydb/follow.c b/src/bin/pgcopydb/follow.c index e3e6d0107..05f13d573 100644 --- a/src/bin/pgcopydb/follow.c +++ b/src/bin/pgcopydb/follow.c @@ -186,7 +186,7 @@ follow_init_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel) * values from the pgcopydb.sentinel table on the source database. */ bool -follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel) +follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel, bool verbose) { PGSQL pgsql = { 0 }; @@ -216,16 +216,19 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel) LSN_FORMAT_ARGS(sentinel->replay_lsn), LSN_FORMAT_ARGS(sentinel->endpos)); } - else if (sentinel->endpos != InvalidXLogRecPtr) + else if (verbose) { - log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X", - LSN_FORMAT_ARGS(sentinel->replay_lsn), - LSN_FORMAT_ARGS(sentinel->endpos)); - } - else if (sentinel->replay_lsn != InvalidXLogRecPtr) - { - log_info("Current sentinel replay_lsn is %X/%X", - LSN_FORMAT_ARGS(sentinel->replay_lsn)); + if (sentinel->endpos != InvalidXLogRecPtr) + { + log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X", + LSN_FORMAT_ARGS(sentinel->replay_lsn), + LSN_FORMAT_ARGS(sentinel->endpos)); + } + else if (sentinel->replay_lsn != InvalidXLogRecPtr) + { + log_info("Current sentinel replay_lsn is %X/%X", + LSN_FORMAT_ARGS(sentinel->replay_lsn)); + } } return true; @@ -375,9 +378,10 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs) bool follow_reached_endpos(StreamSpecs *streamSpecs, bool *done) { + bool verbose = true; CopyDBSentinel *sentinel = &(streamSpecs->sentinel); - if (!follow_get_sentinel(streamSpecs, sentinel)) + if (!follow_get_sentinel(streamSpecs, sentinel, verbose)) { log_error("Failed to get sentinel values"); return false; @@ -967,6 +971,26 @@ follow_wait_subprocesses(StreamSpecs *specs) if (processArray[i]->returnCode != 0 || specs->endpos == InvalidXLogRecPtr) { + char endposStatus[BUFSIZE] = { 0 }; + + if (specs->endpos == InvalidXLogRecPtr) + { + sformat(endposStatus, sizeof(endposStatus), "unset"); + } + else + { + sformat(endposStatus, sizeof(endposStatus), + "set to %X/%X", + LSN_FORMAT_ARGS(specs->endpos)); + } + + log_notice("Process %s has exited with return code %d, " + "and endpos is %s: " + "terminating other processes", + processArray[i]->name, + processArray[i]->returnCode, + endposStatus); + if (!follow_terminate_subprocesses(specs)) { log_error("Failed to terminate other subprocesses, " @@ -979,6 +1003,13 @@ follow_wait_subprocesses(StreamSpecs *specs) } } + /* update current sentinel values (endpos) */ + if (!follow_get_sentinel(specs, &(specs->sentinel), false)) + { + /* ignore errors here, supervisor can't quit before sub-processes */ + log_warn("Failed to get sentinel values"); + } + /* avoid busy looping, wait for 150ms before checking again */ pg_usleep(150 * 1000); } diff --git a/src/bin/pgcopydb/ld_apply.c b/src/bin/pgcopydb/ld_apply.c index 5a41e3050..473ab0415 100644 --- a/src/bin/pgcopydb/ld_apply.c +++ b/src/bin/pgcopydb/ld_apply.c @@ -790,7 +790,8 @@ stream_apply_sql(StreamApplyContext *context, lsn, metadata->timestamp)) { - /* errors have already been logged */ + log_error("Failed to setup apply transaction, " + "see above for details"); return false; } @@ -1806,8 +1807,8 @@ stream_apply_read_lsn_tracking(StreamApplyContext *context) /* it's okay if the file does not exists, just skip the operation */ if (!file_exists(filename)) { - log_warn("Failed to parse JSON file \"%s\": file does not exists", - filename); + log_notice("Failed to parse JSON file \"%s\": file does not exists", + filename); return true; } diff --git a/src/bin/pgcopydb/ld_stream.h b/src/bin/pgcopydb/ld_stream.h index c5987996c..067540b09 100644 --- a/src/bin/pgcopydb/ld_stream.h +++ b/src/bin/pgcopydb/ld_stream.h @@ -697,7 +697,9 @@ bool follow_setup_databases(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs); bool follow_reset_sequences(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs); bool follow_init_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel); -bool follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel); +bool follow_get_sentinel(StreamSpecs *specs, + CopyDBSentinel *sentinel, + bool verbose); bool follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs); bool followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);