Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
eb59a29
Working hardcoded compaction sort!
Alex-Monahan Oct 5, 2025
f2b5947
parse order by string and manually bind.
Alex-Monahan Oct 21, 2025
856e479
approx_order_by param in merge_adjacent_files
Alex-Monahan Oct 24, 2025
3b52466
config option for approx_order_by
Alex-Monahan Oct 25, 2025
3b24a22
Refactor into a method on DuckLakeCompactor
Alex-Monahan Oct 25, 2025
d670375
Move compactor to .hpp, GetApproxOrderBy fn, WIP inlined order by
Alex-Monahan Oct 26, 2025
b214ccd
working tests for ducklake_flush_inlined_data!
Alex-Monahan Oct 27, 2025
295e7f5
Use const references. More commented out attempt to dynamically bind
Alex-Monahan Nov 11, 2025
e67d5e2
rename parameter to local_order_by
Alex-Monahan Nov 11, 2025
87964df
Rename fns, - comments/prints,+ negative test
Alex-Monahan Dec 3, 2025
cf11b9b
Merge remote-tracking branch 'origin' into ordered-compaction
Alex-Monahan Dec 3, 2025
d12e237
Accept SET SORTED BY syntax (do nothing yet)
Alex-Monahan Dec 5, 2025
62beb45
SET SORTED inserts to DB! Lots of wiring...
Alex-Monahan Dec 16, 2025
41da46b
Working compact with sort_data instead of option! WIP cleanup.
Alex-Monahan Dec 17, 2025
cf6c617
Update tests to use new syntax. Passing!
Alex-Monahan Dec 17, 2025
6bc926b
Inlining tests using new syntax working!
Alex-Monahan Dec 17, 2025
d9e4e61
Remove local_order_by option and naming
Alex-Monahan Dec 17, 2025
f8b4425
inline flush sorted within txn works. Not compaction (seems ok?). Tes…
Alex-Monahan Dec 22, 2025
1449bdb
Revert rename of duckdb property
Alex-Monahan Dec 22, 2025
f55888b
RESET SORTED BY works and is tested!
Alex-Monahan Dec 22, 2025
10d527d
If sorts match, don't insert to catalog
Alex-Monahan Dec 22, 2025
f89e12a
Merge branch 'main' into ordered-compaction-catalog
Alex-Monahan Dec 22, 2025
e3bf7a4
Remove duplicate InsertSort fn
Alex-Monahan Dec 22, 2025
6b612cb
batch_queries for sort catalog operations
Alex-Monahan Dec 22, 2025
a0a2424
Remove comment block
Alex-Monahan Dec 22, 2025
4c13a28
retarget to duckdb main
Alex-Monahan Dec 22, 2025
61cc765
try to point duckdb submodule to correct commit
Alex-Monahan Dec 22, 2025
c988bed
extension-ci-tools git commit hash fix
Alex-Monahan Dec 22, 2025
b73fc7d
Undo old ducklake_option edits. Edit comments
Alex-Monahan Dec 22, 2025
a1a6b26
Add FIXME for expressions in sort. (And re-run CI/CD)
Alex-Monahan Dec 22, 2025
b50cf05
Merge branch 'main' into ordered-compaction-catalog
Alex-Monahan Jan 2, 2026
f167222
make format fixes
Alex-Monahan Jan 2, 2026
c1825f1
rename catalog table to ducklake_sort_info
Alex-Monahan Jan 2, 2026
20e2c11
Merge branch 'main' into ordered-compaction-catalog
Alex-Monahan Jan 8, 2026
d864904
Fix merge by adding old sort table back
Alex-Monahan Jan 8, 2026
3286d82
add ducklake_sort_expression table to spec. Update tests.
Alex-Monahan Jan 8, 2026
cbb5318
Error on SET SORTED BY if wrong columns or non-columns
Alex-Monahan Jan 8, 2026
3152e2e
col names from latest_table. Test post rename
Alex-Monahan Jan 9, 2026
7ac3077
PR feedback, case ins. equals, split to fns
Alex-Monahan Jan 12, 2026
7434510
Split existing merge adjacent tests
Alex-Monahan Jan 12, 2026
9f4e352
cleanup test comments
Alex-Monahan Jan 12, 2026
ebcfc4a
Split existing inline flush tests
Alex-Monahan Jan 12, 2026
fb5dd0b
test that SET SORTED BY applies by table_id not by name
Alex-Monahan Jan 13, 2026
918ebf3
test that sort catalog tables cleared out by expire_snapshots
Alex-Monahan Jan 13, 2026
27cfa9a
Don't update schema_version on SET SORTED BY. Track updates without s…
Alex-Monahan Jan 15, 2026
7ae9384
format fixes
Alex-Monahan Jan 15, 2026
bd8c991
Comments on tables, table columns, and views do not increase schema_v…
Alex-Monahan Jan 15, 2026
58f23df
format fix
Alex-Monahan Jan 15, 2026
79064e6
add test for snapshots() changes
Alex-Monahan Jan 15, 2026
f17d7f2
Fix error message
Alex-Monahan Jan 15, 2026
dba148b
Fix error message in test
Alex-Monahan Jan 15, 2026
fcca281
Fix another error message
Alex-Monahan Jan 15, 2026
062d00b
Tests: Rollbacks, sort & rename cols in same txn
Alex-Monahan Jan 16, 2026
cc4488f
format fixes
Alex-Monahan Jan 16, 2026
f24ddc2
Relax assertion: the table_entry_map contains tables and views.
Alex-Monahan Jan 16, 2026
c55a083
std::string to string
Alex-Monahan Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 115 additions & 68 deletions src/functions/ducklake_compaction_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
#include "duckdb/planner/operator/logical_empty_result.hpp"
#include "fmt/format.h"

