diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts index 925c268f8e0..2c4eede03bf 100644 --- a/packages/opencode/src/effect/runner.ts +++ b/packages/opencode/src/effect/runner.ts @@ -10,6 +10,17 @@ export interface Runner { export class Cancelled extends Schema.TaggedErrorClass()("RunnerCancelled", {}) {} +// kilocode_change start - identify process signal failures nested in platform errors +const signal = "Process interrupted due to receipt of signal" +const killed = (value: unknown): boolean => { + if (value instanceof Error && value.message.includes(signal)) return true + if (typeof value !== "object" || value === null) return false + const data = value as { cause?: unknown; message?: unknown } + if (typeof data.message === "string" && data.message.includes(signal)) return true + return killed(data.cause) +} +// kilocode_change end + interface RunHandle { id: number done: Deferred.Deferred @@ -19,6 +30,9 @@ interface RunHandle { interface ShellHandle { id: number fiber: Fiber.Fiber + // kilocode_change start - shell cancellation can finish with non-interrupt process errors + cancelled: boolean + // kilocode_change end } interface PendingHandle { @@ -98,7 +112,12 @@ export const make = ( }), ).pipe(Effect.flatten) - const stopShell = (shell: ShellHandle) => Fiber.interrupt(shell.fiber) + // kilocode_change start - wait for shell finalizers so aborted output is persisted before callers resolve + const stopShell = (shell: ShellHandle) => + Effect.sync(() => { + shell.cancelled = true + }).pipe(Effect.andThen(Fiber.interrupt(shell.fiber)), Effect.andThen(Fiber.await(shell.fiber)), Effect.asVoid) + // kilocode_change end const ensureRunning = (work: Effect.Effect) => SynchronizedRef.modifyEffect( @@ -146,14 +165,28 @@ export const make = ( yield* busy const id = next() const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild) - const shell = { id, fiber } satisfies ShellHandle + const shell = { id, fiber, cancelled: false } satisfies ShellHandle // kilocode_change return [ - Effect.gen(function* () { - const exit = yield* Fiber.await(fiber) - if (Exit.isSuccess(exit)) return exit.value - if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt - return yield* Effect.failCause(exit.cause) - }), + // kilocode_change start - cancelled shells may fail with process-signal errors after cleanup + Effect.uninterruptible( + Effect.gen(function* () { + const exit = yield* Fiber.await(fiber) + if (Exit.isSuccess(exit)) return exit.value + const ok = + exit.cause.reasons.length > 0 && + exit.cause.reasons.every((reason) => { + if (Cause.isInterruptReason(reason)) return true + if (Cause.isFailReason(reason)) return killed(reason.error) + if (Cause.isDieReason(reason)) return killed(reason.defect) + return false + }) + if ((Cause.hasInterruptsOnly(exit.cause) || (shell.cancelled && ok)) && onInterrupt) { + return yield* Effect.uninterruptible(onInterrupt) + } + return yield* Effect.failCause(exit.cause) + }), + ), + // kilocode_change end { _tag: "Shell", shell }, ] as const }), @@ -183,8 +216,10 @@ export const make = ( case "ShellThenRun": return [ Effect.gen(function* () { - yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) + // kilocode_change start - let shell cleanup persist before queued loop resolves via onInterrupt yield* stopShell(st.shell) + yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid) + // kilocode_change end yield* idleIfCurrent() }), { _tag: "Idle" } as const, diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index e43cac7a26b..6550051f6dc 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -76,6 +76,17 @@ export const shouldAskPlanFollowup = KiloSessionPrompt.shouldAskPlanFollowup const log = Log.create({ service: "session.prompt" }) const elog = EffectLogger.create({ service: "session.prompt" }) +// kilocode_change start - identify process signal failures nested in platform errors +const signal = "Process interrupted due to receipt of signal" +const killed = (value: unknown): boolean => { + if (value instanceof Error && value.message.includes(signal)) return true + if (typeof value !== "object" || value === null) return false + const data = value as { cause?: unknown; message?: unknown } + if (typeof data.message === "string" && data.message.includes(signal)) return true + return killed(data.cause) +} +// kilocode_change end + export interface Interface { readonly cancel: (sessionID: SessionID) => Effect.Effect readonly prompt: (input: PromptInput) => Effect.Effect @@ -742,184 +753,209 @@ NOTE: At any point in time through this workflow you should feel free to ask the } satisfies MessageV2.TextPart) }) + // kilocode_change start - persist shell messages before honoring cancellation const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) { - const ctx = yield* InstanceState.context - const run = yield* runner() - const session = yield* sessions.get(input.sessionID) - if (session.revert) { - yield* revert.cleanup(session) - } - const agent = yield* agents.get(input.agent) - if (!agent) { - const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name) - const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" - const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) - yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() }) - throw error - } - const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID)) - const userMsg: MessageV2.User = { - id: input.messageID ?? MessageID.ascending(), - sessionID: input.sessionID, - time: { created: Date.now() }, - role: "user", - agent: input.agent, - model: { providerID: model.providerID, modelID: model.modelID }, - } - yield* sessions.updateMessage(userMsg) - const userPart: MessageV2.Part = { - type: "text", - id: PartID.ascending(), - messageID: userMsg.id, - sessionID: input.sessionID, - text: "The following tool was executed by the user", - synthetic: true, - } - yield* sessions.updatePart(userPart) - - const msg: MessageV2.Assistant = { - id: MessageID.ascending(), - sessionID: input.sessionID, - parentID: userMsg.id, - mode: input.agent, - agent: input.agent, - cost: 0, - path: { cwd: ctx.directory, root: ctx.worktree }, - time: { created: Date.now() }, - role: "assistant", - tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, - modelID: model.modelID, - providerID: model.providerID, - } - yield* sessions.updateMessage(msg) - const part: MessageV2.ToolPart = { - type: "tool", - id: PartID.ascending(), - messageID: msg.id, - sessionID: input.sessionID, - tool: "bash", - callID: ulid(), - state: { - status: "running", - time: { start: Date.now() }, - input: { command: input.command }, - }, - } - yield* sessions.updatePart(part) + return yield* Effect.uninterruptibleMask((restore) => + Effect.gen(function* () { + const ctx = yield* restore(InstanceState.context) + const run = yield* restore(runner()) + const session = yield* restore(sessions.get(input.sessionID)) + if (session.revert) { + yield* restore(revert.cleanup(session)) + } + const agent = yield* restore(agents.get(input.agent)) + if (!agent) { + const available = (yield* restore(agents.list())).filter((a) => !a.hidden).map((a) => a.name) + const hint = available.length ? ` Available agents: ${available.join(", ")}` : "" + const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` }) + yield* restore(bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() })) + throw error + } + const model = input.model ?? agent.model ?? (yield* restore(lastModel(input.sessionID))) + const userMsg: MessageV2.User = { + id: input.messageID ?? MessageID.ascending(), + sessionID: input.sessionID, + time: { created: Date.now() }, + role: "user", + agent: input.agent, + model: { providerID: model.providerID, modelID: model.modelID }, + } + yield* sessions.updateMessage(userMsg) + const userPart: MessageV2.Part = { + type: "text", + id: PartID.ascending(), + messageID: userMsg.id, + sessionID: input.sessionID, + text: "The following tool was executed by the user", + synthetic: true, + } + yield* sessions.updatePart(userPart) - const sh = Shell.preferred() - const shellName = ( - process.platform === "win32" ? path.win32.basename(sh, ".exe") : path.basename(sh) - ).toLowerCase() - const cwd = ctx.directory - const invocations: Record = { - nu: { args: ["-c", input.command] }, - fish: { args: ["-c", input.command] }, - zsh: { - args: [ - "-l", - "-c", - ` + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + sessionID: input.sessionID, + parentID: userMsg.id, + mode: input.agent, + agent: input.agent, + cost: 0, + path: { cwd: ctx.directory, root: ctx.worktree }, + time: { created: Date.now() }, + role: "assistant", + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + modelID: model.modelID, + providerID: model.providerID, + } + yield* sessions.updateMessage(msg) + const part: MessageV2.ToolPart = { + type: "tool", + id: PartID.ascending(), + messageID: msg.id, + sessionID: input.sessionID, + tool: "bash", + callID: ulid(), + state: { + status: "running", + time: { start: Date.now() }, + input: { command: input.command }, + }, + } + yield* sessions.updatePart(part) + + const sh = Shell.preferred() + const shellName = ( + process.platform === "win32" ? path.win32.basename(sh, ".exe") : path.basename(sh) + ).toLowerCase() + const cwd = ctx.directory + const invocations: Record = { + nu: { args: ["-c", input.command] }, + fish: { args: ["-c", input.command] }, + zsh: { + args: [ + "-l", + "-c", + ` [[ -f ~/.zshenv ]] && source ~/.zshenv >/dev/null 2>&1 || true [[ -f "\${ZDOTDIR:-$HOME}/.zshrc" ]] && source "\${ZDOTDIR:-$HOME}/.zshrc" >/dev/null 2>&1 || true cd -- "$1" eval ${JSON.stringify(input.command)} `, - "opencode", - cwd, - ], - }, - bash: { - args: [ - "-l", - "-c", - ` + "opencode", + cwd, + ], + }, + bash: { + args: [ + "-l", + "-c", + ` shopt -s expand_aliases [[ -f ~/.bashrc ]] && source ~/.bashrc >/dev/null 2>&1 || true cd -- "$1" eval ${JSON.stringify(input.command)} `, - "opencode", + "opencode", + cwd, + ], + }, + cmd: { args: ["/c", input.command] }, + powershell: { args: ["-NoProfile", "-Command", input.command] }, + pwsh: { args: ["-NoProfile", "-Command", input.command] }, + "": { args: ["-c", input.command] }, + } + + const args = (invocations[shellName] ?? invocations[""]).args + const shellEnv = yield* restore( + plugin.trigger("shell.env", { cwd, sessionID: input.sessionID, callID: part.callID }, { env: {} }), + ) + + const cmd = ChildProcess.make(sh, args, { cwd, - ], - }, - cmd: { args: ["/c", input.command] }, - powershell: { args: ["-NoProfile", "-Command", input.command] }, - pwsh: { args: ["-NoProfile", "-Command", input.command] }, - "": { args: ["-c", input.command] }, - } + extendEnv: true, + env: { ...shellEnv.env, TERM: "dumb" }, + stdin: "ignore", + forceKillAfter: "3 seconds", + }) - const args = (invocations[shellName] ?? invocations[""]).args - const shellEnv = yield* plugin.trigger( - "shell.env", - { cwd, sessionID: input.sessionID, callID: part.callID }, - { env: {} }, - ) + let output = "" + let aborted = false - const cmd = ChildProcess.make(sh, args, { - cwd, - extendEnv: true, - env: { ...shellEnv.env, TERM: "dumb" }, - stdin: "ignore", - forceKillAfter: "3 seconds", - }) + const finish = Effect.uninterruptible( + Effect.gen(function* () { + if (aborted) { + output += "\n\n" + ["", "User aborted the command", ""].join("\n") + } + if (!msg.time.completed) { + msg.time.completed = Date.now() + yield* sessions.updateMessage(msg) + } + if (part.state.status === "running") { + part.state = { + status: "completed", + time: { ...part.state.time, end: Date.now() }, + input: part.state.input, + title: "", + metadata: { output, description: "" }, + output, + } + yield* sessions.updatePart(part) + } + }), + ) - let output = "" - let aborted = false + const exit = yield* Effect.acquireUseRelease( + Effect.void, + () => + restore( + Effect.gen(function* () { + const handle = yield* spawner.spawn(cmd) + yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => + Effect.sync(() => { + output += chunk + if (part.state.status === "running") { + part.state.metadata = { output, description: "" } + void run.fork(sessions.updatePart(part)) + } + }), + ) + yield* handle.exitCode + }).pipe( + Effect.scoped, + Effect.onInterrupt(() => + Effect.sync(() => { + aborted = true + }), + ), + Effect.orDie, + Effect.exit, + ), + ), + (_resource, exit) => + Effect.gen(function* () { + if (!aborted && Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) { + aborted = true + } + yield* finish + }), + ) - const finish = Effect.uninterruptible( - Effect.gen(function* () { - if (aborted) { - output += "\n\n" + ["", "User aborted the command", ""].join("\n") - } - if (!msg.time.completed) { - msg.time.completed = Date.now() - yield* sessions.updateMessage(msg) - } - if (part.state.status === "running") { - part.state = { - status: "completed", - time: { ...part.state.time, end: Date.now() }, - input: part.state.input, - title: "", - metadata: { output, description: "" }, - output, - } - yield* sessions.updatePart(part) + if (Exit.isFailure(exit)) { + const ok = + aborted && + exit.cause.reasons.length > 0 && + exit.cause.reasons.every((reason) => { + if (Cause.isInterruptReason(reason)) return true + if (Cause.isFailReason(reason)) return killed(reason.error) + if (Cause.isDieReason(reason)) return killed(reason.defect) + return false + }) + if (!ok) return yield* Effect.failCause(exit.cause) } - }), - ) - const exit = yield* Effect.gen(function* () { - const handle = yield* spawner.spawn(cmd) - yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => - Effect.sync(() => { - output += chunk - if (part.state.status === "running") { - part.state.metadata = { output, description: "" } - void run.fork(sessions.updatePart(part)) - } - }), - ) - yield* handle.exitCode - }).pipe( - Effect.scoped, - Effect.onInterrupt(() => - Effect.sync(() => { - aborted = true - }), - ), - Effect.orDie, - Effect.ensuring(finish), - Effect.exit, + return { info: msg, parts: [part] } + }), ) - - if (Exit.isFailure(exit) && !Cause.hasInterruptsOnly(exit.cause)) { - return yield* Effect.failCause(exit.cause) - } - - return { info: msg, parts: [part] } }) + // kilocode_change end const getModel = Effect.fn("SessionPrompt.getModel")(function* ( providerID: ProviderID, @@ -1355,6 +1391,107 @@ NOTE: At any point in time through this workflow you should feel free to ask the throw new Error("Impossible") }) + // kilocode_change start - create an aborted shell record if cancellation beats setup + const shellInterrupt = Effect.fn("SessionPrompt.shellInterrupt")(function* (input: ShellInput) { + const id = input.messageID ?? MessageID.ascending() + const current = yield* sessions.findMessage( + input.sessionID, + (msg) => + msg.info.role === "assistant" && + msg.info.parentID === id && + msg.parts.some( + (part) => part.type === "tool" && part.tool === "bash" && part.state.input.command === input.command, + ), + ) + if (Option.isSome(current)) { + const msg = current.value.info + const part = current.value.parts.find( + (part) => part.type === "tool" && part.tool === "bash" && part.state.input.command === input.command, + ) + if (msg.role === "assistant" && !msg.time.completed) { + msg.time.completed = Date.now() + yield* sessions.updateMessage(msg) + } + if (part?.type === "tool" && (part.state.status === "pending" || part.state.status === "running")) { + const meta = ["", "User aborted the command", ""].join("\n") + const prior = + part.state.status === "running" && typeof part.state.metadata?.output === "string" + ? part.state.metadata.output + : "" + const output = prior ? `${prior}\n\n${meta}` : meta + const start = part.state.status === "running" ? part.state.time.start : Date.now() + const input = part.state.input + part.state = { + status: "completed", + time: { start, end: Date.now() }, + input, + title: "", + metadata: { output, description: "" }, + output, + } + yield* sessions.updatePart(part) + } + return current.value + } + + const ctx = yield* InstanceState.context + const fallback = { providerID: ProviderID.make("unknown"), modelID: ModelID.make("unknown") } + const model = input.model ?? (yield* provider.defaultModel().pipe(Effect.catchCause(() => Effect.succeed(fallback)))) + const userMsg: MessageV2.User = { + id, + sessionID: input.sessionID, + time: { created: Date.now() }, + role: "user", + agent: input.agent, + model: { providerID: model.providerID, modelID: model.modelID }, + } + yield* sessions.updateMessage(userMsg) + yield* sessions.updatePart({ + type: "text", + id: PartID.ascending(), + messageID: userMsg.id, + sessionID: input.sessionID, + text: "The following tool was executed by the user", + synthetic: true, + } satisfies MessageV2.Part) + + const msg: MessageV2.Assistant = { + id: MessageID.ascending(), + sessionID: input.sessionID, + parentID: userMsg.id, + mode: input.agent, + agent: input.agent, + cost: 0, + path: { cwd: ctx.directory, root: ctx.worktree }, + time: { created: Date.now(), completed: Date.now() }, + role: "assistant", + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + modelID: model.modelID, + providerID: model.providerID, + } + yield* sessions.updateMessage(msg) + const output = ["", "User aborted the command", ""].join("\n") + const part: MessageV2.ToolPart = { + type: "tool", + id: PartID.ascending(), + messageID: msg.id, + sessionID: input.sessionID, + tool: "bash", + callID: ulid(), + state: { + status: "completed", + time: { start: Date.now(), end: Date.now() }, + input: { command: input.command }, + title: "", + metadata: { output, description: "" }, + output, + }, + } + yield* sessions.updatePart(part) + return { info: msg, parts: [part] } + }) + // kilocode_change end + // kilocode_change — mutable close-reason per session, set by runLoop and read by loop const closeReasons = new Map() @@ -1676,7 +1813,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the const shell: (input: ShellInput) => Effect.Effect = Effect.fn("SessionPrompt.shell")( function* (input: ShellInput) { - return yield* state.startShell(input.sessionID, lastAssistant(input.sessionID), shellImpl(input)) + // kilocode_change start - share shell message id with cancellation fallback + const next = { ...input, messageID: input.messageID ?? MessageID.ascending() } + return yield* state.startShell(next.sessionID, shellInterrupt(next), shellImpl(next)) + // kilocode_change end }, ) diff --git a/packages/opencode/test/provider/provider.test.ts b/packages/opencode/test/provider/provider.test.ts index 060943f9bf3..974cea11c8e 100644 --- a/packages/opencode/test/provider/provider.test.ts +++ b/packages/opencode/test/provider/provider.test.ts @@ -56,6 +56,16 @@ async function defaultModel() { return run((provider) => provider.defaultModel()) } +// kilocode_change start - mirror upstream #24416 for @kilocode/plugin dependency readiness +async function markPluginDependenciesReady(dir: string) { + await mkdir(path.join(dir, "node_modules"), { recursive: true }) + await Bun.write( + path.join(dir, "package-lock.json"), + JSON.stringify({ packages: { "": { dependencies: { "@kilocode/plugin": "0.0.0" } } } }), + ) +} +// kilocode_change end + function paid(providers: Awaited>) { const item = providers[ProviderID.make("opencode")] if (!item) return 0 // kilocode_change - Kilo drops opencode provider without apiKey/auth @@ -2484,8 +2494,13 @@ test("cloudflare-ai-gateway forwards config metadata options", async () => { test("plugin config providers persist after instance dispose", async () => { await using tmp = await tmpdir({ init: async (dir) => { - const root = path.join(dir, ".opencode", "plugin") + // kilocode_change start - mirror upstream #24416 to avoid real plugin dependency installs + const cfg = path.join(dir, ".opencode") + const root = path.join(cfg, "plugin") await mkdir(root, { recursive: true }) + await markPluginDependenciesReady(cfg) + await markPluginDependenciesReady(Global.Path.config) + // kilocode_change end await Bun.write( path.join(root, "demo-provider.ts"), [ diff --git a/packages/shared/src/util/flock.ts b/packages/shared/src/util/flock.ts index 958bd9fd1da..5732c1b7fd6 100644 --- a/packages/shared/src/util/flock.ts +++ b/packages/shared/src/util/flock.ts @@ -106,6 +106,24 @@ export namespace Flock { return Math.max(0, ms + d) } + // kilocode_change start - tolerate transient Windows directory removal failures + function transient(err: unknown) { + const name = code(err) + return name === "EBUSY" || name === "EPERM" || name === "ENOTEMPTY" + } + + async function remove(dir: string, n = 0): Promise { + try { + await rm(dir, { recursive: true, force: true }) + return + } catch (err) { + if (!transient(err) || n >= 5) throw err + await sleep(25 * (n + 1)) + return remove(dir, n + 1) + } + } + // kilocode_change end + function mono() { return performance.now() } @@ -261,7 +279,7 @@ export namespace Flock { throw new Error("Refusing to release: lock token mismatch (not the owner).") } - await rm(lockDir, { recursive: true, force: true }) + await remove(lockDir) // kilocode_change } return {