Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

Commit 070dd9b

Browse files
committed
Merge pull request #76 from Lukasa/readahead
Add read-ahead logic.
2 parents fd393ec + 9514dc4 commit 070dd9b

File tree

5 files changed

+62
-11
lines changed

5 files changed

+62
-11
lines changed

hyper/http20/bufsocket.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
process.
1212
"""
1313
import select
14+
from .exceptions import ConnectionResetError
1415

1516
class BufferedSocket(object):
1617
"""
@@ -61,6 +62,20 @@ def _buffer_end(self):
6162
"""
6263
return self._index + self._bytes_in_buffer
6364

65+
@property
66+
def can_read(self):
67+
"""
68+
Whether or not there is more data to read from the socket.
69+
"""
70+
if self._bytes_in_buffer:
71+
return True
72+
73+
read = select.select([self._sck], [], [], 0)[0]
74+
if read:
75+
return True
76+
77+
return False
78+
6479
def recv(self, amt):
6580
"""
6681
Read some data from the socket.
@@ -105,6 +120,11 @@ def read_all_from_buffer():
105120
if ((self._remaining_capacity > self._bytes_in_buffer) and
106121
(should_read)):
107122
count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
123+
124+
# The socket just got closed. We should throw an exception if we
125+
# were asked for more data than we can return.
126+
if not count and amt > self._bytes_in_buffer:
127+
raise ConnectionResetError()
108128
self._bytes_in_buffer += count
109129

110130
# Read out the bytes and update the index.

hyper/http20/connection.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
)
1616
from .response import HTTP20Response, HTTP20Push
1717
from .window import FlowControlManager
18-
from .exceptions import ConnectionError
18+
from .exceptions import ConnectionError, ConnectionResetError
1919
from .bufsocket import BufferedSocket
2020

2121
import errno
@@ -468,15 +468,13 @@ def _adjust_receive_window(self, frame_len):
468468

469469
return
470470

471-
def _recv_cb(self):
471+
def _consume_single_frame(self):
472472
"""
473-
This is the callback used by streams to read data from the connection.
473+
Consumes a single frame from the TCP stream.
474474
475-
It expects to read a single frame, and then to deserialize that frame
476-
and pass it to the relevant stream. This is generally called by a
477-
stream, not by the connection itself, and it's likely that streams will
478-
read a frame that doesn't belong to them. That's ok: streams need to
479-
make a decision to spin around again.
475+
Right now this method really does a bit too much: it shouldn't be
476+
responsible for determining if a frame is valid or to increase the
477+
flow control window.
480478
"""
481479
# Begin by reading 9 bytes from the socket.
482480
header = self._sock.recv(9)
@@ -522,6 +520,28 @@ def _recv_cb(self):
522520
else:
523521
self.receive_frame(frame)
524522

523+
def _recv_cb(self):
524+
"""
525+
This is the callback used by streams to read data from the connection.
526+
527+
It expects to read a single frame, and then to deserialize that frame
528+
and pass it to the relevant stream. It then attempts to optimistically
529+
read further frames (in an attempt to ensure that we see control frames
530+
as early as possible).
531+
532+
This is generally called by a stream, not by the connection itself, and
533+
it's likely that streams will read a frame that doesn't belong to them.
534+
"""
535+
self._consume_single_frame()
536+
count = 9
537+
538+
while count and self._sock is not None and self._sock.can_read:
539+
# If the connection has been closed, bail out.
540+
try:
541+
self._consume_single_frame()
542+
except ConnectionResetError:
543+
break
544+
525545

526546
# The following two methods are the implementation of the context manager
527547
# protocol.

hyper/http20/exceptions.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,13 @@ class ProtocolError(HTTP20Error):
3939
The remote party violated the HTTP/2 protocol.
4040
"""
4141
pass
42+
43+
44+
# Create our own ConnectionResetError.
45+
try: # pragma: no cover
46+
ConnectionResetError = ConnectionResetError
47+
except NameError: # pragma: no cover
48+
class ConnectionResetError(Exception):
49+
"""
50+
A HTTP/2 connection was unexpectedly reset.
51+
"""

test/test_hyper.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2029,6 +2029,7 @@ class DummySocket(object):
20292029
def __init__(self):
20302030
self.queue = []
20312031
self.buffer = BytesIO()
2032+
self.can_read = False
20322033

20332034
def send(self, data):
20342035
self.queue.append(data)

test/test_socket.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,16 @@ def test_receive_multiple_packets_one_at_a_time(self, monkeypatch):
6363

6464
assert d == b'Herebeginsthetestdata'
6565

66-
def test_receive_empty_packet(self, monkeypatch):
66+
def test_receive_small_packets(self, monkeypatch):
6767
monkeypatch.setattr(
6868
hyper.http20.bufsocket.select, 'select', dummy_select
6969
)
7070
s = DummySocket()
7171
b = BufferedSocket(s)
72-
s.inbound_packets = [b'Here', b'begins', b'', b'the', b'', b'test', b'data']
72+
s.inbound_packets = [b'Here', b'begins', b'the', b'test', b'data']
7373

7474
d = b''
75-
for _ in range(7):
75+
for _ in range(5):
7676
d += b.recv(100).tobytes()
7777

7878
assert d == b'Herebeginsthetestdata'

0 commit comments

Comments
 (0)