Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update pgcopydb sentinel in the main follow process. #521

Merged
merged 2 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions src/bin/pgcopydb/file_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
{
int countFdsReadyToRead, nfds; /* see man select(2) */
fd_set readFileDescriptorSet;
fd_set exceptFileDescriptorSet;

context->fd = fileno(stream);
nfds = context->fd + 1;
Expand All @@ -442,15 +443,29 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
FD_ZERO(&readFileDescriptorSet);
FD_SET(context->fd, &readFileDescriptorSet);

FD_ZERO(&exceptFileDescriptorSet);
FD_SET(context->fd, &exceptFileDescriptorSet);

countFdsReadyToRead =
select(nfds, &readFileDescriptorSet, NULL, NULL, &timeout);
select(nfds,
&readFileDescriptorSet, NULL, &exceptFileDescriptorSet,
&timeout);

if (countFdsReadyToRead == -1)
{
log_debug("countFdsReadyToRead == -1");

if (errno == EINTR || errno == EAGAIN)
{
if (asked_to_stop || asked_to_stop_fast || asked_to_quit)
log_debug("received EINTR or EAGAIN");

if (asked_to_quit)
{
/*
* When asked_to_stop || asked_to_stop_fast still continue
* reading through EOF on the input stream, then quit
* normally.
*/
doneReading = true;
}

Expand All @@ -464,6 +479,14 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
}
}

if (FD_ISSET(context->fd, &exceptFileDescriptorSet))
{
log_error("Failed to select on file descriptor %d: "
"an exceptional condition happened",
context->fd);
return false;
}

/*
* When asked_to_stop || asked_to_stop_fast still continue reading
* through EOF on the input stream, then quit normally. Here when
Expand All @@ -472,10 +495,10 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
*/
if (countFdsReadyToRead == 0)
{
if (asked_to_stop || asked_to_stop_fast || asked_to_quit)
if (asked_to_quit)
{
doneReading = true;
log_notice("read_from_stream was asked to stop or quit");
log_notice("read_from_stream was asked to quit");
}

continue;
Expand Down
53 changes: 42 additions & 11 deletions src/bin/pgcopydb/follow.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ follow_init_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
* values from the pgcopydb.sentinel table on the source database.
*/
bool
follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel, bool verbose)
{
PGSQL pgsql = { 0 };

Expand Down Expand Up @@ -217,16 +217,19 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel)
LSN_FORMAT_ARGS(sentinel->replay_lsn),
LSN_FORMAT_ARGS(sentinel->endpos));
}
else if (sentinel->endpos != InvalidXLogRecPtr)
else if (verbose)
{
log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X",
LSN_FORMAT_ARGS(sentinel->replay_lsn),
LSN_FORMAT_ARGS(sentinel->endpos));
}
else if (sentinel->replay_lsn != InvalidXLogRecPtr)
{
log_info("Current sentinel replay_lsn is %X/%X",
LSN_FORMAT_ARGS(sentinel->replay_lsn));
if (sentinel->endpos != InvalidXLogRecPtr)
{
log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X",
LSN_FORMAT_ARGS(sentinel->replay_lsn),
LSN_FORMAT_ARGS(sentinel->endpos));
}
else if (sentinel->replay_lsn != InvalidXLogRecPtr)
{
log_info("Current sentinel replay_lsn is %X/%X",
LSN_FORMAT_ARGS(sentinel->replay_lsn));
}
}

return true;
Expand Down Expand Up @@ -386,9 +389,10 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
bool
follow_reached_endpos(StreamSpecs *streamSpecs, bool *done)
{
bool verbose = true;
CopyDBSentinel *sentinel = &(streamSpecs->sentinel);

if (!follow_get_sentinel(streamSpecs, sentinel))
if (!follow_get_sentinel(streamSpecs, sentinel, verbose))
{
log_error("Failed to get sentinel values");
return false;
Expand Down Expand Up @@ -978,6 +982,26 @@ follow_wait_subprocesses(StreamSpecs *specs)
if (processArray[i]->returnCode != 0 ||
specs->endpos == InvalidXLogRecPtr)
{
char endposStatus[BUFSIZE] = { 0 };

if (specs->endpos == InvalidXLogRecPtr)
{
sformat(endposStatus, sizeof(endposStatus), "unset");
}
else
{
sformat(endposStatus, sizeof(endposStatus),
"set to %X/%X",
LSN_FORMAT_ARGS(specs->endpos));
}

log_notice("Process %s has exited with return code %d, "
"and endpos is %s: "
"terminating other processes",
processArray[i]->name,
processArray[i]->returnCode,
endposStatus);

if (!follow_terminate_subprocesses(specs))
{
log_error("Failed to terminate other subprocesses, "
Expand All @@ -990,6 +1014,13 @@ follow_wait_subprocesses(StreamSpecs *specs)
}
}

/* update current sentinel values (endpos) */
if (!follow_get_sentinel(specs, &(specs->sentinel), false))
Copy link
Contributor

@arajkumar arajkumar Nov 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dimitri I'm seeing a lot of log output (with -vv) with the latest version because of this. IIUC, we now call follow_get_sentinel every 150ms?

play_lsn 0/0
2023-11-07 06:50:17 895197 SQL pgsql.c:490 Connecting to [source] "postgres://[email protected]:26479/defaultdb?sslmode=require&keepalives=1&keepalives_idle=10&keepalives_interval=10&keepalives_count=60"
2023-11-07 06:50:17 895197 SQL pgsql.c:1471 [SOURCE 4026526] select startpos, endpos, apply, write_lsn, flush_lsn, replay_lsn from pgcopydb.sentinel;
2023-11-07 06:50:17 895197 SQL pgsql.c:400 Disconnecting from [source] "postgres://[email protected]:26479/defaultdb?sslmode=require&keepalives=1&keepalives_idle=10&keepalives_interval=10&keepalives_count=60"
2023-11-07 06:50:17 895197

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we do. We could call it less often, I suppose. I will have a look; thanks for your feedback.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #531

{
/* ignore errors here, supervisor can't quit before sub-processes */
log_warn("Failed to get sentinel values");
}

/* avoid busy looping, wait for 150ms before checking again */
pg_usleep(150 * 1000);
}
Expand Down
7 changes: 4 additions & 3 deletions src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,8 @@ stream_apply_sql(StreamApplyContext *context,
lsn,
metadata->timestamp))
{
/* errors have already been logged */
log_error("Failed to setup apply transaction, "
"see above for details");
return false;
}

Expand Down Expand Up @@ -1750,8 +1751,8 @@ stream_apply_read_lsn_tracking(StreamApplyContext *context)
/* it's okay if the file does not exists, just skip the operation */
if (!file_exists(filename))
{
log_warn("Failed to parse JSON file \"%s\": file does not exists",
filename);
log_notice("Failed to parse JSON file \"%s\": file does not exists",
filename);
return true;
}

Expand Down
30 changes: 30 additions & 0 deletions src/bin/pgcopydb/ld_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,36 @@ stream_replay_line(void *ctx, const char *line, bool *stop)
break;
}

