diff --git a/README.md b/README.md index 5088136..aea0203 100644 --- a/README.md +++ b/README.md @@ -55,10 +55,12 @@ class Mrloop callable $callback, ): void public tcpServer(int $port, callable $callback): void + public writev(int $fd, string $message): void public static parseHttpRequest(string $request, int $headerlimit = 100): iterable public static parseHttpResponse(string $response, int $headerlimit = 100): iterable public addTimer(float $interval, callable $callback): void public addPeriodicTimer(float $interval, callable $callback): void + public futureTick(callable $callback): void public addSignal(int $signal, callable $callback): void public run(): void public stop(): void @@ -69,10 +71,12 @@ class Mrloop - [`Mrloop::addReadStream`](#mrloopaddreadstream) - [`Mrloop::addWriteStream`](#mrloopaddwritestream) - [`Mrloop::tcpServer`](#mrlooptcpserver) +- [`Mrloop::writev`](#mrloopwritev) - [`Mrloop::parseHttpRequest`](#mrloopparsehttprequest) - [`Mrloop::parseHttpResponse`](#mrloopparsehttpresponse) - [`Mrloop::addTimer`](#mrloopaddtimer) - [`Mrloop::addPeriodicTimer`](#mrloopaddperiodictimer) +- [`Mrloop::futureTick`](#mrloopfuturetick) - [`Mrloop::addSignal`](#mrloopaddsignal) - [`Mrloop::run`](#mrlooprun) - [`Mrloop::stop`](#mrloopstop) @@ -242,6 +246,7 @@ Instantiates a simple TCP server. - **client** (iterable) - An array containing client socket information. - **client_addr** (string) - The client IP address. - **client_port** (integer) - The client socket port. + - **client_fd** (integer) - The client socket file descriptor. **Return value(s)** @@ -282,6 +287,60 @@ The example above will produce output similar to that in the snippet to follow. ``` +### `Mrloop::writev` + +```php +public Mrloop::writev(int $fd, string $contents): void +``` + +Performs vectorized non-blocking write operation on a specified file descriptor. + +**Parameter(s)** + +- **fd** (integer) - The file descriptor to write to. +- **contents** (string) - The arbitrary contents to write. + +**Return value(s)** + +The parser will throw an exception in the event that an invalid file descriptor is encountered and will not return anything otherwise. + +```php +use ringphp\Mrloop; + +$loop = Mrloop::init(); + +$loop->tcpServer( + 8080, + function (string $message, iterable $client) use ($loop) { + [ + 'client_addr' => $addr, + 'client_port' => $port, + 'client_fd' => $fd, + ] = $client; + + $loop->writev( + $fd, + \sprintf( + "Hello, %s:%d\r\n", + $addr, + $port, + ), + ); + }, +); + +echo "Listening on port 8080\n"; + +$loop->run(); +``` + +The example above will produce output similar to that in the snippet to follow. + +``` +Listening on port 8080 + +``` + ### `Mrloop::parseHttpRequest` ```php @@ -584,6 +643,50 @@ Tick: 4 Tick: 5 ``` +### `Mrloop::futureTick` + +```php +public Mrloop::futureTick(callable $callback): void +``` + +Schedules the execution of a specified action for the next event loop tick. + +**Parameter(s)** + +- **callback** (callable) - The function in which the action to be scheduled is defined. + +**Return value(s)** + +The function does not return anything. + +```php +use ringphp\Mrloop; + +$loop = Mrloop::init(); +$tick = 0; + +$loop->futureTick( + function () use (&$tick) { + echo \sprintf("Tick: %d\n", ++$tick); + }, +); + +$loop->futureTick( + function () use (&$tick) { + echo \sprintf("Tick: %d\n", ++$tick); + }, +); + +$loop->run(); +``` + +The example above will produce output similar to that in the snippet to follow. + +``` +Tick: 1 +Tick: 2 +``` + ### `Mrloop::addSignal` ```php diff --git a/mrloop_arginfo.h b/mrloop_arginfo.h index 8c3dad8..9757a86 100644 --- a/mrloop_arginfo.h +++ b/mrloop_arginfo.h @@ -50,6 +50,15 @@ ZEND_ARG_TYPE_INFO(0, contents, IS_STRING, 0) ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Mrloop_writev, 0, 0, 2) +ZEND_ARG_TYPE_INFO(0, fd, IS_LONG, 0) +ZEND_ARG_TYPE_INFO(0, contents, IS_STRING, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_class_Mrloop_futureTick, 0, 0, 1) +ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) +ZEND_END_ARG_INFO() + ZEND_METHOD(Mrloop, init); ZEND_METHOD(Mrloop, stop); ZEND_METHOD(Mrloop, run); @@ -61,6 +70,8 @@ ZEND_METHOD(Mrloop, addSignal); ZEND_METHOD(Mrloop, addReadStream); ZEND_METHOD(Mrloop, addWriteStream); ZEND_METHOD(Mrloop, parseHttpResponse); +ZEND_METHOD(Mrloop, writev); +ZEND_METHOD(Mrloop, futureTick); static const zend_function_entry class_Mrloop_methods[] = { PHP_ME(Mrloop, init, arginfo_class_Mrloop_init, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) @@ -74,4 +85,6 @@ static const zend_function_entry class_Mrloop_methods[] = { PHP_ME(Mrloop, addReadStream, arginfo_class_Mrloop_addReadStream, ZEND_ACC_PUBLIC) PHP_ME(Mrloop, addWriteStream, arginfo_class_Mrloop_addWriteStream, ZEND_ACC_PUBLIC) PHP_ME(Mrloop, parseHttpResponse, arginfo_class_Mrloop_parseHttpResponse, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_FE_END}; + PHP_ME(Mrloop, writev, arginfo_class_Mrloop_writev, ZEND_ACC_PUBLIC) + PHP_ME(Mrloop, futureTick, arginfo_class_Mrloop_futureTick, ZEND_ACC_PUBLIC) + PHP_FE_END}; diff --git a/php_mrloop.c b/php_mrloop.c index a633e99..eb66f28 100644 --- a/php_mrloop.c +++ b/php_mrloop.c @@ -84,6 +84,20 @@ PHP_METHOD(Mrloop, addWriteStream) } /* }}} */ +/* {{{ proto void Mrloop::writev( int fd [, string contents ] ) */ +PHP_METHOD(Mrloop, writev) +{ + php_mrloop_writev(INTERNAL_FUNCTION_PARAM_PASSTHRU); +} +/* }}} */ + +/* {{{ proto void Mrloop::futureTick( callable callback ) */ +PHP_METHOD(Mrloop, futureTick) +{ + php_mrloop_add_future_tick(INTERNAL_FUNCTION_PARAM_PASSTHRU); +} +/* }}} */ + /* {{{ PHP_MINIT_FUNCTION */ PHP_MINIT_FUNCTION(mrloop) { diff --git a/src/loop.c b/src/loop.c index f14d66f..97f2a21 100644 --- a/src/loop.c +++ b/src/loop.c @@ -102,7 +102,7 @@ static int php_mrloop_timer_cb(void *data) zval_ptr_dtor(&result); efree(cb); - return type == PHP_MRLOOP_TIMER ? 0 : 1; + return type == PHP_MRLOOP_TIMER || type == PHP_MRLOOP_FUTURE_TICK ? 0 : 1; } static void php_mrloop_add_timer(INTERNAL_FUNCTION_PARAMETERS) { @@ -162,6 +162,33 @@ static void php_mrloop_add_periodic_timer(INTERNAL_FUNCTION_PARAMETERS) return; } +static void php_mrloop_add_future_tick(INTERNAL_FUNCTION_PARAMETERS) +{ + zval *obj; + php_mrloop_cb_t *cb; + php_mrloop_t *this; + zend_fcall_info fci; + zend_fcall_info_cache fci_cache; + + fci = empty_fcall_info; + fci_cache = empty_fcall_info_cache; + obj = getThis(); + + ZEND_PARSE_PARAMETERS_START(1, 1) + Z_PARAM_FUNC(fci, fci_cache) + ZEND_PARSE_PARAMETERS_END(); + + this = PHP_MRLOOP_OBJ(obj); + cb = emalloc(sizeof(php_mrloop_cb_t)); + PHP_CB_TO_MRLOOP_CB(cb, fci, fci_cache); + + cb->signal = PHP_MRLOOP_FUTURE_TICK; + cb->data = this->loop; + + mr_call_soon(this->loop, php_mrloop_timer_cb, cb); + + return; +} static void php_mrloop_readv_cb(void *data, int res) { @@ -261,13 +288,13 @@ static int php_mrloop_tcp_server_recv(void *conn, int fd, ssize_t nbytes, char * array_init(&args[1]); add_assoc_string(&args[1], "client_addr", (char *)client->addr); add_assoc_long(&args[1], "client_port", client->port); + add_assoc_long(&args[1], "client_fd", client->fd); MRLOOP_G(tcp_cb)->fci.retval = &result; MRLOOP_G(tcp_cb)->fci.param_count = 2; MRLOOP_G(tcp_cb)->fci.params = args; - if (zend_call_function(&MRLOOP_G(tcp_cb)->fci, &MRLOOP_G(tcp_cb)->fci_cache) == FAILURE || - Z_TYPE(result) != IS_STRING) + if (zend_call_function(&MRLOOP_G(tcp_cb)->fci, &MRLOOP_G(tcp_cb)->fci_cache) == FAILURE) { PHP_MRLOOP_THROW("There is an error in your callback"); zval_ptr_dtor(&result); @@ -275,11 +302,15 @@ static int php_mrloop_tcp_server_recv(void *conn, int fd, ssize_t nbytes, char * return 1; } - client->iov.iov_base = Z_STRVAL(result); - client->iov.iov_len = Z_STRLEN(result); + if (Z_TYPE(result) == IS_STRING) + { + client->iov.iov_base = Z_STRVAL(result); + client->iov.iov_len = Z_STRLEN(result); + + mr_writev(loop, client->fd, &(client->iov), 1); + mr_flush(loop); + } - mr_writev(loop, client->fd, &(client->iov), 1); - mr_flush(loop); zval_ptr_dtor(&result); return 1; @@ -629,3 +660,36 @@ static void php_mrloop_add_write_stream(INTERNAL_FUNCTION_PARAMETERS) return; } +static void php_mrloop_writev(INTERNAL_FUNCTION_PARAMETERS) +{ + zend_long fd; + zend_string *contents; + php_iovec_t iov; + php_mrloop_t *this; + zval *obj; + size_t nbytes; + + obj = getThis(); + + ZEND_PARSE_PARAMETERS_START(2, 2) + Z_PARAM_LONG(fd) + Z_PARAM_STR(contents) + ZEND_PARSE_PARAMETERS_END(); + + this = PHP_MRLOOP_OBJ(obj); + + if (fcntl((int)fd, F_GETFD) < 0) + { + PHP_MRLOOP_THROW("Detected invalid file descriptor"); + mr_stop(this->loop); + + return; + } + + nbytes = ZSTR_LEN(contents); + iov.iov_base = ZSTR_VAL(contents); + iov.iov_len = nbytes; + + mr_writev(this->loop, fd, &iov, 1); + mr_flush(this->loop); +} diff --git a/src/loop.h b/src/loop.h index b022978..426829b 100644 --- a/src/loop.h +++ b/src/loop.h @@ -28,6 +28,7 @@ #define DEFAULT_HTTP_HEADER_LIMIT 100 #define PHP_MRLOOP_TIMER 1 #define PHP_MRLOOP_PERIODIC_TIMER 2 +#define PHP_MRLOOP_FUTURE_TICK 3 struct php_mrloop_t; struct php_mrloop_cb_t; @@ -130,6 +131,8 @@ static int php_mrloop_timer_cb(void *data); static void php_mrloop_add_timer(INTERNAL_FUNCTION_PARAMETERS); /* executes a specified action in perpetuity with each successive execution occurring after a specified time interval */ static void php_mrloop_add_periodic_timer(INTERNAL_FUNCTION_PARAMETERS); +/* schedules the execution of a specified action for the next event loop tick */ +static void php_mrloop_add_future_tick(INTERNAL_FUNCTION_PARAMETERS); /* mrloop-bound callback specified during invocation of vectorized read function */ static void php_mrloop_readv_cb(void *data, int res); @@ -156,6 +159,8 @@ static void php_mrloop_add_signal(INTERNAL_FUNCTION_PARAMETERS); static void php_mrloop_add_read_stream(INTERNAL_FUNCTION_PARAMETERS); /* funnels file descriptor in writable stream into event loop and thence executes a non-blocking write operation */ static void php_mrloop_add_write_stream(INTERNAL_FUNCTION_PARAMETERS); +/* performs vectorized non-blocking write operation on a specified file descriptor */ +static void php_mrloop_writev(INTERNAL_FUNCTION_PARAMETERS); zend_class_entry *php_mrloop_ce, *php_mrloop_exception_ce; diff --git a/tests/011.phpt b/tests/011.phpt new file mode 100644 index 0000000..85d15aa --- /dev/null +++ b/tests/011.phpt @@ -0,0 +1,17 @@ +--TEST-- +writev() performs vectorized non-blocking write operation on file descriptor +--FILE-- +writev(1, "Hello, user"); +$loop->stop(); + +$loop->run(); + +?> +--EXPECT-- +Hello, user diff --git a/tests/012.phpt b/tests/012.phpt new file mode 100644 index 0000000..a7be3ba --- /dev/null +++ b/tests/012.phpt @@ -0,0 +1,21 @@ +--TEST-- +writev() throws exception on detection of invalid file descriptor +--FILE-- +writev(987874, "Hello, user"); +} catch (\Throwable $err) { + $loop->writev(1, $err->getMessage()); +} +$loop->stop(); + +$loop->run(); + +?> +--EXPECT-- +Detected invalid file descriptor diff --git a/tests/013.phpt b/tests/013.phpt new file mode 100644 index 0000000..4daca84 --- /dev/null +++ b/tests/013.phpt @@ -0,0 +1,30 @@ +--TEST-- +futureTick() schedules the execution of a specified action for the next event loop tick +--FILE-- +futureTick( + function () use (&$tick) { + echo \sprintf("Tick: %d\n", ++$tick); + }, +); + +$loop->futureTick( + function () use ($loop, &$tick) { + echo \sprintf("Tick: %d\n", ++$tick); + $loop->stop(); + }, +); + +$loop->run(); + +?> +--EXPECT-- +Tick: 1 +Tick: 2