Skip to content

Commit

Permalink
Merge branch 'main' into improve-connection-timeouts
Browse files (browse the repository at this point in the history)
  • Loading branch information
dimitri authored Nov 6, 2023
2 parents e7c5382 + 2a1c778 commit 0097a6a
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 275 deletions.
31 changes: 27 additions & 4 deletions src/bin/pgcopydb/file_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
{
int countFdsReadyToRead, nfds; /* see man select(2) */
fd_set readFileDescriptorSet;
fd_set exceptFileDescriptorSet;

context->fd = fileno(stream);
nfds = context->fd + 1;
Expand All @@ -442,15 +443,29 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
FD_ZERO(&readFileDescriptorSet);
FD_SET(context->fd, &readFileDescriptorSet);

FD_ZERO(&exceptFileDescriptorSet);
FD_SET(context->fd, &exceptFileDescriptorSet);

countFdsReadyToRead =
- select(nfds, &readFileDescriptorSet, NULL, NULL, &timeout);
+ select(nfds,
+ &readFileDescriptorSet, NULL, &exceptFileDescriptorSet,
+ &timeout);

if (countFdsReadyToRead == -1)
{
log_debug("countFdsReadyToRead == -1");

if (errno == EINTR || errno == EAGAIN)
{
- if (asked_to_stop || asked_to_stop_fast || asked_to_quit)
+ log_debug("received EINTR or EAGAIN");
+
+ if (asked_to_quit)
{
+ /*
+ * When asked_to_stop || asked_to_stop_fast still continue
+ * reading through EOF on the input stream, then quit
+ * normally.
+ */
doneReading = true;
}

Expand All @@ -464,6 +479,14 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
}
}

if (FD_ISSET(context->fd, &exceptFileDescriptorSet))
{
log_error("Failed to select on file descriptor %d: "
"an exceptional condition happened",
context->fd);
return false;
}

/*
* When asked_to_stop || asked_to_stop_fast still continue reading
* through EOF on the input stream, then quit normally. Here when
Expand All @@ -472,10 +495,10 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
*/
if (countFdsReadyToRead == 0)
{
- if (asked_to_stop || asked_to_stop_fast || asked_to_quit)
+ if (asked_to_quit)
{
doneReading = true;
- log_notice("read_from_stream was asked to stop or quit");
+ log_notice("read_from_stream was asked to quit");
}

continue;
Expand Down
53 changes: 42 additions & 11 deletions src/bin/pgcopydb/follow.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ follow_init_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
* values from the pgcopydb.sentinel table on the source database.
*/
bool
- follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
+ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel, bool verbose)
{
PGSQL pgsql = { 0 };

Expand Down Expand Up @@ -217,16 +217,19 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
LSN_FORMAT_ARGS(sentinel->replay_lsn),
LSN_FORMAT_ARGS(sentinel->endpos));
}
- else if (sentinel->endpos != InvalidXLogRecPtr)
+ else if (verbose)
{
- log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X",
- LSN_FORMAT_ARGS(sentinel->replay_lsn),
- LSN_FORMAT_ARGS(sentinel->endpos));
- }
- else if (sentinel->replay_lsn != InvalidXLogRecPtr)
- {
- log_info("Current sentinel replay_lsn is %X/%X",
- LSN_FORMAT_ARGS(sentinel->replay_lsn));
+ if (sentinel->endpos != InvalidXLogRecPtr)
+ {
+ log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X",
+ LSN_FORMAT_ARGS(sentinel->replay_lsn),
+ LSN_FORMAT_ARGS(sentinel->endpos));
+ }
+ else if (sentinel->replay_lsn != InvalidXLogRecPtr)
+ {
+ log_info("Current sentinel replay_lsn is %X/%X",
+ LSN_FORMAT_ARGS(sentinel->replay_lsn));
+ }
}

return true;
Expand Down Expand Up @@ -386,9 +389,10 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
bool
follow_reached_endpos(StreamSpecs *streamSpecs, bool *done)
{
bool verbose = true;
CopyDBSentinel *sentinel = &(streamSpecs->sentinel);

- if (!follow_get_sentinel(streamSpecs, sentinel))
+ if (!follow_get_sentinel(streamSpecs, sentinel, verbose))
{
log_error("Failed to get sentinel values");
return false;
Expand Down Expand Up @@ -978,6 +982,26 @@ follow_wait_subprocesses(StreamSpecs *specs)
if (processArray[i]->returnCode != 0 ||
specs->endpos == InvalidXLogRecPtr)
{
char endposStatus[BUFSIZE] = { 0 };

if (specs->endpos == InvalidXLogRecPtr)
{
sformat(endposStatus, sizeof(endposStatus), "unset");
}
else
{
sformat(endposStatus, sizeof(endposStatus),
"set to %X/%X",
LSN_FORMAT_ARGS(specs->endpos));
}

log_notice("Process %s has exited with return code %d, "
"and endpos is %s: "
"terminating other processes",
processArray[i]->name,
processArray[i]->returnCode,
endposStatus);

if (!follow_terminate_subprocesses(specs))
{
log_error("Failed to terminate other subprocesses, "
Expand All @@ -990,6 +1014,13 @@ follow_wait_subprocesses(StreamSpecs *specs)
}
}

/* update current sentinel values (endpos) */
if (!follow_get_sentinel(specs, &(specs->sentinel), false))
{
/* ignore errors here, supervisor can't quit before sub-processes */
log_warn("Failed to get sentinel values");
}

/* avoid busy looping, wait for 150ms before checking again */
pg_usleep(150 * 1000);
}
Expand Down
Loading

0 comments on commit 0097a6a

Please sign in to comment.