Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: reduce packet rate, add timeout mechanism #15

Merged
merged 3 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
GO ?= go
TAG ?= v0.0.15
TAG ?= v0.0.16
IMAGE ?= quay.io/cilium/test-connection-disruption
GOOS ?= linux
GOARCH ?= amd64
Expand Down
28 changes: 17 additions & 11 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ var stats struct {
var args struct {
addr string
interval time.Duration
latency time.Duration
timeout time.Duration
}

func main() {
flag.DurationVar(&args.interval, "dispatch-interval", 50*time.Millisecond, "TCP packet dispatch interval")
flag.DurationVar(&args.latency, "latency", 250*time.Millisecond, "Maximum expected latency for the connection, used for setting read deadlines")
flag.DurationVar(&args.timeout, "timeout", 5*time.Second, "Client exits when no reply is received within this duration")
flag.Parse()

args.addr = flag.Arg(0)
Expand All @@ -48,12 +48,12 @@ func main() {
os.Exit(1)
}

// For backwards compatibility, clamp the interval to a minimum of 1ms.
// Before the rewrite, 0ms meant roughly 5k pps due to latency, but now
// 500k pps can be achieved by disabling the interval.
// For backwards compatibility, clamp the interval to a minimum of 10ms to
// avoid overloading resource-constrained CI machines where Cilium runs with
// monitor aggregation disabled.
if args.interval == 0 {
args.interval = 500 * time.Microsecond
fmt.Println("Zero interval changed to", args.interval, "for backwards compatibility, otherwise bufferbloat will interfere.")
args.interval = 10 * time.Millisecond
fmt.Println("Zero interval changed to", args.interval, "for backwards compatibility.")
}

conn, err := dial()
Expand Down Expand Up @@ -121,7 +121,7 @@ func writer(ctx context.Context, cancel context.CancelFunc, conn net.Conn, reque
// behaviour.
runtime.LockOSThread()

fmt.Println("Sending requests at a target interval of", args.interval, "with max expected latency of", args.latency)
fmt.Println("Sending requests at a target interval of", args.interval, "with timeout of", args.timeout)

for {
// Immediately stop producing packets when the client is shutting down.
Expand Down Expand Up @@ -174,10 +174,10 @@ func reader(ctx context.Context, cancel context.CancelFunc, conn net.Conn, reque
// Stop the reader when the writer is done, or the ErrGroup will wait forever.
defer cancel()

last := time.Now()
reply := make([]byte, internal.MsgSize)
for {
deadline := args.interval + args.latency
if err := conn.SetReadDeadline(time.Now().Add(deadline)); err != nil {
if err := conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)); err != nil {
return fmt.Errorf("set read deadline: %w", err)
}

Expand All @@ -198,7 +198,12 @@ func reader(ctx context.Context, cancel context.CancelFunc, conn net.Conn, reque
default:
}

return fmt.Errorf("no reply received within %v deadline: %w (%d tx, %d rx)", deadline, err, stats.tx.Load(), stats.rx.Load())
// Retry while the last reply was received within the timeout.
if time.Since(last) <= args.timeout {
continue
}

return fmt.Errorf("no reply received within %v timeout: %w", args.timeout, err)
}
if errors.Is(err, io.EOF) {
fmt.Println("Server closed the connection")
Expand All @@ -212,6 +217,7 @@ func reader(ctx context.Context, cancel context.CancelFunc, conn net.Conn, reque
return fmt.Errorf("invalid reply(%v) to request(%v)", reply, request)
}

last = time.Now()
stats.rx.Add(1)
stats.bytes.Add(internal.MsgSize)

Expand Down