Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/include/metadata_manager/postgres_metadata_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@ class PostgresMetadataManager : public DuckLakeMetadataManager {
public:
explicit PostgresMetadataManager(DuckLakeTransaction &transaction);

static unique_ptr<DuckLakeMetadataManager> Create(DuckLakeTransaction &transaction) {
return make_uniq<PostgresMetadataManager>(transaction);
}

bool TypeIsNativelySupported(const LogicalType &type) override;

string GetColumnTypeInternal(const LogicalType &type) override;

unique_ptr<QueryResult> Execute(DuckLakeSnapshot snapshot, string &query) override;

unique_ptr<QueryResult> Query(DuckLakeSnapshot snapshot, string &query) override;
unique_ptr<QueryResult> Query(string query) override;
unique_ptr<QueryResult> Query(DuckLakeSnapshot snapshot, string query) override;

protected:
string GetLatestSnapshotQuery() const override;

private:
unique_ptr<QueryResult> ExecuteQuery(DuckLakeSnapshot snapshot, string &query, string command);
unique_ptr<QueryResult> ExecuteQuery(string &query, string command);
};

} // namespace duckdb
1 change: 0 additions & 1 deletion src/include/storage/ducklake_initializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class DuckLakeInitializer {
void InitializeNewDuckLake(DuckLakeTransaction &transaction, bool has_explicit_schema);
void LoadExistingDuckLake(DuckLakeTransaction &transaction);
void InitializeDataPath();
string GetAttachOptions();
void CheckAndAutoloadedRequiredExtension(const string &pattern);

private:
Expand Down
19 changes: 17 additions & 2 deletions src/include/storage/ducklake_metadata_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "duckdb/common/reference_map.hpp"
#include "duckdb/common/types/value.hpp"
#include "common/ducklake_snapshot.hpp"
#include "storage/ducklake_catalog.hpp"
#include "storage/ducklake_partition_data.hpp"
#include "storage/ducklake_stats.hpp"
#include "duckdb/common/types/timestamp.hpp"
Expand Down Expand Up @@ -97,6 +98,9 @@ class DuckLakeMetadataManager {
explicit DuckLakeMetadataManager(DuckLakeTransaction &transaction);
virtual ~DuckLakeMetadataManager();

typedef unique_ptr<DuckLakeMetadataManager> (*create_t)(DuckLakeTransaction &transaction);
static void Register(const string &name, create_t);

static unique_ptr<DuckLakeMetadataManager> Create(DuckLakeTransaction &transaction);

virtual bool TypeIsNativelySupported(const LogicalType &type);
Expand All @@ -105,13 +109,22 @@ class DuckLakeMetadataManager {

DuckLakeMetadataManager &Get(DuckLakeTransaction &transaction);

virtual bool IsInitialized(DuckLakeOptions &options);
//! Initialize a new DuckLake
virtual void InitializeDuckLake(bool has_explicit_schema, DuckLakeEncryption encryption);
virtual DuckLakeMetadata LoadDuckLake();

static void FillSnapshotArgs(string &query, const DuckLakeSnapshot &snapshot);
static void FillSnapshotCommitArgs(string &query, const DuckLakeSnapshotCommit &commit_info);
static void FillCatalogArgs(string &query, const DuckLakeCatalog &ducklake_catalog);

//! Directly execute on metadata
virtual unique_ptr<QueryResult> Execute(DuckLakeSnapshot snapshot, string &query);

virtual unique_ptr<QueryResult> Query(DuckLakeSnapshot snapshot, string &query);
//! Directly query on metadata
virtual unique_ptr<QueryResult> Query(string query);
//! Directly query on metadata
virtual unique_ptr<QueryResult> Query(DuckLakeSnapshot snapshot, string query);
//! Get the catalog information for a specific snapshot
virtual DuckLakeCatalogInfo GetCatalogForSnapshot(DuckLakeSnapshot snapshot);
virtual vector<DuckLakeGlobalStatsInfo> GetGlobalTableStats(DuckLakeSnapshot snapshot);
Expand Down Expand Up @@ -175,7 +188,7 @@ class DuckLakeMetadataManager {
virtual string UpdateGlobalTableStats(const DuckLakeGlobalStatsInfo &stats);
virtual SnapshotChangeInfo GetSnapshotAndStatsAndChanges(DuckLakeSnapshot start_snapshot,
SnapshotAndStats &current_snapshot);
SnapshotDeletedFromFiles GetFilesDeletedOrDroppedAfterSnapshot(const DuckLakeSnapshot &start_snapshot) const;
SnapshotDeletedFromFiles GetFilesDeletedOrDroppedAfterSnapshot(const DuckLakeSnapshot &start_snapshot);
virtual unique_ptr<DuckLakeSnapshot> GetSnapshot();
virtual unique_ptr<DuckLakeSnapshot> GetSnapshot(BoundAtClause &at_clause, SnapshotBound bound);
virtual idx_t GetNextColumnId(TableIndex table_id);
Expand Down Expand Up @@ -257,6 +270,8 @@ class DuckLakeMetadataManager {

private:
unordered_map<idx_t, string> inlined_table_name_cache;
static unordered_map<string /* name */, create_t> metadata_managers;
static mutex metadata_managers_lock;

protected:
DuckLakeTransaction &transaction;
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/ducklake_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DuckLakeTransaction : public Transaction, public enable_shared_from_this<D
DuckLakeSnapshotCommit &GetCommitInfo() {
return commit_info;
}
unique_ptr<QueryResult> Query(DuckLakeSnapshot snapshot, string query);
//! Run query on duckdb
unique_ptr<QueryResult> Query(string query);
Connection &GetConnection();

Expand Down
41 changes: 15 additions & 26 deletions src/metadata_manager/postgres_metadata_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "common/ducklake_util.hpp"
#include "duckdb/main/database.hpp"
#include "storage/ducklake_catalog.hpp"
#include "storage/ducklake_metadata_manager.hpp"
#include "storage/ducklake_transaction.hpp"

namespace duckdb {
Expand Down Expand Up @@ -46,44 +47,32 @@ string PostgresMetadataManager::GetColumnTypeInternal(const LogicalType &column_
}
}

unique_ptr<QueryResult> PostgresMetadataManager::ExecuteQuery(DuckLakeSnapshot snapshot, string &query,
string command) {
unique_ptr<QueryResult> PostgresMetadataManager::ExecuteQuery(string &query, string command) {
auto &commit_info = transaction.GetCommitInfo();

query = StringUtil::Replace(query, "{SNAPSHOT_ID}", to_string(snapshot.snapshot_id));
query = StringUtil::Replace(query, "{SCHEMA_VERSION}", to_string(snapshot.schema_version));
query = StringUtil::Replace(query, "{NEXT_CATALOG_ID}", to_string(snapshot.next_catalog_id));
query = StringUtil::Replace(query, "{NEXT_FILE_ID}", to_string(snapshot.next_file_id));
query = StringUtil::Replace(query, "{AUTHOR}", commit_info.author.ToSQLString());
query = StringUtil::Replace(query, "{COMMIT_MESSAGE}", commit_info.commit_message.ToSQLString());
query = StringUtil::Replace(query, "{COMMIT_EXTRA_INFO}", commit_info.commit_extra_info.ToSQLString());
DuckLakeMetadataManager::FillSnapshotCommitArgs(query, commit_info);

auto &connection = transaction.GetConnection();
auto &ducklake_catalog = transaction.GetCatalog();
auto catalog_identifier = DuckLakeUtil::SQLIdentifierToString(ducklake_catalog.MetadataDatabaseName());
auto catalog_literal = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataDatabaseName());
auto schema_identifier = DuckLakeUtil::SQLIdentifierToString(ducklake_catalog.MetadataSchemaName());
auto schema_identifier_escaped = StringUtil::Replace(schema_identifier, "'", "''");
auto schema_literal = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataSchemaName());
auto metadata_path = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.MetadataPath());
auto data_path = DuckLakeUtil::SQLLiteralToString(ducklake_catalog.DataPath());

query = StringUtil::Replace(query, "{METADATA_CATALOG_NAME_LITERAL}", catalog_literal);
query = StringUtil::Replace(query, "{METADATA_CATALOG_NAME_IDENTIFIER}", catalog_identifier);
query = StringUtil::Replace(query, "{METADATA_SCHEMA_NAME_LITERAL}", schema_literal);
query = StringUtil::Replace(query, "{METADATA_CATALOG}", schema_identifier);
query = StringUtil::Replace(query, "{METADATA_SCHEMA_ESCAPED}", schema_identifier_escaped);
query = StringUtil::Replace(query, "{METADATA_PATH}", metadata_path);
query = StringUtil::Replace(query, "{DATA_PATH}", data_path);
DuckLakeMetadataManager::FillCatalogArgs(query, ducklake_catalog);

return connection.Query(StringUtil::Format("CALL %s(%s, %s)", command, catalog_literal, SQLString(query)));
return connection.Query(
StringUtil::Format("CALL %s(%s, %s)", std::move(command), catalog_literal, SQLString(query)));
}
unique_ptr<QueryResult> PostgresMetadataManager::Execute(DuckLakeSnapshot snapshot, string &query) {
return ExecuteQuery(snapshot, query, "postgres_execute");
DuckLakeMetadataManager::FillSnapshotArgs(query, snapshot);
return ExecuteQuery(query, "postgres_execute");
}

unique_ptr<QueryResult> PostgresMetadataManager::Query(DuckLakeSnapshot snapshot, string &query) {
return ExecuteQuery(snapshot, query, "postgres_query");
unique_ptr<QueryResult> PostgresMetadataManager::Query(string query) {
return ExecuteQuery(query, "postgres_query");
}

unique_ptr<QueryResult> PostgresMetadataManager::Query(DuckLakeSnapshot snapshot, string query) {
DuckLakeMetadataManager::FillSnapshotArgs(query, snapshot);
return Query(query);
}

string PostgresMetadataManager::GetLatestSnapshotQuery() const {
Expand Down
59 changes: 4 additions & 55 deletions src/storage/ducklake_initializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,16 @@ DuckLakeInitializer::DuckLakeInitializer(ClientContext &context, DuckLakeCatalog
InitializeDataPath();
}

string DuckLakeInitializer::GetAttachOptions() {
vector<string> attach_options;
if (options.access_mode != AccessMode::AUTOMATIC) {
switch (options.access_mode) {
case AccessMode::READ_ONLY:
attach_options.push_back("READ_ONLY");
break;
case AccessMode::READ_WRITE:
attach_options.push_back("READ_WRITE");
break;
default:
throw InternalException("Unsupported access mode in DuckLake attach");
}
}
for (auto &option : options.metadata_parameters) {
attach_options.push_back(option.first + " " + option.second.ToSQLString());
}

if (attach_options.empty()) {
return string();
}
string result;
for (auto &option : attach_options) {
if (!result.empty()) {
result += ", ";
}
result += option;
}
return " (" + result + ")";
}

void DuckLakeInitializer::Initialize() {
auto &transaction = DuckLakeTransaction::Get(context, catalog);
// attach the metadata database
auto result =
transaction.Query("ATTACH {METADATA_PATH} AS {METADATA_CATALOG_NAME_IDENTIFIER}" + GetAttachOptions());
if (result->HasError()) {
auto &error_obj = result->GetErrorObject();
error_obj.Throw("Failed to attach DuckLake MetaData \"" + catalog.MetadataDatabaseName() + "\" at path + \"" +
catalog.MetadataPath() + "\"");
}
// explicitly load all secrets - work-around to secret initialization bug
transaction.Query("FROM duckdb_secrets()");

auto &metadata_manager = transaction.GetMetadataManager();
bool has_explicit_schema = !options.metadata_schema.empty();
if (options.metadata_schema.empty()) {
// if the schema is not explicitly set by the user - set it to the default schema in the catalog
options.metadata_schema = transaction.GetDefaultSchemaName();
}
// after the metadata database is attached initialize the ducklake
// check if we are loading an existing DuckLake or creating a new one
// FIXME: verify that all tables are in the correct format instead
result = transaction.Query(
"SELECT COUNT(*) FROM duckdb_tables() WHERE database_name={METADATA_CATALOG_NAME_LITERAL} AND "
"schema_name={METADATA_SCHEMA_NAME_LITERAL} AND table_name LIKE 'ducklake_%'");
if (result->HasError()) {
auto &error_obj = result->GetErrorObject();
error_obj.Throw("Failed to load DuckLake table data");
}
auto count = result->Fetch()->GetValue(0, 0).GetValue<idx_t>();
if (count == 0) {

bool is_initialized = metadata_manager.IsInitialized(options);
if (!is_initialized) {
if (!options.create_if_not_exists) {
throw InvalidInputException("Existing DuckLake at metadata catalog \"%s\" does not exist - and creating a "
"new DuckLake is explicitly disabled",
Expand Down
Loading
Loading