Skip to content

Commit

Permalink
Start eventHandler for session only if there is listeners on websocke…
Browse files Browse the repository at this point in the history
…t channel.

Fix to do not trigger bot api updates.
  • Loading branch information
xtrime-ru committed Apr 12, 2020
1 parent ed4716c commit 0409c0c
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 69 deletions.
1 change: 0 additions & 1 deletion server.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
}

new TelegramApiServer\Server\Server(
new TelegramApiServer\Client(),
$options,
$sessions
);
28 changes: 14 additions & 14 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,22 @@
use Psr\Log\LogLevel;
use RuntimeException;
use TelegramApiServer\EventObservers\EventHandler;
use TelegramApiServer\EventObservers\EventObserver;
use function Amp\call;

class Client
{
public static Client $self;
/** @var MadelineProto\API[] */
public array $instances = [];

public static function getInstance(): Client {
if (empty(static::$self)) {
static::$self = new static();
}
return static::$self;
}

private static function isSessionLoggedIn(MadelineProto\API $instance): bool
{
return ($instance->API->authorized ?? MTProto::NOT_LOGGED_IN) === MTProto::LOGGED_IN;
Expand Down Expand Up @@ -62,29 +71,26 @@ public function removeSession($session): void
throw new InvalidArgumentException('Session not found');
}

$this->instances[$session]->setNoop();
EventObserver::stopEventHandler($session);
$this->instances[$session]->stop();

/** @see runSession() */
//Mark this session as not logged in, so no other actions will be made.
$this->instances[$session]->API->authorized = MTProto::NOT_LOGGED_IN;

unset(
$this->instances[$session],
EventHandler::$instances[$session]
);
unset($this->instances[$session]);
}

