import { spawn as defaultSpawn } from "node:child_process"; import { existsSync as defaultExistsSync, mkdirSync as defaultMkdirSync, readFileSync as defaultReadFileSync, readdirSync as defaultReaddirSync, statSync as defaultStatSync, unlinkSync as defaultUnlinkSync, writeFileSync as defaultWriteFileSync } from "node:fs"; import { randomUUID } from "node:crypto"; import { CLIENT_ARGS } from "./execute.js"; import type { ClientName, Job, JobRecord, JobStartOptions, ExecResult, JobStatus } from "./types.js"; import { JobNotFoundError, JobResultUnavailableError } from "./types.js"; export interface JobOperationsOptions { jobDir?: string; fs?: { mkdirSync: typeof defaultMkdirSync; writeFileSync: typeof defaultWriteFileSync; readFileSync: typeof defaultReadFileSync; readdirSync: typeof defaultReaddirSync; existsSync: typeof defaultExistsSync; statSync: typeof defaultStatSync; unlinkSync: typeof defaultUnlinkSync; }; } export interface StartJobOptions extends JobStartOptions, JobOperationsOptions {} const runningChildren = new Map; cancelled?: boolean }>(); const DEFAULT_JOB_DIR = `${process.env.HOME || process.env.USERPROFILE}/.openclaw/ai-cli-dispatch/jobs`; function getJobDir(options?: { jobDir?: string }): string { return options?.jobDir ?? DEFAULT_JOB_DIR; } function writeJobFile( jobDir: string, record: JobRecord, fs: NonNullable ): void { fs.mkdirSync(jobDir, { recursive: true }); fs.writeFileSync(`${jobDir}/${record.id}.json`, JSON.stringify(record, null, 2)); } function readJobFile( jobId: string, jobDir: string, fs: NonNullable ): JobRecord { const path = `${jobDir}/${jobId}.json`; try { return JSON.parse(fs.readFileSync(path, "utf-8")) as JobRecord; } catch (err: any) { if (err.code === "ENOENT") { throw new JobNotFoundError(jobId); } throw err; } } export async function startJob( client: ClientName, prompt: string, options: StartJobOptions = {} ): Promise { const jobId = randomUUID(); const jobDir = getJobDir(options); const fs = options.fs ?? { mkdirSync: defaultMkdirSync, writeFileSync: defaultWriteFileSync, readFileSync: defaultReadFileSync, readdirSync: defaultReaddirSync, existsSync: defaultExistsSync, statSync: defaultStatSync, unlinkSync: defaultUnlinkSync, }; const spawnImpl = options.spawn ?? defaultSpawn; const timeoutMs = options.timeoutMs ?? 600_000; const argBuilder = (CLIENT_ARGS as Record string[]>)[client]; if (!argBuilder) { const startedAt = new Date().toISOString(); const errRecord: JobRecord = { id: jobId, client, prompt, status: "failed", startedAt, completedAt: startedAt, stdout: "", stderr: "", error: `Unknown client: ${client}`, }; writeJobFile(jobDir, errRecord, fs); return { id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error }; } const args = argBuilder(prompt); const command = options.clientPath ?? client; const startedAt = new Date().toISOString(); const record: JobRecord = { id: jobId, client, prompt, status: "running", startedAt, pid: undefined, stdout: "", stderr: "", }; writeJobFile(jobDir, record, fs); return new Promise((resolve) => { let stdout = ""; let stderr = ""; let settled = false; let timedOut = false; const child = spawnImpl(command, args, { detached: true, shell: false, stdio: ["ignore", "pipe", "pipe"], }); record.pid = child.pid ?? undefined; writeJobFile(jobDir, record, fs); child.stdout?.on("data", (chunk: Buffer | string) => { stdout += chunk.toString(); }); child.stderr?.on("data", (chunk: Buffer | string) => { stderr += chunk.toString(); }); const timeout = setTimeout(() => { timedOut = true; try { child.kill("SIGTERM"); } catch { // ignore } }, timeoutMs); runningChildren.set(jobId, { child, timeout }); function finalize(status: JobStatus, result?: ExecResult, error?: string) { if (settled) return; settled = true; clearTimeout(timeout); runningChildren.delete(jobId); const completedAt = new Date().toISOString(); const finalRecord: JobRecord = { ...record, status, stdout, stderr, result, error, completedAt, }; writeJobFile(jobDir, finalRecord, fs); } child.on("error", (err: NodeJS.ErrnoException) => { finalize("failed", undefined, err.message); }); child.on("close", (code: number | null, signal: NodeJS.Signals | null) => { const entry = runningChildren.get(jobId); if (!entry && settled) return; if (entry?.cancelled) { finalize("cancelled"); return; } if (timedOut) { const durationMs = Date.now() - new Date(record.startedAt).getTime(); finalize("timed_out", { stdout, stderr, exitCode: -1, client, durationMs, }); } else if (code !== null && code !== 0) { const durationMs = Date.now() - new Date(record.startedAt).getTime(); finalize("failed", { stdout, stderr, exitCode: code, client, durationMs, }); } else { const durationMs = Date.now() - new Date(record.startedAt).getTime(); finalize("completed", { stdout, stderr, exitCode: code ?? 0, client, durationMs, }); } }); child.unref?.(); resolve({ id: jobId, client, prompt, status: "running", startedAt, pid: record.pid, }); }); } export function getJob(jobId: string, options: JobOperationsOptions = {}): Job { const jobDir = getJobDir(options); const fs = options.fs ?? { mkdirSync: defaultMkdirSync, writeFileSync: defaultWriteFileSync, readFileSync: defaultReadFileSync, readdirSync: defaultReaddirSync, existsSync: defaultExistsSync, statSync: defaultStatSync, unlinkSync: defaultUnlinkSync, }; const record = readJobFile(jobId, jobDir, fs); const { stdout: _stdout, stderr: _stderr, ...job } = record; return job; } export function getJobResult(jobId: string, options: JobOperationsOptions = {}): ExecResult { const job = getJob(jobId, options); if (job.status !== "completed") { throw new JobResultUnavailableError(jobId, job.status); } return job.result!; } export function cancelJob(jobId: string, options: JobOperationsOptions = {}): void { const jobDir = getJobDir(options); const fs = options.fs ?? { mkdirSync: defaultMkdirSync, writeFileSync: defaultWriteFileSync, readFileSync: defaultReadFileSync, readdirSync: defaultReaddirSync, existsSync: defaultExistsSync, statSync: defaultStatSync, unlinkSync: defaultUnlinkSync, }; const record = readJobFile(jobId, jobDir, fs); if (record.status !== "running") { return; } const entry = runningChildren.get(jobId); if (entry) { entry.cancelled = true; clearTimeout(entry.timeout); try { entry.child.kill("SIGTERM"); } catch { // ignore } } else if (record.pid) { try { process.kill(record.pid, "SIGTERM"); } catch { // ignore } const cancelledRecord: JobRecord = { ...record, status: "cancelled", completedAt: new Date().toISOString(), }; writeJobFile(jobDir, cancelledRecord, fs); } else { const cancelledRecord: JobRecord = { ...record, status: "cancelled", completedAt: new Date().toISOString(), }; writeJobFile(jobDir, cancelledRecord, fs); } } export function listJobs(options: JobOperationsOptions & { filter?: JobStatus } = {}): Job[] { const jobDir = getJobDir(options); const fs = options.fs ?? { mkdirSync: defaultMkdirSync, writeFileSync: defaultWriteFileSync, readFileSync: defaultReadFileSync, readdirSync: defaultReaddirSync, existsSync: defaultExistsSync, statSync: defaultStatSync, unlinkSync: defaultUnlinkSync, }; if (!fs.existsSync(jobDir)) { return []; } const entries = fs.readdirSync(jobDir).filter((f) => f.endsWith(".json")); const jobs: Job[] = []; for (const entry of entries) { const jobId = entry.replace(/\.json$/, ""); try { const record = readJobFile(jobId, jobDir, fs); const { stdout: _stdout, stderr: _stderr, ...job } = record; jobs.push(job); } catch { // ignore corrupt/missing files } } jobs.sort((a, b) => new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime()); if (options.filter) { return jobs.filter((j) => j.status === options.filter); } return jobs; } export function cleanupJobs(options: JobOperationsOptions & { maxAgeMs?: number } = {}): void { const jobDir = getJobDir(options); const fs = options.fs ?? { mkdirSync: defaultMkdirSync, writeFileSync: defaultWriteFileSync, readFileSync: defaultReadFileSync, readdirSync: defaultReaddirSync, existsSync: defaultExistsSync, statSync: defaultStatSync, unlinkSync: defaultUnlinkSync, }; const maxAgeMs = options.maxAgeMs ?? 24 * 60 * 60 * 1000; if (!fs.existsSync(jobDir)) { return; } const now = Date.now(); const entries = fs.readdirSync(jobDir).filter((f) => f.endsWith(".json")); for (const entry of entries) { const path = `${jobDir}/${entry}`; try { const stat = fs.statSync(path); if (now - stat.mtimeMs > maxAgeMs) { fs.unlinkSync(path); } } catch { // ignore } } }