Files
stef-openclaw-skills/skills/us-cpa/src/us_cpa/review.py
2026-03-15 03:01:16 -05:00

163 lines
7.8 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from us_cpa.returns import normalize_case_facts
from us_cpa.sources import TaxYearCorpus
def _severity_rank(severity: str) -> int:
    """Return the sort position of a severity label; lower values sort first.

    ``"high"`` maps to 0, ``"medium"`` to 1 and ``"low"`` to 2. Any other
    label raises ``KeyError``.
    """
    ordering = {"high": 0, "medium": 1, "low": 2}
    return ordering[severity]
class ReviewEngine:
    """Compare a stored return against one recomputed from the case facts.

    The engine reads the JSON files of a case directory, recomputes the
    return with ``normalize_case_facts``, collects findings for every
    discrepancy it detects, and writes the result to
    ``reports/review-report.json`` inside the case directory.
    """

    # (key under the return's "income" mapping, label used in finding text).
    _INCOME_FIELDS = (
        ("wages", "wages"),
        ("taxableInterest", "taxable interest"),
        ("businessIncome", "business income"),
        ("capitalGainLoss", "capital gains or losses"),
        ("rentalIncome", "rental income"),
    )

    # Form codes whose presence triggers the specialist follow-up finding.
    _HIGH_COMPLEXITY_FORMS = ("f6251", "f8960", "f8959", "f1116")

    def __init__(self, *, corpus: TaxYearCorpus | None = None) -> None:
        """Store *corpus*, creating a default ``TaxYearCorpus`` when omitted.

        The corpus is only stored here; nothing else in this class reads it.
        """
        # Test for None explicitly so a caller-supplied corpus is never
        # replaced just because it happens to evaluate as falsy.
        self.corpus = corpus if corpus is not None else TaxYearCorpus()

    def review_case(self, case_dir: Path) -> dict[str, Any]:
        """Review the case stored in *case_dir* and persist the report.

        Reads ``case-manifest.json``, ``return/normalized-return.json``,
        ``extracted/facts.json`` and ``output/artifacts.json`` from the case
        directory, then checks adjusted gross income, the individual income
        fields, the rendered artifacts and the presence of high-complexity
        forms.

        Args:
            case_dir: Case directory; ``~`` is expanded and the path resolved.

        Returns:
            The review report: ``status``, ``taxYear``, ``caseDir``,
            ``findingCount`` and ``findings`` sorted by severity, then title.

        Raises:
            FileNotFoundError: If one of the input JSON files is missing.
            json.JSONDecodeError: If an input file is not valid JSON.
            KeyError: If an expected key is absent from an input document.
        """
        case_dir = Path(case_dir).expanduser().resolve()
        manifest = self._load_json(case_dir / "case-manifest.json")
        stored_return = self._load_json(case_dir / "return" / "normalized-return.json")
        facts_payload = self._load_json(case_dir / "extracted" / "facts.json")

        facts = {key: value["value"] for key, value in facts_payload["facts"].items()}
        # Pass the per-fact sources along under a reserved key next to the values.
        facts["_factMetadata"] = {
            key: {"sources": value.get("sources", [])}
            for key, value in facts_payload["facts"].items()
        }
        recomputed = normalize_case_facts(facts, manifest["taxYear"])
        artifacts_payload = self._load_json(case_dir / "output" / "artifacts.json")

        # Helpers are called in a fixed order: the sort below is stable, so
        # findings with equal severity and title keep this insertion order.
        findings: list[dict[str, Any]] = []
        findings.extend(self._agi_findings(stored_return, recomputed))
        findings.extend(self._income_findings(stored_return, recomputed))
        findings.extend(
            self._artifact_findings(artifacts_payload["artifacts"], recomputed["requiredForms"])
        )
        findings.extend(self._complexity_findings(stored_return, recomputed))
        findings.sort(key=lambda item: (_severity_rank(item["severity"]), item["title"]))

        review = {
            "status": "reviewed",
            "taxYear": manifest["taxYear"],
            "caseDir": str(case_dir),
            "findingCount": len(findings),
            "findings": findings,
        }
        report_path = case_dir / "reports" / "review-report.json"
        # Create the reports directory on demand so a completed review is not
        # lost to FileNotFoundError when the directory was never created.
        report_path.parent.mkdir(parents=True, exist_ok=True)
        report_path.write_text(json.dumps(review, indent=2), encoding="utf-8")
        return review

    @staticmethod
    def _load_json(path: Path) -> Any:
        """Read *path* as UTF-8 text and return the decoded JSON document."""
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def _agi_findings(
        stored_return: dict[str, Any], recomputed: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Return a finding when stored and recomputed AGI differ."""
        recomputed_agi = recomputed["totals"]["adjustedGrossIncome"]
        # NOTE(review): values are compared exactly although messages show two
        # decimals; confirm whether cent-level tolerance is wanted.
        if stored_return["totals"]["adjustedGrossIncome"] == recomputed_agi:
            return []
        return [
            {
                "severity": "high",
                "title": "Adjusted gross income mismatch",
                "explanation": "Stored adjusted gross income does not match the recomputed return from case facts.",
                "suggestedAction": f"Update AGI to {recomputed_agi:.2f} on Form 1040 line 11.",
                "authorities": [
                    {"title": "Instructions for Form 1040 and Schedules 1-3", "sourceClass": "irs_instructions"}
                ],
            }
        ]

    @classmethod
    def _income_findings(
        cls, stored_return: dict[str, Any], recomputed: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Return mismatch and omission findings for each tracked income field."""
        findings: list[dict[str, Any]] = []
        provenance = recomputed.get("provenance", {})
        for field, label in cls._INCOME_FIELDS:
            stored_value = stored_return["income"].get(field, 0.0)
            recomputed_value = recomputed["income"].get(field, 0.0)
            sources = provenance.get(f"income.{field}", {}).get("sources", [])
            has_document_source = any(
                item.get("sourceType") == "document_extract" for item in sources
            )
            if stored_value != recomputed_value:
                findings.append(
                    {
                        # A mismatch backed by an extracted document ranks higher.
                        "severity": "high" if has_document_source else "medium",
                        "title": f"Source fact mismatch for {label}",
                        "explanation": f"Stored return reports {stored_value:.2f} for {label}, but case facts support {recomputed_value:.2f}.",
                        "suggestedAction": f"Reconcile {label} to {recomputed_value:.2f} before treating the return as final.",
                        "authorities": [
                            {"title": "Case fact registry", "sourceClass": "irs_form"}
                        ],
                    }
                )
            # Reported in addition to the mismatch finding above when the
            # stored return carries nothing at all for the field.
            if stored_value == 0 and recomputed_value > 0 and has_document_source:
                findings.append(
                    {
                        "severity": "high",
                        "title": f"Likely omitted {label}",
                        "explanation": f"Document-extracted facts support {recomputed_value:.2f} of {label}, but the stored return reports none.",
                        "suggestedAction": f"Add {label} to the return and regenerate the required forms.",
                        "authorities": [
                            {"title": "Case document extraction", "sourceClass": "irs_form"}
                        ],
                    }
                )
        return findings

    @staticmethod
    def _artifact_findings(
        artifacts: list[dict[str, Any]], required_forms: list[str]
    ) -> list[dict[str, Any]]:
        """Return findings for unrendered required forms and review-flagged artifacts."""
        findings: list[dict[str, Any]] = []
        rendered_forms = {artifact["formCode"] for artifact in artifacts}
        for required_form in required_forms:
            if required_form not in rendered_forms:
                findings.append(
                    {
                        "severity": "high",
                        "title": f"Missing rendered artifact for {required_form}",
                        "explanation": "The return requires this form, but no rendered artifact is present in the artifact manifest.",
                        "suggestedAction": f"Render and review {required_form} before treating the package as complete.",
                        "authorities": [{"title": "Supported form manifest", "sourceClass": "irs_form"}],
                    }
                )
        for artifact in artifacts:
            if artifact.get("reviewRequired"):
                findings.append(
                    {
                        "severity": "medium",
                        "title": f"Human review required for {artifact['formCode']}",
                        "explanation": "The form was overlay-rendered on the official IRS PDF and must be reviewed before filing.",
                        "suggestedAction": f"Review the rendered {artifact['formCode']} artifact visually before any filing/export handoff.",
                        "authorities": [{"title": "Artifact render policy", "sourceClass": "irs_form"}],
                    }
                )
        return findings

    @classmethod
    def _complexity_findings(
        cls, stored_return: dict[str, Any], recomputed: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Return a finding when either return requires a high-complexity form."""
        required_forms_union = set(recomputed["requiredForms"]) | set(
            stored_return.get("requiredForms", [])
        )
        if not any(form in required_forms_union for form in cls._HIGH_COMPLEXITY_FORMS):
            return []
        return [
            {
                "severity": "medium",
                "title": "High-complexity tax position requires specialist follow-up",
                "explanation": "The return includes forms or computations that usually require deeper technical support and careful authority review.",
                "suggestedAction": "Review the supporting authority and computations for the high-complexity forms before treating the return as filing-ready.",
                "authorities": [{"title": "Required form analysis", "sourceClass": "irs_instructions"}],
            }
        ]
def render_review_summary(review: dict[str, Any]) -> str:
    """Render the findings of *review* as a short plain-text bullet list.

    Returns a fixed "no findings" sentence when ``review["findings"]`` is
    empty; otherwise a ``Review findings:`` header followed by one line per
    finding carrying the upper-cased severity, the title and the explanation.
    """
    findings = review["findings"]
    if not findings:
        return "No findings detected in the reviewed return package."
    bullets = [
        f"- [{item['severity'].upper()}] {item['title']}: {item['explanation']}"
        for item in findings
    ]
    return "\n".join(["Review findings:", *bullets])
def render_review_memo(review: dict[str, Any]) -> str:
    """Render *review* as a Markdown memo with one section per finding.

    The memo starts with a ``# Review Memo`` heading. With no findings it
    contains a single "No findings detected." line; otherwise each finding
    gets a numbered heading, its severity, explanation, suggested correction
    and a bullet list of authority titles. Trailing whitespace is stripped
    from the memo when findings are present.
    """
    findings = review["findings"]
    memo = ["# Review Memo", ""]
    if not findings:
        return "\n".join([*memo, "No findings detected."])

    for number, finding in enumerate(findings, 1):
        memo += [
            f"## Finding {number}: {finding['title']}",
            f"Severity: {finding['severity']}",
            "",
            "### Explanation",
            finding["explanation"],
            "",
            "### Suggested correction",
            finding["suggestedAction"],
            "",
            "### Authorities",
        ]
        memo += [f"- {authority['title']}" for authority in finding["authorities"]]
        # Blank separator line between consecutive finding sections.
        memo.append("")
    return "\n".join(memo).rstrip()