diff --git a/src/include/metadata_manager/postgres_metadata_manager.hpp b/src/include/metadata_manager/postgres_metadata_manager.hpp index 1c7a7ad7d65..94b14dd9f0d 100644 --- a/src/include/metadata_manager/postgres_metadata_manager.hpp +++ b/src/include/metadata_manager/postgres_metadata_manager.hpp @@ -16,19 +16,24 @@ class PostgresMetadataManager : public DuckLakeMetadataManager { public: explicit PostgresMetadataManager(DuckLakeTransaction &transaction); + static unique_ptr Create(DuckLakeTransaction &transaction) { + return make_uniq(transaction); + } + bool TypeIsNativelySupported(const LogicalType &type) override; string GetColumnTypeInternal(const LogicalType &type) override; unique_ptr Execute(DuckLakeSnapshot snapshot, string &query) override; - unique_ptr Query(DuckLakeSnapshot snapshot, string &query) override; + unique_ptr Query(string query) override; + unique_ptr Query(DuckLakeSnapshot snapshot, string query) override; protected: string GetLatestSnapshotQuery() const override; private: - unique_ptr ExecuteQuery(DuckLakeSnapshot snapshot, string &query, string command); + unique_ptr ExecuteQuery(string &query, string command); }; } // namespace duckdb diff --git a/src/include/storage/ducklake_initializer.hpp b/src/include/storage/ducklake_initializer.hpp index d6599348beb..00ee8f31367 100644 --- a/src/include/storage/ducklake_initializer.hpp +++ b/src/include/storage/ducklake_initializer.hpp @@ -26,7 +26,6 @@ class DuckLakeInitializer { void InitializeNewDuckLake(DuckLakeTransaction &transaction, bool has_explicit_schema); void LoadExistingDuckLake(DuckLakeTransaction &transaction); void InitializeDataPath(); - string GetAttachOptions(); void CheckAndAutoloadedRequiredExtension(const string &pattern); private: diff --git a/src/include/storage/ducklake_metadata_manager.hpp b/src/include/storage/ducklake_metadata_manager.hpp index ab64202b463..46d7ca1df9f 100644 --- a/src/include/storage/ducklake_metadata_manager.hpp +++ 
b/src/include/storage/ducklake_metadata_manager.hpp @@ -15,6 +15,7 @@ #include "duckdb/common/reference_map.hpp" #include "duckdb/common/types/value.hpp" #include "common/ducklake_snapshot.hpp" +#include "storage/ducklake_catalog.hpp" #include "storage/ducklake_partition_data.hpp" #include "storage/ducklake_stats.hpp" #include "duckdb/common/types/timestamp.hpp" @@ -97,6 +98,9 @@ class DuckLakeMetadataManager { explicit DuckLakeMetadataManager(DuckLakeTransaction &transaction); virtual ~DuckLakeMetadataManager(); + typedef unique_ptr (*create_t)(DuckLakeTransaction &transaction); + static void Register(const string &name, create_t); + static unique_ptr Create(DuckLakeTransaction &transaction); virtual bool TypeIsNativelySupported(const LogicalType &type); @@ -105,13 +109,22 @@ class DuckLakeMetadataManager { DuckLakeMetadataManager &Get(DuckLakeTransaction &transaction); + virtual bool IsInitialized(DuckLakeOptions &options); //! Initialize a new DuckLake virtual void InitializeDuckLake(bool has_explicit_schema, DuckLakeEncryption encryption); virtual DuckLakeMetadata LoadDuckLake(); + static void FillSnapshotArgs(string &query, const DuckLakeSnapshot &snapshot); + static void FillSnapshotCommitArgs(string &query, const DuckLakeSnapshotCommit &commit_info); + static void FillCatalogArgs(string &query, const DuckLakeCatalog &ducklake_catalog); + + //! Directly execute on metadata virtual unique_ptr Execute(DuckLakeSnapshot snapshot, string &query); - virtual unique_ptr Query(DuckLakeSnapshot snapshot, string &query); + //! Directly query on metadata + virtual unique_ptr Query(string query); + //! Directly query on metadata + virtual unique_ptr Query(DuckLakeSnapshot snapshot, string query); //! 
Get the catalog information for a specific snapshot virtual DuckLakeCatalogInfo GetCatalogForSnapshot(DuckLakeSnapshot snapshot); virtual vector GetGlobalTableStats(DuckLakeSnapshot snapshot); @@ -175,7 +188,7 @@ class DuckLakeMetadataManager { virtual string UpdateGlobalTableStats(const DuckLakeGlobalStatsInfo &stats); virtual SnapshotChangeInfo GetSnapshotAndStatsAndChanges(DuckLakeSnapshot start_snapshot, SnapshotAndStats &current_snapshot); - SnapshotDeletedFromFiles GetFilesDeletedOrDroppedAfterSnapshot(const DuckLakeSnapshot &start_snapshot) const; + SnapshotDeletedFromFiles GetFilesDeletedOrDroppedAfterSnapshot(const DuckLakeSnapshot &start_snapshot); virtual unique_ptr GetSnapshot(); virtual unique_ptr GetSnapshot(BoundAtClause &at_clause, SnapshotBound bound); virtual idx_t GetNextColumnId(TableIndex table_id); @@ -257,6 +270,8 @@ class DuckLakeMetadataManager { private: unordered_map inlined_table_name_cache; + static unordered_map metadata_managers; + static mutex metadata_managers_lock; protected: DuckLakeTransaction &transaction; diff --git a/src/include/storage/ducklake_transaction.hpp b/src/include/storage/ducklake_transaction.hpp index 2d2f76dacc0..251e1e0af57 100644 --- a/src/include/storage/ducklake_transaction.hpp +++ b/src/include/storage/ducklake_transaction.hpp @@ -69,7 +69,7 @@ class DuckLakeTransaction : public Transaction, public enable_shared_from_this Query(DuckLakeSnapshot snapshot, string query); + //! 
Run query on duckdb unique_ptr Query(string query); Connection &GetConnection(); diff --git a/src/metadata_manager/postgres_metadata_manager.cpp b/src/metadata_manager/postgres_metadata_manager.cpp index ef5096ae188..c3e9e8e28e3 100644 --- a/src/metadata_manager/postgres_metadata_manager.cpp +++ b/src/metadata_manager/postgres_metadata_manager.cpp @@ -2,6 +2,7 @@ #include "common/ducklake_util.hpp" #include "duckdb/main/database.hpp" #include "storage/ducklake_catalog.hpp" +#include "storage/ducklake_metadata_manager.hpp" #include "storage/ducklake_transaction.hpp" namespace duckdb { @@ -46,44 +47,32 @@ string PostgresMetadataManager::GetColumnTypeInternal(const LogicalType &column_ } } -unique_ptr PostgresMetadataManager::ExecuteQuery(DuckLakeSnapshot snapshot, string &query, - string command) { +unique_ptr PostgresMetadataManager::ExecuteQuery(string &query, string command) { auto &commit_info = transaction.GetCommitInfo(); - query = StringUtil::Replace(query, "{SNAPSHOT_ID}", to_string(snapshot.snapshot_id)); - query = StringUtil::Replace(query, "{SCHEMA_VERSION}", to_string(snapshot.schema_version)); - query = StringUtil::Replace(query, "{NEXT_CATALOG_ID}", to_string(snapshot.next_catalog_id)); - query = StringUtil::Replace(query, "{NEXT_FILE_ID}", to_string(snapshot.next_file_id)); - query = StringUtil::Replace(query, "{AUTHOR}", commit_info.author.ToSQLString()); - query = StringUtil::Replace(query, "{COMMIT_MESSAGE}", commit_info.commit_message.ToSQLString()); - query = StringUtil::Replace(query, "{COMMIT_EXTRA_INFO}", commit_info.commit_extra_info.ToSQLString()); + DuckLakeMetadataManager::FillSnapshotCommitArgs(query, commit_info); auto &connection = transaction.GetConnection(); auto &ducklake_catalog = transaction.GetCatalog(); - auto catalog_identifier = DuckLakeUtil::SQLIdentifierToString(ducklake_catalog.MetadataDatabaseName()); auto catalog_literal = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataDatabaseName()); - auto schema_identifier = 
DuckLakeUtil::SQLIdentifierToString(ducklake_catalog.MetadataSchemaName()); - auto schema_identifier_escaped = StringUtil::Replace(schema_identifier, "'", "''"); - auto schema_literal = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataSchemaName()); - auto metadata_path = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataPath()); - auto data_path = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.DataPath()); - query = StringUtil::Replace(query, "{METADATA_CATALOG_NAME_LITERAL}", catalog_literal); - query = StringUtil::Replace(query, "{METADATA_CATALOG_NAME_IDENTIFIER}", catalog_identifier); - query = StringUtil::Replace(query, "{METADATA_SCHEMA_NAME_LITERAL}", schema_literal); - query = StringUtil::Replace(query, "{METADATA_CATALOG}", schema_identifier); - query = StringUtil::Replace(query, "{METADATA_SCHEMA_ESCAPED}", schema_identifier_escaped); - query = StringUtil::Replace(query, "{METADATA_PATH}", metadata_path); - query = StringUtil::Replace(query, "{DATA_PATH}", data_path); + DuckLakeMetadataManager::FillCatalogArgs(query, ducklake_catalog); - return connection.Query(StringUtil::Format("CALL %s(%s, %s)", command, catalog_literal, SQLString(query))); + return connection.Query( + StringUtil::Format("CALL %s(%s, %s)", std::move(command), catalog_literal, SQLString(query))); } unique_ptr PostgresMetadataManager::Execute(DuckLakeSnapshot snapshot, string &query) { - return ExecuteQuery(snapshot, query, "postgres_execute"); + DuckLakeMetadataManager::FillSnapshotArgs(query, snapshot); + return ExecuteQuery(query, "postgres_execute"); } -unique_ptr PostgresMetadataManager::Query(DuckLakeSnapshot snapshot, string &query) { - return ExecuteQuery(snapshot, query, "postgres_query"); +unique_ptr PostgresMetadataManager::Query(string query) { + return ExecuteQuery(query, "postgres_query"); +} + +unique_ptr PostgresMetadataManager::Query(DuckLakeSnapshot snapshot, string query) { + DuckLakeMetadataManager::FillSnapshotArgs(query, snapshot); + return 
Query(query); } string PostgresMetadataManager::GetLatestSnapshotQuery() const { diff --git a/src/storage/ducklake_initializer.cpp b/src/storage/ducklake_initializer.cpp index d5f970bd32e..8408cac605c 100644 --- a/src/storage/ducklake_initializer.cpp +++ b/src/storage/ducklake_initializer.cpp @@ -18,67 +18,16 @@ DuckLakeInitializer::DuckLakeInitializer(ClientContext &context, DuckLakeCatalog InitializeDataPath(); } -string DuckLakeInitializer::GetAttachOptions() { - vector attach_options; - if (options.access_mode != AccessMode::AUTOMATIC) { - switch (options.access_mode) { - case AccessMode::READ_ONLY: - attach_options.push_back("READ_ONLY"); - break; - case AccessMode::READ_WRITE: - attach_options.push_back("READ_WRITE"); - break; - default: - throw InternalException("Unsupported access mode in DuckLake attach"); - } - } - for (auto &option : options.metadata_parameters) { - attach_options.push_back(option.first + " " + option.second.ToSQLString()); - } - - if (attach_options.empty()) { - return string(); - } - string result; - for (auto &option : attach_options) { - if (!result.empty()) { - result += ", "; - } - result += option; - } - return " (" + result + ")"; -} - void DuckLakeInitializer::Initialize() { auto &transaction = DuckLakeTransaction::Get(context, catalog); - // attach the metadata database - auto result = - transaction.Query("ATTACH {METADATA_PATH} AS {METADATA_CATALOG_NAME_IDENTIFIER}" + GetAttachOptions()); - if (result->HasError()) { - auto &error_obj = result->GetErrorObject(); - error_obj.Throw("Failed to attach DuckLake MetaData \"" + catalog.MetadataDatabaseName() + "\" at path + \"" + - catalog.MetadataPath() + "\""); - } - // explicitly load all secrets - work-around to secret initialization bug - transaction.Query("FROM duckdb_secrets()"); - + auto &metadata_manager = transaction.GetMetadataManager(); bool has_explicit_schema = !options.metadata_schema.empty(); - if (options.metadata_schema.empty()) { - // if the schema is not explicitly 
set by the user - set it to the default schema in the catalog - options.metadata_schema = transaction.GetDefaultSchemaName(); - } // after the metadata database is attached initialize the ducklake // check if we are loading an existing DuckLake or creating a new one // FIXME: verify that all tables are in the correct format instead - result = transaction.Query( - "SELECT COUNT(*) FROM duckdb_tables() WHERE database_name={METADATA_CATALOG_NAME_LITERAL} AND " - "schema_name={METADATA_SCHEMA_NAME_LITERAL} AND table_name LIKE 'ducklake_%'"); - if (result->HasError()) { - auto &error_obj = result->GetErrorObject(); - error_obj.Throw("Failed to load DuckLake table data"); - } - auto count = result->Fetch()->GetValue(0, 0).GetValue(); - if (count == 0) { + + bool is_initialized = metadata_manager.IsInitialized(options); + if (!is_initialized) { if (!options.create_if_not_exists) { throw InvalidInputException("Existing DuckLake at metadata catalog \"%s\" does not exist - and creating a " "new DuckLake is explicitly disabled", diff --git a/src/storage/ducklake_metadata_manager.cpp b/src/storage/ducklake_metadata_manager.cpp index a9debd22d15..599e02d5993 100644 --- a/src/storage/ducklake_metadata_manager.cpp +++ b/src/storage/ducklake_metadata_manager.cpp @@ -1,4 +1,5 @@ #include "storage/ducklake_metadata_manager.hpp" +#include "common/ducklake_options.hpp" #include "storage/ducklake_transaction.hpp" #include "common/ducklake_util.hpp" #include "duckdb/planner/tableref/bound_at_clause.hpp" @@ -26,11 +27,26 @@ DuckLakeMetadataManager::~DuckLakeMetadataManager() { } optional_ptr GetDatabase(ClientContext &context, const string &name); +unordered_map DuckLakeMetadataManager::metadata_managers = { + {"postgres", PostgresMetadataManager::Create}, {"postgres_scanner", PostgresMetadataManager::Create}}; + +mutex DuckLakeMetadataManager::metadata_managers_lock; + +void DuckLakeMetadataManager::Register(const string &name, DuckLakeMetadataManager::create_t create) { + lock_guard 
lock(metadata_managers_lock); + if (metadata_managers.find(name) != metadata_managers.end()) { + throw InternalException("Metadata manager with name \"%s\" already exists!", name); + } + metadata_managers[name] = create; +} + unique_ptr DuckLakeMetadataManager::Create(DuckLakeTransaction &transaction) { + lock_guard lock(metadata_managers_lock); auto &catalog = transaction.GetCatalog(); auto catalog_type = catalog.MetadataType(); - if (catalog_type == "postgres" || catalog_type == "postgres_scanner") { - return make_uniq(transaction); + auto create = metadata_managers[catalog_type]; + if (create) { + return create(transaction); } return make_uniq(transaction); } @@ -47,6 +63,66 @@ FileSystem &DuckLakeMetadataManager::GetFileSystem() { return FileSystem::GetFileSystem(transaction.GetCatalog().GetDatabase()); } +static string GetAttachOptions(const DuckLakeOptions &options) { + vector attach_options; + if (options.access_mode != AccessMode::AUTOMATIC) { + switch (options.access_mode) { + case AccessMode::READ_ONLY: + attach_options.push_back("READ_ONLY"); + break; + case AccessMode::READ_WRITE: + attach_options.push_back("READ_WRITE"); + break; + default: + throw InternalException("Unsupported access mode in DuckLake attach"); + } + } + for (auto &option : options.metadata_parameters) { + attach_options.push_back(option.first + " " + option.second.ToSQLString()); + } + + if (attach_options.empty()) { + return string(); + } + string result; + for (auto &option : attach_options) { + if (!result.empty()) { + result += ", "; + } + result += option; + } + return " (" + result + ")"; +} + +bool DuckLakeMetadataManager::IsInitialized(DuckLakeOptions &options) { + auto &catalog = transaction.GetCatalog(); + // attach the metadata database + auto result = + transaction.Query("ATTACH {METADATA_PATH} AS {METADATA_CATALOG_NAME_IDENTIFIER}" + GetAttachOptions(options)); + if (result->HasError()) { + auto &error_obj = result->GetErrorObject(); + error_obj.Throw("Failed to attach 
DuckLake MetaData \"" + catalog.MetadataDatabaseName() + "\" at path + \"" + + catalog.MetadataPath() + "\""); + } + // explicitly load all secrets - work-around to secret initialization bug + transaction.Query("FROM duckdb_secrets()"); + + if (options.metadata_schema.empty()) { + // if the schema is not explicitly set by the user - set it to the default schema in the catalog + options.metadata_schema = transaction.GetDefaultSchemaName(); + } + + result = transaction.Query( + "SELECT COUNT(*) FROM duckdb_tables() WHERE database_name={METADATA_CATALOG_NAME_LITERAL} AND " + "schema_name={METADATA_SCHEMA_NAME_LITERAL} AND table_name LIKE 'ducklake_%'"); + if (result->HasError()) { + auto &error_obj = result->GetErrorObject(); + error_obj.Throw("Failed to load DuckLake table data"); + } + auto count = result->Fetch()->GetValue(0, 0).GetValue(); + return count > 0; +} + void DuckLakeMetadataManager::InitializeDuckLake(bool has_explicit_schema, DuckLakeEncryption encryption) { string initialize_query; if (has_explicit_schema) { @@ -58,7 +134,9 @@ void DuckLakeMetadataManager::InitializeDuckLake(bool has_explicit_schema, DuckL auto &base_data_path = ducklake_catalog.DataPath(); string data_path = StorePath(base_data_path); string encryption_str = encryption == DuckLakeEncryption::ENCRYPTED ? 
"true" : "false"; - initialize_query += StringUtil::Format(R"( + string initial_schema_uuid = transaction.GenerateUUID(); + initialize_query += + StringUtil::Format(R"( CREATE TABLE {METADATA_CATALOG}.ducklake_metadata(key VARCHAR NOT NULL, value VARCHAR NOT NULL, scope VARCHAR, scope_id BIGINT); CREATE TABLE {METADATA_CATALOG}.ducklake_snapshot(snapshot_id BIGINT PRIMARY KEY, snapshot_time TIMESTAMPTZ, schema_version BIGINT, next_catalog_id BIGINT, next_file_id BIGINT); CREATE TABLE {METADATA_CATALOG}.ducklake_snapshot_changes(snapshot_id BIGINT PRIMARY KEY, changes_made VARCHAR, author VARCHAR, commit_message VARCHAR, commit_extra_info VARCHAR); @@ -85,14 +163,14 @@ INSERT INTO {METADATA_CATALOG}.ducklake_schema_versions VALUES (0,0); INSERT INTO {METADATA_CATALOG}.ducklake_snapshot VALUES (0, NOW(), 0, 1, 0); INSERT INTO {METADATA_CATALOG}.ducklake_snapshot_changes VALUES (0, 'created_schema:"main"', NULL, NULL, NULL); INSERT INTO {METADATA_CATALOG}.ducklake_metadata (key, value) VALUES ('version', '0.3'), ('created_by', 'DuckDB %s'), ('data_path', %s), ('encrypted', '%s'); -INSERT INTO {METADATA_CATALOG}.ducklake_schema VALUES (0, UUID(), 0, NULL, 'main', 'main/', true); +INSERT INTO {METADATA_CATALOG}.ducklake_schema VALUES (0, '%s'::UUID, 0, NULL, 'main', 'main/', true); )", - DuckDB::SourceID(), SQLString(data_path), encryption_str); + DuckDB::SourceID(), SQLString(data_path), encryption_str, initial_schema_uuid); // TODO: add // ducklake_sorting_info // ducklake_sorting_column_info // ducklake_macro - auto result = transaction.Query(initialize_query); + auto result = Query(initialize_query); if (result->HasError()) { result->GetErrorObject().Throw("Failed to initialize DuckLake:"); } @@ -112,7 +190,7 @@ CREATE TABLE {METADATA_CATALOG}.ducklake_name_mapping(mapping_id BIGINT, column_ UPDATE {METADATA_CATALOG}.ducklake_partition_column SET column_id = (SELECT LIST(column_id ORDER BY column_order) FROM {METADATA_CATALOG}.ducklake_column WHERE table_id = 
ducklake_partition_column.table_id AND parent_column IS NULL AND end_snapshot IS NULL)[ducklake_partition_column.column_id + 1]; UPDATE {METADATA_CATALOG}.ducklake_metadata SET value = '0.2' WHERE key = 'version'; )"; - auto result = transaction.Query(migrate_query); + auto result = Query(migrate_query); if (result->HasError()) { result->GetErrorObject().Throw("Failed to migrate DuckLake from v0.1 to v0.2:"); } @@ -143,19 +221,19 @@ ALTER TABLE {METADATA_CATALOG}.ducklake_table_column_stats ADD COLUMN {IF_NOT_EX migrate_query = StringUtil::Replace(migrate_query, "{IF_EXISTS}", ""); migrate_query = StringUtil::Replace(migrate_query, "{WHERE_EMPTY}", ""); } - auto result = transaction.Query(migrate_query); + auto result = Query(migrate_query); if (result->HasError()) { result->GetErrorObject().Throw("Failed to migrate DuckLake from v0.2 to v0.3:"); } } DuckLakeMetadata DuckLakeMetadataManager::LoadDuckLake() { - auto result = transaction.Query(R"( + auto result = Query(R"( SELECT key, value, scope, scope_id FROM {METADATA_CATALOG}.ducklake_metadata )"); if (result->HasError()) { // we might be loading from a v0.1 database - if so we don't have scope yet - result = transaction.Query(R"( + result = Query(R"( SELECT key, value FROM {METADATA_CATALOG}.ducklake_metadata )"); if (result->HasError()) { @@ -243,7 +321,7 @@ FROM {METADATA_CATALOG}.ducklake_inlined_data_tables INNER JOIN {METADATA_CATALOG}.ducklake_table ON (ducklake_table.table_id = ducklake_inlined_data_tables.table_id) WHERE schema_version = {SCHEMA_ID})"; query = StringUtil::Replace(query, "{SCHEMA_ID}", to_string(schema_id)).c_str(); - auto result = transaction.Query(query); + auto result = Query(query); for (auto &row : *result) { return row.GetValue(0); } @@ -255,7 +333,7 @@ DuckLakeCatalogInfo DuckLakeMetadataManager::GetCatalogForSnapshot(DuckLakeSnaps auto &base_data_path = ducklake_catalog.DataPath(); DuckLakeCatalogInfo catalog; // load the schema information - auto result = 
transaction.Query(snapshot, R"( + auto result = Query(snapshot, R"( SELECT schema_id, schema_uuid::VARCHAR, schema_name, path, path_is_relative FROM {METADATA_CATALOG}.ducklake_schema WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < end_snapshot OR end_snapshot IS NULL) @@ -285,7 +363,7 @@ WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < end_snapshot OR end_s } // load the table information - result = transaction.Query(snapshot, R"( + result = Query(snapshot, R"( SELECT schema_id, tbl.table_id, table_uuid::VARCHAR, table_name, ( SELECT LIST({'key': key, 'value': value}) @@ -390,7 +468,7 @@ ORDER BY table_id, parent_column NULLS FIRST, column_order } } // load view information - result = transaction.Query(snapshot, R"( + result = Query(snapshot, R"( SELECT view_id, view_uuid, schema_id, view_name, dialect, sql, column_aliases, ( SELECT LIST({'key': key, 'value': value}) @@ -422,7 +500,7 @@ WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < view.end_snapshot OR } // load partition information - result = transaction.Query(snapshot, R"( + result = Query(snapshot, R"( SELECT partition_id, part.table_id, partition_key_index, column_id, transform FROM {METADATA_CATALOG}.ducklake_partition_info part JOIN {METADATA_CATALOG}.ducklake_partition_column part_col USING (partition_id) @@ -529,7 +607,7 @@ vector TransformGlobalStats(QueryResult &result) { vector DuckLakeMetadataManager::GetGlobalTableStats(DuckLakeSnapshot snapshot) { // query the most recent stats - auto result = transaction.Query(snapshot, R"( + auto result = Query(snapshot, R"( SELECT table_id, column_id, record_count, next_row_id, file_size_bytes, contains_null, contains_nan, min_value, max_value, extra_stats FROM {METADATA_CATALOG}.ducklake_table_stats LEFT JOIN {METADATA_CATALOG}.ducklake_table_column_stats USING (table_id) @@ -1048,7 +1126,7 @@ WHERE data.table_id=%d AND {SNAPSHOT_ID} >= data.begin_snapshot AND ({SNAPSHOT_I if (!where_clause.empty()) { query += "\nAND " + 
where_clause; } - auto result = transaction.Query(snapshot, query); + auto result = Query(snapshot, query); if (result->HasError()) { result->GetErrorObject().Throw("Failed to get data file list from DuckLake: "); } @@ -1093,7 +1171,7 @@ WHERE data.table_id=%d AND data.begin_snapshot >= %d AND data.begin_snapshot <= )", select_list, table_id.index, start_snapshot.snapshot_id); - auto result = transaction.Query(end_snapshot, query); + auto result = Query(end_snapshot, query); if (result->HasError()) { result->GetErrorObject().Throw("Failed to get table insertion file list from DuckLake: "); } @@ -1173,7 +1251,7 @@ USING (data_file_id), ( )", select_list, table_id.index, start_snapshot.snapshot_id, table_id.index, table_id.index, select_list, table_id.index, start_snapshot.snapshot_id, table_id.index); - auto result = transaction.Query(end_snapshot, query); + auto result = Query(end_snapshot, query); if (result->HasError()) { result->GetErrorObject().Throw("Failed to get table insertion file list from DuckLake: "); } @@ -1234,7 +1312,7 @@ WHERE data.table_id=%d AND {SNAPSHOT_ID} >= data.begin_snapshot AND ({SNAPSHOT_I query += "\nAND " + where_clause; } - auto result = transaction.Query(snapshot, query); + auto result = Query(snapshot, query); if (result->HasError()) { result->GetErrorObject().Throw("Failed to get extended data file list from DuckLake: "); } @@ -1320,7 +1398,7 @@ ORDER BY data.begin_snapshot, data.row_id_start, data.data_file_id, del.begin_sn )", select_list, table_id.index, table_id.index, deletion_threshold_clause, file_size_filter_clause); - auto result = transaction.Query(query); + auto result = Query(query); if (result->HasError()) { result->GetErrorObject().Throw("Failed to get compaction file list from DuckLake: "); } @@ -1456,12 +1534,54 @@ string DuckLakeMetadataManager::DropViews(const set &ids) { return FlushDrop("ducklake_view", "view_id", ids); } +void DuckLakeMetadataManager::FillSnapshotArgs(string &query, const DuckLakeSnapshot 
&snapshot) { + query = StringUtil::Replace(query, "{SNAPSHOT_ID}", to_string(snapshot.snapshot_id)); + query = StringUtil::Replace(query, "{SCHEMA_VERSION}", to_string(snapshot.schema_version)); + query = StringUtil::Replace(query, "{NEXT_CATALOG_ID}", to_string(snapshot.next_catalog_id)); + query = StringUtil::Replace(query, "{NEXT_FILE_ID}", to_string(snapshot.next_file_id)); +} + +void DuckLakeMetadataManager::FillSnapshotCommitArgs(string &query, const DuckLakeSnapshotCommit &commit_info) { + query = StringUtil::Replace(query, "{AUTHOR}", commit_info.author.ToSQLString()); + query = StringUtil::Replace(query, "{COMMIT_MESSAGE}", commit_info.commit_message.ToSQLString()); + query = StringUtil::Replace(query, "{COMMIT_EXTRA_INFO}", commit_info.commit_extra_info.ToSQLString()); +} + +void DuckLakeMetadataManager::FillCatalogArgs(string &query, const DuckLakeCatalog &ducklake_catalog) { + auto catalog_identifier = DuckLakeUtil::SQLIdentifierToString(ducklake_catalog.MetadataDatabaseName()); + auto catalog_literal = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataDatabaseName()); + auto schema_identifier = DuckLakeUtil::SQLIdentifierToString(ducklake_catalog.MetadataSchemaName()); + auto schema_identifier_escaped = StringUtil::Replace(schema_identifier, "'", "''"); + auto schema_literal = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataSchemaName()); + auto metadata_path = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataPath()); + auto data_path = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.DataPath()); + + query = StringUtil::Replace(query, "{METADATA_CATALOG_NAME_LITERAL}", catalog_literal); + query = StringUtil::Replace(query, "{METADATA_CATALOG_NAME_IDENTIFIER}", catalog_identifier); + query = StringUtil::Replace(query, "{METADATA_SCHEMA_NAME_LITERAL}", schema_literal); + query = StringUtil::Replace(query, "{METADATA_CATALOG}", schema_identifier); + query = StringUtil::Replace(query, "{METADATA_SCHEMA_ESCAPED}", 
schema_identifier_escaped); + query = StringUtil::Replace(query, "{METADATA_PATH}", metadata_path); + query = StringUtil::Replace(query, "{DATA_PATH}", data_path); +} + unique_ptr DuckLakeMetadataManager::Execute(DuckLakeSnapshot snapshot, string &query) { - return transaction.Query(snapshot, query); + return Query(snapshot, query); +} + +unique_ptr DuckLakeMetadataManager::Query(string query) { + auto &ducklake_catalog = transaction.GetCatalog(); + FillCatalogArgs(query, ducklake_catalog); + return transaction.Query(query); } -unique_ptr DuckLakeMetadataManager::Query(DuckLakeSnapshot snapshot, string &query) { - return transaction.Query(snapshot, query); +unique_ptr DuckLakeMetadataManager::Query(DuckLakeSnapshot snapshot, string query) { + auto &commit_info = transaction.GetCommitInfo(); + + FillSnapshotArgs(query, snapshot); + FillSnapshotCommitArgs(query, commit_info); + + return Query(query); } string DuckLakeMetadataManager::WriteNewSchemas(const vector &new_schemas) { @@ -1722,7 +1842,7 @@ WHERE table_id = %d AND schema_version=( WHERE table_id=%d );)", entry.table_id.index, entry.table_id.index); - auto result = transaction.Query(commit_snapshot, query); + auto result = Query(commit_snapshot, query); for (auto &row : *result) { inlined_table_name = row.GetValue(0); inlined_table_name_cache[entry.table_id.index] = inlined_table_name; @@ -1852,11 +1972,11 @@ shared_ptr DuckLakeMetadataManager::ReadInlinedData(DuckLak const string &inlined_table_name, const vector &columns_to_read) { auto projection = GetProjection(columns_to_read); - auto result = transaction.Query(snapshot, StringUtil::Format(R"( + auto result = Query(snapshot, StringUtil::Format(R"( SELECT %s FROM {METADATA_CATALOG}.%s inlined_data WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < end_snapshot OR end_snapshot IS NULL);)", - projection, inlined_table_name)); + projection, inlined_table_name)); return TransformInlinedData(*result); } @@ -1865,12 +1985,11 @@ 
DuckLakeMetadataManager::ReadInlinedDataInsertions(DuckLakeSnapshot start_snapsh const string &inlined_table_name, const vector &columns_to_read) { auto projection = GetProjection(columns_to_read); - auto result = - transaction.Query(end_snapshot, StringUtil::Format(R"( + auto result = Query(end_snapshot, StringUtil::Format(R"( SELECT %s FROM {METADATA_CATALOG}.%s inlined_data WHERE inlined_data.begin_snapshot >= %d AND inlined_data.begin_snapshot <= {SNAPSHOT_ID};)", - projection, inlined_table_name, start_snapshot.snapshot_id)); + projection, inlined_table_name, start_snapshot.snapshot_id)); return TransformInlinedData(*result); } @@ -1879,12 +1998,11 @@ DuckLakeMetadataManager::ReadInlinedDataDeletions(DuckLakeSnapshot start_snapsho const string &inlined_table_name, const vector &columns_to_read) { auto projection = GetProjection(columns_to_read); - auto result = - transaction.Query(end_snapshot, StringUtil::Format(R"( + auto result = Query(end_snapshot, StringUtil::Format(R"( SELECT %s FROM {METADATA_CATALOG}.%s inlined_data WHERE inlined_data.end_snapshot >= %d AND inlined_data.end_snapshot <= {SNAPSHOT_ID};)", - projection, inlined_table_name, start_snapshot.snapshot_id)); + projection, inlined_table_name, start_snapshot.snapshot_id)); return TransformInlinedData(*result); } @@ -1898,11 +2016,11 @@ string DuckLakeMetadataManager::GetPathForSchema(SchemaIndex schema_id, return FromRelativePath(path); } } - auto result = transaction.Query(StringUtil::Format(R"( + auto result = Query(StringUtil::Format(R"( SELECT path, path_is_relative FROM {METADATA_CATALOG}.ducklake_schema WHERE schema_id = %d;)", - schema_id.index)); + schema_id.index)); for (auto &row : *result) { DuckLakePath path; path.path = row.GetValue(0); @@ -1918,11 +2036,11 @@ string DuckLakeMetadataManager::GetPathForTable(TableIndex table_id, const vecto for (auto new_table : new_tables) { if (new_table.id == table_id) { // This is a table not yet in the catalog - auto result = 
transaction.Query(StringUtil::Format(R"( + auto result = Query(StringUtil::Format(R"( SELECT s.path, s.path_is_relative FROM {METADATA_CATALOG}.ducklake_schema s WHERE schema_id = %d;)", - new_table.schema_id.index)); + new_table.schema_id.index)); for (auto &row : *result) { DuckLakePath schema_path; schema_path.path = row.GetValue(0); @@ -1949,13 +2067,13 @@ WHERE schema_id = %d;)", } } } - auto result = transaction.Query(StringUtil::Format(R"( + auto result = Query(StringUtil::Format(R"( SELECT s.path, s.path_is_relative, t.path, t.path_is_relative FROM {METADATA_CATALOG}.ducklake_schema s JOIN {METADATA_CATALOG}.ducklake_table t USING (schema_id) WHERE table_id = %d;)", - table_id.index)); + table_id.index)); for (auto &row : *result) { DuckLakePath schema_path; schema_path.path = row.GetValue(0); @@ -2198,14 +2316,14 @@ vector DuckLakeMetadataManager::GetColumnMappings(opt if (start_from.IsValid()) { filter = "WHERE mapping_id >= " + to_string(start_from.GetIndex()); } - auto result = transaction.Query(StringUtil::Format(R"( + auto result = Query(StringUtil::Format(R"( SELECT mapping_id, table_id, type, column_id, source_name, target_field_id, parent_column, is_partition FROM {METADATA_CATALOG}.ducklake_column_mapping JOIN {METADATA_CATALOG}.ducklake_name_mapping USING (mapping_id) %s ORDER BY mapping_id, parent_column NULLS FIRST )", - filter)); + filter)); vector column_maps; for (auto &row : *result) { MappingIndex mapping_id(row.GetValue(0)); @@ -2356,9 +2474,9 @@ ORDER BY table_id NULLS FIRST; } SnapshotDeletedFromFiles -DuckLakeMetadataManager::GetFilesDeletedOrDroppedAfterSnapshot(const DuckLakeSnapshot &start_snapshot) const { +DuckLakeMetadataManager::GetFilesDeletedOrDroppedAfterSnapshot(const DuckLakeSnapshot &start_snapshot) { // get all changes made to the system after the snapshot was started - auto result = transaction.Query(start_snapshot, R"( + auto result = Query(start_snapshot, R"( SELECT data_file_id FROM 
{METADATA_CATALOG}.ducklake_delete_file WHERE begin_snapshot > {SNAPSHOT_ID} @@ -2399,7 +2517,7 @@ string DuckLakeMetadataManager::GetLatestSnapshotQuery() const { } unique_ptr DuckLakeMetadataManager::GetSnapshot() { - auto result = transaction.Query(GetLatestSnapshotQuery()); + auto result = Query(GetLatestSnapshotQuery()); if (result->HasError()) { result->GetErrorObject().Throw("Failed to query most recent snapshot for DuckLake: "); } @@ -2417,21 +2535,21 @@ unique_ptr DuckLakeMetadataManager::GetSnapshot(BoundAtClause const string timestamp_aggregate = bound == SnapshotBound::LOWER_BOUND ? "MIN" : "MAX"; const string timestamp_condition = bound == SnapshotBound::LOWER_BOUND ? ">" : "<"; if (StringUtil::CIEquals(unit, "version")) { - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( SELECT snapshot_id, schema_version, next_catalog_id, next_file_id FROM {METADATA_CATALOG}.ducklake_snapshot WHERE snapshot_id = %llu;)", - val.DefaultCastAs(LogicalType::UBIGINT).GetValue())); + val.DefaultCastAs(LogicalType::UBIGINT).GetValue())); } else if (StringUtil::CIEquals(unit, "timestamp")) { - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( SELECT snapshot_id, schema_version, next_catalog_id, next_file_id FROM {METADATA_CATALOG}.ducklake_snapshot WHERE snapshot_id = ( SELECT %s_BY(snapshot_id, snapshot_time) FROM {METADATA_CATALOG}.ducklake_snapshot WHERE snapshot_time %s= %s);)", - timestamp_aggregate, timestamp_condition, - val.DefaultCastAs(LogicalType::VARCHAR).ToSQLString())); + timestamp_aggregate, timestamp_condition, + val.DefaultCastAs(LogicalType::VARCHAR).ToSQLString())); } else { throw InvalidInputException("Unsupported AT clause unit - %s", unit); } @@ -2689,14 +2807,14 @@ static timestamp_tz_t GetTimestampTZFromRow(ClientContext &context, const T &row vector DuckLakeMetadataManager::GetAllSnapshots(const string &filter) { - auto res = transaction.Query(StringUtil::Format(R"( + 
auto res = Query(StringUtil::Format(R"( SELECT snapshot_id, snapshot_time, schema_version, changes_made, author, commit_message, commit_extra_info FROM {METADATA_CATALOG}.ducklake_snapshot LEFT JOIN {METADATA_CATALOG}.ducklake_snapshot_changes USING (snapshot_id) %s %s ORDER BY snapshot_id )", - filter.empty() ? "" : "WHERE", filter)); + filter.empty() ? "" : "WHERE", filter)); if (res->HasError()) { res->GetErrorObject().Throw("Failed to get snapshot information from DuckLake: "); } @@ -2722,7 +2840,7 @@ vector DuckLakeMetadataManager::GetOldFilesForCleanup(co SELECT data_file_id, path, path_is_relative, schedule_start FROM {METADATA_CATALOG}.ducklake_files_scheduled_for_deletion )" + filter; - auto res = transaction.Query(query); + auto res = Query(query); if (res->HasError()) { res->GetErrorObject().Throw("Failed to get files scheduled for deletion from DuckLake: "); } @@ -2782,7 +2900,7 @@ FROM {METADATA_CATALOG}.ducklake_files_scheduled_for_deletion f ) )" + filter; query = StringUtil::Replace(query, "{SEPARATOR}", separator); - auto res = transaction.Query(query); + auto res = Query(query); if (res->HasError()) { res->GetErrorObject().Throw("Failed to get files scheduled for deletion from DuckLake: "); } @@ -2816,23 +2934,23 @@ void DuckLakeMetadataManager::RemoveFilesScheduledForCleanup(const vectorHasError()) { result->GetErrorObject().Throw("Failed to delete scheduled cleanup files in DuckLake: "); } } idx_t DuckLakeMetadataManager::GetNextColumnId(TableIndex table_id) { - auto result = transaction.Query(StringUtil::Format(R"( + auto result = Query(StringUtil::Format(R"( SELECT MAX(column_id) FROM {METADATA_CATALOG}.ducklake_column WHERE table_id=%d )", - table_id.index)); + table_id.index)); if (result->HasError()) { result->GetErrorObject().Throw("Failed to get next column id in DuckLake: "); } @@ -2983,17 +3101,17 @@ void DuckLakeMetadataManager::DeleteSnapshots(const vector } vector tables_to_delete_from {"ducklake_snapshot", 
"ducklake_snapshot_changes"}; for (auto &delete_tbl : tables_to_delete_from) { - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( DELETE FROM {METADATA_CATALOG}.%s WHERE snapshot_id IN (%s); )", - delete_tbl, snapshot_ids)); + delete_tbl, snapshot_ids)); if (result->HasError()) { result->GetErrorObject().Throw("Failed to delete snapshots in DuckLake: "); } } // get a list of tables that are no longer required after these deletions - result = transaction.Query(R"( + result = Query(R"( SELECT table_id FROM {METADATA_CATALOG}.ducklake_table t WHERE end_snapshot IS NOT NULL AND NOT EXISTS ( @@ -3028,7 +3146,7 @@ AND NOT EXISTS ( table_id_filter = StringUtil::Format("table_id IN (%s) OR", deleted_table_ids); } - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( SELECT data_file_id, table_id, path, path_is_relative FROM {METADATA_CATALOG}.ducklake_data_file WHERE %s (end_snapshot IS NOT NULL AND NOT EXISTS( @@ -3036,7 +3154,7 @@ WHERE %s (end_snapshot IS NOT NULL AND NOT EXISTS( FROM {METADATA_CATALOG}.ducklake_snapshot WHERE snapshot_id >= begin_snapshot AND snapshot_id < end_snapshot ));)", - table_id_filter)); + table_id_filter)); vector cleanup_files; for (auto &row : *result) { DuckLakeFileForCleanup info; @@ -3069,21 +3187,21 @@ WHERE %s (end_snapshot IS NOT NULL AND NOT EXISTS( // delete the data files tables_to_delete_from = {"ducklake_data_file", "ducklake_file_column_stats"}; for (auto &delete_tbl : tables_to_delete_from) { - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( DELETE FROM {METADATA_CATALOG}.%s WHERE data_file_id IN (%s); )", - delete_tbl, deleted_file_ids)); + delete_tbl, deleted_file_ids)); if (result->HasError()) { result->GetErrorObject().Throw("Failed to delete old data file information in DuckLake: "); } } // insert the to-be-cleaned-up files - result = transaction.Query(StringUtil::Format(R"( + result = 
Query(StringUtil::Format(R"( INSERT INTO {METADATA_CATALOG}.ducklake_files_scheduled_for_deletion VALUES %s; )", - files_scheduled_for_cleanup)); + files_scheduled_for_cleanup)); if (result->HasError()) { result->GetErrorObject().Throw("Failed to schedule files for clean-up in DuckLake: "); } @@ -3095,7 +3213,7 @@ VALUES %s; file_id_filter = StringUtil::Format("data_file_id IN (%s) OR", deleted_file_ids); } - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( SELECT delete_file_id, table_id, path, path_is_relative FROM {METADATA_CATALOG}.ducklake_delete_file WHERE %s %s (end_snapshot IS NOT NULL AND NOT EXISTS( @@ -3103,7 +3221,7 @@ WHERE %s %s (end_snapshot IS NOT NULL AND NOT EXISTS( FROM {METADATA_CATALOG}.ducklake_snapshot WHERE snapshot_id >= begin_snapshot AND snapshot_id < end_snapshot ));)", - table_id_filter, file_id_filter)); + table_id_filter, file_id_filter)); vector cleanup_deletes; for (auto &row : *result) { DuckLakeFileForCleanup info; @@ -3134,20 +3252,20 @@ WHERE %s %s (end_snapshot IS NOT NULL AND NOT EXISTS( "(%d, %s, %s, NOW())", file.id.index, SQLString(path.path), path.path_is_relative ? 
"true" : "false"); } // delete the delete files - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( DELETE FROM {METADATA_CATALOG}.ducklake_delete_file WHERE delete_file_id IN (%s); )", - deleted_delete_ids)); + deleted_delete_ids)); if (result->HasError()) { result->GetErrorObject().Throw("Failed to delete old delete file information in DuckLake: "); } // insert the to-be-cleaned-up files - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( INSERT INTO {METADATA_CATALOG}.ducklake_files_scheduled_for_deletion VALUES %s; )", - files_scheduled_for_cleanup)); + files_scheduled_for_cleanup)); if (result->HasError()) { result->GetErrorObject().Throw("Failed to schedule files for clean-up in DuckLake: "); } @@ -3159,10 +3277,10 @@ VALUES %s; "ducklake_partition_info", "ducklake_partition_column", "ducklake_column", "ducklake_column_tag"}; for (auto &delete_tbl : tables_to_delete_from) { - auto result = transaction.Query(StringUtil::Format(R"( + auto result = Query(StringUtil::Format(R"( DELETE FROM {METADATA_CATALOG}.%s WHERE table_id IN (%s);)", - delete_tbl, deleted_table_ids)); + delete_tbl, deleted_table_ids)); if (result->HasError()) { result->GetErrorObject().Throw("Failed to delete from " + delete_tbl + " in DuckLake: "); } @@ -3172,14 +3290,14 @@ WHERE table_id IN (%s);)", // delete any views, schemas, etc that are no longer referenced tables_to_delete_from = {"ducklake_schema", "ducklake_view", "ducklake_tag"}; for (auto &delete_tbl : tables_to_delete_from) { - auto result = transaction.Query(StringUtil::Format(R"( + auto result = Query(StringUtil::Format(R"( DELETE FROM {METADATA_CATALOG}.%s WHERE end_snapshot IS NOT NULL AND NOT EXISTS( SELECT snapshot_id FROM {METADATA_CATALOG}.ducklake_snapshot WHERE snapshot_id >= begin_snapshot AND snapshot_id < end_snapshot );)", - delete_tbl)); + delete_tbl)); if (result->HasError()) { result->GetErrorObject().Throw("Failed to delete from " + 
delete_tbl + " in DuckLake: "); } @@ -3187,10 +3305,10 @@ WHERE end_snapshot IS NOT NULL AND NOT EXISTS( } void DuckLakeMetadataManager::DeleteInlinedData(const DuckLakeInlinedTableInfo &inlined_table) { - auto result = transaction.Query(StringUtil::Format(R"( + auto result = Query(StringUtil::Format(R"( DELETE FROM {METADATA_CATALOG}.%s )", - SQLIdentifier(inlined_table.table_name))); + SQLIdentifier(inlined_table.table_name))); if (result->HasError()) { result->GetErrorObject().Throw("Failed to delete inlined data in DuckLake from table " + inlined_table.table_name + ": "); @@ -3204,7 +3322,7 @@ string DuckLakeMetadataManager::InsertNewSchema(const DuckLakeSnapshot &snapshot vector DuckLakeMetadataManager::GetTableSizes(DuckLakeSnapshot snapshot) { vector table_sizes; - auto result = transaction.Query(snapshot, R"( + auto result = Query(snapshot, R"( SELECT schema_id, table_id, table_name, table_uuid, data_file_info.file_count, data_file_info.total_file_size, delete_file_info.file_count, delete_file_info.total_file_size FROM {METADATA_CATALOG}.ducklake_table tbl, LATERAL ( SELECT COUNT(*) file_count, SUM(file_size_bytes) total_file_size @@ -3260,26 +3378,26 @@ void DuckLakeMetadataManager::SetConfigOption(const DuckLakeConfigOption &option scope_id = "NULL"; scope_filter = "scope IS NULL"; } - auto result = transaction.Query(StringUtil::Format(R"( + auto result = Query(StringUtil::Format(R"( SELECT COUNT(*) FROM {METADATA_CATALOG}.ducklake_metadata WHERE key = %s AND %s )", - SQLString(option_key), scope_filter)); + SQLString(option_key), scope_filter)); auto count = result->Fetch()->GetValue(0, 0).GetValue(); if (count == 0) { // option does not yet exist - insert the value - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( INSERT INTO {METADATA_CATALOG}.ducklake_metadata VALUES (%s, %s, %s, %s) )", - SQLString(option_key), SQLString(option_value), scope, scope_id)); + SQLString(option_key), SQLString(option_value), 
scope, scope_id)); } else { // option already exists - update it - result = transaction.Query(StringUtil::Format(R"( + result = Query(StringUtil::Format(R"( UPDATE {METADATA_CATALOG}.ducklake_metadata SET value=%s WHERE key=%s AND %s )", - SQLString(option_value), SQLString(option_key), scope_filter)); + SQLString(option_value), SQLString(option_key), scope_filter)); } if (result->HasError()) { result->GetErrorObject().Throw("Failed to insert config option in DuckLake: "); diff --git a/src/storage/ducklake_transaction.cpp b/src/storage/ducklake_transaction.cpp index 1b61c2b36b2..2b43cf0b53b 100644 --- a/src/storage/ducklake_transaction.cpp +++ b/src/storage/ducklake_transaction.cpp @@ -9,6 +9,7 @@ #include "duckdb/main/database_manager.hpp" #include "duckdb/planner/tableref/bound_at_clause.hpp" #include "storage/ducklake_catalog.hpp" +#include "storage/ducklake_metadata_manager.hpp" #include "storage/ducklake_schema_entry.hpp" #include "storage/ducklake_table_entry.hpp" #include "storage/ducklake_transaction_changes.hpp" @@ -1470,7 +1471,9 @@ void DuckLakeTransaction::FlushChanges() { if (res->HasError()) { res->GetErrorObject().Throw("Failed to flush changes into DuckLake: "); } - connection->Commit(); + if (connection) { + connection->Commit(); + } catalog_version = commit_snapshot.schema_version; // finished writing @@ -1540,36 +1543,10 @@ void DuckLakeTransaction::DeleteInlinedData(const DuckLakeInlinedTableInfo &inli unique_ptr DuckLakeTransaction::Query(string query) { auto &connection = GetConnection(); - auto catalog_identifier = DuckLakeUtil::SQLIdentifierToString(ducklake_catalog.MetadataDatabaseName()); - auto catalog_literal = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataDatabaseName()); - auto schema_identifier = DuckLakeUtil::SQLIdentifierToString(ducklake_catalog.MetadataSchemaName()); - auto schema_identifier_escaped = StringUtil::Replace(schema_identifier, "'", "''"); - auto schema_literal = 
DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataSchemaName()); - auto metadata_path = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataPath()); - auto data_path = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.DataPath()); - - query = StringUtil::Replace(query, "{METADATA_CATALOG_NAME_LITERAL}", catalog_literal); - query = StringUtil::Replace(query, "{METADATA_CATALOG_NAME_IDENTIFIER}", catalog_identifier); - query = StringUtil::Replace(query, "{METADATA_SCHEMA_NAME_LITERAL}", schema_literal); - query = StringUtil::Replace(query, "{METADATA_CATALOG}", catalog_identifier + "." + schema_identifier); - query = StringUtil::Replace(query, "{METADATA_SCHEMA_ESCAPED}", schema_identifier_escaped); - query = StringUtil::Replace(query, "{METADATA_PATH}", metadata_path); - query = StringUtil::Replace(query, "{DATA_PATH}", data_path); + DuckLakeMetadataManager::FillCatalogArgs(query, ducklake_catalog); return connection.Query(query); } -unique_ptr DuckLakeTransaction::Query(DuckLakeSnapshot snapshot, string query) { - query = StringUtil::Replace(query, "{SNAPSHOT_ID}", to_string(snapshot.snapshot_id)); - query = StringUtil::Replace(query, "{SCHEMA_VERSION}", to_string(snapshot.schema_version)); - query = StringUtil::Replace(query, "{NEXT_CATALOG_ID}", to_string(snapshot.next_catalog_id)); - query = StringUtil::Replace(query, "{NEXT_FILE_ID}", to_string(snapshot.next_file_id)); - query = StringUtil::Replace(query, "{AUTHOR}", commit_info.author.ToSQLString()); - query = StringUtil::Replace(query, "{COMMIT_MESSAGE}", commit_info.commit_message.ToSQLString()); - query = StringUtil::Replace(query, "{COMMIT_EXTRA_INFO}", commit_info.commit_extra_info.ToSQLString()); - - return Query(std::move(query)); -} - string DuckLakeTransaction::GetDefaultSchemaName() { auto &metadata_context = *connection->context; auto &db_manager = DatabaseManager::Get(metadata_context);