Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store timelines in internal catalogs #848

Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
79635c9
TimeLine -> Timeline
hanefi Jul 3, 2024
198c70d
Store timeline history in internal catalogs
hanefi Jul 19, 2024
75199cc
Reindent
hanefi Jul 19, 2024
5c22d86
Fix interface and imports
hanefi Jul 23, 2024
dc76ce5
Remove some unused structs and struct members
hanefi Jul 23, 2024
e619b2b
Read current timeline from catalogs
hanefi Jul 23, 2024
83cbfd6
Fix module structure
hanefi Jul 23, 2024
f08cac3
Address reviews
hanefi Jul 23, 2024
494fc27
Remove unnecessary internal function
hanefi Jul 23, 2024
0e7859f
Properly handle failures when adding timelines to catalog
hanefi Jul 23, 2024
431058e
Remove unused constants
hanefi Jul 23, 2024
bbc6b63
Remove TimelineHistoryContext
hanefi Jul 23, 2024
eb621c3
Format LSN with helpers
hanefi Jul 23, 2024
a424f0e
Remove tli hist file references
hanefi Jul 23, 2024
cf79152
Rename a function for consistency
hanefi Jul 23, 2024
713f1f2
Merge branch 'main' of github.com:dimitri/pgcopydb into 834-parsing-o…
hanefi Jul 23, 2024
5174220
Remove void pointers
hanefi Jul 23, 2024
70b5079
Bring tlihistory file back
hanefi Jul 23, 2024
d3e6d28
Fix formatting of new header file
hanefi Jul 23, 2024
2de32eb
Remove all void pointer contexts in branch
hanefi Jul 23, 2024
905b093
Persist timeline history files for debugging
hanefi Jul 23, 2024
b5070fa
Fix copy paste errors, add missing comments
hanefi Jul 23, 2024
77c5a36
Introduce new header file
hanefi Jul 24, 2024
2eacb24
Add debug message for writing history file to disk
hanefi Jul 24, 2024
aee6efe
Add function level comments
hanefi Jul 29, 2024
2499fd1
Move debug message before writing file to disk
hanefi Jul 29, 2024
c517ae6
Implement iterator API for timeline history parsing
hanefi Jul 30, 2024
8c7a8da
Do not access catalogs in pgsql_start_replication
hanefi Jul 30, 2024
eb6b808
Write to files only if we have timeline history
hanefi Jul 30, 2024
937fd1a
Merge branch 'main' of github.com:dimitri/pgcopydb into 834-parsing-o…
hanefi Jul 30, 2024
8ba0fe3
Reindent
hanefi Jul 30, 2024
a02d5ea
store cdc path in logical stream client
hanefi Jul 31, 2024
a9a29c9
Fix conditional to decide when to write history to file
hanefi Jul 31, 2024
f9e59b6
Cleanup contexts and hooks
hanefi Jul 31, 2024
356f198
Fix file paths
hanefi Jul 31, 2024
2ab8a2e
Calculate abs path only once
hanefi Jul 31, 2024
fea700b
Remove custom iterators
hanefi Aug 1, 2024
b87cae7
Remove tlihistfile references
hanefi Aug 1, 2024
7985881
Fix warning message
hanefi Aug 1, 2024
751574e
Move declarations in timeline header
hanefi Aug 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 169 additions & 2 deletions src/bin/pgcopydb/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ static char *sourceDBcreateDDLs[] = {
"create table sentinel("
" id integer primary key check (id = 1), "
" startpos pg_lsn, endpos pg_lsn, apply bool, "
" write_lsn pg_lsn, flush_lsn pg_lsn, replay_lsn pg_lsn)"
" write_lsn pg_lsn, flush_lsn pg_lsn, replay_lsn pg_lsn)",

"create table timeline_history("
" tli integer primary key, startpos pg_lsn, endpos pg_lsn)"
};


