Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Simplifiy code by using std::variant for copy mgr #2209

Merged
merged 2 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 55 additions & 54 deletions src/db-copy-mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,19 @@ class db_copy_mgr_t
*/
void new_line(std::shared_ptr<db_target_descr_t> const &table)
{
if (!m_current || !m_current->target->same_copy_target(*table)) {
if (!m_current || !m_current.target->same_copy_target(*table)) {
if (m_current) {
m_processor->add_buffer(std::move(m_current));
m_processor->send_command(std::move(m_current));
}

m_current = std::make_unique<db_cmd_copy_delete_t<DELETER>>(table);
m_current = db_cmd_copy_delete_t<DELETER>(table);
}
m_committed = m_current->buffer.size();
m_committed = m_current.buffer.size();
}

void rollback_line()
{
assert(m_current);
m_current->buffer.resize(m_committed);
m_current.buffer.resize(m_committed);
}

/**
Expand All @@ -62,16 +61,17 @@ class db_copy_mgr_t
{
assert(m_current);

auto &buf = m_current->buffer;
auto &buf = m_current.buffer;
assert(!buf.empty());

// Expect that a column has been written last which ended in a '\t'.
// Replace it with the row delimiter '\n'.
assert(buf.back() == '\t');
buf.back() = '\n';

if (m_current->is_full()) {
m_processor->add_buffer(std::move(m_current));
if (m_current.is_full()) {
m_processor->send_command(std::move(m_current));
m_current = {};
}
}

Expand All @@ -96,15 +96,15 @@ class db_copy_mgr_t
void add_column(T value)
{
add_value(value);
m_current->buffer += '\t';
m_current.buffer += '\t';
}

/**
* Add an empty column.
*
* Adds a NULL value for the column.
*/
void add_null_column() { m_current->buffer += "\\N\t"; }
void add_null_column() { m_current.buffer += "\\N\t"; }

/**
* Start an array column.
Expand All @@ -113,7 +113,7 @@ class db_copy_mgr_t
*
* Must be finished with a call to finish_array().
*/
void new_array() { m_current->buffer += "{"; }
void new_array() { m_current.buffer += "{"; }

/**
* Add a single value to an array column.
Expand All @@ -124,7 +124,7 @@ class db_copy_mgr_t
void add_array_elem(osmid_t value)
{
add_value(value);
m_current->buffer += ',';
m_current.buffer += ',';
}

/**
Expand All @@ -135,13 +135,13 @@ class db_copy_mgr_t
*/
void finish_array()
{
assert(!m_current->buffer.empty());
if (m_current->buffer.back() == '{') {
m_current->buffer += '}';
assert(!m_current.buffer.empty());
if (m_current.buffer.back() == '{') {
m_current.buffer += '}';
} else {
m_current->buffer.back() = '}';
m_current.buffer.back() = '}';
}
m_current->buffer += '\t';
m_current.buffer += '\t';
}

/**
Expand Down Expand Up @@ -172,11 +172,11 @@ class db_copy_mgr_t
*/
void add_hash_elem(char const *k, char const *v)
{
m_current->buffer += '"';
m_current.buffer += '"';
add_escaped_string(k);
m_current->buffer += "\"=>\"";
m_current.buffer += "\"=>\"";
add_escaped_string(v);
m_current->buffer += "\",";
m_current.buffer += "\",";
}

/**
Expand All @@ -187,11 +187,11 @@ class db_copy_mgr_t
*/
void add_hash_elem_noescape(char const *k, char const *v)
{
m_current->buffer += '"';
m_current->buffer += k;
m_current->buffer += "\"=>\"";
m_current->buffer += v;
m_current->buffer += "\",";
m_current.buffer += '"';
m_current.buffer += k;
m_current.buffer += "\"=>\"";
m_current.buffer += v;
m_current.buffer += "\",";
}

/**
Expand All @@ -207,11 +207,11 @@ class db_copy_mgr_t
template <typename T>
void add_hstore_num_noescape(char const *k, T const value)
{
m_current->buffer += '"';
m_current->buffer += k;
m_current->buffer += "\"=>\"";
m_current->buffer += std::to_string(value);
m_current->buffer += "\",";
m_current.buffer += '"';
m_current.buffer += k;
m_current.buffer += "\"=>\"";
m_current.buffer += std::to_string(value);
m_current.buffer += "\",";
}

/**
Expand All @@ -222,11 +222,11 @@ class db_copy_mgr_t
*/
void finish_hash()
{
auto const idx = m_current->buffer.size() - 1;
if (!m_current->buffer.empty() && m_current->buffer[idx] == ',') {
m_current->buffer[idx] = '\t';
auto const idx = m_current.buffer.size() - 1;
if (!m_current.buffer.empty() && m_current.buffer[idx] == ',') {
m_current.buffer[idx] = '\t';
} else {
m_current->buffer += '\t';
m_current.buffer += '\t';
}
}

Expand All @@ -241,10 +241,10 @@ class db_copy_mgr_t

for (auto c : wkb) {
unsigned int const num = static_cast<unsigned char>(c);
m_current->buffer += lookup_hex[(num >> 4U) & 0xfU];
m_current->buffer += lookup_hex[num & 0xfU];
m_current.buffer += lookup_hex[(num >> 4U) & 0xfU];
m_current.buffer += lookup_hex[num & 0xfU];
}
m_current->buffer += '\t';
m_current.buffer += '\t';
}

/**
Expand All @@ -257,14 +257,15 @@ class db_copy_mgr_t
void delete_object(ARGS &&... args)
{
assert(m_current);
m_current->add_deletable(std::forward<ARGS>(args)...);
m_current.add_deletable(std::forward<ARGS>(args)...);
}

void flush()
{
// flush current buffer if there is one
if (m_current) {
m_processor->add_buffer(std::move(m_current));
m_processor->send_command(std::move(m_current));
m_current = {};
}
// close any ongoing copy operations
m_processor->end_copy();
Expand All @@ -285,7 +286,7 @@ class db_copy_mgr_t
template <typename T>
void add_value(T value)
{
m_current->buffer += fmt::to_string(value);
m_current.buffer += fmt::to_string(value);
}

void add_value(std::string const &s) { add_value(s.c_str()); }
Expand All @@ -296,22 +297,22 @@ class db_copy_mgr_t
for (char const *c = s; *c; ++c) {
switch (*c) {
case '"':
m_current->buffer += "\\\"";
m_current.buffer += "\\\"";
break;
case '\\':
m_current->buffer += "\\\\";
m_current.buffer += "\\\\";
break;
case '\n':
m_current->buffer += "\\n";
m_current.buffer += "\\n";
break;
case '\r':
m_current->buffer += "\\r";
m_current.buffer += "\\r";
break;
case '\t':
m_current->buffer += "\\t";
m_current.buffer += "\\t";
break;
default:
m_current->buffer += *c;
m_current.buffer += *c;
break;
}
}
Expand All @@ -322,29 +323,29 @@ class db_copy_mgr_t
for (char const *c = s; *c; ++c) {
switch (*c) {
case '"':
m_current->buffer += R"(\\")";
m_current.buffer += R"(\\")";
break;
case '\\':
m_current->buffer += R"(\\\\)";
m_current.buffer += R"(\\\\)";
break;
case '\n':
m_current->buffer += "\\n";
m_current.buffer += "\\n";
break;
case '\r':
m_current->buffer += "\\r";
m_current.buffer += "\\r";
break;
case '\t':
m_current->buffer += "\\t";
m_current.buffer += "\\t";
break;
default:
m_current->buffer += *c;
m_current.buffer += *c;
break;
}
}
}

std::shared_ptr<db_copy_thread_t> m_processor;
std::unique_ptr<db_cmd_copy_delete_t<DELETER>> m_current;
db_cmd_copy_delete_t<DELETER> m_current;
std::size_t m_committed = 0;
};

Expand Down
Loading
Loading