Skip to content

Commit 4a4cb3a

Browse files
committed
Merge pull request #17 from dronemill/huge_payloads
Added support for huge payloads
2 parents ba2e5f9 + 22071bb commit 4a4cb3a

File tree

7 files changed

+172
-28
lines changed

7 files changed

+172
-28
lines changed

composer.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
"autoload": {
1212
"psr-4": {"WebSocket\\": "lib"}
1313
},
14+
"autoload-dev": {
15+
"psr-4": {"WebSocket\\Tests\\": "tests"}
16+
},
1417
"require-dev": {
1518
"phpunit/phpunit": "4.1.*",
1619
"phpunit/phpunit-selenium": "1.3.3",

examples/echoserver.php

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
$test_id = $server->getPath();
2020
$test_id = substr($test_id, 1);
2121

22-
xdebug_start_code_coverage(XDEBUG_CC_UNUSED | XDEBUG_CC_DEAD_CODE);
23-
PHPUnit_Extensions_SeleniumCommon_ExitHandler::init();
22+
if (function_exists('xdebug_get_code_coverage'))
23+
xdebug_start_code_coverage(XDEBUG_CC_UNUSED | XDEBUG_CC_DEAD_CODE);
24+
25+
if (class_exists('PHPUnit_Extensions_SeleniumCommon_ExitHandler'))
26+
PHPUnit_Extensions_SeleniumCommon_ExitHandler::init();
2427

2528
try {
2629
while(1) {
@@ -56,6 +59,9 @@
5659

5760

5861
function save_coverage_data($test_id) {
62+
63+
if (!function_exists('xdebug_get_code_coverage')) return;
64+
5965
$data = xdebug_get_code_coverage();
6066
xdebug_stop_code_coverage();
6167

lib/Base.php

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44

55
class Base {
66
protected $socket, $is_connected = false, $is_closing = false, $last_opcode = null,
7-
$close_status = null;
7+
$close_status = null, $huge_payload = null;
88

99
protected static $opcodes = array(
10-
'text' => 1,
11-
'binary' => 2,
12-
'close' => 8,
13-
'ping' => 9,
14-
'pong' => 10,
10+
'continuation' => 0,
11+
'text' => 1,
12+
'binary' => 2,
13+
'close' => 8,
14+
'ping' => 9,
15+
'pong' => 10,
1516
);
1617

1718
public function getLastOpcode() { return $this->last_opcode; }
@@ -26,20 +27,52 @@ public function setTimeout($timeout) {
2627
}
2728
}
2829

30+
public function setFragmentSize($fragment_size) {
31+
$this->options['fragment_size'] = $fragment_size;
32+
return $this;
33+
}
34+
35+
public function getFragmentSize() {
36+
return $this->options['fragment_size'];
37+
}
38+
2939
public function send($payload, $opcode = 'text', $masked = true) {
3040
if (!$this->is_connected) $this->connect(); /// @todo This is a client function, fixme!
3141

3242
if (!in_array($opcode, array_keys(self::$opcodes))) {
3343
throw new BadOpcodeException("Bad opcode '$opcode'. Try 'text' or 'binary'.");
3444
}
3545

46+
// record the length of the payload
47+
$payload_length = strlen($payload);
48+
49+
$fragment_cursor = 0;
50+
// while we have data to send
51+
while ($payload_length > $fragment_cursor) {
52+
// get a fragment of the payload
53+
$sub_payload = substr($payload, $fragment_cursor, $this->options['fragment_size']);
54+
55+
// advance the cursor
56+
$fragment_cursor += $this->options['fragment_size'];
57+
58+
// is this the final fragment to send?
59+
$final = $payload_length <= $fragment_cursor;
60+
61+
// send the fragment
62+
$this->send_fragment($final, $sub_payload, $opcode, $masked);
63+
64+
// all fragments after the first will be marked a continuation
65+
$opcode = 'continuation';
66+
}
67+
68+
}
69+
70+
protected function send_fragment($final, $payload, $opcode, $masked) {
3671
// Binary string for header.
3772
$frame_head_binstr = '';
3873

39-
4074
// Write FIN, final fragment bit.
41-
$final = true; /// @todo Support HUGE payloads.
42-
$frame_head_binstr .= $final ? '1' : '0';
75+
$frame_head_binstr .= (bool) $final ? '1' : '0';
4376

4477
// RSV 1, 2, & 3 false and unused.
4578
$frame_head_binstr .= '000';
@@ -88,6 +121,16 @@ public function send($payload, $opcode = 'text', $masked = true) {
88121
public function receive() {
89122
if (!$this->is_connected) $this->connect(); /// @todo This is a client function, fixme!
90123

124+
$this->huge_payload = '';
125+
126+
$response = null;
127+
while (is_null($response)) $response = $this->receive_fragment();
128+
129+
return $response;
130+
}
131+
132+
protected function receive_fragment() {
133+
91134
// Just read the main fragment information first.
92135
$data = $this->read(2);
93136

@@ -107,13 +150,17 @@ public function receive() {
107150
throw new ConnectionException("Bad opcode in websocket frame: $opcode_int");
108151
}
109152
$opcode = $opcode_ints[$opcode_int];
110-
$this->last_opcode = $opcode;
153+
154+
// record the opcode if we are not receiving a continutation fragment
155+
if ($opcode !== 'continuation') {
156+
$this->last_opcode = $opcode;
157+
}
111158

112159
// Masking?
113160
$mask = (boolean) (ord($data[1]) >> 7); // Bit 0 in byte 1
114161

115-
$payload = "";
116-
162+
$payload = '';
163+
117164
// Payload length
118165
$payload_length = (integer) ord($data[1]) & 127; // Bits 1-7 in byte 1
119166
if ($payload_length > 125) {
@@ -131,7 +178,6 @@ public function receive() {
131178

132179
if ($mask) {
133180
// Unmask payload.
134-
$payload = '';
135181
for ($i = 0; $i < $payload_length; $i++) $payload .= ($data[$i] ^ $masking_key[$i % 4]);
136182
}
137183
else $payload = $data;
@@ -144,16 +190,29 @@ public function receive() {
144190
$status = bindec(sprintf("%08b%08b", ord($payload[0]), ord($payload[1])));
145191
$this->close_status = $status;
146192
$payload = substr($payload, 2);
193+
194+
if (!$this->is_closing) $this->send($status_bin . 'Close acknowledged: ' . $status, 'close', true); // Respond.
147195
}
148196

149197
if ($this->is_closing) $this->is_closing = false; // A close response, all done.
150-
else $this->send($status_bin . 'Close acknowledged: ' . $status, 'close', true); // Respond.
151198

152199
// And close the socket.
153200
fclose($this->socket);
154201
$this->is_connected = false;
155202
}
156203

204+
// if this is not the last fragment, then we need to save the payload
205+
if (!$final) {
206+
$this->huge_payload .= $payload;
207+
return null;
208+
}
209+
// this is the last fragment, and we are processing a huge_payload
210+
else if ($this->huge_payload) {
211+
// sp we need to retreive the whole payload
212+
$payload = $this->huge_payload .= $payload;
213+
$this->huge_payload = null;
214+
}
215+
157216
return $payload;
158217
}
159218

lib/Client.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ public function __construct($uri, $options = array()) {
1717

1818
if (!array_key_exists('timeout', $this->options)) $this->options['timeout'] = 5;
1919

20+
// the fragment size
21+
if (!array_key_exists('fragment_size', $this->options)) $this->options['fragment_size'] = 4096;
22+
2023
$this->socket_uri = $uri;
2124
}
2225

lib/Server.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ class Server extends Base {
1515
* - port: Chose port for listening.
1616
*/
1717
public function __construct(array $options = array()) {
18+
19+
// the fragment size
20+
if (!array_key_exists('fragment_size', $options)) $options['fragment_size'] = 4096;
21+
22+
1823
$this->port = isset($options['port']) ? $options['port'] : 8000;
1924
$this->options = $options;
2025

tests/ClientTracker.php

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
namespace WebSocket\Tests;
4+
5+
use WebSocket\Client;
6+
7+
/**
8+
* A class to track the send and receive operations of the client
9+
*/
10+
class ClientTracker extends Client {
11+
public $fragment_count;
12+
13+
public function __construct($uri, $options = array()) {
14+
$this->fragment_count = array(
15+
'send' => 0,
16+
'receive' => 0,
17+
);
18+
19+
return parent::__construct($uri, $options);
20+
}
21+
22+
/**
23+
* Reset the send counter
24+
*/
25+
public function send($payload, $opcode = 'text', $masked = true) {
26+
$this->fragment_count['send'] = 0;
27+
return parent::send($payload, $opcode, $masked);
28+
}
29+
30+
/**
31+
* Increment the fragment send counter
32+
*/
33+
protected function send_fragment($final, $payload, $opcode, $masked) {
34+
$this->fragment_count['send']++;
35+
return parent::send_fragment($final, $payload, $opcode, $masked);
36+
}
37+
38+
/**
39+
* Reset the receive counter
40+
*/
41+
public function receive() {
42+
$this->fragment_count['receive'] = 0;
43+
return parent::receive();
44+
}
45+
46+
/**
47+
* Increment the fragment recieve counter
48+
*/
49+
protected function receive_fragment() {
50+
$this->fragment_count['receive']++;
51+
return parent::receive_fragment();
52+
}
53+
}

tests/unit/ClientTest.php

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
<?php
22

33
use WebSocket\Client;
4+
use WebSocket\Tests\ClientTracker;
45

56
class WebSocketTest extends PHPUnit_Framework_TestCase {
67
protected static $port;
@@ -69,11 +70,28 @@ public function testInstantiation() {
6970
$this->assertInstanceOf('WebSocket\Client', $ws);
7071
}
7172

73+
/**
74+
* Data provider for testEcho
75+
*
76+
* @return array of data
77+
*/
78+
public function dataLengthProvider() {
79+
return array(
80+
array(8),
81+
array(126),
82+
array(127),
83+
array(128),
84+
array(65000),
85+
array(66000),
86+
);
87+
}
88+
7289
/**
7390
* @dataProvider dataLengthProvider
7491
*/
7592
public function testEcho($data_length) {
76-
$ws = new Client('ws://localhost:' . self::$port . '/' . $this->test_id);
93+
$ws = new ClientTracker('ws://localhost:' . self::$port . '/' . $this->test_id);
94+
$ws->setFragmentSize(rand(10,512));
7795

7896
$greeting = '';
7997
for ($i = 0; $i < $data_length; $i++) $greeting .= 'o';
@@ -82,6 +100,8 @@ public function testEcho($data_length) {
82100
$response = $ws->receive();
83101

84102
$this->assertEquals($greeting, $response);
103+
$this->assertEquals($ws->fragment_count['send'], ceil($data_length / $ws->getFragmentSize()));
104+
$this->assertEquals($ws->fragment_count['receive'], ceil($data_length / 4096)); // the server sends with size 4096
85105
}
86106

87107
public function testBasicAuth() {
@@ -98,17 +118,6 @@ public function testBasicAuth() {
98118
$this->assertEquals("Basic Sm9obkRvZTplb0RuaG9K - $greeting", $response);
99119
}
100120

101-
public function dataLengthProvider() {
102-
return array(
103-
array(8),
104-
array(126),
105-
array(127),
106-
array(128),
107-
array(65000),
108-
array(66000),
109-
);
110-
}
111-
112121
public function testOrgEchoTwice() {
113122
$ws = new Client('ws://localhost:' . self::$port . '/' . $this->test_id);
114123

@@ -290,4 +299,10 @@ public function testSendBadOpcode() {
290299
$ws = new Client('ws://localhost:' . self::$port);
291300
$ws->send('foo', 'bad_opcode');
292301
}
302+
303+
public function testSetFragmentSize() {
304+
$ws = new Client('ws://localhost:' . self::$port);
305+
$size = $ws->setFragmentSize(123)->getFragmentSize();
306+
$this->assertSame(123, $size);
307+
}
293308
}

0 commit comments

Comments
 (0)