diff --git a/frontend/app/view/term/term.tsx b/frontend/app/view/term/term.tsx index 4d30aa5c7..4b8ebc04d 100644 --- a/frontend/app/view/term/term.tsx +++ b/frontend/app/view/term/term.tsx @@ -26,7 +26,7 @@ import { } from "@/store/global"; import * as services from "@/store/services"; import * as keyutil from "@/util/keyutil"; -import { boundNumber, fireAndForget, stringToBase64, useAtomValueSafe } from "@/util/util"; +import { boundNumber, fireAndForget, getNextActionId, stringToBase64, useAtomValueSafe } from "@/util/util"; import { computeBgStyleFromMeta } from "@/util/waveutil"; import { ISearchOptions } from "@xterm/addon-search"; import clsx from "clsx"; @@ -350,7 +350,13 @@ class TermViewModel implements ViewModel { sendDataToController(data: string) { const b64data = stringToBase64(data); - RpcApi.ControllerInputCommand(TabRpcClient, { blockid: this.blockId, inputdata64: b64data }); + const actionId = getNextActionId(); + RpcApi.ControllerInputCommand(TabRpcClient, { + blockid: this.blockId, + inputdata64: b64data, + feactionid: actionId, + pendingptyoffset: this.termRef.current.pendingPtyOffset, + }); } setTermMode(mode: "term" | "vdom") { @@ -980,12 +986,7 @@ const TerminalView = ({ blockId, model }: ViewComponentProps) => if (blockData?.meta?.["term:scrollback"]) { termScrollback = Math.floor(blockData.meta["term:scrollback"]); } - if (termScrollback < 0) { - termScrollback = 0; - } - if (termScrollback > 50000) { - termScrollback = 50000; - } + termScrollback = boundNumber(termScrollback, 0, 50000); const termAllowBPM = globalStore.get(termBPMAtom) ?? false; const wasFocused = model.termRef.current != null && globalStore.get(model.nodeModel.isFocused); const termWrap = new TermWrap( diff --git a/frontend/app/view/term/termwrap.ts b/frontend/app/view/term/termwrap.ts index 46089c4aa..3cca779b6 100644 --- a/frontend/app/view/term/termwrap.ts +++ b/frontend/app/view/term/termwrap.ts @@ -2,12 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 import { getFileSubject } from "@/app/store/wps"; -import { sendWSCommand } from "@/app/store/ws"; import { RpcApi } from "@/app/store/wshclientapi"; import { TabRpcClient } from "@/app/store/wshrpcutil"; import { PLATFORM, WOS, atoms, fetchWaveFile, getSettingsKeyAtom, globalStore, openLink } from "@/store/global"; import * as services from "@/store/services"; -import { base64ToArray, fireAndForget } from "@/util/util"; +import { base64ToArray, fireAndForget, getNextActionId } from "@/util/util"; import { SearchAddon } from "@xterm/addon-search"; import { SerializeAddon } from "@xterm/addon-serialize"; import { WebLinksAddon } from "@xterm/addon-web-links"; @@ -136,6 +135,7 @@ function handleOsc7Command(data: string, blockId: string, loaded: boolean): bool export class TermWrap { blockId: string; ptyOffset: number; + pendingPtyOffset: number; dataBytesProcessed: number; terminal: Terminal; connectElem: HTMLDivElement; @@ -147,6 +147,7 @@ export class TermWrap { heldData: Uint8Array[]; handleResize_debounced: () => void; hasResized: boolean; + isLoadingCache: boolean; multiInputCallback: (data: string) => void; sendDataHandler: (data: string) => void; onSearchResultsDidChange?: (result: { resultIndex: number; resultCount: number }) => void; @@ -159,6 +160,7 @@ export class TermWrap { waveOptions: TermWrapOptions ) { this.loaded = false; + this.isLoadingCache = false; this.blockId = blockId; this.sendDataHandler = waveOptions.sendDataHandler; this.ptyOffset = 0; @@ -259,6 +261,9 @@ export class TermWrap { } handleTermData(data: string) { + if (this.isLoadingCache) { + return; + } if (!this.loaded) { return; } @@ -297,11 +302,14 @@ export class TermWrap { let prtn = new Promise((presolve, _) => { resolve = presolve; }); + if (setPtyOffset != null) { + this.pendingPtyOffset = setPtyOffset; + } else { + this.pendingPtyOffset = this.ptyOffset + data.length; + } this.terminal.write(data, () => { - if (setPtyOffset != null) { - this.ptyOffset = setPtyOffset; - } else { - this.ptyOffset += data.length; + this.ptyOffset = this.pendingPtyOffset; + if (setPtyOffset == null) { this.dataBytesProcessed += data.length; } resolve(); @@ -316,20 +324,25 @@ export class TermWrap { if (cacheFile != null) { ptyOffset = cacheFile.meta["ptyoffset"] ?? 0; if (cacheData.byteLength > 0) { - const curTermSize: TermSize = { rows: this.terminal.rows, cols: this.terminal.cols }; - const fileTermSize: TermSize = cacheFile.meta["termsize"]; - let didResize = false; - if ( - fileTermSize != null && - (fileTermSize.rows != curTermSize.rows || fileTermSize.cols != curTermSize.cols) - ) { - console.log("terminal restore size mismatch, temp resize", fileTermSize, curTermSize); - this.terminal.resize(fileTermSize.cols, fileTermSize.rows); - didResize = true; - } - this.doTerminalWrite(cacheData, ptyOffset); - if (didResize) { - this.terminal.resize(curTermSize.cols, curTermSize.rows); + try { + this.isLoadingCache = true; + const curTermSize: TermSize = { rows: this.terminal.rows, cols: this.terminal.cols }; + const fileTermSize: TermSize = cacheFile.meta["termsize"]; + let didResize = false; + if ( + fileTermSize != null && + (fileTermSize.rows != curTermSize.rows || fileTermSize.cols != curTermSize.cols) + ) { + console.log("terminal restore size mismatch, temp resize", fileTermSize, curTermSize); + this.terminal.resize(fileTermSize.cols, fileTermSize.rows); + didResize = true; + } + await this.doTerminalWrite(cacheData, ptyOffset); + if (didResize) { + this.terminal.resize(curTermSize.cols, curTermSize.rows); + } + } finally { + this.isLoadingCache = false; } } } @@ -363,12 +376,13 @@ export class TermWrap { this.fitAddon.fit(); if (oldRows !== this.terminal.rows || oldCols !== this.terminal.cols) { const termSize: TermSize = { rows: this.terminal.rows, cols: this.terminal.cols }; - const wsCommand: SetBlockTermSizeWSCommand = { - wscommand: "setblocktermsize", + const actionId = getNextActionId(); + RpcApi.ControllerInputCommand(TabRpcClient, { blockid: this.blockId, + feactionid: actionId, termsize: termSize, - }; - sendWSCommand(wsCommand); + pendingptyoffset: this.pendingPtyOffset, + }); } dlog("resize", `${this.terminal.rows}x${this.terminal.cols}`, `${oldRows}x${oldCols}`, this.hasResized); if (!this.hasResized) { diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 2570f4b4d..e76a11a5c 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -77,13 +77,6 @@ declare global { files: FileInfo[]; }; - // webcmd.BlockInputWSCommand - type BlockInputWSCommand = { - wscommand: "blockinput"; - blockid: string; - inputdata64: string; - }; - // waveobj.Client type Client = WaveObj & { windowids: string[]; @@ -123,6 +116,8 @@ declare global { blockid: string; inputdata64?: string; signame?: string; + pendingptyoffset?: number; + feactionid?: string; termsize?: TermSize; }; @@ -668,13 +663,6 @@ declare global { winsize?: WinSize; }; - // webcmd.SetBlockTermSizeWSCommand - type SetBlockTermSizeWSCommand = { - wscommand: "setblocktermsize"; - blockid: string; - termsize: TermSize; - }; - // wconfig.SettingsType type SettingsType = { "app:*"?: boolean; @@ -1131,7 +1119,7 @@ declare global { type WSCommandType = { wscommand: string; - } & ( SetBlockTermSizeWSCommand | BlockInputWSCommand | WSRpcCommand ); + } & ( WSRpcCommand ); // eventbus.WSEventType type WSEventType = { diff --git a/frontend/util/util.ts b/frontend/util/util.ts index 04b581521..de2b5d80f 100644 --- a/frontend/util/util.ts +++ b/frontend/util/util.ts @@ -302,6 +302,24 @@ function makeConnRoute(conn: string): string { return "conn:" + conn; } +let lastTimestamp = 0; +let counter = 0; + +// guaranteed to be monotonically increasing for each call within the same tab +function getNextActionId(): string { + const now = Date.now(); + + if (now === lastTimestamp) { + counter += 1; + } else { + lastTimestamp = now; + counter = 0; + } + + const paddedCounter = String(counter).padStart(5, "0"); + return `${now}:${paddedCounter}`; +} + function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } @@ -409,6 +427,7 @@ export { countGraphemes, deepCompareReturnPrev, fireAndForget, + getNextActionId, getPrefixedSettings, getPromiseState, getPromiseValue, diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index d7363af45..ea5b3d904 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -68,24 +68,28 @@ var globalLock = &sync.Mutex{} var blockControllerMap = make(map[string]*BlockController) type BlockInputUnion struct { - InputData []byte `json:"inputdata,omitempty"` - SigName string `json:"signame,omitempty"` - TermSize *waveobj.TermSize `json:"termsize,omitempty"` + InputData []byte `json:"inputdata,omitempty"` + SigName string `json:"signame,omitempty"` + TermSize *waveobj.TermSize `json:"termsize,omitempty"` + FeActionId string `json:"feactionid,omitempty"` + PtyProcessedToOffset int64 `json:"ptyprocessedtooffset,omitempty"` } type BlockController struct { - Lock *sync.Mutex - ControllerType string - TabId string - BlockId string - BlockDef *waveobj.BlockDef - CreatedHtmlFile bool - ShellProc *shellexec.ShellProc - ShellInputCh chan *BlockInputUnion - ShellProcStatus string - ShellProcExitCode int - RunLock *atomic.Bool - StatusVersion int + Lock *sync.Mutex + ControllerType string + TabId string + BlockId string + BlockDef *waveobj.BlockDef + CreatedHtmlFile bool + ShellProc *shellexec.ShellProc + ShellInputCh chan *BlockInputUnion + ShellProcStatus string + ShellProcExitCode int + RunLock *atomic.Bool + StatusVersion int + ProcessedToOffset *atomic.Int64 + LastResizeActionId string } type BlockControllerRuntimeStatus struct { @@ -123,6 +127,16 @@ func (bc *BlockController) getShellProc() *shellexec.ShellProc { return bc.ShellProc } +func (bc *BlockController) TestAndSetResizeActionId(actionId string) bool { + bc.Lock.Lock() + defer bc.Lock.Unlock() + if actionId <= bc.LastResizeActionId { + return false + } + bc.LastResizeActionId = actionId + return true +} + type RunShellOpts struct { TermSize waveobj.TermSize `json:"termsize,omitempty"` } @@ -731,7 +745,23 @@ func (bc *BlockController) manageRunningShellProcess(shellProc *shellexec.ShellP shellProc.Cmd.Write(ic.InputData) } if ic.TermSize != nil { - updateTermSize(shellProc, bc.BlockId, *ic.TermSize) + ok := bc.TestAndSetResizeActionId(ic.FeActionId) + if ok { + updateTermSize(shellProc, bc.BlockId, *ic.TermSize) + } else { + log.Printf("resize action id already processed or out of order: %s %s %v\n", bc.BlockId, ic.FeActionId, *ic.TermSize) + } + } + if ic.PtyProcessedToOffset != 0 { + for { + curOffset := bc.ProcessedToOffset.Load() + if ic.PtyProcessedToOffset <= curOffset { + break + } + if bc.ProcessedToOffset.CompareAndSwap(curOffset, ic.PtyProcessedToOffset) { + break + } + } } } }() @@ -997,12 +1027,13 @@ func getOrCreateBlockController(tabId string, blockId string, controllerName str bc = blockControllerMap[blockId] if bc == nil { bc = &BlockController{ - Lock: &sync.Mutex{}, - ControllerType: controllerName, - TabId: tabId, - BlockId: blockId, - ShellProcStatus: Status_Init, - RunLock: &atomic.Bool{}, + Lock: &sync.Mutex{}, + ControllerType: controllerName, + TabId: tabId, + BlockId: blockId, + ShellProcStatus: Status_Init, + RunLock: &atomic.Bool{}, + ProcessedToOffset: &atomic.Int64{}, } blockControllerMap[blockId] = bc createdController = true diff --git a/pkg/web/webcmd/webcmd.go b/pkg/web/webcmd/webcmd.go index bf732de0c..7f21c2f90 100644 --- a/pkg/web/webcmd/webcmd.go +++ b/pkg/web/webcmd/webcmd.go @@ -9,14 +9,11 @@ import ( "github.com/wavetermdev/waveterm/pkg/tsgen/tsgenmeta" "github.com/wavetermdev/waveterm/pkg/util/utilfn" - "github.com/wavetermdev/waveterm/pkg/waveobj" "github.com/wavetermdev/waveterm/pkg/wshutil" ) const ( - WSCommand_SetBlockTermSize = "setblocktermsize" - WSCommand_BlockInput = "blockinput" - WSCommand_Rpc = "rpc" + WSCommand_Rpc = "rpc" ) type WSCommandType interface { @@ -28,8 +25,6 @@ func WSCommandTypeUnionMeta() tsgenmeta.TypeUnionMeta { BaseType: reflect.TypeOf((*WSCommandType)(nil)).Elem(), TypeFieldName: "wscommand", Types: []reflect.Type{ - reflect.TypeOf(SetBlockTermSizeWSCommand{}), - reflect.TypeOf(BlockInputWSCommand{}), reflect.TypeOf(WSRpcCommand{}), }, } @@ -44,46 +39,12 @@ func (cmd *WSRpcCommand) GetWSCommand() string { return cmd.WSCommand } -type SetBlockTermSizeWSCommand struct { - WSCommand string `json:"wscommand" tstype:"\"setblocktermsize\""` - BlockId string `json:"blockid"` - TermSize waveobj.TermSize `json:"termsize"` -} - -func (cmd *SetBlockTermSizeWSCommand) GetWSCommand() string { - return cmd.WSCommand -} - -type BlockInputWSCommand struct { - WSCommand string `json:"wscommand" tstype:"\"blockinput\""` - BlockId string `json:"blockid"` - InputData64 string `json:"inputdata64"` -} - -func (cmd *BlockInputWSCommand) GetWSCommand() string { - return cmd.WSCommand -} - func ParseWSCommandMap(cmdMap map[string]any) (WSCommandType, error) { cmdType, ok := cmdMap["wscommand"].(string) if !ok { return nil, fmt.Errorf("no wscommand field in command map") } switch cmdType { - case WSCommand_SetBlockTermSize: - var cmd SetBlockTermSizeWSCommand - err := utilfn.DoMapStructure(&cmd, cmdMap) - if err != nil { - return nil, fmt.Errorf("error decoding SetBlockTermSizeWSCommand: %w", err) - } - return &cmd, nil - case WSCommand_BlockInput: - var cmd BlockInputWSCommand - err := utilfn.DoMapStructure(&cmd, cmdMap) - if err != nil { - return nil, fmt.Errorf("error decoding BlockInputWSCommand: %w", err) - } - return &cmd, nil case WSCommand_Rpc: var cmd WSRpcCommand err := utilfn.DoMapStructure(&cmd, cmdMap) diff --git a/pkg/web/ws.go b/pkg/web/ws.go index 41fa916fd..eaa294be6 100644 --- a/pkg/web/ws.go +++ b/pkg/web/ws.go @@ -19,7 +19,6 @@ import ( "github.com/wavetermdev/waveterm/pkg/eventbus" "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/web/webcmd" - "github.com/wavetermdev/waveterm/pkg/wshrpc" "github.com/wavetermdev/waveterm/pkg/wshutil" ) @@ -97,40 +96,6 @@ func processWSCommand(jmsg map[string]any, outputCh chan any, rpcInputCh chan [] return } switch cmd := wsCommand.(type) { - case *webcmd.SetBlockTermSizeWSCommand: - data := wshrpc.CommandBlockInputData{ - BlockId: cmd.BlockId, - TermSize: &cmd.TermSize, - } - rpcMsg := wshutil.RpcMessage{ - Command: wshrpc.Command_ControllerInput, - Data: data, - } - msgBytes, err := json.Marshal(rpcMsg) - if err != nil { - // this really should never fail since we just unmarshalled this value - log.Printf("[websocket] error marshalling rpc message: %v\n", err) - return - } - rpcInputCh <- msgBytes - - case *webcmd.BlockInputWSCommand: - data := wshrpc.CommandBlockInputData{ - BlockId: cmd.BlockId, - InputData64: cmd.InputData64, - } - rpcMsg := wshutil.RpcMessage{ - Command: wshrpc.Command_ControllerInput, - Data: data, - } - msgBytes, err := json.Marshal(rpcMsg) - if err != nil { - // this really should never fail since we just unmarshalled this value - log.Printf("[websocket] error marshalling rpc message: %v\n", err) - return - } - rpcInputCh <- msgBytes - case *webcmd.WSRpcCommand: rpcMsg := cmd.Message if rpcMsg == nil { diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index 5d5e3ec52..9c14f4f61 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -390,10 +390,12 @@ type CommandControllerAppendOutputData struct { } type CommandBlockInputData struct { - BlockId string `json:"blockid" wshcontext:"BlockId"` - InputData64 string `json:"inputdata64,omitempty"` - SigName string `json:"signame,omitempty"` - TermSize *waveobj.TermSize `json:"termsize,omitempty"` + BlockId string `json:"blockid" wshcontext:"BlockId"` + InputData64 string `json:"inputdata64,omitempty"` + SigName string `json:"signame,omitempty"` + PendingPtyOffset int64 `json:"pendingptyoffset,omitempty"` + FeActionId string `json:"feactionid,omitempty"` + TermSize *waveobj.TermSize `json:"termsize,omitempty"` } type FileDataAt struct { diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index 3a2e8768f..470a61562 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -321,8 +321,9 @@ func (ws *WshServer) ControllerInputCommand(ctx context.Context, data wshrpc.Com return fmt.Errorf("block controller not found for block %q", data.BlockId) } inputUnion := &blockcontroller.BlockInputUnion{ - SigName: data.SigName, - TermSize: data.TermSize, + SigName: data.SigName, + TermSize: data.TermSize, + FeActionId: data.FeActionId, } if len(data.InputData64) > 0 { inputBuf := make([]byte, base64.StdEncoding.DecodedLen(len(data.InputData64)))