-
Notifications
You must be signed in to change notification settings - Fork 88
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
`vcs_base.load_url()` currently doesn't support authentication. Add support for both basic and token-based authentication by parsing netrc-formatted files. Use `appdirs` to support vcstool-specific authentication files for both the user and the system (user takes precedence). Signed-off-by: Kyle Fazzari <[email protected]>
- Loading branch information
Showing
3 changed files
with
370 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
import os | ||
import shutil | ||
import tempfile | ||
import textwrap | ||
import unittest | ||
|
||
try: | ||
from urllib.error import HTTPError | ||
except ImportError: | ||
from urllib2 import HTTPError | ||
|
||
|
||
try: | ||
from unittest import mock | ||
except ImportError: | ||
import mock | ||
|
||
from vcstool.clients import vcs_base | ||
|
||
|
||
class TestBase(unittest.TestCase): | ||
|
||
def setUp(self): | ||
self.default_auth_dir = tempfile.mkdtemp() | ||
self.addCleanup(shutil.rmtree, self.default_auth_dir) | ||
self.user_auth_dir = tempfile.mkdtemp() | ||
self.addCleanup(shutil.rmtree, self.user_auth_dir) | ||
self.system_auth_dir = tempfile.mkdtemp() | ||
self.addCleanup(shutil.rmtree, self.system_auth_dir) | ||
|
||
self._previous_home = os.getenv("HOME") | ||
os.environ["HOME"] = self.default_auth_dir | ||
|
||
patcher = mock.patch( | ||
'vcstool.clients.vcs_base.appdirs.user_config_dir', | ||
return_value=self.user_auth_dir) | ||
patcher.start() | ||
self.addCleanup(patcher.stop) | ||
|
||
patcher = mock.patch( | ||
'vcstool.clients.vcs_base.appdirs.site_config_dir', | ||
return_value=self.system_auth_dir) | ||
patcher.start() | ||
self.addCleanup(patcher.stop) | ||
|
||
def tearDown(self): | ||
if self._previous_home: | ||
os.environ["HOME"] = self._previous_home | ||
else: | ||
del os.environ["HOME"] | ||
|
||
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) | ||
@mock.patch( | ||
'vcstool.clients.vcs_base._authenticated_urlopen', autospec=True) | ||
def test_load_url_calls_urlopen( | ||
self, authenticated_urlopen_mock, urlopen_mock): | ||
urlopen_read_mock = urlopen_mock.return_value.read | ||
|
||
vcs_base.load_url('example.com', timeout=123) | ||
|
||
urlopen_mock.assert_called_once_with('example.com', timeout=123) | ||
urlopen_read_mock.assert_called_once_with() | ||
self.assertFalse(authenticated_urlopen_mock.mock_calls) | ||
|
||
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) | ||
@mock.patch( | ||
'vcstool.clients.vcs_base._authenticated_urlopen', autospec=True) | ||
def test_load_url_calls_authenticated_urlopen( | ||
self, authenticated_urlopen_mock, urlopen_mock): | ||
for code in (401, 404): | ||
urlopen_mock.side_effect = [ | ||
HTTPError(None, code, None, None, None)] | ||
urlopen_read_mock = urlopen_mock.return_value.read | ||
|
||
vcs_base.load_url('example.com', timeout=123) | ||
|
||
urlopen_mock.assert_called_once_with('example.com', timeout=123) | ||
self.assertFalse(urlopen_read_mock.mock_calls) | ||
|
||
authenticated_urlopen_mock.assert_called_once_with( | ||
'example.com', timeout=123) | ||
|
||
authenticated_urlopen_mock.reset_mock() | ||
urlopen_mock.reset_mock() | ||
|
||
def test_netrc_open_no_such_file(self): | ||
try: | ||
self.assertEqual(vcs_base._authenticated_urlopen( | ||
'https://example.com'), None) | ||
except Exception: | ||
self.fail( | ||
'The lack of a .netrc file should not result in an exception') | ||
|
||
def test_netrc_file_precedence(self): | ||
machine = 'example.com' | ||
|
||
default_auth_file_path = os.path.join(self.default_auth_dir, '.netrc') | ||
user_auth_file_path = os.path.join( | ||
self.user_auth_dir, vcs_base._AUTHENTICATION_CONFIGURATION_FILE) | ||
system_auth_file_path = os.path.join( | ||
self.system_auth_dir, vcs_base._AUTHENTICATION_CONFIGURATION_FILE) | ||
|
||
for path in ( | ||
default_auth_file_path, user_auth_file_path, | ||
system_auth_file_path): | ||
_create_netrc_file(path, textwrap.dedent('''\ | ||
machine %s | ||
password %s | ||
''' % (machine, path))) | ||
|
||
credentials = vcs_base._credentials_for_machine(machine) | ||
self.assertIsNotNone(credentials) | ||
self.assertEqual(len(credentials), 3) | ||
self.assertEqual(credentials[2], default_auth_file_path) | ||
|
||
# Remove default auth file and assert that the user auth file is used | ||
os.unlink(default_auth_file_path) | ||
credentials = vcs_base._credentials_for_machine(machine) | ||
self.assertIsNotNone(credentials) | ||
self.assertEqual(len(credentials), 3) | ||
self.assertEqual(credentials[2], user_auth_file_path) | ||
|
||
# Remove user auth file and assert that the system auth file is used | ||
os.unlink(user_auth_file_path) | ||
credentials = vcs_base._credentials_for_machine(machine) | ||
self.assertIsNotNone(credentials) | ||
self.assertEqual(len(credentials), 3) | ||
self.assertEqual(credentials[2], system_auth_file_path) | ||
|
||
# Remove system auth file and assert that no creds are found | ||
os.unlink(system_auth_file_path) | ||
self.assertIsNone(vcs_base._credentials_for_machine(machine)) | ||
|
||
def test_netrc_file_skip_errors(self): | ||
machine = 'example.com' | ||
|
||
default_auth_file_path = os.path.join(self.default_auth_dir, '.netrc') | ||
user_auth_file_path = os.path.join( | ||
self.user_auth_dir, vcs_base._AUTHENTICATION_CONFIGURATION_FILE) | ||
|
||
_create_netrc_file(default_auth_file_path, 'skip-me-invalid') | ||
|
||
_create_netrc_file(user_auth_file_path, textwrap.dedent('''\ | ||
machine %s | ||
password %s | ||
''' % (machine, user_auth_file_path))) | ||
|
||
credentials = vcs_base._credentials_for_machine(machine) | ||
self.assertIsNotNone(credentials) | ||
self.assertEqual(len(credentials), 3) | ||
self.assertEqual(credentials[2], user_auth_file_path) | ||
|
||
def test_auth_parts(self): | ||
user_auth_file_path = os.path.join( | ||
self.user_auth_dir, vcs_base._AUTHENTICATION_CONFIGURATION_FILE) | ||
user_auth_file_part_path = os.path.join( | ||
self.user_auth_dir, | ||
vcs_base._AUTHENTICATION_CONFIGURATION_PARTS_DIR, 'test.conf') | ||
os.makedirs(os.path.dirname(user_auth_file_part_path)) | ||
|
||
auth_machine = 'auth.example.com' | ||
parts_machine = 'parts.example.com' | ||
|
||
for path in (user_auth_file_path, user_auth_file_part_path): | ||
_create_netrc_file(path, textwrap.dedent('''\ | ||
machine %s | ||
password %s | ||
''' % (auth_machine, path))) | ||
with open(user_auth_file_part_path, 'a') as f: | ||
f.write('machine %s\n' % parts_machine) | ||
f.write('password %s\n' % path) | ||
|
||
credentials = vcs_base._credentials_for_machine(auth_machine) | ||
self.assertIsNotNone(credentials) | ||
self.assertEqual(len(credentials), 3) | ||
self.assertEqual(credentials[2], user_auth_file_path) | ||
|
||
credentials = vcs_base._credentials_for_machine(parts_machine) | ||
self.assertIsNotNone(credentials) | ||
self.assertEqual(len(credentials), 3) | ||
self.assertEqual(credentials[2], user_auth_file_part_path) | ||
|
||
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) | ||
@mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True) | ||
def test_authenticated_urlopen_basic_auth( | ||
self, build_opener_mock, urlopen_mock): | ||
open_mock = build_opener_mock.return_value.open | ||
|
||
machine = 'example.com' | ||
_create_netrc_file( | ||
os.path.join(self.default_auth_dir, '.netrc'), | ||
textwrap.dedent('''\ | ||
machine %s | ||
login username | ||
password password | ||
''' % machine)) | ||
|
||
url = 'https://%s/foo/bar' % machine | ||
vcs_base._authenticated_urlopen(url) | ||
|
||
self.assertFalse(urlopen_mock.mock_calls) | ||
|
||
class _HTTPBasicAuthHandlerMatcher(object): | ||
def __init__(self, test): | ||
self.test = test | ||
|
||
def __eq__(self, other): | ||
manager = other.passwd | ||
self.test.assertEqual( | ||
manager.find_user_password(None, 'example.com'), | ||
('username', 'password')) | ||
return True | ||
|
||
build_opener_mock.assert_called_once_with( | ||
_HTTPBasicAuthHandlerMatcher(self)) | ||
open_mock.assert_called_once_with(url, timeout=None) | ||
|
||
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True) | ||
@mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True) | ||
def test_authenticated_urlopen_token_auth( | ||
self, build_opener_mock, urlopen_mock): | ||
machine = 'example.com' | ||
_create_netrc_file( | ||
os.path.join(self.default_auth_dir, '.netrc'), | ||
textwrap.dedent('''\ | ||
machine %s | ||
password password | ||
''' % machine)) | ||
|
||
url = 'https://%s/foo/bar' % machine | ||
vcs_base._authenticated_urlopen(url) | ||
|
||
self.assertFalse(build_opener_mock.mock_calls) | ||
|
||
class _RequestMatcher(object): | ||
def __init__(self, test): | ||
self.test = test | ||
|
||
def __eq__(self, other): | ||
self.test.assertEqual(other.get_full_url(), url) | ||
self.test.assertEqual( | ||
other.get_header('Private-token'), 'password') | ||
return True | ||
|
||
urlopen_mock.assert_called_once_with( | ||
_RequestMatcher(self), timeout=None) | ||
|
||
|
||
def _create_netrc_file(path, contents): | ||
with open(path, 'w') as f: | ||
f.write(contents) | ||
os.chmod(path, 0o600) |
Oops, something went wrong.