Skip to content

Commit d365a54

Browse files
committed
golang: allow flushing buffer when processing the data asynchronously
Signed-off-by: spacewander <[email protected]>
1 parent a0c96b3 commit d365a54

File tree

15 files changed

+388
-0
lines changed

15 files changed

+388
-0
lines changed

.github/dependabot.yml

+10
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@ updates:
9090
interval: daily
9191
time: "06:00"
9292

93+
- package-ecosystem: "gomod"
94+
directory: "/contrib/golang/filters/http/test/test_data/buffer_flush"
95+
groups:
96+
contrib-golang:
97+
patterns:
98+
- "*"
99+
schedule:
100+
interval: daily
101+
time: "06:00"
102+
93103
- package-ecosystem: "gomod"
94104
directory: "/contrib/golang/filters/http/test/test_data/dummy"
95105
groups:

contrib/golang/common/go/api/api.h

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ CAPIStatus envoyGoFilterHttpGetBuffer(void* s, uint64_t buffer, void* value);
9191
CAPIStatus envoyGoFilterHttpDrainBuffer(void* s, uint64_t buffer, uint64_t length);
9292
CAPIStatus envoyGoFilterHttpSetBufferHelper(void* s, uint64_t buffer, void* data, int length,
9393
bufferAction action);
94+
CAPIStatus envoyGoFilterHttpFlushBuffer(void* s, uint64_t buffer, bool wait);
9495

9596
CAPIStatus envoyGoFilterHttpCopyTrailers(void* s, void* strs, void* buf);
9697
CAPIStatus envoyGoFilterHttpSetTrailer(void* s, void* key_data, int key_len, void* value,

contrib/golang/common/go/api/capi.go

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type HttpCAPI interface {
3838
HttpDrainBuffer(s unsafe.Pointer, bufferPtr uint64, length uint64)
3939
HttpSetBufferHelper(s unsafe.Pointer, bufferPtr uint64, value string, action BufferAction)
4040
HttpSetBytesBufferHelper(s unsafe.Pointer, bufferPtr uint64, value []byte, action BufferAction)
41+
HttpFlushBuffer(s unsafe.Pointer, bufferPtr uint64, wait bool)
4142

4243
HttpCopyTrailers(s unsafe.Pointer, num uint64, bytes uint64) map[string][]string
4344
HttpSetTrailer(s unsafe.Pointer, key string, value string, add bool)

contrib/golang/common/go/api/type.go

+3
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@ type BufferInstance interface {
254254

255255
// Append append the contents of the string data to the buffer.
256256
AppendString(s string) error
257+
258+
// TODO: Currently we only support wait=false
259+
Flush(wait bool)
257260
}
258261

259262
//*************** BufferInstance end **************//

contrib/golang/filters/http/source/cgo.cc

+8
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,14 @@ CAPIStatus envoyGoFilterHttpDrainBuffer(void* s, uint64_t buffer_ptr, uint64_t l
209209
});
210210
}
211211

