From b42a8e78d60b30363139a966e42bd33a3dd305a5 Mon Sep 17 00:00:00 2001 From: Gabor Szarnyas Date: Sat, 4 Jan 2025 08:46:59 +0100 Subject: [PATCH 1/4] Update year in license file to 2025 --- LICENSE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE b/LICENSE index 43eec6098..4e1fbb769 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright 2018-2024 Stichting DuckDB Foundation +Copyright 2018-2025 Stichting DuckDB Foundation Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: From c1365cfbe551059e8783999436ce93dc37b60972 Mon Sep 17 00:00:00 2001 From: Eiichi Arikawa <157803904+e1arikawa@users.noreply.github.com> Date: Thu, 16 Jan 2025 00:37:59 +0900 Subject: [PATCH 2/4] Add OPFS (Origin Private File System) Support (#1856) * add opfs feature * add test for url with long query string * update s3rver cors settings * update httpfs test. * update httpfs test. * update httpfs test for eslint * Fixup patch, now allowing installing from other repositories via 'INSTALL x FROM community' * fix dropfile * Update packages/duckdb-wasm/test/opfs.test.ts Co-authored-by: asrar * Improve README * Update README.md * Add npm_tags.yml * Perform checkout * Fix registerFileHandle. * update comment * add test for using file in directory --------- Co-authored-by: Carlo Piovesan Co-authored-by: asrar --- examples/esbuild-node/index.ts | 2 +- lib/include/duckdb/web/config.h | 2 + lib/include/duckdb/web/io/web_filesystem.h | 2 + lib/js-stubs.js | 4 + lib/src/arrow_type_mapping.cc | 4 + lib/src/config.cc | 3 + lib/src/io/web_filesystem.cc | 23 +- lib/src/json_typedef.cc | 6 + lib/src/webdb.cc | 22 +- .../src/system/arquero_benchmarks.ts | 104 +++--- packages/duckdb-wasm/karma/s3rver/s3rver.js | 1 + .../duckdb-wasm/src/bindings/bindings_base.ts | 43 ++- .../src/bindings/bindings_interface.ts | 3 +- packages/duckdb-wasm/src/bindings/config.ts | 4 + packages/duckdb-wasm/src/bindings/runtime.ts | 11 + .../src/bindings/runtime_browser.ts | 190 ++++++++--- .../duckdb-wasm/src/bindings/runtime_node.ts | 1 + .../src/parallel/worker_dispatcher.ts | 12 +- packages/duckdb-wasm/test/httpfs_test.ts | 27 ++ packages/duckdb-wasm/test/index_browser.ts | 2 + packages/duckdb-wasm/test/opfs.test.ts | 306 ++++++++++++++++++ .../duckdb-wasm/test/string_test_helper.ts | 20 ++ patches/duckdb/bind_copy_direct_io.patch | 15 + patches/duckdb/fix_load_database.patch | 20 ++ 24 files changed, 721 insertions(+), 106 deletions(-) create mode 100644 packages/duckdb-wasm/test/opfs.test.ts create mode 100644 packages/duckdb-wasm/test/string_test_helper.ts create mode 100644 patches/duckdb/bind_copy_direct_io.patch create mode 100644 patches/duckdb/fix_load_database.patch diff --git a/examples/esbuild-node/index.ts b/examples/esbuild-node/index.ts index 734d22c2b..183de2a7a 100644 --- a/examples/esbuild-node/index.ts +++ b/examples/esbuild-node/index.ts @@ -1,11 +1,11 @@ import * as duckdb from '@duckdb/duckdb-wasm'; import * as arrow from 'apache-arrow'; import path from 'path'; -import Worker from 'web-worker'; import { createRequire } from 'module'; const require = createRequire(import.meta.url); const DUCKDB_DIST = 
path.dirname(require.resolve('@duckdb/duckdb-wasm')); +const Worker = require('web-worker'); (async () => { try { diff --git a/lib/include/duckdb/web/config.h b/lib/include/duckdb/web/config.h index c112cb390..a8826eda0 100644 --- a/lib/include/duckdb/web/config.h +++ b/lib/include/duckdb/web/config.h @@ -81,6 +81,8 @@ struct WebDBConfig { std::optional access_mode = std::nullopt; /// The thread count uint32_t maximum_threads = (STATIC_WEBDB_FEATURES & (1 << WebDBFeature::THREADS)) ? 4 : 1; + /// The direct io flag + bool use_direct_io = false; /// The query config QueryConfig query = { .cast_bigint_to_double = std::nullopt, diff --git a/lib/include/duckdb/web/io/web_filesystem.h b/lib/include/duckdb/web/io/web_filesystem.h index 71a1ef780..96638f992 100644 --- a/lib/include/duckdb/web/io/web_filesystem.h +++ b/lib/include/duckdb/web/io/web_filesystem.h @@ -209,6 +209,8 @@ class WebFileSystem : public duckdb::FileSystem { DataBuffer file_buffer); /// Try to drop a specific file bool TryDropFile(std::string_view file_name); + /// drop a specific file + void DropFile(std::string_view file_name); /// Drop all files without references (including buffers) void DropDanglingFiles(); /// Configure file statistics diff --git a/lib/js-stubs.js b/lib/js-stubs.js index 2371d1e93..86a8cfbfc 100644 --- a/lib/js-stubs.js +++ b/lib/js-stubs.js @@ -15,6 +15,10 @@ addToLibrary({ duckdb_web_fs_file_sync: function (fileId) { return globalThis.DUCKDB_RUNTIME.syncFile(Module, fileId); }, + duckdb_web_fs_file_drop_file__sig: 'vpi', + duckdb_web_fs_file_drop_file: function (fileName, fileNameLen) { + return globalThis.DUCKDB_RUNTIME.dropFile(Module, fileName, fileNameLen); + }, duckdb_web_fs_file_close__sig: 'vi', duckdb_web_fs_file_close: function (fileId) { return globalThis.DUCKDB_RUNTIME.closeFile(Module, fileId); diff --git a/lib/src/arrow_type_mapping.cc b/lib/src/arrow_type_mapping.cc index 4ecbf43fc..10c995304 100644 --- a/lib/src/arrow_type_mapping.cc +++ b/lib/src/arrow_type_mapping.cc @@ -122,6 +122,10 @@ arrow::Result mapArrowTypeToDuckDB(const arrow::DataType& t case arrow::Type::type::EXTENSION: case arrow::Type::type::SPARSE_UNION: case arrow::Type::type::DENSE_UNION: + case arrow::Type::type::STRING_VIEW: + case arrow::Type::type::BINARY_VIEW: + case arrow::Type::type::LIST_VIEW: + case arrow::Type::type::LARGE_LIST_VIEW: return arrow::Status::NotImplemented("DuckDB type mapping for: ", type.ToString()); } return duckdb::LogicalTypeId::INVALID; diff --git a/lib/src/config.cc b/lib/src/config.cc index ea984f26b..887f28448 100644 --- a/lib/src/config.cc +++ b/lib/src/config.cc @@ -76,6 +76,9 @@ WebDBConfig WebDBConfig::ReadFrom(std::string_view args_json) { if (doc.HasMember("allowUnsignedExtensions") && doc["allowUnsignedExtensions"].IsBool()) { config.allow_unsigned_extensions = doc["allowUnsignedExtensions"].GetBool(); } + if (doc.HasMember("useDirectIO") && doc["useDirectIO"].IsBool()) { + config.use_direct_io = doc["useDirectIO"].GetBool(); + } if (doc.HasMember("query") && doc["query"].IsObject()) { auto q = doc["query"].GetObject(); if (q.HasMember("queryPollingInterval") && q["queryPollingInterval"].IsNumber()) { diff --git a/lib/src/io/web_filesystem.cc b/lib/src/io/web_filesystem.cc index e46f7f14e..aa6cdfa28 100644 --- a/lib/src/io/web_filesystem.cc +++ b/lib/src/io/web_filesystem.cc @@ -117,6 +117,7 @@ RT_FN(void duckdb_web_fs_file_close(size_t file_id), { auto &infos = GetLocalState(); infos.handles.erase(file_id); }); +RT_FN(void duckdb_web_fs_file_drop_file(const char *fileName, 
size_t pathLen), {}); RT_FN(void duckdb_web_fs_file_truncate(size_t file_id, double new_size), { GetOrOpen(file_id).Truncate(new_size); }); RT_FN(time_t duckdb_web_fs_file_get_last_modified_time(size_t file_id), { auto &file = GetOrOpen(file_id); @@ -226,6 +227,8 @@ WebFileSystem::DataProtocol WebFileSystem::inferDataProtocol(std::string_view ur proto = WebFileSystem::DataProtocol::HTTP; } else if (hasPrefix(url, "s3://")) { proto = WebFileSystem::DataProtocol::S3; + } else if (hasPrefix(url, "opfs://")) { + proto = WebFileSystem::DataProtocol::BROWSER_FSACCESS; } else if (hasPrefix(url, "file://")) { data_url = std::string_view{url}.substr(7); proto = default_data_protocol_; @@ -453,6 +456,7 @@ void WebFileSystem::DropDanglingFiles() { for (auto &[file_id, file] : files_by_id_) { if (file->handle_count_ == 0) { files_by_name_.erase(file->file_name_); + DropFile(file->file_name_); if (file->data_url_.has_value()) { files_by_url_.erase(file->data_url_.value()); } @@ -481,6 +485,13 @@ bool WebFileSystem::TryDropFile(std::string_view file_name) { return false; } +/// Drop a file +void WebFileSystem::DropFile(std::string_view file_name) { + DEBUG_TRACE(); + std::string fileNameS = std::string{file_name}; + duckdb_web_fs_file_drop_file(fileNameS.c_str(), fileNameS.size()); +} + /// Write the global filesystem info rapidjson::Value WebFileSystem::WriteGlobalFileInfo(rapidjson::Document &doc, uint32_t cache_epoch) { DEBUG_TRACE(); @@ -793,7 +804,7 @@ void WebFileSystem::Write(duckdb::FileHandle &handle, void *buffer, int64_t nr_b auto file_size = file_hdl.file_->file_size_; auto writer = static_cast<char *>(buffer); file_hdl.position_ = location; - while (nr_bytes > 0 && location < file_size) { + while (nr_bytes > 0) { auto n = Write(handle, writer, nr_bytes); writer += n; nr_bytes -= n; @@ -1006,10 +1017,12 @@ void WebFileSystem::FileSync(duckdb::FileHandle &handle) { vector<std::string> WebFileSystem::Glob(const std::string &path, FileOpener *opener) { std::unique_lock fs_guard{fs_mutex_}; std::vector<std::string> results; - auto glob = glob_to_regex(path); - for (auto [name, file] : files_by_name_) { - if (std::regex_match(file->file_name_, glob)) { - results.push_back(std::string{name}); + if (!FileSystem::IsRemoteFile(path)) { + auto glob = glob_to_regex(path); + for (auto [name, file] : files_by_name_) { + if (std::regex_match(file->file_name_, glob)) { + results.push_back(std::string{name}); + } } } auto &state = GetLocalState(); diff --git a/lib/src/json_typedef.cc b/lib/src/json_typedef.cc index 890ee8a73..1b98e21ee 100644 --- a/lib/src/json_typedef.cc +++ b/lib/src/json_typedef.cc @@ -396,6 +396,12 @@ arrow::Result WriteSQLType(rapidjson::Document& doc, const duc case duckdb::LogicalTypeId::AGGREGATE_STATE: case duckdb::LogicalTypeId::BIT: case duckdb::LogicalTypeId::LAMBDA: + case duckdb::LogicalTypeId::STRING_LITERAL: + case duckdb::LogicalTypeId::INTEGER_LITERAL: + case duckdb::LogicalTypeId::UHUGEINT: + case duckdb::LogicalTypeId::UNION: + case duckdb::LogicalTypeId::ARRAY: + case duckdb::LogicalTypeId::VARINT: break; } return out;
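The new `duckdb_web_fs_file_drop_file` export above reaches JavaScript through the `DUCKDB_RUNTIME.dropFile` stub added in lib/js-stubs.js; the TypeScript side of the hook appears further down in runtime.ts and runtime_browser.ts. A minimal sketch of such a runtime implementation, assuming a worker-local map of OPFS sync access handles (the `handles` registry is illustrative):

```ts
import { DuckDBRuntime, readString } from './runtime';
import { DuckDBModule } from './duckdb_module';

// Illustrative worker-local registry of OPFS sync access handles.
const handles = new Map<string, FileSystemSyncAccessHandle>();

const EXAMPLE_RUNTIME: Partial<DuckDBRuntime> = {
    dropFile: (mod: DuckDBModule, fileNamePtr: number, fileNameLen: number): void => {
        // The C++ caller passes a pointer and a length into WASM memory.
        const fileName = readString(mod, fileNamePtr, fileNameLen);
        const handle = handles.get(fileName);
        if (handle) {
            // Flush pending writes and release the lock on the OPFS file
            // so it can be reopened or removed.
            handle.flush();
            handle.close();
            handles.delete(fileName);
        }
    },
};
```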
diff --git a/lib/src/webdb.cc b/lib/src/webdb.cc index 447e10ffe..89c769da2 100644 --- a/lib/src/webdb.cc +++ b/lib/src/webdb.cc @@ -828,6 +828,7 @@ arrow::Status WebDB::Open(std::string_view args_json) { db_config.options.access_mode = access_mode; db_config.options.duckdb_api = "wasm"; db_config.options.custom_user_agent = config_->custom_user_agent; + db_config.options.use_direct_io = config_->use_direct_io; auto db = make_shared_ptr<duckdb::DuckDB>(config_->path, &db_config); #ifndef WASM_LOADABLE_EXTENSIONS duckdb_web_parquet_init(db.get()); @@ -912,18 +913,29 @@ arrow::Status WebDB::RegisterFileBuffer(std::string_view file_name, std::unique_ /// Drop all files arrow::Status WebDB::DropFiles() { file_page_buffer_->DropDanglingFiles(); - pinned_web_files_.clear(); + std::vector<std::string> files_to_drop; + for (const auto& [key, handle] : pinned_web_files_) { + files_to_drop.push_back(handle->GetName()); + } + for (const auto& fileName : files_to_drop) { + arrow::Status status = DropFile(fileName); + if (!status.ok()) { + return arrow::Status::Invalid("Failed to drop file: " + fileName); + } + } if (auto fs = io::WebFileSystem::Get()) { fs->DropDanglingFiles(); } return arrow::Status::OK(); } /// Drop a file -arrow::Status WebDB::DropFile(std::string_view file_name) { - file_page_buffer_->TryDropFile(file_name); - pinned_web_files_.erase(file_name); +arrow::Status WebDB::DropFile(std::string_view fileName) { + file_page_buffer_->TryDropFile(fileName); + pinned_web_files_.erase(fileName); if (auto fs = io::WebFileSystem::Get()) { - if (!fs->TryDropFile(file_name)) { + if (fs->TryDropFile(fileName)) { + fs->DropFile(fileName); + } else { return arrow::Status::Invalid("file is in use"); } } diff --git a/packages/benchmarks/src/system/arquero_benchmarks.ts b/packages/benchmarks/src/system/arquero_benchmarks.ts index b18f32747..5a742a21d 100644 --- a/packages/benchmarks/src/system/arquero_benchmarks.ts +++ b/packages/benchmarks/src/system/arquero_benchmarks.ts @@ -84,12 +84,12 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { case 2: { const tmp = this.tables['region'] .filter((d: any) => d.op.equal(d.r_name, 'EUROPE')) - .join(this.tables['nation'], ['r_regionkey', 'n_regionkey']) - .join(this.tables['supplier'], ['n_nationkey', 's_nationkey']); - const sub = tmp.join(this.tables['partsupp'], ['s_suppkey', 'ps_suppkey']); + .join(this.tables['nation'], [['r_regionkey'], ['n_regionkey']]) + .join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]); + const sub = tmp.join(this.tables['partsupp'], [['s_suppkey'], ['ps_suppkey']]); const sub2 = this.tables['part'] .filter((d: any) => d.p_size == 15 && aq.op.match(d.p_type, /.*BRASS$/g, 0) != null) - .join(sub, ['p_partkey', 'ps_partkey']) + .join(sub, [['p_partkey'], ['ps_partkey']]) .groupby('p_partkey') .rollup({ min_ps_supplycost: (d: any) => aq.op.min(d.ps_supplycost), @@ -99,7 +99,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { (a: any, b: any) => a.p_partkey == b.ps_partkey && a.min_ps_supplycost == b.ps_supplycost, ); const query = tmp - .join(sub2, ['s_suppkey', 'ps_suppkey']) + .join(sub2, [['s_suppkey'], ['ps_suppkey']]) .orderby(aq.desc('s_acctbal'), 'n_name', 's_name', 'p_partkey'); for (const v of query.objects()) { noop(v); @@ -111,8 +111,8 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { const o = this.tables['orders'].filter((d: any) => d.o_orderdate < aq.op.timestamp('1995-03-15')); const l = this.tables['lineitem'].filter((d: any) => d.l_shipdate < aq.op.timestamp('1995-03-15')); const query = c - .join(o, ['c_custkey', 'o_custkey']) - .join(l, ['o_orderkey', 'l_orderkey']) + .join(o, [['c_custkey'], ['o_custkey']]) + .join(l, [['o_orderkey'], ['l_orderkey']]) .derive({ disc_price: (d: any) => d.l_extendedprice * (1 - d.l_discount), }) @@ -133,7 +133,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { ); const l = this.tables['lineitem'].filter((d: any) => d.l_commitdate < d.l_receiptdate); const query = o - .join(l, ['o_orderkey', 
'l_orderkey']) + .join(l, [['o_orderkey'], ['l_orderkey']]) .groupby('o_orderpriority') .rollup({ order_count: aq.op.count(), @@ -156,10 +156,10 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { const n = this.tables['nation']; const right = r - .join(n, ['r_regionkey', 'n_regionkey']) - .join(c, ['n_nationkey', 'c_nationkey']) - .join(o, ['c_custkey', 'o_custkey']) - .join(l, ['o_orderkey', 'l_orderkey']); + .join(n, [['r_regionkey'], ['n_regionkey']]) + .join(c, [['n_nationkey'], ['c_nationkey']]) + .join(o, [['c_custkey'], ['o_custkey']]) + .join(l, [['o_orderkey'], ['l_orderkey']]); const query = s .join( right, @@ -232,11 +232,11 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { (a.n1_nationkey == 'GERMANY' && b.n2_nationkey == 'FRANCE'), ); const right = nations - .join(c, ['n2_nationkey', 'c_nationkey']) - .join(o, ['c_custkey', 'o_custkey']) - .join(l, ['o_orderkey', 'l_orderkey']); + .join(c, [['n2_nationkey'], ['c_nationkey']]) + .join(o, [['c_custkey'], ['o_custkey']]) + .join(l, [['o_orderkey'], ['l_orderkey']]); const query = s - .join(right, ['s_suppkey', 'l_suppkey']) + .join(right, [['s_suppkey'], ['l_suppkey']]) .groupby('n1_name', 'n2_name', 'l_year') .rollup({ revenue: (d: any) => aq.op.sum(d.volume), @@ -255,9 +255,9 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { d.o_orderdate <= aq.op.timestamp('1996-12-31'), ); const sub = p - .join(this.tables['lineitem'], ['p_partkey', 'l_partkey']) - .join(o, ['l_orderkey', 'o_orderkey']) - .join(this.tables['customer'], ['o_custkey', 'c_custkey']); + .join(this.tables['lineitem'], [['p_partkey'], ['l_partkey']]) + .join(o, [['l_orderkey'], ['o_orderkey']]) + .join(this.tables['customer'], [['o_custkey'], ['c_custkey']]); const r2 = this.tables['region'] .filter((d: any) => d.r_name == 'AMERICA') .rename({ @@ -269,11 +269,11 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { n_name: 'n2_name', }); const sub2 = r2 - .join(n2, ['r2_regionkey', 'n2_regionkey']) - .join(sub, ['n2_nationkey', 'c_nationkey']) - .join(this.tables['supplier'], ['l_suppkey', 's_suppkey']); + .join(n2, [['r2_regionkey'], ['n2_regionkey']]) + .join(sub, [['n2_nationkey'], ['c_nationkey']]) + .join(this.tables['supplier'], [['l_suppkey'], ['s_suppkey']]); const query = this.tables['nation'] - .join(sub2, ['n_nationkey', 's_nationkey']) + .join(sub2, [['n_nationkey'], ['s_nationkey']]) .derive({ o_year: (d: any) => aq.op.year(d.o_orderdate), volume: (d: any) => d.l_extendedprice * (1 - d.l_discount), @@ -290,16 +290,16 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { break; } case 9: { - const sub = this.tables['nation'].join(this.tables['supplier'], ['n_nationkey', 's_nationkey']); + const sub = this.tables['nation'].join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]); const query = this.tables['part'] .filter((d: any) => aq.op.match(d.p_name, /.*green.*/g, 0) != null) - .join(this.tables['partsupp'], ['p_partkey', 'ps_partkey']) - .join(sub, ['ps_suppkey', 's_suppkey']) + .join(this.tables['partsupp'], [['p_partkey'], ['ps_partkey']]) + .join(sub, [['ps_suppkey'], ['s_suppkey']]) .join( this.tables['lineitem'], (a: any, b: any) => a.p_partkey == b.l_partkey && a.s_suppkey == b.l_suppkey, ) - .join(this.tables['orders'], ['l_orderkey', 'o_orderkey']) + .join(this.tables['orders'], [['l_orderkey'], ['o_orderkey']]) .derive({ o_year: (d: any) => aq.op.year(d.o_orderdate), amount: (d: any) => d.l_extendedprice * (1 - d.l_discount) - d.ps_supplycost * d.l_quantity, @@ 
-323,10 +323,10 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { ) .join( this.tables['lineitem'].filter((d: any) => d.l_returnflag == 'R'), - ['o_orderkey', 'l_orderkey'], + [['o_orderkey'], ['l_orderkey']], ) - .join(this.tables['customer'], ['o_custkey', 'c_custkey']) - .join(this.tables['nation'], ['c_nationkey', 'n_nationkey']) + .join(this.tables['customer'], [['o_custkey'], ['c_custkey']]) + .join(this.tables['nation'], [['c_nationkey'], ['n_nationkey']]) .derive({ realprice: (d: any) => d.l_extendedprice * (1 - d.l_discount), }) @@ -343,8 +343,8 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { case 11: { const temp = this.tables['nation'] .filter((d: any) => d.n_name == 'GERMANY') - .join(this.tables['supplier'], ['n_nationkey', 's_nationkey']) - .join(this.tables['partsupp'], ['s_suppkey', 'ps_suppkey']) + .join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]) + .join(this.tables['partsupp'], [['s_suppkey'], ['ps_suppkey']]) .derive({ value: (d: any) => d.ps_supplycost * d.ps_availqty, }); @@ -373,7 +373,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { d.l_receiptdate >= aq.op.timestamp('1994-01-01') && d.l_receiptdate <= aq.op.timestamp('1994-12-31'), ) - .join(this.tables['orders'], ['l_orderkey', 'o_orderkey']) + .join(this.tables['orders'], [['l_orderkey'], ['o_orderkey']]) .derive({ high_line: (d: any) => d.o_orderpriority == '1-URGENT' || d.o_orderpriority == '2-HIGH' ? 1 : 0, @@ -396,7 +396,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { (d: any) => aq.op.match(d.o_comment, /^.*special.*requests.*$/g, 0) == null, ); const query = this.tables['customer'] - .join_left(o, ['c_custkey', 'o_custkey']) + .join_left(o, [['c_custkey'], ['o_custkey']]) .derive({ o_orderkey_not_null: (d: any) => (d.o_orderkey != null ? 
1 : 0), }) @@ -421,7 +421,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { d.l_receiptdate >= aq.op.timestamp('1995-09-01') && d.l_receiptdate <= aq.op.timestamp('1995-09-30'), ) - .join(this.tables['part'], ['l_partkey', 'p_partkey']) + .join(this.tables['part'], [['l_partkey'], ['p_partkey']]) .derive({ realprice: (d: any) => d.l_extendedprice * (1 - d.l_discount), promoprice: (d: any) => @@ -458,7 +458,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { total_revenue: (d: any) => aq.op.max(d.revenue), }) .join(temp, (a: any, b: any) => aq.op.equal(a.total_revenue, b.revenue)) - .join(this.tables['supplier'], ['l_suppkey', 's_suppkey']) + .join(this.tables['supplier'], [['l_suppkey'], ['s_suppkey']]) .orderby('s_suppkey'); for (const v of query.objects({ grouped: true })) { noop(v); @@ -482,8 +482,8 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { d.p_size == 45 || d.p_size == 19), ) - .join(this.tables['partsupp'], ['p_partkey', 'ps_partkey']) - .antijoin(supplier, ['ps_partkey', 's_suppkey']) + .join(this.tables['partsupp'], [['p_partkey'], ['ps_partkey']]) + .antijoin(supplier, [['ps_partkey'], ['s_suppkey']]) .groupby('p_brand', 'p_type', 'p_size') .rollup({ supplier_cnt: aq.op.distinct('ps_suppkey'), @@ -497,7 +497,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { case 17: { const tmp = this.tables['part'] .filter((d: any) => d.p_brand == 'Brand#23' && d.p_container == 'MED BOX') - .join(this.tables['lineitem'], ['p_partkey', 'l_partkey']); + .join(this.tables['lineitem'], [['p_partkey'], ['l_partkey']]); const agg = tmp.groupby('p_partkey').rollup({ avg_qty: aq.op.mean('l_quantity'), }); @@ -518,9 +518,9 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { quantity: aq.op.sum('l_quantity'), }) .filter((d: any) => d.quantity > 300) - .join(this.tables['orders'], ['l_orderkey', 'o_orderkey']) - .join(this.tables['customer'], ['o_custkey', 'c_custkey']) - .join(this.tables['lineitem'], ['o_orderkey', 'l_orderkey']) + .join(this.tables['orders'], [['l_orderkey'], ['o_orderkey']]) + .join(this.tables['customer'], [['o_custkey'], ['c_custkey']]) + .join(this.tables['lineitem'], [['o_orderkey'], ['l_orderkey']]) .groupby('c_name', 'c_custkey', 'o_orderkey', 'o_orderdate', 'o_totalprice') .rollup({ quantity: aq.op.sum('l_quantity'), @@ -609,7 +609,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { }); const sub = this.tables['part'] .filter((d: any) => aq.op.match(d.p_name, /^forest.*$/, 0) != null) - .join(this.tables['partsupp'], ['p_partkey', 'ps_partkey']) + .join(this.tables['partsupp'], [['p_partkey'], ['ps_partkey']]) .join( qty, (a: any, b: any) => @@ -619,8 +619,8 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { ); const query = this.tables['nation'] .filter((d: any) => d.n_name == 'CANADA') - .join(this.tables['supplier'], ['n_nationkey', 's_nationkey']) - .semijoin(sub, ['s_suppkey', 'ps_suppkey']) + .join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]) + .semijoin(sub, [['s_suppkey'], ['ps_suppkey']]) .orderby('s_name'); for (const v of query.objects({ grouped: true })) { noop(v); @@ -632,9 +632,9 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { const orders = this.tables['orders'].filter((d: any) => d.o_orderstatus == 'F'); const query = this.tables['nation'] .filter((d: any) => d.n_name == 'SAUDI ARABIA') - .join(this.tables['supplier'], ['n_nationkey', 's_nationkey']) - .join(lineitem, ['s_suppkey', 'l_suppkey']) - .join(orders, 
['l_orderkey', 'o_orderkey']) + .join(this.tables['supplier'], [['n_nationkey'], ['s_nationkey']]) + .join(lineitem, [['s_suppkey'], ['l_suppkey']]) + .join(orders, [['l_orderkey'], ['o_orderkey']]) .antijoin(lineitem, (a: any, b: any) => a.l_suppkey != b.l_suppkey && a.l_orderkey == b.l_orderkey) .semijoin( this.tables['lineitem'], @@ -661,7 +661,7 @@ export class ArqueroTPCHBenchmark implements SystemBenchmark { }); const query = customers .join(total_avg, (a: any, b: any) => a.c_acctbal > b.avg_c_acctbal) - .antijoin(this.tables['orders'], ['c_custkey', 'o_custkey']) + .antijoin(this.tables['orders'], [['c_custkey'], ['o_custkey']]) .derive({ cntrycode: (d: any) => aq.op.substring(d.c_phone, 0, 2), }) @@ -979,7 +979,7 @@ export class ArqueroIntegerJoin2Benchmark implements SystemBenchmark { .params({ filter }) .filter((row: any) => row.v0 < filter) .rename({ v0: 'a0' }) - .join(this.tables['B'].rename({ v0: 'b0', v1: 'b1' }), ['a0', 'b1']); + .join(this.tables['B'].rename({ v0: 'b0', v1: 'b1' }), [['a0'], ['b1']]); let n = 0; for (const v of result) { noop(v); @@ -1048,8 +1048,8 @@ export class ArqueroIntegerJoin3Benchmark implements SystemBenchmark { .params({ filter }) .filter((row: any) => row.v0 < filter) .rename({ v0: 'a0' }) - .join(this.tables['B'].rename({ v0: 'b0', v1: 'b1' }), ['a0', 'b1']) - .join(this.tables['C'].rename({ v0: 'c0', v1: 'c1' }), ['b0', 'c1']); + .join(this.tables['B'].rename({ v0: 'b0', v1: 'b1' }), [['a0'], ['b1']]) + .join(this.tables['C'].rename({ v0: 'c0', v1: 'c1' }), [['b0'], ['c1']]); let n = 0; for (const v of result) { noop(v); diff --git a/packages/duckdb-wasm/karma/s3rver/s3rver.js b/packages/duckdb-wasm/karma/s3rver/s3rver.js index d133af956..72a8c51ec 100644 --- a/packages/duckdb-wasm/karma/s3rver/s3rver.js +++ b/packages/duckdb-wasm/karma/s3rver/s3rver.js @@ -7,6 +7,7 @@ const CORS_CONFIG = "\n" + " GET\n" + " HEAD\n" + " *\n" + + " <ExposeHeader>Content-Range</ExposeHeader>\n" + " \n" + "";
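The extra s3rver rule matters because browsers only expose safelisted response headers to cross-origin callers: without `Content-Range` in the CORS configuration, a range read can receive a 206 response yet be unable to inspect which byte range came back. A sketch of how to probe what a server exposes (the URL is illustrative):

```ts
// Returns true if the server answers range requests with 206 and the
// Content-Range header is CORS-visible to this origin.
async function probeRangeSupport(url: string): Promise<boolean> {
    const resp = await fetch(url, { headers: { Range: 'bytes=0-0' } });
    const contentRange = resp.headers.get('Content-Range'); // null unless CORS-exposed
    return resp.status === 206 && contentRange !== null;
}
```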
diff --git a/packages/duckdb-wasm/src/bindings/bindings_base.ts b/packages/duckdb-wasm/src/bindings/bindings_base.ts index 70d5475cf..84f01c33d 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_base.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_base.ts @@ -469,13 +469,49 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { } dropResponseBuffers(this.mod); } + /** Prepare a file handle that can only be acquired asynchronously */ + public async prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void> { + if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS && this._runtime.prepareDBFileHandle) { + const list = await this._runtime.prepareDBFileHandle(path, DuckDBDataProtocol.BROWSER_FSACCESS); + for (const item of list) { + const { handle, path: filePath, fromCached } = item; + if (!fromCached && handle.getSize()) { + await this.registerFileHandle(filePath, handle, DuckDBDataProtocol.BROWSER_FSACCESS, true); + } + } + return; + } + throw new Error(`prepareDBFileHandle: unsupported protocol ${protocol}`); + } /** Register a file object URL */ - public registerFileHandle<HandleType>( + public async registerFileHandle<HandleType>( name: string, handle: HandleType, protocol: DuckDBDataProtocol, directIO: boolean, - ): void { + ): Promise<void> { + if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS) { + if (handle instanceof FileSystemSyncAccessHandle) { + // the handle is already a sync access handle + } else if (handle instanceof FileSystemFileHandle) { + // the handle is async; convert it to a sync access handle + const fileHandle: FileSystemFileHandle = handle as any; + try { + handle = (await fileHandle.createSyncAccessHandle()) as any; + } catch (e: any) { + throw new Error(e.message + ":" + name); + } + } else if (name != null) { + // resolve a sync access handle from the file name + try { + const opfsRoot = await navigator.storage.getDirectory(); + const fileHandle = await opfsRoot.getFileHandle(name); + handle = (await fileHandle.createSyncAccessHandle()) as any; + } catch (e: any) { + throw new Error(e.message + ":" + name); + } + } + } const [s, d, n] = callSRet( this.mod, 'duckdb_web_fs_register_file_url', @@ -487,6 +523,9 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { } dropResponseBuffers(this.mod); globalThis.DUCKDB_RUNTIME._files = (globalThis.DUCKDB_RUNTIME._files || new Map()).set(name, handle); + if (globalThis.DUCKDB_RUNTIME._preparedHandles?.[name]) { + delete globalThis.DUCKDB_RUNTIME._preparedHandles[name]; + } if (this.pthread) { for (const worker of this.pthread.runningWorkers) { worker.postMessage({ diff --git a/packages/duckdb-wasm/src/bindings/bindings_interface.ts b/packages/duckdb-wasm/src/bindings/bindings_interface.ts index 4d2ad5e17..31ccca43b 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_interface.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_interface.ts @@ -41,7 +41,8 @@ export interface DuckDBBindings { handle: HandleType, protocol: DuckDBDataProtocol, directIO: boolean, - ): void; + ): Promise<void>; + prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>; globFiles(path: string): WebFile[]; dropFile(name: string): void; dropFiles(): void;
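Making `registerFileHandle` async is visible directly at the call site: for `BROWSER_FSACCESS` files the worker may have to call `createSyncAccessHandle()` before DuckDB can touch the file, so callers must await the registration. A usage sketch against the public API (the file name is illustrative):

```ts
import * as duckdb from '@duckdb/duckdb-wasm';

async function registerOpfsFile(db: duckdb.AsyncDuckDB): Promise<void> {
    const opfsRoot = await navigator.storage.getDirectory();
    const handle = await opfsRoot.getFileHandle('data.parquet', { create: false });
    // Must now be awaited: the worker converts the FileSystemFileHandle
    // into a FileSystemSyncAccessHandle before the file becomes usable.
    await db.registerFileHandle('data.parquet', handle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
}
```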
diff --git a/packages/duckdb-wasm/src/bindings/config.ts b/packages/duckdb-wasm/src/bindings/config.ts index ed5bb3cdb..ce29ca0f5 100644 --- a/packages/duckdb-wasm/src/bindings/config.ts +++ b/packages/duckdb-wasm/src/bindings/config.ts @@ -50,6 +50,10 @@ export interface DuckDBConfig { * Note that this will only work with cross-origin isolated sites since it requires SharedArrayBuffers. */ maximumThreads?: number; + /** + * The direct I/O flag + */ + useDirectIO?: boolean; /** * The query config */ diff --git a/packages/duckdb-wasm/src/bindings/runtime.ts b/packages/duckdb-wasm/src/bindings/runtime.ts index 83632deca..4bc360db3 100644 --- a/packages/duckdb-wasm/src/bindings/runtime.ts +++ b/packages/duckdb-wasm/src/bindings/runtime.ts @@ -89,6 +89,12 @@ export interface DuckDBGlobalFileInfo { s3Config?: S3Config; } +export interface PreparedDBFileHandle { + path: string; + handle: any; + fromCached: boolean; +} + /** Call a function with packed response buffer */ export function callSRet( mod: DuckDBModule, @@ -134,6 +140,7 @@ export interface DuckDBRuntime { openFile(mod: DuckDBModule, fileId: number, flags: FileFlags): void; syncFile(mod: DuckDBModule, fileId: number): void; closeFile(mod: DuckDBModule, fileId: number): void; + dropFile(mod: DuckDBModule, fileNamePtr: number, fileNameLen: number): void; getLastFileModificationTime(mod: DuckDBModule, fileId: number): number; truncateFile(mod: DuckDBModule, fileId: number, newSize: number): void; readFile(mod: DuckDBModule, fileId: number, buffer: number, bytes: number, location: number): number; @@ -149,6 +156,9 @@ export interface DuckDBRuntime { checkFile(mod: DuckDBModule, pathPtr: number, pathLen: number): boolean; removeFile(mod: DuckDBModule, pathPtr: number, pathLen: number): void; + // Prepare a file handle that can only be acquired asynchronously + prepareDBFileHandle?: (path: string, protocol: DuckDBDataProtocol) => Promise<PreparedDBFileHandle[]>; + // Call a scalar UDF function callScalarUDF( mod: DuckDBModule, @@ -169,6 +179,7 @@ export const DEFAULT_RUNTIME: DuckDBRuntime = { openFile: (_mod: DuckDBModule, _fileId: number, flags: FileFlags): void => {}, syncFile: (_mod: DuckDBModule, _fileId: number): void => {}, closeFile: (_mod: DuckDBModule, _fileId: number): void => {}, + dropFile: (_mod: DuckDBModule, _fileNamePtr: number, _fileNameLen: number): void => {}, getLastFileModificationTime: (_mod: DuckDBModule, _fileId: number): number => { return 0; },
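The browser implementation of `prepareDBFileHandle` that follows resolves `opfs://` paths roughly like `mkdir -p`: strip the scheme, create any intermediate directories, then take a sync access handle on the leaf file. A condensed sketch of that resolution, assuming a dedicated-worker context (the path is illustrative):

```ts
async function acquireSyncHandle(path: string): Promise<FileSystemSyncAccessHandle> {
    // 'opfs://datadir/app.db' -> folders ['datadir'], leaf 'app.db'
    const segments = path.slice('opfs://'.length).split(/\/|\\/);
    const fileName = segments.pop()!;
    let dir: FileSystemDirectoryHandle = await navigator.storage.getDirectory();
    for (const folder of segments) {
        dir = await dir.getDirectoryHandle(folder, { create: true }); // mkdir -p
    }
    const file = await dir.getFileHandle(fileName, { create: true });
    // Sync access handles exist only in dedicated workers; they provide the
    // synchronous reads and writes the WASM filesystem needs.
    return await file.createSyncAccessHandle();
}
```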
diff --git a/packages/duckdb-wasm/src/bindings/runtime_browser.ts b/packages/duckdb-wasm/src/bindings/runtime_browser.ts index 2c0c270d3..8e46955fe 100644 --- a/packages/duckdb-wasm/src/bindings/runtime_browser.ts +++ b/packages/duckdb-wasm/src/bindings/runtime_browser.ts @@ -1,5 +1,5 @@ -import { StatusCode } from '../status'; -import { addS3Headers, getHTTPUrl } from '../utils'; +import {StatusCode} from '../status'; +import {addS3Headers, getHTTPUrl} from '../utils'; import { callSRet, @@ -11,13 +11,19 @@ import { failWith, FileFlags, readString, + PreparedDBFileHandle, } from './runtime'; import { DuckDBModule } from './duckdb_module'; import * as udf from './udf_runtime'; +const OPFS_PREFIX_LEN = 'opfs://'.length; +const PATH_SEP_REGEX = /\/|\\/; + export const BROWSER_RUNTIME: DuckDBRuntime & { + _files: Map<string, any>; _fileInfoCache: Map<number, DuckDBFileInfo>; _globalFileInfo: DuckDBGlobalFileInfo | null; + _preparedHandles: Record<string, any>; getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null; getGlobalFileInfo(mod: DuckDBModule): DuckDBGlobalFileInfo | null; @@ -26,6 +32,7 @@ _fileInfoCache: new Map(), _udfFunctions: new Map(), _globalFileInfo: null, + _preparedHandles: {} as any, getFileInfo(mod: DuckDBModule, fileId: number): DuckDBFileInfo | null { try { @@ -47,13 +54,17 @@ try { const info = JSON.parse(infoStr); if (info == null) { - return null; + return null; } const file = { ...info, blob: null } as DuckDBFileInfo; BROWSER_RUNTIME._fileInfoCache.set(fileId, file); + if (!BROWSER_RUNTIME._files.has(file.fileName) && BROWSER_RUNTIME._preparedHandles[file.fileName]) { + BROWSER_RUNTIME._files.set(file.fileName, BROWSER_RUNTIME._preparedHandles[file.fileName]); + delete BROWSER_RUNTIME._preparedHandles[file.fileName]; + } return file; } catch (error) { - console.warn(error); + console.warn(error); return null; } } catch (e: any) { @@ -82,7 +93,7 @@ if (info == null) { return null; } - BROWSER_RUNTIME._globalFileInfo = { ...info, blob: null } as DuckDBGlobalFileInfo; + BROWSER_RUNTIME._globalFileInfo = { ...info, blob: null} as DuckDBGlobalFileInfo; return BROWSER_RUNTIME._globalFileInfo; } catch (e: any) { @@ -91,6 +102,63 @@ } }, + /** Prepare a file handle that can only be acquired asynchronously */ + async prepareDBFileHandle(dbPath: string, protocol: DuckDBDataProtocol): Promise<PreparedDBFileHandle[]> { + if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS) { + const filePaths = [dbPath, `${dbPath}.wal`]; + const prepare = async (path: string): Promise<PreparedDBFileHandle> => { + if (BROWSER_RUNTIME._files.has(path)) { + return { + path, + handle: BROWSER_RUNTIME._files.get(path), + fromCached: true, + }; + } + const opfsRoot = await navigator.storage.getDirectory(); + let dirHandle: FileSystemDirectoryHandle = opfsRoot; + // check if mkdir -p is needed + const opfsPath = path.slice(OPFS_PREFIX_LEN); + let fileName = opfsPath; + if (PATH_SEP_REGEX.test(opfsPath)) { + const folders = opfsPath.split(PATH_SEP_REGEX); + fileName = folders.pop()!; + if (!fileName) { + throw new Error(`Invalid path ${path}`); + } + // mkdir -p + for (const folder of folders) { + dirHandle = await dirHandle.getDirectoryHandle(folder, { create: true }); + } + } + const fileHandle = await dirHandle.getFileHandle(fileName, { create: false }).catch(e => { + if (e?.name === 'NotFoundError') { + console.debug(`File ${path} does not exist yet, creating...`); + return dirHandle.getFileHandle(fileName, { create: true }); + } + throw e; + }); + try { + const handle = await fileHandle.createSyncAccessHandle(); + BROWSER_RUNTIME._preparedHandles[path] = handle; + return { + path, + handle, + fromCached: false, + }; + } catch (e: any) { + throw new Error(e.message + ":" + path); + } + }; + const result: PreparedDBFileHandle[] = []; + for (const filePath of filePaths) { + const res = await prepare(filePath); + result.push(res); + } + return result; + } + throw new Error(`Unsupported protocol ${protocol} for path ${dbPath}`); + }, + testPlatformFeature: (_mod: DuckDBModule, feature: number): boolean => { switch (feature) { case 1: @@ -160,32 +228,32 @@ let contentLength = null; let error: any | null = null; if (file.reliableHeadRequests || !file.allowFullHttpReads) { - try { - // Send a dummy HEAD request with range protocol - // -> good IFF status is 206 and contentLenght is present - const xhr = new XMLHttpRequest(); - if (file.dataProtocol == DuckDBDataProtocol.S3) { - xhr.open('HEAD', getHTTPUrl(file.s3Config, file.dataUrl!), false); - addS3Headers(xhr, file.s3Config, file.dataUrl!, 'HEAD'); - } else { - xhr.open('HEAD', file.dataUrl!, false); - } - xhr.setRequestHeader('Range', `bytes=0-`); - xhr.send(null); + try { + // Send a dummy HEAD request with range protocol + // -> good IFF status is 206 and contentLength is present + const xhr = new XMLHttpRequest(); + if (file.dataProtocol == 
DuckDBDataProtocol.S3) { + xhr.open('HEAD', getHTTPUrl(file.s3Config, file.dataUrl!), false); + addS3Headers(xhr, file.s3Config, file.dataUrl!, 'HEAD'); + } else { + xhr.open('HEAD', file.dataUrl!, false); + } + xhr.setRequestHeader('Range', `bytes=0-`); + xhr.send(null); - // Supports range requests - contentLength = xhr.getResponseHeader('Content-Length'); - if (contentLength !== null && xhr.status == 206) { - const result = mod._malloc(2 * 8); - mod.HEAPF64[(result >> 3) + 0] = +contentLength; - mod.HEAPF64[(result >> 3) + 1] = 0; - return result; - } + // Supports range requests + contentLength = xhr.getResponseHeader('Content-Length'); + if (contentLength !== null && xhr.status == 206) { + const result = mod._malloc(2 * 8); + mod.HEAPF64[(result >> 3) + 0] = +contentLength; + mod.HEAPF64[(result >> 3) + 1] = 0; + return result; + } - } catch (e: any) { - error = e; - console.warn(`HEAD request with range header failed: ${e}`); - } + } catch (e: any) { + error = e; + console.warn(`HEAD request with range header failed: ${e}`); + } } // Try to fallback to full read? @@ -223,7 +291,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } head.setRequestHeader('Range', `bytes=0-`); head.send(null); - + // Supports range requests contentLength = head.getResponseHeader('Content-Length'); if (contentLength !== null && +contentLength > 1) { @@ -296,6 +364,20 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { mod.HEAPF64[(result >> 3) + 1] = buffer; return result; } + case DuckDBDataProtocol.BROWSER_FSACCESS: { + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName); + if (!handle) { + throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); + } + if (flags & FileFlags.FILE_FLAGS_FILE_CREATE_NEW) { + handle.truncate(0); + } + const result = mod._malloc(2 * 8); + const fileSize = handle.getSize(); + mod.HEAPF64[(result >> 3) + 0] = fileSize; + mod.HEAPF64[(result >> 3) + 1] = 0; + return result; + } } } catch (e: any) { // TODO (samansmink): this path causes the WASM code to hang @@ -343,11 +425,19 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { return 0; } const contentLength = xhr2.getResponseHeader('Content-Length'); - if (contentLength && (+contentLength > 1)) { - console.warn(`Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`); + if (contentLength && +contentLength > 1) { + console.warn( + `Range request for ${path} did not return a partial response: ${xhr2.status} "${xhr2.statusText}"`, + ); } } mod.ccall('duckdb_web_fs_glob_add_path', null, ['string'], [path]); + } else { + for (const [filePath] of BROWSER_RUNTIME._files!.entries() || []) { + if (filePath.startsWith(path)) { + mod.ccall('duckdb_web_fs_glob_add_path', null, ['string'], [filePath]); + } + } } } catch (e: any) { console.log(e); @@ -372,6 +462,8 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } xhr.send(null); return xhr.status == 206 || xhr.status == 200; + } else { + return BROWSER_RUNTIME._files.has(path); } } catch (e: any) { console.log(e); @@ -393,7 +485,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { // XXX Remove from registry return; case DuckDBDataProtocol.BROWSER_FSACCESS: { - const handle = BROWSER_RUNTIME._files?.get(file.fileName); + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName); if (!handle) { throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); } @@ -401,6 +493,24 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } } }, + 
dropFile: (mod: DuckDBModule, fileNamePtr: number, fileNameLen: number) => { + const fileName = readString(mod, fileNamePtr, fileNameLen); + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(fileName); + if (handle) { + BROWSER_RUNTIME._files.delete(fileName); + if (handle instanceof FileSystemSyncAccessHandle) { + try { + handle.flush(); + handle.close(); + } catch (e: any) { + throw new Error(`Cannot drop file with name: ${fileName}`); + } + } + if (handle instanceof Blob) { + // nothing + } + } + }, truncateFile: (mod: DuckDBModule, fileId: number, newSize: number) => { const file = BROWSER_RUNTIME.getFileInfo(mod, fileId); switch (file?.dataProtocol) { @@ -461,8 +571,14 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { } else if (xhr.status == 200) { // TODO: here we are actually throwing away all non-relevant bytes, but this is still better than failing // proper solution would require notifying duckdb-wasm cache, while we are piggybackign on browser cache - console.warn(`Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`); - const src = new Uint8Array(xhr.response, location, Math.min(xhr.response.byteLength-location, bytes)); + console.warn( + `Range request for ${file.dataUrl} did not return a partial response: ${xhr.status} "${xhr.statusText}"`, + ); + const src = new Uint8Array( + xhr.response, + location, + Math.min(xhr.response.byteLength - location, bytes), + ); mod.HEAPU8.set(src, buf); return src.byteLength; } else { @@ -486,7 +602,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { return data.byteLength; } case DuckDBDataProtocol.BROWSER_FSACCESS: { - const handle = BROWSER_RUNTIME._files?.get(file.fileName); + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files.get(file.fileName); if (!handle) { throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); } @@ -523,7 +639,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { failWith(mod, 'cannot write using the html5 file reader api'); return 0; case DuckDBDataProtocol.BROWSER_FSACCESS: { - const handle = BROWSER_RUNTIME._files?.get(file.fileName); + const handle: FileSystemSyncAccessHandle = BROWSER_RUNTIME._files?.get(file.fileName); if (!handle) { throw new Error(`No OPFS access handle registered with name: ${file.fileName}`); } diff --git a/packages/duckdb-wasm/src/bindings/runtime_node.ts b/packages/duckdb-wasm/src/bindings/runtime_node.ts index aa63822e2..2f92368bc 100644 --- a/packages/duckdb-wasm/src/bindings/runtime_node.ts +++ b/packages/duckdb-wasm/src/bindings/runtime_node.ts @@ -127,6 +127,7 @@ export const NODE_RUNTIME: DuckDBRuntime & { } return 0; }, + dropFile: (mod: DuckDBModule, _fileNamePtr: number, _fileNameLen:number) => {}, truncateFile: (mod: DuckDBModule, fileId: number, newSize: number) => { try { const file = NODE_RUNTIME.resolveFileInfo(mod, fileId); diff --git a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts index 9e1c9aa41..2ebcfd07c 100644 --- a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts +++ b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts @@ -1,4 +1,4 @@ -import { DuckDBBindings } from '../bindings'; +import { DuckDBBindings, DuckDBDataProtocol } from '../bindings'; import { WorkerResponseVariant, WorkerRequestVariant, WorkerRequestType, WorkerResponseType } from './worker_request'; import { Logger, LogEntryVariant } from '../log'; import { InstantiationProgress } from '../bindings/progress'; @@ 
-134,10 +134,16 @@ export abstract class AsyncDuckDBDispatcher implements Logger { this.sendOK(request); break; - case WorkerRequestType.OPEN: + case WorkerRequestType.OPEN: { + const path = request.data.path; + if (path?.startsWith('opfs://')) { + await this._bindings.prepareDBFileHandle(path, DuckDBDataProtocol.BROWSER_FSACCESS); + request.data.useDirectIO = true; + } this._bindings.open(request.data); this.sendOK(request); break; + } case WorkerRequestType.DROP_FILE: this._bindings.dropFile(request.data); this.sendOK(request); @@ -322,7 +328,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger { break; case WorkerRequestType.REGISTER_FILE_HANDLE: - this._bindings.registerFileHandle( + await this._bindings.registerFileHandle( request.data[0], request.data[1], request.data[2], diff --git a/packages/duckdb-wasm/test/httpfs_test.ts b/packages/duckdb-wasm/test/httpfs_test.ts index 6a0762d13..4ffe2ac0b 100644 --- a/packages/duckdb-wasm/test/httpfs_test.ts +++ b/packages/duckdb-wasm/test/httpfs_test.ts @@ -2,6 +2,7 @@ import * as duckdb from '../src/'; import { getS3Params, S3Params, S3PayloadParams, createS3Headers, uriEncode, getHTTPUrl } from '../src/utils'; import { AsyncDuckDBConnection, DuckDBBindings, DuckDBBindingsBase, DuckDBModule } from '../src/'; import BROWSER_RUNTIME from '../src/bindings/runtime_browser'; +import {generateLongQueryString} from "./string_test_helper"; // S3 config for tests const BUCKET_NAME = 'test-bucket'; @@ -312,5 +313,31 @@ export function testHTTPFSAsync( ), ).toBeRejectedWithError('Invalid Error: File is not opened in write mode'); }); + + it('can read parquet file from URL with long query string', async () => { + // Create S3 file + const data = await resolveData('/uni/studenten.parquet'); + await putTestFileToS3('correct_auth_test', 'parquet', data); + // Generate a long query string, similar to an S3 Presigned URL + const queryString = generateLongQueryString(); + // Execute the query + const result = await conn!.query( + `SELECT * FROM "${S3_ENDPOINT}/${BUCKET_NAME}/correct_auth_test.parquet?${queryString}";`, + ); + expect(Number((result.getChildAt(0)?.get(6)))).toEqual(Number(29120)); + }); + + it('can read csv file from URL with long query string', async () => { + // Create S3 file + const data = await resolveData('/uni/studenten.parquet'); + await putTestFileToS3('correct_auth_test', 'csv', data); + // Generate a long query string, similar to an S3 Presigned URL + const queryString = generateLongQueryString(); + // Execute the query + const result = await conn!.query( + `SELECT * FROM "${S3_ENDPOINT}/${BUCKET_NAME}/correct_auth_test.csv?${queryString}";`, + ); + expect(Number((result.getChildAt(0)?.get(6)))).toEqual(Number(29120)); + }); }); } diff --git a/packages/duckdb-wasm/test/index_browser.ts b/packages/duckdb-wasm/test/index_browser.ts index 0e3ad0c75..1588c4fdd 100644 --- a/packages/duckdb-wasm/test/index_browser.ts +++ b/packages/duckdb-wasm/test/index_browser.ts @@ -100,6 +100,7 @@ import { testBindings, testAsyncBindings } from './bindings.test'; import { testBatchStream } from './batch_stream.test'; import { testAsyncBatchStream } from './batch_stream_async.test'; import { testFilesystem } from './filesystem.test'; +import { testOPFS } from './opfs.test'; import { testArrowInsert, testArrowInsertAsync } from './insert_arrow.test'; import { testJSONInsert, testJSONInsertAsync } from './insert_json.test'; import { testCSVInsert, testCSVInsertAsync } from './insert_csv.test'; @@ -128,6 +129,7 @@ testAsyncBindings(() => adb!, 
dataURL, duckdb.DuckDBDataProtocol.HTTP); testBatchStream(() => db!); testAsyncBatchStream(() => adb!); testFilesystem(() => adb!, resolveData, dataURL, duckdb.DuckDBDataProtocol.HTTP); +testOPFS(dataURL, () => DUCKDB_BUNDLE!); testArrowInsert(() => db!); testArrowInsertAsync(() => adb!); testJSONInsert(() => db!); diff --git a/packages/duckdb-wasm/test/opfs.test.ts b/packages/duckdb-wasm/test/opfs.test.ts new file mode 100644 index 000000000..47b9e2497 --- /dev/null +++ b/packages/duckdb-wasm/test/opfs.test.ts @@ -0,0 +1,306 @@ +import * as duckdb from '../src/'; +import {LogLevel} from '../src/'; +import * as arrow from 'apache-arrow'; + +export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): void { + let db: duckdb.AsyncDuckDB; + let conn: duckdb.AsyncDuckDBConnection; + + beforeAll(async () => { + await removeFiles(); + }); + + afterAll(async () => { + if (conn) { + await conn.close(); + } + if (db) { + await db.terminate(); + } + await removeFiles(); + }); + + beforeEach(async () => { + await removeFiles(); + // + const logger = new duckdb.ConsoleLogger(LogLevel.ERROR); + const worker = new Worker(bundle().mainWorker!); + db = new duckdb.AsyncDuckDB(logger, worker); + await db.instantiate(bundle().mainModule, bundle().pthreadWorker); + await db.open({ + path: 'opfs://test.db', + accessMode: duckdb.DuckDBAccessMode.READ_WRITE + }); + conn = await db.connect(); + }); + + afterEach(async () => { + if (conn) { + await conn.close(); + } + if (db) { + await db.terminate(); + } + await removeFiles(); + }); + + describe('Load Data in OPFS', () => { + it('Import Small Parquet file', async () => { + await conn.send(`CREATE TABLE stu AS SELECT * FROM "${baseDir}/uni/studenten.parquet"`); + await conn.send(`CHECKPOINT;`); + const result = await conn.send(`SELECT matrnr FROM stu;`); + const batches = []; + for await (const batch of result) { + batches.push(batch); + } + const table = await new arrow.Table<{ cnt: arrow.Int }>(batches); + expect(table.getChildAt(0)?.toArray()).toEqual( + new Int32Array([24002, 25403, 26120, 26830, 27550, 28106, 29120, 29555]), + ); + }); + + it('Import Large Parquet file', async () => { + await conn.send(`CREATE TABLE lineitem AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`); + await conn.send(`CHECKPOINT;`); + const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem;`); + const batches = []; + for await (const batch of result) { + batches.push(batch); + } + const table = await new arrow.Table<{ cnt: arrow.Int }>(batches); + expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + }); + + it('Load Existing DB File', async () => { + await conn.send(`CREATE TABLE tmp AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`); + await conn.send(`CHECKPOINT;`); + await conn.close(); + await db.terminate(); + + const logger = new duckdb.ConsoleLogger(LogLevel.ERROR); + const worker = new Worker(bundle().mainWorker!); + db = new duckdb.AsyncDuckDB(logger, worker); + await db.instantiate(bundle().mainModule, bundle().pthreadWorker); + await db.open({ + path: 'opfs://test.db', + accessMode: duckdb.DuckDBAccessMode.READ_WRITE + }); + conn = await db.connect(); + + const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM tmp;`); + const batches = []; + for await (const batch of result) { + batches.push(batch); + } + const table = await new arrow.Table<{ cnt: arrow.Int }>(batches); + expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + }); + 
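The next test registers a file that already lives in OPFS by passing a null handle; per the bindings_base.ts change above, the worker then resolves the handle from the registered name. As a sketch:

```ts
import * as duckdb from '@duckdb/duckdb-wasm';

async function registerByNameOnly(db: duckdb.AsyncDuckDB): Promise<void> {
    // No handle is passed: the worker falls back to
    // navigator.storage.getDirectory().getFileHandle('test.parquet')
    // and converts the result into a sync access handle.
    await db.registerFileHandle('test.parquet', null, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);
}
```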
+ it('Load Parquet file that already exists, with an empty handle', async () => { + // 1. write the Parquet file to OPFS + const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res => + res.arrayBuffer(), + ); + const opfsRoot = await navigator.storage.getDirectory(); + const fileHandle = await opfsRoot.getFileHandle('test.parquet', {create: true}); + const writable = await fileHandle.createWritable(); + await writable.write(parquetBuffer); + await writable.close(); + // 2. the handle is null; the worker resolves a file handle from the file name + await db.registerFileHandle('test.parquet', null, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + + const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`); + const batches1 = []; + for await (const batch of result1) { + batches1.push(batch); + } + const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1); + expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + }); + + it('Load Parquet file that already exists, with an OPFS file handle in datadir', async () => { + // 1. write the Parquet file to OPFS + const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res => + res.arrayBuffer(), + ); + const opfsRoot = await navigator.storage.getDirectory(); + const datadir = await opfsRoot.getDirectoryHandle("datadir", {create: true}); + const fileHandle = await datadir.getFileHandle('test.parquet', {create: true}); + const writable = await fileHandle.createWritable(); + await writable.write(parquetBuffer); + await writable.close(); + // 2. the handle is an OPFS file handle + await db.registerFileHandle('test.parquet', fileHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + + const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`); + const batches1 = []; + for await (const batch of result1) { + batches1.push(batch); + } + const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1); + expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + }); + + it('Load Parquet file that already exists', async () => { + const parquetBuffer = await fetch(`${baseDir}/tpch/0_01/parquet/lineitem.parquet`).then(res => + res.arrayBuffer(), + ); + const opfsRoot = await navigator.storage.getDirectory(); + const fileHandle = await opfsRoot.getFileHandle('test.parquet', {create: true}); + const writable = await fileHandle.createWritable(); + await writable.write(parquetBuffer); + await writable.close(); + + await db.registerFileHandle('test.parquet', fileHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await conn.send(`CREATE TABLE lineitem1 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + await conn.send(`CREATE TABLE lineitem2 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + await conn.send(`CREATE TABLE lineitem3 AS SELECT * FROM read_parquet('test.parquet')`); + await conn.send(`CHECKPOINT;`); + + { + const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem1;`); + const batches1 = []; + for await (const batch of result1) { + batches1.push(batch); + } + const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1); + expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + + { + const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 
lineitem2;`); + const batches2 = []; + for await (const batch of result2) { + batches2.push(batch); + } + const table2 = await new arrow.Table<{ cnt: arrow.Int }>(batches2); + expect(table2.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + + { + const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM lineitem3;`); + const batches3 = []; + for await (const batch of result3) { + batches3.push(batch); + } + const table3 = await new arrow.Table<{ cnt: arrow.Int }>(batches3); + expect(table3.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + + }); + + it('Drop File + Export as CSV to OPFS + Load CSV', async () => { + const opfsRoot = await navigator.storage.getDirectory(); + const testHandle = await opfsRoot.getFileHandle('test.csv', {create: true}); + await db.registerFileHandle('test.csv', testHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await conn.send(`CREATE TABLE zzz AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`); + await conn.send(`COPY (SELECT * FROM zzz) TO 'test.csv'`); + await conn.close(); + await db.dropFile('test.csv'); + await db.reset(); + + await db.open({}); + conn = await db.connect(); + await db.registerFileHandle('test.csv', testHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + + const result = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test.csv';`); + const batches = []; + for await (const batch of result) { + batches.push(batch); + } + const table = await new arrow.Table<{ cnt: arrow.Int }>(batches); + expect(table.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + + await db.dropFile('test.csv'); + }); + + + it('Drop Files + Export as CSV to OPFS + Load CSV', async () => { + const opfsRoot = await navigator.storage.getDirectory(); + const testHandle1 = await opfsRoot.getFileHandle('test1.csv', {create: true}); + const testHandle2 = await opfsRoot.getFileHandle('test2.csv', {create: true}); + const testHandle3 = await opfsRoot.getFileHandle('test3.csv', {create: true}); + await db.registerFileHandle('test1.csv', testHandle1, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await db.registerFileHandle('test2.csv', testHandle2, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await db.registerFileHandle('test3.csv', testHandle3, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + + await conn.send(`CREATE TABLE zzz AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`); + await conn.send(`COPY (SELECT * FROM zzz) TO 'test1.csv'`); + await conn.send(`COPY (SELECT * FROM zzz) TO 'test2.csv'`); + await conn.send(`COPY (SELECT * FROM zzz) TO 'test3.csv'`); + await conn.close(); + + await db.dropFiles(); + await db.reset(); + + await db.open({}); + conn = await db.connect(); + await db.registerFileHandle('test1.csv', testHandle1, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await db.registerFileHandle('test2.csv', testHandle2, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + await db.registerFileHandle('test3.csv', testHandle3, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); + + { + const result1 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test1.csv';`); + const batches1 = []; + for await (const batch of result1) { + batches1.push(batch); + } + const table1 = await new arrow.Table<{ cnt: arrow.Int }>(batches1); + expect(table1.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + { + const result2 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test2.csv';`); + const batches2 = []; + for await (const batch of result2) { + batches2.push(batch); 
+ } + const table2 = await new arrow.Table<{ cnt: arrow.Int }>(batches2); + expect(table2.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + { + const result3 = await conn.send(`SELECT count(*)::INTEGER as cnt FROM 'test3.csv';`); + const batches3 = []; + for await (const batch of result3) { + batches3.push(batch); + } + const table3 = await new arrow.Table<{ cnt: arrow.Int }>(batches3); + expect(table3.getChildAt(0)?.get(0)).toBeGreaterThan(60_000); + } + + await db.dropFiles(); + }); + }); + + async function removeFiles() { + const opfsRoot = await navigator.storage.getDirectory(); + await opfsRoot.removeEntry('test.db').catch(() => { + }); + await opfsRoot.removeEntry('test.db.wal').catch(() => { + }); + await opfsRoot.removeEntry('test.csv').catch(() => { + }); + await opfsRoot.removeEntry('test1.csv').catch(() => { + }); + await opfsRoot.removeEntry('test2.csv').catch(() => { + }); + await opfsRoot.removeEntry('test3.csv').catch(() => { + }); + await opfsRoot.removeEntry('test.parquet').catch(() => { + }); + try { + const datadir = await opfsRoot.getDirectoryHandle('datadir'); + datadir.removeEntry('test.parquet').catch(() => { + }); + } catch (e) { + // + } + await opfsRoot.removeEntry('datadir').catch(() => { + }); + } +} diff --git a/packages/duckdb-wasm/test/string_test_helper.ts b/packages/duckdb-wasm/test/string_test_helper.ts new file mode 100644 index 000000000..96e83b823 --- /dev/null +++ b/packages/duckdb-wasm/test/string_test_helper.ts @@ -0,0 +1,20 @@ +export function generateLongQueryString(): string { + const aaa = repeatCharacter('A', 512); + const ccc = repeatCharacter('C', 256); + const ddd = repeatCharacter('D', 512); + const eee = repeatCharacter('E', 256); + const ggg = repeatCharacter('G', 128); + + return `test=inline` + + `&Test-Security-Token=${aaa}` + + `&Test-Algorithm=${ccc}` + + `&Test-Date=${ddd}` + + `&Test-SignedHeaders=host` + + `&Test-Expires=43200` + + `&Test-Credential=${eee}` + + `&Test-Signature=${ggg}`; +} + +export function repeatCharacter(char: string, length: number): string { + return char.repeat(length); +} \ No newline at end of file diff --git a/patches/duckdb/bind_copy_direct_io.patch b/patches/duckdb/bind_copy_direct_io.patch new file mode 100644 index 000000000..994917fe8 --- /dev/null +++ b/patches/duckdb/bind_copy_direct_io.patch @@ -0,0 +1,15 @@ +diff --git a/src/planner/binder/statement/bind_copy.cpp b/src/planner/binder/statement/bind_copy.cpp +index 7db1db812d..60131d5916 100644 +--- a/src/planner/binder/statement/bind_copy.cpp ++++ b/src/planner/binder/statement/bind_copy.cpp +@@ -137,7 +137,9 @@ BoundStatement Binder::BindCopyTo(CopyStatement &stmt) { + throw NotImplementedException("Can't combine FILE_SIZE_BYTES and PARTITION_BY for COPY"); + } + bool is_remote_file = FileSystem::IsRemoteFile(stmt.info->file_path); +- if (is_remote_file) { ++ if ( is_remote_file ) { ++ use_tmp_file = false; ++ } else if( config.options.use_direct_io ) { + use_tmp_file = false; + } else { + auto &fs = FileSystem::GetFileSystem(context); diff --git a/patches/duckdb/fix_load_database.patch b/patches/duckdb/fix_load_database.patch new file mode 100644 index 000000000..e9d785701 --- /dev/null +++ b/patches/duckdb/fix_load_database.patch @@ -0,0 +1,20 @@ +diff --git a/src/storage/storage_manager.cpp b/src/storage/storage_manager.cpp +index 456cd22614..c4777dd28e 100644 +--- a/src/storage/storage_manager.cpp ++++ b/src/storage/storage_manager.cpp +@@ -163,9 +163,12 @@ void SingleFileStorageManager::LoadDatabase(const optional_idx block_alloc_size) + 
options.use_direct_io = config.options.use_direct_io; + options.debug_initialize = config.options.debug_initialize; + +- // Check if the database file already exists. +- // Note: a file can also exist if there was a ROLLBACK on a previous transaction creating that file. +- if (!read_only && !fs.FileExists(path)) { ++ auto db_file_handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ | FileFlags::FILE_FLAGS_NULL_IF_NOT_EXISTS); ++ bool is_empty_file = db_file_handle->GetFileSize() == 0; ++ db_file_handle.reset(); ++ ++ // first check if the database exists ++ if (!read_only && ( !fs.FileExists(path) || ( options.use_direct_io && is_empty_file )) ) { + // file does not exist and we are in read-write mode + // create a new file + From 7456b464792fe79b35886f4ff8e534204ae3dbd4 Mon Sep 17 00:00:00 2001 From: Carlo Piovesan Date: Wed, 15 Jan 2025 14:07:43 +0100 Subject: [PATCH 3/4] Move back to synchronous function registerFileHandle in DuckDBBindings Iterating on https://github.com/duckdb/duckdb-wasm/pull/1856 --- .../duckdb-wasm/src/bindings/bindings_base.ts | 27 ++++++++++++++++--- .../src/bindings/bindings_interface.ts | 12 +++++++++ .../src/parallel/worker_dispatcher.ts | 2 +- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/packages/duckdb-wasm/src/bindings/bindings_base.ts b/packages/duckdb-wasm/src/bindings/bindings_base.ts index 84f01c33d..b33524e4f 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_base.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_base.ts @@ -476,20 +476,20 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { for (const item of list) { const { handle, path: filePath, fromCached } = item; if (!fromCached && handle.getSize()) { - await this.registerFileHandle(filePath, handle, DuckDBDataProtocol.BROWSER_FSACCESS, true); + await this.registerFileHandleAsync(filePath, handle, DuckDBDataProtocol.BROWSER_FSACCESS, true); } } return; } throw new Error(`prepareDBFileHandle: unsupported protocol ${protocol}`); } - /** Register a file object URL */ - public async registerFileHandle( + /** Prepare a file object URL */ + public async prepareFileHandleAsync( name: string, handle: HandleType, protocol: DuckDBDataProtocol, directIO: boolean, - ): Promise<void> { + ): Promise<HandleType> { if (protocol === DuckDBDataProtocol.BROWSER_FSACCESS) { if( handle instanceof FileSystemSyncAccessHandle ){ // already a handle is sync handle.
@@ -512,6 +512,25 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings { } } } + return handle; + } + /** Register a file object URL async */ + public async registerFileHandleAsync( + name: string, + handle: HandleType, + protocol: DuckDBDataProtocol, + directIO: boolean, + ): Promise<void> { + const handle_inner = await this.prepareFileHandleAsync(name, handle, protocol, directIO); + this.registerFileHandle(name, handle_inner, protocol, directIO); + } + /** Register a file object URL */ + public registerFileHandle( + name: string, + handle: HandleType, + protocol: DuckDBDataProtocol, + directIO: boolean, + ): void { const [s, d, n] = callSRet( this.mod, 'duckdb_web_fs_register_file_url', diff --git a/packages/duckdb-wasm/src/bindings/bindings_interface.ts b/packages/duckdb-wasm/src/bindings/bindings_interface.ts index 31ccca43b..dcf0fb926 100644 --- a/packages/duckdb-wasm/src/bindings/bindings_interface.ts +++ b/packages/duckdb-wasm/src/bindings/bindings_interface.ts @@ -41,7 +41,19 @@ export interface DuckDBBindings { handle: HandleType, protocol: DuckDBDataProtocol, directIO: boolean, + ): void; + registerFileHandleAsync( + name: string, + handle: HandleType, + protocol: DuckDBDataProtocol, + directIO: boolean, ): Promise<void>; + prepareFileHandleAsync( + name: string, + handle: HandleType, + protocol: DuckDBDataProtocol, + directIO: boolean, + ): Promise<HandleType>; prepareDBFileHandle(path: string, protocol: DuckDBDataProtocol): Promise<void>; globFiles(path: string): WebFile[]; dropFile(name: string): void; diff --git a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts index 2ebcfd07c..06b81a5b8 100644 --- a/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts +++ b/packages/duckdb-wasm/src/parallel/worker_dispatcher.ts @@ -328,7 +328,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger { break; case WorkerRequestType.REGISTER_FILE_HANDLE: - await this._bindings.registerFileHandle( + await this._bindings.registerFileHandleAsync( request.data[0], request.data[1], request.data[2], From 0f29e562f90a0389e6b36ee731f760d4d3e4b76e Mon Sep 17 00:00:00 2001 From: Carlo Piovesan Date: Wed, 15 Jan 2025 21:05:42 +0100 Subject: [PATCH 4/4] Gracefully handle failures on Close This first needs to avoid throwing in the destructor (a big no), and then to convert the JS exception into a C++ exception. More iteration on the comments in https://github.com/duckdb/duckdb-wasm/pull/1856 --- lib/include/duckdb/web/io/web_filesystem.h | 8 +++++++- packages/duckdb-wasm/src/bindings/runtime_browser.ts | 5 +++++ packages/duckdb-wasm/test/opfs.test.ts | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/include/duckdb/web/io/web_filesystem.h b/lib/include/duckdb/web/io/web_filesystem.h index 96638f992..4f42934ee 100644 --- a/lib/include/duckdb/web/io/web_filesystem.h +++ b/lib/include/duckdb/web/io/web_filesystem.h @@ -141,7 +141,13 @@ class WebFileSystem : public duckdb::FileSystem { /// Delete copy constructor WebFileHandle(const WebFileHandle &) = delete; /// Destructor - virtual ~WebFileHandle() { Close(); } + virtual ~WebFileHandle() { + try { + Close(); + } catch (...)
{ + // Avoid crashes if Close happens to throw + } + } /// Get the file name auto &GetName() const { return file_->file_name_; } /// Resolve readahead diff --git a/packages/duckdb-wasm/src/bindings/runtime_browser.ts b/packages/duckdb-wasm/src/bindings/runtime_browser.ts index 8e46955fe..8099acde7 100644 --- a/packages/duckdb-wasm/src/bindings/runtime_browser.ts +++ b/packages/duckdb-wasm/src/bindings/runtime_browser.ts @@ -475,6 +475,7 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { closeFile: (mod: DuckDBModule, fileId: number) => { const file = BROWSER_RUNTIME.getFileInfo(mod, fileId); BROWSER_RUNTIME._fileInfoCache.delete(fileId); + try { switch (file?.dataProtocol) { case DuckDBDataProtocol.BUFFER: case DuckDBDataProtocol.HTTP: @@ -492,6 +493,10 @@ export const BROWSER_RUNTIME: DuckDBRuntime & { return handle.flush(); } } + } catch (e: any) { + console.log(e); + failWith(mod, e.toString()); + } }, dropFile: (mod: DuckDBModule, fileNamePtr: number, fileNameLen: number) => { const fileName = readString(mod, fileNamePtr, fileNameLen); diff --git a/packages/duckdb-wasm/test/opfs.test.ts b/packages/duckdb-wasm/test/opfs.test.ts index 47b9e2497..eaf1a0fcc 100644 --- a/packages/duckdb-wasm/test/opfs.test.ts +++ b/packages/duckdb-wasm/test/opfs.test.ts @@ -201,6 +201,7 @@ export function testOPFS(baseDir: string, bundle: () => duckdb.DuckDBBundle): vo await db.registerFileHandle('test.csv', testHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true); await conn.send(`CREATE TABLE zzz AS SELECT * FROM "${baseDir}/tpch/0_01/parquet/lineitem.parquet"`); await conn.send(`COPY (SELECT * FROM zzz) TO 'test.csv'`); + await conn.send(`COPY (SELECT * FROM zzz) TO 'non_existing.csv'`); await conn.close(); await db.dropFile('test.csv'); await db.reset();
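
Note on the closeFile change in PATCH 4/4: it follows a general rule for Emscripten runtimes. A JS callback invoked from compiled code must not let an exception unwind across the Wasm boundary, so it traps the error and reports it through a side channel (failWith here), which the C++ side can then rethrow as a proper DuckDB exception. Below is a self-contained TypeScript sketch of that pattern; trapAtBoundary and reportError are illustrative names, not part of the duckdb-wasm API.

// Hypothetical helper illustrating the PATCH 4/4 pattern: trap JS exceptions
// at the Wasm boundary and surface them through an error callback instead of
// letting them unwind into compiled code.
type RuntimeCallback<A extends unknown[]> = (...args: A) => void;

function trapAtBoundary<A extends unknown[]>(
    fn: RuntimeCallback<A>,
    reportError: (msg: string) => void,
): RuntimeCallback<A> {
    return (...args: A) => {
        try {
            fn(...args);
        } catch (e: any) {
            // Convert the JS exception into a message the native side can
            // rethrow as its own exception type (what failWith does here).
            reportError(e?.toString() ?? 'unknown error');
        }
    };
}

// Usage: a close handler whose failing flush can no longer crash the runtime.
const safeClose = trapAtBoundary(
    (fileId: number) => { throw new Error(`flush failed for file ${fileId}`); },
    msg => console.error(msg),
);
safeClose(42);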
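
For readers following the test changes above, the following sketch condenses the OPFS round-trip that opfs.test.ts exercises: write a CSV into the Origin Private File System through a registered handle, drop the registration, then re-register the same handle and read the file back. This is a minimal sketch and not part of the patch; it assumes an already-instantiated AsyncDuckDB from a browser bundle, and the file name out.csv is illustrative.

import * as duckdb from '@duckdb/duckdb-wasm';
import * as arrow from 'apache-arrow';

async function opfsRoundTrip(db: duckdb.AsyncDuckDB): Promise<void> {
    // Create a file inside the Origin Private File System.
    const opfsRoot = await navigator.storage.getDirectory();
    const csvHandle = await opfsRoot.getFileHandle('out.csv', { create: true });

    // Register the handle so DuckDB-Wasm can write through it (directIO = true).
    await db.registerFileHandle('out.csv', csvHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);

    let conn = await db.connect();
    await conn.send(`COPY (SELECT 42 AS answer) TO 'out.csv'`);
    await conn.close();

    // Drop the registration and reset, mirroring the tests above.
    await db.dropFile('out.csv');
    await db.reset();

    // Re-register the same handle and read the file back.
    await db.open({});
    conn = await db.connect();
    await db.registerFileHandle('out.csv', csvHandle, duckdb.DuckDBDataProtocol.BROWSER_FSACCESS, true);

    const result = await conn.send(`SELECT * FROM 'out.csv'`);
    const batches: arrow.RecordBatch[] = [];
    for await (const batch of result) {
        batches.push(batch);
    }
    const table = new arrow.Table(batches);
    console.log(table.numRows);

    await db.dropFile('out.csv');
    await conn.close();
}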
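
The new string_test_helper.ts exists so the HTTPFS tests can build URLs with very long query strings, presumably mimicking the shape of pre-signed URLs. A short usage sketch follows; the host name is illustrative and the import path is relative to the test directory.

import { generateLongQueryString } from './string_test_helper';

// The helper concatenates several fixed-length parameter values, yielding a
// query string well over 1,500 characters.
const queryString = generateLongQueryString();
const url = `https://example.com/data.parquet?${queryString}`;
console.log(url.length);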