Files
stef-openclaw-skills/skills/us-cpa/src/us_cpa/questions.py
T
2026-03-15 04:40:57 -05:00

449 lines
15 KiB
Python

from __future__ import annotations
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from pypdf import PdfReader
from us_cpa.sources import TaxYearCorpus, build_primary_law_authorities
# Canned answers for well-known topics, tried in order by QuestionEngine.answer.
#
# A rule fires only when *every* entry in "keywords" occurs as a substring of
# the normalized (stripped, lower-cased) question.  "authority_slugs" are
# looked up in the tax-year manifest; slugs missing from it are skipped.
# "confidence" is translated into a risk level through RISK_BY_CONFIDENCE.
TOPIC_RULES = [
    # The standard-deduction rule is special-cased by its "issue" name: the
    # answer is chosen per filing status and rendered through
    # "summary_template" rather than using fixed "answer"/"summary" strings.
    # NOTE(review): the dollar amounts are not keyed by tax year even though
    # the summary interpolates {tax_year} -- confirm they are valid for every
    # year this engine is asked about.
    {
        "issue": "standard_deduction",
        "keywords": ("standard deduction",),
        "authority_slugs": ("i1040gi",),
        "answer_by_status": {
            "single": "$15,750",
            "married_filing_jointly": "$31,500",
            "qualifying_surviving_spouse": "$31,500",
            "head_of_household": "$23,625",
        },
        "summary_template": "{filing_status_label} filers use a {answer} standard deduction for tax year {tax_year}.",
        "confidence": "high",
    },
    # Schedule C: sole-proprietorship business income and expenses.
    {
        "issue": "schedule_c_required",
        "keywords": ("schedule c", "sole proprietor", "self-employment"),
        "authority_slugs": ("f1040sc", "i1040sc"),
        "answer": "Schedule C is generally required when a taxpayer reports sole proprietorship business income or expenses.",
        "summary": "Business income and expenses from a sole proprietorship generally belong on Schedule C.",
        "confidence": "medium",
    },
    # Schedule D: capital gains and losses (with Form 8949 as support).
    {
        "issue": "schedule_d_required",
        "keywords": ("schedule d", "capital gains"),
        "authority_slugs": ("f1040sd", "i1040sd", "f8949", "i8949"),
        "answer": "Schedule D is generally required when a taxpayer reports capital gains or losses, often alongside Form 8949.",
        "summary": "Capital gains and losses generally flow through Schedule D, with Form 8949 supporting detail when required.",
        "confidence": "medium",
    },
    # Schedule E: rental real-estate income and expenses.
    {
        "issue": "schedule_e_required",
        "keywords": ("schedule e", "rental income"),
        "authority_slugs": ("f1040se", "i1040se"),
        "answer": "Schedule E is generally required when a taxpayer reports rental real-estate income or expenses.",
        "summary": "Rental income and expenses generally belong on Schedule E.",
        "confidence": "medium",
    },
]
# Inverse mapping from a rule's confidence label to the reported risk level:
# the more confident the canned answer, the lower the risk attached to it.
RISK_BY_CONFIDENCE = {
    "high": "low",
    "medium": "medium",
    "low": "high",
}
# Words that carry no search signal; _question_terms drops any token found
# here before the corpus is searched.  Kept in alphabetical order.
QUESTION_STOPWORDS = {
    "a", "also", "am", "an", "and", "are", "as", "at",
    "be", "before", "but", "by", "can", "considered",
    "did", "do", "does", "for", "from",
    "had", "has", "have", "her", "hers", "his",
    "i", "if", "in", "is", "it", "its",
    "my", "of", "or", "our",
    "she", "should",
    "that", "the", "their", "them", "they", "this", "to",
    "was", "we", "went", "what", "worked", "would",
    "year", "you", "your",
}
# Score bonus per manifest "sourceClass", added to each hit's score in
# _rank_research_hits.  Source classes not listed here receive no bonus.
SEARCH_SOURCE_BONUS = {
    "irs_publication": 30,
    "irs_instructions": 20,
    "irs_faq": 10,
    "irs_form": 0,
}
def _normalize_question(question: str) -> str:
return question.strip().lower()
def _filing_status_label(status: str) -> str:
return status.replace("_", " ").title()
def _question_terms(normalized_question: str) -> list[str]:
terms = []
for token in re.findall(r"[a-z0-9]+", normalized_question):
if len(token) < 3 or token in QUESTION_STOPWORDS or token.isdigit():
continue
terms.append(token)
expanded = set(terms)
if any(token in expanded for token in {"dependent", "dependents", "daughter", "son", "child", "children"}):
expanded.update({"dependent", "qualifying", "child", "support", "residency"})
if any(token in expanded for token in {"college", "school", "student", "tuition"}):
expanded.update({"student", "school", "education", "temporary", "absence"})
return sorted(expanded)
def _load_searchable_pages(path: Path) -> list[str]:
payload = path.read_bytes()
if payload.startswith(b"%PDF"):
try:
reader = PdfReader(path)
pages = []
for page in reader.pages:
text = page.extract_text() or ""
if text.strip():
pages.append(text)
if pages:
return pages
except Exception:
pass
try:
decoded = payload.decode("utf-8", errors="ignore")
except Exception:
return []
return [decoded] if decoded.strip() else []
def _build_excerpt(text: str, terms: list[str], *, width: int = 420) -> str:
lowered = text.lower()
first_index = None
for term in terms:
idx = lowered.find(term)
if idx >= 0 and (first_index is None or idx < first_index):
first_index = idx
if first_index is None:
cleaned = " ".join(text.split())
return cleaned[:width]
start = max(0, first_index - 120)
end = min(len(text), first_index + width)
cleaned = " ".join(text[start:end].split())
return cleaned
def _rank_research_hits(manifest: dict[str, Any], normalized_question: str) -> list[dict[str, Any]]:
    """Search every cached source for the question's terms and rank the pages.

    Each page that contains at least one term (substring match against the
    lower-cased page text) becomes a hit scored as::

        10 * matched term count + source-class bonus - authorityRank

    Sources whose ``localPath`` does not exist on disk are skipped.

    Args:
        manifest: Tax-year manifest; its ``"sources"`` entries supply
            ``slug``, ``title``, ``sourceClass``, ``url``, ``localPath`` and
            ``authorityRank``.
        normalized_question: The normalized question text.

    Returns:
        At most five hits, ordered by descending score, then ascending
        authority rank, slug and page number.  Empty when the question
        yields no search terms.
    """
    search_terms = _question_terms(normalized_question)
    if not search_terms:
        return []

    def ranking_key(hit: dict[str, Any]) -> tuple[Any, ...]:
        # Negated score sorts the best-scoring pages first.
        return (-hit["score"], hit["authorityRank"], hit["slug"], hit["page"])

    collected: list[dict[str, Any]] = []
    for source in manifest["sources"]:
        local_file = Path(source["localPath"])
        if not local_file.exists():
            continue

        for page_number, page_text in enumerate(_load_searchable_pages(local_file), start=1):
            haystack = page_text.lower()
            found = [term for term in search_terms if term in haystack]
            if not found:
                continue

            term_points = len(found) * 10
            class_bonus = SEARCH_SOURCE_BONUS.get(source["sourceClass"], 0)
            rank_penalty = int(source["authorityRank"])
            collected.append(
                {
                    "slug": source["slug"],
                    "title": source["title"],
                    "sourceClass": source["sourceClass"],
                    "url": source["url"],
                    "localPath": source["localPath"],
                    "authorityRank": source["authorityRank"],
                    "page": page_number,
                    "score": term_points + class_bonus - rank_penalty,
                    "matchedTerms": found,
                    "excerpt": _build_excerpt(page_text, found),
                }
            )

    return sorted(collected, key=ranking_key)[:5]