#include "functions/ducklake_compaction_functions.hpp"
#include "duckdb/planner/operator/logical_order.hpp"
#include "duckdb/planner/operator/logical_projection.hpp"
#include "duckdb/parser/parser.hpp"
#include "duckdb/parser/query_node/select_node.hpp"
#include "duckdb/planner/expression_binder/order_binder.hpp"
#include "duckdb/planner/expression_binder/select_bind_state.hpp"

namespace duckdb {

//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -91,74 +99,6 @@ string DuckLakeCompaction::GetName() const {
return "DUCKLAKE_COMPACTION";
}

//===--------------------------------------------------------------------===//
// Logical Operator
//===--------------------------------------------------------------------===//
//! Logical operator for a single compaction task: the child plan produces the
//! rows to rewrite, and CreatePlan lowers this node into the physical
//! DuckLakeCompaction operator.
class DuckLakeLogicalCompaction : public LogicalExtensionOperator {
public:
	DuckLakeLogicalCompaction(idx_t table_index, DuckLakeTableEntry &table,
	                          vector<DuckLakeCompactionFileEntry> source_files_p, string encryption_key_p,
	                          optional_idx partition_id, vector<string> partition_values_p, optional_idx row_id_start,
	                          CompactionType type)
	    : table_index(table_index), table(table), source_files(std::move(source_files_p)),
	      encryption_key(std::move(encryption_key_p)), partition_id(partition_id),
	      partition_values(std::move(partition_values_p)), row_id_start(row_id_start), type(type) {
	}

	//! Table index used for this operator's single output binding
	idx_t table_index;
	//! The table being compacted
	DuckLakeTableEntry &table;
	//! The files merged/rewritten by this compaction task
	vector<DuckLakeCompactionFileEntry> source_files;
	//! Encryption key forwarded to the physical compaction operator
	string encryption_key;
	//! Partition this task targets, if any (forwarded to the physical operator)
	optional_idx partition_id;
	//! Partition values matching partition_id
	vector<string> partition_values;
	//! Row-id start forwarded to the physical operator, if set
	optional_idx row_id_start;
	//! The kind of compaction being performed
	CompactionType type;

public:
	PhysicalOperator &CreatePlan(ClientContext &context, PhysicalPlanGenerator &planner) override {
		// plan the child first - it produces the rows that get rewritten
		auto &child = planner.CreatePlan(*children[0]);
		return planner.Make<DuckLakeCompaction>(types, table, std::move(source_files), std::move(encryption_key),
		                                        partition_id, std::move(partition_values), row_id_start, child, type);
	}

	string GetExtensionName() const override {
		return "ducklake";
	}
	vector<ColumnBinding> GetColumnBindings() override {
		// a single output column at (table_index, 0)
		vector<ColumnBinding> result;
		result.emplace_back(table_index, 0);
		return result;
	}

	void ResolveTypes() override {
		// the operator produces a single BOOLEAN column
		types = {LogicalType::BOOLEAN};
	}
};

