feat: make us-cpa questions retrieval-first

This commit is contained in:
Stefano Fiorini
2026-03-15 04:40:57 -05:00
parent b4f9666560
commit b2bb07fa90
6 changed files with 272 additions and 10 deletions

View File

@@ -57,6 +57,19 @@ def _load_json_file(path_value: str | None) -> dict[str, Any]:
return json.loads(Path(path_value).expanduser().resolve().read_text())
def _ensure_question_corpus(corpus: TaxYearCorpus, tax_year: int) -> None:
paths = corpus.paths_for_year(tax_year)
required_slugs = {item.slug for item in bootstrap_irs_catalog(tax_year)}
if not paths.manifest_path.exists():
corpus.download_catalog(tax_year, bootstrap_irs_catalog(tax_year))
return
manifest = json.loads(paths.manifest_path.read_text())
existing_slugs = {item["slug"] for item in manifest.get("sources", [])}
if not required_slugs.issubset(existing_slugs):
corpus.download_catalog(tax_year, bootstrap_irs_catalog(tax_year))
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="us-cpa",
@@ -110,6 +123,7 @@ def main(argv: list[str] | None = None) -> int:
if args.command == "question":
corpus = TaxYearCorpus()
_ensure_question_corpus(corpus, args.tax_year)
engine = QuestionEngine(corpus=corpus)
case_facts: dict[str, Any] = {}
if args.case_dir:

View File

