Skip to content

Commit 1589c7a

Browse files
committed
add support for checksum --progress
1 parent 6297b1c commit 1589c7a

3 files changed

Lines changed: 131 additions & 26 deletions

File tree

.github/workflows/test-from-github.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ jobs:
1414
runs-on: ubuntu-latest
1515

1616
steps:
17-
17+
1818
- name: Set up Python
1919
uses: actions/setup-python@v3
2020
with:
@@ -23,7 +23,7 @@ jobs:
2323
- name: Install package from Github
2424
run: |
2525
python -m pip install https://github.com/messa/blockcopy/archive/refs/heads/main.zip
26-
26+
2727
- name: Run --help
2828
run: |
2929
blockcopy --help
@@ -33,7 +33,7 @@ jobs:
3333
dd if=/dev/urandom bs=1M count=32 of=test_src
3434
dd if=/dev/urandom bs=1M count=32 of=test_dst
3535
sha1sum test_src | tee test_src.sha1
36-
blockcopy checksum test_dst | \
36+
blockcopy checksum --progress test_dst | \
3737
blockcopy retrieve test_src | \
3838
blockcopy save test_dst
3939
cmp test_src test_dst

blockcopy.py

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from queue import Queue
4343
import sys
4444
from threading import Event, Lock
45+
from time import monotonic
4546

4647

4748
logger = getLogger(__name__)
@@ -108,13 +109,14 @@ def main():
108109
p_retrieve = subparsers.add_parser('retrieve')
109110
p_save = subparsers.add_parser('save')
110111

111-
p_checksum.add_argument('file')
112-
p_checksum.add_argument('--start', type=int, default=0)
113-
p_checksum.add_argument('--end', type=int, default=None)
112+
p_checksum.add_argument('destination_file')
113+
p_checksum.add_argument('--progress', action='store_true', help='show progress info')
114+
p_checksum.add_argument('--start', type=int, metavar='OFFSET', default=0)
115+
p_checksum.add_argument('--end', type=int, metavar='OFFSET', default=None)
114116

115-
p_retrieve.add_argument('file')
117+
p_retrieve.add_argument('source_file')
116118

117-
p_save.add_argument('file')
119+
p_save.add_argument('destination_file')
118120

119121
args = parser.parse_args()
120122

@@ -125,11 +127,11 @@ def main():
125127

126128
try:
127129
if args.command == 'checksum':
128-
do_checksum(args.file, sys.stdout.buffer, args.start, args.end)
130+
do_checksum(args.destination_file, sys.stdout.buffer, args.start, args.end, args.progress)
129131
elif args.command == 'retrieve':
130-
do_retrieve(args.file, sys.stdin.buffer, sys.stdout.buffer)
132+
do_retrieve(args.source_file, sys.stdin.buffer, sys.stdout.buffer)
131133
elif args.command == 'save':
132-
do_save(args.file, sys.stdin.buffer)
134+
do_save(args.destination_file, sys.stdin.buffer)
133135
else:
134136
raise Exception(f'Not implemented: {args.command}')
135137

@@ -166,7 +168,7 @@ def ctrl_c_will_terminate_immediately():
166168
signal(SIGTERM, lambda *args: os.kill(os.getpid(), SIGKILL))
167169

168170

169-
def do_checksum(file_path, hash_output_stream, start_offset, end_offset):
171+
def do_checksum(file_path, hash_output_stream, start_offset, end_offset, show_progress):
170172
'''
171173
Read the file in blocks, calculate hash of each block and write the hashes to the output stream.
172174
@@ -183,6 +185,11 @@ def do_checksum(file_path, hash_output_stream, start_offset, end_offset):
183185
'''
184186
if hash_output_stream.isatty():
185187
sys.exit('ERROR (checksum): hash_output_stream is a tty - will not write binary data to terminal')
188+
start_offset = start_offset or 0
189+
if end_offset is not None:
190+
assert start_offset < end_offset
191+
192+
checksum_start_time = monotonic()
186193

