Skip to content

Commit 11dee4e

Browse files
author
Sotiris Nanopoulos
authored
[Level Events] manage level events registration mask (envoyproxy#13787)
Signed-off-by: Sotiris Nanopoulos <[email protected]>
1 parent 928a62b commit 11dee4e

File tree

18 files changed

+254
-54
lines changed

18 files changed

+254
-54
lines changed

include/envoy/api/io_error.h

+6
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ template <typename ReturnValue> struct IoCallResult {
7171
*/
7272
bool ok() const { return err_ == nullptr; }
7373

74+
/**
75+
* This return code is frequent enough that we have a separate function to check.
76+
* @return true if the system call failed because the socket would block.
77+
*/
78+
bool wouldBlock() const { return !ok() && err_->getErrorCode() == IoError::IoErrorCode::Again; }
79+
7480
// TODO(danzh): rename it to be more meaningful, i.e. return_value_.
7581
ReturnValue rc_;
7682
IoErrorPtr err_;

include/envoy/event/file_event.h

+39-6
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,38 @@ struct FileReadyType {
1818
static const uint32_t Closed = 0x4;
1919
};
2020

21-
enum class FileTriggerType { Level, Edge };
21+
enum class FileTriggerType {
22+
// See @man 7 epoll(7)
23+
// They are used on all platforms for DNS and TCP listeners.
24+
Level,
25+
// See @man 7 epoll(7)
26+
// They are used on all platforms that support Edge triggering as the default trigger type.
27+
Edge,
28+
// These are synthetic edge events managed by Envoy. They are based on level events and when they
29+
// are activated they are immediately disabled. This makes them behave like Edge events. Then it
30+
// is is the responsibility of the consumer of the event to reactivate the event
31+
// when the socket operation would block.
32+
//
33+
// Their main application in Envoy is for Win32 which does not support edge-triggered events. They
34+
// should be used in Win32 instead of level events. They can only be used in platforms where
35+
// `PlatformDefaultTriggerType` is `FileTriggerType::EmulatedEdge`.
36+
EmulatedEdge
37+
};
2238

23-
static constexpr FileTriggerType PlatformDefaultTriggerType
24-
#ifdef WIN32
25-
// Libevent only supports Level trigger on Windows.
26-
{FileTriggerType::Level};
39+
// For POSIX developers to get the Windows behavior of file events
40+
// you need to add the following definition:
41+
// `FORCE_LEVEL_EVENTS`
42+
// You can do this with bazel if you add the following build/test options
43+
// `--copt="-DFORCE_LEVEL_EVENTS"`
44+
constexpr FileTriggerType determinePlatformPreferredEventType() {
45+
#if defined(WIN32) || defined(FORCE_LEVEL_EVENTS)
46+
return FileTriggerType::EmulatedEdge;
2747
#else
28-
{FileTriggerType::Edge};
48+
return FileTriggerType::Edge;
2949
#endif
50+
}
51+
52+
static constexpr FileTriggerType PlatformDefaultTriggerType = determinePlatformPreferredEventType();
3053

3154
/**
3255
* Callback invoked when a FileEvent is ready for reading or writing.
@@ -53,6 +76,16 @@ class FileEvent {
5376
* registered events and fire callbacks when they are active.
5477
*/
5578
virtual void setEnabled(uint32_t events) PURE;
79+
80+
/**
81+
* Add a single event from the event registration mark.
82+
*/
83+
virtual void registerEventIfEmulatedEdge(uint32_t event) PURE;
84+
85+
/**
86+
* Remove a single event from the event registration mark.
87+
*/
88+
virtual void unregisterEventIfEmulatedEdge(uint32_t event) PURE;
5689
};
5790

5891
using FileEventPtr = std::unique_ptr<FileEvent>;

source/common/event/file_event_impl.cc

+66-8
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace Event {
1212

1313
FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, os_fd_t fd, FileReadyCb cb,
1414
FileTriggerType trigger, uint32_t events)
15-
: cb_(cb), fd_(fd), trigger_(trigger),
15+
: cb_(cb), fd_(fd), trigger_(trigger), enabled_events_(events),
1616
activation_cb_(dispatcher.createSchedulableCallback([this]() {
1717
ASSERT(injected_activation_events_ != 0);
1818
mergeInjectedEventsAndRunCb(0);
@@ -21,9 +21,13 @@ FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, os_fd_t fd, FileReadyCb
2121
// an OOM condition and just crash.
2222
RELEASE_ASSERT(SOCKET_VALID(fd), "");
2323
#ifdef WIN32
24-
RELEASE_ASSERT(trigger_ == FileTriggerType::Level,
25-
"libevent does not support edge triggers on Windows");
24+
ASSERT(trigger_ != FileTriggerType::Edge, "libevent does not support edge triggers on Windows");
2625
#endif
26+
if constexpr (PlatformDefaultTriggerType != FileTriggerType::EmulatedEdge) {
27+
ASSERT(trigger_ != FileTriggerType::EmulatedEdge,
28+
"Cannot use EmulatedEdge events if they are not the default platform type");
29+
}
30+
2731
assignEvents(events, &dispatcher.base());
2832
event_add(&raw_event_, nullptr);
2933
}
@@ -48,9 +52,10 @@ void FileEventImpl::activate(uint32_t events) {
4852

4953
void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
5054
ASSERT(base != nullptr);
55+
enabled_events_ = events;
5156
event_assign(
5257
&raw_event_, base, fd_,
53-
EV_PERSIST | (trigger_ == FileTriggerType::Level ? 0 : EV_ET) |
58+
EV_PERSIST | (trigger_ == FileTriggerType::Edge ? EV_ET : 0) |
5459
(events & FileReadyType::Read ? EV_READ : 0) |
5560
(events & FileReadyType::Write ? EV_WRITE : 0) |
5661
(events & FileReadyType::Closed ? EV_CLOSED : 0),
@@ -75,6 +80,16 @@ void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
7580
this);
7681
}
7782

83+
void FileEventImpl::updateEvents(uint32_t events) {
84+
if (events == enabled_events_) {
85+
return;
86+
}
87+
auto* base = event_get_base(&raw_event_);
88+
event_del(&raw_event_);
89+
assignEvents(events, base);
90+
event_add(&raw_event_, nullptr);
91+
}
92+
7893
void FileEventImpl::setEnabled(uint32_t events) {
7994
if (injected_activation_events_ != 0) {
8095
// Clear pending events on updates to the fd event mask to avoid delivering events that are no
@@ -84,19 +99,62 @@ void FileEventImpl::setEnabled(uint32_t events) {
8499
injected_activation_events_ = 0;
85100
activation_cb_->cancel();
86101
}
102+
updateEvents(events);
103+
}
87104

88-
auto* base = event_get_base(&raw_event_);
89-
event_del(&raw_event_);
90-
assignEvents(events, base);
91-
event_add(&raw_event_, nullptr);
105+
void FileEventImpl::unregisterEventIfEmulatedEdge(uint32_t event) {
106+
// This constexpr if allows the compiler to optimize away the function on POSIX
107+
if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
108+
ASSERT((event & (FileReadyType::Read | FileReadyType::Write)) == event);
109+
if (trigger_ == FileTriggerType::EmulatedEdge) {
110+
auto new_event_mask = enabled_events_ & ~event;
111+
updateEvents(new_event_mask);
112+
}
113+
}
114+
}
115+
116+
void FileEventImpl::registerEventIfEmulatedEdge(uint32_t event) {
117+
// This constexpr if allows the compiler to optimize away the function on POSIX
118+
if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
119+
ASSERT((event & (FileReadyType::Read | FileReadyType::Write)) == event);
120+
if (trigger_ == FileTriggerType::EmulatedEdge) {
121+
auto new_event_mask = enabled_events_ | event;
122+
if (event & FileReadyType::Read && (enabled_events_ & FileReadyType::Closed)) {
123+
// We never ask for both early close and read at the same time.
124+
new_event_mask = new_event_mask & ~FileReadyType::Read;
125+
}
126+
updateEvents(new_event_mask);
127+
}
128+
}
92129
}
93130

94131
void FileEventImpl::mergeInjectedEventsAndRunCb(uint32_t events) {
95132
if (injected_activation_events_ != 0) {
133+
// TODO(antoniovicente) remove this adjustment to activation events once ConnectionImpl can
134+
// handle Read and Close events delivered together.
135+
if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
136+
if (events & FileReadyType::Closed && injected_activation_events_ & FileReadyType::Read) {
137+
// We never ask for both early close and read at the same time. If close is requested
138+
// keep that instead.
139+
injected_activation_events_ = injected_activation_events_ & ~FileReadyType::Read;
140+
}
141+
}
142+
96143
events |= injected_activation_events_;
97144
injected_activation_events_ = 0;
98145
activation_cb_->cancel();
99146
}
147+
148+
// TODO(davinci26): This can be optimized further in (w)epoll backends using the `EPOLLONESHOT`
149+
// flag. With this flag `EPOLLIN`/`EPOLLOUT` are automatically disabled when the event is
150+
// activated.
151+
if constexpr (PlatformDefaultTriggerType == FileTriggerType::EmulatedEdge) {
152+
if (trigger_ == FileTriggerType::EmulatedEdge) {
153+
unregisterEventIfEmulatedEdge(events &
154+
(Event::FileReadyType::Write | Event::FileReadyType::Read));
155+
}
156+
}
157+
100158
cb_(events);
101159
}
102160

source/common/event/file_event_impl.h

+5
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,19 @@ class FileEventImpl : public FileEvent, ImplBase {
2222
// Event::FileEvent
2323
void activate(uint32_t events) override;
2424
void setEnabled(uint32_t events) override;
25+
void unregisterEventIfEmulatedEdge(uint32_t event) override;
26+
void registerEventIfEmulatedEdge(uint32_t event) override;
2527

2628
private:
2729
void assignEvents(uint32_t events, event_base* base);
2830
void mergeInjectedEventsAndRunCb(uint32_t events);
31+
void updateEvents(uint32_t events);
2932

3033
FileReadyCb cb_;
3134
os_fd_t fd_;
3235
FileTriggerType trigger_;
36+
// Enabled events for this fd.
37+
uint32_t enabled_events_;
3338

3439
// Injected FileReadyType events that were scheduled by recent calls to activate() and are pending
3540
// delivery.

source/common/event/libevent_scheduler.h

+3-4
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,9 @@ class LibeventScheduler : public Scheduler, public CallbackScheduler {
108108

109109
static constexpr int flagsBasedOnEventType() {
110110
if constexpr (Event::PlatformDefaultTriggerType == FileTriggerType::Level) {
111-
// On Windows, EVLOOP_NONBLOCK will cause the libevent event_base_loop to run forever.
112-
// This is because libevent only supports level triggering on Windows, and so the write
113-
// event callbacks will trigger every time through the loop. Adding EVLOOP_ONCE ensures the
114-
// loop will run at most once
111+
// With level events, EVLOOP_NONBLOCK will cause the libevent event_base_loop to run
112+
// forever. This is because the write event callbacks will trigger every time through the
113+
// loop. Adding EVLOOP_ONCE ensures the loop will run at most once
115114
return EVLOOP_NONBLOCK | EVLOOP_ONCE;
116115
}
117116
return EVLOOP_NONBLOCK;

source/common/network/io_socket_handle_impl.cc

+85-7
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::readv(uint64_t max_length, Buffer::R
8484
num_bytes_to_read += slice_length;
8585
}
8686
ASSERT(num_bytes_to_read <= max_length);
87-
return sysCallResultToIoCallResult(Api::OsSysCallsSingleton::get().readv(
87+
auto result = sysCallResultToIoCallResult(Api::OsSysCallsSingleton::get().readv(
8888
fd_, iov.begin(), static_cast<int>(num_slices_to_read)));
89+
90+
// Emulated edge events need to registered if the socket operation did not complete
91+
// because the socket would block.
92+
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
93+
// Some tests try to read without initializing the file_event.
94+
if (result.wouldBlock() && file_event_) {
95+
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
96+
}
97+
}
98+
return result;
8999
}
90100

91101
Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, uint64_t max_length) {
@@ -103,6 +113,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, uint6
103113
bytes_to_commit -= slices[i].len_;
104114
}
105115
buffer.commit(slices, num_slices);
116+
117+
// Emulated edge events need to registered if the socket operation did not complete
118+
// because the socket would block.
119+
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
120+
// Some tests try to read without initializing the file_event.
121+
if (result.wouldBlock() && file_event_) {
122+
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
123+
}
124+
}
106125
return result;
107126
}
108127

@@ -120,8 +139,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::writev(const Buffer::RawSlice* slice
120139
if (num_slices_to_write == 0) {
121140
return Api::ioCallUint64ResultNoError();
122141
}
123-
return sysCallResultToIoCallResult(
142+
auto result = sysCallResultToIoCallResult(
124143
Api::OsSysCallsSingleton::get().writev(fd_, iov.begin(), num_slices_to_write));
144+
145+
// Emulated edge events need to registered if the socket operation did not complete
146+
// because the socket would block.
147+
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
148+
// Some tests try to write without initializing the file_event.
149+
if (result.wouldBlock() && file_event_) {
150+
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Write);
151+
}
152+
}
153+
return result;
125154
}
126155

