From 1b93fbe059b94db9e2a09c9e027a7bc81002cf81 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Tue, 22 Apr 2025 17:13:30 +0000 Subject: [PATCH 01/14] Merge pull request #79407 from ClickHouse/support-settings-for-delta-table-in-catalog Allow to specify storage settings for DatabaseCatalog --- .../scripts/check_style/check-settings-style | 2 +- src/Databases/DataLake/DatabaseDataLake.cpp | 2 + .../DataLake/DatabaseDataLakeSettings.cpp | 13 +++++- .../DataLake/DatabaseDataLakeSettings.h | 9 +++- .../StorageObjectStorageSettings.cpp | 41 +++++-------------- .../StorageObjectStorageSettings.h | 36 ++++++++++++++++ 6 files changed, 68 insertions(+), 35 deletions(-) diff --git a/ci/jobs/scripts/check_style/check-settings-style b/ci/jobs/scripts/check_style/check-settings-style index 42c4df16d103..ad0d7616395c 100755 --- a/ci/jobs/scripts/check_style/check-settings-style +++ b/ci/jobs/scripts/check_style/check-settings-style @@ -37,7 +37,7 @@ ALL_DECLARATION_FILES=" $ROOT_PATH/src/Storages/MySQL/MySQLSettings.cpp $ROOT_PATH/src/Storages/NATS/NATSSettings.cpp $ROOT_PATH/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp - $ROOT_PATH/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp + $ROOT_PATH/src/Storages/ObjectStorage/StorageObjectStorageSettings.h $ROOT_PATH/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.cpp $ROOT_PATH/src/Storages/RabbitMQ/RabbitMQSettings.cpp $ROOT_PATH/src/Storages/RocksDB/RocksDBSettings.cpp diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 4eb3aeaa2d1d..493e6b5e1377 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -380,6 +380,8 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con const auto configuration = getConfiguration(storage_type); auto storage_settings = std::make_shared(); + storage_settings->loadFromSettingsChanges(settings.allChanged()); + if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties(); table_specific_properties.has_value()) { diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index 412e96f865cf..5afbc55fdd64 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace DB @@ -26,10 +27,10 @@ namespace ErrorCodes DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \ DECLARE(String, region, "", "Region for Glue catalog", 0) \ DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ - DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \ #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ - DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) + DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \ + LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(M, ALIAS) \ DECLARE_SETTINGS_TRAITS(DatabaseDataLakeSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS) IMPLEMENT_SETTINGS_TRAITS(DatabaseDataLakeSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS) @@ -89,4 +90,12 @@ void DatabaseDataLakeSettings::loadFromQuery(const ASTStorage & storage_def) } } +SettingsChanges DatabaseDataLakeSettings::allChanged() const +{ + SettingsChanges changes; + for (const auto & setting : impl->allChanged()) + changes.emplace_back(setting.getName(), setting.getValue()); + return changes; +} + } diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.h b/src/Databases/DataLake/DatabaseDataLakeSettings.h index bd4d83f0cf0a..e0adbf442269 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.h +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -19,7 +20,11 @@ class SettingsChanges; M(CLASS_NAME, Bool) \ M(CLASS_NAME, DatabaseDataLakeCatalogType) \ -DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseDataLakeSettings, DECLARE_SETTING_TRAIT) +#define LIST_OF_DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \ + DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \ + STORAGE_OBJECT_STORAGE_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) + +LIST_OF_DATABASE_ICEBERG_SETTINGS_SUPPORTED_TYPES(DatabaseDataLakeSettings, DECLARE_SETTING_TRAIT) struct DatabaseDataLakeSettings { @@ -34,6 +39,8 @@ struct DatabaseDataLakeSettings void applyChanges(const SettingsChanges & changes); + SettingsChanges allChanged() const; + private: std::unique_ptr impl; }; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp index 469bbaa4e456..512a8d53552b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.cpp @@ -11,37 +11,6 @@ namespace DB { -// clang-format off - -#define STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(DECLARE, ALIAS) \ - DECLARE(Bool, allow_dynamic_metadata_for_data_lakes, false, R"( -If enabled, indicates that metadata is taken from iceberg specification that is pulled from cloud before each query. -)", 0) \ - DECLARE(Bool, allow_experimental_delta_kernel_rs, false, R"( -If enabled, the engine would use delta-kernel-rs for DeltaLake metadata parsing -)", 0) \ - DECLARE(Bool, delta_lake_read_schema_same_as_table_schema, false, R"( -Whether delta-lake read schema is the same as table schema. -)", 0) \ - DECLARE(String, iceberg_metadata_file_path, "", R"( -Explicit path to desired Iceberg metadata file, should be relative to path in object storage. Make sense for table function use case only. -)", 0) \ - DECLARE(String, object_storage_cluster, "", R"( -Cluster for distributed requests -)", 0) \ - DECLARE(String, iceberg_metadata_table_uuid, "", R"( -Explicit table UUID to read metadata for. Ignored if iceberg_metadata_file_path is set. -)", 0) \ - DECLARE(Bool, iceberg_recent_metadata_file_by_last_updated_ms_field, false, R"( -If enabled, the engine would use the metadata file with the most recent last_updated_ms json field. Does not make sense to use with iceberg_metadata_file_path. -)", 0) - -// clang-format on - -#define LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(M, ALIAS) \ - STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(M, ALIAS) \ - LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) - DECLARE_SETTINGS_TRAITS(StorageObjectStorageSettingsTraits, LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS) IMPLEMENT_SETTINGS_TRAITS(StorageObjectStorageSettingsTraits, LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS) @@ -93,4 +62,14 @@ bool StorageObjectStorageSettings::hasBuiltin(std::string_view name) { return StorageObjectStorageSettingsImpl::hasBuiltin(name); } + +void StorageObjectStorageSettings::loadFromSettingsChanges(const SettingsChanges & changes) +{ + for (const auto & [name, value] : changes) + { + if (impl->has(name)) + impl->set(name, value); + } +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.h b/src/Storages/ObjectStorage/StorageObjectStorageSettings.h index d3f72aa2f8a3..864cb6b45f53 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.h @@ -43,6 +43,36 @@ class SettingsChanges; M(CLASS_NAME, UInt64Auto) \ M(CLASS_NAME, URI) +// clang-format off + +#define STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(DECLARE, ALIAS) \ + DECLARE(Bool, allow_dynamic_metadata_for_data_lakes, false, R"( +If enabled, indicates that metadata is taken from iceberg specification that is pulled from cloud before each query. +)", 0) \ + DECLARE(Bool, allow_experimental_delta_kernel_rs, false, R"( +If enabled, the engine would use delta-kernel-rs for DeltaLake metadata parsing +)", 0) \ + DECLARE(Bool, delta_lake_read_schema_same_as_table_schema, false, R"( +Whether delta-lake read schema is the same as table schema. +)", 0) \ + DECLARE(String, iceberg_metadata_file_path, "", R"( +Explicit path to desired Iceberg metadata file, should be relative to path in object storage. Make sense for table function use case only. +)", 0) \ + DECLARE(String, object_storage_cluster, "", R"( +Cluster for distributed requests +)", 0) \ + DECLARE(String, iceberg_metadata_table_uuid, "", R"( +Explicit table UUID to read metadata for. Ignored if iceberg_metadata_file_path is set. +)", 0) \ + DECLARE(Bool, iceberg_recent_metadata_file_by_last_updated_ms_field, false, R"( +If enabled, the engine would use the metadata file with the most recent last_updated_ms json field. Does not make sense to use with iceberg_metadata_file_path. +)", 0) \ + DECLARE(Bool, iceberg_use_version_hint, false, R"( +Get latest metadata path from version-hint.text file. +)", 0) \ + +// clang-format on + STORAGE_OBJECT_STORAGE_SETTINGS_SUPPORTED_TYPES(StorageObjectStorageSettings, DECLARE_SETTING_TRAIT) struct StorageObjectStorageSettings @@ -56,6 +86,8 @@ struct StorageObjectStorageSettings void loadFromQuery(ASTSetQuery & settings_ast); + void loadFromSettingsChanges(const SettingsChanges & changes); + Field get(const std::string & name); static bool hasBuiltin(std::string_view name); @@ -66,4 +98,8 @@ struct StorageObjectStorageSettings using StorageObjectStorageSettingsPtr = std::shared_ptr; +#define LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(M, ALIAS) \ + STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(M, ALIAS) \ + LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) + } From 1562ac7b03065fcc90cb4422f31dce63a66373fb Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Wed, 28 May 2025 13:22:04 +0000 Subject: [PATCH 02/14] Merge pull request #80745 from ClickHouse/fix-settings-ambiguity Decouple StorageObjectStorageSettings and DataLakeStorageSettings --- .../scripts/check_style/check-settings-style | 1 + src/Databases/DataLake/DatabaseDataLake.cpp | 38 +++---- src/Databases/DataLake/DatabaseDataLake.h | 6 +- .../DataLake/DatabaseDataLakeSettings.cpp | 5 +- .../DataLakes/DataLakeConfiguration.h | 34 +++--- .../DataLakes/DataLakeStorageSettings.cpp | 74 +++++++++++++ .../DataLakes/DataLakeStorageSettings.h | 100 ++++++++++++++++++ .../DataLakes/DeltaLake/TableSnapshot.cpp | 2 - .../DataLakes/DeltaLake/TableSnapshot.h | 1 - .../DataLakes/DeltaLakeMetadata.cpp | 7 ++ .../DataLakes/DeltaLakeMetadata.h | 5 - .../DeltaLakeMetadataDeltaKernel.cpp | 4 +- .../DataLakes/DeltaLakeMetadataDeltaKernel.h | 17 +-- .../DataLakes/Iceberg/IcebergMetadata.cpp | 21 ++-- .../ObjectStorage/StorageObjectStorage.cpp | 9 +- .../ObjectStorage/StorageObjectStorage.h | 12 ++- .../StorageObjectStorageSettings.h | 23 +--- .../registerStorageObjectStorage.cpp | 45 +++++--- .../registerQueueStorage.cpp | 2 +- .../TableFunctionObjectStorage.cpp | 54 ++++++---- .../TableFunctionObjectStorage.h | 30 ++++-- .../TableFunctionObjectStorageCluster.cpp | 4 +- .../TableFunctionObjectStorageCluster.h | 16 +-- 23 files changed, 351 insertions(+), 159 deletions(-) create mode 100644 src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.cpp create mode 100644 src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h diff --git a/ci/jobs/scripts/check_style/check-settings-style b/ci/jobs/scripts/check_style/check-settings-style index ad0d7616395c..cb24b45cef8a 100755 --- a/ci/jobs/scripts/check_style/check-settings-style +++ b/ci/jobs/scripts/check_style/check-settings-style @@ -38,6 +38,7 @@ ALL_DECLARATION_FILES=" $ROOT_PATH/src/Storages/NATS/NATSSettings.cpp $ROOT_PATH/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp $ROOT_PATH/src/Storages/ObjectStorage/StorageObjectStorageSettings.h + $ROOT_PATH/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h $ROOT_PATH/src/Storages/PostgreSQL/MaterializedPostgreSQLSettings.cpp $ROOT_PATH/src/Storages/RabbitMQ/RabbitMQSettings.cpp $ROOT_PATH/src/Storages/RocksDB/RocksDBSettings.cpp diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 493e6b5e1377..580dbdcb3068 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -55,9 +55,9 @@ namespace Setting extern const SettingsBool allow_experimental_database_glue_catalog; extern const SettingsBool use_hive_partitioning; } -namespace StorageObjectStorageSetting +namespace DataLakeStorageSetting { - extern const StorageObjectStorageSettingsString iceberg_metadata_file_path; + extern const DataLakeStorageSettingsString iceberg_metadata_file_path; } namespace ErrorCodes @@ -161,7 +161,9 @@ std::shared_ptr DatabaseDataLake::getCatalog() const return catalog_impl; } -std::shared_ptr DatabaseDataLake::getConfiguration(DatabaseDataLakeStorageType type) const +std::shared_ptr DatabaseDataLake::getConfiguration( + DatabaseDataLakeStorageType type, + DataLakeStorageSettingsPtr storage_settings) const { /// TODO: add tests for azure, local storage types. @@ -175,24 +177,24 @@ std::shared_ptr DatabaseDataLake::getConfig #if USE_AWS_S3 case DB::DatabaseDataLakeStorageType::S3: { - return std::make_shared(); + return std::make_shared(storage_settings); } #endif #if USE_AZURE_BLOB_STORAGE case DB::DatabaseDataLakeStorageType::Azure: { - return std::make_shared(); + return std::make_shared(storage_settings); } #endif #if USE_HDFS case DB::DatabaseDataLakeStorageType::HDFS: { - return std::make_shared(); + return std::make_shared(storage_settings); } #endif case DB::DatabaseDataLakeStorageType::Local: { - return std::make_shared(); + return std::make_shared(storage_settings); } /// Fake storage in case when catalog store not only /// primary-type tables (DeltaLake or Iceberg), but for @@ -204,7 +206,7 @@ std::shared_ptr DatabaseDataLake::getConfig /// dependencies and the most lightweight case DB::DatabaseDataLakeStorageType::Other: { - return std::make_shared(); + return std::make_shared(storage_settings); } #if !USE_AWS_S3 || !USE_AZURE_BLOB_STORAGE || !USE_HDFS default: @@ -221,12 +223,12 @@ std::shared_ptr DatabaseDataLake::getConfig #if USE_AWS_S3 case DB::DatabaseDataLakeStorageType::S3: { - return std::make_shared(); + return std::make_shared(storage_settings); } #endif case DB::DatabaseDataLakeStorageType::Local: { - return std::make_shared(); + return std::make_shared(storage_settings); } /// Fake storage in case when catalog store not only /// primary-type tables (DeltaLake or Iceberg), but for @@ -238,7 +240,7 @@ std::shared_ptr DatabaseDataLake::getConfig /// dependencies and the most lightweight case DB::DatabaseDataLakeStorageType::Other: { - return std::make_shared(); + return std::make_shared(storage_settings); } default: throw Exception(ErrorCodes::BAD_ARGUMENTS, @@ -253,12 +255,12 @@ std::shared_ptr DatabaseDataLake::getConfig #if USE_AWS_S3 case DB::DatabaseDataLakeStorageType::S3: { - return std::make_shared(); + return std::make_shared(storage_settings); } #endif case DB::DatabaseDataLakeStorageType::Other: { - return std::make_shared(); + return std::make_shared(storage_settings); } default: throw Exception(ErrorCodes::BAD_ARGUMENTS, @@ -377,9 +379,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con storage_type = table_metadata.getStorageType(); } - const auto configuration = getConfiguration(storage_type); - - auto storage_settings = std::make_shared(); + auto storage_settings = std::make_shared(); storage_settings->loadFromSettingsChanges(settings.allChanged()); if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties(); @@ -396,9 +396,11 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con } } - (*storage_settings)[DB::StorageObjectStorageSetting::iceberg_metadata_file_path] = metadata_location; + (*storage_settings)[DB::DataLakeStorageSetting::iceberg_metadata_file_path] = metadata_location; } + const auto configuration = getConfiguration(storage_type, storage_settings); + /// HACK: Hacky-hack to enable lazy load ContextMutablePtr context_copy = Context::createCopy(context_); Settings settings_copy = context_copy->getSettingsCopy(); @@ -407,7 +409,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con /// with_table_structure = false: because there will be /// no table structure in table definition AST. - configuration->initialize(args, context_copy, /* with_table_structure */false, storage_settings); + configuration->initialize(args, context_copy, /* with_table_structure */false); auto cluster_name = settings[DatabaseDataLakeSetting::object_storage_cluster].value; diff --git a/src/Databases/DataLake/DatabaseDataLake.h b/src/Databases/DataLake/DatabaseDataLake.h index 3b3123dfb6f9..ee6ed0903168 100644 --- a/src/Databases/DataLake/DatabaseDataLake.h +++ b/src/Databases/DataLake/DatabaseDataLake.h @@ -68,7 +68,11 @@ class DatabaseDataLake final : public IDatabase, WithContext void validateSettings(); std::shared_ptr getCatalog() const; - std::shared_ptr getConfiguration(DatabaseDataLakeStorageType type) const; + + std::shared_ptr getConfiguration( + DatabaseDataLakeStorageType type, + DataLakeStorageSettingsPtr storage_settings) const; + std::string getStorageEndpointForTable(const DataLake::TableMetadata & table_metadata) const; diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index 5afbc55fdd64..345e8d575a1e 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include namespace DB @@ -27,10 +27,11 @@ namespace ErrorCodes DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \ DECLARE(String, region, "", "Region for Glue catalog", 0) \ DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ + DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \ #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \ - LIST_OF_STORAGE_OBJECT_STORAGE_SETTINGS(M, ALIAS) \ + LIST_OF_DATA_LAKE_STORAGE_SETTINGS(M, ALIAS) \ DECLARE_SETTINGS_TRAITS(DatabaseDataLakeSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS) IMPLEMENT_SETTINGS_TRAITS(DatabaseDataLakeSettingsTraits, LIST_OF_DATABASE_ICEBERG_SETTINGS) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 0dedb5d388f5..00b617492f82 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -7,11 +7,11 @@ #include #include #include +#include #include #include #include #include -#include #include #include #include @@ -33,13 +33,13 @@ namespace DB namespace ErrorCodes { -extern const int FORMAT_VERSION_TOO_OLD; -extern const int LOGICAL_ERROR; + extern const int FORMAT_VERSION_TOO_OLD; + extern const int LOGICAL_ERROR; } -namespace StorageObjectStorageSetting +namespace DataLakeStorageSetting { -extern const StorageObjectStorageSettingsBool allow_dynamic_metadata_for_data_lakes; + extern DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes; } @@ -52,8 +52,12 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl public: using Configuration = StorageObjectStorage::Configuration; + explicit DataLakeConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {} + bool isDataLakeConfiguration() const override { return true; } + const DataLakeStorageSettings & getDataLakeSettings() const override { return *settings; } + std::string getEngineName() const override { return DataLakeMetadata::name + BaseStorageConfiguration::getEngineName(); } void update(ObjectStoragePtr object_storage, ContextPtr local_context) override @@ -119,7 +123,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl bool hasExternalDynamicMetadata() override { - return BaseStorageConfiguration::getSettingsRef()[StorageObjectStorageSetting::allow_dynamic_metadata_for_data_lakes] + return (*settings)[DataLakeStorageSetting::allow_dynamic_metadata_for_data_lakes] && current_metadata && current_metadata->supportsExternalMetadataChange(); } @@ -162,6 +166,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl private: DataLakeMetadataPtr current_metadata; LoggerPtr log = getLogger("DataLakeConfiguration"); + const DataLakeStorageSettingsPtr settings; ReadFromFormatInfo prepareReadingFromFormat( ObjectStoragePtr object_storage, @@ -284,6 +289,9 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, friend class StorageObjectStorage::Configuration; public: + StorageIcebergConfiguration() = default; + explicit StorageIcebergConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {} + ObjectStorageType getType() const override { return getImpl().getType(); } std::string getTypeName() const override { return getImpl().getTypeName(); } @@ -373,11 +381,10 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, void initialize( ASTs & engine_args, ContextPtr local_context, - bool with_table_structure, - std::shared_ptr settings) override + bool with_table_structure) override { createDynamicConfiguration(engine_args, local_context); - getImpl().initialize(engine_args, local_context, with_table_structure, settings); + getImpl().initialize(engine_args, local_context, with_table_structure); } ASTPtr createArgsWithAccessData() const override @@ -506,21 +513,21 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, { # if USE_AWS_S3 case ObjectStorageType::S3: - impl = std::make_unique(); + impl = std::make_unique(settings); break; # endif # if USE_AZURE_BLOB_STORAGE case ObjectStorageType::Azure: - impl = std::make_unique(); + impl = std::make_unique(settings); break; # endif # if USE_HDFS case ObjectStorageType::HDFS: - impl = std::make_unique(); + impl = std::make_unique(settings); break; # endif case ObjectStorageType::Local: - impl = std::make_unique(); + impl = std::make_unique(settings); break; default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsuported DataLake storage {}", type); @@ -528,6 +535,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, } std::shared_ptr impl; + DataLakeStorageSettingsPtr settings; }; #endif diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.cpp b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.cpp new file mode 100644 index 000000000000..07824c4ab9d4 --- /dev/null +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.cpp @@ -0,0 +1,74 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +DECLARE_SETTINGS_TRAITS(DataLakeStorageSettingsTraits, LIST_OF_DATA_LAKE_STORAGE_SETTINGS) +IMPLEMENT_SETTINGS_TRAITS(DataLakeStorageSettingsTraits, LIST_OF_DATA_LAKE_STORAGE_SETTINGS) + +struct DataLakeStorageSettingsImpl : public BaseSettings +{ +}; + +#define INITIALIZE_SETTING_EXTERN(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS, ...) \ + DataLakeStorageSettings##TYPE NAME = &DataLakeStorageSettingsImpl ::NAME; + +namespace DataLakeStorageSetting +{ +LIST_OF_DATA_LAKE_STORAGE_SETTINGS(INITIALIZE_SETTING_EXTERN, INITIALIZE_SETTING_EXTERN) +} + +#undef INITIALIZE_SETTING_EXTERN + +DataLakeStorageSettings::DataLakeStorageSettings() : impl(std::make_unique()) +{ +} + +DataLakeStorageSettings::DataLakeStorageSettings(const DataLakeStorageSettings & settings) + : impl(std::make_unique(*settings.impl)) +{ +} + +DataLakeStorageSettings::DataLakeStorageSettings(DataLakeStorageSettings && settings) noexcept + : impl(std::make_unique(std::move(*settings.impl))) +{ +} + + +DataLakeStorageSettings::~DataLakeStorageSettings() = default; + +STORAGE_DATA_LAKE_STORAGE_SETTINGS_SUPPORTED_TYPES(DataLakeStorageSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPERATOR) + + +void DataLakeStorageSettings::loadFromQuery(ASTSetQuery & settings_ast) +{ + impl->applyChanges(settings_ast.changes); +} + +Field DataLakeStorageSettings::get(const std::string & name) +{ + return impl->get(name); +} + +bool DataLakeStorageSettings::hasBuiltin(std::string_view name) +{ + return DataLakeStorageSettingsImpl::hasBuiltin(name); +} + +void DataLakeStorageSettings::loadFromSettingsChanges(const SettingsChanges & changes) +{ + for (const auto & [name, value] : changes) + { + if (impl->has(name)) + impl->set(name, value); + } +} + +} diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h new file mode 100644 index 000000000000..a87a0a43c935 --- /dev/null +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h @@ -0,0 +1,100 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ +class ASTSetQuery; +struct DataLakeStorageSettingsImpl; +struct MutableColumnsAndConstraints; +class StorageObjectStorage; +class SettingsChanges; + +/// List of available types supported in DataLakeStorageSettingsSettings object +#define STORAGE_DATA_LAKE_STORAGE_SETTINGS_SUPPORTED_TYPES(CLASS_NAME, M) \ + M(CLASS_NAME, ArrowCompression) \ + M(CLASS_NAME, Bool) \ + M(CLASS_NAME, CapnProtoEnumComparingMode) \ + M(CLASS_NAME, Char) \ + M(CLASS_NAME, DateTimeInputFormat) \ + M(CLASS_NAME, DateTimeOutputFormat) \ + M(CLASS_NAME, DateTimeOverflowBehavior) \ + M(CLASS_NAME, Double) \ + M(CLASS_NAME, EscapingRule) \ + M(CLASS_NAME, Float) \ + M(CLASS_NAME, IdentifierQuotingRule) \ + M(CLASS_NAME, IdentifierQuotingStyle) \ + M(CLASS_NAME, Int64) \ + M(CLASS_NAME, IntervalOutputFormat) \ + M(CLASS_NAME, MsgPackUUIDRepresentation) \ + M(CLASS_NAME, ORCCompression) \ + M(CLASS_NAME, ParquetCompression) \ + M(CLASS_NAME, ParquetVersion) \ + M(CLASS_NAME, SchemaInferenceMode) \ + M(CLASS_NAME, String) \ + M(CLASS_NAME, UInt32) \ + M(CLASS_NAME, UInt64) \ + M(CLASS_NAME, NonZeroUInt64) \ + M(CLASS_NAME, UInt64Auto) \ + M(CLASS_NAME, URI) + +// clang-format off + +#define DATA_LAKE_STORAGE_RELATED_SETTINGS(DECLARE, ALIAS) \ + DECLARE(Bool, allow_dynamic_metadata_for_data_lakes, false, R"( +If enabled, indicates that metadata is taken from iceberg specification that is pulled from cloud before each query. +)", 0) \ + DECLARE(String, iceberg_metadata_file_path, "", R"( +Explicit path to desired Iceberg metadata file, should be relative to path in object storage. Make sense for table function use case only. +)", 0) \ + DECLARE(String, iceberg_metadata_table_uuid, "", R"( +Explicit table UUID to read metadata for. Ignored if iceberg_metadata_file_path is set. +)", 0) \ + DECLARE(Bool, iceberg_recent_metadata_file_by_last_updated_ms_field, false, R"( +If enabled, the engine would use the metadata file with the most recent last_updated_ms json field. Does not make sense to use with iceberg_metadata_file_path. +)", 0) \ + DECLARE(Bool, iceberg_use_version_hint, false, R"( +Get latest metadata path from version-hint.text file. +)", 0) \ + +#define OBSOLETE_SETTINGS(M, ALIAS) \ + MAKE_OBSOLETE(M, Bool, allow_experimental_delta_kernel_rs, true) \ + MAKE_OBSOLETE(M, Bool, delta_lake_read_schema_same_as_table_schema, false) + +// clang-format on + +STORAGE_DATA_LAKE_STORAGE_SETTINGS_SUPPORTED_TYPES(DataLakeStorageSettings, DECLARE_SETTING_TRAIT) + +struct DataLakeStorageSettings +{ + DataLakeStorageSettings(); + DataLakeStorageSettings(const DataLakeStorageSettings & settings); + DataLakeStorageSettings(DataLakeStorageSettings && settings) noexcept; + ~DataLakeStorageSettings(); + + STORAGE_DATA_LAKE_STORAGE_SETTINGS_SUPPORTED_TYPES(DataLakeStorageSettings, DECLARE_SETTING_SUBSCRIPT_OPERATOR) + + void loadFromQuery(ASTSetQuery & settings_ast); + + void loadFromSettingsChanges(const SettingsChanges & changes); + + Field get(const std::string & name); + + static bool hasBuiltin(std::string_view name); + +private: + std::unique_ptr impl; +}; + +using DataLakeStorageSettingsPtr = std::shared_ptr; + +#define LIST_OF_DATA_LAKE_STORAGE_SETTINGS(M, ALIAS) \ + DATA_LAKE_STORAGE_RELATED_SETTINGS(M, ALIAS) \ + OBSOLETE_SETTINGS(M, ALIAS) \ + LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) + +} diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp index fb791b83dded..f2eda645cfba 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp @@ -307,11 +307,9 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator TableSnapshot::TableSnapshot( KernelHelperPtr helper_, DB::ObjectStoragePtr object_storage_, - bool read_schema_same_as_table_schema_, LoggerPtr log_) : helper(helper_) , object_storage(object_storage_) - , read_schema_same_as_table_schema(read_schema_same_as_table_schema_) , log(log_) { } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h index 7b83c993be5d..7a9a8b5d94ad 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h @@ -30,7 +30,6 @@ class TableSnapshot explicit TableSnapshot( KernelHelperPtr helper_, DB::ObjectStoragePtr object_storage_, - bool read_schema_same_as_table_schema_, LoggerPtr log_); /// Get snapshot version. diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index 7e98a26ae96c..98d42fbf02e7 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -55,6 +56,12 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } +namespace Setting +{ + extern const SettingsBool allow_experimental_delta_kernel_rs; +} + + namespace { diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index 580a9645361d..063341a70827 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -15,11 +15,6 @@ namespace DB { -namespace StorageObjectStorageSetting -{ -extern const StorageObjectStorageSettingsBool allow_experimental_delta_kernel_rs; -extern const StorageObjectStorageSettingsBool delta_lake_read_schema_same_as_table_schema; -} struct DeltaLakePartitionColumn { diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp index 1fa3f6636729..ef043f9725be 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp @@ -9,14 +9,12 @@ namespace DB DeltaLakeMetadataDeltaKernel::DeltaLakeMetadataDeltaKernel( ObjectStoragePtr object_storage, - ConfigurationObserverPtr configuration_, - bool read_schema_same_as_table_schema_) + ConfigurationObserverPtr configuration_) : log(getLogger("DeltaLakeMetadata")) , table_snapshot( std::make_shared( getKernelHelper(configuration_.lock(), object_storage), object_storage, - read_schema_same_as_table_schema_, log)) { } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h index 88a733b75434..8e817974215c 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h @@ -7,7 +7,6 @@ #include #include #include -#include #include #include #include @@ -19,11 +18,6 @@ class TableSnapshot; namespace DB { -namespace StorageObjectStorageSetting -{ -extern const StorageObjectStorageSettingsBool allow_experimental_delta_kernel_rs; -extern const StorageObjectStorageSettingsBool delta_lake_read_schema_same_as_table_schema; -} class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata { @@ -33,8 +27,7 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata DeltaLakeMetadataDeltaKernel( ObjectStoragePtr object_storage_, - ConfigurationObserverPtr configuration_, - bool read_schema_same_as_table_schema_); + ConfigurationObserverPtr configuration_); bool supportsUpdate() const override { return true; } @@ -51,14 +44,10 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata static DataLakeMetadataPtr create( ObjectStoragePtr object_storage, ConfigurationObserverPtr configuration, - ContextPtr, bool) + ContextPtr /* context */) { auto configuration_ptr = configuration.lock(); - const auto & settings_ref = configuration_ptr->getSettingsRef(); - return std::make_unique( - object_storage, - configuration, - settings_ref[StorageObjectStorageSetting::delta_lake_read_schema_same_as_table_schema]); + return std::make_unique(object_storage, configuration); } ObjectIterator iterate( diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index bcf3aef8b26c..ecd76413303f 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -11,7 +11,7 @@ #include #include -#include +#include #include #include @@ -32,11 +32,11 @@ namespace ProfileEvents namespace DB { -namespace StorageObjectStorageSetting +namespace DataLakeStorageSetting { - extern const StorageObjectStorageSettingsString iceberg_metadata_file_path; - extern const StorageObjectStorageSettingsString iceberg_metadata_table_uuid; - extern const StorageObjectStorageSettingsBool iceberg_recent_metadata_file_by_last_updated_ms_field; + extern const DataLakeStorageSettingsString iceberg_metadata_file_path; + extern const DataLakeStorageSettingsString iceberg_metadata_table_uuid; + extern const DataLakeStorageSettingsBool iceberg_recent_metadata_file_by_last_updated_ms_field; } namespace ErrorCodes @@ -309,7 +309,7 @@ static std::pair getLatestMetadataFileAndVersion( { auto log = getLogger("IcebergMetadataFileResolver"); MostRecentMetadataFileSelectionWay selection_way - = configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_recent_metadata_file_by_last_updated_ms_field].value + = configuration.getDataLakeSettings()[DataLakeStorageSetting::iceberg_recent_metadata_file_by_last_updated_ms_field].value ? MostRecentMetadataFileSelectionWay::BY_LAST_UPDATED_MS_FIELD : MostRecentMetadataFileSelectionWay::BY_METADATA_FILE_VERSION; bool need_all_metadata_files_parsing @@ -386,9 +386,10 @@ static std::pair getLatestOrExplicitMetadataFileAndVersion( const ContextPtr & local_context, Poco::Logger * log) { - if (configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_file_path].changed) + const auto & data_lake_settings = configuration.getDataLakeSettings(); + if (data_lake_settings[DataLakeStorageSetting::iceberg_metadata_file_path].changed) { - auto explicit_metadata_path = configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_file_path].value; + auto explicit_metadata_path = data_lake_settings[DataLakeStorageSetting::iceberg_metadata_file_path].value; try { LOG_TEST(log, "Explicit metadata file path is specified {}, will read from this metadata file", explicit_metadata_path); @@ -409,9 +410,9 @@ static std::pair getLatestOrExplicitMetadataFileAndVersion( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid path {} specified for iceberg_metadata_file_path: '{}'", explicit_metadata_path, ex.what()); } } - else if (configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_table_uuid].changed) + else if (data_lake_settings[DataLakeStorageSetting::iceberg_metadata_table_uuid].changed) { - std::optional table_uuid = configuration.getSettingsRef()[StorageObjectStorageSetting::iceberg_metadata_table_uuid].value; + std::optional table_uuid = data_lake_settings[DataLakeStorageSetting::iceberg_metadata_table_uuid].value; return getLatestMetadataFileAndVersion(object_storage, configuration, local_context, table_uuid); } else diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 7639ff35228e..08c371b0c39d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -607,8 +607,7 @@ SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, c void StorageObjectStorage::Configuration::initialize( ASTs & engine_args, ContextPtr local_context, - bool with_table_structure, - StorageObjectStorageSettingsPtr settings) + bool with_table_structure) { if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, local_context)) fromNamedCollection(*named_collection, local_context); @@ -632,15 +631,9 @@ void StorageObjectStorage::Configuration::initialize( else FormatFactory::instance().checkFormatName(format); - storage_settings = settings; initialized = true; } -const StorageObjectStorageSettings & StorageObjectStorage::Configuration::getSettingsRef() const -{ - return *storage_settings; -} - void StorageObjectStorage::Configuration::check(ContextPtr) const { FormatFactory::instance().checkFormatName(format); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 5b4e10b9a872..7da189454a78 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -9,8 +9,10 @@ #include #include #include +#include #include + namespace DB { @@ -176,8 +178,7 @@ class StorageObjectStorage::Configuration virtual void initialize( ASTs & engine_args, ContextPtr local_context, - bool with_table_structure, - StorageObjectStorageSettingsPtr settings); + bool with_table_structure); /// Storage type: s3, hdfs, azure, local. virtual ObjectStorageType getType() const = 0; @@ -260,7 +261,10 @@ class StorageObjectStorage::Configuration virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); virtual void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); - const StorageObjectStorageSettings & getSettingsRef() const; + virtual const DataLakeStorageSettings & getDataLakeSettings() const + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getDataLakeSettings() is not implemented for configuration type {}", getTypeName()); + } /// Create arguments for table function with path and access parameters virtual ASTPtr createArgsWithAccessData() const @@ -291,8 +295,6 @@ class StorageObjectStorage::Configuration bool initialized = false; std::atomic updated = false; - - StorageObjectStorageSettingsPtr storage_settings; }; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.h b/src/Storages/ObjectStorage/StorageObjectStorageSettings.h index 864cb6b45f53..63d582f66920 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.h @@ -46,30 +46,9 @@ class SettingsChanges; // clang-format off #define STORAGE_OBJECT_STORAGE_RELATED_SETTINGS(DECLARE, ALIAS) \ - DECLARE(Bool, allow_dynamic_metadata_for_data_lakes, false, R"( -If enabled, indicates that metadata is taken from iceberg specification that is pulled from cloud before each query. -)", 0) \ - DECLARE(Bool, allow_experimental_delta_kernel_rs, false, R"( -If enabled, the engine would use delta-kernel-rs for DeltaLake metadata parsing -)", 0) \ - DECLARE(Bool, delta_lake_read_schema_same_as_table_schema, false, R"( -Whether delta-lake read schema is the same as table schema. -)", 0) \ - DECLARE(String, iceberg_metadata_file_path, "", R"( -Explicit path to desired Iceberg metadata file, should be relative to path in object storage. Make sense for table function use case only. -)", 0) \ DECLARE(String, object_storage_cluster, "", R"( Cluster for distributed requests -)", 0) \ - DECLARE(String, iceberg_metadata_table_uuid, "", R"( -Explicit table UUID to read metadata for. Ignored if iceberg_metadata_file_path is set. -)", 0) \ - DECLARE(Bool, iceberg_recent_metadata_file_by_last_updated_ms_field, false, R"( -If enabled, the engine would use the metadata file with the most recent last_updated_ms json field. Does not make sense to use with iceberg_metadata_file_path. -)", 0) \ - DECLARE(Bool, iceberg_use_version_hint, false, R"( -Get latest metadata path from version-hint.text file. -)", 0) \ +)", 0) // clang-format on diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index 16b8697b1d21..30ab6b751241 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -47,7 +47,7 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject auto cluster_name = (*storage_settings)[StorageObjectStorageSetting::object_storage_cluster].value; - configuration->initialize(args.engine_args, context, false, storage_settings); + configuration->initialize(args.engine_args, context, false); // Use format settings from global server context + settings from // the SETTINGS clause of the create query. Settings from current @@ -178,20 +178,29 @@ void registerStorageObjectStorage(StorageFactory & factory) #if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format. +static DataLakeStorageSettingsPtr getDataLakeStorageSettings(const ASTStorage & storage_def) +{ + auto storage_settings = std::make_shared(); + if (storage_def.settings) + storage_settings->loadFromQuery(*storage_def.settings); + return storage_settings; +} + void registerStorageIceberg(StorageFactory & factory) { factory.registerStorage( "Iceberg", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::NONE, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # if USE_AWS_S3 @@ -199,14 +208,15 @@ void registerStorageIceberg(StorageFactory & factory) "IcebergS3", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::S3, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif # if USE_AZURE_BLOB_STORAGE @@ -214,14 +224,15 @@ void registerStorageIceberg(StorageFactory & factory) "IcebergAzure", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::AZURE, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif # if USE_HDFS @@ -229,28 +240,30 @@ void registerStorageIceberg(StorageFactory & factory) "IcebergHDFS", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::HDFS, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif factory.registerStorage( "IcebergLocal", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::FILE, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); } @@ -265,14 +278,15 @@ void registerStorageDeltaLake(StorageFactory & factory) "DeltaLake", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = true, .supports_schema_inference = true, .source_access_type = AccessType::S3, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); # endif UNUSED(factory); @@ -286,14 +300,15 @@ void registerStorageHudi(StorageFactory & factory) "Hudi", [&](const StorageFactory::Arguments & args) { - auto configuration = std::make_shared(); + const auto storage_settings = getDataLakeStorageSettings(*args.storage_def); + auto configuration = std::make_shared(storage_settings); return createStorageObjectStorage(args, configuration); }, { .supports_settings = false, .supports_schema_inference = true, .source_access_type = AccessType::S3, - .has_builtin_setting_fn = StorageObjectStorageSettings::hasBuiltin, + .has_builtin_setting_fn = DataLakeStorageSettings::hasBuiltin, }); #endif UNUSED(factory); diff --git a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp index 63d7f7a286a1..63666faf1a98 100644 --- a/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp +++ b/src/Storages/ObjectStorageQueue/registerQueueStorage.cpp @@ -33,7 +33,7 @@ StoragePtr createQueueStorage(const StorageFactory::Arguments & args) throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); auto configuration = std::make_shared(); - configuration->initialize(args.engine_args, args.getContext(), false, nullptr); + configuration->initialize(args.engine_args, args.getContext(), false); // Use format settings from global server context + settings from // the SETTINGS clause of the create query. Settings from current diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 57b6cda5039d..a57e2bfd5ce9 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace DB @@ -42,24 +43,29 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } -template -ObjectStoragePtr TableFunctionObjectStorage::getObjectStorage(const ContextPtr & context, bool create_readonly) const +template +ObjectStoragePtr TableFunctionObjectStorage::getObjectStorage(const ContextPtr & context, bool create_readonly) const { if (!object_storage) object_storage = configuration->createObjectStorage(context, create_readonly); return object_storage; } -template -StorageObjectStorage::ConfigurationPtr TableFunctionObjectStorage::getConfiguration() const +template +StorageObjectStorage::ConfigurationPtr TableFunctionObjectStorage::getConfiguration() const { if (!configuration) - configuration = std::make_shared(); + { + if constexpr (is_data_lake) + configuration = std::make_shared(settings); + else + configuration = std::make_shared(); + } return configuration; } -template -std::vector TableFunctionObjectStorage::skipAnalysisForArguments( +template +std::vector TableFunctionObjectStorage::skipAnalysisForArguments( const QueryTreeNodePtr & query_node_table_function, ContextPtr) const { auto & table_function_node = query_node_table_function->as(); @@ -76,8 +82,18 @@ std::vector TableFunctionObjectStorage::skipA return result; } -template -void TableFunctionObjectStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context) +template +std::shared_ptr::Settings> +TableFunctionObjectStorage::createEmptySettings() +{ + if constexpr (is_data_lake) + return std::make_shared(); + else + return std::make_shared(); +} + +template +void TableFunctionObjectStorage::parseArguments(const ASTPtr & ast_function, ContextPtr context) { /// Clone ast function, because we can modify its arguments like removing headers. auto ast_copy = ast_function->clone(); @@ -85,7 +101,7 @@ void TableFunctionObjectStorage::parseArguments(const if (args_func.size() != 1) throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Table function '{}' must have arguments.", getName()); - settings = std::make_shared(); + settings = createEmptySettings(); auto & args = args_func.at(0)->children; /// Support storage settings in table function, @@ -105,9 +121,9 @@ void TableFunctionObjectStorage::parseArguments(const parseArgumentsImpl(args, context); } -template +template ColumnsDescription TableFunctionObjectStorage< - Definition, Configuration>::getActualTableStructure(ContextPtr context, bool is_insert_query) const + Definition, Configuration, is_data_lake>::getActualTableStructure(ContextPtr context, bool is_insert_query) const { if (configuration->getStructure() == "auto") { @@ -121,8 +137,8 @@ ColumnsDescription TableFunctionObjectStorage< return parseColumnsListFromString(configuration->getStructure(), context); } -template -StoragePtr TableFunctionObjectStorage::executeImpl( +template +StoragePtr TableFunctionObjectStorage::executeImpl( const ASTPtr & /* ast_function */, ContextPtr context, const std::string & table_name, @@ -249,23 +265,23 @@ template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AVRO && USE_AZURE_BLOB_STORAGE -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AVRO && USE_HDFS -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_PARQUET && USE_AWS_S3 && USE_DELTA_KERNEL_RS -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AWS_S3 -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AVRO diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 9493baf18571..7dcad1383fd3 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -107,11 +108,15 @@ struct HudiDefinition static constexpr auto storage_type_name = "S3"; }; -template +template class TableFunctionObjectStorage : public ITableFunction { public: static constexpr auto name = Definition::name; + using Settings = typename std::conditional_t< + is_data_lake, + DataLakeStorageSettings, + StorageObjectStorageSettings>; String getName() const override { return name; } @@ -134,7 +139,7 @@ class TableFunctionObjectStorage : public ITableFunction virtual void parseArgumentsImpl(ASTs & args, const ContextPtr & context) { - getConfiguration()->initialize(args, context, true, settings); + getConfiguration()->initialize(args, context, true); } static void updateStructureAndFormatArgumentsIfNeeded( @@ -143,7 +148,10 @@ class TableFunctionObjectStorage : public ITableFunction const String & format, const ContextPtr & context) { - Configuration().addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true); + if constexpr (is_data_lake) + Configuration(createEmptySettings()).addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true); + else + Configuration().addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true); } protected: @@ -164,10 +172,12 @@ class TableFunctionObjectStorage : public ITableFunction ObjectStoragePtr getObjectStorage(const ContextPtr & context, bool create_readonly) const; ConfigurationPtr getConfiguration() const; + static std::shared_ptr createEmptySettings(); + mutable ConfigurationPtr configuration; mutable ObjectStoragePtr object_storage; ColumnsDescription structure_hint; - std::shared_ptr settings; + std::shared_ptr settings; std::vector skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override; }; @@ -191,20 +201,20 @@ using TableFunctionLocal = TableFunctionObjectStorage; # if USE_AWS_S3 -using TableFunctionIcebergS3 = TableFunctionObjectStorage; +using TableFunctionIcebergS3 = TableFunctionObjectStorage; # endif # if USE_AZURE_BLOB_STORAGE -using TableFunctionIcebergAzure = TableFunctionObjectStorage; +using TableFunctionIcebergAzure = TableFunctionObjectStorage; # endif # if USE_HDFS -using TableFunctionIcebergHDFS = TableFunctionObjectStorage; +using TableFunctionIcebergHDFS = TableFunctionObjectStorage; # endif -using TableFunctionIcebergLocal = TableFunctionObjectStorage; +using TableFunctionIcebergLocal = TableFunctionObjectStorage; #endif #if USE_AWS_S3 # if USE_PARQUET && USE_DELTA_KERNEL_RS -using TableFunctionDeltaLake = TableFunctionObjectStorage; +using TableFunctionDeltaLake = TableFunctionObjectStorage; # endif -using TableFunctionHudi = TableFunctionObjectStorage; +using TableFunctionHudi = TableFunctionObjectStorage; #endif } diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index c30ce59e330c..0995fef125e7 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -14,8 +14,8 @@ namespace DB { -template -StoragePtr TableFunctionObjectStorageCluster::executeImpl( +template +StoragePtr TableFunctionObjectStorageCluster::executeImpl( const ASTPtr & /*function*/, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const { diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h index d03f7198d359..57686d7a1866 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -75,8 +75,8 @@ struct HudiClusterDefinition * On worker node it asks initiator about next task to process, processes it. * This is repeated until the tasks are finished. */ -template -class TableFunctionObjectStorageCluster : public ITableFunctionCluster> +template +class TableFunctionObjectStorageCluster : public ITableFunctionCluster> { public: static constexpr auto name = Definition::name; @@ -84,7 +84,7 @@ class TableFunctionObjectStorageCluster : public ITableFunctionCluster; + using Base = TableFunctionObjectStorage; StoragePtr executeImpl( const ASTPtr & ast_function, @@ -117,23 +117,23 @@ using TableFunctionHDFSCluster = TableFunctionObjectStorageCluster; #if USE_AVRO && USE_AWS_S3 -using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster; +using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster; #endif #if USE_AVRO && USE_AZURE_BLOB_STORAGE -using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster; +using TableFunctionIcebergAzureCluster = TableFunctionObjectStorageCluster; #endif #if USE_AVRO && USE_HDFS -using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster; +using TableFunctionIcebergHDFSCluster = TableFunctionObjectStorageCluster; #endif #if USE_AWS_S3 && USE_PARQUET && USE_DELTA_KERNEL_RS -using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster; +using TableFunctionDeltaLakeCluster = TableFunctionObjectStorageCluster; #endif #if USE_AWS_S3 -using TableFunctionHudiCluster = TableFunctionObjectStorageCluster; +using TableFunctionHudiCluster = TableFunctionObjectStorageCluster; #endif } From 16b3e0ac857cf4572631e061ecf5b9e79ca05e38 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Sun, 8 Jun 2025 13:02:51 +0000 Subject: [PATCH 03/14] Merge pull request #81300 from ClickHouse/fix-segfault-in-datalake-configuration A few fixes for data lake cluster engines --- src/Analyzer/Resolve/IdentifierResolver.cpp | 5 +- src/Interpreters/InterpreterDescribeQuery.cpp | 14 +- src/Interpreters/InterpreterSelectQuery.cpp | 4 +- src/Storages/IStorage.cpp | 5 - src/Storages/IStorage.h | 6 +- .../DataLakes/DataLakeConfiguration.h | 107 ++++++----- .../ObjectStorage/StorageObjectStorage.cpp | 168 +++++++++++------- .../ObjectStorage/StorageObjectStorage.h | 34 +++- .../StorageObjectStorageCluster.cpp | 33 ++++ .../StorageObjectStorageCluster.h | 35 +--- .../StorageObjectStorageSource.cpp | 2 +- src/Storages/ObjectStorage/Utils.cpp | 49 ++++- src/Storages/ObjectStorage/Utils.h | 4 + .../TableFunctionObjectStorage.cpp | 18 +- .../TableFunctionObjectStorage.h | 8 +- .../TableFunctionObjectStorageCluster.cpp | 3 +- tests/integration/test_storage_delta/test.py | 52 ++++-- 17 files changed, 353 insertions(+), 194 deletions(-) diff --git a/src/Analyzer/Resolve/IdentifierResolver.cpp b/src/Analyzer/Resolve/IdentifierResolver.cpp index a756a51a6313..1bed3402fe66 100644 --- a/src/Analyzer/Resolve/IdentifierResolver.cpp +++ b/src/Analyzer/Resolve/IdentifierResolver.cpp @@ -186,10 +186,7 @@ IdentifierResolveResult IdentifierResolver::tryResolveTableIdentifierFromDatabas if (!storage) return {}; - if (storage->hasExternalDynamicMetadata()) - { - storage->updateExternalDynamicMetadata(context); - } + storage->updateExternalDynamicMetadataIfExists(context); if (!storage_lock) storage_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]); diff --git a/src/Interpreters/InterpreterDescribeQuery.cpp b/src/Interpreters/InterpreterDescribeQuery.cpp index f1e299a42493..53ef032ea01c 100644 --- a/src/Interpreters/InterpreterDescribeQuery.cpp +++ b/src/Interpreters/InterpreterDescribeQuery.cpp @@ -177,13 +177,13 @@ void InterpreterDescribeQuery::fillColumnsFromTableFunction(const ASTTableExpres void InterpreterDescribeQuery::fillColumnsFromTable(const ASTTableExpression & table_expression) { - auto table_id = getContext()->resolveStorageID(table_expression.database_and_table_name); - getContext()->checkAccess(AccessType::SHOW_COLUMNS, table_id); - auto table = DatabaseCatalog::instance().getTable(table_id, getContext()); - if (table->hasExternalDynamicMetadata()) - { - table->updateExternalDynamicMetadata(getContext()); - } + auto query_context = getContext(); + auto table_id = query_context->resolveStorageID(table_expression.database_and_table_name); + query_context->checkAccess(AccessType::SHOW_COLUMNS, table_id); + + auto table = DatabaseCatalog::instance().getTable(table_id, query_context); + + table->updateExternalDynamicMetadataIfExists(query_context); auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings[Setting::lock_acquire_timeout]); diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 10011fa10cec..59ade6302a2f 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -568,11 +568,11 @@ InterpreterSelectQuery::InterpreterSelectQuery( if (storage) { - if (storage->hasExternalDynamicMetadata()) + if (storage->updateExternalDynamicMetadataIfExists(context)) { - storage->updateExternalDynamicMetadata(context); metadata_snapshot = storage->getInMemoryMetadataPtr(); } + table_lock = storage->lockForShare(context->getInitialQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]); table_id = storage->getStorageID(); diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index a8dd6549b27a..54113f0464bc 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -102,11 +102,6 @@ std::optional IStorage::tryLockForAlter(const std::ch return lock; } -void IStorage::updateExternalDynamicMetadata(ContextPtr) -{ - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateExternalDynamicMetadata is not supported by storage {}", getName()); -} - IStorage::AlterLockHolder IStorage::lockForAlter(const std::chrono::milliseconds & acquire_timeout) { auto lock = tryLockForAlter(acquire_timeout); diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 276281592800..ab49c34ae54d 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -104,9 +104,6 @@ class IStorage : public std::enable_shared_from_this, public TypePromo /// Returns true if the storage is dictionary virtual bool isDictionary() const { return false; } - /// Returns true if the metadata of a table can be changed normally by other processes - virtual bool hasExternalDynamicMetadata() const { return false; } - /// Returns true if the storage supports queries with the SAMPLE section. virtual bool supportsSampling() const { return getInMemoryMetadataPtr()->hasSamplingKey(); } @@ -488,7 +485,8 @@ class IStorage : public std::enable_shared_from_this, public TypePromo virtual void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & alter_lock_holder); /// Updates metadata that can be changed by other processes - virtual void updateExternalDynamicMetadata(ContextPtr); + /// Return true if external metadata exists and was updated. + virtual bool updateExternalDynamicMetadataIfExists(ContextPtr /* context */) { return false; } /** Checks that alter commands can be applied to storage. For example, columns can be modified, * or primary key can be changes, etc. diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 00b617492f82..99f1ee744a61 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -60,32 +60,39 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl std::string getEngineName() const override { return DataLakeMetadata::name + BaseStorageConfiguration::getEngineName(); } - void update(ObjectStoragePtr object_storage, ContextPtr local_context) override + /// Returns true, if metadata is of the latest version, false if unknown. + bool update( + ObjectStoragePtr object_storage, + ContextPtr local_context, + bool if_not_updated_before, + bool check_consistent_with_previous_metadata) override { - BaseStorageConfiguration::update(object_storage, local_context); + const bool updated_before = current_metadata != nullptr; + if (updated_before && if_not_updated_before) + return false; - bool existed = current_metadata != nullptr; + BaseStorageConfiguration::update( + object_storage, local_context, if_not_updated_before, check_consistent_with_previous_metadata); + + const bool changed = updateMetadataIfChanged(object_storage, local_context); + if (!changed) + return true; - if (updateMetadataObjectIfNeeded(object_storage, local_context)) + if (check_consistent_with_previous_metadata && hasExternalDynamicMetadata() && updated_before) { - if (hasExternalDynamicMetadata() && existed) - { - throw Exception( - ErrorCodes::FORMAT_VERSION_TOO_OLD, - "Metadata is not consinsent with the one which was used to infer table schema. Please, retry the query."); - } + throw Exception( + ErrorCodes::FORMAT_VERSION_TOO_OLD, + "Metadata is not consinsent with the one which was used to infer table schema. " + "Please, retry the query."); } + return true; } std::optional tryGetTableStructureFromMetadata() const override { - if (!current_metadata) - return std::nullopt; - auto schema_from_metadata = current_metadata->getTableSchema(); - if (!schema_from_metadata.empty()) - { - return ColumnsDescription(std::move(schema_from_metadata)); - } + _assertInitialized(); + if (auto schema = current_metadata->getTableSchema(); !schema.empty()) + return ColumnsDescription(std::move(schema)); return std::nullopt; } @@ -101,42 +108,29 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl std::optional totalRows() override { - if (!current_metadata) - return {}; - + _assertInitialized(); return current_metadata->totalRows(); } std::shared_ptr getInitialSchemaByPath(const String & data_path) const override { - if (!current_metadata) - return {}; + _assertInitialized(); return current_metadata->getInitialSchemaByPath(data_path); } std::shared_ptr getSchemaTransformer(const String & data_path) const override { - if (!current_metadata) - return {}; + _assertInitialized(); return current_metadata->getSchemaTransformer(data_path); } bool hasExternalDynamicMetadata() override { + _assertInitialized(); return (*settings)[DataLakeStorageSetting::allow_dynamic_metadata_for_data_lakes] - && current_metadata && current_metadata->supportsExternalMetadataChange(); } - ColumnsDescription updateAndGetCurrentSchema( - ObjectStoragePtr object_storage, - ContextPtr context) override - { - BaseStorageConfiguration::update(object_storage, context); - updateMetadataObjectIfNeeded(object_storage, context); - return ColumnsDescription{current_metadata->getTableSchema()}; - } - bool supportsFileIterator() const override { return true; } ObjectIterator iterate( @@ -144,7 +138,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl IDataLakeMetadata::FileProgressCallback callback, size_t list_batch_size) override { - chassert(current_metadata); + _assertInitialized(); return current_metadata->iterate(filter_dag, callback, list_batch_size); } @@ -156,6 +150,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl #if USE_PARQUET && USE_AWS_S3 DeltaLakePartitionColumns getDeltaLakePartitionColumns() const { + _assertInitialized(); const auto * delta_lake_metadata = dynamic_cast(current_metadata.get()); if (delta_lake_metadata) return delta_lake_metadata->getPartitionColumns(); @@ -168,6 +163,12 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl LoggerPtr log = getLogger("DataLakeConfiguration"); const DataLakeStorageSettingsPtr settings; + void _assertInitialized() const + { + if (!current_metadata) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata is not initialized"); + } + ReadFromFormatInfo prepareReadingFromFormat( ObjectStoragePtr object_storage, const Strings & requested_columns, @@ -231,7 +232,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl return info; } - bool updateMetadataObjectIfNeeded( + bool updateMetadataIfChanged( ObjectStoragePtr object_storage, ContextPtr context) { @@ -254,15 +255,11 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl weak_from_this(), context); - if (*current_metadata != *new_metadata) - { - current_metadata = std::move(new_metadata); - return true; - } - else - { + if (*current_metadata == *new_metadata) return false; - } + + current_metadata = std::move(new_metadata); + return true; } }; @@ -373,10 +370,22 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, return getImpl().iterate(filter_dag, callback, list_batch_size); } - void update(ObjectStoragePtr object_storage, ContextPtr local_context) override - { return getImpl().update(object_storage, local_context); } - void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context) override - { return getImpl().updateIfRequired(object_storage, local_context); } + bool update( + ObjectStoragePtr object_storage, + ContextPtr local_context, + bool if_not_updated_before, + bool check_consistent_with_previous_metadata) override + { + return getImpl().update(object_storage, local_context, if_not_updated_before, check_consistent_with_previous_metadata); + } + bool updateIfRequired( + ObjectStoragePtr object_storage, + ContextPtr local_context, + bool if_not_updated_before, + bool check_consistent_with_previous_metadata) override + { + return getImpl().updateIfRequired(object_storage, local_context, if_not_updated_before, check_consistent_with_previous_metadata); + } void initialize( ASTs & engine_args, @@ -487,7 +496,7 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, return getImpl().tryGetSamplePathFromMetadata(); } - virtual void assertInitialized() const override { return getImpl().assertInitialized(); } + void assertInitialized() const override { return getImpl().assertInitialized(); } private: diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 08c371b0c39d..0638fa0cd480 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -90,7 +90,7 @@ StorageObjectStorage::StorageObjectStorage( LoadingStrictnessLevel mode, bool distributed_processing_, ASTPtr partition_by_, - bool is_table_function_, + bool is_table_function, bool lazy_init, std::optional sample_path_) : IStorage(table_id_) @@ -101,40 +101,50 @@ StorageObjectStorage::StorageObjectStorage( , distributed_processing(distributed_processing_) , log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName()))) { - bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->getFormat().empty(); - update_configuration_on_read = !is_table_function_ || do_lazy_init; - bool failed_init = false; - auto do_init = [&]() + const bool need_resolve_columns_or_format = columns_.empty() || (configuration->getFormat() == "auto"); + const bool need_resolve_sample_path = context->getSettingsRef()[Setting::use_hive_partitioning] + && !configuration->withPartitionWildcard() + && !configuration->isDataLakeConfiguration(); + const bool do_lazy_init = lazy_init && !need_resolve_columns_or_format && !need_resolve_sample_path; + + bool updated_configuration = false; + try { - try + if (!do_lazy_init) { - if (configuration->hasExternalDynamicMetadata()) - configuration->updateAndGetCurrentSchema(object_storage, context); - else - configuration->update(object_storage, context); + configuration->update( + object_storage, + context, + /* if_not_updated_before */is_table_function, + /* check_consistent_with_previous_metadata */true); + + updated_configuration = true; } - catch (...) + } + catch (...) + { + // If we don't have format or schema yet, we can't ignore failed configuration update, + // because relevant configuration is crucial for format and schema inference + if (mode <= LoadingStrictnessLevel::CREATE || need_resolve_columns_or_format) { - // If we don't have format or schema yet, we can't ignore failed configuration update, - // because relevant configuration is crucial for format and schema inference - if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->getFormat() == "auto")) - { - throw; - } - else - { - tryLogCurrentException(log); - failed_init = true; - } + throw; } - }; + tryLogCurrentException(log); + } - if (!do_lazy_init) - do_init(); + /// We always update configuration on read for table engine, + /// but this is not needed for table function, + /// which exists only for the duration of a single query + /// (e.g. read always follows constructor immediately). + update_configuration_on_read_write = !is_table_function || !updated_configuration; std::string sample_path = sample_path_.value_or(""); ColumnsDescription columns{columns_}; - resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context); + if (need_resolve_columns_or_format) + resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context); + else + validateSupportedColumns(columns, *configuration); + configuration->check(context); StorageInMemoryMetadata metadata; @@ -144,13 +154,8 @@ StorageObjectStorage::StorageObjectStorage( /// FIXME: We need to call getPathSample() lazily on select /// in case it failed to be initialized in constructor. - if (!failed_init - && sample_path.empty() - && context->getSettingsRef()[Setting::use_hive_partitioning] - && !configuration->withPartitionWildcard()) + if (updated_configuration && sample_path.empty() && need_resolve_sample_path) { - if (do_lazy_init) - do_init(); try { sample_path = getPathSample(context); @@ -158,8 +163,10 @@ StorageObjectStorage::StorageObjectStorage( catch (...) { LOG_WARNING( - log, "Failed to list object storage, cannot use hive partitioning. " - "Error: {}", getCurrentExceptionMessage(true)); + log, + "Failed to list object storage, cannot use hive partitioning. " + "Error: {}", + getCurrentExceptionMessage(true)); } } @@ -187,40 +194,79 @@ bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) c return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context, format_settings); } -void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage_ptr, ContextPtr context) +bool StorageObjectStorage::Configuration::update( ///NOLINT + ObjectStoragePtr object_storage_ptr, + ContextPtr context, + bool /* if_not_updated_before */, + bool /* check_consistent_with_previous_metadata */) { IObjectStorage::ApplyNewSettingsOptions options{.allow_client_change = !isStaticConfiguration()}; object_storage_ptr->applyNewSettings(context->getConfigRef(), getTypeName() + ".", context, options); updated = true; + return true; } -void StorageObjectStorage::Configuration::updateIfRequired(ObjectStoragePtr object_storage_ptr, ContextPtr local_context) +bool StorageObjectStorage::Configuration::updateIfRequired( + ObjectStoragePtr object_storage_ptr, + ContextPtr local_context, + bool if_not_updated_before, + bool check_consistent_with_previous_metadata) { if (!updated) - update(object_storage_ptr, local_context); + update(object_storage_ptr, local_context, if_not_updated_before, check_consistent_with_previous_metadata); + return true; } -bool StorageObjectStorage::hasExternalDynamicMetadata() const +bool StorageObjectStorage::updateExternalDynamicMetadataIfExists(ContextPtr query_context) { - return configuration->hasExternalDynamicMetadata(); -} + bool updated = configuration->update( + object_storage, + query_context, + /* if_not_updated_before */true, + /* check_consistent_with_previous_metadata */false); + + if (!configuration->hasExternalDynamicMetadata()) + return false; + + if (!updated) + { + /// Force the update. + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */false); + } + + auto columns = configuration->tryGetTableStructureFromMetadata(); + if (!columns.has_value()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No schema in table metadata"); -void StorageObjectStorage::updateExternalDynamicMetadata(ContextPtr context_ptr) -{ StorageInMemoryMetadata metadata; - metadata.setColumns(configuration->updateAndGetCurrentSchema(object_storage, context_ptr)); + metadata.setColumns(std::move(columns.value())); setInMemoryMetadata(metadata); + return true; } std::optional StorageObjectStorage::totalRows(ContextPtr query_context) const { - configuration->update(object_storage, query_context); + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + return configuration->totalRows(); } std::optional StorageObjectStorage::totalBytes(ContextPtr query_context) const { - configuration->update(object_storage, query_context); + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + return configuration->totalBytes(); } @@ -371,8 +417,14 @@ void StorageObjectStorage::read( { /// We did configuration->update() in constructor, /// so in case of table function there is no need to do the same here again. - if (update_configuration_on_read) - configuration->update(object_storage, local_context); + if (update_configuration_on_read_write) + { + configuration->update( + object_storage, + local_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + } if (partition_by && configuration->withPartitionWildcard()) { @@ -411,7 +463,15 @@ SinkToStoragePtr StorageObjectStorage::write( ContextPtr local_context, bool /* async_insert */) { - configuration->update(object_storage, local_context); + if (update_configuration_on_read_write) + { + configuration->update( + object_storage, + local_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + } + const auto sample_block = metadata_snapshot->getSampleBlock(); const auto & settings = configuration->getQuerySettings(local_context); @@ -520,18 +580,6 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData( std::string & sample_path, const ContextPtr & context) { - if (configuration->isDataLakeConfiguration()) - { - if (configuration->hasExternalDynamicMetadata()) - configuration->updateAndGetCurrentSchema(object_storage, context); - else - configuration->update(object_storage, context); - - auto table_structure = configuration->tryGetTableStructureFromMetadata(); - if (table_structure) - return table_structure.value(); - } - ObjectInfos read_keys; auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context); auto schema = readSchemaFromFormat(configuration->getFormat(), format_settings, *iterator, context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 7da189454a78..072dec4129b6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -139,15 +139,16 @@ class StorageObjectStorage : public IStorage void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override; - bool hasExternalDynamicMetadata() const override; - - void updateExternalDynamicMetadata(ContextPtr) override; + bool updateExternalDynamicMetadataIfExists(ContextPtr query_context) override; std::optional totalRows(ContextPtr query_context) const override; std::optional totalBytes(ContextPtr query_context) const override; + protected: + /// Get path sample for hive partitioning implementation. String getPathSample(ContextPtr context); + /// Creates ReadBufferIterator for schema inference implementation. static std::unique_ptr createReadBufferIterator( const ObjectStoragePtr & object_storage, const ConfigurationPtr & configuration, @@ -155,12 +156,21 @@ class StorageObjectStorage : public IStorage ObjectInfos & read_keys, const ContextPtr & context); + /// Storage configuration (S3, Azure, HDFS, Local, DataLake). + /// Contains information about table engine configuration + /// and underlying storage access. ConfigurationPtr configuration; + /// `object_storage` to allow direct access to data storage. const ObjectStoragePtr object_storage; const std::optional format_settings; + /// Partition by expression from CREATE query. const ASTPtr partition_by; + /// Whether this engine is a part of according Cluster engine implementation. + /// (One of the reading replicas, not the initiator). const bool distributed_processing; - bool update_configuration_on_read; + /// Whether we need to call `configuration->update()` + /// (e.g. refresh configuration) on each read() method call. + bool update_configuration_on_read_write = true; LoggerPtr log; }; @@ -175,6 +185,7 @@ class StorageObjectStorage::Configuration using Path = std::string; using Paths = std::vector; + /// Initialize configuration from either AST or NamedCollection. virtual void initialize( ASTs & engine_args, ContextPtr local_context, @@ -239,6 +250,8 @@ class StorageObjectStorage::Configuration throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method updateAndGetCurrentSchema is not supported by storage {}", getEngineName()); } + virtual void modifyFormatSettings(FormatSettings &) const {} + virtual ReadFromFormatInfo prepareReadingFromFormat( ObjectStoragePtr object_storage, const Strings & requested_columns, @@ -258,8 +271,17 @@ class StorageObjectStorage::Configuration throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method iterate() is not implemented for configuration type {}", getTypeName()); } - virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); - virtual void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); + /// Returns true, if metadata is of the latest version, false if unknown. + virtual bool update( + ObjectStoragePtr object_storage, + ContextPtr local_context, + bool if_not_updated_before, + bool check_consistent_with_previous_metadata); + virtual bool updateIfRequired( + ObjectStoragePtr object_storage, + ContextPtr local_context, + bool if_not_updated_before, + bool check_consistent_with_previous_metadata); virtual const DataLakeStorageSettings & getDataLakeSettings() const { diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index cdfad7f646cb..7621e4c34b30 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -86,6 +86,15 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( , object_storage(object_storage_) , cluster_name_in_settings(false) { + /// We allow exceptions to be thrown on update(), + /// because Cluster engine can only be used as table function, + /// so no lazy initialization is allowed. + configuration->update( + object_storage, + context_, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + ColumnsDescription columns{columns_}; std::string sample_path; resolveSchemaAndFormat(columns, object_storage, configuration, {}, sample_path, context_); @@ -130,6 +139,30 @@ std::string StorageObjectStorageCluster::getName() const return configuration->getEngineName(); } +std::optional StorageObjectStorageCluster::totalRows(ContextPtr query_context) const +{ + if (pure_storage) + return pure_storage->totalRows(query_context); + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + return configuration->totalRows(); +} + +std::optional StorageObjectStorageCluster::totalBytes(ContextPtr query_context) const +{ + if (pure_storage) + return pure_storage->totalBytes(query_context); + configuration->update( + object_storage, + query_context, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + return configuration->totalBytes(); +} + void StorageObjectStorageCluster::updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context) { // Change table engine on table function for distributed request diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 7b7031a7c728..8c1694de15ff 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -41,24 +41,6 @@ class StorageObjectStorageCluster : public IStorageCluster String getClusterName(ContextPtr context) const override; - bool hasExternalDynamicMetadata() const override - { - return (pure_storage && pure_storage->hasExternalDynamicMetadata()) - || configuration->hasExternalDynamicMetadata(); - } - - void updateExternalDynamicMetadata(ContextPtr context_ptr) override - { - if (pure_storage && pure_storage->hasExternalDynamicMetadata()) - pure_storage->updateExternalDynamicMetadata(context_ptr); - if (configuration->hasExternalDynamicMetadata()) - { - StorageInMemoryMetadata metadata; - metadata.setColumns(configuration->updateAndGetCurrentSchema(object_storage, context_ptr)); - IStorageCluster::setInMemoryMetadata(metadata); - } - } - QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; void truncate( @@ -69,21 +51,8 @@ class StorageObjectStorageCluster : public IStorageCluster void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override; - std::optional totalRows(ContextPtr query_context) const override - { - if (pure_storage) - return pure_storage->totalRows(query_context); - configuration->update(object_storage, query_context); - return configuration->totalRows(); - } - - std::optional totalBytes(ContextPtr query_context) const override - { - if (pure_storage) - return pure_storage->totalBytes(query_context); - configuration->update(object_storage, query_context); - return configuration->totalBytes(); - } + std::optional totalRows(ContextPtr query_context) const override; + std::optional totalBytes(ContextPtr query_context) const override; private: void updateQueryToSendIfNeeded( diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 814b99d78032..f1d3277799ed 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -151,7 +151,7 @@ std::shared_ptr StorageObjectStorageSource::createFileIterator( throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression can not have wildcards inside {} name", configuration->getNamespaceType()); - configuration->updateIfRequired(object_storage, local_context); + configuration->updateIfRequired(object_storage, local_context, true, true); std::unique_ptr iterator; if (configuration->isPathWithGlobs()) diff --git a/src/Storages/ObjectStorage/Utils.cpp b/src/Storages/ObjectStorage/Utils.cpp index d4f152bfd582..9c71ea7431e8 100644 --- a/src/Storages/ObjectStorage/Utils.cpp +++ b/src/Storages/ObjectStorage/Utils.cpp @@ -51,29 +51,60 @@ void resolveSchemaAndFormat( std::string & sample_path, const ContextPtr & context) { + if (configuration->getFormat() == "auto") + { + if (configuration->isDataLakeConfiguration()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Format must be already specified for {} storage.", + configuration->getTypeName()); + } + } + if (columns.empty()) { - if (configuration->getFormat() == "auto") + if (configuration->isDataLakeConfiguration()) + { + auto table_structure = configuration->tryGetTableStructureFromMetadata(); + if (table_structure) + columns = table_structure.value(); + } + + if (columns.empty()) { - std::string format; - std::tie(columns, format) = - StorageObjectStorage::resolveSchemaAndFormatFromData(object_storage, configuration, format_settings, sample_path, context); - configuration->setFormat(format); + if (configuration->getFormat() == "auto") + { + std::string format; + std::tie(columns, format) = StorageObjectStorage::resolveSchemaAndFormatFromData( + object_storage, configuration, format_settings, sample_path, context); + configuration->setFormat(format); + } + else + { + chassert(!configuration->getFormat().empty()); + columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, sample_path, context); + } } - else - columns = StorageObjectStorage::resolveSchemaFromData(object_storage, configuration, format_settings, sample_path, context); } else if (configuration->getFormat() == "auto") { configuration->setFormat(StorageObjectStorage::resolveFormatFromData(object_storage, configuration, format_settings, sample_path, context)); } + validateSupportedColumns(columns, *configuration); +} + +void validateSupportedColumns( + ColumnsDescription & columns, + const StorageObjectStorage::Configuration & configuration) +{ if (!columns.hasOnlyOrdinary()) { /// We don't allow special columns. throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Special columns are not supported for {} storage" - "like MATERIALIZED, ALIAS or EPHEMERAL", configuration->getTypeName()); + "Special columns like MATERIALIZED, ALIAS or EPHEMERAL are not supported for {} storage.", + configuration.getTypeName()); } } diff --git a/src/Storages/ObjectStorage/Utils.h b/src/Storages/ObjectStorage/Utils.h index 17e30babb709..7631d92173db 100644 --- a/src/Storages/ObjectStorage/Utils.h +++ b/src/Storages/ObjectStorage/Utils.h @@ -21,4 +21,8 @@ void resolveSchemaAndFormat( std::string & sample_path, const ContextPtr & context); +void validateSupportedColumns( + ColumnsDescription & columns, + const StorageObjectStorage::Configuration & configuration); + } diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index a57e2bfd5ce9..7f908e5c9af8 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -128,10 +128,24 @@ ColumnsDescription TableFunctionObjectStorage< if (configuration->getStructure() == "auto") { context->checkAccess(getSourceAccessType()); - ColumnsDescription columns; + auto storage = getObjectStorage(context, !is_insert_query); + configuration->update( + object_storage, + context, + /* if_not_updated_before */true, + /* check_consistent_with_previous_metadata */true); + std::string sample_path; - resolveSchemaAndFormat(columns, storage, configuration, std::nullopt, sample_path, context); + ColumnsDescription columns; + resolveSchemaAndFormat( + columns, + std::move(storage), + configuration, + /* format_settings */std::nullopt, + sample_path, + context); + return columns; } return parseColumnsListFromString(configuration->getStructure(), context); diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 7dcad1383fd3..22ef293948c1 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -149,7 +149,13 @@ class TableFunctionObjectStorage : public ITableFunction const ContextPtr & context) { if constexpr (is_data_lake) - Configuration(createEmptySettings()).addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true); + { + Configuration configuration(createEmptySettings()); + if (configuration.getFormat() == "auto") + configuration.setFormat("Parquet"); /// Default format of data lakes. + + configuration.addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true); + } else Configuration().addStructureAndFormatToArgsIfNeeded(args, structure, format, context, /*with_structure=*/true); } diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index 0995fef125e7..a6bdb32028a3 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -46,7 +46,8 @@ StoragePtr TableFunctionObjectStorageCluster 0 print(f"Uploaded files: {files}") - result = instance.query( - f"describe table deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', '{minio_secret_key}', SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel})" - ).strip() + table_function = f"deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', '{minio_secret_key}', SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel})" + result = instance.query(f"describe table {table_function}").strip() assert ( result == "a\tNullable(Int32)\t\t\t\t\t\nb\tNullable(String)\t\t\t\t\t\nc\tNullable(Date32)\t\t\t\t\t\nd\tNullable(Int32)\t\t\t\t\t\ne\tNullable(Bool)" ) - result = int( - instance.query( - f"""SELECT count() - FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', '{minio_secret_key}', SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel}) - """ - ) - ) + result = int(instance.query(f"SELECT count() FROM {table_function}")) assert result == num_rows query_id = f"query_with_filter_{TABLE_NAME}" @@ -1217,3 +1210,42 @@ def check_pruned(count, query_id): ) check_pruned(num_files - 1, query_id) + + +@pytest.mark.parametrize("new_analyzer", ["1", "0"]) +def test_cluster_function(started_cluster, new_analyzer): + instance = started_cluster.instances["node1"] + table_name = randomize_table_name("test_cluster_function") + + schema = pa.schema([("a", pa.int32()), ("b", pa.string())]) + data = [ + pa.array([1, 2, 3, 4, 5], type=pa.int32()), + pa.array(["aa", "bb", "cc", "aa", "bb"], type=pa.string()), + ] + + storage_options = { + "AWS_ENDPOINT_URL": f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}", + "AWS_ACCESS_KEY_ID": minio_access_key, + "AWS_SECRET_ACCESS_KEY": minio_secret_key, + "AWS_ALLOW_HTTP": "true", + "AWS_S3_ALLOW_UNSAFE_RENAME": "true", + } + path = f"s3://root/{table_name}" + table = pa.Table.from_arrays(data, schema=schema) + write_deltalake(path, table, storage_options=storage_options) + + table_function = f""" +deltaLakeCluster(cluster, + 'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , + '{minio_access_key}', + '{minio_secret_key}', + SETTINGS allow_experimental_delta_kernel_rs=1) + """ + instance.query( + f"SELECT * FROM {table_function} SETTINGS allow_experimental_analyzer={new_analyzer}" + ) + assert 5 == int( + instance.query( + f"SELECT count() FROM {table_function} SETTINGS allow_experimental_analyzer={new_analyzer}" + ) + ) From 3e8266e933fe7eb050366a7203bf01164de4186c Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 4 Jul 2025 10:49:26 +0200 Subject: [PATCH 04/14] Try fix build --- src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index 063341a70827..8f4a38f23745 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -57,12 +57,13 @@ class DeltaLakeMetadata final : public IDataLakeMetadata { #if USE_DELTA_KERNEL_RS auto configuration_ptr = configuration.lock(); - const auto & settings_ref = configuration_ptr->getSettingsRef(); - if (settings_ref[StorageObjectStorageSetting::allow_experimental_delta_kernel_rs]) + const auto & query_settings_ref = local_context->getSettingsRef(); + + bool enable_delta_kernel = query_settings_ref[Setting::allow_experimental_delta_kernel_rs]; + if (enable_delta_kernel) return std::make_unique( object_storage, - configuration, - settings_ref[StorageObjectStorageSetting::delta_lake_read_schema_same_as_table_schema]); + configuration); else return std::make_unique(object_storage, configuration, local_context); #else From e0745d5d5dd7a52ad199e6b58ecf5a620f82bd0c Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Wed, 23 Apr 2025 08:59:37 +0000 Subject: [PATCH 05/14] Merge pull request #79418 from ClickHouse/add-query-level-setting-for-delta-kernel Add a query level setting to enable delta kernel rs --- src/Core/Settings.cpp | 3 +++ src/Core/SettingsChangesHistory.cpp | 9 +++++++ .../enableAllExperimentalSettings.cpp | 1 + .../DataLakes/DeltaLakeMetadata.h | 7 ++++++ tests/integration/test_storage_delta/test.py | 24 +++++++++++++++---- 5 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 1aff9006c305..3ee1a7d7a8bf 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6121,6 +6121,9 @@ Enable PRQL - an alternative to SQL. )", EXPERIMENTAL) \ DECLARE(Bool, enable_adaptive_memory_spill_scheduler, false, R"( Trigger processor to spill data into external storage adpatively. grace join is supported at present. +)", EXPERIMENTAL) \ + DECLARE(Bool, allow_experimental_delta_kernel_rs, false, R"( +Allow experimental delta-kernel-rs implementation. )", EXPERIMENTAL) \ DECLARE(String, object_storage_cluster, "", R"( Cluster to make distributed requests to object storages with alternative syntax. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 9fb07c7ef5b5..16a01e6ba40e 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -70,6 +70,15 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() { // Altinity Antalya modifications atop of 25.3 {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, + {"geotoh3_lon_lat_input_order", true, false, "A new setting for legacy behaviour to set lon and lat order"}, + {"secondary_indices_enable_bulk_filtering", false, true, "A new algorithm for filtering by data skipping indices"}, + {"implicit_table_at_top_level", "", "", "A new setting, used in clickhouse-local"}, + {"use_skip_indexes_if_final_exact_mode", 0, 0, "This setting was introduced to help FINAL query return correct results with skip indexes"}, + {"parallel_replicas_insert_select_local_pipeline", false, false, "Use local pipeline during distributed INSERT SELECT with parallel replicas. Currently disabled due to performance issues"}, + {"page_cache_block_size", 1048576, 1048576, "Made this setting adjustable on a per-query level."}, + {"page_cache_lookahead_blocks", 16, 16, "Made this setting adjustable on a per-query level."}, + {"output_format_pretty_glue_chunks", "0", "auto", "A new setting to make Pretty formats prettier."}, + {"allow_experimental_delta_kernel_rs", false, false, "New setting"}, }); addSettingsChanges(settings_changes_history, "25.2.1.20000", { diff --git a/src/Databases/enableAllExperimentalSettings.cpp b/src/Databases/enableAllExperimentalSettings.cpp index c6883a83c097..f2ce80ae1549 100644 --- a/src/Databases/enableAllExperimentalSettings.cpp +++ b/src/Databases/enableAllExperimentalSettings.cpp @@ -53,6 +53,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context) context->setSetting("allow_not_comparable_types_in_order_by", 1); context->setSetting("allow_experimental_database_unity_catalog", 1); context->setSetting("allow_experimental_database_glue_catalog", 1); + context->setSetting("allow_experimental_delta_kernel_rs", 1); /// clickhouse-private settings context->setSetting("allow_experimental_shared_set_join", 1); diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index 8f4a38f23745..6f4bc8881fc6 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -12,9 +12,14 @@ #include #include #include +#include namespace DB { +namespace Setting +{ +extern const SettingsBool allow_experimental_delta_kernel_rs; +} struct DeltaLakePartitionColumn { @@ -61,9 +66,11 @@ class DeltaLakeMetadata final : public IDataLakeMetadata bool enable_delta_kernel = query_settings_ref[Setting::allow_experimental_delta_kernel_rs]; if (enable_delta_kernel) + { return std::make_unique( object_storage, configuration); + } else return std::make_unique(object_storage, configuration, local_context); #else diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index 1008740b2fd9..afb15935508d 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -1148,10 +1148,15 @@ def test_partition_columns_2(started_cluster, use_delta_kernel): 'http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , '{minio_access_key}', '{minio_secret_key}', - SETTINGS allow_experimental_delta_kernel_rs={use_delta_kernel}) + SETTINGS allow_experimental_delta_kernel_rs=0) """ - num_files = int(node.query(f"SELECT uniqExact(_path) FROM {delta_function}")) + num_files = int( + node.query( + f"SELECT uniqExact(_path) FROM {delta_function}", + settings={"allow_experimental_delta_kernel_rs": 1}, + ) + ) assert num_files == 5 new_data = [ @@ -1172,10 +1177,19 @@ def test_partition_columns_2(started_cluster, use_delta_kernel): "b\tNullable(Int32)\t\t\t\t\t\n" "c\tNullable(Int32)\t\t\t\t\t\n" "d\tNullable(String)\t\t\t\t\t\n" - "e\tNullable(String)" == node.query(f"DESCRIBE TABLE {delta_function}").strip() + "e\tNullable(String)" + == node.query( + f"DESCRIBE TABLE {delta_function}", + settings={"allow_experimental_delta_kernel_rs": 1}, + ).strip() ) - num_files = int(node.query(f"SELECT uniqExact(_path) FROM {delta_function}")) + num_files = int( + node.query( + f"SELECT uniqExact(_path) FROM {delta_function}", + settings={"allow_experimental_delta_kernel_rs": 1}, + ) + ) assert num_files == 6 query_id = f"{table_name}-{uuid.uuid4()}" @@ -1184,6 +1198,7 @@ def test_partition_columns_2(started_cluster, use_delta_kernel): in node.query( f" SELECT a FROM {delta_function} WHERE c = 7 and d = 'aa'", query_id=query_id, + settings={"allow_experimental_delta_kernel_rs": 1}, ).strip() ) @@ -1206,6 +1221,7 @@ def check_pruned(count, query_id): in node.query( f"SELECT a FROM {delta_function} WHERE c = 7 and d = 'bb'", query_id=query_id, + settings={"allow_experimental_delta_kernel_rs": 1}, ).strip() ) From e205c50ec0c8f3bb2bf5c9873e3ca30642cf5a61 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Wed, 9 Apr 2025 17:29:54 +0000 Subject: [PATCH 06/14] Merge pull request #78690 from ClickHouse/delta-lake-fix-read-schema delta kernel fix read schema vs table schema matching --- .../DataLakes/DataLakeConfiguration.h | 48 +------- .../DataLakes/DeltaLake/TableSnapshot.cpp | 69 ++++------- .../DataLakes/DeltaLake/TableSnapshot.h | 15 ++- .../DeltaLake/getSchemaFromSnapshot.cpp | 116 ++++++++++-------- .../DeltaLake/getSchemaFromSnapshot.h | 16 +-- .../DeltaLakeMetadataDeltaKernel.cpp | 71 +++++++++-- .../DataLakes/DeltaLakeMetadataDeltaKernel.h | 6 +- .../DataLakes/IDataLakeMetadata.cpp | 9 ++ .../DataLakes/IDataLakeMetadata.h | 14 ++- .../DataLakes/Iceberg/IcebergMetadata.h | 2 +- tests/integration/test_storage_delta/test.py | 2 +- 11 files changed, 191 insertions(+), 177 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 99f1ee744a61..4102d780b3a0 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -176,7 +176,6 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl bool supports_subset_of_columns, ContextPtr local_context) override { - auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns); if (!current_metadata) { current_metadata = DataLakeMetadata::create( @@ -184,52 +183,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl weak_from_this(), local_context); } - auto read_schema = current_metadata->getReadSchema(); - if (!read_schema.empty()) - { - /// There is a difference between "table schema" and "read schema". - /// "table schema" is a schema from data lake table metadata, - /// while "read schema" is a schema from data files. - /// In most cases they would be the same. - /// TODO: Try to hide this logic inside IDataLakeMetadata. - - const auto read_schema_names = read_schema.getNames(); - const auto table_schema_names = current_metadata->getTableSchema().getNames(); - chassert(read_schema_names.size() == table_schema_names.size()); - - if (read_schema_names != table_schema_names) - { - LOG_TEST(log, "Read schema: {}, table schema: {}, requested columns: {}", - fmt::join(read_schema_names, ", "), - fmt::join(table_schema_names, ", "), - fmt::join(info.requested_columns.getNames(), ", ")); - - auto column_name_mapping = [&]() - { - std::map result; - for (size_t i = 0; i < read_schema_names.size(); ++i) - result[table_schema_names[i]] = read_schema_names[i]; - return result; - }(); - - /// Go through requested columns and change column name - /// from table schema to column name from read schema. - - std::vector read_columns; - for (const auto & column_name : info.requested_columns) - { - const auto pos = info.format_header.getPositionByName(column_name.name); - auto column = info.format_header.getByPosition(pos); - column.name = column_name_mapping.at(column_name.name); - info.format_header.setColumn(pos, column); - - read_columns.emplace_back(column.name, column.type); - } - info.requested_columns = NamesAndTypesList(read_columns.begin(), read_columns.end()); - } - } - - return info; + return current_metadata->prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns); } bool updateMetadataIfChanged( diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp index f2eda645cfba..2b8699d86605 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.cpp @@ -64,6 +64,7 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator Iterator( const KernelExternEngine & engine_, const KernelSnapshot & snapshot_, + KernelScan & scan_, const std::string & data_prefix_, const DB::NamesAndTypesList & schema_, const DB::Names & partition_columns_, @@ -74,6 +75,7 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator LoggerPtr log_) : engine(engine_) , snapshot(snapshot_) + , scan(scan_) , data_prefix(data_prefix_) , schema(schema_) , partition_columns(partition_columns_) @@ -268,10 +270,9 @@ class TableSnapshot::Iterator final : public DB::IObjectIterator using KernelScan = KernelPointerWrapper; using KernelScanDataIterator = KernelPointerWrapper; - const KernelExternEngine & engine; const KernelSnapshot & snapshot; - KernelScan scan; + KernelScan & scan; KernelScanDataIterator scan_data_iterator; std::optional pruner; @@ -348,15 +349,20 @@ void TableSnapshot::initSnapshotImpl() const snapshot = KernelUtils::unwrapResult( ffi::snapshot(KernelUtils::toDeltaString(helper->getTableLocation()), engine.get()), "snapshot"); snapshot_version = ffi::version(snapshot.get()); - LOG_TRACE(log, "Snapshot version: {}", snapshot_version); -} -ffi::SharedSnapshot * TableSnapshot::getSnapshot() const -{ - if (!snapshot.get()) - initSnapshot(); - return snapshot.get(); + scan = KernelUtils::unwrapResult(ffi::scan(snapshot.get(), engine.get(), /* predicate */{}), "scan"); + scan_state = ffi::get_global_scan_state(scan.get()); + LOG_TRACE(log, "Initialized scan state"); + + std::tie(table_schema, physical_names_map) = getTableSchemaFromSnapshot(snapshot.get()); + LOG_TRACE(log, "Table schema: {}", fmt::join(table_schema.getNames(), ", ")); + + read_schema = getReadSchemaFromSnapshot(scan_state.get()); + LOG_TRACE(log, "Read schema: {}", fmt::join(read_schema.getNames(), ", ")); + + partition_columns = getPartitionColumnsFromSnapshot(scan_state.get()); + LOG_TRACE(log, "Partition columns: {}", fmt::join(partition_columns, ", ")); } DB::ObjectIterator TableSnapshot::iterate( @@ -368,6 +374,7 @@ DB::ObjectIterator TableSnapshot::iterate( return std::make_shared( engine, snapshot, + scan, helper->getDataPath(), getTableSchema(), getPartitionColumns(), @@ -380,52 +387,26 @@ DB::ObjectIterator TableSnapshot::iterate( const DB::NamesAndTypesList & TableSnapshot::getTableSchema() const { - if (!table_schema.has_value()) - { - table_schema = getTableSchemaFromSnapshot(getSnapshot()); - LOG_TRACE(log, "Fetched table schema"); - LOG_TEST(log, "Table schema: {}", table_schema->toString()); - } - return table_schema.value(); + initSnapshot(); + return table_schema; } const DB::NamesAndTypesList & TableSnapshot::getReadSchema() const { - if (read_schema_same_as_table_schema) - return getTableSchema(); - if (!read_schema.has_value()) - loadReadSchemaAndPartitionColumns(); - return read_schema.value(); + initSnapshot(); + return read_schema; } const DB::Names & TableSnapshot::getPartitionColumns() const { - if (!partition_columns.has_value()) - loadReadSchemaAndPartitionColumns(); - return partition_columns.value(); + initSnapshot(); + return partition_columns; } -void TableSnapshot::loadReadSchemaAndPartitionColumns() const +const DB::NameToNameMap & TableSnapshot::getPhysicalNamesMap() const { - auto * current_snapshot = getSnapshot(); - chassert(engine.get()); - if (read_schema_same_as_table_schema) - { - partition_columns = getPartitionColumnsFromSnapshot(current_snapshot, engine.get()); - LOG_TRACE( - log, "Fetched partition columns: {}", - fmt::join(partition_columns.value(), ", ")); - } - else - { - std::tie(read_schema, partition_columns) = getReadSchemaAndPartitionColumnsFromSnapshot(current_snapshot, engine.get()); - LOG_TRACE( - log, "Fetched read schema and partition columns: {}", - fmt::join(partition_columns.value(), ", ")); - - LOG_TEST(log, "Read schema: {}", read_schema->toString()); - } - + initSnapshot(); + return physical_names_map; } } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h index 7a9a8b5d94ad..6945719f3071 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/TableSnapshot.h @@ -54,33 +54,32 @@ class TableSnapshot /// Therefore "table schema" would contain partition columns, /// but "read schema" would not. const DB::Names & getPartitionColumns() const; + const DB::NameToNameMap & getPhysicalNamesMap() const; private: class Iterator; using KernelExternEngine = KernelPointerWrapper; using KernelSnapshot = KernelPointerWrapper; using KernelScan = KernelPointerWrapper; + using KernelGlobalScanState = KernelPointerWrapper; const KernelHelperPtr helper; const DB::ObjectStoragePtr object_storage; - const bool read_schema_same_as_table_schema; const LoggerPtr log; mutable KernelExternEngine engine; mutable KernelSnapshot snapshot; mutable KernelScan scan; + mutable KernelGlobalScanState scan_state; mutable size_t snapshot_version; - mutable std::optional table_schema; - mutable std::optional read_schema; - mutable std::optional partition_columns; + mutable DB::NamesAndTypesList table_schema; + mutable DB::NameToNameMap physical_names_map; + mutable DB::NamesAndTypesList read_schema; + mutable DB::Names partition_columns; void initSnapshot() const; void initSnapshotImpl() const; - /// Both read schema and partition columns are loaded with the same data scan object, - /// therefore we load them together. - void loadReadSchemaAndPartitionColumns() const; - ffi::SharedSnapshot * getSnapshot() const; }; /// TODO; Enable event tracing in DeltaKernel. diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.cpp index 53745f4b0a54..5c86ad5b94d4 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.cpp @@ -5,6 +5,7 @@ #include "KernelUtils.h" #include "KernelPointerWrapper.h" +#include #include #include @@ -77,26 +78,25 @@ class SchemaVisitorData friend class SchemaVisitor; public: - DB::NamesAndTypesList getSchemaResult(); - const DB::Names & getPartitionColumns() const { return partition_columns; } - - void initScanState( - ffi::SharedSnapshot * snapshot, - ffi::SharedExternEngine * engine) + struct SchemaResult { - if (!scan.get()) - scan = KernelUtils::unwrapResult(ffi::scan(snapshot, engine, /* predicate */{}), "scan"); - if (!scan_state.get()) - scan_state = ffi::get_global_scan_state(scan.get()); - } + DB::NamesAndTypesList names_and_types; + DB::NameToNameMap physical_names_map; + }; + SchemaResult getSchemaResult(); + const DB::Names & getPartitionColumns() const { return partition_columns; } private: DB::DataTypes getDataTypesFromTypeList(size_t list_idx); struct Field { - Field(const std::string & name_, const DB::TypeIndex & type_, bool nullable_) - : name(name_), type(type_), nullable(nullable_) {} + Field( + const std::string & name_, + const DB::TypeIndex & type_, + bool nullable_, + const std::string & physical_name_) + : name(name_), type(type_), nullable(nullable_), physical_name(physical_name_) {} /// Column name. const std::string name; @@ -104,6 +104,10 @@ class SchemaVisitorData const DB::TypeIndex type; /// Column nullability. const bool nullable; + /// In case of columnMapping.mode = 'name', + /// physical name of the column in parquet metadata + /// will be different from table schema column name. + const std::string physical_name; /// If type is complex (array, map, struct), whether it can contain nullable values. bool value_contains_null; @@ -134,8 +138,6 @@ class SchemaVisitorData using KernelScan = KernelPointerWrapper; using KernelGlobalScanState = KernelPointerWrapper; - KernelScan scan; - KernelGlobalScanState scan_state; }; /** @@ -157,25 +159,20 @@ class SchemaVisitor } static void visitReadSchema( - ffi::SharedSnapshot * snapshot, - ffi::SharedExternEngine * engine, + ffi::SharedGlobalScanState * scan_state, SchemaVisitorData & data) { - data.initScanState(snapshot, engine); - KernelSharedSchema schema = ffi::get_global_read_schema(data.scan_state.get()); - + KernelSharedSchema schema = ffi::get_global_read_schema(scan_state); auto visitor = createVisitor(data); size_t result = ffi::visit_schema(schema.get(), &visitor); chassert(result == 0, "Unexpected result: " + DB::toString(result)); } static void visitPartitionColumns( - ffi::SharedSnapshot * snapshot, - ffi::SharedExternEngine * engine, + ffi::SharedGlobalScanState * scan_state, SchemaVisitorData & data) { - data.initScanState(snapshot, engine); - KernelStringSliceIterator partition_columns_iter = ffi::get_partition_columns(data.scan_state.get()); + KernelStringSliceIterator partition_columns_iter = ffi::get_partition_columns(scan_state); while (ffi::string_slice_next(partition_columns_iter.get(), &data, &visitPartitionColumn)) {} } @@ -226,13 +223,22 @@ class SchemaVisitor return id; } + static std::unique_ptr extractPhysicalName(const ffi::CStringMap * metadata) + { + std::string * physical_name = static_cast(ffi::get_from_string_map( + metadata, + KernelUtils::toDeltaString("delta.columnMapping.physicalName"), + KernelUtils::allocateString)); + return physical_name ? std::unique_ptr(physical_name) : nullptr; + } + template static void simpleTypeVisitor( void * data, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool nullable, - const ffi::CStringMap * /* metadata */) + const ffi::CStringMap * metadata) { SchemaVisitorData * state = static_cast(data); auto it = state->type_lists.find(sibling_list_id); @@ -244,13 +250,15 @@ class SchemaVisitor } const std::string column_name(name.ptr, name.len); + const auto physical_name_ptr = extractPhysicalName(metadata); + const std::string physical_name = physical_name_ptr ? *physical_name_ptr : ""; LOG_TEST( state->log, - "List id: {}, column name: {}, type: {}, nullable: {}", - sibling_list_id, column_name, type, nullable); + "List id: {}, column name: {} (physical name: {}), type: {}, nullable: {}", + sibling_list_id, column_name, physical_name, type, nullable); - SchemaVisitorData::Field field(column_name, std::move(type), nullable); + SchemaVisitorData::Field field(column_name, std::move(type), nullable, physical_name); field.is_bool = is_bool; it->second->push_back(std::move(field)); } @@ -260,7 +268,7 @@ class SchemaVisitor uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool nullable, - const ffi::CStringMap * /* metadata */, + const ffi::CStringMap * metadata, uint8_t precision, uint8_t scale) { @@ -275,13 +283,15 @@ class SchemaVisitor } const std::string column_name(name.ptr, name.len); + const auto physical_name_ptr = extractPhysicalName(metadata); + const std::string physical_name = physical_name_ptr ? *physical_name_ptr : ""; LOG_TEST( state->log, - "List id: {}, column name: {}, type: {}, nullable: {}", - sibling_list_id, column_name, type, nullable); + "List id: {}, column name: {} (physical name: {}), type: {}, nullable: {}", + sibling_list_id, column_name, physical_name, type, nullable); - SchemaVisitorData::Field field(column_name, type, nullable); + SchemaVisitorData::Field field(column_name, type, nullable, physical_name); field.precision = precision; field.scale = scale; it->second->push_back(std::move(field)); @@ -326,7 +336,7 @@ class SchemaVisitor uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool nullable, - const ffi::CStringMap * /* metadata */, + const ffi::CStringMap * metadata, uintptr_t child_list_id) { SchemaVisitorData * state = static_cast(data); @@ -339,31 +349,40 @@ class SchemaVisitor } const std::string column_name(name.ptr, name.len); + const auto physical_name_ptr = extractPhysicalName(metadata); + const std::string physical_name = physical_name_ptr ? *physical_name_ptr : ""; LOG_TEST( state->log, - "List id: {}, column name: {}, type: {}, " + "List id: {}, column name: {} (physical name: {}), type: {}, " "nullable: {}, child list id: {}", - sibling_list_id, column_name, type, nullable, child_list_id); + sibling_list_id, column_name, physical_name, type, nullable, child_list_id); - SchemaVisitorData::Field field(column_name, std::move(type), nullable); + SchemaVisitorData::Field field(column_name, std::move(type), nullable, physical_name); field.child_list_id = child_list_id; it->second->push_back(field); } }; -DB::NamesAndTypesList SchemaVisitorData::getSchemaResult() +SchemaVisitorData::SchemaResult SchemaVisitorData::getSchemaResult() { const auto types = getDataTypesFromTypeList(0); chassert(types.size() == type_lists[0]->size()); - std::list result; + std::list names_and_types; + SchemaResult result; for (size_t i = 0; i < types.size(); ++i) { const auto & field = (*type_lists[0])[i]; - result.emplace_back(field.name, types[i]); + names_and_types.emplace_back(field.name, types[i]); + if (!field.physical_name.empty()) + { + [[maybe_unused]] bool inserted = result.physical_names_map.emplace(field.name, field.physical_name).second; + chassert(inserted); + } } - return DB::NamesAndTypesList(result.begin(), result.end()); + result.names_and_types = DB::NamesAndTypesList(names_and_types.begin(), names_and_types.end()); + return result; } DB::DataTypes SchemaVisitorData::getDataTypesFromTypeList(size_t list_idx) @@ -446,26 +465,25 @@ DB::DataTypes SchemaVisitorData::getDataTypesFromTypeList(size_t list_idx) return types; } -DB::NamesAndTypesList getTableSchemaFromSnapshot(ffi::SharedSnapshot * snapshot) +std::pair getTableSchemaFromSnapshot(ffi::SharedSnapshot * snapshot) { SchemaVisitorData data; SchemaVisitor::visitTableSchema(snapshot, data); - return data.getSchemaResult(); + auto result = data.getSchemaResult(); + return {result.names_and_types, result.physical_names_map}; } -std::pair -getReadSchemaAndPartitionColumnsFromSnapshot(ffi::SharedSnapshot * snapshot, ffi::SharedExternEngine * engine) +DB::NamesAndTypesList getReadSchemaFromSnapshot(ffi::SharedGlobalScanState * scan_state) { SchemaVisitorData data; - SchemaVisitor::visitReadSchema(snapshot, engine, data); - SchemaVisitor::visitPartitionColumns(snapshot, engine, data); - return {data.getSchemaResult(), data.getPartitionColumns()}; + SchemaVisitor::visitReadSchema(scan_state, data); + return data.getSchemaResult().names_and_types; } -DB::Names getPartitionColumnsFromSnapshot(ffi::SharedSnapshot * snapshot, ffi::SharedExternEngine * engine) +DB::Names getPartitionColumnsFromSnapshot(ffi::SharedGlobalScanState * scan_state) { SchemaVisitorData data; - SchemaVisitor::visitPartitionColumns(snapshot, engine, data); + SchemaVisitor::visitPartitionColumns(scan_state, data); return data.getPartitionColumns(); } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.h b/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.h index c7d511baa951..cf98e214106c 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLake/getSchemaFromSnapshot.h @@ -9,25 +9,25 @@ namespace ffi { struct SharedSnapshot; -struct SharedExternEngine; +struct SharedGlobalScanState; } namespace DeltaLake { -/// Get table schema. +/// Get table schema and physical column map (logical name to physical name mapping). /// Represents table schema from DeltaLake metadata. /// Contains partition columns. -DB::NamesAndTypesList getTableSchemaFromSnapshot(ffi::SharedSnapshot * snapshot); +std::pair getTableSchemaFromSnapshot(ffi::SharedSnapshot * snapshot); -/// Get read schema and partition columns. +/// Get read schema. /// Represents read schema based on data files. +DB::NamesAndTypesList getReadSchemaFromSnapshot(ffi::SharedGlobalScanState * scan_state); + +/// Get list of partition columns. /// Read schema does not contain partition columns, /// therefore partition columns are passed separately. -std::pair -getReadSchemaAndPartitionColumnsFromSnapshot(ffi::SharedSnapshot * snapshot, ffi::SharedExternEngine * engine); - -DB::Names getPartitionColumnsFromSnapshot(ffi::SharedSnapshot * snapshot, ffi::SharedExternEngine * engine); +DB::Names getPartitionColumnsFromSnapshot(ffi::SharedGlobalScanState * scan_state); } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp index ef043f9725be..fa9f1b3d0cee 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp @@ -3,9 +3,14 @@ #if USE_PARQUET && USE_DELTA_KERNEL_RS #include #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} DeltaLakeMetadataDeltaKernel::DeltaLakeMetadataDeltaKernel( ObjectStoragePtr object_storage, @@ -48,25 +53,65 @@ NamesAndTypesList DeltaLakeMetadataDeltaKernel::getTableSchema() const return table_snapshot->getTableSchema(); } -NamesAndTypesList DeltaLakeMetadataDeltaKernel::getReadSchema() const +DB::ReadFromFormatInfo DeltaLakeMetadataDeltaKernel::prepareReadingFromFormat( + const Strings & requested_columns, + const DB::StorageSnapshotPtr & storage_snapshot, + const ContextPtr & context, + bool supports_subset_of_columns) { - auto schema = table_snapshot->getReadSchema(); + auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns); + info.format_header.clear(); + + /// Read schema is different from table schema in case: + /// 1. we have partition columns (they are not stored in the actual data) + /// 2. columnMapping.mode = 'name' or 'id'. + /// So we add partition columns to read schema and put it together into format_header. + /// Partition values will be added to result data right after data is read. - /// Read schema does not contain partition columns - /// because they are not present in the actual data. - /// We have to add them here. - auto partition_columns = table_snapshot->getPartitionColumns(); - if (!partition_columns.empty()) + for (const auto & [column_name, column_type] : table_snapshot->getReadSchema()) + info.format_header.insert({column_type->createColumn(), column_type, column_name}); + + const auto & physical_names_map = table_snapshot->getPhysicalNamesMap(); + auto get_physical_name = [&](const std::string & column_name) { - auto table_schema = getTableSchema(); - for (const auto & column : partition_columns) + if (physical_names_map.empty()) + return column_name; + auto it = physical_names_map.find(column_name); + if (it == physical_names_map.end()) { - auto name_and_type = table_schema.tryGetByName(column); - if (name_and_type.has_value()) - schema.insert(schema.end(), name_and_type.value()); + Names keys; + keys.reserve(physical_names_map.size()); + for (const auto & [key, _] : physical_names_map) + keys.push_back(key); + + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Not found column {} in physical names map. There are only columns: {}", + column_name, fmt::join(keys, ", ")); } + return it->second; + }; + + const auto & table_schema = table_snapshot->getTableSchema(); + for (const auto & column_name : table_snapshot->getPartitionColumns()) + { + auto name_and_type = table_schema.tryGetByName(column_name); + if (!name_and_type) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Not found partition column {} in table schema", column_name); + + info.format_header.insert({name_and_type->type->createColumn(), name_and_type->type, get_physical_name(column_name)}); + } + + /// Update requested columns to reference actual physical column names. + if (!physical_names_map.empty()) + { + for (auto & [column_name, _] : info.requested_columns) + column_name = get_physical_name(column_name); } - return schema; + + return info; } } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h index 8e817974215c..eb94c44fa3d0 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h @@ -37,7 +37,11 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata NamesAndTypesList getTableSchema() const override; - NamesAndTypesList getReadSchema() const override; + DB::ReadFromFormatInfo prepareReadingFromFormat( + const Strings & requested_columns, + const DB::StorageSnapshotPtr & storage_snapshot, + const ContextPtr & context, + bool supports_subset_of_columns) override; bool operator ==(const IDataLakeMetadata &) const override; diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index 6bbf81c74965..df4f5ed3a45b 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -64,4 +64,13 @@ ObjectIterator IDataLakeMetadata::createKeysIterator( return std::make_shared(std::move(data_files_), object_storage_, callback_); } +DB::ReadFromFormatInfo IDataLakeMetadata::prepareReadingFromFormat( + const Strings & requested_columns, + const DB::StorageSnapshotPtr & storage_snapshot, + const ContextPtr & context, + bool supports_subset_of_columns) +{ + return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns); +} + } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 1984750351bc..5315d266e880 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -4,6 +4,7 @@ #include #include "Interpreters/ActionsDAG.h" #include +#include namespace DB { @@ -35,10 +36,14 @@ class IDataLakeMetadata : boost::noncopyable /// Read schema is the schema of actual data files, /// which can differ from table schema from data lake metadata. /// Return nothing if read schema is the same as table schema. - virtual NamesAndTypesList getReadSchema() const { return {}; } + virtual DB::ReadFromFormatInfo prepareReadingFromFormat( + const Strings & requested_columns, + const DB::StorageSnapshotPtr & storage_snapshot, + const ContextPtr & context, + bool supports_subset_of_columns); - virtual std::shared_ptr getInitialSchemaByPath(const String &) const { return {}; } - virtual std::shared_ptr getSchemaTransformer(const String &) const { return {}; } + virtual std::shared_ptr getInitialSchemaByPath(const String & /* path */) const { return {}; } + virtual std::shared_ptr getSchemaTransformer(const String & /* path */) const { return {}; } /// Whether metadata is updateable (instead of recreation from scratch) /// to the latest version of table state in data lake. @@ -46,8 +51,7 @@ class IDataLakeMetadata : boost::noncopyable /// Update metadata to the latest version. virtual bool update(const ContextPtr &) { return false; } - /// Whether schema evolution is supported. - virtual bool supportsExternalMetadataChange() const { return false; } + virtual bool supportsSchemaEvolution() const { return false; } virtual std::optional totalRows() const { return {}; } virtual std::optional totalBytes() const { return {}; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index c2cf03afa198..57dc04044d86 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -78,7 +78,7 @@ class IcebergMetadata : public IDataLakeMetadata : nullptr; } - bool supportsExternalMetadataChange() const override { return true; } + bool supportsSchemaEvolution() const override { return true; } static Int32 parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, IcebergSchemaProcessor & schema_processor, LoggerPtr metadata_logger); diff --git a/tests/integration/test_storage_delta/test.py b/tests/integration/test_storage_delta/test.py index afb15935508d..55e687cb9eea 100644 --- a/tests/integration/test_storage_delta/test.py +++ b/tests/integration/test_storage_delta/test.py @@ -593,7 +593,7 @@ def test_partition_columns(started_cluster, use_delta_kernel): bucket = started_cluster.minio_bucket TABLE_NAME = randomize_table_name("test_partition_columns") result_file = f"{TABLE_NAME}" - partition_columns = ["b", "c", "d", "e"] + partition_columns = ["b", "c", "d"] delta_table = ( DeltaTable.create(spark) From e7c251af7cb12e6547f60209697355307424b264 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 4 Jul 2025 13:14:59 +0200 Subject: [PATCH 07/14] Fix after cherry-pick --- src/Core/SettingsChangesHistory.cpp | 8 -------- .../ObjectStorage/DataLakes/DataLakeConfiguration.h | 2 +- src/Storages/ObjectStorage/StorageObjectStorageSettings.h | 1 + 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 16a01e6ba40e..ab7e037056be 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -70,14 +70,6 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() { // Altinity Antalya modifications atop of 25.3 {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, - {"geotoh3_lon_lat_input_order", true, false, "A new setting for legacy behaviour to set lon and lat order"}, - {"secondary_indices_enable_bulk_filtering", false, true, "A new algorithm for filtering by data skipping indices"}, - {"implicit_table_at_top_level", "", "", "A new setting, used in clickhouse-local"}, - {"use_skip_indexes_if_final_exact_mode", 0, 0, "This setting was introduced to help FINAL query return correct results with skip indexes"}, - {"parallel_replicas_insert_select_local_pipeline", false, false, "Use local pipeline during distributed INSERT SELECT with parallel replicas. Currently disabled due to performance issues"}, - {"page_cache_block_size", 1048576, 1048576, "Made this setting adjustable on a per-query level."}, - {"page_cache_lookahead_blocks", 16, 16, "Made this setting adjustable on a per-query level."}, - {"output_format_pretty_glue_chunks", "0", "auto", "A new setting to make Pretty formats prettier."}, {"allow_experimental_delta_kernel_rs", false, false, "New setting"}, }); addSettingsChanges(settings_changes_history, "25.2.1.20000", diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 4102d780b3a0..eed624100e0c 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -128,7 +128,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl { _assertInitialized(); return (*settings)[DataLakeStorageSetting::allow_dynamic_metadata_for_data_lakes] - && current_metadata->supportsExternalMetadataChange(); + && current_metadata->supportsSchemaEvolution(); } bool supportsFileIterator() const override { return true; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSettings.h b/src/Storages/ObjectStorage/StorageObjectStorageSettings.h index 63d582f66920..76c4cb7dcc06 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSettings.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSettings.h @@ -79,6 +79,7 @@ using StorageObjectStorageSettingsPtr = std::shared_ptr Date: Mon, 7 Jul 2025 12:32:28 +0200 Subject: [PATCH 08/14] Fix settings for iceberg table function --- src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h | 1 - src/TableFunctions/TableFunctionObjectStorage.cpp | 2 +- src/TableFunctions/TableFunctionObjectStorage.h | 2 +- src/TableFunctions/TableFunctionObjectStorageCluster.h | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index eed624100e0c..9eee0d7c328c 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -240,7 +240,6 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration, friend class StorageObjectStorage::Configuration; public: - StorageIcebergConfiguration() = default; explicit StorageIcebergConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {} ObjectStorageType getType() const override { return getImpl().getType(); } diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 7f908e5c9af8..3a71f8aed077 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -275,7 +275,7 @@ template class TableFunctionObjectStorage; #if USE_AVRO -template class TableFunctionObjectStorage; +template class TableFunctionObjectStorage; #endif #if USE_AVRO && USE_AWS_S3 diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index 22ef293948c1..992b50d1eb20 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -204,7 +204,7 @@ using TableFunctionLocal = TableFunctionObjectStorage; +using TableFunctionIceberg = TableFunctionObjectStorage; # if USE_AWS_S3 using TableFunctionIcebergS3 = TableFunctionObjectStorage; diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.h b/src/TableFunctions/TableFunctionObjectStorageCluster.h index 57686d7a1866..e0002d4ed6a1 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.h +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.h @@ -114,7 +114,7 @@ using TableFunctionAzureBlobCluster = TableFunctionObjectStorageCluster; #endif -using TableFunctionIcebergCluster = TableFunctionObjectStorageCluster; +using TableFunctionIcebergCluster = TableFunctionObjectStorageCluster; #if USE_AVRO && USE_AWS_S3 using TableFunctionIcebergS3Cluster = TableFunctionObjectStorageCluster; From a2bbd9627c4c353e708eca9311c1d10f8af83014 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 7 Jul 2025 14:15:59 +0200 Subject: [PATCH 09/14] Fix Iceberg table engine settings --- src/Databases/DataLake/DatabaseDataLakeSettings.cpp | 1 - src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index 345e8d575a1e..54f79692f329 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -27,7 +27,6 @@ namespace ErrorCodes DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \ DECLARE(String, region, "", "Region for Glue catalog", 0) \ DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \ - DECLARE(String, object_storage_cluster, "", "Cluster for distributed requests", 0) \ #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \ DATABASE_ICEBERG_RELATED_SETTINGS(M, ALIAS) \ diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h index a87a0a43c935..e63f80088a84 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h @@ -60,6 +60,9 @@ If enabled, the engine would use the metadata file with the most recent last_upd DECLARE(Bool, iceberg_use_version_hint, false, R"( Get latest metadata path from version-hint.text file. )", 0) \ + DECLARE(String, object_storage_cluster, "", R"( +Cluster for distributed requests +)", 0) #define OBSOLETE_SETTINGS(M, ALIAS) \ MAKE_OBSOLETE(M, Bool, allow_experimental_delta_kernel_rs, true) \ From 5cb5cb49ba007629341f31b799b3a7480ba6f40b Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Mon, 7 Jul 2025 15:45:04 +0200 Subject: [PATCH 10/14] Fix object_storage_cluster setting --- .../registerStorageObjectStorage.cpp | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp index 30ab6b751241..ee7d6eaaf815 100644 --- a/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/registerStorageObjectStorage.cpp @@ -21,11 +21,6 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } -namespace StorageObjectStorageSetting -{ - extern const StorageObjectStorageSettingsString object_storage_cluster; -} - namespace { @@ -40,12 +35,14 @@ createStorageObjectStorage(const StorageFactory::Arguments & args, StorageObject throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments"); const auto context = args.getLocalContext(); - auto storage_settings = std::make_shared(); - if (args.storage_def->settings) - storage_settings->loadFromQuery(*args.storage_def->settings); + std::string cluster_name = ""; - auto cluster_name = (*storage_settings)[StorageObjectStorageSetting::object_storage_cluster].value; + if (args.storage_def->settings) + { + if (const auto * value = args.storage_def->settings->changes.tryGet("object_storage_cluster")) + cluster_name = value->safeGet(); + } configuration->initialize(args.engine_args, context, false); From 077b83d20a8a15cefa1a818aa6e88ca41c825a3e Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 8 Jul 2025 22:22:35 +0200 Subject: [PATCH 11/14] Fix --- .../integration/test_storage_iceberg/test.py | 39 ------------------- 1 file changed, 39 deletions(-) diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py index 87bea8d25018..87602309e239 100644 --- a/tests/integration/test_storage_iceberg/test.py +++ b/tests/integration/test_storage_iceberg/test.py @@ -3156,45 +3156,6 @@ def check_validity_and_get_prunned_files(select_expression): == 1 ) -@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) -def test_explicit_metadata_file(started_cluster, storage_type): - instance = started_cluster.instances["node1"] - spark = started_cluster.spark_session - TABLE_NAME = ( - "test_explicit_metadata_file_" - + storage_type - + "_" - + get_uuid_str() - ) - - spark.sql( - f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')" - ) - - for i in range(50): - spark.sql( - f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)" - ) - - default_upload_directory( - started_cluster, - storage_type, - f"/iceberg_data/default/{TABLE_NAME}/", - f"/iceberg_data/default/{TABLE_NAME}/", - ) - - create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="") - - assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 500 - - create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="metadata/v31.metadata.json") - - assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 300 - - create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="metadata/v11.metadata.json") - - assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == 100 - @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) def test_bucket_partition_pruning(started_cluster, storage_type): From 8d7a74b6078af65499c83a0e7c66e3337dfdcc32 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 9 Jul 2025 21:23:50 +0200 Subject: [PATCH 12/14] Fix metadata update --- src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp | 7 +++++++ src/Storages/ObjectStorage/StorageObjectStorageCluster.h | 2 ++ 2 files changed, 9 insertions(+) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 7621e4c34b30..0af9180caad1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -509,4 +509,11 @@ void StorageObjectStorageCluster::addInferredEngineArgsToCreateQuery(ASTs & args configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->getFormat(), context, /*with_structure=*/false); } +bool StorageObjectStorageCluster::updateExternalDynamicMetadataIfExists(ContextPtr context) +{ + if (pure_storage) + return pure_storage->updateExternalDynamicMetadataIfExists(context); + return false; +} + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 8c1694de15ff..e7e4b5a5d5c6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -51,6 +51,8 @@ class StorageObjectStorageCluster : public IStorageCluster void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override; + bool updateExternalDynamicMetadataIfExists(ContextPtr context) override; + std::optional totalRows(ContextPtr query_context) const override; std::optional totalBytes(ContextPtr query_context) const override; From ba0cf5cf3b0c6adcacc730d3d7857ea31006e6ab Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 10 Jul 2025 12:29:44 +0200 Subject: [PATCH 13/14] More fix --- src/Storages/IStorage.h | 2 +- .../ObjectStorage/StorageObjectStorageCluster.cpp | 9 ++++++++- src/Storages/ObjectStorage/StorageObjectStorageCluster.h | 2 ++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index ab49c34ae54d..c2591aa09206 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -191,7 +191,7 @@ class IStorage : public std::enable_shared_from_this, public TypePromo /// Get immutable version (snapshot) of storage metadata. Metadata object is /// multiversion, so it can be concurrently changed, but returned copy can be /// used without any locks. - StorageMetadataPtr getInMemoryMetadataPtr() const { return metadata.get(); } + virtual StorageMetadataPtr getInMemoryMetadataPtr() const { return metadata.get(); } /// Update storage metadata. Used in ALTER or initialization of Storage. /// Metadata object is multiversion, so this method can be called without diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 0af9180caad1..8ab5002d92bd 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -513,7 +513,14 @@ bool StorageObjectStorageCluster::updateExternalDynamicMetadataIfExists(ContextP { if (pure_storage) return pure_storage->updateExternalDynamicMetadataIfExists(context); - return false; + return IStorageCluster::updateExternalDynamicMetadataIfExists(context); +} + +StorageMetadataPtr StorageObjectStorageCluster::getInMemoryMetadataPtr() const +{ + if (pure_storage) + return pure_storage->getInMemoryMetadataPtr(); + return IStorageCluster::getInMemoryMetadataPtr(); } } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index e7e4b5a5d5c6..4b4145bfd456 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -53,6 +53,8 @@ class StorageObjectStorageCluster : public IStorageCluster bool updateExternalDynamicMetadataIfExists(ContextPtr context) override; + StorageMetadataPtr getInMemoryMetadataPtr() const override; + std::optional totalRows(ContextPtr query_context) const override; std::optional totalBytes(ContextPtr query_context) const override; From 2488a68fa550d2b199eb39215f74c43b7feb54b1 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 10 Jul 2025 13:31:24 +0200 Subject: [PATCH 14/14] Lazy init for StorageObjectStorageCluster --- src/Databases/DataLake/DatabaseDataLake.cpp | 3 +- .../StorageObjectStorageCluster.cpp | 71 +++++++++++++++---- .../StorageObjectStorageCluster.h | 3 +- .../TableFunctionObjectStorageCluster.cpp | 6 +- 4 files changed, 64 insertions(+), 19 deletions(-) diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 580dbdcb3068..09b23a5da8d3 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -424,7 +424,8 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con /* comment */"", getFormatSettings(context_copy), LoadingStrictnessLevel::CREATE, - /* partition_by */nullptr); + /* partition_by */nullptr, + /* lazy_init */true); } DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator( diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 8ab5002d92bd..a8b229c63b98 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -78,7 +78,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( const String & comment_, std::optional format_settings_, LoadingStrictnessLevel mode_, - ASTPtr partition_by_ + ASTPtr partition_by_, + bool lazy_init ) : IStorageCluster( cluster_name_, table_id_, getLogger(fmt::format("{}({})", configuration_->getEngineName(), table_id_.table_name))) @@ -86,28 +87,68 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( , object_storage(object_storage_) , cluster_name_in_settings(false) { - /// We allow exceptions to be thrown on update(), - /// because Cluster engine can only be used as table function, - /// so no lazy initialization is allowed. - configuration->update( - object_storage, - context_, - /* if_not_updated_before */false, - /* check_consistent_with_previous_metadata */true); + const bool need_resolve_columns_or_format = columns_.empty() || (configuration->getFormat() == "auto"); + const bool need_resolve_sample_path = context_->getSettingsRef()[Setting::use_hive_partitioning] + && !configuration->withPartitionWildcard() + && !configuration->isDataLakeConfiguration(); + const bool do_lazy_init = lazy_init && !need_resolve_columns_or_format && !need_resolve_sample_path; + + auto log_ = getLogger("StorageObjectStorageCluster"); + + bool updated_configuration = false; + try + { + if (!do_lazy_init) + { + /// We allow exceptions to be thrown on update(), + /// because Cluster engine can only be used as table function, + /// so no lazy initialization is allowed. + configuration->update( + object_storage, + context_, + /* if_not_updated_before */false, + /* check_consistent_with_previous_metadata */true); + updated_configuration = true; + } + } + catch (...) + { + // If we don't have format or schema yet, we can't ignore failed configuration update, + // because relevant configuration is crucial for format and schema inference + if (mode_ <= LoadingStrictnessLevel::CREATE || need_resolve_columns_or_format) + { + throw; + } + tryLogCurrentException(log_); + } ColumnsDescription columns{columns_}; std::string sample_path; - resolveSchemaAndFormat(columns, object_storage, configuration, {}, sample_path, context_); + if (need_resolve_columns_or_format) + resolveSchemaAndFormat(columns, object_storage, configuration, {}, sample_path, context_); + else + validateSupportedColumns(columns, *configuration); configuration->check(context_); StorageInMemoryMetadata metadata; metadata.setColumns(columns); metadata.setConstraints(constraints_); - if (sample_path.empty() - && context_->getSettingsRef()[Setting::use_hive_partitioning] - && !configuration->withPartitionWildcard()) - sample_path = getPathSample(metadata, context_); + if (updated_configuration && sample_path.empty() && need_resolve_sample_path) + { + try + { + sample_path = getPathSample(metadata, context_); + } + catch (...) + { + LOG_WARNING( + log_, + "Failed to list object storage, cannot use hive partitioning. " + "Error: {}", + getCurrentExceptionMessage(true)); + } + } setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path)); setInMemoryMetadata(metadata); @@ -125,7 +166,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( /* distributed_processing */false, partition_by_, /* is_table_function */false, - /* lazy_init */false, + /* lazy_init */lazy_init, sample_path); auto virtuals_ = getVirtualsPtr(); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 4b4145bfd456..6d66d8f0c4e5 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -24,7 +24,8 @@ class StorageObjectStorageCluster : public IStorageCluster const String & comment_, std::optional format_settings_, LoadingStrictnessLevel mode_, - ASTPtr partition_by_ = nullptr + ASTPtr partition_by_ = nullptr, + bool lazy_init = false ); std::string getName() const override; diff --git a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp index a6bdb32028a3..c1516a88649f 100644 --- a/src/TableFunctions/TableFunctionObjectStorageCluster.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageCluster.cpp @@ -47,7 +47,8 @@ StoragePtr TableFunctionObjectStorageClusterstartup();