//===--------------------------------------------------------------------===//
// Compaction Command Generator
//===--------------------------------------------------------------------===//
//! Generates logical compaction plans for a single DuckLake table.
class DuckLakeCompactor {
public:
	//! Compactor for merge-adjacent-files compaction, driven by `options`
	DuckLakeCompactor(ClientContext &context, DuckLakeCatalog &catalog, DuckLakeTransaction &transaction,
	                  Binder &binder, TableIndex table_id, DuckLakeMergeAdjacentOptions options);
	//! Compactor driven by a delete threshold instead of merge options
	DuckLakeCompactor(ClientContext &context, DuckLakeCatalog &catalog, DuckLakeTransaction &transaction,
	                  Binder &binder, TableIndex table_id, double delete_threshold);
	//! Appends one compaction plan per group of files that should be compacted together
	void GenerateCompactions(DuckLakeTableEntry &table, vector<unique_ptr<LogicalOperator>> &compactions);
	//! Builds the logical plan that compacts the given source files
	unique_ptr<LogicalOperator> GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry> source_files);

private:
	ClientContext &context;
	DuckLakeCatalog &catalog;
	DuckLakeTransaction &transaction;
	Binder &binder;
	//! Id of the table being compacted
	TableIndex table_id;
	//! Threshold passed to GetFilesForCompaction (defaults to 0.95)
	double delete_threshold = 0.95;
	DuckLakeMergeAdjacentOptions options;

	//! The kind of compaction this instance generates plans for
	CompactionType type;
};

DuckLakeCompactor::DuckLakeCompactor(ClientContext &context, DuckLakeCatalog &catalog, DuckLakeTransaction &transaction,
Binder &binder, TableIndex table_id, DuckLakeMergeAdjacentOptions options)
: context(context), catalog(catalog), transaction(transaction), binder(binder), table_id(table_id),
Expand Down Expand Up @@ -221,6 +161,8 @@ void DuckLakeCompactor::GenerateCompactions(DuckLakeTableEntry &table,
filter_options.min_file_size = options.min_file_size;
filter_options.max_file_size = options.max_file_size;
filter_options.target_file_size = target_file_size;
// FIXME: pass in the sort_data so that list of files is approximately sorted in the same way
// (sorted by the min/max metadata)
auto files = metadata_manager.GetFilesForCompaction(table, type, delete_threshold, snapshot, filter_options);

// iterate over the files and split into separate compaction groups
Expand Down Expand Up @@ -303,6 +245,91 @@ void DuckLakeCompactor::GenerateCompactions(DuckLakeTableEntry &table,
}
}

