From 04eec17600faf852c7d21dfa83cdef9e502100fb Mon Sep 17 00:00:00 2001 From: Rebekah Davis Date: Mon, 20 Oct 2025 16:21:27 -0400 Subject: [PATCH] Remove filesystem #ifdefs from VFS. --- test/src/unit-capi-sparse_array.cc | 2 +- test/src/unit-enumerations.cc | 4 +- test/src/unit-request-handlers.cc | 4 +- test/src/unit-s3.cc | 16 +- test/src/unit-ssl-config.cc | 18 +- test/src/unit-vfs.cc | 69 ++++- test/support/src/vfs_helpers.cc | 41 +-- test/support/src/vfs_helpers.h | 41 ++- tiledb/api/c_api/context/context_api.cc | 6 +- .../c_api/filesystem/filesystem_api_enum.h | 4 +- tiledb/api/c_api/vfs/vfs_api.cc | 7 +- tiledb/api/c_api/vfs/vfs_api_internal.h | 11 +- tiledb/sm/array/array.cc | 42 +-- tiledb/sm/array/array_directory.cc | 34 +-- tiledb/sm/array/test/unit_consistency.cc | 6 +- .../array_schema/array_schema_operations.cc | 12 +- .../array_schema/test/unit_current_domain.cc | 6 +- tiledb/sm/c_api/tiledb_filestore.cc | 24 +- .../consolidator/array_meta_consolidator.cc | 10 +- .../sm/consolidator/commits_consolidator.cc | 8 +- tiledb/sm/consolidator/consolidator.cc | 8 +- .../sm/consolidator/fragment_consolidator.cc | 24 +- .../fragment_meta_consolidator.cc | 8 +- .../consolidator/group_meta_consolidator.cc | 12 +- tiledb/sm/enums/filesystem.h | 10 +- tiledb/sm/filesystem/filesystem_base.cc | 18 ++ tiledb/sm/filesystem/filesystem_base.h | 87 +++++++ tiledb/sm/filesystem/local.h | 11 + tiledb/sm/filesystem/posix.cc | 6 +- tiledb/sm/filesystem/posix.h | 3 +- tiledb/sm/filesystem/s3.cc | 63 ++++- tiledb/sm/filesystem/s3.h | 36 ++- tiledb/sm/filesystem/test/CMakeLists.txt | 11 +- tiledb/sm/filesystem/test/compile_vfs_main.cc | 8 +- tiledb/sm/filesystem/test/unit_ls_filtered.cc | 10 +- .../test/unit_vfs_read_log_modes.cc | 2 +- tiledb/sm/filesystem/vfs.cc | 241 ++++-------------- tiledb/sm/filesystem/vfs.h | 190 ++------------ tiledb/sm/filesystem/win.cc | 6 +- tiledb/sm/filesystem/win.h | 3 +- tiledb/sm/fragment/fragment_info.cc | 4 +- tiledb/sm/fragment/fragment_metadata.cc | 26 +- tiledb/sm/group/group.cc | 31 +-- tiledb/sm/group/group_details.cc | 6 +- tiledb/sm/misc/constants.cc | 3 + tiledb/sm/misc/constants.h | 3 + tiledb/sm/object/object.cc | 18 +- tiledb/sm/object/object_iter.cc | 10 +- .../deletes_and_updates.cc | 2 +- tiledb/sm/query/readers/filtered_data.h | 4 +- .../sm/query/writers/global_order_writer.cc | 31 +-- tiledb/sm/query/writers/global_order_writer.h | 18 +- tiledb/sm/query/writers/ordered_writer.cc | 4 +- tiledb/sm/query/writers/unordered_writer.cc | 4 +- tiledb/sm/query/writers/writer_base.cc | 20 +- tiledb/sm/serialization/query.cc | 4 +- .../sm/storage_manager/context_resources.cc | 38 ++- tiledb/sm/storage_manager/context_resources.h | 20 +- tiledb/sm/storage_manager/storage_manager.cc | 4 +- tiledb/sm/tile/generic_tile_io.cc | 18 +- 60 files changed, 710 insertions(+), 680 deletions(-) diff --git a/test/src/unit-capi-sparse_array.cc b/test/src/unit-capi-sparse_array.cc index f9c5c29c8f1..2b44ba12273 100644 --- a/test/src/unit-capi-sparse_array.cc +++ b/test/src/unit-capi-sparse_array.cc @@ -7162,6 +7162,6 @@ TEST_CASE_METHOD( deserialized_array_dir->timestamp_start() == array_dir.timestamp_start()); REQUIRE(deserialized_array_dir->timestamp_end() == array_dir.timestamp_end()); - REQUIRE_NOTHROW(resources.vfs().remove_dir(tiledb::sm::URI(array_name))); + REQUIRE_NOTHROW(resources.vfs()->remove_dir(tiledb::sm::URI(array_name))); #endif } diff --git a/test/src/unit-enumerations.cc b/test/src/unit-enumerations.cc index 73ea8ed8f38..6cc5a9c7360 100644 --- a/test/src/unit-enumerations.cc +++ b/test/src/unit-enumerations.cc @@ -3108,8 +3108,8 @@ bool EnumerationFx::vec_cmp(std::vector v1, std::vector v2) { } void EnumerationFx::rm_array() { - if (ctx_.resources().vfs().is_dir(uri_)) { - ctx_.resources().vfs().remove_dir(uri_); + if (ctx_.resources().vfs()->is_dir(uri_)) { + ctx_.resources().vfs()->remove_dir(uri_); } } diff --git a/test/src/unit-request-handlers.cc b/test/src/unit-request-handlers.cc index 83b8c9867ec..e5c472d2a2c 100644 --- a/test/src/unit-request-handlers.cc +++ b/test/src/unit-request-handlers.cc @@ -432,8 +432,8 @@ void RequestHandlerFx::create_array() { } void RequestHandlerFx::delete_array() { - if (ctx_.resources().vfs().is_dir(uri_)) { - ctx_.resources().vfs().remove_dir(uri_); + if (ctx_.resources().vfs()->is_dir(uri_)) { + ctx_.resources().vfs()->remove_dir(uri_); } } diff --git a/test/src/unit-s3.cc b/test/src/unit-s3.cc index e2eed15e704..eab7c8ca9ed 100644 --- a/test/src/unit-s3.cc +++ b/test/src/unit-s3.cc @@ -365,8 +365,9 @@ TEST_CASE( return !result_filter(a.first, a.second); }); - auto scan = s3_test.get_s3().scanner( - s3_test.temp_dir_, result_filter, recursive, max_keys); + auto scan = + s3_test.get_fs(s3_test.temp_dir_) + .scanner(s3_test.temp_dir_, result_filter, recursive, max_keys); std::vector results_vector(scan.begin(), scan.end()); CHECK(results_vector.size() == expected.size()); @@ -388,11 +389,12 @@ TEST_CASE("S3: S3Scanner iterator", "[s3][ls-scan-iterator]") { std::vector results_vector; DYNAMIC_SECTION("Testing with " << max_keys << " max keys from S3") { - auto scan = s3_test.get_s3().scanner( - s3_test.temp_dir_, - tiledb::sm::LsScanner::accept_all, - recursive, - max_keys); + auto scan = s3_test.get_fs(s3_test.temp_dir_) + .scanner( + s3_test.temp_dir_, + tiledb::sm::LsScanner::accept_all, + recursive, + max_keys); SECTION("for loop") { SECTION("range based for") { diff --git a/test/src/unit-ssl-config.cc b/test/src/unit-ssl-config.cc index 1317c79b4d9..ad2911c80c8 100644 --- a/test/src/unit-ssl-config.cc +++ b/test/src/unit-ssl-config.cc @@ -242,9 +242,9 @@ std::string get_test_ca_file() { void check_failure(Filesystem fs, Config& cfg) { Context ctx(cfg); - auto& vfs = ctx.resources().vfs(); + auto vfs = ctx.resources().vfs(); - if (!vfs.supports_fs(fs)) { + if (!vfs->supports_fs(fs)) { return; } @@ -266,7 +266,7 @@ void check_failure(Filesystem fs, Config& cfg) { std::vector uris; try { - st = vfs.ls(bucket_uri, &uris); + st = vfs->ls(bucket_uri, &uris); } catch (...) { // Some backends throw exceptions to signal SSL error conditions // so we pass the test by returning early here. @@ -279,9 +279,9 @@ void check_failure(Filesystem fs, Config& cfg) { void check_success(Filesystem fs, Config& cfg) { Context ctx(cfg); - auto& vfs = ctx.resources().vfs(); + auto vfs = ctx.resources().vfs(); - if (!vfs.supports_fs(fs)) { + if (!vfs->supports_fs(fs)) { return; } @@ -298,9 +298,9 @@ void check_success(Filesystem fs, Config& cfg) { } URI bucket_uri = URI(scheme + "://" + bucket_name); - if (vfs.is_bucket(bucket_uri)) { - vfs.remove_bucket(bucket_uri); + if (vfs->is_bucket(bucket_uri)) { + vfs->remove_bucket(bucket_uri); } - vfs.create_bucket(bucket_uri); - REQUIRE(vfs.is_bucket(bucket_uri)); + vfs->create_bucket(bucket_uri); + REQUIRE(vfs->is_bucket(bucket_uri)); } diff --git a/test/src/unit-vfs.cc b/test/src/unit-vfs.cc index 597b4a8999a..0093b2e6972 100644 --- a/test/src/unit-vfs.cc +++ b/test/src/unit-vfs.cc @@ -141,8 +141,15 @@ std::string local_path() { TEST_CASE("VFS: Test long local paths", "[vfs][long-paths]") { ThreadPool compute_tp(4); ThreadPool io_tp(4); + Config config; + Context ctx(config); VFS vfs{ - &g_helper_stats, g_helper_logger().get(), &compute_tp, &io_tp, Config{}}; + &g_helper_stats, + g_helper_logger().get(), + &compute_tp, + &io_tp, + config, + ctx.resources().make_filesystems(&g_helper_stats, &io_tp, config)}; SECTION("- Deep hierarchy") { // Create a nested path with a long total length @@ -210,8 +217,14 @@ TEST_CASE("VFS: copy_file", "[vfs][copy_file]") { ThreadPool compute_tp(4); ThreadPool io_tp(4); Config config = set_config_params(); + Context ctx(config); VFS vfs{ - &g_helper_stats, g_helper_logger().get(), &compute_tp, &io_tp, config}; + &g_helper_stats, + g_helper_logger().get(), + &compute_tp, + &io_tp, + config, + ctx.resources().make_filesystems(&g_helper_stats, &io_tp, config)}; size_t test_str_size = 0; SECTION("Filesize = 0 MB") { @@ -292,8 +305,14 @@ TEST_CASE("VFS: copy_dir", "[vfs][copy_dir]") { ThreadPool compute_tp(4); ThreadPool io_tp(4); Config config = set_config_params(); + Context ctx(config); VFS vfs{ - &g_helper_stats, g_helper_logger().get(), &compute_tp, &io_tp, config}; + &g_helper_stats, + g_helper_logger().get(), + &compute_tp, + &io_tp, + config, + ctx.resources().make_filesystems(&g_helper_stats, &io_tp, config)}; /* Create the following file hierarchy: * @@ -400,8 +419,14 @@ TEMPLATE_LIST_TEST_CASE( ThreadPool compute_tp(4); ThreadPool io_tp(4); Config config = set_config_params(); + Context ctx(config); VFS vfs{ - &g_helper_stats, g_helper_logger().get(), &compute_tp, &io_tp, config}; + &g_helper_stats, + g_helper_logger().get(), + &compute_tp, + &io_tp, + config, + ctx.resources().make_filesystems(&g_helper_stats, &io_tp, config)}; URI path = fs.temp_dir_.add_trailing_slash(); @@ -643,8 +668,14 @@ TEMPLATE_LIST_TEST_CASE("VFS: File I/O", "[vfs][uri][file_io]", AllBackends) { ThreadPool compute_tp(4); ThreadPool io_tp(4); Config config = set_config_params(disable_multipart, max_parallel_ops); + Context ctx(config); VFS vfs{ - &g_helper_stats, g_helper_logger().get(), &compute_tp, &io_tp, config}; + &g_helper_stats, + g_helper_logger().get(), + &compute_tp, + &io_tp, + config, + ctx.resources().make_filesystems(&g_helper_stats, &io_tp, config)}; // Getting file_size on a nonexistent blob shouldn't crash on Azure URI non_existent = URI(path.to_string() + "non_existent"); @@ -731,9 +762,14 @@ TEST_CASE("VFS: Test end-to-end", "[.vfs-e2e]") { ThreadPool io_tp(1); // Will be configured from environment variables. Config config; - + Context ctx(config); VFS vfs{ - &g_helper_stats, g_helper_logger().get(), &compute_tp, &io_tp, config}; + &g_helper_stats, + g_helper_logger().get(), + &compute_tp, + &io_tp, + config, + ctx.resources().make_filesystems(&g_helper_stats, &io_tp, config)}; REQUIRE(vfs.supports_uri_scheme(test_file)); CHECK(vfs.file_size(test_file) > 0); } @@ -741,8 +777,15 @@ TEST_CASE("VFS: Test end-to-end", "[.vfs-e2e]") { TEST_CASE("VFS: test ls_with_sizes", "[vfs][ls-with-sizes]") { ThreadPool compute_tp(4); ThreadPool io_tp(4); + Config config; + Context ctx(config); VFS vfs_ls{ - &g_helper_stats, g_helper_logger().get(), &compute_tp, &io_tp, Config{}}; + &g_helper_stats, + g_helper_logger().get(), + &compute_tp, + &io_tp, + config, + ctx.resources().make_filesystems(&g_helper_stats, &io_tp, config)}; std::string path = local_path(); std::string dir = path + "ls_dir"; @@ -898,7 +941,15 @@ TEST_CASE("VFS: Throwing filters for ls_recursive", "[vfs][ls_recursive]") { TEST_CASE("VFS: Test remove_dir_if_empty", "[vfs][remove-dir-if-empty]") { ThreadPool tp(1); - VFS vfs{&g_helper_stats, g_helper_logger().get(), &tp, &tp, Config{}}; + Config config; + Context ctx(config); + VFS vfs{ + &g_helper_stats, + g_helper_logger().get(), + &tp, + &tp, + config, + ctx.resources().make_filesystems(&g_helper_stats, &tp, config)}; std::string path = local_path(); std::string dir = path + "remove_dir_if_empty/"; diff --git a/test/support/src/vfs_helpers.cc b/test/support/src/vfs_helpers.cc index 8ca7b79d4a9..478298efb70 100644 --- a/test/support/src/vfs_helpers.cc +++ b/test/support/src/vfs_helpers.cc @@ -45,10 +45,6 @@ #include "test/support/src/helpers.h" #include "test/support/src/vfs_helpers.h" -#if HAVE_S3 -#include "tiledb/sm/filesystem/s3.h" -#endif - #include "tiledb/sm/rest/rest_client.h" namespace tiledb::test { @@ -118,7 +114,7 @@ Status vfs_test_init( } for (auto& supported_fs : fs_vec) { - REQUIRE(supported_fs->init(*ctx, *vfs).ok()); + (void)supported_fs->init(*ctx, *vfs); } return Status::Ok(); @@ -432,11 +428,15 @@ VFSTestBase::VFSTestBase( tiledb::test::g_helper_logger().get(), &io_, &compute_, - create_test_config()) + create_test_config(), + tiledb::sm::ContextResources::make_filesystems( + &tiledb::test::g_helper_stats, &io_, create_test_config())) , prefix_(prefix) , temp_dir_(tiledb::test::test_dir(prefix_)) , is_supported_(vfs_.supports_uri_scheme(temp_dir_)) { - // TODO: Throw when we can provide a list of supported filesystems to Catch2. + if (!is_supported_) { + throw tiledb::sm::filesystem::BuiltWithout(prefix_); + } } VFSTestBase::~VFSTestBase() { @@ -500,33 +500,6 @@ VFSTest::VFSTest( std::sort(expected_results_.begin(), expected_results_.end()); } -S3Test::S3Test(const std::vector& test_tree) - : VFSTestBase(test_tree, "s3://") - , S3_within_VFS(&tiledb::test::g_helper_stats, &io_, vfs_.config()) { -#ifdef HAVE_S3 - s3().create_bucket(temp_dir_); - for (size_t i = 1; i <= test_tree_.size(); i++) { - sm::URI path = temp_dir_.join_path("subdir_" + std::to_string(i)); - // VFS::create_dir is a no-op for S3; Just create objects. - if (test_tree_[i - 1] > 0) { - // Do not include an empty prefix in expected results. - // The only way to retrieve an empty prefix in ls_recursive results is to - // explicitly create an empty prefix object through AWS console or SDK. - expected_results_.emplace_back(path.to_string(), 0); - } - for (size_t j = 1; j <= test_tree_[i - 1]; j++) { - auto object_uri = path.join_path("test_file_" + std::to_string(j)); - s3().touch(object_uri); - std::string data(j * 10, 'a'); - s3().write(object_uri, data.data(), data.size()); - s3().flush(object_uri); - expected_results_.emplace_back(object_uri.to_string(), data.size()); - } - } - std::sort(expected_results_.begin(), expected_results_.end()); -#endif -} - LocalFsTest::LocalFsTest(const std::vector& test_tree) : VFSTestBase(test_tree, "file://") { #ifdef _WIN32 diff --git a/test/support/src/vfs_helpers.h b/test/support/src/vfs_helpers.h index 7d9fe2053b8..f71c58d3bb8 100644 --- a/test/support/src/vfs_helpers.h +++ b/test/support/src/vfs_helpers.h @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2020-2023 TileDB, Inc. + * @copyright Copyright (c) 2020-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -39,6 +39,10 @@ #include "tiledb/sm/enums/vfs_mode.h" #include "tiledb/sm/filesystem/vfs.h" +#if HAVE_S3 +#include "tiledb/sm/filesystem/s3.h" +#endif + namespace tiledb::test { // Forward declaration @@ -1042,19 +1046,36 @@ class VFSTest : public VFSTestBase { }; /** Test object for tiledb::sm::S3 functionality. */ -class S3Test : public VFSTestBase, protected tiledb::sm::S3_within_VFS { +class S3Test : public VFSTestBase { public: - explicit S3Test(const std::vector& test_tree); - + explicit S3Test(const std::vector& test_tree) + : VFSTestBase(test_tree, "s3://") { #ifdef HAVE_S3 - /** Expose protected accessor from S3_within_VFS. */ - tiledb::sm::S3& get_s3() { - return s3(); + vfs_.create_bucket(temp_dir_); + for (size_t i = 1; i <= test_tree_.size(); i++) { + sm::URI path = temp_dir_.join_path("subdir_" + std::to_string(i)); + // VFS::create_dir is a no-op for S3; Just create objects. + if (test_tree_[i - 1] > 0) { + // Do not include an empty prefix in expected results. + expected_results_.emplace_back(path.to_string(), 0); + } + for (size_t j = 1; j <= test_tree_[i - 1]; j++) { + auto object_uri = path.join_path("test_file_" + std::to_string(j)); + vfs_.touch(object_uri); + std::string data(j * 10, 'a'); + vfs_.write(object_uri, data.data(), data.size()); + vfs_.close_file(object_uri).ok(); + expected_results_.emplace_back(object_uri.to_string(), data.size()); + } + } + std::sort(expected_results_.begin(), expected_results_.end()); +#endif } - /** Expose protected const accessor from S3_within_VFS. */ - const tiledb::sm::S3& get_s3() const { - return s3(); +#ifdef HAVE_S3 + /** Expose protected accessor from S3 within VFS. */ + tiledb::sm::S3& get_fs(const tiledb::sm::URI& uri) { + return dynamic_cast(vfs_.get_fs(uri)); } #endif }; diff --git a/tiledb/api/c_api/context/context_api.cc b/tiledb/api/c_api/context/context_api.cc index 61a0de7f9db..0fb0c34942c 100644 --- a/tiledb/api/c_api/context/context_api.cc +++ b/tiledb/api/c_api/context/context_api.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2022-2024 TileDB, Inc. + * @copyright Copyright (c) 2022-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -34,6 +34,7 @@ #include "context_api_external.h" #include "context_api_internal.h" #include "tiledb/api/c_api_support/c_api_support.h" +#include "tiledb/sm/enums/filesystem.h" #include "tiledb/sm/rest/rest_client.h" namespace tiledb::api { @@ -119,8 +120,7 @@ capi_return_t tiledb_ctx_get_last_error( capi_return_t tiledb_ctx_is_supported_fs( tiledb_ctx_t* ctx, tiledb_filesystem_t fs, int32_t* is_supported) { ensure_output_pointer_is_valid(is_supported); - - *is_supported = (int32_t)ctx->context().resources().vfs().supports_fs( + *is_supported = (int32_t)ctx->context().resources().vfs()->supports_fs( static_cast(fs)); return TILEDB_OK; } diff --git a/tiledb/api/c_api/filesystem/filesystem_api_enum.h b/tiledb/api/c_api/filesystem/filesystem_api_enum.h index 080310a5500..a9e1d15d012 100644 --- a/tiledb/api/c_api/filesystem/filesystem_api_enum.h +++ b/tiledb/api/c_api/filesystem/filesystem_api_enum.h @@ -1,7 +1,7 @@ /* * The MIT License * - * @copyright Copyright (c) 2022 TileDB, Inc. + * @copyright Copyright (c) 2022-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -40,4 +40,6 @@ TILEDB_FILESYSTEM_ENUM(GCS) = 3, /** In-memory filesystem */ TILEDB_FILESYSTEM_ENUM(MEMFS) = 4, + /** Local filesystem */ + TILEDB_FILESYSTEM_ENUM(LOCAL) = 5, #endif diff --git a/tiledb/api/c_api/vfs/vfs_api.cc b/tiledb/api/c_api/vfs/vfs_api.cc index 6d85f18a06c..c88cdb2f362 100644 --- a/tiledb/api/c_api/vfs/vfs_api.cc +++ b/tiledb/api/c_api/vfs/vfs_api.cc @@ -67,7 +67,12 @@ capi_return_t tiledb_vfs_alloc( ctx_config.inherit((config->config())); } *vfs = tiledb_vfs_t::make_handle( - &stats, logger, &compute_tp, &io_tp, ctx_config); + &stats, + logger, + &compute_tp, + &io_tp, + ctx_config, + resources.make_filesystems(&stats, &io_tp, ctx_config)); return TILEDB_OK; } diff --git a/tiledb/api/c_api/vfs/vfs_api_internal.h b/tiledb/api/c_api/vfs/vfs_api_internal.h index dfa07412f56..98ddbd62298 100644 --- a/tiledb/api/c_api/vfs/vfs_api_internal.h +++ b/tiledb/api/c_api/vfs/vfs_api_internal.h @@ -59,8 +59,15 @@ struct tiledb_vfs_handle_t Logger* logger, ThreadPool* compute_tp, ThreadPool* io_tp, - const tiledb::sm::Config& config) - : vfs_{parent_stats, logger, compute_tp, io_tp, config} { + const tiledb::sm::Config& config, + std::vector>&& filesystems) + : vfs_{ + parent_stats, + logger, + compute_tp, + io_tp, + config, + std::move(filesystems)} { } vfs_type* vfs() { diff --git a/tiledb/sm/array/array.cc b/tiledb/sm/array/array.cc index 2c35059ffa1..0f7e259eac5 100644 --- a/tiledb/sm/array/array.cc +++ b/tiledb/sm/array/array.cc @@ -216,41 +216,41 @@ void Array::create( } // Create array directory - resources.vfs().create_dir(array_uri); + resources.vfs()->create_dir(array_uri); // Create array schema directory URI array_schema_dir_uri = array_uri.join_path(constants::array_schema_dir_name); - resources.vfs().create_dir(array_schema_dir_uri); + resources.vfs()->create_dir(array_schema_dir_uri); // Create the enumerations directory inside the array schema directory URI array_enumerations_uri = array_schema_dir_uri.join_path(constants::array_enumerations_dir_name); - resources.vfs().create_dir(array_enumerations_uri); + resources.vfs()->create_dir(array_enumerations_uri); // Create commit directory URI array_commit_uri = array_uri.join_path(constants::array_commits_dir_name); - resources.vfs().create_dir(array_commit_uri); + resources.vfs()->create_dir(array_commit_uri); // Create fragments directory URI array_fragments_uri = array_uri.join_path(constants::array_fragments_dir_name); - resources.vfs().create_dir(array_fragments_uri); + resources.vfs()->create_dir(array_fragments_uri); // Create array metadata directory URI array_metadata_uri = array_uri.join_path(constants::array_metadata_dir_name); - resources.vfs().create_dir(array_metadata_uri); + resources.vfs()->create_dir(array_metadata_uri); // Create fragment metadata directory URI array_fragment_metadata_uri = array_uri.join_path(constants::array_fragment_meta_dir_name); - resources.vfs().create_dir(array_fragment_metadata_uri); + resources.vfs()->create_dir(array_fragment_metadata_uri); // Create dimension label directory URI array_dimension_labels_uri = array_uri.join_path(constants::array_dimension_labels_dir_name); - resources.vfs().create_dir(array_dimension_labels_uri); + resources.vfs()->create_dir(array_dimension_labels_uri); // Store the array schema try { @@ -282,7 +282,7 @@ void Array::create( store_array_schema(resources, array_schema, encryption_key); } } catch (...) { - resources.vfs().remove_dir(array_uri); + resources.vfs()->remove_dir(array_uri); throw; } } @@ -688,7 +688,7 @@ void Array::delete_fragments( } // Delete fragments and commits - auto vfs = &(resources.vfs()); + auto vfs = resources.vfs(); throw_if_not_ok(parallel_for( &resources.compute_tp(), 0, fragment_uris.size(), [&](size_t i) { vfs->remove_dir(fragment_uris[i].uri_); @@ -719,7 +719,7 @@ void Array::delete_fragments( } void Array::delete_array(ContextResources& resources, const URI& uri) { - auto& vfs = resources.vfs(); + auto vfs = resources.vfs(); auto array_dir = ArrayDirectory(resources, uri, 0, std::numeric_limits::max()); @@ -729,9 +729,9 @@ void Array::delete_array(ContextResources& resources, const URI& uri) { // Delete array metadata, fragment metadata and array schema files // Note: metadata files may not be present, try to delete anyway - vfs.remove_files(&resources.compute_tp(), array_dir.array_meta_uris()); - vfs.remove_files(&resources.compute_tp(), array_dir.fragment_meta_uris()); - vfs.remove_files(&resources.compute_tp(), array_dir.array_schema_uris()); + vfs->remove_files(&resources.compute_tp(), array_dir.array_meta_uris()); + vfs->remove_files(&resources.compute_tp(), array_dir.fragment_meta_uris()); + vfs->remove_files(&resources.compute_tp(), array_dir.array_schema_uris()); // Delete all tiledb child directories // Note: using vfs.ls() here could delete user data @@ -740,8 +740,8 @@ void Array::delete_array(ContextResources& resources, const URI& uri) { for (auto array_dir_name : constants::array_dir_names) { dirs.emplace_back(URI(parent_dir + array_dir_name)); } - vfs.remove_dirs(&resources.compute_tp(), dirs); - vfs.remove_dir_if_empty(array_dir.uri()); + vfs->remove_dirs(&resources.compute_tp(), dirs); + vfs->remove_dir_if_empty(array_dir.uri()); } void Array::delete_array(const URI& uri) { @@ -1788,7 +1788,7 @@ Array::open_for_writes() { auto timer_se = resources_.stats().start_timer("array_open_write_load_schemas"); // Checks - if (!resources_.vfs().supports_uri_scheme(array_uri_)) { + if (!resources_.vfs()->supports_uri_scheme(array_uri_)) { throw ArrayException("Cannot open array; URI scheme unsupported."); } @@ -1954,7 +1954,7 @@ void Array::upgrade_version( // Create array schema directory if necessary URI array_schema_dir_uri = array_uri.join_path(constants::array_schema_dir_name); - resources.vfs().create_dir(array_schema_dir_uri); + resources.vfs()->create_dir(array_schema_dir_uri); // Store array schema store_array_schema(resources, array_schema, encryption_key_cfg); @@ -1962,17 +1962,17 @@ void Array::upgrade_version( // Create commit directory if necessary URI array_commit_uri = array_uri.join_path(constants::array_commits_dir_name); - resources.vfs().create_dir(array_commit_uri); + resources.vfs()->create_dir(array_commit_uri); // Create fragments directory if necessary URI array_fragments_uri = array_uri.join_path(constants::array_fragments_dir_name); - resources.vfs().create_dir(array_fragments_uri); + resources.vfs()->create_dir(array_fragments_uri); // Create fragment metadata directory if necessary URI array_fragment_metadata_uri = array_uri.join_path(constants::array_fragment_meta_dir_name); - resources.vfs().create_dir(array_fragment_metadata_uri); + resources.vfs()->create_dir(array_fragment_metadata_uri); } } diff --git a/tiledb/sm/array/array_directory.cc b/tiledb/sm/array/array_directory.cc index 2dd9156447d..5ce76a5439f 100644 --- a/tiledb/sm/array/array_directory.cc +++ b/tiledb/sm/array/array_directory.cc @@ -75,7 +75,7 @@ ArrayDirectory::ArrayDirectory( ArrayDirectoryMode mode) : resources_(resources) , uri_(uri.add_trailing_slash()) - , stats_(resources_.get().vfs().stats()->create_child("ArrayDirectory")) + , stats_(resources_.get().vfs()->stats()->create_child("ArrayDirectory")) , timestamp_start_(timestamp_start) , timestamp_end_(timestamp_end) , mode_(mode) @@ -293,8 +293,8 @@ void ArrayDirectory::write_commit_ignore_file( auto data = ss.str(); URI ignore_file_uri = get_commits_dir(constants::format_version) .join_path(name + constants::ignore_file_suffix); - resources_.get().vfs().write(ignore_file_uri, data.c_str(), data.size()); - throw_if_not_ok(resources_.get().vfs().close_file(ignore_file_uri)); + resources_.get().vfs()->write(ignore_file_uri, data.c_str(), data.size()); + throw_if_not_ok(resources_.get().vfs()->close_file(ignore_file_uri)); } void ArrayDirectory::delete_fragments_list( @@ -322,10 +322,10 @@ void ArrayDirectory::delete_fragments_list( // Delete fragments and commits throw_if_not_ok(parallel_for( &resources_.get().compute_tp(), 0, uris.size(), [&](size_t i) { - auto& vfs = resources_.get().vfs(); - vfs.remove_dir(uris[i]); - if (vfs.is_file(commit_uris_to_delete[i])) { - vfs.remove_file(commit_uris_to_delete[i]); + auto vfs = resources_.get().vfs(); + vfs->remove_dir(uris[i]); + if (vfs->is_file(commit_uris_to_delete[i])) { + vfs->remove_file(commit_uris_to_delete[i]); } return Status::Ok(); })); @@ -589,7 +589,7 @@ const std::set& ArrayDirectory::dir_names() { } std::vector ArrayDirectory::ls(const URI& uri) const { - auto dir_entries = resources_.get().vfs().ls_with_sizes(uri); + auto dir_entries = resources_.get().vfs()->ls_with_sizes(uri); auto& dirs = dir_names(); std::vector uris; uris.reserve(dir_entries.size()); @@ -729,11 +729,11 @@ ArrayDirectory::load_consolidated_commit_uris( for (auto& uri : commits_dir_uris) { if (stdx::string::ends_with( uri.to_string(), constants::ignore_file_suffix)) { - uint64_t size = resources_.get().vfs().file_size(uri); + uint64_t size = resources_.get().vfs()->file_size(uri); std::string names; names.resize(size); RETURN_NOT_OK_TUPLE( - resources_.get().vfs().read_exactly(uri, 0, &names[0], size), + resources_.get().vfs()->read_exactly(uri, 0, &names[0], size), nullopt, nullopt); std::stringstream ss(names); @@ -751,13 +751,13 @@ ArrayDirectory::load_consolidated_commit_uris( auto& uri = commits_dir_uris[i]; if (stdx::string::ends_with( uri.to_string(), constants::con_commits_file_suffix)) { - uint64_t size = resources_.get().vfs().file_size(uri); + uint64_t size = resources_.get().vfs()->file_size(uri); meta_files.emplace_back(uri, std::string()); auto& names = meta_files.back().second; names.resize(size); RETURN_NOT_OK_TUPLE( - resources_.get().vfs().read_exactly(uri, 0, &names[0], size), + resources_.get().vfs()->read_exactly(uri, 0, &names[0], size), nullopt, nullopt); std::stringstream ss(names); @@ -1082,11 +1082,11 @@ ArrayDirectory::compute_uris_to_vacuum( std::vector to_vacuum_vac_files_vec(vac_files.size(), 0); auto& tp = resources_.get().compute_tp(); auto status = parallel_for(&tp, 0, vac_files.size(), [&](size_t i) { - auto& vfs = resources_.get().vfs(); - uint64_t size = vfs.file_size(vac_files[i]); + auto vfs = resources_.get().vfs(); + uint64_t size = vfs->file_size(vac_files[i]); std::string names; names.resize(size); - throw_if_not_ok(vfs.read_exactly(vac_files[i], 0, &names[0], size)); + throw_if_not_ok(vfs->read_exactly(vac_files[i], 0, &names[0], size)); std::stringstream ss(names); bool vacuum_vac_file = true; for (std::string uri_str; std::getline(ss, uri_str);) { @@ -1195,7 +1195,7 @@ Status ArrayDirectory::compute_array_schema_uris( // dir. // Optionally add the old array schema from the root array folder auto old_schema_uri = uri_.join_path(constants::array_schema_filename); - if (resources_.get().vfs().is_file(old_schema_uri)) { + if (resources_.get().vfs()->is_file(old_schema_uri)) { array_schema_uris_.push_back(old_schema_uri); } } @@ -1305,7 +1305,7 @@ Status ArrayDirectory::is_fragment( } // Versions < 5 - *is_fragment = (int)resources_.get().vfs().is_file( + *is_fragment = (int)resources_.get().vfs()->is_file( uri.join_path(constants::fragment_metadata_filename)); return Status::Ok(); } diff --git a/tiledb/sm/array/test/unit_consistency.cc b/tiledb/sm/array/test/unit_consistency.cc index 2e6467b99f2..d7bcf97fde5 100644 --- a/tiledb/sm/array/test/unit_consistency.cc +++ b/tiledb/sm/array/test/unit_consistency.cc @@ -185,7 +185,7 @@ TEST_CASE( REQUIRE(x.is_open(uri) == false); // Clean up - REQUIRE_NOTHROW(resources.vfs().remove_dir(uri)); + REQUIRE_NOTHROW(resources.vfs()->remove_dir(uri)); } TEST_CASE( @@ -225,7 +225,7 @@ TEST_CASE( // Clean up for (auto uri : uris) { - REQUIRE_NOTHROW(resources.vfs().remove_dir(uri)); + REQUIRE_NOTHROW(resources.vfs()->remove_dir(uri)); } } @@ -280,5 +280,5 @@ TEST_CASE( REQUIRE(array.get()->close().ok()); REQUIRE(x.registry_size() == 0); REQUIRE(x.is_open(uri) == false); - REQUIRE_NOTHROW(resources.vfs().remove_dir(uri)); + REQUIRE_NOTHROW(resources.vfs()->remove_dir(uri)); } diff --git a/tiledb/sm/array_schema/array_schema_operations.cc b/tiledb/sm/array_schema/array_schema_operations.cc index 8bd4997eaaa..9d6cdcdf99c 100644 --- a/tiledb/sm/array_schema/array_schema_operations.cc +++ b/tiledb/sm/array_schema/array_schema_operations.cc @@ -176,16 +176,16 @@ void store_array_schema( resources.stats().add_counter("write_array_schema_size", tile->size()); // Delete file if it exists already - if (resources.vfs().is_file(schema_uri)) { - resources.vfs().remove_file(schema_uri); + if (resources.vfs()->is_file(schema_uri)) { + resources.vfs()->remove_file(schema_uri); } // Check if the array schema directory exists // If not create it, this is caused by a pre-v10 array URI array_schema_dir_uri = array_schema->array_uri().join_path(constants::array_schema_dir_name); - if (!resources.vfs().is_dir(array_schema_dir_uri)) { - resources.vfs().create_dir(array_schema_dir_uri); + if (!resources.vfs()->is_dir(array_schema_dir_uri)) { + resources.vfs()->create_dir(array_schema_dir_uri); } GenericTileIO::store_data(resources, schema_uri, tile, encryption_key); @@ -195,8 +195,8 @@ void store_array_schema( // array created before version 19. URI array_enumerations_dir_uri = array_schema_dir_uri.join_path(constants::array_enumerations_dir_name); - if (!resources.vfs().is_dir(array_enumerations_dir_uri)) { - resources.vfs().create_dir(array_enumerations_dir_uri); + if (!resources.vfs()->is_dir(array_enumerations_dir_uri)) { + resources.vfs()->create_dir(array_enumerations_dir_uri); } // Serialize all enumerations into the `__enumerations` directory diff --git a/tiledb/sm/array_schema/test/unit_current_domain.cc b/tiledb/sm/array_schema/test/unit_current_domain.cc index 4c008bb9017..d910a4cf56f 100644 --- a/tiledb/sm/array_schema/test/unit_current_domain.cc +++ b/tiledb/sm/array_schema/test/unit_current_domain.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2023-2024 TileDB Inc. + * @copyright Copyright (c) 2023-2025 TileDB Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -109,8 +109,8 @@ CurrentDomainFx::~CurrentDomainFx() { template void CurrentDomainFx::rm_array() { - if (ctx_.resources().vfs().is_dir(uri_)) { - ctx_.resources().vfs().remove_dir(uri_); + if (ctx_.resources().vfs()->is_dir(uri_)) { + ctx_.resources().vfs()->remove_dir(uri_); } } diff --git a/tiledb/sm/c_api/tiledb_filestore.cc b/tiledb/sm/c_api/tiledb_filestore.cc index 6ae9429e3bb..3796a918e85 100644 --- a/tiledb/sm/c_api/tiledb_filestore.cc +++ b/tiledb/sm/c_api/tiledb_filestore.cc @@ -97,8 +97,8 @@ int32_t tiledb_filestore_schema_create( if (uri) { // The user provided a uri, let's examine the file and get some insights // Get the file size, calculate a reasonable tile extent - auto& vfs = ctx->resources().vfs(); - uint64_t file_size = vfs.file_size(tiledb::sm::URI(uri)); + auto vfs = ctx->resources().vfs(); + uint64_t file_size = vfs->file_size(tiledb::sm::URI(uri)); if (file_size) { tile_extent = compute_tile_extent_based_on_file_size(file_size); } @@ -191,8 +191,8 @@ int32_t tiledb_filestore_uri_import( tiledb::sm::Context& context = ctx->context(); // Get the file size - auto& vfs = ctx->resources().vfs(); - uint64_t file_size = vfs.file_size(tiledb::sm::URI(file_uri)); + auto vfs = ctx->resources().vfs(); + uint64_t file_size = vfs->file_size(tiledb::sm::URI(file_uri)); if (!file_size) { return TILEDB_OK; // NOOP } @@ -242,7 +242,7 @@ int32_t tiledb_filestore_uri_import( fext.c_str()); // Write the data in batches using the global order writer - if (!vfs.open_file(tiledb::sm::URI(file_uri), tiledb::sm::VFSMode::VFS_READ) + if (!vfs->open_file(tiledb::sm::URI(file_uri), tiledb::sm::VFSMode::VFS_READ) .ok()) { throw api::CAPIException( "Failed to open the file; Invalid file URI or incorrect file " @@ -315,7 +315,7 @@ int32_t tiledb_filestore_uri_import( if (start + buffer.size() > file_size) { readlen = file_size - start; } - throw_if_not_ok(vfs.read_exactly( + throw_if_not_ok(vfs->read_exactly( tiledb::sm::URI(file_uri), start, buffer.data(), readlen)); return readlen; }; @@ -349,7 +349,7 @@ int32_t tiledb_filestore_uri_import( if (start_range < file_size) { // Something must have gone wrong whilst reading the file - throw_if_not_ok(vfs.close_file(tiledb::sm::URI(file_uri))); + throw_if_not_ok(vfs->close_file(tiledb::sm::URI(file_uri))); throw api::CAPIStatusException("Error whilst reading the file"); } @@ -357,7 +357,7 @@ int32_t tiledb_filestore_uri_import( // Dump the fragment on disk throw_if_not_ok(query.finalize()); } - throw_if_not_ok(vfs.close_file(tiledb::sm::URI(file_uri))); + throw_if_not_ok(vfs->close_file(tiledb::sm::URI(file_uri))); throw_if_not_ok(array->close()); @@ -370,8 +370,8 @@ int32_t tiledb_filestore_uri_export( ensure_uri_is_valid(file_uri); tiledb::sm::Context& context = ctx->context(); - auto& vfs = ctx->resources().vfs(); - if (!vfs.open_file(tiledb::sm::URI(file_uri), tiledb::sm::VFSMode::VFS_WRITE) + auto vfs = ctx->resources().vfs(); + if (!vfs->open_file(tiledb::sm::URI(file_uri), tiledb::sm::VFSMode::VFS_WRITE) .ok()) { throw api::CAPIException( "Failed to open the file; Invalid file URI or incorrect file " @@ -447,7 +447,7 @@ int32_t tiledb_filestore_uri_export( } throw_if_not_ok(query.submit()); - vfs.write( + vfs->write( tiledb::sm::URI(file_uri), reinterpret_cast(data.data()), write_size); @@ -456,7 +456,7 @@ int32_t tiledb_filestore_uri_export( end_range = std::min(file_size - 1, end_range + buffer_size); } while (start_range <= end_range); - throw_if_not_ok(vfs.close_file(tiledb::sm::URI(file_uri))); + throw_if_not_ok(vfs->close_file(tiledb::sm::URI(file_uri))); throw_if_not_ok(array->close()); diff --git a/tiledb/sm/consolidator/array_meta_consolidator.cc b/tiledb/sm/consolidator/array_meta_consolidator.cc index ad2c75846ef..01e7c2e8adb 100644 --- a/tiledb/sm/consolidator/array_meta_consolidator.cc +++ b/tiledb/sm/consolidator/array_meta_consolidator.cc @@ -119,8 +119,8 @@ Status ArrayMetaConsolidator::consolidate( throw_if_not_ok(array_for_writes.close()); // Write vacuum file - resources_.vfs().write(vac_uri, data.c_str(), data.size()); - throw_if_not_ok(resources_.vfs().close_file(vac_uri)); + resources_.vfs()->write(vac_uri, data.c_str(), data.size()); + throw_if_not_ok(resources_.vfs()->close_file(vac_uri)); return Status::Ok(); } @@ -132,14 +132,14 @@ void ArrayMetaConsolidator::vacuum(const char* array_name) { } // Get the array metadata URIs and vacuum file URIs to be vacuum - auto& vfs = resources_.vfs(); + auto vfs = resources_.vfs(); auto& compute_tp = resources_.compute_tp(); auto array_dir = ArrayDirectory( resources_, URI(array_name), 0, std::numeric_limits::max()); // Delete the array metadata and vacuum files - vfs.remove_files(&compute_tp, array_dir.array_meta_uris_to_vacuum()); - vfs.remove_files(&compute_tp, array_dir.array_meta_vac_uris_to_vacuum()); + vfs->remove_files(&compute_tp, array_dir.array_meta_uris_to_vacuum()); + vfs->remove_files(&compute_tp, array_dir.array_meta_vac_uris_to_vacuum()); } /* ****************************** */ diff --git a/tiledb/sm/consolidator/commits_consolidator.cc b/tiledb/sm/consolidator/commits_consolidator.cc index 35e7cffa5f1..657f330b355 100644 --- a/tiledb/sm/consolidator/commits_consolidator.cc +++ b/tiledb/sm/consolidator/commits_consolidator.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2022-2024 TileDB, Inc. + * @copyright Copyright (c) 2022-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -116,10 +116,10 @@ void CommitsConsolidator::vacuum(const char* array_name) { ArrayDirectoryMode::COMMITS); // Delete the commits and vacuum files - auto& vfs = resources_.vfs(); + auto vfs = resources_.vfs(); auto& compute_tp = resources_.compute_tp(); - vfs.remove_files(&compute_tp, array_dir.commit_uris_to_vacuum()); - vfs.remove_files( + vfs->remove_files(&compute_tp, array_dir.commit_uris_to_vacuum()); + vfs->remove_files( &compute_tp, array_dir.consolidated_commits_uris_to_vacuum()); } diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index 1b713b9cec3..3365a7c2432 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -272,7 +272,7 @@ void Consolidator::write_consolidated_commits_file( // the size variable. if (stdx::string::ends_with( uri.to_string(), constants::delete_file_suffix)) { - file_sizes[i] = resources.vfs().file_size(uri); + file_sizes[i] = resources.vfs()->file_size(uri); total_size += file_sizes[i]; total_size += sizeof(storage_size_t); } @@ -293,7 +293,7 @@ void Consolidator::write_consolidated_commits_file( uri.to_string(), constants::delete_file_suffix)) { memcpy(&data[file_index], &file_sizes[i], sizeof(storage_size_t)); file_index += sizeof(storage_size_t); - throw_if_not_ok(resources.vfs().read_exactly( + throw_if_not_ok(resources.vfs()->read_exactly( uri, 0, &data[file_index], file_sizes[i])); file_index += file_sizes[i]; } @@ -303,8 +303,8 @@ void Consolidator::write_consolidated_commits_file( URI consolidated_commits_uri = array_dir.get_commits_dir(write_version) .join_path(name + constants::con_commits_file_suffix); - resources.vfs().write(consolidated_commits_uri, data.data(), data.size()); - throw_if_not_ok(resources.vfs().close_file(consolidated_commits_uri)); + resources.vfs()->write(consolidated_commits_uri, data.data(), data.size()); + throw_if_not_ok(resources.vfs()->close_file(consolidated_commits_uri)); } void Consolidator::array_vacuum( diff --git a/tiledb/sm/consolidator/fragment_consolidator.cc b/tiledb/sm/consolidator/fragment_consolidator.cc index 51b384483c2..b7dae11a3b7 100644 --- a/tiledb/sm/consolidator/fragment_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_consolidator.cc @@ -522,24 +522,24 @@ void FragmentConsolidator::vacuum(const char* array_name) { } // Delete fragment directories - auto& vfs = resources_.vfs(); + auto vfs = resources_.vfs(); auto& compute_tp = resources_.compute_tp(); throw_if_not_ok(parallel_for( &compute_tp, 0, fragment_uris_to_vacuum.size(), [&](size_t i) { // Remove the commit file, if present. auto commit_uri = array_dir.get_commit_uri(fragment_uris_to_vacuum[i]); - if (vfs.is_file(commit_uri)) { - vfs.remove_file(commit_uri); + if (vfs->is_file(commit_uri)) { + vfs->remove_file(commit_uri); } - if (vfs.is_dir(fragment_uris_to_vacuum[i])) { - vfs.remove_dir(fragment_uris_to_vacuum[i]); + if (vfs->is_dir(fragment_uris_to_vacuum[i])) { + vfs->remove_dir(fragment_uris_to_vacuum[i]); } return Status::Ok(); })); // Delete the vacuum files. - vfs.remove_files( + vfs->remove_files( &compute_tp, filtered_fragment_uris.fragment_vac_uris_to_vacuum()); } @@ -664,8 +664,8 @@ Status FragmentConsolidator::consolidate_internal( // Finalize write query auto st = query_w->finalize(); if (!st.ok()) { - if (resources_.vfs().is_dir(*new_fragment_uri)) - resources_.vfs().remove_dir(*new_fragment_uri); + if (resources_.vfs()->is_dir(*new_fragment_uri)) + resources_.vfs()->remove_dir(*new_fragment_uri); return st; } @@ -676,8 +676,8 @@ Status FragmentConsolidator::consolidate_internal( vac_uri, to_consolidate); if (!st.ok()) { - if (resources_.vfs().is_dir(*new_fragment_uri)) - resources_.vfs().remove_dir(*new_fragment_uri); + if (resources_.vfs()->is_dir(*new_fragment_uri)) + resources_.vfs()->remove_dir(*new_fragment_uri); return st; } @@ -1084,8 +1084,8 @@ Status FragmentConsolidator::write_vacuum_file( } auto data = ss.str(); - resources_.vfs().write(vac_uri, data.c_str(), data.size()); - throw_if_not_ok(resources_.vfs().close_file(vac_uri)); + resources_.vfs()->write(vac_uri, data.c_str(), data.size()); + throw_if_not_ok(resources_.vfs()->close_file(vac_uri)); return Status::Ok(); } diff --git a/tiledb/sm/consolidator/fragment_meta_consolidator.cc b/tiledb/sm/consolidator/fragment_meta_consolidator.cc index 6374f94ed69..2646640d5d2 100644 --- a/tiledb/sm/consolidator/fragment_meta_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_meta_consolidator.cc @@ -96,7 +96,7 @@ Status FragmentMetaConsolidator::consolidate( first, last, write_version); auto frag_md_uri = array_dir.get_fragment_metadata_dir(write_version); - resources_.vfs().create_dir(frag_md_uri); + resources_.vfs()->create_dir(frag_md_uri); uri = URI(frag_md_uri.to_string() + name + constants::meta_file_suffix); // Get the consolidated fragment metadata version @@ -173,7 +173,7 @@ Status FragmentMetaConsolidator::consolidate( GenericTileIO tile_io(resources_, uri); [[maybe_unused]] uint64_t nbytes = 0; tile_io.write_generic(tile, enc_key, &nbytes); - throw_if_not_ok(resources_.vfs().close_file(uri)); + throw_if_not_ok(resources_.vfs()->close_file(uri)); return Status::Ok(); } @@ -200,7 +200,7 @@ void FragmentMetaConsolidator::vacuum(const char* array_name) { } // Vacuum - auto& vfs = resources_.vfs(); + auto vfs = resources_.vfs(); auto& compute_tp = resources_.compute_tp(); throw_if_not_ok( parallel_for(&compute_tp, 0, fragment_meta_uris.size(), [&](size_t i) { @@ -208,7 +208,7 @@ void FragmentMetaConsolidator::vacuum(const char* array_name) { FragmentID fragment_id{uri}; auto timestamp_range{fragment_id.timestamp_range()}; if (timestamp_range.second != t_latest) { - vfs.remove_file(uri); + vfs->remove_file(uri); } return Status::Ok(); })); diff --git a/tiledb/sm/consolidator/group_meta_consolidator.cc b/tiledb/sm/consolidator/group_meta_consolidator.cc index 79ef2a782b6..9ea8832e252 100644 --- a/tiledb/sm/consolidator/group_meta_consolidator.cc +++ b/tiledb/sm/consolidator/group_meta_consolidator.cc @@ -107,8 +107,8 @@ Status GroupMetaConsolidator::consolidate( group_for_writes.close(); // Write vacuum file - resources_.vfs().write(vac_uri, data.c_str(), data.size()); - throw_if_not_ok(resources_.vfs().close_file(vac_uri)); + resources_.vfs()->write(vac_uri, data.c_str(), data.size()); + throw_if_not_ok(resources_.vfs()->close_file(vac_uri)); return Status::Ok(); } @@ -120,18 +120,18 @@ void GroupMetaConsolidator::vacuum(const char* group_name) { } // Get the group metadata URIs and vacuum file URIs to be vacuumed - auto& vfs = resources_.vfs(); + auto vfs = resources_.vfs(); auto& compute_tp = resources_.compute_tp(); GroupDirectory group_dir( - vfs, + *vfs.get(), compute_tp, URI(group_name), 0, std::numeric_limits::max()); // Delete the group metadata and vacuum files - vfs.remove_files(&compute_tp, group_dir.group_meta_uris_to_vacuum()); - vfs.remove_files(&compute_tp, group_dir.group_meta_vac_uris_to_vacuum()); + vfs->remove_files(&compute_tp, group_dir.group_meta_uris_to_vacuum()); + vfs->remove_files(&compute_tp, group_dir.group_meta_vac_uris_to_vacuum()); } /* ****************************** */ diff --git a/tiledb/sm/enums/filesystem.h b/tiledb/sm/enums/filesystem.h index 18391407594..d19b9a56d24 100644 --- a/tiledb/sm/enums/filesystem.h +++ b/tiledb/sm/enums/filesystem.h @@ -36,8 +36,7 @@ #include "tiledb/sm/misc/constants.h" -namespace tiledb { -namespace sm { +namespace tiledb::sm { /** Defines a filesystem. */ enum class Filesystem : uint8_t { @@ -59,6 +58,8 @@ inline const std::string& filesystem_str(Filesystem filesystem_type) { return constants::filesystem_type_gcs_str; case Filesystem::MEMFS: return constants::filesystem_type_mem_str; + case Filesystem::LOCAL: + return constants::filesystem_type_local_str; default: return constants::empty_str; } @@ -77,13 +78,14 @@ inline Status filesystem_enum( *filesystem_type = Filesystem::GCS; else if (filesystem_type_str == constants::filesystem_type_mem_str) *filesystem_type = Filesystem::MEMFS; + else if (filesystem_type_str == constants::filesystem_type_local_str) + *filesystem_type = Filesystem::LOCAL; else return Status_Error("Invalid Filesystem " + filesystem_type_str); return Status::Ok(); } -} // namespace sm -} // namespace tiledb +} // namespace tiledb::sm #endif // TILEDB_FILESYSTEM_H diff --git a/tiledb/sm/filesystem/filesystem_base.cc b/tiledb/sm/filesystem/filesystem_base.cc index db7fec73ca3..1ffafb86f8c 100644 --- a/tiledb/sm/filesystem/filesystem_base.cc +++ b/tiledb/sm/filesystem/filesystem_base.cc @@ -47,6 +47,10 @@ LsObjects FilesystemBase::ls_filtered_v2( throw UnsupportedOperation("ls_filtered"); } +void FilesystemBase::remove_dir_if_empty(const URI&) const { + throw UnsupportedOperation("remove_dir_if_empty"); +} + void FilesystemBase::move_file(const URI&, const URI&) const { throw UnsupportedOperation("move_file"); } @@ -71,6 +75,20 @@ void FilesystemBase::sync(const URI&) const { throw UnsupportedOperation("sync"); } +void FilesystemBase::set_multipart_upload_state( + const URI&, const MultiPartUploadState&) { + throw UnsupportedOperation("set_multipart_upload_state"); +} + +std::optional +FilesystemBase::multipart_upload_state(const URI&) { + throw UnsupportedOperation("multipart_upload_state"); +} + +void FilesystemBase::flush_multipart_file_buffer(const URI&) { + throw UnsupportedOperation("flush_multipart_file_buffer"); +} + bool FilesystemBase::is_bucket(const URI&) const { throw UnsupportedOperation("is_bucket"); } diff --git a/tiledb/sm/filesystem/filesystem_base.h b/tiledb/sm/filesystem/filesystem_base.h index 20f821828e9..544a42f46ca 100644 --- a/tiledb/sm/filesystem/filesystem_base.h +++ b/tiledb/sm/filesystem/filesystem_base.h @@ -72,10 +72,53 @@ class UnsupportedURI : public FilesystemException { class FilesystemBase { public: + /* ********************************* */ + /* CONSTRUCTORS & DESTRUCTORS */ + /* ********************************* */ FilesystemBase() = default; virtual ~FilesystemBase() = default; + /* ********************************* */ + /* TYPE DEFINITIONS */ + /* ********************************* */ + struct BufferedChunk { + std::string uri; + uint64_t size; + + BufferedChunk() + : uri("") + , size(0) { + } + BufferedChunk(std::string chunk_uri, uint64_t chunk_size) + : uri(chunk_uri) + , size(chunk_size) { + } + }; + + /** + * Multipart upload state definition used in the serialization of remote + * global order writes. This state is a generalization of + * the multipart upload state types currently defined independently by each + * backend implementation. + */ + struct MultiPartUploadState { + struct CompletedParts { + optional e_tag; + uint64_t part_number; + }; + + uint64_t part_number; + optional upload_id; + optional> buffered_chunks; + std::vector completed_parts; + Status status; + }; + + /* ********************************* */ + /* API */ + /* ********************************* */ + /** * Checks if the filesystem supports the given URI. * @@ -128,6 +171,15 @@ class FilesystemBase { */ virtual void remove_dir(const URI& uri) const = 0; + /** + * Removes a given empty directory. + * + * @invariant Currently valid only for local filesystems. + * + * @param path The path of the directory. + */ + virtual void remove_dir_if_empty(const URI& uri) const; + /** * Deletes a file. * @@ -256,6 +308,41 @@ class FilesystemBase { */ virtual void sync(const URI& uri) const; + /** + * Used in serialization of global order writes to set the multipart upload + * state in the internal maps of cloud backends during deserialization. + * + * @invariant Currently valid only for S3 filesystems. + * + * @param uri The file uri used as key in the internal map of the backend. + * @param state The multipart upload state info. + */ + virtual void set_multipart_upload_state( + const URI& uri, const MultiPartUploadState& state); + + /** + * Used in serialization to share the multipart upload state among cloud + * executors during global order writes. + * + * @invariant Currently valid only for S3 filesystems. + * + * @param uri The file uri used as key in the internal map of the backend. + * @return A MultiPartUploadState object. + */ + virtual std::optional multipart_upload_state( + const URI& uri); + + /** + * Used in remote global order writes to flush the internal + * in-memory buffer for an URI that backends maintain to modulate the + * frequency of multipart upload requests. + * + * @invariant Currently valid only for S3 filesystems. + * + * @param uri The file uri identifying the backend file buffer. + */ + virtual void flush_multipart_file_buffer(const URI& uri); + /** * Writes the contents of a buffer into a file. * diff --git a/tiledb/sm/filesystem/local.h b/tiledb/sm/filesystem/local.h index c686cc7a837..b2b87fdab31 100644 --- a/tiledb/sm/filesystem/local.h +++ b/tiledb/sm/filesystem/local.h @@ -57,6 +57,17 @@ class LocalFilesystem : public FilesystemBase { void copy_dir(const URI& old_uri, const URI& new_uri) override; + void set_multipart_upload_state( + const URI&, const MultiPartUploadState&) override { + // No-op for local filesystems. + } + + std::optional multipart_upload_state( + const URI&) override { + // No-op for local filesystems. + return {}; + } + protected: /** * Creates the containing directories of a path if they do not exist. diff --git a/tiledb/sm/filesystem/posix.cc b/tiledb/sm/filesystem/posix.cc index c29aa26d86b..33a50160d89 100644 --- a/tiledb/sm/filesystem/posix.cc +++ b/tiledb/sm/filesystem/posix.cc @@ -191,16 +191,16 @@ void Posix::remove_dir(const URI& uri) const { } } -bool Posix::remove_dir_if_empty(const std::string& path) const { +void Posix::remove_dir_if_empty(const URI& uri) const { + auto path = uri.to_path(); if (rmdir(path.c_str()) != 0) { if (errno == ENOTEMPTY) { - return false; + return; } throw IOError( std::string("Failed to delete path '") + path + "'; " + strerror(errno)); } - return true; } void Posix::remove_file(const URI& uri) const { diff --git a/tiledb/sm/filesystem/posix.h b/tiledb/sm/filesystem/posix.h index bddb4b84284..f495a5818b6 100644 --- a/tiledb/sm/filesystem/posix.h +++ b/tiledb/sm/filesystem/posix.h @@ -135,9 +135,8 @@ class Posix : public LocalFilesystem { * Removes a given empty directory. * * @param path The path of the directory. - * @return true if the directory was removed, false otherwise. */ - bool remove_dir_if_empty(const std::string& path) const; + void remove_dir_if_empty(const URI& path) const override; /** * Removes a given path. diff --git a/tiledb/sm/filesystem/s3.cc b/tiledb/sm/filesystem/s3.cc index a6a42477980..4d17ee90dd8 100644 --- a/tiledb/sm/filesystem/s3.cc +++ b/tiledb/sm/filesystem/s3.cc @@ -704,6 +704,63 @@ void S3::remove_file(const URI& uri) const { delete_object_request.GetBucket(), delete_object_request.GetKey())); } +void S3::set_multipart_upload_state( + const URI& uri, const FilesystemBase::MultiPartUploadState& state) { + S3::MultiPartUploadState s3_state; + s3_state.part_number = state.part_number; + s3_state.upload_id = *state.upload_id; + s3_state.st = state.status; + for (auto& part : state.completed_parts) { + auto rv = s3_state.completed_parts.try_emplace(part.part_number); + rv.first->second.SetETag(part.e_tag->c_str()); + rv.first->second.SetPartNumber(part.part_number); + } + + if (state.buffered_chunks.has_value()) { + for (auto& chunk : *state.buffered_chunks) { + // Chunk URI gets reconstructed from the serialized chunk name + // and the real attribute uri + s3_state.buffered_chunks.emplace_back( + generate_chunk_uri(uri, chunk.uri).to_string(), chunk.size); + } + } + + set_multipart_upload_state_internal(uri.to_string(), s3_state); +} + +std::optional S3::multipart_upload_state( + const URI& uri) { + auto state_fetched = multipart_upload_state_internal(uri); + if (!state_fetched.has_value()) { + return nullopt; + } + FilesystemBase::MultiPartUploadState state; + state.upload_id = state_fetched->upload_id; + state.part_number = state_fetched->part_number; + state.status = state_fetched->st; + auto& completed_parts = state_fetched->completed_parts; + for (auto& entry : completed_parts) { + state.completed_parts.emplace_back(); + state.completed_parts.back().e_tag = entry.second.GetETag(); + state.completed_parts.back().part_number = entry.second.GetPartNumber(); + } + if (!state_fetched->buffered_chunks.empty()) { + state.buffered_chunks.emplace(); + for (auto& chunk : state_fetched->buffered_chunks) { + state.buffered_chunks->emplace_back( + URI(chunk.uri).remove_trailing_slash().last_path_part(), chunk.size); + } + } + return state; +} + +void S3::flush_multipart_file_buffer(const URI& uri) { + Buffer* buff = nullptr; + throw_if_not_ok(get_file_buffer(uri, &buff)); + global_order_write(uri, buff->data(), buff->size()); + buff->reset_size(); +} + std::vector S3::ls_with_sizes(const URI& parent) const { return ls_with_sizes(parent, "/", -1); } @@ -2024,7 +2081,7 @@ Status S3::get_make_upload_part_req( return Status::Ok(); } -Status S3::set_multipart_upload_state( +void S3::set_multipart_upload_state_internal( const std::string& uri, MultiPartUploadState& state) { Aws::Http::URI aws_uri(uri.c_str()); std::string uri_path(aws_uri.GetPath()); @@ -2034,11 +2091,9 @@ Status S3::set_multipart_upload_state( UniqueWriteLock unique_wl(&multipart_upload_rwlock_); multipart_upload_states_[uri_path] = state; - - return Status::Ok(); } -std::optional S3::multipart_upload_state( +std::optional S3::multipart_upload_state_internal( const URI& uri) { const Aws::Http::URI aws_uri(uri.c_str()); const std::string uri_path(aws_uri.GetPath()); diff --git a/tiledb/sm/filesystem/s3.h b/tiledb/sm/filesystem/s3.h index 34798549064..119a5133b54 100644 --- a/tiledb/sm/filesystem/s3.h +++ b/tiledb/sm/filesystem/s3.h @@ -738,6 +738,37 @@ class S3 : public FilesystemBase { void sync(const URI&) const override { // No-op for S3. } + + /** + * Used in serialization of global order writes to set the multipart upload + * state in the internal maps of cloud backends during deserialization. + * + * @param uri The file uri used as key in the internal map of the backend. + * @param state The multipart upload state info. + */ + void set_multipart_upload_state( + const URI& uri, + const FilesystemBase::MultiPartUploadState& state) override; + + /** + * Used in serialization to share the multipart upload state among cloud + * executors during global order writes. + * + * @param uri The file uri used as key in the internal map of the backend. + * @return A MultiPartUploadState object. + */ + std::optional multipart_upload_state( + const URI& uri) override; + + /** + * Used in remote global order writes to flush the internal + * in-memory buffer for an URI that backends maintain to modulate the + * frequency of multipart upload requests. + * + * @param uri The file uri identifying the backend file buffer. + */ + void flush_multipart_file_buffer(const URI& uri) override; + /** * Retrieves all the entries contained in the parent. * @@ -1560,9 +1591,8 @@ class S3 : public FilesystemBase { * * @param uri The file uri used as key in the internal map * @param state The multipart upload state info - * @return Status */ - Status set_multipart_upload_state( + void set_multipart_upload_state_internal( const std::string& uri, S3::MultiPartUploadState& state); /** @@ -1571,7 +1601,7 @@ class S3 : public FilesystemBase { * @param uri The URI of the multipart state * @return an optional MultiPartUploadState object */ - std::optional multipart_upload_state( + std::optional multipart_upload_state_internal( const URI& uri); /** diff --git a/tiledb/sm/filesystem/test/CMakeLists.txt b/tiledb/sm/filesystem/test/CMakeLists.txt index f41457f6231..a52db986375 100644 --- a/tiledb/sm/filesystem/test/CMakeLists.txt +++ b/tiledb/sm/filesystem/test/CMakeLists.txt @@ -3,7 +3,7 @@ # # The MIT License # -# Copyright (c) 2021-2022 TileDB, Inc. +# Copyright (c) 2021-2025 TileDB, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -26,8 +26,13 @@ include(unit_test) commence(unit_test vfs) - this_target_object_libraries(vfs) - this_target_sources(main.cc unit_uri.cc unit_ls_filtered.cc) + this_target_object_libraries(context_resources) + this_target_sources(main.cc unit_uri.cc) +conclude(unit_test) + +commence(unit_test vfs_ls_filtered) + this_target_object_libraries(context_resources) + this_target_sources(main.cc unit_ls_filtered.cc) conclude(unit_test) commence(unit_test vfs_read_log_modes) diff --git a/tiledb/sm/filesystem/test/compile_vfs_main.cc b/tiledb/sm/filesystem/test/compile_vfs_main.cc index a96833eaf25..991ffcfc80f 100644 --- a/tiledb/sm/filesystem/test/compile_vfs_main.cc +++ b/tiledb/sm/filesystem/test/compile_vfs_main.cc @@ -29,11 +29,15 @@ #include "../vfs.h" #include "tiledb/common/logger.h" +using namespace tiledb::sm; + int main() { - static tiledb::sm::stats::Stats stats("test"); + static stats::Stats stats("test"); static tiledb::common::Logger logger("test"); ThreadPool compute_tp(4); ThreadPool io_tp(4); - tiledb::sm::VFS x{&stats, &logger, &compute_tp, &io_tp, tiledb::sm::Config{}}; + std::vector> fses; + fses.emplace_back(std::make_unique()); + VFS x{&stats, &logger, &compute_tp, &io_tp, Config{}, std::move(fses)}; return 0; } diff --git a/tiledb/sm/filesystem/test/unit_ls_filtered.cc b/tiledb/sm/filesystem/test/unit_ls_filtered.cc index 2ed192bf54d..afbbe526611 100644 --- a/tiledb/sm/filesystem/test/unit_ls_filtered.cc +++ b/tiledb/sm/filesystem/test/unit_ls_filtered.cc @@ -35,6 +35,7 @@ #include "tiledb/common/logger.h" #include "tiledb/sm/config/config.h" #include "tiledb/sm/filesystem/vfs.h" +#include "tiledb/sm/storage_manager/context_resources.h" namespace tiledb::sm { /** @@ -63,7 +64,14 @@ class VFSTest { , logger_("unit_ls_filtered") , io_(4) , compute_(4) - , vfs_(&stats_, &logger_, &io_, &compute_, tiledb::sm::Config()) + , vfs_( + &stats_, + &logger_, + &io_, + &compute_, + tiledb::sm::Config(), + tiledb::sm::ContextResources::make_filesystems( + &stats_, &io_, tiledb::sm::Config())) , test_tree_(test_tree) , prefix_(prefix) , temp_dir_(prefix_) diff --git a/tiledb/sm/filesystem/test/unit_vfs_read_log_modes.cc b/tiledb/sm/filesystem/test/unit_vfs_read_log_modes.cc index 1337ef0f5a3..d7f7a055a69 100644 --- a/tiledb/sm/filesystem/test/unit_vfs_read_log_modes.cc +++ b/tiledb/sm/filesystem/test/unit_vfs_read_log_modes.cc @@ -75,7 +75,7 @@ TEST_CASE("VFS Read Log Modes", "[vfs][read-logging-modes]") { for (auto& uri : uris_to_read) { // None of these files exist, so we expect every read to fail. REQUIRE_THROWS( - throw_if_not_ok(res.vfs().read_exactly(URI(uri), 123, buffer, 456))); + throw_if_not_ok(res.vfs()->read_exactly(URI(uri), 123, buffer, 456))); } } } diff --git a/tiledb/sm/filesystem/vfs.cc b/tiledb/sm/filesystem/vfs.cc index dd76414bea9..2db5be17e34 100644 --- a/tiledb/sm/filesystem/vfs.cc +++ b/tiledb/sm/filesystem/vfs.cc @@ -64,13 +64,12 @@ VFS::VFS( Logger* const logger, ThreadPool* const compute_tp, ThreadPool* const io_tp, - const Config& config) + const Config& config, + std::vector>&& filesystems) : VFSBase(parent_stats) - , Azure_within_VFS(io_tp, config) - , GCS_within_VFS(io_tp, config) - , S3_within_VFS(stats_, io_tp, config) , config_(config) , logger_(logger) + , filesystems_(std::move(filesystems)) , compute_tp_(compute_tp) , io_tp_(io_tp) , vfs_params_(VFSParameters(config)) { @@ -81,22 +80,6 @@ VFS::VFS( // Construct the read-ahead cache. read_ahead_cache_ = tdb_unique_ptr( tdb_new(ReadAheadCache, vfs_params_.read_ahead_cache_size_)); - - if constexpr (s3_enabled) { - supported_fs_.insert(Filesystem::S3); - } - -#ifdef HAVE_AZURE - supported_fs_.insert(Filesystem::AZURE); -#endif - -#ifdef HAVE_GCS - supported_fs_.insert(Filesystem::GCS); -#endif - - local_ = LocalFS(config_); - - supported_fs_.insert(Filesystem::MEMFS); } /* ********************************* */ @@ -104,32 +87,10 @@ VFS::VFS( /* ********************************* */ const FilesystemBase& VFS::get_fs(const URI& uri) const { - if (uri.is_file()) { - return local_; - } - if (uri.is_s3()) { -#ifdef HAVE_S3 - return s3(); -#else - throw BuiltWithout("S3"); -#endif - } - if (uri.is_azure()) { -#ifdef HAVE_AZURE - return azure(); -#else - throw BuiltWithout("Azure"); -#endif - } - if (uri.is_gcs()) { -#ifdef HAVE_GCS - return gcs(); -#else - throw BuiltWithout("GCS"); -#endif - } - if (uri.is_memfs()) { - return memfs_; + for (auto& fs : filesystems_) { + if (fs->supports_uri(uri)) { + return *fs; + } } throw UnsupportedURI(uri.to_string()); } @@ -243,10 +204,7 @@ void VFS::remove_dir(const URI& uri) const { } void VFS::remove_dir_if_empty(const URI& uri) const { - if (uri.is_file()) { - local_.remove_dir_if_empty(uri.to_path()); - } - // Object stores do not have directories. + get_fs(uri).remove_dir_if_empty(uri); } void VFS::remove_dirs( @@ -655,20 +613,37 @@ bool VFS::supports_uri(const URI& uri) const { return get_fs(uri).supports_uri(uri); } +bool VFS::supports_fs(FilesystemBase& filesystem) const { + for (auto& fs : filesystems_) { + if (fs.get() == &filesystem) { + return true; + } + } + return false; +} + bool VFS::supports_fs(Filesystem fs) const { - return (supported_fs_.find(fs) != supported_fs_.end()); + std::string path = ""; + switch (static_cast(fs)) { + case tiledb::sm::Filesystem::HDFS: + path = "hdfs://"; + case tiledb::sm::Filesystem::S3: + path = "s3://"; + case tiledb::sm::Filesystem::AZURE: + path = "azure://"; + case tiledb::sm::Filesystem::GCS: + path = "gcs://"; + case tiledb::sm::Filesystem::MEMFS: + path = "mem://"; + case tiledb::sm::Filesystem::LOCAL: + path = "file://"; + } + + return supports_uri(tiledb::sm::URI(path)); } bool VFS::supports_uri_scheme(const URI& uri) const { - if (uri.is_s3()) { - return supports_fs(Filesystem::S3); - } else if (uri.is_azure()) { - return supports_fs(Filesystem::AZURE); - } else if (uri.is_gcs()) { - return supports_fs(Filesystem::GCS); - } else { - return true; - } + return get_fs(uri).supports_uri(uri); } void VFS::sync(const URI& uri) const { @@ -689,32 +664,10 @@ Status VFS::open_file(const URI& uri, VFSMode mode) { remove_file(uri); break; case VFSMode::VFS_APPEND: - if (uri.is_s3()) { - if constexpr (s3_enabled) { - throw VFSException( - "Cannot open file '" + uri.to_string() + - "'; S3 does not support append mode"); - } else { - throw BuiltWithout("S3"); - } - } - if (uri.is_azure()) { - if constexpr (azure_enabled) { - throw VFSException( - "Cannot open file '" + uri.to_string() + - "'; Azure does not support append mode"); - } else { - throw BuiltWithout("Azure"); - } - } - if (uri.is_gcs()) { - if constexpr (gcs_enabled) { - throw VFSException( - "Cannot open file '" + uri.to_string() + - "'; GCS does not support append mode"); - } else { - throw BuiltWithout("GCS"); - } + if (!is_file) { + throw VFSException( + "Cannot open file '" + uri.to_string() + + "'; Object store does not support append mode"); } break; } @@ -743,120 +696,20 @@ void VFS::write( get_fs(uri).write(uri, buffer, buffer_size, remote_global_order_write); } -std::pair> -VFS::multipart_upload_state(const URI& uri) { - if (uri.is_file()) { - return {Status::Ok(), {}}; - } else if (uri.is_s3()) { -#ifdef HAVE_S3 - VFS::MultiPartUploadState state; - auto s3_state = s3().multipart_upload_state(uri); - if (!s3_state.has_value()) { - return {Status::Ok(), nullopt}; - } - state.upload_id = s3_state->upload_id; - state.part_number = s3_state->part_number; - state.status = s3_state->st; - auto& completed_parts = s3_state->completed_parts; - for (auto& entry : completed_parts) { - state.completed_parts.emplace_back(); - state.completed_parts.back().e_tag = entry.second.GetETag(); - state.completed_parts.back().part_number = entry.second.GetPartNumber(); - } - if (!s3_state->buffered_chunks.empty()) { - state.buffered_chunks.emplace(); - for (auto& chunk : s3_state->buffered_chunks) { - state.buffered_chunks->emplace_back( - URI(chunk.uri).remove_trailing_slash().last_path_part(), - chunk.size); - } - } - - return {Status::Ok(), state}; -#else - return {Status_VFSError("TileDB was built without S3 support"), nullopt}; -#endif - } else if (uri.is_azure()) { - if constexpr (azure_enabled) { - return {Status_VFSError("Not yet supported for Azure"), nullopt}; - } else { - return { - Status_VFSError("TileDB was built without Azure support"), nullopt}; - } - } else if (uri.is_gcs()) { - if constexpr (gcs_enabled) { - return {Status_VFSError("Not yet supported for GCS"), nullopt}; - } else { - return {Status_VFSError("TileDB was built without GCS support"), nullopt}; - } - } - - return { - Status_VFSError("Unsupported URI schemes: " + uri.to_string()), nullopt}; +std::optional VFS::multipart_upload_state( + const URI& uri) { + return get_fs(uri).multipart_upload_state(uri); } -Status VFS::set_multipart_upload_state( - const URI& uri, [[maybe_unused]] const MultiPartUploadState& state) { - if (uri.is_file()) { - return Status::Ok(); - } else if (uri.is_s3()) { -#ifdef HAVE_S3 - S3::MultiPartUploadState s3_state; - s3_state.part_number = state.part_number; - s3_state.upload_id = *state.upload_id; - s3_state.st = state.status; - for (auto& part : state.completed_parts) { - auto rv = s3_state.completed_parts.try_emplace(part.part_number); - rv.first->second.SetETag(part.e_tag->c_str()); - rv.first->second.SetPartNumber(part.part_number); - } - - if (state.buffered_chunks.has_value()) { - for (auto& chunk : *state.buffered_chunks) { - // Chunk URI gets reconstructed from the serialized chunk name - // and the real attribute uri - s3_state.buffered_chunks.emplace_back( - s3().generate_chunk_uri(uri, chunk.uri).to_string(), chunk.size); - } - } - - return s3().set_multipart_upload_state(uri.to_string(), s3_state); -#else - throw BuiltWithout("S3"); -#endif - } else if (uri.is_azure()) { - if constexpr (azure_enabled) { - throw VFSException("Not yet supported for Azure"); - } else { - throw BuiltWithout("Azure"); - } - } else if (uri.is_gcs()) { - if constexpr (gcs_enabled) { - throw VFSException("Not yet supported for GCS"); - } else { - throw BuiltWithout("GCS"); - } - } - - throw UnsupportedURI(uri.to_string()); +void VFS::set_multipart_upload_state( + const URI& uri, const MultiPartUploadState& state) { + get_fs(uri).set_multipart_upload_state(uri, state); } -Status VFS::flush_multipart_file_buffer(const URI& uri) { +void VFS::flush_multipart_file_buffer(const URI& uri) { auto instrument = make_log_duration_instrument(uri, "flush_multipart_file_buffer"); - if (uri.is_s3()) { -#ifdef HAVE_S3 - Buffer* buff = nullptr; - throw_if_not_ok(s3().get_file_buffer(uri, &buff)); - s3().global_order_write(uri, buff->data(), buff->size()); - buff->reset_size(); - -#else - throw BuiltWithout("S3"); -#endif - } - - return Status::Ok(); + get_fs(uri).flush_multipart_file_buffer(uri); } void VFS::log_read(const URI& uri, uint64_t offset, uint64_t nbytes) { diff --git a/tiledb/sm/filesystem/vfs.h b/tiledb/sm/filesystem/vfs.h index 62812ea1f40..144738ca06c 100644 --- a/tiledb/sm/filesystem/vfs.h +++ b/tiledb/sm/filesystem/vfs.h @@ -227,164 +227,26 @@ struct VFSBase { stats::Stats* stats_; }; -/** The Azure filesystem. */ -#ifdef HAVE_AZURE -class Azure_within_VFS { - /** Private member variable */ - Azure azure_; - - protected: - template - Azure_within_VFS(Args&&... args) - : azure_(std::forward(args)...) { - } - - /** Protected accessor for the Azure object. */ - inline Azure& azure() { - return azure_; - } - - /** Protected accessor for the const Azure object. */ - inline const Azure& azure() const { - return azure_; - } -}; -#else -class Azure_within_VFS { - protected: - template - Azure_within_VFS(Args&&...) { - } // empty constructor -}; -#endif - -/** The GCS filesystem. */ -#ifdef HAVE_GCS -class GCS_within_VFS { - /** Private member variable */ - GCS gcs_; - - protected: - template - GCS_within_VFS(Args&&... args) - : gcs_(std::forward(args)...) { - } - - /** Protected accessor for the GCS object. */ - inline GCS& gcs() { - return gcs_; - } - - /** Protected accessor for the const GCS object. */ - inline const GCS& gcs() const { - return gcs_; - } -}; -#else -class GCS_within_VFS { - protected: - template - GCS_within_VFS(Args&&...) { - } // empty constructor -}; -#endif - -/** The S3 filesystem. */ -#ifdef HAVE_S3 -class S3_within_VFS { - /** Private member variable */ - tdb_unique_ptr s3_; - - protected: - template - S3_within_VFS(Args&&... args) - : s3_(tdb_unique_ptr(tdb_new(S3, std::forward(args)...))) { - } - - ~S3_within_VFS() = default; - - /** Protected accessor for the S3 object. */ - inline S3& s3() { - return *s3_; - } - - /** Protected accessor for the const S3 object. */ - inline const S3& s3() const { - return *s3_; - } -}; -#else -class S3_within_VFS { - protected: - template - S3_within_VFS(Args&&...) { - } // empty constructor -}; -#endif - /** * This class implements a virtual filesystem that directs filesystem-related * function execution to the appropriate backend based on the input URI. */ -class VFS : FilesystemBase, - private VFSBase, - protected Azure_within_VFS, - GCS_within_VFS, - S3_within_VFS { +class VFS : FilesystemBase, private VFSBase { public: - /* ********************************* */ - /* TYPE DEFINITIONS */ - /* ********************************* */ - - struct BufferedChunk { - std::string uri; - uint64_t size; - - BufferedChunk() - : uri("") - , size(0) { - } - BufferedChunk(std::string chunk_uri, uint64_t chunk_size) - : uri(chunk_uri) - , size(chunk_size) { - } - }; - - /** - * Multipart upload state definition used in the serialization of remote - * global order writes. This state is a generalization of - * the multipart upload state types currently defined independently by each - * backend implementation. - */ - struct MultiPartUploadState { - struct CompletedParts { - optional e_tag; - uint64_t part_number; - }; - - uint64_t part_number; - optional upload_id; - optional> buffered_chunks; - std::vector completed_parts; - Status status; - }; - - /* ********************************* */ - /* CONSTRUCTORS & DESTRUCTORS */ - /* ********************************* */ - /** Constructor. * @param parent_stats The parent stats to inherit from. * @param logger The logger to use. Optional, can be nullptr. * @param compute_tp Thread pool for compute-bound tasks. * @param io_tp Thread pool for io-bound tasks. * @param config Configuration parameters. + * @param filesystems The supported filesystems. **/ VFS(stats::Stats* parent_stats, Logger* logger, ThreadPool* compute_tp, ThreadPool* io_tp, - const Config& config); + const Config& config, + std::vector>&& filesystems); /** Destructor. */ ~VFS() = default; @@ -506,7 +368,7 @@ class VFS : FilesystemBase, * * @param uri The uri of the directory to be removed */ - void remove_dir_if_empty(const URI& uri) const; + void remove_dir_if_empty(const URI& uri) const override; /** * Deletes directories in parallel from the given vector of directories. @@ -727,6 +589,9 @@ class VFS : FilesystemBase, Status read_exactly( const URI& uri, uint64_t offset, void* buffer, uint64_t nbytes); + /** Checks if a given filesystem is supported. */ + bool supports_fs(FilesystemBase& fs) const; + /** Checks if a given filesystem is supported. */ bool supports_fs(Filesystem fs) const; @@ -808,34 +673,32 @@ class VFS : FilesystemBase, /** * Used in serialization to share the multipart upload state - * among cloud executors during global order writes + * among cloud executors during global order writes. * * @param uri The file uri used as key in the internal map of the backend - * @return A pair of status and VFS::MultiPartUploadState object. + * @return An optional MultiPartUploadState object. */ - std::pair> multipart_upload_state( - const URI& uri); + std::optional multipart_upload_state( + const URI& uri) override; /** * Used in serialization of global order writes to set the multipart upload - * state in the internal maps of cloud backends during deserialization + * state in the internal maps of cloud backends during deserialization. * - * @param uri The file uri used as key in the internal map of the backend - * @param state The multipart upload state info - * @return Status + * @param uri The file uri used as key in the internal map of the backend. + * @param state The multipart upload state info. */ - Status set_multipart_upload_state( - const URI& uri, const MultiPartUploadState& state); + void set_multipart_upload_state( + const URI& uri, const MultiPartUploadState& state) override; /** * Used in remote global order writes to flush the internal * in-memory buffer for an URI that backends maintain to modulate the * frequency of multipart upload requests. * - * @param uri The file uri identifying the backend file buffer - * @return Status + * @param uri The file uri identifying the backend file buffer. */ - Status flush_multipart_file_buffer(const URI& uri); + void flush_multipart_file_buffer(const URI& uri) override; inline stats::Stats* stats() const { return stats_; @@ -1004,17 +867,6 @@ class VFS : FilesystemBase, /* PRIVATE ATTRIBUTES */ /* ********************************* */ -#ifdef _WIN32 - using LocalFS = Win; -#else - using LocalFS = Posix; -#endif - - LocalFS local_; - - /** The in-memory filesystem which is always supported */ - MemFilesystem memfs_; - /** * Config. * @@ -1027,8 +879,8 @@ class VFS : FilesystemBase, /** Logger. */ Logger* logger_; - /** The set with the supported filesystems. */ - std::set supported_fs_; + /** The supported filesystems. */ + std::vector> filesystems_; /** Thread pool for compute-bound tasks. */ ThreadPool* compute_tp_; diff --git a/tiledb/sm/filesystem/win.cc b/tiledb/sm/filesystem/win.cc index c0e8a51f6f9..2e04054af1f 100644 --- a/tiledb/sm/filesystem/win.cc +++ b/tiledb/sm/filesystem/win.cc @@ -289,17 +289,17 @@ void Win::remove_dir(const URI& uri) const { } } -bool Win::remove_dir_if_empty(const std::string& path) const { +void Win::remove_dir_if_empty(const URI& uri) const { + auto path = uri.to + path(); if (!RemoveDirectoryA(path.c_str())) { auto gle = GetLastError(); if (gle == ERROR_DIR_NOT_EMPTY) { - return false; + return; } throw IOError( std::string("Failed to delete directory '") + path + "' " + get_last_error_msg(gle, "RemoveDirectory")); } - return true; } void Win::remove_file(const URI& uri) const { diff --git a/tiledb/sm/filesystem/win.h b/tiledb/sm/filesystem/win.h index 134d74c58e1..788d9259d65 100644 --- a/tiledb/sm/filesystem/win.h +++ b/tiledb/sm/filesystem/win.h @@ -139,9 +139,8 @@ class Win : public LocalFilesystem { * Removes a given empty directory. * * @param path The path of the directory. - * @return true if the directory was removed, false otherwise. */ - bool remove_dir_if_empty(const std::string& path) const; + void remove_dir_if_empty(const URI& uri) const override; /** * Removes a given path. diff --git a/tiledb/sm/fragment/fragment_info.cc b/tiledb/sm/fragment/fragment_info.cc index 87c3c7a76e2..d56ae4dc4a4 100644 --- a/tiledb/sm/fragment/fragment_info.cc +++ b/tiledb/sm/fragment/fragment_info.cc @@ -1113,7 +1113,7 @@ Status FragmentInfo::set_default_timestamp_range() { tuple> FragmentInfo::load( const URI& new_fragment_uri) const { SingleFragmentInfo ret; - auto& vfs = resources_->vfs(); + auto vfs = resources_->vfs(); const auto& array_schema_latest = single_fragment_info_vec_.back().meta()->array_schema(); @@ -1127,7 +1127,7 @@ tuple> FragmentInfo::load( URI coords_uri = new_fragment_uri.join_path(constants::coords + constants::file_suffix); try { - vfs.is_file(coords_uri); + vfs->is_file(coords_uri); } catch (std::exception& e) { return {Status_Error(e.what()), nullopt}; } diff --git a/tiledb/sm/fragment/fragment_metadata.cc b/tiledb/sm/fragment/fragment_metadata.cc index 75f5273b51e..826ba00324f 100644 --- a/tiledb/sm/fragment/fragment_metadata.cc +++ b/tiledb/sm/fragment/fragment_metadata.cc @@ -679,7 +679,7 @@ uint64_t FragmentMetadata::fragment_size() const { if (meta_file_size == 0) { auto meta_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); - meta_file_size = resources_->vfs().file_size(meta_uri); + meta_file_size = resources_->vfs()->file_size(meta_uri); } // Validate that the meta_file_size is not zero, either preloaded or fetched // above @@ -777,7 +777,7 @@ std::vector> FragmentMetadata::load( sf.uri_, sf.timestamp_range_, memory_tracker, - !resources.vfs().is_file(coords_uri)); + !resources.vfs()->is_file(coords_uri)); } else { // Fragment format version > 2 metadata = make_shared( @@ -827,7 +827,7 @@ void FragmentMetadata::load( // Load the metadata file size when we are not reading from consolidated // buffer if (fragment_metadata_tile == nullptr) { - meta_file_size_ = resources_->vfs().file_size(meta_uri); + meta_file_size_ = resources_->vfs()->file_size(meta_uri); } // Get fragment name version @@ -928,7 +928,7 @@ void FragmentMetadata::store_v7_v10(const EncryptionKey& encryption_key) { store_footer(encryption_key); // Close file - throw_if_not_ok(resources_->vfs().close_file(fragment_metadata_uri)); + throw_if_not_ok(resources_->vfs()->close_file(fragment_metadata_uri)); } void FragmentMetadata::store_v11(const EncryptionKey& encryption_key) { @@ -1010,7 +1010,7 @@ void FragmentMetadata::store_v11(const EncryptionKey& encryption_key) { store_footer(encryption_key); // Close file - throw_if_not_ok(resources_->vfs().close_file(fragment_metadata_uri)); + throw_if_not_ok(resources_->vfs()->close_file(fragment_metadata_uri)); } void FragmentMetadata::store_v12_v14(const EncryptionKey& encryption_key) { @@ -1097,7 +1097,7 @@ void FragmentMetadata::store_v12_v14(const EncryptionKey& encryption_key) { store_footer(encryption_key); // Close file - throw_if_not_ok(resources_->vfs().close_file(fragment_metadata_uri)); + throw_if_not_ok(resources_->vfs()->close_file(fragment_metadata_uri)); } void FragmentMetadata::store_v15_or_higher( @@ -1190,7 +1190,7 @@ void FragmentMetadata::store_v15_or_higher( store_footer(encryption_key); // Close file - throw_if_not_ok(resources_->vfs().close_file(fragment_metadata_uri)); + throw_if_not_ok(resources_->vfs()->close_file(fragment_metadata_uri)); } void FragmentMetadata::set_num_tiles(uint64_t num_tiles) { @@ -1679,7 +1679,7 @@ void FragmentMetadata::get_footer_offset_and_size( URI fragment_metadata_uri = fragment_uri_.join_path( std::string(constants::fragment_metadata_filename)); uint64_t size_offset = meta_file_size_ - sizeof(uint64_t); - throw_if_not_ok(resources_->vfs().read_exactly( + throw_if_not_ok(resources_->vfs()->read_exactly( fragment_metadata_uri, size_offset, size, sizeof(uint64_t))); *offset = meta_file_size_ - *size - sizeof(uint64_t); resources_->stats().add_counter("read_frag_meta_size", sizeof(uint64_t)); @@ -2758,7 +2758,7 @@ void FragmentMetadata::read_file_footer( } // Read footer - throw_if_not_ok(resources_->vfs().read_exactly( + throw_if_not_ok(resources_->vfs()->read_exactly( fragment_metadata_uri, *footer_offset, tile->data_as(), @@ -2781,11 +2781,11 @@ void FragmentMetadata::write_footer_to_file(shared_ptr tile) const { std::string(constants::fragment_metadata_filename)); uint64_t size = tile->size(); - resources_->vfs().write(fragment_metadata_uri, tile->data(), tile->size()); + resources_->vfs()->write(fragment_metadata_uri, tile->data(), tile->size()); // Write the size in the end if there is at least one var-sized dimension if (!array_schema_->domain().all_dims_fixed() || version_ >= 10) { - resources_->vfs().write(fragment_metadata_uri, &size, sizeof(uint64_t)); + resources_->vfs()->write(fragment_metadata_uri, &size, sizeof(uint64_t)); } } @@ -3464,8 +3464,8 @@ void FragmentMetadata::clean_up() { auto fragment_metadata_uri = fragment_uri_.join_path(constants::fragment_metadata_filename); - throw_if_not_ok(resources_->vfs().close_file(fragment_metadata_uri)); - resources_->vfs().remove_file(fragment_metadata_uri); + throw_if_not_ok(resources_->vfs()->close_file(fragment_metadata_uri)); + resources_->vfs()->remove_file(fragment_metadata_uri); } const shared_ptr& FragmentMetadata::array_schema() const { diff --git a/tiledb/sm/group/group.cc b/tiledb/sm/group/group.cc index 5ad3521ed5e..0e46f79fe15 100644 --- a/tiledb/sm/group/group.cc +++ b/tiledb/sm/group/group.cc @@ -103,17 +103,18 @@ void Group::create(ContextResources& resources, const URI& uri) { } // Create group directory - resources.vfs().create_dir(uri); + resources.vfs()->create_dir(uri); // Create group file URI group_filename = uri.join_path(constants::group_filename); - resources.vfs().touch(group_filename); + resources.vfs()->touch(group_filename); // Create metadata folder - resources.vfs().create_dir(uri.join_path(constants::group_metadata_dir_name)); + resources.vfs()->create_dir( + uri.join_path(constants::group_metadata_dir_name)); // Create group detail folder - resources.vfs().create_dir(uri.join_path(constants::group_detail_dir_name)); + resources.vfs()->create_dir(uri.join_path(constants::group_detail_dir_name)); } void Group::open( @@ -195,7 +196,7 @@ void Group::open( } else if (query_type == QueryType::READ) { group_dir_ = make_shared( HERE(), - resources_.vfs(), + *resources_.vfs().get(), resources_.compute_tp(), group_uri_, timestamp_start, @@ -204,7 +205,7 @@ void Group::open( } else { group_dir_ = make_shared( HERE(), - resources_.vfs(), + *resources_.vfs().get(), resources_.compute_tp(), group_uri_, timestamp_start, @@ -364,17 +365,17 @@ void Group::delete_group(const URI& uri, bool recursive) { } } - auto& vfs = resources_.vfs(); + auto vfs = resources_.vfs(); auto& compute_tp = resources_.compute_tp(); auto group_dir = GroupDirectory( - vfs, compute_tp, uri, 0, std::numeric_limits::max()); + *vfs.get(), compute_tp, uri, 0, std::numeric_limits::max()); // Delete the group detail, group metadata and group files - vfs.remove_files(&compute_tp, group_dir.group_detail_uris()); - vfs.remove_files(&compute_tp, group_dir.group_meta_uris()); - vfs.remove_files(&compute_tp, group_dir.group_meta_uris_to_vacuum()); - vfs.remove_files(&compute_tp, group_dir.group_meta_vac_uris_to_vacuum()); - vfs.remove_files(&compute_tp, group_dir.group_file_uris()); + vfs->remove_files(&compute_tp, group_dir.group_detail_uris()); + vfs->remove_files(&compute_tp, group_dir.group_meta_uris()); + vfs->remove_files(&compute_tp, group_dir.group_meta_uris_to_vacuum()); + vfs->remove_files(&compute_tp, group_dir.group_meta_vac_uris_to_vacuum()); + vfs->remove_files(&compute_tp, group_dir.group_file_uris()); // Delete all tiledb child directories // Note: using vfs().ls() here could delete user data @@ -383,8 +384,8 @@ void Group::delete_group(const URI& uri, bool recursive) { for (auto group_dir_name : constants::group_dir_names) { dirs.emplace_back(URI(parent_dir + group_dir_name)); } - vfs.remove_dirs(&compute_tp, dirs); - vfs.remove_dir_if_empty(group_dir.uri()); + vfs->remove_dirs(&compute_tp, dirs); + vfs->remove_dir_if_empty(group_dir.uri()); } // Clear metadata and other pending changes to avoid patching a deleted group. metadata_.clear(); diff --git a/tiledb/sm/group/group_details.cc b/tiledb/sm/group/group_details.cc index b3ebc51d706..57098be7927 100644 --- a/tiledb/sm/group/group_details.cc +++ b/tiledb/sm/group/group_details.cc @@ -228,9 +228,9 @@ void GroupDetails::store( // Check if the array schema directory exists // If not create it, this is caused by a pre-v10 array - auto& vfs = resources.vfs(); - if (!vfs.is_dir(group_detail_folder_uri)) { - vfs.create_dir(group_detail_folder_uri); + auto vfs = resources.vfs(); + if (!vfs->is_dir(group_detail_folder_uri)) { + vfs->create_dir(group_detail_folder_uri); } GenericTileIO::store_data(resources, group_detail_uri, tile, encryption_key); } diff --git a/tiledb/sm/misc/constants.cc b/tiledb/sm/misc/constants.cc index 18c20eacdd0..a7fd88a8078 100644 --- a/tiledb/sm/misc/constants.cc +++ b/tiledb/sm/misc/constants.cc @@ -670,6 +670,9 @@ const std::string filesystem_type_gcs_str = "GCS"; /** The string representation for in-memory filesystem */ const std::string filesystem_type_mem_str = "MEM"; +/** The string representation for local filesystem */ +const std::string filesystem_type_local_str = "LOCALß"; + /** The string representation for WalkOrder preorder. */ const std::string walkorder_preorder_str = "PREORDER"; diff --git a/tiledb/sm/misc/constants.h b/tiledb/sm/misc/constants.h index 6db29c26702..108242d3f83 100644 --- a/tiledb/sm/misc/constants.h +++ b/tiledb/sm/misc/constants.h @@ -666,6 +666,9 @@ extern const std::string filesystem_type_gcs_str; /** The string representation for in-memory filesystem. */ extern const std::string filesystem_type_mem_str; +/** The string representation for local filesystem. */ +extern const std::string filesystem_type_local_str; + /** The string representation for WalkOrder preorder. */ extern const std::string walkorder_preorder_str; diff --git a/tiledb/sm/object/object.cc b/tiledb/sm/object/object.cc index 8cfdeda26c6..7c2735aaeb2 100644 --- a/tiledb/sm/object/object.cc +++ b/tiledb/sm/object/object.cc @@ -59,13 +59,13 @@ bool is_array(ContextResources& resources, const URI& uri) { return exists.value(); } else { // Check if the schema directory exists or not - auto& vfs = resources.vfs(); - if (vfs.is_dir(uri.join_path(constants::array_schema_dir_name))) { + auto vfs = resources.vfs(); + if (vfs->is_dir(uri.join_path(constants::array_schema_dir_name))) { return true; } // If there is no schema directory, we check schema file - return vfs.is_file(uri.join_path(constants::array_schema_filename)); + return vfs->is_file(uri.join_path(constants::array_schema_filename)); } } @@ -78,13 +78,13 @@ bool is_group(ContextResources& resources, const URI& uri) { return exists.value(); } else { // Check for new group details directory - auto& vfs = resources.vfs(); - if (vfs.is_dir(uri.join_path(constants::group_detail_dir_name))) { + auto vfs = resources.vfs(); + if (vfs->is_dir(uri.join_path(constants::group_detail_dir_name))) { return true; } // Fall back to older group file to check for legacy (pre-format 12) groups - return vfs.is_file(uri.join_path(constants::group_filename)); + return vfs->is_file(uri.join_path(constants::group_filename)); } } @@ -99,7 +99,7 @@ ObjectType object_type(ContextResources& resources, const URI& uri) { URI(utils::parse::ends_with(uri_str, "/") ? uri_str : (uri_str + "/")); } else if (!uri.is_tiledb()) { // For non public cloud backends, listing a non-directory is an error. - if (!resources.vfs().is_dir(uri)) { + if (!resources.vfs()->is_dir(uri)) { return ObjectType::INVALID; } } @@ -134,7 +134,7 @@ void object_move( "'; Invalid TileDB object"); } - resources.vfs().move_dir(old_uri, new_uri); + resources.vfs()->move_dir(old_uri, new_uri); } void object_remove(ContextResources& resources, const char* path) { @@ -150,7 +150,7 @@ void object_remove(ContextResources& resources, const char* path) { "'; Invalid TileDB object"); } - resources.vfs().remove_dir(uri); + resources.vfs()->remove_dir(uri); } } // namespace tiledb::sm diff --git a/tiledb/sm/object/object_iter.cc b/tiledb/sm/object/object_iter.cc index 43599d6de22..c3752a4e126 100644 --- a/tiledb/sm/object/object_iter.cc +++ b/tiledb/sm/object/object_iter.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2024 TileDB, Inc. + * @copyright Copyright (c) 2024-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -54,7 +54,7 @@ ObjectIter* object_iter_begin( // Get all contents of path std::vector uris; - throw_if_not_ok(resources.vfs().ls(path_uri, &uris)); + throw_if_not_ok(resources.vfs()->ls(path_uri, &uris)); // Create a new object iterator ObjectIter* obj_iter = tdb_new(ObjectIter); @@ -90,7 +90,7 @@ ObjectIter* object_iter_begin(ContextResources& resources, const char* path) { // Get all contents of path std::vector uris; - throw_if_not_ok(resources.vfs().ls(path_uri, &uris)); + throw_if_not_ok(resources.vfs()->ls(path_uri, &uris)); // Create a new object iterator ObjectIter* obj_iter = tdb_new(ObjectIter); @@ -158,7 +158,7 @@ Status object_iter_next_postorder( do { obj_num = obj_iter->objs_.size(); std::vector uris; - throw_if_not_ok(resources.vfs().ls(obj_iter->objs_.front(), &uris)); + throw_if_not_ok(resources.vfs()->ls(obj_iter->objs_.front(), &uris)); obj_iter->expanded_.front() = true; // Push the new TileDB objects in the front of the iterator's list @@ -209,7 +209,7 @@ Status object_iter_next_preorder( // Get all contents of the next URI std::vector uris; - throw_if_not_ok(resources.vfs().ls(front_uri, &uris)); + throw_if_not_ok(resources.vfs()->ls(front_uri, &uris)); // Push the new TileDB objects in the front of the iterator's list ObjectType obj_type; diff --git a/tiledb/sm/query/deletes_and_updates/deletes_and_updates.cc b/tiledb/sm/query/deletes_and_updates/deletes_and_updates.cc index d6278ec68ca..3d14bcef120 100644 --- a/tiledb/sm/query/deletes_and_updates/deletes_and_updates.cc +++ b/tiledb/sm/query/deletes_and_updates/deletes_and_updates.cc @@ -141,7 +141,7 @@ Status DeletesAndUpdates::dowork() { // Create the commit URI if needed. auto& array_dir = array_->array_directory(); auto commit_uri = array_dir.get_commits_dir(write_version); - resources_.vfs().create_dir(commit_uri); + resources_.vfs()->create_dir(commit_uri); // Serialize the negated condition (aud update values if they are not empty) // and write to disk. diff --git a/tiledb/sm/query/readers/filtered_data.h b/tiledb/sm/query/readers/filtered_data.h index 4c8e8df0ad7..d33e60ad567 100644 --- a/tiledb/sm/query/readers/filtered_data.h +++ b/tiledb/sm/query/readers/filtered_data.h @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2023-2024 TileDB, Inc. + * @copyright Copyright (c) 2023-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -395,7 +395,7 @@ class FilteredData { auto size{block.size()}; URI uri{file_uri(fragment_metadata_[block.frag_idx()].get(), type)}; auto task = resources_.io_tp().execute([this, offset, data, size, uri]() { - throw_if_not_ok(resources_.vfs().read_exactly(uri, offset, data, size)); + throw_if_not_ok(resources_.vfs()->read_exactly(uri, offset, data, size)); return Status::Ok(); }); read_tasks_.push_back(std::move(task)); diff --git a/tiledb/sm/query/writers/global_order_writer.cc b/tiledb/sm/query/writers/global_order_writer.cc index 21254a764f2..5e78c3874f1 100644 --- a/tiledb/sm/query/writers/global_order_writer.cc +++ b/tiledb/sm/query/writers/global_order_writer.cc @@ -230,22 +230,21 @@ GlobalOrderWriter::GlobalWriteState* GlobalOrderWriter::get_global_state() { return global_write_state_.get(); } -std::pair> +// #TODO ensure this works as-intended +std::pair> GlobalOrderWriter::multipart_upload_state(bool client) { if (client) { return {Status::Ok(), global_write_state_->multipart_upload_state_}; } auto meta = global_write_state_->frag_meta_; - std::unordered_map result; + std::unordered_map result; // TODO: to be refactored, there are multiple places in writers where // we iterate over the internal fragment files manually for (const auto& name : buffer_names()) { auto uri = meta->uri(name); - - auto&& [st2, state] = resources_.vfs().multipart_upload_state(uri); - RETURN_NOT_OK_TUPLE(st2, {}); + auto state = resources_.vfs()->multipart_upload_state(uri); // If there is no entry for this uri, probably multipart upload is disabled // or no write was issued so far if (!state.has_value()) { @@ -255,16 +254,13 @@ GlobalOrderWriter::multipart_upload_state(bool client) { if (array_schema_.var_size(name)) { auto var_uri = meta->var_uri(name); - auto&& [st, var_state] = resources_.vfs().multipart_upload_state(var_uri); - RETURN_NOT_OK_TUPLE(st, {}); + auto var_state = resources_.vfs()->multipart_upload_state(var_uri); result[var_uri.remove_trailing_slash().last_path_part()] = std::move(*var_state); } if (array_schema_.is_nullable(name)) { auto validity_uri = meta->validity_uri(name); - auto&& [st, val_state] = - resources_.vfs().multipart_upload_state(validity_uri); - RETURN_NOT_OK_TUPLE(st, {}); + auto val_state = resources_.vfs()->multipart_upload_state(validity_uri); result[validity_uri.remove_trailing_slash().last_path_part()] = std::move(*val_state); } @@ -274,9 +270,7 @@ GlobalOrderWriter::multipart_upload_state(bool client) { } Status GlobalOrderWriter::set_multipart_upload_state( - const std::string& uri, - const VFS::MultiPartUploadState& state, - bool client) { + const std::string& uri, const MultiPartUploadState& state, bool client) { if (client) { global_write_state_->multipart_upload_state_[uri] = state; return Status::Ok(); @@ -285,7 +279,8 @@ Status GlobalOrderWriter::set_multipart_upload_state( // uri in this case holds only the buffer name auto absolute_uri = global_write_state_->frag_meta_->fragment_uri().join_path(uri); - return resources_.vfs().set_multipart_upload_state(absolute_uri, state); + resources_.vfs()->set_multipart_upload_state(absolute_uri, state); + return Status::Ok(); } /* ****************************** */ @@ -502,13 +497,13 @@ void GlobalOrderWriter::clean_up() { // Cleanup the fragment we are currently writing. There is a chance that the // URI is empty if creating the first fragment had failed. if (!uri.empty()) { - resources_.vfs().remove_dir(uri); + resources_.vfs()->remove_dir(uri); } global_write_state_.reset(nullptr); // Cleanup all fragments pending commit. for (auto& uri : frag_uris_to_commit_) { - resources_.vfs().remove_dir(uri); + resources_.vfs()->remove_dir(uri); } frag_uris_to_commit_.clear(); } @@ -699,7 +694,7 @@ Status GlobalOrderWriter::finalize_global_write_state() { // Write either one commit file or a consolidated commit file if multiple // fragments were written. if (frag_uris_to_commit_.size() == 0) { - resources_.vfs().touch(commit_uri); + resources_.vfs()->touch(commit_uri); } else { std::vector commit_uris; commit_uris.reserve(frag_uris_to_commit_.size() + 1); @@ -848,7 +843,7 @@ Status GlobalOrderWriter::global_write_handle_last_tile() { void GlobalOrderWriter::nuke_global_write_state() { auto meta = global_write_state_->frag_meta_; throw_if_not_ok(close_files(meta)); - resources_.vfs().remove_dir(meta->fragment_uri()); + resources_.vfs()->remove_dir(meta->fragment_uri()); global_write_state_.reset(nullptr); } diff --git a/tiledb/sm/query/writers/global_order_writer.h b/tiledb/sm/query/writers/global_order_writer.h index c15b81f67c8..5d50b85c08b 100644 --- a/tiledb/sm/query/writers/global_order_writer.h +++ b/tiledb/sm/query/writers/global_order_writer.h @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2017-2022 TileDB, Inc. + * @copyright Copyright (c) 2017-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -43,8 +43,9 @@ using namespace tiledb::common; -namespace tiledb { -namespace sm { +namespace tiledb::sm { + +using MultiPartUploadState = FilesystemBase::MultiPartUploadState; /** Processes write queries. */ class GlobalOrderWriter : public WriterBase { @@ -106,7 +107,7 @@ class GlobalOrderWriter : public WriterBase { * A mapping of buffer names to multipart upload state used by clients * to track the write state in remote global order writes */ - std::unordered_map + std::unordered_map multipart_upload_state_; }; @@ -168,7 +169,7 @@ class GlobalOrderWriter : public WriterBase { * or from within the cloud backend internal mappings if the code is executed * on the rest server. */ - std::pair> + std::pair> multipart_upload_state(bool client); /** @@ -182,9 +183,7 @@ class GlobalOrderWriter : public WriterBase { * @return Status */ Status set_multipart_upload_state( - const std::string& uri, - const VFS::MultiPartUploadState& state, - bool client); + const std::string& uri, const MultiPartUploadState& state, bool client); private: /* ********************************* */ @@ -393,7 +392,6 @@ class GlobalOrderWriter : public WriterBase { Status start_new_fragment(); }; -} // namespace sm -} // namespace tiledb +} // namespace tiledb::sm #endif // TILEDB_GLOBAL_ORDER_WRITER_H diff --git a/tiledb/sm/query/writers/ordered_writer.cc b/tiledb/sm/query/writers/ordered_writer.cc index 3a3ec42e04e..7811eec381b 100644 --- a/tiledb/sm/query/writers/ordered_writer.cc +++ b/tiledb/sm/query/writers/ordered_writer.cc @@ -149,7 +149,7 @@ std::string OrderedWriter::name() { void OrderedWriter::clean_up() { if (frag_uri_.has_value()) { - resources_.vfs().remove_dir(frag_uri_.value()); + resources_.vfs()->remove_dir(frag_uri_.value()); } } @@ -281,7 +281,7 @@ Status OrderedWriter::ordered_write() { // The following will make the fragment visible URI commit_uri = array_->array_directory().get_commit_uri(frag_uri_.value()); - resources_.vfs().touch(commit_uri); + resources_.vfs()->touch(commit_uri); return Status::Ok(); } diff --git a/tiledb/sm/query/writers/unordered_writer.cc b/tiledb/sm/query/writers/unordered_writer.cc index 1779a204253..9a7f700be29 100644 --- a/tiledb/sm/query/writers/unordered_writer.cc +++ b/tiledb/sm/query/writers/unordered_writer.cc @@ -179,7 +179,7 @@ Status UnorderedWriter::alloc_frag_meta() { void UnorderedWriter::clean_up() { if (frag_uri_.has_value()) { - resources_.vfs().remove_dir(frag_uri_.value()); + resources_.vfs()->remove_dir(frag_uri_.value()); } } @@ -724,7 +724,7 @@ Status UnorderedWriter::unordered_write() { // The following will make the fragment visible URI commit_uri = array_->array_directory().get_commit_uri(frag_uri_.value()); - resources_.vfs().touch(commit_uri); + resources_.vfs()->touch(commit_uri); // Clear some data to prevent it from being serialized. cell_pos_.clear(); diff --git a/tiledb/sm/query/writers/writer_base.cc b/tiledb/sm/query/writers/writer_base.cc index 4962018733e..fcf32224877 100644 --- a/tiledb/sm/query/writers/writer_base.cc +++ b/tiledb/sm/query/writers/writer_base.cc @@ -601,9 +601,9 @@ Status WriterBase::close_files(shared_ptr meta) const { const auto& file_uri = file_uris[i]; if (layout_ == Layout::GLOBAL_ORDER && remote_query()) { // flush with finalize == true - resources_.vfs().flush(file_uri, true); + resources_.vfs()->flush(file_uri, true); } else { - throw_if_not_ok(resources_.vfs().close_file(file_uri)); + throw_if_not_ok(resources_.vfs()->close_file(file_uri)); } return Status::Ok(); }); @@ -767,9 +767,9 @@ Status WriterBase::create_fragment( // Create the directories. // Create the fragment directory, the directory for the new fragment // URI, and the commit directory. - resources_.vfs().create_dir(array_dir.get_fragments_dir(write_version)); - resources_.vfs().create_dir(fragment_uri_); - resources_.vfs().create_dir(array_dir.get_commits_dir(write_version)); + resources_.vfs()->create_dir(array_dir.get_fragments_dir(write_version)); + resources_.vfs()->create_dir(fragment_uri_); + resources_.vfs()->create_dir(array_dir.get_commits_dir(write_version)); // Create fragment metadata. auto timestamp_range = std::pair(timestamp, timestamp); @@ -1105,7 +1105,7 @@ Status WriterBase::write_tiles( ++i, ++tile_id) { auto& tile = (*tiles)[i]; auto& t = var_size ? tile.offset_tile() : tile.fixed_tile(); - resources_.vfs().write( + resources_.vfs()->write( uri, t.filtered_buffer().data(), t.filtered_buffer().size(), @@ -1115,7 +1115,7 @@ Status WriterBase::write_tiles( if (var_size) { auto& t_var = tile.var_tile(); - resources_.vfs().write( + resources_.vfs()->write( var_uri, t_var.filtered_buffer().data(), t_var.filtered_buffer().size(), @@ -1140,7 +1140,7 @@ Status WriterBase::write_tiles( if (nullable) { auto& t_val = tile.validity_tile(); - resources_.vfs().write( + resources_.vfs()->write( validity_uri, t_val.filtered_buffer().data(), t_val.filtered_buffer().size(), @@ -1169,10 +1169,10 @@ Status WriterBase::write_tiles( // requirement of remote global order writes, it should only be // done if this code is executed as a result of a remote query if (remote_query()) { - throw_if_not_ok(resources_.vfs().flush_multipart_file_buffer(u)); + resources_.vfs()->flush_multipart_file_buffer(u); } } else { - throw_if_not_ok(resources_.vfs().close_file(u)); + throw_if_not_ok(resources_.vfs()->close_file(u)); } } } diff --git a/tiledb/sm/serialization/query.cc b/tiledb/sm/serialization/query.cc index bb5f4ae0f44..29e820ceaba 100644 --- a/tiledb/sm/serialization/query.cc +++ b/tiledb/sm/serialization/query.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2017-2024 TileDB, Inc. + * @copyright Copyright (c) 2017-2025 TileDB, Inc. * @copyright Copyright (c) 2016 MIT and Intel Corporation * * Permission is hereby granted, free of charge, to any person obtaining a copy @@ -2969,7 +2969,7 @@ Status global_write_state_from_capnp( auto multipart_reader = state_reader.getMultiPartUploadStates(); if (multipart_reader.hasEntries()) { for (auto entry : multipart_reader.getEntries()) { - VFS::MultiPartUploadState deserialized_state; + FilesystemBase::MultiPartUploadState deserialized_state; auto state = entry.getValue(); auto buffer_uri = std::string_view{entry.getKey().cStr(), entry.getKey().size()}; diff --git a/tiledb/sm/storage_manager/context_resources.cc b/tiledb/sm/storage_manager/context_resources.cc index 521ec52e551..9d3705e31db 100644 --- a/tiledb/sm/storage_manager/context_resources.cc +++ b/tiledb/sm/storage_manager/context_resources.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2017-2021 TileDB, Inc. + * @copyright Copyright (c) 2017-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -58,7 +58,8 @@ ContextResources::ContextResources( , compute_tp_(compute_thread_count) , io_tp_(io_thread_count) , stats_(make_shared(HERE(), stats_name)) - , vfs_(stats_.get(), logger_.get(), &compute_tp_, &io_tp_, config) + //, vfs_(stats_.get(), logger_.get(), &compute_tp_, &io_tp_, config, + // std::move(make_filesystems())) , rest_client_{RestClientFactory::make( *(stats_.get()), config_, @@ -77,6 +78,14 @@ ContextResources::ContextResources( throw std::logic_error("Logger must not be nullptr"); } + vfs_ = make_shared( + stats_.get(), + logger_.get(), + &compute_tp_, + &io_tp_, + config, + make_filesystems(stats_.get(), &io_tp_, config_)); + memory_tracker_reporter_->start(); } @@ -93,4 +102,29 @@ shared_ptr ContextResources::serialization_memory_tracker() return serialization_memory_tracker_; } +std::vector> ContextResources::make_filesystems( + [[maybe_unused]] stats::Stats* parent_stats, + [[maybe_unused]] ThreadPool* io_tp, + const Config& config) { + std::vector> filesystems; + filesystems.emplace_back(std::make_unique()); +#ifdef _WIN32 + using LocalFS = Win; +#else + using LocalFS = Posix; +#endif + filesystems.emplace_back(std::make_unique(config)); + +#ifdef HAVE_AZURE + filesystems.emplace_back(std::make_unique(io_tp, config)); +#endif +#ifdef HAVE_GCS + filesystems.emplace_back(std::make_unique(io_tp, config)); +#endif +#ifdef HAVE_S3 + filesystems.emplace_back(std::make_unique(parent_stats, io_tp, config)); +#endif + return filesystems; +} + } // namespace tiledb::sm diff --git a/tiledb/sm/storage_manager/context_resources.h b/tiledb/sm/storage_manager/context_resources.h index 1ea083fdc2c..9880a672bea 100644 --- a/tiledb/sm/storage_manager/context_resources.h +++ b/tiledb/sm/storage_manager/context_resources.h @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2018-2022 TileDB, Inc. + * @copyright Copyright (c) 2018-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -103,7 +103,7 @@ class ContextResources { return *(stats_.get()); } - [[nodiscard]] inline VFS& vfs() const { + [[nodiscard]] inline shared_ptr vfs() const { return vfs_; } @@ -144,6 +144,20 @@ class ContextResources { */ shared_ptr serialization_memory_tracker() const; + /** + * Return the vector of supported filesystems. + * + * @param stats The parent stats to inherit from. + * @param io_tp Thread pool for io-bound tasks. + * @param config Configuration parameters. + * + * @return The vector of supported filesystems. + */ + static std::vector> make_filesystems( + [[maybe_unused]] stats::Stats* parent_stats, + [[maybe_unused]] ThreadPool* io_tp, + const Config& config); + private: /* ********************************* */ /* PRIVATE ATTRIBUTES */ @@ -180,7 +194,7 @@ class ContextResources { * Virtual filesystem handler. It directs queries to the appropriate * filesystem backend. Note that this is stateful. */ - mutable VFS vfs_; + shared_ptr vfs_; /** The rest client (may be null if none was configured). */ shared_ptr rest_client_; diff --git a/tiledb/sm/storage_manager/storage_manager.cc b/tiledb/sm/storage_manager/storage_manager.cc index b67261aaa0c..eda7b42d0c0 100644 --- a/tiledb/sm/storage_manager/storage_manager.cc +++ b/tiledb/sm/storage_manager/storage_manager.cc @@ -5,7 +5,7 @@ * * The MIT License * - * @copyright Copyright (c) 2017-2024 TileDB, Inc. + * @copyright Copyright (c) 2017-2025 TileDB, Inc. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -61,7 +61,7 @@ StorageManagerCanonical::StorageManagerCanonical( const shared_ptr&, // unused const Config& config) : global_state_(global_state::GlobalState::GetGlobalState()) - , vfs_(resources.vfs()) + , vfs_(*resources.vfs().get()) , cancellation_in_progress_(false) , config_(config) , queries_in_progress_(0) { diff --git a/tiledb/sm/tile/generic_tile_io.cc b/tiledb/sm/tile/generic_tile_io.cc index 6c84d07f95a..20b9902f158 100644 --- a/tiledb/sm/tile/generic_tile_io.cc +++ b/tiledb/sm/tile/generic_tile_io.cc @@ -45,8 +45,7 @@ using namespace tiledb::common; -namespace tiledb { -namespace sm { +namespace tiledb::sm { /** Class for locally generated status exceptions. */ class GenericTileIOException : public StatusException { @@ -124,7 +123,7 @@ shared_ptr GenericTileIO::read_generic( memory_tracker->get_resource(MemoryType::GENERIC_TILE_IO)); // Read the tile. - throw_if_not_ok(resources_.vfs().read_exactly( + throw_if_not_ok(resources_.vfs()->read_exactly( uri_, file_offset + tile_data_offset, tile->filtered_data(), @@ -144,7 +143,7 @@ GenericTileIO::GenericTileHeader GenericTileIO::read_generic_tile_header( std::vector base_buf(GenericTileHeader::BASE_SIZE); - throw_if_not_ok(resources.vfs().read_exactly( + throw_if_not_ok(resources.vfs()->read_exactly( uri, file_offset, base_buf.data(), base_buf.size())); Deserializer base_deserializer(base_buf.data(), base_buf.size()); @@ -159,7 +158,7 @@ GenericTileIO::GenericTileHeader GenericTileIO::read_generic_tile_header( // Read header filter pipeline. std::vector filter_pipeline_buf(header.filter_pipeline_size); - throw_if_not_ok(resources.vfs().read_exactly( + throw_if_not_ok(resources.vfs()->read_exactly( uri, file_offset + GenericTileHeader::BASE_SIZE, filter_pipeline_buf.data(), @@ -184,7 +183,7 @@ void GenericTileIO::store_data( GenericTileIO tile_io(resources, uri); uint64_t nbytes = 0; tile_io.write_generic(tile, encryption_key, &nbytes); - throw_if_not_ok(resources.vfs().close_file(uri)); + throw_if_not_ok(resources.vfs()->close_file(uri)); } void GenericTileIO::write_generic( @@ -204,7 +203,7 @@ void GenericTileIO::write_generic( write_generic_tile_header(&header); - resources_.vfs().write( + resources_.vfs()->write( uri_, tile->filtered_buffer().data(), tile->filtered_buffer().size()); *nbytes = GenericTileIO::GenericTileHeader::BASE_SIZE + @@ -237,7 +236,7 @@ void GenericTileIO::write_generic_tile_header(GenericTileHeader* header) { serialize_generic_tile_header(serializer, *header); // Write buffer to file - resources_.vfs().write(uri_, data.data(), data.size()); + resources_.vfs()->write(uri_, data.data(), data.size()); } void GenericTileIO::configure_encryption_filter( @@ -278,5 +277,4 @@ void GenericTileIO::init_generic_tile_header( &header->filters, encryption_key)); } -} // namespace sm -} // namespace tiledb +} // namespace tiledb::sm