# (phrases, filing status) pairs scanned in order by _infer_filing_status.
# Every phrase of an entry must occur in the normalized question for the
# entry to match, and the first matching entry wins -- so the more specific
# phrases are listed ahead of the bare "single".
FILING_STATUS_PATTERNS = (
    # Qualifying surviving spouse, including the older "widow(er)" wording.
    (("qualifying surviving spouse",), "qualifying_surviving_spouse"),
    (("qualifying widow",), "qualifying_surviving_spouse"),
    (("qualifying widower",), "qualifying_surviving_spouse"),
    (("surviving spouse",), "qualifying_surviving_spouse"),
    # Married filing jointly.
    (("married filing jointly",), "married_filing_jointly"),
    (("mfj",), "married_filing_jointly"),
    # Head of household.
    (("head of household",), "head_of_household"),
    (("hoh",), "head_of_household"),
    # Married filing separately.
    (("married filing separately",), "married_filing_separately"),
    (("mfs",), "married_filing_separately"),
    # Single.
    (("single",), "single"),
)
def _infer_filing_status(normalized_question: str, case_facts: dict[str, Any]) -> str:
if "filingStatus" in case_facts:
return case_facts["filingStatus"]
for patterns, filing_status in FILING_STATUS_PATTERNS:
if all(pattern in normalized_question for pattern in patterns):
return filing_status
return "single"
@dataclass
class QuestionEngine:
    """Answer tax questions from canned topic rules and the cached corpus.

    Resolution order in :meth:`answer`:

    1. the first ``TOPIC_RULES`` entry whose keywords all occur in the
       question,
    2. otherwise a ranked full-text search over the cached tax-year sources,
    3. otherwise an escalation result that asks for primary-law analysis.
    """

    # Gives access to the per-year corpus paths (manifest location).
    corpus: TaxYearCorpus

    @staticmethod
    def _authority_entry(source: dict[str, Any]) -> dict[str, Any]:
        """Project a manifest source or search hit onto the authority fields."""
        return {
            field: source[field]
            for field in ("slug", "title", "sourceClass", "url", "localPath", "authorityRank")
        }

    def _manifest(self, tax_year: int) -> dict[str, Any]:
        """Load the manifest of the cached corpus for *tax_year*.

        Raises:
            FileNotFoundError: If no manifest exists for that year yet.
        """
        path = self.corpus.paths_for_year(tax_year).manifest_path
        if not path.exists():
            raise FileNotFoundError(
                f"Tax year {tax_year} corpus not found at {path}. Run fetch-year first."
            )
        # Read as UTF-8 explicitly so parsing does not depend on the
        # platform's locale encoding.
        return json.loads(path.read_text(encoding="utf-8"))

    def _authorities_for(self, manifest: dict[str, Any], slugs: tuple[str, ...]) -> list[dict[str, Any]]:
        """Return authority entries for *slugs*, in the order given.

        Slugs that are absent from the manifest are silently skipped.
        """
        sources = {item["slug"]: item for item in manifest["sources"]}
        return [self._authority_entry(sources[slug]) for slug in slugs if slug in sources]

    def answer(self, *, question: str, tax_year: int, case_facts: dict[str, Any]) -> dict[str, Any]:
        """Produce a structured analysis for *question*.

        Args:
            question: The question as asked by the user.
            tax_year: Tax year whose cached corpus should be consulted.
            case_facts: Case-specific facts; echoed back (sorted by field
                name) in ``"factsUsed"``.  A ``"filingStatus"`` entry
                overrides any status inferred from the question text.

        Returns:
            A dict with the keys ``issue``, ``taxYear``, ``factsUsed``,
            ``missingFacts``, ``authorities``, ``excerpts``, ``conclusion``
            (``answer`` and ``summary``), ``confidence``, ``riskLevel``,
            ``followUpQuestions`` and ``primaryLawRequired``.

        Raises:
            FileNotFoundError: If the corpus for *tax_year* is not cached.
        """
        manifest = self._manifest(tax_year)
        normalized = _normalize_question(question)
        facts_used = [{"field": key, "value": value} for key, value in sorted(case_facts.items())]

        # 1) Canned topic rules: a rule applies only when every one of its
        #    keywords occurs in the question.
        for rule in TOPIC_RULES:
            if not all(keyword in normalized for keyword in rule["keywords"]):
                continue

            authorities = self._authorities_for(manifest, rule["authority_slugs"])
            if rule["issue"] == "standard_deduction":
                filing_status = _infer_filing_status(normalized, case_facts)
                # Statuses without their own amount fall back to the
                # "single" figure.
                answer = rule["answer_by_status"].get(filing_status, rule["answer_by_status"]["single"])
                summary = rule["summary_template"].format(
                    filing_status_label=_filing_status_label(filing_status),
                    answer=answer,
                    tax_year=tax_year,
                )
            else:
                answer = rule["answer"]
                summary = rule["summary"]

            return {
                "issue": rule["issue"],
                "taxYear": tax_year,
                "factsUsed": facts_used,
                "missingFacts": [],
                "authorities": authorities,
                "conclusion": {"answer": answer, "summary": summary},
                "confidence": rule["confidence"],
                "riskLevel": RISK_BY_CONFIDENCE[rule["confidence"]],
                "followUpQuestions": [],
                "primaryLawRequired": False,
                "excerpts": [],
            }

        # 2) Full-text research over the cached corpus.
        research_hits = _rank_research_hits(manifest, normalized)
        if research_hits:
            # Several hits may come from the same source; list each source
            # once, keeping the ranking order of its best hit.
            authorities = []
            seen = set()
            for hit in research_hits:
                if hit["slug"] in seen:
                    continue
                seen.add(hit["slug"])
                authorities.append(self._authority_entry(hit))

            return {
                "issue": "irs_corpus_research",
                "taxYear": tax_year,
                "factsUsed": facts_used,
                "missingFacts": [],
                "authorities": authorities,
                "excerpts": [
                    {
                        "slug": hit["slug"],
                        "title": hit["title"],
                        "page": hit["page"],
                        "matchedTerms": hit["matchedTerms"],
                        "excerpt": hit["excerpt"],
                    }
                    for hit in research_hits
                ],
                "conclusion": {
                    "answer": "Relevant IRS authorities were found in the downloaded tax-year corpus. Answer from those authorities directly, and only escalate further if the cited passages are still insufficient.",
                    "summary": "Relevant IRS materials in the cached tax-year corpus address this question. Use the cited passages below to answer it directly.",
                },
                "confidence": "medium",
                "riskLevel": "medium",
                "followUpQuestions": [],
                "primaryLawRequired": False,
            }

        # 3) Nothing matched: escalate to primary-law analysis.
        return {
            "issue": "requires_primary_law_escalation",
            "taxYear": tax_year,
            "factsUsed": facts_used,
            "missingFacts": [
                "Internal Revenue Code or Treasury regulation analysis is required before answering this question confidently."
            ],
            "authorities": build_primary_law_authorities(question),
            "conclusion": {
                "answer": "Insufficient IRS-form and instruction support for a confident answer.",
                "summary": "This question needs primary-law analysis before a reliable answer can be given.",
            },
            "confidence": "low",
            "riskLevel": "high",
            "followUpQuestions": [
                "What facts drive the section-level issue?",
                "Is there an existing return position or drafted treatment to review?",
            ],
            "primaryLawRequired": True,
            "excerpts": [],
        }
