merge M2 into implement/2026-05-19-ai-cli-dispatch-fixes
This commit is contained in:
@@ -45,6 +45,35 @@ The skill searches for the following clients in order:
|
||||
|
||||
Run `list` to see which clients are installed and their resolved versions.
|
||||
|
||||
## Background Jobs
|
||||
|
||||
For long-running or fire-and-forget tasks, use the programmatic job API instead of `exec`:
|
||||
|
||||
```typescript
|
||||
import { startJob, getJob, cancelJob, listJobs, cleanupJobs } from "./src/jobs.js";
|
||||
|
||||
// Start a detached job
|
||||
const job = await startJob("codex", "refactor auth module", { timeoutMs: 300_000 });
|
||||
console.log(job.id); // e.g. "a1b2c3d4..."
|
||||
console.log(job.status); // "running"
|
||||
|
||||
// Poll for completion
|
||||
const latest = getJob(job.id);
|
||||
console.log(latest.status); // "running" | "completed" | "failed" | "timed_out" | "cancelled"
|
||||
|
||||
// Cancel a running job
|
||||
cancelJob(job.id);
|
||||
|
||||
// List all jobs (newest first)
|
||||
const jobs = listJobs(); // Job[]
|
||||
const running = listJobs({ filter: "running" });
|
||||
|
||||
// Clean up job files older than 24 hours (default)
|
||||
cleanupJobs({ maxAgeMs: 24 * 60 * 60 * 1000 });
|
||||
```
|
||||
|
||||
Job files are stored under `~/.openclaw/ai-cli-dispatch/jobs/<jobId>.json` and include stdout, stderr, exit code, and timing.
|
||||
|
||||
## Output Rules
|
||||
|
||||
- Normal JSON output redacts local file paths and credential metadata.
|
||||
|
||||
@@ -1,25 +1,14 @@
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { spawn as defaultSpawn } from "node:child_process";
|
||||
import type { PathLike } from "node:fs";
|
||||
import { existsSync as defaultExistsSync } from "node:fs";
|
||||
import type { ClientName, ExecResult, DebugInfo } from "./types.js";
|
||||
import type { ClientName, ExecResult, DebugInfo, ExecuteOptions } from "./types.js";
|
||||
import { ClientNotFoundError, ExecError } from "./types.js";
|
||||
|
||||
const CLIENT_ARGS: Record<ClientName, (prompt: string) => string[]> = {
|
||||
export const CLIENT_ARGS: Record<ClientName, (prompt: string) => string[]> = {
|
||||
codex: (p) => ["exec", "--yolo", p],
|
||||
claude: (p) => ["-p", p, "--dangerously-skip-permissions"],
|
||||
opencode: (p) => ["run", "--dangerously-skip-permissions", p],
|
||||
};
|
||||
|
||||
export interface ExecuteOptions {
|
||||
clientPath?: string;
|
||||
timeoutMs?: number;
|
||||
debug?: boolean;
|
||||
onDebug?: (info: DebugInfo) => void;
|
||||
spawn?: (command: string, args: string[], options?: { shell?: boolean }) => ChildProcess;
|
||||
existsSync?: (path: PathLike) => boolean;
|
||||
}
|
||||
|
||||
export async function executePrompt(
|
||||
client: ClientName,
|
||||
prompt: string,
|
||||
|
||||
@@ -0,0 +1,362 @@
|
||||
import { spawn as defaultSpawn } from "node:child_process";
|
||||
import { existsSync as defaultExistsSync, mkdirSync as defaultMkdirSync, readFileSync as defaultReadFileSync, readdirSync as defaultReaddirSync, statSync as defaultStatSync, unlinkSync as defaultUnlinkSync, writeFileSync as defaultWriteFileSync } from "node:fs";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { CLIENT_ARGS } from "./execute.js";
|
||||
import type { ClientName, Job, JobRecord, JobStartOptions, ExecResult, JobStatus } from "./types.js";
|
||||
import { JobNotFoundError, JobResultUnavailableError } from "./types.js";
|
||||
|
||||
export interface JobOperationsOptions {
|
||||
jobDir?: string;
|
||||
fs?: {
|
||||
mkdirSync: typeof defaultMkdirSync;
|
||||
writeFileSync: typeof defaultWriteFileSync;
|
||||
readFileSync: typeof defaultReadFileSync;
|
||||
readdirSync: typeof defaultReaddirSync;
|
||||
existsSync: typeof defaultExistsSync;
|
||||
statSync: typeof defaultStatSync;
|
||||
unlinkSync: typeof defaultUnlinkSync;
|
||||
};
|
||||
}
|
||||
|
||||
export interface StartJobOptions extends JobStartOptions, JobOperationsOptions {}
|
||||
|
||||
const runningChildren = new Map<string, { child: any; timeout?: ReturnType<typeof setTimeout>; cancelled?: boolean }>();
|
||||
|
||||
const DEFAULT_JOB_DIR = `${process.env.HOME || process.env.USERPROFILE}/.openclaw/ai-cli-dispatch/jobs`;
|
||||
|
||||
function getJobDir(options?: { jobDir?: string }): string {
|
||||
return options?.jobDir ?? DEFAULT_JOB_DIR;
|
||||
}
|
||||
|
||||
function writeJobFile(
|
||||
jobDir: string,
|
||||
record: JobRecord,
|
||||
fs: NonNullable<JobOperationsOptions["fs"]>
|
||||
): void {
|
||||
fs.mkdirSync(jobDir, { recursive: true });
|
||||
fs.writeFileSync(`${jobDir}/${record.id}.json`, JSON.stringify(record, null, 2));
|
||||
}
|
||||
|
||||
function readJobFile(
|
||||
jobId: string,
|
||||
jobDir: string,
|
||||
fs: NonNullable<JobOperationsOptions["fs"]>
|
||||
): JobRecord {
|
||||
const path = `${jobDir}/${jobId}.json`;
|
||||
try {
|
||||
return JSON.parse(fs.readFileSync(path, "utf-8")) as JobRecord;
|
||||
} catch (err: any) {
|
||||
if (err.code === "ENOENT") {
|
||||
throw new JobNotFoundError(jobId);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
export async function startJob(
|
||||
client: ClientName,
|
||||
prompt: string,
|
||||
options: StartJobOptions = {}
|
||||
): Promise<Job> {
|
||||
const jobId = randomUUID();
|
||||
const jobDir = getJobDir(options);
|
||||
const fs = options.fs ?? {
|
||||
mkdirSync: defaultMkdirSync,
|
||||
writeFileSync: defaultWriteFileSync,
|
||||
readFileSync: defaultReadFileSync,
|
||||
readdirSync: defaultReaddirSync,
|
||||
existsSync: defaultExistsSync,
|
||||
statSync: defaultStatSync,
|
||||
unlinkSync: defaultUnlinkSync,
|
||||
};
|
||||
const spawnImpl = options.spawn ?? defaultSpawn;
|
||||
const timeoutMs = options.timeoutMs ?? 600_000;
|
||||
|
||||
const argBuilder = (CLIENT_ARGS as Record<string, (prompt: string) => string[]>)[client];
|
||||
if (!argBuilder) {
|
||||
const startedAt = new Date().toISOString();
|
||||
const errRecord: JobRecord = {
|
||||
id: jobId,
|
||||
client,
|
||||
prompt,
|
||||
status: "failed",
|
||||
startedAt,
|
||||
completedAt: startedAt,
|
||||
stdout: "",
|
||||
stderr: "",
|
||||
error: `Unknown client: ${client}`,
|
||||
};
|
||||
writeJobFile(jobDir, errRecord, fs);
|
||||
return new Promise((resolve) =>
|
||||
resolve({ id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error })
|
||||
);
|
||||
}
|
||||
|
||||
const args = argBuilder(prompt);
|
||||
const command = options.clientPath ?? client;
|
||||
const startedAt = new Date().toISOString();
|
||||
|
||||
const record: JobRecord = {
|
||||
id: jobId,
|
||||
client,
|
||||
prompt,
|
||||
status: "running",
|
||||
startedAt,
|
||||
pid: undefined,
|
||||
stdout: "",
|
||||
stderr: "",
|
||||
};
|
||||
|
||||
writeJobFile(jobDir, record, fs);
|
||||
|
||||
return new Promise((resolve) => {
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
let settled = false;
|
||||
let timedOut = false;
|
||||
|
||||
const child = spawnImpl(command, args, {
|
||||
detached: true,
|
||||
shell: false,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
|
||||
record.pid = child.pid ?? undefined;
|
||||
writeJobFile(jobDir, record, fs);
|
||||
|
||||
child.stdout?.on("data", (chunk: Buffer | string) => {
|
||||
stdout += chunk.toString();
|
||||
});
|
||||
|
||||
child.stderr?.on("data", (chunk: Buffer | string) => {
|
||||
stderr += chunk.toString();
|
||||
});
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
timedOut = true;
|
||||
try {
|
||||
child.kill("SIGTERM");
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}, timeoutMs);
|
||||
|
||||
runningChildren.set(jobId, { child, timeout });
|
||||
|
||||
function finalize(status: JobStatus, result?: ExecResult, error?: string) {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
clearTimeout(timeout);
|
||||
runningChildren.delete(jobId);
|
||||
const completedAt = new Date().toISOString();
|
||||
const finalRecord: JobRecord = {
|
||||
...record,
|
||||
status,
|
||||
stdout,
|
||||
stderr,
|
||||
result,
|
||||
error,
|
||||
completedAt,
|
||||
};
|
||||
writeJobFile(jobDir, finalRecord, fs);
|
||||
}
|
||||
|
||||
child.on("error", (err: NodeJS.ErrnoException) => {
|
||||
finalize("failed", undefined, err.message);
|
||||
});
|
||||
|
||||
child.on("close", (code: number | null, signal: NodeJS.Signals | null) => {
|
||||
const entry = runningChildren.get(jobId);
|
||||
if (!entry && settled) return;
|
||||
|
||||
if (entry?.cancelled) {
|
||||
finalize("cancelled");
|
||||
return;
|
||||
}
|
||||
|
||||
if (timedOut) {
|
||||
const durationMs = Date.now() - new Date(record.startedAt).getTime();
|
||||
finalize("timed_out", {
|
||||
stdout,
|
||||
stderr,
|
||||
exitCode: -1,
|
||||
client,
|
||||
durationMs,
|
||||
});
|
||||
} else if (code !== null && code !== 0) {
|
||||
const durationMs = Date.now() - new Date(record.startedAt).getTime();
|
||||
finalize("failed", {
|
||||
stdout,
|
||||
stderr,
|
||||
exitCode: code,
|
||||
client,
|
||||
durationMs,
|
||||
});
|
||||
} else {
|
||||
const durationMs = Date.now() - new Date(record.startedAt).getTime();
|
||||
finalize("completed", {
|
||||
stdout,
|
||||
stderr,
|
||||
exitCode: code ?? 0,
|
||||
client,
|
||||
durationMs,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
child.unref?.();
|
||||
|
||||
resolve({
|
||||
id: jobId,
|
||||
client,
|
||||
prompt,
|
||||
status: "running",
|
||||
startedAt,
|
||||
pid: record.pid,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function getJob(jobId: string, options: JobOperationsOptions = {}): Job {
|
||||
const jobDir = getJobDir(options);
|
||||
const fs = options.fs ?? {
|
||||
mkdirSync: defaultMkdirSync,
|
||||
writeFileSync: defaultWriteFileSync,
|
||||
readFileSync: defaultReadFileSync,
|
||||
readdirSync: defaultReaddirSync,
|
||||
existsSync: defaultExistsSync,
|
||||
statSync: defaultStatSync,
|
||||
unlinkSync: defaultUnlinkSync,
|
||||
};
|
||||
const record = readJobFile(jobId, jobDir, fs);
|
||||
const { stdout: _stdout, stderr: _stderr, ...job } = record;
|
||||
return job;
|
||||
}
|
||||
|
||||
export function getJobResult(jobId: string, options: JobOperationsOptions = {}): ExecResult {
|
||||
const job = getJob(jobId, options);
|
||||
if (job.status !== "completed") {
|
||||
throw new JobResultUnavailableError(jobId, job.status);
|
||||
}
|
||||
return job.result!;
|
||||
}
|
||||
|
||||
export function cancelJob(jobId: string, options: JobOperationsOptions = {}): void {
|
||||
const jobDir = getJobDir(options);
|
||||
const fs = options.fs ?? {
|
||||
mkdirSync: defaultMkdirSync,
|
||||
writeFileSync: defaultWriteFileSync,
|
||||
readFileSync: defaultReadFileSync,
|
||||
readdirSync: defaultReaddirSync,
|
||||
existsSync: defaultExistsSync,
|
||||
statSync: defaultStatSync,
|
||||
unlinkSync: defaultUnlinkSync,
|
||||
};
|
||||
|
||||
const record = readJobFile(jobId, jobDir, fs);
|
||||
if (record.status !== "running") {
|
||||
return;
|
||||
}
|
||||
|
||||
const entry = runningChildren.get(jobId);
|
||||
if (entry) {
|
||||
entry.cancelled = true;
|
||||
clearTimeout(entry.timeout);
|
||||
try {
|
||||
entry.child.kill("SIGTERM");
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
} else if (record.pid) {
|
||||
try {
|
||||
process.kill(record.pid, "SIGTERM");
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
const cancelledRecord: JobRecord = {
|
||||
...record,
|
||||
status: "cancelled",
|
||||
completedAt: new Date().toISOString(),
|
||||
};
|
||||
writeJobFile(jobDir, cancelledRecord, fs);
|
||||
} else {
|
||||
const cancelledRecord: JobRecord = {
|
||||
...record,
|
||||
status: "cancelled",
|
||||
completedAt: new Date().toISOString(),
|
||||
};
|
||||
writeJobFile(jobDir, cancelledRecord, fs);
|
||||
}
|
||||
}
|
||||
|
||||
export function listJobs(options: JobOperationsOptions & { filter?: JobStatus } = {}): Job[] {
|
||||
const jobDir = getJobDir(options);
|
||||
const fs = options.fs ?? {
|
||||
mkdirSync: defaultMkdirSync,
|
||||
writeFileSync: defaultWriteFileSync,
|
||||
readFileSync: defaultReadFileSync,
|
||||
readdirSync: defaultReaddirSync,
|
||||
existsSync: defaultExistsSync,
|
||||
statSync: defaultStatSync,
|
||||
unlinkSync: defaultUnlinkSync,
|
||||
};
|
||||
|
||||
if (!fs.existsSync(jobDir)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const entries = fs.readdirSync(jobDir).filter((f) => f.endsWith(".json"));
|
||||
const jobs: Job[] = [];
|
||||
|
||||
for (const entry of entries) {
|
||||
const jobId = entry.replace(/\.json$/, "");
|
||||
try {
|
||||
const record = readJobFile(jobId, jobDir, fs);
|
||||
const { stdout: _stdout, stderr: _stderr, ...job } = record;
|
||||
jobs.push(job);
|
||||
} catch {
|
||||
// ignore corrupt/missing files
|
||||
}
|
||||
}
|
||||
|
||||
jobs.sort((a, b) => new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime());
|
||||
|
||||
if (options.filter) {
|
||||
return jobs.filter((j) => j.status === options.filter);
|
||||
}
|
||||
|
||||
return jobs;
|
||||
}
|
||||
|
||||
export function cleanupJobs(options: JobOperationsOptions & { maxAgeMs?: number } = {}): void {
|
||||
const jobDir = getJobDir(options);
|
||||
const fs = options.fs ?? {
|
||||
mkdirSync: defaultMkdirSync,
|
||||
writeFileSync: defaultWriteFileSync,
|
||||
readFileSync: defaultReadFileSync,
|
||||
readdirSync: defaultReaddirSync,
|
||||
existsSync: defaultExistsSync,
|
||||
statSync: defaultStatSync,
|
||||
unlinkSync: defaultUnlinkSync,
|
||||
};
|
||||
const maxAgeMs = options.maxAgeMs ?? 24 * 60 * 60 * 1000;
|
||||
|
||||
if (!fs.existsSync(jobDir)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
const entries = fs.readdirSync(jobDir).filter((f) => f.endsWith(".json"));
|
||||
|
||||
for (const entry of entries) {
|
||||
const path = `${jobDir}/${entry}`;
|
||||
try {
|
||||
const stat = fs.statSync(path);
|
||||
if (now - stat.mtimeMs > maxAgeMs) {
|
||||
fs.unlinkSync(path);
|
||||
}
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,6 @@
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import type { PathLike } from "node:fs";
|
||||
|
||||
export type ClientName = "codex" | "claude" | "opencode";
|
||||
|
||||
export interface ClientInfo {
|
||||
@@ -27,6 +30,42 @@ export interface DebugInfo {
|
||||
noisySuccess: boolean;
|
||||
}
|
||||
|
||||
export interface ExecuteOptions {
|
||||
clientPath?: string;
|
||||
timeoutMs?: number;
|
||||
debug?: boolean;
|
||||
onDebug?: (info: DebugInfo) => void;
|
||||
spawn?: (command: string, args: string[], options?: { shell?: boolean }) => ChildProcess;
|
||||
existsSync?: (path: PathLike) => boolean;
|
||||
}
|
||||
|
||||
export type JobStatus = "running" | "completed" | "failed" | "timed_out" | "cancelled";
|
||||
|
||||
export interface Job {
|
||||
id: string;
|
||||
client: ClientName;
|
||||
prompt: string;
|
||||
status: JobStatus;
|
||||
result?: ExecResult;
|
||||
error?: string;
|
||||
startedAt: string;
|
||||
completedAt?: string;
|
||||
pid?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* On-disk storage contract for job files under
|
||||
* ~/.openclaw/ai-cli-dispatch/jobs/<jobId>.json
|
||||
*/
|
||||
export interface JobRecord extends Job {
|
||||
stdout: string;
|
||||
stderr: string;
|
||||
}
|
||||
|
||||
export interface JobStartOptions extends ExecuteOptions {
|
||||
jobDir?: string;
|
||||
}
|
||||
|
||||
export class ClientNotFoundError extends Error {
|
||||
constructor(client: string) {
|
||||
super(`Client "${client}" not found or not installed.`);
|
||||
@@ -43,3 +82,17 @@ export class ExecError extends Error {
|
||||
this.result = result;
|
||||
}
|
||||
}
|
||||
|
||||
export class JobNotFoundError extends Error {
|
||||
constructor(jobId: string) {
|
||||
super(`Job "${jobId}" not found.`);
|
||||
this.name = "JobNotFoundError";
|
||||
}
|
||||
}
|
||||
|
||||
export class JobResultUnavailableError extends Error {
|
||||
constructor(jobId: string, status: JobStatus) {
|
||||
super(`Job "${jobId}" result is not available (status: ${status}).`);
|
||||
this.name = "JobResultUnavailableError";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,633 @@
|
||||
import { describe, it } from "node:test";
|
||||
import assert from "node:assert";
|
||||
import { EventEmitter } from "node:events";
|
||||
import { Readable } from "node:stream";
|
||||
import {
|
||||
startJob,
|
||||
getJob,
|
||||
getJobResult,
|
||||
cancelJob,
|
||||
listJobs,
|
||||
cleanupJobs,
|
||||
} from "../src/jobs.js";
|
||||
import { JobNotFoundError, JobResultUnavailableError } from "../src/types.js";
|
||||
import type { ClientName, JobRecord, JobStatus } from "../src/types.js";
|
||||
|
||||
interface MockScenario {
|
||||
stdout?: string;
|
||||
stderr?: string;
|
||||
exitCode?: number;
|
||||
error?: NodeJS.ErrnoException;
|
||||
hang?: boolean;
|
||||
}
|
||||
|
||||
function createMockChildProcess(scenario: MockScenario): any {
|
||||
const child = new EventEmitter() as any;
|
||||
child.pid = 12345;
|
||||
child.stdout = Readable.from(
|
||||
scenario.stdout !== undefined ? [scenario.stdout] : []
|
||||
);
|
||||
child.stderr = Readable.from(
|
||||
scenario.stderr !== undefined ? [scenario.stderr] : []
|
||||
);
|
||||
child.killed = false;
|
||||
child.kill = (signal: string = "SIGTERM") => {
|
||||
child.killed = true;
|
||||
process.nextTick(() => {
|
||||
child.emit("exit", null, signal);
|
||||
child.emit("close", null, signal);
|
||||
});
|
||||
return true;
|
||||
};
|
||||
child.unref = () => {};
|
||||
|
||||
let stdoutEnded = scenario.stdout === undefined;
|
||||
let stderrEnded = scenario.stderr === undefined;
|
||||
|
||||
child.stdout.on("end", () => {
|
||||
stdoutEnded = true;
|
||||
maybeClose();
|
||||
});
|
||||
child.stderr.on("end", () => {
|
||||
stderrEnded = true;
|
||||
maybeClose();
|
||||
});
|
||||
|
||||
function maybeClose() {
|
||||
if (stdoutEnded && stderrEnded && !scenario.hang && !scenario.error) {
|
||||
child.emit("exit", scenario.exitCode ?? 0, null);
|
||||
child.emit("close", scenario.exitCode ?? 0, null);
|
||||
}
|
||||
}
|
||||
|
||||
process.nextTick(() => {
|
||||
if (scenario.error) {
|
||||
child.emit("error", scenario.error);
|
||||
}
|
||||
});
|
||||
|
||||
return child;
|
||||
}
|
||||
|
||||
function createMockFs() {
|
||||
const files = new Map<string, string>();
|
||||
const dirs = new Set<string>();
|
||||
|
||||
const fs = {
|
||||
mkdirSync: (p: string, _opts?: any) => {
|
||||
dirs.add(p);
|
||||
},
|
||||
writeFileSync: (p: string, data: string) => {
|
||||
files.set(p, data);
|
||||
},
|
||||
readFileSync: (p: string, _opts?: any): string => {
|
||||
if (!files.has(p)) {
|
||||
const err = Object.assign(new Error(`ENOENT: ${p}`), { code: "ENOENT" });
|
||||
throw err;
|
||||
}
|
||||
return files.get(p)!;
|
||||
},
|
||||
readdirSync: (p: string): string[] => {
|
||||
const result: string[] = [];
|
||||
for (const f of files.keys()) {
|
||||
const dir = f.substring(0, f.lastIndexOf("/"));
|
||||
if (dir === p) {
|
||||
result.push(f.substring(f.lastIndexOf("/") + 1));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
},
|
||||
existsSync: (p: string): boolean => {
|
||||
return files.has(p) || dirs.has(p);
|
||||
},
|
||||
statSync: (p: string): { mtimeMs: number; isFile: () => boolean } => {
|
||||
if (!files.has(p)) {
|
||||
const err = Object.assign(new Error(`ENOENT: ${p}`), { code: "ENOENT" });
|
||||
throw err;
|
||||
}
|
||||
const data = files.get(p)!;
|
||||
// Use startedAt from JSON for deterministic age tests
|
||||
try {
|
||||
const record = JSON.parse(data) as JobRecord;
|
||||
return { mtimeMs: new Date(record.startedAt).getTime(), isFile: () => true };
|
||||
} catch {
|
||||
return { mtimeMs: Date.now(), isFile: () => true };
|
||||
}
|
||||
},
|
||||
unlinkSync: (p: string) => {
|
||||
files.delete(p);
|
||||
},
|
||||
__files: files,
|
||||
__dirs: dirs,
|
||||
};
|
||||
|
||||
return fs;
|
||||
}
|
||||
|
||||
function mockSpawn(scenarios: Map<string, MockScenario>): any {
|
||||
return (cmd: string, args: string[], _opts: any): any => {
|
||||
const key = [cmd, ...args].join(" ");
|
||||
const scenario = scenarios.get(key);
|
||||
if (!scenario) {
|
||||
return createMockChildProcess({
|
||||
error: Object.assign(new Error("spawn ENOENT"), { code: "ENOENT" }),
|
||||
});
|
||||
}
|
||||
return createMockChildProcess(scenario);
|
||||
};
|
||||
}
|
||||
|
||||
function readJobRecord(fs: ReturnType<typeof createMockFs>, path: string): JobRecord {
|
||||
return JSON.parse(fs.readFileSync(path)) as JobRecord;
|
||||
}
|
||||
|
||||
function delay(ms: number): Promise<void> {
|
||||
return new Promise((r) => setTimeout(r, ms));
|
||||
}
|
||||
|
||||
describe("startJob", () => {
|
||||
it("spawns a detached child process and returns a running Job", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo hello world", { stdout: "ok\n", exitCode: 0 }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "hello world", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
|
||||
assert.strictEqual(job.client, "codex");
|
||||
assert.strictEqual(job.prompt, "hello world");
|
||||
assert.strictEqual(job.status, "running");
|
||||
assert.strictEqual(typeof job.id, "string");
|
||||
assert.ok(job.id.length > 0);
|
||||
assert.strictEqual(typeof job.pid, "number");
|
||||
assert.ok(typeof job.startedAt === "string");
|
||||
|
||||
// Wait for child to finish
|
||||
await delay(50);
|
||||
|
||||
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
||||
assert.strictEqual(record.status, "completed");
|
||||
assert.strictEqual(record.stdout, "ok\n");
|
||||
assert.strictEqual(record.result?.exitCode, 0);
|
||||
assert.ok(typeof record.completedAt === "string");
|
||||
});
|
||||
|
||||
it("generates unique ids for concurrent jobs", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo a", { stdout: "a", exitCode: 0 }],
|
||||
["codex exec --yolo b", { stdout: "b", exitCode: 0 }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job1 = await startJob("codex", "a", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
const job2 = await startJob("codex", "b", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
|
||||
assert.notStrictEqual(job1.id, job2.id);
|
||||
});
|
||||
|
||||
it("sets status to timed_out when timeout is exceeded", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo slow", { hang: true }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "slow", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
timeoutMs: 20,
|
||||
});
|
||||
|
||||
// Wait for timeout + processing
|
||||
await delay(100);
|
||||
|
||||
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
||||
assert.strictEqual(record.status, "timed_out");
|
||||
});
|
||||
|
||||
it("sets status to failed on non-zero exit code", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo fail", { stdout: "", stderr: "err", exitCode: 1 }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "fail", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
|
||||
await delay(50);
|
||||
|
||||
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
||||
assert.strictEqual(record.status, "failed");
|
||||
assert.strictEqual(record.result?.exitCode, 1);
|
||||
assert.strictEqual(record.stderr, "err");
|
||||
});
|
||||
|
||||
it("sets status to failed on spawn ENOENT", async () => {
|
||||
const scenarios = new Map<string, MockScenario>();
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "hello", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
|
||||
await delay(50);
|
||||
|
||||
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
||||
assert.strictEqual(record.status, "failed");
|
||||
assert.ok(record.error);
|
||||
});
|
||||
});
|
||||
|
||||
describe("getJob", () => {
|
||||
it("returns the current Job state from disk", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo hello", { stdout: "ok", exitCode: 0 }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "hello", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
|
||||
const fetched = getJob(job.id, { jobDir, fs });
|
||||
assert.strictEqual(fetched.id, job.id);
|
||||
assert.strictEqual(fetched.status, "running");
|
||||
|
||||
await delay(50);
|
||||
|
||||
const after = getJob(job.id, { jobDir, fs });
|
||||
assert.strictEqual(after.status, "completed");
|
||||
assert.strictEqual(after.result?.exitCode, 0);
|
||||
});
|
||||
|
||||
it("throws JobNotFoundError for nonexistent job", () => {
|
||||
const fs = createMockFs();
|
||||
assert.throws(
|
||||
() => getJob("missing", { jobDir: "/tmp/jobs", fs }),
|
||||
(err: unknown) => err instanceof JobNotFoundError
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("getJobResult", () => {
|
||||
it("returns ExecResult when job is completed", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo hello", { stdout: "ok", exitCode: 0 }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "hello", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
|
||||
await delay(50);
|
||||
|
||||
const result = getJobResult(job.id, { jobDir, fs });
|
||||
assert.strictEqual(result.stdout, "ok");
|
||||
assert.strictEqual(result.exitCode, 0);
|
||||
});
|
||||
|
||||
it("throws JobResultUnavailableError when job is still running", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo hello", { hang: true }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "hello", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
timeoutMs: 50,
|
||||
});
|
||||
|
||||
assert.throws(
|
||||
() => getJobResult(job.id, { jobDir, fs }),
|
||||
(err: unknown) => err instanceof JobResultUnavailableError
|
||||
);
|
||||
});
|
||||
|
||||
it("throws JobResultUnavailableError when job failed", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo fail", { stdout: "", stderr: "err", exitCode: 1 }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "fail", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
|
||||
await delay(50);
|
||||
|
||||
assert.throws(
|
||||
() => getJobResult(job.id, { jobDir, fs }),
|
||||
(err: unknown) => err instanceof JobResultUnavailableError
|
||||
);
|
||||
});
|
||||
|
||||
it("throws JobNotFoundError for nonexistent job", () => {
|
||||
const fs = createMockFs();
|
||||
assert.throws(
|
||||
() => getJobResult("missing", { jobDir: "/tmp/jobs", fs }),
|
||||
(err: unknown) => err instanceof JobNotFoundError
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("cancelJob", () => {
|
||||
it("sends SIGTERM and updates status to cancelled", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo hello", { hang: true }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "hello", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
|
||||
cancelJob(job.id, { jobDir, fs });
|
||||
|
||||
await delay(50);
|
||||
|
||||
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
||||
assert.strictEqual(record.status, "cancelled");
|
||||
});
|
||||
|
||||
it("throws JobNotFoundError for nonexistent job", () => {
|
||||
const fs = createMockFs();
|
||||
assert.throws(
|
||||
() => cancelJob("missing", { jobDir: "/tmp/jobs", fs }),
|
||||
(err: unknown) => err instanceof JobNotFoundError
|
||||
);
|
||||
});
|
||||
|
||||
it("is a no-op when job is not running", () => {
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
const now = new Date().toISOString();
|
||||
const completedRecord: JobRecord = {
|
||||
id: "job-completed",
|
||||
client: "codex",
|
||||
prompt: "done",
|
||||
status: "completed",
|
||||
startedAt: now,
|
||||
completedAt: now,
|
||||
stdout: "ok",
|
||||
stderr: "",
|
||||
result: {
|
||||
stdout: "ok",
|
||||
stderr: "",
|
||||
exitCode: 0,
|
||||
client: "codex",
|
||||
durationMs: 100,
|
||||
},
|
||||
};
|
||||
|
||||
fs.mkdirSync(jobDir, { recursive: true });
|
||||
fs.writeFileSync(`${jobDir}/job-completed.json`, JSON.stringify(completedRecord));
|
||||
|
||||
cancelJob("job-completed", { jobDir, fs });
|
||||
|
||||
const record = readJobRecord(fs, `${jobDir}/job-completed.json`);
|
||||
assert.strictEqual(record.status, "completed");
|
||||
});
|
||||
});
|
||||
|
||||
describe("listJobs", () => {
|
||||
it("returns all jobs sorted by startedAt desc", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo a", { stdout: "a", exitCode: 0 }],
|
||||
["codex exec --yolo b", { stdout: "b", exitCode: 0 }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job1 = await startJob("codex", "a", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
await delay(20);
|
||||
const job2 = await startJob("codex", "b", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
|
||||
await delay(50);
|
||||
|
||||
const jobs = listJobs({ jobDir, fs });
|
||||
assert.strictEqual(jobs.length, 2);
|
||||
assert.strictEqual(jobs[0].id, job2.id);
|
||||
assert.strictEqual(jobs[1].id, job1.id);
|
||||
});
|
||||
|
||||
it("returns empty array when jobDir does not exist", () => {
|
||||
const fs = createMockFs();
|
||||
const jobs = listJobs({ jobDir: "/tmp/jobs", fs });
|
||||
assert.deepStrictEqual(jobs, []);
|
||||
});
|
||||
|
||||
it("ignores corrupt or unreadable job files", () => {
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
const now = new Date().toISOString();
|
||||
const validRecord: JobRecord = {
|
||||
id: "job-valid",
|
||||
client: "codex",
|
||||
prompt: "ok",
|
||||
status: "completed",
|
||||
startedAt: now,
|
||||
completedAt: now,
|
||||
stdout: "ok",
|
||||
stderr: "",
|
||||
result: {
|
||||
stdout: "ok",
|
||||
stderr: "",
|
||||
exitCode: 0,
|
||||
client: "codex",
|
||||
durationMs: 100,
|
||||
},
|
||||
};
|
||||
|
||||
fs.mkdirSync(jobDir, { recursive: true });
|
||||
fs.writeFileSync(`${jobDir}/job-valid.json`, JSON.stringify(validRecord));
|
||||
fs.writeFileSync(`${jobDir}/job-corrupt.json`, "not-json");
|
||||
|
||||
const jobs = listJobs({ jobDir, fs });
|
||||
assert.strictEqual(jobs.length, 1);
|
||||
assert.strictEqual(jobs[0].id, "job-valid");
|
||||
});
|
||||
|
||||
it("filters jobs by status when filter is provided", () => {
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const now = new Date().toISOString();
|
||||
const runningRecord: JobRecord = {
|
||||
id: "job-running",
|
||||
client: "codex",
|
||||
prompt: "run",
|
||||
status: "running",
|
||||
startedAt: now,
|
||||
stdout: "",
|
||||
stderr: "",
|
||||
};
|
||||
const completedRecord: JobRecord = {
|
||||
id: "job-completed",
|
||||
client: "claude",
|
||||
prompt: "done",
|
||||
status: "completed",
|
||||
startedAt: now,
|
||||
completedAt: now,
|
||||
stdout: "ok",
|
||||
stderr: "",
|
||||
result: {
|
||||
stdout: "ok",
|
||||
stderr: "",
|
||||
exitCode: 0,
|
||||
client: "claude",
|
||||
durationMs: 100,
|
||||
},
|
||||
};
|
||||
const failedRecord: JobRecord = {
|
||||
id: "job-failed",
|
||||
client: "opencode",
|
||||
prompt: "fail",
|
||||
status: "failed",
|
||||
startedAt: now,
|
||||
completedAt: now,
|
||||
stdout: "",
|
||||
stderr: "err",
|
||||
result: {
|
||||
stdout: "",
|
||||
stderr: "err",
|
||||
exitCode: 1,
|
||||
client: "opencode",
|
||||
durationMs: 100,
|
||||
},
|
||||
};
|
||||
|
||||
fs.mkdirSync(jobDir, { recursive: true });
|
||||
fs.writeFileSync(`${jobDir}/job-running.json`, JSON.stringify(runningRecord));
|
||||
fs.writeFileSync(`${jobDir}/job-completed.json`, JSON.stringify(completedRecord));
|
||||
fs.writeFileSync(`${jobDir}/job-failed.json`, JSON.stringify(failedRecord));
|
||||
|
||||
const all = listJobs({ jobDir, fs });
|
||||
assert.strictEqual(all.length, 3);
|
||||
|
||||
const running = listJobs({ jobDir, fs, filter: "running" });
|
||||
assert.strictEqual(running.length, 1);
|
||||
assert.strictEqual(running[0].id, "job-running");
|
||||
|
||||
const completed = listJobs({ jobDir, fs, filter: "completed" });
|
||||
assert.strictEqual(completed.length, 1);
|
||||
assert.strictEqual(completed[0].id, "job-completed");
|
||||
|
||||
const failed = listJobs({ jobDir, fs, filter: "failed" });
|
||||
assert.strictEqual(failed.length, 1);
|
||||
assert.strictEqual(failed[0].id, "job-failed");
|
||||
});
|
||||
});
|
||||
|
||||
describe("cleanupJobs", () => {
|
||||
it("deletes job files older than maxAgeMs", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo old", { stdout: "old", exitCode: 0 }],
|
||||
["codex exec --yolo new", { stdout: "new", exitCode: 0 }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
// Create an old job by manipulating startedAt after creation
|
||||
const oldJob = await startJob("codex", "old", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
await delay(50);
|
||||
|
||||
const newJob = await startJob("codex", "new", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
await delay(50);
|
||||
|
||||
// Make old job appear 25h old by patching its record
|
||||
const oldPath = `${jobDir}/${oldJob.id}.json`;
|
||||
const oldRecord = readJobRecord(fs, oldPath);
|
||||
oldRecord.startedAt = new Date(Date.now() - 25 * 60 * 60 * 1000).toISOString();
|
||||
fs.writeFileSync(oldPath, JSON.stringify(oldRecord));
|
||||
|
||||
cleanupJobs({ jobDir, fs, maxAgeMs: 24 * 60 * 60 * 1000 });
|
||||
|
||||
assert.strictEqual(fs.existsSync(oldPath), false);
|
||||
assert.strictEqual(fs.existsSync(`${jobDir}/${newJob.id}.json`), true);
|
||||
});
|
||||
|
||||
it("uses default maxAge of 24 hours", async () => {
|
||||
const scenarios = new Map<string, MockScenario>([
|
||||
["codex exec --yolo old", { stdout: "old", exitCode: 0 }],
|
||||
]);
|
||||
const fs = createMockFs();
|
||||
const jobDir = "/tmp/jobs";
|
||||
|
||||
const job = await startJob("codex", "old", {
|
||||
jobDir,
|
||||
spawn: mockSpawn(scenarios),
|
||||
fs,
|
||||
});
|
||||
await delay(50);
|
||||
|
||||
const path = `${jobDir}/${job.id}.json`;
|
||||
const record = readJobRecord(fs, path);
|
||||
record.startedAt = new Date(Date.now() - 25 * 60 * 60 * 1000).toISOString();
|
||||
fs.writeFileSync(path, JSON.stringify(record));
|
||||
|
||||
cleanupJobs({ jobDir, fs });
|
||||
|
||||
assert.strictEqual(fs.existsSync(path), false);
|
||||
});
|
||||
|
||||
it("is a no-op when jobDir does not exist", () => {
|
||||
const fs = createMockFs();
|
||||
// Should not throw
|
||||
cleanupJobs({ jobDir: "/tmp/jobs", fs });
|
||||
assert.strictEqual(fs.existsSync("/tmp/jobs"), false);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user