212+
CAPIStatus envoyGoFilterHttpFlushBuffer(void* s, uint64_t buffer_ptr, bool wait) {
213+
return envoyGoFilterProcessStateHandlerWrapper(
214+
s, [buffer_ptr, wait](std::shared_ptr<Filter>& filter, ProcessorState& state) -> CAPIStatus {
215+
auto buffer = reinterpret_cast<Buffer::Instance*>(buffer_ptr);
216+
return filter->flushBuffer(state, buffer, wait);
217+
});
218+
}
219+
212220
CAPIStatus envoyGoFilterHttpSetBufferHelper(void* s, uint64_t buffer_ptr, void* data, int length,
213221
bufferAction action) {
214222
return envoyGoFilterProcessStateHandlerWrapper(

contrib/golang/filters/http/source/go/pkg/http/capi_impl.go

+6
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,12 @@ func (c *httpCApiImpl) HttpDrainBuffer(s unsafe.Pointer, bufferPtr uint64, lengt
230230
handleCApiStatus(res)
231231
}
232232

233+
func (c *httpCApiImpl) HttpFlushBuffer(s unsafe.Pointer, bufferPtr uint64, wait bool) {
234+
state := (*processState)(s)
235+
res := C.envoyGoFilterHttpFlushBuffer(unsafe.Pointer(state.processState), C.uint64_t(bufferPtr), C.bool(wait))
236+
handleCApiStatus(res)
237+
}
238+
233239
func (c *httpCApiImpl) HttpSetBufferHelper(s unsafe.Pointer, bufferPtr uint64, value string, action api.BufferAction) {
234240
state := (*processState)(s)
235241
c.httpSetBufferHelper(state, bufferPtr, unsafe.Pointer(unsafe.StringData(value)), C.int(len(value)), action)

contrib/golang/filters/http/source/go/pkg/http/type.go

+9
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,15 @@ func (b *httpBuffer) Reset() {
435435
b.Drain(b.Len())
436436
}
437437

438+
func (b *httpBuffer) Flush(wait bool) {
439+
if b.length == 0 {
440+
return
441+
}
442+
443+
cAPI.HttpFlushBuffer(unsafe.Pointer(b.state), b.envoyBufferInstance, wait)
444+
b.length = 0
445+
}
446+
438447
func (b *httpBuffer) String() string {
439448
if b.length == 0 {
440449
return ""

contrib/golang/filters/http/source/golang_filter.cc

+35
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,41 @@ CAPIStatus Filter::drainBuffer(ProcessorState& state, Buffer::Instance* buffer,
791791
return CAPIStatus::CAPIOK;
792792
}
793793

794+
CAPIStatus Filter::flushBuffer(ProcessorState& state, Buffer::Instance* buffer, bool wait) {
795+
// lock until this function return since it may running in a Go thread.
796+
Thread::LockGuard lock(mutex_);
797+
if (has_destroyed_) {
798+
ENVOY_LOG(debug, "golang filter has been destroyed");
799+
return CAPIStatus::CAPIFilterIsDestroy;
800+
}
801+
if (!state.isProcessingInGo()) {
802+
ENVOY_LOG(debug, "golang filter is not processing Go");
803+
return CAPIStatus::CAPINotInGo;
804+
}
805+
if (!state.doDataList.checkExisting(buffer)) {
806+
ENVOY_LOG(debug, "invoking cgo api at invalid state: {}", __func__);
807+
return CAPIStatus::CAPIInvalidPhase;
808+
}
809+
810+
auto data_to_write = std::make_shared<Buffer::OwnedImpl>();
811+
data_to_write->add(*buffer);
812+
buffer->drain(buffer->length());
813+
814+
auto weak_ptr = weak_from_this();
815+
state.getDispatcher().post([this, &state, weak_ptr, data_to_write, wait] {
816+
if (!weak_ptr.expired() && !hasDestroyed()) {
817+
ENVOY_LOG(debug, "golang filter inject data to filter chain, length: {}, wait: {}",
818+
data_to_write->length(), wait);
819+
state.injectDataToFilterChain(*data_to_write.get(), false);
820+
// TODO: handle wait
821+
} else {
822+
ENVOY_LOG(debug, "golang filter has gone or destroyed in flushBuffer event");
823+
}
824+
});
825+
826+
return CAPIStatus::CAPIOK;
827+
}
828+
794829
CAPIStatus Filter::setBufferHelper(ProcessorState& state, Buffer::Instance* buffer,
795830
absl::string_view& value, bufferAction action) {
796831
// lock until this function return since it may running in a Go thread.

contrib/golang/filters/http/source/golang_filter.h

+1
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ class Filter : public Http::StreamFilter,
278278
CAPIStatus removeHeader(ProcessorState& state, absl::string_view key);
279279
CAPIStatus copyBuffer(ProcessorState& state, Buffer::Instance* buffer, char* data);
280280
CAPIStatus drainBuffer(ProcessorState& state, Buffer::Instance* buffer, uint64_t length);
281+
CAPIStatus flushBuffer(ProcessorState& state, Buffer::Instance* buffer, bool wait);
281282
CAPIStatus setBufferHelper(ProcessorState& state, Buffer::Instance* buffer,
282283
absl::string_view& value, bufferAction action);
283284
CAPIStatus copyTrailers(ProcessorState& state, GoString* go_strs, char* go_buf);

contrib/golang/filters/http/test/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ envoy_cc_test(
5858
"//contrib/golang/filters/http/test/test_data/basic:filter.so",
5959
"//contrib/golang/filters/http/test/test_data/buffer:filter.so",
6060
"//contrib/golang/filters/http/test/test_data/echo:filter.so",
61+
"//contrib/golang/filters/http/test/test_data/buffer_flush:filter.so",
6162
"//contrib/golang/filters/http/test/test_data/metric:filter.so",
6263
"//contrib/golang/filters/http/test/test_data/passthrough:filter.so",
6364
"//contrib/golang/filters/http/test/test_data/property:filter.so",

contrib/golang/filters/http/test/golang_integration_test.cc

+111
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,7 @@ name: golang
810810
const std::string METRIC{"metric"};
811811
const std::string ACTION{"action"};
812812
const std::string ADDDATA{"add_data"};
813+
const std::string BUFFERFLUSH{"buffer_flush"};
813814
};
814815

815816
INSTANTIATE_TEST_SUITE_P(IpVersions, GolangIntegrationTest,
@@ -1350,6 +1351,116 @@ TEST_P(GolangIntegrationTest, AddDataBufferAllDataAndAsync) {
13501351
cleanup();
13511352
}
13521353

1354+
TEST_P(GolangIntegrationTest, BufferFlush_InBufferedDownstreamRequest) {
1355+
initializeBasicFilter(BUFFERFLUSH, "test.com");
1356+
1357+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1358+
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
1359+
{":path", "/test?bufferingly_decode"},
1360+
{":scheme", "http"},
1361+
{":authority", "test.com"}};
1362+
1363+
auto encoder_decoder = codec_client_->startRequest(request_headers, false);
1364+
Http::RequestEncoder& request_encoder = encoder_decoder.first;
1365+
codec_client_->sendData(request_encoder, "To ", false);
1366+
codec_client_->sendData(request_encoder, "be, ", true);
1367+
1368+
waitForNextUpstreamRequest();
1369+
1370+
auto body = "To be, or not to be, that is the question";
1371+
EXPECT_EQ(body, upstream_request_->body().toString());
1372+
1373+
cleanup();
1374+
}
1375+
1376+
TEST_P(GolangIntegrationTest, BufferFlush_InNonBufferedDownstreamRequest) {
1377+
initializeBasicFilter(BUFFERFLUSH, "test.com");
1378+
1379+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1380+
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
1381+
{":path", "/test?nonbufferingly_decode"},
1382+
{":scheme", "http"},
1383+
{":authority", "test.com"}};
1384+
1385+
auto encoder_decoder = codec_client_->startRequest(request_headers, false);
1386+
Http::RequestEncoder& request_encoder = encoder_decoder.first;
1387+
codec_client_->sendData(request_encoder, "To be, ", false);
1388+
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
1389+
Event::Dispatcher::RunType::NonBlock);
1390+
codec_client_->sendData(request_encoder, "that is ", true);
1391+
1392+
waitForNextUpstreamRequest();
1393+
1394+
auto body = "To be, or not to be, that is the question";
1395+
EXPECT_EQ(body, upstream_request_->body().toString());
1396+
1397+
cleanup();
1398+
}
1399+
1400+
TEST_P(GolangIntegrationTest, BufferFlush_InBufferedUpstreamResponse) {
1401+
initializeBasicFilter(BUFFERFLUSH, "test.com");
1402+
1403+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1404+
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
1405+
{":path", "/test?bufferingly_encode"},
1406+
{":scheme", "http"},
1407+
{":authority", "test.com"}};
1408+
1409+
auto encoder_decoder = codec_client_->startRequest(request_headers, true);
1410+
auto response = std::move(encoder_decoder.second);
1411+
1412+
waitForNextUpstreamRequest();
1413+
1414+
Http::TestResponseHeaderMapImpl response_headers{
1415+
{":status", "200"},
1416+
};
1417+
upstream_request_->encodeHeaders(response_headers, false);
1418+
Buffer::OwnedImpl response_data("To ");
1419+
upstream_request_->encodeData(response_data, false);
1420+
Buffer::OwnedImpl response_data2("be, ");
1421+
upstream_request_->encodeData(response_data2, true);
1422+
1423+
ASSERT_TRUE(response->waitForEndStream());
1424+
1425+
auto body = "To be, or not to be, that is the question";
1426+
EXPECT_EQ(body, response->body());
1427+
1428+
cleanup();
1429+
}
1430+
1431+
TEST_P(GolangIntegrationTest, BufferFlush_InNonBufferedUpstreamResponse) {
1432+
initializeBasicFilter(BUFFERFLUSH, "test.com");
1433+
1434+
codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
1435+
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
1436+
{":path", "/test?nonbufferingly_encode"},
1437+
{":scheme", "http"},
1438+
{":authority", "test.com"}};
1439+
1440+
auto encoder_decoder = codec_client_->startRequest(request_headers, true);
1441+
auto response = std::move(encoder_decoder.second);
1442+
1443+
waitForNextUpstreamRequest();
1444+
1445+
Http::TestResponseHeaderMapImpl response_headers{
1446+
{":status", "200"},
1447+
};
1448+
upstream_request_->encodeHeaders(response_headers, false);
1449+
Buffer::OwnedImpl response_data("To be, ");
1450+
upstream_request_->encodeData(response_data, false);
1451+
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
1452+
Event::Dispatcher::RunType::NonBlock);
1453+
Buffer::OwnedImpl response_data2("that is ");
1454+
upstream_request_->encodeData(response_data2, true);
1455+
1456+
ASSERT_TRUE(response->waitForEndStream());
1457+
1458+
auto body = "To be, or not to be, that is the question";
1459+
EXPECT_EQ(body, response->body());
1460+
1461+
cleanup();
1462+
}
1463+
13531464
// Buffer exceed limit in decode header phase.
13541465
TEST_P(GolangIntegrationTest, BufferExceedLimit_DecodeHeader) {
13551466
testBufferExceedLimit("/test?databuffer=decode-header");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_binary")
2+
3+
licenses(["notice"]) # Apache 2
4+
5+
go_binary(
6+
name = "filter.so",
7+
srcs = [
8+
"config.go",
9+
"filter.go",
10+
],
11+
out = "filter.so",
12+
cgo = True,
13+
importpath = "github.com/envoyproxy/envoy/contrib/golang/filters/http/test/test_data/buffer_flush",
14+
linkmode = "c-shared",
15+
visibility = ["//visibility:public"],
16+
deps = [
17+
"//contrib/golang/common/go/api",
18+
"//contrib/golang/filters/http/source/go/pkg/http",
19+
"@com_github_cncf_xds_go//xds/type/v3:type",
20+
"@org_golang_google_protobuf//types/known/anypb",
21+
"@org_golang_google_protobuf//types/known/structpb",
22+
],
23+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package main
2+
3+
import (
4+
"google.golang.org/protobuf/types/known/anypb"
5+
6+
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
7+
"github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
8+
)
9+
10+
const Name = "buffer_flush"
11+
12+
func init() {
13+
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
14+
}
15+
16+
type config struct {
17+
}
18+
19+
type parser struct {
20+
}
21+
22+
func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
23+
return &config{}, nil
24+
}
25+
26+
func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
27+
return child
28+
}
29+
30+
func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
31+
conf, ok := c.(*config)
32+
if !ok {
33+
panic("unexpected config type")
34+
}
35+
return &filter{
36+
callbacks: callbacks,
37+
config: conf,
38+
}
39+
}
40+
41+
func main() {}

0 commit comments

Comments
 (0)