Expand Down Expand Up @@ -434,7 +437,8 @@ static char *sourceDBdropDDLs[] = {
"drop table if exists s_table_parts_done",
"drop table if exists s_table_indexes_done",

"drop table if exists sentinel"
"drop table if exists sentinel",
"drop table if exists timeline_history"
};


Expand Down Expand Up @@ -7700,6 +7704,169 @@ catalog_count_summary_done_fetch(SQLiteQuery *query)
}


/*
 * catalog_add_timeline_history inserts a timeline history entry to our
 * internal catalogs database.
 */
bool
catalog_add_timeline_history(void *ctx, TimelineHistoryEntry *entry)
{
	TimelineHistoryContext *tlContext = (TimelineHistoryContext *) ctx;
	DatabaseCatalog *catalog = tlContext->source;

	if (catalog == NULL)
	{
		log_error("BUG: catalog_add_timeline_history: catalog is NULL");
		return false;
	}

	sqlite3 *db = catalog->db;

	if (db == NULL)
	{
		log_error("BUG: catalog_add_timeline_history: db is NULL");
		return false;
	}

	/* render both LSNs in their textual form before binding them */
	char startLSN[PG_LSN_MAXLENGTH] = { 0 };
	char endLSN[PG_LSN_MAXLENGTH] = { 0 };

	sformat(startLSN, sizeof(startLSN), "%X/%X", LSN_FORMAT_ARGS(entry->begin));
	sformat(endLSN, sizeof(endLSN), "%X/%X", LSN_FORMAT_ARGS(entry->end));

	char *sql =
		"insert or replace into timeline_history(tli, startpos, endpos)"
		"values($1, $2, $3)";

	SQLiteQuery query = { 0 };

	if (!catalog_sql_prepare(db, sql, &query))
	{
		/* errors have already been logged */
		return false;
	}

	/* bind our parameters now */
	BindParam params[] = {
		{ BIND_PARAMETER_TYPE_INT, "tli", entry->tli, NULL },
		{ BIND_PARAMETER_TYPE_TEXT, "startpos", 0, startLSN },
		{ BIND_PARAMETER_TYPE_TEXT, "endpos", 0, endLSN }
	};

	int paramCount = sizeof(params) / sizeof(params[0]);

	if (!catalog_sql_bind(&query, params, paramCount))
	{
		/* errors have already been logged */
		return false;
	}

	/* now execute the query, which does not return any row */
	if (!catalog_sql_execute_once(&query))
	{
		/* errors have already been logged */
		return false;
	}

	return true;
}


/*
 * catalog_lookup_timeline fetches the TimelineHistoryEntry with the given tli
 * from our catalogs. The entry is filled-in by the fetch function when a
 * matching row is found.
 */
bool
catalog_lookup_timeline(DatabaseCatalog *catalog,
						int tli,
						TimelineHistoryEntry *entry)
{
	/* NULL guard kept consistent with catalog_add_timeline_history */
	if (catalog == NULL)
	{
		log_error("BUG: catalog_lookup_timeline: catalog is NULL");
		return false;
	}

	sqlite3 *db = catalog->db;

	if (db == NULL)
	{
		log_error("BUG: catalog_lookup_timeline: db is NULL");
		return false;
	}

	SQLiteQuery query = {
		.context = entry,
		.fetchFunction = &catalog_timeline_history_fetch
	};

	char *sql =
		" select tli, startpos, endpos"
		" from timeline_history"
		" where tli = $1";

	if (!catalog_sql_prepare(db, sql, &query))
	{
		/* errors have already been logged */
		return false;
	}

	/* bind our parameters now */
	BindParam params[] = {
		{ BIND_PARAMETER_TYPE_INT, "tli", tli, NULL }
	};

	int count = sizeof(params) / sizeof(params[0]);

	if (!catalog_sql_bind(&query, params, count))
	{
		/* errors have already been logged */
		return false;
	}

	/*
	 * Now execute the query, which returns at most one row (tli is the
	 * primary key of timeline_history).
	 */
	if (!catalog_sql_execute_once(&query))
	{
		/* errors have already been logged */
		return false;
	}

	return true;
}


