Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions packages/opencode/src/effect/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ export interface Runner<A, E = never> {

export class Cancelled extends Schema.TaggedErrorClass<Cancelled>()("RunnerCancelled", {}) {}

// kilocode_change start - identify process signal failures nested in platform errors
const signal = "Process interrupted due to receipt of signal"
const killed = (value: unknown): boolean => {
if (value instanceof Error && value.message.includes(signal)) return true
if (typeof value !== "object" || value === null) return false
const data = value as { cause?: unknown; message?: unknown }
if (typeof data.message === "string" && data.message.includes(signal)) return true
return killed(data.cause)
}
// kilocode_change end

interface RunHandle<A, E> {
id: number
done: Deferred.Deferred<A, E | Cancelled>
Expand All @@ -19,6 +30,9 @@ interface RunHandle<A, E> {
interface ShellHandle<A, E> {
id: number
fiber: Fiber.Fiber<A, E>
// kilocode_change start - shell cancellation can finish with non-interrupt process errors
cancelled: boolean
// kilocode_change end
}

interface PendingHandle<A, E> {
Expand Down Expand Up @@ -98,7 +112,12 @@ export const make = <A, E = never>(
}),
).pipe(Effect.flatten)

const stopShell = (shell: ShellHandle<A, E>) => Fiber.interrupt(shell.fiber)
// kilocode_change start - wait for shell finalizers so aborted output is persisted before callers resolve
const stopShell = (shell: ShellHandle<A, E>) =>
Effect.sync(() => {
shell.cancelled = true
}).pipe(Effect.andThen(Fiber.interrupt(shell.fiber)), Effect.andThen(Fiber.await(shell.fiber)), Effect.asVoid)
// kilocode_change end

const ensureRunning = (work: Effect.Effect<A, E>) =>
SynchronizedRef.modifyEffect(
Expand Down Expand Up @@ -146,14 +165,28 @@ export const make = <A, E = never>(
yield* busy
const id = next()
const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
const shell = { id, fiber } satisfies ShellHandle<A, E>
const shell = { id, fiber, cancelled: false } satisfies ShellHandle<A, E> // kilocode_change
return [
Effect.gen(function* () {
const exit = yield* Fiber.await(fiber)
if (Exit.isSuccess(exit)) return exit.value
if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt
return yield* Effect.failCause(exit.cause)
}),
// kilocode_change start - cancelled shells may fail with process-signal errors after cleanup
Effect.uninterruptible(
Effect.gen(function* () {
const exit = yield* Fiber.await(fiber)
if (Exit.isSuccess(exit)) return exit.value
const ok =
exit.cause.reasons.length > 0 &&
exit.cause.reasons.every((reason) => {
if (Cause.isInterruptReason(reason)) return true
if (Cause.isFailReason(reason)) return killed(reason.error)
if (Cause.isDieReason(reason)) return killed(reason.defect)
return false
})
if ((Cause.hasInterruptsOnly(exit.cause) || (shell.cancelled && ok)) && onInterrupt) {
return yield* Effect.uninterruptible(onInterrupt)
}
return yield* Effect.failCause(exit.cause)
}),
),
// kilocode_change end
{ _tag: "Shell", shell },
] as const
}),
Expand Down Expand Up @@ -183,8 +216,10 @@ export const make = <A, E = never>(
case "ShellThenRun":
return [
Effect.gen(function* () {
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
// kilocode_change start - let shell cleanup persist before queued loop resolves via onInterrupt
yield* stopShell(st.shell)
yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
// kilocode_change end
yield* idleIfCurrent()
}),
{ _tag: "Idle" } as const,
Expand Down
Loading
Loading