Skip to content

Commit c9fd8d2

Browse files
committed
Add linux_utils.network module (network location awareness)
1 parent 1230997 commit c9fd8d2

File tree

5 files changed

+240
-3
lines changed

5 files changed

+240
-3
lines changed

README.rst

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ following functionality is currently implemented:
1515
- A basic Python API for cryptsetup_ and a Python implementation of
1616
cryptdisks_start_ and cryptdisks_stop_ (with a command line interface).
1717
- Atomic filesystem operations for Linux in Python.
18+
- Simple network location awareness / discovery.
1819

1920
The package is currently tested on cPython 2.7, 3.4, 3.5, 3.6, 3.7 and PyPy
2021
(2.7) on Ubuntu Linux (using `Travis CI`_).

docs/api.rst

+6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ the `linux-utils` package.
4545
.. automodule:: linux_utils.luks
4646
:members:
4747

48+
:mod:`linux_utils.network`
49+
--------------------------
50+
51+
.. automodule:: linux_utils.network
52+
:members:
53+
4854
:mod:`linux_utils.tabfile`
4955
--------------------------
5056

linux_utils/network.py

+159
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# linux-utils: Linux system administration tools for Python.
2+
#
3+
# Author: Peter Odding <[email protected]>
4+
# Last Change: February 9, 2020
5+
# URL: https://linux-utils.readthedocs.io
6+
7+
"""
8+
Python API for Linux networking tools.
9+
10+
The functions in this module make it possible to inspect the current network
11+
configuration of a Linux system, which can provide hints about the physical
12+
location of the system.
13+
"""
14+
15+
# Standard library modules.
16+
import logging
17+
18+
# Modules included in our package.
19+
from linux_utils import coerce_context
20+
21+
# Public identifiers that require documentation.
22+
__all__ = (
23+
'determine_network_location',
24+
'find_gateway_ip',
25+
'find_gateway_mac',
26+
'find_mac_address',
27+
'have_internet_connection',
28+
'logger',
29+
)
30+
31+
# Initialize a logger for this module.
32+
logger = logging.getLogger(__name__)
33+
34+
35+
def determine_network_location(context=None, **gateways):
36+
"""
37+
Determine the physical location of the current system.
38+
39+
This works by matching the MAC address of the current gateway against a set
40+
of known MAC addresses, which provides a simple but robust way to identify
41+
the current network. Because networks tend to have a physical location,
42+
identifying the current network tells us our physical location.
43+
44+
:param gateways: One or more keyword arguments with lists of strings
45+
containing MAC addresses of known networks.
46+
:param context: See :func:`.coerce_context()` for details.
47+
:returns: The name of the matched MAC address (a string) or :data:`None`
48+
when the MAC address of the current gateway is unknown.
49+
50+
Here's an example involving two networks and a physical location with
51+
multiple gateways:
52+
53+
.. code-block:: python
54+
55+
>>> determine_network_location(
56+
... home=['84:9C:A6:76:23:8E'],
57+
... office=['00:15:C5:5F:92:79', 'B6:25:B2:19:28:61'],
58+
... )
59+
'home'
60+
61+
This is used to tweak my desktop environment based on the physical location
62+
of my laptop, for example at home my external monitor is to the right of my
63+
laptop whereas at work it's the other way around, so the :man:`xrandr`
64+
commands to be run differ between the two locations.
65+
"""
66+
context = coerce_context(context)
67+
current_gateway_mac = find_gateway_mac(context)
68+
if current_gateway_mac:
69+
for network_name, known_gateways in gateways.items():
70+
if any(current_gateway_mac.upper() == gateway.upper() for gateway in known_gateways):
71+
logger.info("%s is connected to the %s network.", context, network_name)
72+
return network_name
73+
logger.info(
74+
"%s isn't connected to a known network (unknown gateway MAC address %s).", context, current_gateway_mac
75+
)
76+
else:
77+
logger.info("Failed to determine gateway of %s, assuming network connection is down.", context)
78+
79+
80+
def find_gateway_ip(context=None):
81+
"""
82+
Find the IP address of the current gateway using the ``ip route show`` command.
83+
84+
:param context: See :func:`.coerce_context()` for details.
85+
:returns: The IP address of the gateway (a string) or :data:`None`.
86+
"""
87+
context = coerce_context(context)
88+
logger.debug("Looking for IP address of current gateway ..")
89+
for line in context.capture("ip", "route", "show").splitlines():
90+
tokens = line.split()
91+
logger.debug("Parsing 'ip route show' output: %s", tokens)
92+
if len(tokens) >= 3 and tokens[:2] == ["default", "via"]:
93+
ip_address = tokens[2]
94+
logger.debug("Found gateway IP address: %s", ip_address)
95+
return ip_address
96+
logger.debug("Couldn't find IP address of gateway in 'ip route show' output!")
97+
98+
99+
def find_gateway_mac(context=None):
100+
"""
101+
Find the MAC address of the current gateway using :func:`find_gateway_ip()` and :func:`find_mac_address()`.
102+
103+
:param context: See :func:`.coerce_context()` for details.
104+
:returns: The MAC address of the gateway (a string) or :data:`None`.
105+
"""
106+
context = coerce_context(context)
107+
ip_address = find_gateway_ip(context)
108+
if ip_address:
109+
mac_address = find_mac_address(ip_address, context)
110+
if mac_address:
111+
logger.debug("Found gateway MAC address: %s", mac_address)
112+
return mac_address
113+
logger.debug("Couldn't find MAC address of gateway in 'arp -n' output!")
114+
115+
116+
def find_mac_address(ip_address, context=None):
117+
"""
118+
Determine the MAC address of an IP address using the ``arp -n`` command.
119+
120+
:param ip_address: The IP address we're interested in (a string).
121+
:param context: See :func:`.coerce_context()` for details.
122+
:returns: The MAC address of the IP address (a string) or :data:`None`.
123+
"""
124+
context = coerce_context(context)
125+
logger.debug("Looking for MAC address of %s ..", ip_address)
126+
for line in context.capture("arp", "-n").splitlines():
127+
tokens = line.split()
128+
logger.debug("Parsing 'arp -n' output: %s", tokens)
129+
if len(tokens) >= 3 and tokens[0] == ip_address:
130+
mac_address = tokens[2]
131+
logger.debug("Found MAC address of %s: %s", ip_address, mac_address)
132+
return mac_address
133+
logger.debug("Couldn't find MAC address in 'arp -n' output!")
134+
135+
136+
def have_internet_connection(endpoint="8.8.8.8", context=None):
137+
"""
138+
Check if an internet connection is available using :man:`ping`.
139+
140+
:param endpoint: The public IP address to :man:`ping` (a string).
141+
:param context: See :func:`.coerce_context()` for details.
142+
:returns: :data:`True` if an internet connection is available,
143+
:data:`False` otherwise.
144+
145+
This works by pinging 8.8.8.8 which is one of `Google public DNS servers`_.
146+
This IP address was chosen because it is documented that Google uses
147+
anycast_ to keep this IP address available at all times.
148+
149+
.. _Google public DNS servers: https://developers.google.com/speed/public-dns/
150+
.. _anycast: https://en.wikipedia.org/wiki/Anycast
151+
"""
152+
context = coerce_context(context)
153+
logger.debug("Checking if %s has internet connectivity ..", context)
154+
if context.test("ping", "-c1", "-w1", "8.8.8.8"):
155+
logger.debug("Confirmed that %s has internet connectivity.", context)
156+
return True
157+
else:
158+
logger.debug("No internet connectivity detected on %s.", context)
159+
return False