187194
with ThreadPoolExecutor(worker_count + 2, thread_name_prefix='checksum') as executor:
188195
hash_output_stream_lock = Lock()
@@ -193,25 +200,36 @@ def do_checksum(file_path, hash_output_stream, start_offset, end_offset):
193200

194201
def read_worker():
195202
# Only one will run
196-
nonlocal source_end_offset
203+
nonlocal source_end_offset, show_progress
204+
show_progress_total_size = None
205+
show_progress_last_output_time = monotonic()
206+
show_progress_last_output_pct = 0
197207
try:
198208
with ExitStack() as stack:
199209
if file_path == '-':
200210
f = sys.stdin.buffer
201211
else:
202212
f = stack.enter_context(open(file_path, 'rb'))
203213

214+
if show_progress:
215+
if end_offset is None:
216+
try:
217+
assert f.tell() == 0
218+
show_progress_total_size = f.seek(0, os.SEEK_END)
219+
f.seek(0)
220+
except OSError:
221+
# Probably `[Errno 29] Illegal seek` when reading from pipe e.g. from pv command
222+
show_progress = False
223+
else:
224+
show_progress_total_size = end_offset - start_offset
225+
204226
if start_offset:
205-
f.seek(start_offset)
206-
block_pos = f.tell()
227+
block_pos = f.seek(start_offset)
207228
assert block_pos == start_offset
208229
else:
209230
block_pos = 0
210231

211232
while True:
212-
if exception_collector.has_exception():
213-
break
214-
215233
block_data_batch = []
216234

217235
for _ in range(16):
@@ -240,7 +258,7 @@ def read_worker():
240258

241259
block_pos += len(block_data)
242260

243-
if not block_data_batch:
261+
if not block_data_batch or exception_collector.has_exception():
244262
break
245263

246264
hash_result_event = Event()
@@ -250,11 +268,29 @@ def read_worker():
250268

251269
del block_data_batch
252270

253-
try:
254-
source_end_offset = f.tell()
255-
except OSError:
256-
# Probably `[Errno 29] Illegal seek` when reading from pipe e.g. from pv command
257-
source_end_offset = None
271+
if show_progress:
272+
show_progress_pct = 100 * (block_pos - start_offset) / show_progress_total_size
273+
if monotonic() - show_progress_last_output_time >= 60 or show_progress_pct - show_progress_last_output_pct >= 5:
274+
show_progress_last_output_time = monotonic()
275+
show_progress_last_output_pct = show_progress_pct
276+
print(
277+
f'Checksum progress: {block_pos/2**30:7.2f} GB / {show_progress_total_size/2**30:.2f} GB'
278+
f' ({show_progress_pct:5.2f}%)',
279+
file=sys.stderr,
280+
flush=True)
281+
282+
if end_offset is None:
283+
try:
284+
# Mark where we have ended reading the destination file checksum.
285+
# The retrieve side then has chance to read and send anything from the source beyond this offset.
286+
source_end_offset = f.tell()
287+
except OSError:
288+
# Probably `[Errno 29] Illegal seek` when reading from pipe e.g. from pv command
289+
source_end_offset = None
290+
elif end_offset > block_pos:
291+
raise Exception('You have specified an --end offset, but the destination file (which we are checksumming now) is smaller than that')
292+
else:
293+
assert end_offset == block_pos
258294

259295
except Exception as exc:
260296
logger.exception('do_checksum read_worker failed: %r', exc)
@@ -349,14 +385,18 @@ def send_worker():
349385
with hash_output_stream_lock:
350386
if source_end_offset is not None:
351387
# Instruct the retrieve process to send data afther the last hashed block.
352-
# This is necessary when the destination file is smaller than the source file
353-
# and we want to copy the whole source file.
388+
# This is necessary when the destination file (which are we checksumming now)
389+
# is smaller than the source file and we want to copy the whole source file.
354390
hash_output_stream.write(b'rest')
355391
hash_output_stream.write(source_end_offset.to_bytes(8, 'big'))
356392

