feat: add us-cpa case intake workflow

This commit is contained in:
Stefano Fiorini
2026-03-15 00:56:07 -05:00
parent 0c2e34f2f0
commit faff555757
6 changed files with 365 additions and 2 deletions

View File

@@ -0,0 +1,157 @@
from __future__ import annotations
import hashlib
import json
import shutil
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Names of the sub-directories that CaseManager.create_case creates directly
# under the case directory.
CASE_SUBDIRECTORIES = (
    "input",
    "extracted",
    "return",
    "output",
    "reports",
    "issues",
    "sources",
)
def _timestamp() -> str:
    """Return the current time in UTC as an ISO 8601 string (offset included)."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
def _sha256_path(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 64 KiB chunks so that it never has to be held in
    memory as a whole.
    """
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while chunk := stream.read(65536):
            hasher.update(chunk)
    return hasher.hexdigest()
class CaseConflictError(Exception):
    """Signal that an intake conflicts with data already recorded for a case.

    The exception message is the issue's ``"message"`` entry; the complete
    issue record is available on the ``issue`` attribute.
    """

    def __init__(self, issue: dict[str, Any]) -> None:
        message = issue["message"]
        super().__init__(message)
        self.issue = issue
@dataclass
class CaseManager:
    """Manage the on-disk layout and intake bookkeeping of one case directory.

    A case directory holds one sub-directory per entry of
    ``CASE_SUBDIRECTORIES`` and three JSON files: the manifest
    (``case-manifest.json``), the fact store (``extracted/facts.json``) and
    the issue log (``issues/open-issues.json``).

    Attributes:
        case_dir: Root directory of the case. After construction it is an
            absolute path with ``~`` expanded.
    """

    case_dir: Path

    def __post_init__(self) -> None:
        """Normalise ``case_dir`` to an absolute, user-expanded path."""
        # Wrapping in Path() lets callers pass a plain string as well.
        self.case_dir = Path(self.case_dir).expanduser().resolve()

    @property
    def manifest_path(self) -> Path:
        """Path of the case manifest, ``case-manifest.json``."""
        return self.case_dir / "case-manifest.json"

    @property
    def facts_path(self) -> Path:
        """Path of the fact store, ``extracted/facts.json``."""
        return self.case_dir / "extracted" / "facts.json"

    @property
    def issues_path(self) -> Path:
        """Path of the issue log, ``issues/open-issues.json``."""
        return self.case_dir / "issues" / "open-issues.json"

    @staticmethod
    def _read_json(path: Path) -> Any:
        """Decode the JSON document stored at *path* (read as UTF-8)."""
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _write_json(path: Path, payload: Any) -> None:
        """Write *payload* to *path* as two-space-indented JSON (UTF-8)."""
        path.write_text(json.dumps(payload, indent=2), encoding="utf-8")

    def create_case(self, *, case_label: str, tax_year: int) -> dict[str, Any]:
        """Create the case directory tree and write a fresh manifest.

        Args:
            case_label: Label stored as ``caseLabel`` in the manifest.
            tax_year: Tax year stored as ``taxYear`` in the manifest.

        Returns:
            The manifest dictionary that was written.
        """
        self.case_dir.mkdir(parents=True, exist_ok=True)
        for name in CASE_SUBDIRECTORIES:
            (self.case_dir / name).mkdir(exist_ok=True)

        # One timestamp for both fields so a brand-new case has
        # createdAt == updatedAt.
        created_at = _timestamp()
        manifest = {
            "caseLabel": case_label,
            "taxYear": tax_year,
            "createdAt": created_at,
            "updatedAt": created_at,
            "status": "open",
            "documents": [],
        }
        # NOTE(review): an existing manifest is overwritten (its "documents"
        # list is reset) while existing facts/issues files are kept. Confirm
        # this is intended before calling create_case on a populated case.
        self._write_json(self.manifest_path, manifest)
        if not self.facts_path.exists():
            self._write_json(self.facts_path, {"facts": {}})
        if not self.issues_path.exists():
            self._write_json(self.issues_path, {"issues": []})
        return manifest

    def load_manifest(self) -> dict[str, Any]:
        """Return the decoded contents of the case manifest."""
        return self._read_json(self.manifest_path)

    def _load_facts(self) -> dict[str, Any]:
        """Return the decoded contents of the fact store."""
        return self._read_json(self.facts_path)

    def _write_manifest(self, manifest: dict[str, Any]) -> None:
        """Stamp ``updatedAt`` on *manifest* (in place) and persist it."""
        manifest["updatedAt"] = _timestamp()
        self._write_json(self.manifest_path, manifest)

    def _write_facts(self, facts: dict[str, Any]) -> None:
        """Persist *facts* to the fact store."""
        self._write_json(self.facts_path, facts)

    def _write_issue(self, issue: dict[str, Any]) -> None:
        """Append *issue* to the ``issues`` list of the issue log."""
        current = self._read_json(self.issues_path)
        current["issues"].append(issue)
        self._write_json(self.issues_path, current)

    def intake(
        self,
        *,
        tax_year: int,
        user_facts: dict[str, Any],
        document_paths: list[Path],
    ) -> dict[str, Any]:
        """Record user-stated facts and register input documents for the case.

        All checks run before anything is copied or persisted, so a rejected
        intake does not leave stray files in ``input/``: first the tax year,
        then the existence of every document, then the facts. Only afterwards
        are documents copied and the manifest and fact store written.

        Args:
            tax_year: Must equal the manifest's ``taxYear``.
            user_facts: Mapping of fact name to value. Each accepted fact is
                stored with ``sourceType`` ``"user_statement"``.
            document_paths: Files to copy into ``input/`` and register in the
                manifest with their SHA-256 digest.

        Returns:
            A summary with ``status``, ``caseDir``, ``taxYear``,
            ``registeredDocuments`` and ``factCount``.

        Raises:
            ValueError: If *tax_year* differs from the manifest's tax year.
            FileNotFoundError: If a document path is not an existing file.
            CaseConflictError: If a fact differs from the stored value. The
                issue is appended to the issue log before raising; no
                document, manifest or fact change is made.
        """
        manifest = self.load_manifest()
        if manifest["taxYear"] != tax_year:
            raise ValueError(
                f"Case tax year {manifest['taxYear']} does not match requested tax year {tax_year}."
            )

        # Resolve and verify every source up front so that a bad path cannot
        # abort the intake after some documents were already copied.
        sources = [Path(raw).expanduser().resolve() for raw in document_paths]
        for source_path in sources:
            if not source_path.is_file():
                raise FileNotFoundError(
                    f"Input document not found or not a regular file: {source_path}"
                )

        # Merge facts before touching the file system: a conflict must not
        # leave unregistered copies behind in input/.
        facts_payload = self._load_facts()
        for field_name, value in user_facts.items():
            existing = facts_payload["facts"].get(field_name)
            if existing and existing["value"] != value:
                issue = {
                    "status": "needs_resolution",
                    "issueType": "fact_conflict",
                    "field": field_name,
                    "existingValue": existing["value"],
                    "newValue": value,
                    "message": f"Conflicting values for {field_name}. Resolve before continuing.",
                    "createdAt": _timestamp(),
                    "taxYear": tax_year,
                }
                self._write_issue(issue)
                raise CaseConflictError(issue)
            facts_payload["facts"][field_name] = {
                "value": value,
                "sourceType": "user_statement",
                "capturedAt": _timestamp(),
            }

        # NOTE(review): documents are stored under their base name, so a
        # different file with the same name replaces the earlier copy.
        registered_documents = []
        for source_path in sources:
            destination = self.case_dir / "input" / source_path.name
            # shutil.copy2 raises SameFileError when asked to copy a file onto
            # itself, e.g. when the document already lives in input/.
            already_stored = destination.exists() and source_path.samefile(destination)
            if not already_stored:
                shutil.copy2(source_path, destination)
            document_entry = {
                "name": source_path.name,
                "sourcePath": str(source_path),
                "storedPath": str(destination),
                "sha256": _sha256_path(destination),
                "registeredAt": _timestamp(),
            }
            manifest["documents"].append(document_entry)
            registered_documents.append(document_entry)

        self._write_manifest(manifest)
        self._write_facts(facts_payload)
        return {
            "status": "accepted",
            "caseDir": str(self.case_dir),
            "taxYear": tax_year,
            "registeredDocuments": registered_documents,
            "factCount": len(facts_payload["facts"]),
        }