def render_analysis(analysis: dict[str, Any]) -> str:
    """Render an analysis result as a single line of plain text.

    The line always starts with the conclusion summary and the
    confidence/risk sentence.  Facts, authority titles, up to three excerpts
    and open items are appended only when the analysis contains them.

    Args:
        analysis: A result dict as produced by ``QuestionEngine.answer``.

    Returns:
        All sentences joined by single spaces.
    """
    sentences = [
        analysis["conclusion"]["summary"],
        f"Confidence: {analysis['confidence']}. Risk: {analysis['riskLevel']}.",
    ]

    facts_used = analysis["factsUsed"]
    if facts_used:
        rendered_facts = ", ".join(f"{fact['field']}={fact['value']}" for fact in facts_used)
        sentences.append(f"Facts used: {rendered_facts}.")

    authorities = analysis["authorities"]
    if authorities:
        rendered_titles = "; ".join(authority["title"] for authority in authorities)
        sentences.append(f"Authorities: {rendered_titles}.")

    # "excerpts" is optional, and only the first three are shown inline.
    excerpts = analysis.get("excerpts")
    if excerpts:
        rendered_excerpts = " | ".join(
            f"{entry['title']} p.{entry['page']}: {entry['excerpt']}" for entry in excerpts[:3]
        )
        sentences.append(f"Excerpts: {rendered_excerpts}")

    open_items = analysis["missingFacts"]
    if open_items:
        sentences.append(f"Open items: {' '.join(open_items)}")

    return " ".join(sentences)
