From 0ce67d1fe7f1a8e07f9ef0d1c1f9c08490d9837c Mon Sep 17 00:00:00 2001 From: Jochen Topf Date: Sun, 7 Jul 2024 22:25:50 +0200 Subject: [PATCH 1/2] Refactor how COPY is handled, use variant instead of virtual class COPYs are handled in extra threads through a worker queue. Using std::variant instead of virtual classes simplifies the code, because we don't need the indirection through a unique_ptr any more. --- src/db-copy-mgr.hpp | 110 +++++++++++++++++----------------- src/db-copy.cpp | 58 ++++++++++-------- src/db-copy.hpp | 84 +++++++++++--------------- tests/test-db-copy-thread.cpp | 70 ++++++++++++---------- 4 files changed, 161 insertions(+), 161 deletions(-) diff --git a/src/db-copy-mgr.hpp b/src/db-copy-mgr.hpp index d2089cc22..88767ba75 100644 --- a/src/db-copy-mgr.hpp +++ b/src/db-copy-mgr.hpp @@ -36,20 +36,19 @@ class db_copy_mgr_t */ void new_line(std::shared_ptr const &table) { - if (!m_current || !m_current->target->same_copy_target(*table)) { + if (!m_current || !m_current.target->same_copy_target(*table)) { if (m_current) { - m_processor->add_buffer(std::move(m_current)); + m_processor->send_command(std::move(m_current)); } - - m_current = std::make_unique>(table); + m_current = db_cmd_copy_delete_t(table); } - m_committed = m_current->buffer.size(); + m_committed = m_current.buffer.size(); } void rollback_line() { assert(m_current); - m_current->buffer.resize(m_committed); + m_current.buffer.resize(m_committed); } /** @@ -62,7 +61,7 @@ class db_copy_mgr_t { assert(m_current); - auto &buf = m_current->buffer; + auto &buf = m_current.buffer; assert(!buf.empty()); // Expect that a column has been written last which ended in a '\t'. @@ -70,8 +69,9 @@ class db_copy_mgr_t assert(buf.back() == '\t'); buf.back() = '\n'; - if (m_current->is_full()) { - m_processor->add_buffer(std::move(m_current)); + if (m_current.is_full()) { + m_processor->send_command(std::move(m_current)); + m_current = {}; } } @@ -96,7 +96,7 @@ class db_copy_mgr_t void add_column(T value) { add_value(value); - m_current->buffer += '\t'; + m_current.buffer += '\t'; } /** @@ -104,7 +104,7 @@ class db_copy_mgr_t * * Adds a NULL value for the column. */ - void add_null_column() { m_current->buffer += "\\N\t"; } + void add_null_column() { m_current.buffer += "\\N\t"; } /** * Start an array column. @@ -113,7 +113,7 @@ class db_copy_mgr_t * * Must be finished with a call to finish_array(). */ - void new_array() { m_current->buffer += "{"; } + void new_array() { m_current.buffer += "{"; } /** * Add a single value to an array column. @@ -124,7 +124,7 @@ class db_copy_mgr_t void add_array_elem(osmid_t value) { add_value(value); - m_current->buffer += ','; + m_current.buffer += ','; } /** @@ -135,13 +135,13 @@ class db_copy_mgr_t */ void finish_array() { - assert(!m_current->buffer.empty()); - if (m_current->buffer.back() == '{') { - m_current->buffer += '}'; + assert(!m_current.buffer.empty()); + if (m_current.buffer.back() == '{') { + m_current.buffer += '}'; } else { - m_current->buffer.back() = '}'; + m_current.buffer.back() = '}'; } - m_current->buffer += '\t'; + m_current.buffer += '\t'; } /** @@ -172,11 +172,11 @@ class db_copy_mgr_t */ void add_hash_elem(char const *k, char const *v) { - m_current->buffer += '"'; + m_current.buffer += '"'; add_escaped_string(k); - m_current->buffer += "\"=>\""; + m_current.buffer += "\"=>\""; add_escaped_string(v); - m_current->buffer += "\","; + m_current.buffer += "\","; } /** @@ -187,11 +187,11 @@ class db_copy_mgr_t */ void add_hash_elem_noescape(char const *k, char const *v) { - m_current->buffer += '"'; - m_current->buffer += k; - m_current->buffer += "\"=>\""; - m_current->buffer += v; - m_current->buffer += "\","; + m_current.buffer += '"'; + m_current.buffer += k; + m_current.buffer += "\"=>\""; + m_current.buffer += v; + m_current.buffer += "\","; } /** @@ -207,11 +207,11 @@ class db_copy_mgr_t template void add_hstore_num_noescape(char const *k, T const value) { - m_current->buffer += '"'; - m_current->buffer += k; - m_current->buffer += "\"=>\""; - m_current->buffer += std::to_string(value); - m_current->buffer += "\","; + m_current.buffer += '"'; + m_current.buffer += k; + m_current.buffer += "\"=>\""; + m_current.buffer += std::to_string(value); + m_current.buffer += "\","; } /** @@ -222,11 +222,11 @@ class db_copy_mgr_t */ void finish_hash() { - auto const idx = m_current->buffer.size() - 1; - if (!m_current->buffer.empty() && m_current->buffer[idx] == ',') { - m_current->buffer[idx] = '\t'; + auto const idx = m_current.buffer.size() - 1; + if (!m_current.buffer.empty() && m_current.buffer[idx] == ',') { + m_current.buffer[idx] = '\t'; } else { - m_current->buffer += '\t'; + m_current.buffer += '\t'; } } @@ -241,10 +241,10 @@ class db_copy_mgr_t for (auto c : wkb) { unsigned int const num = static_cast(c); - m_current->buffer += lookup_hex[(num >> 4U) & 0xfU]; - m_current->buffer += lookup_hex[num & 0xfU]; + m_current.buffer += lookup_hex[(num >> 4U) & 0xfU]; + m_current.buffer += lookup_hex[num & 0xfU]; } - m_current->buffer += '\t'; + m_current.buffer += '\t'; } /** @@ -257,14 +257,16 @@ class db_copy_mgr_t void delete_object(ARGS &&... args) { assert(m_current); - m_current->add_deletable(std::forward(args)...); + m_current.add_deletable(std::forward(args)...); } void flush() { // flush current buffer if there is one if (m_current) { - m_processor->add_buffer(std::move(m_current)); + m_processor->send_command( + db_cmd_copy_delete_t{std::move(m_current)}); + m_current = {}; } // close any ongoing copy operations m_processor->end_copy(); @@ -285,7 +287,7 @@ class db_copy_mgr_t template void add_value(T value) { - m_current->buffer += fmt::to_string(value); + m_current.buffer += fmt::to_string(value); } void add_value(std::string const &s) { add_value(s.c_str()); } @@ -296,22 +298,22 @@ class db_copy_mgr_t for (char const *c = s; *c; ++c) { switch (*c) { case '"': - m_current->buffer += "\\\""; + m_current.buffer += "\\\""; break; case '\\': - m_current->buffer += "\\\\"; + m_current.buffer += "\\\\"; break; case '\n': - m_current->buffer += "\\n"; + m_current.buffer += "\\n"; break; case '\r': - m_current->buffer += "\\r"; + m_current.buffer += "\\r"; break; case '\t': - m_current->buffer += "\\t"; + m_current.buffer += "\\t"; break; default: - m_current->buffer += *c; + m_current.buffer += *c; break; } } @@ -322,29 +324,29 @@ class db_copy_mgr_t for (char const *c = s; *c; ++c) { switch (*c) { case '"': - m_current->buffer += R"(\\")"; + m_current.buffer += R"(\\")"; break; case '\\': - m_current->buffer += R"(\\\\)"; + m_current.buffer += R"(\\\\)"; break; case '\n': - m_current->buffer += "\\n"; + m_current.buffer += "\\n"; break; case '\r': - m_current->buffer += "\\r"; + m_current.buffer += "\\r"; break; case '\t': - m_current->buffer += "\\t"; + m_current.buffer += "\\t"; break; default: - m_current->buffer += *c; + m_current.buffer += *c; break; } } } std::shared_ptr m_processor; - std::unique_ptr> m_current; + db_cmd_copy_delete_t m_current; std::size_t m_committed = 0; }; diff --git a/src/db-copy.cpp b/src/db-copy.cpp index 7c320dca1..0753fe6e7 100644 --- a/src/db-copy.cpp +++ b/src/db-copy.cpp @@ -92,7 +92,7 @@ db_copy_thread_t::db_copy_thread_t(connection_params_t const &connection_params) db_copy_thread_t::~db_copy_thread_t() { finish(); } -void db_copy_thread_t::add_buffer(std::unique_ptr &&buffer) +void db_copy_thread_t::send_command(db_cmd_t &&buffer) { assert(m_worker.joinable()); // thread must not have been finished @@ -107,21 +107,21 @@ void db_copy_thread_t::add_buffer(std::unique_ptr &&buffer) void db_copy_thread_t::end_copy() { - add_buffer(std::make_unique()); + send_command(db_cmd_end_copy_t{}); } void db_copy_thread_t::sync_and_wait() { std::promise barrier; std::future const sync = barrier.get_future(); - add_buffer(std::make_unique(std::move(barrier))); + send_command(db_cmd_sync_t{std::move(barrier)}); sync.wait(); } void db_copy_thread_t::finish() { if (m_worker.joinable()) { - add_buffer(std::make_unique()); + send_command(db_cmd_finish_t{}); m_worker.join(); } } @@ -147,7 +147,7 @@ void db_copy_thread_t::thread_t::operator()() bool done = false; while (!done) { - std::unique_ptr item; + db_cmd_t item{}; { std::unique_lock lock{m_shared->queue_mutex}; m_shared->queue_cond.wait( @@ -158,21 +158,11 @@ void db_copy_thread_t::thread_t::operator()() m_shared->queue_full_cond.notify_one(); } - switch (item->type) { - case db_cmd_t::Cmd_copy: - write_to_db(static_cast(item.get())); - break; - case db_cmd_t::Cmd_end_copy: - finish_copy(); - break; - case db_cmd_t::Cmd_sync: - finish_copy(); - static_cast(item.get())->barrier.set_value(); - break; - case db_cmd_t::Cmd_finish: - done = true; - break; - } + done = std::visit( + [&](auto &&cmd) { + return execute(std::forward(cmd)); + }, + item); } finish_copy(); @@ -184,20 +174,36 @@ void db_copy_thread_t::thread_t::operator()() } } -void db_copy_thread_t::thread_t::write_to_db(db_cmd_copy_t *buffer) +template +bool db_copy_thread_t::thread_t::execute(db_cmd_copy_delete_t &cmd) { - if (buffer->has_deletables() || - (m_inflight && !buffer->target->same_copy_target(*m_inflight))) { + if (cmd.has_deletables() || + (m_inflight && !cmd.target->same_copy_target(*m_inflight))) { finish_copy(); } - buffer->delete_data(*m_db_connection); + cmd.delete_data(*m_db_connection); if (!m_inflight) { - start_copy(buffer->target); + start_copy(cmd.target); } - m_db_connection->copy_send(buffer->buffer, buffer->target->name()); + m_db_connection->copy_send(cmd.buffer, cmd.target->name()); + + return false; +} + +bool db_copy_thread_t::thread_t::execute(db_cmd_end_copy_t &) +{ + finish_copy(); + return false; +} + +bool db_copy_thread_t::thread_t::execute(db_cmd_sync_t &cmd) +{ + finish_copy(); + cmd.barrier.set_value(); + return false; } void db_copy_thread_t::thread_t::start_copy( diff --git a/src/db-copy.hpp b/src/db-copy.hpp index af4e1fa0d..766f3451f 100644 --- a/src/db-copy.hpp +++ b/src/db-copy.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include "osmtypes.hpp" @@ -136,29 +137,7 @@ class db_deleter_by_type_and_id_t bool m_has_type = false; }; -/** - * A command for the copy thread to execute. - */ -class db_cmd_t -{ -public: - enum cmd_t - { - Cmd_copy, ///< Copy buffer content into given target. - Cmd_end_copy, ///< End COPY command. - Cmd_sync, ///< Synchronize with parent. - Cmd_finish - }; - - virtual ~db_cmd_t() = default; - - cmd_t type; - -protected: - explicit db_cmd_t(cmd_t t) : type(t) {} -}; - -struct db_cmd_copy_t : public db_cmd_t +struct db_cmd_copy_t { enum { @@ -183,14 +162,15 @@ struct db_cmd_copy_t : public db_cmd_t /// actual copy buffer std::string buffer; - virtual bool has_deletables() const noexcept = 0; - virtual void delete_data(pg_conn_t const &db_connection) = 0; + db_cmd_copy_t() = default; explicit db_cmd_copy_t(std::shared_ptr t) - : db_cmd_t(db_cmd_t::Cmd_copy), target(std::move(t)) + : target(std::move(t)) { buffer.reserve(Max_buf_size); } + + explicit operator bool() const noexcept { return target != nullptr; } }; template @@ -205,12 +185,9 @@ class db_cmd_copy_delete_t : public db_cmd_copy_t return (buffer.size() > Max_buf_size - 100) || m_deleter.is_full(); } - bool has_deletables() const noexcept override - { - return m_deleter.has_data(); - } + bool has_deletables() const noexcept { return m_deleter.has_data(); } - void delete_data(pg_conn_t const &db_connection) override + void delete_data(pg_conn_t const &db_connection) { if (m_deleter.has_data()) { m_deleter.delete_rows( @@ -230,25 +207,30 @@ class db_cmd_copy_delete_t : public db_cmd_copy_t DELETER m_deleter; }; -struct db_cmd_end_copy_t : public db_cmd_t +struct db_cmd_end_copy_t { - db_cmd_end_copy_t() : db_cmd_t(db_cmd_t::Cmd_end_copy) {} }; -struct db_cmd_sync_t : public db_cmd_t +struct db_cmd_sync_t { std::promise barrier; - explicit db_cmd_sync_t(std::promise &&b) - : db_cmd_t(db_cmd_t::Cmd_sync), barrier(std::move(b)) - {} + explicit db_cmd_sync_t(std::promise &&b) : barrier(std::move(b)) {} }; -struct db_cmd_finish_t : public db_cmd_t +struct db_cmd_finish_t { - db_cmd_finish_t() : db_cmd_t(db_cmd_t::Cmd_finish) {} }; +/** + * This type implements the commands that can be sent through the worker + * queue to the worker thread. + */ +using db_cmd_t = + std::variant, + db_cmd_copy_delete_t, + db_cmd_end_copy_t, db_cmd_sync_t, db_cmd_finish_t>; + /** * The manager for the worker thread that streams copy data into the database. */ @@ -265,17 +247,13 @@ class db_copy_thread_t ~db_copy_thread_t(); - /** - * Add another command for the worker. - */ - void add_buffer(std::unique_ptr &&buffer); + /// Add a command to the worker queue. + void send_command(db_cmd_t &&buffer); - /// Close COPY if one is open + /// Close COPY if one is open. void end_copy(); - /** - * Send sync command and wait for the notification. - */ + /// Send sync command and wait for it to finish. void sync_and_wait(); /** @@ -292,7 +270,7 @@ class db_copy_thread_t std::mutex queue_mutex; std::condition_variable queue_cond; std::condition_variable queue_full_cond; - std::deque> worker_queue; + std::deque worker_queue; }; // This is the class that actually instantiated and run in the thread. @@ -304,7 +282,15 @@ class db_copy_thread_t void operator()(); private: - void write_to_db(db_cmd_copy_t *buffer); + template + bool execute(db_cmd_copy_delete_t &cmd); + + bool execute(db_cmd_end_copy_t &); + + bool execute(db_cmd_sync_t &cmd); + + static bool execute(db_cmd_finish_t &) { return true; } + void start_copy(std::shared_ptr const &target); void finish_copy(); void delete_rows(db_cmd_copy_t *buffer); diff --git a/tests/test-db-copy-thread.cpp b/tests/test-db-copy-thread.cpp index 3e6ed2d65..728aa8b38 100644 --- a/tests/test-db-copy-thread.cpp +++ b/tests/test-db-copy-thread.cpp @@ -31,16 +31,16 @@ TEST_CASE("db_copy_thread_t with db_deleter_by_id_t") db_copy_thread_t t{db.connection_params()}; using cmd_copy_t = db_cmd_copy_delete_t; - auto cmd = std::make_unique(table); SECTION("simple copy command") { SECTION("add one copy line and sync") { - cmd->buffer += "42\n"; + cmd_copy_t cmd{table}; + cmd.buffer += "42\n"; - t.add_buffer(std::unique_ptr(cmd.release())); + t.send_command(std::move(cmd)); t.sync_and_wait(); REQUIRE(conn.result_as_int("SELECT id FROM test_copy_thread") == @@ -49,9 +49,10 @@ TEST_CASE("db_copy_thread_t with db_deleter_by_id_t") SECTION("add multiple rows and sync") { - cmd->buffer += "101\n 23\n 900\n"; + cmd_copy_t cmd{table}; + cmd.buffer += "101\n 23\n 900\n"; - t.add_buffer(std::unique_ptr(cmd.release())); + t.send_command(std::move(cmd)); t.sync_and_wait(); REQUIRE(table_count(conn) == 3); @@ -59,9 +60,10 @@ TEST_CASE("db_copy_thread_t with db_deleter_by_id_t") SECTION("add one line and finish") { - cmd->buffer += "2\n"; + cmd_copy_t cmd{table}; + cmd.buffer += "2\n"; - t.add_buffer(std::unique_ptr(cmd.release())); + t.send_command(std::move(cmd)); t.finish(); REQUIRE(conn.result_as_int("SELECT id FROM test_copy_thread") == 2); @@ -70,18 +72,18 @@ TEST_CASE("db_copy_thread_t with db_deleter_by_id_t") SECTION("delete command") { - cmd->buffer += "42\n43\n133\n223\n224\n"; - t.add_buffer(std::unique_ptr(cmd.release())); + cmd_copy_t cmd{table}; + cmd.buffer += "42\n43\n133\n223\n224\n"; + t.send_command(std::move(cmd)); t.sync_and_wait(); - cmd = std::make_unique(table); - SECTION("simple delete of existing rows") { - cmd->add_deletable(223); - cmd->add_deletable(42); + cmd = cmd_copy_t{table}; + cmd.add_deletable(223); + cmd.add_deletable(42); - t.add_buffer(std::unique_ptr(cmd.release())); + t.send_command(std::move(cmd)); t.sync_and_wait(); REQUIRE(table_count(conn, "WHERE id = 42") == 0); @@ -90,10 +92,11 @@ TEST_CASE("db_copy_thread_t with db_deleter_by_id_t") SECTION("delete one and add another") { - cmd->add_deletable(133); - cmd->buffer += "134\n"; + cmd = cmd_copy_t{table}; + cmd.add_deletable(133); + cmd.buffer += "134\n"; - t.add_buffer(std::unique_ptr(cmd.release())); + t.send_command(std::move(cmd)); t.sync_and_wait(); REQUIRE(table_count(conn, "WHERE id = 133") == 0); @@ -102,10 +105,11 @@ TEST_CASE("db_copy_thread_t with db_deleter_by_id_t") SECTION("delete one and add the same") { - cmd->add_deletable(133); - cmd->buffer += "133\n"; + cmd = cmd_copy_t{table}; + cmd.add_deletable(133); + cmd.buffer += "133\n"; - t.add_buffer(std::unique_ptr(cmd.release())); + t.send_command(std::move(cmd)); t.sync_and_wait(); REQUIRE(table_count(conn, "WHERE id = 133") == 1); @@ -114,12 +118,13 @@ TEST_CASE("db_copy_thread_t with db_deleter_by_id_t") SECTION("multi buffer add without delete") { - cmd->buffer += "542\n5543\n10133\n"; - t.add_buffer(std::unique_ptr(cmd.release())); + cmd_copy_t cmd{table}; + cmd.buffer += "542\n5543\n10133\n"; + t.send_command(std::move(cmd)); - cmd = std::make_unique(table); - cmd->buffer += "12\n784\n523\n"; - t.add_buffer(std::unique_ptr(cmd.release())); + cmd = cmd_copy_t{table}; + cmd.buffer += "12\n784\n523\n"; + t.send_command(std::move(cmd)); t.finish(); @@ -130,13 +135,14 @@ TEST_CASE("db_copy_thread_t with db_deleter_by_id_t") SECTION("multi buffer add with delete") { - cmd->buffer += "542\n5543\n10133\n"; - t.add_buffer(std::unique_ptr(cmd.release())); - - cmd = std::make_unique(table); - cmd->add_deletable(542); - cmd->buffer += "12\n"; - t.add_buffer(std::unique_ptr(cmd.release())); + cmd_copy_t cmd{table}; + cmd.buffer += "542\n5543\n10133\n"; + t.send_command(std::move(cmd)); + + cmd = cmd_copy_t{table}; + cmd.add_deletable(542); + cmd.buffer += "12\n"; + t.send_command(std::move(cmd)); t.finish(); From 6bd67a749d890a092557c8812fd67c0d3f170dce Mon Sep 17 00:00:00 2001 From: Jochen Topf Date: Sun, 7 Jul 2024 22:25:50 +0200 Subject: [PATCH 2/2] Create db connection in main thread and move it into the thread This is simpler than having the connection in a unique_ptr in the worker thread. Open the connection in the main thread also means any errors happen still in the main thread where they are trivial to handle. --- src/db-copy-mgr.hpp | 3 +-- src/db-copy.cpp | 22 +++++++++------------- src/db-copy.hpp | 4 ++-- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/db-copy-mgr.hpp b/src/db-copy-mgr.hpp index 88767ba75..108900cc4 100644 --- a/src/db-copy-mgr.hpp +++ b/src/db-copy-mgr.hpp @@ -264,8 +264,7 @@ class db_copy_mgr_t { // flush current buffer if there is one if (m_current) { - m_processor->send_command( - db_cmd_copy_delete_t{std::move(m_current)}); + m_processor->send_command(std::move(m_current)); m_current = {}; } // close any ongoing copy operations diff --git a/src/db-copy.cpp b/src/db-copy.cpp index 0753fe6e7..a58a9aea3 100644 --- a/src/db-copy.cpp +++ b/src/db-copy.cpp @@ -87,7 +87,8 @@ db_copy_thread_t::db_copy_thread_t(connection_params_t const &connection_params) // Connection params are captured by copy here, because we don't know // whether the reference will still be valid once we get around to running // the thread. - m_worker = std::thread{thread_t{connection_params, &m_shared}}; + m_worker = + std::thread{thread_t{pg_conn_t{connection_params, "copy"}, &m_shared}}; } db_copy_thread_t::~db_copy_thread_t() { finish(); } @@ -126,24 +127,21 @@ void db_copy_thread_t::finish() } } -db_copy_thread_t::thread_t::thread_t(connection_params_t connection_params, +db_copy_thread_t::thread_t::thread_t(pg_conn_t &&db_connection, shared *shared) -: m_connection_params(std::move(connection_params)), m_shared(shared) +: m_db_connection(std::move(db_connection)), m_shared(shared) {} void db_copy_thread_t::thread_t::operator()() { try { - m_db_connection = - std::make_unique(m_connection_params, "copy"); - // Disable sequential scan on database tables in the copy threads. // The copy threads only do COPYs (which are unaffected by this // setting) and DELETEs which we know benefit from the index. For // some reason PostgreSQL chooses in some cases not to use that index, // possibly because the DELETEs get a large list of ids to delete of // which many are not in the table which confuses the query planner. - m_db_connection->exec("SET enable_seqscan = off"); + m_db_connection.exec("SET enable_seqscan = off"); bool done = false; while (!done) { @@ -166,8 +164,6 @@ void db_copy_thread_t::thread_t::operator()() } finish_copy(); - - m_db_connection.reset(); } catch (std::runtime_error const &e) { log_error("DB copy thread failed: {}", e.what()); std::exit(2); // NOLINT(concurrency-mt-unsafe) @@ -182,13 +178,13 @@ bool db_copy_thread_t::thread_t::execute(db_cmd_copy_delete_t &cmd) finish_copy(); } - cmd.delete_data(*m_db_connection); + cmd.delete_data(m_db_connection); if (!m_inflight) { start_copy(cmd.target); } - m_db_connection->copy_send(cmd.buffer, cmd.target->name()); + m_db_connection.copy_send(cmd.buffer, cmd.target->name()); return false; } @@ -224,7 +220,7 @@ void db_copy_thread_t::thread_t::start_copy( } sql.push_back('\0'); - m_db_connection->copy_start(sql.data()); + m_db_connection.copy_start(sql.data()); m_inflight = target; } @@ -232,7 +228,7 @@ void db_copy_thread_t::thread_t::start_copy( void db_copy_thread_t::thread_t::finish_copy() { if (m_inflight) { - m_db_connection->copy_end(m_inflight->name()); + m_db_connection.copy_end(m_inflight->name()); m_inflight.reset(); } } diff --git a/src/db-copy.hpp b/src/db-copy.hpp index 766f3451f..bd5946a0d 100644 --- a/src/db-copy.hpp +++ b/src/db-copy.hpp @@ -277,7 +277,7 @@ class db_copy_thread_t class thread_t { public: - thread_t(connection_params_t connection_params, shared *shared); + thread_t(pg_conn_t &&db_connection, shared *shared); void operator()(); @@ -296,7 +296,7 @@ class db_copy_thread_t void delete_rows(db_cmd_copy_t *buffer); connection_params_t m_connection_params; - std::unique_ptr m_db_connection; + pg_conn_t m_db_connection; // Target for copy operation currently ongoing. std::shared_ptr m_inflight;