Skip to content

Commit 7033c57

Browse files
committed
Write to Merge storage
1 parent 4ef8063 commit 7033c57

File tree

5 files changed

+130
-4
lines changed

5 files changed

+130
-4
lines changed

src/Storages/StorageMerge.cpp

+55-4
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ StorageMerge::StorageMerge(
143143
const String & source_database_name_or_regexp_,
144144
bool database_is_regexp_,
145145
const DBToTableSetMap & source_databases_and_tables_,
146+
const std::optional<String> & table_to_write_,
146147
ContextPtr context_)
147148
: IStorage(table_id_)
148149
, WithContext(context_->getGlobalContext())
@@ -157,6 +158,7 @@ StorageMerge::StorageMerge(
157158
storage_metadata.setComment(comment);
158159
setInMemoryMetadata(storage_metadata);
159160
setVirtuals(createVirtuals());
161+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
160162
}
161163

162164
StorageMerge::StorageMerge(
@@ -166,6 +168,7 @@ StorageMerge::StorageMerge(
166168
const String & source_database_name_or_regexp_,
167169
bool database_is_regexp_,
168170
const String & source_table_regexp_,
171+
const std::optional<String> & table_to_write_,
169172
ContextPtr context_)
170173
: IStorage(table_id_)
171174
, WithContext(context_->getGlobalContext())
@@ -180,6 +183,7 @@ StorageMerge::StorageMerge(
180183
storage_metadata.setComment(comment);
181184
setInMemoryMetadata(storage_metadata);
182185
setVirtuals(createVirtuals());
186+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
183187
}
184188

185189
StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr context_) const
@@ -1668,6 +1672,44 @@ std::optional<UInt64> StorageMerge::totalRowsOrBytes(F && func) const
16681672
return first_table ? std::nullopt : std::make_optional(total_rows_or_bytes);
16691673
}
16701674

1675+
void StorageMerge::setTableToWrite(
1676+
const std::optional<String> & table_to_write_,
1677+
const String & source_database_name_or_regexp_,
1678+
bool database_is_regexp_)
1679+
{
1680+
if (!table_to_write_.has_value())
1681+
{
1682+
table_to_write = std::nullopt;
1683+
return;
1684+
}
1685+
1686+
auto qualified_name = QualifiedTableName::parseFromString(*table_to_write_);
1687+
1688+
if (qualified_name.database.empty())
1689+
{
1690+
if (database_is_regexp_)
1691+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument 'table_to_write' must contain database if 'db_name' is regular expression.");
1692+
1693+
qualified_name.database = source_database_name_or_regexp_;
1694+
}
1695+
1696+
table_to_write = qualified_name;
1697+
}
1698+
1699+
SinkToStoragePtr StorageMerge::write(
1700+
const ASTPtr & query,
1701+
const StorageMetadataPtr & metadata_snapshot,
1702+
ContextPtr context_,
1703+
bool async_insert)
1704+
{
1705+
if (!table_to_write.has_value())
1706+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not allowed in storage {} without described table to write.", getName());
1707+
1708+
auto database = DatabaseCatalog::instance().getDatabase(table_to_write->database);
1709+
auto table = database->getTable(table_to_write->table, context_);
1710+
return table->write(query, metadata_snapshot, context_, async_insert);
1711+
}
1712+
16711713
void registerStorageMerge(StorageFactory & factory)
16721714
{
16731715
factory.registerStorage("Merge", [](const StorageFactory::Arguments & args)
@@ -1678,10 +1720,12 @@ void registerStorageMerge(StorageFactory & factory)
16781720

16791721
ASTs & engine_args = args.engine_args;
16801722

1681-
if (engine_args.size() != 2)
1723+
size_t size = engine_args.size();
1724+
1725+
if (size < 2 || size > 3)
16821726
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
1683-
"Storage Merge requires exactly 2 parameters - name "
1684-
"of source database and regexp for table names.");
1727+
"Storage Merge requires 2 or 3 parameters - name "
1728+
"of source database, regexp for table names, and optional table name for writing.");
16851729

16861730
auto [is_regexp, database_ast] = StorageMerge::evaluateDatabaseName(engine_args[0], args.getLocalContext());
16871731

@@ -1693,8 +1737,15 @@ void registerStorageMerge(StorageFactory & factory)
16931737
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.getLocalContext());
16941738
String table_name_regexp = checkAndGetLiteralArgument<String>(engine_args[1], "table_name_regexp");
16951739

1740+
std::optional<String> table_to_write = std::nullopt;
1741+
if (size == 3)
1742+
{
1743+
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
1744+
table_to_write = checkAndGetLiteralArgument<String>(engine_args[2], "table_to_write");
1745+
}
1746+
16961747
return std::make_shared<StorageMerge>(
1697-
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp, table_name_regexp, args.getContext());
1748+
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp, table_name_regexp, table_to_write, args.getContext());
16981749
},
16991750
{
17001751
.supports_schema_inference = true

src/Storages/StorageMerge.h

+15
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class StorageMerge final : public IStorage, WithContext
3030
const String & source_database_name_or_regexp_,
3131
bool database_is_regexp_,
3232
const DBToTableSetMap & source_databases_and_tables_,
33+
const std::optional<String> & table_to_write_,
3334
ContextPtr context_);
3435

