Skip to content

Commit

Permalink
Update pgcopydb sentinel in the main follow process.
Browse files Browse the repository at this point in the history
To avoid infinite looping when endpos has been reached it's important to
update our endpos value to the sentinel's one even in the main follow
process.

In passing, review some error messages.

Also refrain from stopping early from reading data from a PIPE when a
terminating signal is received, we should finish reading as per the comments
in the code.
  • Loading branch information
dimitri committed Oct 30, 2023
1 parent 0557802 commit 927c29d
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 17 deletions.
9 changes: 7 additions & 2 deletions src/bin/pgcopydb/file_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,13 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
{
if (errno == EINTR || errno == EAGAIN)
{
if (asked_to_stop || asked_to_stop_fast || asked_to_quit)
if (asked_to_quit)
{
/*
* When asked_to_stop || asked_to_stop_fast still continue
* reading through EOF on the input stream, then quit
* normally. Here when
*/
doneReading = true;
}

Expand All @@ -472,7 +477,7 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
*/
if (countFdsReadyToRead == 0)
{
if (asked_to_stop || asked_to_stop_fast || asked_to_quit)
if (asked_to_quit)
{
doneReading = true;
log_notice("read_from_stream was asked to stop or quit");
Expand Down
53 changes: 42 additions & 11 deletions src/bin/pgcopydb/follow.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ follow_init_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
* values from the pgcopydb.sentinel table on the source database.
*/
bool
follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel, bool verbose)
{
PGSQL pgsql = { 0 };

Expand Down Expand Up @@ -216,16 +216,19 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
LSN_FORMAT_ARGS(sentinel->replay_lsn),
LSN_FORMAT_ARGS(sentinel->endpos));
}
else if (sentinel->endpos != InvalidXLogRecPtr)
else if (verbose)
{
log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X",
LSN_FORMAT_ARGS(sentinel->replay_lsn),
LSN_FORMAT_ARGS(sentinel->endpos));
}
else if (sentinel->replay_lsn != InvalidXLogRecPtr)
{
log_info("Current sentinel replay_lsn is %X/%X",
LSN_FORMAT_ARGS(sentinel->replay_lsn));
if (sentinel->endpos != InvalidXLogRecPtr)
{
log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X",
LSN_FORMAT_ARGS(sentinel->replay_lsn),
LSN_FORMAT_ARGS(sentinel->endpos));
}
else if (sentinel->replay_lsn != InvalidXLogRecPtr)
{
log_info("Current sentinel replay_lsn is %X/%X",
LSN_FORMAT_ARGS(sentinel->replay_lsn));
}
}

return true;
Expand Down Expand Up @@ -375,9 +378,10 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
bool
follow_reached_endpos(StreamSpecs *streamSpecs, bool *done)
{
bool verbose = true;
CopyDBSentinel *sentinel = &(streamSpecs->sentinel);

if (!follow_get_sentinel(streamSpecs, sentinel))
if (!follow_get_sentinel(streamSpecs, sentinel, verbose))
{
log_error("Failed to get sentinel values");
return false;
Expand Down Expand Up @@ -967,6 +971,26 @@ follow_wait_subprocesses(StreamSpecs *specs)
if (processArray[i]->returnCode != 0 ||
specs->endpos == InvalidXLogRecPtr)
{
char endposStatus[BUFSIZE] = { 0 };

if (specs->endpos == InvalidXLogRecPtr)
{
sformat(endposStatus, sizeof(endposStatus), "unset");
}
else
{
sformat(endposStatus, sizeof(endposStatus),
"set to %X/%X",
LSN_FORMAT_ARGS(specs->endpos));
}

log_notice("Process %s has exited with return code %d, "
"and endpos is %s: "
"terminating other processes",
processArray[i]->name,
processArray[i]->returnCode,
endposStatus);

if (!follow_terminate_subprocesses(specs))
{
log_error("Failed to terminate other subprocesses, "
Expand All @@ -979,6 +1003,13 @@ follow_wait_subprocesses(StreamSpecs *specs)
}
}

/* update current sentinel values (endpos) */
if (!follow_get_sentinel(specs, &(specs->sentinel), false))
{
/* ignore errors here, supervisor can't quit before sub-processes */
log_warn("Failed to get sentinel values");
}

/* avoid busy looping, wait for 150ms before checking again */
pg_usleep(150 * 1000);
}
Expand Down
7 changes: 4 additions & 3 deletions src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,8 @@ stream_apply_sql(StreamApplyContext *context,
lsn,
metadata->timestamp))
{
/* errors have already been logged */
log_error("Failed to setup apply transaction, "
"see above for details");
return false;
}

Expand Down Expand Up @@ -1806,8 +1807,8 @@ stream_apply_read_lsn_tracking(StreamApplyContext *context)
/* it's okay if the file does not exists, just skip the operation */
if (!file_exists(filename))
{
log_warn("Failed to parse JSON file \"%s\": file does not exists",
filename);
log_notice("Failed to parse JSON file \"%s\": file does not exists",
filename);
return true;
}

Expand Down
4 changes: 3 additions & 1 deletion src/bin/pgcopydb/ld_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,9 @@ bool follow_setup_databases(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);
bool follow_reset_sequences(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);

bool follow_init_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel);
bool follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel);
bool follow_get_sentinel(StreamSpecs *specs,
CopyDBSentinel *sentinel,
bool verbose);
bool follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);

bool followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);
Expand Down

0 comments on commit 927c29d

Please sign in to comment.