/*
 * catalog_timeline_history_fetch is a SQLiteQuery fetch function that fills-in
 * a TimelineHistoryEntry from the current result row (tli, startpos, endpos).
 */
bool
catalog_timeline_history_fetch(SQLiteQuery *query)
{
	TimelineHistoryEntry *entry = (TimelineHistoryEntry *) query->context;

	/* reset the entry first: startpos/endpos may be SQL NULL in the row */
	bzero(entry, sizeof(TimelineHistoryEntry));

	/* tli */
	entry->tli = sqlite3_column_int(query->ppStmt, 0);

	/* begin LSN (startpos); left zeroed when the column is NULL */
	if (sqlite3_column_type(query->ppStmt, 1) != SQLITE_NULL)
	{
		const char *startpos = (const char *) sqlite3_column_text(query->ppStmt, 1);

		if (!parseLSN(startpos, &entry->begin))
		{
			log_error("Failed to parse LSN from \"%s\"", startpos);
			return false;
		}
	}

	/* end LSN (endpos); left zeroed when the column is NULL */
	if (sqlite3_column_type(query->ppStmt, 2) != SQLITE_NULL)
	{
		const char *endpos = (const char *) sqlite3_column_text(query->ppStmt, 2);

		if (!parseLSN(endpos, &entry->end))
		{
			log_error("Failed to parse LSN from \"%s\"", endpos);
			return false;
		}
	}

	return true;
}


