Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add disconnect() method to release resources deterministically #290

Merged
merged 2 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/Contract/PheanstalkClientInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Pheanstalk\Contract;

interface PheanstalkClientInterface
{
/**
* Closes the current connection and releases all associated resources.
*
* This method ensures that after the call, no resources are held by the client,
* allowing garbage collection to safely reclaim memory. If no connection is open,
* this method does nothing (noop). Future command dispatches will automatically
* establish a new connection if needed.
*
* Note: If the client has any reserved jobs at the time of disconnection, beanstalkd
* will automatically release them back into the ready queue.
*/
public function disconnect(): void;
}
2 changes: 1 addition & 1 deletion src/Contract/PheanstalkManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use Pheanstalk\Values\TubeName;
use Pheanstalk\Values\TubeStats;

interface PheanstalkManagerInterface
interface PheanstalkManagerInterface extends PheanstalkClientInterface
{
/**
* Kicks buried or delayed jobs into a 'ready' state.
Expand Down
2 changes: 1 addition & 1 deletion src/Contract/PheanstalkPublisherInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Pheanstalk\Values\TubeName;

interface PheanstalkPublisherInterface
interface PheanstalkPublisherInterface extends PheanstalkClientInterface
{
public const int DEFAULT_DELAY = 0; // no delay
public const int DEFAULT_PRIORITY = 1024; // most urgent: 0, least urgent: 4294967295
Expand Down
2 changes: 1 addition & 1 deletion src/Contract/PheanstalkSubscriberInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Pheanstalk\Values\TubeList;
use Pheanstalk\Values\TubeName;

interface PheanstalkSubscriberInterface
interface PheanstalkSubscriberInterface extends PheanstalkClientInterface
{
/**
* Permanently deletes a job.
Expand Down
5 changes: 5 additions & 0 deletions src/Pheanstalk.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public function stats(): ServerStats
return $this->manager->stats();
}

public function disconnect(): void
{
$this->manager->disconnect();
}

public function bury(JobIdInterface $job, int $priority = self::DEFAULT_PRIORITY): void
{
$this->subscriber->bury($job, $priority);
Expand Down
5 changes: 5 additions & 0 deletions src/PheanstalkManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,9 @@ public function stats(): ServerStats
$command = new Command\StatsCommand();
return $command->interpret($this->dispatch($command));
}

public function disconnect(): void
{
$this->connection->disconnect();
}
}
5 changes: 5 additions & 0 deletions src/PheanstalkPublisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ public function put(
$command = new PutCommand($data, $priority, $delay, $timeToRelease);
return $command->interpret($this->connection->dispatchCommand($command));
}

public function disconnect(): void
{
$this->connection->disconnect();
}
}
5 changes: 5 additions & 0 deletions src/PheanstalkSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,9 @@ public function reserveJob(JobIdInterface $job): Job
$command = new ReserveJobCommand($job);
return $command->interpret($this->dispatch($command));
}

public function disconnect(): void
{
$this->connection->disconnect();
}
}
19 changes: 19 additions & 0 deletions tests/Unit/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,34 @@
public function testDisconnect(): void
{
$socket = $this->getMockBuilder(SocketInterface::class)->getMock();
$socket->expects(self::once())->method('disconnect');

Check failure on line 32 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.
$factory = $this->getMockBuilder(SocketFactoryInterface::class)->getMock();
$factory->expects(self::atLeastOnce())->method('create')->willReturn($socket);

Check failure on line 34 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.

$connection = new Connection($factory);
$connection->connect();
$connection->disconnect();
}

public function testReconnectAfterDispatch(): void
{
$socket = $this->getMockBuilder(SocketInterface::class)->getMock();
$socket->expects(self::once())->method('disconnect');

Check failure on line 44 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.
$socket->expects(self::once())

Check failure on line 45 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.
->method('getLine')
->willReturn(ResponseType::Using->value);

$factory = $this->getMockBuilder(SocketFactoryInterface::class)->getMock();
$factory->expects(self::exactly(2))

Check failure on line 50 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.
->method('create')
->willReturn($socket);

$connection = new Connection($factory);
$connection->connect();
$connection->disconnect();
$connection->dispatchCommand(new UseCommand(new TubeName('foo')));
}

private function getCommand(): CommandInterface
{
return new UseCommand(new TubeName('tube5'));
Expand All @@ -51,11 +70,11 @@
$socket = $this->getMockBuilder(SocketInterface::class)
->getMock();

$socket->expects(self::once())

Check failure on line 73 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.
->method('getLine')
->willReturn($line);

$socket->expects(self::once())->method('write');

Check failure on line 77 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.

return $this->createConnectionBySocket($socket);
}
Expand Down Expand Up @@ -102,7 +121,7 @@
{
$socket = $this->getMockBuilder(SocketInterface::class)
->getMock();
$socket->expects(self::exactly(2))

Check failure on line 124 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.
->method('write')
->willReturnCallback(static function () {
static $firstRun = true;
Expand All @@ -112,7 +131,7 @@
}
});

$socket->expects(self::once())->method('disconnect');

Check failure on line 134 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.

$connection = $this->createConnectionBySocket($socket);

Expand All @@ -121,7 +140,7 @@
self::fail('Expected ConnectionException was not thrown');
} catch (ConnectionException) {
}
$socket->expects(self::exactly(1))

Check failure on line 143 in tests/Unit/ConnectionTest.php

View workflow job for this annotation

GitHub Actions / Static analysis

Call to method method() on an unknown class PHPUnit\Framework\MockObject\Builder\InvocationMocker.
->method('getLine')
->willReturn(ResponseType::Released->value);
$connection->dispatchCommand($this->getCommand());
Expand Down