Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions distributed/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from collections.abc import Callable
from functools import wraps

import psutil

from distributed.compatibility import WINDOWS

_empty_namedtuple = collections.namedtuple("_empty_namedtuple", ())
Expand All @@ -13,14 +15,8 @@
def _psutil_caller(method_name, default=_empty_namedtuple):
"""
Return a function calling the given psutil *method_name*,
or returning *default* if psutil is not present.
or returning *default* if psutil fails.
"""
# Import only once to avoid the cost of a failing import at each wrapper() call
try:
import psutil
except ImportError: # pragma: no cover
return default

meth = getattr(psutil, method_name)

@wraps(meth)
Expand Down
3 changes: 2 additions & 1 deletion distributed/pytest_resourceleaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test1():
import pytest

from distributed.metrics import time
from distributed.system import process_memory


def pytest_addoption(parser):
Expand Down Expand Up @@ -172,7 +173,7 @@ class RSSMemoryChecker(ResourceChecker, name="memory"):
LEAK_THRESHOLD = 10 * 2**20

def measure(self) -> int:
return psutil.Process().memory_info().rss
return process_memory()

def has_leak(self, before: int, after: int) -> bool:
    """Tell whether memory grew by more than ``LEAK_THRESHOLD``.

    Parameters
    ----------
    before: int
        Measurement taken first.
    after: int
        Measurement taken second.

    Returns
    -------
    bool
        True only when ``after`` exceeds ``before`` by strictly more than
        ``self.LEAK_THRESHOLD``; growth equal to the threshold is not a leak.
    """
    growth = after - before
    return growth > self.LEAK_THRESHOLD
Expand Down
23 changes: 23 additions & 0 deletions distributed/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import psutil

from distributed.compatibility import LINUX

__all__ = ("memory_limit", "MEMORY_LIMIT")


Expand Down Expand Up @@ -63,4 +65,25 @@ def memory_limit() -> int:
return limit


def process_memory(proc: psutil.Process | int | None = None) -> int:
"""Return total memory used by a process

Parameters
----------
proc: psutil.Process | int, optional
Process or PID to measure. Default: current process
"""
if proc is None:
proc = psutil.Process()
elif isinstance(proc, int):
proc = psutil.Process(proc)

if LINUX:
minfo = proc.memory_full_info()
return minfo.rss + minfo.swap
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we can add this. swap is defined as the data that is swapped to disk

swap (Linux): amount of memory that has been swapped out to disk.

and is not actually in memory

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We measure "process" memory for these purposes:

  • Calculate unmanaged memory (bokeh + prometheus + AMM optimistic memory heuristics). If you don't add swap, then managed memory may exceed rss. You will also be misled into thinking that your host has enough physical RAM, whereas it could benefit from more.
  • terminate threshold. If the user configured the memory_limit so that spilling is possible, then we should expect that the memory_limit was in fact set to physical memory + part of the swap file. If you omit swap from your measure, then you may end up never hitting the terminate threshold while the swap mount fills up. When it's fully saturated, the whole host will hang.
  • In addition to the previous point: it's entirely legitimate and performant to disable target, spill, and pause thresholds, leaving everything to the os swapping mechanism, and just leave the terminate threshold on.
  • To appreciate how much memory you reclaimed after gc and after spilling. Again, you have no control over whether some of the memory was swapped out or not. As a matter of fact, if you have both spill and swap systems going on, it's very likely that dask will choose to spill memory that was swapped out to begin with - because both algorithms are LRU(ish). This is obviously poorly performant, but a very realistic use case on dev boxes where you have a web browser and your IDE running at least.
  • To notice memory leaks in tests. Again, if you're leaking you might be leaking swapped out memory.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't add swap, then managed memory may exceed rss. You will also be misled into thinking that your host has enough physical RAM, whereas it could benefit from more.

I'm not terribly concerned about this. If that was a concern and we wanted to raise awareness we should rather visualize swap on the dashboard

If you omit swap from your measure, then you may end up never hitting the terminate threshold while the swap mount fills up. When it's fully saturated, the whole host will hang.

True, but this doesn't warrant us introducing poorly tested or untested functionality. If this was a concern I would go for a better visualization instead of applying a "magical" fix

In addition to the previous point: it's entirely legitimate and performant to disable target, spill, and pause thresholds, leaving everything to the os swapping mechanism, and just leave the terminate threshold on.

Yes but this still doesn't explain why dask should add these values.

This is obviously poorly performant, but a very realistic use case on dev boxes where you have a web browser and your IDE running at least.

