Add top-level copy/cp function #723

Closed
wants to merge 4 commits into from

cisaacstern
Contributor

@cisaacstern cisaacstern commented Aug 13, 2021

First draft for the feature proposed by @martindurant in pangeo-forge/pangeo-forge-recipes#179 (comment). The included test passes, as do the CI tests for pangeo-forge/pangeo-forge-recipes#179 (with fsspec installed from here).

Remaining TODO:

  • Add callbacks

Martin, my understanding is that within fsspec.cp, this should look like:

while True:
    data = a.read(a.fs.blocksize, callback=callback)

Except that AbstractBufferedFile.read does not implement callbacks (and neither does HTTPFile.read, as one backend example). Do I understand correctly that this is because callbacks are new and sporadically implemented, and that the next step is therefore to add callbacks to AbstractBufferedFile.read, following this example:

https://github.com/intake/filesystem_spec/blob/5c58feb616a1e39c234fe85394d990ae36737203/fsspec/spec.py#L730-L743

?
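For illustration, progress could also be reported from the copy loop itself rather than from read(). This is a minimal sketch under that assumption, not fsspec code: copy_in_chunks and on_chunk are hypothetical names, and io.BytesIO objects stand in for the opened source and destination files.

```python
import io


def copy_in_chunks(src, dst, blocksize=4, on_chunk=None):
    """Copy src to dst one block at a time; return the total bytes copied."""
    total = 0
    while True:
        data = src.read(blocksize)
        if not data:  # an empty read means end of file
            break
        dst.write(data)
        total += len(data)
        if on_chunk is not None:
            on_chunk(len(data))  # progress is reported by the loop, not by read()
    return total


src = io.BytesIO(b"0123456789")
dst = io.BytesIO()
chunk_sizes = []
copied = copy_in_chunks(src, dst, blocksize=4, on_chunk=chunk_sizes.append)
```

With this shape, file-like objects need no callback support at all; the hook only ever sees the number of bytes moved per block.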

  • Special-casing for FTP servers that don't allow random access reads

This is a second step after completing the callbacks implementation, but my impression is it should be something like:

# fsspec.cp
while True:
    data = a.read(a.fs.blocksize, callback=callback)
    if not data:
        break
    if not isinstance(a.fs, FTPFileSystem):
        b.write(data)

...and above that, we'll need to make sure b.write(data) is included in the callback if a.fs is an FTPFileSystem.

Two questions about this:

  1. Do we want to force writing within the callback for all FTPFileSystems? Or just the subset that don't allow random access?
  2. For those that do require writing within the callback, might a good way to implement that be to pass the callback to a new FTPFileSystem._fetch_all method (following the naming pattern of HTTPFileSystem._fetch_all), which does something like:

def _fetch_all(self, rpath, callback):
    with self.ftp.transfercmd("RETR %s" % rpath) as conn:
        while True:
            data = conn.recv(self.blocksize)
            if not data:  # socket closed: transfer finished
                break
            callback.relative_update(data)  # assumes that a write function is one of the callback hooks
    self.ftp.voidresp()  # consume the server's end-of-transfer reply

?
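As a point of comparison, the standard library's ftplib.FTP.retrbinary(cmd, callback, blocksize) already hands each received block to a callback, so the write can live there. The sketch below assumes that approach; ftp_stream_to is a hypothetical helper, and FakeFTP is a stand-in that only mimics the retrbinary signature so the example runs without a server.

```python
import io


class FakeFTP:
    """Stand-in for ftplib.FTP that serves bytes from a dict (illustration only)."""

    def __init__(self, files):
        self.files = files

    def retrbinary(self, cmd, callback, blocksize=8192, rest=None):
        # Mirrors the ftplib.FTP.retrbinary signature: pass each block to callback.
        path = cmd.split(" ", 1)[1]
        data = self.files[path]
        for start in range(0, len(data), blocksize):
            callback(data[start:start + blocksize])
        return "226 Transfer complete"


def ftp_stream_to(ftp, rpath, dst, blocksize=8192, progress=None):
    """Stream rpath sequentially into dst, writing inside the per-block callback."""

    def on_block(block):
        dst.write(block)
        if progress is not None:
            progress(len(block))

    return ftp.retrbinary("RETR %s" % rpath, on_block, blocksize=blocksize)


ftp = FakeFTP({"/pub/data.bin": b"abcdefghij"})
dst = io.BytesIO()
seen = []
ftp_stream_to(ftp, "/pub/data.bin", dst, blocksize=3, progress=seen.append)
```

Because the transfer is strictly sequential, this would suit servers that do not allow random-access reads; whether all FTP filesystems should take this path is the open question above.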

@martindurant
Member

cc @isidentical for questions about callbacks. As far as I understand, callbacks are not implemented for any file-like objects, only for get/put/cat/pipe and potentially bulk rm/cp.

Member

@martindurant martindurant left a comment

My intent for a top-level function would be that you can do cp("ftp://path/*", "s3://bucket/path"), just like with a (posix) command line.

That means that you would need to pass multiple open_kwargs for the two filesystems - but not for methods such as rm that only need one path. Maybe we could solve this by encouraging people to use the configuration system to define defaults for each protocol, or have some other mechanism for registering the "current" instance(s) for such operations.
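To make the per-protocol defaults idea concrete, here is a rough sketch of a lookup keyed by URL protocol. Everything in it is an assumption for illustration: PROTOCOL_DEFAULTS, protocol_of and resolve_kwargs are hypothetical names, the option values are made up, and nothing here is the actual fsspec configuration API.

```python
# Hypothetical registry of default storage options, keyed by protocol.
PROTOCOL_DEFAULTS = {
    "ftp": {"host": "ftp.example.com", "port": 21},
    "s3": {"anon": False},
}


def protocol_of(url, default="file"):
    """Return the protocol prefix of a URL, or `default` for plain paths."""
    return url.split("://", 1)[0] if "://" in url else default


def resolve_kwargs(url, overrides=None):
    """Merge the registered defaults for the URL's protocol with explicit overrides."""
    merged = dict(PROTOCOL_DEFAULTS.get(protocol_of(url), {}))
    merged.update(overrides or {})
    return merged


src_kwargs = resolve_kwargs("ftp://path/*")
dst_kwargs = resolve_kwargs("s3://bucket/path", {"anon": True})
local_kwargs = resolve_kwargs("/tmp/file")
```

A two-URL call such as cp(src, dst) could then resolve options for each side independently, without the caller passing two sets of kwargs.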

#588 #41

@@ -468,6 +468,35 @@ def open_local(url, mode="rb", **storage_options):
    return paths


def cp(filea, fileb: OpenFile, callback=None, **open_kwargs):
Member

Note that the filesystem API cp has recursive and other parameters - it can do batch jobs. I had imagined the code here as a file-specific copy method of a filesystem which could be overridden in various specific implementations (like FTP!).

This code doesn't use the given callback.

Copy link

Thanks for this, I think it will be great to have this useful feature!

Sorry for the nitpick, but it might be nice to use src and dst as argument names, as in Python's shutil.copyfile function, and maybe mark them as positional-only as well.

Member

Certainly fine with src/dst. I'm less sure there is a need to make them positional-only.
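For reference, positional-only parameters are written with a `/` in the signature (Python 3.8+). The sketch below only illustrates the calling convention being debated; the signature is hypothetical and the body just echoes its arguments.

```python
def cp(src, dst, /, callback=None):
    """Hypothetical signature only; src and dst cannot be passed by keyword."""
    return (src, dst, callback)


result = cp("ftp://path/*", "s3://bucket/path")

try:
    cp(src="a", dst="b")  # keyword use of positional-only names is rejected
except TypeError:
    keyword_call_rejected = True
else:
    keyword_call_rejected = False
```

The trade-off is that positional-only names can be renamed later without breaking callers, at the cost of disallowing the more explicit cp(src=..., dst=...) spelling.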

b.write(data)


def copy():
Member

copy = cp

@isidentical
Member

As far as I understand, callbacks are not implemented for any file-like objects, only for get/put/cat/pipe and potentially bulk rm/cp.

No. We do not use callbacks on any file-like objects. We only implement them for raw data transfer methods (e.g. the get_file callback reports transferred_bytes / len(data), unit: bytes) and bulk file transfer methods (e.g. the put callback reports transferred_files / len(total_files), unit: whole files).
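The two granularities can be sketched as follows. CountingCallback is a minimal stand-in with set_size and relative_update methods, loosely modelled on the callback interface discussed here, and get_file and put are toy functions rather than the real filesystem methods.

```python
class CountingCallback:
    """Minimal stand-in exposing set_size/relative_update (illustration only)."""

    def __init__(self):
        self.size = None
        self.value = 0

    def set_size(self, size):
        self.size = size

    def relative_update(self, inc=1):
        self.value += inc


def get_file(data, blocksize, callback):
    """Single-file transfer: progress is counted in bytes."""
    callback.set_size(len(data))
    out = bytearray()
    for start in range(0, len(data), blocksize):
        block = data[start:start + blocksize]
        out += block
        callback.relative_update(len(block))
    return bytes(out)


def put(files, callback):
    """Bulk transfer: progress is counted in whole files."""
    callback.set_size(len(files))
    for _name in files:
        callback.relative_update(1)


byte_cb, file_cb = CountingCallback(), CountingCallback()
payload = get_file(b"x" * 10, 4, byte_cb)
put(["a", "b", "c"], file_cb)
```

The same callback type serves both cases; only the unit passed to set_size and relative_update differs.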

@cisaacstern
Contributor Author

just like with a (posix) command line.

Gotcha, happy to re-work the draft to align with this. Setting aside momentarily where src_fs_kwargs, src_open_kwargs, and dst_open_kwargs come from (perhaps they are passed directly to the top-level function, or configured elsewhere), just want to confirm I'm on the right track with the design:

# fsspec/core.py

def cp(src, dst, src_fs_kwargs, src_open_kwargs, dst_open_kwargs, callback=None):
    src_fs, _ = url_to_fs(src, **src_fs_kwargs)
    open_dst = open(dst, **dst_open_kwargs)
    src_fs.cp(src, open_dst, src_open_kwargs, callback=callback)

# fsspec/implementations/some_implementation.py

def cp_file():
    # already exists in some implementations, for intra-filesystem copying
    ...

def cp_file_x():
    # the new inter-filesystem copy function, non-recursive
    # callbacks implemented here
    ...

def cp():
    # recursive wrapper for `cp_file_x` and/or `cp_file`
    # called by fsspec/core.py
    ...

Does this seem to be the right general direction?

@martindurant
Member

Does this seem to be the right general direction?

Yes I think so, except maybe I would do expand_paths right in the generic cp, and then call the cp_file_x method rather than the fs-specific cp (to keep the latter pure).
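A rough sketch of that split, with path expansion in the generic function and a per-file method that backends could override. MemoryFS is a toy in-memory filesystem invented for this illustration; its expand_path and cp_file_x methods and the cp signature are assumptions, not the fsspec API.

```python
import fnmatch
import posixpath


class MemoryFS:
    """Toy in-memory filesystem used only for this illustration."""

    def __init__(self, files=None):
        self.files = dict(files or {})

    def expand_path(self, pattern):
        return sorted(p for p in self.files if fnmatch.fnmatch(p, pattern))

    def cp_file_x(self, path, dst_fs, dst_path):
        # Non-recursive, cross-filesystem copy of one file; a backend such as
        # FTP could override this without touching the generic cp below.
        dst_fs.files[dst_path] = self.files[path]


def cp(src_fs, pattern, dst_fs, dst_dir, on_file=None):
    """Generic copy: expand the source pattern, then copy file by file."""
    paths = src_fs.expand_path(pattern)
    for path in paths:
        target = posixpath.join(dst_dir, posixpath.basename(path))
        src_fs.cp_file_x(path, dst_fs, target)
        if on_file is not None:
            on_file(path)  # bulk progress, counted in whole files
    return paths


src_fs = MemoryFS({"/data/a.nc": b"A", "/data/b.nc": b"B", "/data/notes.txt": b"N"})
dst_fs = MemoryFS()
copied = cp(src_fs, "/data/*.nc", dst_fs, "/bucket/path")
```

Keeping expansion in the generic function leaves the filesystem-specific cp untouched, which is the "pure" property mentioned above.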

Have you seen #732 ?

@cisaacstern
Contributor Author

maybe I would do expand_paths right in the generic cp

👍

Have you seen #732 ?

Yes, very interesting. My takeaway is that this justifies not expending extra effort in this PR on *_kwargs-passing feature development, as #732 may open up a new/better path for that. Further, we should not worry much in this PR about defining a generalizable model for cross-filesystem operations. The basic fsspec.cp under discussion here is still worth implementing now, though, even if it is refactored a bit down the line. Correct?

@martindurant
Member

Yes, I think that's right. We can concentrate here on making fsspec.api.cp with strings, and let the Upath implementation take its course.

@cisaacstern
Contributor Author

@martindurant, should we close this? Hopefully the discussion was useful, but I assume you'll be starting fresh when you begin working on a top-level copy?

@martindurant
Member

Yes, I am starting fresh (but my initial version is not that different, of course!).

@cisaacstern cisaacstern closed this Dec 3, 2021