Skip to content

Commit

Permalink
flock-based File Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 6, 2024
1 parent 6c03166 commit 6407608
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 53 deletions.
48 changes: 36 additions & 12 deletions src/FileMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ final class FileMutex implements Mutex
private const LATENCY_TIMEOUT = 0.01;
private const DELAY_LIMIT = 1;

private static ?\Closure $errorHandler = null;

private readonly Filesystem $filesystem;

private readonly string $directory;
Expand All @@ -26,39 +28,61 @@ public function __construct(private readonly string $fileName, ?Filesystem $file
$this->directory = \dirname($this->fileName);
}

/**
* @throws SyncException
*/
public function acquire(?Cancellation $cancellation = null): Lock
{
if (!$this->filesystem->isDirectory($this->directory)) {
throw new SyncException(\sprintf('Directory of "%s" does not exist or is not a directory', $this->fileName));
}

// Try to create the lock file. If the file already exists, someone else
// has the lock, so set an asynchronous timer and try again.
// Try to create and lock the file. If flock fails, someone else already has the lock,
// so set an asynchronous timer and try again.
for ($attempt = 0; true; ++$attempt) {
try {
$file = $this->filesystem->openFile($this->fileName, 'x');

// Return a lock object that can be used to release the lock on the mutex.
$lock = new Lock($this->release(...));
\set_error_handler(self::$errorHandler ??= static fn () => true);

$file->close();
try {
$handle = \fopen($this->fileName, 'c');
if (!$handle) {
throw new SyncException(\sprintf(
'Unable to open or create file at %s: %s',
$this->fileName,
\error_get_last()['message'] ?? 'Unknown error',
));
}

return $lock;
} catch (FilesystemException) {
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * (2 ** $attempt)), cancellation: $cancellation);
if (\flock($handle, \LOCK_EX | \LOCK_NB)) {
return new Lock(fn () => $this->release($handle));
}
} finally {
\restore_error_handler();
}

$multiplier = 2 ** \min(31, $attempt);
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * $multiplier), cancellation: $cancellation);
}
}

/**
* Releases the lock on the mutex.
*
* @param resource $handle
*
* @throws SyncException
*/
private function release(): void
private function release($handle): void
{
try {
$this->filesystem->deleteFile($this->fileName);

\set_error_handler(self::$errorHandler ??= static fn () => true);

try {
\fclose($handle);
} finally {
\restore_error_handler();
}
} catch (\Throwable $exception) {
throw new SyncException(
'Failed to unlock the mutex file: ' . $this->fileName,
Expand Down
45 changes: 4 additions & 41 deletions src/KeyedFileMutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@
use Amp\Sync\KeyedMutex;
use Amp\Sync\Lock;
use Amp\Sync\SyncException;
use function Amp\delay;

final class KeyedFileMutex implements KeyedMutex
{
private const LATENCY_TIMEOUT = 0.01;
private const DELAY_LIMIT = 1;

private readonly Filesystem $filesystem;

private readonly string $directory;
Expand All @@ -26,47 +22,14 @@ public function __construct(string $directory, ?Filesystem $filesystem = null)
$this->directory = \rtrim($directory, "/\\");
}

public function acquire(string $key, ?Cancellation $cancellation = null): Lock
{
if (!$this->filesystem->isDirectory($this->directory)) {
throw new SyncException(\sprintf('Directory "%s" does not exist or is not a directory', $this->directory));
}

$filename = $this->getFilename($key);

// Try to create the lock file. If the file already exists, someone else
// has the lock, so set an asynchronous timer and try again.
for ($attempt = 0; true; ++$attempt) {
try {
$file = $this->filesystem->openFile($filename, 'x');

// Return a lock object that can be used to release the lock on the mutex.
$lock = new Lock(fn () => $this->release($filename));

$file->close();

return $lock;
} catch (FilesystemException) {
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * (2 ** $attempt)), cancellation: $cancellation);
}
}
}

/**
* Releases the lock on the mutex.
*
* @throws SyncException
*/
private function release(string $filename): void
public function acquire(string $key, ?Cancellation $cancellation = null): Lock
{
try {
$this->filesystem->deleteFile($filename);
} catch (\Throwable $exception) {
throw new SyncException(
'Failed to unlock the mutex file: ' . $filename,
previous: $exception,
);
}
$mutex = new FileMutex($this->getFilename($key), $this->filesystem);

return $mutex->acquire($cancellation);
}

private function getFilename(string $key): string
Expand Down

0 comments on commit 6407608

Please sign in to comment.