From abf77260712aaf35ad11e4169c9ca8bce64d41ef Mon Sep 17 00:00:00 2001 From: Stefano Fiorini Date: Tue, 19 May 2026 19:58:48 -0500 Subject: [PATCH 1/3] feat(S-201): Define job types and storage interfaces --- tools/ai-cli-dispatch/src/execute.ts | 13 +------ tools/ai-cli-dispatch/src/types.ts | 53 ++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/tools/ai-cli-dispatch/src/execute.ts b/tools/ai-cli-dispatch/src/execute.ts index b443e6a..f79be5b 100644 --- a/tools/ai-cli-dispatch/src/execute.ts +++ b/tools/ai-cli-dispatch/src/execute.ts @@ -1,8 +1,6 @@ -import type { ChildProcess } from "node:child_process"; import { spawn as defaultSpawn } from "node:child_process"; -import type { PathLike } from "node:fs"; import { existsSync as defaultExistsSync } from "node:fs"; -import type { ClientName, ExecResult, DebugInfo } from "./types.js"; +import type { ClientName, ExecResult, DebugInfo, ExecuteOptions } from "./types.js"; import { ClientNotFoundError, ExecError } from "./types.js"; const CLIENT_ARGS: Record string[]> = { @@ -11,15 +9,6 @@ const CLIENT_ARGS: Record string[]> = { opencode: (p) => ["run", "--dangerously-skip-permissions", p], }; -export interface ExecuteOptions { - clientPath?: string; - timeoutMs?: number; - debug?: boolean; - onDebug?: (info: DebugInfo) => void; - spawn?: (command: string, args: string[], options?: { shell?: boolean }) => ChildProcess; - existsSync?: (path: PathLike) => boolean; -} - export async function executePrompt( client: ClientName, prompt: string, diff --git a/tools/ai-cli-dispatch/src/types.ts b/tools/ai-cli-dispatch/src/types.ts index 0bf167d..fd0f2fe 100644 --- a/tools/ai-cli-dispatch/src/types.ts +++ b/tools/ai-cli-dispatch/src/types.ts @@ -1,3 +1,6 @@ +import type { ChildProcess } from "node:child_process"; +import type { PathLike } from "node:fs"; + export type ClientName = "codex" | "claude" | "opencode"; export interface ClientInfo { @@ -27,6 +30,42 @@ export interface DebugInfo { noisySuccess: boolean; } +export interface ExecuteOptions { + clientPath?: string; + timeoutMs?: number; + debug?: boolean; + onDebug?: (info: DebugInfo) => void; + spawn?: (command: string, args: string[], options?: { shell?: boolean }) => ChildProcess; + existsSync?: (path: PathLike) => boolean; +} + +export type JobStatus = "running" | "completed" | "failed" | "timed_out" | "cancelled"; + +export interface Job { + id: string; + client: ClientName; + prompt: string; + status: JobStatus; + result?: ExecResult; + error?: string; + startedAt: string; + completedAt?: string; + pid?: number; +} + +/** + * On-disk storage contract for job files under + * ~/.openclaw/ai-cli-dispatch/jobs/.json + */ +export interface JobRecord extends Job { + stdout: string; + stderr: string; +} + +export interface JobStartOptions extends ExecuteOptions { + jobDir?: string; +} + export class ClientNotFoundError extends Error { constructor(client: string) { super(`Client "${client}" not found or not installed.`); @@ -43,3 +82,17 @@ export class ExecError extends Error { this.result = result; } } + +export class JobNotFoundError extends Error { + constructor(jobId: string) { + super(`Job "${jobId}" not found.`); + this.name = "JobNotFoundError"; + } +} + +export class JobResultUnavailableError extends Error { + constructor(jobId: string, status: JobStatus) { + super(`Job "${jobId}" result is not available (status: ${status}).`); + this.name = "JobResultUnavailableError"; + } +} From 3b9ed0cc389f41337218079070ce9b9a15499e2d Mon Sep 17 00:00:00 2001 From: Stefano Fiorini Date: Tue, 19 May 2026 20:17:15 -0500 Subject: [PATCH 2/3] feat(S-202): Test-drive and implement src/jobs.ts (write) --- tools/ai-cli-dispatch/src/execute.ts | 2 +- tools/ai-cli-dispatch/src/jobs.ts | 360 +++++++++++++++ tools/ai-cli-dispatch/tests/jobs.test.ts | 558 +++++++++++++++++++++++ 3 files changed, 919 insertions(+), 1 deletion(-) create mode 100644 tools/ai-cli-dispatch/src/jobs.ts create mode 100644 tools/ai-cli-dispatch/tests/jobs.test.ts diff --git a/tools/ai-cli-dispatch/src/execute.ts b/tools/ai-cli-dispatch/src/execute.ts index f79be5b..f66e057 100644 --- a/tools/ai-cli-dispatch/src/execute.ts +++ b/tools/ai-cli-dispatch/src/execute.ts @@ -3,7 +3,7 @@ import { existsSync as defaultExistsSync } from "node:fs"; import type { ClientName, ExecResult, DebugInfo, ExecuteOptions } from "./types.js"; import { ClientNotFoundError, ExecError } from "./types.js"; -const CLIENT_ARGS: Record string[]> = { +export const CLIENT_ARGS: Record string[]> = { codex: (p) => ["exec", "--yolo", p], claude: (p) => ["-p", p, "--dangerously-skip-permissions"], opencode: (p) => ["run", "--dangerously-skip-permissions", p], diff --git a/tools/ai-cli-dispatch/src/jobs.ts b/tools/ai-cli-dispatch/src/jobs.ts new file mode 100644 index 0000000..e6c8792 --- /dev/null +++ b/tools/ai-cli-dispatch/src/jobs.ts @@ -0,0 +1,360 @@ +import { spawn as defaultSpawn } from "node:child_process"; +import { existsSync as defaultExistsSync, mkdirSync as defaultMkdirSync, readFileSync as defaultReadFileSync, readdirSync as defaultReaddirSync, statSync as defaultStatSync, unlinkSync as defaultUnlinkSync, writeFileSync as defaultWriteFileSync } from "node:fs"; +import { randomUUID } from "node:crypto"; +import { CLIENT_ARGS } from "./execute.js"; +import type { ClientName, Job, JobRecord, JobStartOptions, ExecResult, JobStatus } from "./types.js"; +import { JobNotFoundError, JobResultUnavailableError } from "./types.js"; + +export interface JobOperationsOptions { + jobDir?: string; + fs?: { + mkdirSync: typeof defaultMkdirSync; + writeFileSync: typeof defaultWriteFileSync; + readFileSync: typeof defaultReadFileSync; + readdirSync: typeof defaultReaddirSync; + existsSync: typeof defaultExistsSync; + statSync: typeof defaultStatSync; + unlinkSync: typeof defaultUnlinkSync; + }; +} + +export interface StartJobOptions extends JobStartOptions, JobOperationsOptions {} + +const runningChildren = new Map; cancelled?: boolean }>(); + +const DEFAULT_JOB_DIR = `${process.env.HOME || process.env.USERPROFILE}/.openclaw/ai-cli-dispatch/jobs`; + +function getJobDir(options?: { jobDir?: string }): string { + return options?.jobDir ?? DEFAULT_JOB_DIR; +} + +function writeJobFile( + jobDir: string, + record: JobRecord, + fs: NonNullable +): void { + fs.mkdirSync(jobDir, { recursive: true }); + fs.writeFileSync(`${jobDir}/${record.id}.json`, JSON.stringify(record, null, 2)); +} + +function readJobFile( + jobId: string, + jobDir: string, + fs: NonNullable +): JobRecord { + const path = `${jobDir}/${jobId}.json`; + try { + return JSON.parse(fs.readFileSync(path, "utf-8")) as JobRecord; + } catch (err: any) { + if (err.code === "ENOENT") { + throw new JobNotFoundError(jobId); + } + throw err; + } +} + +export async function startJob( + client: ClientName, + prompt: string, + options: StartJobOptions = {} +): Promise { + const jobId = randomUUID(); + const jobDir = getJobDir(options); + const fs = options.fs ?? { + mkdirSync: defaultMkdirSync, + writeFileSync: defaultWriteFileSync, + readFileSync: defaultReadFileSync, + readdirSync: defaultReaddirSync, + existsSync: defaultExistsSync, + statSync: defaultStatSync, + unlinkSync: defaultUnlinkSync, + }; + const spawnImpl = options.spawn ?? defaultSpawn; + const timeoutMs = options.timeoutMs ?? 600_000; + + const argBuilder = (CLIENT_ARGS as Record string[]>)[client]; + if (!argBuilder) { + const startedAt = new Date().toISOString(); + const errRecord: JobRecord = { + id: jobId, + client, + prompt, + status: "failed", + startedAt, + completedAt: startedAt, + stdout: "", + stderr: "", + error: `Unknown client: ${client}`, + }; + writeJobFile(jobDir, errRecord, fs); + return { id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error }; + } + + const args = argBuilder(prompt); + const command = options.clientPath ?? client; + const startedAt = new Date().toISOString(); + + const record: JobRecord = { + id: jobId, + client, + prompt, + status: "running", + startedAt, + pid: undefined, + stdout: "", + stderr: "", + }; + + writeJobFile(jobDir, record, fs); + + return new Promise((resolve) => { + let stdout = ""; + let stderr = ""; + let settled = false; + let timedOut = false; + + const child = spawnImpl(command, args, { + detached: true, + shell: false, + stdio: ["ignore", "pipe", "pipe"], + }); + + record.pid = child.pid ?? undefined; + writeJobFile(jobDir, record, fs); + + child.stdout?.on("data", (chunk: Buffer | string) => { + stdout += chunk.toString(); + }); + + child.stderr?.on("data", (chunk: Buffer | string) => { + stderr += chunk.toString(); + }); + + const timeout = setTimeout(() => { + timedOut = true; + try { + child.kill("SIGTERM"); + } catch { + // ignore + } + }, timeoutMs); + + runningChildren.set(jobId, { child, timeout }); + + function finalize(status: JobStatus, result?: ExecResult, error?: string) { + if (settled) return; + settled = true; + clearTimeout(timeout); + runningChildren.delete(jobId); + const completedAt = new Date().toISOString(); + const finalRecord: JobRecord = { + ...record, + status, + stdout, + stderr, + result, + error, + completedAt, + }; + writeJobFile(jobDir, finalRecord, fs); + } + + child.on("error", (err: NodeJS.ErrnoException) => { + finalize("failed", undefined, err.message); + }); + + child.on("close", (code: number | null, signal: NodeJS.Signals | null) => { + const entry = runningChildren.get(jobId); + if (!entry && settled) return; + + if (entry?.cancelled) { + finalize("cancelled"); + return; + } + + if (timedOut) { + const durationMs = Date.now() - new Date(record.startedAt).getTime(); + finalize("timed_out", { + stdout, + stderr, + exitCode: -1, + client, + durationMs, + }); + } else if (code !== null && code !== 0) { + const durationMs = Date.now() - new Date(record.startedAt).getTime(); + finalize("failed", { + stdout, + stderr, + exitCode: code, + client, + durationMs, + }); + } else { + const durationMs = Date.now() - new Date(record.startedAt).getTime(); + finalize("completed", { + stdout, + stderr, + exitCode: code ?? 0, + client, + durationMs, + }); + } + }); + + child.unref?.(); + + resolve({ + id: jobId, + client, + prompt, + status: "running", + startedAt, + pid: record.pid, + }); + }); +} + +export function getJob(jobId: string, options: JobOperationsOptions = {}): Job { + const jobDir = getJobDir(options); + const fs = options.fs ?? { + mkdirSync: defaultMkdirSync, + writeFileSync: defaultWriteFileSync, + readFileSync: defaultReadFileSync, + readdirSync: defaultReaddirSync, + existsSync: defaultExistsSync, + statSync: defaultStatSync, + unlinkSync: defaultUnlinkSync, + }; + const record = readJobFile(jobId, jobDir, fs); + const { stdout: _stdout, stderr: _stderr, ...job } = record; + return job; +} + +export function getJobResult(jobId: string, options: JobOperationsOptions = {}): ExecResult { + const job = getJob(jobId, options); + if (job.status !== "completed") { + throw new JobResultUnavailableError(jobId, job.status); + } + return job.result!; +} + +export function cancelJob(jobId: string, options: JobOperationsOptions = {}): void { + const jobDir = getJobDir(options); + const fs = options.fs ?? { + mkdirSync: defaultMkdirSync, + writeFileSync: defaultWriteFileSync, + readFileSync: defaultReadFileSync, + readdirSync: defaultReaddirSync, + existsSync: defaultExistsSync, + statSync: defaultStatSync, + unlinkSync: defaultUnlinkSync, + }; + + const record = readJobFile(jobId, jobDir, fs); + if (record.status !== "running") { + return; + } + + const entry = runningChildren.get(jobId); + if (entry) { + entry.cancelled = true; + clearTimeout(entry.timeout); + try { + entry.child.kill("SIGTERM"); + } catch { + // ignore + } + } else if (record.pid) { + try { + process.kill(record.pid, "SIGTERM"); + } catch { + // ignore + } + const cancelledRecord: JobRecord = { + ...record, + status: "cancelled", + completedAt: new Date().toISOString(), + }; + writeJobFile(jobDir, cancelledRecord, fs); + } else { + const cancelledRecord: JobRecord = { + ...record, + status: "cancelled", + completedAt: new Date().toISOString(), + }; + writeJobFile(jobDir, cancelledRecord, fs); + } +} + +export function listJobs(options: JobOperationsOptions & { filter?: JobStatus } = {}): Job[] { + const jobDir = getJobDir(options); + const fs = options.fs ?? { + mkdirSync: defaultMkdirSync, + writeFileSync: defaultWriteFileSync, + readFileSync: defaultReadFileSync, + readdirSync: defaultReaddirSync, + existsSync: defaultExistsSync, + statSync: defaultStatSync, + unlinkSync: defaultUnlinkSync, + }; + + if (!fs.existsSync(jobDir)) { + return []; + } + + const entries = fs.readdirSync(jobDir).filter((f) => f.endsWith(".json")); + const jobs: Job[] = []; + + for (const entry of entries) { + const jobId = entry.replace(/\.json$/, ""); + try { + const record = readJobFile(jobId, jobDir, fs); + const { stdout: _stdout, stderr: _stderr, ...job } = record; + jobs.push(job); + } catch { + // ignore corrupt/missing files + } + } + + jobs.sort((a, b) => new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime()); + + if (options.filter) { + return jobs.filter((j) => j.status === options.filter); + } + + return jobs; +} + +export function cleanupJobs(options: JobOperationsOptions & { maxAgeMs?: number } = {}): void { + const jobDir = getJobDir(options); + const fs = options.fs ?? { + mkdirSync: defaultMkdirSync, + writeFileSync: defaultWriteFileSync, + readFileSync: defaultReadFileSync, + readdirSync: defaultReaddirSync, + existsSync: defaultExistsSync, + statSync: defaultStatSync, + unlinkSync: defaultUnlinkSync, + }; + const maxAgeMs = options.maxAgeMs ?? 24 * 60 * 60 * 1000; + + if (!fs.existsSync(jobDir)) { + return; + } + + const now = Date.now(); + const entries = fs.readdirSync(jobDir).filter((f) => f.endsWith(".json")); + + for (const entry of entries) { + const path = `${jobDir}/${entry}`; + try { + const stat = fs.statSync(path); + if (now - stat.mtimeMs > maxAgeMs) { + fs.unlinkSync(path); + } + } catch { + // ignore + } + } +} diff --git a/tools/ai-cli-dispatch/tests/jobs.test.ts b/tools/ai-cli-dispatch/tests/jobs.test.ts new file mode 100644 index 0000000..ad7efa6 --- /dev/null +++ b/tools/ai-cli-dispatch/tests/jobs.test.ts @@ -0,0 +1,558 @@ +import { describe, it } from "node:test"; +import assert from "node:assert"; +import { EventEmitter } from "node:events"; +import { Readable } from "node:stream"; +import { + startJob, + getJob, + getJobResult, + cancelJob, + listJobs, + cleanupJobs, +} from "../src/jobs.js"; +import { JobNotFoundError, JobResultUnavailableError } from "../src/types.js"; +import type { ClientName, JobRecord, JobStatus } from "../src/types.js"; + +interface MockScenario { + stdout?: string; + stderr?: string; + exitCode?: number; + error?: NodeJS.ErrnoException; + hang?: boolean; +} + +function createMockChildProcess(scenario: MockScenario): any { + const child = new EventEmitter() as any; + child.pid = 12345; + child.stdout = Readable.from( + scenario.stdout !== undefined ? [scenario.stdout] : [] + ); + child.stderr = Readable.from( + scenario.stderr !== undefined ? [scenario.stderr] : [] + ); + child.killed = false; + child.kill = (signal: string = "SIGTERM") => { + child.killed = true; + process.nextTick(() => { + child.emit("exit", null, signal); + child.emit("close", null, signal); + }); + return true; + }; + child.unref = () => {}; + + let stdoutEnded = scenario.stdout === undefined; + let stderrEnded = scenario.stderr === undefined; + + child.stdout.on("end", () => { + stdoutEnded = true; + maybeClose(); + }); + child.stderr.on("end", () => { + stderrEnded = true; + maybeClose(); + }); + + function maybeClose() { + if (stdoutEnded && stderrEnded && !scenario.hang && !scenario.error) { + child.emit("exit", scenario.exitCode ?? 0, null); + child.emit("close", scenario.exitCode ?? 0, null); + } + } + + process.nextTick(() => { + if (scenario.error) { + child.emit("error", scenario.error); + } + }); + + return child; +} + +function createMockFs() { + const files = new Map(); + const dirs = new Set(); + + const fs = { + mkdirSync: (p: string, _opts?: any) => { + dirs.add(p); + }, + writeFileSync: (p: string, data: string) => { + files.set(p, data); + }, + readFileSync: (p: string, _opts?: any): string => { + if (!files.has(p)) { + const err = Object.assign(new Error(`ENOENT: ${p}`), { code: "ENOENT" }); + throw err; + } + return files.get(p)!; + }, + readdirSync: (p: string): string[] => { + const result: string[] = []; + for (const f of files.keys()) { + const dir = f.substring(0, f.lastIndexOf("/")); + if (dir === p) { + result.push(f.substring(f.lastIndexOf("/") + 1)); + } + } + return result; + }, + existsSync: (p: string): boolean => { + return files.has(p) || dirs.has(p); + }, + statSync: (p: string): { mtimeMs: number; isFile: () => boolean } => { + if (!files.has(p)) { + const err = Object.assign(new Error(`ENOENT: ${p}`), { code: "ENOENT" }); + throw err; + } + const data = files.get(p)!; + // Use startedAt from JSON for deterministic age tests + try { + const record = JSON.parse(data) as JobRecord; + return { mtimeMs: new Date(record.startedAt).getTime(), isFile: () => true }; + } catch { + return { mtimeMs: Date.now(), isFile: () => true }; + } + }, + unlinkSync: (p: string) => { + files.delete(p); + }, + __files: files, + __dirs: dirs, + }; + + return fs; +} + +function mockSpawn(scenarios: Map): any { + return (cmd: string, args: string[], _opts: any): any => { + const key = [cmd, ...args].join(" "); + const scenario = scenarios.get(key); + if (!scenario) { + return createMockChildProcess({ + error: Object.assign(new Error("spawn ENOENT"), { code: "ENOENT" }), + }); + } + return createMockChildProcess(scenario); + }; +} + +function readJobRecord(fs: ReturnType, path: string): JobRecord { + return JSON.parse(fs.readFileSync(path)) as JobRecord; +} + +function delay(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)); +} + +describe("startJob", () => { + it("spawns a detached child process and returns a running Job", async () => { + const scenarios = new Map([ + ["codex exec --yolo hello world", { stdout: "ok\n", exitCode: 0 }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "hello world", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + + assert.strictEqual(job.client, "codex"); + assert.strictEqual(job.prompt, "hello world"); + assert.strictEqual(job.status, "running"); + assert.strictEqual(typeof job.id, "string"); + assert.ok(job.id.length > 0); + assert.strictEqual(typeof job.pid, "number"); + assert.ok(typeof job.startedAt === "string"); + + // Wait for child to finish + await delay(50); + + const record = readJobRecord(fs, `${jobDir}/${job.id}.json`); + assert.strictEqual(record.status, "completed"); + assert.strictEqual(record.stdout, "ok\n"); + assert.strictEqual(record.result?.exitCode, 0); + assert.ok(typeof record.completedAt === "string"); + }); + + it("generates unique ids for concurrent jobs", async () => { + const scenarios = new Map([ + ["codex exec --yolo a", { stdout: "a", exitCode: 0 }], + ["codex exec --yolo b", { stdout: "b", exitCode: 0 }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job1 = await startJob("codex", "a", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + const job2 = await startJob("codex", "b", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + + assert.notStrictEqual(job1.id, job2.id); + }); + + it("sets status to timed_out when timeout is exceeded", async () => { + const scenarios = new Map([ + ["codex exec --yolo slow", { hang: true }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "slow", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + timeoutMs: 20, + }); + + // Wait for timeout + processing + await delay(100); + + const record = readJobRecord(fs, `${jobDir}/${job.id}.json`); + assert.strictEqual(record.status, "timed_out"); + }); + + it("sets status to failed on non-zero exit code", async () => { + const scenarios = new Map([ + ["codex exec --yolo fail", { stdout: "", stderr: "err", exitCode: 1 }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "fail", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + + await delay(50); + + const record = readJobRecord(fs, `${jobDir}/${job.id}.json`); + assert.strictEqual(record.status, "failed"); + assert.strictEqual(record.result?.exitCode, 1); + assert.strictEqual(record.stderr, "err"); + }); + + it("sets status to failed on spawn ENOENT", async () => { + const scenarios = new Map(); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "hello", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + + await delay(50); + + const record = readJobRecord(fs, `${jobDir}/${job.id}.json`); + assert.strictEqual(record.status, "failed"); + assert.ok(record.error); + }); +}); + +describe("getJob", () => { + it("returns the current Job state from disk", async () => { + const scenarios = new Map([ + ["codex exec --yolo hello", { stdout: "ok", exitCode: 0 }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "hello", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + + const fetched = getJob(job.id, { jobDir, fs }); + assert.strictEqual(fetched.id, job.id); + assert.strictEqual(fetched.status, "running"); + + await delay(50); + + const after = getJob(job.id, { jobDir, fs }); + assert.strictEqual(after.status, "completed"); + assert.strictEqual(after.result?.exitCode, 0); + }); + + it("throws JobNotFoundError for nonexistent job", () => { + const fs = createMockFs(); + assert.throws( + () => getJob("missing", { jobDir: "/tmp/jobs", fs }), + (err: unknown) => err instanceof JobNotFoundError + ); + }); +}); + +describe("getJobResult", () => { + it("returns ExecResult when job is completed", async () => { + const scenarios = new Map([ + ["codex exec --yolo hello", { stdout: "ok", exitCode: 0 }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "hello", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + + await delay(50); + + const result = getJobResult(job.id, { jobDir, fs }); + assert.strictEqual(result.stdout, "ok"); + assert.strictEqual(result.exitCode, 0); + }); + + it("throws JobResultUnavailableError when job is still running", async () => { + const scenarios = new Map([ + ["codex exec --yolo hello", { hang: true }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "hello", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + timeoutMs: 50, + }); + + assert.throws( + () => getJobResult(job.id, { jobDir, fs }), + (err: unknown) => err instanceof JobResultUnavailableError + ); + }); + + it("throws JobResultUnavailableError when job failed", async () => { + const scenarios = new Map([ + ["codex exec --yolo fail", { stdout: "", stderr: "err", exitCode: 1 }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "fail", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + + await delay(50); + + assert.throws( + () => getJobResult(job.id, { jobDir, fs }), + (err: unknown) => err instanceof JobResultUnavailableError + ); + }); + + it("throws JobNotFoundError for nonexistent job", () => { + const fs = createMockFs(); + assert.throws( + () => getJobResult("missing", { jobDir: "/tmp/jobs", fs }), + (err: unknown) => err instanceof JobNotFoundError + ); + }); +}); + +describe("cancelJob", () => { + it("sends SIGTERM and updates status to cancelled", async () => { + const scenarios = new Map([ + ["codex exec --yolo hello", { hang: true }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "hello", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + + cancelJob(job.id, { jobDir, fs }); + + await delay(50); + + const record = readJobRecord(fs, `${jobDir}/${job.id}.json`); + assert.strictEqual(record.status, "cancelled"); + }); + + it("throws JobNotFoundError for nonexistent job", () => { + const fs = createMockFs(); + assert.throws( + () => cancelJob("missing", { jobDir: "/tmp/jobs", fs }), + (err: unknown) => err instanceof JobNotFoundError + ); + }); +}); + +describe("listJobs", () => { + it("returns all jobs sorted by startedAt desc", async () => { + const scenarios = new Map([ + ["codex exec --yolo a", { stdout: "a", exitCode: 0 }], + ["codex exec --yolo b", { stdout: "b", exitCode: 0 }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job1 = await startJob("codex", "a", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + await delay(20); + const job2 = await startJob("codex", "b", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + + await delay(50); + + const jobs = listJobs({ jobDir, fs }); + assert.strictEqual(jobs.length, 2); + assert.strictEqual(jobs[0].id, job2.id); + assert.strictEqual(jobs[1].id, job1.id); + }); + + it("filters jobs by status when filter is provided", () => { + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const now = new Date().toISOString(); + const runningRecord: JobRecord = { + id: "job-running", + client: "codex", + prompt: "run", + status: "running", + startedAt: now, + stdout: "", + stderr: "", + }; + const completedRecord: JobRecord = { + id: "job-completed", + client: "claude", + prompt: "done", + status: "completed", + startedAt: now, + completedAt: now, + stdout: "ok", + stderr: "", + result: { + stdout: "ok", + stderr: "", + exitCode: 0, + client: "claude", + durationMs: 100, + }, + }; + const failedRecord: JobRecord = { + id: "job-failed", + client: "opencode", + prompt: "fail", + status: "failed", + startedAt: now, + completedAt: now, + stdout: "", + stderr: "err", + result: { + stdout: "", + stderr: "err", + exitCode: 1, + client: "opencode", + durationMs: 100, + }, + }; + + fs.mkdirSync(jobDir, { recursive: true }); + fs.writeFileSync(`${jobDir}/job-running.json`, JSON.stringify(runningRecord)); + fs.writeFileSync(`${jobDir}/job-completed.json`, JSON.stringify(completedRecord)); + fs.writeFileSync(`${jobDir}/job-failed.json`, JSON.stringify(failedRecord)); + + const all = listJobs({ jobDir, fs }); + assert.strictEqual(all.length, 3); + + const running = listJobs({ jobDir, fs, filter: "running" }); + assert.strictEqual(running.length, 1); + assert.strictEqual(running[0].id, "job-running"); + + const completed = listJobs({ jobDir, fs, filter: "completed" }); + assert.strictEqual(completed.length, 1); + assert.strictEqual(completed[0].id, "job-completed"); + + const failed = listJobs({ jobDir, fs, filter: "failed" }); + assert.strictEqual(failed.length, 1); + assert.strictEqual(failed[0].id, "job-failed"); + }); +}); + +describe("cleanupJobs", () => { + it("deletes job files older than maxAgeMs", async () => { + const scenarios = new Map([ + ["codex exec --yolo old", { stdout: "old", exitCode: 0 }], + ["codex exec --yolo new", { stdout: "new", exitCode: 0 }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + // Create an old job by manipulating startedAt after creation + const oldJob = await startJob("codex", "old", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + await delay(50); + + const newJob = await startJob("codex", "new", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + await delay(50); + + // Make old job appear 25h old by patching its record + const oldPath = `${jobDir}/${oldJob.id}.json`; + const oldRecord = readJobRecord(fs, oldPath); + oldRecord.startedAt = new Date(Date.now() - 25 * 60 * 60 * 1000).toISOString(); + fs.writeFileSync(oldPath, JSON.stringify(oldRecord)); + + cleanupJobs({ jobDir, fs, maxAgeMs: 24 * 60 * 60 * 1000 }); + + assert.strictEqual(fs.existsSync(oldPath), false); + assert.strictEqual(fs.existsSync(`${jobDir}/${newJob.id}.json`), true); + }); + + it("uses default maxAge of 24 hours", async () => { + const scenarios = new Map([ + ["codex exec --yolo old", { stdout: "old", exitCode: 0 }], + ]); + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + + const job = await startJob("codex", "old", { + jobDir, + spawn: mockSpawn(scenarios), + fs, + }); + await delay(50); + + const path = `${jobDir}/${job.id}.json`; + const record = readJobRecord(fs, path); + record.startedAt = new Date(Date.now() - 25 * 60 * 60 * 1000).toISOString(); + fs.writeFileSync(path, JSON.stringify(record)); + + cleanupJobs({ jobDir, fs }); + + assert.strictEqual(fs.existsSync(path), false); + }); +}); From e7b01612c84f43fc394beecfcdbe2a2e92862a9b Mon Sep 17 00:00:00 2001 From: Stefano Fiorini Date: Tue, 19 May 2026 20:29:35 -0500 Subject: [PATCH 3/3] feat(M2): Background Job Manager --- tools/ai-cli-dispatch/SKILL.md | 29 +++++++++ tools/ai-cli-dispatch/src/jobs.ts | 4 +- tools/ai-cli-dispatch/tests/jobs.test.ts | 75 ++++++++++++++++++++++++ 3 files changed, 107 insertions(+), 1 deletion(-) diff --git a/tools/ai-cli-dispatch/SKILL.md b/tools/ai-cli-dispatch/SKILL.md index 4d7efa4..db86cee 100644 --- a/tools/ai-cli-dispatch/SKILL.md +++ b/tools/ai-cli-dispatch/SKILL.md @@ -45,6 +45,35 @@ The skill searches for the following clients in order: Run `list` to see which clients are installed and their resolved versions. +## Background Jobs + +For long-running or fire-and-forget tasks, use the programmatic job API instead of `exec`: + +```typescript +import { startJob, getJob, cancelJob, listJobs, cleanupJobs } from "./src/jobs.js"; + +// Start a detached job +const job = await startJob("codex", "refactor auth module", { timeoutMs: 300_000 }); +console.log(job.id); // e.g. "a1b2c3d4..." +console.log(job.status); // "running" + +// Poll for completion +const latest = getJob(job.id); +console.log(latest.status); // "running" | "completed" | "failed" | "timed_out" | "cancelled" + +// Cancel a running job +cancelJob(job.id); + +// List all jobs (newest first) +const jobs = listJobs(); // Job[] +const running = listJobs({ filter: "running" }); + +// Clean up job files older than 24 hours (default) +cleanupJobs({ maxAgeMs: 24 * 60 * 60 * 1000 }); +``` + +Job files are stored under `~/.openclaw/ai-cli-dispatch/jobs/.json` and include stdout, stderr, exit code, and timing. + ## Output Rules - Normal JSON output redacts local file paths and credential metadata. diff --git a/tools/ai-cli-dispatch/src/jobs.ts b/tools/ai-cli-dispatch/src/jobs.ts index e6c8792..a027cff 100644 --- a/tools/ai-cli-dispatch/src/jobs.ts +++ b/tools/ai-cli-dispatch/src/jobs.ts @@ -87,7 +87,9 @@ export async function startJob( error: `Unknown client: ${client}`, }; writeJobFile(jobDir, errRecord, fs); - return { id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error }; + return new Promise((resolve) => + resolve({ id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error }) + ); } const args = argBuilder(prompt); diff --git a/tools/ai-cli-dispatch/tests/jobs.test.ts b/tools/ai-cli-dispatch/tests/jobs.test.ts index ad7efa6..96999ad 100644 --- a/tools/ai-cli-dispatch/tests/jobs.test.ts +++ b/tools/ai-cli-dispatch/tests/jobs.test.ts @@ -394,6 +394,37 @@ describe("cancelJob", () => { (err: unknown) => err instanceof JobNotFoundError ); }); + + it("is a no-op when job is not running", () => { + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + const now = new Date().toISOString(); + const completedRecord: JobRecord = { + id: "job-completed", + client: "codex", + prompt: "done", + status: "completed", + startedAt: now, + completedAt: now, + stdout: "ok", + stderr: "", + result: { + stdout: "ok", + stderr: "", + exitCode: 0, + client: "codex", + durationMs: 100, + }, + }; + + fs.mkdirSync(jobDir, { recursive: true }); + fs.writeFileSync(`${jobDir}/job-completed.json`, JSON.stringify(completedRecord)); + + cancelJob("job-completed", { jobDir, fs }); + + const record = readJobRecord(fs, `${jobDir}/job-completed.json`); + assert.strictEqual(record.status, "completed"); + }); }); describe("listJobs", () => { @@ -425,6 +456,43 @@ describe("listJobs", () => { assert.strictEqual(jobs[1].id, job1.id); }); + it("returns empty array when jobDir does not exist", () => { + const fs = createMockFs(); + const jobs = listJobs({ jobDir: "/tmp/jobs", fs }); + assert.deepStrictEqual(jobs, []); + }); + + it("ignores corrupt or unreadable job files", () => { + const fs = createMockFs(); + const jobDir = "/tmp/jobs"; + const now = new Date().toISOString(); + const validRecord: JobRecord = { + id: "job-valid", + client: "codex", + prompt: "ok", + status: "completed", + startedAt: now, + completedAt: now, + stdout: "ok", + stderr: "", + result: { + stdout: "ok", + stderr: "", + exitCode: 0, + client: "codex", + durationMs: 100, + }, + }; + + fs.mkdirSync(jobDir, { recursive: true }); + fs.writeFileSync(`${jobDir}/job-valid.json`, JSON.stringify(validRecord)); + fs.writeFileSync(`${jobDir}/job-corrupt.json`, "not-json"); + + const jobs = listJobs({ jobDir, fs }); + assert.strictEqual(jobs.length, 1); + assert.strictEqual(jobs[0].id, "job-valid"); + }); + it("filters jobs by status when filter is provided", () => { const fs = createMockFs(); const jobDir = "/tmp/jobs"; @@ -555,4 +623,11 @@ describe("cleanupJobs", () => { assert.strictEqual(fs.existsSync(path), false); }); + + it("is a no-op when jobDir does not exist", () => { + const fs = createMockFs(); + // Should not throw + cleanupJobs({ jobDir: "/tmp/jobs", fs }); + assert.strictEqual(fs.existsSync("/tmp/jobs"), false); + }); });