Skip to content

Commit

Permalink
feat: refactor domain objects deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
fpelliccioni committed Oct 24, 2024
1 parent 4e7beb6 commit 9e431d6
Show file tree
Hide file tree
Showing 25 changed files with 405 additions and 178 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class KnuthDatabaseConan(KnuthConanFileV2):

def build_requirements(self):
if self.options.tests:
self.test_requires("catch2/3.6.0")
self.test_requires("catch2/3.7.1")

def requirements(self):
self.requires("domain/0.37.0", transitive_headers=True, transitive_libs=True)
Expand Down
88 changes: 86 additions & 2 deletions include/kth/database/data_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,87 @@ class KD_API data_base : public store, noncopyable {
using result_handler = handle0;
using path = kth::path;

struct block_cache_t {
using block_t = domain::chain::block;
using blocks_t = std::vector<block_t>;

block_cache_t(size_t max_size, internal_database* db)
: max_size_(max_size)
, db_(db)
{}

~block_cache_t() {
flush_to_db();
}

void add_block(block_t const& blk, size_t height) {
cache_.emplace_back(blk);

if (height_ == max_size_t) {
height_ = height;
}

if (cache_.size() >= max_size_) {
flush_to_db();
}
}

void add_block(block_t&& blk, size_t height) {
cache_.emplace_back(std::move(blk));
if (height_ == max_size_t) {
height_ = height;
}
if (cache_.size() >= max_size_) {
flush_to_db();
}
}

void add_blocks(blocks_t const& blocks, size_t height) {
cache_.insert(cache_.end(), blocks.begin(), blocks.end());

if (height_ == max_size_t) {
height_ = height;
}

if (cache_.size() >= max_size_) {
flush_to_db();
}
}

void add_blocks(blocks_t&& blocks, size_t height) {
cache_.insert(
cache_.end(),
std::make_move_iterator(blocks.begin()),
std::make_move_iterator(blocks.end())
);

if (height_ == max_size_t) {
height_ = height;
}

if (cache_.size() >= max_size_) {
flush_to_db();
}
}

void flush_to_db() {
if ( ! db_) {
LOG_ERROR(LOG_DATABASE, "Internal database not set, when trying to flush block cache");
return;
}
db_->push_blocks(cache_, height_);
height_ = max_size_t;
cache_.clear();
}

private:
size_t height_ = max_size_t;
size_t max_size_;
blocks_t cache_;
internal_database* db_ = nullptr;
};


// Construct.
// ----------------------------------------------------------------------------

Expand All @@ -52,6 +133,9 @@ class KD_API data_base : public store, noncopyable {
/// Call close on destruct.
~data_base();


bool is_stale() const;

// Readers.
// ------------------------------------------------------------------------

Expand All @@ -61,8 +145,6 @@ class KD_API data_base : public store, noncopyable {
// ------------------------------------------------------------------------

#if ! defined(KTH_DB_READONLY)


/// Store a block in the database.
/// Returns store_block_duplicate if a block already exists at height.
code insert(domain::chain::block const& block, size_t height);
Expand Down Expand Up @@ -132,8 +214,10 @@ class KD_API data_base : public store, noncopyable {
void handle_push(code const& ec, result_handler handler) const;
#endif // ! defined(KTH_DB_READONLY)


std::atomic<bool> closed_;
settings const& settings_;
std::optional<block_cache_t> block_cache_;
};

} // namespace kth::database
Expand Down
2 changes: 1 addition & 1 deletion include/kth/database/databases/block_database.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ domain::chain::block internal_database_basis<Clock>::get_block(uint32_t height,
}

auto data = db_value_to_data_chunk(value);
auto res = domain::create<domain::chain::block>(data);
auto res = domain::create_old<domain::chain::block>(data);
return res;
}
// db_mode_ == db_mode_type::pruned {
Expand Down
22 changes: 1 addition & 21 deletions include/kth/database/databases/header_abla_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,8 @@ void to_data_with_abla_state(W& sink, domain::chain::block const& block) {
}
}