//! Wraps `plan` in an ORDER BY (plus a flattening projection) that applies the
//! table's configured sort keys, so the compacted/flushed data is written in
//! sorted order.
//!
//! Only sort fields stored in the "duckdb" dialect are applied; fields in other
//! dialects are deliberately skipped (not errored on) so that a separate sort
//! order for another engine (e.g. Spark) can co-exist with the DuckDB one.
//! Returns the original plan unchanged when there is nothing to apply.
//!
//! FIXME: only plain column references are supported - arbitrary sort
//! expressions would require a dynamic bind.
unique_ptr<LogicalOperator> DuckLakeCompactor::InsertSort(Binder &binder, unique_ptr<LogicalOperator> &plan,
                                                          DuckLakeTableEntry &table,
                                                          optional_ptr<DuckLakeSort> sort_data) {
	if (!sort_data) {
		// no sort configured - nothing to do
		return std::move(plan);
	}

	// parse the stored sort expressions (duckdb dialect only) into order nodes
	vector<OrderByNode> pre_bound_orders;
	for (auto &field : sort_data->fields) {
		if (field.dialect != "duckdb") {
			// skip sort fields stored for other engines - see the comment above
			continue;
		}
		auto parsed_expressions = Parser::ParseExpressionList(field.expression);
		if (parsed_expressions.empty()) {
			// guard against indexing an empty parse result
			throw InternalException("DuckLakeCompactor::InsertSort: failed to parse sort expression \"%s\"",
			                        field.expression);
		}
		pre_bound_orders.emplace_back(field.sort_direction, field.null_order, std::move(parsed_expressions[0]));
	}
	if (pre_bound_orders.empty()) {
		// the sorts were not in the DuckDB dialect - return the original plan
		return std::move(plan);
	}

	// FIXME: Allow arbitrary expressions instead of just column references. (Need a dynamic bind)
	// build a map of the table's column names so we can bind the sort columns to them
	case_insensitive_map_t<idx_t> alias_map;
	auto current_columns = table.GetColumns().GetColumnNames();
	for (idx_t col_idx = 0; col_idx < current_columns.size(); col_idx++) {
		alias_map[current_columns[col_idx]] = col_idx;
	}

	// note: bindings and result types are available on any LogicalOperator, so no
	// (unchecked) downcast of the plan root to LogicalGet is required - this also
	// keeps the function correct when the root is e.g. a cast projection
	auto bindings = plan->GetColumnBindings();
	plan->ResolveOperatorTypes();
	auto &root_types = plan->types;

	// bind every sort column against the plan; collect all unknown names first so
	// a single, complete error can be raised instead of failing one column at a time
	vector<BoundOrderByNode> orders;
	vector<string> unmatching_names;
	for (auto &pre_bound_order : pre_bound_orders) {
		auto name = pre_bound_order.expression->GetName();
		auto entry = alias_map.find(name);
		if (entry == alias_map.end()) {
			// column not found in the table - record it for the error below
			unmatching_names.push_back(name);
			continue;
		}
		auto order_idx = entry->second;
		auto expr = make_uniq<BoundColumnRefExpression>(root_types[order_idx], bindings[order_idx], 0);
		orders.emplace_back(pre_bound_order.type, pre_bound_order.null_order, std::move(expr));
	}
	if (!unmatching_names.empty()) {
		string error_string =
		    "Columns in the SET SORTED BY statement were not found in the DuckLake table. Unmatched columns were: ";
		for (idx_t i = 0; i < unmatching_names.size(); i++) {
			if (i > 0) {
				error_string += ", ";
			}
			error_string += unmatching_names[i];
		}
		throw BinderException(error_string);
	}

	// place the ORDER BY on top of the plan
	auto order = make_uniq<LogicalOrder>(std::move(orders));
	order->children.push_back(std::move(plan));
	order->ResolveOperatorTypes();

	// add a projection of plain column references on top of the order, giving
	// downstream operators a fresh table index to bind against
	vector<unique_ptr<Expression>> projection_expressions;
	auto &order_types = order->types;
	auto order_bindings = order->GetColumnBindings();
	for (idx_t col_idx = 0; col_idx < order_types.size(); col_idx++) {
		projection_expressions.push_back(
		    make_uniq<BoundColumnRefExpression>(order_types[col_idx], order_bindings[col_idx]));
	}
	auto projection = make_uniq<LogicalProjection>(binder.GenerateTableIndex(), std::move(projection_expressions));
	projection->children.push_back(std::move(order));
	return std::move(projection);
}

unique_ptr<LogicalOperator>
DuckLakeCompactor::GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry> source_files) {
// get the table entry at the specified snapshot
Expand Down Expand Up @@ -452,6 +479,26 @@ DuckLakeCompactor::GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry>
root = DuckLakeInsert::InsertCasts(binder, root);
}

// If compaction should be ordered, add Order By (and projection) to logical plan
// Do not pull the sort setting at the time of the creation of the files being compacted,
// and instead pull the latest sort setting
// First, see if there are transaction local changes to the table
// Then fall back to latest snapshot if no local changes
auto latest_entry = transaction.GetTransactionLocalEntry(CatalogType::TABLE_ENTRY, table.schema.name, table.name);
if (!latest_entry) {
auto latest_snapshot = transaction.GetSnapshot();
latest_entry = catalog.GetEntryById(transaction, latest_snapshot, table_id);
if (!latest_entry) {
throw InternalException("DuckLakeCompactor: failed to find local table entry");
}
}
auto &latest_table = latest_entry->Cast<DuckLakeTableEntry>();

auto sort_data = latest_table.GetSortData();
if (sort_data) {
root = DuckLakeCompactor::InsertSort(binder, root, latest_table, sort_data);
}

// generate the LogicalCopyToFile
auto copy = make_uniq<LogicalCopyToFile>(std::move(copy_options.copy_function), std::move(copy_options.bind_data),
std::move(copy_options.info));
Expand Down
22 changes: 22 additions & 0 deletions src/functions/ducklake_flush_inlined_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "storage/ducklake_flush_data.hpp"
#include "duckdb/planner/operator/logical_projection.hpp"

