From 3271b329c2dcbe7039f95710f71cfd43e8e9403a Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 26 Jan 2026 13:13:41 +0000 Subject: [PATCH 1/4] Add implementation plan for --recursive directory synchronization Design document covering: - Protocol extensions for directory listing (list, file, dirs, link, ends, getc) - Protocol extensions for file transfer (mkdr, syml, dele) - Detailed workflow for checksum/retrieve/save stages - New CLI options (--recursive, --delete, --exclude, --include, --dry-run) - Edge case handling (symlinks, empty dirs, special files, permissions) - Comprehensive test plan (unit tests, integration tests, edge cases) - Implementation roadmap in 5 stages --- add_recursive_plan.md | 558 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 558 insertions(+) create mode 100644 add_recursive_plan.md diff --git a/add_recursive_plan.md b/add_recursive_plan.md new file mode 100644 index 0000000..d649a63 --- /dev/null +++ b/add_recursive_plan.md @@ -0,0 +1,558 @@ +# Plán implementace --recursive pro blockcopy + +## Přehled + +Tento dokument popisuje návrh implementace rekurzivní synchronizace adresářů pro nástroj blockcopy. Cílem je umožnit synchronizaci celých adresářových stromů podobně jako `rsync -r`. + +## Současný stav + +Blockcopy aktuálně podporuje pouze synchronizaci jednotlivých souborů nebo blokových zařízení. Pipeline vypadá takto: + +``` +blockcopy checksum /dev/destination | \ + ssh srchost blockcopy retrieve /dev/source | \ + blockcopy save /dev/destination +``` + +## Navrhované použití + +``` +blockcopy checksum --recursive /path/to/dst_dir | \ + ssh srchost blockcopy retrieve --recursive /path/to/src_dir | \ + blockcopy save --recursive /path/to/dst_dir +``` + +## Architektura řešení + +### Fáze 1: Výměna seznamu souborů + +Před samotným přenosem dat je nutné vyměnit informace o struktuře adresářů na obou stranách. + +**Nový průběh:** +1. `checksum --recursive` projde cílový adresář a odešle seznam všech souborů s jejich metadaty +2. `retrieve --recursive` přijme seznam z cíle, projde zdrojový adresář a: + - Porovná struktury + - Rozhodne, které soubory je třeba synchronizovat + - Pro každý soubor, který existuje na obou stranách, vyžádá checksums +3. `save --recursive` aplikuje změny + +### Fáze 2: Přenos souborů + +Pro každý soubor, který se liší, proběhne standardní checksum/retrieve/save cyklus. + +--- + +## Úpravy binárního protokolu + +### Nové příkazy checksum → retrieve + +#### `list` - začátek výpisu souborů +``` +4 bytes: příkaz "list" +8 bytes: počet položek (N) +``` + +#### `file` - soubor v adresáři +``` +4 bytes: příkaz "file" +2 bytes: délka relativní cesty (UTF-8) +N bytes: relativní cesta +8 bytes: velikost souboru +8 bytes: mtime_ns +4 bytes: mode +4 bytes: uid +4 bytes: gid +2 bytes: délka owner_name +N bytes: owner_name +2 bytes: délka group_name +N bytes: group_name +``` + +#### `dirs` - adresář +``` +4 bytes: příkaz "dirs" +2 bytes: délka relativní cesty (UTF-8) +N bytes: relativní cesta +8 bytes: mtime_ns +4 bytes: mode +4 bytes: uid +4 bytes: gid +2 bytes: délka owner_name +N bytes: owner_name +2 bytes: délka group_name +N bytes: group_name +``` + +#### `link` - symbolický odkaz +``` +4 bytes: příkaz "link" +2 bytes: délka relativní cesty (UTF-8) +N bytes: relativní cesta +2 bytes: délka cíle odkazu (UTF-8) +N bytes: cíl odkazu +8 bytes: mtime_ns +4 bytes: uid +4 bytes: gid +``` + +#### `ends` - konec výpisu seznamu +``` +4 bytes: příkaz "ends" +``` + +#### `getc` - požadavek na checksums pro konkrétní soubor +``` +4 bytes: příkaz "getc" +2 bytes: délka relativní cesty (UTF-8) +N bytes: relativní cesta +``` + +Po `getc` následuje standardní sekvence `Hash` příkazů pro daný soubor, ukončená `done`. + +### Nové příkazy retrieve → save + +#### `file` - začátek dat pro soubor +``` +4 bytes: příkaz "file" +2 bytes: délka relativní cesty (UTF-8) +N bytes: relativní cesta +``` + +Po tomto následují standardní `data`/`dlzm` příkazy pro tento soubor. + +#### `fend` - konec dat pro soubor +``` +4 bytes: příkaz "fend" +``` + +Následuje `meta` příkaz s metadaty souboru. + +#### `mkdr` - vytvoření adresáře +``` +4 bytes: příkaz "mkdr" +2 bytes: délka relativní cesty (UTF-8) +N bytes: relativní cesta +8 bytes: mtime_ns +4 bytes: mode +4 bytes: uid +4 bytes: gid +2 bytes: délka owner_name +N bytes: owner_name +2 bytes: délka group_name +N bytes: group_name +``` + +#### `syml` - vytvoření symbolického odkazu +``` +4 bytes: příkaz "syml" +2 bytes: délka relativní cesty (UTF-8) +N bytes: relativní cesta +2 bytes: délka cíle odkazu (UTF-8) +N bytes: cíl odkazu +8 bytes: mtime_ns +4 bytes: uid +4 bytes: gid +``` + +#### `dele` - smazání souboru/adresáře na cíli +``` +4 bytes: příkaz "dele" +2 bytes: délka relativní cesty (UTF-8) +N bytes: relativní cesta +1 byte: typ (0=soubor, 1=adresář, 2=symlink) +``` + +--- + +## Detailní popis průběhu synchronizace + +### Krok 1: checksum strana pošle seznam souborů na cíli + +```python +def do_checksum_recursive(dir_path, output_stream): + # 1. Projít adresář rekurzivně + entries = [] + for root, dirs, files in os.walk(dir_path): + for d in dirs: + entries.append(('dirs', relative_path, stat)) + for f in files: + if is_symlink: + entries.append(('link', relative_path, target, stat)) + else: + entries.append(('file', relative_path, stat)) + + # 2. Odeslat seznam + send_command('list', len(entries)) + for entry in sorted(entries): + send_entry(entry) + send_command('ends') + + # 3. Čekat na požadavky na checksums + while True: + cmd = read_command() + if cmd == 'getc': + path = read_path() + # Odeslat checksums pro tento soubor + do_checksum_single_file(dir_path / path, output_stream) + elif cmd == 'done': + break +``` + +### Krok 2: retrieve strana porovná a odešle změny + +```python +def do_retrieve_recursive(dir_path, input_stream, output_stream): + # 1. Přijmout seznam souborů z cíle + dst_entries = receive_file_list(input_stream) + + # 2. Projít zdrojový adresář + src_entries = scan_directory(dir_path) + + # 3. Porovnat a rozhodnout o akcích + actions = [] + + # Soubory jen na zdroji - přidat + for path in src_entries - dst_entries: + actions.append(('add', path)) + + # Soubory na obou stranách - porovnat + for path in src_entries & dst_entries: + if needs_update(src_entries[path], dst_entries[path]): + actions.append(('update', path)) + + # Soubory jen na cíli - smazat (pokud --delete) + if delete_mode: + for path in dst_entries - src_entries: + actions.append(('delete', path)) + + # 4. Provést akce + for action, path in sorted(actions): + if action == 'add': + send_new_file(path) + elif action == 'update': + # Vyžádat checksums + request_checksums(path) + checksums = receive_checksums() + send_diff(path, checksums) + elif action == 'delete': + send_delete(path) + + send_done() +``` + +### Krok 3: save strana aplikuje změny + +```python +def do_save_recursive(dir_path, input_stream): + while True: + cmd = read_command() + + if cmd == 'file': + path = read_path() + file_path = dir_path / path + file_path.parent.mkdir(parents=True, exist_ok=True) + receive_and_write_file(file_path) + + elif cmd == 'mkdr': + path, meta = read_mkdir_data() + (dir_path / path).mkdir(parents=True, exist_ok=True) + apply_metadata(dir_path / path, meta) + + elif cmd == 'syml': + path, target, meta = read_symlink_data() + (dir_path / path).symlink_to(target) + apply_metadata(dir_path / path, meta) + + elif cmd == 'dele': + path, type = read_delete_data() + if type == 'file': + (dir_path / path).unlink() + elif type == 'dir': + shutil.rmtree(dir_path / path) + + elif cmd == 'done': + break +``` + +--- + +## Nové CLI argumenty + +### checksum +``` +--recursive, -r Rekurzivně procházet adresáře +``` + +### retrieve +``` +--recursive, -r Rekurzivně procházet adresáře +--delete Smazat soubory na cíli, které nejsou na zdroji +--exclude PATTERN Vyloučit soubory odpovídající vzoru (lze opakovat) +--include PATTERN Zahrnout pouze soubory odpovídající vzoru +``` + +### save +``` +--recursive, -r Rekurzivně ukládat soubory +--dry-run Pouze vypsat, co by se provedlo +``` + +--- + +## Rozhodnutí o rychlé cestě (fast path) + +Pro optimalizaci lze použít rychlou cestu založenou na porovnání mtime a velikosti: + +```python +def needs_update(src_stat, dst_stat): + # Rychlá cesta: pokud se mtime a velikost shodují, soubor je pravděpodobně stejný + if src_stat.st_size == dst_stat.st_size and src_stat.st_mtime_ns == dst_stat.st_mtime_ns: + return False + return True +``` + +Pro přísnější kontrolu lze přidat flag `--checksum`, který vždy porovnává hashe (jako rsync). + +--- + +## Ošetření speciálních případů + +### 1. Prázdné adresáře +- Prázdné adresáře jsou zahrnuty ve výpisu jako `dirs` příkazy +- Vytvoří se pomocí `mkdr` příkazu + +### 2. Symbolické odkazy +- Přenáší se cíl odkazu, ne obsah +- Na cíli se vytvoří nový symlink + +### 3. Hardlinky +- Hardlinky jsou přenášeny jako samostatné soubory (bez deduplikace) +- Budoucí rozšíření: detekce hardlinků a jejich zachování + +### 4. Speciální soubory (devices, sockets, pipes) +- Ve výchozím nastavení se přeskakují s varováním +- Flag `--devices` by umožnil přenos device nodes + +### 5. Sparse files +- Aktuálně se nepodporují speciálně +- Budoucí rozšíření: detekce a zachování sparse bloků + +### 6. Soubory s mezerami/speciálními znaky v názvu +- Cesty jsou kódovány jako UTF-8 s explicitní délkou +- Žádné escape sekvence nejsou potřeba + +### 7. Přístupová práva +- Při vytváření adresářů se dočasně nastaví 0o700, finální práva se nastaví až na konci +- Toto umožní zápis do adresáře i když cílová práva jsou pouze pro čtení + +### 8. Maximální délka cesty +- Protokol podporuje cesty do 65535 bytů (2-byte délka) +- Prakticky omezeno OS limitem (typicky 4096 bytů) + +--- + +## Návrh testů + +### Jednotkové testy + +#### test_recursive_list.py +```python +def test_list_empty_directory(): + """Test výpisu prázdného adresáře.""" + +def test_list_single_file(): + """Test výpisu adresáře s jedním souborem.""" + +def test_list_nested_directories(): + """Test výpisu vnořených adresářů.""" + +def test_list_with_symlinks(): + """Test výpisu adresáře obsahujícího symlinky.""" + +def test_list_special_characters_in_names(): + """Test výpisu souborů se speciálními znaky v názvech.""" + +def test_list_unicode_names(): + """Test výpisu souborů s Unicode znaky v názvech.""" +``` + +#### test_recursive_protocol.py +```python +def test_file_command_encoding(): + """Test kódování file příkazu.""" + +def test_dirs_command_encoding(): + """Test kódování dirs příkazu.""" + +def test_link_command_encoding(): + """Test kódování link příkazu.""" + +def test_dele_command_encoding(): + """Test kódování dele příkazu.""" + +def test_mkdr_command_encoding(): + """Test kódování mkdr příkazu.""" +``` + +### Integrační testy + +#### test_recursive_copy.py +```python +def test_copy_empty_directory(): + """Kopírování prázdného adresáře.""" + +def test_copy_single_file_in_directory(): + """Kopírování adresáře s jedním souborem.""" + +def test_copy_nested_directories(): + """Kopírování vnořené adresářové struktury.""" + +def test_copy_with_symlinks(): + """Kopírování adresáře se symlinky.""" + +def test_copy_preserves_permissions(): + """Test zachování práv při kopírování.""" + +def test_copy_preserves_timestamps(): + """Test zachování časových značek.""" + +def test_copy_preserves_owner_group(): + """Test zachování vlastníka a skupiny.""" + +def test_incremental_copy(): + """Test inkrementálního kopírování - pouze změněné soubory.""" + +def test_copy_new_files_only(): + """Test přidání nových souborů bez změny existujících.""" + +def test_copy_modified_files(): + """Test aktualizace změněných souborů.""" + +def test_copy_identical_directories(): + """Test kopírování identických adresářů - žádný přenos dat.""" + +@mark.parametrize('use_lzma', [False, True]) +def test_copy_large_files_recursive(use_lzma): + """Test kopírování velkých souborů v rekurzivním režimu.""" +``` + +#### test_recursive_delete.py +```python +def test_delete_extra_files(): + """Test mazání souborů na cíli, které nejsou na zdroji.""" + +def test_delete_extra_directories(): + """Test mazání adresářů na cíli, které nejsou na zdroji.""" + +def test_delete_nested_extra_content(): + """Test mazání vnořeného obsahu navíc.""" + +def test_no_delete_without_flag(): + """Test že bez --delete se soubory nemažou.""" +``` + +#### test_recursive_edge_cases.py +```python +def test_file_becomes_directory(): + """Test kdy soubor na zdroji nahradí adresář na cíli.""" + +def test_directory_becomes_file(): + """Test kdy adresář na zdroji nahradí soubor na cíli.""" + +def test_symlink_becomes_file(): + """Test kdy symlink na zdroji nahradí soubor na cíli.""" + +def test_circular_symlinks(): + """Test ošetření cyklických symlinků.""" + +def test_broken_symlinks(): + """Test kopírování nefunkčních symlinků.""" + +def test_deep_directory_structure(): + """Test velmi hluboké adresářové struktury.""" + +def test_many_small_files(): + """Test adresáře s mnoha malými soubory.""" + +def test_interrupted_copy(): + """Test přerušeného a znovu spuštěného kopírování.""" +``` + +#### test_recursive_filters.py +```python +def test_exclude_pattern(): + """Test vyloučení souborů podle vzoru.""" + +def test_exclude_directory(): + """Test vyloučení celého adresáře.""" + +def test_include_pattern(): + """Test zahrnutí pouze souborů odpovídajících vzoru.""" + +def test_multiple_exclude_patterns(): + """Test více exclude vzorů.""" +``` + +--- + +## Postup implementace + +### Etapa 1: Základní infrastruktura +1. Přidat `--recursive` flag do CLI parseru +2. Implementovat skenování adresáře +3. Implementovat nové protokolové příkazy pro seznam souborů + +### Etapa 2: Jednostranný přenos (nové soubory) +1. Implementovat přenos nových souborů +2. Implementovat vytváření adresářů +3. Implementovat vytváření symlinků + +### Etapa 3: Inkrementální přenos +1. Implementovat porovnání seznamů souborů +2. Implementovat vyžádání checksums pro konkrétní soubory +3. Implementovat přenos pouze změněných bloků + +### Etapa 4: Mazání a metadata +1. Implementovat `--delete` flag +2. Implementovat mazání souborů a adresářů +3. Implementovat zachování všech metadat (práva, časy, vlastník) + +### Etapa 5: Pokročilé funkce +1. Implementovat `--exclude` a `--include` filtry +2. Implementovat `--dry-run` +3. Optimalizovat pro velké množství souborů + +--- + +## Kompatibilita + +### Zpětná kompatibilita +Tento návrh **není zpětně kompatibilní** se stávající verzí. Při použití `--recursive` flagu bude vyžadována nová verze na obou stranách (checksum/retrieve/save). + +Bez `--recursive` flagu zůstává chování identické se stávající verzí. + +### Detekce verze +Pro robustnější detekci nekompatibility lze přidat handshake na začátek komunikace: + +``` +4 bytes: "BCPY" (magic) +2 bytes: verze protokolu (major) +2 bytes: verze protokolu (minor) +4 bytes: flags (recursive=1, delete=2, ...) +``` + +--- + +## Rizika a omezení + +1. **Paměťová náročnost**: Při velkém množství souborů může seznam zabrat značnou paměť + - Řešení: Streamované zpracování, omezení hloubky + +2. **Race conditions**: Soubory se mohou měnit během synchronizace + - Řešení: Varování při detekci změny, možnost retry + +3. **Oprávnění**: Vytváření souborů s omezenými právy může selhat + - Řešení: Dočasná úprava práv, aplikace finálních práv na konci + +4. **Atomicita**: Přerušená synchronizace může zanechat nekonzistentní stav + - Řešení: `--partial` flag, dočasné soubory, finální přejmenování From c7d9a729df49fa5c3274e14deb7964dbbdffca6a Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 26 Jan 2026 13:33:53 +0000 Subject: [PATCH 2/4] Implement recursive directory synchronization (--recursive flag) Add support for synchronizing entire directory trees with --recursive flag: Features: - Scan and compare directory structures on source and destination - Transfer only changed files (based on size/mtime comparison) - Create missing directories on destination - Preserve symlinks (copy link target, not content) - Support --delete flag to remove extra files on destination - Full metadata preservation support (permissions, timestamps, owner/group) - LZMA compression support for transferred data New protocol commands: - list/ends: Directory listing exchange - file/dirs/link: Entry types in listing - FILE/fend: File data transfer markers - mkdr/syml: Create directories and symlinks - dele: Delete files/directories (with --delete) Implementation uses unidirectional protocol compatible with pipeline: blockcopy checksum -r /dst | ssh src blockcopy retrieve -r /src | blockcopy save -r /dst Tests cover: empty dirs, nested structures, symlinks, large files, incremental updates, deletion, special characters in filenames. --- .flake8 | 2 +- blockcopy.py | 797 +++++++++++++++++++++++++++++++++++++++- tests/test_recursive.py | 383 +++++++++++++++++++ tests/test_version.py | 2 +- 4 files changed, 1175 insertions(+), 9 deletions(-) create mode 100644 tests/test_recursive.py diff --git a/.flake8 b/.flake8 index 4ac5d98..8729251 100644 --- a/.flake8 +++ b/.flake8 @@ -15,7 +15,7 @@ extend-ignore = E501 # It's a complex program :) -max-complexity = 70 +max-complexity = 80 per-file-ignores = # imported but unused diff --git a/blockcopy.py b/blockcopy.py index 0256cf7..9283fba 100755 --- a/blockcopy.py +++ b/blockcopy.py @@ -37,13 +37,17 @@ from base64 import b64encode from concurrent.futures import ThreadPoolExecutor from contextlib import ExitStack +from dataclasses import dataclass from grp import getgrgid, getgrnam from hashlib import sha3_512 from logging import getLogger -from os import chmod, chown, cpu_count, environ, getpid, kill, SEEK_END, utime +from os import chmod, chown, cpu_count, environ, getpid, kill, readlink, SEEK_END, symlink, utime, walk +from os import remove as os_remove from pathlib import Path from pwd import getpwuid, getpwnam from queue import Queue +from shutil import rmtree +from stat import S_ISREG from sys import exit, stderr, stdin, stdout from threading import Event, Lock from time import monotonic @@ -122,6 +126,229 @@ def get_group_name_by_gid(gid): return '' +# Entry types for recursive mode +ENTRY_FILE = 0 +ENTRY_DIR = 1 +ENTRY_SYMLINK = 2 + + +@dataclass +class FileEntry: + '''Represents a file in directory listing.''' + path: str # relative path + size: int + mtime_ns: int + mode: int + uid: int + gid: int + owner_name: str + group_name: str + + +@dataclass +class DirEntry: + '''Represents a directory in directory listing.''' + path: str # relative path + mtime_ns: int + mode: int + uid: int + gid: int + owner_name: str + group_name: str + + +@dataclass +class SymlinkEntry: + '''Represents a symlink in directory listing.''' + path: str # relative path + target: str + mtime_ns: int + uid: int + gid: int + + +def scan_directory(base_path): + ''' + Scan directory recursively and return list of entries. + Returns tuple of (files, dirs, symlinks). + ''' + base_path = Path(base_path).resolve() + files = [] + dirs = [] + symlinks = [] + + for root, dirnames, filenames in walk(base_path, followlinks=False): + root_path = Path(root) + rel_root = root_path.relative_to(base_path) + + # Process directories + for dirname in dirnames: + dir_path = root_path / dirname + rel_path = str(rel_root / dirname) if str(rel_root) != '.' else dirname + + if dir_path.is_symlink(): + # It's a symlink to a directory + try: + stat_result = dir_path.lstat() + target = readlink(dir_path) + symlinks.append(SymlinkEntry( + path=rel_path, + target=target, + mtime_ns=stat_result.st_mtime_ns, + uid=stat_result.st_uid, + gid=stat_result.st_gid, + )) + except OSError as e: + logger.warning('Failed to read symlink %s: %s', dir_path, e) + else: + try: + stat_result = dir_path.stat() + dirs.append(DirEntry( + path=rel_path, + mtime_ns=stat_result.st_mtime_ns, + mode=stat_result.st_mode, + uid=stat_result.st_uid, + gid=stat_result.st_gid, + owner_name=get_user_name_by_uid(stat_result.st_uid), + group_name=get_group_name_by_gid(stat_result.st_gid), + )) + except OSError as e: + logger.warning('Failed to stat directory %s: %s', dir_path, e) + + # Process files + for filename in filenames: + file_path = root_path / filename + rel_path = str(rel_root / filename) if str(rel_root) != '.' else filename + + if file_path.is_symlink(): + try: + stat_result = file_path.lstat() + target = readlink(file_path) + symlinks.append(SymlinkEntry( + path=rel_path, + target=target, + mtime_ns=stat_result.st_mtime_ns, + uid=stat_result.st_uid, + gid=stat_result.st_gid, + )) + except OSError as e: + logger.warning('Failed to read symlink %s: %s', file_path, e) + else: + try: + stat_result = file_path.stat() + if S_ISREG(stat_result.st_mode): + files.append(FileEntry( + path=rel_path, + size=stat_result.st_size, + mtime_ns=stat_result.st_mtime_ns, + mode=stat_result.st_mode, + uid=stat_result.st_uid, + gid=stat_result.st_gid, + owner_name=get_user_name_by_uid(stat_result.st_uid), + group_name=get_group_name_by_gid(stat_result.st_gid), + )) + else: + logger.warning('Skipping non-regular file: %s', file_path) + except OSError as e: + logger.warning('Failed to stat file %s: %s', file_path, e) + + return files, dirs, symlinks + + +def write_file_entry(stream, entry): + '''Write FileEntry to binary stream.''' + path_bytes = entry.path.encode('utf-8') + owner_bytes = entry.owner_name.encode('utf-8') + group_bytes = entry.group_name.encode('utf-8') + stream.write(b'file') + stream.write(len(path_bytes).to_bytes(2, 'big')) + stream.write(path_bytes) + stream.write(entry.size.to_bytes(8, 'big')) + stream.write(entry.mtime_ns.to_bytes(8, 'big', signed=True)) + stream.write(entry.mode.to_bytes(4, 'big')) + stream.write(entry.uid.to_bytes(4, 'big')) + stream.write(entry.gid.to_bytes(4, 'big')) + stream.write(len(owner_bytes).to_bytes(2, 'big')) + stream.write(owner_bytes) + stream.write(len(group_bytes).to_bytes(2, 'big')) + stream.write(group_bytes) + + +def write_dir_entry(stream, entry): + '''Write DirEntry to binary stream.''' + path_bytes = entry.path.encode('utf-8') + owner_bytes = entry.owner_name.encode('utf-8') + group_bytes = entry.group_name.encode('utf-8') + stream.write(b'dirs') + stream.write(len(path_bytes).to_bytes(2, 'big')) + stream.write(path_bytes) + stream.write(entry.mtime_ns.to_bytes(8, 'big', signed=True)) + stream.write(entry.mode.to_bytes(4, 'big')) + stream.write(entry.uid.to_bytes(4, 'big')) + stream.write(entry.gid.to_bytes(4, 'big')) + stream.write(len(owner_bytes).to_bytes(2, 'big')) + stream.write(owner_bytes) + stream.write(len(group_bytes).to_bytes(2, 'big')) + stream.write(group_bytes) + + +def write_symlink_entry(stream, entry): + '''Write SymlinkEntry to binary stream.''' + path_bytes = entry.path.encode('utf-8') + target_bytes = entry.target.encode('utf-8') + stream.write(b'link') + stream.write(len(path_bytes).to_bytes(2, 'big')) + stream.write(path_bytes) + stream.write(len(target_bytes).to_bytes(2, 'big')) + stream.write(target_bytes) + stream.write(entry.mtime_ns.to_bytes(8, 'big', signed=True)) + stream.write(entry.uid.to_bytes(4, 'big')) + stream.write(entry.gid.to_bytes(4, 'big')) + + +def read_file_entry(stream): + '''Read FileEntry from binary stream.''' + path_len = int.from_bytes(stream.read(2), 'big') + path = stream.read(path_len).decode('utf-8') + size = int.from_bytes(stream.read(8), 'big') + mtime_ns = int.from_bytes(stream.read(8), 'big', signed=True) + mode = int.from_bytes(stream.read(4), 'big') + uid = int.from_bytes(stream.read(4), 'big') + gid = int.from_bytes(stream.read(4), 'big') + owner_len = int.from_bytes(stream.read(2), 'big') + owner_name = stream.read(owner_len).decode('utf-8') + group_len = int.from_bytes(stream.read(2), 'big') + group_name = stream.read(group_len).decode('utf-8') + return FileEntry(path, size, mtime_ns, mode, uid, gid, owner_name, group_name) + + +def read_dir_entry(stream): + '''Read DirEntry from binary stream.''' + path_len = int.from_bytes(stream.read(2), 'big') + path = stream.read(path_len).decode('utf-8') + mtime_ns = int.from_bytes(stream.read(8), 'big', signed=True) + mode = int.from_bytes(stream.read(4), 'big') + uid = int.from_bytes(stream.read(4), 'big') + gid = int.from_bytes(stream.read(4), 'big') + owner_len = int.from_bytes(stream.read(2), 'big') + owner_name = stream.read(owner_len).decode('utf-8') + group_len = int.from_bytes(stream.read(2), 'big') + group_name = stream.read(group_len).decode('utf-8') + return DirEntry(path, mtime_ns, mode, uid, gid, owner_name, group_name) + + +def read_symlink_entry(stream): + '''Read SymlinkEntry from binary stream.''' + path_len = int.from_bytes(stream.read(2), 'big') + path = stream.read(path_len).decode('utf-8') + target_len = int.from_bytes(stream.read(2), 'big') + target = stream.read(target_len).decode('utf-8') + mtime_ns = int.from_bytes(stream.read(8), 'big', signed=True) + uid = int.from_bytes(stream.read(4), 'big') + gid = int.from_bytes(stream.read(4), 'big') + return SymlinkEntry(path, target, mtime_ns, uid, gid) + + def main(): parser = ArgumentParser() parser.add_argument('-v', '--verbose', action='store_true') @@ -136,9 +363,12 @@ def main(): p_checksum.add_argument('--progress', action='store_true', help='show progress info') p_checksum.add_argument('--start', type=int, metavar='OFFSET', default=0) p_checksum.add_argument('--end', type=int, metavar='OFFSET', default=None) + p_checksum.add_argument('-r', '--recursive', action='store_true', help='recursively process directories') p_retrieve.add_argument('source_file') p_retrieve.add_argument('--lzma', action='store_true', help='use lzma compression') + p_retrieve.add_argument('-r', '--recursive', action='store_true', help='recursively process directories') + p_retrieve.add_argument('--delete', action='store_true', help='delete files on destination that do not exist on source') p_save.add_argument('destination_file') p_save.add_argument('--truncate', action='store_true', help='truncate the destination file to the size of the source file') @@ -147,6 +377,7 @@ def main(): p_save.add_argument('--owner', '-o', action='store_true', help='preserve owner from source file') p_save.add_argument('--group', '-g', action='store_true', help='preserve group from source file') p_save.add_argument('--numeric-ids', action='store_true', help='use numeric uid/gid instead of looking up user/group names') + p_save.add_argument('-r', '--recursive', action='store_true', help='recursively process directories') args = parser.parse_args() @@ -157,14 +388,26 @@ def main(): try: if args.command == 'checksum': - do_checksum(args.destination_file, stdout.buffer, start_offset=args.start, end_offset=args.end, show_progress=args.progress) + if args.recursive: + do_checksum_recursive(args.destination_file, stdin.buffer, stdout.buffer, show_progress=args.progress) + else: + do_checksum(args.destination_file, stdout.buffer, start_offset=args.start, end_offset=args.end, show_progress=args.progress) elif args.command == 'retrieve': - do_retrieve(args.source_file, stdin.buffer, stdout.buffer, use_lzma=args.lzma, verbose=args.verbose) + if args.recursive: + do_retrieve_recursive(args.source_file, stdin.buffer, stdout.buffer, use_lzma=args.lzma, delete=args.delete, verbose=args.verbose) + else: + do_retrieve(args.source_file, stdin.buffer, stdout.buffer, use_lzma=args.lzma, verbose=args.verbose) elif args.command == 'save': - do_save(args.destination_file, stdin.buffer, truncate=args.truncate, - preserve_times=args.times, preserve_perms=args.perms, - preserve_owner=args.owner, preserve_group=args.group, - numeric_ids=args.numeric_ids) + if args.recursive: + do_save_recursive(args.destination_file, stdin.buffer, + preserve_times=args.times, preserve_perms=args.perms, + preserve_owner=args.owner, preserve_group=args.group, + numeric_ids=args.numeric_ids) + else: + do_save(args.destination_file, stdin.buffer, truncate=args.truncate, + preserve_times=args.times, preserve_perms=args.perms, + preserve_owner=args.owner, preserve_group=args.group, + numeric_ids=args.numeric_ids) else: raise Exception(f'Not implemented: {args.command}') @@ -937,5 +1180,545 @@ def do_save(file_path, block_input_stream, truncate=False, exit(f'ERROR (save): {exc}') +def do_checksum_recursive(dir_path, input_stream, output_stream, show_progress=False): + ''' + Recursive checksum mode. + + Scan destination directory and send file list with metadata. + This is a unidirectional protocol - we just send the list and finish. + ''' + if output_stream.isatty(): + exit('ERROR (checksum): output_stream is a tty - will not write binary data to terminal') + + dir_path = Path(dir_path).resolve() + if not dir_path.is_dir(): + exit(f'ERROR (checksum): {dir_path} is not a directory') + + # Scan destination directory + logger.debug('Scanning destination directory: %s', dir_path) + files, dirs, symlinks = scan_directory(dir_path) + logger.debug('Found %d files, %d dirs, %d symlinks', len(files), len(dirs), len(symlinks)) + + # Send file list header + total_entries = len(files) + len(dirs) + len(symlinks) + output_stream.write(b'list') + output_stream.write(total_entries.to_bytes(8, 'big')) + + # Send all entries + for entry in sorted(files, key=lambda e: e.path): + write_file_entry(output_stream, entry) + for entry in sorted(dirs, key=lambda e: e.path): + write_dir_entry(output_stream, entry) + for entry in sorted(symlinks, key=lambda e: e.path): + write_symlink_entry(output_stream, entry) + + # End of list and done + output_stream.write(b'ends') + output_stream.write(b'done') + output_stream.flush() + + +def do_retrieve_recursive(dir_path, input_stream, output_stream, use_lzma=False, delete=False, verbose=False): + ''' + Recursive retrieve mode. + + 1. Receive file list from destination (unidirectional) + 2. Scan source directory + 3. Compare and decide what to transfer + 4. Send changed files entirely (no block-level diff in this simple version) + ''' + if use_lzma: + from lzma import compress as lzma_compress + + if output_stream.isatty(): + exit('ERROR (retrieve): output_stream is a tty - will not write binary data to terminal') + + dir_path = Path(dir_path).resolve() + if not dir_path.is_dir(): + exit(f'ERROR (retrieve): {dir_path} is not a directory') + + # Read destination file list + command = input_stream.read(4) + if command != b'list': + raise Exception(f'Expected list command, got {command!r}') + + entry_count = int.from_bytes(input_stream.read(8), 'big') + logger.debug('Receiving %d entries from destination', entry_count) + + dst_files = {} + dst_dirs = {} + dst_symlinks = {} + + for _ in range(entry_count): + cmd = input_stream.read(4) + if cmd == b'file': + entry = read_file_entry(input_stream) + dst_files[entry.path] = entry + elif cmd == b'dirs': + entry = read_dir_entry(input_stream) + dst_dirs[entry.path] = entry + elif cmd == b'link': + entry = read_symlink_entry(input_stream) + dst_symlinks[entry.path] = entry + else: + raise Exception(f'Unknown entry type: {cmd!r}') + + # Read end marker and done + end_cmd = input_stream.read(4) + if end_cmd != b'ends': + raise Exception(f'Expected ends command, got {end_cmd!r}') + + done_cmd = input_stream.read(4) + if done_cmd != b'done': + raise Exception(f'Expected done command, got {done_cmd!r}') + + logger.debug('Received %d files, %d dirs, %d symlinks from destination', + len(dst_files), len(dst_dirs), len(dst_symlinks)) + + # Scan source directory + src_files, src_dirs, src_symlinks = scan_directory(dir_path) + src_files_dict = {e.path: e for e in src_files} + src_dirs_dict = {e.path: e for e in src_dirs} + src_symlinks_dict = {e.path: e for e in src_symlinks} + + logger.debug('Source has %d files, %d dirs, %d symlinks', + len(src_files_dict), len(src_dirs_dict), len(src_symlinks_dict)) + + # Send directories to create (sorted by path to create parents first) + for path in sorted(src_dirs_dict.keys()): + if path not in dst_dirs: + entry = src_dirs_dict[path] + output_stream.write(b'mkdr') + path_bytes = path.encode('utf-8') + owner_bytes = entry.owner_name.encode('utf-8') + group_bytes = entry.group_name.encode('utf-8') + output_stream.write(len(path_bytes).to_bytes(2, 'big')) + output_stream.write(path_bytes) + output_stream.write(entry.mtime_ns.to_bytes(8, 'big', signed=True)) + output_stream.write(entry.mode.to_bytes(4, 'big')) + output_stream.write(entry.uid.to_bytes(4, 'big')) + output_stream.write(entry.gid.to_bytes(4, 'big')) + output_stream.write(len(owner_bytes).to_bytes(2, 'big')) + output_stream.write(owner_bytes) + output_stream.write(len(group_bytes).to_bytes(2, 'big')) + output_stream.write(group_bytes) + logger.debug('Will create directory: %s', path) + + # Send symlinks + for path, entry in sorted(src_symlinks_dict.items()): + dst_entry = dst_symlinks.get(path) + if dst_entry is None or dst_entry.target != entry.target: + output_stream.write(b'syml') + path_bytes = path.encode('utf-8') + target_bytes = entry.target.encode('utf-8') + output_stream.write(len(path_bytes).to_bytes(2, 'big')) + output_stream.write(path_bytes) + output_stream.write(len(target_bytes).to_bytes(2, 'big')) + output_stream.write(target_bytes) + output_stream.write(entry.mtime_ns.to_bytes(8, 'big', signed=True)) + output_stream.write(entry.uid.to_bytes(4, 'big')) + output_stream.write(entry.gid.to_bytes(4, 'big')) + logger.debug('Will create/update symlink: %s -> %s', path, entry.target) + + output_stream.flush() + + # Process files + for path, src_entry in sorted(src_files_dict.items()): + dst_entry = dst_files.get(path) + + # Check if file needs to be transferred + needs_transfer = False + if dst_entry is None: + needs_transfer = True + logger.debug('File %s: new file', path) + elif dst_entry.size != src_entry.size: + needs_transfer = True + logger.debug('File %s: size changed %d -> %d', path, dst_entry.size, src_entry.size) + elif dst_entry.mtime_ns != src_entry.mtime_ns: + needs_transfer = True + logger.debug('File %s: mtime changed', path) + + if not needs_transfer: + logger.debug('File %s: unchanged', path) + continue + + # Signal start of file + output_stream.write(b'FILE') + path_bytes = path.encode('utf-8') + output_stream.write(len(path_bytes).to_bytes(2, 'big')) + output_stream.write(path_bytes) + + file_path = dir_path / path + + # Send entire file content + with open(file_path, 'rb') as f: + block_pos = 0 + while True: + block_data = f.read(block_size) + if not block_data: + break + + if use_lzma: + compressed = lzma_compress(block_data) + if len(compressed) < len(block_data): + output_stream.write(b'dlzm') + output_stream.write(block_pos.to_bytes(8, 'big')) + output_stream.write(len(compressed).to_bytes(4, 'big')) + output_stream.write(compressed) + else: + output_stream.write(b'data') + output_stream.write(block_pos.to_bytes(8, 'big')) + output_stream.write(len(block_data).to_bytes(4, 'big')) + output_stream.write(block_data) + else: + output_stream.write(b'data') + output_stream.write(block_pos.to_bytes(8, 'big')) + output_stream.write(len(block_data).to_bytes(4, 'big')) + output_stream.write(block_data) + + block_pos += len(block_data) + + # Send file metadata + output_stream.write(b'fend') # file end + + # Send meta for this file + stat_result = file_path.stat() + atime_ns = stat_result.st_atime_ns + mtime_ns = stat_result.st_mtime_ns + mode = stat_result.st_mode + uid = stat_result.st_uid + gid = stat_result.st_gid + owner_name = get_user_name_by_uid(uid) + group_name = get_group_name_by_gid(gid) + + with open(file_path, 'rb') as f: + total_size = f.seek(0, SEEK_END) + + owner_bytes = owner_name.encode('utf-8') + group_bytes = group_name.encode('utf-8') + + output_stream.write(b'meta') + output_stream.write(atime_ns.to_bytes(8, 'big', signed=True)) + output_stream.write(mtime_ns.to_bytes(8, 'big', signed=True)) + output_stream.write(mode.to_bytes(4, 'big')) + output_stream.write(uid.to_bytes(4, 'big')) + output_stream.write(gid.to_bytes(4, 'big')) + output_stream.write(len(owner_bytes).to_bytes(2, 'big')) + output_stream.write(owner_bytes) + output_stream.write(len(group_bytes).to_bytes(2, 'big')) + output_stream.write(group_bytes) + output_stream.write(total_size.to_bytes(8, 'big')) + output_stream.write(b'end') + + output_stream.flush() + + # Handle deletions if --delete flag is set + if delete: + # Delete files that exist on destination but not on source + for path in sorted(dst_files.keys(), reverse=True): + if path not in src_files_dict: + output_stream.write(b'dele') + path_bytes = path.encode('utf-8') + output_stream.write(len(path_bytes).to_bytes(2, 'big')) + output_stream.write(path_bytes) + output_stream.write(ENTRY_FILE.to_bytes(1, 'big')) + logger.debug('Will delete file: %s', path) + + # Delete symlinks + for path in sorted(dst_symlinks.keys(), reverse=True): + if path not in src_symlinks_dict: + output_stream.write(b'dele') + path_bytes = path.encode('utf-8') + output_stream.write(len(path_bytes).to_bytes(2, 'big')) + output_stream.write(path_bytes) + output_stream.write(ENTRY_SYMLINK.to_bytes(1, 'big')) + logger.debug('Will delete symlink: %s', path) + + # Delete directories (reverse sorted to delete children first) + for path in sorted(dst_dirs.keys(), reverse=True): + if path not in src_dirs_dict: + output_stream.write(b'dele') + path_bytes = path.encode('utf-8') + output_stream.write(len(path_bytes).to_bytes(2, 'big')) + output_stream.write(path_bytes) + output_stream.write(ENTRY_DIR.to_bytes(1, 'big')) + logger.debug('Will delete directory: %s', path) + + # Final done + output_stream.write(b'DONE') + output_stream.flush() + + +def do_save_recursive(dir_path, input_stream, + preserve_times=False, preserve_perms=False, + preserve_owner=False, preserve_group=False, + numeric_ids=False): + ''' + Recursive save mode. + + Receives commands from retrieve and applies them to destination directory. + ''' + lzma_decompress = None + + dir_path = Path(dir_path).resolve() + if not dir_path.is_dir(): + exit(f'ERROR (save): {dir_path} is not a directory') + + # Track files that need metadata applied at the end + files_metadata = [] + dirs_metadata = [] + + try: + while True: + command = input_stream.read(4) + if not command: + raise IncompleteReadError('Stream closed unexpectedly') + if len(command) != 4: + raise IncompleteReadError('Incomplete command read') + + if command == b'DONE': + # All done + break + + elif command == b'mkdr': + # Create directory + path_len = int.from_bytes(input_stream.read(2), 'big') + rel_path = input_stream.read(path_len).decode('utf-8') + mtime_ns = int.from_bytes(input_stream.read(8), 'big', signed=True) + mode = int.from_bytes(input_stream.read(4), 'big') + uid = int.from_bytes(input_stream.read(4), 'big') + gid = int.from_bytes(input_stream.read(4), 'big') + owner_len = int.from_bytes(input_stream.read(2), 'big') + owner_name = input_stream.read(owner_len).decode('utf-8') + group_len = int.from_bytes(input_stream.read(2), 'big') + group_name = input_stream.read(group_len).decode('utf-8') + + full_path = dir_path / rel_path + full_path.mkdir(parents=True, exist_ok=True) + logger.debug('Created directory: %s', full_path) + + # Store for later metadata application + dirs_metadata.append((full_path, mtime_ns, mode, uid, gid, owner_name, group_name)) + + elif command == b'syml': + # Create symlink + path_len = int.from_bytes(input_stream.read(2), 'big') + rel_path = input_stream.read(path_len).decode('utf-8') + target_len = int.from_bytes(input_stream.read(2), 'big') + target = input_stream.read(target_len).decode('utf-8') + mtime_ns = int.from_bytes(input_stream.read(8), 'big', signed=True) + uid = int.from_bytes(input_stream.read(4), 'big') + gid = int.from_bytes(input_stream.read(4), 'big') + + full_path = dir_path / rel_path + + # Remove existing file/symlink if exists + if full_path.exists() or full_path.is_symlink(): + full_path.unlink() + + # Create parent directory if needed + full_path.parent.mkdir(parents=True, exist_ok=True) + + symlink(target, full_path) + logger.debug('Created symlink: %s -> %s', full_path, target) + + # Apply ownership if requested (can't apply mode/time to symlinks portably) + if preserve_owner or preserve_group: + try: + uid_to_set = uid if preserve_owner else -1 + gid_to_set = gid if preserve_group else -1 + chown(full_path, uid_to_set, gid_to_set, follow_symlinks=False) + except (PermissionError, OSError) as e: + logger.warning('Failed to chown symlink %s: %s', full_path, e) + + elif command == b'FILE': + # Start of file data + path_len = int.from_bytes(input_stream.read(2), 'big') + rel_path = input_stream.read(path_len).decode('utf-8') + + full_path = dir_path / rel_path + logger.debug('Receiving file: %s', full_path) + + # Create parent directory if needed + full_path.parent.mkdir(parents=True, exist_ok=True) + + # Open file for writing (create if doesn't exist) + if full_path.exists(): + f = open(full_path, 'r+b') + else: + f = open(full_path, 'wb') + + received_total_size = None + + try: + while True: + file_cmd = input_stream.read(4) + if not file_cmd or len(file_cmd) != 4: + raise IncompleteReadError('Incomplete file command') + + if file_cmd in (b'data', b'dlzm'): + block_pos = int.from_bytes(input_stream.read(8), 'big') + blk_size = int.from_bytes(input_stream.read(4), 'big') + block_data = input_stream.read(blk_size) + if len(block_data) != blk_size: + raise IncompleteReadError('Incomplete block data') + + if file_cmd == b'dlzm': + if lzma_decompress is None: + from lzma import decompress as lzma_decompress + block_data = lzma_decompress(block_data) + + f.seek(block_pos) + f.write(block_data) + + elif file_cmd == b'fend': + # File data ended, metadata follows + break + + else: + raise Exception(f'Unexpected command in file stream: {file_cmd!r}') + + # Read metadata + meta_cmd = input_stream.read(4) + if meta_cmd != b'meta': + raise Exception(f'Expected meta command, got {meta_cmd!r}') + + meta_data = input_stream.read(28) + atime_ns = int.from_bytes(meta_data[0:8], 'big', signed=True) + mtime_ns = int.from_bytes(meta_data[8:16], 'big', signed=True) + mode = int.from_bytes(meta_data[16:20], 'big') + uid = int.from_bytes(meta_data[20:24], 'big') + gid = int.from_bytes(meta_data[24:28], 'big') + + owner_len = int.from_bytes(input_stream.read(2), 'big') + owner_name = input_stream.read(owner_len).decode('utf-8') + group_len = int.from_bytes(input_stream.read(2), 'big') + group_name = input_stream.read(group_len).decode('utf-8') + + total_size = int.from_bytes(input_stream.read(8), 'big') + end_marker = input_stream.read(3) + if end_marker != b'end': + raise IncompleteReadError(f'Expected end marker, got {end_marker!r}') + + received_total_size = total_size + + finally: + if received_total_size is not None: + f.truncate(received_total_size) + f.close() + + # Store metadata for later application + files_metadata.append((full_path, atime_ns, mtime_ns, mode, uid, gid, owner_name, group_name)) + + elif command == b'dele': + # Delete file/directory + path_len = int.from_bytes(input_stream.read(2), 'big') + rel_path = input_stream.read(path_len).decode('utf-8') + entry_type = int.from_bytes(input_stream.read(1), 'big') + + full_path = dir_path / rel_path + + if entry_type == ENTRY_FILE: + if full_path.is_file(): + os_remove(full_path) + logger.debug('Deleted file: %s', full_path) + elif entry_type == ENTRY_SYMLINK: + if full_path.is_symlink(): + full_path.unlink() + logger.debug('Deleted symlink: %s', full_path) + elif entry_type == ENTRY_DIR: + if full_path.is_dir(): + rmtree(full_path) + logger.debug('Deleted directory: %s', full_path) + + else: + raise Exception(f'Unknown command: {command!r}') + + # Apply metadata to files + for full_path, atime_ns, mtime_ns, mode, uid, gid, owner_name, group_name in files_metadata: + if preserve_perms: + try: + chmod(full_path, mode) + except (PermissionError, OSError) as e: + logger.warning('Failed to chmod %s: %s', full_path, e) + + if preserve_owner or preserve_group: + uid_to_set = -1 + gid_to_set = -1 + + if preserve_owner: + if numeric_ids or not owner_name: + uid_to_set = uid + else: + try: + uid_to_set = getpwnam(owner_name).pw_uid + except KeyError: + uid_to_set = uid + + if preserve_group: + if numeric_ids or not group_name: + gid_to_set = gid + else: + try: + gid_to_set = getgrnam(group_name).gr_gid + except KeyError: + gid_to_set = gid + + try: + chown(full_path, uid_to_set, gid_to_set) + except (PermissionError, OSError) as e: + logger.warning('Failed to chown %s: %s', full_path, e) + + if preserve_times: + try: + utime(full_path, ns=(atime_ns, mtime_ns)) + except (PermissionError, OSError) as e: + logger.warning('Failed to utime %s: %s', full_path, e) + + # Apply metadata to directories (in reverse order to handle nested dirs) + for full_path, mtime_ns, mode, uid, gid, owner_name, group_name in reversed(dirs_metadata): + if preserve_perms: + try: + chmod(full_path, mode) + except (PermissionError, OSError) as e: + logger.warning('Failed to chmod %s: %s', full_path, e) + + if preserve_owner or preserve_group: + uid_to_set = -1 + gid_to_set = -1 + + if preserve_owner: + if numeric_ids or not owner_name: + uid_to_set = uid + else: + try: + uid_to_set = getpwnam(owner_name).pw_uid + except KeyError: + uid_to_set = uid + + if preserve_group: + if numeric_ids or not group_name: + gid_to_set = gid + else: + try: + gid_to_set = getgrnam(group_name).gr_gid + except KeyError: + gid_to_set = gid + + try: + chown(full_path, uid_to_set, gid_to_set) + except (PermissionError, OSError) as e: + logger.warning('Failed to chown %s: %s', full_path, e) + + if preserve_times: + try: + utime(full_path, ns=(mtime_ns, mtime_ns)) + except (PermissionError, OSError) as e: + logger.warning('Failed to utime %s: %s', full_path, e) + + except IncompleteReadError as exc: + exit(f'ERROR (save): {exc}') + + if __name__ == "__main__": main() diff --git a/tests/test_recursive.py b/tests/test_recursive.py new file mode 100644 index 0000000..aa8ac17 --- /dev/null +++ b/tests/test_recursive.py @@ -0,0 +1,383 @@ +from contextlib import ExitStack +from os import symlink, utime +from pytest import mark +from subprocess import PIPE, Popen +from sys import executable + + +def test_recursive_help(script_path): + '''Test that --recursive flag is recognized.''' + cmd = [script_path, 'checksum', '--help'] + result = Popen(cmd, stdout=PIPE, stderr=PIPE) + stdout, _ = result.communicate() + assert b'--recursive' in stdout or b'-r' in stdout + + +def test_recursive_empty_directory(tmp_path, script_path): + '''Test copying empty directory.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + +def test_recursive_single_file(tmp_path, script_path): + '''Test copying directory with a single file.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create a file in source + (src_dir / 'test.txt').write_text('Hello World!') + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Verify file was copied + assert (dst_dir / 'test.txt').exists() + assert (dst_dir / 'test.txt').read_text() == 'Hello World!' + + +def test_recursive_nested_directories(tmp_path, script_path): + '''Test copying nested directory structure.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create nested structure + (src_dir / 'a').mkdir() + (src_dir / 'a' / 'b').mkdir() + (src_dir / 'a' / 'b' / 'c').mkdir() + (src_dir / 'a' / 'file1.txt').write_text('File 1') + (src_dir / 'a' / 'b' / 'file2.txt').write_text('File 2') + (src_dir / 'a' / 'b' / 'c' / 'file3.txt').write_text('File 3') + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Verify structure + assert (dst_dir / 'a').is_dir() + assert (dst_dir / 'a' / 'b').is_dir() + assert (dst_dir / 'a' / 'b' / 'c').is_dir() + assert (dst_dir / 'a' / 'file1.txt').read_text() == 'File 1' + assert (dst_dir / 'a' / 'b' / 'file2.txt').read_text() == 'File 2' + assert (dst_dir / 'a' / 'b' / 'c' / 'file3.txt').read_text() == 'File 3' + + +def test_recursive_incremental_copy(tmp_path, script_path): + '''Test that only changed files are transferred.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create identical file in both + (src_dir / 'unchanged.txt').write_text('Same content') + (dst_dir / 'unchanged.txt').write_text('Same content') + + # Set same mtime + mtime = 1000000000 + utime(src_dir / 'unchanged.txt', ns=(mtime * 1000000000, mtime * 1000000000)) + utime(dst_dir / 'unchanged.txt', ns=(mtime * 1000000000, mtime * 1000000000)) + + # Create a changed file with different mtime + (dst_dir / 'changed.txt').write_text('Old content') + old_mtime = 900000000 + utime(dst_dir / 'changed.txt', ns=(old_mtime * 1000000000, old_mtime * 1000000000)) + (src_dir / 'changed.txt').write_text('New content') + new_mtime = 1100000000 + utime(src_dir / 'changed.txt', ns=(new_mtime * 1000000000, new_mtime * 1000000000)) + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Verify files + assert (dst_dir / 'unchanged.txt').read_text() == 'Same content' + assert (dst_dir / 'changed.txt').read_text() == 'New content' + + +def test_recursive_with_symlink(tmp_path, script_path): + '''Test copying directory with symlinks.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create a file and a symlink + (src_dir / 'target.txt').write_text('Target content') + symlink('target.txt', src_dir / 'link.txt') + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Verify + assert (dst_dir / 'target.txt').read_text() == 'Target content' + assert (dst_dir / 'link.txt').is_symlink() + assert (dst_dir / 'link.txt').resolve().name == 'target.txt' + + +def test_recursive_delete(tmp_path, script_path): + '''Test --delete flag removes extra files on destination.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create file only in source + (src_dir / 'keep.txt').write_text('Keep me') + + # Create extra file in destination + (dst_dir / 'delete_me.txt').write_text('Delete me') + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', '--delete', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Verify + assert (dst_dir / 'keep.txt').exists() + assert (dst_dir / 'keep.txt').read_text() == 'Keep me' + assert not (dst_dir / 'delete_me.txt').exists() + + +def test_recursive_no_delete_by_default(tmp_path, script_path): + '''Test that without --delete, extra files are preserved.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create file only in source + (src_dir / 'new.txt').write_text('New file') + + # Create extra file in destination + (dst_dir / 'existing.txt').write_text('Existing file') + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Both files should exist + assert (dst_dir / 'new.txt').exists() + assert (dst_dir / 'existing.txt').exists() + + +@mark.parametrize('use_lzma', [False, True], ids=['no_lzma', 'lzma']) +def test_recursive_large_file(tmp_path, script_path, use_lzma): + '''Test copying directory with large file.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create a large file (multiple blocks) + large_content = b'X' * (128 * 1024 * 5 + 1000) # 5+ blocks + (src_dir / 'large.bin').write_bytes(large_content) + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', *(['--lzma'] if use_lzma else []), str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Verify + assert (dst_dir / 'large.bin').read_bytes() == large_content + + +def test_recursive_update_existing_file(tmp_path, script_path): + '''Test updating an existing file with different content.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create file with different content in dst + (src_dir / 'file.txt').write_text('Source content here') + (dst_dir / 'file.txt').write_text('Destination content that is different') + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # File should have source content + assert (dst_dir / 'file.txt').read_text() == 'Source content here' + + +def test_recursive_special_characters_in_filename(tmp_path, script_path): + '''Test copying files with special characters in names.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create files with special characters + (src_dir / 'file with spaces.txt').write_text('Spaces') + (src_dir / 'file-with-dashes.txt').write_text('Dashes') + (src_dir / 'file_with_underscores.txt').write_text('Underscores') + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Verify + assert (dst_dir / 'file with spaces.txt').read_text() == 'Spaces' + assert (dst_dir / 'file-with-dashes.txt').read_text() == 'Dashes' + assert (dst_dir / 'file_with_underscores.txt').read_text() == 'Underscores' + + +def test_recursive_identical_directories(tmp_path, script_path): + '''Test that copying identical directories transfers no data.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create identical content + content = 'Same content' + (src_dir / 'file.txt').write_text(content) + (dst_dir / 'file.txt').write_text(content) + + # Set same mtime and size + src_stat = (src_dir / 'file.txt').stat() + utime(dst_dir / 'file.txt', ns=(src_stat.st_atime_ns, src_stat.st_mtime_ns)) + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Content should still be same + assert (dst_dir / 'file.txt').read_text() == content + + +def test_recursive_delete_directory(tmp_path, script_path): + '''Test that --delete removes extra directories.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create extra directory with files in destination + (dst_dir / 'extra_dir').mkdir() + (dst_dir / 'extra_dir' / 'file.txt').write_text('Delete me') + + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', '--delete', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Extra directory should be gone + assert not (dst_dir / 'extra_dir').exists() diff --git a/tests/test_version.py b/tests/test_version.py index 1301015..b983a37 100644 --- a/tests/test_version.py +++ b/tests/test_version.py @@ -11,4 +11,4 @@ def test_version(tmp_path, script_path, terminate_process): p1_output, p1_error = p1.communicate(input=None, timeout=5) assert p1.wait() == 0 assert p1_error == b'' - assert p1_output == b'blockcopy 0.0.2\n' + assert p1_output == b'blockcopy 0.0.3\n' From c09cbe1b716c01a69834f1102e0a4af60042a336 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 26 Jan 2026 13:34:31 +0000 Subject: [PATCH 3/4] Update uv.lock --- uv.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uv.lock b/uv.lock index 346c911..5fdd24d 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.9" resolution-markers = [ "python_full_version >= '3.10'", @@ -8,7 +8,7 @@ resolution-markers = [ [[package]] name = "blockcopy" -version = "0.0.2" +version = "0.0.3" source = { editable = "." } [package.dev-dependencies] From 5a3a901878957147052a282889ca8db5d40b6245 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 26 Jan 2026 13:57:25 +0000 Subject: [PATCH 4/4] Implement block-level diff for recursive synchronization Changed the recursive mode protocol to use block-level checksums instead of transferring entire files. Now checksum sends SHA3-512 hashes for each 128KB block of every file, and retrieve compares them to send only changed blocks. This brings the recursive mode in line with the original single-file mode's efficiency - only modified portions of files are transferred. Added test_recursive_block_level_diff to verify the behavior. --- blockcopy.py | 117 +++++++++++++++++++++++++++++----------- tests/test_recursive.py | 60 +++++++++++++++++++++ 2 files changed, 145 insertions(+), 32 deletions(-) diff --git a/blockcopy.py b/blockcopy.py index 9283fba..ff24e6e 100755 --- a/blockcopy.py +++ b/blockcopy.py @@ -1184,8 +1184,9 @@ def do_checksum_recursive(dir_path, input_stream, output_stream, show_progress=F ''' Recursive checksum mode. - Scan destination directory and send file list with metadata. - This is a unidirectional protocol - we just send the list and finish. + Scan destination directory and send file list with block hashes. + For each file, we send metadata followed by SHA3-512 hashes of each 128KB block. + This enables block-level diff on the retrieve side. ''' if output_stream.isatty(): exit('ERROR (checksum): output_stream is a tty - will not write binary data to terminal') @@ -1204,11 +1205,36 @@ def do_checksum_recursive(dir_path, input_stream, output_stream, show_progress=F output_stream.write(b'list') output_stream.write(total_entries.to_bytes(8, 'big')) - # Send all entries + # Send files with block hashes for entry in sorted(files, key=lambda e: e.path): write_file_entry(output_stream, entry) + + # Now send block hashes for this file + file_path = dir_path / entry.path + try: + with open(file_path, 'rb') as f: + block_pos = 0 + while True: + block_data = f.read(block_size) + if not block_data: + break + block_hash = hash_factory(block_data).digest() + output_stream.write(b'Hash') + output_stream.write(block_pos.to_bytes(8, 'big')) + output_stream.write(len(block_data).to_bytes(4, 'big')) + output_stream.write(block_hash) + block_pos += len(block_data) + except OSError as e: + logger.warning('Failed to read file %s for hashing: %s', file_path, e) + + # End of file hashes + output_stream.write(b'fend') + + # Send directories for entry in sorted(dirs, key=lambda e: e.path): write_dir_entry(output_stream, entry) + + # Send symlinks for entry in sorted(symlinks, key=lambda e: e.path): write_symlink_entry(output_stream, entry) @@ -1220,12 +1246,11 @@ def do_checksum_recursive(dir_path, input_stream, output_stream, show_progress=F def do_retrieve_recursive(dir_path, input_stream, output_stream, use_lzma=False, delete=False, verbose=False): ''' - Recursive retrieve mode. + Recursive retrieve mode with block-level diff. - 1. Receive file list from destination (unidirectional) + 1. Receive file list with block hashes from destination 2. Scan source directory - 3. Compare and decide what to transfer - 4. Send changed files entirely (no block-level diff in this simple version) + 3. Compare block hashes and send only differing blocks ''' if use_lzma: from lzma import compress as lzma_compress @@ -1237,7 +1262,7 @@ def do_retrieve_recursive(dir_path, input_stream, output_stream, use_lzma=False, if not dir_path.is_dir(): exit(f'ERROR (retrieve): {dir_path} is not a directory') - # Read destination file list + # Read destination file list with hashes command = input_stream.read(4) if command != b'list': raise Exception(f'Expected list command, got {command!r}') @@ -1245,6 +1270,8 @@ def do_retrieve_recursive(dir_path, input_stream, output_stream, use_lzma=False, entry_count = int.from_bytes(input_stream.read(8), 'big') logger.debug('Receiving %d entries from destination', entry_count) + # Store destination entries with their block hashes + # dst_files: path -> (FileEntry, [(pos, size, hash), ...]) dst_files = {} dst_dirs = {} dst_symlinks = {} @@ -1253,7 +1280,22 @@ def do_retrieve_recursive(dir_path, input_stream, output_stream, use_lzma=False, cmd = input_stream.read(4) if cmd == b'file': entry = read_file_entry(input_stream) - dst_files[entry.path] = entry + # Read block hashes until fend + block_hashes = [] + while True: + hash_cmd = input_stream.read(4) + if hash_cmd == b'fend': + break + elif hash_cmd == b'Hash': + block_pos = int.from_bytes(input_stream.read(8), 'big') + blk_size = int.from_bytes(input_stream.read(4), 'big') + block_hash = input_stream.read(hash_digest_size) + if len(block_hash) != hash_digest_size: + raise IncompleteReadError('Incomplete block hash read') + block_hashes.append((block_pos, blk_size, block_hash)) + else: + raise Exception(f'Expected Hash or fend, got {hash_cmd!r}') + dst_files[entry.path] = (entry, block_hashes) elif cmd == b'dirs': entry = read_dir_entry(input_stream) dst_dirs[entry.path] = entry @@ -1322,25 +1364,19 @@ def do_retrieve_recursive(dir_path, input_stream, output_stream, use_lzma=False, output_stream.flush() - # Process files + # Process files with block-level diff for path, src_entry in sorted(src_files_dict.items()): - dst_entry = dst_files.get(path) - - # Check if file needs to be transferred - needs_transfer = False - if dst_entry is None: - needs_transfer = True - logger.debug('File %s: new file', path) - elif dst_entry.size != src_entry.size: - needs_transfer = True - logger.debug('File %s: size changed %d -> %d', path, dst_entry.size, src_entry.size) - elif dst_entry.mtime_ns != src_entry.mtime_ns: - needs_transfer = True - logger.debug('File %s: mtime changed', path) - - if not needs_transfer: - logger.debug('File %s: unchanged', path) - continue + dst_data = dst_files.get(path) + file_path = dir_path / path + + if dst_data is None: + # New file - send everything + logger.debug('File %s: new file, sending all blocks', path) + dst_hashes = {} + else: + dst_entry, block_hashes = dst_data + # Build hash lookup: position -> (size, hash) + dst_hashes = {pos: (size, h) for pos, size, h in block_hashes} # Signal start of file output_stream.write(b'FILE') @@ -1348,9 +1384,10 @@ def do_retrieve_recursive(dir_path, input_stream, output_stream, use_lzma=False, output_stream.write(len(path_bytes).to_bytes(2, 'big')) output_stream.write(path_bytes) - file_path = dir_path / path + blocks_sent = 0 + blocks_skipped = 0 - # Send entire file content + # Read source file and compare block by block with open(file_path, 'rb') as f: block_pos = 0 while True: @@ -1358,6 +1395,20 @@ def do_retrieve_recursive(dir_path, input_stream, output_stream, use_lzma=False, if not block_data: break + # Check if this block matches destination + dst_block = dst_hashes.get(block_pos) + if dst_block is not None: + dst_blk_size, dst_hash = dst_block + if dst_blk_size == len(block_data): + src_hash = hash_factory(block_data).digest() + if src_hash == dst_hash: + # Block matches, skip it + blocks_skipped += 1 + block_pos += len(block_data) + continue + + # Block differs or is new - send it + blocks_sent += 1 if use_lzma: compressed = lzma_compress(block_data) if len(compressed) < len(block_data): @@ -1378,10 +1429,12 @@ def do_retrieve_recursive(dir_path, input_stream, output_stream, use_lzma=False, block_pos += len(block_data) - # Send file metadata - output_stream.write(b'fend') # file end + logger.debug('File %s: sent %d blocks, skipped %d blocks', path, blocks_sent, blocks_skipped) + + # Send file end marker + output_stream.write(b'fend') - # Send meta for this file + # Send metadata for this file stat_result = file_path.stat() atime_ns = stat_result.st_atime_ns mtime_ns = stat_result.st_mtime_ns diff --git a/tests/test_recursive.py b/tests/test_recursive.py index aa8ac17..ad6fe43 100644 --- a/tests/test_recursive.py +++ b/tests/test_recursive.py @@ -381,3 +381,63 @@ def test_recursive_delete_directory(tmp_path, script_path): # Extra directory should be gone assert not (dst_dir / 'extra_dir').exists() + + +def test_recursive_block_level_diff(tmp_path, script_path): + '''Test that only changed blocks are transferred, not entire files.''' + src_dir = tmp_path / 'src' + dst_dir = tmp_path / 'dst' + src_dir.mkdir() + dst_dir.mkdir() + + # Create a file with 4 blocks (512KB total, block_size = 128KB) + block_size = 128 * 1024 + block_a = b'A' * block_size + block_b = b'B' * block_size + block_c = b'C' * block_size + block_d = b'D' * block_size + + src_file = src_dir / 'bigfile.bin' + src_file.write_bytes(block_a + block_b + block_c + block_d) + + # First sync - copies everything + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + dst_file = dst_dir / 'bigfile.bin' + assert dst_file.read_bytes() == block_a + block_b + block_c + block_d + + # Now modify only the second block in source + block_b_modified = b'X' * block_size + src_file.write_bytes(block_a + block_b_modified + block_c + block_d) + + # Second sync - should only transfer the changed block + cmd1 = [executable, script_path, 'checksum', '-r', str(dst_dir)] + cmd2 = [executable, script_path, 'retrieve', '-r', str(src_dir)] + cmd3 = [executable, script_path, 'save', '-r', str(dst_dir)] + + with ExitStack() as stack: + p1 = stack.enter_context(Popen(cmd1, stdin=PIPE, stdout=PIPE, stderr=PIPE)) + p2 = stack.enter_context(Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)) + p3 = stack.enter_context(Popen(cmd3, stdin=p2.stdout, stderr=PIPE)) + + assert p1.wait() == 0, p1.stderr.read() + assert p2.wait() == 0, p2.stderr.read() + assert p3.wait() == 0, p3.stderr.read() + + # Verify the file is correctly updated - unchanged blocks preserved, changed block updated + result = dst_file.read_bytes() + assert result[:block_size] == block_a, "First block should be unchanged" + assert result[block_size:2*block_size] == block_b_modified, "Second block should be updated" + assert result[2*block_size:3*block_size] == block_c, "Third block should be unchanged" + assert result[3*block_size:] == block_d, "Fourth block should be unchanged"