Skip to content

Commit

Permalink
* http/1.1 keepalive support for chunked(streaming) responses
Browse files Browse the repository at this point in the history
* max_connection_reqs to control requests per keepalive connection
* tweak some tests for better cpantesters matrix
  • Loading branch information
vividsnow committed Sep 1, 2024
1 parent 6cb46b6 commit 580ca9f
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 29 deletions.
6 changes: 6 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
Revision history for Perl extension Feersum

1.501 Sat Aug 31 09:10:55 2024 -0200
- http/1.1 keepalive support for chunked(streaming) responses
- max_connection_reqs to control requests per keepalive connection
- tweak some tests for better cpantesters matrix

1.500 Tue Aug 20 18:10:55 2024 -0200
Features:
- native interface: access specific parts of request
- http/1.1 keepalive support
- http/1.1 date header
- defer accept, accept4

Backward incompatibly:
- remove adobe flash policy support

Expand Down
68 changes: 51 additions & 17 deletions Feersum.xs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ struct feer_conn {
enum feer_respond_state responding;
enum feer_receive_state receiving;
bool is_keepalive;
int reqs;

unsigned int in_callback;
unsigned int is_http11:1;
Expand Down Expand Up @@ -302,6 +303,7 @@ static SV *shutdown_cb_cv = NULL;
static bool shutting_down = 0;
static int active_conns = 0;
static double read_timeout = READ_TIMEOUT;
static unsigned int max_connection_reqs = 0;

static SV *feer_server_name = NULL;
static SV *feer_server_port = NULL;
Expand Down Expand Up @@ -672,6 +674,7 @@ new_feer_conn (EV_P_ int conn_fd, struct sockaddr *sa)
c->responding = RESPOND_NOT_STARTED;
c->receiving = RECEIVE_HEADERS;
c->is_keepalive = 0;
c->reqs = 0;

ev_io_init(&c->read_ev_io, try_conn_read, conn_fd, EV_READ);
c->read_ev_io.data = (void *)c;
Expand Down Expand Up @@ -1129,6 +1132,7 @@ try_conn_read(EV_P_ ev_io *w, int revents)
else goto try_read_again_reset_timer;
}
#endif

if (process_request_headers(c, ret))
goto try_read_again_reset_timer;
else
Expand Down Expand Up @@ -1307,6 +1311,7 @@ process_request_headers (struct feer_conn *c, int body_offset)

c->is_http11 = (req->minor_version == 1);
c->is_keepalive = is_keepalive && c->is_http11;
c->reqs++;

change_receiving_state(c, RECEIVE_BODY);

Expand Down Expand Up @@ -1388,16 +1393,16 @@ process_request_headers (struct feer_conn *c, int body_offset)
unlikely(str_case_eq("connection", 10, hdr->name, hdr->name_len)))
{
if (likely(c->is_http11)
&& likely(str_case_eq("close", 5, hdr->value, hdr->value_len))
&& c->is_keepalive)
&& likely(c->is_keepalive)
&& likely(str_case_eq("close", 5, hdr->value, hdr->value_len)))
{
c->is_keepalive = 0;
trace("setting conn %d to close after response\n", c->fd);
}
else if (
likely(!c->is_http11)
&& likely(str_case_eq("keep-alive", 10, hdr->value, hdr->value_len))
&& !c->is_keepalive)
&& likely(is_keepalive)
&& str_case_eq("keep-alive", 10, hdr->value, hdr->value_len))
{
c->is_keepalive = 1;
trace("setting conn %d to keep after response\n", c->fd);
Expand All @@ -1406,6 +1411,11 @@ process_request_headers (struct feer_conn *c, int body_offset)
// TODO: support "Transfer-Encoding: chunked" bodies
}

if (max_connection_reqs > 0 && c->reqs >= max_connection_reqs) {
c->is_keepalive = 0;
trace("reached max requests per connection (%d), will close after response\n", max_connection_reqs);
}

if (likely(next_req_follows)) goto got_it_all; // optimize for GET
else if (likely(got_content_length)) goto got_cl;

Expand Down Expand Up @@ -1494,6 +1504,7 @@ respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len,
stop_read_timer(c);
change_responding_state(c, RESPOND_SHUTDOWN);
change_receiving_state(c, RECEIVE_SHUTDOWN);
if (c->is_keepalive) c->is_keepalive = 0;
conn_write_ready(c);
}

Expand Down Expand Up @@ -1980,11 +1991,24 @@ feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers,
add_crlf_to_wbuf(c);
}

if (likely(c->is_http11)) {
#ifdef DATE_HEADER
generate_date_header();
add_const_to_wbuf(c, DATE_BUF, DATE_HEADER_LENGTH);
#endif
if (unlikely(!c->is_keepalive))
add_const_to_wbuf(c, "Connection: close" CRLF, 19);
} else if (unlikely(c->is_keepalive) && !streaming)
add_const_to_wbuf(c, "Connection: keep-alive" CRLF, 24);

