Files
stef-openclaw-skills/tools/ai-cli-dispatch/src/jobs.ts
T
stefano 0bea1c590d fix: review feedback — signal handling, cancel race, stderr consistency
Address issues found by code review:

1. Bug: timeout/signal-killed child reported as 'completed' with exit
   code 0 because close handler ignored the signal parameter. Now
   treats any signal termination as timed_out.

2. Bug: cancelled job gets overwritten by watcher on child exit. The
   watcher now re-reads the job file before writing and skips if the
   status has been changed to 'cancelled'.

3. Inconsistency: watcher path skipped stderr noise filtering. Added
   filterStderrNoise to the watcher (duplicated from execute.ts to
   keep the watcher self-contained).

4. getJobResult now guards against missing result field instead of
   using non-null assertion.
2026-05-20 14:17:28 -05:00

291 lines
8.5 KiB
TypeScript

import { spawn as defaultSpawn } from "node:child_process";
import { randomUUID } from "node:crypto";
import { existsSync as defaultExistsSync, mkdirSync as defaultMkdirSync, readFileSync as defaultReadFileSync, readdirSync as defaultReaddirSync, statSync as defaultStatSync, unlinkSync as defaultUnlinkSync, writeFileSync as defaultWriteFileSync } from "node:fs";
import { homedir } from "node:os";
import { fileURLToPath } from "node:url";
import { CLIENT_ARGS } from "./execute.js";
import type { ClientName, Job, JobRecord, JobStartOptions, ExecResult, JobStatus } from "./types.js";
import { JobNotFoundError, JobResultUnavailableError } from "./types.js";
/**
 * Settings accepted by every job operation in this module.
 */
export interface JobOperationsOptions {
  /**
   * Synchronous filesystem functions used in place of the `node:fs`
   * implementations. When an override object is supplied, all seven members
   * are required.
   */
  fs?: {
    existsSync: typeof defaultExistsSync;
    mkdirSync: typeof defaultMkdirSync;
    readdirSync: typeof defaultReaddirSync;
    readFileSync: typeof defaultReadFileSync;
    statSync: typeof defaultStatSync;
    unlinkSync: typeof defaultUnlinkSync;
    writeFileSync: typeof defaultWriteFileSync;
  };
  /** Directory that holds one `<id>.json` file per job. */
  jobDir?: string;
}
/**
 * Options accepted by `startJob`: the job-start settings combined with the
 * job-store settings shared by the other operations.
 */
export interface StartJobOptions extends JobStartOptions, JobOperationsOptions {
  /**
   * Test hook that replaces the watcher launch. When set, `startJob` calls it
   * with the job file path, the client command and the client arguments
   * rather than spawning `node --import tsx job-watcher.ts ...`.
   */
  spawnWatcher?: (
    jobFilePath: string,
    command: string,
    args: string[]
  ) => { pid?: number; unref?: () => void };
}
/**
 * Default job store location: `<home>/.openclaw/ai-cli-dispatch/jobs`.
 *
 * `HOME` and `USERPROFILE` keep their original precedence. `os.homedir()` is
 * the last resort so that the path never begins with the literal text
 * "undefined" when both environment variables are unset or empty.
 */
const DEFAULT_JOB_DIR = `${process.env.HOME || process.env.USERPROFILE || homedir()}/.openclaw/ai-cli-dispatch/jobs`;

/**
 * Resolves the directory that holds the job files.
 *
 * @param options - Optional settings; only `jobDir` is read.
 * @returns `options.jobDir` when it is neither null nor undefined, otherwise
 *   the module default.
 */
function getJobDir(options?: { jobDir?: string }): string {
  return options?.jobDir ?? DEFAULT_JOB_DIR;
}
/**
 * Persists a job record as pretty-printed (2-space) JSON at
 * `<jobDir>/<record.id>.json`.
 *
 * @param jobDir - Target directory; created recursively before writing.
 * @param record - Record to serialize; its `id` determines the file name.
 * @param fs - Filesystem functions to use.
 */
