Skip to content

improve loop #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Contracts/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public function interrupt(): void;
* @throws MqttClientException
* @throws ProtocolViolationException
*/
public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, ?int $queueWaitLimit = null): void;
public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, ?int $queueWaitLimit = null): bool;

/**
* Runs an event loop iteration that handles messages from the server and calls the registered
Expand Down
22 changes: 11 additions & 11 deletions src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ public function __construct(
string $protocol = self::MQTT_3_1,
?Repository $repository = null,
?LoggerInterface $logger = null
)
{
) {
if (!in_array($protocol, [self::MQTT_3_1, self::MQTT_3_1_1])) {
throw new ProtocolNotSupportedException($protocol);
}
Expand Down Expand Up @@ -160,8 +159,8 @@ protected function establishSocketConnection(): void
$this->logger->debug('Using TLS for the connection to the broker.');

$shouldVerifyPeer = $this->settings->shouldTlsVerifyPeer()
|| $this->settings->getTlsCertificateAuthorityFile() !== null
|| $this->settings->getTlsCertificateAuthorityPath() !== null;
|| $this->settings->getTlsCertificateAuthorityFile() !== null
|| $this->settings->getTlsCertificateAuthorityPath() !== null;

if (!$shouldVerifyPeer) {
$this->logger->warning('Using TLS without peer verification is discouraged. Are you aware of the security risk?');
Expand Down Expand Up @@ -200,7 +199,7 @@ protected function establishSocketConnection(): void
if ($this->settings->getTlsAlpn() !== null) {
$tlsOptions['alpn_protocols'] = $this->settings->getTlsAlpn();
}

$contextOptions['ssl'] = $tlsOptions;
}

Expand Down Expand Up @@ -525,8 +524,7 @@ protected function publishMessage(
bool $retain,
?int $messageId = null,
bool $isDuplicate = false
): void
{
): void {
$this->logger->debug('Publishing a message on topic [{topic}]: {message}', [
'topic' => $topic,
'message' => $message,
Expand Down Expand Up @@ -597,8 +595,9 @@ protected function nextPingAt(): float

/**
* {@inheritDoc}
* @return bool True if loop was exited because queue was empty, False otherwise
*/
public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, ?int $queueWaitLimit = null): void
public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false, ?int $queueWaitLimit = null): bool
{
$this->logger->debug('Starting client loop to process incoming messages and the resend queue.');

Expand All @@ -616,16 +615,17 @@ public function loop(bool $allowSleep = true, bool $exitWhenQueuesEmpty = false,
// In any case, there may not be any active subscriptions though.
if ($exitWhenQueuesEmpty && $this->repository->countSubscriptions() === 0) {
if ($this->allQueuesAreEmpty()) {
break;
return true;
}

// The time limit is reached. This most likely means the outgoing queues could not be emptied in time.
// Probably the server did not respond with an acknowledgement.
if ($queueWaitLimit !== null && (microtime(true) - $loopStartedAt) > $queueWaitLimit) {
break;
return false;
}
}
}
return false;
}

/**
Expand Down Expand Up @@ -877,7 +877,7 @@ protected function handleMessage(Message $message): void
protected function allQueuesAreEmpty(): bool
{
return $this->repository->countPendingOutgoingMessages() === 0 &&
$this->repository->countPendingIncomingMessages() === 0;
$this->repository->countPendingIncomingMessages() === 0;
}

/**
Expand Down