diff --git a/src/bin/pgcopydb/cli_ping.c b/src/bin/pgcopydb/cli_ping.c index 7f21cd82..324804eb 100644 --- a/src/bin/pgcopydb/cli_ping.c +++ b/src/bin/pgcopydb/cli_ping.c @@ -314,6 +314,12 @@ cli_ping(int argc, char **argv) } } + if (errors > 0) + { + /* errors have already been logged */ + exit(EXIT_CODE_INTERNAL_ERROR); + } + /* * In case of error in one sub-process, we still want the other to fully * try. diff --git a/src/bin/pgcopydb/copydb.h b/src/bin/pgcopydb/copydb.h index 8b4352fd..0a46f931 100644 --- a/src/bin/pgcopydb/copydb.h +++ b/src/bin/pgcopydb/copydb.h @@ -528,6 +528,9 @@ bool summary_add_table(DatabaseCatalog *catalog, bool summary_finish_table(DatabaseCatalog *catalog, CopyTableDataSpec *tableSpecs); +bool summary_update_table_copy_stats(DatabaseCatalog *catalog, + CopyTableDataSpec *tableSpecs); + bool summary_delete_table(DatabaseCatalog *catalog, CopyTableDataSpec *tableSpecs); diff --git a/src/bin/pgcopydb/extensions.c b/src/bin/pgcopydb/extensions.c index b3cdd97c..cd5917e2 100644 --- a/src/bin/pgcopydb/extensions.c +++ b/src/bin/pgcopydb/extensions.c @@ -172,11 +172,13 @@ copydb_copy_ext_table(PGSQL *src, PGSQL *dst, char *qname, char *condition) .srcAttrList = "*", .srcWhereClause = condition, .dstQname = qname, - .dstAttrList = "", - .bytesTransmitted = 0 + .dstAttrList = "" }; - if (!pg_copy(src, dst, &args)) + /* skip statistics maintenance on extension configuration tables */ + CopyStats stats = { 0 }; + + if (!pg_copy(src, dst, &args, &stats, NULL, NULL)) { /* errors have already been logged */ return false; diff --git a/src/bin/pgcopydb/pgsql.c b/src/bin/pgcopydb/pgsql.c index 80864267..4b660ccb 100644 --- a/src/bin/pgcopydb/pgsql.c +++ b/src/bin/pgcopydb/pgsql.c @@ -56,7 +56,9 @@ static bool build_parameters_list(PQExpBuffer buffer, int paramCount, const char **paramValues); -static bool pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args); +static bool pg_copy_data(PGSQL *src, PGSQL *dst, + CopyArgs *args, CopyStats *stats, + void *context, CopyStatsCallback *callback); static bool pg_copy_send_query(PGSQL *pgsql, CopyArgs *args, ExecStatusType status); @@ -2659,7 +2661,9 @@ pgsql_truncate(PGSQL *pgsql, const char *qname) * referenced by the qualified identifier name dstQname on the target. */ bool -pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args) +pg_copy(PGSQL *src, PGSQL *dst, + CopyArgs *args, CopyStats *stats, + void *context, CopyStatsCallback *callback) { bool srcConnIsOurs = src->connection == NULL; if (!pgsql_open_connection(src)) @@ -2678,7 +2682,7 @@ pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args) return false; } - bool result = pg_copy_data(src, dst, args); + bool result = pg_copy_data(src, dst, args, stats, context, callback); if (srcConnIsOurs) { @@ -2700,7 +2704,9 @@ pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args) * expects src and dst are opened connection and doesn't manage their lifetime. */ static bool -pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args) +pg_copy_data(PGSQL *src, PGSQL *dst, + CopyArgs *args, CopyStats *stats, + void *context, CopyStatsCallback *callback) { PGconn *srcConn = src->connection; PGconn *dstConn = dst->connection; @@ -2719,7 +2725,10 @@ pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args) } } - /* cannot perform COPY FREEZE if the table was not created or truncated in the current subtransaction */ + /* + * COPY FREEZE is only accepted by Postgres if the table was created or + * truncated in the current transaction. + */ args->freeze &= args->truncate; /* make sure to log TRUNCATE before we log COPY, avoid confusion */ @@ -2741,7 +2750,11 @@ pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args) char *copybuf; bool failedOnSrc = false; bool failedOnDst = false; - args->bytesTransmitted = 0; + + /* also init and maintain copy statistics */ + stats->startTime = time(NULL); + stats->lastUpdate = stats->startTime; + stats->bytesTransmitted = 0; for (;;) { @@ -2848,7 +2861,21 @@ pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args) */ else if (bufsize > 0) { - args->bytesTransmitted += bufsize; + stats->lastUpdate = time(NULL); + stats->bytesTransmitted += bufsize; + + if (callback != NULL) + { + /* + * Allow the Copy Stats user callback to fail, still continue + * with the copy. + */ + if (!(*callback)(context, stats)) + { + log_debug("Copy Stats Callback failed, " + "see above for details"); + } + } } /* diff --git a/src/bin/pgcopydb/pgsql.h b/src/bin/pgcopydb/pgsql.h index b548f6bb..5ac18542 100644 --- a/src/bin/pgcopydb/pgsql.h +++ b/src/bin/pgcopydb/pgsql.h @@ -336,10 +336,21 @@ typedef struct CopyArgs char *logCommand; bool truncate; bool freeze; - uint64_t bytesTransmitted; } CopyArgs; -bool pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args); + +typedef struct CopyStats +{ + uint64_t startTime; + uint64_t lastUpdate; + uint64_t bytesTransmitted; +} CopyStats; + +typedef bool (CopyStatsCallback)(void *context, CopyStats *stats); + +bool pg_copy(PGSQL *src, PGSQL *dst, + CopyArgs *args, CopyStats *stats, + void *context, CopyStatsCallback *callback); bool pg_copy_from_stdin(PGSQL *pgsql, const char *qname); bool pg_copy_row_from_stdin(PGSQL *pgsql, char *fmt, ...); diff --git a/src/bin/pgcopydb/summary.c b/src/bin/pgcopydb/summary.c index e39a8b22..d3a3c08b 100644 --- a/src/bin/pgcopydb/summary.c +++ b/src/bin/pgcopydb/summary.c @@ -501,6 +501,83 @@ summary_finish_table(DatabaseCatalog *catalog, CopyTableDataSpec *tableSpecs) } +/* + * summary_update_table_copy_stats UPDATEs a summary entry to our internal + * catalogs database with the current COPY statistics, typically while the COPY + * command is running. + */ +bool +summary_update_table_copy_stats(DatabaseCatalog *catalog, + CopyTableDataSpec *tableSpecs) +{ + sqlite3 *db = catalog->db; + + if (db == NULL) + { + log_error("BUG: summary_update_table_copy_stats: db is NULL"); + return false; + } + + SourceTable *table = tableSpecs->sourceTable; + CopyTableSummary *tableSummary = &(tableSpecs->summary); + + char *sql = + "update summary set duration = $2, bytes = $3 " + "where pid = $4 and tableoid = $5 and partnum = $6"; + + if (!semaphore_lock(&(catalog->sema))) + { + /* errors have already been logged */ + return false; + } + + SQLiteQuery query = { 0 }; + + if (!catalog_sql_prepare(db, sql, &query)) + { + /* errors have already been logged */ + (void) semaphore_unlock(&(catalog->sema)); + return false; + } + + /* bind our parameters now */ + BindParam params[] = { + { BIND_PARAMETER_TYPE_INT64, "duration", + tableSummary->durationMs, NULL }, + + { BIND_PARAMETER_TYPE_INT64, "bytes", + tableSummary->bytesTransmitted, NULL }, + + { BIND_PARAMETER_TYPE_INT64, "pid", getpid(), NULL }, + { BIND_PARAMETER_TYPE_INT64, "tableoid", table->oid, NULL }, + + { BIND_PARAMETER_TYPE_INT64, "partnum", + table->partition.partNumber, NULL } + }; + + int count = sizeof(params) / sizeof(params[0]); + + if (!catalog_sql_bind(&query, params, count)) + { + /* errors have already been logged */ + (void) semaphore_unlock(&(catalog->sema)); + return false; + } + + /* now execute the query, which does not return any row */ + if (!catalog_sql_execute_once(&query)) + { + /* errors have already been logged */ + (void) semaphore_unlock(&(catalog->sema)); + return false; + } + + (void) semaphore_unlock(&(catalog->sema)); + + return true; +} + + /* * summary_table_all_parts_done sets tableSpecs->allPartsAreDone to true when * all the parts have already been done in the summary table of our internal diff --git a/src/bin/pgcopydb/table-data.c b/src/bin/pgcopydb/table-data.c index 208be4ff..edd6fa63 100644 --- a/src/bin/pgcopydb/table-data.c +++ b/src/bin/pgcopydb/table-data.c @@ -27,6 +27,7 @@ static bool copydb_copy_supervisor_add_table_hook(void *ctx, SourceTable *table); +static bool copydb_update_copy_stats_hook(void *ctx, CopyStats *stats); /* * copydb_table_data fetches the list of tables from the source database and @@ -1128,7 +1129,6 @@ copydb_table_create_lockfile(CopyDataSpec *specs, args->dstAttrList = tableSpecs->sourceTable->attrList; args->truncate = false; /* default value, see below */ args->freeze = tableSpecs->sourceTable->partition.partCount <= 1; - args->bytesTransmitted = 0; /* * Check to see if we want to TRUNCATE the table and benefit from the COPY @@ -1286,6 +1286,14 @@ copydb_table_parts_are_all_done(CopyDataSpec *specs, } +typedef struct UpdateCopyStatsContext +{ + CopyDataSpec *specs; + CopyTableDataSpec *tableSpecs; + uint64_t lastWrite; +} UpdateCopyStatsContext; + + /* * copydb_copy_table implements the sub-process activity to pg_dump | * pg_restore the table's data and then create the indexes and the constraints @@ -1305,6 +1313,7 @@ copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst, /* Now copy the data from source to target */ CopyTableSummary *summary = &(tableSpecs->summary); + CopyStats stats = { 0 }; int attempts = 0; int maxAttempts = 5; /* allow 5 attempts total, 4 retries */ @@ -1316,8 +1325,19 @@ copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst, { ++attempts; + /* re-init stats between attempts */ + CopyStats empty = { 0 }; + stats = empty; + + UpdateCopyStatsContext context = { + .specs = specs, + .tableSpecs = tableSpecs + }; + /* ignore previous attempts, we need only one success here */ - success = pg_copy(src, dst, &(tableSpecs->copyArgs)); + success = pg_copy(src, dst, + &(tableSpecs->copyArgs), &stats, + &context, ©db_update_copy_stats_hook); if (success) { @@ -1368,12 +1388,63 @@ copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst, } /* publish bytesTransmitted accumulated value to the summary */ - summary->bytesTransmitted = tableSpecs->copyArgs.bytesTransmitted; + summary->bytesTransmitted = stats.bytesTransmitted; return success; } +/* + * copydb_update_copy_stats_hook updates the bytesTransmitted data in our + * SQLite summary. + */ +static bool +copydb_update_copy_stats_hook(void *ctx, CopyStats *stats) +{ + UpdateCopyStatsContext *context = (UpdateCopyStatsContext *) ctx; + + DatabaseCatalog *sourceDB = &(context->specs->catalogs.source); + CopyTableDataSpec *tableSpecs = context->tableSpecs; + CopyTableSummary *summary = &(tableSpecs->summary); + + uint64_t now = time(NULL); + + if (context->lastWrite == 0) + { + context->lastWrite = now; + } + + /* refrain from updating the statistics too often */ + const uint64_t WRITE_INTERVAL_SECS = 5; + + if ((now - context->lastWrite) < WRITE_INTERVAL_SECS) + { + return true; + } + + /* update tablespecs summary durationMs and bytesTransmitted */ + summary->bytesTransmitted = stats->bytesTransmitted; + + instr_time duration; + + INSTR_TIME_SET_CURRENT(duration); + INSTR_TIME_SUBTRACT(duration, summary->startTimeInstr); + + summary->durationMs = INSTR_TIME_GET_MILLISEC(duration); + + if (!summary_update_table_copy_stats(sourceDB, tableSpecs)) + { + /* errors have already been logged */ + return false; + } + + /* keep track of when we wrote last time */ + context->lastWrite = now; + + return true; +} + + /* * copydb_prepare_copy_query prepares a COPY query using the list of attribute * names from the SourceTable instance.