function writeJobFile(
  jobDir: string,
  record: JobRecord,
  fs: NonNullable<JobOperationsOptions["fs"]>
): void {
  // Ensure the directory exists first; `recursive` makes this a no-op when it does.
  fs.mkdirSync(jobDir, { recursive: true });

  const target = `${jobDir}/${record.id}.json`;
  const payload = JSON.stringify(record, null, 2);
  fs.writeFileSync(target, payload);
}
/**
 * Loads and parses the job record stored at `<jobDir>/<jobId>.json`.
 *
 * @param jobId - Identifier of the job; used as the file's base name.
 * @param jobDir - Directory containing the job files.
 * @param fs - Filesystem functions to use.
 * @returns The parsed record. The JSON is cast, not validated.
 * @throws JobNotFoundError when the read fails with code `ENOENT`.
 * @throws Any other read or JSON parse failure, rethrown unchanged.
 */
function readJobFile(
  jobId: string,
  jobDir: string,
  fs: NonNullable<JobOperationsOptions["fs"]>
): JobRecord {
  const path = `${jobDir}/${jobId}.json`;
  try {
    return JSON.parse(fs.readFileSync(path, "utf-8")) as JobRecord;
  } catch (err: unknown) {
    // Guard the property access: a thrown null/undefined must be rethrown
    // as-is rather than being replaced by a TypeError from `err.code`.
    const code = err == null ? undefined : (err as { code?: unknown }).code;
    if (code === "ENOENT") {
      throw new JobNotFoundError(jobId);
    }
    throw err;
  }
}
/**
 * Starts `client` on `prompt` as a background job and returns without waiting
 * for the client to finish.
 *
 * A "running" record is written to `<jobDir>/<id>.json`, then a detached
 * watcher process is launched for it (or `options.spawnWatcher` is called
 * instead when provided). After a 100 ms pause the job file is re-read to pick
 * up a PID the watcher may have recorded.
 *
 * @param client - Key into `CLIENT_ARGS` selecting the argument builder.
 * @param prompt - Prompt text passed to the argument builder.
 * @param options - Job directory, `fs`/`spawn` overrides, `clientPath`, and
 *   the `spawnWatcher` test hook.
 * @returns A job summary. For an unknown client the job is recorded and
 *   returned with status "failed"; otherwise the returned status is always
 *   "running", whatever the job file says after the pause.
 */
