Skip to content

Commit c48259e

Browse files
committed
feat(fabric): introduce process-safe port management
1 parent b554e99 commit c48259e

File tree

8 files changed

+2482
-63
lines changed

8 files changed

+2482
-63
lines changed

docs/source-fabric/advanced/port_manager_design.md

Lines changed: 769 additions & 0 deletions
Large diffs are not rendered by default.

docs/source-fabric/levels/advanced.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
<../advanced/distributed_communication>
77
<../advanced/multiple_setup>
88
<../advanced/compile>
9+
<../advanced/port_manager_design>
910
<../advanced/model_parallel/fsdp>
1011
<../guide/checkpoint/distributed_checkpoint>
1112

@@ -59,6 +60,14 @@ Advanced skills
5960
:height: 170
6061
:tag: advanced
6162

63+
.. displayitem::
64+
:header: Coordinate distributed ports safely
65+
:description: Learn how Lightning Fabric manages process-safe port allocation with file-backed state
66+
:button_link: ../advanced/port_manager_design.html
67+
:col_css: col-md-4
68+
:height: 170
69+
:tag: advanced
70+
6271
.. displayitem::
6372
:header: Save and load very large models
6473
:description: Save and load very large models efficiently with distributed checkpoints
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
# Copyright The Lightning AI team.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Platform-abstracted file locking for cross-process coordination."""
15+
16+
import logging
17+
import os
18+
import sys
19+
import time
20+
from abc import ABC, abstractmethod
21+
from contextlib import suppress
22+
from pathlib import Path
23+
from typing import Optional
24+
25+
log = logging.getLogger(__name__)
26+
27+
28+
class FileLock(ABC):
    """Abstract base class for platform-specific file locking.

    File locks enable process-safe coordination by providing exclusive access to shared resources across multiple
    processes. This abstract interface allows platform-specific implementations while maintaining a consistent API.

    Subclasses implement :meth:`acquire` and :meth:`release`; this base class supplies the shared state, the
    context-manager protocol and best-effort cleanup on garbage collection.

    """

    def __init__(self, lock_file: Path) -> None:
        """Initialize the file lock.

        Args:
            lock_file: Path to the lock file

        """
        self._lock_file = lock_file
        # Descriptor of the opened lock file; stays ``None`` until a subclass opens it.
        self._fd: Optional[int] = None
        # Whether this instance (not merely some process) currently holds the lock.
        self._is_locked = False

    @abstractmethod
    def acquire(self, timeout: float = 30.0) -> bool:
        """Acquire the lock, blocking up to timeout seconds.

        Args:
            timeout: Maximum seconds to wait for lock acquisition

        Returns:
            True if lock was acquired, False if timeout occurred

        """

    @abstractmethod
    def release(self) -> None:
        """Release the lock if held."""

    def is_locked(self) -> bool:
        """Check if this instance currently holds the lock.

        Returns:
            True if lock is currently held by this instance

        """
        return self._is_locked

    def __enter__(self) -> "FileLock":
        """Enter context manager - acquire lock using the default timeout of :meth:`acquire`.

        Returns:
            This lock instance

        Raises:
            TimeoutError: If :meth:`acquire` reports that the lock could not be obtained

        """
        if not self.acquire():
            raise TimeoutError(f"Failed to acquire lock on {self._lock_file} within timeout")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Exit context manager - release lock."""
        self.release()
        return False  # Don't suppress exceptions

    def __del__(self) -> None:
        """Cleanup - ensure lock is released and file descriptor closed."""
        # ``__del__`` also runs for instances whose ``__init__`` never completed, so the
        # attributes may be missing; fall back to defaults instead of raising AttributeError.
        if getattr(self, "_is_locked", False):
            with suppress(Exception):
                self.release()

        fd = getattr(self, "_fd", None)
        if fd is not None:
            # Forget the descriptor before closing it so a stale number is never closed
            # twice (the OS may hand the same number out again for an unrelated file).
            self._fd = None
            with suppress(Exception):
                os.close(fd)
92+
93+
94+
class UnixFileLock(FileLock):
    """File locking using fcntl.flock for Unix-like systems (Linux, macOS).

    Uses fcntl.flock() which provides advisory locking. This implementation uses LOCK_EX (exclusive lock) with LOCK_NB
    (non-blocking) and polls until the timeout expires.

    """

    def acquire(self, timeout: float = 30.0) -> bool:
        """Acquire exclusive lock using fcntl.flock.

        The lock is always attempted at least once, so ``timeout=0`` performs a single non-blocking try.

        Args:
            timeout: Maximum seconds to wait for lock

        Returns:
            True if lock acquired, False if timeout occurred

        """
        import fcntl

        # Ensure lock file exists and open it
        self._lock_file.parent.mkdir(parents=True, exist_ok=True)
        self._lock_file.touch(exist_ok=True)

        if self._fd is None:
            self._fd = os.open(str(self._lock_file), os.O_RDWR | os.O_CREAT)

        # Monotonic clock: the wait must not be stretched or cut short by wall-clock adjustments.
        start_time = time.monotonic()
        deadline = start_time + timeout
        while True:
            try:
                # Try to acquire exclusive lock non-blockingly
                fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except OSError:
                # Lock could not be taken, wait and retry while time remains
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                # Never sleep past the deadline
                time.sleep(min(0.1, remaining))
            else:
                self._is_locked = True
                return True

        # Timeout - log warning
        elapsed = time.monotonic() - start_time
        log.warning(f"Lock acquisition timeout after {elapsed:.1f}s for {self._lock_file}")
        return False

    def release(self) -> None:
        """Release the lock using fcntl.flock.

        Does nothing when this instance does not hold the lock. A failing unlock is logged as a warning and the
        instance keeps reporting the lock as held.

        """
        if not self._is_locked or self._fd is None:
            return

        import fcntl

        try:
            fcntl.flock(self._fd, fcntl.LOCK_UN)
        except OSError as e:
            log.warning(f"Error releasing lock on {self._lock_file}: {e}")
        else:
            self._is_locked = False
149+
150+
151+
class WindowsFileLock(FileLock):
    """File locking using msvcrt.locking for Windows systems.

    Uses msvcrt.locking() which provides mandatory locking on Windows. This implementation uses LK_NBLCK (non-blocking
    exclusive lock) and polls until the timeout expires. The locked region is always the first byte of the file.

    """

    def acquire(self, timeout: float = 30.0) -> bool:
        """Acquire exclusive lock using msvcrt.locking.

        The lock is always attempted at least once, so ``timeout=0`` performs a single non-blocking try. If this
        instance already holds the lock, the call returns immediately.

        Args:
            timeout: Maximum seconds to wait for lock

        Returns:
            True if lock acquired, False if timeout occurred

        """
        import msvcrt

        if self._is_locked:
            # Already held by this instance. NOTE(review): locking the same byte again through the same
            # descriptor is expected to fail on Windows and would burn the whole timeout - confirm.
            return True

        # Ensure lock file exists and open it
        self._lock_file.parent.mkdir(parents=True, exist_ok=True)
        self._lock_file.touch(exist_ok=True)

        if self._fd is None:
            self._fd = os.open(str(self._lock_file), os.O_RDWR | os.O_CREAT)

        # Monotonic clock: the wait must not be stretched or cut short by wall-clock adjustments.
        start_time = time.monotonic()
        deadline = start_time + timeout
        while True:
            try:
                # msvcrt.locking works from the current file position, so pin it to byte 0 first
                os.lseek(self._fd, 0, os.SEEK_SET)
                # Try to lock 1 byte at file position 0
                msvcrt.locking(self._fd, msvcrt.LK_NBLCK, 1)
            except OSError:
                # Lock could not be taken, wait and retry while time remains
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                # Never sleep past the deadline
                time.sleep(min(0.1, remaining))
            else:
                self._is_locked = True
                return True

        # Timeout - log warning
        elapsed = time.monotonic() - start_time
        log.warning(f"Lock acquisition timeout after {elapsed:.1f}s for {self._lock_file}")
        return False

    def release(self) -> None:
        """Release the lock using msvcrt.locking.

        Does nothing when this instance does not hold the lock. A failing unlock is logged as a warning and the
        instance keeps reporting the lock as held.

        """
        if not self._is_locked or self._fd is None:
            return

        import msvcrt

        try:
            # Unlock the byte we locked; reposition first because the region is relative to the file position
            os.lseek(self._fd, 0, os.SEEK_SET)
            msvcrt.locking(self._fd, msvcrt.LK_UNLCK, 1)
        except OSError as e:
            log.warning(f"Error releasing lock on {self._lock_file}: {e}")
        else:
            self._is_locked = False
207+
208+
209+
def create_file_lock(lock_file: Path) -> FileLock:
    """Build the file lock implementation that matches the running platform.

    Args:
        lock_file: Path to the lock file

    Returns:
        A ``WindowsFileLock`` when ``sys.platform`` is ``"win32"``, otherwise a ``UnixFileLock``

    """
    # Pick the implementation first, then instantiate it once.
    lock_cls = WindowsFileLock if sys.platform == "win32" else UnixFileLock
    return lock_cls(lock_file)

0 commit comments

Comments
 (0)