diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 0b4bd1af28cf..d04b9bcf2a71 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5856,7 +5856,7 @@ The maximum number of rows in the right table to determine whether to rerange th DECLARE(Bool, allow_experimental_join_right_table_sorting, false, R"( If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join. )", EXPERIMENTAL) \ - DECLARE(Bool, use_hive_partitioning, false, R"( + DECLARE(Bool, use_hive_partitioning, true, R"( When enabled, ClickHouse will detect Hive-style partitioning in path (`/name=value/`) in file-like table engines [File](../../engines/table-engines/special/file.md#hive-style-partitioning)/[S3](../../engines/table-engines/integrations/s3.md#hive-style-partitioning)/[URL](../../engines/table-engines/special/url.md#hive-style-partitioning)/[HDFS](../../engines/table-engines/integrations/hdfs.md#hive-style-partitioning)/[AzureBlobStorage](../../engines/table-engines/integrations/azureBlobStorage.md#hive-style-partitioning) and will allow to use partition columns as virtual columns in the query. These virtual columns will have the same names as in the partitioned path, but starting with `_`. )", EXPERIMENTAL)\ \ diff --git a/src/Functions/generateSnowflakeID.cpp b/src/Functions/generateSnowflakeID.cpp index c95e3edf4ca6..76f7e252ae16 100644 --- a/src/Functions/generateSnowflakeID.cpp +++ b/src/Functions/generateSnowflakeID.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -147,6 +148,13 @@ struct Data } +uint64_t generateSnowflakeID() +{ + Data data; + SnowflakeId snowflake_id = data.reserveRange(getMachineId(), 1); + return fromSnowflakeId(snowflake_id); +} + class FunctionGenerateSnowflakeID : public IFunction { public: diff --git a/src/Functions/generateSnowflakeID.h b/src/Functions/generateSnowflakeID.h new file mode 100644 index 000000000000..954a919c2dee --- /dev/null +++ b/src/Functions/generateSnowflakeID.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +namespace DB +{ + +uint64_t generateSnowflakeID(); + +} diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 929f5fd2d6cc..12adea5e83c0 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -136,7 +136,7 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query) } return table_function_ptr->execute(query.table_function, current_context, table_function_ptr->getName(), - /* cached_columns */ {}, /* use_global_context */ false, /* is_insert_query */true); + /* cached_columns */ {}, /* use_global_context */ false, &query); } if (query.table_id) diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index d29e4bc130ac..930890b8a0c4 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -53,6 +53,61 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +namespace +{ + + ASTs::const_iterator getValueForNamedArgument(const ASTs & arguments, const std::string & argument_name, Field & value) + { + for (const auto * arg_it = arguments.begin(); arg_it != arguments.end(); ++arg_it) + { + auto argument = *arg_it; + const auto * type_ast_function = argument->as(); + + if (!type_ast_function || type_ast_function->name != "equals" || 
!type_ast_function->arguments || type_ast_function->arguments->children.size() != 2) + { + continue; + } + + const auto * name = type_ast_function->arguments->children[0]->as(); + + if (name && name->name() == argument_name) + { + const auto * ast_literal = type_ast_function->arguments->children[1]->as(); + + if (!ast_literal) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Wrong parameter type for '{}'", + name->name()); + } + + value = ast_literal->value; + + return arg_it; + } + } + + return arguments.end(); + } + + template + std::optional extractNamedArgumentAndRemoveFromList(ASTs & arguments, const std::string & argument_name) + { + Field value; + const auto * p = getValueForNamedArgument(arguments, argument_name, value); + + if (p == arguments.end()) + { + return std::nullopt; + } + + arguments.erase(p); + + return value.safeGet(); + } +} + static const std::unordered_set required_configuration_keys = { "url", }; @@ -74,7 +129,9 @@ static const std::unordered_set optional_configuration_keys = "max_single_part_upload_size", "max_connections", "expiration_window_seconds", - "no_sign_request" + "no_sign_request", + "partition_strategy", + "hive_partition_strategy_write_partition_columns_into_files" }; String StorageS3Configuration::getDataSourceDescription() const @@ -172,6 +229,9 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect else url = S3::URI(collection.get("url"), settings[Setting::allow_archive_path_syntax]); + partition_strategy = collection.getOrDefault("partition_strategy", "auto"); + hive_partition_strategy_write_partition_columns_into_files = collection.getOrDefault("hive_partition_strategy_write_partition_columns_into_files", false); + auth_settings[S3AuthSetting::access_key_id] = collection.getOrDefault("access_key_id", ""); auth_settings[S3AuthSetting::secret_access_key] = collection.getOrDefault("secret_access_key", ""); auth_settings[S3AuthSetting::use_environment_credentials] = collection.getOrDefault("use_environment_credentials", 1); @@ -191,8 +251,16 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_structure) { - size_t count = StorageURL::evalArgsAndCollectHeaders(args, headers_from_ast, context); + /* + * Calls to `extractNamedArgumentAndRemoveFromList` need to happen before count is determined: + * `size_t count = StorageURL::evalArgsAndCollectHeaders` + * This is because extractNamedArgumentAndRemoveFromList alters the list of arguments + * */ + partition_strategy = extractNamedArgumentAndRemoveFromList(args, "partition_strategy").value_or("auto"); + hive_partition_strategy_write_partition_columns_into_files = extractNamedArgumentAndRemoveFromList(args, "hive_partition_strategy_write_partition_columns_into_files").value_or(false); + size_t count = StorageURL::evalArgsAndCollectHeaders(args, headers_from_ast, context); + if (count == 0 || count > getMaxNumberOfArguments(with_structure)) throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Storage S3 requires 1 to {} arguments. 
All supported signatures:\n{}", diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 1ca4383a82fc..e28831344f27 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -42,6 +42,7 @@ namespace ErrorCodes extern const int DATABASE_ACCESS_DENIED; extern const int NOT_IMPLEMENTED; extern const int LOGICAL_ERROR; + extern const int BAD_ARGUMENTS; } namespace StorageObjectStorageSetting @@ -139,6 +140,21 @@ StorageObjectStorage::StorageObjectStorage( setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context, sample_path, format_settings)); setInMemoryMetadata(metadata); + + // perhaps it is worth adding some extra safeguards for cases like + // create table s3_table engine=s3('{_partition_id}'); -- partition id wildcard set, but no partition expression + // create table s3_table engine=s3(partition_strategy='hive'); -- partition strategy set, but no partition expression + if (partition_by) + { + partition_strategy = PartitionStrategyFactory::get( + partition_by, + metadata.getSampleBlock(), + context, + configuration->format, + configuration->withPartitionWildcard(), + configuration->partition_strategy, + configuration->hive_partition_strategy_write_partition_columns_into_files); + } } String StorageObjectStorage::getName() const @@ -357,7 +373,7 @@ void StorageObjectStorage::read( } SinkToStoragePtr StorageObjectStorage::write( - const ASTPtr & query, + const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /* async_insert */) @@ -380,22 +396,10 @@ SinkToStoragePtr StorageObjectStorage::write( configuration->getPath()); } - if (configuration->withPartitionWildcard()) + if (partition_strategy) { - ASTPtr partition_by_ast = nullptr; - if (auto insert_query = std::dynamic_pointer_cast(query)) - { - if (insert_query->partition_by) - partition_by_ast = insert_query->partition_by; - else - partition_by_ast = partition_by; - } - - if (partition_by_ast) - { - return std::make_shared( - object_storage, configuration, format_settings, sample_block, local_context, partition_by_ast); - } + return std::make_shared( + partition_strategy, object_storage, configuration, format_settings, sample_block, local_context); } auto paths = configuration->getPaths(); @@ -567,6 +571,10 @@ void StorageObjectStorage::Configuration::initialize( else configuration.fromAST(engine_args, local_context, with_table_structure); + if (configuration.isNamespaceWithGlobs()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Expression can not have wildcards inside {} name", configuration.getNamespaceType()); + if (configuration.format == "auto") { if (configuration.isDataLakeConfiguration()) diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 6b5d6e1d423c..841ff8f803fb 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -18,6 +18,7 @@ namespace DB class ReadBufferIterator; class SchemaCache; class NamedCollection; +struct PartitionStrategy; namespace ErrorCodes { @@ -153,6 +154,7 @@ class StorageObjectStorage : public IStorage const std::optional format_settings; const ASTPtr partition_by; const bool distributed_processing; + std::shared_ptr partition_strategy; LoggerPtr log; }; @@ -244,6 +246,8 @@ class StorageObjectStorage::Configuration String format = "auto"; String compression_method = 
"auto"; String structure = "auto"; + std::string partition_strategy = "auto"; + bool hive_partition_strategy_write_partition_columns_into_files = false; virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context); void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 3a2bfe8a990b..2d281f8be60f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -92,8 +92,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( if (sample_path.empty() && context_->getSettingsRef()[Setting::use_hive_partitioning]) sample_path = getPathSample(metadata, context_); - setInMemoryMetadata(metadata); setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path)); + setInMemoryMetadata(metadata); pure_storage = std::make_shared( configuration, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp index bd7cee407e62..ab66606b3020 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp @@ -20,6 +20,34 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +namespace +{ + void validateKey(const String & str) + { + /// See: + /// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html + /// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject + + if (str.empty() || str.size() > 1024) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (not empty, max 1023 characters), got: {}", str.size()); + + if (!UTF8::isValidUTF8(reinterpret_cast(str.data()), str.size())) + throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key"); + + PartitionedSink::validatePartitionKey(str, true); + } + + void validateNamespace(const String & str, PartitionedStorageObjectStorageSink::ConfigurationPtr configuration) + { + configuration->validateNamespace(str); + + if (!UTF8::isValidUTF8(reinterpret_cast(str.data()), str.size())) + throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in bucket name"); + + PartitionedSink::validatePartitionKey(str, false); + } +} + StorageObjectStorageSink::StorageObjectStorageSink( ObjectStoragePtr object_storage, ConfigurationPtr configuration, @@ -97,13 +125,13 @@ void StorageObjectStorageSink::cancelBuffers() } PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink( + std::shared_ptr partition_strategy_, ObjectStoragePtr object_storage_, ConfigurationPtr configuration_, std::optional format_settings_, const Block & sample_block_, - ContextPtr context_, - const ASTPtr & partition_by) - : PartitionedSink(partition_by, context_, sample_block_) + ContextPtr context_) + : PartitionedSink(partition_strategy_, context_, sample_block_) , object_storage(object_storage_) , configuration(configuration_) , query_settings(configuration_->getQuerySettings(context_)) @@ -121,51 +149,25 @@ StorageObjectStorageSink::~StorageObjectStorageSink() SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String & partition_id) { - auto partition_bucket = replaceWildcards(configuration->getNamespace(), partition_id); - validateNamespace(partition_bucket); + auto file_path = getPartitionStrategy()->getPath(configuration->getPath(), 
partition_id); - auto partition_key = replaceWildcards(configuration->getPath(), partition_id); - validateKey(partition_key); + validateNamespace(configuration->getNamespace(), configuration); + validateKey(file_path); if (auto new_key = checkAndGetNewFileOnInsertIfNeeded( - *object_storage, *configuration, query_settings, partition_key, /* sequence_number */1)) + *object_storage, *configuration, query_settings, file_path, /* sequence_number */1)) { - partition_key = *new_key; + file_path = *new_key; } return std::make_shared( object_storage, configuration, format_settings, - sample_block, + getPartitionStrategy()->getBlockWithoutPartitionColumnsIfNeeded(), context, - partition_key + file_path ); } -void PartitionedStorageObjectStorageSink::validateKey(const String & str) -{ - /// See: - /// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html - /// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject - - if (str.empty() || str.size() > 1024) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (not empty, max 1023 characters), got: {}", str.size()); - - if (!UTF8::isValidUTF8(reinterpret_cast(str.data()), str.size())) - throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key"); - - validatePartitionKey(str, true); -} - -void PartitionedStorageObjectStorageSink::validateNamespace(const String & str) -{ - configuration->validateNamespace(str); - - if (!UTF8::isValidUTF8(reinterpret_cast(str.data()), str.size())) - throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in bucket name"); - - validatePartitionKey(str, false); -} - } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.h b/src/Storages/ObjectStorage/StorageObjectStorageSink.h index 97fd3d9b4179..bda52af12b21 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.h @@ -42,19 +42,16 @@ class PartitionedStorageObjectStorageSink : public PartitionedSink using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; PartitionedStorageObjectStorageSink( + std::shared_ptr partition_strategy_, ObjectStoragePtr object_storage_, ConfigurationPtr configuration_, std::optional format_settings_, const Block & sample_block_, - ContextPtr context_, - const ASTPtr & partition_by); + ContextPtr context_); SinkPtr createSinkForPartition(const String & partition_id) override; private: - void validateKey(const String & str); - void validateNamespace(const String & str); - ObjectStoragePtr object_storage; ConfigurationPtr configuration; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 49686ab706e7..568de3c95803 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -126,10 +126,6 @@ std::shared_ptr StorageObjectStorageSourc if (distributed_processing) return std::make_shared(local_context->getReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads]); - if (configuration->isNamespaceWithGlobs()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Expression can not have wildcards inside {} name", configuration->getNamespaceType()); - const bool is_archive = configuration->isArchive(); configuration->updateIfRequired(object_storage, local_context); @@ -620,10 +616,6 @@ StorageObjectStorageSource::GlobIterator::GlobIterator( , local_context(context_) , 
file_progress_callback(file_progress_callback_) { - if (configuration->isNamespaceWithGlobs()) - { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression can not have wildcards inside namespace name"); - } if (configuration->isPathWithGlobs()) { const auto key_with_globs = configuration_->getPath(); diff --git a/src/Storages/PartitionStrategy.cpp b/src/Storages/PartitionStrategy.cpp new file mode 100644 index 000000000000..94f8795f158d --- /dev/null +++ b/src/Storages/PartitionStrategy.cpp @@ -0,0 +1,262 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +extern const int NOT_IMPLEMENTED; +} + +namespace +{ + Names extractPartitionRequiredColumns(ASTPtr & partition_by, const Block & sample_block, ContextPtr context) + { + auto key_description = KeyDescription::getKeyFromAST(partition_by, ColumnsDescription::fromNamesAndTypes(sample_block.getNamesAndTypes()), context); + return key_description.sample_block.getNames(); + } + + HiveStylePartitionStrategy::PartitionExpressionActionsAndColumnName buildExpressionHive( + ASTPtr partition_by, + const Names & partition_expression_required_columns, + const Block & sample_block, + ContextPtr context) + { + HiveStylePartitionStrategy::PartitionExpressionActionsAndColumnName actions_with_column_name; + ASTs concat_args; + + if (const auto * tuple_function = partition_by->as(); + tuple_function && tuple_function->name == "tuple") + { + chassert(tuple_function->arguments->children.size() == partition_expression_required_columns.size()); + + for (size_t i = 0; i < tuple_function->arguments->children.size(); i++) + { + const auto & child = tuple_function->arguments->children[i]; + + concat_args.push_back(std::make_shared(partition_expression_required_columns[i] + "=")); + + concat_args.push_back(makeASTFunction("toString", child)); + + concat_args.push_back(std::make_shared("/")); + } + } + else + { + chassert(partition_expression_required_columns.size() == 1); + + ASTs to_string_args = {1, partition_by}; + concat_args.push_back(std::make_shared(partition_expression_required_columns[0] + "=")); + concat_args.push_back(makeASTFunction("toString", std::move(to_string_args))); + concat_args.push_back(std::make_shared("/")); + } + + ASTPtr hive_expr = makeASTFunction("concat", std::move(concat_args)); + auto hive_syntax_result = TreeRewriter(context).analyze(hive_expr, sample_block.getNamesAndTypesList()); + actions_with_column_name.actions = ExpressionAnalyzer(hive_expr, hive_syntax_result, context).getActions(false); + actions_with_column_name.column_name = hive_expr->getColumnName(); + + return actions_with_column_name; + } + + Block buildBlockWithoutPartitionColumns( + const Block & sample_block, + const std::unordered_set & partition_expression_required_columns_set) + { + Block result; + for (size_t i = 0; i < sample_block.columns(); i++) + { + if (!partition_expression_required_columns_set.contains(sample_block.getByPosition(i).name)) + { + result.insert(sample_block.getByPosition(i)); + } + } + + return result; + } + + void sanityCheckPartitioningConfiguration( + const std::string & partition_strategy, + bool has_partition_wildcard) + { + static std::unordered_map partition_strategy_to_wildcard_acceptance = + { + {"auto", true}, + {"hive", false} + }; + + if (!partition_strategy_to_wildcard_acceptance.contains(partition_strategy)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Unknown 
partitioning style '{}'", + partition_strategy); + } + + if (has_partition_wildcard && !partition_strategy_to_wildcard_acceptance.at(partition_strategy)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "The {} wildcard can't be used with {} partitioning style", + PartitionedSink::PARTITION_ID_WILDCARD, partition_strategy); + } + + if (!has_partition_wildcard && partition_strategy_to_wildcard_acceptance.at(partition_strategy)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Partitioning style '{}' requires {} wildcard", + partition_strategy, + PartitionedSink::PARTITION_ID_WILDCARD); + } + } +} + +PartitionStrategy::PartitionStrategy(ASTPtr partition_by_, const Block & sample_block_, ContextPtr context_) +: partition_by(partition_by_), sample_block(sample_block_), context(context_) +{ + +} + +std::shared_ptr PartitionStrategyFactory::get(ASTPtr partition_by, + const Block & sample_block, + ContextPtr context, + const std::string & file_format, + bool has_partition_wildcard, + const std::string & partition_strategy, + bool hive_partition_strategy_write_partition_columns_into_files) +{ + sanityCheckPartitioningConfiguration(partition_strategy, has_partition_wildcard); + + if (partition_strategy == "hive") + { + if (file_format.empty()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "File format can't be empty for hive style partitioning"); + } + + return std::make_shared( + partition_by, + sample_block, + context, + file_format, + hive_partition_strategy_write_partition_columns_into_files); + } + + return std::make_shared(partition_by, sample_block, context); +} + +StringifiedPartitionStrategy::StringifiedPartitionStrategy(ASTPtr partition_by_, const Block & sample_block_, ContextPtr context_) + : PartitionStrategy(partition_by_, sample_block_, context_) +{ +} + +StringifiedPartitionStrategy::PartitionExpressionActionsAndColumnName StringifiedPartitionStrategy::getExpression() +{ + PartitionExpressionActionsAndColumnName actions_with_column_name; + + ASTs arguments(1, partition_by); + ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); + auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList()); + actions_with_column_name.actions = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false); + actions_with_column_name.column_name = partition_by_string->getColumnName(); + + return actions_with_column_name; +} + +std::string StringifiedPartitionStrategy::getPath( + const std::string & prefix, + const std::string & partition_key) +{ + return PartitionedSink::replaceWildcards(prefix, partition_key); +} + +HiveStylePartitionStrategy::HiveStylePartitionStrategy( + ASTPtr partition_by_, + const Block & sample_block_, + ContextPtr context_, + const std::string & file_format_, + bool hive_partition_strategy_write_partition_columns_into_files_) + : PartitionStrategy(partition_by_, sample_block_, context_), + file_format(file_format_), + hive_partition_strategy_write_partition_columns_into_files(hive_partition_strategy_write_partition_columns_into_files_), + partition_expression_required_columns(extractPartitionRequiredColumns(partition_by, sample_block, context)), + partition_expression_required_columns_set(partition_expression_required_columns.begin(), partition_expression_required_columns.end()) +{ + actions_with_column_name = buildExpressionHive(partition_by, partition_expression_required_columns, sample_block, context); + block_without_partition_columns = 
buildBlockWithoutPartitionColumns(sample_block, partition_expression_required_columns_set); +} + +HiveStylePartitionStrategy::PartitionExpressionActionsAndColumnName HiveStylePartitionStrategy::getExpression() +{ + return actions_with_column_name; +} + +std::string HiveStylePartitionStrategy::getPath( + const std::string & prefix, + const std::string & partition_key) +{ + std::string path; + + if (!prefix.empty()) + { + path += prefix + "/"; + } + + /* + * File extension is toLower(format) + * This isn't ideal, but I guess multiple formats can be specified and introduced. + * So I think it is simpler to keep it this way. + * + * Or perhaps implement something like `IInputFormat::getFileExtension()` + */ + return path + partition_key + "/" + std::to_string(generateSnowflakeID()) + "." + Poco::toLower(file_format); +} + +Chunk HiveStylePartitionStrategy::getChunkWithoutPartitionColumnsIfNeeded(const Chunk & chunk) +{ + Chunk result; + + if (hive_partition_strategy_write_partition_columns_into_files) + { + for (const auto & column : chunk.getColumns()) + { + result.addColumn(column); + } + + return result; + } + + chassert(chunk.getColumns().size() == sample_block.columns()); + + for (size_t i = 0; i < sample_block.columns(); i++) + { + if (!partition_expression_required_columns_set.contains(sample_block.getByPosition(i).name)) + { + result.addColumn(chunk.getColumns()[i]); + } + } + + return result; +} + +Block HiveStylePartitionStrategy::getBlockWithoutPartitionColumnsIfNeeded() +{ + if (hive_partition_strategy_write_partition_columns_into_files) + { + return sample_block; + } + + return block_without_partition_columns; +} + +} diff --git a/src/Storages/PartitionStrategy.h b/src/Storages/PartitionStrategy.h new file mode 100644 index 000000000000..7ca515ecfc3b --- /dev/null +++ b/src/Storages/PartitionStrategy.h @@ -0,0 +1,97 @@ +#pragma once + +#include +#include + +#include + +namespace DB +{ + +struct PartitionStrategy +{ + struct PartitionExpressionActionsAndColumnName + { + ExpressionActionsPtr actions; + std::string column_name; + }; + + PartitionStrategy(ASTPtr partition_by_, const Block & sample_block_, ContextPtr context_); + + virtual ~PartitionStrategy() = default; + + virtual PartitionExpressionActionsAndColumnName getExpression() = 0; + virtual std::string getPath(const std::string & prefix, const std::string & partition_key) = 0; + + /* + * Hive style partition strategy will put partition column keys and values in the filepath itself + * So we need to remove those columns from the chunk. + * + * Default behavior is not to remove, therefore the base class simply returns the same chunk + * */ + virtual Chunk getChunkWithoutPartitionColumnsIfNeeded(const Chunk & chunk) + { + return chunk.clone(); + } + + /* + * Hive style partition strategy will put partition column keys and values in the filepath itself + * So we need to remove those columns from the block. 
+ * + * Default behavior is not to remove, therefore the base class simply returns the same block + * */ + virtual Block getBlockWithoutPartitionColumnsIfNeeded() + { + return sample_block; + } + +protected: + ASTPtr partition_by; + Block sample_block; + ContextPtr context; +}; + +struct PartitionStrategyFactory +{ + static std::shared_ptr get( + ASTPtr partition_by, + const Block & sample_block, + ContextPtr context, + const std::string & file_format, + bool has_partition_wildcard, + const std::string & partition_strategy = "auto", + bool hive_partition_strategy_write_partition_columns_into_files = false); +}; + +struct StringifiedPartitionStrategy : PartitionStrategy +{ + StringifiedPartitionStrategy(ASTPtr partition_by_, const Block & sample_block_, ContextPtr context_); + + PartitionExpressionActionsAndColumnName getExpression() override; + std::string getPath(const std::string & prefix, const std::string & partition_key) override; +}; + +struct HiveStylePartitionStrategy : PartitionStrategy +{ + HiveStylePartitionStrategy( + ASTPtr partition_by_, + const Block & sample_block_, + ContextPtr context_, + const std::string & file_format_, + bool hive_partition_strategy_write_partition_columns_into_files_); + + PartitionExpressionActionsAndColumnName getExpression() override; + std::string getPath(const std::string & prefix, const std::string & partition_key) override; + Chunk getChunkWithoutPartitionColumnsIfNeeded(const Chunk & chunk) override; + Block getBlockWithoutPartitionColumnsIfNeeded() override; + +private: + std::string file_format; + bool hive_partition_strategy_write_partition_columns_into_files; + Names partition_expression_required_columns; + std::unordered_set partition_expression_required_columns_set; + PartitionExpressionActionsAndColumnName actions_with_column_name; + Block block_without_partition_columns; +}; + +} diff --git a/src/Storages/PartitionedSink.cpp b/src/Storages/PartitionedSink.cpp index 0f93d1a5b75c..965d389fafc5 100644 --- a/src/Storages/PartitionedSink.cpp +++ b/src/Storages/PartitionedSink.cpp @@ -3,10 +3,11 @@ #include "PartitionedSink.h" #include +#include +#include #include -#include -#include +#include #include @@ -23,19 +24,17 @@ namespace ErrorCodes } PartitionedSink::PartitionedSink( - const ASTPtr & partition_by, + std::shared_ptr partition_strategy_, ContextPtr context_, const Block & sample_block_) : SinkToStorage(sample_block_) + , partition_strategy(partition_strategy_) , context(context_) , sample_block(sample_block_) { - ASTs arguments(1, partition_by); - ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); - - auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList()); - partition_by_expr = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false); - partition_by_column_name = partition_by_string->getColumnName(); + auto actions_with_column_name = partition_strategy->getExpression(); + partition_by_expr = actions_with_column_name.actions; + partition_by_column_name = actions_with_column_name.column_name; } @@ -51,16 +50,20 @@ SinkPtr PartitionedSink::getSinkForPartitionKey(StringRef partition_key) return it->second; } -void PartitionedSink::consume(Chunk & chunk) +void PartitionedSink::consume(Chunk & input_chunk) { - const auto & columns = chunk.getColumns(); - Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); - block_with_partition_by_expr.setColumns(columns); + 
block_with_partition_by_expr.setColumns(input_chunk.getColumns()); partition_by_expr->execute(block_with_partition_by_expr); const auto * partition_by_result_column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get(); + /* + * `hive_partition_strategy_write_partition_columns_into_files` + */ + const auto chunk = partition_strategy->getChunkWithoutPartitionColumnsIfNeeded(input_chunk); + const auto & columns_to_consume = chunk.getColumns(); + size_t chunk_rows = chunk.getNumRows(); chunk_row_index_to_partition_index.resize(chunk_rows); @@ -76,7 +79,7 @@ void PartitionedSink::consume(Chunk & chunk) chunk_row_index_to_partition_index[row] = it->getMapped(); } - size_t columns_size = columns.size(); + size_t columns_size = columns_to_consume.size(); size_t partitions_size = partition_id_to_chunk_index.size(); Chunks partition_index_to_chunk; @@ -84,7 +87,7 @@ void PartitionedSink::consume(Chunk & chunk) for (size_t column_index = 0; column_index < columns_size; ++column_index) { - MutableColumns partition_index_to_column_split = columns[column_index]->scatter(partitions_size, chunk_row_index_to_partition_index); + MutableColumns partition_index_to_column_split = columns_to_consume[column_index]->scatter(partitions_size, chunk_row_index_to_partition_index); /// Add chunks into partition_index_to_chunk with sizes of result columns if (column_index == 0) @@ -108,6 +111,11 @@ void PartitionedSink::consume(Chunk & chunk) } } +std::shared_ptr PartitionedSink::getPartitionStrategy() +{ + return partition_strategy; +} + void PartitionedSink::onException(std::exception_ptr exception) { for (auto & [_, sink] : partition_id_to_sink) diff --git a/src/Storages/PartitionedSink.h b/src/Storages/PartitionedSink.h index 6487eaecfd12..487a08d93342 100644 --- a/src/Storages/PartitionedSink.h +++ b/src/Storages/PartitionedSink.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -16,7 +17,10 @@ class PartitionedSink : public SinkToStorage public: static constexpr auto PARTITION_ID_WILDCARD = "{_partition_id}"; - PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_); + PartitionedSink( + std::shared_ptr partition_strategy_, + ContextPtr context_, + const Block & sample_block_); ~PartitionedSink() override; @@ -34,7 +38,10 @@ class PartitionedSink : public SinkToStorage static String replaceWildcards(const String & haystack, const String & partition_id); + std::shared_ptr getPartitionStrategy(); + private: + std::shared_ptr partition_strategy; ContextPtr context; Block sample_block; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index ba29d02e5b35..498b44e08a20 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1896,7 +1896,7 @@ class PartitionedStorageFileSink : public PartitionedSink { public: PartitionedStorageFileSink( - const ASTPtr & partition_by, + std::shared_ptr partition_strategy_, const StorageMetadataPtr & metadata_snapshot_, const String & table_name_for_log_, std::unique_lock && lock_, @@ -1907,7 +1907,7 @@ class PartitionedStorageFileSink : public PartitionedSink const String format_name_, ContextPtr context_, int flags_) - : PartitionedSink(partition_by, context_, metadata_snapshot_->getSampleBlock()) + : PartitionedSink(partition_strategy_, context_, metadata_snapshot_->getSampleBlock()) , path(path_) , metadata_snapshot(metadata_snapshot_) , table_name_for_log(table_name_for_log_) @@ -1923,19 +1923,19 @@ class PartitionedStorageFileSink : public 
PartitionedSink SinkPtr createSinkForPartition(const String & partition_id) override { - auto partition_path = PartitionedSink::replaceWildcards(path, partition_id); + std::string filepath = getPartitionStrategy()->getPath(path, partition_id); - fs::create_directories(fs::path(partition_path).parent_path()); + fs::create_directories(fs::path(filepath).parent_path()); - PartitionedSink::validatePartitionKey(partition_path, true); - checkCreationIsAllowed(context, context->getUserFilesPath(), partition_path, /*can_be_directory=*/ true); + PartitionedSink::validatePartitionKey(filepath, true); + checkCreationIsAllowed(context, context->getUserFilesPath(), filepath, /*can_be_directory=*/ true); return std::make_shared( metadata_snapshot, table_name_for_log, -1, /* use_table_fd */false, base_path, - partition_path, + filepath, compression_method, format_settings, format_name, @@ -1985,8 +1985,10 @@ SinkToStoragePtr StorageFile::write( if (path_for_partitioned_write.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty path for partitioned write"); + auto partition_strategy = PartitionStrategyFactory::get(insert_query->partition_by, metadata_snapshot->getSampleBlock(), context, format_name, has_wildcards); + return std::make_shared( - insert_query->partition_by, + partition_strategy, metadata_snapshot, getStorageID().getNameForLogs(), std::unique_lock{rwlock, getLockTimeout(context)}, diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 382e2cfc658e..86e47d28eecd 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -693,7 +693,7 @@ class PartitionedStorageURLSink : public PartitionedSink { public: PartitionedStorageURLSink( - const ASTPtr & partition_by, + std::shared_ptr partition_strategy_, const String & uri_, const String & format_, const std::optional & format_settings_, @@ -703,7 +703,7 @@ class PartitionedStorageURLSink : public PartitionedSink const CompressionMethod compression_method_, const HTTPHeaderEntries & headers_, const String & http_method_) - : PartitionedSink(partition_by, context_, sample_block_) + : PartitionedSink(partition_strategy_, context_, sample_block_) , uri(uri_) , format(format_) , format_settings(format_settings_) @@ -718,7 +718,8 @@ class PartitionedStorageURLSink : public PartitionedSink SinkPtr createSinkForPartition(const String & partition_id) override { - auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id); + std::string partition_path = getPartitionStrategy()->getPath(uri, partition_id); + context->getRemoteHostFilter().checkURL(Poco::URI(partition_path)); return std::make_shared( partition_path, format, format_settings, sample_block, context, timeouts, compression_method, headers, http_method); @@ -1347,8 +1348,10 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad if (is_partitioned_implementation) { + auto partition_strategy = PartitionStrategyFactory::get(partition_by_ast, metadata_snapshot->getSampleBlock(), context, format_name, has_wildcards); + return std::make_shared( - partition_by_ast, + partition_strategy, uri, format_name, format_settings, diff --git a/src/TableFunctions/ITableFunction.cpp b/src/TableFunctions/ITableFunction.cpp index 916ff7ec0222..342cfa8a40e6 100644 --- a/src/TableFunctions/ITableFunction.cpp +++ b/src/TableFunctions/ITableFunction.cpp @@ -20,28 +20,28 @@ AccessType ITableFunction::getSourceAccessType() const } StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr context, const std::string & 
table_name, - ColumnsDescription cached_columns, bool use_global_context, bool is_insert_query) const + ColumnsDescription cached_columns, bool use_global_context, ASTInsertQuery * insert_query) const { ProfileEvents::increment(ProfileEvents::TableFunctionExecute); AccessFlags required_access = getSourceAccessType(); auto table_function_properties = TableFunctionFactory::instance().tryGetProperties(getName()); - if (is_insert_query || !(table_function_properties && table_function_properties->allow_readonly)) + if (insert_query || !(table_function_properties && table_function_properties->allow_readonly)) required_access |= AccessType::CREATE_TEMPORARY_TABLE; context->checkAccess(required_access); auto context_to_use = use_global_context ? context->getGlobalContext() : context; if (cached_columns.empty()) - return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query); + return executeImpl(ast_function, context, table_name, std::move(cached_columns), insert_query); - if (hasStaticStructure() && cached_columns == getActualTableStructure(context, is_insert_query)) - return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query); + if (hasStaticStructure() && cached_columns == getActualTableStructure(context, insert_query)) + return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), insert_query); auto this_table_function = shared_from_this(); auto get_storage = [=]() -> StoragePtr { - return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns, is_insert_query); + return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns, insert_query); }; /// It will request actual table structure and create underlying storage lazily diff --git a/src/TableFunctions/ITableFunction.h b/src/TableFunctions/ITableFunction.h index ed7f80e5df97..d9ca9f1c9ea3 100644 --- a/src/TableFunctions/ITableFunction.h +++ b/src/TableFunctions/ITableFunction.h @@ -15,6 +15,7 @@ namespace DB { class Context; +class ASTInsertQuery; /** Interface for table functions. * @@ -80,7 +81,7 @@ class ITableFunction : public std::enable_shared_from_this /// Create storage according to the query. 
StoragePtr - execute(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns_ = {}, bool use_global_context = false, bool is_insert = false) const; + execute(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns_ = {}, bool use_global_context = false, ASTInsertQuery * insert_query = nullptr) const; virtual ~ITableFunction() = default; @@ -91,6 +92,12 @@ class ITableFunction : public std::enable_shared_from_this virtual StoragePtr executeImpl( const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const = 0; + virtual StoragePtr executeImpl( + const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, ASTInsertQuery * insert_query = nullptr) const + { + return executeImpl(ast_function, context, table_name, cached_columns, insert_query != nullptr); + } + virtual const char * getStorageTypeName() const = 0; }; diff --git a/src/TableFunctions/TableFunctionObjectStorage.cpp b/src/TableFunctions/TableFunctionObjectStorage.cpp index 7207c500b96a..f22681fbca6d 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.cpp +++ b/src/TableFunctions/TableFunctionObjectStorage.cpp @@ -12,6 +12,8 @@ #include +#include + #include #include #include @@ -126,6 +128,40 @@ StoragePtr TableFunctionObjectStorage::executeImpl( return storage; } +template +StoragePtr TableFunctionObjectStorage::executeImpl( + const ASTPtr & /* ast_function */, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + ASTInsertQuery * insert_query) const +{ + ColumnsDescription columns; + chassert(configuration); + if (configuration->structure != "auto") + columns = parseColumnsListFromString(configuration->structure, context); + else if (!structure_hint.empty()) + columns = structure_hint; + else if (!cached_columns.empty()) + columns = cached_columns; + + StoragePtr storage = std::make_shared( + configuration, + getObjectStorage(context, !insert_query), + context, + StorageID(getDatabaseName(), table_name), + columns, + ConstraintsDescription{}, + String{}, + /* format_settings */ std::nullopt, + /* mode */ LoadingStrictnessLevel::CREATE, + /* distributed_processing */ false, + insert_query ? 
insert_query->partition_by : nullptr); + + storage->startup(); + return storage; +} + void registerTableFunctionObjectStorage(TableFunctionFactory & factory) { UNUSED(factory); diff --git a/src/TableFunctions/TableFunctionObjectStorage.h b/src/TableFunctions/TableFunctionObjectStorage.h index f77b95e93c07..c9222ff01d62 100644 --- a/src/TableFunctions/TableFunctionObjectStorage.h +++ b/src/TableFunctions/TableFunctionObjectStorage.h @@ -152,6 +152,13 @@ class TableFunctionObjectStorage : public ITableFunction ColumnsDescription cached_columns, bool is_insert_query) const override; + StoragePtr executeImpl( + const ASTPtr & ast_function, + ContextPtr context, + const std::string & table_name, + ColumnsDescription cached_columns, + ASTInsertQuery * insert_query) const override; + const char * getStorageTypeName() const override { return Definition::storage_type_name; } ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override; diff --git a/tests/queries/0_stateless/01944_insert_partition_by.sql b/tests/queries/0_stateless/01944_insert_partition_by.sql index ac38fcee4905..03bbd17b8ce7 100644 --- a/tests/queries/0_stateless/01944_insert_partition_by.sql +++ b/tests/queries/0_stateless/01944_insert_partition_by.sql @@ -7,5 +7,3 @@ INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.cs INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'abc}{abc'); -- { serverError CANNOT_PARSE_TEXT } INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/test_{_partition_id}.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'abc*abc'); -- { serverError CANNOT_PARSE_TEXT } INSERT INTO TABLE FUNCTION s3('http://localhost:9001/foo/{_partition_id}', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, ''); -- { serverError BAD_ARGUMENTS } -INSERT INTO TABLE FUNCTION s3('http://localhost:9001/{_partition_id}/key.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, ''); -- { serverError BAD_ARGUMENTS } -INSERT INTO TABLE FUNCTION s3('http://localhost:9001/{_partition_id}/key.csv', 'admin', 'admin', 'CSV', 'id Int32, val String') PARTITION BY val VALUES (1, 'aa/bb'); -- { serverError CANNOT_PARSE_TEXT } diff --git a/tests/queries/0_stateless/03363_hive_style_partition_write.reference b/tests/queries/0_stateless/03363_hive_style_partition_write.reference new file mode 100644 index 000000000000..bad18852bf33 --- /dev/null +++ b/tests/queries/0_stateless/03363_hive_style_partition_write.reference @@ -0,0 +1,33 @@ +test/t_03363_parquet/year=2022/country=USA/.parquet 1 +test/t_03363_parquet/year=2022/country=Canada/.parquet 2 +test/t_03363_parquet/year=2023/country=USA/.parquet 3 +test/t_03363_parquet/year=2023/country=Mexico/.parquet 4 +test/t_03363_parquet/year=2024/country=France/.parquet 5 +test/t_03363_parquet/year=2024/country=Germany/.parquet 6 +test/t_03363_parquet/year=2024/country=Germany/.parquet 7 +test/t_03363_parquet/year=1999/country=Brazil/.parquet 8 +test/t_03363_parquet/year=2100/country=Japan/.parquet 9 +test/t_03363_parquet/year=2024/country=CN/.parquet 10 +test/t_03363_csv/year=2022/country=USA/.csv 1 +test/t_03363_csv/year=2022/country=Canada/.csv 2 +test/t_03363_csv/year=2023/country=USA/.csv 3 +test/t_03363_csv/year=2023/country=Mexico/.csv 4 +test/t_03363_csv/year=2024/country=France/.csv 5 
+test/t_03363_csv/year=2024/country=Germany/.csv 6 +test/t_03363_csv/year=2024/country=Germany/.csv 7 +test/t_03363_csv/year=1999/country=Brazil/.csv 8 +test/t_03363_csv/year=2100/country=Japan/.csv 9 +test/t_03363_csv/year=2024/country=CN/.csv 10 +test/t_03363_function/year=2022/country=USA/.parquet 1 +test/t_03363_function/year=2022/country=Canada/.parquet 2 +test/t_03363_function/year=2023/country=USA/.parquet 3 +test/t_03363_function/year=2023/country=Mexico/.parquet 4 +test/t_03363_function/year=2024/country=France/.parquet 5 +test/t_03363_function/year=2024/country=Germany/.parquet 6 +test/t_03363_function/year=2024/country=Germany/.parquet 7 +test/t_03363_function/year=1999/country=Brazil/.parquet 8 +test/t_03363_function/year=2100/country=Japan/.parquet 9 +test/t_03363_function/year=2024/country=CN/.parquet 10 +1 +3 +USA 2022 1 diff --git a/tests/queries/0_stateless/03363_hive_style_partition_write.sql b/tests/queries/0_stateless/03363_hive_style_partition_write.sql new file mode 100644 index 000000000000..9211d8b1018c --- /dev/null +++ b/tests/queries/0_stateless/03363_hive_style_partition_write.sql @@ -0,0 +1,76 @@ +-- Tags: no-parallel, no-fasttest, no-random-settings + +DROP TABLE IF EXISTS t_03363_parquet, t_03363_parquet_read, t_03363_csv, t_03363_csv_read; + +CREATE TABLE t_03363_parquet (year UInt16, country String, counter UInt8) +ENGINE = S3(s3_conn, filename = 't_03363_parquet', format = Parquet, partition_strategy='hive') +PARTITION BY (year, country); + +INSERT INTO t_03363_parquet VALUES + (2022, 'USA', 1), + (2022, 'Canada', 2), + (2023, 'USA', 3), + (2023, 'Mexico', 4), + (2024, 'France', 5), + (2024, 'Germany', 6), + (2024, 'Germany', 7), + (1999, 'Brazil', 8), + (2100, 'Japan', 9), + (2024, 'CN', 10); + +-- while reading from object storage partitioned table is not supported, an auxiliary table must be created +CREATE TABLE t_03363_parquet_read (year UInt16, country String, counter UInt8) +ENGINE = S3(s3_conn, filename = 't_03363_parquet/**.parquet', format = Parquet); + +-- distinct because minio isn't cleaned up +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/.parquet') AS _path, counter from t_03363_parquet_read order by counter SETTINGS use_hive_partitioning=1, input_format_parquet_use_native_reader=0; + +-- CSV test +CREATE TABLE t_03363_csv (year UInt16, country String, counter UInt8) +ENGINE = S3(s3_conn, filename = 't_03363_csv', format = CSV, partition_strategy='hive') +PARTITION BY (year, country); + +INSERT INTO t_03363_csv VALUES + (2022, 'USA', 1), + (2022, 'Canada', 2), + (2023, 'USA', 3), + (2023, 'Mexico', 4), + (2024, 'France', 5), + (2024, 'Germany', 6), + (2024, 'Germany', 7), + (1999, 'Brazil', 8), + (2100, 'Japan', 9), + (2024, 'CN', 10); + +CREATE TABLE t_03363_csv_read (year UInt16, country String, counter UInt8) +ENGINE = S3(s3_conn, filename = 't_03363_csv/**.csv', format = CSV); + +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.csv', '/.csv') AS _path, counter from t_03363_csv_read order by counter; + +-- s3 table function +INSERT INTO FUNCTION s3(s3_conn, filename='t_03363_function', format=Parquet, partition_strategy='hive') PARTITION BY (year, country) SELECT country, year, counter FROM t_03363_parquet_read; +select distinct on (counter) replaceRegexpAll(_path, '/[0-9]+\\.parquet', '/.parquet') AS _path, counter from s3(s3_conn, filename='t_03363_function/**.parquet') order by counter; + +-- should output 1 because partition columns are not written down to the file by default when hive 
style is being used +select num_columns from s3(s3_conn, filename='t_03363_function/**.parquet', format=ParquetMetadata) limit 1; + +INSERT INTO FUNCTION s3(s3_conn, filename='t_03363_function_write_down_partition_columns', format=Parquet, partition_strategy='hive', hive_partition_strategy_write_partition_columns_into_files=1) PARTITION BY (year, country) SELECT country, year, counter FROM t_03363_parquet_read; +select num_columns from s3(s3_conn, filename='t_03363_function_write_down_partition_columns/**.parquet', format=ParquetMetadata) limit 1; +select * from s3(s3_conn, filename='t_03363_function_write_down_partition_columns/**.parquet', format=Parquet) order by counter limit 1 SETTINGS use_hive_partitioning=0; + +-- hive with partition id placeholder +CREATE TABLE t_03363_s3_sink (year UInt16, country String, counter UInt8) +ENGINE = S3(s3_conn, filename = 't_03363_parquet/{_partition_id}', format = Parquet, partition_strategy='hive') +PARTITION BY (year, country); -- {serverError BAD_ARGUMENTS}; + +-- unknown partitioning style +CREATE TABLE t_03363_s3_sink (year UInt16, country String, counter UInt8) +ENGINE = S3(s3_conn, filename = 't_03363_parquet', format = Parquet, partition_strategy='abc') +PARTITION BY (year, country); -- {serverError BAD_ARGUMENTS}; + +-- auto partitioning style without partition_id wildcard +CREATE TABLE t_03363_s3_sink (year UInt16, country String, counter UInt8) +ENGINE = S3(s3_conn, filename = 't_03363_parquet', format = Parquet) +PARTITION BY (year, country); -- {serverError BAD_ARGUMENTS}; + +DROP TABLE IF EXISTS t_03363_parquet, t_03363_parquet_read, t_03363_csv, t_03363_csv_read; diff --git a/tests/queries/0_stateless/03364_s3_globbed_path_in_bucket_portion.reference b/tests/queries/0_stateless/03364_s3_globbed_path_in_bucket_portion.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03364_s3_globbed_path_in_bucket_portion.sql b/tests/queries/0_stateless/03364_s3_globbed_path_in_bucket_portion.sql new file mode 100644 index 000000000000..4b2ee57c7630 --- /dev/null +++ b/tests/queries/0_stateless/03364_s3_globbed_path_in_bucket_portion.sql @@ -0,0 +1,30 @@ +-- Tags: no-fasttest +-- virtual hosted style +create table s3_03364 (id UInt32) engine=S3('http://{_partition_id}.s3.region.amazonaws.com/key'); -- {serverError BAD_ARGUMENTS} +create table s3_03364 (id UInt32) engine=S3('http://{_partition_id}something.s3.region.amazonaws.com/key'); -- {serverError BAD_ARGUMENTS} + +select * from s3('http://{_partition_id}.s3.region.amazonaws.com/key', 'Parquet'); -- {serverError BAD_ARGUMENTS} +select * from s3('http://{_partition_id}something.s3.region.amazonaws.com/key', 'Parquet'); -- {serverError BAD_ARGUMENTS} + +insert into table function s3('http://{_partition_id}.s3.region.amazonaws.com/key', 'NOSIGN', 'Parquet') select * from numbers(5); -- {serverError BAD_ARGUMENTS} +insert into table function s3('http://{_partition_id}something.s3.region.amazonaws.com/key', 'NOSIGN', 'Parquet') select * from numbers(5); -- {serverError BAD_ARGUMENTS} + +-- path style +create table s3_03364 (id UInt32) engine=S3('http://s3.region.amazonaws.com/{_partition_id}'); -- {serverError BAD_ARGUMENTS} +create table s3_03364 (id UInt32) engine=S3('http://s3.region.amazonaws.com/{_partition_id}/key'); -- {serverError BAD_ARGUMENTS} + +select * from s3('http://s3.region.amazonaws.com/{_partition_id}', 'Parquet'); -- {serverError BAD_ARGUMENTS} +select * from s3('http://s3.region.amazonaws.com/{_partition_id}/key', 'Parquet'); -- 
{serverError BAD_ARGUMENTS} + +insert into table function s3('http://s3.region.amazonaws.com/{_partition_id}', 'NOSIGN', 'Parquet') select * from numbers(5); -- {serverError BAD_ARGUMENTS} +insert into table function s3('http://s3.region.amazonaws.com/{_partition_id}/key', 'NOSIGN', 'Parquet') select * from numbers(5); -- {serverError BAD_ARGUMENTS} + +-- aws private link style +create table s3_03364 (id UInt32) engine=S3('http://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/{_partition_id}'); -- {serverError BAD_ARGUMENTS} +create table s3_03364 (id UInt32) engine=S3('http://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/{_partition_id}/key'); -- {serverError BAD_ARGUMENTS} + +select * from s3('http://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/{_partition_id}', 'Parquet'); -- {serverError BAD_ARGUMENTS} +select * from s3('http://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/{_partition_id}/key', 'Parquet'); -- {serverError BAD_ARGUMENTS} + +insert into table function s3('http://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/{_partition_id}', 'NOSIGN', 'Parquet') select * from numbers(5); -- {serverError BAD_ARGUMENTS} +insert into table function s3('http://bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/{_partition_id}/key', 'NOSIGN', 'Parquet') select * from numbers(5); -- {serverError BAD_ARGUMENTS}
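
For reference, a minimal usage sketch of the `partition_strategy` option exercised by the 03363 test above. It assumes an `s3_conn` named collection pointing at a writable bucket, as in the test; the `t_hive_example` table and path names are placeholders, not part of the patch:

-- Hive-style write: each partition value gets its own file under
-- <filename>/year=<value>/country=<value>/<snowflake_id>.parquet
CREATE TABLE t_hive_example (year UInt16, country String, counter UInt8)
ENGINE = S3(s3_conn, filename = 't_hive_example', format = Parquet, partition_strategy = 'hive')
PARTITION BY (year, country);

INSERT INTO t_hive_example VALUES (2024, 'Germany', 1), (2023, 'USA', 2);

-- The same option is accepted by the s3 table function; by default the partition
-- columns are dropped from the written files and only appear in the path, unless
-- the extra named argument below is set:
INSERT INTO FUNCTION s3(s3_conn, filename = 't_hive_example_fn', format = Parquet,
    partition_strategy = 'hive', hive_partition_strategy_write_partition_columns_into_files = 1)
PARTITION BY (year, country)
SELECT 2024 AS year, 'Germany' AS country, 3 AS counter;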