Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions components/execd/pkg/flag/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ var (

// ApiGracefulShutdownTimeout waits before tearing down SSE streams.
ApiGracefulShutdownTimeout time.Duration

// JupyterIdlePollInterval controls how often ExecuteCodeStream checks for
// late execute_result/error messages after receiving idle status.
JupyterIdlePollInterval time.Duration
)
15 changes: 15 additions & 0 deletions components/execd/pkg/flag/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
jupyterHostEnv = "JUPYTER_HOST"
jupyterTokenEnv = "JUPYTER_TOKEN"
gracefulShutdownTimeoutEnv = "EXECD_API_GRACE_SHUTDOWN"
jupyterIdlePollIntervalEnv = "EXECD_JUPYTER_IDLE_POLL_INTERVAL"
)

// InitFlags registers CLI flags and env overrides.
Expand All @@ -37,6 +38,7 @@ func InitFlags() {
ServerLogLevel = 6
ServerAccessToken = ""
ApiGracefulShutdownTimeout = time.Second * 1
JupyterIdlePollInterval = 10 * time.Millisecond

// First, set default values from environment variables
if jupyterFromEnv := os.Getenv(jupyterHostEnv); jupyterFromEnv != "" {
Expand Down Expand Up @@ -65,7 +67,20 @@ func InitFlags() {
ApiGracefulShutdownTimeout = duration
}

if idlePollInterval := os.Getenv(jupyterIdlePollIntervalEnv); idlePollInterval != "" {
duration, err := time.ParseDuration(idlePollInterval)
if err != nil {
stdlog.Panicf("Failed to parse jupyter idle poll interval from env: %v", err)
}
if duration <= 0 {
stdlog.Printf("Invalid %s=%s; fallback to default %s", jupyterIdlePollIntervalEnv, idlePollInterval, JupyterIdlePollInterval)
} else {
JupyterIdlePollInterval = duration
}
}

flag.DurationVar(&ApiGracefulShutdownTimeout, "graceful-shutdown-timeout", ApiGracefulShutdownTimeout, "API graceful shutdown timeout duration (default: 3s)")
flag.DurationVar(&JupyterIdlePollInterval, "jupyter-idle-poll-interval", JupyterIdlePollInterval, "Polling interval after Jupyter idle status before closing stream (default: 10ms)")

// Parse flags - these will override environment variables if provided
flag.Parse()
Expand Down
17 changes: 15 additions & 2 deletions components/execd/pkg/jupyter/execute/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

"github.com/google/uuid"
"github.com/gorilla/websocket"

execdflag "github.com/alibaba/opensandbox/execd/pkg/flag"
)

// HTTPClient defines the HTTP client interface
Expand Down Expand Up @@ -268,8 +270,19 @@ func (c *Client) ExecuteCodeStream(code string, resultChan chan *ExecutionResult
resultChan <- notify
resultMutex.Unlock()

for result.ExecutionCount <= 0 && result.Error == nil {
time.Sleep(300 * time.Millisecond)
pollInterval := execdflag.JupyterIdlePollInterval
if pollInterval <= 0 {
pollInterval = 10 * time.Millisecond
}

for {
resultMutex.Lock()
done := result.ExecutionCount > 0 || result.Error != nil
resultMutex.Unlock()
if done {
break
}
time.Sleep(pollInterval)
}

// Close result channel
Expand Down
33 changes: 30 additions & 3 deletions components/execd/pkg/web/controller/codeinterpreting.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ func (c *CodeInterpretingController) RunCode() {
defer cancel()
runCodeRequest := c.buildExecuteCodeRequest(request)
eventsHandler := c.setServerEventsHandler(ctx)

// completeCh is closed when OnExecuteComplete fires, meaning the final SSE
// event has been written and flushed. We only wait for this callback as a
// safety check and then return immediately to avoid fixed tail latency.
completeCh := make(chan struct{})
origComplete := eventsHandler.OnExecuteComplete
eventsHandler.OnExecuteComplete = func(executionTime time.Duration) {
origComplete(executionTime)
close(completeCh)
}
runCodeRequest.Hooks = eventsHandler

c.setupSSEResponse()
Expand All @@ -126,7 +136,10 @@ func (c *CodeInterpretingController) RunCode() {
return
}

time.Sleep(flag.ApiGracefulShutdownTimeout)
select {
case <-completeCh:
case <-time.After(flag.ApiGracefulShutdownTimeout):
}
}

// GetContext returns a specific code context by id.
Expand Down Expand Up @@ -305,7 +318,18 @@ func (c *CodeInterpretingController) RunInSession() {
}
ctx, cancel := context.WithCancel(c.ctx.Request.Context())
defer cancel()
runReq.Hooks = c.setServerEventsHandler(ctx)

// completeCh is closed when OnExecuteComplete fires, meaning the final SSE
// event has been written and flushed. We only wait for this callback as a
// safety check and then return immediately to avoid fixed tail latency.
completeCh := make(chan struct{})
hooks := c.setServerEventsHandler(ctx)
origComplete := hooks.OnExecuteComplete
hooks.OnExecuteComplete = func(executionTime time.Duration) {
origComplete(executionTime)
close(completeCh)
}
runReq.Hooks = hooks

c.setupSSEResponse()
err := codeRunner.RunInBashSession(ctx, runReq)
Expand All @@ -318,7 +342,10 @@ func (c *CodeInterpretingController) RunInSession() {
return
}

time.Sleep(flag.ApiGracefulShutdownTimeout)
select {
case <-completeCh:
case <-time.After(flag.ApiGracefulShutdownTimeout):
}
}

// DeleteSession deletes a bash session (delete_session API).
Expand Down
Loading