Skip to content

Commit 6c03166

Browse files
authored
Do not reuse global worker pool for files (#84)
1 parent 6465251 commit 6c03166

File tree

3 files changed

+51
-50
lines changed

3 files changed

+51
-50
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"amphp/amp": "^3",
3737
"amphp/byte-stream": "^2",
3838
"amphp/cache": "^2",
39-
"amphp/parallel": "^2.1",
39+
"amphp/parallel": "^2.3",
4040
"amphp/sync": "^2",
4141
"revolt/event-loop": "^1"
4242
},

src/Driver/ParallelFilesystemDriver.php

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,58 @@
66
use Amp\File\FilesystemException;
77
use Amp\File\Internal;
88
use Amp\Future;
9+
use Amp\Parallel\Worker\ContextWorkerPool;
10+
use Amp\Parallel\Worker\DelegatingWorkerPool;
11+
use Amp\Parallel\Worker\LimitedWorkerPool;
912
use Amp\Parallel\Worker\TaskFailureThrowable;
1013
use Amp\Parallel\Worker\Worker;
1114
use Amp\Parallel\Worker\WorkerException;
1215
use Amp\Parallel\Worker\WorkerPool;
1316
use function Amp\async;
14-
use function Amp\Parallel\Worker\workerPool;
1517

