Skip to content
8 changes: 8 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
#include <Storages/System/attachInformationSchemaTables.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/registerRemoteFileMetadatas.h>
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/registerFunctions.h>
Expand Down Expand Up @@ -325,6 +326,9 @@ namespace ServerSetting
extern const ServerSettingsDouble page_cache_free_memory_ratio;
extern const ServerSettingsUInt64 page_cache_lookahead_blocks;
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;
}

}
Expand Down Expand Up @@ -2397,6 +2401,10 @@ try
if (dns_cache_updater)
dns_cache_updater->start();

ObjectStorageListObjectsCache::instance().setMaxSizeInBytes(server_settings[ServerSetting::object_storage_list_objects_cache_size]);
ObjectStorageListObjectsCache::instance().setMaxCount(server_settings[ServerSetting::object_storage_list_objects_cache_max_entries]);
ObjectStorageListObjectsCache::instance().setTTL(server_settings[ServerSetting::object_storage_list_objects_cache_ttl]);

auto replicas_reconnector = ReplicasReconnector::init(global_context);
ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);

Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ enum class AccessType : uint8_t
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, "SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
Expand Down
4 changes: 4 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,10 @@ The server successfully detected this situation and will download merged part fr
M(QueryPreempted, "How many times tasks are paused and waiting due to 'priority' setting", ValueType::Number) \
M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \
M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheHits, "Number of times object storage list objects operation hit the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation miss the cache using prefix matching.", ValueType::Number) \

#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)
Expand Down
4 changes: 2 additions & 2 deletions src/Common/TTLCachePolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,10 @@ class TTLCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunc
return res;
}

private:
protected:
using Cache = std::unordered_map<Key, MappedPtr, HashFunction>;
Cache cache;

private:
/// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators
/// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a
/// binary search on the sorted container and erase all left of the found key.
Expand Down
3 changes: 3 additions & 0 deletions src/Core/ServerSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,9 @@ namespace DB
)", 0) \
DECLARE(Bool, storage_shared_set_join_use_inner_uuid, false, "If enabled, an inner UUID is generated during the creation of SharedSet and SharedJoin. ClickHouse Cloud only", 0) \
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_size, 500000000, "Maximum size of ObjectStorage list objects cache in bytes. Zero means disabled.", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0)
// clang-format on

/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6135,6 +6135,9 @@ Possible values:
/** Experimental tsToGrid aggregate function. */ \
DECLARE(Bool, allow_experimental_ts_to_grid_aggregate_function, false, R"(
Experimental tsToGrid aggregate function for Prometheus-like timeseries resampling. Cloud only
)", EXPERIMENTAL) \
DECLARE(Bool, use_object_storage_list_objects_cache, false, R"(
Cache the list of objects returned by list objects calls in object storage

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong merge here, as result object_storage_remote_initiator goes into description.

)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
// Altinity Antalya modifications atop of 25.2
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"use_object_storage_list_objects_cache", true, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.3",
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class AzureObjectStorage : public IObjectStorage
const String & object_namespace_,
const String & description_);

bool supportsListObjectsCache() override { return true; }

void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override;

ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const override;
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ class IObjectStorage
#endif


virtual bool supportsListObjectsCache() { return false; }

private:
mutable std::mutex throttlers_mutex;
ThrottlerPtr remote_read_throttler;
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/S3/S3ObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class S3ObjectStorage : public IObjectStorage

ObjectStorageType getType() const override { return ObjectStorageType::S3; }

bool supportsListObjectsCache() override { return true; }

bool exists(const StoredObject & object) const override;

