feat: add us-cpa review workflow
This commit is contained in:
111
skills/us-cpa/src/us_cpa/review.py
Normal file
111
skills/us-cpa/src/us_cpa/review.py
Normal file
@@ -0,0 +1,111 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from us_cpa.returns import normalize_case_facts
|
||||
from us_cpa.sources import TaxYearCorpus
|
||||
|
||||
|
||||
def _severity_rank(severity: str) -> int:
|
||||
return {"high": 0, "medium": 1, "low": 2}[severity]
|
||||
|
||||
|
||||
class ReviewEngine:
|
||||
def __init__(self, *, corpus: TaxYearCorpus | None = None) -> None:
|
||||
self.corpus = corpus or TaxYearCorpus()
|
||||
|
||||
def review_case(self, case_dir: Path) -> dict[str, Any]:
|
||||
case_dir = Path(case_dir).expanduser().resolve()
|
||||
manifest = json.loads((case_dir / "case-manifest.json").read_text())
|
||||
stored_return = json.loads((case_dir / "return" / "normalized-return.json").read_text())
|
||||
facts_payload = json.loads((case_dir / "extracted" / "facts.json").read_text())
|
||||
facts = {key: value["value"] for key, value in facts_payload["facts"].items()}
|
||||
recomputed = normalize_case_facts(facts, manifest["taxYear"])
|
||||
artifacts_payload = json.loads((case_dir / "output" / "artifacts.json").read_text())
|
||||
|
||||
findings: list[dict[str, Any]] = []
|
||||
if stored_return["totals"]["adjustedGrossIncome"] != recomputed["totals"]["adjustedGrossIncome"]:
|
||||
findings.append(
|
||||
{
|
||||
"severity": "high",
|
||||
"title": "Adjusted gross income mismatch",
|
||||
"explanation": "Stored adjusted gross income does not match the recomputed return from case facts.",
|
||||
"suggestedAction": f"Update AGI to {recomputed['totals']['adjustedGrossIncome']:.2f} on Form 1040 line 11.",
|
||||
"authorities": [
|
||||
{"title": "Instructions for Form 1040 and Schedules 1-3", "sourceClass": "irs_instructions"}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
rendered_forms = {artifact["formCode"] for artifact in artifacts_payload["artifacts"]}
|
||||
for required_form in recomputed["requiredForms"]:
|
||||
if required_form not in rendered_forms:
|
||||
findings.append(
|
||||
{
|
||||
"severity": "high",
|
||||
"title": f"Missing rendered artifact for {required_form}",
|
||||
"explanation": "The return requires this form, but no rendered artifact is present in the artifact manifest.",
|
||||
"suggestedAction": f"Render and review {required_form} before treating the package as complete.",
|
||||
"authorities": [{"title": "Supported form manifest", "sourceClass": "irs_form"}],
|
||||
}
|
||||
)
|
||||
|
||||
for artifact in artifacts_payload["artifacts"]:
|
||||
if artifact.get("reviewRequired"):
|
||||
findings.append(
|
||||
{
|
||||
"severity": "medium",
|
||||
"title": f"Human review required for {artifact['formCode']}",
|
||||
"explanation": "The form was overlay-rendered on the official IRS PDF and must be reviewed before filing.",
|
||||
"suggestedAction": f"Review the rendered {artifact['formCode']} artifact visually before any filing/export handoff.",
|
||||
"authorities": [{"title": "Artifact render policy", "sourceClass": "irs_form"}],
|
||||
}
|
||||
)
|
||||
|
||||
findings.sort(key=lambda item: (_severity_rank(item["severity"]), item["title"]))
|
||||
review = {
|
||||
"status": "reviewed",
|
||||
"taxYear": manifest["taxYear"],
|
||||
"caseDir": str(case_dir),
|
||||
"findingCount": len(findings),
|
||||
"findings": findings,
|
||||
}
|
||||
(case_dir / "reports" / "review-report.json").write_text(json.dumps(review, indent=2))
|
||||
return review
|
||||
|
||||
|
||||
def render_review_summary(review: dict[str, Any]) -> str:
|
||||
if not review["findings"]:
|
||||
return "No findings detected in the reviewed return package."
|
||||
lines = ["Review findings:"]
|
||||
for finding in review["findings"]:
|
||||
lines.append(f"- [{finding['severity'].upper()}] {finding['title']}: {finding['explanation']}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def render_review_memo(review: dict[str, Any]) -> str:
|
||||
lines = ["# Review Memo", ""]
|
||||
if not review["findings"]:
|
||||
lines.append("No findings detected.")
|
||||
return "\n".join(lines)
|
||||
for index, finding in enumerate(review["findings"], start=1):
|
||||
lines.extend(
|
||||
[
|
||||
f"## Finding {index}: {finding['title']}",
|
||||
f"Severity: {finding['severity']}",
|
||||
"",
|
||||
"### Explanation",
|
||||
finding["explanation"],
|
||||
"",
|
||||
"### Suggested correction",
|
||||
finding["suggestedAction"],
|
||||
"",
|
||||
"### Authorities",
|
||||
]
|
||||
)
|
||||
for authority in finding["authorities"]:
|
||||
lines.append(f"- {authority['title']}")
|
||||
lines.append("")
|
||||
return "\n".join(lines).rstrip()
|
||||
Reference in New Issue
Block a user