11
11
use ApiClients \Foundation \Transport \CommandBus \Command \RequestCommand ;
12
12
use ApiClients \Foundation \Transport \CommandBus \Command \StreamingRequestCommand ;
13
13
use ApiClients \Foundation \Transport \Options as TransportOptions ;
14
+ use ApiClients \Tools \CommandBus \CommandBus ;
14
15
use ApiClients \Tools \Psr7 \Oauth1 \Definition ;
15
16
use GuzzleHttp \Psr7 \Request ;
16
17
use Psr \Http \Message \RequestInterface ;
24
25
25
26
final class AsyncClient
26
27
{
27
- const STREAM_DELIMITER = "\r\n" ;
28
-
29
28
/**
30
29
* @var string
31
30
*/
@@ -46,6 +45,11 @@ final class AsyncClient
46
45
*/
47
46
protected $ client ;
48
47
48
+ /**
49
+ * @var AsyncStreamingClient
50
+ */
51
+ protected $ streamingClient ;
52
+
49
53
public function __construct (
50
54
string $ consumerKey ,
51
55
string $ consumerSecret ,
@@ -107,6 +111,11 @@ public function withOutAccessToken(): AsyncClient
107
111
);
108
112
}
109
113
114
+ public function getCommandBus (): CommandBus
115
+ {
116
+ return $ this ->client ->getFromContainer (CommandBus::class);
117
+ }
118
+
110
119
public function user (string $ user ): PromiseInterface
111
120
{
112
121
return $ this ->client ->handle (new RequestCommand (
@@ -116,45 +125,12 @@ public function user(string $user): PromiseInterface
116
125
});
117
126
}
118
127
119
- public function sampleStream (): Observable
120
- {
121
- return $ this ->stream (
122
- new Request ('GET ' , 'https://stream.twitter.com/1.1/statuses/sample.json ' )
123
- );
124
- }
125
-
126
- public function filteredStream (array $ filter = []): Observable
128
+ public function stream (): AsyncStreamingClient
127
129
{
128
- $ postData = http_build_query ($ filter );
129
- return $ this ->stream (
130
- new Request (
131
- 'POST ' ,
132
- 'https://stream.twitter.com/1.1/statuses/filter.json ' ,
133
- [
134
- 'Content-Type ' => 'application/x-www-form-urlencoded ' ,
135
- 'Content-Length ' => strlen ($ postData ),
136
- ],
137
- $ postData
138
- )
139
- );
140
- }
130
+ if (!($ this ->streamingClient instanceof AsyncStreamingClient)) {
131
+ $ this ->streamingClient = new AsyncStreamingClient ($ this ->client );
132
+ }
141
133
142
- protected function stream (RequestInterface $ request ): Observable
143
- {
144
- return Promise::toObservable ($ this ->client ->handle (new StreamingRequestCommand (
145
- $ request
146
- )))->switchLatest ()->lift (function () {
147
- return new CutOperator (self ::STREAM_DELIMITER );
148
- })->filter (function (string $ json ) {
149
- return trim ($ json ) !== '' ; // To keep the stream alive Twitter sends an empty line at times
150
- })->jsonDecode ()->flatMap (function (array $ document ) {
151
- if (isset ($ document ['delete ' ])) {
152
- return Promise::toObservable ($ this ->client ->handle (
153
- new HydrateCommand ('DeletedTweet ' , $ document ['delete ' ])
154
- ));
155
- }
156
-
157
- return Promise::toObservable ($ this ->client ->handle (new HydrateCommand ('Tweet ' , $ document )));
158
- });
134
+ return $ this ->streamingClient ;
159
135
}
160
136
}
0 commit comments