/**
* @param string|null $session
*
* @return MadelineProto\API
*/
public function getInstance(?string $session = null): MadelineProto\API
public function getSession(?string $session = null): MadelineProto\API
{
if (!$this->instances) {
throw new RuntimeException(
'No sessions available. Use combinedApi or restart server with --session option'
'No sessions available. Call /system/addSession?session=%session_name% or restart server with --session option'
);
}

Expand Down Expand Up @@ -137,7 +143,6 @@ public function runSession(MadelineProto\API $instance): Promise
function() use ($instance) {
if (static::isSessionLoggedIn($instance)) {
yield $instance->start();
yield $instance->setEventHandler(EventHandler::class);
Loop::defer(fn() => $this->loop($instance));
}
}
Expand All @@ -154,12 +159,7 @@ private function loop(MadelineProto\API $instance, callable $callback = null): v
$e->getMessage(),
[
'probable_session' => $sessionName,
'exception' => [
'exception' => get_class($e),
'code' => $e->getCode(),
'file' => $e->getFile(),
'line' => $e->getLine(),
],
'exception' => Logger::getExceptionAsArray($e),
]
);
foreach ($this->getBrokenSessions() as $session) {
Expand Down
27 changes: 7 additions & 20 deletions src/Controllers/AbstractApiController.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
use danog\MadelineProto\API;
use danog\MadelineProto\CombinedAPI;
use danog\MadelineProto\TON\API as TonAPI;
use TelegramApiServer\Client;
use TelegramApiServer\Logger;
use TelegramApiServer\MadelineProtoExtensions\ApiExtensions;
use TelegramApiServer\MadelineProtoExtensions\SystemApiExtensions;

abstract class AbstractApiController
{
public const JSON_HEADER = ['Content-Type'=>'application/json;charset=utf-8'];

protected Client $client;
protected Request $request;
protected ?File $file = null;
protected $extensionClass;
Expand All @@ -41,11 +40,11 @@ abstract class AbstractApiController
abstract protected function resolvePath(array $path);
abstract protected function callApi();

public static function getRouterCallback(Client $client, $extensionClass): CallableRequestHandler
public static function getRouterCallback($extensionClass): CallableRequestHandler
{
return new CallableRequestHandler(
static function (Request $request) use($client, $extensionClass) {
$requestCallback = new static($client, $request, $extensionClass);
static function (Request $request) use($extensionClass) {
$requestCallback = new static($request, $extensionClass);
$response = yield from $requestCallback->process();

if ($response instanceof Response) {
Expand All @@ -60,9 +59,8 @@ static function (Request $request) use($client, $extensionClass) {
);
}

public function __construct(Client $client, Request $request, $extensionClass = null)
public function __construct(Request $request, $extensionClass = null)
{
$this->client = $client;
$this->request = $request;
$this->extensionClass = $extensionClass;
}
Expand Down Expand Up @@ -144,12 +142,7 @@ private function generateResponse()
}

} catch (\Throwable $e) {
error($e->getMessage(), [
'exception' => get_class($e),
'code' => $e->getCode(),
'file' => $e->getFile(),
'line' => $e->getLine(),
]);
error($e->getMessage(), Logger::getExceptionAsArray($e));
$this->setError($e);
}

Expand Down Expand Up @@ -202,13 +195,7 @@ private function setError(\Throwable $e): self
$this->setPageCode(400);
}

$this->page['errors'][] = [
'message' => $e->getMessage(),
'exception' => get_class($e),
'code' => $e->getCode(),
'file' => $e->getFile(),
'line' => $e->getLine(),
];
$this->page['errors'][] = Logger::getExceptionAsArray($e);

return $this;
}
Expand Down
3 changes: 2 additions & 1 deletion src/Controllers/ApiController.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace TelegramApiServer\Controllers;

use Amp\Promise;
use TelegramApiServer\Client;

class ApiController extends AbstractApiController
{
Expand All @@ -27,7 +28,7 @@ protected function resolvePath(array $path): void
*/
protected function callApi()
{
$madelineProto = $this->client->getInstance($this->session);
$madelineProto = Client::getInstance()->getSession($this->session);
return $this->callApiCommon($madelineProto);
}

Expand Down
29 changes: 16 additions & 13 deletions src/Controllers/EventsController.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,31 @@
use Amp\Http\Server\Router;
use Amp\Promise;
use Amp\Success;
use Amp\Websocket\Client as WebsocketClient;
use Amp\Websocket\Server\ClientHandler;
use Amp\Websocket\Server\Websocket;
use Amp\Websocket\Server\Websocket as WebsocketServer;
use TelegramApiServer\Client;
use TelegramApiServer\EventObservers\EventObserver;
use function Amp\call;

class EventsController implements ClientHandler
{
private Client $client;
private ?Websocket $endpoint;
private ?WebsocketServer $endpoint;

public static function getRouterCallback(Client $client): Websocket

public static function getRouterCallback(): WebsocketServer
{
$class = new static();
$class->client = $client;
return new Websocket($class);
return new WebsocketServer($class);
}

public function onStart(Websocket $endpoint): Promise
public function onStart(WebsocketServer $endpoint): Promise
{
$this->endpoint = $endpoint;
return new Success;
}

public function onStop(Websocket $endpoint): Promise
public function onStop(WebsocketServer $endpoint): Promise
{
$this->endpoint = null;
return new Success;
Expand All @@ -42,7 +42,7 @@ public function handleHandshake(Request $request, Response $response): Promise
try {
$session = $request->getAttribute(Router::class)['session'] ?? null;
if ($session) {
$this->client->getInstance($session);
Client::getInstance()->getSession($session);
}
} catch (\Throwable $e){
$response->setStatus(400);
Expand All @@ -51,11 +51,11 @@ public function handleHandshake(Request $request, Response $response): Promise
return new Success($response);
}

public function handleClient(\Amp\Websocket\Client $client, Request $request, Response $response): Promise
public function handleClient(WebsocketClient $client, Request $request, Response $response): Promise
{
return call(function() use($client, $request) {
$requestedSession = $request->getAttribute(Router::class)['session'] ?? null;
$this->subscribeForUpdates($client, $requestedSession);
yield from $this->subscribeForUpdates($client, $requestedSession);

while ($message = yield $client->receive()) {
// Messages received on the connection are ignored and discarded.
Expand All @@ -64,12 +64,15 @@ public function handleClient(\Amp\Websocket\Client $client, Request $request, Re
});
}

private function subscribeForUpdates(\Amp\Websocket\Client $client, ?string $requestedSession): void
private function subscribeForUpdates(WebsocketClient $client, ?string $requestedSession): \Generator
{
$clientId = $client->getId();

$client->onClose(static function() use($clientId) {
yield EventObserver::startEventHandler($requestedSession);

$client->onClose(static function() use($clientId, $requestedSession) {
EventObserver::removeSubscriber($clientId);
EventObserver::stopEventHandler($requestedSession);
});

EventObserver::addSubscriber($clientId, function($update, ?string $session) use($clientId, $requestedSession) {
Expand Down
3 changes: 2 additions & 1 deletion src/Controllers/SystemController.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace TelegramApiServer\Controllers;

use Amp\Promise;
use TelegramApiServer\Client;

class SystemController extends AbstractApiController
{
Expand All @@ -23,7 +24,7 @@ protected function resolvePath(array $path): void
*/
protected function callApi()
{
$madelineProtoExtensions = new $this->extensionClass($this->client);
$madelineProtoExtensions = new $this->extensionClass(Client::getInstance());
$result = $madelineProtoExtensions->{$this->api[0]}(...$this->parameters);
return $result;
}
Expand Down
70 changes: 70 additions & 0 deletions src/EventObservers/EventObserver.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,85 @@
namespace TelegramApiServer\EventObservers;


use Amp\Promise;
use TelegramApiServer\Client;
use TelegramApiServer\Logger;
use function Amp\call;

class EventObserver
{
use ObserverTrait;

/** @var int[] */
private static array $sessionClients = [];

public static function notify(array $update, string $sessionName) {
foreach (static::$subscribers as $clientId => $callback) {
notice("Pass update to callback. ClientId: {$clientId}");
$callback($update, $sessionName);
}
}

private static function addSessionClient(string $session): void
{
if (empty(static::$sessionClients[$session])) {
static::$sessionClients[$session] = 0;
}
++static::$sessionClients[$session];
}

private static function removeSessionClient(string $session): void
{
if (!empty(static::$sessionClients[$session])) {
--static::$sessionClients[$session];
}
}

public static function startEventHandler(?string $requestedSession = null): Promise
{
return call(static function() use($requestedSession) {
$sessions = [];
if ($requestedSession === null) {
$sessions = array_keys(Client::getInstance()->instances);
} else {
$sessions[] = $requestedSession;
}

foreach ($sessions as $session) {
static::addSessionClient($session);
if (static::$sessionClients[$session] === 1) {
try {
warning("Start EventHandler: {$session}");
yield Client::getInstance()->getSession($session)->setEventHandler(EventHandler::class);
} catch (\Throwable $e) {
static::removeSessionClient($session);
error('Cant set EventHandler', [
'session' => $session,
'exception' => Logger::getExceptionAsArray($e)
]);
}
}
}
});
}

public static function stopEventHandler(?string $requestedSession = null): void
{
$sessions = [];
if ($requestedSession === null) {
$sessions = array_keys(Client::getInstance()->instances);
} else {
$sessions[] = $requestedSession;
}
foreach ($sessions as $session) {
static::removeSessionClient($session);
if (empty(static::$sessionClients[$session])) {
warning("Stop EventHandler: {$session}");
Client::getInstance()->getSession($session)->setNoop();
unset(EventHandler::$instances[$session]);
}
}

}

}
10 changes: 10 additions & 0 deletions src/Logger.php
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,14 @@ private function format(string $level, string $message, array $context): string
: ''
) . PHP_EOL;
}

public static function getExceptionAsArray(\Throwable $exception) {
return [
'exception' => get_class($exception),
'message' => $exception->getMessage(),
'file' => $exception->getFile(),
'line' => $exception->getLine(),
'code' => $exception->getCode(),
];
}
}
Loading

0 comments on commit 0409c0c

Please sign in to comment.