Skip to content

Commit 28b0f7b

Browse files
authored
feat: basic fsspec writing (#1016)
* use paramiko instead of sshfs * use specified port * test default handler behaviour * default to fsspec instead of error if scheme not found * attempt to close socket * fix ci * Revert "fix ci" This reverts commit e56e337. * broader exception * also handle socket exception * get user robust * enable github test with skip if api limit is hit (so we sometimes test it, but it never fails due to api limits). TODO: update exception class * add memory filesystem test * add zip and tar tests * fix memory test * zip/tar tests * add test for writing with fsspec without any integration (file-like object) * update docstrings, add some type hints * add helper method to produce sink from a path or file-like object * add fsspec writing test for integration * helper function to check if file-like object * return file-like object * explicit argument * rename test * create empty file * rename test * pass storage options to fsspec * test ssh and memory writing * split fsspec tests into reading and writing * loosen the check for file-like object * add test for file update * file update test * properly truncate the file * refactor how sink is created (no changes) * annotation to avoid warning * close file if sink initialization fails * sink will handle fsspec in a similar way to local path for open and close * rename to reading * check if test works * missing import * remove parent dirs * fix zip tar tests * skip github if api limits hit * attempt to fix windows paths * use more complex uri with object in zip test * debug * paths * add new test case to object url split * add new failing test case: TODO make it work * revert debug changes * working in new test case * modified where file is truncated * revert file-like check * revert is-file-like check * unified rb+ mode (fsspec uses "rb+" not "r+b" it can be confusing sometimes) * add http write test (not implemented error check) * skip test for debugging * understand test failure * use r+b instead of rb+ * use new sink * writing 
memory test * correctly truncate file with fsspec * use fsspec to get parent dir * aiohttp import skip * isstr * use ports instead of string * improve path:obj split to handle chained protocols (protocol1::protocol2://) * working url chain to some extent * attempt to fix ci * cleanup * attempt to fix ci
1 parent 113c58c commit 28b0f7b

File tree

9 files changed

+281
-74
lines changed

9 files changed

+281
-74
lines changed

src/uproot/_util.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -286,23 +286,26 @@ def regularize_path(path):
286286
pass
287287

288288
_uri_scheme = re.compile("^(" + "|".join([re.escape(x) for x in _schemes]) + ")://")
289+
_uri_scheme_chain = re.compile(
290+
"^(" + "|".join([re.escape(x) for x in _schemes]) + ")::"
291+
)
289292

290293

291-
def file_object_path_split(path: str) -> tuple[str, str | None]:
294+
def file_object_path_split(urlpath: str) -> tuple[str, str | None]:
292295
"""
293296
Split a path with a colon into a file path and an object-in-file path.
294297
295298
Args:
296-
path: The path to split. Example: ``"https://localhost:8888/file.root:tree"``
299+
urlpath: The path to split. Example: ``"https://localhost:8888/file.root:tree"``
297300
298301
Returns:
299302
A tuple of the file path and the object-in-file path. If there is no
300303
object-in-file path, the second element is ``None``.
301304
Example: ``("https://localhost:8888/file.root", "tree")``
302305
"""
303306

304-
path: str = regularize_path(path)
305-
path = path.strip()
307+
urlpath: str = regularize_path(urlpath).strip()
308+
path = urlpath
306309

307310
def _split_path(path: str) -> list[str]:
308311
parts = path.split(":")
@@ -313,16 +316,22 @@ def _split_path(path: str) -> list[str]:
313316
return parts
314317

315318
if "://" not in path:
316-
# assume it's a local file path
317-
parts = _split_path(path)
318-
elif _uri_scheme.match(path):
319+
path = "file://" + path
320+
321+
# replace the match of _uri_scheme_chain with "" until there is no match
322+
while _uri_scheme_chain.match(path):
323+
path = _uri_scheme_chain.sub("", path)
324+
325+
if _uri_scheme.match(path):
319326
# paths without "://" were given a "file://" prefix above, so local paths also take this branch
320-
parsed_url = urlparse(path)
321-
parsed_url_path = parsed_url.path
327+
if path.startswith("file://"):
328+
parsed_url_path = path[7:]
329+
else:
330+
parsed_url_path = urlparse(path).path
331+
322332
if parsed_url_path.startswith("//"):
323-
# This can be a leftover from url chaining in fsspec
324-
# TODO: replace this with str.removeprefix once Python 3.8 is dropped
325333
parsed_url_path = parsed_url_path[2:]
334+
326335
parts = _split_path(parsed_url_path)
327336
else:
328337
# invalid scheme
@@ -336,12 +345,15 @@ def _split_path(path: str) -> list[str]:
336345
elif len(parts) == 2:
337346
obj = parts[1]
338347
# remove the object from the path (including the colon)
339-
path = path[: -len(obj) - 1]
340-
obj = obj.strip()
348+
urlpath = urlpath[: -len(obj) - 1]
349+
# clean badly placed slashes
350+
obj = obj.strip().lstrip("/")
351+
while "//" in obj:
352+
obj = obj.replace("//", "/")
341353
else:
342354
raise ValueError(f"could not split object from path {path}")
343355

344-
return path, obj
356+
return urlpath, obj
345357

346358

347359
def file_path_to_source_class(file_path, options):
@@ -412,7 +424,7 @@ def file_path_to_source_class(file_path, options):
412424
windows_absolute_path = file_path
413425

414426
parsed_url = urlparse(file_path)
415-
if parsed_url.scheme.upper() == "FILE":
427+
if parsed_url.scheme.lower() == "file":
416428
parsed_url_path = unquote(parsed_url.path)
417429
else:
418430
parsed_url_path = parsed_url.path

src/uproot/sink/file.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
(although files are flushed after every object-write).
1010
"""
1111

12+
from __future__ import annotations
1213

1314
import numbers
1415
import os
@@ -28,7 +29,7 @@ class FileSink:
2829
"""
2930

3031
@classmethod
31-
def from_object(cls, obj):
32+
def from_object(cls, obj) -> FileSink:
3233
"""
3334
Args:
3435
obj (file-like object): An object with ``read``, ``write``, ``seek``,
@@ -59,23 +60,38 @@ def from_object(cls, obj):
5960
)
6061
return self
6162

62-
def __init__(self, file_path):
63+
@classmethod
64+
def from_fsspec(cls, open_file) -> FileSink:
65+
import fsspec
66+
67+
if not isinstance(open_file, fsspec.core.OpenFile):
68+
raise TypeError("""argument should be of type fsspec.core.OpenFile""")
69+
self = cls(None)
70+
self._fsspec_open_file = open_file
71+
return self
72+
73+
def __init__(self, file_path: str | None):
6374
self._file_path = file_path
6475
self._file = None
76+
self._fsspec_open_file = None
6577

6678
@property
67-
def file_path(self):
79+
def file_path(self) -> str | None:
6880
"""
6981
A path to the file, which is None if constructed with a file-like object or an fsspec ``OpenFile`` (see ``from_fsspec``).
7082
"""
7183
return self._file_path
7284

7385
def _ensure(self):
74-
if self._file is None:
75-
if self._file_path is None:
76-
raise TypeError("FileSink created from an object cannot be reopened")
86+
if self._file:
87+
return
88+
89+
if self._fsspec_open_file:
90+
self._file = self._fsspec_open_file.open()
91+
else:
7792
self._file = open(self._file_path, "r+b")
78-
self._file.seek(0)
93+
94+
self._file.seek(0)
7995

8096
def __getstate__(self):
8197
state = dict(self.__dict__)
@@ -101,7 +117,7 @@ def flush(self):
101117
return self._file.flush()
102118

103119
@property
104-
def closed(self):
120+
def closed(self) -> bool:
105121
"""
106122
True if the file is closed; False otherwise.
107123
"""
@@ -124,7 +140,7 @@ def __exit__(self, exception_type, exception_value, traceback):
124140
self.close()
125141

126142
@property
127-
def in_path(self):
143+
def in_path(self) -> str:
128144
if self._file_path is None:
129145
return ""
130146
else:
@@ -157,7 +173,7 @@ def set_file_length(self, length):
157173
if missing > 0:
158174
self._file.write(b"\x00" * missing)
159175

160-
def read(self, location, num_bytes, insist=True):
176+
def read(self, location, num_bytes, insist=True) -> bytes:
161177
"""
162178
Args:
163179
location (int): Position in the file to read.

src/uproot/source/fsspec.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ def __init__(self, file_path: str, **options):
3131
exclude_keys = set(default_options.keys())
3232
storage_options = {k: v for k, v in options.items() if k not in exclude_keys}
3333

34-
protocol = fsspec.core.split_protocol(file_path)[0]
35-
self._async_impl = fsspec.get_filesystem_class(protocol=protocol).async_impl
3634
self._executor = FSSpecLoopExecutor()
3735

3836
self._fs, self._file_path = fsspec.core.url_to_fs(file_path, **storage_options)
3937

38+
# TODO: decide what to do when there is a chain of filesystems; for now use the async_impl of the filesystem returned by url_to_fs
39+
self._async_impl = self._fs.async_impl
40+
4041
self._file = self._fs.open(self._file_path)
4142
self._fh = None
4243
self._num_requests = 0

0 commit comments

Comments
 (0)