// Source file: stef-openclaw-skills/tools/ai-cli-dispatch/src/jobs.ts
// (repository viewer listing at time of capture: 363 lines, 9.7 KiB, TypeScript)

import { spawn as defaultSpawn } from "node:child_process";
import { randomUUID } from "node:crypto";
import { existsSync as defaultExistsSync, mkdirSync as defaultMkdirSync, readFileSync as defaultReadFileSync, readdirSync as defaultReaddirSync, statSync as defaultStatSync, unlinkSync as defaultUnlinkSync, writeFileSync as defaultWriteFileSync } from "node:fs";
import { homedir } from "node:os";
import { CLIENT_ARGS } from "./execute.js";
import type { ClientName, Job, JobRecord, JobStartOptions, ExecResult, JobStatus } from "./types.js";
import { JobNotFoundError, JobResultUnavailableError } from "./types.js";
/**
 * Optional overrides accepted by the job functions in this module.
 * Both members exist so callers (for example tests) can redirect storage
 * without touching the real file system.
 */
export interface JobOperationsOptions {
/** Directory holding the `<id>.json` job record files; the module default is used when omitted. */
jobDir?: string;
/** Replacement implementations of the `node:fs` functions this module calls; the real ones are used when omitted. */
fs?: {
mkdirSync: typeof defaultMkdirSync;
writeFileSync: typeof defaultWriteFileSync;
readFileSync: typeof defaultReadFileSync;
readdirSync: typeof defaultReaddirSync;
existsSync: typeof defaultExistsSync;
statSync: typeof defaultStatSync;
unlinkSync: typeof defaultUnlinkSync;
};
}
/** Options for `startJob`: the project-defined start options combined with the storage overrides above. */
export interface StartJobOptions extends JobStartOptions, JobOperationsOptions {}
/**
 * Child processes spawned by this module instance that have not been finalized yet, keyed by job id.
 * `timeout` is the pending kill timer; `cancelled` is set by `cancelJob` so that the
 * child's close handler records the job as cancelled instead of failed/completed.
 * The map lives in memory only, so it is empty in any other process.
 */
const runningChildren = new Map<string, { child: any; timeout?: ReturnType<typeof setTimeout>; cancelled?: boolean }>();
/**
 * Default directory for job record files: `<home>/.openclaw/ai-cli-dispatch/jobs`.
 *
 * HOME and USERPROFILE keep their priority. `os.homedir()` is only consulted when
 * neither is set, so the path can no longer start with the literal text "undefined"
 * (which would silently create an `undefined/` directory under the working directory).
 */
const DEFAULT_JOB_DIR = `${process.env.HOME || process.env.USERPROFILE || homedir()}/.openclaw/ai-cli-dispatch/jobs`;
/**
 * Picks the directory in which job records are stored.
 *
 * @param options - May carry an explicit `jobDir`.
 * @returns The explicit `jobDir` when it is neither `undefined` nor `null`
 *          (an empty string is returned as-is); otherwise the module default.
 */
function getJobDir(options?: { jobDir?: string }): string {
  const configured = options?.jobDir;
  return configured == null ? DEFAULT_JOB_DIR : configured;
}
/**
 * Persists a job record as pretty-printed JSON at `<jobDir>/<record.id>.json`.
 *
 * @param jobDir - Directory for job records; created (recursively) if missing.
 * @param record - Record to serialize; its `id` determines the file name.
 * @param fs - File-system functions used for the directory creation and the write.
 */
function writeJobFile(
  jobDir: string,
  record: JobRecord,
  fs: NonNullable<JobOperationsOptions["fs"]>
): void {
  // The directory is ensured first, before the record is serialized.
  fs.mkdirSync(jobDir, { recursive: true });

  const target = `${jobDir}/${record.id}.json`;
  const payload = JSON.stringify(record, null, 2);
  fs.writeFileSync(target, payload);
}
/**
 * Loads and parses the record stored at `<jobDir>/<jobId>.json`.
 *
 * @param jobId - Identifier of the job; must be a single file-name component.
 * @param jobDir - Directory containing the job record files.
 * @param fs - File-system functions used for the read.
 * @returns The parsed record. The JSON is trusted to have the `JobRecord` shape; it is not validated.
 * @throws JobNotFoundError when the id contains a path separator or NUL character,
 *         or when the file does not exist (ENOENT).
 * @throws Any other read or JSON-parse error, unchanged.
 */
