|
4 | 4 |
|
5 | 5 | use ApiClients\Foundation\Client;
|
6 | 6 | use ApiClients\Foundation\Hydrator\CommandBus\Command\HydrateCommand;
|
| 7 | +use ApiClients\Foundation\Transport\CommandBus\Command\RequestCommand; |
7 | 8 | use ApiClients\Foundation\Transport\CommandBus\Command\StreamingRequestCommand;
|
| 9 | +use ApiClients\Foundation\Transport\ParsedContentsInterface; |
8 | 10 | use GuzzleHttp\Psr7\Request;
|
9 | 11 | use Psr\Http\Message\RequestInterface;
|
| 12 | +use Psr\Http\Message\ResponseInterface; |
10 | 13 | use React\EventLoop\LoopInterface;
|
11 | 14 | use Rx\Observable;
|
12 | 15 | use Rx\Operator\CutOperator;
|
13 | 16 | use Rx\React\Promise;
|
14 | 17 | use Rx\Scheduler\ImmediateScheduler;
|
| 18 | +use function ApiClients\Tools\Rx\observableFromArray; |
| 19 | +use function RingCentral\Psr7\build_query; |
15 | 20 |
|
16 | 21 | final class AsyncStreamingClient implements AsyncStreamingClientInterface
|
17 | 22 | {
|
@@ -56,6 +61,28 @@ public function filtered(array $filter = []): Observable
|
56 | 61 | );
|
57 | 62 | }
|
58 | 63 |
|
| 64 | + public function searchTweets(array $filter = []): Observable |
| 65 | + { |
| 66 | + $query = build_query($filter); |
| 67 | + |
| 68 | + return Promise::toObservable($this->client->handle(new RequestCommand( |
| 69 | + new Request( |
| 70 | + 'GET', |
| 71 | + 'https://api.twitter.com/1.1/search/tweets.json?' . $query, |
| 72 | + [] |
| 73 | + ) |
| 74 | + ))->then(function (ResponseInterface $response) { |
| 75 | + /** @var ParsedContentsInterface $body */ |
| 76 | + $body = $response->getBody(); |
| 77 | + |
| 78 | + return $body->getParsedContents()['statuses']; |
| 79 | + }))->flatMap(function (array $statuses) { |
| 80 | + return observableFromArray($statuses); |
| 81 | + })->flatMap(function (array $document) { |
| 82 | + return Promise::toObservable($this->client->handle(new HydrateCommand('Tweet', $document))); |
| 83 | + }); |
| 84 | + } |
| 85 | + |
59 | 86 | protected function stream(RequestInterface $request): Observable
|
60 | 87 | {
|
61 | 88 | return Promise::toObservable($this->client->handle(new StreamingRequestCommand(
|
|
0 commit comments