3536
StorageMerge(
@@ -39,6 +40,7 @@ class StorageMerge final : public IStorage, WithContext
3940
const String & source_database_name_or_regexp_,
4041
bool database_is_regexp_,
4142
const String & source_table_regexp_,
43+
const std::optional<String> & table_to_write_,
4244
ContextPtr context_);
4345

4446
std::string getName() const override { return "Merge"; }
@@ -70,6 +72,12 @@ class StorageMerge final : public IStorage, WithContext
7072
size_t max_block_size,
7173
size_t num_streams) override;
7274

75+
SinkToStoragePtr write(
76+
const ASTPtr & query,
77+
const StorageMetadataPtr & metadata_snapshot,
78+
ContextPtr context,
79+
bool async_insert) override;
80+
7381
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;
7482

7583
/// you need to add and remove columns in the sub-tables manually
@@ -115,6 +123,8 @@ class StorageMerge final : public IStorage, WithContext
115123

116124
DatabaseNameOrRegexp database_name_or_regexp;
117125

126+
std::optional<QualifiedTableName> table_to_write;
127+
118128
template <typename F>
119129
StoragePtr getFirstTable(F && predicate) const;
120130

@@ -132,6 +142,11 @@ class StorageMerge final : public IStorage, WithContext
132142
template <typename F>
133143
std::optional<UInt64> totalRowsOrBytes(F && func) const;
134144

145+
void setTableToWrite(
146+
const std::optional<String> & table_to_write_,
147+
const String & source_database_name_or_regexp_,
148+
bool database_is_regexp_);
149+
135150
friend class ReadFromMerge;
136151
};
137152

src/TableFunctions/TableFunctionMerge.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,15 @@ ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr contex
175175

176176
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
177177
{
178+
std::optional<std::string> table_to_write = std::nullopt;
178179
auto res = std::make_shared<StorageMerge>(
179180
StorageID(getDatabaseName(), table_name),
180181
getActualTableStructure(context, is_insert_query),
181182
String{},
182183
source_database_name_or_regexp,
183184
database_is_regexp,
184185
getSourceDatabasesAndTables(context),
186+
table_to_write,
185187
context);
186188

187189
res->startup();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2 1
2+
2 2
3+
2 3
4+
1 1
5+
2 1
6+
2 2
7+
2 3
8+
1 1
9+
2 1
10+
2 2
11+
2 3
12+
1 1
13+
2 1
14+
2 2
15+
2 3
16+
1 1
17+
2 1
18+
2 2
19+
2 3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
DROP TABLE IF EXISTS test03373_db.test03373_table_1;
2+
DROP TABLE IF EXISTS test03373_db.test03373_table_2;
3+
DROP TABLE IF EXISTS test03373_db.test03373_merge_ro;
4+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_1;
5+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_2;
6+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_3;
7+
DROP DATABASE IF EXISTS test03373_db;
8+
9+
CREATE DATABASE test03373_db;
10+
11+
CREATE TABLE test03373_db.test03373_table_1 (key UInt32, value UInt32) ENGINE=MergeTree() ORDER BY key;
12+
CREATE TABLE test03373_db.test03373_table_2 (key UInt32, value UInt32) ENGINE=MergeTree() ORDER BY key;
13+
14+
CREATE TABLE test03373_db.test03373_merge_ro (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+');
15+
16+
CREATE TABLE test03373_db.test03373_merge_wr_1 (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+', test03373_table_2);
17+
CREATE TABLE test03373_db.test03373_merge_wr_2 (key UInt32, value UInt32) ENGINE=Merge(test03373_db, 'test03373_table_\d+', test03373_db.test03373_table_2);
18+
CREATE TABLE test03373_db.test03373_merge_wr_3 (key UInt32, value UInt32) ENGINE=Merge(REGEXP('test03373_.*'), 'test03373_table_\d+', test03373_db.test03373_table_2);
19+
20+
INSERT INTO test03373_db.test03373_table_1 VALUES (1,1);
21+
22+
INSERT INTO test03373_db.test03373_merge_wr_1 VALUES (2,1);
23+
INSERT INTO test03373_db.test03373_merge_wr_2 VALUES (2,2);
24+
INSERT INTO test03373_db.test03373_merge_wr_3 VALUES (2,3);
25+
26+
SELECT * FROM test03373_db.test03373_table_2 ORDER BY key, value;
27+
28+
SELECT * FROM test03373_db.test03373_merge_ro ORDER BY key, value;
29+
SELECT * FROM test03373_db.test03373_merge_wr_1 ORDER BY key, value;
30+
SELECT * FROM test03373_db.test03373_merge_wr_2 ORDER BY key, value;
31+
SELECT * FROM test03373_db.test03373_merge_wr_3 ORDER BY key, value;
32+
33+
DROP TABLE IF EXISTS test03373_db.test03373_table_1;
34+
DROP TABLE IF EXISTS test03373_db.test03373_table_2;
35+
DROP TABLE IF EXISTS test03373_db.test03373_merge_ro;
36+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_1;
37+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_2;
38+
DROP TABLE IF EXISTS test03373_db.test03373_merge_wr_3;
39+
DROP DATABASE IF EXISTS test03373_db;

0 commit comments

Comments
 (0)