How would this help? If the OS starts spilling, the new process_memory definition would stay roughly the same and won't reduce, i.e. you will not help avoid the "spill + swap" scenario. Quite the opposite, really: while RSS is lower and we might not spill if additional data is put on the cluster, this new metric would suggest that we're hitting the actual hard limit sooner and would increase the chance of entering this "spill+swap" space.

Besides, I would refrain from arguing about how the OS swaps. The OS swaps memory pages and is using algorithms we don't have any control over. We're operating at a way too high level of abstraction to try to compare these systems

To notice memory leaks in tests. Again, if you're leaking you might be leaking swapped out memory.

Copy link
Copy Markdown
Collaborator Author

@crusaderky crusaderky Jan 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides, I would refrain from arguing about how the OS swaps. The OS swaps memory pages and is using algorithms we don't have any control over. We're operating at a way too high level of abstraction to try to compare these systems

Exactly. Dask should not know about how many pages are swapped and how many aren't.
Dask does care about how much memory dask itself (well, the Python interpreter actually) malloc'ed. That's what we call process memory. The total amount of malloc'ed memory is rss+swap.

else:
minfo = proc.memory_info()
return minfo.rss


MEMORY_LIMIT = memory_limit()
3 changes: 2 additions & 1 deletion distributed/system_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from distributed.compatibility import WINDOWS
from distributed.diagnostics import nvml
from distributed.metrics import monotonic, time
from distributed.system import process_memory


class SystemMonitor:
Expand Down Expand Up @@ -112,7 +113,7 @@ def get_process_memory(self) -> int:
as the OS allocating and releasing memory is highly volatile and a constant
source of flakiness.
"""
return self.proc.memory_info().rss
return process_memory(self.proc)

def update(self) -> dict[str, Any]:
now = time()
Expand Down
6 changes: 5 additions & 1 deletion distributed/tests/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import psutil
import pytest

from distributed.system import memory_limit
from distributed.system import memory_limit, process_memory


def test_memory_limit():
Expand Down Expand Up @@ -97,3 +97,7 @@ def test_rlimit():
assert memory_limit() == new_limit
except OSError:
pytest.skip("resource could not set the RSS limit")


def test_process_memory():
    """Smoke-test that ``process_memory()`` for the current process falls
    strictly between 2**20 and 2**40.
    """
    lower_bound, upper_bound = 2**20, 2**40
    measured = process_memory()
    assert lower_bound < measured < upper_bound
3 changes: 1 addition & 2 deletions distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from typing import ClassVar, Iterator, TypeVar, overload

import click
import psutil
import tblib.pickling_support

try:
Expand Down Expand Up @@ -200,8 +201,6 @@ def get_ip_interface(ifname):
ValueError is raised if the interface does not have an IPv4 address
associated with it.
"""
import psutil

net_if_addrs = psutil.net_if_addrs()

if ifname not in net_if_addrs:
Expand Down
15 changes: 5 additions & 10 deletions distributed/utils_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
import threading
from collections import deque

import psutil

from dask.utils import format_bytes

from distributed.metrics import thread_time
from distributed.system import process_memory

logger = _logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -147,12 +150,7 @@ def __init__(self, warn_over_frac=0.1, info_over_rss_win=10 * 1e6):
def enable(self):
assert not self._enabled
self._fractional_timer = FractionalTimer(n_samples=self.N_SAMPLES)
try:
import psutil
except ImportError:
self._proc = None
else:
self._proc = psutil.Process()
self._proc = psutil.Process()

cb = self._gc_callback
assert cb not in gc.callbacks
Expand Down Expand Up @@ -181,10 +179,7 @@ def _gc_callback(self, phase, info):
# don't waste time measuring them
if info["generation"] != 2:
return
if self._proc is not None:
rss = self._proc.memory_info().rss
else:
rss = 0
rss = process_memory(self._proc)
if phase == "start":
self._fractional_timer.start_timing()
self._gc_rss_before = rss
Expand Down
6 changes: 4 additions & 2 deletions distributed/worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,13 @@ def memory_monitor(self, nanny: Nanny) -> None:

process = nanny.process.process
try:
memory = psutil.Process(process.pid).memory_info().rss
memory = system.process_memory(process.pid)
except (ProcessLookupError, psutil.NoSuchProcess, psutil.AccessDenied):
return # pragma: nocover

if memory / self.memory_limit <= self.memory_terminate_fraction:
assert self.memory_limit is not None
assert self.memory_terminate_fraction is not False
if memory <= self.memory_limit * self.memory_terminate_fraction:
return

if self._last_terminated_pid != process.pid:
Expand Down