Skip to content

Commit

Permalink
Merge branch 'main' into use-copy-with-binary
Browse files Browse the repository at this point in the history
  • Loading branch information
VaibhaveS authored Aug 2, 2024
2 parents b28d466 + 64a5ca7 commit 528a1a2
Show file tree
Hide file tree
Showing 28 changed files with 264 additions and 68 deletions.
6 changes: 6 additions & 0 deletions src/bin/pgcopydb/cli_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ cli_ping(int argc, char **argv)
}
}

if (errors > 0)
{
/* errors have already been logged */
exit(EXIT_CODE_INTERNAL_ERROR);
}

/*
* In case of error in one sub-process, we still want the other to fully
* try.
Expand Down
3 changes: 3 additions & 0 deletions src/bin/pgcopydb/copydb.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,9 @@ bool summary_add_table(DatabaseCatalog *catalog,
bool summary_finish_table(DatabaseCatalog *catalog,
CopyTableDataSpec *tableSpecs);

bool summary_update_table_copy_stats(DatabaseCatalog *catalog,
CopyTableDataSpec *tableSpecs);

bool summary_delete_table(DatabaseCatalog *catalog,
CopyTableDataSpec *tableSpecs);

Expand Down
8 changes: 5 additions & 3 deletions src/bin/pgcopydb/extensions.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,13 @@ copydb_copy_ext_table(PGSQL *src, PGSQL *dst, char *qname, char *condition)
.srcAttrList = "*",
.srcWhereClause = condition,
.dstQname = qname,
.dstAttrList = "",
.bytesTransmitted = 0
.dstAttrList = ""
};

if (!pg_copy(src, dst, &args))
/* skip statistics maintenance on extension configuration tables */
CopyStats stats = { 0 };

if (!pg_copy(src, dst, &args, &stats, NULL, NULL))
{
/* errors have already been logged */
return false;
Expand Down
39 changes: 32 additions & 7 deletions src/bin/pgcopydb/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ static bool build_parameters_list(PQExpBuffer buffer,
int paramCount,
const char **paramValues);

static bool pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args);
static bool pg_copy_data(PGSQL *src, PGSQL *dst,
CopyArgs *args, CopyStats *stats,
void *context, CopyStatsCallback *callback);

static bool pg_copy_send_query(PGSQL *pgsql, CopyArgs *args,
ExecStatusType status);
Expand Down Expand Up @@ -2659,7 +2661,9 @@ pgsql_truncate(PGSQL *pgsql, const char *qname)
* referenced by the qualified identifier name dstQname on the target.
*/
bool
pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args)
pg_copy(PGSQL *src, PGSQL *dst,
CopyArgs *args, CopyStats *stats,
void *context, CopyStatsCallback *callback)
{
bool srcConnIsOurs = src->connection == NULL;
if (!pgsql_open_connection(src))
Expand All @@ -2678,7 +2682,7 @@ pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args)
return false;
}

bool result = pg_copy_data(src, dst, args);
bool result = pg_copy_data(src, dst, args, stats, context, callback);

if (srcConnIsOurs)
{
Expand All @@ -2700,7 +2704,9 @@ pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args)
* expects src and dst are opened connection and doesn't manage their lifetime.
*/
static bool
pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args)
pg_copy_data(PGSQL *src, PGSQL *dst,
CopyArgs *args, CopyStats *stats,
void *context, CopyStatsCallback *callback)
{
PGconn *srcConn = src->connection;
PGconn *dstConn = dst->connection;
Expand All @@ -2719,7 +2725,10 @@ pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args)
}
}

/* cannot perform COPY FREEZE if the table was not created or truncated in the current subtransaction */
/*
* COPY FREEZE is only accepted by Postgres if the table was created or
* truncated in the current transaction.
*/
args->freeze &= args->truncate;

/* make sure to log TRUNCATE before we log COPY, avoid confusion */
Expand All @@ -2741,7 +2750,10 @@ pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args)
char *copybuf;
bool failedOnSrc = false;
bool failedOnDst = false;
args->bytesTransmitted = 0;

/* also init and maintain copy statistics */
stats->startTime = time(NULL);
stats->bytesTransmitted = 0;

