Skip to content

Antalya: Cache the list objects operation on object storage using a TTL + prefix matching cache implementation #743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 55 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
3dc33f3
draft impl
arthurpassos Apr 17, 2025
7e182c7
remove garbage
arthurpassos Apr 17, 2025
ef985c9
use starts_with
arthurpassos Apr 17, 2025
43c1383
fix a few obvious bugs
arthurpassos Apr 17, 2025
989cfe0
some other fixes
arthurpassos Apr 17, 2025
157450d
another fix
arthurpassos Apr 17, 2025
d4af4ae
Merge branch 'antalya' into list_objects_cache
arthurpassos Apr 18, 2025
727b64d
remove unused method
arthurpassos Apr 18, 2025
68cbad7
some other fixes
arthurpassos Apr 18, 2025
00f58b3
some other fixes
arthurpassos Apr 18, 2025
67ccaf0
use steady_clock instead of system_clock
arthurpassos Apr 18, 2025
2b37e0c
small fixes
arthurpassos Apr 18, 2025
0d6e343
Dont return null in case of exact match with stale entry, perform lin…
arthurpassos Apr 18, 2025
0f605c4
add some metrics
arthurpassos Apr 18, 2025
3ed2349
implement cache clear command
arthurpassos Apr 18, 2025
f345b33
remove ifdef from parquet
arthurpassos Apr 18, 2025
9242843
add stateless tests
arthurpassos Apr 20, 2025
4e19b09
add unit tests
arthurpassos Apr 20, 2025
7a6eaec
Merge branch 'antalya' into list_objects_cache
arthurpassos Apr 20, 2025
8c4ea48
rename ttl argument and member variable
arthurpassos Apr 20, 2025
74b980c
new settings history
arthurpassos Apr 20, 2025
4f55a75
make the setting false by default so other tests are not affected
arthurpassos Apr 21, 2025
303ee27
add ref file
arthurpassos Apr 21, 2025
e6b379e
remove cachedglobiterator in favor of expensive copy. I think I'll re…
arthurpassos Apr 21, 2025
d7b50f4
simplify things a bit
arthurpassos Apr 21, 2025
6cfa510
add some more tests
arthurpassos Apr 22, 2025
be8c6a1
make cache return a copy instead of a pointer, we don't want modifica…
arthurpassos Apr 22, 2025
b60cb95
update ut
arthurpassos Apr 22, 2025
d91bf00
improve prefix matching by implementing search with time complexity o…
arthurpassos Apr 22, 2025
c6e53a1
Merge branch 'antalya' into list_objects_cache
arthurpassos Apr 29, 2025
f1c3591
Merge branch 'antalya' into list_objects_cache
arthurpassos Apr 29, 2025
14973d2
Merge branch 'antalya' into list_objects_cache
arthurpassos Apr 29, 2025
55ac0bc
Merge branch 'antalya' into list_objects_cache
arthurpassos May 1, 2025
e0e19a2
idraft impl of authorization aware cache
arthurpassos May 2, 2025
45af8a5
rename setting
arthurpassos May 2, 2025
7266d92
delete copy/move constructors and assignment operators
arthurpassos May 2, 2025
8e78b28
azure impl and fix aws
arthurpassos May 5, 2025
2ed102d
docs
arthurpassos May 5, 2025
28bfcfb
fix tests
arthurpassos May 5, 2025
aab089c
Merge branch 'antalya' into list_objects_cache
arthurpassos May 5, 2025
dd5934e
incorporate comments on tests
arthurpassos May 5, 2025
0f5057e
remove unused code
arthurpassos May 5, 2025
27c4dea
Revert "remove unused code"
arthurpassos May 6, 2025
d789d1e
Revert "incorporate comments on tests"
arthurpassos May 6, 2025
fef71c0
Revert "docs"
arthurpassos May 6, 2025
6bfcb86
Revert "azure impl and fix aws"
arthurpassos May 6, 2025
f68725a
Revert "idraft impl of authorization aware cache"
arthurpassos May 6, 2025
7597da0
Merge branch 'antalya' into list_objects_cache
arthurpassos May 6, 2025
e7940af
Merge branch 'antalya' into list_objects_cache
arthurpassos May 6, 2025
f863a6e
use description as part of the key
arthurpassos May 6, 2025
cbfe36d
Reapply "incorporate comments on tests"
arthurpassos May 6, 2025
9092aba
remove unused code
arthurpassos May 6, 2025
057b0b5
add supportsListObjectsCache to enable this cache for s3-like (minio,…
arthurpassos May 7, 2025
49748c9
increase default cache size
arthurpassos May 7, 2025
96cf2d2
fix weight function
arthurpassos May 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
#include <Storages/System/attachInformationSchemaTables.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/registerRemoteFileMetadatas.h>
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/registerFunctions.h>
Expand Down Expand Up @@ -318,6 +319,9 @@ namespace ServerSetting
extern const ServerSettingsUInt64 max_prefixes_deserialization_thread_pool_free_size;
extern const ServerSettingsUInt64 prefixes_deserialization_thread_pool_thread_pool_queue_size;
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;
}

}
Expand Down Expand Up @@ -2327,6 +2331,10 @@ try
if (dns_cache_updater)
dns_cache_updater->start();

