From 33c898ff9a80201d5115cb7995aa3893619497c9 Mon Sep 17 00:00:00 2001 From: Stefano Fiorini Date: Wed, 20 May 2026 14:08:44 -0500 Subject: [PATCH] fix: use companion watcher process for async job completion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The async startJob previously relied on Node.js event listeners in the CLI process to capture child output and finalize the job file. But the CLI process exits immediately after returning the job ID, killing the event loop before the close handler fires — leaving jobs stuck at 'running' forever. Fix: startJob now spawns a companion watcher process (job-watcher.ts) that is itself detached and outlives the CLI. The watcher: - Spawns the actual client (codex/claude/opencode) - Captures stdout/stderr - Writes the final job record to disk on child exit - Has its own 10-minute timeout safety net The CLI returns the job ID immediately. The watcher independently finalizes the job. The CLI no longer needs to stay alive. Also updates tests to mock the watcher spawn via injectable spawnWatcher option. --- tools/ai-cli-dispatch/src/job-watcher.ts | 101 ++++++++++++ tools/ai-cli-dispatch/src/jobs.ts | 190 +++++++---------------- tools/ai-cli-dispatch/tests/jobs.test.ts | 111 ++++++++++++- 3 files changed, 265 insertions(+), 137 deletions(-) create mode 100644 tools/ai-cli-dispatch/src/job-watcher.ts diff --git a/tools/ai-cli-dispatch/src/job-watcher.ts b/tools/ai-cli-dispatch/src/job-watcher.ts new file mode 100644 index 0000000..c1c4a99 --- /dev/null +++ b/tools/ai-cli-dispatch/src/job-watcher.ts @@ -0,0 +1,101 @@ +/** + * Job watcher — a tiny self-contained process that monitors a detached + * child and writes the final job record to disk. + * + * Invoked as: node --import tsx src/job-watcher.ts <job-file> <command> [args...]
+ * + * The watcher is itself spawned as detached+unref'd by the CLI, so the CLI + * can return the job ID immediately while this process stays alive to capture + * the child's output and finalize the job file. + */ +import { spawn } from "node:child_process"; +import { readFileSync, writeFileSync } from "node:fs"; +import type { JobRecord, ExecResult, JobStatus } from "./types.js"; + +const jobFile = process.argv[2]; +const command = process.argv[3]; +const childArgs = process.argv.slice(4); + +if (!jobFile || !command) { + process.exit(1); +} + +let record: JobRecord; +try { + record = JSON.parse(readFileSync(jobFile, "utf-8")) as JobRecord; +} catch { + process.exit(1); +} + +const timeoutMs = 600_000; // 10 min default +let stdout = ""; +let stderr = ""; +let settled = false; +let timedOut = false; +const startMs = Date.now(); + +const child = spawn(command, childArgs, { + shell: false, + stdio: ["pipe", "pipe", "pipe"], +}); + +// Close stdin so clients like codex don't hang +child.stdin?.end(); + +// Update pid in job file +record.pid = child.pid ?? undefined; +writeFileSync(jobFile, JSON.stringify(record, null, 2)); + +child.stdout?.on("data", (chunk: Buffer | string) => { + stdout += chunk.toString(); +}); + +child.stderr?.on("data", (chunk: Buffer | string) => { + stderr += chunk.toString(); +}); + +const timeout = setTimeout(() => { + timedOut = true; + try { child.kill("SIGTERM"); } catch { /* ignore */ } +}, timeoutMs); + +function finalize(status: JobStatus, result?: ExecResult, error?: string) { + if (settled) return; + settled = true; + clearTimeout(timeout); + const completedAt = new Date().toISOString(); + const durationMs = Date.now() - startMs; + const finalRecord: JobRecord = { + ...record, + status, + stdout, + stderr, + result: result ? 
{ ...result, durationMs } : undefined, + error, + completedAt, + }; + try { + writeFileSync(jobFile, JSON.stringify(finalRecord, null, 2)); + } catch { /* best effort */ } + process.exit(0); +} + +child.on("error", (err: NodeJS.ErrnoException) => { + finalize("failed", undefined, err.message); +}); + +child.on("close", (code: number | null) => { + if (timedOut) { + finalize("timed_out", { + stdout, stderr, exitCode: -1, client: record.client, durationMs: 0, + }); + } else if (code !== null && code !== 0) { + finalize("failed", { + stdout, stderr, exitCode: code, client: record.client, durationMs: 0, + }); + } else { + finalize("completed", { + stdout, stderr, exitCode: code ?? 0, client: record.client, durationMs: 0, + }); + } +}); diff --git a/tools/ai-cli-dispatch/src/jobs.ts b/tools/ai-cli-dispatch/src/jobs.ts index c5bfc36..22db581 100644 --- a/tools/ai-cli-dispatch/src/jobs.ts +++ b/tools/ai-cli-dispatch/src/jobs.ts @@ -18,9 +18,11 @@ export interface JobOperationsOptions { }; } -export interface StartJobOptions extends JobStartOptions, JobOperationsOptions {} - -const runningChildren = new Map; cancelled?: boolean }>(); +export interface StartJobOptions extends JobStartOptions, JobOperationsOptions { + /** Override the watcher spawn (for testing). When provided, startJob calls + * this instead of spawning `node --import tsx job-watcher.ts ...` */ + spawnWatcher?: (jobFilePath: string, command: string, args: string[]) => { pid?: number; unref?: () => void }; +} const DEFAULT_JOB_DIR = `${process.env.HOME || process.env.USERPROFILE}/.openclaw/ai-cli-dispatch/jobs`; @@ -70,7 +72,6 @@ export async function startJob( unlinkSync: defaultUnlinkSync, }; const spawnImpl = options.spawn ?? defaultSpawn; - const timeoutMs = options.timeoutMs ?? 
600_000; const argBuilder = (CLIENT_ARGS as Record string[]>)[client]; if (!argBuilder) { @@ -87,12 +88,10 @@ export async function startJob( error: `Unknown client: ${client}`, }; writeJobFile(jobDir, errRecord, fs); - return new Promise((resolve) => - resolve({ id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error }) - ); + return { id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error }; } - const args = argBuilder(prompt); + const clientArgs = argBuilder(prompt); const command = options.clientPath ?? client; const startedAt = new Date().toISOString(); @@ -109,115 +108,51 @@ export async function startJob( writeJobFile(jobDir, record, fs); - return new Promise((resolve) => { - let stdout = ""; - let stderr = ""; - let settled = false; - let timedOut = false; + // Spawn a companion watcher process that outlives this CLI invocation. + // The watcher monitors the actual client (codex/claude/opencode), captures + // stdout/stderr, and writes the final job record to disk on exit. + // This allows the CLI to return the job ID immediately while the watcher + // stays alive to finalize the job. + let watcher: { pid?: number; unref?: () => void }; - const child = spawnImpl(command, args, { + if (options.spawnWatcher) { + // Test path: use the injected watcher mock + watcher = options.spawnWatcher(`${jobDir}/${jobId}.json`, command, clientArgs); + } else { + const watcherArgs = [ + "--import", "tsx", + new URL("./job-watcher.ts", import.meta.url).pathname, + `${jobDir}/${jobId}.json`, + command, + ...clientArgs, + ]; + watcher = spawnImpl("node", watcherArgs, { detached: true, shell: false, - stdio: ["pipe", "pipe", "pipe"], + stdio: "ignore", }); + watcher.unref?.(); + } - // Close stdin immediately so clients like codex don't hang waiting for input - child.stdin?.end(); + // Give the watcher a tick to spawn and record the real child PID + await new Promise((r) => setTimeout(r, 100)); - record.pid = child.pid ?? 
undefined; - writeJobFile(jobDir, record, fs); + // Re-read the job file to pick up the watcher's PID update + let updatedRecord: JobRecord; + try { + updatedRecord = JSON.parse(fs.readFileSync(`${jobDir}/${jobId}.json`, "utf-8")) as JobRecord; + } catch { + updatedRecord = record; + } - child.stdout?.on("data", (chunk: Buffer | string) => { - stdout += chunk.toString(); - }); - - child.stderr?.on("data", (chunk: Buffer | string) => { - stderr += chunk.toString(); - }); - - const timeout = setTimeout(() => { - timedOut = true; - try { - child.kill("SIGTERM"); - } catch { - // ignore - } - }, timeoutMs); - - runningChildren.set(jobId, { child, timeout }); - - function finalize(status: JobStatus, result?: ExecResult, error?: string) { - if (settled) return; - settled = true; - clearTimeout(timeout); - runningChildren.delete(jobId); - const completedAt = new Date().toISOString(); - const finalRecord: JobRecord = { - ...record, - status, - stdout, - stderr, - result, - error, - completedAt, - }; - writeJobFile(jobDir, finalRecord, fs); - } - - child.on("error", (err: NodeJS.ErrnoException) => { - finalize("failed", undefined, err.message); - }); - - child.on("close", (code: number | null, signal: NodeJS.Signals | null) => { - const entry = runningChildren.get(jobId); - if (!entry && settled) return; - - if (entry?.cancelled) { - finalize("cancelled"); - return; - } - - if (timedOut) { - const durationMs = Date.now() - new Date(record.startedAt).getTime(); - finalize("timed_out", { - stdout, - stderr, - exitCode: -1, - client, - durationMs, - }); - } else if (code !== null && code !== 0) { - const durationMs = Date.now() - new Date(record.startedAt).getTime(); - finalize("failed", { - stdout, - stderr, - exitCode: code, - client, - durationMs, - }); - } else { - const durationMs = Date.now() - new Date(record.startedAt).getTime(); - finalize("completed", { - stdout, - stderr, - exitCode: code ?? 
0, - client, - durationMs, - }); - } - }); - - child.unref?.(); - - resolve({ - id: jobId, - client, - prompt, - status: "running", - startedAt, - pid: record.pid, - }); - }); + return { + id: jobId, + client, + prompt, + status: "running", + startedAt, + pid: updatedRecord.pid ?? watcher.pid ?? undefined, + }; } export function getJob(jobId: string, options: JobOperationsOptions = {}): Job { @@ -261,35 +196,22 @@ export function cancelJob(jobId: string, options: JobOperationsOptions = {}): vo return; } - const entry = runningChildren.get(jobId); - if (entry) { - entry.cancelled = true; - clearTimeout(entry.timeout); - try { - entry.child.kill("SIGTERM"); - } catch { - // ignore - } - } else if (record.pid) { + // Kill the client child process (PID recorded by the watcher) + if (record.pid) { try { process.kill(record.pid, "SIGTERM"); } catch { - // ignore + // ignore — process may have already exited } - const cancelledRecord: JobRecord = { - ...record, - status: "cancelled", - completedAt: new Date().toISOString(), - }; - writeJobFile(jobDir, cancelledRecord, fs); - } else { - const cancelledRecord: JobRecord = { - ...record, - status: "cancelled", - completedAt: new Date().toISOString(), - }; - writeJobFile(jobDir, cancelledRecord, fs); } + + // Update the job file to cancelled + const cancelledRecord: JobRecord = { + ...record, + status: "cancelled", + completedAt: new Date().toISOString(), + }; + writeJobFile(jobDir, cancelledRecord, fs); } export function listJobs(options: JobOperationsOptions & { filter?: JobStatus } = {}): Job[] { diff --git a/tools/ai-cli-dispatch/tests/jobs.test.ts b/tools/ai-cli-dispatch/tests/jobs.test.ts index 96999ad..82902f0 100644 --- a/tools/ai-cli-dispatch/tests/jobs.test.ts +++ b/tools/ai-cli-dispatch/tests/jobs.test.ts @@ -137,6 +137,95 @@ function mockSpawn(scenarios: Map): any { }; } +/** + * Creates a mock spawnWatcher that simulates the job-watcher.ts behavior: + * spawns the mock child, captures output, and writes the final 
job record. + */ +function createMockWatcher( + scenarios: Map, + fs: ReturnType, + opts?: { watcherTimeoutMs?: number } +): (jobFilePath: string, command: string, args: string[]) => { pid: number; unref: () => void } { + return (jobFilePath: string, command: string, clientArgs: string[]) => { + const key = [command, ...clientArgs].join(" "); + const scenario = scenarios.get(key) ?? { error: Object.assign(new Error("spawn ENOENT"), { code: "ENOENT" }) }; + const child = createMockChildProcess(scenario); + const watcherPid = 99999; + + let stdout = ""; + let stderr = ""; + child.stdout?.on("data", (chunk: Buffer | string) => { stdout += chunk.toString(); }); + child.stderr?.on("data", (chunk: Buffer | string) => { stderr += chunk.toString(); }); + + child.on("close", (code: number | null) => { + // Read current record, update with results + let record: JobRecord; + try { + record = JSON.parse(fs.readFileSync(jobFilePath)) as JobRecord; + } catch { + return; + } + const durationMs = Date.now() - new Date(record.startedAt).getTime(); + const status = code === 0 || code === null ? "completed" : "failed"; + record.status = status; + record.stdout = stdout; + record.stderr = stderr; + record.completedAt = new Date().toISOString(); + record.result = { stdout, stderr, exitCode: code ?? 
0, client: record.client, durationMs }; + if (scenario.error) { + record.status = "failed"; + record.error = scenario.error.message; + } + fs.writeFileSync(jobFilePath, JSON.stringify(record)); + }); + + child.on("error", (err: NodeJS.ErrnoException) => { + let record: JobRecord; + try { + record = JSON.parse(fs.readFileSync(jobFilePath)) as JobRecord; + } catch { + return; + } + record.status = "failed"; + record.error = err.message; + record.completedAt = new Date().toISOString(); + fs.writeFileSync(jobFilePath, JSON.stringify(record)); + }); + + // For hang scenarios, simulate a timeout by writing timed_out after a delay + if (scenario.hang) { + const watcherTimeout = opts?.watcherTimeoutMs ?? 30; + setTimeout(() => { + try { + const existing = JSON.parse(fs.readFileSync(jobFilePath)) as JobRecord; + // Don't overwrite if already cancelled/completed + if (existing.status !== "running") return; + existing.status = "timed_out"; + existing.completedAt = new Date().toISOString(); + existing.result = { stdout, stderr, exitCode: -1, client: existing.client, durationMs: watcherTimeout }; + fs.writeFileSync(jobFilePath, JSON.stringify(existing)); + } catch { /* ignore */ } + }, watcherTimeout); + } + + // Simulate watcher updating the PID in the job file + try { + const record = JSON.parse(fs.readFileSync(jobFilePath)) as JobRecord; + record.pid = child.pid; + fs.writeFileSync(jobFilePath, JSON.stringify(record)); + } catch { /* ignore */ } + + return { pid: watcherPid, unref: () => {} }; + }; +} + +function createJobTestHelper(scenarios: Map, jobDir: string) { + const fs = createMockFs(); + const spawnWatcher = createMockWatcher(scenarios, fs); + const spawn = mockSpawn(scenarios); + return { fs, spawn, spawnWatcher, jobDir }; +} + function readJobRecord(fs: ReturnType, path: string): JobRecord { return JSON.parse(fs.readFileSync(path)) as JobRecord; } @@ -156,6 +245,7 @@ describe("startJob", () => { const job = await startJob("codex", "hello world", { jobDir, spawn: 
mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -188,11 +278,13 @@ describe("startJob", () => { const job1 = await startJob("codex", "a", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); const job2 = await startJob("codex", "b", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -209,6 +301,7 @@ describe("startJob", () => { const job = await startJob("codex", "slow", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, timeoutMs: 20, }); @@ -230,6 +323,7 @@ describe("startJob", () => { const job = await startJob("codex", "fail", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -249,6 +343,7 @@ describe("startJob", () => { const job = await startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -263,7 +358,7 @@ describe("startJob", () => { describe("getJob", () => { it("returns the current Job state from disk", async () => { const scenarios = new Map([ - ["codex exec --yolo hello", { stdout: "ok", exitCode: 0 }], + ["codex exec --yolo hello", { hang: true }], ]); const fs = createMockFs(); const jobDir = "/tmp/jobs"; @@ -271,6 +366,7 @@ describe("getJob", () => { const job = await startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs, { watcherTimeoutMs: 5000 }), fs, }); @@ -280,9 +376,9 @@ describe("getJob", () => { await delay(50); + // Job should still be running since watcher timeout is 5s const after = getJob(job.id, { jobDir, fs }); - assert.strictEqual(after.status, "completed"); - assert.strictEqual(after.result?.exitCode, 0); + assert.strictEqual(after.status, "running"); }); it("throws JobNotFoundError for nonexistent job", () => { @@ -305,6 +401,7 @@ describe("getJobResult", () => { const job = await 
startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -325,6 +422,7 @@ describe("getJobResult", () => { const job = await startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, timeoutMs: 50, }); @@ -345,6 +443,7 @@ describe("getJobResult", () => { const job = await startJob("codex", "fail", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -376,6 +475,7 @@ describe("cancelJob", () => { const job = await startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs, { watcherTimeoutMs: 5000 }), fs, }); @@ -439,12 +539,14 @@ describe("listJobs", () => { const job1 = await startJob("codex", "a", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); await delay(20); const job2 = await startJob("codex", "b", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -577,6 +679,7 @@ describe("cleanupJobs", () => { const oldJob = await startJob("codex", "old", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); await delay(50); @@ -584,6 +687,7 @@ describe("cleanupJobs", () => { const newJob = await startJob("codex", "new", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); await delay(50); @@ -610,6 +714,7 @@ describe("cleanupJobs", () => { const job = await startJob("codex", "old", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); await delay(50);