def render_memo(analysis: dict[str, Any]) -> str:
    """Render an analysis result as a Markdown tax memo.

    Sections, in order: Issue, Facts, Authorities, IRS Excerpts (only when
    excerpts exist), Analysis, Conclusion and Open Items (only when facts
    are missing).  Empty Facts and Authorities sections get a placeholder
    bullet instead of being omitted.

    Args:
        analysis: A result dict as produced by ``QuestionEngine.answer``.

    Returns:
        The memo text with lines separated by newlines.
    """
    memo = ["# Tax Memo", "", f"## Issue\n{analysis['issue']}", "", "## Facts"]

    facts_used = analysis["factsUsed"]
    if facts_used:
        memo += [f"- {fact['field']}: {fact['value']}" for fact in facts_used]
    else:
        memo.append("- No case-specific facts supplied.")

    memo += ["", "## Authorities"]
    authorities = analysis["authorities"]
    if authorities:
        memo += [f"- {entry['title']}" for entry in authorities]
    else:
        memo.append("- Primary-law escalation required.")

    # The excerpt section is optional and is left out when there are none.
    excerpts = analysis.get("excerpts")
    if excerpts:
        memo += ["", "## IRS Excerpts"]
        memo += [
            f"- {entry['title']} (page {entry['page']}): {entry['excerpt']}"
            for entry in excerpts
        ]

    memo += [
        "",
        "## Analysis",
        analysis["conclusion"]["summary"],
        f"Confidence: {analysis['confidence']}",
        f"Risk level: {analysis['riskLevel']}",
        "",
        "## Conclusion",
        analysis["conclusion"]["answer"],
    ]

    open_items = analysis["missingFacts"]
    if open_items:
        memo += ["", "## Open Items"]
        memo += [f"- {entry}" for entry in open_items]

    return "\n".join(memo)