19 changes: 19 additions & 0 deletions programs/server/Server.cpp
@@ -83,6 +83,7 @@
#include <Storages/System/attachSystemTables.h>
#include <Storages/System/attachInformationSchemaTables.h>
#include <Storages/Cache/registerRemoteFileMetadatas.h>
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/registerFunctions.h>
@@ -156,6 +157,10 @@
# include <azure/core/diagnostics/logger.hpp>
#endif

#if USE_PARQUET
# include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#endif


#include <incbin.h>
/// A minimal file used when the server is run without installation
@@ -326,6 +331,11 @@ namespace ServerSetting
extern const ServerSettingsUInt64 os_cpu_busy_time_threshold;
extern const ServerSettingsFloat min_os_cpu_wait_time_ratio_to_drop_connection;
extern const ServerSettingsFloat max_os_cpu_wait_time_ratio_to_drop_connection;
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_size;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_max_entries;
extern const ServerSettingsUInt64 object_storage_list_objects_cache_ttl;

}

namespace ErrorCodes
@@ -413,6 +423,7 @@ namespace ErrorCodes
extern const int NETWORK_ERROR;
extern const int CORRUPTED_DATA;
extern const int BAD_ARGUMENTS;
extern const int STARTUP_SCRIPTS_ERROR;
}


@@ -2421,8 +2432,16 @@ try
if (dns_cache_updater)
dns_cache_updater->start();

ObjectStorageListObjectsCache::instance().setMaxSizeInBytes(server_settings[ServerSetting::object_storage_list_objects_cache_size]);
ObjectStorageListObjectsCache::instance().setMaxCount(server_settings[ServerSetting::object_storage_list_objects_cache_max_entries]);
ObjectStorageListObjectsCache::instance().setTTL(server_settings[ServerSetting::object_storage_list_objects_cache_ttl]);

auto replicas_reconnector = ReplicasReconnector::init(global_context);

#if USE_PARQUET
ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);
#endif

/// Set current database name before loading tables and databases because
/// system logs may copy global context.
std::string default_database = server_settings[ServerSetting::default_database];
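
The ObjectStorageListObjectsCache and ParquetFileMetaDataCache singletons themselves are not part of this diff. Judging only from the call sites here and in InterpreterSystemQuery.cpp further down, the list-objects cache needs roughly the surface sketched below (a minimal sketch; the member layout is an assumption, only instance(), setMaxSizeInBytes(), setMaxCount(), setTTL() and clear() appear in the PR). Note that ParquetFileMetaDataCache::instance() is used through a pointer, while this one returns a reference.

    #include <cstddef>
    #include <mutex>

    /// Sketch only: the real class presumably wraps a TTL-aware cache policy;
    /// here the limits are merely stored to show the methods Server.cpp relies on.
    class ObjectStorageListObjectsCacheSketch
    {
    public:
        static ObjectStorageListObjectsCacheSketch & instance()
        {
            static ObjectStorageListObjectsCacheSketch the_instance;
            return the_instance;
        }

        void setMaxSizeInBytes(size_t bytes) { std::lock_guard lock(mutex); max_size_in_bytes = bytes; }
        void setMaxCount(size_t count) { std::lock_guard lock(mutex); max_count = count; }
        void setTTL(size_t seconds) { std::lock_guard lock(mutex); ttl_seconds = seconds; }
        void clear() { /* invoked by SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE */ }

    private:
        std::mutex mutex;
        size_t max_size_in_bytes = 0;
        size_t max_count = 0;
        size_t ttl_seconds = 0;
    };
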
2 changes: 2 additions & 0 deletions src/Access/Common/AccessType.h
@@ -184,6 +184,8 @@ enum class AccessType : uint8_t
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE, "SYSTEM DROP OBJECT STORAGE LIST OBJECTS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
7 changes: 6 additions & 1 deletion src/Common/ProfileEvents.cpp
@@ -1041,7 +1041,12 @@ The server successfully detected this situation and will download merged part fr
M(IndexGenericExclusionSearchAlgorithm, "Number of times the generic exclusion search algorithm is used over the index marks", ValueType::Number) \
M(ParallelReplicasQueryCount, "Number of (sub)queries executed using parallel replicas during a query execution", ValueType::Number) \
M(DistributedConnectionReconnectCount, "Number of reconnects to other servers done during distributed query execution. It can happen when a stale connection has been acquired from connection pool", ValueType::Number) \

M(ParquetMetaDataCacheHits, "Number of times a read of Parquet file metadata hit the metadata cache.", ValueType::Number) \
M(ParquetMetaDataCacheMisses, "Number of times a read of Parquet file metadata missed the metadata cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheHits, "Number of times an object storage list objects operation hit the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheMisses, "Number of times an object storage list objects operation missed the cache.", ValueType::Number) \
M(ObjectStorageListObjectsCacheExactMatchHits, "Number of times an object storage list objects operation hit the cache with an exact match.", ValueType::Number) \
M(ObjectStorageListObjectsCachePrefixMatchHits, "Number of times an object storage list objects operation hit the cache via prefix matching.", ValueType::Number) \

#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)
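
These counters are presumably incremented on the cache lookup path. A hedged sketch of the usual ClickHouse instrumentation pattern follows; the lookup helper, its types and the cache object are placeholders, only ProfileEvents::increment and the event names come from this change.

    #include <Common/ProfileEvents.h>
    #include <map>
    #include <optional>
    #include <string>
    #include <vector>

    namespace ProfileEvents
    {
        extern const Event ObjectStorageListObjectsCacheHits;
        extern const Event ObjectStorageListObjectsCacheMisses;
    }

    using Listing = std::vector<std::string>;     /// simplified cached value: object paths
    static std::map<std::string, Listing> cache;  /// stand-in for the real cache

    std::optional<Listing> lookupListing(const std::string & prefix)
    {
        if (auto it = cache.find(prefix); it != cache.end())
        {
            ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheHits);
            return it->second;
        }
        ProfileEvents::increment(ProfileEvents::ObjectStorageListObjectsCacheMisses);
        return std::nullopt;
    }
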
4 changes: 2 additions & 2 deletions src/Common/TTLCachePolicy.h
@@ -271,10 +271,10 @@ class TTLCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunc
return res;
}

private:
protected:
using Cache = std::unordered_map<Key, MappedPtr, HashFunction>;
Cache cache;

private:
/// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators
/// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a
/// binary search on the sorted container and erase all left of the found key.
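
Moving `cache` from private to protected lets a derived policy walk the map directly, which is presumably what the list-objects cache needs for the prefix matching counted by ObjectStorageListObjectsCacheExactMatchHits/PrefixMatchHits above. A standalone sketch of the idea with simplified types (this is not the real TTLCachePolicy interface):

    #include <optional>
    #include <string>
    #include <unordered_map>

    struct Entry { std::string data; };

    class BasePolicy
    {
    public:
        void set(const std::string & key, Entry value) { cache[key] = std::move(value); }

        std::optional<Entry> get(const std::string & key) const
        {
            if (auto it = cache.find(key); it != cache.end())
                return it->second;
            return std::nullopt;
        }

    protected:
        /// Protected (as in this PR) so that derived policies can inspect the map directly.
        std::unordered_map<std::string, Entry> cache;
    };

    class PrefixAwarePolicy : public BasePolicy
    {
    public:
        /// Fall back to an entry stored under a prefix of the requested key.
        std::optional<Entry> getWithPrefixFallback(const std::string & key) const
        {
            if (auto exact = get(key))
                return exact;
            for (const auto & [cached_key, entry] : cache)
                if (key.compare(0, cached_key.size(), cached_key) == 0)
                    return entry;
            return std::nullopt;
        }
    };
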
3 changes: 1 addition & 2 deletions src/Core/FormatFactorySettings.h
@@ -1351,8 +1351,7 @@ Limits the size of the blocks formed during data parsing in input formats in byt
DECLARE(Bool, input_format_parquet_allow_geoparquet_parser, true, R"(
Use geo column parser to convert Array(UInt8) into Point/Linestring/Polygon/MultiLineString/MultiPolygon types
)", 0) \


DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable parquet file metadata caching)", 0) \
// End of FORMAT_FACTORY_SETTINGS

#define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \
5 changes: 4 additions & 1 deletion src/Core/ServerSettings.cpp
@@ -1064,7 +1064,10 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
See [Controlling behavior on server CPU overload](/operations/settings/server-overload) for more details.
)", 0) \
DECLARE(Float, distributed_cache_keep_up_free_connections_ratio, 0.1f, "Soft limit for number of active connection distributed cache will try to keep free. After the number of free connections goes below distributed_cache_keep_up_free_connections_ratio * max_connections, connections with oldest activity will be closed until the number goes above the limit.", 0) \

DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of the Parquet file metadata cache in bytes", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_size, 500000000, "Maximum size of the ObjectStorage list objects cache in bytes. Zero means disabled.", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_max_entries, 1000, "Maximum number of entries in the ObjectStorage list objects cache. Zero means disabled.", 0) \
DECLARE(UInt64, object_storage_list_objects_cache_ttl, 3600, "Time to live of records in the ObjectStorage list objects cache in seconds. Zero means unlimited.", 0)

// clang-format on

3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -6865,6 +6865,9 @@ Default number of buckets for distributed shuffle-hash-join.
)", EXPERIMENTAL) \
DECLARE(UInt64, distributed_plan_default_reader_bucket_count, 8, R"(
Default number of tasks for parallel reading in a distributed query. Tasks are spread across replicas.
)", EXPERIMENTAL) \
DECLARE(Bool, use_object_storage_list_objects_cache, false, R"(
Cache the list of objects returned by list objects calls in object storage
)", EXPERIMENTAL) \
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
9 changes: 9 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -174,6 +174,10 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"parallel_hash_join_threshold", 0, 0, "New setting"},
{"function_date_trunc_return_type_behavior", 1, 0, "Change the result type for dateTrunc function for DateTime64/Date32 arguments to DateTime64/Date32 regardless of time unit to get correct result for negative values"},
/// Release closed. Please use 25.5
// Altinity Antalya modifications atop of 25.2
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"use_object_storage_list_objects_cache", true, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.3",
{
@@ -192,6 +196,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"parallel_hash_join_threshold", 0, 0, "New setting"},
/// Release closed. Please use 25.4
});
addSettingsChanges(settings_changes_history, "24.12.2.20000",
{
// Altinity Antalya modifications atop of 24.12
{"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586
});
addSettingsChanges(settings_changes_history, "25.2",
{
/// Release closed. Please use 25.3
@@ -35,6 +35,8 @@ class AzureObjectStorage : public IObjectStorage
const String & description_,
const String & common_key_prefix_);

bool supportsListObjectsCache() override { return true; }

void listObjects(const std::string & path, RelativePathsWithMetadata & children, size_t max_keys) const override;

/// Sanitizer build may crash with max_keys=1; this looks like a false positive.
17 changes: 17 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.cpp
@@ -101,6 +101,23 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
return write_settings;
}

void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file)
{
if (!metadata)
{
const auto & path = isArchive() ? getPathToArchive() : getPath();

if (ignore_non_existent_file)
{
metadata = object_storage->tryGetObjectMetadata(path);
}
else
{
metadata = object_storage->getObjectMetadata(path);
}
}
}

RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
Poco::JSON::Parser parser;
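
loadMetadata() fills in object metadata lazily, which matters once a listing can come out of a cache and therefore be slightly stale. A hypothetical caller is sketched below; the helper and its surroundings are assumptions, only loadMetadata and its ignore_non_existent_file flag come from this diff.

    /// Objects deleted since the listing was cached are tolerated when
    /// ignore_non_existent_file is true (the tryGetObjectMetadata path above).
    void ensureMetadata(ObjectStoragePtr object_storage, RelativePathsWithMetadata & objects)
    {
        for (auto & object : objects)
            object->loadMetadata(object_storage, /*ignore_non_existent_file=*/ true);
    }
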
3 changes: 3 additions & 0 deletions src/Disks/ObjectStorages/IObjectStorage.h
@@ -143,6 +143,7 @@ struct RelativePathWithMetadata
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }

void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file);
const CommandInTaskResponse & getCommand() const { return command; }
};

@@ -339,6 +340,8 @@ class IObjectStorage
#endif


virtual bool supportsListObjectsCache() { return false; }

private:
mutable std::mutex throttlers_mutex;
ThrottlerPtr remote_read_throttler;
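
Combined with the new query-level setting use_object_storage_list_objects_cache, the capability flag presumably gates the cached listing path roughly as in the sketch below. The helper is an assumption; only the setting and the virtual method are introduced by this PR, and the server-level size/entries/TTL limits additionally bound what the cache may hold.

    namespace Setting
    {
        extern const SettingsBool use_object_storage_list_objects_cache;
    }

    /// Hypothetical gate for using ObjectStorageListObjectsCache in listObjects() callers.
    static bool shouldUseListObjectsCache(const Settings & settings, IObjectStorage & object_storage)
    {
        return settings[Setting::use_object_storage_list_objects_cache]
            && object_storage.supportsListObjectsCache();
    }
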
2 changes: 2 additions & 0 deletions src/Disks/ObjectStorages/S3/S3ObjectStorage.h
@@ -61,6 +61,8 @@ class S3ObjectStorage : public IObjectStorage

ObjectStorageType getType() const override { return ObjectStorageType::S3; }

bool supportsListObjectsCache() override { return true; }

bool exists(const StoredObject & object) const override;

std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
22 changes: 22 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
@@ -46,6 +46,7 @@
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTSystemQuery.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Storages/Cache/ObjectStorageListObjectsCache.h>
#include <Storages/Freeze.h>
#include <Storages/MaterializedView/RefreshTask.h>
#include <Storages/ObjectStorage/Azure/Configuration.h>
@@ -79,6 +80,10 @@
#include <Formats/ProtobufSchemas.h>
#endif

#if USE_PARQUET
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#endif

#if USE_AWS_S3
#include <IO/S3/Client.h>
#endif
@@ -433,6 +438,21 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->clearQueryResultCache(query.query_result_cache_tag);
break;
}
case Type::DROP_PARQUET_METADATA_CACHE:
#if USE_PARQUET
getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE);
ParquetFileMetaDataCache::instance()->clear();
break;
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet");
#endif

case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE);
ObjectStorageListObjectsCache::instance().clear();
break;
}
case Type::DROP_COMPILED_EXPRESSION_CACHE:
#if USE_EMBEDDED_COMPILER
getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
@@ -1533,6 +1553,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_PAGE_CACHE:
case Type::DROP_SCHEMA_CACHE:
case Type::DROP_FORMAT_SCHEMA_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
case Type::DROP_S3_CLIENT_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
2 changes: 2 additions & 0 deletions src/Parsers/ASTSystemQuery.cpp
@@ -467,6 +467,8 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
case Type::DROP_COMPILED_EXPRESSION_CACHE:
case Type::DROP_S3_CLIENT_CACHE:
case Type::DROP_ICEBERG_METADATA_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE:
case Type::RESET_COVERAGE:
case Type::RESTART_REPLICAS:
case Type::JEMALLOC_PURGE:
2 changes: 2 additions & 0 deletions src/Parsers/ASTSystemQuery.h
@@ -42,6 +42,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
DROP_SCHEMA_CACHE,
DROP_FORMAT_SCHEMA_CACHE,
DROP_S3_CLIENT_CACHE,
DROP_PARQUET_METADATA_CACHE,
DROP_OBJECT_STORAGE_LIST_OBJECTS_CACHE,
STOP_LISTEN,
START_LISTEN,
RESTART_REPLICAS,
3 changes: 3 additions & 0 deletions src/Processors/Formats/IInputFormat.h
@@ -70,6 +70,9 @@ class IInputFormat : public SourceWithKeyCondition

void needOnlyCount() { need_only_count = true; }

/// Set additional info/key/id related to the underlying storage of the ReadBuffer
virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

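
A format can override the new hook to remember the storage-level identity of the file it reads. For Parquet, a plausible override (hypothetical, not taken from this PR) would stash the key and the user's preference so that the footer metadata can later be looked up in ParquetFileMetaDataCache instead of being re-read and re-parsed:

    /// Hypothetical override in ParquetBlockInputFormat; the member variables and the
    /// exact interaction with ParquetFileMetaDataCache are assumptions.
    void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key)
    {
        metadata_cache_key = key;   /// e.g. a path plus version/etag provided by the storage layer
        use_metadata_cache = settings[Setting::input_format_parquet_use_metadata_cache];
    }
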