std::optional<header_with_abla_state_t> get_header_and_abla_state_from_data(data_chunk const& data);
std::optional<header_with_abla_state_t> get_header_and_abla_state_from_data(std::istream& stream);
expect<header_with_abla_state_t> get_header_and_abla_state_from_data(byte_reader& reader);

template <typename R, KTH_IS_READER(R)>
std::optional<header_with_abla_state_t> get_header_and_abla_state_from_data(R& source) {
domain::chain::header header;
header.from_data(source, true);

if ( ! source) {
return {};
}

uint64_t block_size = source.read_8_bytes_little_endian();
uint64_t control_block_size = source.read_8_bytes_little_endian();
uint64_t elastic_buffer_size = source.read_8_bytes_little_endian();

if ( ! source) {
return std::make_tuple(std::move(header), 0, 0, 0);
}

return std::make_tuple(std::move(header), block_size, control_block_size, elastic_buffer_size);
}

} // namespace kth::database

Expand Down
6 changes: 4 additions & 2 deletions include/kth/database/databases/header_database.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ domain::chain::header internal_database_basis<Clock>::get_header(uint32_t height
}

auto data = db_value_to_data_chunk(value);
auto opt = get_header_and_abla_state_from_data(data);
byte_reader reader(data);
auto opt = get_header_and_abla_state_from_data(reader);
if ( ! opt) {
return {};
}
Expand All @@ -74,7 +75,8 @@ std::optional<header_with_abla_state_t> internal_database_basis<Clock>::get_head
}