std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
Expand Down
8 changes: 8 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <Access/Common/AllowedClientHosts.h>
#include <Databases/DatabaseReplicated.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Freeze.h>
Expand Down Expand Up @@ -431,6 +432,12 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->clearQueryResultCache(query.query_result_cache_tag);
break;
}
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE);
ObjectStorageListObjectsCache::instance().clear();
break;
}
case Type::DROP_PARQUET_METADATA_CACHE:
{
#if USE_PARQUET
Expand Down Expand Up @@ -1488,6 +1495,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_SCHEMA_CACHE:
case Type::DROP_FORMAT_SCHEMA_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
case Type::DROP_S3_CLIENT_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
case Type::DROP_S3_CLIENT_CACHE:
case Type::DROP_ICEBERG_METADATA_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
case Type::RESET_COVERAGE:
case Type::RESTART_REPLICAS:
case Type::JEMALLOC_PURGE:
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
DROP_SCHEMA_CACHE,
DROP_FORMAT_SCHEMA_CACHE,
DROP_S3_CLIENT_CACHE,
DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE,
DROP_PARQUET_METADATA_CACHE,
STOP_LISTEN,
START_LISTEN,
Expand Down
210 changes: 210 additions & 0 deletions src/Storages/Cache/ObjectStorageListObjectsCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <Common/TTLCachePolicy.h>
#include <Common/ProfileEvents.h>
#include <boost/functional/hash.hpp>

namespace ProfileEvents
{
extern const Event ObjectStorageListObjectsCacheHits;
extern const Event ObjectStorageListObjectsCacheMisses;
extern const Event ObjectStorageListObjectsCacheExactMatchHits;
extern const Event ObjectStorageListObjectsCachePrefixMatchHits;
}

namespace DB
{

template <typename Key, typename Mapped, typename HashFunction, typename WeightFunction, typename IsStaleFunction>
class ObjectStorageListObjectsCachePolicy : public TTLCachePolicy<Key, Mapped, HashFunction, WeightFunction, IsStaleFunction>
{
public:
using BasePolicy = TTLCachePolicy<Key, Mapped, HashFunction, WeightFunction, IsStaleFunction>;
using typename BasePolicy::MappedPtr;
using typename BasePolicy::KeyMapped;
using BasePolicy::cache;

ObjectStorageListObjectsCachePolicy()
: BasePolicy(std::make_unique<NoCachePolicyUserQuota>())
{
}

std::optional<KeyMapped> getWithKey(const Key & key) override
{
if (const auto it = cache.find(key); it != cache.end())
{
if (!IsStaleFunction()(it->first))
{
return std::make_optional<KeyMapped>({it->first, it->second});
}
// found a stale entry, remove it but don't return. We still want to perform the prefix matching search
BasePolicy::remove(it->first);
}

if (const auto it = findBestMatchingPrefixAndRemoveExpiredEntries(key); it != cache.end())
{
return std::make_optional<KeyMapped>({it->first, it->second});
}

return std::nullopt;
}

private:
auto findBestMatchingPrefixAndRemoveExpiredEntries(Key key)
{
while (!key.prefix.empty())
{
if (const auto it = cache.find(key); it != cache.end())
{
if (IsStaleFunction()(it->first))
{
BasePolicy::remove(it->first);
}
else
{
return it;
}
}

key.prefix.pop_back();
}

return cache.end();
}
};

ObjectStorageListObjectsCache::Key::Key(
const String & storage_description_,
const String & bucket_,
const String & prefix_,
const std::chrono::steady_clock::time_point & expires_at_,
std::optional<UUID> user_id_)
: storage_description(storage_description_), bucket(bucket_), prefix(prefix_), expires_at(expires_at_), user_id(user_id_) {}

bool ObjectStorageListObjectsCache::Key::operator==(const Key & other) const
{
return storage_description == other.storage_description && bucket == other.bucket && prefix == other.prefix;
}

size_t ObjectStorageListObjectsCache::KeyHasher::operator()(const Key & key) const
{
std::size_t seed = 0;

boost::hash_combine(seed, key.storage_description);
boost::hash_combine(seed, key.bucket);
boost::hash_combine(seed, key.prefix);

return seed;
}

bool ObjectStorageListObjectsCache::IsStale::operator()(const Key & key) const
{
return key.expires_at < std::chrono::steady_clock::now();
}

size_t ObjectStorageListObjectsCache::WeightFunction::operator()(const Value & value) const
{
std::size_t weight = 0;

for (const auto & object : value)
{
const auto object_metadata = object->metadata;
weight += object->relative_path.capacity() + sizeof(object_metadata);

// variable size
if (object_metadata)
{
weight += object_metadata->etag.capacity();
weight += object_metadata->attributes.size() * (sizeof(std::string) * 2);

for (const auto & [k, v] : object_metadata->attributes)
{
weight += k.capacity() + v.capacity();
}
}
}

return weight;
}

ObjectStorageListObjectsCache::ObjectStorageListObjectsCache()
: cache(std::make_unique<ObjectStorageListObjectsCachePolicy<Key, Value, KeyHasher, WeightFunction, IsStale>>())
{
}

void ObjectStorageListObjectsCache::set(
const Key & key,
const std::shared_ptr<Value> & value)
{
auto key_with_ttl = key;
key_with_ttl.expires_at = std::chrono::steady_clock::now() + std::chrono::seconds(ttl_in_seconds);

cache.set(key_with_ttl, value);
}

void ObjectStorageListObjectsCache::clear()
{
cache.clear();
}

std::optional<ObjectStorageListObjectsCache::Value> ObjectStorageListObjectsCache::get(const Key & key, bool filter_by_prefix)
{
const auto pair = cache.getWithKey(key);

if (!pair)
{
ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheMisses);
return {};
}

ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheHits);

if (pair->key == key)
{
ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheExactMatchHits);
return *pair->mapped;
}

ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCachePrefixMatchHits);

if (!filter_by_prefix)
{
return *pair->mapped;
}

Value filtered_objects;

filtered_objects.reserve(pair->mapped->size());

for (const auto & object : *pair->mapped)
{
if (object->relative_path.starts_with(key.prefix))
{
filtered_objects.push_back(object);
}
}

return filtered_objects;
}

void ObjectStorageListObjectsCache::setMaxSizeInBytes(std::size_t size_in_bytes_)
{
cache.setMaxSizeInBytes(size_in_bytes_);
}

void ObjectStorageListObjectsCache::setMaxCount(std::size_t count)
{
cache.setMaxCount(count);
}

void ObjectStorageListObjectsCache::setTTL(std::size_t ttl_in_seconds_)
{
ttl_in_seconds = ttl_in_seconds_;
}

ObjectStorageListObjectsCache & ObjectStorageListObjectsCache::instance()
{
static ObjectStorageListObjectsCache instance;
return instance;
}

}
Loading
Loading