function readJobFile(
  jobId: string,
  jobDir: string,
  fs: NonNullable<JobOperationsOptions["fs"]>
): JobRecord {
  // The id is interpolated into a path below. An id such as "../x" would escape
  // jobDir, so anything that is not a plain file-name component is treated as
  // an unknown job and never reaches the file system.
  if (/[\/\\\0]/.test(jobId)) {
    throw new JobNotFoundError(jobId);
  }

  const path = `${jobDir}/${jobId}.json`;
  try {
    return JSON.parse(fs.readFileSync(path, "utf-8")) as JobRecord;
  } catch (err: unknown) {
    const code =
      typeof err === "object" && err !== null ? (err as { code?: unknown }).code : undefined;
    if (code === "ENOENT") {
      throw new JobNotFoundError(jobId);
    }
    throw err;
  }
}
/**
 * Spawns `client` as a detached child process running `prompt`, and tracks it
 * as a job whose record is stored in the job directory.
 *
 * The returned promise resolves as soon as the child has been spawned; it does
 * not wait for the child to exit. The final outcome is written to the job
 * record by the child's `error` / `close` handlers:
 *   - `cancelled`  when `cancelJob` flagged the tracked child,
 *   - `timed_out`  when the timeout fired (exit code recorded as -1),
 *   - `failed`     on a spawn error, a non-zero exit code, or termination by a signal,
 *   - `completed`  otherwise.
 *
 * @param client - Key into `CLIENT_ARGS`; also used as the command unless `options.clientPath` is set.
 * @param prompt - Prompt handed to the client's argument builder.
 * @param options - Storage overrides plus `spawn`, `timeoutMs` (default 600 000 ms) and `clientPath`.
 * @returns A job snapshot with status `running`, or with status `failed` when `client` is unknown.
 * @throws Whatever `spawn` throws synchronously; the job record is marked `failed` first.
 */
export async function startJob(
  client: ClientName,
  prompt: string,
  options: StartJobOptions = {}
): Promise<Job> {
  const jobId = randomUUID();
  const jobDir = getJobDir(options);
  const fs = options.fs ?? {
    mkdirSync: defaultMkdirSync,
    writeFileSync: defaultWriteFileSync,
    readFileSync: defaultReadFileSync,
    readdirSync: defaultReaddirSync,
    existsSync: defaultExistsSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
  };
  const spawnImpl = options.spawn ?? defaultSpawn;
  const timeoutMs = options.timeoutMs ?? 600_000;
  const argBuilder = (CLIENT_ARGS as Record<string, (prompt: string) => string[]>)[client];
  const startedAt = new Date().toISOString();

  if (!argBuilder) {
    // Unknown client: persist a failed record and report the failure in the
    // returned job rather than rejecting.
    const errRecord: JobRecord = {
      id: jobId,
      client,
      prompt,
      status: "failed",
      startedAt,
      completedAt: startedAt,
      stdout: "",
      stderr: "",
      error: `Unknown client: ${client}`,
    };
    writeJobFile(jobDir, errRecord, fs);
    return { id: jobId, client, prompt, status: "failed", startedAt, error: errRecord.error };
  }

  const args = argBuilder(prompt);
  const command = options.clientPath ?? client;
  const record: JobRecord = {
    id: jobId,
    client,
    prompt,
    status: "running",
    startedAt,
    pid: undefined,
    stdout: "",
    stderr: "",
  };
  // Written before spawning so the job is visible even if the process dies early.
  writeJobFile(jobDir, record, fs);

  const spawnChild = () =>
    spawnImpl(command, args, {
      detached: true,
      shell: false,
      stdio: ["ignore", "pipe", "pipe"],
    });

  let child: ReturnType<typeof spawnChild>;
  try {
    child = spawnChild();
  } catch (err: unknown) {
    // spawn can throw synchronously (e.g. on invalid arguments). Without this
    // the record written above would stay "running" forever.
    try {
      writeJobFile(
        jobDir,
        {
          ...record,
          status: "failed",
          completedAt: new Date().toISOString(),
          error: err instanceof Error ? err.message : String(err),
        },
        fs
      );
    } catch {
      // Best effort: the spawn error below is the one the caller needs to see.
    }
    throw err;
  }

  record.pid = child.pid ?? undefined;
  writeJobFile(jobDir, record, fs);

  let stdout = "";
  let stderr = "";
  let settled = false;
  let timedOut = false;

  child.stdout?.on("data", (chunk: Buffer | string) => {
    stdout += chunk.toString();
  });
  child.stderr?.on("data", (chunk: Buffer | string) => {
    stderr += chunk.toString();
  });

  const timeout = setTimeout(() => {
    timedOut = true;
    try {
      child.kill("SIGTERM");
    } catch {
      // ignore: the child may already be gone
    }
  }, timeoutMs);
  runningChildren.set(jobId, { child, timeout });

  // Writes the final record exactly once, whichever of "error" / "close" fires first.
  const finalize = (status: JobStatus, result?: ExecResult, error?: string): void => {
    if (settled) return;
    settled = true;
    clearTimeout(timeout);
    runningChildren.delete(jobId);
    const finalRecord: JobRecord = {
      ...record,
      status,
      stdout,
      stderr,
      result,
      error,
      completedAt: new Date().toISOString(),
    };
    writeJobFile(jobDir, finalRecord, fs);
  };

  child.on("error", (err: NodeJS.ErrnoException) => {
    finalize("failed", undefined, err.message);
  });

  child.on("close", (code: number | null, signal: NodeJS.Signals | null) => {
    const entry = runningChildren.get(jobId);
    if (!entry && settled) return;
    if (entry?.cancelled) {
      finalize("cancelled");
      return;
    }

    const durationMs = Date.now() - new Date(record.startedAt).getTime();
    const resultWith = (exitCode: number): ExecResult => ({
      stdout,
      stderr,
      exitCode,
      client,
      durationMs,
    });

    if (timedOut) {
      finalize("timed_out", resultWith(-1));
    } else if (code === null && signal) {
      // Killed by a signal that was neither our timeout nor a tracked cancel.
      // There is no exit code, and this must not be reported as a success.
      finalize("failed", resultWith(-1), `Process terminated by signal ${signal}`);
    } else if (code !== null && code !== 0) {
      finalize("failed", resultWith(code));
    } else {
      finalize("completed", resultWith(code ?? 0));
    }
  });

  child.unref?.();

  return {
    id: jobId,
    client,
    prompt,
    status: "running",
    startedAt,
    pid: record.pid,
  };
}
/**
 * Reads a job's stored record and returns it without the captured
 * top-level `stdout` / `stderr` fields.
 *
 * @param jobId - Identifier of the job to look up.
 * @param options - Optional job directory and file-system overrides.
 * @returns The record minus its top-level `stdout` and `stderr`.
 * @throws Whatever `readJobFile` throws (e.g. `JobNotFoundError` for a missing file).
 */