case STREAM_ACTION_ENDPOS:
{
PGSQL src = { 0 };
char *dsn = context->connStrings->source_pguri;

if (!pgsql_init(&src, dsn, PGSQL_CONN_SOURCE))
{
/* errors have already been logged */
return false;
}

CopyDBSentinel sentinel = { 0 };

if (!pgsql_get_sentinel(&src, &sentinel))
{
/* errors have already been logged */
return false;
}

if (sentinel.endpos <= metadata.lsn)
{
*stop = true;
context->reachedEndPos = true;

log_info("Replay reached ENDPOS %X/%X",
LSN_FORMAT_ARGS(metadata.lsn));
}
break;
}

/* skip reporting progress in other cases */
case STREAM_ACTION_BEGIN:
case STREAM_ACTION_INSERT:
Expand Down
4 changes: 3 additions & 1 deletion src/bin/pgcopydb/ld_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,9 @@ bool follow_setup_databases(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);
bool follow_reset_sequences(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);

bool follow_init_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel);
bool follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel);
bool follow_get_sentinel(StreamSpecs *specs,
CopyDBSentinel *sentinel,
bool verbose);
bool follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);

bool followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs);
Expand Down
47 changes: 40 additions & 7 deletions src/bin/pgcopydb/ld_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,34 @@ stream_transform_line(void *ctx, const char *line, bool *stop)
return false;
}
}
/* at ENDPOS check that it's the current sentinel value and exit */
else if (metadata->action == STREAM_ACTION_ENDPOS)
{
PGSQL src = { 0 };
char *dsn = privateContext->connStrings->source_pguri;

if (!pgsql_init(&src, dsn, PGSQL_CONN_SOURCE))
{
/* errors have already been logged */
return false;
}

CopyDBSentinel sentinel = { 0 };

if (!pgsql_get_sentinel(&src, &sentinel))
{
/* errors have already been logged */
return false;
}

if (sentinel.endpos <= metadata->lsn)
{
*stop = true;

log_info("Transform process reached ENDPOS %X/%X",
LSN_FORMAT_ARGS(metadata->lsn));
}
}

if (privateContext->endpos != InvalidXLogRecPtr &&
privateContext->endpos <= metadata->lsn)
Expand Down Expand Up @@ -1255,8 +1283,11 @@ coalesceLogicalTransactionStatement(LogicalTransaction *txn,
{
LogicalTransactionStatement *last = txn->last;

LogicalMessageValuesArray *lastValuesArray = &(last->stmt.insert.new.array->values);
LogicalMessageValuesArray *newValuesArray = &(new->stmt.insert.new.array->values);
LogicalMessageValuesArray *lastValuesArray =
&(last->stmt.insert.new.array->values);

LogicalMessageValuesArray *newValuesArray =
&(new->stmt.insert.new.array->values);

int capacity = lastValuesArray->capacity;
LogicalMessageValues *array = lastValuesArray->array;
Expand All @@ -1275,8 +1306,9 @@ coalesceLogicalTransactionStatement(LogicalTransaction *txn,
* and potential heap memory fragmentation.
*/
capacity *= 2;
array = (LogicalMessageValues *) realloc(array, sizeof(LogicalMessageValues) *
capacity);
array = (LogicalMessageValues *)
realloc(array, sizeof(LogicalMessageValues) * capacity);

if (array == NULL)
{
log_error(ALLOCATION_FAILED_ERROR);
Expand Down Expand Up @@ -1318,7 +1350,8 @@ canCoalesceLogicalTransactionStatement(LogicalTransaction *txn,
LogicalTransactionStatement *last = txn->last;

/* TODO: Support UPDATE and DELETE */
if (last->action != STREAM_ACTION_INSERT || new->action != STREAM_ACTION_INSERT)
if (last->action != STREAM_ACTION_INSERT ||
new->action != STREAM_ACTION_INSERT)
{
return false;
}
Expand All @@ -1327,8 +1360,8 @@ canCoalesceLogicalTransactionStatement(LogicalTransaction *txn,
LogicalMessageInsert *newInsert = &new->stmt.insert;

/* Last and current statements must target same relation */
if (!streq(lastInsert->nspname, newInsert->nspname) || !streq(lastInsert->relname,
newInsert->relname))
if (!streq(lastInsert->nspname, newInsert->nspname) ||
!streq(lastInsert->relname, newInsert->relname))
{
return false;
}
Expand Down
Loading