Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TCP socket support using LibUV #6683

Draft
wants to merge 29 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b875f26
feat: implement the basic libuv event loop
algebraic-dev Jan 2, 2025
7528056
doc: the event loop
hargoniX Jan 2, 2025
50cc599
fix: make the cond var safe and document it
hargoniX Jan 2, 2025
cd71bf0
feat: add async timer primitives based on libuv (#6219)
algebraic-dev Jan 6, 2025
f1e506a
feat: asynchronous timer API (#6306)
hargoniX Jan 10, 2025
c18310d
feat: start of the uv tc
algebraic-dev Jan 10, 2025
5726fcd
feat: basic tcp structure
algebraic-dev Jan 10, 2025
f376f93
Merge branch 'ft-async' of github.com:leanprover/lean4 into tcp-socket
algebraic-dev Jan 10, 2025
da8919a
feat: progress on connection
algebraic-dev Jan 14, 2025
04621bb
feat: three functions
algebraic-dev Jan 14, 2025
7a1c978
merge branch 'master' of github.com:leanprover/lean4 into tcp-socket
algebraic-dev Jan 17, 2025
260d5cb
chore: remove useless comment
algebraic-dev Jan 18, 2025
8e2fe1c
chore: remove useless modifications
algebraic-dev Jan 18, 2025
9c2e66b
feat: impl all functions
algebraic-dev Jan 23, 2025
c08d70b
fix: namespace brace
algebraic-dev Jan 23, 2025
889f9cb
fix: memory leak in accept
algebraic-dev Jan 23, 2025
cc2baa4
chore: stylistic changes
algebraic-dev Jan 23, 2025
28fe474
chore: small changes
algebraic-dev Jan 23, 2025
6e6016e
chore: remove useless print
algebraic-dev Jan 23, 2025
abf70f8
chore: comments and messages
algebraic-dev Jan 28, 2025
128aec9
chore: move things around
algebraic-dev Jan 28, 2025
4ec0f5f
Merge branch 'libuv-move' of github.com:algebraic-dev/lean4 into tcp-…
algebraic-dev Jan 29, 2025
fdd5599
fix: behavior of loop configure function
algebraic-dev Jan 29, 2025
9e39fbd
Merge branch 'libuv-move' of github.com:algebraic-dev/lean4 into tcp-…
algebraic-dev Jan 29, 2025
989f4f6
feat: high level interface and fix some memory leaks
algebraic-dev Jan 29, 2025
21f3829
feat: opt in client accept and fix some lean_dec and lean_inc
algebraic-dev Jan 30, 2025
05dc16a
feat: add shutdown and warning
algebraic-dev Jan 30, 2025
cefb9f0
fix: replace new with malloc
algebraic-dev Jan 30, 2025
20f04e8
chore: remove useless file
algebraic-dev Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Std/Internal/Async.lean
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ Authors: Henrik Böving
prelude
import Std.Internal.Async.Basic
import Std.Internal.Async.Timer
import Std.Internal.Async.Tcp
120 changes: 120 additions & 0 deletions src/Std/Internal/Async/Tcp.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
prelude
import Std.Time
import Std.Internal.UV
import Std.Internal.Async.Basic
import Std.Net.Addr

namespace Std
namespace Internal
namespace IO
namespace Async
namespace Tcp

open Std.Net

/--
A high-level wrapper around a TCP socket.
-/
structure Socket where
private ofNative ::
native : Internal.UV.Tcp.Socket

namespace Socket

/--
Create a new TCP socket.
-/
@[inline]
def mk : IO Socket := do
let native ← Internal.UV.Tcp.Socket.new
return Socket.ofNative native

/--
Bind the socket to the given address.
-/
@[inline]
def bind (s : Socket) (addr : SocketAddress) : IO Unit :=
s.native.bind addr

/--
Connect the socket to the given address.
-/
@[inline]
def connect (s : Socket) (addr : SocketAddress) : IO (AsyncTask Unit) :=
AsyncTask.ofPromise <$> s.native.connect addr

/--
Listen for incoming connections with the given backlog.
-/
@[inline]
def listen (s : Socket) (backlog : Int32) : IO Unit :=
s.native.listen backlog

/--
Accept an incoming connection.
-/
@[inline]
def accept (s : Socket) : IO (AsyncTask Socket) := do
let conn ← s.native.accept
return conn.result.map (·.map Socket.ofNative)

/--
Send data through the socket.
-/
@[inline]
def send (s : Socket) (data : ByteArray) : IO (AsyncTask Unit) :=
AsyncTask.ofPromise <$> s.native.send data

/--
Receive data from the socket.
-/
@[inline]
def recv (s : Socket) : IO (AsyncTask ByteArray) :=
AsyncTask.ofPromise <$> s.native.recv

/--
Receive data from the socket.
-/
@[inline]
def shutdown (s : Socket) : IO (AsyncTask Unit) :=
AsyncTask.ofPromise <$> s.native.shutdown

/--
Get the remote address of the connected socket.
-/
@[inline]
def getPeerName (s : Socket) : IO SocketAddress :=
s.native.getpeername

/--
Get the local address of the socket.
-/
@[inline]
def getSockName (s : Socket) : IO SocketAddress :=
s.native.getsockname

/--
Enable or disable the Nagle algorithm.
-/
@[inline]
def noDelay (s : Socket) : IO Unit :=
s.native.nodelay

/--
Enable or disable TCP keep-alive with a specified delay.
-/
@[inline]
def keepAlive (s : Socket) (enable : Int32) (delay : UInt32) : IO Unit :=
s.native.keepalive enable delay

end Socket
end Tcp
end Async
end IO
end Internal
end Std
113 changes: 3 additions & 110 deletions src/Std/Internal/UV.lean
Original file line number Diff line number Diff line change
Expand Up @@ -7,113 +7,6 @@ prelude
import Init.System.IO
import Init.System.Promise

namespace Std
namespace Internal
namespace UV

namespace Loop

/--
Options for configuring the event loop behavior.
-/
structure Loop.Options where
/--
Accumulate the amount of idle time the event loop spends in the event provider.
-/
accumulateIdleTime : Bool := False

/--
Block a SIGPROF signal when polling for new events. It's commonly used for unnecessary wakeups
when using a sampling profiler.
-/
blockSigProfSignal : Bool := False

/--
Configures the event loop with the specified options.
-/
@[extern "lean_uv_event_loop_configure"]
opaque configure (options : Loop.Options) : BaseIO Unit

/--
Checks if the event loop is still active and processing events.
-/
@[extern "lean_uv_event_loop_alive"]
opaque alive : BaseIO Bool

end Loop

private opaque TimerImpl : NonemptyType.{0}

/--
`Timer`s are used to generate `IO.Promise`s that resolve after some time.

A `Timer` can be in one of 3 states:
- Right after construction it's initial.
- While it is ticking it's running.
- If it has stopped for some reason it's finished.

This together with whether it was set up as `repeating` with `Timer.new` determines the behavior
of all functions on `Timer`s.
-/
def Timer : Type := TimerImpl.type

instance : Nonempty Timer := TimerImpl.property

namespace Timer

/--
This creates a `Timer` in the initial state and doesn't run it yet.
- If `repeating` is `false` this constructs a timer that resolves once after `durationMs`
milliseconds, counting from when it's run.
- If `repeating` is `true` this constructs a timer that resolves after multiples of `durationMs`
milliseconds, counting from when it's run. Note that this includes the 0th multiple right after
starting the timer. Furthermore a repeating timer will only be freed after `Timer.stop` is called.
-/
@[extern "lean_uv_timer_mk"]
opaque mk (timeout : UInt64) (repeating : Bool) : IO Timer

/--
This function has different behavior depending on the state and configuration of the `Timer`:
- if `repeating` is `false` and:
- it is initial, run it and return a new `IO.Promise` that is set to resolve once `durationMs`
milliseconds have elapsed. After this `IO.Promise` is resolved the `Timer` is finished.
- it is running or finished, return the same `IO.Promise` that the first call to `next` returned.
- if `repeating` is `true` and:
- it is initial, run it and return a new `IO.Promise` that resolves right away
(as it is the 0th multiple of `durationMs`).
- it is running, check whether the last returned `IO.Promise` is already resolved:
- If it is, return a new `IO.Promise` that resolves upon finishing the next cycle
- If it is not, return the last `IO.Promise`
This ensures that the returned `IO.Promise` resolves at the next repetition of the timer.
- if it is finished, return the last `IO.Promise` created by `next`. Notably this could be one
that never resolves if the timer was stopped before fulfilling the last one.
-/
@[extern "lean_uv_timer_next"]
opaque next (timer : @& Timer) : IO (IO.Promise Unit)

/--
This function has different behavior depending on the state and configuration of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running and `repeating` is `false` this will delay the resolution of the timer until
`durationMs` milliseconds after the call of this function.
- Delay the resolution of the next tick of the timer until `durationMs` milliseconds after the
call of this function, then continue normal ticking behavior from there.
-/
@[extern "lean_uv_timer_reset"]
opaque reset (timer : @& Timer) : IO Unit

/--
This function has different behavior depending on the state of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running the execution of the timer is stopped and it is put into the finished state.
Note that if the last `IO.Promise` generated by `next` is unresolved and being waited
on this creates a memory leak and the waiting task is not going to be awoken anymore.
-/
@[extern "lean_uv_timer_stop"]
opaque stop (timer : @& Timer) : IO Unit

end Timer

end UV
end Internal
end Std
import Std.Internal.UV.Loop
import Std.Internal.UV.Timer
import Std.Internal.UV.Tcp
45 changes: 45 additions & 0 deletions src/Std/Internal/UV/Loop.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
prelude
import Init.System.IO
import Init.System.Promise

namespace Std
namespace Internal
namespace UV
namespace Loop

/--
Options for configuring the event loop behavior.
-/
structure Options where
/--
Accumulate the amount of idle time the event loop spends in the event provider.
-/
accumulateIdleTime : Bool := False

/--
Block a SIGPROF signal when polling for new events. It's commonly used for unnecessary wakeups
when using a sampling profiler.
-/
blockSigProfSignal : Bool := False

/--
Configures the event loop with the specified options.
-/
@[extern "lean_uv_event_loop_configure"]
opaque configure (options : Options) : BaseIO Unit

/--
Checks if the event loop is still active and processing events.
-/
@[extern "lean_uv_event_loop_alive"]
opaque alive : BaseIO Bool

end Loop
end UV
end Internal
end Std
105 changes: 105 additions & 0 deletions src/Std/Internal/UV/Tcp.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving, Sofia Rodrigues
-/
prelude
import Init.System.IO
import Init.System.Promise
import Std.Net

namespace Std
namespace Internal
namespace UV
namespace Tcp

open Std.Net

private opaque SocketImpl : NonemptyType.{0}

/--
Represents a TCP socket.
-/
def Socket : Type := SocketImpl.type

instance : Nonempty Socket := SocketImpl.property

namespace Socket

/--
Creates a new TCP socket.
-/
@[extern "lean_uv_tcp_new"]
opaque new : IO Socket

/--
Connect a TCP socket to the specified address.
-/
@[extern "lean_uv_tcp_connect"]
opaque connect (socket : @& Socket) (addr : SocketAddress) : IO (IO.Promise (Except IO.Error Unit))

/--
Send data through a TCP socket.
-/
@[extern "lean_uv_tcp_send"]
opaque send (socket : @& Socket) (data : ByteArray) : IO (IO.Promise (Except IO.Error Unit))

/--
Receive data from a TCP socket.
-/
@[extern "lean_uv_tcp_recv"]
opaque recv (socket : @& Socket) : IO (IO.Promise (Except IO.Error ByteArray))

/--
Bind a TCP socket to a specific address.
-/
@[extern "lean_uv_tcp_bind"]
opaque bind (socket : @& Socket) (addr : SocketAddress) : IO Unit

/--
Start listening for incoming connections on a TCP socket.
-/
@[extern "lean_uv_tcp_listen"]
opaque listen (socket : @& Socket) (backlog : Int32) : IO Unit

/--
Accept an incoming connection on a listening TCP socket.
-/
@[extern "lean_uv_tcp_accept"]
opaque accept (socket : @& Socket) : IO (IO.Promise (Except IO.Error Socket))

/--
Accept an incoming connection on a listening TCP socket.
-/
@[extern "lean_uv_tcp_shutdown"]
opaque shutdown (socket : @& Socket) : IO (IO.Promise (Except IO.Error Unit))

/--
Get the remote address of a connected TCP socket.
-/
@[extern "lean_uv_tcp_getpeername"]
opaque getpeername (socket : @& Socket) : IO SocketAddress

/--
Get the local address of a bound TCP socket.
-/
@[extern "lean_uv_tcp_getsockname"]
opaque getsockname (socket : @& Socket) : IO SocketAddress

/--
Enable or disable the Nagle algorithm for a TCP socket.
-/
@[extern "lean_uv_tcp_nodelay"]
opaque nodelay (socket : @& Socket) : IO Unit

/--
Enable or disable TCP keep-alive for a socket.
-/
@[extern "lean_uv_tcp_keepalive"]
opaque keepalive (socket : @& Socket) (enable : Int32) (delay : UInt32) : IO Unit

end Socket
end Tcp
end UV
end Internal
end Std
Loading
Loading