export async function startJob(
  client: ClientName,
  prompt: string,
  options: StartJobOptions = {}
): Promise<Job> {
  const jobId = randomUUID();
  const jobDir = getJobDir(options);
  const jobFilePath = `${jobDir}/${jobId}.json`;
  const fs = options.fs ?? {
    mkdirSync: defaultMkdirSync,
    writeFileSync: defaultWriteFileSync,
    readFileSync: defaultReadFileSync,
    readdirSync: defaultReaddirSync,
    existsSync: defaultExistsSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
  };
  const spawnImpl = options.spawn ?? defaultSpawn;

  const argBuilder = (CLIENT_ARGS as Record<string, (prompt: string) => string[]>)[client];
  if (!argBuilder) {
    // Unknown client: persist a terminal "failed" record so the job id can
    // still be looked up afterwards, and launch nothing.
    const startedAt = new Date().toISOString();
    const errRecord: JobRecord = {
      id: jobId,
      client,
      prompt,
      status: "failed",
      startedAt,
      completedAt: startedAt,
      stdout: "",
      stderr: "",
      error: `Unknown client: ${client}`,
    };
    writeJobFile(jobDir, errRecord, fs);
    return { id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error };
  }

  const clientArgs = argBuilder(prompt);
  const command = options.clientPath ?? client;
  const startedAt = new Date().toISOString();
  const record: JobRecord = {
    id: jobId,
    client,
    prompt,
    status: "running",
    startedAt,
    pid: undefined,
    stdout: "",
    stderr: "",
  };
  writeJobFile(jobDir, record, fs);

  // Spawn a companion watcher process that outlives this CLI invocation.
  // The watcher is expected to run the actual client (codex/claude/opencode),
  // capture stdout/stderr, and write the final job record to disk on exit, so
  // this call can return the job id immediately.
  let watcher: { pid?: number; unref?: () => void };
  if (options.spawnWatcher) {
    // Test path: use the injected watcher mock.
    watcher = options.spawnWatcher(jobFilePath, command, clientArgs);
  } else {
    // fileURLToPath decodes percent-escapes and yields a valid native path on
    // every platform; URL#pathname would leave "%20" for spaces and produce
    // "/C:/..." on Windows.
    const watcherScript = fileURLToPath(new URL("./job-watcher.ts", import.meta.url));
    const watcherArgs = [
      "--import", "tsx",
      watcherScript,
      jobFilePath,
      command,
      ...clientArgs,
    ];
    watcher = spawnImpl("node", watcherArgs, {
      detached: true,
      shell: false,
      stdio: "ignore",
    });
    // Do not let the watcher keep this process's event loop alive.
    watcher.unref?.();
  }

  // Give the watcher a moment to spawn and record the real child PID.
  await new Promise((r) => setTimeout(r, 100));

  // Re-read the job file to pick up the watcher's PID update. Fall back to the
  // record written above if the file is unreadable or not valid JSON.
  let updatedRecord: JobRecord;
  try {
    updatedRecord = JSON.parse(fs.readFileSync(jobFilePath, "utf-8")) as JobRecord;
  } catch {
    updatedRecord = record;
  }

  return {
    id: jobId,
    client,
    prompt,
    status: "running",
    startedAt,
    // Prefer the PID stored in the job file; otherwise report the watcher's own PID.
    pid: updatedRecord.pid ?? watcher.pid ?? undefined,
  };
}
/**
 * Returns the stored job without its captured output streams.
 *
 * @param jobId - Identifier of the job to load.
 * @param options - Job directory and filesystem overrides.
 * @returns Every field of the stored record except `stdout` and `stderr`.
 * @throws Whatever `readJobFile` throws for a missing or unreadable job file.
 */
export function getJob(jobId: string, options: JobOperationsOptions = {}): Job {
  const storeDir = getJobDir(options);
  const fsImpl = options.fs ?? {
    existsSync: defaultExistsSync,
    mkdirSync: defaultMkdirSync,
    readdirSync: defaultReaddirSync,
    readFileSync: defaultReadFileSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
    writeFileSync: defaultWriteFileSync,
  };

  // Peel off the two output fields; the remainder is the public summary.
  const { stdout: _droppedOut, stderr: _droppedErr, ...summary } = readJobFile(jobId, storeDir, fsImpl);
  return summary;
}
/**
 * Returns the execution result of a completed job.
 *
 * @param jobId - Identifier of the job.
 * @param options - Job directory and filesystem overrides.
 * @returns The job's `result` field.
 * @throws JobResultUnavailableError when the job's status is not "completed",
 *   or when it is "completed" but the stored record has no `result`.
 */
export function getJobResult(jobId: string, options: JobOperationsOptions = {}): ExecResult {
  const { status, result } = getJob(jobId, options);

  if (status !== "completed") {
    throw new JobResultUnavailableError(jobId, status);
  }
  if (result) {
    return result;
  }
  // Completed, yet no result was stored: report it as unavailable.
  throw new JobResultUnavailableError(jobId, "completed");
}
/**
 * Cancels a running job: marks its record "cancelled" and sends SIGTERM to the
 * recorded client process.
 *
 * Jobs whose status is anything other than "running" are left untouched.
 *
 * @param jobId - Identifier of the job to cancel.
 * @param options - Job directory and filesystem overrides.
 * @throws Whatever `readJobFile` throws for a missing or unreadable job file.
 */
