Skip to content

password support to ZipFS #361

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [2.4.12] - (Unreleased)

### Added

- Added `passwd` argument and `setpassword` for ReadZipFS to extract password
protected date from zip file. [#360](https://github.com/PyFilesystem/pyfilesystem2/issues/360)

### Changed

- Start testing on PyPy. Due to [#342](https://github.com/PyFilesystem/pyfilesystem2/issues/342)
Expand Down
8 changes: 8 additions & 0 deletions fs/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"NoURL",
"OperationFailed",
"OperationTimeout",
"PasswordUnsupported",
"PathError",
"PermissionDenied",
"RemoteConnectionError",
Expand Down Expand Up @@ -255,6 +256,13 @@ class RemoveRootError(OperationFailed):
default_message = "root directory may not be removed"


class PasswordUnsupported(Unsupported):
"""Attempt to create a password protected zip file.
"""

default_message = "can not create password protected zip"


class ResourceError(FSError):
"""Base exception class for error associated with a specific resource.
"""
Expand Down
7 changes: 6 additions & 1 deletion fs/opener/zipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,10 @@ def open_fs(

if not create and writeable:
raise NotWriteable("Unable to open existing ZIP file for writing")
zip_fs = ZipFS(parse_result.resource, write=create)

password = parse_result.params.get("password")
if password is not None:
password = password.encode()

zip_fs = ZipFS(parse_result.resource, write=create, password=password)
return zip_fs
49 changes: 37 additions & 12 deletions fs/zipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@


class _ZipExtFile(RawWrapper):
def __init__(self, fs, name):
# type: (ReadZipFS, Text) -> None
def __init__(self, fs, name, password=None):
# type: (ReadZipFS, Text, Optional[bytes]) -> None
if password is not None:
_password_type_check(password)
self._zip = _zip = fs._zip
self._end = _zip.getinfo(name).file_size
self._pos = 0
super(_ZipExtFile, self).__init__(_zip.open(name), "r", name)
super(_ZipExtFile, self).__init__(_zip.open(name, pwd=password), "r", name)

def read(self, size=-1):
# type: (int) -> bytes
Expand Down Expand Up @@ -160,6 +162,8 @@ class ZipFS(WrapFS):
defined in the `zipfile` module in the stdlib).
temp_fs (str): An FS URL for the temporary filesystem used to
store data prior to zipping.
password (bytes): Password for extracting file from zip file. Only used
for read mode.

"""

Expand All @@ -171,16 +175,19 @@ def __new__( # type: ignore
compression=zipfile.ZIP_DEFLATED, # type: int
encoding="utf-8", # type: Text
temp_fs="temp://__ziptemp__", # type: Text
password=None, # type: Optional[bytes]
):
# type: (...) -> FS
# This magic returns a different class instance based on the
# value of the ``write`` parameter.
if write:
if password is not None:
raise errors.PasswordUnsupported()
return WriteZipFS(
file, compression=compression, encoding=encoding, temp_fs=temp_fs
)
else:
return ReadZipFS(file, encoding=encoding)
return ReadZipFS(file, encoding=encoding, password=password)

if typing.TYPE_CHECKING:

Expand All @@ -191,6 +198,7 @@ def __init__(
compression=zipfile.ZIP_DEFLATED, # type: int
encoding="utf-8", # type: Text
temp_fs="temp://__ziptemp__", # type: Text
password=None, # type: Optional[bytes]
):
# type: (...) -> None
pass
Expand Down Expand Up @@ -290,13 +298,15 @@ class ReadZipFS(FS):
}

@errors.CreateFailed.catch_all
def __init__(self, file, encoding="utf-8"):
# type: (Union[BinaryIO, Text], Text) -> None
def __init__(self, file, encoding="utf-8", password=None):
# type: (Union[BinaryIO, Text], Text, Optional[bytes]) -> None
super(ReadZipFS, self).__init__()
self._file = file
self.encoding = encoding
self._zip = zipfile.ZipFile(file, "r")
self._directory_fs = None # type: Optional[MemoryFS]
if password is not None:
self.setpassword(password)

def __repr__(self):
# type: () -> Text
Expand Down Expand Up @@ -409,8 +419,8 @@ def makedir(
self.check()
raise errors.ResourceReadOnly(path)

def openbin(self, path, mode="r", buffering=-1, **kwargs):
# type: (Text, Text, int, **Any) -> BinaryIO
def openbin(self, path, mode="r", buffering=-1, password=None, **kwargs):
# type: (Text, Text, int, Optional[bytes], **Any) -> BinaryIO
self.check()
if "w" in mode or "+" in mode or "a" in mode:
raise errors.ResourceReadOnly(path)
Expand All @@ -421,7 +431,7 @@ def openbin(self, path, mode="r", buffering=-1, **kwargs):
raise errors.FileExpected(path)

zip_name = self._path_to_zip_name(path)
return _ZipExtFile(self, zip_name) # type: ignore
return _ZipExtFile(self, zip_name, password) # type: ignore

def remove(self, path):
# type: (Text) -> None
Expand All @@ -439,13 +449,15 @@ def close(self):
if hasattr(self, "_zip"):
self._zip.close()

def readbytes(self, path):
# type: (Text) -> bytes
def readbytes(self, path, password=None):
# type: (Text, Optional[bytes]) -> bytes
self.check()
if not self._directory.isfile(path):
raise errors.ResourceNotFound(path)
if password is not None:
_password_type_check(password)
zip_name = self._path_to_zip_name(path)
zip_bytes = self._zip.read(zip_name)
zip_bytes = self._zip.read(zip_name, pwd=password)
return zip_bytes

def geturl(self, path, purpose="download"):
Expand All @@ -456,3 +468,16 @@ def geturl(self, path, purpose="download"):
return "zip://{}!/{}".format(quoted_file, quoted_path)
else:
raise errors.NoURL(path, purpose)

def setpassword(self, password):
# type: (bytes) -> None
"""Set *password* as default password to extract encrypted files.
"""
_password_type_check(password)
self._zip.setpassword(password)


def _password_type_check(password):
if isinstance(password, six.binary_type):
return
raise TypeError("except bytes for password, not " + type(password).__name__)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammar nit: "Accept" not "Except"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe it was a typo of "expect bytes..." ? 🤷‍♂️

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it's typo 😆 i'd like to type 'expect'

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe it was a typo of "expect bytes..." ?

That never occurred to me hahaha

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it should ultimately say 'Expected', I guess.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would make more sense than my suggestion.

64 changes: 63 additions & 1 deletion tests/test_zipfs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- encoding: UTF-8
from __future__ import unicode_literals

import codecs
import os
import sys
import tempfile
Expand All @@ -13,7 +14,7 @@
from fs.compress import write_zip
from fs.opener import open_fs
from fs.opener.errors import NotWriteable
from fs.errors import NoURL
from fs.errors import NoURL, PasswordUnsupported
from fs.test import FSTestCases
from fs.enums import Seek

Expand All @@ -40,6 +41,10 @@ def test_unicode_paths(self):
with zip_fs.openbin(path) as f:
f.read()

def test_create_password(self):
with self.assertRaises(PasswordUnsupported):
zipfs.ZipFS(self._temp_path, write=True, password="hello")


class TestWriteZipFS(FSTestCases, unittest.TestCase):
"""
Expand Down Expand Up @@ -220,7 +225,64 @@ def test_implied(self):
os.remove(path)


class TestPasswordReadZipFS(unittest.TestCase):

ZIP_BIN = (
b"UEsDBAoACQAAAH2whk8tOwivGAAAAAwAAAADABwAZm9vVVQJAAPNX+pdzl/qXXV4CwABBPUBAAAE"
b"FAAAAJ6pj1kohibjIq4YqnEKUZ8SCJMeUkl9oVBLBwgtOwivGAAAAAwAAABQSwECHgMKAAkAAAB9"
b"sIZPLTsIrxgAAAAMAAAAAwAYAAAAAAABAAAApIEAAAAAZm9vVVQFAAPNX+pddXgLAAEE9QEAAAQU"
b"AAAAUEsFBgAAAAABAAEASQAAAGUAAAAAAA=="
)

PASSWD = b"P@ssw0rd"

def setUp(self):
fh, path = tempfile.mkstemp("testzip.zip")
os.write(fh, codecs.decode(self.ZIP_BIN, "base64"))
os.close(fh)
self.path = path

def tearDown(self):
os.remove(self.path)

def test_openbin(self):
with zipfs.ReadZipFS(self.path, password=self.PASSWD) as zip_fs:
with zip_fs.openbin("foo") as fp:
self.assertEqual(fp.read(), b"hello world\n")

with zipfs.ReadZipFS(self.path) as zip_fs:
with zip_fs.openbin("foo", password=self.PASSWD) as fp:
self.assertEqual(fp.read(), b"hello world\n")

def test_readbytes(self):
with zipfs.ReadZipFS(self.path, password=self.PASSWD) as zip_fs:
self.assertEqual(zip_fs.readbytes("foo"), b"hello world\n")

with zipfs.ReadZipFS(self.path) as zip_fs:
self.assertEqual(
zip_fs.readbytes("foo", password=self.PASSWD), b"hello world\n"
)

def test_setpassword(self):
with zipfs.ReadZipFS(self.path) as zip_fs:
with self.assertRaises(RuntimeError):
zip_fs._zip.read("foo")

zip_fs.setpassword(self.PASSWD)
self.assertEqual(zip_fs._zip.read("foo"), b"hello world\n")


class TestPasswordTypeCheck(unittest.TestCase):
def test_raise(self):
with self.assertRaises(TypeError):
zipfs._password_type_check("string")

zipfs._password_type_check(b"bytes")


class TestOpener(unittest.TestCase):
def test_not_writeable(self):
with self.assertRaises(NotWriteable):
open_fs("zip://foo.zip", writeable=True)

open_fs("zip://foo.zip?password=1234")