Skip to content

Commit 93f5a9c

Browse files
committed
Restore backpressure to unbuffered results
1 parent cf43bc0 commit 93f5a9c

File tree

1 file changed

+25
-4
lines changed

1 file changed

+25
-4
lines changed

src/PqUnbufferedResultSet.php

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Amp\Producer;
66
use Amp\Promise;
77
use pq;
8+
use function Amp\asyncCall;
89

910
final class PqUnbufferedResultSet implements ResultSet
1011
{
@@ -17,6 +18,9 @@ final class PqUnbufferedResultSet implements ResultSet
1718
/** @var array|object Last row emitted. */
1819
private $currentRow;
1920

21+
/** @var bool */
22+
private $destroyed = false;
23+
2024
/**
2125
* @param callable(): $fetch Function to fetch next result row.
2226
* @param \pq\Result $result PostgreSQL result object.
@@ -25,19 +29,36 @@ public function __construct(callable $fetch, pq\Result $result, callable $releas
2529
{
2630
$this->numCols = $result->numCols;
2731

28-
$this->producer = new Producer(static function (callable $emit) use ($release, $result, $fetch) {
32+
$this->producer = new Producer(static function (callable $emit) use (&$destroyed, $release, $result, $fetch) {
2933
try {
3034
do {
3135
$result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY;
32-
$emit($result);
33-
$result = yield $fetch();
36+
$next = $fetch();
37+
yield $emit($result);
38+
$result = yield $next;
3439
} while ($result instanceof pq\Result);
3540
} finally {
41+
$destroyed = true;
3642
$release();
3743
}
3844
});
3945
}
4046

47+
public function __destruct()
48+
{
49+
if ($this->destroyed) {
50+
return;
51+
}
52+
53+
asyncCall(function () {
54+
try {
55+
while (yield $this->producer->advance());
56+
} catch (\Throwable $exception) {
57+
// Ignore iterator failure when destroying.
58+
}
59+
});
60+
}
61+
4162
/**
4263
* {@inheritdoc}
4364
*/
@@ -58,7 +79,7 @@ public function getCurrent(): array
5879
}
5980

6081
$result = $this->producer->getCurrent();
61-
\assert($result instanceof \pq\Result);
82+
\assert($result instanceof pq\Result);
6283

6384
return $this->currentRow = $result->fetchRow(pq\Result::FETCH_ASSOC);
6485
}

0 commit comments

Comments
 (0)