diff --git a/src/Std/Internal/Async.lean b/src/Std/Internal/Async.lean index 8d8e7db3e919..88fedb0938ac 100644 --- a/src/Std/Internal/Async.lean +++ b/src/Std/Internal/Async.lean @@ -6,3 +6,4 @@ Authors: Henrik Böving prelude import Std.Internal.Async.Basic import Std.Internal.Async.Timer +import Std.Internal.Async.TCP diff --git a/src/Std/Internal/Async/TCP.lean b/src/Std/Internal/Async/TCP.lean new file mode 100644 index 000000000000..bcf75bbf55cc --- /dev/null +++ b/src/Std/Internal/Async/TCP.lean @@ -0,0 +1,175 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +prelude +import Std.Time +import Std.Internal.UV +import Std.Internal.Async.Basic +import Std.Net.Addr + +namespace Std +namespace Internal +namespace IO +namespace Async +namespace TCP + +open Std.Net + +namespace Socket + +/-- +Represents a TCP server socket, managing incoming client connections. +-/ +structure Server where + private ofNative :: + native : Internal.UV.TCP.Socket + +/-- +Represents a TCP client socket, used to connect to a server. +-/ +structure Client where + private ofNative :: + native : Internal.UV.TCP.Socket + +namespace Server + +/-- +Creates a new TCP server socket. +-/ +@[inline] +def mk : IO Server := do + let native ← Internal.UV.TCP.Socket.new + return Server.ofNative native + +/-- +Binds the server socket to the specified address. Address reuse is enabled to allow rebinding the +same address. +-/ +@[inline] +def bind (s : Server) (addr : SocketAddress) : IO Unit := + s.native.bind addr + +/-- +Listens for incoming connections with the given backlog. +-/ +@[inline] +def listen (s : Server) (backlog : UInt32) : IO Unit := + s.native.listen backlog + +/-- +Accepts an incoming connection. +-/ +@[inline] +def accept (s : Server) : IO (AsyncTask Client) := do + let conn ← s.native.accept + return conn.result!.map (·.map Client.ofNative) + +/-- +Gets the local address of the server socket. +-/ +@[inline] +def getSockName (s : Server) : IO SocketAddress := + s.native.getSockName + +/-- +Enables the Nagle algorithm for all client sockets accepted by this server socket. +-/ +@[inline] +def noDelay (s : Server) : IO Unit := + s.native.noDelay + +/-- +Enables TCP keep-alive for all client sockets accepted by this server socket. +-/ +@[inline] +def keepAlive (s : Server) (enable : Bool) (delay : Std.Time.Second.Offset) (_ : delay.val ≥ 1 := by decide) : IO Unit := + s.native.keepAlive enable.toInt8 delay.val.toNat.toUInt32 + +end Server + +namespace Client + +/-- +Creates a new TCP client socket. +-/ +@[inline] +def mk : IO Client := do + let native ← Internal.UV.TCP.Socket.new + return Client.ofNative native + +/-- +Binds the server socket to the specified address. Address reuse is enabled to allow rebinding the +same address. +-/ +@[inline] +def bind (s : Client) (addr : SocketAddress) : IO Unit := + s.native.bind addr + +/-- +Connects the client socket to the given address. +-/ +@[inline] +def connect (s : Client) (addr : SocketAddress) : IO (AsyncTask Unit) := + AsyncTask.ofPromise <$> s.native.connect addr + +/-- +Sends data through the client socket. +-/ +@[inline] +def send (s : Client) (data : ByteArray) : IO (AsyncTask Unit) := + AsyncTask.ofPromise <$> s.native.send data + +/-- +Receives data from the client socket. If data is received, it’s wrapped in .some. If EOF is reached, +the result is .none, indicating no more data is available. Receiving data in parallel on the same +socket is not supported. Instead, we recommend binding multiple sockets to the same address. +-/ +@[inline] +def recv? (s : Client) (size : UInt64) : IO (AsyncTask (Option ByteArray)) := + AsyncTask.ofPromise <$> s.native.recv? size + +/-- +Shuts down the write side of the client socket. +-/ +@[inline] +def shutdown (s : Client) : IO (AsyncTask Unit) := + AsyncTask.ofPromise <$> s.native.shutdown + +/-- +Gets the remote address of the client socket. +-/ +@[inline] +def getPeerName (s : Client) : IO SocketAddress := + s.native.getPeerName + +/-- +Gets the local address of the client socket. +-/ +@[inline] +def getSockName (s : Client) : IO SocketAddress := + s.native.getSockName + +/-- +Enables the Nagle algorithm for the client socket. +-/ +@[inline] +def noDelay (s : Client) : IO Unit := + s.native.noDelay + +/-- +Enables TCP keep-alive with a specified delay for the client socket. +-/ +@[inline] +def keepAlive (s : Client) (enable : Bool) (delay : Std.Time.Second.Offset) (_ : delay.val ≥ 0 := by decide) : IO Unit := + s.native.keepAlive enable.toInt8 delay.val.toNat.toUInt32 + +end Client + +end Socket +end TCP +end Async +end IO +end Internal +end Std diff --git a/src/Std/Internal/UV.lean b/src/Std/Internal/UV.lean index 8b0fde3f40d3..2b4782020066 100644 --- a/src/Std/Internal/UV.lean +++ b/src/Std/Internal/UV.lean @@ -9,3 +9,4 @@ import Init.System.Promise import Std.Internal.UV.Loop import Std.Internal.UV.Timer +import Std.Internal.UV.TCP diff --git a/src/Std/Internal/UV/TCP.lean b/src/Std/Internal/UV/TCP.lean new file mode 100644 index 000000000000..a96b27229687 --- /dev/null +++ b/src/Std/Internal/UV/TCP.lean @@ -0,0 +1,109 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Henrik Böving, Sofia Rodrigues +-/ +prelude +import Init.System.IO +import Init.System.Promise +import Init.Data.SInt +import Std.Net + +namespace Std +namespace Internal +namespace UV +namespace TCP + +open Std.Net + +private opaque SocketImpl : NonemptyType.{0} + +/-- +Represents a TCP socket. +-/ +def Socket : Type := SocketImpl.type + +instance : Nonempty Socket := SocketImpl.property + +namespace Socket + +/-- +Creates a new TCP socket. +-/ +@[extern "lean_uv_tcp_new"] +opaque new : IO Socket + +/-- +Connects a TCP socket to the specified address. +-/ +@[extern "lean_uv_tcp_connect"] +opaque connect (socket : @& Socket) (addr : SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) + +/-- +Sends data through a TCP socket. +-/ +@[extern "lean_uv_tcp_send"] +opaque send (socket : @& Socket) (data : ByteArray) : IO (IO.Promise (Except IO.Error Unit)) + +/-- +Receives data from a TCP socket with a maximum size of size bytes. The promise resolves when data is +available or an error occurs. If data is received, it’s wrapped in .some. If EOF is reached, the +result is .none, indicating no more data is available. Receiving data in parallel on the same +socket is not supported. Instead, we recommend binding multiple sockets to the same address. +-/ +@[extern "lean_uv_tcp_recv"] +opaque recv? (socket : @& Socket) (size : UInt64) : IO (IO.Promise (Except IO.Error (Option ByteArray))) + +/-- +Binds a TCP socket to a specific address. +-/ +@[extern "lean_uv_tcp_bind"] +opaque bind (socket : @& Socket) (addr : SocketAddress) : IO Unit + +/-- +Starts listening for incoming connections on a TCP socket. +-/ +@[extern "lean_uv_tcp_listen"] +opaque listen (socket : @& Socket) (backlog : UInt32) : IO Unit + +/-- +Accepts an incoming connection on a listening TCP socket. +-/ +@[extern "lean_uv_tcp_accept"] +opaque accept (socket : @& Socket) : IO (IO.Promise (Except IO.Error Socket)) + +/-- +Shuts down an incoming connection on a listening TCP socket. +-/ +@[extern "lean_uv_tcp_shutdown"] +opaque shutdown (socket : @& Socket) : IO (IO.Promise (Except IO.Error Unit)) + +/-- +Gets the remote address of a connected TCP socket. +-/ +@[extern "lean_uv_tcp_getpeername"] +opaque getPeerName (socket : @& Socket) : IO SocketAddress + +/-- +Gets the local address of a bound TCP socket. +-/ +@[extern "lean_uv_tcp_getsockname"] +opaque getSockName (socket : @& Socket) : IO SocketAddress + +/-- +Enables the Nagle algorithm for a TCP socket. +-/ +@[extern "lean_uv_tcp_nodelay"] +opaque noDelay (socket : @& Socket) : IO Unit + +/-- +Enables TCP keep-alive for a socket. If delay is less than 1 then UV_EINVAL is returned. +-/ +@[extern "lean_uv_tcp_keepalive"] +opaque keepAlive (socket : @& Socket) (enable : Int8) (delay : UInt32) : IO Unit + +end Socket +end TCP +end UV +end Internal +end Std diff --git a/src/include/lean/lean.h b/src/include/lean/lean.h index 4b5b950d60b8..84f25fb86b17 100644 --- a/src/include/lean/lean.h +++ b/src/include/lean/lean.h @@ -2718,6 +2718,13 @@ static inline bool lean_io_result_is_ok(b_lean_obj_arg r) { return lean_ptr_tag( static inline bool lean_io_result_is_error(b_lean_obj_arg r) { return lean_ptr_tag(r) == 1; } static inline b_lean_obj_res lean_io_result_get_value(b_lean_obj_arg r) { assert(lean_io_result_is_ok(r)); return lean_ctor_get(r, 0); } static inline b_lean_obj_res lean_io_result_get_error(b_lean_obj_arg r) { assert(lean_io_result_is_error(r)); return lean_ctor_get(r, 0); } +static inline lean_obj_res lean_io_result_take_value(lean_obj_arg r) { + assert(lean_io_result_is_ok(r)); + lean_object* v = lean_ctor_get(r, 0); + lean_inc(v); + lean_dec(r); + return v; +} LEAN_EXPORT void lean_io_result_show_error(b_lean_obj_arg r); LEAN_EXPORT void lean_io_mark_end_initialization(void); static inline lean_obj_res lean_io_result_mk_ok(lean_obj_arg a) { diff --git a/src/runtime/CMakeLists.txt b/src/runtime/CMakeLists.txt index fc1e41f41693..27f34defa942 100644 --- a/src/runtime/CMakeLists.txt +++ b/src/runtime/CMakeLists.txt @@ -3,7 +3,7 @@ object.cpp apply.cpp exception.cpp interrupt.cpp memory.cpp stackinfo.cpp compact.cpp init_module.cpp load_dynlib.cpp io.cpp hash.cpp platform.cpp alloc.cpp allocprof.cpp sharecommon.cpp stack_overflow.cpp process.cpp object_ref.cpp mpn.cpp mutex.cpp libuv.cpp uv/net_addr.cpp uv/event_loop.cpp -uv/timer.cpp) +uv/timer.cpp uv/tcp.cpp) add_library(leanrt_initial-exec STATIC ${RUNTIME_OBJS}) set_target_properties(leanrt_initial-exec PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/src/runtime/libuv.cpp b/src/runtime/libuv.cpp index f625c115950f..58fffa400640 100644 --- a/src/runtime/libuv.cpp +++ b/src/runtime/libuv.cpp @@ -15,6 +15,7 @@ namespace lean { extern "C" void initialize_libuv() { initialize_libuv_timer(); + initialize_libuv_tcp_socket(); initialize_libuv_loop(); lthread([]() { event_loop_run_loop(&global_ev); }); diff --git a/src/runtime/libuv.h b/src/runtime/libuv.h index ce974f37e83d..f158ce024c07 100644 --- a/src/runtime/libuv.h +++ b/src/runtime/libuv.h @@ -8,6 +8,7 @@ Author: Markus Himmel, Sofia Rodrigues #include #include "runtime/uv/event_loop.h" #include "runtime/uv/timer.h" +#include "runtime/uv/tcp.h" #include "runtime/alloc.h" #include "runtime/io.h" #include "runtime/utf8.h" diff --git a/src/runtime/object.cpp b/src/runtime/object.cpp index ac1f1478fb37..f8585108cbaa 100644 --- a/src/runtime/object.cpp +++ b/src/runtime/object.cpp @@ -1150,7 +1150,7 @@ extern "C" LEAN_EXPORT b_obj_res lean_io_wait_any_core(b_obj_arg task_list) { return g_task_manager->wait_any(task_list); } -extern "C" LEAN_EXPORT obj_res lean_io_promise_new(obj_arg) { +obj_res lean_promise_new() { lean_always_assert(g_task_manager); bool keep_alive = false; @@ -1165,11 +1165,20 @@ extern "C" LEAN_EXPORT obj_res lean_io_promise_new(obj_arg) { lean_set_st_header((lean_object *)o, LeanPromise, 0); o->m_result = t; // the promise takes ownership of one task token - return io_result_mk_ok((lean_object *) o); + return (lean_object *) o; } -extern "C" LEAN_EXPORT obj_res lean_io_promise_resolve(obj_arg value, b_obj_arg promise, obj_arg) { +void lean_promise_resolve(obj_arg value, b_obj_arg promise) { g_task_manager->resolve(lean_to_promise(promise)->m_result, mk_option_some(value)); +} + +extern "C" LEAN_EXPORT obj_res lean_io_promise_new(obj_arg) { + lean_object * o = lean_promise_new(); + return io_result_mk_ok(o); +} + +extern "C" LEAN_EXPORT obj_res lean_io_promise_resolve(obj_arg value, b_obj_arg promise, obj_arg) { + lean_promise_resolve(value, promise); return io_result_mk_ok(box(0)); } diff --git a/src/runtime/object.h b/src/runtime/object.h index 370d9131f697..577c92e2f437 100644 --- a/src/runtime/object.h +++ b/src/runtime/object.h @@ -302,6 +302,12 @@ inline void * external_data(object * o) { return lean_get_external_data(o); } inline obj_res mk_option_none() { return box(0); } inline obj_res mk_option_some(obj_arg v) { obj_res r = alloc_cnstr(1, 1, 0); cnstr_set(r, 0, v); return r; } +// ======================================= +// Except + +inline obj_res mk_except_ok(obj_arg v) { obj_res r = alloc_cnstr(1, 1, 0); cnstr_set(r, 0, v); return r; } +inline obj_res mk_except_err(obj_arg v) { obj_res r = alloc_cnstr(0, 1, 0); cnstr_set(r, 0, v); return r; } + // ======================================= // Natural numbers @@ -467,6 +473,8 @@ inline obj_res st_ref_set(b_obj_arg r, obj_arg v, obj_arg w) { return lean_st_re inline obj_res st_ref_reset(b_obj_arg r, obj_arg w) { return lean_st_ref_reset(r, w); } inline obj_res st_ref_swap(b_obj_arg r, obj_arg v, obj_arg w) { return lean_st_ref_swap(r, v, w); } +obj_res lean_promise_new(); +void lean_promise_resolve(obj_arg value, b_obj_arg promise); extern "C" LEAN_EXPORT obj_res lean_io_promise_new(obj_arg); extern "C" LEAN_EXPORT obj_res lean_io_promise_resolve(obj_arg value, b_obj_arg promise, obj_arg); diff --git a/src/runtime/uv/event_loop.cpp b/src/runtime/uv/event_loop.cpp index 1e20f1f93496..2614b086680d 100644 --- a/src/runtime/uv/event_loop.cpp +++ b/src/runtime/uv/event_loop.cpp @@ -23,6 +23,16 @@ using namespace std; event_loop_t global_ev; +// Helpers + +void lean_promise_resolve_with_code(int status, obj_arg promise) { + obj_arg res = status == 0 + ? mk_except_ok(lean_box(0)) + : mk_except_err(lean_decode_uv_error(status, nullptr)); + + lean_promise_resolve(res, promise); +} + // Utility function for error checking. This function is only used inside the // initializition of the event loop. static void check_uv(int result, const char * msg) { diff --git a/src/runtime/uv/event_loop.h b/src/runtime/uv/event_loop.h index 42dba397e3fa..75835d79aa60 100644 --- a/src/runtime/uv/event_loop.h +++ b/src/runtime/uv/event_loop.h @@ -44,4 +44,8 @@ void event_loop_run_loop(event_loop_t *event_loop); extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_configure(b_obj_arg options, obj_arg /* w */ ); extern "C" LEAN_EXPORT lean_obj_res lean_uv_event_loop_alive(obj_arg /* w */ ); +// Helpers + +void lean_promise_resolve_with_code(int status, obj_arg promise); + } diff --git a/src/runtime/uv/net_addr.cpp b/src/runtime/uv/net_addr.cpp index f06a078f6874..964299dd4692 100644 --- a/src/runtime/uv/net_addr.cpp +++ b/src/runtime/uv/net_addr.cpp @@ -28,6 +28,24 @@ void lean_ipv6_addr_to_in6_addr(b_obj_arg ipv6_addr, in6_addr* out) { } } +void lean_socket_address_to_sockaddr_storage(b_obj_arg ip_addr, sockaddr_storage* out) { + lean_object* socket_addr_obj = lean_ctor_get(ip_addr, 0); + lean_object* ip_addr_obj = lean_ctor_get(socket_addr_obj, 0); + uint16_t port_obj = lean_ctor_get_uint16(socket_addr_obj, sizeof(void*)*1); + + if (lean_ptr_tag(ip_addr) == 0) { + sockaddr_in* cast = (sockaddr_in*)out; + lean_ipv4_addr_to_in_addr(ip_addr_obj, &cast->sin_addr); + cast->sin_family = AF_INET; + cast->sin_port = htons(port_obj); + } else { + sockaddr_in6* cast = (sockaddr_in6*)out; + lean_ipv6_addr_to_in6_addr(ip_addr_obj, (in6_addr*)&cast->sin6_addr); + cast->sin6_family = AF_INET6; + cast->sin6_port = htons(port_obj); + } +} + lean_obj_res lean_in_addr_to_ipv4_addr(const in_addr* ipv4_addr) { obj_res ret = alloc_array(0, 4); uint32_t hostaddr = ntohl(ipv4_addr->s_addr); @@ -55,6 +73,42 @@ lean_obj_res lean_in6_addr_to_ipv6_addr(const in6_addr* ipv6_addr) { return ret; } +lean_obj_res lean_mk_socketaddress(lean_obj_res ip_addr, uint16_t port) { + lean_obj_res socket_addr = lean_alloc_ctor(0, 1, 2); + lean_ctor_set(socket_addr, 0, ip_addr); + lean_ctor_set_uint16(socket_addr, sizeof(void*)*1, port); + return socket_addr; +} + +lean_obj_res lean_sockaddr_to_socketaddress(const struct sockaddr* sockaddr) { + lean_object* part = nullptr; + int tag; + + if (sockaddr->sa_family == AF_INET) { + const struct sockaddr_in* addr_in = (const struct sockaddr_in*)sockaddr; + const in_addr* ipv4_addr = &addr_in->sin_addr; + lean_obj_res lean_ipv4 = lean_in_addr_to_ipv4_addr(ipv4_addr); + uint16_t port = ntohs(addr_in->sin_port); + part = lean_mk_socketaddress(lean_ipv4, port); + tag = 0; + } else if (sockaddr->sa_family == AF_INET6) { + const struct sockaddr_in6* addr_in6 = (const struct sockaddr_in6*)sockaddr; + const in6_addr* ipv6_addr = &addr_in6->sin6_addr; + lean_obj_res lean_ipv6 = lean_in6_addr_to_ipv6_addr(ipv6_addr); + uint16_t port = ntohs(addr_in6->sin6_port); + part = lean_mk_socketaddress(lean_ipv6, port); + tag = 1; + } else { + lean_unreachable(); + } + + lean_object* ctor = lean_alloc_ctor(tag, 1, 0); + lean_ctor_set(ctor, 0, part); + + return ctor; +} + + /* Std.Net.IPV4Addr.ofString (s : @&String) : Option IPV4Addr */ extern "C" LEAN_EXPORT lean_obj_res lean_uv_pton_v4(b_obj_arg str_obj) { const char* str = string_cstr(str_obj); diff --git a/src/runtime/uv/net_addr.h b/src/runtime/uv/net_addr.h index ceaea593679b..11bcb0b99919 100644 --- a/src/runtime/uv/net_addr.h +++ b/src/runtime/uv/net_addr.h @@ -14,8 +14,11 @@ namespace lean { void lean_ipv4_addr_to_in_addr(b_obj_arg ipv4_addr, struct in_addr* out); void lean_ipv6_addr_to_in6_addr(b_obj_arg ipv6_addr, struct in6_addr* out); +void lean_socket_address_to_sockaddr_storage(b_obj_arg ip_addr, struct sockaddr_storage* out); + lean_obj_res lean_in_addr_to_ipv4_addr(const struct in_addr* ipv4_addr); lean_obj_res lean_in6_addr_to_ipv6_addr(const struct in6_addr* ipv6_addr); +lean_obj_res lean_sockaddr_to_socketaddress(const struct sockaddr* sockaddr); #endif diff --git a/src/runtime/uv/tcp.cpp b/src/runtime/uv/tcp.cpp new file mode 100644 index 000000000000..c30f4c8755c1 --- /dev/null +++ b/src/runtime/uv/tcp.cpp @@ -0,0 +1,611 @@ +/* +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Author: Sofia Rodrigues +*/ + +#include "runtime/uv/tcp.h" +#include + +namespace lean { + +#ifndef LEAN_EMSCRIPTEN + +// Stores all the things needed to connect to a TCP socket. +typedef struct { + lean_object* promise; + lean_object* socket; +} tcp_connect_data; + +// Stores all the things needed to send data to a TCP socket. +typedef struct { + lean_object* promise; + lean_object* data; + lean_object* socket; +} tcp_send_data; + +// ======================================= +// TCP socket object manipulation functions. + +void lean_uv_tcp_socket_finalizer(void* ptr) { + lean_uv_tcp_socket_object* tcp_socket = (lean_uv_tcp_socket_object*)ptr; + + lean_always_assert(tcp_socket->m_promise_shutdown == nullptr); + lean_always_assert(tcp_socket->m_promise_accept == nullptr); + lean_always_assert(tcp_socket->m_promise_read == nullptr); + lean_always_assert(tcp_socket->m_byte_array == nullptr); + + /// It's changing here because the object is being freed in the finalizer, and we need the data + /// inside of it. + tcp_socket->m_uv_tcp->data = ptr; + + event_loop_lock(&global_ev); + + uv_close((uv_handle_t*)tcp_socket->m_uv_tcp, [](uv_handle_t* handle) { + lean_uv_tcp_socket_object* tcp_socket = (lean_uv_tcp_socket_object*)handle->data; + free(tcp_socket->m_uv_tcp); + free(tcp_socket); + }); + + event_loop_unlock(&global_ev); +} + +void initialize_libuv_tcp_socket() { + g_uv_tcp_socket_external_class = lean_register_external_class(lean_uv_tcp_socket_finalizer, [](void* obj, lean_object* f) { + lean_uv_tcp_socket_object* tcp_socket = (lean_uv_tcp_socket_object*)obj; + + if (tcp_socket->m_promise_accept != nullptr) { + lean_inc(f); + lean_apply_1(f, tcp_socket->m_promise_accept); + } + + if (tcp_socket->m_promise_shutdown != nullptr) { + lean_inc(f); + lean_apply_1(f, tcp_socket->m_promise_shutdown); + } + + if (tcp_socket->m_promise_read != nullptr) { + lean_inc(f); + lean_apply_1(f, tcp_socket->m_promise_read); + } + + if (tcp_socket->m_byte_array != nullptr) { + lean_inc(f); + lean_apply_1(f, tcp_socket->m_byte_array); + } + }); +} + +// ======================================= +// TCP Socket Operations + +/* Std.Internal.UV.TCP.Socket.new : IO Socket */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_new() { + lean_uv_tcp_socket_object* tcp_socket = (lean_uv_tcp_socket_object*)malloc(sizeof(lean_uv_tcp_socket_object)); + + tcp_socket->m_promise_accept = nullptr; + tcp_socket->m_promise_shutdown = nullptr; + tcp_socket->m_promise_read = nullptr; + tcp_socket->m_byte_array = nullptr; + tcp_socket->m_client = nullptr; + + uv_tcp_t* uv_tcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + + event_loop_lock(&global_ev); + int result = uv_tcp_init(global_ev.loop, uv_tcp); + event_loop_unlock(&global_ev); + + if (result != 0) { + free(uv_tcp); + free(tcp_socket); + + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + tcp_socket->m_uv_tcp = uv_tcp; + + lean_object* obj = lean_uv_tcp_socket_new(tcp_socket); + lean_mark_mt(obj); + + tcp_socket->m_uv_tcp->data = obj; + + return lean_io_result_mk_ok(obj); +} + +/* Std.Internal.UV.TCP.Socket.connect (socket : @& Socket) (addr : SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_connect(b_obj_arg socket, obj_arg addr) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + sockaddr_storage addr_ptr; + lean_socket_address_to_sockaddr_storage(addr, &addr_ptr); + + lean_object* promise = lean_promise_new(); + mark_mt(promise); + + uv_connect_t* uv_connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); + tcp_connect_data* connect_data = (tcp_connect_data*)malloc(sizeof(tcp_connect_data)); + + connect_data->promise = promise; + connect_data->socket = socket; + uv_connect->data = connect_data; + + // The event loop owns the socket. + lean_inc(socket); + lean_inc(promise); + + event_loop_lock(&global_ev); + + int result = uv_tcp_connect(uv_connect, tcp_socket->m_uv_tcp, (sockaddr*)&addr_ptr, [](uv_connect_t* req, int status) { + tcp_connect_data* tup = (tcp_connect_data*) req->data; + lean_promise_resolve_with_code(status, tup->promise); + + // The event loop does not own the object anymore. + lean_dec(tup->socket); + lean_dec(tup->promise); + + free(req->data); + free(req); + }); + + event_loop_unlock(&global_ev); + + if (result < 0) { + lean_dec(promise); // The structure does not own it. + lean_dec(promise); // We are not going to return it. + lean_dec(socket); + + free(uv_connect->data); + free(uv_connect); + + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(promise); +} + +/* Std.Internal.UV.TCP.Socket.send (socket : @& Socket) (data : ByteArray) : IO (IO.Promise (Except IO.Error Unit)) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + size_t data_len = lean_sarray_size(data); + char* data_str = (char*)lean_sarray_cptr(data); + + uv_buf_t buf = uv_buf_init(data_str, data_len); + + lean_object* promise = lean_promise_new(); + mark_mt(promise); + + uv_write_t* write_uv = (uv_write_t*)malloc(sizeof(uv_write_t)); + write_uv->data = (tcp_send_data*)malloc(sizeof(tcp_send_data)); + + tcp_send_data* send_data = (tcp_send_data*)write_uv->data; + send_data->promise = promise; + send_data->data = data; + send_data->socket = socket; + + // These objects are going to enter the loop and be owned by it + lean_inc(promise); + lean_inc(socket); + + event_loop_lock(&global_ev); + + int result = uv_write(write_uv, (uv_stream_t*)tcp_socket->m_uv_tcp, &buf, 1, [](uv_write_t* req, int status) { + tcp_send_data* tup = (tcp_send_data*) req->data; + + lean_promise_resolve_with_code(status, tup->promise); + + lean_dec(tup->promise); + lean_dec(tup->data); + lean_dec(tup->socket); + + free(req->data); + free(req); + }); + + event_loop_unlock(&global_ev); + + if (result < 0) { + lean_dec(promise); // The structure does not own it. + lean_dec(promise); // We are not going to return it. + lean_dec(socket); + + lean_dec(data); + + free(write_uv->data); + free(write_uv); + + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(promise); +} + +/* Std.Internal.UV.TCP.Socket.recv? (socket : @& Socket) (size : UInt64) : IO (IO.Promise (Except IO.Error (Option ByteArray))) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_recv(b_obj_arg socket, uint64_t buffer_size) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + // Locking early prevents potential paralellism issues setting the byte_array. + event_loop_lock(&global_ev); + + if (tcp_socket->m_byte_array != nullptr) { + event_loop_unlock(&global_ev); + return lean_io_result_mk_error(lean_decode_uv_error(UV_EALREADY, nullptr)); + } + + tcp_socket->m_byte_array = lean_alloc_sarray(1, 0, buffer_size); + + lean_object* promise = lean_promise_new(); + mark_mt(promise); + + tcp_socket->m_promise_read = promise; + + // The event loop owns the socket. + lean_inc(socket); + lean_inc(promise); + + int result = uv_read_start((uv_stream_t*)tcp_socket->m_uv_tcp, [](uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket((lean_object*)handle->data); + + buf->base = (char*)lean_sarray_cptr(tcp_socket->m_byte_array); + buf->len = lean_sarray_capacity(tcp_socket->m_byte_array); + }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { + uv_read_stop(stream); + + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket((lean_object*)stream->data); + lean_object* promise = tcp_socket->m_promise_read; + lean_object* byte_array = tcp_socket->m_byte_array; + + tcp_socket->m_promise_read = nullptr; + tcp_socket->m_byte_array = nullptr; + + if (nread >= 0) { + lean_sarray_set_size(byte_array, nread); + lean_promise_resolve(mk_except_ok(lean::mk_option_some(byte_array)), promise); + } else if (nread == UV_EOF) { + lean_dec(byte_array); + lean_promise_resolve(mk_except_ok(lean::mk_option_none()), promise); + } else if (nread < 0) { + lean_dec(byte_array); + lean_promise_resolve(mk_except_err(lean_decode_uv_error(nread, nullptr)), promise); + } + + lean_dec(promise); + + // The event loop does not own the object anymore. + lean_dec((lean_object*)stream->data); + }); + + if (result < 0) { + tcp_socket->m_promise_read = nullptr; + event_loop_unlock(&global_ev); + + lean_dec(promise); // The structure does not own it. + lean_dec(promise); // We are not going to return it. + + lean_dec(socket); + + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + event_loop_unlock(&global_ev); + + return lean_io_result_mk_ok(promise); +} + +/* Std.Internal.UV.TCP.Socket.bind (socket : @& Socket) (addr : SocketAddress) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_bind(b_obj_arg socket, obj_arg addr) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + sockaddr_storage addr_ptr; + lean_socket_address_to_sockaddr_storage(addr, &addr_ptr); + + event_loop_lock(&global_ev); + int result = uv_tcp_bind(tcp_socket->m_uv_tcp, (sockaddr*)&addr_ptr, 0); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.TCP.Socket.listen (socket : @& Socket) (backlog : Int32) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_listen(b_obj_arg socket, int32_t backlog) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + event_loop_lock(&global_ev); + + int result = uv_listen((uv_stream_t*)tcp_socket->m_uv_tcp, backlog, [](uv_stream_t* stream, int status) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket((lean_object*)stream->data); + + if (tcp_socket->m_promise_accept == nullptr) { + return; + } + + lean_object* promise = tcp_socket->m_promise_accept; + + if (status < 0) { + lean_promise_resolve_with_code(status, promise); + lean_dec(promise); + tcp_socket->m_promise_accept = nullptr; + return; + } + + lean_object* client = tcp_socket->m_client; + lean_uv_tcp_socket_object* client_socket = lean_to_uv_tcp_socket(client); + + int result = uv_accept((uv_stream_t*)tcp_socket->m_uv_tcp, (uv_stream_t*)client_socket->m_uv_tcp); + + tcp_socket->m_promise_accept = nullptr; + tcp_socket->m_client = nullptr; + + if (result < 0) { + lean_dec(client); + lean_promise_resolve_with_code(result, promise); + lean_dec(promise); + return; + } + + lean_promise_resolve(mk_except_ok(client), promise); + lean_dec(promise); + + // The accept increases the count and then the listen decreases + lean_dec((lean_object*)stream->data); + }); + + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.TCP.Socket.accept (socket : @& Socket) : IO (IO.Promise (Except IO.Error Socket)) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_accept(b_obj_arg socket) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + // Locking early prevents potential paralellism issues setting m_promise_accept. + event_loop_lock(&global_ev); + + if (tcp_socket->m_promise_accept != nullptr) { + return lean_io_result_mk_error(lean_decode_uv_error(UV_EALREADY, mk_string("parallel accept is not allowed! consider binding multiple sockets to the same address and accepting on them instead"))); + } + + lean_object* promise = lean_promise_new(); + mark_mt(promise); + + lean_object* client = lean_io_result_take_value(lean_uv_tcp_new()); + + lean_uv_tcp_socket_object* client_socket = lean_to_uv_tcp_socket(client); + + int result = uv_accept((uv_stream_t*)tcp_socket->m_uv_tcp, (uv_stream_t*)client_socket->m_uv_tcp); + + if (result < 0 && result != UV_EAGAIN) { + event_loop_unlock(&global_ev); + lean_dec(client); + lean_promise_resolve_with_code(result, promise); + } else if (result >= 0) { + event_loop_unlock(&global_ev); + lean_promise_resolve(mk_except_ok(client), promise); + } else { + // The event loop owns the object. It will be released in the listen + lean_inc(socket); + lean_inc(promise); + + tcp_socket->m_promise_accept = promise; + tcp_socket->m_client = client; + + event_loop_unlock(&global_ev); + } + + return lean_io_result_mk_ok(promise); +} + +/* Std.Internal.UV.TCP.Socket.shutdown (socket : @& Socket) : IO (IO.Promise (Except IO.Error Unit)) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_shutdown(b_obj_arg socket) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + // Locking early prevents potential paralellism issues setting the m_promise_shutdown. + event_loop_lock(&global_ev); + + if (tcp_socket->m_promise_shutdown != nullptr) { + event_loop_unlock(&global_ev); + return lean_io_result_mk_error(lean_decode_uv_error(UV_EALREADY, mk_string("shutdown already in progress"))); + } + + lean_object* promise = lean_promise_new(); + mark_mt(promise); + + tcp_socket->m_promise_shutdown = promise; + lean_inc(promise); + + + uv_shutdown_t* shutdown_req = (uv_shutdown_t*)malloc(sizeof(uv_shutdown_t)); + shutdown_req->data = (void*)socket; + + lean_inc(socket); + + int result = uv_shutdown(shutdown_req, (uv_stream_t*)tcp_socket->m_uv_tcp, [](uv_shutdown_t* req, int status) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket((lean_object*)req->data); + + if (status < 0) { + lean_promise_resolve_with_code(status, tcp_socket->m_promise_shutdown); + } else { + lean_promise_resolve(mk_except_ok(lean_box(0)), tcp_socket->m_promise_shutdown); + } + + lean_dec(tcp_socket->m_promise_shutdown); + + tcp_socket->m_promise_shutdown = nullptr; + + lean_dec((lean_object*)req->data); + free(req); + }); + + + if (result < 0) { + free(shutdown_req); + lean_dec(tcp_socket->m_promise_shutdown); + tcp_socket->m_promise_shutdown = nullptr; + event_loop_unlock(&global_ev); + + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + event_loop_unlock(&global_ev); + + return lean_io_result_mk_ok(promise); +} + +/* Std.Internal.UV.TCP.Socket.getPeerName (socket : @& Socket) : IO SocketAddress */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_getpeername(b_obj_arg socket) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + sockaddr addr_storage; + int addr_len = sizeof(addr_storage); + + event_loop_lock(&global_ev); + int result = uv_tcp_getpeername(tcp_socket->m_uv_tcp, &addr_storage, &addr_len); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + lean_object* lean_addr = lean_sockaddr_to_socketaddress(&addr_storage); + + return lean_io_result_mk_ok(lean_addr); +} + +/* Std.Internal.UV.TCP.Socket.getSockName (socket : @& Socket) : IO SocketAddress */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_getsockname(b_obj_arg socket) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + struct sockaddr addr_storage; + int addr_len = sizeof(addr_storage); + + event_loop_lock(&global_ev); + int result = uv_tcp_getsockname(tcp_socket->m_uv_tcp, &addr_storage, &addr_len); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + lean_object* lean_addr = lean_sockaddr_to_socketaddress(&addr_storage); + return lean_io_result_mk_ok(lean_addr); +} + +/* Std.Internal.UV.TCP.Socket.noDelay (socket : @& Socket) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_nodelay(b_obj_arg socket) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + event_loop_lock(&global_ev); + int result = uv_tcp_nodelay(tcp_socket->m_uv_tcp, 1); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.TCP.Socket.keepAlive (socket : @& Socket) (enable : Int8) (delay : UInt32) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_keepalive(b_obj_arg socket, int32_t enable, uint32_t delay) { + lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); + + event_loop_lock(&global_ev); + int result = uv_tcp_keepalive(tcp_socket->m_uv_tcp, enable, delay); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} +#else + +// ======================================= +// TCP Socket Operations + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_shutdown(b_obj_arg socket) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_new() { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_connect(b_obj_arg socket, b_obj_arg addr) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, b_obj_arg data) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_recv(b_obj_arg socket) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_bind(b_obj_arg socket, b_obj_arg addr) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_listen(b_obj_arg socket, int32_t backlog) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_accept(b_obj_arg socket) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +// ======================================= +// TCP Socket Utility Functions + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_getpeername(b_obj_arg socket) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_getsockname(b_obj_arg socket) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_nodelay(b_obj_arg socket) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_keepalive(b_obj_arg socket, int8_t enable, uint32_t delay) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} +#endif +} diff --git a/src/runtime/uv/tcp.h b/src/runtime/uv/tcp.h new file mode 100644 index 000000000000..33327043b79a --- /dev/null +++ b/src/runtime/uv/tcp.h @@ -0,0 +1,57 @@ +/* +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Author: Sofia Rodrigues +*/ +#pragma once +#include +#include "runtime/uv/event_loop.h" +#include "runtime/uv/net_addr.h" +#include "runtime/object_ref.h" + +namespace lean { + +static lean_external_class* g_uv_tcp_socket_external_class = NULL; +void initialize_libuv_tcp_socket(); + +#ifndef LEAN_EMSCRIPTEN +#include + +// Structure for managing a single TCP socket object, including promise handling, +// connection state, and read/write buffers. +typedef struct { + uv_tcp_t* m_uv_tcp; // LibUV TCP handle. + lean_object* m_promise_accept; // The associated promise for asynchronous results for accepting new sockets. + lean_object* m_promise_read; // The associated promise for asynchronous results for reading from the socket. + lean_object* m_promise_shutdown; // The associated promise for asynchronous results to shutdown the socket. + lean_object* m_client; // Cached client that is going to be used in the next accept. + lean_object* m_byte_array; // Buffer for storing data received via `recv_start`. +} lean_uv_tcp_socket_object; + +// ======================================= +// Tcp socket object manipulation functions. +static inline lean_object* lean_uv_tcp_socket_new(lean_uv_tcp_socket_object* s) { return lean_alloc_external(g_uv_tcp_socket_external_class, s); } +static inline lean_uv_tcp_socket_object* lean_to_uv_tcp_socket(lean_object* o) { return (lean_uv_tcp_socket_object*)(lean_get_external_data(o)); } + +#endif + +// ======================================= +// TCP Socket Operations +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_new(); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_connect(b_obj_arg socket, b_obj_arg addr); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, b_obj_arg data); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_try_send(b_obj_arg socket, obj_arg data); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_recv(b_obj_arg socket, uint64_t buffer_size); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_bind(b_obj_arg socket, b_obj_arg addr); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_listen(b_obj_arg socket, int32_t backlog); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_accept(b_obj_arg socket); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_shutdown(b_obj_arg socket); + +// ======================================= +// TCP Socket Utility Functions +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_getpeername(b_obj_arg socket); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_getsockname(b_obj_arg socket); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_nodelay(b_obj_arg socket); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_keepalive(b_obj_arg socket, int32_t enable, uint32_t delay); + +} diff --git a/tests/lean/run/async_tcp_fname_errors.lean b/tests/lean/run/async_tcp_fname_errors.lean new file mode 100644 index 000000000000..68cb89d4f6e0 --- /dev/null +++ b/tests/lean/run/async_tcp_fname_errors.lean @@ -0,0 +1,82 @@ +import Std.Internal.Async +import Std.Internal.UV +import Std.Net.Addr + +open Std.Internal.IO.Async +open Std.Net + +-- Define the Async monad +structure Async (α : Type) where + run : IO (AsyncTask α) + +namespace Async + +-- Monad instance for Async +instance : Monad Async where + pure x := Async.mk (pure (AsyncTask.pure x)) + bind ma f := Async.mk do + let task ← ma.run + task.bindIO fun a => (f a).run + +-- Await function to simplify AsyncTask handling +def await (task : IO (AsyncTask α)) : Async α := + Async.mk task + +instance : MonadLift IO Async where + monadLift io := Async.mk (io >>= (pure ∘ AsyncTask.pure)) + +-------------------------------------------------------------- + +/-- Mike is another client. -/ +def runMike (client: TCP.Socket.Client) : Async Unit := do + let mes ← await (client.recv? 1024) + assert! String.fromUTF8!<$> mes == some "hi mike!! :)" + await (client.send (String.toUTF8 "hello robert!!")) + await (client.shutdown) + +/-- Joe is another client. -/ +def runJoe (client: TCP.Socket.Client) : Async Unit := do + let mes ← await (client.recv? 1024) + assert! String.fromUTF8!<$> mes == some "hi joe! :)" + await (client.send (String.toUTF8 "hello robert!")) + await client.shutdown + +/-- Robert is the server. -/ +def runRobert (server: TCP.Socket.Server) : Async Unit := do + discard <| await server.accept + discard <| await server.accept + +def clientServer : IO Unit := do + let addr := SocketAddressV4.mk (.ofParts 127 0 0 1) 8080 + + let server ← TCP.Socket.Server.mk + server.bind addr + server.listen 128 + + assert! (← server.getSockName).port == 8080 + + let joe ← TCP.Socket.Client.mk + let task ← joe.connect addr + task.block + + assert! (← joe.getPeerName).port == 8080 + + joe.noDelay + + let mike ← TCP.Socket.Client.mk + let task ← mike.connect addr + task.block + + assert! (← mike.getPeerName).port == 8080 + + mike.noDelay + + let serverTask ← (runRobert server).run + + discard <| (runJoe joe).run + discard <| (runMike mike).run + serverTask.block + +end Async + +#eval Async.clientServer diff --git a/tests/lean/run/async_tcp_half.lean b/tests/lean/run/async_tcp_half.lean new file mode 100644 index 000000000000..700713676b01 --- /dev/null +++ b/tests/lean/run/async_tcp_half.lean @@ -0,0 +1,66 @@ +import Std.Internal.Async +import Std.Internal.UV +import Std.Net.Addr + +open Std.Internal.IO.Async +open Std.Net + +-- Define the Async monad +structure Async (α : Type) where + run : IO (AsyncTask α) + +namespace Async + +-- Monad instance for Async +instance : Monad Async where + pure x := Async.mk (pure (AsyncTask.pure x)) + bind ma f := Async.mk do + let task ← ma.run + task.bindIO fun a => (f a).run + +-- Await function to simplify AsyncTask handling +def await (task : IO (AsyncTask α)) : Async α := + Async.mk task + +instance : MonadLift IO Async where + monadLift io := Async.mk (io >>= (pure ∘ AsyncTask.pure)) + +/-- Joe is another client. -/ +def runJoe (addr: SocketAddress) : Async Unit := do + let client ← TCP.Socket.Client.mk + + await (client.connect addr) + await (client.send (String.toUTF8 "hello robert!")) + await client.shutdown + +def listenClose : IO Unit := do + let addr := SocketAddressV4.mk (.ofParts 127 0 0 1) 8080 + + let server ← TCP.Socket.Server.mk + server.bind addr + server.listen 128 + +def acceptClose : IO Unit := do + let addr := SocketAddressV4.mk (.ofParts 127 0 0 1) 8082 + + let server ← TCP.Socket.Server.mk + server.bind addr + server.listen 128 + + let _ ← (runJoe addr).run + + let task ← server.accept + let client ← task.block + + let mes ← client.recv? 1024 + let msg ← mes.block + + assert! (String.fromUTF8! <$> msg) == "hello robert!" + + let mes ← client.recv? 1024 + let msg ← mes.block + + assert! (String.fromUTF8! <$> msg) == none + +#eval acceptClose +#eval listenClose diff --git a/tests/lean/run/async_tcp_server_client.lean b/tests/lean/run/async_tcp_server_client.lean new file mode 100644 index 000000000000..0372348a37d8 --- /dev/null +++ b/tests/lean/run/async_tcp_server_client.lean @@ -0,0 +1,89 @@ +import Std.Internal.Async +import Std.Internal.UV +import Std.Net.Addr + +open Std.Internal.IO.Async +open Std.Net + +-- Define the Async monad +structure Async (α : Type) where + run : IO (AsyncTask α) + +namespace Async + +-- Monad instance for Async +instance : Monad Async where + pure x := Async.mk (pure (AsyncTask.pure x)) + bind ma f := Async.mk do + let task ← ma.run + task.bindIO fun a => (f a).run + +-- Await function to simplify AsyncTask handling +def await (task : IO (AsyncTask α)) : Async α := + Async.mk task + +instance : MonadLift IO Async where + monadLift io := Async.mk (io >>= (pure ∘ AsyncTask.pure)) + +-------------------------------------------------------------- + +/-- Mike is another client. -/ +def runMike (client: TCP.Socket.Client) : Async Unit := do + let mes ← await (client.recv? 1024) + assert! String.fromUTF8! <$> mes == some "hi mike!! :)" + await (client.send (String.toUTF8 "hello robert!!")) + await (client.shutdown) + +/-- Joe is another client. -/ +def runJoe (client: TCP.Socket.Client) : Async Unit := do + let mes ← await (client.recv? 1024) + assert! String.fromUTF8! <$> mes == some "hi joe! :)" + await (client.send (String.toUTF8 "hello robert!")) + await client.shutdown + +/-- Robert is the server. -/ +def runRobert (server: TCP.Socket.Server) : Async Unit := do + let joe ← await server.accept + let mike ← await server.accept + + await (joe.send (String.toUTF8 "hi joe! :)")) + let mes ← await (joe.recv? 1024) + assert! String.fromUTF8! <$> mes == some "hello robert!" + + await (mike.send (String.toUTF8 "hi mike!! :)")) + let mes ← await (mike.recv? 1024) + assert! String.fromUTF8! <$> mes == some "hello robert!!" + + pure () + +def clientServer (addr : SocketAddress) : IO Unit := do + let server ← TCP.Socket.Server.mk + server.bind addr + server.listen 128 + + assert! (← server.getSockName).port == addr.port + + let joe ← TCP.Socket.Client.mk + let task ← joe.connect addr + task.block + + assert! (← joe.getPeerName).port == addr.port + + joe.noDelay + + let mike ← TCP.Socket.Client.mk + let task ← mike.connect addr + task.block + + assert! (← mike.getPeerName).port == addr.port + + mike.noDelay + + let serverTask ← (runRobert server).run + + discard <| (runJoe joe).run + discard <| (runMike mike).run + serverTask.block + +#eval clientServer (SocketAddressV4.mk (.ofParts 127 0 0 1) 9000) +#eval clientServer (SocketAddressV6.mk (.ofParts 0 0 0 0 0 0 0 1) 9000)