export function getJob(jobId: string, options: JobOperationsOptions = {}): Job {
  const directory = getJobDir(options);
  const fileSystem = options.fs ?? {
    mkdirSync: defaultMkdirSync,
    writeFileSync: defaultWriteFileSync,
    readFileSync: defaultReadFileSync,
    readdirSync: defaultReaddirSync,
    existsSync: defaultExistsSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
  };

  // Peel the two output fields off; everything else forms the returned job.
  const {
    stdout: _droppedStdout,
    stderr: _droppedStderr,
    ...summary
  } = readJobFile(jobId, directory, fileSystem);
  return summary;
}
/**
 * Returns the execution result of a completed job.
 *
 * @param jobId - Identifier of the job.
 * @param options - Optional job directory and file-system overrides.
 * @returns The stored `result` of the job.
 * @throws JobResultUnavailableError when the job is not `completed`, or when a
 *         completed record carries no result (e.g. a hand-edited or truncated record).
 * @throws Whatever `getJob` throws for an unknown or unreadable job.
 */
export function getJobResult(jobId: string, options: JobOperationsOptions = {}): ExecResult {
  const job = getJob(jobId, options);
  // Guard at runtime instead of asserting with `!`, so callers never receive
  // `undefined` typed as an ExecResult.
  if (job.status !== "completed" || job.result == null) {
    throw new JobResultUnavailableError(jobId, job.status);
  }
  return job.result;
}
/**
 * Requests cancellation of a job that is still recorded as `running`.
 *
 * - If this module instance tracks the child, it is flagged as cancelled, its
 *   timeout is cleared and it is sent SIGTERM; the record itself is then
 *   updated by the child's close handler, not here.
 * - Otherwise SIGTERM is sent to the recorded pid (if any) and the record is
 *   rewritten with status `cancelled` right away.
 *
 * Jobs in any other status are left untouched. Errors from sending the signal are ignored.
 *
 * @param jobId - Identifier of the job to cancel.
 * @param options - Optional job directory and file-system overrides.
 * @throws Whatever `readJobFile` throws (e.g. `JobNotFoundError`).
 */
