diff --git a/src/bin/pgcopydb/file_utils.c b/src/bin/pgcopydb/file_utils.c index 055c31930..892c73376 100644 --- a/src/bin/pgcopydb/file_utils.c +++ b/src/bin/pgcopydb/file_utils.c @@ -419,6 +419,7 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) { int countFdsReadyToRead, nfds; /* see man select(2) */ fd_set readFileDescriptorSet; + fd_set exceptFileDescriptorSet; context->fd = fileno(stream); nfds = context->fd + 1; @@ -442,15 +443,29 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) FD_ZERO(&readFileDescriptorSet); FD_SET(context->fd, &readFileDescriptorSet); + FD_ZERO(&exceptFileDescriptorSet); + FD_SET(context->fd, &exceptFileDescriptorSet); + countFdsReadyToRead = - select(nfds, &readFileDescriptorSet, NULL, NULL, &timeout); + select(nfds, + &readFileDescriptorSet, NULL, &exceptFileDescriptorSet, + &timeout); if (countFdsReadyToRead == -1) { + log_debug("countFdsReadyToRead == -1"); + if (errno == EINTR || errno == EAGAIN) { - if (asked_to_stop || asked_to_stop_fast || asked_to_quit) + log_debug("received EINTR or EAGAIN"); + + if (asked_to_quit) { + /* + * When asked_to_stop || asked_to_stop_fast still continue + * reading through EOF on the input stream, then quit + * normally. + */ doneReading = true; } @@ -464,6 +479,14 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) } } + if (FD_ISSET(context->fd, &exceptFileDescriptorSet)) + { + log_error("Failed to select on file descriptor %d: " + "an exceptional condition happened", + context->fd); + return false; + } + /* * When asked_to_stop || asked_to_stop_fast still continue reading * through EOF on the input stream, then quit normally. Here when @@ -472,10 +495,10 @@ read_from_stream(FILE *stream, ReadFromStreamContext *context) */ if (countFdsReadyToRead == 0) { - if (asked_to_stop || asked_to_stop_fast || asked_to_quit) + if (asked_to_quit) { doneReading = true; - log_notice("read_from_stream was asked to stop or quit"); + log_notice("read_from_stream was asked to quit"); } continue; diff --git a/src/bin/pgcopydb/follow.c b/src/bin/pgcopydb/follow.c index 0d160d04a..d35248158 100644 --- a/src/bin/pgcopydb/follow.c +++ b/src/bin/pgcopydb/follow.c @@ -187,7 +187,7 @@ follow_init_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel) * values from the pgcopydb.sentinel table on the source database. */ bool -follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel) +follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel, bool verbose) { PGSQL pgsql = { 0 }; @@ -217,16 +217,19 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel) LSN_FORMAT_ARGS(sentinel->replay_lsn), LSN_FORMAT_ARGS(sentinel->endpos)); } - else if (sentinel->endpos != InvalidXLogRecPtr) + else if (verbose) { - log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X", - LSN_FORMAT_ARGS(sentinel->replay_lsn), - LSN_FORMAT_ARGS(sentinel->endpos)); - } - else if (sentinel->replay_lsn != InvalidXLogRecPtr) - { - log_info("Current sentinel replay_lsn is %X/%X", - LSN_FORMAT_ARGS(sentinel->replay_lsn)); + if (sentinel->endpos != InvalidXLogRecPtr) + { + log_info("Current sentinel replay_lsn is %X/%X, endpos is %X/%X", + LSN_FORMAT_ARGS(sentinel->replay_lsn), + LSN_FORMAT_ARGS(sentinel->endpos)); + } + else if (sentinel->replay_lsn != InvalidXLogRecPtr) + { + log_info("Current sentinel replay_lsn is %X/%X", + LSN_FORMAT_ARGS(sentinel->replay_lsn)); + } } return true; @@ -386,9 +389,10 @@ follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs) bool follow_reached_endpos(StreamSpecs *streamSpecs, bool *done) { + bool verbose = true; CopyDBSentinel *sentinel = &(streamSpecs->sentinel); - if (!follow_get_sentinel(streamSpecs, sentinel)) + if (!follow_get_sentinel(streamSpecs, sentinel, verbose)) { log_error("Failed to get sentinel values"); return false; @@ -978,6 +982,26 @@ follow_wait_subprocesses(StreamSpecs *specs) if (processArray[i]->returnCode != 0 || specs->endpos == InvalidXLogRecPtr) { + char endposStatus[BUFSIZE] = { 0 }; + + if (specs->endpos == InvalidXLogRecPtr) + { + sformat(endposStatus, sizeof(endposStatus), "unset"); + } + else + { + sformat(endposStatus, sizeof(endposStatus), + "set to %X/%X", + LSN_FORMAT_ARGS(specs->endpos)); + } + + log_notice("Process %s has exited with return code %d, " + "and endpos is %s: " + "terminating other processes", + processArray[i]->name, + processArray[i]->returnCode, + endposStatus); + if (!follow_terminate_subprocesses(specs)) { log_error("Failed to terminate other subprocesses, " @@ -990,6 +1014,13 @@ follow_wait_subprocesses(StreamSpecs *specs) } } + /* update current sentinel values (endpos) */ + if (!follow_get_sentinel(specs, &(specs->sentinel), false)) + { + /* ignore errors here, supervisor can't quit before sub-processes */ + log_warn("Failed to get sentinel values"); + } + /* avoid busy looping, wait for 150ms before checking again */ pg_usleep(150 * 1000); } diff --git a/src/bin/pgcopydb/ld_apply.c b/src/bin/pgcopydb/ld_apply.c index 91c563e1d..f31c2ac4f 100644 --- a/src/bin/pgcopydb/ld_apply.c +++ b/src/bin/pgcopydb/ld_apply.c @@ -830,7 +830,8 @@ stream_apply_sql(StreamApplyContext *context, lsn, metadata->timestamp)) { - /* errors have already been logged */ + log_error("Failed to setup apply transaction, " + "see above for details"); return false; } @@ -1750,8 +1751,8 @@ stream_apply_read_lsn_tracking(StreamApplyContext *context) /* it's okay if the file does not exists, just skip the operation */ if (!file_exists(filename)) { - log_warn("Failed to parse JSON file \"%s\": file does not exists", - filename); + log_notice("Failed to parse JSON file \"%s\": file does not exists", + filename); return true; } diff --git a/src/bin/pgcopydb/ld_replay.c b/src/bin/pgcopydb/ld_replay.c index 988417983..cf30737b9 100644 --- a/src/bin/pgcopydb/ld_replay.c +++ b/src/bin/pgcopydb/ld_replay.c @@ -217,6 +217,36 @@ stream_replay_line(void *ctx, const char *line, bool *stop) break; } + case STREAM_ACTION_ENDPOS: + { + PGSQL src = { 0 }; + char *dsn = context->connStrings->source_pguri; + + if (!pgsql_init(&src, dsn, PGSQL_CONN_SOURCE)) + { + /* errors have already been logged */ + return false; + } + + CopyDBSentinel sentinel = { 0 }; + + if (!pgsql_get_sentinel(&src, &sentinel)) + { + /* errors have already been logged */ + return false; + } + + if (sentinel.endpos <= metadata.lsn) + { + *stop = true; + context->reachedEndPos = true; + + log_info("Replay reached ENDPOS %X/%X", + LSN_FORMAT_ARGS(metadata.lsn)); + } + break; + } + /* skip reporting progress in other cases */ case STREAM_ACTION_BEGIN: case STREAM_ACTION_INSERT: diff --git a/src/bin/pgcopydb/ld_stream.h b/src/bin/pgcopydb/ld_stream.h index 4bade1838..cd7d8aedb 100644 --- a/src/bin/pgcopydb/ld_stream.h +++ b/src/bin/pgcopydb/ld_stream.h @@ -691,7 +691,9 @@ bool follow_setup_databases(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs); bool follow_reset_sequences(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs); bool follow_init_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel); -bool follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel); +bool follow_get_sentinel(StreamSpecs *specs, + CopyDBSentinel *sentinel, + bool verbose); bool follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs); bool followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs); diff --git a/src/bin/pgcopydb/ld_transform.c b/src/bin/pgcopydb/ld_transform.c index 2008a3fbf..a1c51110f 100644 --- a/src/bin/pgcopydb/ld_transform.c +++ b/src/bin/pgcopydb/ld_transform.c @@ -311,6 +311,34 @@ stream_transform_line(void *ctx, const char *line, bool *stop) return false; } } + /* at ENDPOS check that it's the current sentinel value and exit */ + else if (metadata->action == STREAM_ACTION_ENDPOS) + { + PGSQL src = { 0 }; + char *dsn = privateContext->connStrings->source_pguri; + + if (!pgsql_init(&src, dsn, PGSQL_CONN_SOURCE)) + { + /* errors have already been logged */ + return false; + } + + CopyDBSentinel sentinel = { 0 }; + + if (!pgsql_get_sentinel(&src, &sentinel)) + { + /* errors have already been logged */ + return false; + } + + if (sentinel.endpos <= metadata->lsn) + { + *stop = true; + + log_info("Transform process reached ENDPOS %X/%X", + LSN_FORMAT_ARGS(metadata->lsn)); + } + } if (privateContext->endpos != InvalidXLogRecPtr && privateContext->endpos <= metadata->lsn) @@ -1255,8 +1283,11 @@ coalesceLogicalTransactionStatement(LogicalTransaction *txn, { LogicalTransactionStatement *last = txn->last; - LogicalMessageValuesArray *lastValuesArray = &(last->stmt.insert.new.array->values); - LogicalMessageValuesArray *newValuesArray = &(new->stmt.insert.new.array->values); + LogicalMessageValuesArray *lastValuesArray = + &(last->stmt.insert.new.array->values); + + LogicalMessageValuesArray *newValuesArray = + &(new->stmt.insert.new.array->values); int capacity = lastValuesArray->capacity; LogicalMessageValues *array = lastValuesArray->array; @@ -1275,8 +1306,9 @@ coalesceLogicalTransactionStatement(LogicalTransaction *txn, * and potential heap memory fragmentation. */ capacity *= 2; - array = (LogicalMessageValues *) realloc(array, sizeof(LogicalMessageValues) * - capacity); + array = (LogicalMessageValues *) + realloc(array, sizeof(LogicalMessageValues) * capacity); + if (array == NULL) { log_error(ALLOCATION_FAILED_ERROR); @@ -1318,7 +1350,8 @@ canCoalesceLogicalTransactionStatement(LogicalTransaction *txn, LogicalTransactionStatement *last = txn->last; /* TODO: Support UPDATE and DELETE */ - if (last->action != STREAM_ACTION_INSERT || new->action != STREAM_ACTION_INSERT) + if (last->action != STREAM_ACTION_INSERT || + new->action != STREAM_ACTION_INSERT) { return false; } @@ -1327,8 +1360,8 @@ canCoalesceLogicalTransactionStatement(LogicalTransaction *txn, LogicalMessageInsert *newInsert = &new->stmt.insert; /* Last and current statements must target same relation */ - if (!streq(lastInsert->nspname, newInsert->nspname) || !streq(lastInsert->relname, - newInsert->relname)) + if (!streq(lastInsert->nspname, newInsert->nspname) || + !streq(lastInsert->relname, newInsert->relname)) { return false; }