if (streaming) {
if (c->is_http11)
add_const_to_wbuf(c, "Transfer-Encoding: chunked" CRLFx2, 30);
else
add_const_to_wbuf(c, "Connection: close" CRLFx2, 21);
else {
add_crlf_to_wbuf(c);
// cant do keep-alive for streaming http/1.0 since client completes read on close
if (unlikely(c->is_keepalive)) c->is_keepalive = 0;
}
}

conn_write_ready(c);
Expand Down Expand Up @@ -2019,17 +2043,6 @@ feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body)
body_is_string = 1;
}

if (likely(c->is_http11)) {
#ifdef DATE_HEADER
generate_date_header();
#endif
add_const_to_wbuf(c, DATE_BUF, DATE_HEADER_LENGTH);
if (unlikely(!c->is_keepalive))
add_const_to_wbuf(c, "Connection: close" CRLF, 19);
}
else if (unlikely(c->is_keepalive))
add_const_to_wbuf(c, "Connection: keep-alive" CRLF, 24);

SV *cl_sv; // content-length future
struct iovec *cl_iov;
if (likely(c->auto_cl))
Expand Down Expand Up @@ -2586,6 +2599,27 @@ set_keepalive (SV *self, SV *set)
is_keepalive = SvTRUE(set);
}

unsigned int
max_connection_reqs (SV *self, ...)
PROTOTYPE: $;$
CODE:
{
if (items <= 1) {
RETVAL = max_connection_reqs;
}
else if (items == 2) {
SV *num = ST(1);
NV new_max_connection_reqs = SvIV(num);
if (!(new_max_connection_reqs >= 0)) {
croak("must set a positive value");
}
trace("set max requests per connection %d\n", (unsigned int)new_max_connection_reqs);
max_connection_reqs = (unsigned int) new_max_connection_reqs;
}
}
OUTPUT:
RETVAL

void
DESTROY (SV *self)
PPCODE:
Expand Down
2 changes: 1 addition & 1 deletion lib/Feersum.pm
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use warnings;
use EV ();
use Carp ();

our $VERSION = '1.500';
our $VERSION = '1.501';

