Do not reuse global worker pool for files #84

Merged 6 commits on Nov 9, 2024
2 changes: 1 addition & 1 deletion composer.json
@@ -36,7 +36,7 @@
         "amphp/amp": "^3",
         "amphp/byte-stream": "^2",
         "amphp/cache": "^2",
-        "amphp/parallel": "^2.1",
+        "amphp/parallel": "^2.3",
         "amphp/sync": "^2",
         "revolt/event-loop": "^1"
     },
78 changes: 47 additions & 31 deletions src/Driver/ParallelFilesystemDriver.php
@@ -6,37 +6,58 @@
 use Amp\File\FilesystemException;
 use Amp\File\Internal;
 use Amp\Future;
+use Amp\Parallel\Worker\ContextWorkerPool;
+use Amp\Parallel\Worker\DelegatingWorkerPool;
+use Amp\Parallel\Worker\LimitedWorkerPool;
 use Amp\Parallel\Worker\TaskFailureThrowable;
 use Amp\Parallel\Worker\Worker;
 use Amp\Parallel\Worker\WorkerException;
 use Amp\Parallel\Worker\WorkerPool;
 use function Amp\async;
-use function Amp\Parallel\Worker\workerPool;

 final class ParallelFilesystemDriver implements FilesystemDriver
 {
     public const DEFAULT_WORKER_LIMIT = 8;

-    private WorkerPool $pool;
+    private readonly WorkerPool $pool;

-    /** @var int Maximum number of workers to use for open files. */
-    private int $workerLimit;
+    /** @var positive-int Maximum number of workers to use for open files. */
+    private readonly int $workerLimit;

-    /** @var \SplObjectStorage<Worker, int> Worker storage. */
-    private \SplObjectStorage $workerStorage;
+    /** @var \WeakMap<Worker, int> */
+    private \WeakMap $workerStorage;

-    /** @var Future Pending worker request */
-    private Future $pendingWorker;
+    /** @var Future<Worker>|null Pending worker request */
+    private ?Future $pendingWorker = null;

     /**
-     * @param int $workerLimit Maximum number of workers to use from the pool for open files.
+     * @param WorkerPool|null $pool Custom worker pool to use for file workers. If null, a new pool is created.
+     * @param int|null $workerLimit [Deprecated] Maximum number of workers to use from the pool for open files. Instead
+     * of using this parameter, provide a pool with a limited number using an instance of {@see LimitedWorkerPool}
+     * such as {@see ContextWorkerPool}.
      */
-    public function __construct(?WorkerPool $pool = null, int $workerLimit = self::DEFAULT_WORKER_LIMIT)
-    {
-        $this->pool = $pool ?? workerPool();
+    public function __construct(?WorkerPool $pool = null, ?int $workerLimit = null)
+    {
+        /** @var \WeakMap<Worker, int> For Psalm. */
+        $this->workerStorage = new \WeakMap();
+
+        if ($workerLimit !== null) {
+            \trigger_error(
+                'The $workerLimit parameter is deprecated and will be removed in the next major version.' .
+                ' To limit the number of workers used from the given pool, use an instance of ' .
+                LimitedWorkerPool::class . ' instead, such as ' . ContextWorkerPool::class . ' or ' .
+                DelegatingWorkerPool::class,
+                \E_USER_DEPRECATED,
+            );
+        }
+
+        $workerLimit ??= $pool instanceof LimitedWorkerPool ? $pool->getWorkerLimit() : self::DEFAULT_WORKER_LIMIT;
+        if ($workerLimit <= 0) {
+            throw new \ValueError("Worker limit must be a positive integer");
+        }
+
+        $this->pool = $pool ?? new ContextWorkerPool($workerLimit);
         $this->workerLimit = $workerLimit;
-        $this->workerStorage = new \SplObjectStorage();
-        $this->pendingWorker = Future::complete();
     }

     public function openFile(string $path, string $mode): ParallelFile
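
The new constructor resolves the worker limit through a three-step fallback: an explicit `$workerLimit` (now deprecated), then the pool's own limit if the pool is a `LimitedWorkerPool`, then `DEFAULT_WORKER_LIMIT`. A minimal self-contained sketch of that fallback follows. `LimitedPoolStandIn`, `FixedLimitPool`, and `resolveWorkerLimit()` are hypothetical names introduced only so the snippet runs without amphp/parallel installed, and the deprecation notice is left out for brevity.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for Amp\Parallel\Worker\LimitedWorkerPool.
interface LimitedPoolStandIn
{
    public function getWorkerLimit(): int;
}

// Hypothetical fixed-limit pool, used only for illustration.
final class FixedLimitPool implements LimitedPoolStandIn
{
    public function __construct(private readonly int $limit)
    {
    }

    public function getWorkerLimit(): int
    {
        return $this->limit;
    }
}

// Mirrors ParallelFilesystemDriver::DEFAULT_WORKER_LIMIT from the diff.
const DEFAULT_WORKER_LIMIT = 8;

/**
 * Resolve the effective limit: explicit argument first, then the pool's own
 * limit, then the default. Non-positive results are rejected.
 */
function resolveWorkerLimit(?object $pool, ?int $workerLimit = null): int
{
    $workerLimit ??= $pool instanceof LimitedPoolStandIn
        ? $pool->getWorkerLimit()
        : DEFAULT_WORKER_LIMIT;

    if ($workerLimit <= 0) {
        throw new \ValueError('Worker limit must be a positive integer');
    }

    return $workerLimit;
}

var_dump(resolveWorkerLimit(null));                  // int(8)
var_dump(resolveWorkerLimit(new FixedLimitPool(4))); // int(4)
```

With the real classes, the new docblock suggests that the non-deprecated way to cap workers is to pass a limited pool, for example `new ContextWorkerPool(4)`, as the first constructor argument.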
@@ -45,12 +66,12 @@ public function openFile(string $path, string $mode): ParallelFile

         $workerStorage = $this->workerStorage;
         $worker = new Internal\FileWorker($worker, static function (Worker $worker) use ($workerStorage): void {
-            if (!$workerStorage->contains($worker)) {
+            if (!isset($workerStorage[$worker])) {
                 return;
             }

             if (($workerStorage[$worker] -= 1) === 0 || !$worker->isRunning()) {
-                $workerStorage->detach($worker);
+                unset($workerStorage[$worker]);
             }
         });

@@ -67,26 +88,19 @@ public function openFile(string $path, string $mode): ParallelFile

     private function selectWorker(): Worker
     {
-        $this->pendingWorker->await(); // Wait for any currently pending request for a worker.
+        $this->pendingWorker?->await(); // Wait for any currently pending request for a worker.

         if ($this->workerStorage->count() < $this->workerLimit) {
             $this->pendingWorker = async($this->pool->getWorker(...));
             $worker = $this->pendingWorker->await();

-            if ($this->workerStorage->contains($worker)) {
-                // amphp/parallel v1.x may return an already used worker from the pool.
-                $this->workerStorage[$worker] += 1;
-            } else {
-                // amphp/parallel v2.x should always return an unused worker.
-                $this->workerStorage->attach($worker, 1);
-            }
+            $this->workerStorage[$worker] = 1;

             return $worker;
         }

         $max = \PHP_INT_MAX;
-        foreach ($this->workerStorage as $storedWorker) {
-            $count = $this->workerStorage[$storedWorker];
+        foreach ($this->workerStorage as $storedWorker => $count) {
             if ($count <= $max) {
                 $worker = $storedWorker;
                 $max = $count;
@@ -96,7 +110,7 @@ private function selectWorker(): Worker
         \assert(isset($worker) && $worker instanceof Worker);

         if (!$worker->isRunning()) {
-            $this->workerStorage->detach($worker);
+            unset($this->workerStorage[$worker]);
             return $this->selectWorker();
         }
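
The bookkeeping in `openFile()` and `selectWorker()` amounts to a per-worker open-file counter kept in a `\WeakMap`, plus a scan for the smallest count once the limit is reached. The sketch below isolates that idea in plain PHP. `WorkerStandIn`, `leastLoaded()`, and `release()` are hypothetical names, not part of amphp, and the `isRunning()` checks from the diff are omitted.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for Amp\Parallel\Worker\Worker.
final class WorkerStandIn
{
    public function __construct(public readonly string $name)
    {
    }
}

/**
 * Pick the worker with the fewest open files. Uses `<=` like the loop in the
 * diff, so among equal counts the entry iterated last is selected.
 *
 * @param \WeakMap<WorkerStandIn, int> $storage
 */
function leastLoaded(\WeakMap $storage): ?WorkerStandIn
{
    $selected = null;
    $min = \PHP_INT_MAX;

    foreach ($storage as $worker => $count) {
        if ($count <= $min) {
            $selected = $worker;
            $min = $count;
        }
    }

    return $selected;
}

/**
 * Mirrors the close callback in the diff: decrement the counter and drop the
 * entry once it reaches zero.
 *
 * @param \WeakMap<WorkerStandIn, int> $storage
 */
function release(\WeakMap $storage, WorkerStandIn $worker): void
{
    if (!isset($storage[$worker])) {
        return;
    }

    if (($storage[$worker] -= 1) === 0) {
        unset($storage[$worker]);
    }
}

$a = new WorkerStandIn('a');
$b = new WorkerStandIn('b');

$storage = new \WeakMap();
$storage[$a] = 2; // Two open files on worker a.
$storage[$b] = 1; // One open file on worker b.

$selected = leastLoaded($storage);
echo $selected?->name, "\n"; // b

release($storage, $b);       // b's count reaches zero, so its entry is removed.
echo \count($storage), "\n"; // 1
```

One practical difference from `\SplObjectStorage` is that a `\WeakMap` entry disappears automatically once its key object has no other references, so the map itself never keeps a worker alive.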

@@ -172,10 +186,12 @@ public function getLinkStatus(string $path): ?array

     public function touch(string $path, ?int $modificationTime, ?int $accessTime): void
     {
-        $this->runFileTask(new Internal\FileTask(
-            "touch",
-            [$path, $modificationTime, $accessTime]
-        ));
+        $this->runFileTask(
+            new Internal\FileTask(
+                "touch",
+                [$path, $modificationTime, $accessTime]
+            )
+        );
     }

     public function read(string $path): string
21 changes: 3 additions & 18 deletions src/functions.php
@@ -2,7 +2,6 @@

 namespace Amp\File;

-use Amp\File\Driver\BlockingFilesystemDriver;
 use Amp\File\Driver\EioFilesystemDriver;
 use Amp\File\Driver\ParallelFilesystemDriver;
 use Amp\File\Driver\StatusCachingFilesystemDriver;
@@ -27,24 +26,14 @@ function filesystem(?FilesystemDriver $driver = null): Filesystem
             return $map[$loop];
         }

-        $defaultDriver = createDefaultDriver();
+        $driver = createDefaultDriver();

         if (!\defined("AMP_WORKER")) { // Prevent caching in workers, cache in parent instead.
-            $defaultDriver = new StatusCachingFilesystemDriver($defaultDriver);
+            $driver = new StatusCachingFilesystemDriver($driver);
         }
-
-        $filesystem = new Filesystem($defaultDriver);
-    } else {
-        $filesystem = new Filesystem($driver);
     }

-    if (\defined("AMP_WORKER") && $driver instanceof ParallelFilesystemDriver) {
-        throw new \Error("Cannot use the parallel driver within a worker");
-    }
-
-    $map[$loop] = $filesystem;
-
-    return $filesystem;
+    return $map[$loop] = new Filesystem($driver);
 }

 /**
@@ -62,10 +51,6 @@ function createDefaultDriver(): FilesystemDriver
         return new EioFilesystemDriver($driver);
     }

-    if (\defined("AMP_WORKER")) { // Prevent spawning infinite workers.
-        return new BlockingFilesystemDriver;
-    }
-
     return new ParallelFilesystemDriver;
Comment on lines -65 to 54

Member:
How do we prevent infinite ParallelFilesystemDriver instances now?

Member Author:
We don't, so tasks can use async file access. @azjezz mentioned this was an issue.

Member:
But inside the worker we'll spawn another worker now, and so on, and never actually handle filesystem calls, no?

Member Author:
No no, FileTask, which handles file system calls, will use the blocking driver for the call. If the worker executes a user task, then another pool of workers might be spawned. I don't see infinite workers being likely (unless someone executes a Task within a Task, but that would be strange), but each worker having its own set of workers would be a possibility. Since by default we limit the number of workers to 8, the resource usage should still be within reason even on systems without eio or uv.

 }