Skip to content

Write to Merge storage #683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: antalya
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 220 additions & 98 deletions src/Storages/StorageMerge.cpp

Large diffs are not rendered by default.

44 changes: 39 additions & 5 deletions src/Storages/StorageMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class StorageMerge final : public IStorage, WithContext
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const DBToTableSetMap & source_databases_and_tables_,
const std::optional<String> & table_to_write_,
bool table_to_write_auto_,
ContextPtr context_);

StorageMerge(
Expand All @@ -39,6 +41,8 @@ class StorageMerge final : public IStorage, WithContext
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const String & source_table_regexp_,
const std::optional<String> & table_to_write_,
bool table_to_write_auto_,
ContextPtr context_);

std::string getName() const override { return "Merge"; }
Expand Down Expand Up @@ -70,6 +74,12 @@ class StorageMerge final : public IStorage, WithContext
size_t max_block_size,
size_t num_streams) override;

SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool async_insert) override;

void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;

/// you need to add and remove columns in the sub-tables manually
Expand All @@ -87,9 +97,12 @@ class StorageMerge final : public IStorage, WithContext
using DatabaseTablesIterators = std::vector<DatabaseTablesIteratorPtr>;
DatabaseTablesIterators getDatabaseIterators(ContextPtr context) const;

/// Returns a unified column structure among multiple tables.
/// Takes a function that invokes a callback for every table. NOTE: This is quite inconvenient.
static ColumnsDescription unifyColumnsDescription(std::function<void(std::function<void(const StoragePtr &)>)> for_each_table);
static ColumnsDescription getColumnsDescriptionFromSourceTables(
const ContextPtr & query_context,
const String & source_database_name_or_regexp,
bool database_is_regexp,
const String & source_table_regexp,
size_t max_tables_to_look);

private:
/// (Database, Table, Lock, TableName)
Expand Down Expand Up @@ -119,15 +132,31 @@ class StorageMerge final : public IStorage, WithContext

DatabaseNameOrRegexp database_name_or_regexp;

std::optional<QualifiedTableName> table_to_write;
bool table_to_write_auto = false;

template <typename F>
StoragePtr getFirstTable(F && predicate) const;
StoragePtr traverseTablesUntil(F && predicate) const;

template <typename F>
void forEachTable(F && func) const;

template <typename F>
void forEachTableName(F && func) const;

template <typename F>
static StoragePtr traverseTablesUntilImpl(const ContextPtr & query_context, const IStorage * ignore_self, const DatabaseNameOrRegexp & database_name_or_regexp, F && predicate);

/// Returns a unified column structure among multiple tables.
static ColumnsDescription getColumnsDescriptionFromSourceTablesImpl(
const ContextPtr & context,
const DatabaseNameOrRegexp & database_name_or_regexp,
size_t max_tables_to_look,
const IStorage * ignore_self);

ColumnSizeByName getColumnSizes() const override;

ColumnsDescription getColumnsDescriptionFromSourceTables(size_t max_tables_to_look) const;
ColumnsDescription getColumnsDescriptionFromSourceTables(const ContextPtr & context) const;

static VirtualColumnsDescription createVirtuals();

Expand All @@ -136,6 +165,11 @@ class StorageMerge final : public IStorage, WithContext
template <typename F>
std::optional<UInt64> totalRowsOrBytes(F && func) const;

void setTableToWrite(
const std::optional<String> & table_to_write_,
const String & source_database_name_or_regexp_,
bool database_is_regexp_);

friend class ReadFromMerge;
};

Expand Down
103 changes: 13 additions & 90 deletions src/TableFunctions/TableFunctionMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,13 @@ class TableFunctionMerge : public ITableFunction
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
const char * getStorageTypeName() const override { return "Merge"; }

using TableSet = std::set<String>;
using DBToTableSetMap = std::map<String, TableSet>;
const DBToTableSetMap & getSourceDatabasesAndTables(ContextPtr context) const;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
static TableSet getMatchedTablesWithAccess(const String & database_name, const String & table_regexp, const ContextPtr & context);

String source_database_name_or_regexp;
String source_table_regexp;
bool database_is_regexp = false;
mutable std::optional<DBToTableSetMap> source_databases_and_tables;
};

std::vector<size_t> TableFunctionMerge::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const
Expand Down Expand Up @@ -129,111 +124,39 @@ void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, ContextPtr
}
}