@@ -1,10 +1,13 @@
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from pypdf import PdfReader
from us_cpa.sources import TaxYearCorpus, build_primary_law_authorities
@@ -56,6 +59,71 @@ RISK_BY_CONFIDENCE = {
}
QUESTION_STOPWORDS = {
"a",
"also",
"am",
"an",
"and",
"are",
"as",
"at",
"be",
"before",
"but",
"by",
"can",
"considered",
"did",
"do",
"does",
"for",
"from",
"had",
"has",
"have",
"her",
"hers",
"his",
"i",
"if",
"in",
"is",
"it",
"its",
"my",
"of",
"or",
"our",
"she",
"should",
"that",
"the",
"their",
"them",
"they",
"this",
"to",
"was",
"we",
"went",
"what",
"worked",
"would",
"year",
"you",
"your",
}
SEARCH_SOURCE_BONUS = {
"irs_publication": 30,
"irs_instructions": 20,
"irs_faq": 10,
"irs_form": 0,
}
def _normalize_question(question: str) -> str:
return question.strip().lower()
@@ -64,6 +132,101 @@ def _filing_status_label(status: str) -> str:
return status.replace("_", " ").title()
def _question_terms(normalized_question: str) -> list[str]:
terms = []
for token in re.findall(r"[a-z0-9]+", normalized_question):
if len(token) < 3 or token in QUESTION_STOPWORDS or token.isdigit():
continue
terms.append(token)
expanded = set(terms)
if any(token in expanded for token in {"dependent", "dependents", "daughter", "son", "child", "children"}):
expanded.update({"dependent", "qualifying", "child", "support", "residency"})
if any(token in expanded for token in {"college", "school", "student", "tuition"}):
expanded.update({"student", "school", "education", "temporary", "absence"})
return sorted(expanded)
def _load_searchable_pages(path: Path) -> list[str]:
payload = path.read_bytes()
if payload.startswith(b"%PDF"):
try:
reader = PdfReader(path)
pages = []
for page in reader.pages:
text = page.extract_text() or ""
if text.strip():
pages.append(text)
if pages:
return pages
except Exception:
pass
try:
decoded = payload.decode("utf-8", errors="ignore")
except Exception:
return []
return [decoded] if decoded.strip() else []
def _build_excerpt(text: str, terms: list[str], *, width: int = 420) -> str:
lowered = text.lower()
first_index = None
for term in terms:
idx = lowered.find(term)
if idx >= 0 and (first_index is None or idx < first_index):
first_index = idx
if first_index is None:
cleaned = " ".join(text.split())
return cleaned[:width]
start = max(0, first_index - 120)
end = min(len(text), first_index + width)
cleaned = " ".join(text[start:end].split())
return cleaned
def _rank_research_hits(manifest: dict[str, Any], normalized_question: str) -> list[dict[str, Any]]:
terms = _question_terms(normalized_question)
if not terms:
return []
hits: list[dict[str, Any]] = []
for source in manifest["sources"]:
path = Path(source["localPath"])
if not path.exists():
continue
pages = _load_searchable_pages(path)
for page_number, text in enumerate(pages, start=1):
lowered = text.lower()
matched_terms = [term for term in terms if term in lowered]
if not matched_terms:
continue
score = (
len(matched_terms) * 10
+ SEARCH_SOURCE_BONUS.get(source["sourceClass"], 0)
- int(source["authorityRank"])
)
hits.append(
{
"slug": source["slug"],
"title": source["title"],
"sourceClass": source["sourceClass"],
"url": source["url"],
"localPath": source["localPath"],
"authorityRank": source["authorityRank"],
"page": page_number,
"score": score,
"matchedTerms": matched_terms,
"excerpt": _build_excerpt(text, matched_terms),
}
)
hits.sort(key=lambda item: (-item["score"], item["authorityRank"], item["slug"], item["page"]))
return hits[:5]
FILING_STATUS_PATTERNS = (
(("qualifying surviving spouse",), "qualifying_surviving_spouse"),
(("qualifying widow",), "qualifying_surviving_spouse"),
@@ -151,8 +314,54 @@ class QuestionEngine:
"riskLevel": RISK_BY_CONFIDENCE[rule["confidence"]],
"followUpQuestions": [],
"primaryLawRequired": False,
"excerpts": [],
}
research_hits = _rank_research_hits(manifest, normalized)
if research_hits:
authorities = []
seen = set()
for hit in research_hits:
if hit["slug"] in seen:
continue
authorities.append(
{
"slug": hit["slug"],
"title": hit["title"],
"sourceClass": hit["sourceClass"],
"url": hit["url"],
"localPath": hit["localPath"],
"authorityRank": hit["authorityRank"],
}
)
seen.add(hit["slug"])
return {
"issue": "irs_corpus_research",
"taxYear": tax_year,
"factsUsed": facts_used,
"missingFacts": [],
"authorities": authorities,
"excerpts": [
{
"slug": hit["slug"],
"title": hit["title"],
"page": hit["page"],
"matchedTerms": hit["matchedTerms"],
"excerpt": hit["excerpt"],
}
for hit in research_hits
],
"conclusion": {
"answer": "Relevant IRS authorities were found in the downloaded tax-year corpus. Answer from those authorities directly, and only escalate further if the cited passages are still insufficient.",
"summary": "Relevant IRS materials in the cached tax-year corpus address this question. Use the cited passages below to answer it directly.",
},
"confidence": "medium",
"riskLevel": "medium",
"followUpQuestions": [],
"primaryLawRequired": False,
}
return {
"issue": "requires_primary_law_escalation",
"taxYear": tax_year,
@@ -172,6 +381,7 @@ class QuestionEngine:
"Is there an existing return position or drafted treatment to review?",
],
"primaryLawRequired": True,
"excerpts": [],
}
@@ -186,6 +396,11 @@ def render_analysis(analysis: dict[str, Any]) -> str:
if analysis["authorities"]:
titles = "; ".join(item["title"] for item in analysis["authorities"])
lines.append(f"Authorities: {titles}.")
if analysis.get("excerpts"):
excerpt_lines = []
for item in analysis["excerpts"][:3]:
excerpt_lines.append(f"{item['title']} p.{item['page']}: {item['excerpt']}")
lines.append(f"Excerpts: {' | '.join(excerpt_lines)}")
if analysis["missingFacts"]:
lines.append(f"Open items: {' '.join(analysis['missingFacts'])}")
return " ".join(lines)
@@ -210,6 +425,10 @@ def render_memo(analysis: dict[str, Any]) -> str:
lines.append(f"- {authority['title']}")
else:
lines.append("- Primary-law escalation required.")
if analysis.get("excerpts"):
lines.extend(["", "## IRS Excerpts"])
for item in analysis["excerpts"]:
lines.append(f"- {item['title']} (page {item['page']}): {item['excerpt']}")
lines.extend(
[
"",

View File

@@ -143,6 +143,7 @@ def bootstrap_irs_catalog(tax_year: int) -> list[SourceDescriptor]:
("i5329", "Instructions for Form 5329", "irs_instructions"),
("i5695", "Instructions for Form 5695", "irs_instructions"),
("i1116", "Instructions for Form 1116", "irs_instructions"),
("p501", "Publication 501, Dependents, Standard Deduction, and Filing Information", "irs_publication"),
]
return [
SourceDescriptor(