Skip to content

Commit

Permalink
Fixes #388: Add byte-level progress to pgcopydb list progress
Browse files Browse the repository at this point in the history
  • Loading branch information
rimbi authored and marikkan-microsoft committed Aug 1, 2024
1 parent 792c554 commit cb38b90
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 35 deletions.
6 changes: 4 additions & 2 deletions src/bin/pgcopydb/copydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,15 +436,17 @@ bool copydb_process_table_data_worker(CopyDataSpec *specs);
bool copydb_process_table_data_with_workers(CopyDataSpec *specs);

bool copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst,
CopyTableDataSpec *tableSpecs);
CopyTableDataSpec *tableSpecs,
CopyProgressCallback onCopyProgress,
void *onCopyProgressContext);


bool copydb_table_create_lockfile(CopyDataSpec *specs,
CopyTableDataSpec *tableSpecs,
PGSQL *dst,
bool *isDone);

bool copydb_mark_table_as_done(CopyDataSpec *specs,
bool copydb_save_copy_progress(CopyDataSpec *specs,
CopyTableDataSpec *tableSpecs);

bool copydb_table_parts_are_all_done(CopyDataSpec *specs,
Expand Down
7 changes: 5 additions & 2 deletions src/bin/pgcopydb/extensions.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <getopt.h>
#include <inttypes.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

#include "catalog.h"
Expand Down Expand Up @@ -173,10 +174,12 @@ copydb_copy_ext_table(PGSQL *src, PGSQL *dst, char *qname, char *condition)
.srcWhereClause = condition,
.dstQname = qname,
.dstAttrList = "",
.bytesTransmitted = 0
.bytesTransmitted = 0,
.bytesTransmittedBeforeSavingProgress = 0,
.lastSavingTimeMs = time(NULL),
};

if (!pg_copy(src, dst, &args))
if (!pg_copy(src, dst, &args, NULL, NULL))
{
/* errors have already been logged */
return false;
Expand Down
17 changes: 11 additions & 6 deletions src/bin/pgcopydb/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ static bool build_parameters_list(PQExpBuffer buffer,
static void parseIdentifySystemResult(void *ctx, PGresult *result);
static void parseTimelineHistoryResult(void *ctx, PGresult *result);

static bool pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args);
static bool pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args, CopyProgressCallback
onProgress, void *context);

static bool pg_copy_send_query(PGSQL *pgsql, CopyArgs *args,
ExecStatusType status);
Expand Down Expand Up @@ -2662,7 +2663,8 @@ pgsql_truncate(PGSQL *pgsql, const char *qname)
* referenced by the qualified identifier name dstQname on the target.
*/
bool
pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args)
pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args, CopyProgressCallback
on_copy_progress_hook, void *context)
{
bool srcConnIsOurs = src->connection == NULL;
if (!pgsql_open_connection(src))
Expand All @@ -2681,7 +2683,7 @@ pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args)
return false;
}

bool result = pg_copy_data(src, dst, args);
bool result = pg_copy_data(src, dst, args, on_copy_progress_hook, context);

if (srcConnIsOurs)
{
Expand All @@ -2703,7 +2705,8 @@ pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args)
* expects src and dst are opened connection and doesn't manage their lifetime.
*/
static bool
pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args)
pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args, CopyProgressCallback
on_copy_progress_hook, void *context)
{
PGconn *srcConn = src->connection;
PGconn *dstConn = dst->connection;
Expand Down Expand Up @@ -2745,6 +2748,8 @@ pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args)
bool failedOnSrc = false;
bool failedOnDst = false;
args->bytesTransmitted = 0;
args->bytesTransmittedBeforeSavingProgress = 0;
args->lastSavingTimeMs = time(NULL);

for (;;)
{
Expand Down Expand Up @@ -2849,9 +2854,9 @@ pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args)
/*
* If successful PQgetCopyData returns the row length as a result.
*/
else if (bufsize > 0)
else if (bufsize > 0 && on_copy_progress_hook != NULL)
{
args->bytesTransmitted += bufsize;
on_copy_progress_hook(bufsize, context);
}

/*
Expand Down
9 changes: 8 additions & 1 deletion src/bin/pgcopydb/pgsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,16 @@ typedef struct CopyArgs
bool truncate;
bool freeze;
uint64_t bytesTransmitted;
uint64_t bytesTransmittedBeforeSavingProgress; /* the bytes transmitted before saving the progress to DB */
uint64_t lastSavingTimeMs; /* the last saving time of the progress to the DB */
} CopyArgs;

bool pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args);
/*
 * CopyProgressCallback is the type of the optional progress hook accepted by
 * pg_copy. When a non-NULL hook is supplied, the copy loop calls it each time
 * PQgetCopyData returns a positive length, passing that length as
 * bytesTransmitted together with the caller's context pointer, which is
 * forwarded unchanged.
 *
 * NOTE(review): the hook returns a bool, but the call site visible in
 * pg_copy_data discards it -- confirm whether returning false is meant to
 * abort the copy.
 */
