Skip to content

Commit

Permalink
Support authentication via netrc
Browse files Browse the repository at this point in the history
`vcs_base.load_url()` currently doesn't support authentication. Add
support for both basic and token-based authentication by parsing netrc
files.

Signed-off-by: Kyle Fazzari <[email protected]>
  • Loading branch information
kyrofa committed Apr 23, 2020
1 parent 4e0c2f4 commit 7a1be83
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ matrix:
install:
# newer versions of PyYAML dropped support for Python 3.4
- if [ $TRAVIS_PYTHON_VERSION == "3.4" ]; then pip install PyYAML==5.2; fi
- pip install coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML
- pip install coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML mock
script:
- PYTHONPATH=`pwd` pytest -s -v test
notifications:
Expand Down
123 changes: 123 additions & 0 deletions test/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import os
import shutil
import tempfile
import unittest

try:
from urllib.error import HTTPError
except ImportError:
from urllib2 import HTTPError


try:
from unittest import mock
except ImportError:
import mock

from vcstool.clients import vcs_base


class TestBase(unittest.TestCase):

@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
@mock.patch('vcstool.clients.vcs_base._netrc_open', autospec=True)
def test_load_url_calls_urlopen(self, netrc_open_mock, urlopen_mock):
urlopen_read_mock = urlopen_mock.return_value.read

vcs_base.load_url('example.com', timeout=123)

urlopen_mock.assert_called_once_with('example.com', timeout=123)
urlopen_read_mock.assert_called_once_with()
self.assertFalse(netrc_open_mock.mock_calls)

@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
@mock.patch('vcstool.clients.vcs_base._netrc_open', autospec=True)
def test_load_url_calls_netrc_open(self, netrc_open_mock, urlopen_mock):
for code in (401, 404):
urlopen_mock.side_effect = [HTTPError(None, code, None, None, None)]
urlopen_read_mock = urlopen_mock.return_value.read

vcs_base.load_url('example.com', timeout=123)

urlopen_mock.assert_called_once_with('example.com', timeout=123)
self.assertFalse(urlopen_read_mock.mock_calls)

netrc_open_mock.assert_called_once_with('example.com', timeout=123)

netrc_open_mock.reset_mock()
urlopen_mock.reset_mock()

def test_netrc_open_no_such_file(self):
try:
self.assertEqual(vcs_base._netrc_open(
'https://example.com', filename='/non-existent'), None)
except Exception:
self.fail(
'The lack of a .netrc file should not result in an exception')

@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
@mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True)
def test_netrc_open_basic_auth(self, build_opener_mock, urlopen_mock):
open_mock = build_opener_mock.return_value.open

tmpdir = tempfile.mkdtemp()
netrc_file = os.path.join(tmpdir, 'netrc')
machine = 'example.com'
with open(netrc_file, 'w') as f:
f.write('machine %s\n' % machine)
f.write('login username\n')
f.write('password password')

url = 'https://%s/foo/bar' % machine
try:
vcs_base._netrc_open(url, filename=netrc_file, timeout=123)
finally:
shutil.rmtree(tmpdir)

self.assertFalse(urlopen_mock.mock_calls)

class _HTTPBasicAuthHandlerMatcher(object):
def __init__(self, test):
self.test = test

def __eq__(self, other):
manager = other.passwd
self.test.assertEqual(
manager.find_user_password(None, 'example.com'),
('username', 'password'))
return True

build_opener_mock.assert_called_once_with(
_HTTPBasicAuthHandlerMatcher(self))
open_mock.assert_called_once_with(url, timeout=123)

@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
@mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True)
def test_netrc_open_token_auth(self, build_opener_mock, urlopen_mock):
tmpdir = tempfile.mkdtemp()
netrc_file = os.path.join(tmpdir, 'netrc')
machine = 'example.com'
with open(netrc_file, 'w') as f:
f.write('machine %s\n' % machine)
f.write('password password')

url = 'https://%s/foo/bar' % machine
try:
vcs_base._netrc_open(url, filename=netrc_file, timeout=123)
finally:
shutil.rmtree(tmpdir)

self.assertFalse(build_opener_mock.mock_calls)

class _RequestMatcher(object):
def __init__(self, test):
self.test = test

def __eq__(self, other):
self.test.assertEqual(other.get_full_url(), url)
self.test.assertEqual(
other.get_header('Private-token'), 'password')
return True

urlopen_mock.assert_called_once_with(
_RequestMatcher(self), timeout=123)
60 changes: 59 additions & 1 deletion vcstool/clients/vcs_base.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
import errno
import logging
import netrc
import os
import socket
import subprocess
import time
try:
from urllib.request import Request
from urllib.request import urlopen
from urllib.request import HTTPPasswordMgrWithDefaultRealm
from urllib.request import HTTPBasicAuthHandler
from urllib.request import build_opener
from urllib.parse import urlparse
from urllib.error import HTTPError
from urllib.error import URLError
except ImportError:
from urllib2 import HTTPError
from urllib2 import Request
from urllib2 import URLError
from urllib2 import urlopen
from urllib2 import HTTPPasswordMgrWithDefaultRealm
from urllib2 import HTTPBasicAuthHandler
from urllib2 import build_opener
from urlparse import urlparse

try:
from shutil import which # noqa
Expand Down Expand Up @@ -91,7 +102,7 @@ def run_command(cmd, cwd, env=None):

def load_url(url, retry=2, retry_period=1, timeout=10):
try:
fh = urlopen(url, timeout=timeout)
fh = _urlopen_netrc(url, timeout=timeout)
except HTTPError as e:
if e.code == 503 and retry:
time.sleep(retry_period)
Expand Down Expand Up @@ -132,3 +143,50 @@ def test_url(url, retry=2, retry_period=1, timeout=10):
timeout=timeout)
raise URLError(str(e) + ' (%s)' % url)
return response


def _urlopen_netrc(uri, timeout=None):
try:
return urlopen(uri, timeout=timeout)
except HTTPError as e:
if e.code in (401, 404):
# Try again with netrc credentials
result = _netrc_open(uri, timeout=timeout)
if result is not None:
return result
raise


def _netrc_open(uri, filename=None, timeout=None):
parsed_uri = urlparse(uri)
machine = parsed_uri.netloc
if not machine:
return None

opener = None
try:
info = netrc.netrc(filename).authenticators(machine)
if info is None:
# caught below, like other netrc parse errors
raise netrc.NetrcParseError('No authenticators for "%s"' % machine)

(username, _, password) = info
if username and password:
pass_man = HTTPPasswordMgrWithDefaultRealm()
pass_man.add_password(None, machine, username, password)
authhandler = HTTPBasicAuthHandler(pass_man)
opener = build_opener(authhandler)
return opener.open(uri, timeout=timeout)
elif password:
request = Request(uri)
request.add_header('PRIVATE-TOKEN', password)
return urlopen(request, timeout=timeout)
except EnvironmentError as e:
# Don't error just because the user doesn't have a .netrc file
if e.errno != errno.ENOENT:
raise
except netrc.NetrcParseError as neterr:
logging.getLogger(__name__).warn(
'WARNING: parsing .netrc: %s' % str(neterr))

return None

0 comments on commit 7a1be83

Please sign in to comment.