Skip to content

Commit

Permalink
Use far fewer connections in flex output
Browse files Browse the repository at this point in the history
Most operations are not run in parallel anyway, they only need a single
connection. For the parts running in parallel, we have one connection
per thread. We don't have connections for each table any more.
  • Loading branch information
joto committed Feb 4, 2024
1 parent 974447f commit ef8f6e5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 22 deletions.
31 changes: 11 additions & 20 deletions src/output-flex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1075,12 +1075,12 @@ void output_flex_t::after_ways()
void output_flex_t::stop()
{
for (auto &table : m_table_connections) {
auto *db_connection = &m_db_connections.at(table.table().num());
table.task_set(thread_pool().submit([&, db_connection]() {
table.stop(*db_connection,
table.task_set(thread_pool().submit([&]() {
pg_conn_t const db_connection{get_options()->connection_params,
"out.flex.stop"};
table.stop(db_connection,
get_options()->slim && !get_options()->droptemp,
get_options()->append);
db_connection->close();
}));
}

Expand Down Expand Up @@ -1185,8 +1185,7 @@ void output_flex_t::delete_from_tables(osmium::item_type type, osmid_t osm_id)
{
for (auto &table : m_table_connections) {
if (table.table().matches_type(type) && table.table().has_id_column()) {
delete_from_table(&table, m_db_connections.at(table.table().num()),
type, osm_id);
delete_from_table(&table, m_db_connection, type, osm_id);
}
}
}
Expand Down Expand Up @@ -1230,12 +1229,8 @@ void output_flex_t::relation_modify(osmium::Relation const &rel)

void output_flex_t::start()
{
assert(m_db_connections.empty());

for (auto &table : m_table_connections) {
m_db_connections.emplace_back(get_options()->connection_params,
"out.flex.table");
table.start(m_db_connections.back(), get_options()->append);
table.start(m_db_connection, get_options()->append);
}
}

Expand Down Expand Up @@ -1263,19 +1258,16 @@ output_flex_t::output_flex_t(output_flex_t const *other,
std::shared_ptr<db_copy_thread_t> copy_thread)
: output_t(other, std::move(mid)), m_tables(other->m_tables),
m_expire_outputs(other->m_expire_outputs),
m_db_connection(get_options()->connection_params, "out.flex.thread"),
m_stage2_way_ids(other->m_stage2_way_ids),
m_copy_thread(std::move(copy_thread)), m_lua_state(other->m_lua_state),
m_process_node(other->m_process_node), m_process_way(other->m_process_way),
m_process_relation(other->m_process_relation),
m_select_relation_members(other->m_select_relation_members)
{
assert(m_db_connections.empty());

for (auto &table : *m_tables) {
auto &tc = m_table_connections.emplace_back(&table, m_copy_thread);
m_db_connections.emplace_back(get_options()->connection_params,
"out.flex.table");
tc.prepare(m_db_connections.back());
tc.prepare(m_db_connection);
}

for (auto &expire_output : *m_expire_outputs) {
Expand All @@ -1295,6 +1287,7 @@ output_flex_t::output_flex_t(std::shared_ptr<middle_query_t> const &mid,
std::shared_ptr<thread_pool_t> thread_pool,
options_t const &options)
: output_t(mid, std::move(thread_pool), options),
m_db_connection(get_options()->connection_params, "out.flex.main"),
m_copy_thread(std::make_shared<db_copy_thread_t>(options.connection_params))
{
init_lua(options.style);
Expand Down Expand Up @@ -1521,10 +1514,8 @@ void output_flex_t::reprocess_marked()
for (auto &table : m_table_connections) {
if (table.table().matches_type(osmium::item_type::way) &&
table.table().has_id_column()) {
auto const &db_connection =
m_db_connections.at(table.table().num());
table.analyze(db_connection);
table.create_id_index(db_connection);
table.analyze(m_db_connection);
table.create_id_index(m_db_connection);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/output-flex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ class output_flex_t : public output_t

std::vector<table_connection_t> m_table_connections;

/// The connections to the database server for each table.
std::vector<pg_conn_t> m_db_connections;
/// The connection to the database server.
pg_conn_t m_db_connection;

// This is shared between all clones of the output and must only be
// accessed while protected using the lua_mutex.
Expand Down

0 comments on commit ef8f6e5

Please sign in to comment.