export function cancelJob(jobId: string, options: JobOperationsOptions = {}): void {
  const jobDir = getJobDir(options);
  const fs = options.fs ?? {
    mkdirSync: defaultMkdirSync,
    writeFileSync: defaultWriteFileSync,
    readFileSync: defaultReadFileSync,
    readdirSync: defaultReaddirSync,
    existsSync: defaultExistsSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
  };

  const stored = readJobFile(jobId, jobDir, fs);
  if (stored.status !== "running") {
    return;
  }

  const tracked = runningChildren.get(jobId);
  if (tracked) {
    tracked.cancelled = true;
    clearTimeout(tracked.timeout);
    try {
      tracked.child.kill("SIGTERM");
    } catch {
      // ignore
    }
    return;
  }

  // Not tracked in this process: signal by pid when one was recorded.
  if (stored.pid) {
    try {
      process.kill(stored.pid, "SIGTERM");
    } catch {
      // ignore
    }
  }

  const cancelledRecord: JobRecord = {
    ...stored,
    status: "cancelled",
    completedAt: new Date().toISOString(),
  };
  writeJobFile(jobDir, cancelledRecord, fs);
}
/**
 * Lists the jobs stored in the job directory, newest `startedAt` first.
 *
 * Every `*.json` file in the directory is read; files that cannot be read or
 * parsed are skipped. The top-level `stdout` / `stderr` fields are dropped
 * from each returned job.
 *
 * @param options - Optional job directory and file-system overrides, plus an
 *                  optional `filter` restricting the result to one status.
 * @returns The matching jobs, or an empty array when the directory does not exist.
 */
export function listJobs(options: JobOperationsOptions & { filter?: JobStatus } = {}): Job[] {
  const jobDir = getJobDir(options);
  const fs = options.fs ?? {
    mkdirSync: defaultMkdirSync,
    writeFileSync: defaultWriteFileSync,
    readFileSync: defaultReadFileSync,
    readdirSync: defaultReaddirSync,
    existsSync: defaultExistsSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
  };

  if (!fs.existsSync(jobDir)) {
    return [];
  }

  const suffix = ".json";
  const collected: Job[] = [];
  for (const fileName of fs.readdirSync(jobDir)) {
    if (!fileName.endsWith(suffix)) {
      continue;
    }
    const jobId = fileName.slice(0, -suffix.length);
    try {
      const { stdout: _out, stderr: _err, ...summary } = readJobFile(jobId, jobDir, fs);
      collected.push(summary);
    } catch {
      // ignore corrupt/missing files
    }
  }

  const startedMs = (job: Job): number => new Date(job.startedAt).getTime();
  collected.sort((left, right) => startedMs(right) - startedMs(left));

  return options.filter
    ? collected.filter((job) => job.status === options.filter)
    : collected;
}
/**
 * Deletes job record files whose modification time is older than `maxAgeMs`.
 *
 * Age is judged from the file's mtime only; the record's status is not read.
 * Errors while inspecting or deleting an individual file are ignored so the
 * sweep continues with the remaining files.
 *
 * @param options - Optional job directory and file-system overrides, plus
 *                  `maxAgeMs` (defaults to 24 hours).
 */
export function cleanupJobs(options: JobOperationsOptions & { maxAgeMs?: number } = {}): void {
  const jobDir = getJobDir(options);
  const fs = options.fs ?? {
    mkdirSync: defaultMkdirSync,
    writeFileSync: defaultWriteFileSync,
    readFileSync: defaultReadFileSync,
    readdirSync: defaultReaddirSync,
    existsSync: defaultExistsSync,
    statSync: defaultStatSync,
    unlinkSync: defaultUnlinkSync,
  };
  const oneDayMs = 24 * 60 * 60 * 1000;
  const maxAgeMs = options.maxAgeMs ?? oneDayMs;

  if (!fs.existsSync(jobDir)) {
    return;
  }

  const now = Date.now();
  const recordFiles = fs.readdirSync(jobDir).filter((name) => name.endsWith(".json"));
  recordFiles.forEach((name) => {
    const filePath = `${jobDir}/${name}`;
    try {
      const ageMs = now - fs.statSync(filePath).mtimeMs;
      if (ageMs > maxAgeMs) {
        fs.unlinkSync(filePath);
      }
    } catch {
      // ignore
    }
  });
}