diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 4889c69b1859..3653494094f9 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -35,6 +35,7 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc protocol_client.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc detail/save_stages_controller.cc + detail/snapshot_storage.cc set_family.cc stream_family.cc string_family.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index aba952b79616..4db66897e4bd 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -6,20 +6,14 @@ #include "server/detail/save_stages_controller.h" #include -#include -#include #include "base/flags.h" #include "base/logging.h" -#include "io/file_util.h" #include "server/main_service.h" #include "server/script_mgr.h" #include "server/search/doc_index.h" #include "server/transaction.h" #include "strings/human_readable.h" -#include "util/cloud/s3.h" -#include "util/fibers/fiber_file.h" -#include "util/uring/uring_file.h" using namespace std; @@ -38,39 +32,14 @@ namespace fs = std::filesystem; namespace { -const size_t kBucketConnectMs = 2000; - -#ifdef __linux__ -const int kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT; -#endif - -constexpr string_view kS3Prefix = "s3://"sv; - bool IsCloudPath(string_view path) { return absl::StartsWith(path, kS3Prefix); } -// Returns bucket_name, obj_path for an s3 path. 
-optional> GetBucketPath(string_view path) { - string_view clean = absl::StripPrefix(path, kS3Prefix); - - size_t pos = clean.find('/'); - if (pos == string_view::npos) - return nullopt; - - string bucket_name{clean.substr(0, pos)}; - string obj_path{clean.substr(pos + 1)}; - return make_pair(move(bucket_name), move(obj_path)); -} - string FormatTs(absl::Time now) { return absl::FormatTime("%Y-%m-%dT%H:%M:%S", now, absl::LocalTimeZone()); } -void SubstituteFilenameTsPlaceholder(fs::path* filename, std::string_view replacement) { - *filename = absl::StrReplaceAll(filename->string(), {{"{timestamp}", replacement}}); -} - // Create a directory and all its parents if they don't exist. error_code CreateDirs(fs::path dir_path) { error_code ec; @@ -94,32 +63,6 @@ void ExtendDfsFilenameWithShard(int shard, string_view extension, fs::path* file SetExtension(absl::Dec(shard, absl::kZeroPad4), extension, filename); } -// takes ownership over the file. -class LinuxWriteWrapper : public io::Sink { - public: - LinuxWriteWrapper(fb2::LinuxFile* lf) : lf_(lf) { - } - - io::Result WriteSome(const iovec* v, uint32_t len) final; - - error_code Close() { - return lf_->Close(); - } - - private: - unique_ptr lf_; - off_t offset_ = 0; -}; - -io::Result LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) { - io::Result res = lf_->WriteSome(v, len, offset_, 0); - if (res) { - offset_ += *res; - } - - return res; -} - } // namespace GenericError ValidateFilename(const fs::path& filename, bool new_version) { @@ -151,144 +94,11 @@ GenericError ValidateFilename(const fs::path& filename, bool new_version) { return {}; } -FileSnapshotStorage::FileSnapshotStorage(FiberQueueThreadPool* fq_threadpool) - : fq_threadpool_{fq_threadpool} { -} - -io::Result, GenericError> FileSnapshotStorage::OpenFile( - const std::string& path) { - if (fq_threadpool_) { // EPOLL - auto res = util::OpenFiberWriteFile(path, fq_threadpool_); - if (!res) { - return nonstd::make_unexpected(GenericError(res.error(), 
"Couldn't open file for writing")); - } - - return std::pair(*res, FileType::FILE); - } else { -#ifdef __linux__ - auto res = OpenLinux(path, kRdbWriteFlags, 0666); - if (!res) { - return nonstd::make_unexpected(GenericError( - res.error(), - "Couldn't open file for writing (is direct I/O supported by the file system?)")); - } - - uint8_t file_type = FileType::FILE | FileType::IO_URING; - if (kRdbWriteFlags & O_DIRECT) { - file_type |= FileType::DIRECT; - } - return std::pair(new LinuxWriteWrapper(res->release()), file_type); -#else - LOG(FATAL) << "Linux I/O is not supported on this platform"; -#endif - } -} - -AwsS3SnapshotStorage::AwsS3SnapshotStorage(util::cloud::AWS* aws) : aws_{aws} { -} - -io::Result, GenericError> AwsS3SnapshotStorage::OpenFile( - const std::string& path) { - DCHECK(aws_); - - optional> bucket_path = GetBucketPath(path); - if (!bucket_path) { - return nonstd::make_unexpected(GenericError("Invalid S3 path")); - } - auto [bucket_name, obj_path] = *bucket_path; - - cloud::S3Bucket bucket(*aws_, bucket_name); - error_code ec = bucket.Connect(kBucketConnectMs); - if (ec) { - return nonstd::make_unexpected(GenericError(ec, "Couldn't connect to S3 bucket")); - } - auto res = bucket.OpenWriteFile(obj_path); - if (!res) { - return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for writing")); - } - - return std::pair(*res, FileType::CLOUD); -} - -string InferLoadFile(string_view dir, cloud::AWS* aws) { - fs::path data_folder; - string bucket_name, obj_path; - - if (dir.empty()) { - data_folder = fs::current_path(); - } else { - if (IsCloudPath(dir)) { - CHECK(aws); - auto res = GetBucketPath(dir); - if (!res) { - LOG(ERROR) << "Invalid S3 path: " << dir; - return {}; - } - data_folder = dir; - bucket_name = res->first; - obj_path = res->second; - } else { - error_code file_ec; - data_folder = fs::canonical(dir, file_ec); - if (file_ec) { - LOG(ERROR) << "Data directory error: " << file_ec.message() << " for dir " << dir; - 
return {}; - } - } - } - - LOG(INFO) << "Data directory is " << data_folder; - - const auto& dbname = GetFlag(FLAGS_dbfilename); - if (dbname.empty()) - return string{}; - - if (IsCloudPath(dir)) { - cloud::S3Bucket bucket(*aws, bucket_name); - ProactorBase* proactor = shard_set->pool()->GetNextProactor(); - auto ec = proactor->Await([&] { return bucket.Connect(kBucketConnectMs); }); - if (ec) { - LOG(ERROR) << "Couldn't connect to S3 bucket: " << ec.message(); - return {}; - } - - fs::path fl_path{obj_path}; - fl_path.append(dbname); - - LOG(INFO) << "Loading from s3 path s3://" << bucket_name << "/" << fl_path; - // TODO: to load from S3 file. - return {}; - } - - fs::path fl_path = data_folder.append(dbname); - if (fs::exists(fl_path)) - return fl_path.generic_string(); - - SubstituteFilenameTsPlaceholder(&fl_path, "*"); - if (!fl_path.has_extension()) { - fl_path += "*"; - } - io::Result short_vec = io::StatFiles(fl_path.generic_string()); - - if (short_vec) { - // io::StatFiles returns a list of sorted files. Because our timestamp format has the same - // time order and lexicographic order we iterate from the end to find the latest snapshot. 
- auto it = std::find_if(short_vec->rbegin(), short_vec->rend(), [](const auto& stat) { - return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, "summary.dfs"); - }); - if (it != short_vec->rend()) - return it->name; - } else { - LOG(WARNING) << "Could not stat " << fl_path << ", error " << short_vec.error().message(); - } - return string{}; -} - GenericError RdbSnapshot::Start(SaveMode save_mode, const std::string& path, const RdbSaver::GlobalData& glob_data) { VLOG(1) << "Saving RDB " << path; - auto res = snapshot_storage_->OpenFile(path); + auto res = snapshot_storage_->OpenWriteFile(path); if (!res) { return res.error(); } diff --git a/src/server/detail/save_stages_controller.h b/src/server/detail/save_stages_controller.h index 710949eacd19..ad0e03592000 100644 --- a/src/server/detail/save_stages_controller.h +++ b/src/server/detail/save_stages_controller.h @@ -7,6 +7,7 @@ #include +#include "server/detail/snapshot_storage.h" #include "server/rdb_save.h" #include "server/server_family.h" #include "util/cloud/aws.h" @@ -19,45 +20,6 @@ class Service; namespace detail { -enum FileType : uint8_t { - FILE = (1u << 0), - CLOUD = (1u << 1), - IO_URING = (1u << 2), - DIRECT = (1u << 3), -}; - -class SnapshotStorage { - public: - virtual ~SnapshotStorage() = default; - - // Opens the file at the given path, and returns the open file and file - // type, which is a bitmask of FileType. 
- virtual io::Result, GenericError> OpenFile( - const std::string& path) = 0; -}; - -class FileSnapshotStorage : public SnapshotStorage { - public: - FileSnapshotStorage(FiberQueueThreadPool* fq_threadpool); - - io::Result, GenericError> OpenFile( - const std::string& path) override; - - private: - util::fb2::FiberQueueThreadPool* fq_threadpool_; -}; - -class AwsS3SnapshotStorage : public SnapshotStorage { - public: - AwsS3SnapshotStorage(util::cloud::AWS* aws); - - io::Result, GenericError> OpenFile( - const std::string& path) override; - - private: - util::cloud::AWS* aws_; -}; - struct SaveStagesInputs { bool use_dfs_format_; std::string_view basename_; diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc new file mode 100644 index 000000000000..890ccbe830da --- /dev/null +++ b/src/server/detail/snapshot_storage.cc @@ -0,0 +1,213 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. + +#include "server/detail/snapshot_storage.h" + +#include +#include + +#include "base/logging.h" +#include "io/file_util.h" +#include "util/cloud/s3.h" +#include "util/fibers/fiber_file.h" +#include "util/uring/uring_file.h" + +namespace dfly { +namespace detail { + +std::optional> GetBucketPath(std::string_view path) { + std::string_view clean = absl::StripPrefix(path, kS3Prefix); + + size_t pos = clean.find('/'); + if (pos == std::string_view::npos) + return std::nullopt; + + std::string bucket_name{clean.substr(0, pos)}; + std::string obj_path{clean.substr(pos + 1)}; + return std::make_pair(std::move(bucket_name), std::move(obj_path)); +} + +#ifdef __linux__ +const int kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT; +#endif + +FileSnapshotStorage::FileSnapshotStorage(FiberQueueThreadPool* fq_threadpool) + : fq_threadpool_{fq_threadpool} { +} + +io::Result, GenericError> FileSnapshotStorage::OpenWriteFile( + const std::string& path) { + if (fq_threadpool_) { // EPOLL + auto 
res = util::OpenFiberWriteFile(path, fq_threadpool_); + if (!res) { + return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for writing")); + } + + return std::pair(*res, FileType::FILE); + } else { +#ifdef __linux__ + auto res = util::fb2::OpenLinux(path, kRdbWriteFlags, 0666); + if (!res) { + return nonstd::make_unexpected(GenericError( + res.error(), + "Couldn't open file for writing (is direct I/O supported by the file system?)")); + } + + uint8_t file_type = FileType::FILE | FileType::IO_URING; + if (kRdbWriteFlags & O_DIRECT) { + file_type |= FileType::DIRECT; + } + return std::pair(new LinuxWriteWrapper(res->release()), file_type); +#else + LOG(FATAL) << "Linux I/O is not supported on this platform"; +#endif + } +} + +io::ReadonlyFileOrError FileSnapshotStorage::OpenReadFile(const std::string& path) { +#ifdef __linux__ + if (fq_threadpool_) { + return util::OpenFiberReadFile(path, fq_threadpool_); + } else { + return util::fb2::OpenRead(path); + } +#else + return util::OpenFiberReadFile(path, fq_threadpool_); +#endif +} + +std::string FileSnapshotStorage::LoadPath(const std::string_view& dir, + const std::string_view& dbfilename) { + if (dbfilename.empty()) + return ""; + + fs::path data_folder; + if (dir.empty()) { + data_folder = fs::current_path(); + } else { + std::error_code file_ec; + data_folder = fs::canonical(dir, file_ec); + if (file_ec) { + LOG(ERROR) << "Data directory error: " << file_ec.message() << " for dir " << dir; + return ""; + } + } + + LOG(INFO) << "Data directory is " << data_folder; + + fs::path fl_path = data_folder.append(dbfilename); + if (fs::exists(fl_path)) + return fl_path.generic_string(); + + SubstituteFilenameTsPlaceholder(&fl_path, "*"); + if (!fl_path.has_extension()) { + fl_path += "*"; + } + io::Result short_vec = io::StatFiles(fl_path.generic_string()); + + if (short_vec) { + // io::StatFiles returns a list of sorted files. 
Because our timestamp format has the same + // time order and lexicographic order we iterate from the end to find the latest snapshot. + auto it = std::find_if(short_vec->rbegin(), short_vec->rend(), [](const auto& stat) { + return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, "summary.dfs"); + }); + if (it != short_vec->rend()) + return it->name; + } else { + LOG(WARNING) << "Could not stat " << fl_path << ", error " << short_vec.error().message(); + } + return ""; +} + +io::Result> FileSnapshotStorage::LoadPaths(const std::string& load_path) { + if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) { + LOG(ERROR) << "Bad filename extension \"" << load_path << "\""; + return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument)); + } + + std::vector paths{{load_path}}; + + // Collect all other files in case we're loading dfs. + if (absl::EndsWith(load_path, "summary.dfs")) { + std::string glob = absl::StrReplaceAll(load_path, {{"summary", "????"}}); + io::Result files = io::StatFiles(glob); + + if (files && files->size() == 0) { + LOG(ERROR) << "Could not find DFS snapshot shard files"; + return nonstd::make_unexpected(std::make_error_code(std::errc::no_such_file_or_directory)); + } + + for (auto& fstat : *files) { + paths.push_back(std::move(fstat.name)); + } + } + + // Check all paths are valid. 
+ for (const auto& path : paths) { + std::error_code ec; + (void)fs::canonical(path, ec); + if (ec) { + LOG(ERROR) << "Error loading " << load_path << " " << ec.message(); + return nonstd::make_unexpected(ec); + } + } + + return paths; +} + +AwsS3SnapshotStorage::AwsS3SnapshotStorage(util::cloud::AWS* aws) : aws_{aws} { +} + +io::Result, GenericError> AwsS3SnapshotStorage::OpenWriteFile( + const std::string& path) { + DCHECK(aws_); + + std::optional> bucket_path = GetBucketPath(path); + if (!bucket_path) { + return nonstd::make_unexpected(GenericError("Invalid S3 path")); + } + auto [bucket_name, obj_path] = *bucket_path; + + util::cloud::S3Bucket bucket(*aws_, bucket_name); + std::error_code ec = bucket.Connect(kBucketConnectMs); + if (ec) { + return nonstd::make_unexpected(GenericError(ec, "Couldn't connect to S3 bucket")); + } + auto res = bucket.OpenWriteFile(obj_path); + if (!res) { + return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for writing")); + } + + return std::pair(*res, FileType::CLOUD); +} + +io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile(const std::string& path) { + return nonstd::make_unexpected(std::make_error_code(std::errc::not_supported)); +} + +std::string AwsS3SnapshotStorage::LoadPath(const std::string_view& dir, + const std::string_view& dbfilename) { + LOG(WARNING) << "Loading snapshots from S3 is not supported"; + return ""; +} + +io::Result> AwsS3SnapshotStorage::LoadPaths(const std::string& load_path) { + LOG(WARNING) << "Loading snapshots from S3 is not supported"; + return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument)); +} + +io::Result LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) { + io::Result res = lf_->WriteSome(v, len, offset_, 0); + if (res) { + offset_ += *res; + } + + return res; +} + +void SubstituteFilenameTsPlaceholder(fs::path* filename, std::string_view replacement) { + *filename = absl::StrReplaceAll(filename->string(), {{"{timestamp}", 
replacement}}); +} + +} // namespace detail +} // namespace dfly diff --git a/src/server/detail/snapshot_storage.h b/src/server/detail/snapshot_storage.h new file mode 100644 index 000000000000..9edb5a43e893 --- /dev/null +++ b/src/server/detail/snapshot_storage.h @@ -0,0 +1,108 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. + +#pragma once + +#include +#include +#include +#include + +#include "io/io.h" +#include "server/common.h" +#include "util/cloud/aws.h" +#include "util/fibers/fiberqueue_threadpool.h" +#include "util/uring/uring_file.h" + +namespace dfly { +namespace detail { + +namespace fs = std::filesystem; + +constexpr std::string_view kS3Prefix = "s3://"; + +const size_t kBucketConnectMs = 2000; + +enum FileType : uint8_t { + FILE = (1u << 0), + CLOUD = (1u << 1), + IO_URING = (1u << 2), + DIRECT = (1u << 3), +}; + +class SnapshotStorage { + public: + virtual ~SnapshotStorage() = default; + + // Opens the file at the given path, and returns the open file and file + // type, which is a bitmask of FileType. + virtual io::Result, GenericError> OpenWriteFile( + const std::string& path) = 0; + + virtual io::ReadonlyFileOrError OpenReadFile(const std::string& path) = 0; + + // Returns the path of the RDB file or DFS summary file to load. + virtual std::string LoadPath(const std::string_view& dir, const std::string_view& dbfilename) = 0; + + // Returns the snapshot paths given the RDB file or DFS summary file path. 
+ virtual io::Result> LoadPaths(const std::string& load_path) = 0; +}; + +class FileSnapshotStorage : public SnapshotStorage { + public: + FileSnapshotStorage(FiberQueueThreadPool* fq_threadpool); + + io::Result, GenericError> OpenWriteFile( + const std::string& path) override; + + io::ReadonlyFileOrError OpenReadFile(const std::string& path) override; + + std::string LoadPath(const std::string_view& dir, const std::string_view& dbfilename) override; + + io::Result> LoadPaths(const std::string& load_path) override; + + private: + util::fb2::FiberQueueThreadPool* fq_threadpool_; +}; + +class AwsS3SnapshotStorage : public SnapshotStorage { + public: + AwsS3SnapshotStorage(util::cloud::AWS* aws); + + io::Result, GenericError> OpenWriteFile( + const std::string& path) override; + + io::ReadonlyFileOrError OpenReadFile(const std::string& path) override; + + std::string LoadPath(const std::string_view& dir, const std::string_view& dbfilename) override; + + io::Result> LoadPaths(const std::string& load_path) override; + + private: + util::cloud::AWS* aws_; +}; + +// Returns bucket_name, obj_path for an s3 path. +std::optional> GetBucketPath(std::string_view path); + +// takes ownership over the file. 
+class LinuxWriteWrapper : public io::Sink { + public: + LinuxWriteWrapper(util::fb2::LinuxFile* lf) : lf_(lf) { + } + + io::Result WriteSome(const iovec* v, uint32_t len) final; + + std::error_code Close() { + return lf_->Close(); + } + + private: + std::unique_ptr lf_; + off_t offset_ = 0; +}; + +void SubstituteFilenameTsPlaceholder(fs::path* filename, std::string_view replacement); + +} // namespace detail +} // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 52ad9d470074..85a5f33c53f1 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -441,7 +441,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector(nullptr); } - string load_path = detail::InferLoadFile(flag_dir, aws_.get()); + string load_path = snapshot_storage_->LoadPath(flag_dir, GetFlag(FLAGS_dbfilename)); if (!load_path.empty()) { load_result_ = Load(load_path); } @@ -499,42 +499,14 @@ struct AggregateLoadResult { // It starts one more fiber that waits for all load fibers to finish and returns the first // error (if any occured) with a future. Future ServerFamily::Load(const std::string& load_path) { - if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) { - LOG(ERROR) << "Bad filename extension \"" << load_path << "\""; + io::Result> paths_result = snapshot_storage_->LoadPaths(load_path); + if (!paths_result) { Promise ec_promise; - ec_promise.set_value(make_error_code(errc::invalid_argument)); + ec_promise.set_value(paths_result.error()); return ec_promise.get_future(); } - vector paths{{load_path}}; - - // Collect all other files in case we're loading dfs. 
- if (absl::EndsWith(load_path, "summary.dfs")) { - std::string glob = absl::StrReplaceAll(load_path, {{"summary", "????"}}); - io::Result files = io::StatFiles(glob); - - if (files && files->size() == 0) { - Promise ec_promise; - ec_promise.set_value(make_error_code(errc::no_such_file_or_directory)); - return ec_promise.get_future(); - } - - for (auto& fstat : *files) { - paths.push_back(std::move(fstat.name)); - } - } - - // Check all paths are valid. - for (const auto& path : paths) { - error_code ec; - (void)fs::canonical(path, ec); - if (ec) { - LOG(ERROR) << "Error loading " << load_path << " " << ec.message(); - Promise ec_promise; - ec_promise.set_value(ec); - return ec_promise.get_future(); - } - } + std::vector paths = *paths_result; LOG(INFO) << "Loading " << load_path; @@ -622,18 +594,7 @@ void ServerFamily::SnapshotScheduling() { io::Result ServerFamily::LoadRdb(const std::string& rdb_file) { error_code ec; - io::ReadonlyFileOrError res; - -#ifdef __linux__ - if (fq_threadpool_) { - res = util::OpenFiberReadFile(rdb_file, fq_threadpool_.get()); - } else { - res = OpenRead(rdb_file); - } -#else - res = util::OpenFiberReadFile(rdb_file, fq_threadpool_.get()); -#endif - + io::ReadonlyFileOrError res = snapshot_storage_->OpenReadFile(rdb_file); if (res) { io::FileSource fs(*res);