Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ new_git_repository(
# The pinned version of LLVM, and its SHA256 hash. The `LLVM_COMMIT` variable in
# `.github/workflows/main.yaml` must be updated to match this everytime it is
# changed.
LLVM_COMMIT = "842fd1537521d38913aec5c9a081afedf97d88fe"
LLVM_COMMIT = "cd66c9b6a04689659348c0a3ff4c1205b1133fe9"

LLVM_SHA256 = "21a1d434b4dbbafee65c2a9fc8e64ac3e3210eab9c5d67affd8d6d36bbf979b3"
LLVM_SHA256 = "8d2a8b2e0accdf072a3897aec6a3e68d522dfc1b40416b3c4215bb5af36a36e1"

http_archive(
name = "llvm-raw",
Expand Down
14 changes: 14 additions & 0 deletions gematria/datasets/pipelines/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,17 @@ gematria_py_test(
"//gematria/proto:execution_annotation_py_pb2",
],
)

gematria_py_library(
name = "benchmark_cpu_scheduler",
srcs = ["benchmark_cpu_scheduler.py"],
)

gematria_py_test(
name = "benchmark_cpu_scheduler_test",
size = "small",
srcs = ["benchmark_cpu_scheduler_test.py"],
deps = [
":benchmark_cpu_scheduler",
],
)
97 changes: 97 additions & 0 deletions gematria/datasets/pipelines/benchmark_cpu_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright 2024 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from collections.abc import Iterable
import os
import re


class BenchmarkScheduler(ABC):

@abstractmethod
def setup_and_get_benchmark_core(self) -> int | None:
pass

@abstractmethod
def verify(self):
pass


class NoSchedulingBenchmarkScheduler(BenchmarkScheduler):

def setup_and_get_benchmark_core(self) -> int | None:
return None

def verify(self):
pass


class DefaultBenchmarkScheduler(BenchmarkScheduler):

def __init__(self):
self._cpu_mask = []

@staticmethod
def _get_neighboring_threads(cpu_index: int) -> list[int]:
with open(
f'/sys/devices/system/cpu/cpu{cpu_index}/topology/thread_siblings_list'
) as thread_sibling_list_handle:
neighboring_threads_strings = re.split(
r'[-,]+', thread_sibling_list_handle.read().strip()
)
neighboring_threads = [
int(cpu_index_str) for cpu_index_str in neighboring_threads_strings
]
return neighboring_threads

def _get_aux_core_and_hyperthread_pair(
self,
cpu_mask: Iterable[int],
) -> tuple[int, list[int]]:
for cpu_index in cpu_mask:
neighboring_threads = self._get_neighboring_threads(cpu_index)
if len(neighboring_threads) != 2:
raise ValueError('Expected two hyperthreads per CPU.')

if (
neighboring_threads[0] in cpu_mask
and neighboring_threads[1] in cpu_mask
):
cpus = list(cpu_mask)
cpus.remove(neighboring_threads[0])
cpus.remove(neighboring_threads[1])
return (cpus[0], [neighboring_threads[0], neighboring_threads[1]])
raise ValueError(
'Expected a pair of neighboring hyperthreads in the CPU mask.'
)

def setup_and_get_benchmark_core(self) -> int | None:
cpu_mask = os.sched_getaffinity(0)

if len(cpu_mask) != 3:
raise ValueError('Expected to have three CPUs.')

aux_core, hyperthread_pair = self._get_aux_core_and_hyperthread_pair(
cpu_mask
)
os.sched_setaffinity(0, [aux_core])
self._cpu_mask = [aux_core]

return hyperthread_pair[0]

def verify(self):
cpu_mask = list(os.sched_getaffinity(0))
if self._cpu_mask != cpu_mask:
raise ValueError('Expected the CPU mask to not change.')
128 changes: 128 additions & 0 deletions gematria/datasets/pipelines/benchmark_cpu_scheduler_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright 2024 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from collections.abc import Iterable

from absl.testing import absltest

from gematria.datasets.pipelines import benchmark_cpu_scheduler


class BenchmarkSchedulerTests(absltest.TestCase):

def test_no_scheduling(self):
scheduler = benchmark_cpu_scheduler.NoSchedulingBenchmarkScheduler()
self.assertIsNone(scheduler.setup_and_get_benchmark_core())
scheduler.verify()

def test_default_scheduler_get_neighboring_threads(self):
scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
neighboring_threads = scheduler._get_neighboring_threads(0)

# Just check that we get two CPU ids back that are not the same. We cannot
# do much more without knowing more about the system topology, and this
# should be a reasonable enough test.
self.assertLen(neighboring_threads, 2)
self.assertNotEqual(neighboring_threads[0], neighboring_threads[1])

@staticmethod
def _set_normal_affinity():
cpu_mask = os.sched_getaffinity(0)
cpu_mask_list = list(cpu_mask)
aux_cpu = cpu_mask.pop()
hyperthread_pair_part = cpu_mask.pop()
hyperthread_pair = benchmark_cpu_scheduler.DefaultBenchmarkScheduler._get_neighboring_threads(
hyperthread_pair_part
)
new_cpu_mask = [aux_cpu, *hyperthread_pair]

os.sched_setaffinity(0, new_cpu_mask)
return (aux_cpu, hyperthread_pair, cpu_mask_list)

@staticmethod
def _reset_cpu_affinity(cpu_mask: Iterable[int]):
os.sched_setaffinity(0, cpu_mask)

def test_default_scheduler_get_cores(self):
expected_aux_cpu, expected_hyperthread_pair, old_cpu_mask = (
self._set_normal_affinity()
)
scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
cpu_mask = os.sched_getaffinity(0)
aux_cpu, hyperthread_pair = scheduler._get_aux_core_and_hyperthread_pair(
cpu_mask
)
self.assertEqual(aux_cpu, expected_aux_cpu)
self.assertContainsSubsequence(hyperthread_pair, expected_hyperthread_pair)
self._reset_cpu_affinity(old_cpu_mask)

def test_default_scheduler_get_cores_no_neighboring_threads(self):
cpu_mask = os.sched_getaffinity(0)
three_cores = [cpu_mask.pop(), cpu_mask.pop(), cpu_mask.pop()]

scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
with self.assertRaises(ValueError):
scheduler._get_aux_core_and_hyperthread_pair(three_cores)

def test_default_scheduler_setup(self):
expected_aux_cpu, expected_hyperthread_pair, old_cpu_mask = (
self._set_normal_affinity()
)

scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
benchmark_core = scheduler.setup_and_get_benchmark_core()
self.assertIn(benchmark_core, expected_hyperthread_pair)
set_cpu_mask = os.sched_getaffinity(0)
self.assertLen(set_cpu_mask, 1)
self.assertEqual(set_cpu_mask.pop(), expected_aux_cpu)

self._reset_cpu_affinity(old_cpu_mask)

def test_default_scheduler_not_three_cpus(self):
old_cpu_mask = os.sched_getaffinity(0)
cpu_mask_list = list(old_cpu_mask)
os.sched_setaffinity(0, cpu_mask_list[0:2])

scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
with self.assertRaises(ValueError):
scheduler.setup_and_get_benchmark_core()

os.sched_setaffinity(0, old_cpu_mask)

def test_default_scheduler_verify(self):
_, _, old_cpu_mask = self._set_normal_affinity()

scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
scheduler.setup_and_get_benchmark_core()
scheduler.verify()

self._reset_cpu_affinity(old_cpu_mask)

def test_default_scheduler_verify_mask_changed(self):
_, _, old_cpu_mask = self._set_normal_affinity()

scheduler = benchmark_cpu_scheduler.DefaultBenchmarkScheduler()
scheduler.setup_and_get_benchmark_core()

cpu_mask_list = list(old_cpu_mask)
os.sched_setaffinity(0, cpu_mask_list[1:3])
with self.assertRaises(ValueError):
scheduler.verify()

self._reset_cpu_affinity(old_cpu_mask)


if __name__ == '__main__':
absltest.main()