require Feersum::Connection;
require Feersum::Connection::Handle;
Expand Down
10 changes: 8 additions & 2 deletions lib/Feersum/Runner.pm
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ sub _prepare {
$f->use_socket($sock);

if (my $opts = $self->{options}) {
$self->{$_} = delete $opts->{$_} for grep $opts->{$_}, qw/pre_fork keepalive read_timeout/;
$self->{$_} = delete $opts->{$_} for grep defined($opts->{$_}),
qw/pre_fork keepalive read_timeout max_connection_reqs/;
}
$f->set_keepalive($_) for grep $_, delete $self->{keepalive};
$f->set_keepalive($_) for grep defined, delete $self->{keepalive};
$f->read_timeout($_) for grep $_, delete $self->{read_timeout};
$f->max_connection_reqs($_) for grep $_, delete $self->{max_connection_reqs};

$self->{endjinn} = $f;
return;
Expand Down Expand Up @@ -235,6 +237,10 @@ Enable/disable http keepalive requests.
Set read/keepalive timeout in seconds.
=item max_connection_reqs
Set max requests per connection in case of keepalive - 0(default) for unlimited.
=item quiet
Don't be so noisy. (default: on)
Expand Down
20 changes: 17 additions & 3 deletions t/05-streaming.t
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use strict;
use constant HARDER => $ENV{RELEASE_TESTING} ? 10 : 1;
use constant CLIENTS_11 => HARDER * 2;
use constant CLIENTS_10 => HARDER * 2;
use constant CLIENTS => CLIENTS_11 + CLIENTS_10;
use Test::More tests => 7 + 21 * CLIENTS_11 + 22 * CLIENTS_10;
use constant CLIENTS => CLIENTS_11 * 2 + CLIENTS_10;
use Test::More tests => 7 + 44 * CLIENTS_11 + 22 * CLIENTS_10;
use Test::Fatal;
use lib 't'; use Utils;

Expand Down Expand Up @@ -101,6 +101,7 @@ is exception {
sub client {
my $cnum = sprintf("%04d",shift);
my $is_chunked = shift || 0;
my $is_keepalive = shift || 0;
$cv->begin;
my $h; $h = simple_client GET => '/foo',
name => $cnum,
Expand All @@ -110,18 +111,28 @@ sub client {
"Accept" => "*/*",
'X-Client' => $cnum,
},
$is_chunked && $is_keepalive ? (keepalive => 1) : (),
sub {
my ($body, $headers) = @_;
is $headers->{Status}, 200, "$cnum got 200"
or diag $headers->{Reason};
if ($is_chunked) {
is $headers->{HTTPVersion}, '1.1';
is $headers->{'transfer-encoding'}, "chunked", "$cnum got chunked!";
if (!$is_keepalive) {
is $headers->{'connection'}, 'close', "$cnum conn close";
} else {
ok !exists($headers->{'connection'}), 'conn keep';
}
}
else {
is $headers->{HTTPVersion}, '1.0';
ok !exists $headers->{'transfer-encoding'}, "$cnum not chunked!";
is $headers->{'connection'}, 'close', "$cnum conn closed";
if ($is_keepalive) {
is $headers->{'connection'}, 'keep-alive', "$cnum conn keep";
} else {
ok !exists($headers->{'connection'}), 'conn close';
}
}
is_deeply [split /\n/,$body], [
"$cnum Hello streaming world! chunk one",
Expand All @@ -141,6 +152,9 @@ sub client {
client(1000+$_,1) for (1..CLIENTS_11);
client(2000+$_,0) for (1..CLIENTS_10); # HTTP/1.0 style

$evh->set_keepalive(1);
client(1000+$_,1,1) for (1..CLIENTS_11);

$cv->recv;
is $started, CLIENTS, 'handlers started';
is $finished, CLIENTS, 'handlers finished';
Expand Down
5 changes: 4 additions & 1 deletion t/11-runner.t
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use strict;
use Test::More;
use utf8;
use lib 't'; use Utils;
use File::Spec::Functions 'rel2abs';

BEGIN {
plan skip_all => "Need Test::TCP 1.06 to run this test"
Expand All @@ -28,6 +29,8 @@ plan tests => 15;
ok -f 'eg/app.feersum' && -r _, "found eg/app.feersum";
ok -f 'eg/chat.feersum' && -r _, "found eg/chat.feersum";

note(my $app_path = rel2abs('eg/app.feersum'));

test_tcp(
client => sub {
my $port = shift;
Expand All @@ -50,7 +53,7 @@ test_tcp(

my $runner;
eval {
my $app = do 'eg/app.feersum';
my $app = do $app_path;
ok $app, "did the app";
$runner = Feersum::Runner->new(
listen => ["localhost:$port"],
Expand Down
4 changes: 3 additions & 1 deletion t/13-pre-fork.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use constant CLIENTS => HARDER ? 30 : 4;
use Test::More tests => 4 + CLIENTS*3;
use utf8;
use lib 't'; use Utils;
use File::Spec::Functions 'rel2abs';

use_ok 'Feersum::Runner';

Expand All @@ -29,6 +30,7 @@ sub simple_get {
};
}

note(my $app_path = rel2abs('eg/app.feersum'));
my $pid = fork;
die "can't fork: $!" unless defined $pid;
if (!$pid) {
Expand All @@ -37,7 +39,7 @@ if (!$pid) {
my $runner = Feersum::Runner->new(
listen => ["localhost:$port"],
server_starter => 1,
app_file => 'eg/app.feersum',
app_file => $app_path,
pre_fork => NUM_FORK,
quiet => 1,
);
Expand Down
24 changes: 22 additions & 2 deletions t/51-psgi-streaming.t
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!perl
use warnings;
use strict;
use Test::More tests => 36;
use Test::More tests => 48;
use lib 't'; use Utils;

BEGIN { use_ok('Feersum') };
Expand Down Expand Up @@ -116,6 +116,7 @@ using_writer: {
is $headers->{'Status'}, 200, "Response OK";
is $headers->{'content-type'}, 'application/json', "... is JSON";
is $headers->{'transfer-encoding'}, 'chunked', '... was chunked';
is $headers->{'connection'}, 'close', '... close';
is $body, q({"message":"O hai 2"}), "... correct de-chunked body";
$cv->end;
undef $h;
Expand All @@ -131,12 +132,31 @@ using_writer_and_1_0: {
is $headers->{'Status'}, 200, "Response OK";
is $headers->{'content-type'}, 'application/json', "... is JSON";
ok !$headers->{'transfer-encoding'}, '... was not chunked';
is $headers->{'connection'}, 'close', '... got close';
isnt $headers->{'connection'}, 'keep-alive', '... got close';
is $body, q({"message":"O hai 3"}), "... correct body";
$cv->end;
undef $h2;
};
$cv->recv;
}

$evh->set_keepalive(1);

using_writer_and_1_1: {
my $cv = AE::cv;
$cv->begin;
my $h2; $h2 = simple_client GET => '/', proto => '1.1', keepalive => 1, sub {
my ($body, $headers) = @_;
is $headers->{'Status'}, 200, "Response OK";
is $headers->{'content-type'}, 'application/json', "... is JSON";
ok $headers->{'transfer-encoding'}, '... not chunked';
isnt $headers->{'connection'}, 'close', '... keep';
is $body, q({"message":"O hai 4"}), "... correct de-chunked body";
$cv->end;
undef $h2;
};
$cv->recv;
}


pass "all done app 2";
Loading

0 comments on commit 580ca9f

Please sign in to comment.