export function cancelJob(jobId: string, options: JobOperationsOptions = {}): void {
  const jobDir = getJobDir(options);
  const fs = options.fs ?? {
    mkdirSync: defaultMkdirSync,
    writeFileSync: defaultWriteFileSync,
    readFileSync: defaultReadFileSync,
    readdirSync: defaultReaddirSync,
    existsSync: defaultExistsSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
  };
  const record = readJobFile(jobId, jobDir, fs);
  if (record.status !== "running") {
    return;
  }

  // Persist the cancellation BEFORE signalling the child. The watcher is
  // expected to re-read the job file when the child exits and to leave a
  // "cancelled" record alone; if the signal went out first, the watcher could
  // still observe "running" and overwrite the cancellation with its own
  // final status.
  const cancelledRecord: JobRecord = {
    ...record,
    status: "cancelled",
    completedAt: new Date().toISOString(),
  };
  writeJobFile(jobDir, cancelledRecord, fs);

  // Kill the client child process (PID recorded by the watcher). With no PID
  // on record, nothing is signalled and only the status changes.
  if (record.pid) {
    try {
      process.kill(record.pid, "SIGTERM");
    } catch {
      // ignore — process may have already exited
    }
  }
}
/**
 * Lists stored jobs, newest `startedAt` first, without their captured output.
 *
 * @param options - Job directory and filesystem overrides, plus an optional
 *   `filter` that keeps only jobs with that status.
 * @returns Job summaries; an empty array when the job directory does not
 *   exist. Files that cannot be read or parsed are skipped.
 */
export function listJobs(options: JobOperationsOptions & { filter?: JobStatus } = {}): Job[] {
  const storeDir = getJobDir(options);
  const fsImpl = options.fs ?? {
    existsSync: defaultExistsSync,
    mkdirSync: defaultMkdirSync,
    readdirSync: defaultReaddirSync,
    readFileSync: defaultReadFileSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
    writeFileSync: defaultWriteFileSync,
  };

  if (!fsImpl.existsSync(storeDir)) {
    return [];
  }

  const jobFiles = fsImpl.readdirSync(storeDir).filter((name) => name.endsWith(".json"));
  const collected: Job[] = [];
  jobFiles.forEach((fileName) => {
    const id = fileName.replace(/\.json$/, "");
    try {
      const { stdout: _droppedOut, stderr: _droppedErr, ...summary } = readJobFile(id, storeDir, fsImpl);
      collected.push(summary);
    } catch {
      // ignore corrupt/missing files
    }
  });

  // Most recently started job first.
  const startedMs = (job: Job): number => new Date(job.startedAt).getTime();
  collected.sort((left, right) => startedMs(right) - startedMs(left));

  return options.filter
    ? collected.filter((job) => job.status === options.filter)
    : collected;
}
/**
 * Deletes job files whose last-modified time is older than the age limit.
 *
 * Age is judged from the file's mtime alone; the record's status is not
 * inspected. Files that cannot be stat'ed or removed are skipped silently.
 *
 * @param options - Job directory and filesystem overrides, plus `maxAgeMs`
 *   (defaults to 24 hours).
 */
export function cleanupJobs(options: JobOperationsOptions & { maxAgeMs?: number } = {}): void {
  const storeDir = getJobDir(options);
  const fsImpl = options.fs ?? {
    existsSync: defaultExistsSync,
    mkdirSync: defaultMkdirSync,
    readdirSync: defaultReaddirSync,
    readFileSync: defaultReadFileSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
    writeFileSync: defaultWriteFileSync,
  };
  const ageLimitMs = options.maxAgeMs ?? 24 * 60 * 60 * 1000;

  if (!fsImpl.existsSync(storeDir)) {
    return;
  }

  // One timestamp for the whole sweep so every file is judged against the same instant.
  const sweepTime = Date.now();
  fsImpl
    .readdirSync(storeDir)
    .filter((name) => name.endsWith(".json"))
    .forEach((name) => {
      const filePath = `${storeDir}/${name}`;
      try {
        const { mtimeMs } = fsImpl.statSync(filePath);
        if (sweepTime - mtimeMs > ageLimitMs) {
          fsImpl.unlinkSync(filePath);
        }
      } catch {
        // ignore
      }
    });
}