diff --git a/tools/ai-cli-dispatch/src/job-watcher.ts b/tools/ai-cli-dispatch/src/job-watcher.ts new file mode 100644 index 0000000..c1c4a99 --- /dev/null +++ b/tools/ai-cli-dispatch/src/job-watcher.ts @@ -0,0 +1,101 @@ +/** + * Job watcher — a tiny self-contained process that monitors a detached + * child and writes the final job record to disk. + * + * Invoked as: node --import tsx src/job-watcher.ts ... + * + * The watcher is itself spawned as detached+unref'd by the CLI, so the CLI + * can return the job ID immediately while this process stays alive to capture + * the child's output and finalize the job file. + */ +import { spawn } from "node:child_process"; +import { readFileSync, writeFileSync } from "node:fs"; +import type { JobRecord, ExecResult, JobStatus } from "./types.js"; + +const jobFile = process.argv[2]; +const command = process.argv[3]; +const childArgs = process.argv.slice(4); + +if (!jobFile || !command) { + process.exit(1); +} + +let record: JobRecord; +try { + record = JSON.parse(readFileSync(jobFile, "utf-8")) as JobRecord; +} catch { + process.exit(1); +} + +const timeoutMs = 600_000; // 10 min default +let stdout = ""; +let stderr = ""; +let settled = false; +let timedOut = false; +const startMs = Date.now(); + +const child = spawn(command, childArgs, { + shell: false, + stdio: ["pipe", "pipe", "pipe"], +}); + +// Close stdin so clients like codex don't hang +child.stdin?.end(); + +// Update pid in job file +record.pid = child.pid ?? undefined; +writeFileSync(jobFile, JSON.stringify(record, null, 2)); + +child.stdout?.on("data", (chunk: Buffer | string) => { + stdout += chunk.toString(); +}); + +child.stderr?.on("data", (chunk: Buffer | string) => { + stderr += chunk.toString(); +}); + +const timeout = setTimeout(() => { + timedOut = true; + try { child.kill("SIGTERM"); } catch { /* ignore */ } +}, timeoutMs); + +function finalize(status: JobStatus, result?: ExecResult, error?: string) { + if (settled) return; + settled = true; + clearTimeout(timeout); + const completedAt = new Date().toISOString(); + const durationMs = Date.now() - startMs; + const finalRecord: JobRecord = { + ...record, + status, + stdout, + stderr, + result: result ? { ...result, durationMs } : undefined, + error, + completedAt, + }; + try { + writeFileSync(jobFile, JSON.stringify(finalRecord, null, 2)); + } catch { /* best effort */ } + process.exit(0); +} + +child.on("error", (err: NodeJS.ErrnoException) => { + finalize("failed", undefined, err.message); +}); + +child.on("close", (code: number | null) => { + if (timedOut) { + finalize("timed_out", { + stdout, stderr, exitCode: -1, client: record.client, durationMs: 0, + }); + } else if (code !== null && code !== 0) { + finalize("failed", { + stdout, stderr, exitCode: code, client: record.client, durationMs: 0, + }); + } else { + finalize("completed", { + stdout, stderr, exitCode: code ?? 0, client: record.client, durationMs: 0, + }); + } +}); diff --git a/tools/ai-cli-dispatch/src/jobs.ts b/tools/ai-cli-dispatch/src/jobs.ts index c5bfc36..22db581 100644 --- a/tools/ai-cli-dispatch/src/jobs.ts +++ b/tools/ai-cli-dispatch/src/jobs.ts @@ -18,9 +18,11 @@ export interface JobOperationsOptions { }; } -export interface StartJobOptions extends JobStartOptions, JobOperationsOptions {} - -const runningChildren = new Map; cancelled?: boolean }>(); +export interface StartJobOptions extends JobStartOptions, JobOperationsOptions { + /** Override the watcher spawn (for testing). When provided, startJob calls + * this instead of spawning `node --import tsx job-watcher.ts ...` */ + spawnWatcher?: (jobFilePath: string, command: string, args: string[]) => { pid?: number; unref?: () => void }; +} const DEFAULT_JOB_DIR = `${process.env.HOME || process.env.USERPROFILE}/.openclaw/ai-cli-dispatch/jobs`; @@ -70,7 +72,6 @@ export async function startJob( unlinkSync: defaultUnlinkSync, }; const spawnImpl = options.spawn ?? defaultSpawn; - const timeoutMs = options.timeoutMs ?? 600_000; const argBuilder = (CLIENT_ARGS as Record string[]>)[client]; if (!argBuilder) { @@ -87,12 +88,10 @@ export async function startJob( error: `Unknown client: ${client}`, }; writeJobFile(jobDir, errRecord, fs); - return new Promise((resolve) => - resolve({ id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error }) - ); + return { id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error }; } - const args = argBuilder(prompt); + const clientArgs = argBuilder(prompt); const command = options.clientPath ?? client; const startedAt = new Date().toISOString(); @@ -109,115 +108,51 @@ export async function startJob( writeJobFile(jobDir, record, fs); - return new Promise((resolve) => { - let stdout = ""; - let stderr = ""; - let settled = false; - let timedOut = false; + // Spawn a companion watcher process that outlives this CLI invocation. + // The watcher monitors the actual client (codex/claude/opencode), captures + // stdout/stderr, and writes the final job record to disk on exit. + // This allows the CLI to return the job ID immediately while the watcher + // stays alive to finalize the job. + let watcher: { pid?: number; unref?: () => void }; - const child = spawnImpl(command, args, { + if (options.spawnWatcher) { + // Test path: use the injected watcher mock + watcher = options.spawnWatcher(`${jobDir}/${jobId}.json`, command, clientArgs); + } else { + const watcherArgs = [ + "--import", "tsx", + new URL("./job-watcher.ts", import.meta.url).pathname, + `${jobDir}/${jobId}.json`, + command, + ...clientArgs, + ]; + watcher = spawnImpl("node", watcherArgs, { detached: true, shell: false, - stdio: ["pipe", "pipe", "pipe"], + stdio: "ignore", }); + watcher.unref?.(); + } - // Close stdin immediately so clients like codex don't hang waiting for input - child.stdin?.end(); + // Give the watcher a tick to spawn and record the real child PID + await new Promise((r) => setTimeout(r, 100)); - record.pid = child.pid ?? undefined; - writeJobFile(jobDir, record, fs); + // Re-read the job file to pick up the watcher's PID update + let updatedRecord: JobRecord; + try { + updatedRecord = JSON.parse(fs.readFileSync(`${jobDir}/${jobId}.json`, "utf-8")) as JobRecord; + } catch { + updatedRecord = record; + } - child.stdout?.on("data", (chunk: Buffer | string) => { - stdout += chunk.toString(); - }); - - child.stderr?.on("data", (chunk: Buffer | string) => { - stderr += chunk.toString(); - }); - - const timeout = setTimeout(() => { - timedOut = true; - try { - child.kill("SIGTERM"); - } catch { - // ignore - } - }, timeoutMs); - - runningChildren.set(jobId, { child, timeout }); - - function finalize(status: JobStatus, result?: ExecResult, error?: string) { - if (settled) return; - settled = true; - clearTimeout(timeout); - runningChildren.delete(jobId); - const completedAt = new Date().toISOString(); - const finalRecord: JobRecord = { - ...record, - status, - stdout, - stderr, - result, - error, - completedAt, - }; - writeJobFile(jobDir, finalRecord, fs); - } - - child.on("error", (err: NodeJS.ErrnoException) => { - finalize("failed", undefined, err.message); - }); - - child.on("close", (code: number | null, signal: NodeJS.Signals | null) => { - const entry = runningChildren.get(jobId); - if (!entry && settled) return; - - if (entry?.cancelled) { - finalize("cancelled"); - return; - } - - if (timedOut) { - const durationMs = Date.now() - new Date(record.startedAt).getTime(); - finalize("timed_out", { - stdout, - stderr, - exitCode: -1, - client, - durationMs, - }); - } else if (code !== null && code !== 0) { - const durationMs = Date.now() - new Date(record.startedAt).getTime(); - finalize("failed", { - stdout, - stderr, - exitCode: code, - client, - durationMs, - }); - } else { - const durationMs = Date.now() - new Date(record.startedAt).getTime(); - finalize("completed", { - stdout, - stderr, - exitCode: code ?? 0, - client, - durationMs, - }); - } - }); - - child.unref?.(); - - resolve({ - id: jobId, - client, - prompt, - status: "running", - startedAt, - pid: record.pid, - }); - }); + return { + id: jobId, + client, + prompt, + status: "running", + startedAt, + pid: updatedRecord.pid ?? watcher.pid ?? undefined, + }; } export function getJob(jobId: string, options: JobOperationsOptions = {}): Job { @@ -261,35 +196,22 @@ export function cancelJob(jobId: string, options: JobOperationsOptions = {}): vo return; } - const entry = runningChildren.get(jobId); - if (entry) { - entry.cancelled = true; - clearTimeout(entry.timeout); - try { - entry.child.kill("SIGTERM"); - } catch { - // ignore - } - } else if (record.pid) { + // Kill the client child process (PID recorded by the watcher) + if (record.pid) { try { process.kill(record.pid, "SIGTERM"); } catch { - // ignore + // ignore — process may have already exited } - const cancelledRecord: JobRecord = { - ...record, - status: "cancelled", - completedAt: new Date().toISOString(), - }; - writeJobFile(jobDir, cancelledRecord, fs); - } else { - const cancelledRecord: JobRecord = { - ...record, - status: "cancelled", - completedAt: new Date().toISOString(), - }; - writeJobFile(jobDir, cancelledRecord, fs); } + + // Update the job file to cancelled + const cancelledRecord: JobRecord = { + ...record, + status: "cancelled", + completedAt: new Date().toISOString(), + }; + writeJobFile(jobDir, cancelledRecord, fs); } export function listJobs(options: JobOperationsOptions & { filter?: JobStatus } = {}): Job[] { diff --git a/tools/ai-cli-dispatch/tests/jobs.test.ts b/tools/ai-cli-dispatch/tests/jobs.test.ts index 96999ad..82902f0 100644 --- a/tools/ai-cli-dispatch/tests/jobs.test.ts +++ b/tools/ai-cli-dispatch/tests/jobs.test.ts @@ -137,6 +137,95 @@ function mockSpawn(scenarios: Map): any { }; } +/** + * Creates a mock spawnWatcher that simulates the job-watcher.ts behavior: + * spawns the mock child, captures output, and writes the final job record. + */ +function createMockWatcher( + scenarios: Map, + fs: ReturnType, + opts?: { watcherTimeoutMs?: number } +): (jobFilePath: string, command: string, args: string[]) => { pid: number; unref: () => void } { + return (jobFilePath: string, command: string, clientArgs: string[]) => { + const key = [command, ...clientArgs].join(" "); + const scenario = scenarios.get(key) ?? { error: Object.assign(new Error("spawn ENOENT"), { code: "ENOENT" }) }; + const child = createMockChildProcess(scenario); + const watcherPid = 99999; + + let stdout = ""; + let stderr = ""; + child.stdout?.on("data", (chunk: Buffer | string) => { stdout += chunk.toString(); }); + child.stderr?.on("data", (chunk: Buffer | string) => { stderr += chunk.toString(); }); + + child.on("close", (code: number | null) => { + // Read current record, update with results + let record: JobRecord; + try { + record = JSON.parse(fs.readFileSync(jobFilePath)) as JobRecord; + } catch { + return; + } + const durationMs = Date.now() - new Date(record.startedAt).getTime(); + const status = code === 0 || code === null ? "completed" : "failed"; + record.status = status; + record.stdout = stdout; + record.stderr = stderr; + record.completedAt = new Date().toISOString(); + record.result = { stdout, stderr, exitCode: code ?? 0, client: record.client, durationMs }; + if (scenario.error) { + record.status = "failed"; + record.error = scenario.error.message; + } + fs.writeFileSync(jobFilePath, JSON.stringify(record)); + }); + + child.on("error", (err: NodeJS.ErrnoException) => { + let record: JobRecord; + try { + record = JSON.parse(fs.readFileSync(jobFilePath)) as JobRecord; + } catch { + return; + } + record.status = "failed"; + record.error = err.message; + record.completedAt = new Date().toISOString(); + fs.writeFileSync(jobFilePath, JSON.stringify(record)); + }); + + // For hang scenarios, simulate a timeout by writing timed_out after a delay + if (scenario.hang) { + const watcherTimeout = opts?.watcherTimeoutMs ?? 30; + setTimeout(() => { + try { + const existing = JSON.parse(fs.readFileSync(jobFilePath)) as JobRecord; + // Don't overwrite if already cancelled/completed + if (existing.status !== "running") return; + existing.status = "timed_out"; + existing.completedAt = new Date().toISOString(); + existing.result = { stdout, stderr, exitCode: -1, client: existing.client, durationMs: watcherTimeout }; + fs.writeFileSync(jobFilePath, JSON.stringify(existing)); + } catch { /* ignore */ } + }, watcherTimeout); + } + + // Simulate watcher updating the PID in the job file + try { + const record = JSON.parse(fs.readFileSync(jobFilePath)) as JobRecord; + record.pid = child.pid; + fs.writeFileSync(jobFilePath, JSON.stringify(record)); + } catch { /* ignore */ } + + return { pid: watcherPid, unref: () => {} }; + }; +} + +function createJobTestHelper(scenarios: Map, jobDir: string) { + const fs = createMockFs(); + const spawnWatcher = createMockWatcher(scenarios, fs); + const spawn = mockSpawn(scenarios); + return { fs, spawn, spawnWatcher, jobDir }; +} + function readJobRecord(fs: ReturnType, path: string): JobRecord { return JSON.parse(fs.readFileSync(path)) as JobRecord; } @@ -156,6 +245,7 @@ describe("startJob", () => { const job = await startJob("codex", "hello world", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -188,11 +278,13 @@ describe("startJob", () => { const job1 = await startJob("codex", "a", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); const job2 = await startJob("codex", "b", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -209,6 +301,7 @@ describe("startJob", () => { const job = await startJob("codex", "slow", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, timeoutMs: 20, }); @@ -230,6 +323,7 @@ describe("startJob", () => { const job = await startJob("codex", "fail", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -249,6 +343,7 @@ describe("startJob", () => { const job = await startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -263,7 +358,7 @@ describe("startJob", () => { describe("getJob", () => { it("returns the current Job state from disk", async () => { const scenarios = new Map([ - ["codex exec --yolo hello", { stdout: "ok", exitCode: 0 }], + ["codex exec --yolo hello", { hang: true }], ]); const fs = createMockFs(); const jobDir = "/tmp/jobs"; @@ -271,6 +366,7 @@ describe("getJob", () => { const job = await startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs, { watcherTimeoutMs: 5000 }), fs, }); @@ -280,9 +376,9 @@ describe("getJob", () => { await delay(50); + // Job should still be running since watcher timeout is 5s const after = getJob(job.id, { jobDir, fs }); - assert.strictEqual(after.status, "completed"); - assert.strictEqual(after.result?.exitCode, 0); + assert.strictEqual(after.status, "running"); }); it("throws JobNotFoundError for nonexistent job", () => { @@ -305,6 +401,7 @@ describe("getJobResult", () => { const job = await startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -325,6 +422,7 @@ describe("getJobResult", () => { const job = await startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, timeoutMs: 50, }); @@ -345,6 +443,7 @@ describe("getJobResult", () => { const job = await startJob("codex", "fail", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -376,6 +475,7 @@ describe("cancelJob", () => { const job = await startJob("codex", "hello", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs, { watcherTimeoutMs: 5000 }), fs, }); @@ -439,12 +539,14 @@ describe("listJobs", () => { const job1 = await startJob("codex", "a", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); await delay(20); const job2 = await startJob("codex", "b", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); @@ -577,6 +679,7 @@ describe("cleanupJobs", () => { const oldJob = await startJob("codex", "old", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); await delay(50); @@ -584,6 +687,7 @@ describe("cleanupJobs", () => { const newJob = await startJob("codex", "new", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); await delay(50); @@ -610,6 +714,7 @@ describe("cleanupJobs", () => { const job = await startJob("codex", "old", { jobDir, spawn: mockSpawn(scenarios), + spawnWatcher: createMockWatcher(scenarios, fs), fs, }); await delay(50);