diff --git a/confconsole.py b/confconsole.py index a97535c..c8450bf 100755 --- a/confconsole.py +++ b/confconsole.py @@ -13,21 +13,17 @@ import os import sys import dialog -import ipaddr from string import Template - -import ifutil -import netinfo import getopt - -import conf - from io import StringIO import traceback import subprocess from subprocess import PIPE, CalledProcessError import shlex +import netinfo + +from libconfconsole import ipaddr, ifutil, conf import plugin PLUGIN_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), diff --git a/debian/confconsole.install b/debian/confconsole.install index f8da7bf..224c115 100644 --- a/debian/confconsole.install +++ b/debian/confconsole.install @@ -1,5 +1,6 @@ -plugins.d/ /usr/lib/confconsole/ -*.py /usr/lib/confconsole/ -conf/* /etc/confconsole/ -share/* /usr/share/confconsole/ -add-water/* /lib/systemd/system +plugins.d/ /usr/lib/confconsole/ +*.py /usr/lib/confconsole/ +libconfconsole/ /usr/lib/python3/dist-packages/ +conf/* /etc/confconsole/ +share/* /usr/share/confconsole/ +add-water/* /lib/systemd/system diff --git a/libconfconsole/__init__.py b/libconfconsole/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/conf.py b/libconfconsole/conf.py similarity index 100% rename from conf.py rename to libconfconsole/conf.py diff --git a/ifutil.py b/libconfconsole/ifutil.py similarity index 62% rename from ifutil.py rename to libconfconsole/ifutil.py index 6c0800d..3932858 100644 --- a/ifutil.py +++ b/libconfconsole/ifutil.py @@ -3,8 +3,9 @@ import os from time import sleep - import subprocess +from typing import Optional, Callable + from netinfo import InterfaceInfo from netinfo import get_hostname @@ -27,8 +28,8 @@ class EtcNetworkInterfaces: def __init__(self): self.read_conf() - def read_conf(self): - self.conf = {} + def read_conf(self) -> None: + self.conf: dict[str, str] = {} self.unconfigured = False ifname = None @@ -42,13 +43,13 @@ def read_conf(self): if not line or line.startswith("#"): continue - if line.startswith("auto"): + if line.startswith("auto") or line.startswith("allow-hotplug"): ifname = line.split()[1] self.conf[ifname] = line + "\n" elif ifname: self.conf[ifname] += line + "\n" - def _get_iface_opts(self, ifname): + def _get_iface_opts(self, ifname: str) -> list[str]: iface_opts = ('pre-up', 'up', 'post-up', 'pre-down', 'down', 'post-down') if ifname not in self.conf: @@ -59,7 +60,7 @@ def _get_iface_opts(self, ifname): for line in ifconf.splitlines() if line.strip().split()[0] in iface_opts] - def _get_bridge_opts(self, ifname): + def _get_bridge_opts(self, ifname: str) -> list: bridge_opts = ('bridge_ports', 'bridge_ageing', 'bridge_bridgeprio', 'bridge_fd', 'bridge_gcinit', 'bridge_hello', 'bridge_hw', 'bridge_maxage', 'bridge_maxwait', @@ -73,7 +74,7 @@ def _get_bridge_opts(self, ifname): for line in ifconf.splitlines() if line.strip().split()[0] in bridge_opts] - def write_conf(self, ifname, ifconf): + def write_conf(self, ifname: str, ifconf: str) -> None: self.read_conf() if not self.unconfigured: raise IfError(f"refusing to write to {self.CONF_FILE}\n" @@ -89,58 +90,79 @@ def write_conf(self, ifname, ifconf): with open(self.CONF_FILE, "w") as fob: fob.write(self.HEADER_UNCONFIGURED+'\n') - fob.write("# remove the above line if you edit this file\n\n") - - fob.write("auto lo\n") - fob.write("iface lo inet loopback\n\n") - - fob.write(ifconf+'\n') - - for c in self.conf: - if c in ('lo', ifname): - continue - - fob.write(self.conf[c] + '\n') - - def set_dhcp(self, ifname): + fob.write("# remove the above line if you edit this file") + + for iface in self.conf.keys(): + if iface: + fob.write('\n\n') + if iface == ifname: + fob.writelines(ifconf.rstrip()) + else: + fob.writelines(self.conf[iface].rstrip()) + fob.write('\n') + + @staticmethod + def _preproc_if(ifname_conf: str) -> list[str]: + lines = ifname_conf.splitlines() + if len(lines) == 2: + return lines + new_lines = [] hostname = get_hostname() - ifconf = [f"auto {ifname}\niface {ifname} inet dhcp"] - - if hostname: - ifconf.append(f" hostname {hostname}") + for line in lines: + _line = line.lstrip() + if (_line.startswith('allow-hotplug') + or _line.startswith('auto') + or _line.startswith('iface') + or _line.startswith('wpa-conf')): + new_lines.append(line) + elif _line.startswith('hostname'): + if hostname: + new_lines.append(f' hostname {hostname}') + else: + continue + elif (_line.startswith('address') + or _line.startswith('netmask') + or _line.startswith('gateway') + or _line.startswith('dns-nameserver')): + continue + else: + raise IfError(f'Unexpected config line: {line}') + return new_lines - ifconf = "\n".join(ifconf) - self.write_conf(ifname, ifconf) + def set_dhcp(self, ifname: str) -> None: + ifconf = self._preproc_if(self.conf[ifname]) + ifconf[1] = f'iface {ifname} inet dhcp' - def set_manual(self, ifname): - ifconf = f"auto {ifname}\niface {ifname} inet manual" - self.write_conf(ifname, ifconf) + ifconf_str = "\n".join(ifconf) + self.write_conf(ifname, ifconf_str) - def set_static(self, ifname, addr, netmask, gateway=None, nameservers=[]): - hostname = get_hostname() - ifconf = [f"auto {ifname}", - f"iface {ifname} inet static"] + def set_manual(self, ifname: str) -> None: + ifconf = self._preproc_if(self.conf[ifname]) + ifconf[1] = f'iface {ifname} inet manual' + ifconf_str = "\n".join(ifconf) + self.write_conf(ifname, ifconf_str) - if hostname: - ifconf.append(f" hostname {hostname}") + def set_static(self, ifname: str, addr: str, netmask: str, + gateway: str = None, nameservers: list = None + ) -> None: + ifconf = self._preproc_if(self.conf[ifname]) + ifconf[1] = f'iface {ifname} inet static' ifconf.extend([f" address {addr}", f" netmask {netmask}"]) - if gateway: ifconf.append(f" gateway {gateway}") - if nameservers: ifconf.append(f" dns-nameservers {' '.join(nameservers)}") - ifconf = "\n".join(ifconf) - self.write_conf(ifname, ifconf) + ifconf_str = "\n".join(ifconf) + self.write_conf(ifname, ifconf_str) class EtcNetworkInterface: """enumerate interface information from /etc/network/interfaces""" - def __init__(self, ifname): + def __init__(self, ifname: str): self.ifname = ifname interfaces = EtcNetworkInterfaces() @@ -149,7 +171,7 @@ def __init__(self, ifname): if ifname in interfaces.conf: self.conflines = interfaces.conf[ifname].splitlines() - def _parse_attr(self, attr): + def _parse_attr(self, attr: str) -> list[str]: for line in self.conflines: vals = line.strip().split() @@ -172,7 +194,7 @@ def method(self): def dns_nameservers(self): return self._parse_attr('dns-nameservers')[1:] - def __getattr__(self, attrname): + def __getattr__(self, attrname: str) -> list[str]: # attributes with multiple values will be returned in an array # exception: dns-nameservers always returns in array (expected) @@ -181,19 +203,19 @@ def __getattr__(self, attrname): if len(values) > 2: return values[1:] elif len(values) > 1: - return values[1] + return [values[1]] - return + return [] -def get_nameservers(ifname): +def get_nameservers(ifname: str) -> list[str]: # /etc/network/interfaces (static) interface = EtcNetworkInterface(ifname) if interface.dns_nameservers: return interface.dns_nameservers - def parse_resolv(path): + def parse_resolv(path: str) -> list[str]: nameservers = [] with open(path, 'r') as fob: for line in fob: @@ -220,26 +242,29 @@ def parse_resolv(path): return [] -def ifup(ifname): - return subprocess.check_output(["ifup", ifname]) +def ifup(ifname: str) -> str: + return subprocess.check_output(["ifup", ifname], text=True) -def ifdown(ifname): - return subprocess.check_output(["ifdown", ifname]) +def ifdown(ifname: str) -> str: + return subprocess.check_output(["ifdown", ifname], text=True) -def unconfigure_if(ifname): +def unconfigure_if(ifname: str) -> Optional[str]: try: ifdown(ifname) interfaces = EtcNetworkInterfaces() interfaces.set_manual(ifname) subprocess.check_output(['ifconfig', ifname, '0.0.0.0']) ifup(ifname) + return None except subprocess.CalledProcessError as e: return str(e) -def set_static(ifname, addr, netmask, gateway, nameservers): +def set_static(ifname: str, addr: str, netmask: str, + gateway: str, nameservers: list[str] + ) -> Optional[str]: try: ifdown(ifname) interfaces = EtcNetworkInterfaces() @@ -252,13 +277,13 @@ def set_static(ifname, addr, netmask, gateway, nameservers): net = InterfaceInfo(ifname) if not net.addr: - raise IfError('Error obtaining IP address\n\n%s' % output) - + raise IfError(f'Error obtaining IP address\n\n{output}') + return None except Exception as e: return str(e) -def set_dhcp(ifname): +def set_dhcp(ifname: str) -> Optional[str]: try: ifdown(ifname) interfaces = EtcNetworkInterfaces() @@ -267,18 +292,20 @@ def set_dhcp(ifname): net = InterfaceInfo(ifname) if not net.addr: - raise IfError('Error obtaining IP address\n\n%s' % output) - + raise IfError(f'Error obtaining IP address\n\n{output}') + return None except Exception as e: return str(e) -def get_ipconf(ifname, error=False): +def get_ipconf(ifname: str, error: bool = False + ) -> tuple[Callable[[], Optional[str]], + Optional[str], Optional[str], list[str]]: net = InterfaceInfo(ifname) - return (net.addr, net.netmask, - net.get_gateway(error), get_nameservers(ifname)) + return (net.addr, net.netmask, net.get_gateway(error), + get_nameservers(ifname)) -def get_ifmethod(ifname): +def get_ifmethod(ifname: str) -> str: interface = EtcNetworkInterface(ifname) return interface.method diff --git a/ipaddr.py b/libconfconsole/ipaddr.py similarity index 71% rename from ipaddr.py rename to libconfconsole/ipaddr.py index a4d0911..3a6f575 100644 --- a/ipaddr.py +++ b/libconfconsole/ipaddr.py @@ -6,7 +6,7 @@ import math -def is_legal_ip(ip): +def is_legal_ip(ip: str) -> bool: try: if len([octet for octet in ip.split(".") if 255 >= int(octet) >= 0]) != 4: @@ -22,23 +22,23 @@ def is_legal_ip(ip): return True -def _str2int(ip): +def _str2int(ip: str) -> str: bytes = list(map(int, ip.split('.'))) ip, = struct.unpack("!L", struct.pack("BBBB", *bytes)) return ip -def _int2str(num): +def _int2str(num: int) -> str: bytes = struct.unpack("BBBB", struct.pack("!L", num)) return '.'.join(list(map(str, bytes))) -class Error(Exception): +class IpError(Exception): pass class IP(int): - def __new__(cls, arg): + def __new__(cls, arg: str) -> IP: if isinstance(arg, IP): return int.__new__(cls, int(arg)) @@ -47,18 +47,19 @@ def __new__(cls, arg): else: if not is_legal_ip(arg): - raise Error("illegal ip (%s)" % arg) + raise IpError(f"illegal ip ({arg})") return int.__new__(cls, _str2int(arg)) - def __str__(self): + def __str__(self) -> str: return _int2str(self) - def __repr__(self): - return "IP(%r)" % str(self) + def __repr__(self) -> str: + return f"IP({str(self)})" - def _numeric_method(method): - def f(self, other): + @staticmethod + def _numeric_method(method: str): + def f(self, other: str): return IP(getattr(int, method)(self, other)) return f @@ -72,25 +73,25 @@ def f(self, other): class IPRange: @classmethod - def from_cidr(cls, arg): + def from_cidr(cls, arg: str) -> IPRange: address, cidr = arg.split('/') netmask = 2 ** 32 - (2 ** (32 - int(cidr))) return cls(address, netmask) - def __init__(self, ip, netmask): + def __init__(self, ip: str, netmask: str): self.ip = IP(ip) self.netmask = IP(netmask) self.network = self.ip & self.netmask self.broadcast = self.network + 2 ** 32 - self.netmask - 1 self.cidr = int(32 - math.log(2 ** 32 - self.netmask, 2)) - def __contains__(self, ip): + def __contains__(self, ip: str) -> bool: return self.network < IP(ip) < self.broadcast - def __repr__(self): - return "IPRange('%s', '%s')" % (self.ip, self.netmask) + def __repr__(self) -> str: + return f"IPRange('{self.ip}', '{self.netmask}')" - def fmt_cidr(self): - return "%s/%d" % (self.ip, self.cidr) + def fmt_cidr(self) -> str: + return f"{self.ip}/{self.cidr}" __str__ = fmt_cidr