127156
Api::IoCallUint64Result IoSocketHandleImpl::write(Buffer::Instance& buffer) {
@@ -131,6 +160,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::write(Buffer::Instance& buffer) {
131160
if (result.ok() && result.rc_ > 0) {
132161
buffer.drain(static_cast<uint64_t>(result.rc_));
133162
}
163+
164+
// Emulated edge events need to registered if the socket operation did not complete
165+
// because the socket would block.
166+
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
167+
// Some tests try to read without initializing the file_event.
168+
if (result.wouldBlock() && file_event_) {
169+
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Write);
170+
}
171+
}
134172
return result;
135173
}
136174

@@ -168,7 +206,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::sendmsg(const Buffer::RawSlice* slic
168206
message.msg_control = nullptr;
169207
message.msg_controllen = 0;
170208
const Api::SysCallSizeResult result = os_syscalls.sendmsg(fd_, &message, flags);
171-
return sysCallResultToIoCallResult(result);
209+
auto io_result = sysCallResultToIoCallResult(result);
210+
// Emulated edge events need to registered if the socket operation did not complete
211+
// because the socket would block.
212+
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
213+
if (io_result.wouldBlock() && file_event_) {
214+
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Write);
215+
}
216+
}
217+
return io_result;
172218
} else {
173219
const size_t space_v6 = CMSG_SPACE(sizeof(in6_pktinfo));
174220
const size_t space_v4 = CMSG_SPACE(sizeof(in_pktinfo));
@@ -210,7 +256,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::sendmsg(const Buffer::RawSlice* slic
210256
*(reinterpret_cast<absl::uint128*>(pktinfo->ipi6_addr.s6_addr)) = self_ip->ipv6()->address();
211257
}
212258
const Api::SysCallSizeResult result = os_syscalls.sendmsg(fd_, &message, flags);
213-
return sysCallResultToIoCallResult(result);
259+
auto io_result = sysCallResultToIoCallResult(result);
260+
// Emulated edge events need to registered if the socket operation did not complete
261+
// because the socket would block.
262+
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
263+
if (io_result.wouldBlock() && file_event_) {
264+
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Write);
265+
}
266+
}
267+
return io_result;
214268
}
215269
}
216270