linux_utils/tests.py

+73-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Test suite for the `linux-utils' Python package.
22
#
33
# Author: Peter Odding <[email protected]>
4-
# Last Change: June 24, 2017
4+
# Last Change: February 9, 2020
55
# URL: https://linux-utils.readthedocs.io
66

77
"""Test suite for the `linux-utils` package."""
@@ -17,7 +17,8 @@
1717
# External dependencies.
1818
from executor import ExternalCommandFailed, execute
1919
from executor.contexts import LocalContext
20-
from humanfriendly.testing import TemporaryDirectory, TestCase, run_cli
20+
from humanfriendly.text import dedent
21+
from humanfriendly.testing import MockedProgram, TemporaryDirectory, TestCase, run_cli
2122
from mock import MagicMock
2223

2324
# The module we're testing.
@@ -35,6 +36,13 @@
3536
lock_filesystem,
3637
unlock_filesystem,
3738
)
39+
from linux_utils.network import (
40+
determine_network_location,
41+
find_gateway_ip,
42+
find_gateway_mac,
43+
find_mac_address,
44+
have_internet_connection,
45+
)
3846
from linux_utils.tabfile import parse_tab_file
3947

4048
# The following files have fixed locations to enable the configuration file
@@ -401,3 +409,66 @@ def test_cryptdisks_start_stop_error_reporting(self):
401409
for fallback in cryptdisks_start_cli, cryptdisks_stop_cli:
402410
returncode, output = run_cli(fallback, TEST_UNKNOWN_TARGET, merged=True)
403411
assert returncode != 0
412+
413+
def test_determine_network_location(self):
414+
"""Test :func:`linux_utils.network.determine_network_location()`."""
415+
with self.mock_arp, self.mock_ip:
416+
# Make sure the happy path works as intended.
417+
assert determine_network_location(home=['80:34:58:ad:6c:f5']) == 'home'
418+
# Make sure nothing bad happens when we're not on a known network.
419+
assert determine_network_location() is None
420+
421+
def test_find_gateway_ip(self):
422+
"""Test :func:`linux_utils.network.find_gateway_ip()`."""
423+
with self.mock_ip:
424+
assert find_gateway_ip() == '192.168.1.1'
425+
426+
def test_find_gateway_mac(self):
427+
"""Test :func:`linux_utils.network.find_gateway_mac()`."""
428+
with self.mock_arp, self.mock_ip:
429+
assert find_gateway_mac() == '80:34:58:ad:6c:f5'
430+
431+
def test_find_mac_address(self):
432+
"""Test :func:`linux_utils.network.find_mac_address()`."""
433+
with self.mock_arp:
434+
assert find_mac_address('192.168.1.4') == '4b:21:f5:49:88:85'
435+
436+
def test_have_internet_connection(self):
437+
"""Test :func:`linux_utils.network.have_internet_connection().`"""
438+
with MockedProgram(name='ping', returncode=0):
439+
assert have_internet_connection() is True
440+
with MockedProgram(name='ping', returncode=1):
441+
assert have_internet_connection() is False
442+
443+
@property
444+
def mock_arp(self):
445+
"""Mocked ``arp`` program."""
446+
return MockedProgram(name='arp', script=dedent("""
447+
cat << EOF
448+
Address HWtype HWaddress Flags Mask Iface
449+
192.168.1.4 ether 4b:21:f5:49:88:85 C wlp3s0
450+
192.168.3.28 ether 3d:a6:19:62:9a:83 C wlp3s0
451+
192.168.3.5 ether c5:4c:8d:56:25:0c C wlp3s0
452+
192.168.1.1 ether 80:34:58:ad:6c:f5 C wlp3s0
453+
192.168.3.2 ether 20:22:a0:22:0c:db C wlp3s0
454+
192.168.1.12 ether ad:12:75:46:e9:70 C wlp3s0
455+
192.168.3.6 ether 08:33:c7:ef:f7:27 C wlp3s0
456+
192.168.1.11 ether c9:0e:95:24:68:31 C wlp3s0
457+
192.168.3.4 ether e7:e6:2c:3b:bc:8a C wlp3s0
458+
192.168.3.3 ether 72:d7:d3:2c:54:93 C wlp3s0
459+
192.168.1.6 ether 95:ef:85:cf:d3:36 C wlp3s0
460+
192.168.3.7 ether 65:c0:be:40:cd:31 C wlp3s0
461+
EOF
462+
"""))
463+
464+
@property
465+
def mock_ip(self):
466+
"""Mocked ``ip route show`` program."""
467+
return MockedProgram(name='ip', script=dedent("""
468+
cat << EOF
469+
default via 192.168.1.1 dev wlp3s0 proto dhcp metric 600
470+
169.254.0.0/16 dev virbr0 scope link metric 1000 linkdown
471+
192.168.0.0/16 dev wlp3s0 proto kernel scope link src 192.168.2.214 metric 600
472+
192.168.122.0/24 dev virbr0 proto kernel scope link src 192.168.122.1 linkdown
473+
EOF
474+
"""))

requirements.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
coloredlogs >= 7.0
22
executor >= 16.0.1
3-
humanfriendly >= 5.0
3+
humanfriendly >= 6.0
44
property-manager >= 2.3
55
six >= 1.10.0

0 commit comments

Comments
 (0)