diff --git a/ydb/include/userver/ydb/cache.hpp b/ydb/include/userver/ydb/cache.hpp new file mode 100644 index 000000000000..c3e8a7c475e5 --- /dev/null +++ b/ydb/include/userver/ydb/cache.hpp @@ -0,0 +1,734 @@ +#pragma once + +/// @file userver/cache/ydb/cache.hpp +/// @brief @copybrief components::YdbCache + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace components { + +// clang-format off + +/// @page Caching Component for Ydb +/// +/// A typical components::YdbCache usage consists of trait definition: +/// +/// @snippet cache/ydb_cache_test.cpp ydb Cache Policy Trivial +/// +/// and registration of the component in components::ComponentList: +/// +/// @snippet cache/ydb_cache_test.cpp ydb Cache Trivial Usage +/// +/// See @ref scripts/docs/en/userver/caches.md for introduction into caches. +/// +/// +/// @section ydb_cc_configuration Configuration +/// +/// components::YdbCache static configuration file should have a Ydb +/// component name specified in `ydbcomponent` configuration parameter. +/// +/// Optionally the operation timeouts for cache loading can be specified. +/// +/// ### Avoiding memory leaks +/// components::CachingComponentBase +/// +/// Name | Description | Default value +/// ---- | ----------- | ------------- +/// full-update-op-timeout | timeout for a full update | 1m +/// incremental-update-op-timeout | timeout for an incremental update | 1s +/// update-correction | incremental update window adjustment | - (0 for caches with defined GetLastKnownUpdated) +/// chunk-size | number of rows to request from Ydb via portals, 0 to fetch all rows in one request without portals | 1000 +/// +/// @section ydb_cc_cache_policy Cache policy +/// +/// Cache policy is the template argument of components::YdbCache component. +/// Please see the following code snippet for documentation. +/// +/// @snippet cache/ydb_cache_test.cpp ydb Cache Policy Example +/// +/// The query can be a std::string. But due to non-guaranteed order of static +/// data members initialization, std::string should be returned from a static +/// member function, please see the following code snippet. +/// +/// @snippet cache/ydb_cache_test.cpp ydb Cache Policy GetQuery Example +/// +/// Policy may have static function GetLastKnownUpdated. It should be used +/// when new entries from database are taken via revision, identifier, or +/// anything else, but not timestamp of the last update. +/// If this function is supplied, new entries are taken from db with condition +/// 'WHERE kUpdatedField > GetLastKnownUpdated(cache_container)'. +/// Otherwise, condition is +/// 'WHERE kUpdatedField > last_update - correction_'. +/// See the following code snippet for an example of usage +/// +/// @snippet cache/ydb_cache_test.cpp ydb Cache Policy Custom Updated Example +/// +/// In case one provides a custom CacheContainer within Policy, it is notified +/// of Update completion via its public member function OnWritesDone, if any. +/// See the following code snippet for an example of usage: +/// +/// @snippet cache/ydb_cache_test.cpp ydb Cache Policy Custom Container With Write Notification Example +/// +/// @section ydb_cc_forward_declaration Forward Declaration +/// +/// To forward declare a cache you can forward declare a trait and +/// include userver/cache/base_ydb_cache_fwd.hpp header. It is also useful to +/// forward declare the cache value type. +/// +/// @snippet cache/ydb_cache_test_fwd.hpp ydb Cache Fwd Example +/// +/// ---------- +/// +/// @htmlonly
@endhtmlonly +/// ⇦ @ref scripts/docs/en/userver/cache_dumps.md | @ref scripts/docs/en/userver/lru_cache.md ⇨ +/// @htmlonly
@endhtmlonly + +// clang-format on + +namespace ydb_cache::detail { + +template +using ValueType = typename T::ValueType; +template +inline constexpr bool kHasValueType = meta::kIsDetected; + +template +using RawValueTypeImpl = typename T::RawValueType; +template +inline constexpr bool kHasRawValueType = meta::kIsDetected; +template +using RawValueType = meta::DetectedOr, RawValueTypeImpl, T>; + +template +auto ExtractValue(RawValueType&& raw) { + if constexpr (kHasRawValueType) { + return Convert(std::move(raw), + formats::parse::To>()); + } else { + return std::move(raw); + } +} + +// Component name in policy +template +using HasNameImpl = std::enable_if_t; +template +inline constexpr bool kHasName = meta::kIsDetected; + +// Component query in policy +template +using HasQueryImpl = decltype(T::kQuery); +template +inline constexpr bool kHasQuery = meta::kIsDetected; + +// Component GetQuery in policy +template +using HasGetQueryImpl = decltype(T::GetQuery()); +template +inline constexpr bool kHasGetQuery = meta::kIsDetected; + +// Component kWhere in policy +template +using HasWhere = decltype(T::kWhere); +template +inline constexpr bool kHasWhere = meta::kIsDetected; + +// Update field +template +using HasUpdatedField = decltype(T::kUpdatedField); +template +inline constexpr bool kHasUpdatedField = meta::kIsDetected; + +template +using WantIncrementalUpdates = + std::enable_if_t; +template +inline constexpr bool kWantIncrementalUpdates = + meta::kIsDetected; + +// Key member in policy +template +using KeyMemberTypeImpl = + std::decay_t>>; +template +inline constexpr bool kHasKeyMember = meta::kIsDetected; +template +using KeyMemberType = meta::DetectedType; + +// Data container for cache +template > +struct DataCacheContainer { + static_assert(meta::kIsStdHashable>, + "With default CacheContainer, key type must be std::hash-able"); + + using type = std::unordered_map, ValueType>; +}; + +template +struct DataCacheContainer< + T, USERVER_NAMESPACE::utils::void_t> { + using type = typename T::CacheContainer; +}; + +template +using DataCacheContainerType = typename DataCacheContainer::type; + +// We have to whitelist container types, for which we perform by-element +// copying, because it's not correct for certain custom containers. +template +inline constexpr bool kIsContainerCopiedByElement = + meta::kIsInstantiationOf || + meta::kIsInstantiationOf; + +template +std::unique_ptr CopyContainer( + const T& container, [[maybe_unused]] std::size_t cpu_relax_iterations, + tracing::ScopeTime& scope) { + if constexpr (kIsContainerCopiedByElement) { + auto copy = std::make_unique(); + if constexpr (meta::kIsReservable) { + copy->reserve(container.size()); + } + + utils::CpuRelax relax{cpu_relax_iterations, &scope}; + for (const auto& kv : container) { + relax.Relax(); + copy->insert(kv); + } + return copy; + } else { + return std::make_unique(container); + } +} + +template +void CacheInsertOrAssign(Container& container, Value&& value, + const KeyMember& key_member, Args&&... /*args*/) { + // Args are only used to de-prioritize this default overload. + static_assert(sizeof...(Args) == 0); + // Copy 'key' to avoid aliasing issues in 'insert_or_assign'. + auto key = std::invoke(key_member, value); + container.insert_or_assign(std::move(key), std::forward(value)); +} + +template +using HasOnWritesDoneImpl = decltype(std::declval().OnWritesDone()); + +template +void OnWritesDone(T& container) { + if constexpr (meta::kIsDetected) { + container.OnWritesDone(); + } +} + +template +using HasCustomUpdatedImpl = + decltype(T::GetLastKnownUpdated(std::declval>())); + +template +inline constexpr bool kHasCustomUpdated = + meta::kIsDetected; + +template +using UpdatedFieldTypeImpl = typename T::UpdatedFieldType; +template +inline constexpr bool kHasUpdatedFieldType = + meta::kIsDetected; +template +using UpdatedFieldType = + meta::DetectedOr; + +template +constexpr bool CheckUpdatedFieldType() { + if constexpr (kHasUpdatedFieldType) { + static_assert( + std::is_same_v || + std::is_same_v || + kHasCustomUpdated, + "Invalid UpdatedFieldType, must be either TimePointTz or TimePoint"); + } else { + static_assert(!kWantIncrementalUpdates, + "UpdatedFieldType must be explicitly specified when using " + "incremental updates"); + } + return true; +} + +// Cluster host type policy +template +using HasClusterHostTypeImpl = decltype(T::ClusterHostType); + +template +constexpr storages::ydb::ClusterHostType ClusterHostType() { + if constexpr (meta::kIsDetected) { + return T::ClusterHostType; + } else { + return storages::ydb::ClusterHostType::kPrimary; + } +} + +// May return null policy +template +using HasMayReturnNull = decltype(T::kMayReturnNull); + +template +constexpr bool MayReturnNull() { + if constexpr (meta::kIsDetected) { + return T::kMayReturnNull; + } else { + return false; + } +} + +template +struct PolicyChecker { + // Static assertions for cache traits + static_assert( + kHasName, + "The PosgreSQL cache policy must contain a static member `kName`"); + static_assert( + kHasValueType, + "The PosgreSQL cache policy must define a type alias `ValueType`"); + static_assert( + kHasKeyMember, + "The Ydb cache policy must contain a static member `kKeyMember` " + "with a pointer to a data or a function member with the object's key"); + static_assert(kHasQuery || + kHasGetQuery, + "The PosgreSQL cache policy must contain a static data member " + "`kQuery` with a select statement or a static member function " + "`GetQuery` returning the query"); + static_assert(!(kHasQuery && + kHasGetQuery), + "The PosgreSQL cache policy must define `kQuery` or " + "`GetQuery`, not both"); + static_assert( + kHasUpdatedField, + "The PosgreSQL cache policy must contain a static member " + "`kUpdatedField`. If you don't want to use incremental updates, " + "please set its value to `nullptr`"); + static_assert(CheckUpdatedFieldType()); + + /* + static_assert(ClusterHostType() & + storages::ydb::kClusterHostRolesMask, + "Cluster host role must be specified for caching component, " + "please be more specific"); + */ + + static storages::ydb::Query GetQuery() { + if constexpr (kHasGetQuery) { + return YdbCachePolicy::GetQuery(); + } else { + return YdbCachePolicy::kQuery; + } + } + + using BaseType = + CachingComponentBase>; +}; + +inline constexpr std::chrono::minutes kDefaultFullUpdateTimeout{1}; +inline constexpr std::chrono::seconds kDefaultIncrementalUpdateTimeout{1}; +inline constexpr std::chrono::milliseconds kStatementTimeoutOff{0}; +inline constexpr std::chrono::milliseconds kCpuRelaxThreshold{10}; +inline constexpr std::chrono::milliseconds kCpuRelaxInterval{2}; + +inline constexpr std::string_view kCopyStage = "copy_data"; +inline constexpr std::string_view kFetchStage = "fetch"; +inline constexpr std::string_view kParseStage = "parse"; + +inline constexpr std::size_t kDefaultChunkSize = 1000; +} // namespace ydb_cache::detail + +/// @ingroup userver_components +/// +/// @brief Caching component for Ydb. See @ref ydb_cache. +/// +/// @see @ref ydb_cache, @ref scripts/docs/en/userver/caches.md +template +class YdbCache final + : public ydb_cache::detail::PolicyChecker::BaseType { + public: + // Type aliases + using PolicyType = YdbCachePolicy; + using ValueType = ydb_cache::detail::ValueType; + using RawValueType = ydb_cache::detail::RawValueType; + using DataType = ydb_cache::detail::DataCacheContainerType; + using PolicyCheckerType = ydb_cache::detail::PolicyChecker; + using UpdatedFieldType = + ydb_cache::detail::UpdatedFieldType; + using BaseType = typename PolicyCheckerType::BaseType; + + // Calculated constants + constexpr static bool kIncrementalUpdates = + ydb_cache::detail::kWantIncrementalUpdates; + constexpr static auto kClusterHostTypeFlags = + ydb_cache::detail::ClusterHostType(); + constexpr static auto kName = PolicyType::kName; + + YdbCache(const ComponentConfig&, const ComponentContext&); + ~YdbCache() override; + + static yaml_config::Schema GetStaticConfigSchema(); + + private: + using CachedData = std::unique_ptr; + + UpdatedFieldType GetLastUpdated( + std::chrono::system_clock::time_point last_update, + const DataType& cache) const; + + void Update(cache::UpdateType type, + const std::chrono::system_clock::time_point& last_update, + const std::chrono::system_clock::time_point& now, + cache::UpdateStatisticsScope& stats_scope) override; + + bool MayReturnNull() const override; + + CachedData GetDataSnapshot(cache::UpdateType type, tracing::ScopeTime& scope); + void CacheResults(std::vector res, CachedData& data_cache, + cache::UpdateStatisticsScope& stats_scope, + tracing::ScopeTime& scope); + + static storages::ydb::Query GetAllQuery(); + static storages::ydb::Query GetDeltaQuery(); + + std::chrono::milliseconds ParseCorrection(const ComponentConfig& config); + + std::vector> clusters_; + + const std::chrono::system_clock::duration correction_; + const std::chrono::milliseconds full_update_timeout_; + const std::chrono::milliseconds incremental_update_timeout_; + const std::size_t chunk_size_; + std::size_t cpu_relax_iterations_parse_{0}; + std::size_t cpu_relax_iterations_copy_{0}; +}; + +template +inline constexpr bool kHasValidate> = true; + +template +YdbCache::YdbCache(const ComponentConfig& config, + const ComponentContext& context) + : BaseType{config, context}, + correction_{ParseCorrection(config)}, + full_update_timeout_{ + config["full-update-op-timeout"].As( + ydb_cache::detail::kDefaultFullUpdateTimeout)}, + incremental_update_timeout_{ + config["incremental-update-op-timeout"].As( + ydb_cache::detail::kDefaultIncrementalUpdateTimeout)}, + chunk_size_{config["chunk-size"].As( + ydb_cache::detail::kDefaultChunkSize)} { + /* TODO + UINVARIANT( + !chunk_size_ || storages::ydb::Portal::IsSupportedByDriver(), + "Either set 'chunk-size' to 0, or enable Ydb portals by building " + "the framework with CMake option USERVER_FEATURE_PATCH_LIBPQ set to ON."); + */ + if (this->GetAllowedUpdateTypes() == + cache::AllowedUpdateTypes::kFullAndIncremental && + !kIncrementalUpdates) { + throw std::logic_error( + "Incremental update support is requested in config but no update field " + "name is specified in traits of '" + + config.Name() + "' cache"); + } + if (correction_.count() < 0) { + throw std::logic_error( + "Refusing to set forward (negative) update correction requested in " + "config for '" + + config.Name() + "' cache"); + } + + const auto ydb_alias = config["ydbcomponent"].As(""); + /* TODO + if (ydb_alias.empty()) { + throw storages::ydb::InvalidConfig{ + "No `ydbcomponent` entry in configuration"}; + } + */ + auto& ydb_cluster_comp = context.FindComponent(ydb_alias); + const auto shard_count = 1; + clusters_.resize(shard_count); + for (size_t i = 0; i < shard_count; ++i) { + clusters_[i] = ydb_cluster_comp.GetCluster(); + } + + LOG_INFO() << "Cache " << kName << " full update query `" + << GetAllQuery().GetStatement() << "` incremental update query `" + << GetDeltaQuery().GetStatement() << "`"; + + this->StartPeriodicUpdates(); +} + +template +YdbCache::~YdbCache() { + this->StopPeriodicUpdates(); +} + +template +storages::ydb::Query YdbCache::GetAllQuery() { + storages::ydb::Query query = PolicyCheckerType::GetQuery(); + if constexpr (ydb_cache::detail::kHasWhere) { + return {fmt::format("{} where {}", query.GetStatement(), + YdbCachePolicy::kWhere)}; + } else { + return query; + } +} + +template +storages::ydb::Query YdbCache::GetDeltaQuery() { + if constexpr (kIncrementalUpdates) { + storages::ydb::Query query = PolicyCheckerType::GetQuery(); + if constexpr (ydb_cache::detail::kHasWhere) { + return { + fmt::format("{} where ({}) and {} >= $1", query.GetStatement(), + YdbCachePolicy::kWhere, PolicyType::kUpdatedField)}; + } else { + return {fmt::format("{} where {} >= $1", query.GetStatement(), + PolicyType::kUpdatedField)}; + } + } else { + return GetAllQuery(); + } +} + +template +std::chrono::milliseconds YdbCache::ParseCorrection( + const ComponentConfig& config) { + static constexpr std::string_view kUpdateCorrection = "update-correction"; + if (ydb_cache::detail::kHasCustomUpdated || + this->GetAllowedUpdateTypes() == cache::AllowedUpdateTypes::kOnlyFull) { + return config[kUpdateCorrection].As(0); + } else { + return config[kUpdateCorrection].As(); + } +} + +template +typename YdbCache::UpdatedFieldType +YdbCache::GetLastUpdated( + [[maybe_unused]] std::chrono::system_clock::time_point last_update, + const DataType& cache) const { + if constexpr (ydb_cache::detail::kHasCustomUpdated) { + return YdbCachePolicy::GetLastKnownUpdated(cache); + } else { + return UpdatedFieldType{last_update - correction_}; + } +} + +template +void YdbCache::Update( + cache::UpdateType type, + const std::chrono::system_clock::time_point& /*last_update*/, + const std::chrono::system_clock::time_point& /*now*/, + cache::UpdateStatisticsScope& stats_scope) { + if constexpr (!kIncrementalUpdates) { + type = cache::UpdateType::kFull; + } + const auto query = + (type == cache::UpdateType::kFull) ? GetAllQuery() : GetDeltaQuery(); + /* todo + const std::chrono::milliseconds timeout = (type == cache::UpdateType::kFull) + ? full_update_timeout_ + : incremental_update_timeout_; + */ + // COPY current cached data + auto scope = tracing::Span::CurrentSpan().CreateScopeTime( + std::string{ydb_cache::detail::kCopyStage}); + auto data_cache = GetDataSnapshot(type, scope); + [[maybe_unused]] const auto old_size = data_cache->size(); + + scope.Reset(std::string{ydb_cache::detail::kFetchStage}); + + size_t changes = 0; + // Iterate clusters + // TODO + for (const std::shared_ptr& cluster : clusters_) { + if (chunk_size_ > 0) { + /*auto trx = cluster->Begin(kClusterHostTypeFlags); + auto portal = + trx.MakePortal(query, GetLastUpdated(last_update, *data_cache)); + while (portal) { + scope.Reset(std::string{ydb_cache::detail::kFetchStage}); + auto res = portal.Fetch(chunk_size_); + stats_scope.IncreaseDocumentsReadCount(res.Size()); + + scope.Reset(std::string{ydb_cache::detail::kParseStage}); + CacheResults(res, data_cache, stats_scope, scope); + changes += res.Size(); + } + trx.Commit();*/ + } else { + //bool has_parameter = query.GetStatement().find('$') != std::string::npos; + auto resultValues = cluster->Execute(userver::storages::ydb::ClusterHostType::kPrimary, query.GetStatement()).AsVector(); + stats_scope.IncreaseDocumentsReadCount(resultValues.size()); + + scope.Reset(std::string{ydb_cache::detail::kParseStage}); + CacheResults(resultValues, data_cache, stats_scope, scope); + changes += resultValues.size(); + } + } + + scope.Reset(); + + if constexpr (ydb_cache::detail::kIsContainerCopiedByElement) { + if (old_size > 0) { + const auto elapsed_copy = + scope.ElapsedTotal(std::string{ydb_cache::detail::kCopyStage}); + if (elapsed_copy > ydb_cache::detail::kCpuRelaxThreshold) { + cpu_relax_iterations_copy_ = static_cast( + static_cast(old_size) / + (elapsed_copy / ydb_cache::detail::kCpuRelaxInterval)); + LOG_TRACE() << "Elapsed time for copying " << kName << " " + << elapsed_copy.count() << " for " << changes + << " data items is over threshold. Will relax CPU every " + << cpu_relax_iterations_parse_ << " iterations"; + } + } + } + + if (changes > 0) { + const auto elapsed_parse = + scope.ElapsedTotal(std::string{ydb_cache::detail::kParseStage}); + if (elapsed_parse > ydb_cache::detail::kCpuRelaxThreshold) { + cpu_relax_iterations_parse_ = static_cast( + static_cast(changes) / + (elapsed_parse / ydb_cache::detail::kCpuRelaxInterval)); + LOG_TRACE() << "Elapsed time for parsing " << kName << " " + << elapsed_parse.count() << " for " << changes + << " data items is over threshold. Will relax CPU every " + << cpu_relax_iterations_parse_ << " iterations"; + } + } + if (changes > 0 || type == cache::UpdateType::kFull) { + // Set current cache + stats_scope.Finish(data_cache->size()); + ydb_cache::detail::OnWritesDone(*data_cache); + this->Set(std::move(data_cache)); + } else { + stats_scope.FinishNoChanges(); + } +} + +template +bool YdbCache::MayReturnNull() const { + return ydb_cache::detail::MayReturnNull(); +} + +template +void YdbCache::CacheResults( + std::vector res, CachedData& data_cache, + cache::UpdateStatisticsScope& stats_scope, tracing::ScopeTime& scope) { + auto values = res; + utils::CpuRelax relax{cpu_relax_iterations_parse_, &scope}; + for (auto p = values.begin(); p != values.end(); ++p) { + relax.Relax(); + try { + using ydb_cache::detail::CacheInsertOrAssign; + CacheInsertOrAssign( + *data_cache, *p, + YdbCachePolicy::kKeyMember); + } catch (const std::exception& e) { + stats_scope.IncreaseDocumentsParseFailures(1); + LOG_ERROR() << "Error parsing data row in cache '" << kName << "' to '" + << compiler::GetTypeName() << "': " << e.what(); + } + } +} + +template +typename YdbCache::CachedData +YdbCache::GetDataSnapshot(cache::UpdateType type, + tracing::ScopeTime& scope) { + if (type == cache::UpdateType::kIncremental) { + auto data = this->Get(); + if (data) { + return ydb_cache::detail::CopyContainer(*data, cpu_relax_iterations_copy_, + scope); + } + } + return std::make_unique(); +} + +namespace impl { + +} // namespace impl + +template +yaml_config::Schema YdbCache::GetStaticConfigSchema() { + using ParentType = + typename ydb_cache::detail::PolicyChecker::BaseType; + const static std::string schema = R"( +type: object +description: Caching component for Ydb derived from components::CachingComponentBase. +additionalProperties: false +properties: + full-update-op-timeout: + type: string + description: timeout for a full update + defaultDescription: 1m + incremental-update-op-timeout: + type: string + description: timeout for an incremental update + defaultDescription: 1s + update-correction: + type: string + description: incremental update window adjustment + defaultDescription: 0 for caches with defined GetLastKnownUpdated + chunk-size: + type: integer + description: number of rows to request from Ydb, 0 to fetch all rows in one request + defaultDescription: 1000 + ydbcomponent: + type: string + description: Ydb component name + defaultDescription: "" +)"; + return yaml_config::MergeSchemas(schema); +} + +} // namespace components + +namespace utils::impl::projected_set { + +template +void CacheInsertOrAssign(Set& set, Value&& value, + const KeyMember& /*key_member*/) { + DoInsert(set, std::forward(value)); +} + +} // namespace utils::impl::projected_set + +USERVER_NAMESPACE_END \ No newline at end of file diff --git a/ydb/tests/cache.cpp b/ydb/tests/cache.cpp new file mode 100644 index 000000000000..18067df19021 --- /dev/null +++ b/ydb/tests/cache.cpp @@ -0,0 +1,353 @@ +#include "cache.hpp" + +#include + +#include +#include + +USERVER_NAMESPACE_BEGIN + +// This is a snippet for documentation +/*! [Ydb Cache Policy Example] */ +namespace example { + +struct MyStructure { + int id = 0; + std::string bar{}; + std::chrono::system_clock::time_point updated; + + int get_id() const { return id; } +}; + +struct YdbExamplePolicy { + // Name of caching policy component. + // + // Required: **yes** + static constexpr std::string_view kName = "ydb-cache"; + + // Object type. + // + // Required: **yes** + using ValueType = MyStructure; + + // Key by which the object must be identified in cache. + // + // One of: + // - A pointer-to-member in the object + // - A pointer-to-member-function in the object that returns the key + // - A pointer-to-function that takes the object and returns the key + // - A lambda that takes the object and returns the key + // + // Required: **yes** + static constexpr auto kKeyMember = &MyStructure::id; + + // Data retrieve query. + // + // The query should not contain any clauses after the `from` clause. Either + // `kQuery` or `GetQuery` static member function must be defined. + // + // Required: **yes** + static constexpr const char* kQuery = + "select id, bar, updated from test.my_data"; + + // Name of the field containing timestamp of an object. + // + // To turn off incremental updates, set the value to `nullptr`. + // + // Required: **yes** + static constexpr const char* kUpdatedField = "updated"; + + // Type of the field containing timestamp of an object. + // + // Specifies whether updated field should be treated as a timestamp + // with or without timezone in database queries. + // + // Required: **yes** if incremental updates are used. + using UpdatedFieldType = storages::ydb::DateTime; + + // Where clause of the query. Either `kWhere` or `GetWhere` can be defined. + // + // Required: no + static constexpr const char* kWhere = "id > 10"; + + // Cache container type. + // + // It can be of any map type. The default is `unordered_map`, it is not + // necessary to declare the DataType alias if you are OK with + // `unordered_map`. + // The key type must match the type of kKeyMember. + // + // Required: no + using CacheContainer = std::unordered_map; + + // Cluster host selection flags to use when retrieving data. + // + // Default value is storages::Ydb::ClusterHostType::kSlave, at least one + // cluster role must be present in flags. + // + // Required: no + static constexpr auto kClusterHostType = + ydb::ClusterHostType::kPrimary; + + // Whether Get() is expected to return nullptr. + // + // Default value is false, Get() will throw an exception instead of + // returning nullptr. + // + // Required: no + static constexpr bool kMayReturnNull = false; +}; + +} // namespace example +/*! [Ydb Cache Policy Example] */ + +namespace components::example { + +using USERVER_NAMESPACE::example::MyStructure; +using USERVER_NAMESPACE::example::YdbExamplePolicy; + +struct YdbExamplePolicy2 { + using ValueType = MyStructure; + static constexpr std::string_view kName = "ydb-cache"; + static constexpr const char* kQuery = + "select id, bar, updated from test.my_data"; + static constexpr const char* kUpdatedField = ""; // Intentionally left blank + static constexpr auto kKeyMember = &MyStructure::get_id; + static constexpr auto kClusterHostType = + storages::ydb::ClusterHostType::kPrimary; +}; + +static_assert(ydb_cache::detail::kHasName); +static_assert(ydb_cache::detail::kHasName); +static_assert(ydb_cache::detail::kHasName); + +static_assert((std::is_same< + ydb_cache::detail::KeyMemberType, int>{})); +static_assert( + (std::is_same, + int>{})); + +static_assert(ydb_cache::detail::ClusterHostType() == + storages::ydb::ClusterHostType::kPrimary); +static_assert(ydb_cache::detail::ClusterHostType() == + storages::ydb::ClusterHostType::kPrimary); + +// Example of custom updated in cache +/*! [Ydb Cache Policy Custom Updated Example] */ +struct MyStructureWithRevision { + int id = 0; + std::string bar{}; + std::chrono::system_clock::time_point updated; + int32_t revision = 0; + + int get_id() const { return id; } +}; + +class UserSpecificCache { + public: + void insert_or_assign(int, MyStructureWithRevision&& item) { + latest_revision_ = std::max(latest_revision_, item.revision); + } + // todo ivan check + void insert_or_assign(int, MyStructureWithRevision& item) { + latest_revision_ = std::max(latest_revision_, item.revision); + } + static size_t size() { return 0; } + + int GetLatestRevision() const { return latest_revision_; } + + private: + int latest_revision_ = 0; +}; + +struct YdbExamplePolicy3 { + using ValueType = MyStructureWithRevision; + static constexpr std::string_view kName = "ydb-cache"; + static constexpr const char* kQuery = + "select id, bar, revision from test.my_data"; + using CacheContainer = UserSpecificCache; + static constexpr const char* kUpdatedField = "revision"; + using UpdatedFieldType = int32_t; + static constexpr auto kKeyMember = &MyStructureWithRevision::get_id; + + // Function to get last known revision/time + // + // Optional + // If one wants to get cache updates not based on updated time, but, for + // example, based on revision > known_revision, this method should be used. + static int32_t GetLastKnownUpdated(const UserSpecificCache& container) { + return container.GetLatestRevision(); + } +}; +/*! [Ydb Cache Policy Custom Updated Example] */ + +static_assert(ydb_cache::detail::kHasCustomUpdated); + +/*! [Ydb Cache Policy GetQuery Example] */ +struct YdbExamplePolicy4 { + static constexpr std::string_view kName = "ydb-cache"; + + using ValueType = MyStructure; + + static constexpr auto kKeyMember = &MyStructure::id; + + static std::string GetQuery() { + return "select id, bar, updated from test.my_data"; + } + + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = + storages::ydb::DateTime; // no time zone (should be avoided) +}; +/*! [Ydb Cache Policy GetQuery Example] */ + +static_assert(ydb_cache::detail::kHasGetQuery); + +/*! [Ydb Cache Policy Trivial] */ +struct YdbTrivialPolicy { + static constexpr std::string_view kName = "ydb-cache"; + + using ValueType = MyStructure; + + static constexpr auto kKeyMember = &MyStructure::id; + + static constexpr const char* kQuery = "SELECT a, b, updated FROM test.data"; + + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = storages::ydb::DateTime; +}; +/*! [Ydb Cache Policy Trivial] */ + +/*! [Ydb Cache Policy Compound Primary Key Example] */ +struct MyStructureCompoundKey { + int id; + std::string bar; + + bool operator==(const MyStructureCompoundKey& other) const { + return id == other.id && bar == other.bar; + } +}; + +// Alternatively, specialize std::hash +struct MyStructureCompoundKeyHash { + size_t operator()(const MyStructureCompoundKey& key) const { + size_t seed = 0; + boost::hash_combine(seed, key.id); + boost::hash_combine(seed, key.bar); + return seed; + } +}; + +struct YdbExamplePolicy5 { + static constexpr std::string_view kName = "ydb-cache"; + + using ValueType = MyStructure; + + // maybe_unused is required due to a Clang bug + [[maybe_unused]] static constexpr auto kKeyMember = + [](const MyStructure& my_structure) { + return MyStructureCompoundKey{my_structure.id, my_structure.bar}; + }; + + static std::string GetQuery() { + return "select id, bar, updated from test.my_data"; + } + + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = storages::ydb::DateTime; + + using CacheContainer = std::unordered_map; +}; +/*! [Ydb Cache Policy Compound Primary Key Example] */ + +static_assert(ydb_cache::detail::kHasGetQuery); + +/*! [Ydb Cache Policy Custom Container With Write Notification Example] */ +class UserSpecificCacheWithWriteNotification { + public: + void insert_or_assign(int, MyStructure&&) {} + void insert_or_assign(int, MyStructure&) {} + static size_t size() { return 0; } + + void OnWritesDone() {} +}; +/*! [Ydb Cache Policy Custom Container With Write Notification Example] */ + +// Tests a container with OnWritesDone +struct YdbExamplePolicy6 { + static constexpr std::string_view kName = "ydb-cache"; + using ValueType = MyStructure; + static constexpr auto kKeyMember = &MyStructure::id; + static constexpr const char* kQuery = + "select id, bar, updated from test.my_data"; + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = storages::ydb::DateTime; + using CacheContainer = UserSpecificCacheWithWriteNotification; +}; + +// Tests ProjectedUnorderedSet as container +struct YdbExamplePolicy7 { + static constexpr std::string_view kName = "ydb-cache"; + using ValueType = MyStructure; + static constexpr auto kKeyMember = &MyStructure::id; + static constexpr const char* kQuery = + "select id, bar, updated from test.my_data"; + static constexpr const char* kUpdatedField = "updated"; + using UpdatedFieldType = storages::ydb::DateTime; + using CacheContainer = utils::ProjectedUnorderedSet; +}; + +// Instantiation test + +using MyCache1 = YdbCache; +using MyCache2 = YdbCache; +using MyCache3 = YdbCache; +using MyCache4 = YdbCache; +using MyTrivialCache = YdbCache; +using MyCache5 = YdbCache; +using MyCache6 = YdbCache; +using MyCache7 = YdbCache; + +// NB: field access required for actual instantiation +static_assert(MyCache1::kIncrementalUpdates); +static_assert(!MyCache2::kIncrementalUpdates); +static_assert(MyCache3::kIncrementalUpdates); +static_assert(MyCache4::kIncrementalUpdates); +static_assert(MyCache5::kIncrementalUpdates); +static_assert(MyCache6::kIncrementalUpdates); +static_assert(MyCache7::kIncrementalUpdates); + +namespace ydb = storages::ydb; +static_assert(MyCache1::kClusterHostTypeFlags == ydb::ClusterHostType::kPrimary); +static_assert(MyCache2::kClusterHostTypeFlags == ydb::ClusterHostType::kPrimary); +static_assert(MyCache3::kClusterHostTypeFlags == ydb::ClusterHostType::kPrimary); +static_assert(MyCache4::kClusterHostTypeFlags == ydb::ClusterHostType::kPrimary); +static_assert(MyCache5::kClusterHostTypeFlags == ydb::ClusterHostType::kPrimary); +static_assert(MyCache6::kClusterHostTypeFlags == ydb::ClusterHostType::kPrimary); +static_assert(MyCache7::kClusterHostTypeFlags == ydb::ClusterHostType::kPrimary); + +// Update() instantiation test +[[maybe_unused]] void VerifyUpdateCompiles( + const components::ComponentConfig& config, + const components::ComponentContext& context) { + MyCache1 cache1{config, context}; + MyCache2 cache2{config, context}; + MyCache3 cache3{config, context}; + MyCache4 cache4{config, context}; + MyTrivialCache my_trivial_cache{config, context}; + MyCache5 cache5{config, context}; + MyCache6 cache6{config, context}; + MyCache7 cache7{config, context}; +} + +inline auto SampleOfComponentRegistration() { + /*! [Ydb Cache Trivial Usage] */ + return components::MinimalServerComponentList() + .Append>(); + /*! [Ydb Cache Trivial Usage] */ +} + +} // namespace components::example + +USERVER_NAMESPACE_END \ No newline at end of file diff --git a/ydb/tests/cache.hpp b/ydb/tests/cache.hpp new file mode 100644 index 000000000000..c832fbcca410 --- /dev/null +++ b/ydb/tests/cache.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include // for std::shared_ptr + +#include + +USERVER_NAMESPACE_BEGIN + +namespace example { // replace with a namespace of your trait + +struct YdbExamplePolicy; +struct YdbStructure; + +} // namespace example + +namespace caches { + +using MyCache1 = components::YdbCache; +using MyCache1Data = std::shared_ptr; + +} // namespace caches + + +USERVER_NAMESPACE_END \ No newline at end of file