@@ -298,7 +352,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
298352
hdr.msg_controllen = cmsg_space_;
299353
const Api::SysCallSizeResult result = Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, 0);
300354
if (result.rc_ < 0) {
301-
return sysCallResultToIoCallResult(result);
355+
auto io_result = sysCallResultToIoCallResult(result);
356+
// Emulated edge events need to registered if the socket operation did not complete
357+
// because the socket would block.
358+
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
359+
if (io_result.wouldBlock() && file_event_) {
360+
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
361+
}
362+
}
363+
return io_result;
302364
}
303365

304366
RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
@@ -381,7 +443,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
381443
fd_, mmsg_hdr.data(), num_packets_per_mmsg_call, MSG_TRUNC | MSG_WAITFORONE, nullptr);
382444

383445
if (result.rc_ <= 0) {
384-
return sysCallResultToIoCallResult(result);
446+
auto io_result = sysCallResultToIoCallResult(result);
447+
// Emulated edge events need to registered if the socket operation did not complete
448+
// because the socket would block.
449+
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
450+
if (io_result.wouldBlock() && file_event_) {
451+
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
452+
}
453+
}
454+
return io_result;
385455
}
386456

387457
int num_packets_read = result.rc_;
@@ -435,7 +505,15 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
435505
Api::IoCallUint64Result IoSocketHandleImpl::recv(void* buffer, size_t length, int flags) {
436506
const Api::SysCallSizeResult result =
437507
Api::OsSysCallsSingleton::get().recv(fd_, buffer, length, flags);
438-
return sysCallResultToIoCallResult(result);
508+
auto io_result = sysCallResultToIoCallResult(result);
509+
// Emulated edge events need to registered if the socket operation did not complete
510+
// because the socket would block.
511+
if constexpr (Event::PlatformDefaultTriggerType == Event::FileTriggerType::EmulatedEdge) {
512+
if (io_result.wouldBlock() && file_event_) {
513+
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
514+
}
515+
}
516+
return io_result;
439517
}
440518

441519
bool IoSocketHandleImpl::supportsMmsg() const {

0 commit comments

Comments
 (0)