const TableFunctionMerge::DBToTableSetMap & TableFunctionMerge::getSourceDatabasesAndTables(ContextPtr context) const
{
if (source_databases_and_tables)
return *source_databases_and_tables;

source_databases_and_tables.emplace();

/// database_name is not a regexp
if (!database_is_regexp)
{
auto source_tables = getMatchedTablesWithAccess(source_database_name_or_regexp, source_table_regexp, context);
if (source_tables.empty())
throwNoTablesMatchRegexp(source_database_name_or_regexp, source_table_regexp);
(*source_databases_and_tables)[source_database_name_or_regexp] = source_tables;
}

/// database_name is a regexp
else
{
OptimizedRegularExpression database_re(source_database_name_or_regexp);
auto databases = DatabaseCatalog::instance().getDatabases();

for (const auto & db : databases)
if (database_re.match(db.first))
(*source_databases_and_tables)[db.first] = getMatchedTablesWithAccess(db.first, source_table_regexp, context);

if (source_databases_and_tables->empty())
throwNoTablesMatchRegexp(source_database_name_or_regexp, source_table_regexp);
}

return *source_databases_and_tables;
}

ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
size_t table_num = 0;
size_t max_tables_to_look = context->getSettingsRef()[Setting::merge_table_max_tables_to_look_for_schema_inference];
auto res = StorageMerge::getColumnsDescriptionFromSourceTables(
context,
source_database_name_or_regexp,
database_is_regexp,
source_table_regexp,
context->getSettingsRef()[Setting::merge_table_max_tables_to_look_for_schema_inference]);
if (res.empty())
throwNoTablesMatchRegexp(source_database_name_or_regexp, source_table_regexp);

return StorageMerge::unifyColumnsDescription([&table_num, &context, max_tables_to_look, this](std::function<void(const StoragePtr &)> callback)
{
for (const auto & db_with_tables : getSourceDatabasesAndTables(context))
{
for (const auto & table : db_with_tables.second)
{
if (table_num >= max_tables_to_look)
return;

auto storage = DatabaseCatalog::instance().tryGetTable(StorageID{db_with_tables.first, table}, context);
if (storage)
{
++table_num;
callback(storage);
}
}
}
});
return res;
}


StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
{
std::optional<std::string> table_to_write = std::nullopt;
auto res = std::make_shared<StorageMerge>(
StorageID(getDatabaseName(), table_name),
ColumnsDescription{},
String{},
source_database_name_or_regexp,
database_is_regexp,
getSourceDatabasesAndTables(context),
source_table_regexp,
table_to_write,
false,
context);

res->startup();
return res;
}

TableFunctionMerge::TableSet
TableFunctionMerge::getMatchedTablesWithAccess(const String & database_name, const String & table_regexp, const ContextPtr & context)
{
OptimizedRegularExpression table_re(table_regexp);

auto table_name_match = [&](const String & table_name) { return table_re.match(table_name); };

auto access = context->getAccess();

auto database = DatabaseCatalog::instance().getDatabase(database_name);

bool granted_show_on_all_tables = access->isGranted(AccessType::SHOW_TABLES, database_name);
bool granted_select_on_all_tables = access->isGranted(AccessType::SELECT, database_name);

TableSet tables;

for (auto it = database->getTablesIterator(context, table_name_match); it->isValid(); it->next())
{
if (!it->table())
continue;
bool granted_show = granted_show_on_all_tables || access->isGranted(AccessType::SHOW_TABLES, database_name, it->name());
if (!granted_show)
continue;
if (!granted_select_on_all_tables)
access->checkAccess(AccessType::SELECT, database_name, it->name());
tables.emplace(it->name());
}
return tables;
}

}

void registerTableFunctionMerge(TableFunctionFactory & factory)
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_table_functions_access_rights/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_merge():
)

instance.query("GRANT CREATE TEMPORARY TABLE ON *.* TO A")
assert "no tables in the database matches" in instance.query_and_get_error(
assert "no tables satisfied provided regexp" in instance.query_and_get_error(
select_query, user="A"
)

Expand All @@ -63,7 +63,7 @@ def test_merge():
instance.query("GRANT SELECT ON default.table1 TO A")
instance.query("GRANT INSERT ON default.table2 TO A")
assert (
"it's necessary to have the grant SELECT ON default.table2"
"it's necessary to have the grant SELECT(x) ON default.table2"
in instance.query_and_get_error(select_query, user="A")
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
-- Tags: no-parallel

DROP DATABASE IF EXISTS 01902_db_params;
CREATE DATABASE 01902_db_params;
CREATE TABLE 01902_db_params.t(n Int8) ENGINE=MergeTree ORDER BY n;
INSERT INTO 01902_db_params.t SELECT * FROM numbers(3);
SELECT _database, _table, n FROM merge(REGEXP('^01902_db_params'), '^t') ORDER BY _database, _table, n;

SELECT _database, _table, n FROM merge() ORDER BY _database, _table, n; -- {serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH}
SELECT _database, _table, n FROM merge('^t') ORDER BY _database, _table, n; -- {serverError BAD_ARGUMENTS}
SELECT _database, _table, n FROM merge('^t') ORDER BY _database, _table, n; -- {serverError CANNOT_EXTRACT_TABLE_STRUCTURE}

USE 01902_db_params;
SELECT _database, _table, n FROM merge('^t') ORDER BY _database, _table, n;
Expand Down
Loading
Loading