diff --git a/src/bin/pgcopydb/catalog.c b/src/bin/pgcopydb/catalog.c
index 9378dc19..64027caf 100644
--- a/src/bin/pgcopydb/catalog.c
+++ b/src/bin/pgcopydb/catalog.c
@@ -186,7 +186,10 @@ static char *sourceDBcreateDDLs[] = {
 	"create table sentinel("
 	" id integer primary key check (id = 1), "
 	" startpos pg_lsn, endpos pg_lsn, apply bool, "
-	" write_lsn pg_lsn, flush_lsn pg_lsn, replay_lsn pg_lsn)"
+	" write_lsn pg_lsn, flush_lsn pg_lsn, replay_lsn pg_lsn)",
+
+	"create table timeline_history("
+	" tli integer primary key, startpos pg_lsn, endpos pg_lsn)"
 };
@@ -437,7 +440,8 @@ static char *sourceDBdropDDLs[] = {
 	"drop table if exists s_table_parts_done",
 	"drop table if exists s_table_indexes_done",
-	"drop table if exists sentinel"
+	"drop table if exists sentinel",
+	"drop table if exists timeline_history"
 };
@@ -7703,6 +7707,171 @@ catalog_count_summary_done_fetch(SQLiteQuery *query)
 }
+/*
+ * catalog_add_timeline_history inserts a timeline history entry into our
+ * internal catalogs database.
+ */
+bool
+catalog_add_timeline_history(DatabaseCatalog *catalog, TimelineHistoryEntry *entry)
+{
+	if (catalog == NULL)
+	{
+		log_error("BUG: catalog_add_timeline_history: catalog is NULL");
+		return false;
+	}
+
+	sqlite3 *db = catalog->db;
+
+	if (db == NULL)
+	{
+		log_error("BUG: catalog_add_timeline_history: db is NULL");
+		return false;
+	}
+
+	char *sql =
+		"insert or replace into timeline_history(tli, startpos, endpos)"
+		" values($1, $2, $3)";
+
+	SQLiteQuery query = { 0 };
+
+	if (!catalog_sql_prepare(db, sql, &query))
+	{
+		/* errors have already been logged */
+		return false;
+	}
+
+	char slsn[PG_LSN_MAXLENGTH] = { 0 };
+	char elsn[PG_LSN_MAXLENGTH] = { 0 };
+
+	sformat(slsn, sizeof(slsn), "%X/%X", LSN_FORMAT_ARGS(entry->begin));
+	sformat(elsn, sizeof(elsn), "%X/%X", LSN_FORMAT_ARGS(entry->end));
+
+	/* bind our parameters now */
+	BindParam params[] = {
+		{ BIND_PARAMETER_TYPE_INT, "tli", entry->tli, NULL },
+		{ BIND_PARAMETER_TYPE_TEXT, "startpos", 0, slsn },
+		{ BIND_PARAMETER_TYPE_TEXT, "endpos", 0, elsn }
+	};
+
+	int count = sizeof(params) / sizeof(params[0]);
+
+	if (!catalog_sql_bind(&query, params, count))
+	{
+		/* errors have already been logged */
+		return false;
+	}
+
+	/* now execute the query, which does not return any row */
+	if (!catalog_sql_execute_once(&query))
+	{
+		/* errors have already been logged */
+		return false;
+	}
+
+	return true;
+}
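For illustration, a minimal usage sketch of catalog_add_timeline_history (hypothetical caller, not part of this patch: sourceDB stands for an already-opened source catalog and the LSN values are made up). An LSN of InvalidXLogRecPtr is formatted as "0/0" in the text columns.

/* hypothetical illustration only: record that timeline 1 ended at 0/16B3748 */
TimelineHistoryEntry entry = {
	.tli = 1,
	.begin = InvalidXLogRecPtr,   /* no earlier switch point, stored as "0/0" */
	.end = 0x16B3748              /* stored as "0/16B3748" in endpos */
};

if (!catalog_add_timeline_history(sourceDB, &entry))
{
	/* errors have already been logged */
	return false;
}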
+
+
+/*
+ * catalog_lookup_timeline_history fetches the TimelineHistoryEntry for the
+ * given timeline from our internal catalogs.
+ */
+bool
+catalog_lookup_timeline_history(DatabaseCatalog *catalog,
+                                int tli,
+                                TimelineHistoryEntry *entry)
+{
+	sqlite3 *db = catalog->db;
+
+	if (db == NULL)
+	{
+		log_error("BUG: catalog_lookup_timeline_history: db is NULL");
+		return false;
+	}
+
+	SQLiteQuery query = {
+		.context = entry,
+		.fetchFunction = &catalog_timeline_history_fetch
+	};
+
+	char *sql =
+		" select tli, startpos, endpos"
+		" from timeline_history"
+		" where tli = $1";
+
+	if (!catalog_sql_prepare(db, sql, &query))
+	{
+		/* errors have already been logged */
+		return false;
+	}
+
+	/* bind our parameters now */
+	BindParam params[] = {
+		{ BIND_PARAMETER_TYPE_INT, "tli", tli, NULL }
+	};
+
+	int count = sizeof(params) / sizeof(params[0]);
+
+	if (!catalog_sql_bind(&query, params, count))
+	{
+		/* errors have already been logged */
+		return false;
+	}
+
+	/* now execute the query, which returns at most one row */
+	if (!catalog_sql_execute_once(&query))
+	{
+		/* errors have already been logged */
+		return false;
+	}
+
+	return true;
+}
+
+
+/*
+ * catalog_timeline_history_fetch fetches a TimelineHistoryEntry from a query
+ * ppStmt result.
+ */
+bool
+catalog_timeline_history_fetch(SQLiteQuery *query)
+{
+	TimelineHistoryEntry *entry = (TimelineHistoryEntry *) query->context;
+
+	bzero(entry, sizeof(TimelineHistoryEntry));
+
+	/* tli */
+	entry->tli = sqlite3_column_int(query->ppStmt, 0);
+
+	/* begin LSN */
+	if (sqlite3_column_type(query->ppStmt, 1) != SQLITE_NULL)
+	{
+		const char *startpos = (const char *) sqlite3_column_text(query->ppStmt, 1);
+
+		if (!parseLSN(startpos, &entry->begin))
+		{
+			log_error("Failed to parse LSN from \"%s\"", startpos);
+			return false;
+		}
+	}
+
+	/* end LSN */
+	if (sqlite3_column_type(query->ppStmt, 2) != SQLITE_NULL)
+	{
+		const char *endpos = (const char *) sqlite3_column_text(query->ppStmt, 2);
+
+		if (!parseLSN(endpos, &entry->end))
+		{
+			log_error("Failed to parse LSN from \"%s\"", endpos);
+			return false;
+		}
+	}
+
+	return true;
+}
+
+
 /*
  * catalog_execute executes sqlite query
  */
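The read side mirrors the insert path; here is a hedged sketch of a caller (the sourceDB and system variables are assumed to be in scope, and a very similar call site appears in stream_read_context later in this patch):

TimelineHistoryEntry current = { 0 };

if (!catalog_lookup_timeline_history(sourceDB, system->timeline, &current))
{
	/* errors have already been logged */
	return false;
}

/* current was zero-initialized above; it is only filled when a row matches */
log_debug("timeline %d starts at %X/%X", current.tli, LSN_FORMAT_ARGS(current.begin));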
diff --git a/src/bin/pgcopydb/catalog.h b/src/bin/pgcopydb/catalog.h
index 1971363d..199081a2 100644
--- a/src/bin/pgcopydb/catalog.h
+++ b/src/bin/pgcopydb/catalog.h
@@ -627,6 +627,16 @@ bool catalog_count_summary_done(DatabaseCatalog *catalog,
 bool catalog_count_summary_done_fetch(SQLiteQuery *query);
+/*
+ * Logical decoding
+ */
+bool catalog_add_timeline_history(DatabaseCatalog *catalog,
+                                  TimelineHistoryEntry *entry);
+bool catalog_lookup_timeline_history(DatabaseCatalog *catalog,
+                                     int tli,
+                                     TimelineHistoryEntry *entry);
+bool catalog_timeline_history_fetch(SQLiteQuery *query);
+
 /*
  * Internal tooling for catalogs management
  */
diff --git a/src/bin/pgcopydb/copydb.c b/src/bin/pgcopydb/copydb.c
index ae4a9c27..d299b482 100644
--- a/src/bin/pgcopydb/copydb.c
+++ b/src/bin/pgcopydb/copydb.c
@@ -368,10 +368,6 @@ copydb_prepare_filepaths(CopyFilePaths *cfPaths,
 	        "%s/slot",
 	        cfPaths->cdc.dir);
-	sformat(cfPaths->cdc.tlihistfile, MAXPGPATH,
-	        "%s/tli.history",
-	        cfPaths->cdc.dir);
-
 	sformat(cfPaths->cdc.tlifile, MAXPGPATH,
 	        "%s/tli",
 	        cfPaths->cdc.dir);
diff --git a/src/bin/pgcopydb/copydb_paths.h b/src/bin/pgcopydb/copydb_paths.h
index f35679ca..a9129d6d 100644
--- a/src/bin/pgcopydb/copydb_paths.h
+++ b/src/bin/pgcopydb/copydb_paths.h
@@ -18,7 +18,6 @@ typedef struct CDCPaths
 	char slotfile[MAXPGPATH];          /* /tmp/pgcopydb/cdc/slot */
 	char walsegsizefile[MAXPGPATH];    /* /tmp/pgcopydb/cdc/wal_segment_size */
 	char tlifile[MAXPGPATH];           /* /tmp/pgcopydb/cdc/tli */
-	char tlihistfile[MAXPGPATH];       /* /tmp/pgcopydb/cdc/tli.history */
 	char lsntrackingfile[MAXPGPATH];   /* /tmp/pgcopydb/cdc/lsn.json */
 } CDCPaths;
diff --git a/src/bin/pgcopydb/follow.c b/src/bin/pgcopydb/follow.c
index d8757a55..c2ad996a 100644
--- a/src/bin/pgcopydb/follow.c
+++ b/src/bin/pgcopydb/follow.c
@@ -419,12 +419,11 @@ follow_prepare_mode_switch(StreamSpecs *streamSpecs,
 	if (streamSpecs->system.timeline == 0)
 	{
-		if (!stream_read_context(&(streamSpecs->paths),
-								 &(streamSpecs->system),
-								 &(streamSpecs->WalSegSz)))
+		if (!stream_read_context(streamSpecs))
 		{
 			log_error("Failed to read the streaming context information "
-					  "from the source database, see above for details");
+					  "from the source database and internal catalogs, "
+					  "see above for details");
 			return false;
 		}
 	}
diff --git a/src/bin/pgcopydb/ld_apply.c b/src/bin/pgcopydb/ld_apply.c
index 83488bc9..4a6b8eb4 100644
--- a/src/bin/pgcopydb/ld_apply.c
+++ b/src/bin/pgcopydb/ld_apply.c
@@ -208,12 +208,11 @@ stream_apply_setup(StreamSpecs *specs, StreamApplyContext *context)
 	if (specs->system.timeline == 0)
 	{
-		if (!stream_read_context(&(specs->paths),
-								 &(specs->system),
-								 &(specs->WalSegSz)))
+		if (!stream_read_context(specs))
 		{
 			log_error("Failed to read the streaming context information "
-					  "from the source database, see above for details");
+					  "from the source database and internal catalogs, "
+					  "see above for details");
 			return false;
 		}
 	}
diff --git a/src/bin/pgcopydb/ld_stream.c b/src/bin/pgcopydb/ld_stream.c
index 0c4623ef..73a11ed2 100644
--- a/src/bin/pgcopydb/ld_stream.c
+++ b/src/bin/pgcopydb/ld_stream.c
@@ -24,8 +24,9 @@
 #include "lock_utils.h"
 #include "log.h"
 #include "parsing_utils.h"
-#include "pidfile.h"
 #include "pg_utils.h"
+#include "pgsql_timeline.h"
+#include "pidfile.h"
 #include "schema.h"
 #include "signals.h"
 #include "string_utils.h"
@@ -429,6 +430,7 @@ startLogicalStreaming(StreamSpecs *specs)
 	stream.closeFunction = &streamClose;
 	stream.feedbackFunction = &streamFeedback;
 	stream.keepaliveFunction = &streamKeepalive;
+	strlcpy(stream.cdcPathDir, specs->paths.dir, MAXPGPATH);
 
 	/*
 	 * Read possibly already existing file to initialize the start LSN from a
@@ -2613,8 +2615,8 @@ stream_fetch_current_lsn(uint64_t *lsn,
 /*
- * stream_write_context writes the wal_segment_size and timeline history to
- * files.
+ * stream_write_context writes the wal_segment_size and tli to files, and
+ * populates our internal catalogs from the timeline history file.
*/ bool stream_write_context(StreamSpecs *specs, LogicalStreamClient *stream) @@ -2652,16 +2654,16 @@ stream_write_context(StreamSpecs *specs, LogicalStreamClient *stream) log_debug("Wrote tli %s timeline file \"%s\"", tli, specs->paths.tlifile); - if (!write_file(system->timelines.content, - strlen(system->timelines.content), - specs->paths.tlihistfile)) + /* read from the timeline history file and populate internal catalogs */ + if (stream->system.timeline > 1 && + !parse_timeline_history_file(stream->system.timelineHistoryFilename, + specs->sourceDB, + stream->system.timeline)) { /* errors have already been logged */ return false; } - log_debug("Wrote timeline history file \"%s\"", specs->paths.tlihistfile); - return true; } @@ -2677,7 +2679,6 @@ stream_cleanup_context(StreamSpecs *specs) success = success && unlink_file(specs->paths.walsegsizefile); success = success && unlink_file(specs->paths.tlifile); - success = success && unlink_file(specs->paths.tlihistfile); /* reset the timeline, so that we always read from the disk */ specs->system.timeline = 0; @@ -2691,14 +2692,14 @@ stream_cleanup_context(StreamSpecs *specs) * wal_segment_size and timeline history. */ bool -stream_read_context(CDCPaths *paths, - IdentifySystem *system, - uint32_t *WalSegSz) +stream_read_context(StreamSpecs *specs) { + CDCPaths *paths = &(specs->paths); + IdentifySystem *system = &(specs->system); + uint32_t *WalSegSz = &(specs->WalSegSz); + char *wal_segment_size = NULL; char *tli = NULL; - char *history = NULL; - long size = 0L; /* @@ -2727,8 +2728,7 @@ stream_read_context(CDCPaths *paths, } if (file_exists(paths->walsegsizefile) && - file_exists(paths->tlifile) && - file_exists(paths->tlihistfile)) + file_exists(paths->tlifile)) { /* * File did exist, but might have been deleted now (race condition @@ -2742,9 +2742,6 @@ stream_read_context(CDCPaths *paths, success = success && read_file(paths->tlifile, &tli, &size); - success = success && - read_file(paths->tlihistfile, &history, &size); - if (success) { /* success: break out of the retry loop */ @@ -2765,8 +2762,7 @@ stream_read_context(CDCPaths *paths, /* did retry policy expire before the files are created? 
*/ if (!(file_exists(paths->walsegsizefile) && - file_exists(paths->tlifile) && - file_exists(paths->tlihistfile))) + file_exists(paths->tlifile))) { log_error("Failed to read stream context file: retry policy expired"); return false; @@ -2787,12 +2783,13 @@ stream_read_context(CDCPaths *paths, return false; } - if (!parseTimeLineHistory(paths->tlihistfile, history, system)) + DatabaseCatalog *source = specs->sourceDB; + if (!catalog_lookup_timeline_history(source, system->timeline, + &system->currentTimeline)) { /* errors have already been logged */ return false; } - return true; } diff --git a/src/bin/pgcopydb/ld_stream.h b/src/bin/pgcopydb/ld_stream.h index 2aa0607e..beb0860d 100644 --- a/src/bin/pgcopydb/ld_stream.h +++ b/src/bin/pgcopydb/ld_stream.h @@ -620,9 +620,7 @@ bool stream_fetch_current_lsn(uint64_t *lsn, bool stream_write_context(StreamSpecs *specs, LogicalStreamClient *stream); bool stream_cleanup_context(StreamSpecs *specs); -bool stream_read_context(CDCPaths *paths, - IdentifySystem *system, - uint32_t *WalSegSz); +bool stream_read_context(StreamSpecs *specs); StreamAction StreamActionFromChar(char action); char * StreamActionToString(StreamAction action); diff --git a/src/bin/pgcopydb/ld_transform.c b/src/bin/pgcopydb/ld_transform.c index 0da592c0..370c5d9f 100644 --- a/src/bin/pgcopydb/ld_transform.c +++ b/src/bin/pgcopydb/ld_transform.c @@ -283,12 +283,11 @@ stream_transform_resume(StreamSpecs *specs) /* we need timeline and wal_segment_size to compute WAL filenames */ if (specs->system.timeline == 0) { - if (!stream_read_context(&(specs->paths), - &(specs->system), - &(specs->WalSegSz))) + if (!stream_read_context(specs)) { log_error("Failed to read the streaming context information " - "from the source database, see above for details"); + "from the source database and internal catalogs, " + "see above for details"); return false; } } @@ -669,9 +668,9 @@ stream_transform_worker(StreamSpecs *specs) * The timeline and wal segment size are determined when connecting to the * source database, and stored to local files at that time. When the Stream * Transform Worker process is created, that information is read from our - * local files. + * local files and internal catalogs. 
*/ - if (!stream_read_context(&(specs->paths), &(specs->system), &(specs->WalSegSz))) + if (!stream_read_context(specs)) { if (asked_to_stop || asked_to_stop_fast || asked_to_quit) { @@ -680,7 +679,8 @@ stream_transform_worker(StreamSpecs *specs) } log_error("Failed to read the streaming context information " - "from the source database, see above for details"); + "from the source database and internal catalogs, " + "see above for details"); return false; } diff --git a/src/bin/pgcopydb/pgsql.c b/src/bin/pgcopydb/pgsql.c index 89e49bdf..80864267 100644 --- a/src/bin/pgcopydb/pgsql.c +++ b/src/bin/pgcopydb/pgsql.c @@ -26,6 +26,8 @@ #include "log.h" #include "parsing_utils.h" #include "pgsql.h" +#include "pgsql_timeline.h" +#include "pgsql_utils.h" #include "pg_utils.h" #include "signals.h" #include "string_utils.h" @@ -42,8 +44,6 @@ static void log_connection_error(PGconn *connection, int logLevel); static void pgAutoCtlDefaultNoticeProcessor(void *arg, const char *message); static bool pgsql_retry_open_connection(PGSQL *pgsql); -static bool is_response_ok(PGresult *result); -static bool clear_results(PGSQL *pgsql); static void pgsql_handle_notifications(PGSQL *pgsql); static void pgsql_execute_log_error(PGSQL *pgsql, @@ -56,9 +56,6 @@ static bool build_parameters_list(PQExpBuffer buffer, int paramCount, const char **paramValues); -static void parseIdentifySystemResult(void *ctx, PGresult *result); -static void parseTimelineHistoryResult(void *ctx, PGresult *result); - static bool pg_copy_data(PGSQL *src, PGSQL *dst, CopyArgs *args); static bool pg_copy_send_query(PGSQL *pgsql, CopyArgs *args, @@ -2431,7 +2428,7 @@ build_parameters_list(PQExpBuffer buffer, * is_response_ok returns whether the query result is a correct response * (not an error or failure). */ -static bool +bool is_response_ok(PGresult *result) { ExecStatusType resultStatus = PQresultStatus(result); @@ -2484,7 +2481,7 @@ pgsql_state_is_connection_error(PGSQL *pgsql) * clear_results consumes results on a connection until NULL is returned. * If an error is returned it returns false. */ -static bool +bool clear_results(PGSQL *pgsql) { PGconn *connection = pgsql->connection; @@ -3294,369 +3291,6 @@ getSequenceValue(void *ctx, PGresult *result) } -typedef struct IdentifySystemResult -{ - char sqlstate[6]; - bool parsedOk; - IdentifySystem *system; -} IdentifySystemResult; - - -typedef struct TimelineHistoryResult -{ - char sqlstate[6]; - bool parsedOk; - char filename[MAXPGPATH]; - char content[BUFSIZE * BUFSIZE]; /* 1MB should get us quite very far */ -} TimelineHistoryResult; - -/* - * pgsql_identify_system connects to the given pgsql client and issue the - * replication command IDENTIFY_SYSTEM. The pgsql connection string should - * contain the 'replication=1' parameter. 
- */ -bool -pgsql_identify_system(PGSQL *pgsql, IdentifySystem *system) -{ - bool connIsOurs = pgsql->connection == NULL; - - PGconn *connection = pgsql_open_connection(pgsql); - if (connection == NULL) - { - /* error message was logged in pgsql_open_connection */ - return false; - } - - /* extended query protocol not supported in a replication connection */ - PGresult *result = PQexec(connection, "IDENTIFY_SYSTEM"); - - if (!is_response_ok(result)) - { - log_error("Failed to IDENTIFY_SYSTEM: %s", PQerrorMessage(connection)); - PQclear(result); - clear_results(pgsql); - - PQfinish(connection); - - return false; - } - - IdentifySystemResult isContext = { { 0 }, false, system }; - - (void) parseIdentifySystemResult((void *) &isContext, result); - - PQclear(result); - clear_results(pgsql); - - log_sql("IDENTIFY_SYSTEM: timeline %d, xlogpos %s, systemid %" PRIu64, - system->timeline, - system->xlogpos, - system->identifier); - - if (!isContext.parsedOk) - { - log_error("Failed to get result from IDENTIFY_SYSTEM"); - PQfinish(connection); - return false; - } - - /* while at it, we also run the TIMELINE_HISTORY command */ - if (system->timeline > 1) - { - TimelineHistoryResult hContext = { 0 }; - - char sql[BUFSIZE] = { 0 }; - sformat(sql, sizeof(sql), "TIMELINE_HISTORY %d", system->timeline); - - result = PQexec(connection, sql); - - if (!is_response_ok(result)) - { - log_error("Failed to request TIMELINE_HISTORY: %s", - PQerrorMessage(connection)); - PQclear(result); - clear_results(pgsql); - - PQfinish(connection); - - return false; - } - - (void) parseTimelineHistoryResult((void *) &hContext, result); - - PQclear(result); - clear_results(pgsql); - - if (!hContext.parsedOk) - { - log_error("Failed to get result from TIMELINE_HISTORY"); - PQfinish(connection); - return false; - } - - if (!parseTimeLineHistory(hContext.filename, hContext.content, system)) - { - /* errors have already been logged */ - PQfinish(connection); - return false; - } - - TimeLineHistoryEntry *current = - &(system->timelines.history[system->timelines.count - 1]); - - log_sql("TIMELINE_HISTORY: \"%s\", timeline %d started at %X/%X", - hContext.filename, - current->tli, - (uint32_t) (current->begin >> 32), - (uint32_t) current->begin); - } - - if (connIsOurs) - { - (void) pgsql_finish(pgsql); - } - - return true; -} - - -/* - * parsePgMetadata parses the result from a PostgreSQL query fetching - * two columns from pg_stat_replication: sync_state and currentLSN. 
- */ -static void -parseIdentifySystemResult(void *ctx, PGresult *result) -{ - IdentifySystemResult *context = (IdentifySystemResult *) ctx; - - if (PQnfields(result) != 4) - { - log_error("Query returned %d columns, expected 4", PQnfields(result)); - context->parsedOk = false; - return; - } - - if (PQntuples(result) == 0) - { - log_sql("parseIdentifySystem: query returned no rows"); - context->parsedOk = false; - return; - } - if (PQntuples(result) != 1) - { - log_error("Query returned %d rows, expected 1", PQntuples(result)); - context->parsedOk = false; - return; - } - - /* systemid (text) */ - char *value = PQgetvalue(result, 0, 0); - if (!stringToUInt64(value, &(context->system->identifier))) - { - log_error("Failed to parse system_identifier \"%s\"", value); - context->parsedOk = false; - return; - } - - /* timeline (int4) */ - value = PQgetvalue(result, 0, 1); - if (!stringToUInt32(value, &(context->system->timeline))) - { - log_error("Failed to parse timeline \"%s\"", value); - context->parsedOk = false; - return; - } - - /* xlogpos (text) */ - value = PQgetvalue(result, 0, 2); - strlcpy(context->system->xlogpos, value, PG_LSN_MAXLENGTH); - - /* dbname (text) Database connected to or null */ - if (!PQgetisnull(result, 0, 3)) - { - value = PQgetvalue(result, 0, 3); - strlcpy(context->system->dbname, value, NAMEDATALEN); - } - - context->parsedOk = true; -} - - -/* - * parseTimelineHistory parses the result of the TIMELINE_HISTORY replication - * command. - */ -static void -parseTimelineHistoryResult(void *ctx, PGresult *result) -{ - TimelineHistoryResult *context = (TimelineHistoryResult *) ctx; - - if (PQnfields(result) != 2) - { - log_error("Query returned %d columns, expected 2", PQnfields(result)); - context->parsedOk = false; - return; - } - - if (PQntuples(result) == 0) - { - log_sql("parseTimelineHistory: query returned no rows"); - context->parsedOk = false; - return; - } - - if (PQntuples(result) != 1) - { - log_error("Query returned %d rows, expected 1", PQntuples(result)); - context->parsedOk = false; - return; - } - - /* filename (text) */ - char *value = PQgetvalue(result, 0, 0); - strlcpy(context->filename, value, sizeof(context->filename)); - - /* content (bytea) */ - value = PQgetvalue(result, 0, 1); - - if (strlen(value) >= sizeof(context->content)) - { - log_error("Received a timeline history file of %zu bytes, " - "pgcopydb is limited to files of up to %zu bytes.", - strlen(value), - sizeof(context->content)); - context->parsedOk = false; - } - strlcpy(context->content, value, sizeof(context->content)); - - context->parsedOk = true; -} - - -/* - * parseTimeLineHistory parses the content of a timeline history file. 
- */ -bool -parseTimeLineHistory(const char *filename, const char *content, - IdentifySystem *system) -{ - LinesBuffer lbuf = { 0 }; - - if (!splitLines(&lbuf, (char *) content)) - { - /* errors have already been logged */ - return false; - } - - if (lbuf.count >= PGCOPYDB_MAX_TIMELINES) - { - log_error("history file \"%s\" contains %lld lines, " - "pgcopydb only supports up to %d lines", - filename, (long long) lbuf.count, PGCOPYDB_MAX_TIMELINES - 1); - return false; - } - - /* keep the original content around */ - strlcpy(system->timelines.filename, filename, MAXPGPATH); - strlcpy(system->timelines.content, content, PGCOPYDB_MAX_TIMELINE_CONTENT); - - uint64_t prevend = InvalidXLogRecPtr; - - system->timelines.count = 0; - - TimeLineHistoryEntry *entry = - &(system->timelines.history[system->timelines.count]); - - for (uint64_t lineNumber = 0; lineNumber < lbuf.count; lineNumber++) - { - char *ptr = lbuf.lines[lineNumber]; - - /* skip leading whitespace and check for # comment */ - for (; *ptr; ptr++) - { - if (!isspace((unsigned char) *ptr)) - { - break; - } - } - - if (*ptr == '\0' || *ptr == '#') - { - continue; - } - - log_trace("parseTimeLineHistory line %lld is \"%s\"", - (long long) lineNumber, - lbuf.lines[lineNumber]); - - char *tabptr = strchr(lbuf.lines[lineNumber], '\t'); - - if (tabptr == NULL) - { - log_error("Failed to parse history file line %lld: \"%s\"", - (long long) lineNumber, ptr); - return false; - } - - *tabptr = '\0'; - - if (!stringToUInt(lbuf.lines[lineNumber], &(entry->tli))) - { - log_error("Failed to parse history timeline \"%s\"", tabptr); - return false; - } - - char *lsn = tabptr + 1; - - for (char *lsnend = lsn; *lsnend; lsnend++) - { - if (!(isxdigit((unsigned char) *lsnend) || *lsnend == '/')) - { - *lsnend = '\0'; - break; - } - } - - if (!parseLSN(lsn, &(entry->end))) - { - log_error("Failed to parse history timeline %d LSN \"%s\"", - entry->tli, lsn); - return false; - } - - entry->begin = prevend; - prevend = entry->end; - - log_trace("parseTimeLineHistory[%d]: tli %d [%X/%X %X/%X]", - system->timelines.count, - entry->tli, - LSN_FORMAT_ARGS(entry->begin), - LSN_FORMAT_ARGS(entry->end)); - - entry = &(system->timelines.history[++system->timelines.count]); - } - - /* - * Create one more entry for the "tip" of the timeline, which has no entry - * in the history file. - */ - entry->tli = system->timeline; - entry->begin = prevend; - entry->end = InvalidXLogRecPtr; - - log_trace("parseTimeLineHistory[%d]: tli %d [%X/%X %X/%X]", - system->timelines.count, - entry->tli, - LSN_FORMAT_ARGS(entry->begin), - LSN_FORMAT_ARGS(entry->end)); - - /* fix the off-by-one so that the count is a count, not an index */ - ++system->timelines.count; - - return true; -} - - /* * pgsql_set_gucs sets the given GUC array in the current session attached to * the pgsql client. 
@@ -4236,7 +3870,7 @@ pgsql_start_replication(LogicalStreamClient *client) } /* fetch the source timeline */ - if (!pgsql_identify_system(pgsql, &(client->system))) + if (!pgsql_identify_system(pgsql, &(client->system), client->cdcPathDir)) { /* errors have already been logged */ destroyPQExpBuffer(query); diff --git a/src/bin/pgcopydb/pgsql.h b/src/bin/pgcopydb/pgsql.h index 55405ce0..b548f6bb 100644 --- a/src/bin/pgcopydb/pgsql.h +++ b/src/bin/pgcopydb/pgsql.h @@ -367,10 +367,10 @@ bool pg_copy_large_object(PGSQL *src, #define PG_LSN_MAXLENGTH 18 /* - * TimeLineHistoryEntry is taken from Postgres definitions and adapted to + * TimelineHistoryEntry is taken from Postgres definitions and adapted to * client-size code where we don't have all the necessary infrastruture. In * particular we don't define a XLogRecPtr data type nor do we define a - * TimeLineID data type. + * TimelineID data type. * * Zero is used indicate an invalid pointer. Bootstrap skips the first possible * WAL segment, initializing the first WAL page at WAL segment size, so no XLOG @@ -379,25 +379,13 @@ bool pg_copy_large_object(PGSQL *src, #define InvalidXLogRecPtr 0 #define XLogRecPtrIsInvalid(r) ((r) == InvalidXLogRecPtr) -#define PGCOPYDB_MAX_TIMELINES 1024 -#define PGCOPYDB_MAX_TIMELINE_CONTENT (1024 * 1024) - -typedef struct TimeLineHistoryEntry +typedef struct TimelineHistoryEntry { uint32_t tli; uint64_t begin; /* inclusive */ uint64_t end; /* exclusive, InvalidXLogRecPtr means infinity */ -} TimeLineHistoryEntry; - - -typedef struct TimeLineHistory -{ - int count; - TimeLineHistoryEntry history[PGCOPYDB_MAX_TIMELINES]; +} TimelineHistoryEntry; - char filename[MAXPGPATH]; - char content[PGCOPYDB_MAX_TIMELINE_CONTENT]; -} TimeLineHistory; /* * The IdentifySystem contains information that is parsed from the @@ -409,12 +397,10 @@ typedef struct IdentifySystem uint32_t timeline; char xlogpos[PG_LSN_MAXLENGTH]; char dbname[NAMEDATALEN]; - TimeLineHistory timelines; + TimelineHistoryEntry currentTimeline; + char timelineHistoryFilename[MAXPGPATH]; } IdentifySystem; -bool pgsql_identify_system(PGSQL *pgsql, IdentifySystem *system); -bool parseTimeLineHistory(const char *filename, const char *content, - IdentifySystem *system); /* * Logical Decoding support. 
@@ -470,6 +456,7 @@ typedef struct LogicalStreamClient { PGSQL pgsql; IdentifySystem system; + char cdcPathDir[MAXPGPATH]; char slotName[NAMEDATALEN]; diff --git a/src/bin/pgcopydb/pgsql_timeline.c b/src/bin/pgcopydb/pgsql_timeline.c new file mode 100644 index 00000000..8fbc0686 --- /dev/null +++ b/src/bin/pgcopydb/pgsql_timeline.c @@ -0,0 +1,394 @@ +/* + * src/bin/pgcopydb/pgsql_timeline.c + * API for sending SQL commands about timelines to a PostgreSQL server + */ + +#include "catalog.h" +#include "file_utils.h" +#include "log.h" +#include "pg_utils.h" +#include "pgsql_timeline.h" +#include "pgsql_utils.h" +#include "pgsql.h" + + +typedef struct IdentifySystemResult +{ + char sqlstate[6]; + bool parsedOk; + IdentifySystem *system; +} IdentifySystemResult; + + +typedef struct TimelineHistoryResult +{ + char sqlstate[6]; + bool parsedOk; + char filename[MAXPGPATH]; +} TimelineHistoryResult; + +static void parseIdentifySystemResult(IdentifySystemResult *ctx, PGresult *result); +static void parseTimelineHistoryResult(TimelineHistoryResult *context, PGresult *result, + char *cdcPathDir); +static bool writeTimelineHistoryFile(char *filename, char *content); +static bool register_timeline_hook(void *ctx, char *line); + + +/* + * pgsql_identify_system connects to the given pgsql client and issue the + * replication command IDENTIFY_SYSTEM. The pgsql connection string should + * contain the 'replication=1' parameter. + */ +bool +pgsql_identify_system(PGSQL *pgsql, IdentifySystem *system, char *cdcPathDir) +{ + bool connIsOurs = pgsql->connection == NULL; + + PGconn *connection = pgsql_open_connection(pgsql); + if (connection == NULL) + { + /* error message was logged in pgsql_open_connection */ + return false; + } + + /* extended query protocol not supported in a replication connection */ + PGresult *result = PQexec(connection, "IDENTIFY_SYSTEM"); + + if (!is_response_ok(result)) + { + log_error("Failed to IDENTIFY_SYSTEM: %s", PQerrorMessage(connection)); + PQclear(result); + clear_results(pgsql); + + PQfinish(connection); + + return false; + } + + IdentifySystemResult isContext = { { 0 }, false, system }; + + (void) parseIdentifySystemResult((void *) &isContext, result); + + PQclear(result); + clear_results(pgsql); + + log_sql("IDENTIFY_SYSTEM: timeline %d, xlogpos %s, systemid %" PRIu64, + system->timeline, + system->xlogpos, + system->identifier); + + if (!isContext.parsedOk) + { + log_error("Failed to get result from IDENTIFY_SYSTEM"); + PQfinish(connection); + return false; + } + + /* while at it, we also run the TIMELINE_HISTORY command */ + if (system->timeline > 1) + { + TimelineHistoryResult hContext = { 0 }; + + char sql[BUFSIZE] = { 0 }; + sformat(sql, sizeof(sql), "TIMELINE_HISTORY %d", system->timeline); + + result = PQexec(connection, sql); + + if (!is_response_ok(result)) + { + log_error("Failed to request TIMELINE_HISTORY: %s", + PQerrorMessage(connection)); + PQclear(result); + clear_results(pgsql); + + PQfinish(connection); + + return false; + } + + (void) parseTimelineHistoryResult((void *) &hContext, result, cdcPathDir); + + PQclear(result); + clear_results(pgsql); + + if (!hContext.parsedOk) + { + log_error("Failed to get result from TIMELINE_HISTORY"); + PQfinish(connection); + return false; + } + + /* store the filename for the timeline history file */ + strlcpy(system->timelineHistoryFilename, hContext.filename, MAXPGPATH); + } + + if (connIsOurs) + { + (void) pgsql_finish(pgsql); + } + + return true; +} + + +/* + * writeTimelineHistoryFile writes the content of a 
timeline history file to disk.
+ * The filename is determined by the PostgreSQL TIMELINE_HISTORY command.
+ */
+static bool
+writeTimelineHistoryFile(char *filename, char *content)
+{
+	log_debug("Writing timeline history file \"%s\"", filename);
+
+	size_t size = strlen(content);
+	return write_file(content, size, filename);
+}
+
+
+/*
+ * parseIdentifySystemResult parses the result from a replication query
+ * IDENTIFY_SYSTEM, and fills the given IdentifySystem structure.
+ */
+static void
+parseIdentifySystemResult(IdentifySystemResult *context, PGresult *result)
+{
+	if (PQnfields(result) != 4)
+	{
+		log_error("Query returned %d columns, expected 4", PQnfields(result));
+		context->parsedOk = false;
+		return;
+	}
+
+	if (PQntuples(result) == 0)
+	{
+		log_sql("parseIdentifySystem: query returned no rows");
+		context->parsedOk = false;
+		return;
+	}
+	if (PQntuples(result) != 1)
+	{
+		log_error("Query returned %d rows, expected 1", PQntuples(result));
+		context->parsedOk = false;
+		return;
+	}
+
+	/* systemid (text) */
+	char *value = PQgetvalue(result, 0, 0);
+	if (!stringToUInt64(value, &(context->system->identifier)))
+	{
+		log_error("Failed to parse system_identifier \"%s\"", value);
+		context->parsedOk = false;
+		return;
+	}
+
+	/* timeline (int4) */
+	value = PQgetvalue(result, 0, 1);
+	if (!stringToUInt32(value, &(context->system->timeline)))
+	{
+		log_error("Failed to parse timeline \"%s\"", value);
+		context->parsedOk = false;
+		return;
+	}
+
+	/* xlogpos (text) */
+	value = PQgetvalue(result, 0, 2);
+	strlcpy(context->system->xlogpos, value, PG_LSN_MAXLENGTH);
+
+	/* dbname (text) Database connected to or null */
+	if (!PQgetisnull(result, 0, 3))
+	{
+		value = PQgetvalue(result, 0, 3);
+		strlcpy(context->system->dbname, value, NAMEDATALEN);
+	}
+
+	context->parsedOk = true;
+}
+
+
+/*
+ * parseTimelineHistoryResult parses the result of the TIMELINE_HISTORY
+ * replication command. The content is written to disk, and the filename
+ * is stored in the TimelineHistoryResult structure.
+ */
+static void
+parseTimelineHistoryResult(TimelineHistoryResult *context, PGresult *result,
+                           char *cdcPathDir)
+{
+	if (PQnfields(result) != 2)
+	{
+		log_error("Query returned %d columns, expected 2", PQnfields(result));
+		context->parsedOk = false;
+		return;
+	}
+
+	if (PQntuples(result) == 0)
+	{
+		log_sql("parseTimelineHistoryResult: query returned no rows");
+		context->parsedOk = false;
+		return;
+	}
+
+	if (PQntuples(result) != 1)
+	{
+		log_error("Query returned %d rows, expected 1", PQntuples(result));
+		context->parsedOk = false;
+		return;
+	}
+
+	/* filename (text) */
+	char *value = PQgetvalue(result, 0, 0);
+	sformat(context->filename, sizeof(context->filename), "%s/%s", cdcPathDir, value);
+
+	/*
+	 * content (bytea)
+	 *
+	 * We do not want to store this value in memory. Instead we write it to disk
+	 * as it is.
+	 */
+	value = PQgetvalue(result, 0, 1);
+	if (!writeTimelineHistoryFile(context->filename, value))
+	{
+		log_error("Failed to write timeline history file \"%s\"", context->filename);
+		context->parsedOk = false;
+		return;
+	}
+
+	context->parsedOk = true;
+}
+
+
+typedef struct TimelineHistoryContext
+{
+	DatabaseCatalog *catalog;
+	uint32_t prevtli;
+	uint64_t prevend;
+} TimelineHistoryContext;
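To make the prevend bookkeeping concrete before the functions that use it, here is an illustrative walk-through; the file name, LSNs, and switch reasons are made up, only the begin/end propagation reflects the code below.

/*
 * Hypothetical 00000003.history content (<tli> TAB <switch LSN> TAB <reason>):
 *
 *   1	0/16B3748	no recovery target specified
 *   2	0/1A000098	no recovery target specified
 *
 * register_timeline_hook runs once per line, carrying prevend forward:
 *
 *   tli 1: begin 0/0        end 0/16B3748
 *   tli 2: begin 0/16B3748  end 0/1A000098
 *
 * parse_timeline_history_file then adds the current timeline (the tip):
 *
 *   tli 3: begin 0/1A000098 end 0/0 (InvalidXLogRecPtr, still open)
 */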
+
+
+/*
+ * parse_timeline_history_file prepares a parsing context, iterates over the
+ * lines of the given timeline history file, and registers each timeline into
+ * our internal catalogs.
+ */
+bool
+parse_timeline_history_file(char *filename,
+                            DatabaseCatalog *catalog,
+                            uint32_t currentTimeline)
+{
+	/* step 1: prepare the context */
+	TimelineHistoryContext context =
+	{
+		.catalog = catalog,
+		.prevtli = 0,
+		.prevend = InvalidXLogRecPtr
+	};
+
+	/* step 2: iterate over the file */
+	if (!file_iter_lines(filename, BUFSIZE, &context, register_timeline_hook))
+	{
+		/* errors have already been logged */
+		return false;
+	}
+
+	/* step 3: add the current timeline to catalog */
+	if (currentTimeline != context.prevtli + 1)
+	{
+		log_warn("parse_timeline_history_file: Expected timeline %d, got %d",
+				 context.prevtli + 1, currentTimeline);
+	}
+
+	TimelineHistoryEntry entry = {
+		.tli = currentTimeline,
+		.begin = context.prevend,
+		.end = InvalidXLogRecPtr
+	};
+
+	if (!catalog_add_timeline_history(catalog, &entry))
+	{
+		log_error("Failed to add timeline history entry to catalog");
+		return false;
+	}
+
+	return true;
+}
+
+
+/*
+ * register_timeline_hook is the callback that is called for each line of a
+ * timeline history file.
+ */
+static bool
+register_timeline_hook(void *ctx, char *line)
+{
+	TimelineHistoryContext *context = (TimelineHistoryContext *) ctx;
+
+	char *ptr = line;
+
+	/* skip leading whitespace */
+	for (; *ptr; ptr++)
+	{
+		if (!isspace((unsigned char) *ptr))
+		{
+			break;
+		}
+	}
+
+	if (*ptr == '\0' || *ptr == '#')
+	{
+		/* skip empty lines and comments */
+		return true;
+	}
+
+	log_trace("register_timeline_hook: line is \"%s\"", line);
+
+	char *tabptr = strchr(ptr, '\t');
+
+	if (tabptr == NULL)
+	{
+		log_error("Failed to parse history file line \"%s\"", line);
+		return false;
+	}
+
+	*tabptr = '\0';
+
+	uint32_t current_tli = 0;
+	uint64_t current_lsn = InvalidXLogRecPtr;
+
+	if (!stringToUInt(ptr, &current_tli))
+	{
+		log_error("Failed to parse history timeline \"%s\"", line);
+		return false;
+	}
+
+	char *lsn = tabptr + 1;
+
+	for (char *lsnend = lsn; *lsnend; lsnend++)
+	{
+		if (!(isxdigit((unsigned char) *lsnend) || *lsnend == '/'))
+		{
+			*lsnend = '\0';
+			break;
+		}
+	}
+
+	if (!parseLSN(lsn, &current_lsn))
+	{
+		log_error("Failed to parse history timeline %d LSN \"%s\"",
+				  current_tli, lsn);
+		return false;
+	}
+
+	TimelineHistoryEntry entry = {
+		.tli = current_tli,
+		.begin = context->prevend,
+		.end = current_lsn
+	};
+
+	if (!catalog_add_timeline_history(context->catalog, &entry))
+	{
+		log_error("Failed to add timeline history entry to catalog");
+		return false;
+	}
+
+	context->prevtli = current_tli;
+	context->prevend = current_lsn;
+
+	return true;
+}
diff --git a/src/bin/pgcopydb/pgsql_timeline.h b/src/bin/pgcopydb/pgsql_timeline.h
new file mode 100644
index 00000000..a3dabaee
--- /dev/null
+++ b/src/bin/pgcopydb/pgsql_timeline.h
@@ -0,0 +1,18 @@
+/*
+ * src/bin/pgcopydb/pgsql_timeline.h
+ * API for sending SQL commands about timelines to a PostgreSQL server
+ */
+#ifndef PGSQL_TIMELINE_H
+#define PGSQL_TIMELINE_H
+
+#include "pgsql.h"
+#include "schema.h"
+
+/* pgsql_timeline.c */
+bool pgsql_identify_system(PGSQL *pgsql, IdentifySystem *system,
+                           char *cdcPathDir);
+bool parse_timeline_history_file(char *filename,
+                                 DatabaseCatalog *catalog,
+                                 uint32_t currentTimeline);
+
+#endif /* PGSQL_TIMELINE_H */
diff --git a/src/bin/pgcopydb/pgsql_utils.h b/src/bin/pgcopydb/pgsql_utils.h
new file mode 100644
index 00000000..d9ec90d2
--- /dev/null
+++ b/src/bin/pgcopydb/pgsql_utils.h
@@ -0,0 +1,15 @@
+/*
+ * src/bin/pgcopydb/pgsql_utils.h
+ * Common helper functions for interacting with a postgres server
+ */
+#ifndef PGSQL_UTILS_H
+#define PGSQL_UTILS_H
+
+
+#include "libpq-fe.h"
+
+/* pgsql.c */ +bool is_response_ok(PGresult *result); +bool clear_results(PGSQL *pgsql); + +#endif /* PGSQL_UTILS_H */
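Overall flow, as a hedged end-to-end sketch rather than part of the patch (the call sites and error handling are simplified, and the stream and specs variables are assumed to be set up as elsewhere in this diff): the replication client captures the history file while identifying the system, stream_write_context() loads it into the catalogs via parse_timeline_history_file(), and workers later read it back through stream_read_context().

/* streaming setup side (replication connection, see pgsql_start_replication) */
strlcpy(stream.cdcPathDir, specs->paths.dir, MAXPGPATH);

if (!pgsql_identify_system(&(stream.pgsql), &(stream.system), stream.cdcPathDir))
{
	/* errors have already been logged */
	return false;
}

/* stream_write_context() then calls parse_timeline_history_file() when timeline > 1 */

/* worker side: transform, apply, and follow processes */
if (!stream_read_context(specs))
{
	/* errors have already been logged */
	return false;
}

log_debug("current timeline %d starts at %X/%X",
		  specs->system.currentTimeline.tli,
		  LSN_FORMAT_ARGS(specs->system.currentTimeline.begin));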