for (;;)
{
Expand Down Expand Up @@ -2848,7 +2860,20 @@ pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args)
*/
else if (bufsize > 0)
{
args->bytesTransmitted += bufsize;
stats->bytesTransmitted += bufsize;

if (callback != NULL)
{
/*
* Allow the Copy Stats user callback to fail, still continue
* with the copy.
*/
if (!(*callback)(context, stats))
{
log_debug("Copy Stats Callback failed, "
"see above for details");
}
}
}

/*
Expand Down
13 changes: 12 additions & 1 deletion src/bin/pgcopydb/pgsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,18 @@ typedef struct CopyArgs
uint64_t bytesTransmitted;
} CopyArgs;

bool pg_copy(PGSQL *src, PGSQL *dst, CopyArgs *args);

typedef struct CopyStats
{
uint64_t startTime;
uint64_t bytesTransmitted;
} CopyStats;

typedef bool (CopyStatsCallback)(void *context, CopyStats *stats);

bool pg_copy(PGSQL *src, PGSQL *dst,
CopyArgs *args, CopyStats *stats,
void *context, CopyStatsCallback *callback);

bool pg_copy_from_stdin(PGSQL *pgsql, const char *qname);
bool pg_copy_row_from_stdin(PGSQL *pgsql, char *fmt, ...);
Expand Down
77 changes: 77 additions & 0 deletions src/bin/pgcopydb/summary.c
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,83 @@ summary_finish_table(DatabaseCatalog *catalog, CopyTableDataSpec *tableSpecs)
}


/*
* summary_update_table_copy_stats UPDATEs a summary entry to our internal
* catalogs database with the current COPY statistics, typically while the COPY
* command is running.
*/
bool
summary_update_table_copy_stats(DatabaseCatalog *catalog,
CopyTableDataSpec *tableSpecs)
{
sqlite3 *db = catalog->db;

if (db == NULL)
{
log_error("BUG: summary_update_table_copy_stats: db is NULL");
return false;
}

SourceTable *table = tableSpecs->sourceTable;
CopyTableSummary *tableSummary = &(tableSpecs->summary);

char *sql =
"update summary set duration = $1, bytes = $2 "
"where pid = $3 and tableoid = $4 and partnum = $5";

if (!semaphore_lock(&(catalog->sema)))
{
/* errors have already been logged */
return false;
}

SQLiteQuery query = { 0 };

if (!catalog_sql_prepare(db, sql, &query))
{
/* errors have already been logged */
(void) semaphore_unlock(&(catalog->sema));
return false;
}

/* bind our parameters now */
BindParam params[] = {
{ BIND_PARAMETER_TYPE_INT64, "duration",
tableSummary->durationMs, NULL },

{ BIND_PARAMETER_TYPE_INT64, "bytes",
tableSummary->bytesTransmitted, NULL },

{ BIND_PARAMETER_TYPE_INT64, "pid", getpid(), NULL },
{ BIND_PARAMETER_TYPE_INT64, "tableoid", table->oid, NULL },

{ BIND_PARAMETER_TYPE_INT64, "partnum",
table->partition.partNumber, NULL }
};

int count = sizeof(params) / sizeof(params[0]);

if (!catalog_sql_bind(&query, params, count))
{
/* errors have already been logged */
(void) semaphore_unlock(&(catalog->sema));
return false;
}

/* now execute the query, which does not return any row */
if (!catalog_sql_execute_once(&query))
{
/* errors have already been logged */
(void) semaphore_unlock(&(catalog->sema));
return false;
}

(void) semaphore_unlock(&(catalog->sema));

return true;
}


