Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor domain objects deserialization #25

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class KnuthDatabaseConan(KnuthConanFileV2):

def build_requirements(self):
if self.options.tests:
self.test_requires("catch2/3.6.0")
self.test_requires("catch2/3.7.1")

def requirements(self):
self.requires("domain/0.37.0", transitive_headers=True, transitive_libs=True)
Expand Down
328 changes: 326 additions & 2 deletions include/kth/database/data_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include <filesystem>
#include <memory>

#include <boost/unordered/unordered_flat_map.hpp>
#include <boost/unordered/unordered_flat_set.hpp>

#include <kth/domain.hpp>
#include <kth/database/define.hpp>
#include <kth/database/databases/internal_database.hpp>
Expand All @@ -21,15 +24,325 @@
#include <kth/infrastructure/utility/noncopyable.hpp>
#include <kth/infrastructure/utility/dispatcher.hpp>

#include <kth/infrastructure/utility/timer.hpp>

namespace kth::database {

//TODO: remove from here
inline
time_t floor_subtract_times(time_t left, time_t right) {
static auto const floor = (std::numeric_limits<time_t>::min)();
return right >= left ? floor : left - right;
}

/// This class is thread safe and implements the sequential locking pattern.
class KD_API data_base : public store, noncopyable {
public:
using handle = store::handle;
using result_handler = handle0;
using path = kth::path;

using height_t = uint32_t;
using header_t = domain::chain::header;
using header_with_height_t = std::pair<header_t, uint32_t>;
using block_t = domain::chain::block;
using block_with_height_t = std::pair<block_t, uint32_t>;
using header_with_abla_state_t = std::tuple<header_t, uint64_t, uint64_t, uint64_t>;

struct block_cache_t {
using transaction_t = domain::chain::transaction;
using blocks_t = std::vector<block_t>;
using hash_to_height_map_t = boost::unordered_flat_map<hash_digest, size_t>;
// using point_t = domain::chain::output_point;
using point_t = domain::chain::point;
using utxo_map_t = boost::unordered_flat_map<point_t, utxo_entry>;
using utxo_set_t = boost::unordered_flat_set<point_t>;

constexpr static auto max_height = std::numeric_limits<height_t>::max();

block_cache_t(size_t max_size, internal_database* db)
: max_size_(max_size)
, db_(db)
{
// LOG_INFO(LOG_DATABASE, "block_cache_t - ", __func__);
}

~block_cache_t() {
// LOG_INFO(LOG_DATABASE, "block_cache_t - ", __func__);
flush_to_db();
db_ = nullptr;
}

// Queries
// ------------------------------------------------------------------------------

expect<height_t> last_height() const {
if (start_height_ == max_height) {
return make_unexpected(error::empty_cache);
}
return start_height_ + blocks_.size() - 1;
}

//TODO: test it
bool is_height_in_cache(height_t height) const {
return height >= start_height_ && height < start_height_ + blocks_.size();
}

expect<block_t> get_block(height_t height) const {
if ( ! is_height_in_cache(height)) {
return make_unexpected(error::height_not_found);
}
return blocks_[height_to_position(height)];
}

expect<block_with_height_t> get_block(hash_digest const& hash) const {
auto const it = hash_to_height_map_.find(hash);
if (it == hash_to_height_map_.end()) {
return make_unexpected(error::hash_not_found);
}
auto const pos = it->second;
return block_with_height_t{blocks_[pos], position_to_height(pos)};
}

expect<header_t> get_header(height_t height) const {
if ( ! is_height_in_cache(height)) {
return make_unexpected(error::height_not_found);
}
return blocks_[height_to_position(height)].header();
}

expect<header_with_abla_state_t> get_header_and_abla_state(height_t height) const {
if ( ! is_height_in_cache(height)) {
return make_unexpected(error::height_not_found);
}
auto const& blk = blocks_[height_to_position(height)];
if ( ! blk.validation.state) {
return header_with_abla_state_t{blk.header(), 0, 0, 0};
}
return header_with_abla_state_t{
blk.header(),
blk.validation.state->abla_state().block_size,
blk.validation.state->abla_state().control_block_size,
blk.validation.state->abla_state().elastic_buffer_size
};
}

expect<header_with_height_t> get_header(hash_digest const& hash) const {
auto const it = hash_to_height_map_.find(hash);
if (it == hash_to_height_map_.end()) {
return make_unexpected(error::hash_not_found);
}
auto const pos = it->second;
return header_with_height_t{blocks_[pos].header(), position_to_height(pos)};
}

expect<utxo_entry> get_utxo(point_t const& point) const {
auto const it = utxo_map_.find(point);
if (it == utxo_map_.end()) {
return make_unexpected(error::utxo_not_found);
}
return it->second;
}

// Commands
// ------------------------------------------------------------------------------
void add_block(block_t const& blk, height_t height) {
blocks_.emplace_back(blk);
hash_to_height_map_.emplace(blk.hash(), blocks_.size() - 1);
process_utxos(blk, height);

if (start_height_ == max_height) {
start_height_ = height;
}

if (blocks_.size() >= max_size_) {
flush_to_db();
}
}

void add_block(block_t&& blk, height_t height) {
blocks_.emplace_back(std::move(blk));
hash_to_height_map_.emplace(blk.hash(), blocks_.size() - 1);
process_utxos(blocks_.back(), height);

if (start_height_ == max_height) {
start_height_ = height;
}

if (blocks_.size() >= max_size_) {
flush_to_db();
}
}

// void add_blocks(blocks_t const& blocks, height_t height) {
// // LOG_INFO(LOG_DATABASE, "block_cache_t - ", __func__);
// blocks_.insert(blocks_.end(), blocks.begin(), blocks.end());

// for (size_t le = 0; i < blocks.size(); ++i) {
// hash_to_height_map_.emplace(blocks[i].hash(), blocks_.size() - 1 - i);
// }

// if (start_height_ == max_height) {
// start_height_ = height;
// }

// if (blocks_.size() >= max_size_) {
// flush_to_db();
// }
// }

// void add_blocks(blocks_t&& blocks, height_t height) {
// // LOG_INFO(LOG_DATABASE, "block_cache_t - ", __func__);
// blocks_.insert(
// blocks_.end(),
// std::make_move_iterator(blocks.begin()),
// std::make_move_iterator(blocks.end())
// );

// for (size_t i = 0; i < blocks.size(); ++i) {
// hash_to_height_map_.emplace(blocks[i].hash(), blocks_.size() - 1 - i);

// }

// if (start_height_ == max_height) {
// start_height_ = height;
// }

// if (blocks_.size() >= max_size_) {
// flush_to_db();
// }
// }



bool is_stale() const {
//TODO: Should be a config value
constexpr time_t notify_limit_seconds = 6 * 60 * 60; // 6 hours
//TODO: cache the last header

uint32_t timestamp = 0;
auto last_height_exp = last_height();
if ( ! last_height_exp) {
return true;
}
auto last_height = *last_height_exp;

auto last_header_exp = get_header(last_height);
if ( ! last_header_exp) {
return true;
}
auto last_header = *last_header_exp;
timestamp = last_header.timestamp();

return timestamp < floor_subtract_times(zulu_time(), notify_limit_seconds);
}

void flush_to_db() {
// LOG_INFO(LOG_DATABASE, "block_cache_t - ", __func__);
if (blocks_.empty()) {
return;
}
if ( ! db_) {
LOG_ERROR(LOG_DATABASE, "Internal database not set, when trying to flush block cache");
return;
}
// db_->push_blocks(blocks_, start_height_);

// result_code push_blocks_and_utxos(
// std::vector<domain::chain::block> const& blocks,
// uint32_t height,
// utxo_pool_t const& utxos,
// utxos_to_remove_t const& utxos_to_remove,
// bool is_stale
// );

db_->push_blocks_and_utxos(blocks_, start_height_, utxo_map_, utxo_to_remove_, is_stale());

reset();
}

void reset() {
start_height_ = max_height;
blocks_.clear();
hash_to_height_map_.clear();
utxo_to_remove_.clear();
utxo_map_.clear();
}

private:
size_t height_to_position(height_t height) const {
return height - start_height_;
}

height_t position_to_height(size_t position) const {
return start_height_ + position;
}

void insert_utxos_from_transaction(transaction_t const& tx, height_t height, uint32_t median_time_past, utxo_map_t& utxos) {
for (size_t i = 0; i < tx.outputs().size(); ++i) {
auto const& output = tx.outputs()[i];
point_t point {tx.hash(), uint32_t(i)};
utxos.emplace(point, utxo_entry{output, height, median_time_past, false});
}
}

void process_utxos(block_t const& blk, height_t height) {
// precondition: blk.transactions().size() > 0

utxo_map_t block_utxos;

// first insert the utxos, then remove the spent
// utxo_entry(domain::chain::output output, uint32_t height, uint32_t median_time_past, bool coinbase);

auto const median_time_past = blk.header().validation.median_time_past;
auto const& coinbase_tx = blk.transactions().front();
insert_utxos_from_transaction(coinbase_tx, height, median_time_past, block_utxos);

for (size_t i = 1; i < blk.transactions().size(); ++i) {
auto const& tx = blk.transactions()[i];
insert_utxos_from_transaction(tx, height, median_time_past, block_utxos);
}

// Remove the spent utxos, try first in the block_utxos
// If not found, try in the utxo_map_, in this case they also have
// to be removed from the DB

// The coinbase tx does not spend coins, so it is skipped
for (size_t i = 1; i < blk.transactions().size(); ++i) {
auto const& tx = blk.transactions()[i];

// if ( ! tx.inputs().empty()) {
// LOG_INFO(LOG_DATABASE, "process_utxos() - height: ", height, " - spending: ", tx.inputs().size());
// }
for (auto const& input : tx.inputs()) {
auto const& point = input.previous_output();
auto count = block_utxos.erase(point);
// LOG_INFO(LOG_DATABASE, "process_utxos() - height: ", height, " - erasing utxo: ", encode_hash(point.hash()), ":", point.index(), " - count: ", count);
if (count == 0) {
count = utxo_map_.erase(point);
// LOG_INFO(LOG_DATABASE, "process_utxos() - height: ", height, " - erasing utxo: ", encode_hash(point.hash()), ":", point.index(), " - count: ", count);
if (count == 0) {
// LOG_INFO(LOG_DATABASE, "UTXO not found in block neither in cache, have to be in DB");
utxo_to_remove_.insert(point);
}
}
}
}

utxo_map_.insert(block_utxos.begin(), block_utxos.end());
}

size_t max_size_;
internal_database* db_ = nullptr;

height_t start_height_ = max_height;
blocks_t blocks_;
hash_to_height_map_t hash_to_height_map_;
utxo_map_t utxo_map_;
utxo_set_t utxo_to_remove_;
};


// Construct.
// ----------------------------------------------------------------------------

Expand All @@ -52,17 +365,26 @@ class KD_API data_base : public store, noncopyable {
/// Call close on destruct.
~data_base();


bool is_stale() const;

// Readers.
// ------------------------------------------------------------------------

expect<domain::chain::header> get_header(size_t height) const;
expect<header_with_height_t> get_header(hash_digest const& hash) const;
expect<header_with_abla_state_t> get_header_and_abla_state(size_t height) const;
expect<block_t> get_block(size_t height) const;
expect<block_with_height_t> get_block(hash_digest const& hash) const;
expect<height_t> get_last_height() const;
expect<utxo_entry> get_utxo(domain::chain::output_point const& point) const;

internal_database const& internal_db() const;

// Synchronous writers.
// ------------------------------------------------------------------------

#if ! defined(KTH_DB_READONLY)


/// Store a block in the database.
/// Returns store_block_duplicate if a block already exists at height.
code insert(domain::chain::block const& block, size_t height);
Expand Down Expand Up @@ -132,8 +454,10 @@ class KD_API data_base : public store, noncopyable {
void handle_push(code const& ec, result_handler handler) const;
#endif // ! defined(KTH_DB_READONLY)


std::atomic<bool> closed_;
settings const& settings_;
std::optional<block_cache_t> block_cache_;
};

} // namespace kth::database
Expand Down
2 changes: 1 addition & 1 deletion include/kth/database/databases/block_database.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ domain::chain::block internal_database_basis<Clock>::get_block(uint32_t height,
}

auto data = db_value_to_data_chunk(value);
auto res = domain::create<domain::chain::block>(data);
auto res = domain::create_old<domain::chain::block>(data);
return res;
}
// db_mode_ == db_mode_type::pruned {
Expand Down
Loading
Loading