typedef bool (*CopyProgressCallback)(int bytesTransmitted, void *context);


bool pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args, CopyProgressCallback
on_copy_progress_hook, void *context);

bool pg_copy_from_stdin(PGSQL *pgsql, const char *qname);
bool pg_copy_row_from_stdin(PGSQL *pgsql, char *fmt, ...);
Expand Down
18 changes: 9 additions & 9 deletions src/bin/pgcopydb/summary.c
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ summary_finish_vacuum(DatabaseCatalog *catalog, CopyTableDataSpec *tableSpecs)

if (db == NULL)
{
log_error("BUG: summary_finish_table: db is NULL");
log_error("BUG: summary_finish_vacuum: db is NULL");
return false;
}

Expand Down Expand Up @@ -2347,21 +2347,21 @@ catalog_timing_fetch(SQLiteQuery *query)
timing->doneTime = sqlite3_column_int64(query->ppStmt, 3);
timing->durationMs = sqlite3_column_int64(query->ppStmt, 4);

if (sqlite3_column_type(query->ppStmt, 5) != SQLITE_NULL)
if (timing->durationMs > 0)
{
strlcpy(timing->ppDuration,
(char *) sqlite3_column_text(query->ppStmt, 5),
sizeof(timing->ppDuration));
IntervalToString(timing->durationMs,
timing->ppDuration,
INTSTRING_MAX_DIGITS);
}

timing->count = sqlite3_column_int64(query->ppStmt, 6);
timing->bytes = sqlite3_column_int64(query->ppStmt, 7);

if (sqlite3_column_type(query->ppStmt, 8) != SQLITE_NULL)
if (timing->bytes > 0)
{
strlcpy(timing->ppBytes,
(char *) sqlite3_column_text(query->ppStmt, 8),
sizeof(timing->ppBytes));
pretty_print_bytes(timing->ppBytes,
sizeof(timing->ppBytes),
timing->bytes);
}

return true;
Expand Down
82 changes: 67 additions & 15 deletions src/bin/pgcopydb/table-data.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
#include <getopt.h>
#include <inttypes.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

#include "pgsql.h"
#include "postgres_fe.h"
#include "libpq-fe.h"
#include "pqexpbuffer.h"
Expand All @@ -20,6 +22,7 @@
#include "lock_utils.h"
#include "log.h"
#include "pidfile.h"
#include "progress.h"
#include "schema.h"
#include "signals.h"
#include "string_utils.h"
Expand Down Expand Up @@ -836,6 +839,45 @@ copydb_table_data_worker(CopyDataSpec *specs)
}


/*
 * CopyProgressContext bundles the pointers that on_copy_progress_hook needs.
 * It reaches the hook through the callback's opaque void *context argument,
 * where it is cast back to this type.
 */
typedef struct CopyProgressContext
{
CopyDataSpec *specs; /* forwarded to copydb_save_copy_progress */
CopyTableDataSpec *tableSpecs; /* holds the copyArgs and summary counters the hook updates */
} CopyProgressContext;

/*
 * on_copy_progress_hook is the CopyProgressCallback used while copying table
 * data. It adds the reported byte count to the per-table counters and, once
 * enough bytes have accumulated since the last save, persists the progress
 * through copydb_save_copy_progress.
 *
 * Parameters:
 *  - bytesTransmitted: number of bytes just copied; non-positive values are
 *    ignored so that they can never wrap the unsigned counters.
 *  - context: pointer to a CopyProgressContext, must not be NULL.
 *
 * Returns true when the progress has been recorded (or when nothing had to be
 * saved yet), and false when saving the progress failed.
 */
static bool
on_copy_progress_hook(int bytesTransmitted, void *context)
{
	/*
	 * Arbitrary threshold (10 MiB) used to avoid frequent access to the DB.
	 * It is declared with the same unsigned type as the counter it is
	 * compared against, which avoids a mixed signed/unsigned comparison.
	 */
	const uint64_t saveThresholdBytes = 10 * 1024 * 1024;

	if (bytesTransmitted <= 0)
	{
		/* nothing was transmitted, so there is no progress to record */
		return true;
	}

	CopyProgressContext *ctx = (CopyProgressContext *) context;
	CopyDataSpec *specs = ctx->specs;
	CopyTableDataSpec *tableSpecs = ctx->tableSpecs;
	CopyArgs *copyArgs = &(tableSpecs->copyArgs);

	/* bytesTransmitted is known to be positive here: the sums cannot wrap */
	copyArgs->bytesTransmittedBeforeSavingProgress += bytesTransmitted;
	copyArgs->bytesTransmitted += bytesTransmitted;
	tableSpecs->summary.bytesTransmitted += bytesTransmitted;

	if (copyArgs->bytesTransmittedBeforeSavingProgress < saveThresholdBytes)
	{
		/* Avoid frequent access to DB, safe to return here */
		return true;
	}

	if (!copydb_save_copy_progress(specs, tableSpecs))
	{
		/* errors have already been logged */
		return false;
	}

	/* the pending bytes have been saved, start a new accumulation window */
	copyArgs->bytesTransmittedBeforeSavingProgress = 0;

	return true;
}