View File

@@ -6,6 +6,7 @@ import sys
from pathlib import Path
from typing import Any
from us_cpa.cases import CaseConflictError, CaseManager
from us_cpa.sources import TaxYearCorpus, bootstrap_irs_catalog
COMMANDS = (
@@ -47,6 +48,12 @@ def _require_case_dir(args: argparse.Namespace) -> Path:
return Path(args.case_dir).expanduser().resolve()
def _load_json_file(path_value: str | None) -> dict[str, Any]:
if not path_value:
return {}
return json.loads(Path(path_value).expanduser().resolve().read_text())
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="us-cpa",
@@ -74,6 +81,10 @@ def build_parser() -> argparse.ArgumentParser:
"extract-docs", help="Extract facts from case documents."
)
_add_common_arguments(extract_docs)
extract_docs.add_argument("--create-case", action="store_true")
extract_docs.add_argument("--case-label")
extract_docs.add_argument("--facts-json")
extract_docs.add_argument("--input-file", action="append", default=[])
render_forms = subparsers.add_parser(
"render-forms", help="Render compiled IRS forms."
@@ -103,7 +114,35 @@ def main(argv: list[str] | None = None) -> int:
}
return _emit(payload, args.format)
if args.command in {"prepare", "review", "extract-docs", "render-forms", "export-efile-ready"}:
if args.command == "extract-docs":
case_dir = _require_case_dir(args)
manager = CaseManager(case_dir)
if args.create_case:
if not args.case_label:
raise SystemExit("--case-label is required when --create-case is used.")
manager.create_case(case_label=args.case_label, tax_year=args.tax_year)
elif not manager.manifest_path.exists():
raise SystemExit("Case manifest not found. Use --create-case for a new case.")
try:
result = manager.intake(
tax_year=args.tax_year,
user_facts=_load_json_file(args.facts_json),
document_paths=[
Path(path_value).expanduser().resolve() for path_value in args.input_file
],
)
except CaseConflictError as exc:
print(json.dumps(exc.issue, indent=2))
return 1
payload = {
"command": args.command,
"format": args.format,
**result,
}
return _emit(payload, args.format)
if args.command in {"prepare", "review", "render-forms", "export-efile-ready"}:
case_dir = _require_case_dir(args)
payload = {
"command": args.command,