Skip to content

Commit

Permalink
Process ENDPOS in transform and replay processes.
Browse files Browse the repository at this point in the history
When the ENDPOS internal message is received from the PIPEs in replay mode,
check the current sentinel.endpos value on the source database to make sure
the ENDPOS message matches with the current setting, and that being the case
stop processing.
  • Loading branch information
dimitri committed Nov 6, 2023
1 parent e3d5703 commit dcc51ad
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 10 deletions.
24 changes: 21 additions & 3 deletions src/bin/pgcopydb/file_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
{
int countFdsReadyToRead, nfds; /* see man select(2) */
fd_set readFileDescriptorSet;
fd_set exceptFileDescriptorSet;

context->fd = fileno(stream);
nfds = context->fd + 1;
Expand All @@ -442,19 +443,28 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
FD_ZERO(&readFileDescriptorSet);
FD_SET(context->fd, &readFileDescriptorSet);

FD_ZERO(&exceptFileDescriptorSet);
FD_SET(context->fd, &exceptFileDescriptorSet);

countFdsReadyToRead =
select(nfds, &readFileDescriptorSet, NULL, NULL, &timeout);
select(nfds,
&readFileDescriptorSet, NULL, &exceptFileDescriptorSet,
&timeout);

if (countFdsReadyToRead == -1)
{
log_debug("countFdsReadyToRead == -1");

if (errno == EINTR || errno == EAGAIN)
{
log_debug("received EINTR or EAGAIN");

if (asked_to_quit)
{
/*
* When asked_to_stop || asked_to_stop_fast still continue
* reading through EOF on the input stream, then quit
* normally. Here when
* normally.
*/
doneReading = true;
}
Expand All @@ -469,6 +479,14 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
}
}

if (FD_ISSET(context->fd, &exceptFileDescriptorSet))
{
log_error("Failed to select on file descriptor %d: "
"an exceptional condition happened",
context->fd);
return false;
}

/*
* When asked_to_stop || asked_to_stop_fast still continue reading
* through EOF on the input stream, then quit normally. Here when
Expand All @@ -480,7 +498,7 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context)
if (asked_to_quit)
{
doneReading = true;
log_notice("read_from_stream was asked to stop or quit");
log_notice("read_from_stream was asked to quit");
}

continue;
Expand Down
30 changes: 30 additions & 0 deletions src/bin/pgcopydb/ld_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,36 @@ stream_replay_line(void *ctx, const char *line, bool *stop)
break;
}

case STREAM_ACTION_ENDPOS:
{
PGSQL src = { 0 };
char *dsn = context->connStrings->source_pguri;

if (!pgsql_init(&src, dsn, PGSQL_CONN_SOURCE))
{
/* errors have already been logged */
return false;
}

CopyDBSentinel sentinel = { 0 };

if (!pgsql_get_sentinel(&src, &sentinel))
{
/* errors have already been logged */
return false;
}

if (sentinel.endpos <= metadata.lsn)
{
*stop = true;
context->reachedEndPos = true;

log_info("Replay reached ENDPOS %X/%X",
LSN_FORMAT_ARGS(metadata.lsn));
}
break;
}

/* skip reporting progress in other cases */
case STREAM_ACTION_BEGIN:
case STREAM_ACTION_INSERT:
Expand Down
47 changes: 40 additions & 7 deletions src/bin/pgcopydb/ld_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,34 @@ stream_transform_line(void *ctx, const char *line, bool *stop)
return false;
}
}
/* at ENDPOS check that it's the current sentinel value and exit */
else if (metadata->action == STREAM_ACTION_ENDPOS)
{
PGSQL src = { 0 };
char *dsn = privateContext->connStrings->source_pguri;

if (!pgsql_init(&src, dsn, PGSQL_CONN_SOURCE))
{
/* errors have already been logged */
return false;
}

CopyDBSentinel sentinel = { 0 };

if (!pgsql_get_sentinel(&src, &sentinel))
{
/* errors have already been logged */
return false;
}

if (sentinel.endpos <= metadata->lsn)
{
*stop = true;

log_info("Transform process reached ENDPOS %X/%X",
LSN_FORMAT_ARGS(metadata->lsn));
}
}

if (privateContext->endpos != InvalidXLogRecPtr &&
privateContext->endpos <= metadata->lsn)
Expand Down Expand Up @@ -1255,8 +1283,11 @@ coalesceLogicalTransactionStatement(LogicalTransaction *txn,
{
LogicalTransactionStatement *last = txn->last;

LogicalMessageValuesArray *lastValuesArray = &(last->stmt.insert.new.array->values);
LogicalMessageValuesArray *newValuesArray = &(new->stmt.insert.new.array->values);
LogicalMessageValuesArray *lastValuesArray =
&(last->stmt.insert.new.array->values);

LogicalMessageValuesArray *newValuesArray =
&(new->stmt.insert.new.array->values);

int capacity = lastValuesArray->capacity;
LogicalMessageValues *array = lastValuesArray->array;
Expand All @@ -1275,8 +1306,9 @@ coalesceLogicalTransactionStatement(LogicalTransaction *txn,
* and potential heap memory fragmentation.
*/
capacity *= 2;
array = (LogicalMessageValues *) realloc(array, sizeof(LogicalMessageValues) *
capacity);
array = (LogicalMessageValues *)
realloc(array, sizeof(LogicalMessageValues) * capacity);

if (array == NULL)
{
log_error(ALLOCATION_FAILED_ERROR);
Expand Down Expand Up @@ -1318,7 +1350,8 @@ canCoalesceLogicalTransactionStatement(LogicalTransaction *txn,
LogicalTransactionStatement *last = txn->last;

/* TODO: Support UPDATE and DELETE */
if (last->action != STREAM_ACTION_INSERT || new->action != STREAM_ACTION_INSERT)
if (last->action != STREAM_ACTION_INSERT ||
new->action != STREAM_ACTION_INSERT)
{
return false;
}
Expand All @@ -1327,8 +1360,8 @@ canCoalesceLogicalTransactionStatement(LogicalTransaction *txn,
LogicalMessageInsert *newInsert = &new->stmt.insert;

/* Last and current statements must target same relation */
if (!streq(lastInsert->nspname, newInsert->nspname) || !streq(lastInsert->relname,
newInsert->relname))
if (!streq(lastInsert->nspname, newInsert->nspname) ||
!streq(lastInsert->relname, newInsert->relname))
{
return false;
}
Expand Down

0 comments on commit dcc51ad

Please sign in to comment.