/*
* catalog_execute executes sqlite query
*/
Expand Down
10 changes: 10 additions & 0 deletions src/bin/pgcopydb/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,16 @@ bool catalog_count_summary_done(DatabaseCatalog *catalog,
bool catalog_count_summary_done_fetch(SQLiteQuery *query);


/*
* Logical decoding
*/
bool catalog_add_timeline_history(void *context,
TimelineHistoryEntry *entry);
bool catalog_lookup_timeline(DatabaseCatalog *catalog,
int tli,
TimelineHistoryEntry *entry);
bool catalog_timeline_history_fetch(SQLiteQuery *query);

/*
* Internal tooling for catalogs management
*/
Expand Down
7 changes: 3 additions & 4 deletions src/bin/pgcopydb/follow.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,12 +419,11 @@ follow_prepare_mode_switch(StreamSpecs *streamSpecs,

if (streamSpecs->system.timeline == 0)
{
if (!stream_read_context(&(streamSpecs->paths),
&(streamSpecs->system),
&(streamSpecs->WalSegSz)))
if (!stream_read_context(streamSpecs))
{
log_error("Failed to read the streaming context information "
"from the source database, see above for details");
"from the source database and internal catalogs, "
"see above for details");
return false;
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/bin/pgcopydb/ld_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,11 @@ stream_apply_setup(StreamSpecs *specs, StreamApplyContext *context)

if (specs->system.timeline == 0)
{
if (!stream_read_context(&(specs->paths),
&(specs->system),
&(specs->WalSegSz)))
if (!stream_read_context(specs))
{
log_error("Failed to read the streaming context information "
"from the source database, see above for details");
"from the source database and internal catalogs, "
"see above for details");
return false;
}
}
Expand Down
42 changes: 14 additions & 28 deletions src/bin/pgcopydb/ld_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,10 @@ startLogicalStreaming(StreamSpecs *specs)
OutputPluginToString(specs->slot.plugin),
specs->pluginOptions.count);

if (!pgsql_start_replication(&stream))
/* prepare the context for timeline history */
TimelineHistoryContext timelineContext = { .source = specs->sourceDB };

if (!pgsql_start_replication(&stream, &timelineContext))
{
/* errors have already been logged */
return false;
Expand Down Expand Up @@ -2613,8 +2616,7 @@ stream_fetch_current_lsn(uint64_t *lsn,


/*
* stream_write_context writes the wal_segment_size and timeline history to
* files.
* stream_write_context writes the wal_segment_size and tli to files.
*/
bool
stream_write_context(StreamSpecs *specs, LogicalStreamClient *stream)
Expand Down Expand Up @@ -2652,16 +2654,6 @@ stream_write_context(StreamSpecs *specs, LogicalStreamClient *stream)

log_debug("Wrote tli %s timeline file \"%s\"", tli, specs->paths.tlifile);

if (!write_file(system->timelines.content,
strlen(system->timelines.content),
specs->paths.tlihistfile))
{
/* errors have already been logged */
return false;
}

log_debug("Wrote timeline history file \"%s\"", specs->paths.tlihistfile);

return true;
}

Expand All @@ -2677,7 +2669,6 @@ stream_cleanup_context(StreamSpecs *specs)

success = success && unlink_file(specs->paths.walsegsizefile);
success = success && unlink_file(specs->paths.tlifile);
success = success && unlink_file(specs->paths.tlihistfile);

/* reset the timeline, so that we always read from the disk */
specs->system.timeline = 0;
Expand All @@ -2691,14 +2682,14 @@ stream_cleanup_context(StreamSpecs *specs)
* wal_segment_size and timeline history.
*/
bool
stream_read_context(CDCPaths *paths,
IdentifySystem *system,
uint32_t *WalSegSz)
stream_read_context(StreamSpecs *specs)
{
CDCPaths *paths = &(specs->paths);
IdentifySystem *system = &(specs->system);
uint32_t *WalSegSz = &(specs->WalSegSz);

char *wal_segment_size = NULL;
char *tli = NULL;
char *history = NULL;

long size = 0L;

/*
Expand Down Expand Up @@ -2727,8 +2718,7 @@ stream_read_context(CDCPaths *paths,
}

if (file_exists(paths->walsegsizefile) &&
file_exists(paths->tlifile) &&
file_exists(paths->tlihistfile))
file_exists(paths->tlifile))
{
/*
* File did exist, but might have been deleted now (race condition
Expand All @@ -2742,9 +2732,6 @@ stream_read_context(CDCPaths *paths,
success = success &&
read_file(paths->tlifile, &tli, &size);

success = success &&
read_file(paths->tlihistfile, &history, &size);

if (success)
{
/* success: break out of the retry loop */
Expand All @@ -2765,8 +2752,7 @@ stream_read_context(CDCPaths *paths,

/* did retry policy expire before the files are created? */
if (!(file_exists(paths->walsegsizefile) &&
file_exists(paths->tlifile) &&
file_exists(paths->tlihistfile)))
file_exists(paths->tlifile)))
{
log_error("Failed to read stream context file: retry policy expired");
return false;
Expand All @@ -2787,12 +2773,12 @@ stream_read_context(CDCPaths *paths,
return false;
}

if (!parseTimeLineHistory(paths->tlihistfile, history, system))
DatabaseCatalog *source = specs->sourceDB;
if (!catalog_lookup_timeline(source, system->timeline, &system->currentTimeline))
{
/* errors have already been logged */
return false;
}


return true;
}
4 changes: 1 addition & 3 deletions src/bin/pgcopydb/ld_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,7 @@ bool stream_fetch_current_lsn(uint64_t *lsn,

bool stream_write_context(StreamSpecs *specs, LogicalStreamClient *stream);
bool stream_cleanup_context(StreamSpecs *specs);
bool stream_read_context(CDCPaths *paths,
IdentifySystem *system,
uint32_t *WalSegSz);
bool stream_read_context(StreamSpecs *specs);

StreamAction StreamActionFromChar(char action);
char * StreamActionToString(StreamAction action);
Expand Down
Loading
Loading