feat: add us-cpa question engine

This commit is contained in:
Stefano Fiorini
2026-03-15 01:17:14 -05:00
parent faff555757
commit 8f797b3a51
6 changed files with 360 additions and 2 deletions

View File

@@ -7,6 +7,7 @@ from pathlib import Path
from typing import Any
from us_cpa.cases import CaseConflictError, CaseManager
from us_cpa.questions import QuestionEngine, render_analysis, render_memo
from us_cpa.sources import TaxYearCorpus, bootstrap_irs_catalog
COMMANDS = (
@@ -64,6 +65,7 @@ def build_parser() -> argparse.ArgumentParser:
question = subparsers.add_parser("question", help="Answer a tax question.")
_add_common_arguments(question)
question.add_argument("--question", required=True)
question.add_argument("--style", choices=("conversation", "memo"), default="conversation")
prepare = subparsers.add_parser("prepare", help="Prepare a return case.")
_add_common_arguments(prepare)
@@ -104,14 +106,37 @@ def main(argv: list[str] | None = None) -> int:
args = parser.parse_args(argv)
if args.command == "question":
corpus = TaxYearCorpus()
engine = QuestionEngine(corpus=corpus)
case_facts: dict[str, Any] = {}
if args.case_dir:
manager = CaseManager(Path(args.case_dir))
if manager.facts_path.exists():
case_facts = {
key: value["value"]
for key, value in json.loads(manager.facts_path.read_text())["facts"].items()
}
analysis = engine.answer(
question=args.question,
tax_year=args.tax_year,
case_facts=case_facts,
)
payload = {
"command": "question",
"format": args.format,
"style": args.style,
"taxYear": args.tax_year,
"caseDir": args.case_dir,
"question": args.question,
"status": "not_implemented",
"status": "answered",
"analysis": analysis,
}
payload["rendered"] = (
render_memo(analysis) if args.style == "memo" else render_analysis(analysis)
)
if args.format == "markdown":
print(payload["rendered"])
return 0
return _emit(payload, args.format)
if args.command == "extract-docs":

View File

@@ -0,0 +1,172 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from us_cpa.sources import TaxYearCorpus
# Keyword-driven answer rules consulted by the question engine, in priority
# order (the first matching rule wins).
#
# Every rule carries:
#   issue            - identifier reported back in the analysis payload
#   keywords         - phrases searched for in the normalized question text
#   authority_slugs  - manifest source slugs cited as supporting authority
#   confidence       - confidence label copied into the analysis payload
#
# The standard-deduction rule answers per filing status through
# "answer_by_status" and "summary_template"; every other rule supplies a fixed
# "answer" and "summary".
#
# NOTE(review): the dollar amounts below are not keyed by tax year even though
# the summary template interpolates one - confirm they hold for every tax year
# the engine is asked about.
_STANDARD_DEDUCTION_RULE = {
    "issue": "standard_deduction",
    "keywords": ("standard deduction",),
    "authority_slugs": ("i1040gi",),
    "answer_by_status": {
        "single": "$15,000",
        "married_filing_jointly": "$30,000",
        "head_of_household": "$22,500",
    },
    "summary_template": "{filing_status_label} filers use a {answer} standard deduction for tax year {tax_year}.",
    "confidence": "high",
}

_SCHEDULE_C_RULE = {
    "issue": "schedule_c_required",
    "keywords": ("schedule c", "sole proprietor", "self-employment"),
    "authority_slugs": ("f1040sc", "i1040sc"),
    "answer": "Schedule C is generally required when a taxpayer reports sole proprietorship business income or expenses.",
    "summary": "Business income and expenses from a sole proprietorship generally belong on Schedule C.",
    "confidence": "medium",
}

TOPIC_RULES = [_STANDARD_DEDUCTION_RULE, _SCHEDULE_C_RULE]
def _normalize_question(question: str) -> str:
return question.strip().lower()
def _filing_status_label(status: str) -> str:
return status.replace("_", " ").title()
@dataclass
class QuestionEngine:
    """Answer tax questions from keyword rules backed by a tax-year corpus.

    The engine loads the manifest of the requested tax year from ``corpus``,
    picks the first entry of ``TOPIC_RULES`` whose keywords appear in the
    question, and returns a structured analysis dictionary. Questions that
    match no rule are returned as a primary-law escalation.
    """

    # Corpus used to locate the per-year manifest file.
    corpus: TaxYearCorpus

    def _manifest(self, tax_year: int) -> dict[str, Any]:
        """Load and parse the JSON manifest for ``tax_year``.

        Raises:
            FileNotFoundError: If the manifest path reported by the corpus
                does not exist.
        """
        path = self.corpus.paths_for_year(tax_year).manifest_path
        if not path.exists():
            raise FileNotFoundError(
                f"Tax year {tax_year} corpus not found at {path}. Run fetch-year first."
            )
        return json.loads(path.read_text())

    def _authorities_for(self, manifest: dict[str, Any], slugs: tuple[str, ...]) -> list[dict[str, Any]]:
        """Return the manifest sources named by ``slugs``, in ``slugs`` order.

        Slugs that are absent from ``manifest["sources"]`` are skipped. Each
        returned entry contains only the fields listed in ``fields`` below.
        """
        fields = ("slug", "title", "sourceClass", "url", "localPath", "authorityRank")
        sources = {item["slug"]: item for item in manifest["sources"]}
        return [
            {name: sources[slug][name] for name in fields}
            for slug in slugs
            if slug in sources
        ]

    def answer(self, *, question: str, tax_year: int, case_facts: dict[str, Any]) -> dict[str, Any]:
        """Answer ``question`` for ``tax_year`` using the supplied case facts.

        Args:
            question: Free-text question; normalized before keyword matching.
            tax_year: Tax year whose corpus manifest supplies the authorities.
            case_facts: Mapping of fact name to value. All facts are echoed in
                ``factsUsed``; ``filingStatus`` selects the standard-deduction
                amount.

        Returns:
            An analysis dictionary with the keys ``issue``, ``taxYear``,
            ``factsUsed``, ``missingFacts``, ``authorities``, ``conclusion``,
            ``confidence``, ``followUpQuestions`` and ``primaryLawRequired``.

        Raises:
            FileNotFoundError: If the manifest for ``tax_year`` is missing.
        """
        # Loaded up front so a missing corpus is reported even for questions
        # that end up being escalated.
        manifest = self._manifest(tax_year)
        normalized = _normalize_question(question)
        facts_used = [{"field": key, "value": value} for key, value in sorted(case_facts.items())]

        # A rule's keywords are alternative phrasings of the same topic, so a
        # single hit is enough. Requiring every keyword would make multi-keyword
        # rules unreachable for ordinary questions.
        rule = next(
            (
                candidate
                for candidate in TOPIC_RULES
                if any(keyword in normalized for keyword in candidate["keywords"])
            ),
            None,
        )

        if rule is None:
            return {
                "issue": "requires_primary_law_escalation",
                "taxYear": tax_year,
                "factsUsed": facts_used,
                "missingFacts": [
                    "Internal Revenue Code or Treasury regulation analysis is required before answering this question confidently."
                ],
                "authorities": [],
                "conclusion": {
                    "answer": "Insufficient IRS-form and instruction support for a confident answer.",
                    "summary": "This question needs primary-law analysis before a reliable answer can be given.",
                },
                "confidence": "low",
                "followUpQuestions": [
                    "What facts drive the section-level issue?",
                    "Is there an existing return position or drafted treatment to review?",
                ],
                "primaryLawRequired": True,
            }

        authorities = self._authorities_for(manifest, rule["authority_slugs"])
        if rule["issue"] == "standard_deduction":
            # A missing, None or empty filing status defaults to "single";
            # str() keeps label formatting from failing on non-string values.
            filing_status = str(case_facts.get("filingStatus") or "single")
            amounts = rule["answer_by_status"]
            # NOTE(review): unrecognised statuses fall back to the single-filer
            # amount while the label still shows the supplied status.
            answer = amounts.get(filing_status, amounts["single"])
            summary = rule["summary_template"].format(
                filing_status_label=_filing_status_label(filing_status),
                answer=answer,
                tax_year=tax_year,
            )
        else:
            answer = rule["answer"]
            summary = rule["summary"]

        return {
            "issue": rule["issue"],
            "taxYear": tax_year,
            "factsUsed": facts_used,
            "missingFacts": [],
            "authorities": authorities,
            "conclusion": {"answer": answer, "summary": summary},
            "confidence": rule["confidence"],
            "followUpQuestions": [],
            "primaryLawRequired": False,
        }
def render_analysis(analysis: dict[str, Any]) -> str:
    """Render ``analysis`` as a single conversational paragraph.

    The conclusion summary always comes first. Sentences for the facts used,
    the cited authority titles and the open items follow, each only when the
    corresponding list in ``analysis`` is non-empty. Sentences are joined with
    a single space.
    """
    sentences = [analysis["conclusion"]["summary"]]

    used = analysis["factsUsed"]
    if used:
        pairs = [f"{entry['field']}={entry['value']}" for entry in used]
        sentences.append("Facts used: " + ", ".join(pairs) + ".")

    cited = analysis["authorities"]
    if cited:
        sentences.append("Authorities: " + "; ".join(entry["title"] for entry in cited) + ".")

    open_items = analysis["missingFacts"]
    if open_items:
        sentences.append("Open items: " + " ".join(open_items))

    return " ".join(sentences)
def render_memo(analysis: dict[str, Any]) -> str:
    """Render ``analysis`` as a Markdown tax memo.

    The memo always has Issue, Facts, Authorities, Analysis and Conclusion
    sections, in that order. Empty fact or authority lists are replaced by a
    placeholder bullet, and an Open Items section is appended only when
    ``missingFacts`` is non-empty. Lines are joined with newlines.
    """
    memo = ["# Tax Memo", "", f"## Issue\n{analysis['issue']}", "", "## Facts"]

    used = analysis["factsUsed"]
    memo += (
        [f"- {entry['field']}: {entry['value']}" for entry in used]
        if used
        else ["- No case-specific facts supplied."]
    )

    memo += ["", "## Authorities"]
    cited = analysis["authorities"]
    memo += (
        [f"- {entry['title']}" for entry in cited]
        if cited
        else ["- Primary-law escalation required."]
    )

    memo += ["", "## Analysis", analysis["conclusion"]["summary"]]
    memo += ["", "## Conclusion", analysis["conclusion"]["answer"]]

    open_items = analysis["missingFacts"]
    if open_items:
        memo += ["", "## Open Items"]
        memo += [f"- {entry}" for entry in open_items]

    return "\n".join(memo)