diff --git a/CHANGELOG.md b/CHANGELOG.md index 90e93dc..ad430c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to `fork` will be documented in this file. +## 1.1.0 - 2021-05-04 + +- Add `Fork::concurrent(int $concurrent)` + ## 1.0.1 - 2021-05-03 - Add check for pcntl support diff --git a/README.md b/README.md index fa8eb97..06e74fc 100644 --- a/README.md +++ b/README.md @@ -177,6 +177,22 @@ $result = Fork::new() ); ``` +### Configuring concurrency + +By default, all callables will be run in parallel. You can however configure a maximum amount of concurrent processes: + +```php +$results = Fork::new() + ->concurrent(2) + ->run( + fn () => 1, + fn () => 2, + fn () => 3, + ); +``` + +In this case, the first two functions will be run immediately and as soon as one of them finishes, the last one will start as well. + ## Testing ```bash diff --git a/composer.json b/composer.json index 00ac55f..a60da3f 100644 --- a/composer.json +++ b/composer.json @@ -25,6 +25,7 @@ "ext-pcntl": "*" }, "require-dev": { + "nesbot/carbon": "^2.47", "phpunit/phpunit": "^9.5", "spatie/ray": "^1.10" }, diff --git a/src/Fork.php b/src/Fork.php index c567a0d..8e793b0 100644 --- a/src/Fork.php +++ b/src/Fork.php @@ -8,11 +8,21 @@ class Fork { protected ?Closure $toExecuteBeforeInChildTask = null; + protected ?Closure $toExecuteBeforeInParentTask = null; protected ?Closure $toExecuteAfterInChildTask = null; + protected ?Closure $toExecuteAfterInParentTask = null; + protected ?int $concurrent = null; + + /** @var \Spatie\Fork\Task[] */ + protected array $queue = []; + + /** @var \Spatie\Fork\Task[] */ + protected array $runningTasks = []; + public function __construct() { if (! function_exists('pcntl_fork')) { @@ -41,21 +51,69 @@ public function after(callable $child = null, callable $parent = null): self return $this; } + public function concurrent(int $concurrent): self + { + $this->concurrent = $concurrent; + + return $this; + } + public function run(callable ...$callables): array { $tasks = []; foreach ($callables as $order => $callable) { - if ($this->toExecuteBeforeInParentTask) { - ($this->toExecuteBeforeInParentTask)(); + $tasks[] = Task::fromCallable($callable, $order); + } + + return $this->waitFor(...$tasks); + } + + protected function waitFor(Task ...$queue): array + { + $output = []; + + $this->startRunning(...$queue); + + while ($this->isRunning()) { + foreach ($this->runningTasks as $task) { + if (! $task->isFinished()) { + continue; + } + + $output[$task->order()] = $this->finishTask($task); + + $this->shiftTaskFromQueue(); + } + + if ($this->isRunning()) { + usleep(1_000); } + } - $task = Task::fromCallable($callable, $order); + return $output; + } - $tasks[] = $this->forkForTask($task); + protected function runTask(Task $task): Task + { + if ($this->toExecuteBeforeInParentTask) { + ($this->toExecuteBeforeInParentTask)(); } - return $this->waitFor(...$tasks); + return $this->forkForTask($task); + } + + protected function finishTask(Task $task): mixed + { + $output = $task->output(); + + if ($this->toExecuteAfterInParentTask) { + ($this->toExecuteAfterInParentTask)($output); + } + + unset($this->runningTasks[$task->order()]); + + return $output; } protected function forkForTask(Task $task): Task @@ -80,35 +138,6 @@ protected function forkForTask(Task $task): Task ->setConnection($socketToChild); } - protected function waitFor(Task ...$runningTasks): array - { - $output = []; - - while (count($runningTasks)) { - foreach ($runningTasks as $key => $task) { - if ($task->isFinished()) { - $taskOutput = $task->output(); - - $output[$task->order()] = $taskOutput; - - unset($runningTasks[$key]); - - if ($this->toExecuteAfterInParentTask) { - ($this->toExecuteAfterInParentTask)($taskOutput); - } - } - } - - if (! count($runningTasks)) { - break; - } - - usleep(1_000); - } - - return $output; - } - protected function currentlyInChildTask(int $pid): bool { return $pid === 0; @@ -132,4 +161,42 @@ protected function executeInChildTask( $connectionToParent->close(); } + + protected function shiftTaskFromQueue(): void + { + if (! count($this->queue)) { + return; + } + + $firstTask = array_shift($this->queue); + + $this->runningTasks[] = $this->runTask($firstTask); + } + + + protected function startRunning( + Task ...$queue + ): void { + $this->queue = $queue; + + foreach ($this->queue as $task) { + $this->runningTasks[$task->order()] = $this->runTask($task); + + unset($this->queue[$task->order()]); + + if ($this->concurrencyLimitReached()) { + break; + } + } + } + + protected function isRunning(): bool + { + return count($this->runningTasks) > 0; + } + + protected function concurrencyLimitReached(): bool + { + return $this->concurrent && count($this->runningTasks) >= $this->concurrent; + } } diff --git a/tests/ForkTest.php b/tests/ForkTest.php index 39a6fcc..33d268e 100644 --- a/tests/ForkTest.php +++ b/tests/ForkTest.php @@ -2,6 +2,7 @@ namespace Spatie\Fork\Tests; +use Carbon\Carbon; use DateTime; use PHPUnit\Framework\TestCase; use Spatie\Fork\Fork; @@ -30,35 +31,45 @@ public function it_will_execute_the_given_closures() } /** @test */ - public function it_can_execute_the_closures_concurrently() + public function it_will_execute_the_given_closures_with_concurrency_cap() { $results = Fork::new() + ->concurrent(2) ->run( function () { sleep(1); - return 1 + 1; - }, - function () { - sleep(2); - - return 2 + 2; + return Carbon::now()->second; }, function () { sleep(1); - return 3 + 3; + return Carbon::now()->second; }, function () { sleep(1); - return 4 + 4; + return Carbon::now()->second; }, ); - $this->assertEquals([2, 4, 6, 8], $results); + $this->assertEquals($results[0], $results[1]); + $this->assertNotEquals($results[1], $results[2]); + } + + /** @test */ + public function it_can_execute_the_closures_concurrently() + { + Fork::new() + ->run( + ...array_fill( + start_index: 0, + count: 20, + value: fn () => usleep(100_000), + ) // 1/10th of a second each + ); - $this->assertTookLessThanSeconds(3); + $this->assertTookLessThanSeconds(1); } /** @test */