Skip to content

Commit 04636c2

Browse files
authored
Merge pull request #744 from martindurant/cat-ranges
cat with start/stop ranges
2 parents 3b0fe79 + ea4a2c8 commit 04636c2

File tree

9 files changed

+136
-11
lines changed

9 files changed

+136
-11
lines changed

fsspec/asyn.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,26 @@ async def _cat(self, path, recursive=False, on_error="raise", **kwargs):
358358
else:
359359
return out[0]
360360

361+
async def _cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs):
362+
# TODO: on_error
363+
if max_gap is not None:
364+
# to be implemented in utils
365+
raise NotImplementedError
366+
if not isinstance(paths, list):
367+
raise TypeError
368+
if not isinstance(starts, list):
369+
starts = [starts] * len(paths)
370+
if not isinstance(ends, list):
371+
ends = [starts] * len(paths)
372+
if len(starts) != len(paths) or len(ends) != len(paths):
373+
raise ValueError
374+
return await asyncio.gather(
375+
*[
376+
self._cat_file(p, start=s, end=e, **kwargs)
377+
for p, s, e in zip(paths, starts, ends)
378+
]
379+
)
380+
361381
async def _put(
362382
self, lpath, rpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs
363383
):

fsspec/caching.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,32 @@ def _fetch(self, start, end):
422422
return self.data[start:end]
423423

424424

425+
class KnownPartsOfAFile(BaseCache):
426+
name = "parts"
427+
428+
def __init__(self, blocksize, fetcher, size, data={}, **_):
429+
super(KnownPartsOfAFile, self).__init__(blocksize, fetcher, size)
430+
431+
# simple consolidation of contiguous blocks
432+
for start0, stop in data.copy():
433+
for start, stop1 in data.copy():
434+
if stop == start:
435+
data[(start0, stop1)] = data.pop((start0, stop)) + data.pop(
436+
(start, stop1)
437+
)
438+
self.data = data
439+
440+
def _fetch(self, start, stop):
441+
for (loc0, loc1), data in self.data.items():
442+
if loc0 <= start < loc1 and loc0 <= stop <= loc1:
443+
off = start - loc0
444+
out = data[off : off + stop - start]
445+
# reads beyond buffer are padded with zero
446+
out += b"\x00" * (stop - start - len(out))
447+
return out
448+
raise ValueError("Read outside of know parts of file")
449+
450+
425451
caches = {
426452
"none": BaseCache,
427453
"mmap": MMapCache,
@@ -430,4 +456,5 @@ def _fetch(self, start, end):
430456
"block": BlockCache,
431457
"first": FirstChunkCache,
432458
"all": AllBytes,
459+
"parts": KnownPartsOfAFile,
433460
}

fsspec/compression.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,9 @@ class SnappyFile(AbstractBufferedFile):
107107
def __init__(self, infile, mode, **kwargs):
108108
import snappy
109109

110-
self.details = {"size": 999999999} # not true, but OK if we don't seek
111-
super().__init__(fs=None, path="snappy", mode=mode.strip("b") + "b", **kwargs)
110+
super().__init__(
111+
fs=None, path="snappy", mode=mode.strip("b") + "b", size=999999999, **kwargs
112+
)
112113
self.infile = infile
113114
if "r" in mode:
114115
self.codec = snappy.StreamDecompressor()

