From f7f6bbef993e12524d61c3b6254b0103884042c7 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 10:11:01 +0000 Subject: [PATCH 01/15] Throw clearer errors on (known) snapshot invalidities --- python/src/ccf/ledger.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/python/src/ccf/ledger.py b/python/src/ccf/ledger.py index 569ec55ca208..a8df90758a09 100644 --- a/python/src/ccf/ledger.py +++ b/python/src/ccf/ledger.py @@ -817,6 +817,9 @@ def __init__(self, filename: str): self._filename = filename self._file_size = os.path.getsize(filename) + if self._file_size == 0: + raise InvalidSnapshotException(f"{filename} is currently empty") + entry_start_pos = super()._read_header() # 1.x snapshots do not include evidence @@ -824,7 +827,13 @@ def __init__(self, filename: str): receipt_pos = entry_start_pos + self._header.size receipt_bytes = _peek_all(self._file, pos=receipt_pos) - receipt = json.loads(receipt_bytes.decode("utf-8")) + try: + receipt = json.loads(receipt_bytes.decode("utf-8")) + except: + raise InvalidSnapshotException( + f"Cannot read receipt from snapshot {os.path.basename(self._filename)}: Receipt starts at {receipt_pos} (file is {self._file_size} bytes), and contains {receipt_bytes}" + ) + # Receipts included in snapshots always contain leaf components, # including a claims digest and commit evidence, from 2.0.0-rc0 onwards. # This verification code deliberately does not support snapshots @@ -1178,3 +1187,7 @@ class UntrustedNodeException(Exception): class UnknownTransaction(Exception): """The transaction at seqno does not exist in ledger""" + + +class InvalidSnapshotException(Exception): + """The given snapshot file has size 0""" From fa539a75eb073b587e58bb9a928ff57bc6daee1c Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 10:11:44 +0000 Subject: [PATCH 02/15] Repro --- tests/e2e_operations.py | 57 +++++++++++++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index 90a0017af2ae..7324f6635cbf 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -30,6 +30,7 @@ from pycose.messages import Sign1Message import sys import pathlib +import infra.concurrency from loguru import logger as LOG @@ -57,14 +58,54 @@ def test_save_committed_ledger_files(network, args): def test_parse_snapshot_file(network, args): - primary, _ = network.find_primary() - network.txs.issue(network, number_txs=args.snapshot_tx_interval * 2) - committed_snapshots_dir = network.get_committed_snapshots(primary) - for snapshot in os.listdir(committed_snapshots_dir): - with ccf.ledger.Snapshot(os.path.join(committed_snapshots_dir, snapshot)) as s: - assert len( - s.get_public_domain().get_tables() - ), "No public table in snapshot" + class ReaderThread(infra.concurrency.StoppableThread): + def __init__(self, network): + super().__init__(name="reader") + primary, _ = network.find_primary() + self.snapshots_dir = os.path.join( + primary.remote.remote.root, + primary.remote.snapshots_dir_name, + ) + + def run(self): + seen = set() + while not self.is_stopped(): + for snapshot in os.listdir(self.snapshots_dir): + if snapshot not in seen: + seen.add(snapshot) + with ccf.ledger.Snapshot( + os.path.join(self.snapshots_dir, snapshot) + ) as s: + assert len( + s.get_public_domain().get_tables() + ), "No public table in snapshot" + LOG.success(f"Successfully parsed snapshot: {snapshot}") + + class WriterThread(infra.concurrency.StoppableThread): + def __init__(self, network, reader): + super().__init__(name="writer") + self.primary, _ = network.find_primary() + self.member = network.consortium.get_any_active_member() + self.reader = reader + + def run(self): + while not self.is_stopped() and self.reader.is_alive(): + self.member.update_ack_state_digest(self.primary) + + reader_thread = ReaderThread(network) + reader_thread.start() + + writer_thread = WriterThread(network, reader_thread) + writer_thread.start() + + time.sleep(5) + + writer_thread.stop() + writer_thread.join() + + reader_thread.stop() + reader_thread.join() + return network From 31e6c6f365e5dc47777b1940227c5f557d40b3c2 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 10:34:01 +0000 Subject: [PATCH 03/15] More precise repro --- python/src/ccf/ledger.py | 4 ++++ tests/e2e_operations.py | 9 ++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/python/src/ccf/ledger.py b/python/src/ccf/ledger.py index a8df90758a09..e15b9dd690b2 100644 --- a/python/src/ccf/ledger.py +++ b/python/src/ccf/ledger.py @@ -86,6 +86,10 @@ def is_ledger_chunk_committed(file_name): return file_name.endswith(COMMITTED_FILE_SUFFIX) +def is_snapshot_file_committed(file_name): + return file_name.endswith(COMMITTED_FILE_SUFFIX) + + def digest(data): return sha256(data).digest() diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index 7324f6635cbf..d417512cd891 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -71,7 +71,10 @@ def run(self): seen = set() while not self.is_stopped(): for snapshot in os.listdir(self.snapshots_dir): - if snapshot not in seen: + if ( + ccf.ledger.is_snapshot_file_committed(snapshot) + and snapshot not in seen + ): seen.add(snapshot) with ccf.ledger.Snapshot( os.path.join(self.snapshots_dir, snapshot) @@ -80,6 +83,7 @@ def run(self): s.get_public_domain().get_tables() ), "No public table in snapshot" LOG.success(f"Successfully parsed snapshot: {snapshot}") + LOG.info(f"Tested {len(seen)} snapshots") class WriterThread(infra.concurrency.StoppableThread): def __init__(self, network, reader): @@ -98,6 +102,9 @@ def run(self): writer_thread = WriterThread(network, reader_thread) writer_thread.start() + # When this test was added, the original failure was occurring 100% of the time within 0.5s. + # This fix has been manually verified across multi-minute runs. + # 5s is a plausible run-time in the CI, that should still provide convincing coverage. time.sleep(5) writer_thread.stop() From 990269c93e8bc9d75495986e25ab16f9d9b66d00 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 10:36:41 +0000 Subject: [PATCH 04/15] Fix: write to temporary file and then rename --- src/snapshots/snapshot_manager.h | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/snapshots/snapshot_manager.h b/src/snapshots/snapshot_manager.h index dfaa1d15a42d..648f6af48f0a 100644 --- a/src/snapshots/snapshot_manager.h +++ b/src/snapshots/snapshot_manager.h @@ -96,15 +96,14 @@ namespace snapshots { if (snapshot_idx == it->first) { - // e.g. snapshot_100_105.committed + // e.g. snapshot_100_105 auto file_name = fmt::format( - "{}{}{}{}{}{}", + "{}{}{}{}{}", snapshot_file_prefix, snapshot_idx_delimiter, it->first, snapshot_idx_delimiter, - it->second.evidence_idx, - snapshot_committed_suffix); + it->second.evidence_idx); auto full_snapshot_path = snapshot_dir / file_name; if (fs::exists(full_snapshot_path)) @@ -136,6 +135,18 @@ namespace snapshots "New snapshot file written to {} [{} bytes]", file_name, static_cast(snapshot_file.tellp())); + + // e.g. snapshot_100_105.committed + const auto committed_file_name = + fmt::format("{}{}", file_name, snapshot_committed_suffix); + const auto full_committed_path = + snapshot_dir / committed_file_name; + + files::rename(full_snapshot_path, full_committed_path); + LOG_INFO_FMT( + "Renamed temporary snapshot {} to committed {}", + file_name, + committed_file_name); } } From a2e7c2e6d34a62f0732eefd17459c8f32824bf7f Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 10:44:52 +0000 Subject: [PATCH 05/15] Add CHANGELOG entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 857e6fc633e3..919d0d92d812 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. [6.0.4]: https://github.com/microsoft/CCF/releases/tag/ccf-6.0.4 +### Fixed + +- CCF will no longer create in-progress snapshot files with a `.committed` suffix. It will only rename files to `.committed` when they are complete and ready for reading (#7029). + ### Changed - Templated URL parsing will no longer allow `:` within regex matched components, since `:` is already used to delimit actions. Concretely, a call to `GET .../state-digests/abcd:update` should now correctly return a 404, rather than dispatching to `GET .../state-digests/{memberId}` and returning `No ACK record exists for member m[abcd:update]`. From 45a7d5a6949735942a48b17788a9a3bbceefbcd7 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 12:24:11 +0000 Subject: [PATCH 06/15] Lint --- python/src/ccf/ledger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/src/ccf/ledger.py b/python/src/ccf/ledger.py index e15b9dd690b2..8033701286a1 100644 --- a/python/src/ccf/ledger.py +++ b/python/src/ccf/ledger.py @@ -833,10 +833,10 @@ def __init__(self, filename: str): try: receipt = json.loads(receipt_bytes.decode("utf-8")) - except: + except json.decoder.JSONDecodeError as e: raise InvalidSnapshotException( f"Cannot read receipt from snapshot {os.path.basename(self._filename)}: Receipt starts at {receipt_pos} (file is {self._file_size} bytes), and contains {receipt_bytes}" - ) + ) from e # Receipts included in snapshots always contain leaf components, # including a claims digest and commit evidence, from 2.0.0-rc0 onwards. From c6de7eb7e9b21b50d8f0cd0109ff284c278c4f08 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 13:19:07 +0000 Subject: [PATCH 07/15] Add a flush, update docstring --- python/src/ccf/ledger.py | 2 +- src/snapshots/snapshot_manager.h | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/python/src/ccf/ledger.py b/python/src/ccf/ledger.py index 8033701286a1..f4482f42323f 100644 --- a/python/src/ccf/ledger.py +++ b/python/src/ccf/ledger.py @@ -1194,4 +1194,4 @@ class UnknownTransaction(Exception): class InvalidSnapshotException(Exception): - """The given snapshot file has size 0""" + """The given snapshot file is invalid and cannot be parsed""" diff --git a/src/snapshots/snapshot_manager.h b/src/snapshots/snapshot_manager.h index 648f6af48f0a..3f18b01cc912 100644 --- a/src/snapshots/snapshot_manager.h +++ b/src/snapshots/snapshot_manager.h @@ -131,6 +131,7 @@ namespace snapshots snapshot_file.write( reinterpret_cast(receipt_data), receipt_size); + snapshot_file.flush(); LOG_INFO_FMT( "New snapshot file written to {} [{} bytes]", file_name, From dee9d7f12f8d549603b9c37b6dd53abf9b56f134 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 14:01:24 +0000 Subject: [PATCH 08/15] Close is stronger than flush --- src/snapshots/snapshot_manager.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/snapshots/snapshot_manager.h b/src/snapshots/snapshot_manager.h index 3f18b01cc912..2e5310ec147a 100644 --- a/src/snapshots/snapshot_manager.h +++ b/src/snapshots/snapshot_manager.h @@ -131,7 +131,7 @@ namespace snapshots snapshot_file.write( reinterpret_cast(receipt_data), receipt_size); - snapshot_file.flush(); + snapshot_file.close(); LOG_INFO_FMT( "New snapshot file written to {} [{} bytes]", file_name, From cb91aaf661df67ff7c48c45a12878f178cffa4cf Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 15:05:10 +0000 Subject: [PATCH 09/15] Switch to raw file descriptors, so we can fsync --- src/snapshots/snapshot_manager.h | 89 +++++++++++++++++++------------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/src/snapshots/snapshot_manager.h b/src/snapshots/snapshot_manager.h index 2e5310ec147a..c520cb4e5789 100644 --- a/src/snapshots/snapshot_manager.h +++ b/src/snapshots/snapshot_manager.h @@ -106,50 +106,67 @@ namespace snapshots it->second.evidence_idx); auto full_snapshot_path = snapshot_dir / file_name; - if (fs::exists(full_snapshot_path)) + int snapshot_fd = open( + full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664); + if (snapshot_fd == -1) { - // In the case that a file with this name already exists, keep - // existing file and drop pending snapshot - LOG_FAIL_FMT( - "Cannot write snapshot as file already exists: {}", file_name); - } - else - { - std::ofstream snapshot_file( - full_snapshot_path, std::ios::app | std::ios::binary); - if (!snapshot_file.good()) + if (errno == EEXIST) { + // In the case that a file with this name already exists, keep + // existing file and drop pending snapshot LOG_FAIL_FMT( - "Cannot write snapshot: error opening file {}", file_name); + "Cannot write snapshot as file already exists: {}", + file_name); } else { - const auto& snapshot = it->second.snapshot; - snapshot_file.write( - reinterpret_cast(snapshot->data()), - snapshot->size()); - snapshot_file.write( - reinterpret_cast(receipt_data), receipt_size); - - snapshot_file.close(); - LOG_INFO_FMT( - "New snapshot file written to {} [{} bytes]", - file_name, - static_cast(snapshot_file.tellp())); - - // e.g. snapshot_100_105.committed - const auto committed_file_name = - fmt::format("{}{}", file_name, snapshot_committed_suffix); - const auto full_committed_path = - snapshot_dir / committed_file_name; - - files::rename(full_snapshot_path, full_committed_path); - LOG_INFO_FMT( - "Renamed temporary snapshot {} to committed {}", - file_name, - committed_file_name); + LOG_FAIL_FMT( + "Cannot write snapshot: error ({}) opening file {}", + errno, + file_name); } } + else + { + const auto& snapshot = it->second.snapshot; + +#define THROW_ON_ERROR(x) \ + do \ + { \ + auto rc = x; \ + if (rc == -1) \ + { \ + throw std::runtime_error(fmt::format( \ + "Error ({}) writing snapshot {} in " #x, errno, file_name)); \ + } \ + } while (0) + + THROW_ON_ERROR( + write(snapshot_fd, snapshot->data(), snapshot->size())); + THROW_ON_ERROR(write(snapshot_fd, receipt_data, receipt_size)); + + THROW_ON_ERROR(fsync(snapshot_fd)); + THROW_ON_ERROR(close(snapshot_fd)); + +#undef THROW_ON_ERROR + + LOG_INFO_FMT( + "New snapshot file written to {} [{} bytes]", + file_name, + snapshot->size() + receipt_size); + + // e.g. snapshot_100_105.committed + const auto committed_file_name = + fmt::format("{}{}", file_name, snapshot_committed_suffix); + const auto full_committed_path = + snapshot_dir / committed_file_name; + + files::rename(full_snapshot_path, full_committed_path); + LOG_INFO_FMT( + "Renamed temporary snapshot {} to committed {}", + file_name, + committed_file_name); + } pending_snapshots.erase(it); From 2ebb1a31801174c28334a136eab2ef8552b3ea23 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 15:38:10 +0000 Subject: [PATCH 10/15] Assert that at least 1 snapshot was seen --- tests/e2e_operations.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index d417512cd891..bc2ca81c11f2 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -84,6 +84,7 @@ def run(self): ), "No public table in snapshot" LOG.success(f"Successfully parsed snapshot: {snapshot}") LOG.info(f"Tested {len(seen)} snapshots") + assert len(seen) > 0, f"No snapshots seen, so this tested nothing" class WriterThread(infra.concurrency.StoppableThread): def __init__(self, network, reader): From e09553da504ccd17ee9ad7c8edeabc4f371cb168 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 2 Jun 2025 16:21:22 +0000 Subject: [PATCH 11/15] fsync files and dirs around renames --- src/host/ledger.h | 40 +++++++++++++++++++++++++++++++- src/snapshots/snapshot_manager.h | 39 +++++++++++++++++++++---------- 2 files changed, 66 insertions(+), 13 deletions(-) diff --git a/src/host/ledger.h b/src/host/ledger.h index dd3114666c51..3b1ef854374b 100644 --- a/src/host/ledger.h +++ b/src/host/ledger.h @@ -155,6 +155,7 @@ namespace asynchost // This uses C stdio instead of fstream because an fstream // cannot be truncated. + FILE* parent_dir = nullptr; FILE* file = nullptr; ccf::pal::Mutex file_lock; @@ -187,6 +188,13 @@ namespace asynchost fmt::format("{}.{}", file_name.string(), ledger_recovery_file_suffix); } + parent_dir = fopen(dir.c_str(), "r"); + if (!parent_dir) + { + throw std::logic_error(fmt::format( + "Unable to open ledger directory {}: {}", dir, strerror(errno))); + } + auto file_path = dir / file_name; if (fs::exists(file_path)) { @@ -223,10 +231,16 @@ namespace asynchost committed = is_ledger_file_name_committed(file_name); start_idx = get_start_idx_from_file_name(file_name); + parent_dir = fopen(dir.c_str(), "r"); + if (!parent_dir) + { + throw std::logic_error(fmt::format( + "Unable to open ledger directory {}: {}", dir, strerror(errno))); + } + const auto mode = committed ? "rb" : "r+b"; file = fopen(file_path.c_str(), mode); - if (!file) { throw std::logic_error(fmt::format( @@ -350,6 +364,11 @@ namespace asynchost ~LedgerFile() { + if (parent_dir) + { + fclose(parent_dir); + } + if (file) { fclose(file); @@ -630,6 +649,19 @@ namespace asynchost fmt::format("Failed to flush ledger file: {}", strerror(errno))); } + if (fsync(fileno(file)) != 0) + { + throw std::logic_error(fmt::format( + "Failed to sync completed ledger file: {}", strerror(errno))); + } + + if (fsync(fileno(parent_dir)) != 0) + { + throw std::logic_error(fmt::format( + "Failed to sync ledger directory after completing file: {}", + strerror(errno))); + } + LOG_TRACE_FMT("Completed ledger file {}", file_name); completed = true; @@ -643,6 +675,12 @@ namespace asynchost try { files::rename(file_path, new_file_path); + if (fsync(fileno(parent_dir)) != 0) + { + throw std::logic_error(fmt::format( + "Failed to sync ledger directory after renaming file: {}", + strerror(errno))); + } } catch (const std::exception& e) { diff --git a/src/snapshots/snapshot_manager.h b/src/snapshots/snapshot_manager.h index c520cb4e5789..eb82b15dd599 100644 --- a/src/snapshots/snapshot_manager.h +++ b/src/snapshots/snapshot_manager.h @@ -106,6 +106,26 @@ namespace snapshots it->second.evidence_idx); auto full_snapshot_path = snapshot_dir / file_name; +#define THROW_ON_ERROR(x) \ + do \ + { \ + auto rc = x; \ + if (rc == -1) \ + { \ + throw std::runtime_error(fmt::format( \ + "Error ({}) writing snapshot {} in " #x, errno, file_name)); \ + } \ + } while (0) + + int dir_fd = open(snapshot_dir.c_str(), O_DIRECTORY); + if (dir_fd == -1) + { + throw std::runtime_error(fmt::format( + "Error ({}) opening snapshots directory {}", + errno, + snapshot_dir)); + } + int snapshot_fd = open( full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664); if (snapshot_fd == -1) @@ -130,17 +150,6 @@ namespace snapshots { const auto& snapshot = it->second.snapshot; -#define THROW_ON_ERROR(x) \ - do \ - { \ - auto rc = x; \ - if (rc == -1) \ - { \ - throw std::runtime_error(fmt::format( \ - "Error ({}) writing snapshot {} in " #x, errno, file_name)); \ - } \ - } while (0) - THROW_ON_ERROR( write(snapshot_fd, snapshot->data(), snapshot->size())); THROW_ON_ERROR(write(snapshot_fd, receipt_data, receipt_size)); @@ -148,7 +157,7 @@ namespace snapshots THROW_ON_ERROR(fsync(snapshot_fd)); THROW_ON_ERROR(close(snapshot_fd)); -#undef THROW_ON_ERROR + THROW_ON_ERROR(fsync(dir_fd)); LOG_INFO_FMT( "New snapshot file written to {} [{} bytes]", @@ -166,8 +175,14 @@ namespace snapshots "Renamed temporary snapshot {} to committed {}", file_name, committed_file_name); + + THROW_ON_ERROR(fsync(dir_fd)); } + THROW_ON_ERROR(close(dir_fd)); + +#undef THROW_ON_ERROR + pending_snapshots.erase(it); return; From 27ef411e9f86c7d9e4ced2c78e9dd10fd99c1e9f Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 3 Jun 2025 11:52:18 +0000 Subject: [PATCH 12/15] Just so its in the git history: strace the primary during snapshot spamming --- tests/e2e_operations.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index bc2ca81c11f2..92f78b6cf425 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -97,6 +97,23 @@ def run(self): while not self.is_stopped() and self.reader.is_alive(): self.member.update_ack_state_digest(self.primary) + # TODO: Strip this before merge + primary, _ = network.find_primary() + strace_command = [ + "strace", + f"--attach={primary.remote.remote.proc.pid}", + "-tt", + "-T", + "--trace=fsync,open,write,rename", + "--decode-fds=all", + "--output=strace_output.txt", + ] + strace_process = subprocess.Popen( + strace_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + reader_thread = ReaderThread(network) reader_thread.start() @@ -114,6 +131,9 @@ def run(self): reader_thread.stop() reader_thread.join() + strace_process.terminate() + strace_process.communicate() + return network From 4c44d65bcdf57cc10188b58c4cd4c746734e2bf6 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 3 Jun 2025 12:16:58 +0000 Subject: [PATCH 13/15] Revert "Just so its in the git history: strace the primary during snapshot spamming" This reverts commit 27ef411e9f86c7d9e4ced2c78e9dd10fd99c1e9f. --- tests/e2e_operations.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index 92f78b6cf425..bc2ca81c11f2 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -97,23 +97,6 @@ def run(self): while not self.is_stopped() and self.reader.is_alive(): self.member.update_ack_state_digest(self.primary) - # TODO: Strip this before merge - primary, _ = network.find_primary() - strace_command = [ - "strace", - f"--attach={primary.remote.remote.proc.pid}", - "-tt", - "-T", - "--trace=fsync,open,write,rename", - "--decode-fds=all", - "--output=strace_output.txt", - ] - strace_process = subprocess.Popen( - strace_command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - reader_thread = ReaderThread(network) reader_thread.start() @@ -131,9 +114,6 @@ def run(self): reader_thread.stop() reader_thread.join() - strace_process.terminate() - strace_process.communicate() - return network From ae708b9c6021ea77e565f6c333d979fbff45965b Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 3 Jun 2025 15:12:51 +0000 Subject: [PATCH 14/15] Don't hold fd for directory, just open it when needed for fsync --- src/host/ledger.h | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/src/host/ledger.h b/src/host/ledger.h index 3b1ef854374b..f9aece2985a3 100644 --- a/src/host/ledger.h +++ b/src/host/ledger.h @@ -155,7 +155,6 @@ namespace asynchost // This uses C stdio instead of fstream because an fstream // cannot be truncated. - FILE* parent_dir = nullptr; FILE* file = nullptr; ccf::pal::Mutex file_lock; @@ -188,13 +187,6 @@ namespace asynchost fmt::format("{}.{}", file_name.string(), ledger_recovery_file_suffix); } - parent_dir = fopen(dir.c_str(), "r"); - if (!parent_dir) - { - throw std::logic_error(fmt::format( - "Unable to open ledger directory {}: {}", dir, strerror(errno))); - } - auto file_path = dir / file_name; if (fs::exists(file_path)) { @@ -231,13 +223,6 @@ namespace asynchost committed = is_ledger_file_name_committed(file_name); start_idx = get_start_idx_from_file_name(file_name); - parent_dir = fopen(dir.c_str(), "r"); - if (!parent_dir) - { - throw std::logic_error(fmt::format( - "Unable to open ledger directory {}: {}", dir, strerror(errno))); - } - const auto mode = committed ? "rb" : "r+b"; file = fopen(file_path.c_str(), mode); @@ -364,11 +349,6 @@ namespace asynchost ~LedgerFile() { - if (parent_dir) - { - fclose(parent_dir); - } - if (file) { fclose(file); @@ -655,12 +635,19 @@ namespace asynchost "Failed to sync completed ledger file: {}", strerror(errno))); } + auto parent_dir = fopen(dir.c_str(), "r"); + if (!parent_dir) + { + throw std::logic_error(fmt::format( + "Unable to open ledger directory {}: {}", dir, strerror(errno))); + } if (fsync(fileno(parent_dir)) != 0) { throw std::logic_error(fmt::format( "Failed to sync ledger directory after completing file: {}", strerror(errno))); } + fclose(parent_dir); LOG_TRACE_FMT("Completed ledger file {}", file_name); @@ -675,12 +662,20 @@ namespace asynchost try { files::rename(file_path, new_file_path); + + auto parent_dir = fopen(dir.c_str(), "r"); + if (!parent_dir) + { + throw std::logic_error(fmt::format( + "Unable to open ledger directory {}: {}", dir, strerror(errno))); + } if (fsync(fileno(parent_dir)) != 0) { throw std::logic_error(fmt::format( "Failed to sync ledger directory after renaming file: {}", strerror(errno))); } + fclose(parent_dir); } catch (const std::exception& e) { From 083dc2bc2cfc108e990b41bc01624e36dfc2765c Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 6 Jun 2025 10:29:48 +0000 Subject: [PATCH 15/15] WIP merge? --- src/snapshots/snapshot_manager.h | 36 +++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/src/snapshots/snapshot_manager.h b/src/snapshots/snapshot_manager.h index 73e738ab0a4a..77aeccc31300 100644 --- a/src/snapshots/snapshot_manager.h +++ b/src/snapshots/snapshot_manager.h @@ -110,11 +110,29 @@ namespace snapshots { asynchost::TimeBoundLogger log_if_slow( fmt::format("Committing snapshot - fsync({})", data->tmp_file_name)); - fsync(data->snapshot_fd); + if (fsync(data->snapshot_fd) != 0) + { + throw std::logic_error( + fmt::format("Failed to sync snapshot file: {}", strerror(errno))); + } } close(data->snapshot_fd); + auto parent_dir = fopen(data->dir.c_str(), "r"); + if (parent_dir) + { + asynchost::TimeBoundLogger log_if_slow(fmt::format( + "Committing snapshot - fsync({}) (after file close)", data->dir)); + if (fsync(fileno(parent_dir)) != 0) + { + throw std::logic_error(fmt::format( + "Failed to sync snapshot directory after closing committed " + "snapshot file: {}", + strerror(errno))); + } + } + // e.g. snapshot_100_105.committed data->committed_file_name = fmt::format("{}{}", data->tmp_file_name, snapshot_committed_suffix); @@ -122,6 +140,20 @@ namespace snapshots const auto full_tmp_path = data->dir / data->tmp_file_name; files::rename(full_tmp_path, full_committed_path); + + if (parent_dir) + { + asynchost::TimeBoundLogger log_if_slow(fmt::format( + "Committing snapshot - fsync({}) (after rename)", data->dir)); + if (fsync(fileno(parent_dir)) != 0) + { + throw std::logic_error(fmt::format( + "Failed to sync snapshot directory after renaming file: {}", + strerror(errno))); + } + } + + fclose(parent_dir); } static void on_snapshot_sync_and_rename_complete(uv_work_t* req, int status) @@ -242,8 +274,6 @@ namespace snapshots #endif } - THROW_ON_ERROR(close(dir_fd)); - #undef THROW_ON_ERROR pending_snapshots.erase(it);