diff --git a/composer.json b/composer.json index c664571..b6561fe 100644 --- a/composer.json +++ b/composer.json @@ -54,7 +54,10 @@ "psr-4": { "Amp\\File\\": "src" }, - "files": ["src/functions.php"] + "files": [ + "src/functions.php", + "src/Internal/functions.php" + ] }, "autoload-dev": { "psr-4": { diff --git a/src/Driver/BlockingFile.php b/src/Driver/BlockingFile.php index 33d5f3f..cc52b7c 100644 --- a/src/Driver/BlockingFile.php +++ b/src/Driver/BlockingFile.php @@ -8,6 +8,7 @@ use Amp\Cancellation; use Amp\DeferredFuture; use Amp\File\File; +use Amp\File\Internal; use Amp\File\Whence; /** @@ -55,18 +56,26 @@ public function __destruct() } } + public function lock(): bool + { + return Internal\lock($this->path, $this->getFileHandle()); + } + + public function unlock(): void + { + Internal\unlock($this->path, $this->getFileHandle()); + } + public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string { - if ($this->handle === null) { - throw new ClosedException("The file '{$this->path}' has been closed"); - } + $handle = $this->getFileHandle(); try { \set_error_handler(function (int $type, string $message): never { throw new StreamException("Failed reading from file '{$this->path}': {$message}"); }); - $data = \fread($this->handle, $length); + $data = \fread($handle, $length); if ($data === false) { throw new StreamException("Failed reading from file '{$this->path}'"); } @@ -79,16 +88,14 @@ public function read(?Cancellation $cancellation = null, int $length = self::DEF public function write(string $bytes): void { - if ($this->handle === null) { - throw new ClosedException("The file '{$this->path}' has been closed"); - } + $handle = $this->getFileHandle(); try { \set_error_handler(function (int $type, string $message): never { throw new StreamException("Failed writing to file '{$this->path}': {$message}"); }); - $length = \fwrite($this->handle, $bytes); + $length = \fwrite($handle, $bytes); if ($length === false) { throw new StreamException("Failed writing to file '{$this->path}'"); } @@ -146,16 +153,14 @@ public function onClose(\Closure $onClose): void public function truncate(int $size): void { - if ($this->handle === null) { - throw new ClosedException("The file '{$this->path}' has been closed"); - } + $handle = $this->getFileHandle(); try { \set_error_handler(function (int $type, string $message): never { throw new StreamException("Could not truncate file '{$this->path}': {$message}"); }); - if (!\ftruncate($this->handle, $size)) { + if (!\ftruncate($handle, $size)) { throw new StreamException("Could not truncate file '{$this->path}'"); } } finally { @@ -165,9 +170,7 @@ public function truncate(int $size): void public function seek(int $position, Whence $whence = Whence::Start): int { - if ($this->handle === null) { - throw new ClosedException("The file '{$this->path}' has been closed"); - } + $handle = $this->getFileHandle(); $mode = match ($whence) { Whence::Start => SEEK_SET, @@ -181,7 +184,7 @@ public function seek(int $position, Whence $whence = Whence::Start): int throw new StreamException("Could not seek in file '{$this->path}': {$message}"); }); - if (\fseek($this->handle, $position, $mode) === -1) { + if (\fseek($handle, $position, $mode) === -1) { throw new StreamException("Could not seek in file '{$this->path}'"); } @@ -193,20 +196,12 @@ public function seek(int $position, Whence $whence = Whence::Start): int public function tell(): int { - if ($this->handle === null) { - throw new ClosedException("The file '{$this->path}' has been closed"); - } - - return \ftell($this->handle); + return \ftell($this->getFileHandle()); } public function eof(): bool { - if ($this->handle === null) { - throw new ClosedException("The file '{$this->path}' has been closed"); - } - - return \feof($this->handle); + return \feof($this->getFileHandle()); } public function getPath(): string @@ -238,4 +233,18 @@ public function getId(): int { return $this->id; } + + /** + * @return resource + * + * @throws ClosedException + */ + private function getFileHandle() + { + if ($this->handle === null) { + throw new ClosedException("The file '{$this->path}' has been closed"); + } + + return $this->handle; + } } diff --git a/src/Driver/EioFile.php b/src/Driver/EioFile.php index 5426c81..79867a4 100644 --- a/src/Driver/EioFile.php +++ b/src/Driver/EioFile.php @@ -14,26 +14,36 @@ final class EioFile extends Internal\QueuedWritesFile { private readonly Internal\EioPoll $poll; - /** @var resource eio file handle. */ - private $fh; + /** @var int eio file handle resource ID. */ + private int $fh; + + /** @var resource|closed-resource */ + private $fd; private ?Future $closing = null; private readonly DeferredFuture $onClose; - /** - * @param resource $fh - */ - public function __construct(Internal\EioPoll $poll, $fh, string $path, string $mode, int $size) + public function __construct(Internal\EioPoll $poll, int $fh, string $path, string $mode, int $size) { parent::__construct($path, $mode, $size); $this->poll = $poll; $this->fh = $fh; + $this->fd = \fopen('php://fd/' . $this->fh, 'r'); $this->onClose = new DeferredFuture; } + protected function getFileHandle() + { + if (!\is_resource($this->fd)) { + throw new ClosedException("The file has been closed"); + } + + return $this->fd; + } + public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string { if ($this->isReading || !$this->queue->isEmpty()) { @@ -99,6 +109,10 @@ public function close(): void return; } + if (\is_resource($this->fd)) { + \fclose($this->fd); + } + $this->closing = $this->onClose->getFuture(); $this->poll->listen(); diff --git a/src/Driver/ParallelFile.php b/src/Driver/ParallelFile.php index 04ecc59..f356142 100644 --- a/src/Driver/ParallelFile.php +++ b/src/Driver/ParallelFile.php @@ -142,6 +142,35 @@ public function eof(): bool return $this->pendingWrites === 0 && $this->size <= $this->position; } + public function lock(): bool + { + return $this->flock(true); + } + + public function unlock(): void + { + $this->flock(false); + } + + private function flock(bool $mode): bool + { + if ($this->id === null) { + throw new ClosedException("The file has been closed"); + } + + $this->busy = true; + + try { + return $this->worker->execute(new Internal\FileTask('flock', [$mode], $this->id)); + } catch (TaskFailureException $exception) { + throw new StreamException("Attempting to lock the file failed", 0, $exception); + } catch (WorkerException $exception) { + throw new StreamException("Sending the task to the worker failed", 0, $exception); + } finally { + $this->busy = false; + } + } + public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string { if ($this->id === null) { diff --git a/src/Driver/StatusCachingFile.php b/src/Driver/StatusCachingFile.php index 2c89dba..3ca96aa 100644 --- a/src/Driver/StatusCachingFile.php +++ b/src/Driver/StatusCachingFile.php @@ -53,6 +53,16 @@ public function end(): void } } + public function lock(): bool + { + return $this->file->lock(); + } + + public function unlock(): void + { + $this->file->unlock(); + } + public function close(): void { $this->file->close(); diff --git a/src/Driver/UvFile.php b/src/Driver/UvFile.php index 5b3f770..e21854f 100644 --- a/src/Driver/UvFile.php +++ b/src/Driver/UvFile.php @@ -46,6 +46,15 @@ public function __construct( $this->onClose = new DeferredFuture; } + protected function getFileHandle() + { + if ($this->closing) { + throw new ClosedException("The file has been closed"); + } + + return $this->fh; + } + public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string { if ($this->isReading || !$this->queue->isEmpty()) { diff --git a/src/File.php b/src/File.php index da9222b..4b50d86 100644 --- a/src/File.php +++ b/src/File.php @@ -2,6 +2,7 @@ namespace Amp\File; +use Amp\ByteStream\ClosedException; use Amp\ByteStream\ReadableStream; use Amp\ByteStream\WritableStream; use Amp\Cancellation; @@ -54,4 +55,16 @@ public function getMode(): string; * @param int $size New file size. */ public function truncate(int $size): void; + + /** + * Makes a non-blocking attempt to lock the file. Returns true if the lock was obtained. + * + * @throws ClosedException If the file has been closed. + */ + public function lock(): bool; + + /** + * @throws ClosedException If the file has been closed. + */ + public function unlock(): void; } diff --git a/src/FileMutex.php b/src/FileMutex.php index b827f94..866e84f 100644 --- a/src/FileMutex.php +++ b/src/FileMutex.php @@ -26,27 +26,30 @@ public function __construct(private readonly string $fileName, ?Filesystem $file $this->directory = \dirname($this->fileName); } + /** + * @throws SyncException + */ public function acquire(?Cancellation $cancellation = null): Lock { if (!$this->filesystem->isDirectory($this->directory)) { throw new SyncException(\sprintf('Directory of "%s" does not exist or is not a directory', $this->fileName)); } - // Try to create the lock file. If the file already exists, someone else - // has the lock, so set an asynchronous timer and try again. + // Try to create and lock the file. If flock fails, someone else already has the lock, + // so set an asynchronous timer and try again. for ($attempt = 0; true; ++$attempt) { try { - $file = $this->filesystem->openFile($this->fileName, 'x'); - - // Return a lock object that can be used to release the lock on the mutex. - $lock = new Lock($this->release(...)); - + $file = $this->filesystem->openFile($this->fileName, 'c'); + if ($file->lock()) { + return new Lock(fn () => $this->release($file)); + } $file->close(); - - return $lock; - } catch (FilesystemException) { - delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * (2 ** $attempt)), cancellation: $cancellation); + } catch (FilesystemException $exception) { + throw new SyncException($exception->getMessage(), previous: $exception); } + + $multiplier = 2 ** \min(31, $attempt); + delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * $multiplier), cancellation: $cancellation); } } @@ -55,11 +58,12 @@ public function acquire(?Cancellation $cancellation = null): Lock * * @throws SyncException */ - private function release(): void + private function release(File $file): void { try { - $this->filesystem->deleteFile($this->fileName); - } catch (\Throwable $exception) { + $this->filesystem->deleteFile($this->fileName); // Delete file while holding the lock. + $file->close(); + } catch (FilesystemException $exception) { throw new SyncException( 'Failed to unlock the mutex file: ' . $this->fileName, previous: $exception, diff --git a/src/Internal/FileTask.php b/src/Internal/FileTask.php index 7f7016e..927d507 100644 --- a/src/Internal/FileTask.php +++ b/src/Internal/FileTask.php @@ -101,6 +101,15 @@ public function run(Channel $channel, Cancellation $cancellation): mixed $file->close(); return null; + case "flock": + [$mode] = $this->args; + if ($mode) { + return $file->lock(); + } + + $file->unlock(); + return null; + default: throw new \Error('Invalid operation'); } diff --git a/src/Internal/QueuedWritesFile.php b/src/Internal/QueuedWritesFile.php index 30b4d97..0c65b8f 100644 --- a/src/Internal/QueuedWritesFile.php +++ b/src/Internal/QueuedWritesFile.php @@ -121,6 +121,23 @@ public function truncate(int $size): void $future->await(); } + /** + * @return resource + * + * @throws ClosedException If the file has been closed. + */ + abstract protected function getFileHandle(); + + public function lock(): bool + { + return lock($this->getPath(), $this->getFileHandle()); + } + + public function unlock(): void + { + unlock($this->getPath(), $this->getFileHandle()); + } + public function seek(int $position, Whence $whence = Whence::Start): int { if ($this->isReading) { diff --git a/src/Internal/functions.php b/src/Internal/functions.php new file mode 100644 index 0000000..3c8071c --- /dev/null +++ b/src/Internal/functions.php @@ -0,0 +1,63 @@ +directory = \rtrim($directory, "/\\"); } - public function acquire(string $key, ?Cancellation $cancellation = null): Lock - { - if (!$this->filesystem->isDirectory($this->directory)) { - throw new SyncException(\sprintf('Directory "%s" does not exist or is not a directory', $this->directory)); - } - - $filename = $this->getFilename($key); - - // Try to create the lock file. If the file already exists, someone else - // has the lock, so set an asynchronous timer and try again. - for ($attempt = 0; true; ++$attempt) { - try { - $file = $this->filesystem->openFile($filename, 'x'); - - // Return a lock object that can be used to release the lock on the mutex. - $lock = new Lock(fn () => $this->release($filename)); - - $file->close(); - - return $lock; - } catch (FilesystemException) { - delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * (2 ** $attempt)), cancellation: $cancellation); - } - } - } - /** - * Releases the lock on the mutex. - * * @throws SyncException */ - private function release(string $filename): void + public function acquire(string $key, ?Cancellation $cancellation = null): Lock { - try { - $this->filesystem->deleteFile($filename); - } catch (\Throwable $exception) { - throw new SyncException( - 'Failed to unlock the mutex file: ' . $filename, - previous: $exception, - ); - } + $mutex = new FileMutex($this->getFilename($key), $this->filesystem); + + return $mutex->acquire($cancellation); } private function getFilename(string $key): string