fsspec/core.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from .compression import compr
1919
from .registry import filesystem, get_filesystem_class
2020
from .utils import (
21+
_unstrip_protocol,
2122
build_name_function,
2223
infer_compression,
2324
stringify_path,
@@ -124,6 +125,10 @@ def __del__(self):
124125
if hasattr(self, "fobjects"):
125126
self.fobjects.clear() # may cause cleanup of objects and close files
126127

128+
@property
129+
def full_name(self):
130+
return _unstrip_protocol(self.path, self.fs)
131+
127132
def open(self):
128133
"""Materialise this as a real open file without context
129134

fsspec/implementations/memory.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,11 +139,14 @@ def info(self, path, **kwargs):
139139
"type": "directory",
140140
}
141141
elif path in self.store:
142+
filelike = self.store[path]
142143
return {
143144
"name": path,
144-
"size": self.store[path].getbuffer().nbytes,
145+
"size": filelike.size
146+
if hasattr(filelike, "size")
147+
else filelike.getbuffer().nbytes,
145148
"type": "file",
146-
"created": self.store[path].created,
149+
"created": getattr(filelike, "created", None),
147150
}
148151
else:
149152
raise FileNotFoundError(path)

fsspec/implementations/reference.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ def cat_file(self, path, start=None, end=None, **kwargs):
194194
return part_or_url[start:end]
195195
return self.fs.cat_file(part_or_url, start=start0, end=end0)[start:end]
196196

197+
def pipe_file(self, path, value, **_):
198+
"""Temporarily add binary data or reference as a file"""
199+
self.references[path] = value
200+
197201
async def _get_file(self, rpath, lpath, **kwargs):
198202
data = await self._cat_file(rpath)
199203
with open(lpath, "wb") as f:

fsspec/spec.py

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from .dircache import DirCache
1515
from .transaction import Transaction
1616
from .utils import (
17+
_unstrip_protocol,
1718
get_package_version_without_import,
1819
other_paths,
1920
read_block,
@@ -700,18 +701,39 @@ def pipe(self, path, value=None, **kwargs):
700701
else:
701702
raise ValueError("path must be str or dict")
702703

704+
def cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs):
705+
if max_gap is not None:
706+
raise NotImplementedError
707+
if not isinstance(paths, list):
708+
raise TypeError
709+
if not isinstance(starts, list):
710+
starts = [starts] * len(paths)
711+
if not isinstance(ends, list):
712+
ends = [starts] * len(paths)
713+
if len(starts) != len(paths) or len(ends) != len(paths):
714+
raise ValueError
715+
return [self.cat_file(p, s, e) for p, s, e in zip(paths, starts, ends)]
716+
703717
def cat(self, path, recursive=False, on_error="raise", **kwargs):
704718
"""Fetch (potentially multiple) paths' contents
705719
706-
Returns a dict of {path: contents} if there are multiple paths
707-
or the path has been otherwise expanded
708-
720+
Parameters
721+
----------
722+
recursive: bool
723+
If True, assume the path(s) are directories, and get all the
724+
contained files
709725
on_error : "raise", "omit", "return"
710726
If raise, an underlying exception will be raised (converted to KeyError
711727
if the type is in self.missing_exceptions); if omit, keys with exception
712728
will simply not be included in the output; if "return", all keys are
713729
included in the output, but the value will be bytes or an exception
714730
instance.
731+
kwargs: passed to cat_file
732+
733+
Returns
734+
-------
735+
dict of {path: contents} if there are multiple paths
736+
or the path has been otherwise expanded
715737
"""
716738
paths = self.expand_path(path, recursive=recursive)
717739
if (
@@ -1241,6 +1263,7 @@ class AbstractBufferedFile(io.IOBase):
12411263
"""
12421264

12431265
DEFAULT_BLOCK_SIZE = 5 * 2 ** 20
1266+
_details = None
12441267

12451268
def __init__(
12461269
self,
@@ -1251,6 +1274,7 @@ def __init__(
12511274
autocommit=True,
12521275
cache_type="readahead",
12531276
cache_options=None,
1277+
size=None,
12541278
**kwargs,
12551279
):
12561280
"""
@@ -1274,6 +1298,8 @@ def __init__(
12741298
cache_options : dict
12751299
Additional options passed to the constructor for the cache specified
12761300
by `cache_type`.
1301+
size: int
1302+
If given and in read mode, suppressed having to look up the file size
12771303
kwargs:
12781304
Gets stored as self.kwargs
12791305
"""
@@ -1307,9 +1333,10 @@ def __init__(
13071333
if mode not in {"ab", "rb", "wb"}:
13081334
raise NotImplementedError("File mode not supported")
13091335
if mode == "rb":
1310-
if not hasattr(self, "details"):
1311-
self.details = fs.info(path)
1312-
self.size = self.details["size"]
1336+
if size is not None:
1337+
self.size = size
1338+
else:
1339+
self.size = self.details["size"]
13131340
self.cache = caches[cache_type](
13141341
self.blocksize, self._fetch_range, self.size, **cache_options
13151342
)
@@ -1319,6 +1346,21 @@ def __init__(
13191346
self.forced = False
13201347
self.location = None
13211348

1349+
@property
1350+
def details(self):
1351+
if self._details is None:
1352+
self._details = self.fs.info(self.path)
1353+
return self._details
1354+
1355+
@details.setter
1356+
def details(self, value):
1357+
self._details = value
1358+
self.size = value["size"]
1359+
1360+
@property
1361+
def full_name(self):
1362+
return _unstrip_protocol(self.path, self.fs)
1363+
13221364
@property
13231365
def closed(self):
13241366
# get around this attr being read-only in IOBase

fsspec/tests/test_caches.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ def letters_fetcher(start, end):
4444
return string.ascii_letters[start:end].encode()
4545

4646

47-
@pytest.fixture(params=caches.values(), ids=list(caches.keys()))
47+
not_parts_caches = {k: v for k, v in caches.items() if k != "parts"}
48+
49+
50+
@pytest.fixture(params=not_parts_caches.values(), ids=list(not_parts_caches))
4851
def Cache_imp(request):
4952
return request.param
5053

@@ -91,3 +94,12 @@ def test_cache_basic(Cache_imp, blocksize, size_requests):
9194
result = cache._fetch(start, end)
9295
expected = string.ascii_letters[start:end].encode()
9396
assert result == expected
97+
98+
99+
def test_known():
100+
c = caches["parts"](None, None, 100, {(10, 20): b"1" * 10, (0, 10): b"0" * 10})
101+
assert (0, 20) in c.data # got consolidated
102+
assert c._fetch(5, 15) == b"0" * 5 + b"1" * 5
103+
with pytest.raises(ValueError):
104+
# tries to call None fetcher
105+
c._fetch(25, 35)

fsspec/utils.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,17 @@ def setup_logging(logger=None, logger_name=None, level="DEBUG", clear=True):
450450
return logger
451451

452452

453+
def _unstrip_protocol(name, fs):
454+
if isinstance(fs.protocol, str):
455+
if name.startswith(fs.protocol):
456+
return name
457+
return fs.protocol + "://" + name
458+
else:
459+
if name.startswith(tuple(fs.protocol)):
460+
return name
461+
return fs.protocol[0] + "://" + name
462+
463+
453464
def mirror_from(origin_name, methods):
454465
"""Mirror attributes and methods from the given
455466
origin_name attribute of the instance to the

0 commit comments

Comments
 (0)