559 lines
15 KiB
TypeScript
559 lines
15 KiB
TypeScript
import { describe, it } from "node:test";
|
|
import assert from "node:assert";
|
|
import { EventEmitter } from "node:events";
|
|
import { Readable } from "node:stream";
|
|
import {
|
|
startJob,
|
|
getJob,
|
|
getJobResult,
|
|
cancelJob,
|
|
listJobs,
|
|
cleanupJobs,
|
|
} from "../src/jobs.js";
|
|
import { JobNotFoundError, JobResultUnavailableError } from "../src/types.js";
|
|
import type { ClientName, JobRecord, JobStatus } from "../src/types.js";
|
|
|
|
interface MockScenario {
|
|
stdout?: string;
|
|
stderr?: string;
|
|
exitCode?: number;
|
|
error?: NodeJS.ErrnoException;
|
|
hang?: boolean;
|
|
}
|
|
|
|
function createMockChildProcess(scenario: MockScenario): any {
|
|
const child = new EventEmitter() as any;
|
|
child.pid = 12345;
|
|
child.stdout = Readable.from(
|
|
scenario.stdout !== undefined ? [scenario.stdout] : []
|
|
);
|
|
child.stderr = Readable.from(
|
|
scenario.stderr !== undefined ? [scenario.stderr] : []
|
|
);
|
|
child.killed = false;
|
|
child.kill = (signal: string = "SIGTERM") => {
|
|
child.killed = true;
|
|
process.nextTick(() => {
|
|
child.emit("exit", null, signal);
|
|
child.emit("close", null, signal);
|
|
});
|
|
return true;
|
|
};
|
|
child.unref = () => {};
|
|
|
|
let stdoutEnded = scenario.stdout === undefined;
|
|
let stderrEnded = scenario.stderr === undefined;
|
|
|
|
child.stdout.on("end", () => {
|
|
stdoutEnded = true;
|
|
maybeClose();
|
|
});
|
|
child.stderr.on("end", () => {
|
|
stderrEnded = true;
|
|
maybeClose();
|
|
});
|
|
|
|
function maybeClose() {
|
|
if (stdoutEnded && stderrEnded && !scenario.hang && !scenario.error) {
|
|
child.emit("exit", scenario.exitCode ?? 0, null);
|
|
child.emit("close", scenario.exitCode ?? 0, null);
|
|
}
|
|
}
|
|
|
|
process.nextTick(() => {
|
|
if (scenario.error) {
|
|
child.emit("error", scenario.error);
|
|
}
|
|
});
|
|
|
|
return child;
|
|
}
|
|
|
|
function createMockFs() {
|
|
const files = new Map<string, string>();
|
|
const dirs = new Set<string>();
|
|
|
|
const fs = {
|
|
mkdirSync: (p: string, _opts?: any) => {
|
|
dirs.add(p);
|
|
},
|
|
writeFileSync: (p: string, data: string) => {
|
|
files.set(p, data);
|
|
},
|
|
readFileSync: (p: string, _opts?: any): string => {
|
|
if (!files.has(p)) {
|
|
const err = Object.assign(new Error(`ENOENT: ${p}`), { code: "ENOENT" });
|
|
throw err;
|
|
}
|
|
return files.get(p)!;
|
|
},
|
|
readdirSync: (p: string): string[] => {
|
|
const result: string[] = [];
|
|
for (const f of files.keys()) {
|
|
const dir = f.substring(0, f.lastIndexOf("/"));
|
|
if (dir === p) {
|
|
result.push(f.substring(f.lastIndexOf("/") + 1));
|
|
}
|
|
}
|
|
return result;
|
|
},
|
|
existsSync: (p: string): boolean => {
|
|
return files.has(p) || dirs.has(p);
|
|
},
|
|
statSync: (p: string): { mtimeMs: number; isFile: () => boolean } => {
|
|
if (!files.has(p)) {
|
|
const err = Object.assign(new Error(`ENOENT: ${p}`), { code: "ENOENT" });
|
|
throw err;
|
|
}
|
|
const data = files.get(p)!;
|
|
// Use startedAt from JSON for deterministic age tests
|
|
try {
|
|
const record = JSON.parse(data) as JobRecord;
|
|
return { mtimeMs: new Date(record.startedAt).getTime(), isFile: () => true };
|
|
} catch {
|
|
return { mtimeMs: Date.now(), isFile: () => true };
|
|
}
|
|
},
|
|
unlinkSync: (p: string) => {
|
|
files.delete(p);
|
|
},
|
|
__files: files,
|
|
__dirs: dirs,
|
|
};
|
|
|
|
return fs;
|
|
}
|
|
|
|
function mockSpawn(scenarios: Map<string, MockScenario>): any {
|
|
return (cmd: string, args: string[], _opts: any): any => {
|
|
const key = [cmd, ...args].join(" ");
|
|
const scenario = scenarios.get(key);
|
|
if (!scenario) {
|
|
return createMockChildProcess({
|
|
error: Object.assign(new Error("spawn ENOENT"), { code: "ENOENT" }),
|
|
});
|
|
}
|
|
return createMockChildProcess(scenario);
|
|
};
|
|
}
|
|
|
|
function readJobRecord(fs: ReturnType<typeof createMockFs>, path: string): JobRecord {
|
|
return JSON.parse(fs.readFileSync(path)) as JobRecord;
|
|
}
|
|
|
|
function delay(ms: number): Promise<void> {
|
|
return new Promise((r) => setTimeout(r, ms));
|
|
}
|
|
|
|
describe("startJob", () => {
|
|
it("spawns a detached child process and returns a running Job", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo hello world", { stdout: "ok\n", exitCode: 0 }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "hello world", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
|
|
assert.strictEqual(job.client, "codex");
|
|
assert.strictEqual(job.prompt, "hello world");
|
|
assert.strictEqual(job.status, "running");
|
|
assert.strictEqual(typeof job.id, "string");
|
|
assert.ok(job.id.length > 0);
|
|
assert.strictEqual(typeof job.pid, "number");
|
|
assert.ok(typeof job.startedAt === "string");
|
|
|
|
// Wait for child to finish
|
|
await delay(50);
|
|
|
|
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
|
assert.strictEqual(record.status, "completed");
|
|
assert.strictEqual(record.stdout, "ok\n");
|
|
assert.strictEqual(record.result?.exitCode, 0);
|
|
assert.ok(typeof record.completedAt === "string");
|
|
});
|
|
|
|
it("generates unique ids for concurrent jobs", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo a", { stdout: "a", exitCode: 0 }],
|
|
["codex exec --yolo b", { stdout: "b", exitCode: 0 }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job1 = await startJob("codex", "a", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
const job2 = await startJob("codex", "b", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
|
|
assert.notStrictEqual(job1.id, job2.id);
|
|
});
|
|
|
|
it("sets status to timed_out when timeout is exceeded", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo slow", { hang: true }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "slow", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
timeoutMs: 20,
|
|
});
|
|
|
|
// Wait for timeout + processing
|
|
await delay(100);
|
|
|
|
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
|
assert.strictEqual(record.status, "timed_out");
|
|
});
|
|
|
|
it("sets status to failed on non-zero exit code", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo fail", { stdout: "", stderr: "err", exitCode: 1 }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "fail", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
|
|
await delay(50);
|
|
|
|
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
|
assert.strictEqual(record.status, "failed");
|
|
assert.strictEqual(record.result?.exitCode, 1);
|
|
assert.strictEqual(record.stderr, "err");
|
|
});
|
|
|
|
it("sets status to failed on spawn ENOENT", async () => {
|
|
const scenarios = new Map<string, MockScenario>();
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "hello", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
|
|
await delay(50);
|
|
|
|
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
|
assert.strictEqual(record.status, "failed");
|
|
assert.ok(record.error);
|
|
});
|
|
});
|
|
|
|
describe("getJob", () => {
|
|
it("returns the current Job state from disk", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo hello", { stdout: "ok", exitCode: 0 }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "hello", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
|
|
const fetched = getJob(job.id, { jobDir, fs });
|
|
assert.strictEqual(fetched.id, job.id);
|
|
assert.strictEqual(fetched.status, "running");
|
|
|
|
await delay(50);
|
|
|
|
const after = getJob(job.id, { jobDir, fs });
|
|
assert.strictEqual(after.status, "completed");
|
|
assert.strictEqual(after.result?.exitCode, 0);
|
|
});
|
|
|
|
it("throws JobNotFoundError for nonexistent job", () => {
|
|
const fs = createMockFs();
|
|
assert.throws(
|
|
() => getJob("missing", { jobDir: "/tmp/jobs", fs }),
|
|
(err: unknown) => err instanceof JobNotFoundError
|
|
);
|
|
});
|
|
});
|
|
|
|
describe("getJobResult", () => {
|
|
it("returns ExecResult when job is completed", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo hello", { stdout: "ok", exitCode: 0 }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "hello", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
|
|
await delay(50);
|
|
|
|
const result = getJobResult(job.id, { jobDir, fs });
|
|
assert.strictEqual(result.stdout, "ok");
|
|
assert.strictEqual(result.exitCode, 0);
|
|
});
|
|
|
|
it("throws JobResultUnavailableError when job is still running", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo hello", { hang: true }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "hello", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
timeoutMs: 50,
|
|
});
|
|
|
|
assert.throws(
|
|
() => getJobResult(job.id, { jobDir, fs }),
|
|
(err: unknown) => err instanceof JobResultUnavailableError
|
|
);
|
|
});
|
|
|
|
it("throws JobResultUnavailableError when job failed", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo fail", { stdout: "", stderr: "err", exitCode: 1 }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "fail", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
|
|
await delay(50);
|
|
|
|
assert.throws(
|
|
() => getJobResult(job.id, { jobDir, fs }),
|
|
(err: unknown) => err instanceof JobResultUnavailableError
|
|
);
|
|
});
|
|
|
|
it("throws JobNotFoundError for nonexistent job", () => {
|
|
const fs = createMockFs();
|
|
assert.throws(
|
|
() => getJobResult("missing", { jobDir: "/tmp/jobs", fs }),
|
|
(err: unknown) => err instanceof JobNotFoundError
|
|
);
|
|
});
|
|
});
|
|
|
|
describe("cancelJob", () => {
|
|
it("sends SIGTERM and updates status to cancelled", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo hello", { hang: true }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "hello", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
|
|
cancelJob(job.id, { jobDir, fs });
|
|
|
|
await delay(50);
|
|
|
|
const record = readJobRecord(fs, `${jobDir}/${job.id}.json`);
|
|
assert.strictEqual(record.status, "cancelled");
|
|
});
|
|
|
|
it("throws JobNotFoundError for nonexistent job", () => {
|
|
const fs = createMockFs();
|
|
assert.throws(
|
|
() => cancelJob("missing", { jobDir: "/tmp/jobs", fs }),
|
|
(err: unknown) => err instanceof JobNotFoundError
|
|
);
|
|
});
|
|
});
|
|
|
|
describe("listJobs", () => {
|
|
it("returns all jobs sorted by startedAt desc", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo a", { stdout: "a", exitCode: 0 }],
|
|
["codex exec --yolo b", { stdout: "b", exitCode: 0 }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job1 = await startJob("codex", "a", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
await delay(20);
|
|
const job2 = await startJob("codex", "b", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
|
|
await delay(50);
|
|
|
|
const jobs = listJobs({ jobDir, fs });
|
|
assert.strictEqual(jobs.length, 2);
|
|
assert.strictEqual(jobs[0].id, job2.id);
|
|
assert.strictEqual(jobs[1].id, job1.id);
|
|
});
|
|
|
|
it("filters jobs by status when filter is provided", () => {
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const now = new Date().toISOString();
|
|
const runningRecord: JobRecord = {
|
|
id: "job-running",
|
|
client: "codex",
|
|
prompt: "run",
|
|
status: "running",
|
|
startedAt: now,
|
|
stdout: "",
|
|
stderr: "",
|
|
};
|
|
const completedRecord: JobRecord = {
|
|
id: "job-completed",
|
|
client: "claude",
|
|
prompt: "done",
|
|
status: "completed",
|
|
startedAt: now,
|
|
completedAt: now,
|
|
stdout: "ok",
|
|
stderr: "",
|
|
result: {
|
|
stdout: "ok",
|
|
stderr: "",
|
|
exitCode: 0,
|
|
client: "claude",
|
|
durationMs: 100,
|
|
},
|
|
};
|
|
const failedRecord: JobRecord = {
|
|
id: "job-failed",
|
|
client: "opencode",
|
|
prompt: "fail",
|
|
status: "failed",
|
|
startedAt: now,
|
|
completedAt: now,
|
|
stdout: "",
|
|
stderr: "err",
|
|
result: {
|
|
stdout: "",
|
|
stderr: "err",
|
|
exitCode: 1,
|
|
client: "opencode",
|
|
durationMs: 100,
|
|
},
|
|
};
|
|
|
|
fs.mkdirSync(jobDir, { recursive: true });
|
|
fs.writeFileSync(`${jobDir}/job-running.json`, JSON.stringify(runningRecord));
|
|
fs.writeFileSync(`${jobDir}/job-completed.json`, JSON.stringify(completedRecord));
|
|
fs.writeFileSync(`${jobDir}/job-failed.json`, JSON.stringify(failedRecord));
|
|
|
|
const all = listJobs({ jobDir, fs });
|
|
assert.strictEqual(all.length, 3);
|
|
|
|
const running = listJobs({ jobDir, fs, filter: "running" });
|
|
assert.strictEqual(running.length, 1);
|
|
assert.strictEqual(running[0].id, "job-running");
|
|
|
|
const completed = listJobs({ jobDir, fs, filter: "completed" });
|
|
assert.strictEqual(completed.length, 1);
|
|
assert.strictEqual(completed[0].id, "job-completed");
|
|
|
|
const failed = listJobs({ jobDir, fs, filter: "failed" });
|
|
assert.strictEqual(failed.length, 1);
|
|
assert.strictEqual(failed[0].id, "job-failed");
|
|
});
|
|
});
|
|
|
|
describe("cleanupJobs", () => {
|
|
it("deletes job files older than maxAgeMs", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo old", { stdout: "old", exitCode: 0 }],
|
|
["codex exec --yolo new", { stdout: "new", exitCode: 0 }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
// Create an old job by manipulating startedAt after creation
|
|
const oldJob = await startJob("codex", "old", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
await delay(50);
|
|
|
|
const newJob = await startJob("codex", "new", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
await delay(50);
|
|
|
|
// Make old job appear 25h old by patching its record
|
|
const oldPath = `${jobDir}/${oldJob.id}.json`;
|
|
const oldRecord = readJobRecord(fs, oldPath);
|
|
oldRecord.startedAt = new Date(Date.now() - 25 * 60 * 60 * 1000).toISOString();
|
|
fs.writeFileSync(oldPath, JSON.stringify(oldRecord));
|
|
|
|
cleanupJobs({ jobDir, fs, maxAgeMs: 24 * 60 * 60 * 1000 });
|
|
|
|
assert.strictEqual(fs.existsSync(oldPath), false);
|
|
assert.strictEqual(fs.existsSync(`${jobDir}/${newJob.id}.json`), true);
|
|
});
|
|
|
|
it("uses default maxAge of 24 hours", async () => {
|
|
const scenarios = new Map<string, MockScenario>([
|
|
["codex exec --yolo old", { stdout: "old", exitCode: 0 }],
|
|
]);
|
|
const fs = createMockFs();
|
|
const jobDir = "/tmp/jobs";
|
|
|
|
const job = await startJob("codex", "old", {
|
|
jobDir,
|
|
spawn: mockSpawn(scenarios),
|
|
fs,
|
|
});
|
|
await delay(50);
|
|
|
|
const path = `${jobDir}/${job.id}.json`;
|
|
const record = readJobRecord(fs, path);
|
|
record.startedAt = new Date(Date.now() - 25 * 60 * 60 * 1000).toISOString();
|
|
fs.writeFileSync(path, JSON.stringify(record));
|
|
|
|
cleanupJobs({ jobDir, fs });
|
|
|
|
assert.strictEqual(fs.existsSync(path), false);
|
|
});
|
|
});
|