#include "functions/ducklake_compaction_functions.hpp"

namespace duckdb {

//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -200,6 +202,26 @@ unique_ptr<LogicalOperator> DuckLakeDataFlusher::GenerateFlushCommand() {
root = DuckLakeInsert::InsertCasts(binder, root);
}

// If flush should be ordered, add Order By (and projection) to logical plan
// Do not pull the sort setting at the time of the creation of the rows being flushed,
// and instead pull the latest sort setting
// First, see if there are transaction local changes to the table
// Then fall back to latest snapshot if no local changes
auto latest_entry = transaction.GetTransactionLocalEntry(CatalogType::TABLE_ENTRY, table.schema.name, table.name);
if (!latest_entry) {
auto latest_snapshot = transaction.GetSnapshot();
latest_entry = catalog.GetEntryById(transaction, latest_snapshot, table_id);
if (!latest_entry) {
throw InternalException("DuckLakeDataFlusher: failed to find latest table entry for latest snapshot id");
}
}
auto &latest_table = latest_entry->Cast<DuckLakeTableEntry>();

auto sort_data = latest_table.GetSortData();
if (sort_data) {
root = DuckLakeCompactor::InsertSort(binder, root, latest_table, sort_data);
}

// generate the LogicalCopyToFile
auto copy = make_uniq<LogicalCopyToFile>(std::move(copy_options.copy_function), std::move(copy_options.bind_data),
std::move(copy_options.info));
Expand Down
3 changes: 2 additions & 1 deletion src/include/common/local_change.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ enum class LocalChangeType {
ADD_COLUMN,
REMOVE_COLUMN,
CHANGE_COLUMN_TYPE,
SET_DEFAULT
SET_DEFAULT,
SET_SORT_KEY
};

struct LocalChange {
Expand Down
99 changes: 99 additions & 0 deletions src/include/functions/ducklake_compaction_functions.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// functions/ducklake_compaction_functions.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "functions/ducklake_table_functions.hpp"
#include "storage/ducklake_transaction.hpp"
#include "storage/ducklake_catalog.hpp"
#include "storage/ducklake_schema_entry.hpp"
#include "storage/ducklake_table_entry.hpp"
#include "storage/ducklake_insert.hpp"
#include "storage/ducklake_multi_file_reader.hpp"
#include "duckdb/planner/operator/logical_get.hpp"
#include "duckdb/planner/operator/logical_copy_to_file.hpp"
#include "duckdb/planner/operator/logical_extension_operator.hpp"
#include "duckdb/planner/operator/logical_set_operation.hpp"
#include "storage/ducklake_compaction.hpp"
#include "duckdb/common/multi_file/multi_file_function.hpp"
#include "storage/ducklake_multi_file_list.hpp"
#include "duckdb/planner/tableref/bound_at_clause.hpp"
#include "duckdb/planner/operator/logical_empty_result.hpp"

namespace duckdb {
//===--------------------------------------------------------------------===//
// Logical Operator
//===--------------------------------------------------------------------===//
//! Logical plan node representing one DuckLake compaction task. The child plan
//! supplies the rewritten rows; CreatePlan lowers this node to the physical
//! DuckLakeCompaction operator.
class DuckLakeLogicalCompaction : public LogicalExtensionOperator {
public:
	DuckLakeLogicalCompaction(idx_t table_index, DuckLakeTableEntry &table,
	                          vector<DuckLakeCompactionFileEntry> source_files_p, string encryption_key_p,
	                          optional_idx partition_id, vector<string> partition_values_p, optional_idx row_id_start,
	                          CompactionType type)
	    : table_index(table_index), table(table), source_files(std::move(source_files_p)),
	      encryption_key(std::move(encryption_key_p)), partition_id(partition_id),
	      partition_values(std::move(partition_values_p)), row_id_start(row_id_start), type(type) {
	}

	//! Table index for the operator's single output binding
	idx_t table_index;
	//! Table being compacted
	DuckLakeTableEntry &table;
	//! Files merged/rewritten by this task
	vector<DuckLakeCompactionFileEntry> source_files;
	//! Encryption key handed to the physical operator
	string encryption_key;
	//! Target partition, if any
	optional_idx partition_id;
	//! Partition values matching partition_id
	vector<string> partition_values;
	//! Row-id start handed to the physical operator, if set
	optional_idx row_id_start;
	//! Kind of compaction being performed
	CompactionType type;

public:
	void ResolveTypes() override {
		// this operator emits a single BOOLEAN column
		types = {LogicalType::BOOLEAN};
	}