/*
* copydb_copy_data_by_oid finds the SourceTable entry by its OID and then
* COPY the table data to the target database.
Expand Down Expand Up @@ -947,14 +989,19 @@ copydb_copy_data_by_oid(CopyDataSpec *specs, PGSQL *src, PGSQL *dst,
*/
if (!table->excludeData)
{
if (!copydb_copy_table(specs, src, dst, tableSpecs))
CopyProgressContext context = {
.specs = specs,
.tableSpecs = tableSpecs
};
if (!copydb_copy_table(specs, src, dst, tableSpecs, on_copy_progress_hook,
&context))
{
/* errors have already been logged */
return false;
}
}

if (!copydb_mark_table_as_done(specs, tableSpecs))
if (!copydb_save_copy_progress(specs, tableSpecs))
{
/* errors have already been logged */
return false;
Expand Down Expand Up @@ -1129,6 +1176,8 @@ copydb_table_create_lockfile(CopyDataSpec *specs,
args->truncate = false; /* default value, see below */
args->freeze = tableSpecs->sourceTable->partition.partCount <= 1;
args->bytesTransmitted = 0;
args->bytesTransmittedBeforeSavingProgress = 0;
args->lastSavingTimeMs = time(NULL);

/*
* Check to see if we want to TRUNCATE the table and benefit from the COPY
Expand Down Expand Up @@ -1196,12 +1245,11 @@ copydb_table_create_lockfile(CopyDataSpec *specs,


/*
* copydb_mark_table_as_done creates the table doneFile with the expected
* summary content. To create a doneFile we must acquire the synchronisation
* semaphore first. The lockFile is also removed here.
* copydb_save_copy_progress saves the duration and the number of
* bytes transmitted about the copy progress into the catalog.
*/
bool
copydb_mark_table_as_done(CopyDataSpec *specs,
copydb_save_copy_progress(CopyDataSpec *specs,
CopyTableDataSpec *tableSpecs)
{
DatabaseCatalog *sourceDB = &(specs->catalogs.source);
Expand All @@ -1212,11 +1260,17 @@ copydb_mark_table_as_done(CopyDataSpec *specs,
return false;
}

CopyArgs *copyArgs = &(tableSpecs->copyArgs);

/* calculate the elapsed time since the last copy operation */
uint64_t nowMs = time(NULL);
uint64_t elapsedTimeMs = nowMs - copyArgs->lastSavingTimeMs;
copyArgs->lastSavingTimeMs = nowMs;
if (!summary_increment_timing(sourceDB,
TIMING_SECTION_COPY_DATA,
1, /* count */
tableSpecs->sourceTable->bytes,
tableSpecs->summary.durationMs))
copyArgs->bytesTransmittedBeforeSavingProgress,
elapsedTimeMs))
{
/* errors have already been logged */
return false;
Expand Down Expand Up @@ -1293,7 +1347,9 @@ copydb_table_parts_are_all_done(CopyDataSpec *specs,
*/
bool
copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst,
CopyTableDataSpec *tableSpecs)
CopyTableDataSpec *tableSpecs,
CopyProgressCallback onCopyProgress,
void *onCopyProgressContext)
{
/* COPY the data from the source table to the target table */
if (tableSpecs->section != DATA_SECTION_TABLE_DATA &&
Expand All @@ -1304,8 +1360,6 @@ copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst,
}

/* Now copy the data from source to target */
CopyTableSummary *summary = &(tableSpecs->summary);

int attempts = 0;
int maxAttempts = 5; /* allow 5 attempts total, 4 retries */

Expand All @@ -1317,7 +1371,8 @@ copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst,
++attempts;

/* ignore previous attempts, we need only one success here */
success = pg_copy(src, dst, &(tableSpecs->copyArgs));
success = pg_copy(src, dst, &(tableSpecs->copyArgs), onCopyProgress,
onCopyProgressContext);

if (success)
{
Expand Down Expand Up @@ -1367,9 +1422,6 @@ copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst,
}
}

/* publish bytesTransmitted accumulated value to the summary */
summary->bytesTransmitted = tableSpecs->copyArgs.bytesTransmitted;

return success;
}

Expand Down

0 comments on commit cb38b90

Please sign in to comment.