1618
final class ParallelFilesystemDriver implements FilesystemDriver
1719
{
1820
public const DEFAULT_WORKER_LIMIT = 8;
1921

20-
private WorkerPool $pool;
22+
private readonly WorkerPool $pool;
2123

22-
/** @var int Maximum number of workers to use for open files. */
23-
private int $workerLimit;
24+
/** @var positive-int Maximum number of workers to use for open files. */
25+
private readonly int $workerLimit;
2426

25-
/** @var \SplObjectStorage<Worker, int> Worker storage. */
26-
private \SplObjectStorage $workerStorage;
27+
/** @var \WeakMap<Worker, int> */
28+
private \WeakMap $workerStorage;
2729

28-
/** @var Future Pending worker request */
29-
private Future $pendingWorker;
30+
/** @var Future<Worker>|null Pending worker request */
31+
private ?Future $pendingWorker = null;
3032

3133
/**
32-
* @param int $workerLimit Maximum number of workers to use from the pool for open files.
34+
* @param WorkerPool|null $pool Custom worker pool to use for file workers. If null, a new pool is created.
35+
* @param int|null $workerLimit [Deprecated] Maximum number of workers to use from the pool for open files. Instead
36+
* of using this parameter, provide a pool with a limited number using an instance of {@see LimitedWorkerPool}
37+
* such as {@see ContextWorkerPool}.
3338
*/
34-
public function __construct(?WorkerPool $pool = null, int $workerLimit = self::DEFAULT_WORKER_LIMIT)
35-
{
36-
$this->pool = $pool ?? workerPool();
39+
public function __construct(?WorkerPool $pool = null, ?int $workerLimit = null)
40+
{
41+
/** @var \WeakMap<Worker, int> For Psalm. */
42+
$this->workerStorage = new \WeakMap();
43+
44+
if ($workerLimit !== null) {
45+
\trigger_error(
46+
'The $workerLimit parameter is deprecated and will be removed in the next major version.' .
47+
' To limit the number of workers used from the given pool, use an instance of ' .
48+
LimitedWorkerPool::class . ' instead, such as ' . ContextWorkerPool::class . ' or ' .
49+
DelegatingWorkerPool::class,
50+
\E_USER_DEPRECATED,
51+
);
52+
}
53+
54+
$workerLimit ??= $pool instanceof LimitedWorkerPool ? $pool->getWorkerLimit() : self::DEFAULT_WORKER_LIMIT;
55+
if ($workerLimit <= 0) {
56+
throw new \ValueError("Worker limit must be a positive integer");
57+
}
58+
59+
$this->pool = $pool ?? new ContextWorkerPool($workerLimit);
3760
$this->workerLimit = $workerLimit;
38-
$this->workerStorage = new \SplObjectStorage();
39-
$this->pendingWorker = Future::complete();
4061
}
4162

4263
public function openFile(string $path, string $mode): ParallelFile
@@ -45,12 +66,12 @@ public function openFile(string $path, string $mode): ParallelFile
4566

4667
$workerStorage = $this->workerStorage;
4768
$worker = new Internal\FileWorker($worker, static function (Worker $worker) use ($workerStorage): void {
48-
if (!$workerStorage->contains($worker)) {
69+
if (!isset($workerStorage[$worker])) {
4970
return;
5071
}
5172

5273
if (($workerStorage[$worker] -= 1) === 0 || !$worker->isRunning()) {
53-
$workerStorage->detach($worker);
74+
unset($workerStorage[$worker]);
5475
}
5576
});
5677

@@ -67,26 +88,19 @@ public function openFile(string $path, string $mode): ParallelFile
6788

6889
private function selectWorker(): Worker
6990
{
70-
$this->pendingWorker->await(); // Wait for any currently pending request for a worker.
91+
$this->pendingWorker?->await(); // Wait for any currently pending request for a worker.
7192

7293
if ($this->workerStorage->count() < $this->workerLimit) {
7394
$this->pendingWorker = async($this->pool->getWorker(...));
7495
$worker = $this->pendingWorker->await();
7596

76-
if ($this->workerStorage->contains($worker)) {
77-
// amphp/parallel v1.x may return an already used worker from the pool.
78-
$this->workerStorage[$worker] += 1;
79-
} else {
80-
// amphp/parallel v2.x should always return an unused worker.
81-
$this->workerStorage->attach($worker, 1);
82-
}
97+
$this->workerStorage[$worker] = 1;
8398

8499
return $worker;
85100
}
86101

87102
$max = \PHP_INT_MAX;
88-
foreach ($this->workerStorage as $storedWorker) {
89-
$count = $this->workerStorage[$storedWorker];
103+
foreach ($this->workerStorage as $storedWorker => $count) {
90104
if ($count <= $max) {
91105
$worker = $storedWorker;
92106
$max = $count;
@@ -96,7 +110,7 @@ private function selectWorker(): Worker
96110
\assert(isset($worker) && $worker instanceof Worker);
97111

98112
if (!$worker->isRunning()) {
99-
$this->workerStorage->detach($worker);
113+
unset($this->workerStorage[$worker]);
100114
return $this->selectWorker();
101115
}
102116

@@ -172,10 +186,12 @@ public function getLinkStatus(string $path): ?array
172186

173187
public function touch(string $path, ?int $modificationTime, ?int $accessTime): void
174188
{
175-
$this->runFileTask(new Internal\FileTask(
176-
"touch",
177-
[$path, $modificationTime, $accessTime]
178-
));
189+
$this->runFileTask(
190+
new Internal\FileTask(
191+
"touch",
192+
[$path, $modificationTime, $accessTime]
193+
)
194+
);
179195
}
180196

181197
public function read(string $path): string

src/functions.php

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
namespace Amp\File;
44

5-
use Amp\File\Driver\BlockingFilesystemDriver;
65
use Amp\File\Driver\EioFilesystemDriver;
76
use Amp\File\Driver\ParallelFilesystemDriver;
87
use Amp\File\Driver\StatusCachingFilesystemDriver;
@@ -27,24 +26,14 @@ function filesystem(?FilesystemDriver $driver = null): Filesystem
2726
return $map[$loop];
2827
}
2928

30-
$defaultDriver = createDefaultDriver();
29+
$driver = createDefaultDriver();
3130

3231
if (!\defined("AMP_WORKER")) { // Prevent caching in workers, cache in parent instead.
33-
$defaultDriver = new StatusCachingFilesystemDriver($defaultDriver);
32+
$driver = new StatusCachingFilesystemDriver($driver);
3433
}
35-
36-
$filesystem = new Filesystem($defaultDriver);
37-
} else {
38-
$filesystem = new Filesystem($driver);
39-
}
40-
41-
if (\defined("AMP_WORKER") && $driver instanceof ParallelFilesystemDriver) {
42-
throw new \Error("Cannot use the parallel driver within a worker");
4334
}
4435

45-
$map[$loop] = $filesystem;
46-
47-
return $filesystem;
36+
return $map[$loop] = new Filesystem($driver);
4837
}
4938

5039
/**
@@ -62,10 +51,6 @@ function createDefaultDriver(): FilesystemDriver
6251
return new EioFilesystemDriver($driver);
6352
}
6453

65-
if (\defined("AMP_WORKER")) { // Prevent spawning infinite workers.
66-
return new BlockingFilesystemDriver;
67-
}
68-
6954
return new ParallelFilesystemDriver;
7055
}
7156

0 commit comments

Comments
 (0)