S3 hive style writes #697

Open
wants to merge 43 commits into
base: antalya
43 commits
152fc42
Add suport for s3 hive partition style writes
arthurpassos Feb 26, 2025
9af41a7
storageurl and storagefile
arthurpassos Feb 26, 2025
1aa7db4
simplify code
arthurpassos Feb 26, 2025
4f2ef27
reduce changes
arthurpassos Feb 26, 2025
301e61e
add tests for s3, enforce some rules
arthurpassos Mar 3, 2025
7d25cce
some refactoring
arthurpassos Mar 5, 2025
f3f7e9a
extern not_implemented
arthurpassos Mar 5, 2025
6a75897
copy sample bock
arthurpassos Mar 5, 2025
839af35
focus on engine s3 only, new argument to control partition style
arthurpassos Mar 6, 2025
c8805b5
small adjustments
arthurpassos Mar 6, 2025
1926495
tests
arthurpassos Mar 6, 2025
33b61d4
comment out test until we figure out what to do with that syntax
arthurpassos Mar 9, 2025
35d875a
fix tests
arthurpassos Mar 9, 2025
fc546c9
fix
arthurpassos Mar 20, 2025
f9777a5
conflicts
arthurpassos Mar 20, 2025
de868f3
do not allow partition_id to be specified in the bucket portion
arthurpassos Mar 21, 2025
b4bb1a2
add tests to validate partition id macro isn't used where it is not s…
arthurpassos Mar 24, 2025
4622f9e
remove no longer needed test
arthurpassos Mar 24, 2025
3ca53e1
some silly implementation for formatToFileExtension
arthurpassos Mar 24, 2025
82ee821
move check for wildcard inside initialize method
arthurpassos Mar 24, 2025
d9409cc
remove some unrelated changes
arthurpassos Mar 24, 2025
b1bca06
add some sanity checks for partitioning configuration, tests for inse…
arthurpassos Mar 27, 2025
3f65f27
extract named argument partitioning_style
arthurpassos Mar 28, 2025
7ae3743
address pr comments
arthurpassos Mar 28, 2025
35e9c18
remove unnecessary namespace qualifiers
arthurpassos Mar 28, 2025
1ae8b93
add no fast test to test
arthurpassos Mar 29, 2025
abdd84a
write_partition_columns_into_files argument
arthurpassos Apr 2, 2025
b3b14e6
Merge branch 'antalya' into s3_hive_style_writes
arthurpassos Apr 2, 2025
6a6c9a7
Revert "write_partition_columns_into_files argument"
arthurpassos Apr 2, 2025
8b73bb5
Reapply "write_partition_columns_into_files argument"
arthurpassos Apr 2, 2025
1236609
updt
arthurpassos Apr 2, 2025
3e356bc
test write_partition_columns_into_files
arthurpassos Apr 2, 2025
0a8bacb
fix ub
arthurpassos Apr 2, 2025
043ce22
refactor extractNamedArgumentAndRemoveFromList
arthurpassos Apr 3, 2025
3f93ac0
address a few pr comments
arthurpassos Apr 3, 2025
db9bfd0
address some more comments
arthurpassos Apr 3, 2025
0a31b75
simmplify extractPartitionRequiredColumns
arthurpassos Apr 3, 2025
1dc7f9d
rename setting
arthurpassos Apr 3, 2025
baefb50
revert some small changes on storagefile
arthurpassos Apr 3, 2025
1ac2a9c
some intermediate docs
arthurpassos Apr 3, 2025
7e8b4d7
add missing test file updt
arthurpassos Apr 3, 2025
ef99792
simplify sanity check of partition config
arthurpassos Apr 3, 2025
be49e03
default partition strategy in factory
arthurpassos Apr 4, 2025
2 changes: 1 addition & 1 deletion src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5856,7 +5856,7 @@ The maximum number of rows in the right table to determine whether to rerange th
DECLARE(Bool, allow_experimental_join_right_table_sorting, false, R"(
If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join.
)", EXPERIMENTAL) \
DECLARE(Bool, use_hive_partitioning, false, R"(
DECLARE(Bool, use_hive_partitioning, true, R"(
When enabled, ClickHouse will detect Hive-style partitioning in path (`/name=value/`) in file-like table engines [File](../../engines/table-engines/special/file.md#hive-style-partitioning)/[S3](../../engines/table-engines/integrations/s3.md#hive-style-partitioning)/[URL](../../engines/table-engines/special/url.md#hive-style-partitioning)/[HDFS](../../engines/table-engines/integrations/hdfs.md#hive-style-partitioning)/[AzureBlobStorage](../../engines/table-engines/integrations/azureBlobStorage.md#hive-style-partitioning) and will allow to use partition columns as virtual columns in the query. These virtual columns will have the same names as in the partitioned path, but starting with `_`.
)", EXPERIMENTAL)\
\
Expand Down
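The setting description above says that `/name=value/` path segments become virtual columns with the same name prefixed by `_`. A minimal sketch of that mapping follows. `parseHivePartitionColumns` is a hypothetical helper written for illustration, not ClickHouse code, and it ignores escaping and type inference.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <string_view>

// Hypothetical helper (not part of ClickHouse): collects `name=value`
// directory segments of a path and exposes each one as `_name`.
std::map<std::string, std::string> parseHivePartitionColumns(std::string_view path)
{
    std::map<std::string, std::string> columns;
    std::size_t pos = 0;
    while (pos < path.size())
    {
        std::size_t end = path.find('/', pos);
        if (end == std::string_view::npos)
            break; // The last segment is the file name, not a partition directory.

        std::string_view segment = path.substr(pos, end - pos);
        std::size_t eq = segment.find('=');
        if (eq != std::string_view::npos && eq > 0)
            columns["_" + std::string(segment.substr(0, eq))] = std::string(segment.substr(eq + 1));

        pos = end + 1;
    }
    return columns;
}
```

For a path such as `data/year=2025/month=03/file.parquet`, the sketch yields `_year` and `_month` and skips both the `data` prefix and the file name.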
8 changes: 8 additions & 0 deletions src/Functions/generateSnowflakeID.cpp
@@ -1,3 +1,4 @@
#include <Functions/generateSnowflakeID.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsRandom.h>
Expand Down Expand Up @@ -147,6 +148,13 @@ struct Data

}

uint64_t generateSnowflakeID()
{
Data data;
SnowflakeId snowflake_id = data.reserveRange(getMachineId(), 1);
return fromSnowflakeId(snowflake_id);
}

class FunctionGenerateSnowflakeID : public IFunction
{
public:
Expand Down
10 changes: 10 additions & 0 deletions src/Functions/generateSnowflakeID.h
@@ -0,0 +1,10 @@
#pragma once

#include <cstdint>

namespace DB
{

uint64_t generateSnowflakeID();

}
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterInsertQuery.cpp
Expand Up @@ -136,7 +136,7 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
}

return table_function_ptr->execute(query.table_function, current_context, table_function_ptr->getName(),
/* cached_columns */ {}, /* use_global_context */ false, /* is_insert_query */true);
/* cached_columns */ {}, /* use_global_context */ false, &query);
}

if (query.table_id)
Expand Down
72 changes: 70 additions & 2 deletions src/Storages/ObjectStorage/S3/Configuration.cpp
Expand Up @@ -53,6 +53,61 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

namespace
{

ASTs::const_iterator getValueForNamedArgument(const ASTs & arguments, const std::string & argument_name, Field & value)
{
for (const auto * arg_it = arguments.begin(); arg_it != arguments.end(); ++arg_it)
{
auto argument = *arg_it;
const auto * type_ast_function = argument->as<ASTFunction>();

if (!type_ast_function || type_ast_function->name != "equals" || !type_ast_function->arguments || type_ast_function->arguments->children.size() != 2)
{
continue;
}

const auto * name = type_ast_function->arguments->children[0]->as<ASTIdentifier>();

if (name && name->name() == argument_name)
{
const auto * ast_literal = type_ast_function->arguments->children[1]->as<ASTLiteral>();

if (!ast_literal)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Wrong parameter type for '{}'",
name->name());
}

value = ast_literal->value;

return arg_it;
}
}

return arguments.end();
}

template <typename T>
std::optional<T> extractNamedArgumentAndRemoveFromList(ASTs & arguments, const std::string & argument_name)
{
Field value;
const auto * p = getValueForNamedArgument(arguments, argument_name, value);

if (p == arguments.end())
{
return std::nullopt;
}

arguments.erase(p);

return value.safeGet<T>();
}
}

static const std::unordered_set<std::string_view> required_configuration_keys = {
"url",
};
Expand All @@ -74,7 +129,9 @@ static const std::unordered_set<std::string_view> optional_configuration_keys =
"max_single_part_upload_size",
"max_connections",
"expiration_window_seconds",
"no_sign_request"
"no_sign_request",
"partition_strategy",
"hive_partition_strategy_write_partition_columns_into_files"
};

String StorageS3Configuration::getDataSourceDescription() const
Expand Down Expand Up @@ -172,6 +229,9 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect
else
url = S3::URI(collection.get<String>("url"), settings[Setting::allow_archive_path_syntax]);

partition_strategy = collection.getOrDefault<String>("partition_strategy", "auto");
hive_partition_strategy_write_partition_columns_into_files = collection.getOrDefault<bool>("hive_partition_strategy_write_partition_columns_into_files", false);

auth_settings[S3AuthSetting::access_key_id] = collection.getOrDefault<String>("access_key_id", "");
auth_settings[S3AuthSetting::secret_access_key] = collection.getOrDefault<String>("secret_access_key", "");
auth_settings[S3AuthSetting::use_environment_credentials] = collection.getOrDefault<UInt64>("use_environment_credentials", 1);
Expand All @@ -191,8 +251,16 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect

void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_structure)
{
size_t count = StorageURL::evalArgsAndCollectHeaders(args, headers_from_ast, context);
/*
* Calls to `extractNamedArgumentAndRemoveFromList` need to happen before count is determined:
* `size_t count = StorageURL::evalArgsAndCollectHeaders`
* This is because extractNamedArgumentAndRemoveFromList alters the list of arguments
* */
partition_strategy = extractNamedArgumentAndRemoveFromList<std::string>(args, "partition_strategy").value_or("auto");
hive_partition_strategy_write_partition_columns_into_files = extractNamedArgumentAndRemoveFromList<bool>(args, "hive_partition_strategy_write_partition_columns_into_files").value_or(false);

size_t count = StorageURL::evalArgsAndCollectHeaders(args, headers_from_ast, context);

if (count == 0 || count > getMaxNumberOfArguments(with_structure))
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage S3 requires 1 to {} arguments. All supported signatures:\n{}",
Expand Down
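The comment in `fromAST` explains that the named arguments must be extracted before the positional arguments are counted, because extraction mutates the argument list. A simplified sketch of that ordering concern follows. It uses a plain `std::vector` in place of `ASTs`, and the `Argument` struct and `extractNamedArgumentAndRemove` are illustrative stand-ins rather than the PR's actual types.

```cpp
#include <algorithm>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for an AST argument (assumption for illustration):
// a positional argument has an empty name, a `key = value` argument has both.
struct Argument
{
    std::string name;
    std::string value;
};

// Finds the first argument with the given name, removes it from the list,
// and returns its value. Returns std::nullopt if no such argument exists.
std::optional<std::string> extractNamedArgumentAndRemove(std::vector<Argument> & args, const std::string & name)
{
    auto it = std::find_if(args.begin(), args.end(),
        [&](const Argument & arg) { return !arg.name.empty() && arg.name == name; });

    if (it == args.end())
        return std::nullopt;

    std::string value = std::move(it->value);
    args.erase(it); // The list shrinks here, so any count taken earlier would be stale.
    return value;
}
```

Counting `args.size()` only after every named argument has been extracted keeps the positional-argument check consistent with what the remaining parsing code sees.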
40 changes: 24 additions & 16 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Expand Up @@ -42,6 +42,7 @@ namespace ErrorCodes
extern const int DATABASE_ACCESS_DENIED;
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}

namespace StorageObjectStorageSetting
Expand Down Expand Up @@ -139,6 +140,21 @@ StorageObjectStorage::StorageObjectStorage(

setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context, sample_path, format_settings));
setInMemoryMetadata(metadata);

// perhaps it is worth adding some extra safeguards for cases like
// create table s3_table engine=s3('{_partition_id}'); -- partition id wildcard set, but no partition expression
// create table s3_table engine=s3(partition_strategy='hive'); -- partition strategy set, but no partition expression
if (partition_by)
{
Collaborator Author

This is somewhat problematic.

PartitionStrategyFactory needs a sample block, so it must run after the call to VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context, sample_path, format_settings), since that call might alter the number of columns. That function needs a sample_path, which is resolved by getPathSample.

On the other hand, PartitionStrategyFactory performs some bucket and key validation, which should happen before getPathSample.

partition_strategy = PartitionStrategyFactory::get(
partition_by,
metadata.getSampleBlock(),
context,
configuration->format,
configuration->withPartitionWildcard(),
configuration->partition_strategy,
configuration->hive_partition_strategy_write_partition_columns_into_files);
}
}

String StorageObjectStorage::getName() const
Expand Down Expand Up @@ -357,7 +373,7 @@ void StorageObjectStorage::read(
}

SinkToStoragePtr StorageObjectStorage::write(
const ASTPtr & query,
const ASTPtr &,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr local_context,
bool /* async_insert */)
Expand All @@ -380,22 +396,10 @@ SinkToStoragePtr StorageObjectStorage::write(
configuration->getPath());
}

if (configuration->withPartitionWildcard())
if (partition_strategy)
{
ASTPtr partition_by_ast = nullptr;
if (auto insert_query = std::dynamic_pointer_cast<ASTInsertQuery>(query))
{
if (insert_query->partition_by)
partition_by_ast = insert_query->partition_by;
else
partition_by_ast = partition_by;
}

if (partition_by_ast)
{
return std::make_shared<PartitionedStorageObjectStorageSink>(
object_storage, configuration, format_settings, sample_block, local_context, partition_by_ast);
}
return std::make_shared<PartitionedStorageObjectStorageSink>(
partition_strategy, object_storage, configuration, format_settings, sample_block, local_context);
}

auto paths = configuration->getPaths();
Expand Down Expand Up @@ -567,6 +571,10 @@ void StorageObjectStorage::Configuration::initialize(
else
configuration.fromAST(engine_args, local_context, with_table_structure);

if (configuration.isNamespaceWithGlobs())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Expression can not have wildcards inside {} name", configuration.getNamespaceType());

if (configuration.format == "auto")
{
if (configuration.isDataLakeConfiguration())
Expand Down
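After this change, `write` delegates path construction to a partition strategy instead of replacing wildcards inline. The sketch below contrasts the two styles under stated assumptions. `PathStrategy`, `WildcardPathStrategy`, and `HivePathStrategy` are hypothetical names, the hive variant assumes the partition id already arrives formatted as `name=value`, and the file name is assumed to be chosen by the caller. The PR's real `PartitionStrategy` interface is richer and may differ.

```cpp
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical interface for illustration only.
struct PathStrategy
{
    virtual ~PathStrategy() = default;
    virtual std::string getPath(const std::string & prefix, const std::string & partition_id) const = 0;
};

// Wildcard style: substitute every `{_partition_id}` occurrence in the configured path.
struct WildcardPathStrategy : PathStrategy
{
    std::string getPath(const std::string & prefix, const std::string & partition_id) const override
    {
        static const std::string wildcard = "{_partition_id}";
        std::string result = prefix;
        std::size_t pos = result.find(wildcard);
        while (pos != std::string::npos)
        {
            result.replace(pos, wildcard.size(), partition_id);
            // Resume after the inserted text so a partition id is never re-scanned.
            pos = result.find(wildcard, pos + partition_id.size());
        }
        return result;
    }
};

// Hive style: place the file under a partition directory below the configured prefix.
struct HivePathStrategy : PathStrategy
{
    explicit HivePathStrategy(std::string file_name_) : file_name(std::move(file_name_)) {}

    std::string getPath(const std::string & prefix, const std::string & partition_id) const override
    {
        std::string result = prefix;
        if (!result.empty() && result.back() != '/')
            result += '/';
        return result + partition_id + '/' + file_name;
    }

    std::string file_name; // Assumed to be supplied by the caller.
};
```

With this shape, the sink only calls `getPath` and does not need to know which style the table was created with.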
4 changes: 4 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorage.h
Expand Up @@ -18,6 +18,7 @@ namespace DB
class ReadBufferIterator;
class SchemaCache;
class NamedCollection;
struct PartitionStrategy;

namespace ErrorCodes
{
Expand Down Expand Up @@ -153,6 +154,7 @@ class StorageObjectStorage : public IStorage
const std::optional<FormatSettings> format_settings;
const ASTPtr partition_by;
const bool distributed_processing;
std::shared_ptr<PartitionStrategy> partition_strategy;

LoggerPtr log;
};
Expand Down Expand Up @@ -244,6 +246,8 @@ class StorageObjectStorage::Configuration
String format = "auto";
String compression_method = "auto";
String structure = "auto";
std::string partition_strategy = "auto";
bool hive_partition_strategy_write_partition_columns_into_files = false;

virtual void update(ObjectStoragePtr object_storage, ContextPtr local_context);
void updateIfRequired(ObjectStoragePtr object_storage, ContextPtr local_context);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Expand Up @@ -92,8 +92,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
if (sample_path.empty() && context_->getSettingsRef()[Setting::use_hive_partitioning])
sample_path = getPathSample(metadata, context_);

setInMemoryMetadata(metadata);
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns, context_, sample_path));
setInMemoryMetadata(metadata);

pure_storage = std::make_shared<StorageObjectStorage>(
configuration,
Expand Down
74 changes: 38 additions & 36 deletions src/Storages/ObjectStorage/StorageObjectStorageSink.cpp
Expand Up @@ -20,6 +20,34 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}

namespace
{
void validateKey(const String & str)
{
/// See:
/// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
/// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject

if (str.empty() || str.size() > 1024)

Just a comment, not a change request right now, since this code already existed and was only moved into a namespace, but I don't like it. As I understand it, the key is generated inside ClickHouse code and the customer can't fully control the key length. When they get this error, what's next? "OK, the key is too long, how can I fix it?"
Maybe we need to add a task to the TODO list to think about automatically shortening the key in cases like this.

Collaborator Author

Not fully generated by ClickHouse. The key here represents the path without the bucket. Part of it can be specified by the user upon table creation.

throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (not empty, max 1023 characters), got: {}", str.size());

if (!UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key");

PartitionedSink::validatePartitionKey(str, true);
}

void validateNamespace(const String & str, PartitionedStorageObjectStorageSink::ConfigurationPtr configuration)
{
configuration->validateNamespace(str);

if (!UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in bucket name");

PartitionedSink::validatePartitionKey(str, false);
}
}

StorageObjectStorageSink::StorageObjectStorageSink(
ObjectStoragePtr object_storage,
ConfigurationPtr configuration,
Expand Down Expand Up @@ -97,13 +125,13 @@ void StorageObjectStorageSink::cancelBuffers()
}

PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
std::shared_ptr<PartitionStrategy> partition_strategy_,
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
std::optional<FormatSettings> format_settings_,
const Block & sample_block_,
ContextPtr context_,
const ASTPtr & partition_by)
: PartitionedSink(partition_by, context_, sample_block_)
ContextPtr context_)
: PartitionedSink(partition_strategy_, context_, sample_block_)
, object_storage(object_storage_)
, configuration(configuration_)
, query_settings(configuration_->getQuerySettings(context_))
Expand All @@ -121,51 +149,25 @@ StorageObjectStorageSink::~StorageObjectStorageSink()

SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String & partition_id)
{
auto partition_bucket = replaceWildcards(configuration->getNamespace(), partition_id);
validateNamespace(partition_bucket);
auto file_path = getPartitionStrategy()->getPath(configuration->getPath(), partition_id);

auto partition_key = replaceWildcards(configuration->getPath(), partition_id);
validateKey(partition_key);
validateNamespace(configuration->getNamespace(), configuration);
validateKey(file_path);

if (auto new_key = checkAndGetNewFileOnInsertIfNeeded(
*object_storage, *configuration, query_settings, partition_key, /* sequence_number */1))
*object_storage, *configuration, query_settings, file_path, /* sequence_number */1))
{
partition_key = *new_key;
file_path = *new_key;
}

return std::make_shared<StorageObjectStorageSink>(
object_storage,
configuration,
format_settings,
sample_block,
getPartitionStrategy()->getBlockWithoutPartitionColumnsIfNeeded(),
context,
partition_key
file_path
);
}

void PartitionedStorageObjectStorageSink::validateKey(const String & str)
{
/// See:
/// - https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
/// - https://cloud.ibm.com/apidocs/cos/cos-compatibility#putobject

if (str.empty() || str.size() > 1024)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Incorrect key length (not empty, max 1023 characters), got: {}", str.size());

if (!UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in key");

validatePartitionKey(str, true);
}

void PartitionedStorageObjectStorageSink::validateNamespace(const String & str)
{
configuration->validateNamespace(str);

if (!UTF8::isValidUTF8(reinterpret_cast<const UInt8 *>(str.data()), str.size()))
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Incorrect non-UTF8 sequence in bucket name");

validatePartitionKey(str, false);
}

}
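The `validateKey` check in this file rejects empty or over-long object keys before a sink is created. A standalone sketch of the length part of that check follows, assuming the 1024-byte cap on key names from the S3 user guide linked in the code. `validateObjectKey` and `max_key_size_bytes` are hypothetical names. UTF-8 validation and the partition-key character checks are left out.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

// Assumed limit: the S3 user guide caps object key names at 1024 bytes of UTF-8.
constexpr std::size_t max_key_size_bytes = 1024;

// Hypothetical stand-in for the length check only. Throws std::invalid_argument
// when the key is empty or longer than the assumed limit.
void validateObjectKey(const std::string & key)
{
    if (key.empty())
        throw std::invalid_argument("Object key must not be empty");

    if (key.size() > max_key_size_bytes)
        throw std::invalid_argument(
            "Object key is " + std::to_string(key.size()) + " bytes, the limit is "
            + std::to_string(max_key_size_bytes) + " bytes");
}
```

Because the key is the path without the bucket, part of its length comes from the prefix given at table creation, which is the portion a user can shorten when this error is raised.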
7 changes: 2 additions & 5 deletions src/Storages/ObjectStorage/StorageObjectStorageSink.h
Expand Up @@ -42,19 +42,16 @@ class PartitionedStorageObjectStorageSink : public PartitionedSink
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;

PartitionedStorageObjectStorageSink(
std::shared_ptr<PartitionStrategy> partition_strategy_,
ObjectStoragePtr object_storage_,
ConfigurationPtr configuration_,
std::optional<FormatSettings> format_settings_,
const Block & sample_block_,
ContextPtr context_,
const ASTPtr & partition_by);
ContextPtr context_);

SinkPtr createSinkForPartition(const String & partition_id) override;

private:
void validateKey(const String & str);
void validateNamespace(const String & str);

ObjectStoragePtr object_storage;
ConfigurationPtr configuration;

Expand Down