diff --git a/.github/workflows/autils_migration_announcement.yml b/.github/workflows/autils_migration_announcement.yml index b0433e5dab..2f93a20787 100644 --- a/.github/workflows/autils_migration_announcement.yml +++ b/.github/workflows/autils_migration_announcement.yml @@ -17,6 +17,7 @@ on: - '**/output.py' - '**/path.py' - '**/process.py' + - '**/cpu.py' - '**/script.py' - '**/wait.py' diff --git a/.pylintrc_utils b/.pylintrc_utils index 82c5f7c718..de774353b7 100644 --- a/.pylintrc_utils +++ b/.pylintrc_utils @@ -7,7 +7,7 @@ extension-pkg-whitelist=netifaces # Add files or directories to the blacklist. They should be base names, not # paths. -ignore=CVS,archive.py,asset.py,astring.py,aurl.py,build.py,cloudinit.py,cpu.py,data_factory.py,datadrainer.py,debug.py,diff_validator.py,disk.py,distro.py,dmesg.py,download.py,exit_codes.py,file_utils.py,filelock.py,git.py,iso9660.py,kernel.py,linux.py,linux_modules.py,lv_utils.py,memory.py,multipath.py,nvme.py,partition.py,pci.py,pmem.py,podman.py,service.py,softwareraid.py,ssh.py,stacktrace.py,sysinfo.py,vmimage.py,wait.py,gdbmi_parser.py,spark.py,distro_packages.py,inspector.py,main.py,manager.py,apt.py,base.py,dnf.py,dpkg.py,rpm.py,yum.py,zypper.py,deprecation.py +ignore=CVS,archive.py,asset.py,astring.py,aurl.py,build.py,cloudinit.py,data_factory.py,datadrainer.py,debug.py,diff_validator.py,disk.py,distro.py,dmesg.py,download.py,exit_codes.py,file_utils.py,filelock.py,git.py,iso9660.py,kernel.py,linux.py,linux_modules.py,lv_utils.py,memory.py,multipath.py,nvme.py,partition.py,pci.py,pmem.py,podman.py,service.py,softwareraid.py,ssh.py,stacktrace.py,sysinfo.py,vmimage.py,wait.py,gdbmi_parser.py,spark.py,distro_packages.py,inspector.py,main.py,manager.py,apt.py,base.py,dnf.py,dpkg.py,rpm.py,yum.py,zypper.py,deprecation.py # regex matches against base names, not paths. ignore-patterns=.git diff --git a/avocado/utils/cpu.py b/avocado/utils/cpu.py index c5e57a76ea..18fa04a443 100644 --- a/avocado/utils/cpu.py +++ b/avocado/utils/cpu.py @@ -16,8 +16,12 @@ # Original author: John Admanski -""" -Get information from the current's machine CPU. +"""Utilities for querying and managing CPU information on the current machine. + +This module provides functions to read CPU details from /proc/cpuinfo and sysfs, +including architecture, vendor, version, online/offline status, NUMA topology, +and frequency governor settings. It supports x86_64, i386, powerpc, s390, +aarch64, and other architectures. """ import glob import logging @@ -44,12 +48,11 @@ class FamilyException(Exception): - pass + """Raised when CPU family cannot be determined for the current architecture.""" def _list_matches(content_list, pattern): - """ - Checks if any item in list matches the specified pattern. + """Check if any item in list matches the specified pattern. :param content_list: items to match :type content_list: list @@ -66,8 +69,7 @@ def _list_matches(content_list, pattern): def _get_info(): - """ - Returns info on the 1st CPU entry from /proc/cpuinfo as a list of lines. + """Return info on the 1st CPU entry from /proc/cpuinfo as a list of lines. :return: `list` of lines 1st CPU entry from /proc/cpuinfo file :rtype: list @@ -82,8 +84,7 @@ def _get_info(): def _get_status(cpu): - """ - Check if a CPU is online or offline. + """Check if a CPU is online or offline. :param cpu: CPU number (e.g. 1, 2 or 39) :type cpu: int @@ -99,8 +100,7 @@ def _get_status(cpu): def cpu_has_flags(flags): - """ - Check if a list of flags are available on current CPU info. + """Check if a list of flags are available on current CPU info. :param flags: A `list` of cpu flags that must exists on the current CPU. :type flags: list of str @@ -119,8 +119,7 @@ def cpu_has_flags(flags): def get_version(): - """ - Get cpu version. + """Get cpu version. :return: cpu version of given machine e.g.:- 'i5-5300U' for Intel and 'POWER9' for IBM machines in @@ -148,8 +147,7 @@ def get_version(): def get_vendor(): - """ - Get the current cpu vendor name. + """Get the current cpu vendor name. :return: a key of :data:`VENDORS_MAP` (e.g. 'intel') depending on the current CPU architecture. Return None if it was unable to @@ -165,12 +163,11 @@ def get_vendor(): def get_revision(): - """ - Get revision from /proc/cpuinfo + """Get revision from /proc/cpuinfo. - :return: revision entry from /proc/cpuinfo file - e.g.:- '0080' for IBM POWER10 machine - :rtype: str + :return: Revision entry from /proc/cpuinfo (e.g. '0080' for IBM POWER10), + or None if no revision line is found. + :rtype: str or None """ rev = None proc_cpuinfo = genio.read_file("/proc/cpuinfo") @@ -181,10 +178,10 @@ def get_revision(): def get_va_bits(): - """ - Check for VA address bit size in /proc/cpuinfo + """Get virtual address bit size from /proc/cpuinfo (x86). - :return: VA address bit size + :return: VA address bit size as string (e.g. '48'), or empty string + if not found or on non-x86 architectures. :rtype: str """ cpu_info = genio.read_file("/proc/cpuinfo") @@ -195,7 +192,11 @@ def get_va_bits(): def get_arch(): - """Work out which CPU architecture we're running on.""" + """Detect the CPU architecture of the current machine. + + :return: Architecture string (e.g. 'x86_64', 'powerpc', 's390', 'aarch64'). + :rtype: str + """ cpu_table = [ (b"^cpu.*(RS64|Broadband Engine)", "powerpc"), (rb"^cpu.*POWER\d+", "powerpc"), @@ -234,7 +235,14 @@ def get_arch(): def get_family(): - """Get family name of the cpu like Broadwell, Haswell, power8, power9.""" + """Get CPU family or microarchitecture name. + + :return: Family string (e.g. 'broadwell', 'haswell', 'power8', 'power9', + 'z15') depending on architecture. + :rtype: str + :raises FamilyException: When family cannot be determined. + :raises NotImplementedError: On unsupported architectures. + """ family = None arch = get_arch() if arch in ("x86_64", "i386"): @@ -284,8 +292,11 @@ def get_family(): def get_model(): - """ - Get model of cpu + """Get CPU model number (x86 only). + + :return: Model integer from /proc/cpuinfo, or None if not found. + :rtype: int or None + :raises NotImplementedError: On non-x86 architectures. """ arch = get_arch() if arch == "x86_64": @@ -301,15 +312,14 @@ def get_model(): def get_x86_amd_zen(family=None, model=None): - """ - Get the AMD Zen architecture's version of the x86_AMD CPU - :param family: AMD family - :type family: int - :param model: AMD model - :type model: int + """Get the AMD Zen architecture version for x86 AMD CPUs. - :return: AMD Zen - :rtype: int + :param family: AMD CPU family (default: from get_family()). + :type family: int or None + :param model: AMD CPU model (default: from get_model()). + :type model: int or None + :return: Zen generation (1-6), or None if not an AMD Zen CPU. + :rtype: int or None """ x86_amd_zen = { @@ -323,9 +333,9 @@ def get_x86_amd_zen(family=None, model=None): 6: [(0x50, 0x5F), (0x80, 0xAF), (0xC0, 0xCF)], }, } - if not family: + if family is None: family = get_family() - if not model: + if model is None: model = get_model() for _family, _zen_model in x86_amd_zen.items(): @@ -337,7 +347,11 @@ def get_x86_amd_zen(family=None, model=None): def online_list(): - """Reports a list of indexes of the online cpus.""" + """Report a list of indexes of the online CPUs. + + :return: List of online CPU indices. + :rtype: list of int + """ cpus = [] search_str = b"processor" index = 2 @@ -352,21 +366,42 @@ def online_list(): def total_count(): - """Return Number of Total cpus in the system including offline cpus.""" + """Return number of total CPUs in the system including offline CPUs. + + :return: Total CPU count. + :rtype: int + """ return os.sysconf("SC_NPROCESSORS_CONF") def online_count(): - """Return Number of Online cpus in the system.""" + """Return number of online CPUs in the system. + + :return: Online CPU count. + :rtype: int + """ return os.sysconf("SC_NPROCESSORS_ONLN") def is_hotpluggable(cpu): + """Check whether a CPU can be hot-plugged (offlined/onlined). + + :param cpu: CPU index to check. + :type cpu: int + :return: True if the CPU has an 'online' sysfs interface. + :rtype: bool + """ return os.path.exists(f"/sys/devices/system/cpu/cpu{cpu}/online") def online(cpu): - """Online given CPU.""" + """Bring a CPU online. + + :param cpu: CPU index to bring online. + :type cpu: int + :return: 1 on success, 0 on failure (requires root). + :rtype: int + """ if _get_status(cpu) is False: with open( f"/sys/devices/system/cpu/cpu{cpu}/online", "wb" @@ -378,7 +413,13 @@ def online(cpu): def offline(cpu): - """Offline given CPU.""" + """Take a CPU offline. + + :param cpu: CPU index to take offline. + :type cpu: int + :return: 0 on success, 1 on failure (requires root). + :rtype: int + """ if _get_status(cpu): with open( f"/sys/devices/system/cpu/cpu{cpu}/online", "wb" @@ -390,10 +431,9 @@ def offline(cpu): def get_idle_state(): - """ - Get current cpu idle values. + """Get current CPU idle state values. - :return: Dict of cpuidle states values for all cpus + :return: Dict of cpuidle state values for all CPUs :rtype: dict """ cpus_list = online_list() @@ -419,10 +459,15 @@ def get_idle_state(): def _bool_to_binary(value): - """ - Turns a Python boolean value (True or False) into binary data. + """Turn a Python boolean value (True or False) into binary data. This function is suitable for writing to /proc/* and /sys/* files. + + :param value: Boolean to convert. + :type value: bool + :return: b'1' for True, b'0' for False. + :rtype: bytes + :raises TypeError: When value is not a boolean. """ if value is True: return b"1" @@ -432,8 +477,7 @@ def _bool_to_binary(value): def set_idle_state(state_number="all", disable=True, setstate=None): - """ - Set/Reset cpu idle states for all cpus. + """Set or reset CPU idle states for all CPUs. :param state_number: cpuidle state number, default: `all` all states :type state_number: str @@ -488,12 +532,13 @@ def set_idle_state(state_number="all", disable=True, setstate=None): def set_freq_governor(governor="random"): - """ - To change the given cpu frequency governor. + """Change the CPU frequency governor for all CPUs. - :param governor: frequency governor profile name whereas `random` is default - option to choose random profile among available ones. + :param governor: Governor name (e.g. 'performance', 'powersave'), or + 'random' to pick one randomly from available governors. :type governor: str + :return: True on success, False on failure. + :rtype: bool """ avl_gov_file = "/sys/devices/system/cpu/cpu0/cpufreq/scaling_available_governors" cur_gov_file = "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor" @@ -535,7 +580,11 @@ def set_freq_governor(governor="random"): def get_freq_governor(): - """Get current cpu frequency governor.""" + """Get the current CPU frequency governor. + + :return: Governor name (e.g. 'performance'), or empty string on error. + :rtype: str + """ cur_gov_file = "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor" try: with open(cur_gov_file, "r") as fl: # pylint: disable=W1514 @@ -546,13 +595,12 @@ def get_freq_governor(): def get_pid_cpus(pid): - """ - Get all the cpus being used by the process according to pid informed. + """Get CPU indices used by a process (from ``/proc//task/*/stat``). - :param pid: process id - :type pid: str - :return: A list include all cpus the process is using - :rtype: list + :param pid: Process ID. + :type pid: int or str + :return: List of CPU index strings the process threads are running on. + :rtype: list of str """ # processor id index is defined according proc documentation # the negative index is necessary because backward data @@ -571,14 +619,10 @@ def get_pid_cpus(pid): def get_numa_node_has_cpus(): - """ - Get the list NUMA node numbers which has CPU's on the system, - if there is no CPU associated to NUMA node,Those NUMA node number - will not be appended to list. + """Get NUMA node numbers that have CPUs assigned. - :return: A list where NUMA node numbers only which has - CPU's - as elements of The list. - :rtype: List + :return: List of NUMA node identifiers that have CPUs. + :rtype: list of str """ cpu_path = "/sys/devices/system/node/has_cpu" delim = ",", "-" @@ -589,12 +633,10 @@ def get_numa_node_has_cpus(): def numa_nodes_with_assigned_cpus(): - """ - Get NUMA nodes with associated CPU's on the system. + """Get NUMA nodes with their associated CPU indices. - :return: A dictionary,in which "NUMA node numbers" as key - and "NUMA node associated CPU's" as values. - :rtype: dictionary + :return: Dict mapping NUMA node ID to sorted list of CPU indices. + :rtype: dict """ numa_nodes_with_cpus = {} for path in glob.glob("/sys/devices/system/node/node[0-9]*"): @@ -610,17 +652,23 @@ def numa_nodes_with_assigned_cpus(): def _deprecated(newfunc, oldfuncname): - """ - Print a warning to user and return the new function. + """Print a deprecation warning and return the new function. - :param newfunc: new function to be assigned - :param oldfunctionname: Old function name string - :rtype: `function` + :param newfunc: New function to be assigned. + :param oldfuncname: Old function name string. + :return: Wrapper that warns and calls newfunc. + :rtype: function """ def wrap(*args, **kwargs): - fmt_str = f"avocado.utils.cpu.{oldfuncname}() it is getting deprecat" - fmt_str += f"ed, Use avocado.utils.cpu.{newfunc.__name__}() instead" + """Wrapper that emits deprecation warning and delegates to newfunc. + + :param args: Positional arguments passed through to newfunc. + :param kwargs: Keyword arguments passed through to newfunc. + :return: Result of newfunc(`*args`, `**kwargs`). + :rtype: any + """ + fmt_str = f"avocado.utils.cpu.{oldfuncname}() is deprecated, please use avocado.utils.cpu.{newfunc.__name__}() instead" warnings.warn((fmt_str), DeprecationWarning, stacklevel=2) return newfunc(*args, **kwargs) @@ -628,17 +676,12 @@ def wrap(*args, **kwargs): def lscpu(): - """ - Get Cores per socket, Physical sockets and Physical chips - by executing 'lscpu' command. - - :rtype: `dict_obj` with the following details - :cores per socket: - :physical sockets: - :physical chips: - :threads per core: - :sockets: - :chips: physical sockets * physical chips + """Get CPU topology by executing the 'lscpu' command. + + :return: Dict with keys such as 'cores_per_chip', 'physical_sockets', + 'physical_chips', 'threads_per_core', 'sockets', 'chips', + 'physical_cores' (depending on lscpu output). + :rtype: dict """ output = process.run("LANG=en_US.UTF-8;lscpu", shell=True) res = {} @@ -679,3 +722,8 @@ def lscpu(): set_cpuidle_state = _deprecated(set_idle_state, "set_cpuidle_state") set_cpufreq_governor = _deprecated(set_freq_governor, "set_cpufreq_governor") get_cpufreq_governor = _deprecated(get_freq_governor, "get_cpufreq_governor") + +# pylint: disable=wrong-import-position +from avocado.utils.deprecation import log_deprecation + +log_deprecation.warning("cpu") diff --git a/selftests/check.py b/selftests/check.py index 4fd6dc98d3..e50929baff 100755 --- a/selftests/check.py +++ b/selftests/check.py @@ -27,7 +27,7 @@ "job-api-check-tmp-directory-exists": 1, "nrunner-interface": 90, "nrunner-requirement": 28, - "unit": 976, + "unit": 1005, "jobs": 11, "functional-parallel": 368, "functional-serial": 7, diff --git a/selftests/unit/utils/cpu.py b/selftests/unit/utils/cpu.py index 3bda99f013..c3410b664a 100644 --- a/selftests/unit/utils/cpu.py +++ b/selftests/unit/utils/cpu.py @@ -127,6 +127,14 @@ def test_cpu_vendor_power9(self): ): self.assertEqual(cpu.get_vendor(), "ibm") + def test_get_vendor_none(self): + """Test get_vendor returns None when cpuinfo matches no known vendor.""" + cpuinfo = b"processor : 0\ncpu : Unknown\n" + with unittest.mock.patch( + "builtins.open", return_value=self._get_file_mock(cpuinfo) + ): + self.assertIsNone(cpu.get_vendor()) + def test_s390x_get_version(self): with unittest.mock.patch( "avocado.utils.cpu.platform.machine", return_value="s390x" @@ -145,6 +153,13 @@ def test_intel_get_version(self): ): self.assertEqual(cpu.get_version(), "i7-4710MQ") + def test_get_version_unsupported_arch(self): + """Test get_version returns empty string when arch has no version pattern.""" + with unittest.mock.patch( + "builtins.open", return_value=self._get_data_mock("risc_v") + ): + self.assertEqual(cpu.get_version(), "") + def test_power8_get_version(self): with unittest.mock.patch( "avocado.utils.cpu.platform.machine", return_value="powerpc" @@ -180,6 +195,24 @@ def test_intel_get_family(self): ): self.assertEqual(cpu.get_family(), "broadwell") + def test_get_family_intel_file_not_found(self): + """Test get_family raises FamilyException when pmu_name missing (Intel).""" + with unittest.mock.patch("avocado.utils.cpu.get_arch", return_value="x86_64"): + with unittest.mock.patch( + "avocado.utils.cpu.get_vendor", return_value="intel" + ): + with unittest.mock.patch( + "builtins.open", side_effect=FileNotFoundError("No such file") + ): + with self.assertRaises(cpu.FamilyException): + cpu.get_family() + + def test_get_family_not_implemented(self): + """Test get_family raises NotImplementedError for unsupported arch.""" + with unittest.mock.patch("avocado.utils.cpu.get_arch", return_value="aarch64"): + with self.assertRaises(NotImplementedError): + cpu.get_family() + def test_power8_get_family(self): with unittest.mock.patch("avocado.utils.cpu.get_arch", return_value="powerpc"): with unittest.mock.patch( @@ -205,6 +238,19 @@ def test_get_idle_state_off(self): with unittest.mock.patch("builtins.open", mocked_open): self.assertEqual(cpu.get_idle_state(), retval) + def test_get_idle_state_io_error(self): + """Test get_idle_state handles IOError when reading state file.""" + with unittest.mock.patch("avocado.utils.cpu.online_list", return_value=[0]): + with unittest.mock.patch( + "glob.glob", + return_value=["/sys/devices/system/cpu/cpu0/cpuidle/state1"], + ): + with unittest.mock.patch( + "builtins.open", side_effect=IOError("Permission denied") + ): + result = cpu.get_idle_state() + self.assertEqual(result, {0: {}}) + def test_get_idle_state_on(self): retval = {0: {0: True}} with unittest.mock.patch("avocado.utils.cpu.online_list", return_value=[0]): @@ -271,6 +317,182 @@ def test_set_idle_state_disable(self): with self.assertRaises(TypeError): cpu.set_idle_state(disable=1) + def test_set_idle_state_io_error(self): + """Test set_idle_state handles IOError when writing state file.""" + with unittest.mock.patch("avocado.utils.cpu.online_list", return_value=[0]): + with unittest.mock.patch( + "glob.glob", + return_value=["/sys/devices/system/cpu/cpu0/cpuidle/state1"], + ): + with unittest.mock.patch( + "builtins.open", side_effect=IOError("Permission denied") + ): + cpu.set_idle_state() + + def test_get_revision(self): + """Test get_revision parses revision from cpuinfo.""" + cpuinfo = "processor : 0\nrevision : 0080\n" + with unittest.mock.patch( + "avocado.utils.cpu.genio.read_file", return_value=cpuinfo + ): + self.assertEqual(cpu.get_revision(), "0080") + + def test_get_revision_no_revision(self): + """Test get_revision returns None when no revision line.""" + with unittest.mock.patch( + "avocado.utils.cpu.genio.read_file", return_value="processor : 0\n" + ): + self.assertIsNone(cpu.get_revision()) + + def test_get_va_bits(self): + """Test get_va_bits extracts VA bits from address sizes line.""" + cpuinfo = "address sizes : 39 bits physical, 48 bits virtual\n" + with unittest.mock.patch( + "avocado.utils.cpu.genio.read_file", return_value=cpuinfo + ): + self.assertEqual(cpu.get_va_bits(), "48") + + def test_get_va_bits_empty(self): + """Test get_va_bits returns empty string when no address sizes.""" + with unittest.mock.patch( + "avocado.utils.cpu.genio.read_file", return_value="processor : 0\n" + ): + self.assertEqual(cpu.get_va_bits(), "") + + def test_get_model_x86_64(self): + """Test get_model extracts model for x86_64.""" + with unittest.mock.patch("avocado.utils.cpu.get_arch", return_value="x86_64"): + with unittest.mock.patch( + "builtins.open", return_value=self._get_data_mock("x86_64") + ): + result = cpu.get_model() + self.assertIsInstance(result, int) + self.assertGreaterEqual(result, 0) + + def test_get_model_not_implemented(self): + """Test get_model raises NotImplementedError for non-x86_64.""" + with unittest.mock.patch("avocado.utils.cpu.get_arch", return_value="powerpc"): + with self.assertRaises(NotImplementedError): + cpu.get_model() + + def test_get_model_no_model_line(self): + """Test get_model returns None when cpuinfo has no model line.""" + cpuinfo = b"processor : 0\nvendor_id : GenuineIntel\n" + with unittest.mock.patch("avocado.utils.cpu.get_arch", return_value="x86_64"): + with unittest.mock.patch( + "builtins.open", return_value=self._get_file_mock(cpuinfo) + ): + self.assertIsNone(cpu.get_model()) + + def test_get_x86_amd_zen_with_params(self): + """Test get_x86_amd_zen with explicit family/model.""" + self.assertEqual(cpu.get_x86_amd_zen(family=23, model=0), 1) + self.assertEqual(cpu.get_x86_amd_zen(family=23, model=0x30), 2) + self.assertEqual(cpu.get_x86_amd_zen(family=25, model=0), 3) + self.assertIsNone(cpu.get_x86_amd_zen(family=0x99, model=0)) + + def test_get_x86_amd_zen_family_match_model_no_match(self): + """Test get_x86_amd_zen returns None when family matches but model outside ranges.""" + # Family 0x17 (Zen) but model 0x80 is in gap between Zen 1/2 ranges + self.assertIsNone(cpu.get_x86_amd_zen(family=23, model=0x80)) + + def test_total_count(self): + """Test total_count returns sysconf value.""" + with unittest.mock.patch("os.sysconf", return_value=8): + self.assertEqual(cpu.total_count(), 8) + + def test_deprecated_alias(self): + """Test deprecated total_cpus_count alias calls total_count.""" + with unittest.mock.patch("os.sysconf", return_value=8): + with self.assertWarns(DeprecationWarning): + self.assertEqual(cpu.total_cpus_count(), 8) + + def test_online_count(self): + """Test online_count returns sysconf value.""" + with unittest.mock.patch("os.sysconf", return_value=8): + self.assertEqual(cpu.online_count(), 8) + + def test_is_hotpluggable_true(self): + """Test is_hotpluggable when cpu online file exists.""" + with unittest.mock.patch("os.path.exists", return_value=True): + self.assertTrue(cpu.is_hotpluggable(1)) + + def test_is_hotpluggable_false(self): + """Test is_hotpluggable when cpu online file does not exist.""" + with unittest.mock.patch("os.path.exists", return_value=False): + self.assertFalse(cpu.is_hotpluggable(0)) + + def test_online_already_online(self): + """Test online returns 1 when CPU is already online.""" + with unittest.mock.patch("avocado.utils.cpu._get_status", return_value=True): + self.assertEqual(cpu.online(1), 1) + + def test_online_cpu_failure(self): + """Test online returns 0 when write to sysfs does not persist.""" + with unittest.mock.patch( + "avocado.utils.cpu._get_status", side_effect=[False, False] + ): + with unittest.mock.patch("builtins.open", unittest.mock.mock_open()): + self.assertEqual(cpu.online(1), 0) + + def test_offline_already_offline(self): + """Test offline returns 0 when CPU is already offline.""" + with unittest.mock.patch("avocado.utils.cpu._get_status", return_value=False): + self.assertEqual(cpu.offline(1), 0) + + def test_offline_cpu_failure(self): + """Test offline returns 1 when write to sysfs does not persist.""" + with unittest.mock.patch( + "avocado.utils.cpu._get_status", side_effect=[True, True] + ): + with unittest.mock.patch("builtins.open", unittest.mock.mock_open()): + self.assertEqual(cpu.offline(1), 1) + + def test_get_freq_governor(self): + """Test get_freq_governor reads scaling_governor.""" + with unittest.mock.patch( + "builtins.open", + unittest.mock.mock_open(read_data="performance"), + ): + self.assertEqual(cpu.get_freq_governor(), "performance") + + def test_get_freq_governor_io_error(self): + """Test get_freq_governor returns empty on IOError.""" + with unittest.mock.patch("builtins.open", side_effect=IOError("No such file")): + self.assertEqual(cpu.get_freq_governor(), "") + + def test_set_freq_governor_no_current(self): + """Test set_freq_governor returns False when get_freq_governor is empty.""" + with unittest.mock.patch( + "avocado.utils.cpu.get_freq_governor", return_value="" + ): + self.assertFalse(cpu.set_freq_governor("performance")) + + def test_get_numa_node_has_cpus(self): + """Test get_numa_node_has_cpus parses has_cpu file.""" + with unittest.mock.patch( + "avocado.utils.cpu.genio.read_file", return_value="0-3\n" + ): + self.assertEqual(cpu.get_numa_node_has_cpus(), ["0", "3"]) + + def test_lscpu(self): + """Test lscpu parses output.""" + output = unittest.mock.Mock() + output.stdout = ( + b"Core(s) per socket: 4\n" + b"Thread(s) per core: 2\n" + b"Socket(s): 2\n" + b"Physical cores/chip: 4\n" + b"Physical sockets: 2\n" + b"Physical chips: 2\n" + ) + with unittest.mock.patch("avocado.utils.cpu.process.run", return_value=output): + result = cpu.lscpu() + self.assertEqual(result["virtual_cores"], 4) + self.assertEqual(result["threads_per_core"], 2) + self.assertEqual(result["sockets"], 2) + self.assertEqual(result["chips"], 4) + if __name__ == "__main__": unittest.main()