Skip to content

Commit 6bd67a7

Browse files
committed
Create db connection in main thread and move it into the thread
This is simpler than having the connection in a unique_ptr in the worker thread. Open the connection in the main thread also means any errors happen still in the main thread where they are trivial to handle.
1 parent 0ce67d1 commit 6bd67a7

File tree

3 files changed

+12
-17
lines changed

3 files changed

+12
-17
lines changed

src/db-copy-mgr.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,7 @@ class db_copy_mgr_t
264264
{
265265
// flush current buffer if there is one
266266
if (m_current) {
267-
m_processor->send_command(
268-
db_cmd_copy_delete_t<DELETER>{std::move(m_current)});
267+
m_processor->send_command(std::move(m_current));
269268
m_current = {};
270269
}
271270
// close any ongoing copy operations

src/db-copy.cpp

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ db_copy_thread_t::db_copy_thread_t(connection_params_t const &connection_params)
8787
// Connection params are captured by copy here, because we don't know
8888
// whether the reference will still be valid once we get around to running
8989
// the thread.
90-
m_worker = std::thread{thread_t{connection_params, &m_shared}};
90+
m_worker =
91+
std::thread{thread_t{pg_conn_t{connection_params, "copy"}, &m_shared}};
9192
}
9293

9394
db_copy_thread_t::~db_copy_thread_t() { finish(); }
@@ -126,24 +127,21 @@ void db_copy_thread_t::finish()
126127
}
127128
}
128129

129-
db_copy_thread_t::thread_t::thread_t(connection_params_t connection_params,
130+
db_copy_thread_t::thread_t::thread_t(pg_conn_t &&db_connection,
130131
shared *shared)
131-
: m_connection_params(std::move(connection_params)), m_shared(shared)
132+
: m_db_connection(std::move(db_connection)), m_shared(shared)
132133
{}
133134

134135
void db_copy_thread_t::thread_t::operator()()
135136
{
136137
try {
137-
m_db_connection =
138-
std::make_unique<pg_conn_t>(m_connection_params, "copy");
139-
140138
// Disable sequential scan on database tables in the copy threads.
141139
// The copy threads only do COPYs (which are unaffected by this
142140
// setting) and DELETEs which we know benefit from the index. For
143141
// some reason PostgreSQL chooses in some cases not to use that index,
144142
// possibly because the DELETEs get a large list of ids to delete of
145143
// which many are not in the table which confuses the query planner.
146-
m_db_connection->exec("SET enable_seqscan = off");
144+
m_db_connection.exec("SET enable_seqscan = off");
147145

148146
bool done = false;
149147
while (!done) {
@@ -166,8 +164,6 @@ void db_copy_thread_t::thread_t::operator()()
166164
}
167165

168166
finish_copy();
169-
170-
m_db_connection.reset();
171167
} catch (std::runtime_error const &e) {
172168
log_error("DB copy thread failed: {}", e.what());
173169
std::exit(2); // NOLINT(concurrency-mt-unsafe)
@@ -182,13 +178,13 @@ bool db_copy_thread_t::thread_t::execute(db_cmd_copy_delete_t<DELETER> &cmd)
182178
finish_copy();
183179
}
184180

185-
cmd.delete_data(*m_db_connection);
181+
cmd.delete_data(m_db_connection);
186182

187183
if (!m_inflight) {
188184
start_copy(cmd.target);
189185
}
190186

191-
m_db_connection->copy_send(cmd.buffer, cmd.target->name());
187+
m_db_connection.copy_send(cmd.buffer, cmd.target->name());
192188

193189
return false;
194190
}
@@ -224,15 +220,15 @@ void db_copy_thread_t::thread_t::start_copy(
224220
}
225221

226222
sql.push_back('\0');
227-
m_db_connection->copy_start(sql.data());
223+
m_db_connection.copy_start(sql.data());
228224

229225
m_inflight = target;
230226
}
231227

232228
void db_copy_thread_t::thread_t::finish_copy()
233229
{
234230
if (m_inflight) {
235-
m_db_connection->copy_end(m_inflight->name());
231+
m_db_connection.copy_end(m_inflight->name());
236232
m_inflight.reset();
237233
}
238234
}

src/db-copy.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ class db_copy_thread_t
277277
class thread_t
278278
{
279279
public:
280-
thread_t(connection_params_t connection_params, shared *shared);
280+
thread_t(pg_conn_t &&db_connection, shared *shared);
281281

282282
void operator()();
283283

@@ -296,7 +296,7 @@ class db_copy_thread_t
296296
void delete_rows(db_cmd_copy_t *buffer);
297297

298298
connection_params_t m_connection_params;
299-
std::unique_ptr<pg_conn_t> m_db_connection;
299+
pg_conn_t m_db_connection;
300300

301301
// Target for copy operation currently ongoing.
302302
std::shared_ptr<db_target_descr_t> m_inflight;

0 commit comments

Comments
 (0)