	vector<ColumnBinding> GetColumnBindings() override {
		// single output column bound at (table_index, 0)
		return {ColumnBinding(table_index, 0)};
	}

	string GetExtensionName() const override {
		return "ducklake";
	}

	PhysicalOperator &CreatePlan(ClientContext &context, PhysicalPlanGenerator &planner) override {
		// lower the child first, then hand all task state to the physical operator
		auto &child_plan = planner.CreatePlan(*children[0]);
		return planner.Make<DuckLakeCompaction>(types, table, std::move(source_files), std::move(encryption_key),
		                                        partition_id, std::move(partition_values), row_id_start, child_plan,
		                                        type);
	}
};

//===--------------------------------------------------------------------===//
// Compaction Command Generator
//===--------------------------------------------------------------------===//
//! Generates logical compaction plans for a single DuckLake table.
class DuckLakeCompactor {
public:
	//! Compactor for merge-adjacent-files compaction, driven by `options`
	DuckLakeCompactor(ClientContext &context, DuckLakeCatalog &catalog, DuckLakeTransaction &transaction,
	                  Binder &binder, TableIndex table_id, DuckLakeMergeAdjacentOptions options);
	//! Compactor driven by a delete threshold instead of merge options
	DuckLakeCompactor(ClientContext &context, DuckLakeCatalog &catalog, DuckLakeTransaction &transaction,
	                  Binder &binder, TableIndex table_id, double delete_threshold);
	//! Appends one compaction plan per group of files that should be compacted together
	void GenerateCompactions(DuckLakeTableEntry &table, vector<unique_ptr<LogicalOperator>> &compactions);
	//! Builds the logical plan that compacts the given source files
	unique_ptr<LogicalOperator> GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry> source_files);
	//! Wraps `plan` in an ORDER BY applying the table's sort keys; returns the
	//! plan unchanged if none apply. Static so it can also be reused by the
	//! inlined-data flusher.
	static unique_ptr<LogicalOperator> InsertSort(Binder &binder, unique_ptr<LogicalOperator> &plan,
	                                              DuckLakeTableEntry &table, optional_ptr<DuckLakeSort> sort_data);

private:
	ClientContext &context;
	DuckLakeCatalog &catalog;
	DuckLakeTransaction &transaction;
	Binder &binder;
	//! Id of the table being compacted
	TableIndex table_id;
	//! Threshold passed to GetFilesForCompaction (defaults to 0.95)
	double delete_threshold = 0.95;
	DuckLakeMergeAdjacentOptions options;

	//! The kind of compaction this instance generates plans for
	CompactionType type;
};

} // namespace duckdb
10 changes: 10 additions & 0 deletions src/include/storage/ducklake_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct DuckLakeFileListEntry;
struct DuckLakeConfigOption;
struct DeleteFileMap;
class LogicalGet;
struct DuckLakeSort;

class DuckLakeCatalog : public Catalog {
public:
Expand Down Expand Up @@ -165,6 +166,15 @@ class DuckLakeCatalog : public Catalog {
DuckLakeCatalogSet &schema);
//! Return the schema for the given snapshot - loading it if it is not yet loaded
DuckLakeCatalogSet &GetSchemaForSnapshot(DuckLakeTransaction &transaction, DuckLakeSnapshot snapshot);
//! Update the sort data for a table in the cached schema (used when SET_SORT_KEY commits without schema change)
void UpdateSortDataInCache(idx_t schema_version, TableIndex table_id, unique_ptr<DuckLakeSort> sort_data);
//! Update the table comment in the cached schema (used when SET_COMMENT commits without schema change)
void UpdateTableCommentInCache(idx_t schema_version, TableIndex table_id, const Value &new_comment);
//! Update a column comment in the cached schema (used when SET_COLUMN_COMMENT commits without schema change)
void UpdateColumnCommentInCache(idx_t schema_version, TableIndex table_id, FieldIndex field_index,
const Value &new_comment);
//! Update a view comment in the cached schema (used when SET_COMMENT on view commits without schema change)
void UpdateViewCommentInCache(idx_t schema_version, TableIndex view_id, const Value &new_comment);

private:
void DropSchema(ClientContext &context, DropInfo &info) override;
Expand Down
Loading