fix: use companion watcher process for async job completion
The async startJob previously relied on Node.js event listeners in the CLI process to capture child output and finalize the job file. However, the CLI process exits immediately after returning the job ID, which kills the event loop before the close handler fires and leaves jobs stuck at 'running' forever.

Fix: startJob now spawns a companion watcher process (job-watcher.ts) that is itself detached and outlives the CLI. The watcher:
- Spawns the actual client (codex/claude/opencode)
- Captures stdout/stderr
- Writes the final job record to disk on child exit
- Has its own 10-minute timeout safety net

The CLI returns the job ID immediately and the watcher independently finalizes the job, so the CLI no longer needs to stay alive.

Also updates the tests to mock the watcher spawn via the injectable spawnWatcher option.
This commit is contained in:
@@ -18,9 +18,11 @@ export interface JobOperationsOptions {
|
||||
};
|
||||
}
|
||||
|
||||
export interface StartJobOptions extends JobStartOptions, JobOperationsOptions {}
|
||||
|
||||
const runningChildren = new Map<string, { child: any; timeout?: ReturnType<typeof setTimeout>; cancelled?: boolean }>();
|
||||
export interface StartJobOptions extends JobStartOptions, JobOperationsOptions {
|
||||
/** Override the watcher spawn (for testing). When provided, startJob calls
|
||||
* this instead of spawning `node --import tsx job-watcher.ts ...` */
|
||||
spawnWatcher?: (jobFilePath: string, command: string, args: string[]) => { pid?: number; unref?: () => void };
|
||||
}
|
||||
|
||||
const DEFAULT_JOB_DIR = `${process.env.HOME || process.env.USERPROFILE}/.openclaw/ai-cli-dispatch/jobs`;
|
||||
|
||||
@@ -70,7 +72,6 @@ export async function startJob(
|
||||
unlinkSync: defaultUnlinkSync,
|
||||
};
|
||||
const spawnImpl = options.spawn ?? defaultSpawn;
|
||||
const timeoutMs = options.timeoutMs ?? 600_000;
|
||||
|
||||
const argBuilder = (CLIENT_ARGS as Record<string, (prompt: string) => string[]>)[client];
|
||||
if (!argBuilder) {
|
||||
@@ -87,12 +88,10 @@ export async function startJob(
|
||||
error: `Unknown client: ${client}`,
|
||||
};
|
||||
writeJobFile(jobDir, errRecord, fs);
|
||||
return new Promise((resolve) =>
|
||||
resolve({ id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error })
|
||||
);
|
||||
return { id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error };
|
||||
}
|
||||
|
||||
const args = argBuilder(prompt);
|
||||
const clientArgs = argBuilder(prompt);
|
||||
const command = options.clientPath ?? client;
|
||||
const startedAt = new Date().toISOString();
|
||||
|
||||
@@ -109,115 +108,51 @@ export async function startJob(
|
||||
|
||||
writeJobFile(jobDir, record, fs);
|
||||
|
||||
return new Promise((resolve) => {
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
let settled = false;
|
||||
let timedOut = false;
|
||||
// Spawn a companion watcher process that outlives this CLI invocation.
|
||||
// The watcher monitors the actual client (codex/claude/opencode), captures
|
||||
// stdout/stderr, and writes the final job record to disk on exit.
|
||||
// This allows the CLI to return the job ID immediately while the watcher
|
||||
// stays alive to finalize the job.
|
||||
let watcher: { pid?: number; unref?: () => void };
|
||||
|
||||
const child = spawnImpl(command, args, {
|
||||
if (options.spawnWatcher) {
|
||||
// Test path: use the injected watcher mock
|
||||
watcher = options.spawnWatcher(`${jobDir}/${jobId}.json`, command, clientArgs);
|
||||
} else {
|
||||
const watcherArgs = [
|
||||
"--import", "tsx",
|
||||
new URL("./job-watcher.ts", import.meta.url).pathname,
|
||||
`${jobDir}/${jobId}.json`,
|
||||
command,
|
||||
...clientArgs,
|
||||
];
|
||||
watcher = spawnImpl("node", watcherArgs, {
|
||||
detached: true,
|
||||
shell: false,
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
stdio: "ignore",
|
||||
});
|
||||
watcher.unref?.();
|
||||
}
|
||||
|
||||
// Close stdin immediately so clients like codex don't hang waiting for input
|
||||
child.stdin?.end();
|
||||
// Give the watcher a tick to spawn and record the real child PID
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
|
||||
record.pid = child.pid ?? undefined;
|
||||
writeJobFile(jobDir, record, fs);
|
||||
// Re-read the job file to pick up the watcher's PID update
|
||||
let updatedRecord: JobRecord;
|
||||
try {
|
||||
updatedRecord = JSON.parse(fs.readFileSync(`${jobDir}/${jobId}.json`, "utf-8")) as JobRecord;
|
||||
} catch {
|
||||
updatedRecord = record;
|
||||
}
|
||||
|
||||
child.stdout?.on("data", (chunk: Buffer | string) => {
|
||||
stdout += chunk.toString();
|
||||
});
|
||||
|
||||
child.stderr?.on("data", (chunk: Buffer | string) => {
|
||||
stderr += chunk.toString();
|
||||
});
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
timedOut = true;
|
||||
try {
|
||||
child.kill("SIGTERM");
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}, timeoutMs);
|
||||
|
||||
runningChildren.set(jobId, { child, timeout });
|
||||
|
||||
function finalize(status: JobStatus, result?: ExecResult, error?: string) {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
clearTimeout(timeout);
|
||||
runningChildren.delete(jobId);
|
||||
const completedAt = new Date().toISOString();
|
||||
const finalRecord: JobRecord = {
|
||||
...record,
|
||||
status,
|
||||
stdout,
|
||||
stderr,
|
||||
result,
|
||||
error,
|
||||
completedAt,
|
||||
};
|
||||
writeJobFile(jobDir, finalRecord, fs);
|
||||
}
|
||||
|
||||
child.on("error", (err: NodeJS.ErrnoException) => {
|
||||
finalize("failed", undefined, err.message);
|
||||
});
|
||||
|
||||
child.on("close", (code: number | null, signal: NodeJS.Signals | null) => {
|
||||
const entry = runningChildren.get(jobId);
|
||||
if (!entry && settled) return;
|
||||
|
||||
if (entry?.cancelled) {
|
||||
finalize("cancelled");
|
||||
return;
|
||||
}
|
||||
|
||||
if (timedOut) {
|
||||
const durationMs = Date.now() - new Date(record.startedAt).getTime();
|
||||
finalize("timed_out", {
|
||||
stdout,
|
||||
stderr,
|
||||
exitCode: -1,
|
||||
client,
|
||||
durationMs,
|
||||
});
|
||||
} else if (code !== null && code !== 0) {
|
||||
const durationMs = Date.now() - new Date(record.startedAt).getTime();
|
||||
finalize("failed", {
|
||||
stdout,
|
||||
stderr,
|
||||
exitCode: code,
|
||||
client,
|
||||
durationMs,
|
||||
});
|
||||
} else {
|
||||
const durationMs = Date.now() - new Date(record.startedAt).getTime();
|
||||
finalize("completed", {
|
||||
stdout,
|
||||
stderr,
|
||||
exitCode: code ?? 0,
|
||||
client,
|
||||
durationMs,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
child.unref?.();
|
||||
|
||||
resolve({
|
||||
id: jobId,
|
||||
client,
|
||||
prompt,
|
||||
status: "running",
|
||||
startedAt,
|
||||
pid: record.pid,
|
||||
});
|
||||
});
|
||||
return {
|
||||
id: jobId,
|
||||
client,
|
||||
prompt,
|
||||
status: "running",
|
||||
startedAt,
|
||||
pid: updatedRecord.pid ?? watcher.pid ?? undefined,
|
||||
};
|
||||
}
|
||||
|
||||
export function getJob(jobId: string, options: JobOperationsOptions = {}): Job {
|
||||
@@ -261,35 +196,22 @@ export function cancelJob(jobId: string, options: JobOperationsOptions = {}): vo
|
||||
return;
|
||||
}
|
||||
|
||||
const entry = runningChildren.get(jobId);
|
||||
if (entry) {
|
||||
entry.cancelled = true;
|
||||
clearTimeout(entry.timeout);
|
||||
try {
|
||||
entry.child.kill("SIGTERM");
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
} else if (record.pid) {
|
||||
// Kill the client child process (PID recorded by the watcher)
|
||||
if (record.pid) {
|
||||
try {
|
||||
process.kill(record.pid, "SIGTERM");
|
||||
} catch {
|
||||
// ignore
|
||||
// ignore — process may have already exited
|
||||
}
|
||||
const cancelledRecord: JobRecord = {
|
||||
...record,
|
||||
status: "cancelled",
|
||||
completedAt: new Date().toISOString(),
|
||||
};
|
||||
writeJobFile(jobDir, cancelledRecord, fs);
|
||||
} else {
|
||||
const cancelledRecord: JobRecord = {
|
||||
...record,
|
||||
status: "cancelled",
|
||||
completedAt: new Date().toISOString(),
|
||||
};
|
||||
writeJobFile(jobDir, cancelledRecord, fs);
|
||||
}
|
||||
|
||||
// Update the job file to cancelled
|
||||
const cancelledRecord: JobRecord = {
|
||||
...record,
|
||||
status: "cancelled",
|
||||
completedAt: new Date().toISOString(),
|
||||
};
|
||||
writeJobFile(jobDir, cancelledRecord, fs);
|
||||
}
|
||||
|
||||
export function listJobs(options: JobOperationsOptions & { filter?: JobStatus } = {}): Job[] {
|
||||
|
||||
Reference in New Issue
Block a user