Skip to content
Merged
57 changes: 32 additions & 25 deletions gematria/datasets/pipelines/benchmark_cpu_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
# limitations under the License.

import abc
from typing_extensions import override
from collections.abc import Iterable
from collections.abc import Collection
import os
import re
from typing_extensions import override


class BenchmarkScheduler(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -71,6 +71,19 @@ def verify(self):
pass


def _parse_cpu_list(cpu_list: str) -> list[int]:
  """Expands a CPU list string such as '0-3,8' into individual CPU indices."""
  cpu_indices = []
  for entry in cpu_list.strip().split(','):
    if not entry:
      continue
    first, _, last = entry.partition('-')
    # A bare index ('5') has no '-', so it expands to just itself.
    cpu_indices.extend(range(int(first), int(last or first) + 1))
  return cpu_indices


def _get_neighboring_threads(cpu_index: int) -> list[int]:
  """Returns the thread siblings that sysfs lists for the given CPU.

  The sibling list is read from the CPU's `thread_siblings_list` topology
  file. Ranges in that file (e.g. '0-3') are expanded to every CPU in the
  range rather than being treated as just the two endpoints, so cores with
  more than two hardware threads are reported accurately.

  Args:
    cpu_index: The index of the CPU whose siblings should be looked up.

  Returns:
    The CPU indices listed in the sibling file, in file order.

  Raises:
    OSError: If the topology file cannot be opened.
    ValueError: If the file contains an entry that is not an integer or an
      integer range.
  """
  with open(
      f'/sys/devices/system/cpu/cpu{cpu_index}/topology/thread_siblings_list'
  ) as thread_sibling_list_handle:
    return _parse_cpu_list(thread_sibling_list_handle.read())


class DefaultBenchmarkScheduler(BenchmarkScheduler):
"""A BenchmarkScheduler that schedules processes separately.

Expand All @@ -81,44 +94,38 @@ class DefaultBenchmarkScheduler(BenchmarkScheduler):
assuming that two of the threads are neighboring (part of the same core).
Errors are raised if these conditions are not met. The benchmarking core
returned is one of the two neighboring threads. The main process has its
CPU mask limited to the thread that neighbors neither of the other threads.
"""

def __init__(self):
self._cpu_mask = []

@staticmethod
def _get_neighboring_threads(cpu_index: int) -> list[int]:
  """Returns the thread siblings of the given CPU.

  Kept so that callers using the class-level name keep working. The actual
  sysfs lookup and parsing live in the module-level function of the same
  name, so there is only one copy of that logic to maintain.

  Args:
    cpu_index: The index of the CPU whose siblings should be looked up.

  Returns:
    The CPU indices reported by the module-level helper.
  """
  # Inside a method body this name resolves to the module-level function, not
  # to this static method, so the call does not recurse.
  return _get_neighboring_threads(cpu_index)

def _get_aux_core_and_hyperthread_pair(
    self,
    cpu_mask: Collection[int],
) -> tuple[int, list[int]]:
  """Splits a CPU mask into an auxiliary CPU and a pair of sibling threads.

  Args:
    cpu_mask: The CPU indices to choose from. It is both iterated and queried
      for membership, which is why a collection is required.

  Returns:
    A tuple of the auxiliary CPU (a member of the mask that is not part of
    the pair) and the two sibling threads that make up the pair.

  Raises:
    ValueError: If a CPU in the mask does not report exactly two sibling
      threads, or if the mask does not contain a complete sibling pair
      together with at least one other CPU.
  """
  for cpu_index in cpu_mask:
    neighboring_threads = _get_neighboring_threads(cpu_index)
    if len(neighboring_threads) != 2:
      raise ValueError('Expected two hyperthreads per CPU.')

    # Only a pair whose two threads are both usable is of interest.
    if any(thread not in cpu_mask for thread in neighboring_threads):
      continue

    # The auxiliary CPU has to come from outside the pair. If the pair is all
    # there is, keep looking rather than indexing into an empty list.
    aux_cpus = [cpu for cpu in cpu_mask if cpu not in neighboring_threads]
    if not aux_cpus:
      continue

    return (aux_cpus[0], [neighboring_threads[0], neighboring_threads[1]])
  raise ValueError(
      'Expected a pair of neighboring hyperthreads and an aux thread in the'
      ' CPU mask.'
  )

@override
Expand Down
80 changes: 43 additions & 37 deletions gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,73 +22,83 @@

class BenchmarkSchedulerTests(absltest.TestCase):

def _set_normal_affinity(self):
  """Pins this process to one sibling-thread pair plus one auxiliary CPU.

  A cleanup is registered that puts the previous affinity mask back when the
  test finishes.

  Returns:
    A tuple of the auxiliary CPU, the sibling-thread pair, and the affinity
    mask that was in effect before the change (as a list).
  """
  available_cpus = os.sched_getaffinity(0)
  original_mask = list(available_cpus)

  self.addCleanup(os.sched_setaffinity, 0, original_mask)

  # The pair is chosen first and both of its threads are taken out of the
  # candidate set, so the auxiliary CPU picked afterwards cannot be one of
  # them.
  pair_seed = next(iter(available_cpus))
  sibling_pair = benchmark_cpu_scheduler._get_neighboring_threads(pair_seed)
  available_cpus.remove(sibling_pair[0])
  available_cpus.remove(sibling_pair[1])
  auxiliary_cpu = available_cpus.pop()

  os.sched_setaffinity(0, [auxiliary_cpu, *sibling_pair])
  return (auxiliary_cpu, sibling_pair, original_mask)

def test_no_scheduling(self):
  """The no-scheduling scheduler yields no benchmark core and verifies."""
  no_op_scheduler = benchmark_cpu_scheduler.NoSchedulingBenchmarkScheduler()
  benchmark_core = no_op_scheduler.setup_and_get_benchmark_core()
  self.assertIsNone(benchmark_core)
  no_op_scheduler.verify()

def test_default_scheduler_get_neighboring_threads(self):
  """CPU 0 reports two distinct sibling threads."""
  neighboring_threads = benchmark_cpu_scheduler._get_neighboring_threads(0)

  # Just check that we get two CPU ids back that are not the same. We cannot
  # do much more without knowing more about the system topology, and this
  # should be a reasonable enough test.
  self.assertLen(neighboring_threads, 2)
  self.assertNotEqual(neighboring_threads[0], neighboring_threads[1])

@staticmethod
def _set_normal_affinity():
  """Pins this process to one sibling-thread pair plus one auxiliary CPU.

  Returns:
    A tuple of the auxiliary CPU, the sibling-thread pair, and the affinity
    mask that was in effect before the change (as a list), which callers can
    use to restore the original affinity.
  """
  cpu_mask = os.sched_getaffinity(0)
  cpu_mask_list = list(cpu_mask)

  hyperthread_pair_part = next(iter(cpu_mask))
  hyperthread_pair = (
      benchmark_cpu_scheduler.DefaultBenchmarkScheduler._get_neighboring_threads(
          hyperthread_pair_part
      )
  )
  # Take the whole pair out of the candidates before choosing the auxiliary
  # CPU. Choosing the auxiliary CPU first could hand back a thread that
  # belongs to the pair, leaving fewer than three distinct CPUs in the mask.
  cpu_mask.difference_update(hyperthread_pair)
  aux_cpu = cpu_mask.pop()
  new_cpu_mask = [aux_cpu, *hyperthread_pair]

  os.sched_setaffinity(0, new_cpu_mask)
  return (aux_cpu, hyperthread_pair, cpu_mask_list)

@staticmethod
def _reset_cpu_affinity(cpu_mask: Iterable[int]):
  """Applies the given CPU mask to the calling process.

  Args:
    cpu_mask: The CPU indices the process should be allowed to run on.
  """
  calling_process = 0  # A pid of 0 addresses the calling process.
  os.sched_setaffinity(calling_process, cpu_mask)

def test_default_scheduler_get_cores(self):
  """The scheduler recovers the auxiliary CPU and pair from the mask."""
  expected_aux_cpu, expected_hyperthread_pair, old_cpu_mask = (
      self._set_normal_affinity()
  )
  # Registered as a cleanup so the original affinity is restored even when
  # one of the assertions below fails.
  self.addCleanup(os.sched_setaffinity, 0, old_cpu_mask)

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  cpu_mask = os.sched_getaffinity(0)
  aux_cpu, hyperthread_pair = scheduler._get_aux_core_and_hyperthread_pair(
      cpu_mask
  )
  self.assertEqual(aux_cpu, expected_aux_cpu)
  self.assertContainsSubsequence(hyperthread_pair, expected_hyperthread_pair)

def test_default_scheduler_get_cores_no_neighboring_threads(self):
  """A mask of three CPUs with no sibling pair is rejected."""
  # Pick CPUs that sit on three different cores. Taking three arbitrary CPUs
  # from the mask could include two siblings, in which case the call under
  # test would succeed instead of raising.
  three_cores = []
  claimed_threads = set()
  for cpu_index in sorted(os.sched_getaffinity(0)):
    if cpu_index in claimed_threads:
      continue
    claimed_threads.update(
        benchmark_cpu_scheduler._get_neighboring_threads(cpu_index)
    )
    three_cores.append(cpu_index)
    if len(three_cores) == 3:
      break

  if len(three_cores) < 3:
    self.skipTest('Not enough cores to complete setup properly.')

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  with self.assertRaisesRegex(
      ValueError,
      'Expected a pair of neighboring hyperthreads and an aux thread in the'
      ' CPU mask.',
  ):
    scheduler._get_aux_core_and_hyperthread_pair(three_cores)

def test_default_scheduler_setup(self):
  """Setup returns a pair thread and pins this process to the aux CPU."""
  expected_aux_cpu, expected_hyperthread_pair, old_cpu_mask = (
      self._set_normal_affinity()
  )
  # Registered as a cleanup so the original affinity is restored even when
  # one of the assertions below fails.
  self.addCleanup(os.sched_setaffinity, 0, old_cpu_mask)

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  benchmark_core = scheduler.setup_and_get_benchmark_core()
  self.assertIn(benchmark_core, expected_hyperthread_pair)

  # The affinity mask is a set, so compare it as one.
  self.assertSetEqual(os.sched_getaffinity(0), {expected_aux_cpu})

def test_default_scheduler_not_three_cpus(self):
old_cpu_mask = os.sched_getaffinity(0)
Expand All @@ -102,14 +112,12 @@ def test_default_scheduler_not_three_cpus(self):
os.sched_setaffinity(0, old_cpu_mask)

def test_default_scheduler_verify(self):
  """verify() passes when the mask is untouched after setup."""
  # Restrict the affinity exactly once; a second restriction would start from
  # the already-narrowed three-CPU mask.
  _, _, old_cpu_mask = self._set_normal_affinity()
  # Registered as a cleanup so the original affinity is restored even when
  # setup or verification raises.
  self.addCleanup(os.sched_setaffinity, 0, old_cpu_mask)

  scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
  scheduler.setup_and_get_benchmark_core()
  scheduler.verify()

def test_default_scheduler_verify_mask_changed(self):
_, _, old_cpu_mask = self._set_normal_affinity()

Expand All @@ -121,8 +129,6 @@ def test_default_scheduler_verify_mask_changed(self):
with self.assertRaises(ValueError):
scheduler.verify()

self._reset_cpu_affinity(old_cpu_mask)


# Run the absltest test runner when this file is executed as a script.
if __name__ == '__main__':
absltest.main()
Loading