auto data = db_value_to_data_chunk(value);
auto opt = get_header_and_abla_state_from_data(data);
byte_reader reader(data);
auto opt = get_header_and_abla_state_from_data(reader);
if ( ! opt) {
return {};
}
Expand Down
12 changes: 6 additions & 6 deletions include/kth/database/databases/history_database.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ domain::chain::history_compact::list internal_database_basis<Clock>::get_history
if ((rc = kth_db_cursor_get(cursor, &key_hash, &value, MDB_SET)) == 0) {

auto data = db_value_to_data_chunk(value);
auto entry = domain::create<history_entry>(data);
auto entry = domain::create_old<history_entry>(data);

if (from_height == 0 || entry.height() >= from_height) {
result.push_back(history_entry_to_history_compact(entry));
Expand All @@ -174,7 +174,7 @@ domain::chain::history_compact::list internal_database_basis<Clock>::get_history
}

auto data = db_value_to_data_chunk(value);
auto entry = domain::create<history_entry>(data);
auto entry = domain::create_old<history_entry>(data);

if (from_height == 0 || entry.height() >= from_height) {
result.push_back(history_entry_to_history_compact(entry));
Expand Down Expand Up @@ -220,7 +220,7 @@ std::vector<hash_digest> internal_database_basis<Clock>::get_history_txns(short_
if ((rc = kth_db_cursor_get(cursor, &key_hash, &value, MDB_SET)) == 0) {

auto data = db_value_to_data_chunk(value);
auto entry = domain::create<history_entry>(data);
auto entry = domain::create_old<history_entry>(data);

if (from_height == 0 || entry.height() >= from_height) {
// Avoid inserting the same tx
Expand All @@ -238,7 +238,7 @@ std::vector<hash_digest> internal_database_basis<Clock>::get_history_txns(short_
}

auto data = db_value_to_data_chunk(value);
auto entry = domain::create<history_entry>(data);
auto entry = domain::create_old<history_entry>(data);

if (from_height == 0 || entry.height() >= from_height) {
// Avoid inserting the same tx
Expand Down Expand Up @@ -333,7 +333,7 @@ result_code internal_database_basis<Clock>::remove_history_db(short_hash const&
if ((rc = kth_db_cursor_get(cursor, &key_hash, &value, MDB_SET)) == 0) {

auto data = db_value_to_data_chunk(value);
auto entry = domain::create<history_entry>(data);
auto entry = domain::create_old<history_entry>(data);

if (entry.height() == height) {

Expand All @@ -346,7 +346,7 @@ result_code internal_database_basis<Clock>::remove_history_db(short_hash const&
while ((rc = kth_db_cursor_get(cursor, &key_hash, &value, MDB_NEXT_DUP)) == 0) {

auto data = db_value_to_data_chunk(value);
auto entry = domain::create<history_entry>(data);
auto entry = domain::create_old<history_entry>(data);

if (entry.height() == height) {
if (kth_db_cursor_del(cursor, 0) != KTH_DB_SUCCESS) {
Expand Down
22 changes: 2 additions & 20 deletions include/kth/database/databases/history_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,8 @@ class KD_API history_entry {
factory_to_data(sink,id_, point_, point_kind_, height_, index_, value_or_checksum_ );
}

bool from_data(const data_chunk& data);
bool from_data(std::istream& stream);

template <typename R, KTH_IS_READER(R)>
bool from_data(R& source) {
reset();

id_ = source.read_8_bytes_little_endian();
point_.from_data(source, false);
point_kind_ = static_cast<domain::chain::point_kind>(source.read_byte()),
height_ = source.read_4_bytes_little_endian();
index_ = source.read_4_bytes_little_endian();
value_or_checksum_ = source.read_8_bytes_little_endian();

if ( ! source) {
reset();
}

return source;
}
static
expect<history_entry> from_data(byte_reader& reader);

static
data_chunk factory_to_data(uint64_t id, domain::chain::point const& point, domain::chain::point_kind kind, uint32_t height, uint32_t index, uint64_t value_or_checksum);
Expand Down
6 changes: 6 additions & 0 deletions include/kth/database/databases/internal_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,18 @@ class KD_API internal_database_basis {
bool open();
bool close();

bool is_stale() const;

#if ! defined(KTH_DB_READONLY)
result_code push_genesis(domain::chain::block const& block);

//TODO(fernando): optimization: consider passing a list of outputs to insert and a list of inputs to delete instead of an entire Block.
// avoiding inserting and erasing internal spenders
result_code push_block(domain::chain::block const& block, uint32_t height, uint32_t median_time_past);


result_code push_blocks(std::vector<domain::chain::block> const& blocks, uint32_t height);

#endif

utxo_entry get_utxo(domain::chain::output_point const& point) const;
Expand Down
48 changes: 42 additions & 6 deletions include/kth/database/databases/internal_database.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,40 @@ result_code internal_database_basis<Clock>::push_block(domain::chain::block cons
return res;
}

template <typename Clock>
result_code internal_database_basis<Clock>::push_blocks(std::vector<domain::chain::block> const& blocks, uint32_t height) {

KTH_DB_txn* db_txn;
auto const res0 = kth_db_txn_begin(env_, NULL, 0, &db_txn);
if (res0 != KTH_DB_SUCCESS) {
LOG_ERROR(LOG_DATABASE, "Error begining LMDB Transaction [push_blocks] ", res0);
return result_code::other;
}


for (auto const& block : blocks) {
auto const median_time_past = block.header().validation.median_time_past;
auto const res = push_block(block, height, median_time_past, ! is_old_block(block), db_txn);
if ( ! succeed(res)) {
kth_db_txn_abort(db_txn);
return res;
}
++height;
}

auto const res2 = kth_db_txn_commit(db_txn);
if (res2 != KTH_DB_SUCCESS) {
LOG_ERROR(LOG_DATABASE, "Error commiting LMDB Transaction [push_blocks] ", res2);
return result_code::other;
}

return result_code::success;
}

#endif // ! defined(KTH_DB_READONLY)


//TODO: change the function interface to return expect<utxo_entry>
template <typename Clock>
utxo_entry internal_database_basis<Clock>::get_utxo(domain::chain::output_point const& point, KTH_DB_txn* db_txn) const {

Expand All @@ -263,7 +294,7 @@ utxo_entry internal_database_basis<Clock>::get_utxo(domain::chain::output_point
return utxo_entry{};
}

return domain::create<utxo_entry>(db_value_to_data_chunk(value));
return domain::create_old<utxo_entry>(db_value_to_data_chunk(value));
}

template <typename Clock>
Expand Down Expand Up @@ -411,13 +442,13 @@ domain::chain::header::list internal_database_basis<Clock>::get_headers(uint32_t
}

auto data = db_value_to_data_chunk(value);
list.push_back(domain::create<domain::chain::header>(data));
list.push_back(domain::create_old<domain::chain::header>(data));

while ((rc = kth_db_cursor_get(cursor, &key, &value, KTH_DB_NEXT)) == KTH_DB_SUCCESS) {
auto height = *static_cast<uint32_t*>(kth_db_get_data(key));
if (height > to) break;
auto data = db_value_to_data_chunk(value);
list.push_back(domain::create<domain::chain::header>(data));
list.push_back(domain::create_old<domain::chain::header>(data));
}

kth_db_cursor_close(cursor);
Expand Down Expand Up @@ -523,10 +554,10 @@ result_code internal_database_basis<Clock>::insert_reorg_into_pool(utxo_pool_t&
}

auto entry_data = db_value_to_data_chunk(value);
auto entry = domain::create<utxo_entry>(entry_data);
auto entry = domain::create_old<utxo_entry>(entry_data);

auto point_data = db_value_to_data_chunk(key_point);
auto point = domain::create<domain::chain::output_point>(point_data, KTH_INTERNAL_DB_WIRE);
auto point = domain::create_old<domain::chain::output_point>(point_data, KTH_INTERNAL_DB_WIRE);
pool.insert({point, std::move(entry)}); //TODO(fernando): use emplace?

return result_code::success;
Expand Down Expand Up @@ -674,7 +705,10 @@ bool internal_database_basis<Clock>::create_and_open_environment() {
// throw0(DB_ERROR(lmdb_error("Failed to set max number of readers: ", result).c_str()));
// ----------------------------------------------------------------------------------------------------------------

auto res = kth_db_env_set_mapsize(env_, adjust_db_size(db_max_size_));
LOG_DEBUG(LOG_DATABASE, "DB max size (before adjust): ", db_max_size_);
auto const adjusted_db_size = adjust_db_size(db_max_size_);
LOG_DEBUG(LOG_DATABASE, "Adjusted DB size: ", adjusted_db_size);
auto res = kth_db_env_set_mapsize(env_, adjusted_db_size);
if (res != KTH_DB_SUCCESS) {
LOG_ERROR(LOG_DATABASE, "Error setting max memory map size. Verify do you have enough free space. [create_and_open_environment] ", static_cast<int32_t>(res));
return false;
Expand Down Expand Up @@ -709,6 +743,8 @@ bool internal_database_basis<Clock>::create_and_open_environment() {
mdb_flags |= KTH_DB_WRITEMAP | KTH_DB_MAPASYNC;
}

LOG_DEBUG(LOG_DATABASE, "Opening LMDB Environment in directory: ", db_dir_.string());
LOG_DEBUG(LOG_DATABASE, "LMDB open flags: ", mdb_flags);
res = kth_db_env_open(env_, db_dir_.string().c_str(), mdb_flags, env_open_mode_);
return res == KTH_DB_SUCCESS;
}
Expand Down
2 changes: 1 addition & 1 deletion include/kth/database/databases/reorg_database.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ domain::chain::block internal_database_basis<Clock>::get_block_reorg(uint32_t he
}

auto data = db_value_to_data_chunk(value);
auto res = domain::create<domain::chain::block>(data); //TODO(fernando): mover fuera de la DbTx
auto res = domain::create_old<domain::chain::block>(data); //TODO(fernando): mover fuera de la DbTx
return res;
}

Expand Down
Loading

0 comments on commit 9e431d6

Please sign in to comment.