ObjectStorageListObjectsCache::instance().setMaxSizeInBytes(server_settings[ServerSetting::object_storage_list_objects_cache_size]);
ObjectStorageListObjectsCache::instance().setMaxCount(server_settings[ServerSetting::object_storage_list_objects_cache_max_entries]);
ObjectStorageListObjectsCache::instance().setTTL(server_settings[ServerSetting::object_storage_list_objects_cache_ttl]);

auto replicas_reconnector = ReplicasReconnector::init(global_context);
ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);

Expand Down
3 changes: 2 additions & 1 deletion src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ enum class AccessType : uint8_t
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, "SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
Expand Down
6 changes: 5 additions & 1 deletion src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,11 @@ The server successfully detected this situation and will download merged part fr
M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \
\
M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \
M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \
M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheHits, "Number of times object storage list objects operation hit the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheMisses, "Number of times object storage list objects operation miss the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times object storage list objects operation hit the cache using prefix matching.", ValueType::Number) \

#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)
Expand Down
4 changes: 2 additions & 2 deletions src/Common/TTLCachePolicy.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,10 @@ class TTLCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunc
return res;
}

private:
protected:
using Cache = std::unordered_map<Key, MappedPtr, HashFunction>;
Cache cache;

private:
/// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators
/// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a
/// binary search on the sorted container and erase all left of the found key.
Expand Down
6 changes: 4 additions & 2 deletions src/Core/ServerSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1004,8 +1004,10 @@ namespace DB
```
)", 0) \
DECLARE(Bool, storage_shared_set_join_use_inner_uuid, false, "If enabled, an inner UUID is generated during the creation of SharedSet and SharedJoin. ClickHouse Cloud only", 0) \
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \

DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_size, 500000000, "Maximum size of ObjectStorage list objects cache in bytes. Zero means disabled.", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum size of ObjectStorage list objects cache in entries. Zero means disabled.", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in ObjectStorage list objects cache in seconds. Zero means unlimited", 0) \
// clang-format on

/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6108,6 +6108,9 @@ Limit for hosts used for request in object storage cluster table functions - azu
Possible values:
- Positive integer.
- 0 — All hosts in cluster.
)", EXPERIMENTAL) \
DECLARE(Bool, use_object_storage_list_objects_cache, false, R"(
Cache the list of objects returned by list objects calls in object storage
)", EXPERIMENTAL) \
\
/* ####################################################### */ \
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"use_iceberg_metadata_files_cache", true, true, "New setting"},
{"iceberg_timestamp_ms", 0, 0, "New setting."},
{"iceberg_snapshot_id", 0, 0, "New setting."},
{"use_object_storage_list_objects_cache", true, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "24.12.2.20000",
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class AzureObjectStorage : public IObjectStorage
const String & object_namespace_,
const String & description_);

bool supportsListObjectsCache() override { return true; }

void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override;

ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const override;
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ class IObjectStorage
#endif


virtual bool supportsListObjectsCache() { return false; }

private:
mutable std::mutex throttlers_mutex;
ThrottlerPtr remote_read_throttler;
Expand Down
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/S3/S3ObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class S3ObjectStorage : public IObjectStorage

ObjectStorageType getType() const override { return ObjectStorageType::S3; }

bool supportsListObjectsCache() override { return true; }

bool exists(const StoredObject & object) const override;

std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
Expand Down
8 changes: 8 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <Access/Common/AllowedClientHosts.h>
#include <Databases/DatabaseReplicated.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Freeze.h>
Expand Down Expand Up @@ -437,6 +438,12 @@ BlockIO InterpreterSystemQuery::execute()
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet");
#endif
}
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
Copy link
Member

@Enmk Enmk Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does caching work only for Parquet files, or generally for any S3 ListObjects request?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, copy and paste issues. Should be any :D

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

{
getContext()->checkAccess(AccessType::SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE);
ObjectStorageListObjectsCache::instance().clear();
break;
}
case Type::DROP_COMPILED_EXPRESSION_CACHE:
#if USE_EMBEDDED_COMPILER
getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
Expand Down Expand Up @@ -1469,6 +1476,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_SCHEMA_CACHE:
case Type::DROP_FORMAT_SCHEMA_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
case Type::DROP_S3_CLIENT_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
case Type::DROP_COMPILED_EXPRESSION_CACHE:
case Type::DROP_S3_CLIENT_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
case Type::DROP_ICEBERG_METADATA_CACHE:
case Type::RESET_COVERAGE:
case Type::RESTART_REPLICAS:
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
DROP_FORMAT_SCHEMA_CACHE,
DROP_S3_CLIENT_CACHE,
DROP_PARQUET_METADATA_CACHE,
DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE,
STOP_LISTEN,
START_LISTEN,
RESTART_REPLICAS,
Expand Down
210 changes: 210 additions & 0 deletions src/Storages/Cache/ObjectStorageListObjectsCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <Common/TTLCachePolicy.h>
#include <Common/ProfileEvents.h>
#include <boost/functional/hash.hpp>

/// Profile event counters incremented by ObjectStorageListObjectsCache::get() on every lookup:
/// a miss, or a hit that is further classified as an exact-key hit or a prefix-match hit.
namespace ProfileEvents
{
extern const Event ObjectStorageListObjectsCacheHits;
extern const Event ObjectStorageListObjectsCacheMisses;
extern const Event ObjectStorageListObjectsCacheExactMatchHits;
extern const Event ObjectStorageListObjectsCachePrefixMatchHits;
}

namespace DB
{

/// Cache policy built on top of TTLCachePolicy that adds a prefix-matching fallback to lookups.
///
/// A lookup first tries the exact key. If there is no fresh exact entry, the key's `prefix` is
/// shortened one character at a time and every shortened key is probed, so that an entry stored
/// for a shorter prefix can serve the request. Stale entries met along the way are removed.
template <typename Key, typename Mapped, typename HashFunction, typename WeightFunction, typename IsStaleFunction>
class ObjectStorageListObjectsCachePolicy : public TTLCachePolicy<Key, Mapped, HashFunction, WeightFunction, IsStaleFunction>
{
public:
    using BasePolicy = TTLCachePolicy<Key, Mapped, HashFunction, WeightFunction, IsStaleFunction>;
    using typename BasePolicy::MappedPtr;
    using typename BasePolicy::KeyMapped;
    using BasePolicy::cache;

    ObjectStorageListObjectsCachePolicy()
        : BasePolicy(std::make_unique<NoCachePolicyUserQuota>())
    {
    }

    /// Returns the stored key together with its value for the exact `key` if a fresh entry exists,
    /// otherwise for the longest non-empty prefix of `key.prefix` that has a fresh entry,
    /// otherwise std::nullopt.
    std::optional<KeyMapped> getWithKey(const Key & key) override
    {
        const auto exact_match = cache.find(key);
        if (exact_match != cache.end())
        {
            if (!IsStaleFunction()(exact_match->first))
                return std::make_optional<KeyMapped>({exact_match->first, exact_match->second});

            /// The exact entry is stale: drop it, but keep going, a shorter prefix may still be fresh.
            BasePolicy::remove(exact_match->first);
        }

        const auto prefix_match = findBestMatchingPrefixAndRemoveExpiredEntries(key);
        if (prefix_match == cache.end())
            return std::nullopt;

        return std::make_optional<KeyMapped>({prefix_match->first, prefix_match->second});
    }

private:
    /// Probes `key`, then `key` with its prefix shortened by one trailing character, and so on
    /// while the prefix is non-empty. Returns an iterator to the first fresh entry found, or
    /// `cache.end()`. `key` is taken by value on purpose: it is mutated during the search.
    auto findBestMatchingPrefixAndRemoveExpiredEntries(Key key)
    {
        for (; !key.prefix.empty(); key.prefix.pop_back())
        {
            const auto candidate = cache.find(key);
            if (candidate == cache.end())
                continue;

            if (!IsStaleFunction()(candidate->first))
                return candidate;

            /// Expired entry for this prefix: evict it and try a shorter prefix.
            BasePolicy::remove(candidate->first);
        }

        return cache.end();
    }
};

/// Builds a cache key by copying every argument into the member of the same name.
ObjectStorageListObjectsCache::Key::Key(
    const String & storage_description_,
    const String & bucket_,
    const String & prefix_,
    const std::chrono::steady_clock::time_point & expires_at_,
    std::optional<UUID> user_id_)
    : storage_description(storage_description_)
    , bucket(bucket_)
    , prefix(prefix_)
    , expires_at(expires_at_)
    , user_id(user_id_)
{
}

/// Two keys are equal when storage description, bucket and prefix all match.
/// `expires_at` and `user_id` are deliberately not part of the comparison.
bool ObjectStorageListObjectsCache::Key::operator==(const Key & other) const
{
    if (storage_description != other.storage_description)
        return false;

    if (bucket != other.bucket)
        return false;

    return prefix == other.prefix;
}

/// Hashes exactly the fields compared by Key::operator== (storage description, bucket, prefix),
/// combined in that order.
size_t ObjectStorageListObjectsCache::KeyHasher::operator()(const Key & key) const
{
    std::size_t combined_hash = 0;
    boost::hash_combine(combined_hash, key.storage_description);
    boost::hash_combine(combined_hash, key.bucket);
    boost::hash_combine(combined_hash, key.prefix);
    return combined_hash;
}

/// An entry is stale once the steady clock has moved strictly past the key's `expires_at`.
bool ObjectStorageListObjectsCache::IsStale::operator()(const Key & key) const
{
    const auto now = std::chrono::steady_clock::now();
    return now > key.expires_at;
}

/// Estimates the weight, in bytes, of a cached listing.
///
/// For each object this counts the capacity of its relative path plus the fixed size of its
/// metadata holder; when metadata is present it adds the capacity of the etag, two
/// `std::string` objects per attribute, and the capacities of every attribute key and value.
size_t ObjectStorageListObjectsCache::WeightFunction::operator()(const Value & value) const
{
    std::size_t weight = 0;

    for (const auto & object : value)
    {
        /// Bind by reference: a by-value copy would duplicate the metadata (including its etag and
        /// attributes) for every object only to measure it. `sizeof` applied to a reference yields
        /// the size of the referenced type, so the fixed-size part of the estimate is unchanged.
        const auto & object_metadata = object->metadata;
        weight += object->relative_path.capacity() + sizeof(object_metadata);

        /// Variable-size part, only present when the object carries metadata.
        if (object_metadata)
        {
            weight += object_metadata->etag.capacity();
            weight += object_metadata->attributes.size() * (sizeof(std::string) * 2);

            for (const auto & [attribute_name, attribute_value] : object_metadata->attributes)
                weight += attribute_name.capacity() + attribute_value.capacity();
        }
    }

    return weight;
}

/// Creates the cache backed by the prefix-matching TTL policy defined in this file.
ObjectStorageListObjectsCache::ObjectStorageListObjectsCache()
    : cache(
          std::make_unique<
              ObjectStorageListObjectsCachePolicy<Key, Value, KeyHasher, WeightFunction, IsStale>>())
{
}

/// Stores `value` under a copy of `key` whose `expires_at` is stamped from the configured TTL.
///
/// The `expires_at` carried by the caller's `key` is ignored. A TTL of zero means "never expires",
/// as documented for the `object_storage_list_objects_cache_ttl` server setting: the expiry is
/// then set to the largest representable time point, instead of `now`, which would make the
/// entry stale right after insertion.
void ObjectStorageListObjectsCache::set(
    const Key & key,
    const std::shared_ptr<Value> & value)
{
    auto key_with_ttl = key;

    if (ttl_in_seconds == 0)
        key_with_ttl.expires_at = std::chrono::steady_clock::time_point::max();
    else
        key_with_ttl.expires_at = std::chrono::steady_clock::now() + std::chrono::seconds(ttl_in_seconds);

    cache.set(key_with_ttl, value);
}

/// Drops every cached listing by clearing the underlying cache.
void ObjectStorageListObjectsCache::clear()
{
    cache.clear();
}

std::optional<ObjectStorageListObjectsCache::Value> ObjectStorageListObjectsCache::get(const Key & key, bool filter_by_prefix)
{
const auto pair = cache.getWithKey(key);

if (!pair)
{
ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheMisses);
return {};
}

ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheHits);

if (pair->key == key)
{
ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheExactMatchHits);
return *pair->mapped;
}

ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCachePrefixMatchHits);

if (!filter_by_prefix)
{
return *pair->mapped;
}

Value filtered_objects;

filtered_objects.reserve(pair->mapped->size());

for (const auto & object : *pair->mapped)
{
if (object->relative_path.starts_with(key.prefix))
{
filtered_objects.push_back(object);
}
}

return filtered_objects;
}

/// Forwards the maximum total size, in bytes, to the underlying cache.
void ObjectStorageListObjectsCache::setMaxSizeInBytes(std::size_t size_in_bytes_)
{
    cache.setMaxSizeInBytes(size_in_bytes_);
}

/// Forwards the maximum number of entries to the underlying cache.
void ObjectStorageListObjectsCache::setMaxCount(std::size_t count)
{
    cache.setMaxCount(count);
}

/// Records the time-to-live, in seconds, that set() uses to stamp the expiry of new entries.
/// Entries already in the cache are not touched here.
void ObjectStorageListObjectsCache::setTTL(std::size_t ttl_in_seconds_)
{
    ttl_in_seconds = ttl_in_seconds_;
}

/// Returns the single cache object, held in a function-local static and created on first call.
ObjectStorageListObjectsCache & ObjectStorageListObjectsCache::instance()
{
    static ObjectStorageListObjectsCache singleton;
    return singleton;
}

}
Loading
Loading