/*
* summary_table_all_parts_done sets tableSpecs->allPartsAreDone to true when
* all the parts have already been done in the summary table of our internal
Expand Down
76 changes: 74 additions & 2 deletions src/bin/pgcopydb/table-data.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@


static bool copydb_copy_supervisor_add_table_hook(void *ctx, SourceTable *table);
static bool copydb_update_copy_stats_hook(void *ctx, CopyStats *stats);

/*
* copydb_table_data fetches the list of tables from the source database and
Expand Down Expand Up @@ -1287,6 +1288,14 @@ copydb_table_parts_are_all_done(CopyDataSpec *specs,
}


typedef struct UpdateCopyStatsContext
{
CopyDataSpec *specs;
CopyTableDataSpec *tableSpecs;
uint64_t lastWrite;
} UpdateCopyStatsContext;


/*
* copydb_copy_table implements the sub-process activity to pg_dump |
* pg_restore the table's data and then create the indexes and the constraints
Expand All @@ -1306,6 +1315,7 @@ copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst,

/* Now copy the data from source to target */
CopyTableSummary *summary = &(tableSpecs->summary);
CopyStats stats = { 0 };

int attempts = 0;
int maxAttempts = 5; /* allow 5 attempts total, 4 retries */
Expand All @@ -1317,8 +1327,19 @@ copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst,
{
++attempts;

/* re-init stats between attempts */
CopyStats empty = { 0 };
stats = empty;

UpdateCopyStatsContext context = {
.specs = specs,
.tableSpecs = tableSpecs
};

/* ignore previous attempts, we need only one success here */
success = pg_copy(src, dst, &(tableSpecs->copyArgs));
success = pg_copy(src, dst,
&(tableSpecs->copyArgs), &stats,
&context, &copydb_update_copy_stats_hook);

if (success)
{
Expand Down Expand Up @@ -1369,12 +1390,63 @@ copydb_copy_table(CopyDataSpec *specs, PGSQL *src, PGSQL *dst,
}

/* publish bytesTransmitted accumulated value to the summary */
summary->bytesTransmitted = tableSpecs->copyArgs.bytesTransmitted;
summary->bytesTransmitted = stats.bytesTransmitted;

return success;
}


/*
* copydb_update_copy_stats_hook updates the bytesTransmitted data in our
* SQLite summary.
*/
static bool
copydb_update_copy_stats_hook(void *ctx, CopyStats *stats)
{
UpdateCopyStatsContext *context = (UpdateCopyStatsContext *) ctx;

DatabaseCatalog *sourceDB = &(context->specs->catalogs.source);
CopyTableDataSpec *tableSpecs = context->tableSpecs;
CopyTableSummary *summary = &(tableSpecs->summary);

uint64_t now = time(NULL);

if (context->lastWrite == 0)
{
context->lastWrite = now;
}

/* refrain from updating the statistics too often */
const uint64_t WRITE_INTERVAL_SECS = 5;

if ((now - context->lastWrite) < WRITE_INTERVAL_SECS)
{
return true;
}

/* update tablespecs summary durationMs and bytesTransmitted */
summary->bytesTransmitted = stats->bytesTransmitted;

instr_time duration;

INSTR_TIME_SET_CURRENT(duration);
INSTR_TIME_SUBTRACT(duration, summary->startTimeInstr);

summary->durationMs = INSTR_TIME_GET_MILLISEC(duration);

if (!summary_update_table_copy_stats(sourceDB, tableSpecs))
{
/* errors have already been logged */
return false;
}

/* keep track of when we wrote last time */
context->lastWrite = now;

return true;
}


/*
* copydb_prepare_copy_query prepares a COPY query using the list of attribute
* names from the SourceTable instance.
Expand Down
6 changes: 3 additions & 3 deletions tests/blobs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
test: down run down ;

run: build
docker-compose run test
docker compose run test

down:
docker-compose down
docker compose down

build:
docker-compose build --quiet
docker compose build --quiet

import:
psql --single-transaction --no-psqlrc -f import.sql
Expand Down
6 changes: 3 additions & 3 deletions tests/cdc-endpos-between-transaction/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
test: down run down ;

run: build
docker-compose run test
docker compose run test

down:
docker-compose down
docker compose down

build:
docker-compose build --quiet
docker compose build --quiet

.PHONY: run down build test
6 changes: 3 additions & 3 deletions tests/cdc-low-level/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
test: down run down ;

run: build
docker-compose run test
docker compose run test

down:
docker-compose down
docker compose down

build:
docker-compose build --quiet
docker compose build --quiet

.PHONY: run down build test
Loading

0 comments on commit 528a1a2

Please sign in to comment.