Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions db/phys_rep.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,6 @@ char *gbl_physrep_source_host;
char *gbl_physrep_metadb_name;
char *gbl_physrep_metadb_host;

struct metadb {
char *dbname;
char *host;
char **hosts;
pthread_mutex_t lk;
int host_count;
};

#define MAX_ALTERNATE_METADBS 10
struct metadb gbl_altmetadb[MAX_ALTERNATE_METADBS] = {0};
__thread int altmetadb_index[MAX_ALTERNATE_METADBS] = {0};
int gbl_altmetadb_count = 0;
Expand Down Expand Up @@ -189,6 +180,16 @@ void physrep_fanout_dump(void)
Pthread_mutex_unlock(&fanout_lk);
}

void physrep_alt_metadb_print(void)
{
physrep_logmsg(LOGMSG_USER, "Alternate metadb count: %d\n", gbl_altmetadb_count);
for (int i = 0; i < gbl_altmetadb_count; ++i) {
physrep_logmsg(LOGMSG_USER, " metadb %d: dbname %s host %s\n", i,
gbl_altmetadb[i].dbname ? gbl_altmetadb[i].dbname : "NULL",
gbl_altmetadb[i].host ? gbl_altmetadb[i].host : "NULL");
}
}

void cleanup_hosts()
{
DB_Connection *cnct;
Expand Down
13 changes: 13 additions & 0 deletions db/phys_rep.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@
#include <stdlib.h>
#include <cdb2api.h>

struct metadb {
char *dbname;
char *host;
char **hosts;
pthread_mutex_t lk;
int host_count;
};

#define MAX_ALTERNATE_METADBS 10

extern char *gbl_physrep_source_dbname;
extern char *gbl_physrep_source_host;
extern char *gbl_physrep_metadb_name;
Expand All @@ -31,6 +41,8 @@ extern int gbl_deferred_phys_flag;

extern unsigned int gbl_deferred_phys_update;

extern struct metadb gbl_altmetadb[MAX_ALTERNATE_METADBS];

int start_physrep_threads();
int stop_physrep_threads();
int physrep_exited();
Expand All @@ -41,5 +53,6 @@ void physrep_fanout_override(const char *dbname, int fanout);
int physrep_fanout_get(const char *dbname);
void physrep_fanout_dump(void);
int physrep_add_alternate_metadb(char *dbname, char *host);
void physrep_alt_metadb_print(void);

#endif /* PHYS_REP_H */
266 changes: 133 additions & 133 deletions db/process_message.c

Large diffs are not rendered by default.

34 changes: 27 additions & 7 deletions docs/pages/operating/physical_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pull the physical logs from the source machine and apply it locally.
It is important to note that physreps are applying the logs out-of-band and, thus,
should not be considered part of the source cluster.

In order to enable replication, `replicate_from` must be added to the copycomdb2-d
In order to enable replication, `replicate_from` must be added to the `copycomdb2 -d`
physrep's lrl file. This line can either take a valid Comdb2 cluster tier, a
hostname or a comma-separated list of hostnames (without any space).

Expand All @@ -30,8 +30,9 @@ physrep added to the system would add to the overall cost incurred by the source
host/cluster.

In order to avoid having source support all physreps directly, one could setup tiered
replication in which some physical replicants could become the source for the other
replicants, thus keeping some load off of the top-level source host/cluster.
replication (not to be confused with machine classes), in which some physical replicants
could become the source for the other replicants, thus keeping some load off of
the top-level source host/cluster.

```

Expand All @@ -49,20 +50,22 @@ Note: The source cluster nodes are always considered at tier 0.
Setting up tiered replication topology requires 2 base tables to maintain the
current state of replication as well as the replication topology.

* comdb2_physreps
* comdb2_physrep_connection
* [comdb2_physreps](#comdb2_physreps)
* [comdb2_physrep_connection](#comdb2_physrep_connections)

These tables automatically get updated to reflect the changes as replicants
join or leave the system and thus are not designed to be manually modified
under normal circumstances. In order to keep the load evenly spread, these table
are consulted to ensure a certain fanout `physrep_fanout` is maintained across all
the nodes. The LSN information in `comdb2_physreps` table is used by all the
the nodes. The LSN (file:offset) information in `comdb2_physreps` table is used by all the
nodes to pause log-deletion.

## Algorithm

On start, a physical replicant executes `sys.physrep.register_replicant()` against
the `physrep_metadb`, which in turn, responds with a list of potential nodes that
the `physrep_metadb`, which in turn, responds with a list of potential nodes
(by doing a graph traversal on nodes (`comdb2_physreps`) and edges (`comdb2_physrep_connections`)
, starting at source as root node/tier 0, ref: `lua/lib/physrep_register_replicant.lua`) that
can be used as the source of physical logs. The replicant then picks up a node from
the list and tries to connect to it. On successful connection, the replicant executes
`sys.physrep.update_registry()` against the `physrep_metadb`, confirming that the
Expand Down Expand Up @@ -112,6 +115,21 @@ the hosts listed in that cluster, as represented by the bbcpu.lst.
NOTE: In cross-tier replication, the replication metadata tables must be hosted by a
separate database running in the a lower (development) tier.

### Alternate Metadbs

Physrep setup supports configuring multiple alternate metadbs in addition to the primary
`physrep_metadb`. The idea was to setup an alternate metadb in a separate tier/class (say beta) so that
production tier/class doesn't have to directly interact with lower level tiers (this is an update to the
cross-tier replication model discussed above).

Key gotchas:
* A physical replicant registers (`register_replicant`) only against the primary metadb (never an alternate).
* Metadb does not provide transaction logs, but returns candidate source nodes to replicate from (based on fanout and tree traversal, refer to [algorithm](#algorithm)).
* Alternate metadbs are primarily used by the source (physrep-parent) side to try and establish a reverse connection based on the `comdb2_physrep_sources` table.
* The source cluster writes replication metadata (entries into `comdb2_physreps`, `comdb2_physrep_connections`) to primary physrep_metadb.
* If a source is itself a physrep (tiered chain), it still uses only its primary metadb for its own registration, while reverse connecting outwards based on configured alternate metadbs.


## Physical replication metadata tables

### comdb2_physreps
Expand All @@ -125,6 +143,7 @@ CREATE TABLE comdb2_physreps(dbname CSTRING(60),
state CSTRING(60),
UNIQUE (dbname, host))
```
This could be thought of as the registry of all the physical replicants nodes in the system (both sources and replicants).
* Physical replicant states:
* `Pending` : The node has requested to become a physical replicant (registration in-progress)
* `Active` : The node is a physical replicant
Expand Down Expand Up @@ -181,6 +200,7 @@ CREATE TABLE comdb2_physrep_sources(dbname CSTRING(60),
* replicate_from dbname @host/tier: This line sets the source host/cluster. It is required for all physical replicants.
* replicate_wait <sec>: Tells the physical replicant to wait for this many seconds before applying the log records.
* physrep_metadb: If set, all the nodes will connect to this database (as against source host/cluster mentioned via `replicate_from`) for replication metadata tables
* alternate_metadb <dbname> <host>: If set, parent node will try to establish reverse connection based on the `comdb2_physrep_sources` table.
* physrep_fanout_override <dbname> <fanout>: This is set on the metadb, and allows per-database overrides of the 'physrep_fanout' tunable. The 'physrep_fanout_override' message-trap allows this to be set dynamically. The 'physrep_fanout_dump' message-trap prints the current overrides.
* physrep_ignore <tables>: All the log records that belong to any of these tables are ignored by physical replicants
* nonames: This configuration forces system database file names to not carry the database name. This setting is required for physical-log based replication to work properly.
Expand Down
1 change: 1 addition & 0 deletions sqlite/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ add_library(sqlite
ext/comdb2/opcode_handlers.c
ext/comdb2/partial_datacopies.c
ext/comdb2/permissions.c
ext/comdb2/phys_rep_alt_metadb.c
ext/comdb2/plugins.c
ext/comdb2/procedures.c
ext/comdb2/query_plans.c
Expand Down
1 change: 1 addition & 0 deletions sqlite/ext/comdb2/comdb2systblInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ int systblTableMetricsInit(sqlite3 *db);
int systblApiHistoryInit(sqlite3 *db);
int systblDbInfoInit(sqlite3 *db);
int systblUnusedFilesInit(sqlite3 *db);
int systblPhysrepAltmetadbInit(sqlite3 *db);

/* Simple yes/no answer for booleans */
#define YESNO(x) ((x) ? "Y" : "N")
Expand Down
86 changes: 86 additions & 0 deletions sqlite/ext/comdb2/phys_rep_alt_metadb.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright 2020 Bloomberg Finance L.P.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <stdlib.h>
#include <string.h>
#include <stddef.h>

#include "comdb2.h"
#include "comdb2systblInt.h"
#include "sql.h"
#include "ezsystables.h"
#include "phys_rep.h"

typedef struct systable_physrep_altmetadb_t {
char *dbname;
char *host;
int host_count;
} systable_physrep_altmetadb_t;

static int collect_physrep_alt_metadbs(void **pdata, int *pn) {
systable_physrep_altmetadb_t *rows;
int nrows = 0;

extern int gbl_altmetadb_count;
extern struct metadb gbl_altmetadb[MAX_ALTERNATE_METADBS];

if (gbl_altmetadb_count == 0) {
*pdata = NULL;
*pn = 0;
return 0;
}

rows = calloc(gbl_altmetadb_count, sizeof(systable_physrep_altmetadb_t));
if (!rows) return -1;

// Populate rows from gbl_altmetadb array
for (int i = 0; i < gbl_altmetadb_count; i++) {
rows[i].dbname = strdup(gbl_altmetadb[i].dbname);
rows[i].host = strdup(gbl_altmetadb[i].host);
rows[i].host_count = gbl_altmetadb[i].host_count;
nrows++;
}
*pdata = rows;
*pn = nrows;
return 0;
}


static void free_physrep_altmetadb(void *data, int nrows) {
systable_physrep_altmetadb_t *rows = (systable_physrep_altmetadb_t *)data;

for (int i = 0; i < nrows; i++) {
free(rows[i].dbname);
free(rows[i].host);
}
free(rows);
}

sqlite3_module systblPhysrepAltmetadbModule = {
.access_flag = CDB2_ALLOW_ALL,
};

int systblPhysrepAltmetadbInit(sqlite3 *db) {
return create_system_table(
db, "comdb2_physrep_altmetadb", &systblPhysrepAltmetadbModule,
collect_physrep_alt_metadbs, free_physrep_altmetadb,
sizeof(systable_physrep_altmetadb_t),
CDB2_CSTRING, "dbname", -1, offsetof(systable_physrep_altmetadb_t, dbname),
CDB2_CSTRING, "host", -1, offsetof(systable_physrep_altmetadb_t, host),
CDB2_INTEGER, "host_count", -1, offsetof(systable_physrep_altmetadb_t, host_count),
SYSTABLE_END_OF_FIELDS
);
}
2 changes: 2 additions & 0 deletions sqlite/ext/comdb2/tables.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ int comdb2SystblInit(
rc = systblDbInfoInit(db);
if (rc == SQLITE_OK)
rc = systblUnusedFilesInit(db);
if (rc == SQLITE_OK)
rc = systblPhysrepAltmetadbInit(db);
#endif
return rc;
}
Expand Down
1 change: 1 addition & 0 deletions tests/auth.test/t09.expected
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@
(candidate='comdb2_net_userfuncs')
(candidate='comdb2_opcode_handlers')
(candidate='comdb2_partial_datacopies')
(candidate='comdb2_physrep_altmetadb')
(candidate='comdb2_plugins')
(candidate='comdb2_prepared')
(candidate='comdb2_procedures')
Expand Down
1 change: 1 addition & 0 deletions tests/cdb2sql.test/t00.expected
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ unknown @ls sub-command foo
(name='comdb2_net_userfuncs')
(name='comdb2_opcode_handlers')
(name='comdb2_partial_datacopies')
(name='comdb2_physrep_altmetadb')
(name='comdb2_plugins')
(name='comdb2_prepared')
(name='comdb2_procedures')
Expand Down
1 change: 1 addition & 0 deletions tests/comdb2sys.test/comdb2sys.expected
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@
(name='comdb2_net_userfuncs')
(name='comdb2_opcode_handlers')
(name='comdb2_partial_datacopies')
(name='comdb2_physrep_altmetadb')
(name='comdb2_plugins')
(name='comdb2_prepared')
(name='comdb2_procedures')
Expand Down
23 changes: 23 additions & 0 deletions tests/phys_rep_tiered.test/runit
Original file line number Diff line number Diff line change
Expand Up @@ -1978,6 +1978,24 @@ function check_metadb_memstat
fi
}

function verify_alt_metadb_system_table
{
out=$($CDB2SQL_EXE $CDB2_OPTIONS $REPL_CLUS_DBNAME --host $REPL_CLUS_HOST "exec procedure sys.cmd.send('physrep_alt_metadb_print')" | grep "$REPL_ALTMETA_DBNAME" | wc -l)
if [[ "$out" -ne "1" ]]; then
cleanFailExit "Alt metadb $REPL_ALTMETA_DBNAME message trap"
fi

out=$($CDB2SQL_EXE $CDB2_OPTIONS $SOURCE_DBNAME --host $SOURCE_HOST "select * from comdb2_physrep_altmetadb" | grep "$REPL_ALTMETA_DBNAME" | wc -l)
if [[ "$out" -ne "1" ]]; then
cleanFailExit "alt metadb $REPL_ALTMETA_DBNAME not found in the sytem table on source"
fi

out=$($CDB2SQL_EXE $CDB2_OPTIONS $REPL_CLUS_DBNAME --host $REPL_CLUS_HOST "select * from comdb2_physrep_altmetadb" | grep "$REPL_ALTMETA_DBNAME" | wc -l)
if [[ "$out" -ne "1" ]]; then
cleanFailExit "alt metadb $REPL_ALTMETA_DBNAME not found in the sytem table on replicant"
fi
}

# Halt a clustered physical replicant .. swing the master on that cluster
# a number of times, then restart the physical replicant .. looking at the
# trace, verify that nothing ignores rectype 2, 11, or 10 ..
Expand Down Expand Up @@ -2206,6 +2224,11 @@ function run_tests
testcase_preamble $testcase
verify_connection_count
testcase_finish $testcase

testcase="verify_alt_metadb_system_table"
testcase_preamble $testcase
verify_alt_metadb_system_table
testcase_finish $testcase
}

run_tests
Expand Down
1 change: 1 addition & 0 deletions tests/userschema.test/t00.expected
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
(tablename='comdb2_net_userfuncs', username='mohit', READ='Y', WRITE='Y', DDL='Y')
(tablename='comdb2_opcode_handlers', username='mohit', READ='Y', WRITE='Y', DDL='Y')
(tablename='comdb2_partial_datacopies', username='mohit', READ='Y', WRITE='Y', DDL='Y')
(tablename='comdb2_physrep_altmetadb', username='mohit', READ='Y', WRITE='Y', DDL='Y')
(tablename='comdb2_plugins', username='mohit', READ='Y', WRITE='Y', DDL='Y')
(tablename='comdb2_prepared', username='mohit', READ='Y', WRITE='Y', DDL='Y')
(tablename='comdb2_procedures', username='mohit', READ='Y', WRITE='Y', DDL='Y')
Expand Down