357393
hash_output_stream.write(b'done')
358394
hash_output_stream.flush()
359395

396+
if show_progress:
397+
checksum_duration = monotonic() - checksum_start_time
398+
print(f'Checksum done in {checksum_duration:.3f} seconds', file=sys.stderr, flush=True)
399+
360400

361401
def do_retrieve(file_path, hash_input_stream, block_output_stream):
362402
'''

tests/test_checksum.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,68 @@ def test_checksum_stdin(tmp_path, script_path, terminate_process):
7070
sha3_512(test_content).digest(), # block hash
7171
b'done',
7272
])
73+
74+
75+
def test_checksum_file_progress(tmp_path, script_path, terminate_process):
76+
test_content = b'Hello World!'
77+
assert len(test_content) == 12
78+
sample_path = tmp_path / 'src_file'
79+
sample_path.write_bytes(test_content)
80+
cmd1 = [executable, script_path, 'checksum', '--progress', str(sample_path)]
81+
82+
with ExitStack() as stack:
83+
p1 = stack.enter_context(Popen(cmd1, stdin=DEVNULL, stdout=PIPE, stderr=PIPE))
84+
stack.callback(lambda: terminate_process(p1))
85+
p1_output, p1_error = p1.communicate(input=test_content, timeout=5)
86+
assert p1.wait() == 0
87+
assert p1_output == b''.join([
88+
b'Hash',
89+
b'\x00\x00\x00\x00\x00\x00\x00\x00', # block pos
90+
b'\x00\x00\x00\x0c', # block data length
91+
sha3_512(test_content).digest(), # block hash
92+
b'rest',
93+
b'\x00\x00\x00\x00\x00\x00\x00\x0c', # size of the sample file
94+
b'done',
95+
])
96+
97+
98+
def test_checksum_devstdin_progress(tmp_path, script_path, terminate_process):
99+
test_content = b'Hello World!'
100+
assert len(test_content) == 12
101+
sample_path = tmp_path / 'src_file'
102+
sample_path.write_bytes(test_content)
103+
cmd = [executable, script_path, 'checksum', '--progress', '/dev/stdin']
104+
105+
with ExitStack() as stack:
106+
p1 = stack.enter_context(Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE))
107+
stack.callback(lambda: terminate_process(p1))
108+
p1_output, p1_error = p1.communicate(input=test_content, timeout=5)
109+
assert p1.wait() == 0
110+
assert p1_output == b''.join([
111+
b'Hash',
112+
b'\x00\x00\x00\x00\x00\x00\x00\x00', # block pos
113+
b'\x00\x00\x00\x0c', # block data length
114+
sha3_512(test_content).digest(), # block hash
115+
b'done',
116+
])
117+
118+
119+
def test_checksum_stdin_progress(tmp_path, script_path, terminate_process):
120+
test_content = b'Hello World!'
121+
assert len(test_content) == 12
122+
sample_path = tmp_path / 'src_file'
123+
sample_path.write_bytes(test_content)
124+
cmd = [executable, script_path, 'checksum', '--progress', '-']
125+
126+
with ExitStack() as stack:
127+
p1 = stack.enter_context(Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE))
128+
stack.callback(lambda: terminate_process(p1))
129+
p1_output, p1_error = p1.communicate(input=test_content, timeout=5)
130+
assert p1.wait() == 0
131+
assert p1_output == b''.join([
132+
b'Hash',
133+
b'\x00\x00\x00\x00\x00\x00\x00\x00', # block pos
134+
b'\x00\x00\x00\x0c', # block data length
135+
sha3_512(test_content).digest(), # block hash
136+
b'done',
137+
])

0 commit comments

Comments
 (0)