diff --git a/conanfile.py b/conanfile.py index f4f5d1e2..fec4ccfb 100644 --- a/conanfile.py +++ b/conanfile.py @@ -59,7 +59,7 @@ class KnuthDatabaseConan(KnuthConanFileV2): def build_requirements(self): if self.options.tests: - self.test_requires("catch2/3.6.0") + self.test_requires("catch2/3.7.1") def requirements(self): self.requires("domain/0.37.0", transitive_headers=True, transitive_libs=True) diff --git a/include/kth/database/data_base.hpp b/include/kth/database/data_base.hpp index 0d893b6b..53f37bc7 100644 --- a/include/kth/database/data_base.hpp +++ b/include/kth/database/data_base.hpp @@ -30,6 +30,87 @@ class KD_API data_base : public store, noncopyable { using result_handler = handle0; using path = kth::path; + struct block_cache_t { + using block_t = domain::chain::block; + using blocks_t = std::vector; + + block_cache_t(size_t max_size, internal_database* db) + : max_size_(max_size) + , db_(db) + {} + + ~block_cache_t() { + flush_to_db(); + } + + void add_block(block_t const& blk, size_t height) { + cache_.emplace_back(blk); + + if (height_ == max_size_t) { + height_ = height; + } + + if (cache_.size() >= max_size_) { + flush_to_db(); + } + } + + void add_block(block_t&& blk, size_t height) { + cache_.emplace_back(std::move(blk)); + if (height_ == max_size_t) { + height_ = height; + } + if (cache_.size() >= max_size_) { + flush_to_db(); + } + } + + void add_blocks(blocks_t const& blocks, size_t height) { + cache_.insert(cache_.end(), blocks.begin(), blocks.end()); + + if (height_ == max_size_t) { + height_ = height; + } + + if (cache_.size() >= max_size_) { + flush_to_db(); + } + } + + void add_blocks(blocks_t&& blocks, size_t height) { + cache_.insert( + cache_.end(), + std::make_move_iterator(blocks.begin()), + std::make_move_iterator(blocks.end()) + ); + + if (height_ == max_size_t) { + height_ = height; + } + + if (cache_.size() >= max_size_) { + flush_to_db(); + } + } + + void flush_to_db() { + if ( ! db_) { + LOG_ERROR(LOG_DATABASE, "Internal database not set, when trying to flush block cache"); + return; + } + db_->push_blocks(cache_, height_); + height_ = max_size_t; + cache_.clear(); + } + + private: + size_t height_ = max_size_t; + size_t max_size_; + blocks_t cache_; + internal_database* db_ = nullptr; + }; + + // Construct. // ---------------------------------------------------------------------------- @@ -52,6 +133,9 @@ class KD_API data_base : public store, noncopyable { /// Call close on destruct. ~data_base(); + + bool is_stale() const; + // Readers. // ------------------------------------------------------------------------ @@ -61,8 +145,6 @@ class KD_API data_base : public store, noncopyable { // ------------------------------------------------------------------------ #if ! defined(KTH_DB_READONLY) - - /// Store a block in the database. /// Returns store_block_duplicate if a block already exists at height. code insert(domain::chain::block const& block, size_t height); @@ -132,8 +214,10 @@ class KD_API data_base : public store, noncopyable { void handle_push(code const& ec, result_handler handler) const; #endif // ! defined(KTH_DB_READONLY) + std::atomic closed_; settings const& settings_; + std::optional block_cache_; }; } // namespace kth::database diff --git a/include/kth/database/databases/block_database.ipp b/include/kth/database/databases/block_database.ipp index c510e058..3740d8c0 100644 --- a/include/kth/database/databases/block_database.ipp +++ b/include/kth/database/databases/block_database.ipp @@ -106,7 +106,7 @@ domain::chain::block internal_database_basis::get_block(uint32_t height, } auto data = db_value_to_data_chunk(value); - auto res = domain::create(data); + auto res = domain::create_old(data); return res; } // db_mode_ == db_mode_type::pruned { diff --git a/include/kth/database/databases/header_abla_entry.hpp b/include/kth/database/databases/header_abla_entry.hpp index acdf4a94..90a18971 100644 --- a/include/kth/database/databases/header_abla_entry.hpp +++ b/include/kth/database/databases/header_abla_entry.hpp @@ -30,28 +30,8 @@ void to_data_with_abla_state(W& sink, domain::chain::block const& block) { } } -std::optional get_header_and_abla_state_from_data(data_chunk const& data); -std::optional get_header_and_abla_state_from_data(std::istream& stream); +expect get_header_and_abla_state_from_data(byte_reader& reader); -template -std::optional get_header_and_abla_state_from_data(R& source) { - domain::chain::header header; - header.from_data(source, true); - - if ( ! source) { - return {}; - } - - uint64_t block_size = source.read_8_bytes_little_endian(); - uint64_t control_block_size = source.read_8_bytes_little_endian(); - uint64_t elastic_buffer_size = source.read_8_bytes_little_endian(); - - if ( ! source) { - return std::make_tuple(std::move(header), 0, 0, 0); - } - - return std::make_tuple(std::move(header), block_size, control_block_size, elastic_buffer_size); -} } // namespace kth::database diff --git a/include/kth/database/databases/header_database.ipp b/include/kth/database/databases/header_database.ipp index 44821194..97e79ce5 100644 --- a/include/kth/database/databases/header_database.ipp +++ b/include/kth/database/databases/header_database.ipp @@ -57,7 +57,8 @@ domain::chain::header internal_database_basis::get_header(uint32_t height } auto data = db_value_to_data_chunk(value); - auto opt = get_header_and_abla_state_from_data(data); + byte_reader reader(data); + auto opt = get_header_and_abla_state_from_data(reader); if ( ! opt) { return {}; } @@ -74,7 +75,8 @@ std::optional internal_database_basis::get_head } auto data = db_value_to_data_chunk(value); - auto opt = get_header_and_abla_state_from_data(data); + byte_reader reader(data); + auto opt = get_header_and_abla_state_from_data(reader); if ( ! opt) { return {}; } diff --git a/include/kth/database/databases/history_database.ipp b/include/kth/database/databases/history_database.ipp index 2c994f84..c4e1279a 100644 --- a/include/kth/database/databases/history_database.ipp +++ b/include/kth/database/databases/history_database.ipp @@ -161,7 +161,7 @@ domain::chain::history_compact::list internal_database_basis::get_history if ((rc = kth_db_cursor_get(cursor, &key_hash, &value, MDB_SET)) == 0) { auto data = db_value_to_data_chunk(value); - auto entry = domain::create(data); + auto entry = domain::create_old(data); if (from_height == 0 || entry.height() >= from_height) { result.push_back(history_entry_to_history_compact(entry)); @@ -174,7 +174,7 @@ domain::chain::history_compact::list internal_database_basis::get_history } auto data = db_value_to_data_chunk(value); - auto entry = domain::create(data); + auto entry = domain::create_old(data); if (from_height == 0 || entry.height() >= from_height) { result.push_back(history_entry_to_history_compact(entry)); @@ -220,7 +220,7 @@ std::vector internal_database_basis::get_history_txns(short_ if ((rc = kth_db_cursor_get(cursor, &key_hash, &value, MDB_SET)) == 0) { auto data = db_value_to_data_chunk(value); - auto entry = domain::create(data); + auto entry = domain::create_old(data); if (from_height == 0 || entry.height() >= from_height) { // Avoid inserting the same tx @@ -238,7 +238,7 @@ std::vector internal_database_basis::get_history_txns(short_ } auto data = db_value_to_data_chunk(value); - auto entry = domain::create(data); + auto entry = domain::create_old(data); if (from_height == 0 || entry.height() >= from_height) { // Avoid inserting the same tx @@ -333,7 +333,7 @@ result_code internal_database_basis::remove_history_db(short_hash const& if ((rc = kth_db_cursor_get(cursor, &key_hash, &value, MDB_SET)) == 0) { auto data = db_value_to_data_chunk(value); - auto entry = domain::create(data); + auto entry = domain::create_old(data); if (entry.height() == height) { @@ -346,7 +346,7 @@ result_code internal_database_basis::remove_history_db(short_hash const& while ((rc = kth_db_cursor_get(cursor, &key_hash, &value, MDB_NEXT_DUP)) == 0) { auto data = db_value_to_data_chunk(value); - auto entry = domain::create(data); + auto entry = domain::create_old(data); if (entry.height() == height) { if (kth_db_cursor_del(cursor, 0) != KTH_DB_SUCCESS) { diff --git a/include/kth/database/databases/history_entry.hpp b/include/kth/database/databases/history_entry.hpp index 4298a5bb..9ade8e47 100644 --- a/include/kth/database/databases/history_entry.hpp +++ b/include/kth/database/databases/history_entry.hpp @@ -40,26 +40,8 @@ class KD_API history_entry { factory_to_data(sink,id_, point_, point_kind_, height_, index_, value_or_checksum_ ); } - bool from_data(const data_chunk& data); - bool from_data(std::istream& stream); - - template - bool from_data(R& source) { - reset(); - - id_ = source.read_8_bytes_little_endian(); - point_.from_data(source, false); - point_kind_ = static_cast(source.read_byte()), - height_ = source.read_4_bytes_little_endian(); - index_ = source.read_4_bytes_little_endian(); - value_or_checksum_ = source.read_8_bytes_little_endian(); - - if ( ! source) { - reset(); - } - - return source; - } + static + expect from_data(byte_reader& reader); static data_chunk factory_to_data(uint64_t id, domain::chain::point const& point, domain::chain::point_kind kind, uint32_t height, uint32_t index, uint64_t value_or_checksum); diff --git a/include/kth/database/databases/internal_database.hpp b/include/kth/database/databases/internal_database.hpp index 2cd6acff..f584b69d 100644 --- a/include/kth/database/databases/internal_database.hpp +++ b/include/kth/database/databases/internal_database.hpp @@ -94,12 +94,18 @@ class KD_API internal_database_basis { bool open(); bool close(); + bool is_stale() const; + #if ! defined(KTH_DB_READONLY) result_code push_genesis(domain::chain::block const& block); //TODO(fernando): optimization: consider passing a list of outputs to insert and a list of inputs to delete instead of an entire Block. // avoiding inserting and erasing internal spenders result_code push_block(domain::chain::block const& block, uint32_t height, uint32_t median_time_past); + + + result_code push_blocks(std::vector const& blocks, uint32_t height); + #endif utxo_entry get_utxo(domain::chain::output_point const& point) const; diff --git a/include/kth/database/databases/internal_database.ipp b/include/kth/database/databases/internal_database.ipp index 93063a0f..46554186 100644 --- a/include/kth/database/databases/internal_database.ipp +++ b/include/kth/database/databases/internal_database.ipp @@ -248,9 +248,40 @@ result_code internal_database_basis::push_block(domain::chain::block cons return res; } +template +result_code internal_database_basis::push_blocks(std::vector const& blocks, uint32_t height) { + + KTH_DB_txn* db_txn; + auto const res0 = kth_db_txn_begin(env_, NULL, 0, &db_txn); + if (res0 != KTH_DB_SUCCESS) { + LOG_ERROR(LOG_DATABASE, "Error begining LMDB Transaction [push_blocks] ", res0); + return result_code::other; + } + + + for (auto const& block : blocks) { + auto const median_time_past = block.header().validation.median_time_past; + auto const res = push_block(block, height, median_time_past, ! is_old_block(block), db_txn); + if ( ! succeed(res)) { + kth_db_txn_abort(db_txn); + return res; + } + ++height; + } + + auto const res2 = kth_db_txn_commit(db_txn); + if (res2 != KTH_DB_SUCCESS) { + LOG_ERROR(LOG_DATABASE, "Error commiting LMDB Transaction [push_blocks] ", res2); + return result_code::other; + } + + return result_code::success; +} + #endif // ! defined(KTH_DB_READONLY) +//TODO: change the function interface to return expect template utxo_entry internal_database_basis::get_utxo(domain::chain::output_point const& point, KTH_DB_txn* db_txn) const { @@ -263,7 +294,7 @@ utxo_entry internal_database_basis::get_utxo(domain::chain::output_point return utxo_entry{}; } - return domain::create(db_value_to_data_chunk(value)); + return domain::create_old(db_value_to_data_chunk(value)); } template @@ -411,13 +442,13 @@ domain::chain::header::list internal_database_basis::get_headers(uint32_t } auto data = db_value_to_data_chunk(value); - list.push_back(domain::create(data)); + list.push_back(domain::create_old(data)); while ((rc = kth_db_cursor_get(cursor, &key, &value, KTH_DB_NEXT)) == KTH_DB_SUCCESS) { auto height = *static_cast(kth_db_get_data(key)); if (height > to) break; auto data = db_value_to_data_chunk(value); - list.push_back(domain::create(data)); + list.push_back(domain::create_old(data)); } kth_db_cursor_close(cursor); @@ -523,10 +554,10 @@ result_code internal_database_basis::insert_reorg_into_pool(utxo_pool_t& } auto entry_data = db_value_to_data_chunk(value); - auto entry = domain::create(entry_data); + auto entry = domain::create_old(entry_data); auto point_data = db_value_to_data_chunk(key_point); - auto point = domain::create(point_data, KTH_INTERNAL_DB_WIRE); + auto point = domain::create_old(point_data, KTH_INTERNAL_DB_WIRE); pool.insert({point, std::move(entry)}); //TODO(fernando): use emplace? return result_code::success; @@ -674,7 +705,10 @@ bool internal_database_basis::create_and_open_environment() { // throw0(DB_ERROR(lmdb_error("Failed to set max number of readers: ", result).c_str())); // ---------------------------------------------------------------------------------------------------------------- - auto res = kth_db_env_set_mapsize(env_, adjust_db_size(db_max_size_)); + LOG_DEBUG(LOG_DATABASE, "DB max size (before adjust): ", db_max_size_); + auto const adjusted_db_size = adjust_db_size(db_max_size_); + LOG_DEBUG(LOG_DATABASE, "Adjusted DB size: ", adjusted_db_size); + auto res = kth_db_env_set_mapsize(env_, adjusted_db_size); if (res != KTH_DB_SUCCESS) { LOG_ERROR(LOG_DATABASE, "Error setting max memory map size. Verify do you have enough free space. [create_and_open_environment] ", static_cast(res)); return false; @@ -709,6 +743,8 @@ bool internal_database_basis::create_and_open_environment() { mdb_flags |= KTH_DB_WRITEMAP | KTH_DB_MAPASYNC; } + LOG_DEBUG(LOG_DATABASE, "Opening LMDB Environment in directory: ", db_dir_.string()); + LOG_DEBUG(LOG_DATABASE, "LMDB open flags: ", mdb_flags); res = kth_db_env_open(env_, db_dir_.string().c_str(), mdb_flags, env_open_mode_); return res == KTH_DB_SUCCESS; } diff --git a/include/kth/database/databases/reorg_database.ipp b/include/kth/database/databases/reorg_database.ipp index 90567122..6e63b706 100644 --- a/include/kth/database/databases/reorg_database.ipp +++ b/include/kth/database/databases/reorg_database.ipp @@ -154,7 +154,7 @@ domain::chain::block internal_database_basis::get_block_reorg(uint32_t he } auto data = db_value_to_data_chunk(value); - auto res = domain::create(data); //TODO(fernando): mover fuera de la DbTx + auto res = domain::create_old(data); //TODO(fernando): mover fuera de la DbTx return res; } diff --git a/include/kth/database/databases/spend_database.ipp b/include/kth/database/databases/spend_database.ipp index f2128c7f..2a9a1375 100644 --- a/include/kth/database/databases/spend_database.ipp +++ b/include/kth/database/databases/spend_database.ipp @@ -39,7 +39,7 @@ domain::chain::input_point internal_database_basis::get_spend(domain::cha return domain::chain::input_point{}; } - auto res = domain::create(data); + auto res = domain::create_old(data); return res; } diff --git a/include/kth/database/databases/transaction_database.ipp b/include/kth/database/databases/transaction_database.ipp index c076b0e7..7f2270bd 100644 --- a/include/kth/database/databases/transaction_database.ipp +++ b/include/kth/database/databases/transaction_database.ipp @@ -76,7 +76,7 @@ transaction_entry internal_database_basis::get_transaction(uint64_t id, K } auto data = db_value_to_data_chunk(value); - auto entry = domain::create(data); + auto entry = domain::create_old(data); return entry; } diff --git a/include/kth/database/databases/transaction_entry.hpp b/include/kth/database/databases/transaction_entry.hpp index a18840bc..7d273083 100644 --- a/include/kth/database/databases/transaction_entry.hpp +++ b/include/kth/database/databases/transaction_entry.hpp @@ -52,29 +52,8 @@ class KD_API transaction_entry { factory_to_data(sink, transaction_, height_, median_time_past_, position_ ); } - bool from_data(const data_chunk& data); - bool from_data(std::istream& stream); - - - template - bool from_data(R& source) { - reset(); - -#if defined(KTH_CACHED_RPC_DATA) - transaction_.from_data(source, false, true, false); -#else - transaction_.from_data(source, false, true); -#endif - height_ = source.read_4_bytes_little_endian(); - median_time_past_ = source.read_4_bytes_little_endian(); - position_ = read_position(source); - - if ( ! source) { - reset(); - } - - return source; - } + static + expect from_data(byte_reader& reader); bool confirmed() const; @@ -90,12 +69,7 @@ class KD_API transaction_entry { template static void factory_to_data(W& sink, domain::chain::transaction const& tx, uint32_t height, uint32_t median_time_past, uint32_t position) { -#if defined(KTH_CACHED_RPC_DATA) - tx.to_data(sink, false, true, false); -#else - tx.to_data(sink, false, true); -#endif - + tx.to_data(sink, false); sink.write_4_bytes_little_endian(height); sink.write_4_bytes_little_endian(median_time_past); write_position(sink, position); diff --git a/include/kth/database/databases/transaction_unconfirmed_database.ipp b/include/kth/database/databases/transaction_unconfirmed_database.ipp index d8c81038..3a29017a 100644 --- a/include/kth/database/databases/transaction_unconfirmed_database.ipp +++ b/include/kth/database/databases/transaction_unconfirmed_database.ipp @@ -37,7 +37,7 @@ transaction_unconfirmed_entry internal_database_basis::get_transaction_un } auto data = db_value_to_data_chunk(value); - auto res = domain::create(data); + auto res = domain::create_old(data); return res; } @@ -68,12 +68,12 @@ std::vector internal_database_basis::get_a if ((rc = kth_db_cursor_get(cursor, &key, &value, KTH_DB_NEXT)) == 0) { auto data = db_value_to_data_chunk(value); - auto res = domain::create(data); + auto res = domain::create_old(data); result.push_back(res); while ((rc = kth_db_cursor_get(cursor, &key, &value, KTH_DB_NEXT)) == 0) { auto data = db_value_to_data_chunk(value); - auto res = domain::create(data); + auto res = domain::create_old(data); result.push_back(res); } } diff --git a/include/kth/database/databases/transaction_unconfirmed_entry.hpp b/include/kth/database/databases/transaction_unconfirmed_entry.hpp index 763d4730..df8e806a 100644 --- a/include/kth/database/databases/transaction_unconfirmed_entry.hpp +++ b/include/kth/database/databases/transaction_unconfirmed_entry.hpp @@ -41,29 +41,8 @@ class KD_API transaction_unconfirmed_entry { factory_to_data(sink, transaction_, arrival_time_, height_); } - bool from_data(const data_chunk& data); - bool from_data(std::istream& stream); - - - template - bool from_data(R& source) { - reset(); - -#if defined(KTH_CACHED_RPC_DATA) - transaction_.from_data(source, false, true, true); -#else - transaction_.from_data(source, false, true); -#endif - arrival_time_ = source.read_4_bytes_little_endian(); - - height_ = source.read_4_bytes_little_endian(); - - if ( ! source) { - reset(); - } - - return source; - } + static + expect from_data(byte_reader& reader); static data_chunk factory_to_data(domain::chain::transaction const& tx, uint32_t arrival_time, uint32_t height); @@ -74,16 +53,9 @@ class KD_API transaction_unconfirmed_entry { template static void factory_to_data(W& sink, domain::chain::transaction const& tx, uint32_t arrival_time, uint32_t height) { - -#if defined(KTH_CACHED_RPC_DATA) - tx.to_data(sink, false, true, true); -#else - tx.to_data(sink, false, true); -#endif - + tx.to_data(sink, false); sink.write_4_bytes_little_endian(arrival_time); sink.write_4_bytes_little_endian(height); - } private: diff --git a/include/kth/database/databases/utxo_entry.hpp b/include/kth/database/databases/utxo_entry.hpp index 8062c4f6..1e2068a4 100644 --- a/include/kth/database/databases/utxo_entry.hpp +++ b/include/kth/database/databases/utxo_entry.hpp @@ -36,24 +36,8 @@ class KD_API utxo_entry { to_data_fixed(sink, height_, median_time_past_, coinbase_); } - bool from_data(const data_chunk& data); - bool from_data(std::istream& stream); - - template - bool from_data(R& source) { - reset(); - - output_.from_data(source, false); - height_ = source.read_4_bytes_little_endian(); - median_time_past_ = source.read_4_bytes_little_endian(); - coinbase_ = source.read_byte(); - - if ( ! source) { - reset(); - } - - return source; - } + static + expect from_data(byte_reader& reader); static data_chunk to_data_fixed(uint32_t height, uint32_t median_time_past, bool coinbase); diff --git a/include/kth/database/settings.hpp b/include/kth/database/settings.hpp index 8fe10dea..b89be7c9 100644 --- a/include/kth/database/settings.hpp +++ b/include/kth/database/settings.hpp @@ -26,6 +26,7 @@ class KD_API settings { uint64_t db_max_size; bool safe_mode; uint32_t cache_capacity; + uint32_t block_cache_capacity; }; } // namespace kth::database diff --git a/src/data_base.cpp b/src/data_base.cpp index e6c441f4..5c744c1d 100644 --- a/src/data_base.cpp +++ b/src/data_base.cpp @@ -19,6 +19,19 @@ #include #include #include +// #include +#include + + +namespace { +//TODO: remove from here +time_t floor_subtract_times(time_t left, time_t right) { + static auto const floor = (std::numeric_limits::min)(); + return right >= left ? floor : left - right; +} + +} // namespace kth + namespace kth::database { @@ -69,8 +82,6 @@ bool data_base::create(block const& genesis) { return false; } - // Store the first block. - // push(genesis, 0); push_genesis(genesis); closed_ = false; @@ -94,6 +105,7 @@ bool data_base::close() { return true; } + block_cache_->flush_to_db(); closed_ = true; auto const closed = internal_db_->close(); return closed; @@ -106,6 +118,9 @@ void data_base::start() { settings_.db_mode, settings_.reorg_pool_limit, settings_.db_max_size, settings_.safe_mode); + + LOG_INFO(LOG_DATABASE, "Starting a block cache with ", settings_.block_cache_capacity, " blocks capacity"); + block_cache_ = block_cache_t(settings_.block_cache_capacity, internal_db_.get()); } // Readers. @@ -115,9 +130,10 @@ internal_database const& data_base::internal_db() const { return *internal_db_; } -// Synchronous writers. +// Others // ---------------------------------------------------------------------------- + static inline uint32_t get_next_height(internal_database const& db) { uint32_t current_height; @@ -130,6 +146,30 @@ uint32_t get_next_height(internal_database const& db) { return current_height + 1; } +bool data_base::is_stale() const { + //TODO: Should be a config value + constexpr time_t notify_limit_seconds = 6 * 60 * 60; // 6 hours + //TODO: cache the last header + + uint32_t timestamp = 0; + uint32_t last_height; + auto res = internal_db_->get_last_height(last_height); + if (res != result_code::success) { + return true; + } + + auto last_header = internal_db_->get_header(last_height); + if ( ! last_header.is_valid()) { + return true; + } + timestamp = last_header.timestamp(); + + return timestamp < floor_subtract_times(zulu_time(), notify_limit_seconds); +} + +// Synchronous writers. +// ---------------------------------------------------------------------------- + static inline hash_digest get_previous_hash(internal_database const& db, size_t height) { return height == 0 ? null_hash : db.get_header(height - 1).hash(); @@ -190,9 +230,6 @@ code data_base::insert(domain::chain::block const& block, size_t height) { return error::success; } -#endif //! defined(KTH_DB_READONLY) - -#if ! defined(KTH_DB_READONLY) // This is designed for write exclusivity and read concurrency. code data_base::push(domain::chain::transaction const& tx, uint32_t forks) { @@ -208,6 +245,7 @@ code data_base::push(domain::chain::transaction const& tx, uint32_t forks) { #endif // ! defined(KTH_DB_READONLY) #if ! defined(KTH_DB_READONLY) + // Add a block in order (creates no gaps, must be at top). // This is designed for write exclusivity and read concurrency. code data_base::push(block const& block, size_t height) { @@ -274,7 +312,6 @@ bool data_base::pop(block& out_block) { #if ! defined(KTH_DB_READONLY) // A false return implies store corruption. bool data_base::pop_inputs(const input::list& inputs, size_t height) { - return true; } @@ -294,8 +331,26 @@ bool data_base::pop_outputs(const output::list& outputs, size_t height) { void data_base::push_all(block_const_ptr_list_const_ptr in_blocks, size_t first_height, dispatcher& dispatch, result_handler handler) { DEBUG_ONLY(*safe_add(in_blocks->size(), first_height)); + // Parallel version (disabled at the moment) // This is the beginning of the push_all sequence. - push_next(error::success, in_blocks, 0, first_height, dispatch, handler); + // push_next(error::success, in_blocks, 0, first_height, dispatch, handler); + + // Sequential version + for (size_t i = 0; i < in_blocks->size(); ++i) { + auto const block_ptr = (*in_blocks)[i]; + auto const median_time_past = block_ptr->header().validation.median_time_past; + + block_ptr->validation.start_push = asio::steady_clock::now(); + + auto res = internal_db_->push_block(*block_ptr, first_height + i, median_time_past); + if ( ! succeed(res)) { + handler(error::operation_failed_7); //TODO(fernando): create a new operation_failed + return; + } + + block_ptr->validation.end_push = asio::steady_clock::now(); + } + handler(error::success); } // TODO(legacy): resolve inconsistency with height and median_time_past passing. diff --git a/src/databases/header_abla_entry.cpp b/src/databases/header_abla_entry.cpp index ddded763..f20dcac4 100644 --- a/src/databases/header_abla_entry.cpp +++ b/src/databases/header_abla_entry.cpp @@ -27,14 +27,29 @@ void to_data_with_abla_state(std::ostream& stream, domain::chain::block const& b to_data_with_abla_state(sink, block); } -std::optional get_header_and_abla_state_from_data(data_chunk const& data) { - data_source istream(data); - return get_header_and_abla_state_from_data(istream); +expect get_header_and_abla_state_from_data(byte_reader& reader) { + auto header = domain::chain::header::from_data(reader, true); + if ( ! header) { + return make_unexpected(header.error()); + } + + auto const block_size = reader.read_little_endian(); + if ( ! block_size) { + return std::make_tuple(std::move(*header), 0, 0, 0); + } + + auto const control_block_size = reader.read_little_endian(); + if ( ! control_block_size) { + return std::make_tuple(std::move(*header), 0, 0, 0); + } + + auto const elastic_buffer_size = reader.read_little_endian(); + if ( ! elastic_buffer_size) { + return std::make_tuple(std::move(*header), 0, 0, 0); + } + + return std::make_tuple(std::move(*header), *block_size, *control_block_size, *elastic_buffer_size); } -std::optional get_header_and_abla_state_from_data(std::istream& stream) { - istream_reader source(stream); - return get_header_and_abla_state_from_data(source); -} } // namespace kth::database diff --git a/src/databases/history_entry.cpp b/src/databases/history_entry.cpp index 15835343..a2f155d8 100644 --- a/src/databases/history_entry.cpp +++ b/src/databases/history_entry.cpp @@ -63,6 +63,51 @@ size_t history_entry::serialized_size(domain::chain::point const& point) { return sizeof(uint64_t) + point.serialized_size(false) + sizeof(uint8_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t); } +// Deserialization. +//----------------------------------------------------------------------------- + +// static +expect history_entry::from_data(byte_reader& reader) { + auto const id = reader.read_little_endian(); + if ( ! id) { + return make_unexpected(id.error()); + } + + auto const point = domain::chain::point::from_data(reader, false); + if ( ! point) { + return make_unexpected(point.error()); + } + + auto const point_kind = reader.read_byte(); + if ( ! point_kind) { + return make_unexpected(point_kind.error()); + } + + auto const height = reader.read_little_endian(); + if ( ! height) { + return make_unexpected(height.error()); + } + + auto const index = reader.read_little_endian(); + if ( ! index) { + return make_unexpected(index.error()); + } + + auto const value_or_checksum = reader.read_little_endian(); + if ( ! value_or_checksum) { + return make_unexpected(value_or_checksum.error()); + } + + return history_entry( + *id, + *point, + domain::chain::point_kind(*point_kind), + *height, + *index, + *value_or_checksum + ); +} + // Serialization. //----------------------------------------------------------------------------- diff --git a/src/databases/transaction_entry.cpp b/src/databases/transaction_entry.cpp index abf92704..b0f08638 100644 --- a/src/databases/transaction_entry.cpp +++ b/src/databases/transaction_entry.cpp @@ -59,14 +59,39 @@ bool transaction_entry::is_valid() const { // constexpr //TODO(fernando): make this constexpr size_t transaction_entry::serialized_size(domain::chain::transaction const& tx) { -#if defined(KTH_CACHED_RPC_DATA) - return tx.serialized_size(false, true, false) -#else - return tx.serialized_size(false, true) -#endif + return tx.serialized_size(false) + sizeof(uint32_t) + sizeof(uint32_t) + position_size; } +// Deserialization. +//----------------------------------------------------------------------------- + +// static +expect transaction_entry::from_data(byte_reader& reader) { + auto tx = domain::chain::transaction::from_data(reader, false); + if ( ! tx) { + return make_unexpected(tx.error()); + } + + auto height = reader.read_little_endian(); + if ( ! height) { + return make_unexpected(height.error()); + } + + auto median_time_past = reader.read_little_endian(); + if ( ! median_time_past) { + return make_unexpected(median_time_past.error()); + } + + using position_type = uint32_t; + auto const position = reader.read_little_endian(); + if ( ! position) { + return make_unexpected(position.error()); + } + + return transaction_entry(std::move(*tx), *height, *median_time_past, *position); +} + // Serialization. //----------------------------------------------------------------------------- diff --git a/src/databases/transaction_unconfirmed_entry.cpp b/src/databases/transaction_unconfirmed_entry.cpp index f4d6a3de..caab1962 100644 --- a/src/databases/transaction_unconfirmed_entry.cpp +++ b/src/databases/transaction_unconfirmed_entry.cpp @@ -45,15 +45,33 @@ bool transaction_unconfirmed_entry::is_valid() const { // constexpr //TODO(fernando): make this constexpr size_t transaction_unconfirmed_entry::serialized_size(domain::chain::transaction const& tx) { -#if defined(KTH_CACHED_RPC_DATA) - return tx.serialized_size(false, true, true) -#else - return tx.serialized_size(false, true) -#endif + return tx.serialized_size(false) + sizeof(uint32_t) // arrival_time + sizeof(uint32_t); //height } +// Deserialization. +//----------------------------------------------------------------------------- + +// static +expect transaction_unconfirmed_entry::from_data(byte_reader& reader) { + auto tx = domain::chain::transaction::from_data(reader, false); + if ( ! tx) { + return make_unexpected(tx.error()); + } + + auto arrival_time = reader.read_little_endian(); + if ( ! arrival_time) { + return make_unexpected(arrival_time.error()); + } + + auto height = reader.read_little_endian(); + if ( ! height) { + return make_unexpected(height.error()); + } + + return transaction_unconfirmed_entry(std::move(*tx), *arrival_time, *height); +} // Serialization. //----------------------------------------------------------------------------- diff --git a/src/databases/utxo_entry.cpp b/src/databases/utxo_entry.cpp index 82703fa8..31ec92d2 100644 --- a/src/databases/utxo_entry.cpp +++ b/src/databases/utxo_entry.cpp @@ -100,6 +100,53 @@ void utxo_entry::to_data_with_fixed(std::ostream& stream, domain::chain::output } +// Deserialization. +//----------------------------------------------------------------------------- + // bool from_data(const data_chunk& data); + // bool from_data(std::istream& stream); + + // template + // bool from_data(R& source) { + // reset(); + + // output_.from_data(source, false); + // height_ = source.read_4_bytes_little_endian(); + // median_time_past_ = source.read_4_bytes_little_endian(); + // coinbase_ = source.read_byte(); + + // if ( ! source) { + // reset(); + // } + + // return source; + // } + +// static +expect utxo_entry::from_data(byte_reader& reader) { + auto output = domain::chain::output::from_data(reader, false); + if ( ! output) { + return make_unexpected(output.error()); + } + + auto const height = reader.read_little_endian(); + if ( ! height) { + return make_unexpected(height.error()); + } + + auto const median_time_past = reader.read_little_endian(); + if ( ! median_time_past) { + return make_unexpected(median_time_past.error()); + } + + auto const coinbase = reader.read_byte(); + if ( ! coinbase) { + return make_unexpected(coinbase.error()); + } + + return utxo_entry(std::move(*output), *height, *median_time_past, *coinbase); +} + + // Serialization. //----------------------------------------------------------------------------- diff --git a/src/settings.cpp b/src/settings.cpp index 0f84c863..cd0fda04 100644 --- a/src/settings.cpp +++ b/src/settings.cpp @@ -11,8 +11,8 @@ namespace kth::database { using namespace std::filesystem; //TODO(fernando): look for good defaults -constexpr auto db_size_pruned_mainnet = 100 * (uint64_t(1) << 30); //100 GiB -constexpr auto db_size_default_mainnet = 200 * (uint64_t(1) << 30); //200 GiB +constexpr auto db_size_pruned_mainnet = 200 * (uint64_t(1) << 30); //200 GiB +constexpr auto db_size_default_mainnet = 300 * (uint64_t(1) << 30); //300 GiB constexpr auto db_size_full_mainnet = 600 * (uint64_t(1) << 30); //600 GiB constexpr auto db_size_pruned_testnet4 = 5 * (uint64_t(1) << 30); // 5 GiB @@ -43,6 +43,7 @@ settings::settings() , db_max_size(get_db_max_size_mainnet(db_mode)) , safe_mode(true) , cache_capacity(0) + , block_cache_capacity(100) {} settings::settings(domain::config::network context) diff --git a/test/internal_database.cpp b/test/internal_database.cpp index c7cd2e12..d0cf5671 100644 --- a/test/internal_database.cpp +++ b/test/internal_database.cpp @@ -41,7 +41,7 @@ struct internal_database_directory_setup_fixture { domain::chain::block get_block(std::string const& enc) { data_chunk data; decode_base16(data, enc); - return domain::create(data); + return domain::create_old(data); } domain::chain::block get_genesis() { @@ -228,7 +228,7 @@ void check_reorg_output(KTH_DB_env* env_, KTH_DB_dbi& dbi_reorg_pool_, std::stri REQUIRE(kth_db_txn_commit(db_txn) == KTH_DB_SUCCESS); data_chunk data {static_cast(kth_db_get_data(value)), static_cast(kth_db_get_data(value)) + kth_db_get_size(value)}; - auto output = domain::create(data, false); + auto output = domain::create_old(data, false); REQUIRE(encode_base16(output.to_data(true)) == output_enc); } @@ -300,7 +300,7 @@ void check_blocks_db(KTH_DB_env* env_, KTH_DB_dbi& dbi_blocks_db_, uint32_t heig REQUIRE(kth_db_txn_commit(db_txn) == KTH_DB_SUCCESS); data_chunk data {static_cast(kth_db_get_data(value)), static_cast(kth_db_get_data(value)) + kth_db_get_size(value)}; - auto block = domain::create(data, false); + auto block = domain::create_old(data, false); REQUIRE(block.is_valid()); } @@ -330,7 +330,7 @@ void check_blocks_db(KTH_DB_env* env_, KTH_DB_dbi& dbi_blocks_db_, KTH_DB_dbi& d REQUIRE(kth_db_get(db_txn, dbi_transaction_db_, &key_tx, &value_tx) == KTH_DB_SUCCESS); data_chunk data_tx {static_cast(kth_db_get_data(value_tx)), static_cast(kth_db_get_data(value_tx)) + kth_db_get_size(value_tx)}; - auto entry = domain::create(data_tx); + auto entry = domain::create_old(data_tx); tx_list.push_back(std::move(entry.transaction())); while ((rc = kth_db_cursor_get(cursor, &key, &value, MDB_NEXT_DUP)) == 0) { @@ -341,7 +341,7 @@ void check_blocks_db(KTH_DB_env* env_, KTH_DB_dbi& dbi_blocks_db_, KTH_DB_dbi& d REQUIRE(kth_db_get(db_txn, dbi_transaction_db_, &key_tx, &value_tx) == KTH_DB_SUCCESS); data_chunk data_tx {static_cast(kth_db_get_data(value_tx)), static_cast(kth_db_get_data(value_tx)) + kth_db_get_size(value_tx)}; - auto entry = domain::create(data_tx); + auto entry = domain::create_old(data_tx); tx_list.push_back(std::move(entry.transaction())); } @@ -351,7 +351,7 @@ void check_blocks_db(KTH_DB_env* env_, KTH_DB_dbi& dbi_blocks_db_, KTH_DB_dbi& d REQUIRE(kth_db_get(db_txn, dbi_block_header_, &key, &value) == KTH_DB_SUCCESS); data_chunk data_header {static_cast(kth_db_get_data(value)), static_cast(kth_db_get_data(value)) + kth_db_get_size(value)}; - auto header = domain::create(data_header); + auto header = domain::create_old(data_header); REQUIRE(header.is_valid()); domain::chain::block block{header, std::move(tx_list)}; @@ -418,7 +418,7 @@ void check_reorg_index(KTH_DB_env* env_, KTH_DB_dbi& dbi_reorg_index_, std::stri REQUIRE(kth_db_get(db_txn, dbi_reorg_index_, &height_key, &value) == KTH_DB_SUCCESS); REQUIRE(kth_db_txn_commit(db_txn) == KTH_DB_SUCCESS); data_chunk data2 {static_cast(kth_db_get_data(value)), static_cast(kth_db_get_data(value)) + kth_db_get_size(value)}; - auto point_indexed = domain::create(data2, false); + auto point_indexed = domain::create_old(data2, false); hash_digest txid; REQUIRE(decode_hash(txid, txid_enc)); @@ -449,7 +449,7 @@ void check_reorg_block(KTH_DB_env* env_, KTH_DB_dbi& dbi_reorg_block_, uint32_t REQUIRE(kth_db_txn_commit(db_txn) == KTH_DB_SUCCESS); data_chunk data {static_cast(kth_db_get_data(value)), static_cast(kth_db_get_data(value)) + kth_db_get_size(value)}; - auto block = domain::create(data, false); + auto block = domain::create_old(data, false); REQUIRE(encode_base16(block.to_data(false)) == block_enc); }