
Commit 61b66b6

groutr and martindurant authored
Consolidate block fetch requests. (#1733)
Co-authored-by: Martin Durant <[email protected]>
1 parent cbd73b0 commit 61b66b6

1 file changed: +33 -13 lines changed


fsspec/caching.py: +33 -13
@@ -7,6 +7,8 @@
 import os
 import threading
 import warnings
+from itertools import groupby
+from operator import itemgetter
 from concurrent.futures import Future, ThreadPoolExecutor
 from typing import (
     TYPE_CHECKING,
@@ -161,21 +163,39 @@ def _fetch(self, start: int | None, end: int | None) -> bytes:
             return b""
         start_block = start // self.blocksize
         end_block = end // self.blocksize
-        need = [i for i in range(start_block, end_block + 1) if i not in self.blocks]
-        hits = [i for i in range(start_block, end_block + 1) if i in self.blocks]
-        self.miss_count += len(need)
-        self.hit_count += len(hits)
-        while need:
-            # TODO: not a for loop so we can consolidate blocks later to
-            # make fewer fetch calls; this could be parallel
-            i = need.pop(0)
-
-            sstart = i * self.blocksize
-            send = min(sstart + self.blocksize, self.size)
+        block_range = range(start_block, end_block + 1)
+        # Determine which blocks need to be fetched. This sequence is sorted by construction.
+        need = (i for i in block_range if i not in self.blocks)
+        # Count the number of blocks already cached
+        self.hit_count += sum(1 for i in block_range if i in self.blocks)
+
+        # Consolidate needed blocks.
+        # Algorithm adapted from the Python 2.x itertools documentation.
+        # We are grouping an enumerated sequence of blocks. By comparing the difference
+        # between an ascending range (provided by enumerate) and the needed block numbers,
+        # we can detect when the block number skips values. The key computes this difference.
+        # Whenever the difference changes, we know that we have previously cached block(s),
+        # and a new group is started. In other words, this algorithm neatly groups
+        # runs of consecutive block numbers so they can be fetched together.
+        for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
+            # Extract the blocks from the enumerated sequence
+            _blocks = tuple(map(itemgetter(1), _blocks))
+            # Compute start of first block
+            sstart = _blocks[0] * self.blocksize
+            # Compute the end of the last block. Last block may not be full size.
+            send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)
+
+            # Fetch bytes (could be multiple consecutive blocks)
             self.total_requested_bytes += send - sstart
-            logger.debug(f"MMap get block #{i} ({sstart}-{send})")
+            logger.debug(
+                f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
+            )
             self.cache[sstart:send] = self.fetcher(sstart, send)
-            self.blocks.add(i)
+
+            # Update set of cached blocks
+            self.blocks.update(_blocks)
+            # Update cache statistics with number of blocks we had to cache
+            self.miss_count += len(_blocks)
 
         return self.cache[start:end]
 
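The grouping trick in this commit is the classic itertools recipe for splitting a sorted integer sequence into runs of consecutive values. A minimal standalone sketch (the block numbers below are made up for illustration):

from itertools import groupby
from operator import itemgetter

# Hypothetical block numbers still missing from the cache; must be sorted,
# which the diff above guarantees by construction.
need = [2, 3, 4, 8, 9, 15]

# enumerate pairs each block number with an ascending counter. Within a run
# of consecutive blocks, counter - block is constant; it changes exactly
# when the sequence skips a value, which starts a new group.
for _, group in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
    run = tuple(map(itemgetter(1), group))
    print(run)

# Prints:
# (2, 3, 4)
# (8, 9)
# (15,)

Each run then costs one fetcher call spanning the start of its first block through the end of its last, instead of one call per block.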
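To observe the consolidation end to end, one can wire an MMapCache to a fetcher that records its calls. A rough sketch against fsspec internals (MMapCache's constructor arguments and the private _fetch method are assumed from the diff above, and the sizes are arbitrary):

from fsspec.caching import MMapCache

calls = []

def fetcher(start, end):
    # Stand-in for a remote range request; records every call made.
    calls.append((start, end))
    return bytes(end - start)

# MMapCache(blocksize, fetcher, size), as read off the code in the diff.
cache = MMapCache(blocksize=16, fetcher=fetcher, size=128)

# Bytes 12-50 span blocks 0-3, none of which are cached yet. Before this
# commit, the while loop issued four fetcher calls, one per block; with
# consolidated runs the whole span should arrive in a single call.
cache._fetch(12, 50)
print(calls)  # expected: [(0, 64)]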