121 lines
4.3 KiB
Python
121 lines
4.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from pypdf import PdfReader, PdfWriter
|
|
from reportlab.pdfgen import canvas
|
|
|
|
from us_cpa.sources import TaxYearCorpus
|
|
|
|
|
|
# Form code -> template slug. render_case_forms looks the slug up by form code
# and opens "<slug>.pdf" inside the tax year's IRS directory; form codes that
# are absent from this mapping are skipped there. Every slug currently equals
# its form code.
FORM_TEMPLATES = {
    form_code: form_code
    for form_code in ("f1040", "f1040sb", "f1040sc", "f1040se", "f1040s1")
}
|
|
|
|
|
|
# Text lines stamped onto a form's first page when the overlay path is used.
# Each entry is (x, y, getter): _overlay_page calls getter(normalized) and
# draws the returned string at (x, y). The f1040 lines all sit at x=72, start
# at y=725 and step down by 20 per line, in the order listed below.
OVERLAY_FIELDS = {
    "f1040": [
        (72, 725 - 20 * row, getter)
        for row, getter in enumerate(
            (
                lambda data: "Taxpayer: {}".format(data["taxpayer"]["fullName"]),
                lambda data: "Filing status: {}".format(data["filingStatus"]),
                lambda data: "Wages: {:.2f}".format(data["income"]["wages"]),
                lambda data: "Taxable interest: {:.2f}".format(data["income"]["taxableInterest"]),
                lambda data: "AGI: {:.2f}".format(data["totals"]["adjustedGrossIncome"]),
                lambda data: "Standard deduction: {:.2f}".format(data["deductions"]["standardDeduction"]),
                lambda data: "Taxable income: {:.2f}".format(data["totals"]["taxableIncome"]),
                lambda data: "Total tax: {:.2f}".format(data["taxes"]["totalTax"]),
                lambda data: "Withholding: {:.2f}".format(data["payments"]["federalWithholding"]),
                lambda data: "Refund: {:.2f}".format(data["totals"]["refund"]),
                lambda data: "Balance due: {:.2f}".format(data["totals"]["balanceDue"]),
            )
        )
    ],
}
|
|
|
|
|
|
def _f1040_field_values(data):
    """Build the field-name -> value mapping offered to the f1040 template.

    Monetary amounts are rendered as strings with two decimal places; the
    taxpayer name and filing status are passed through unchanged.
    """
    return {
        "taxpayer_full_name": data["taxpayer"]["fullName"],
        "filing_status": data["filingStatus"],
        "wages": format(data["income"]["wages"], ".2f"),
        "taxable_interest": format(data["income"]["taxableInterest"], ".2f"),
    }


# Form code -> callable that turns the normalized case data into candidate
# form-field values. _field_fill_page keeps only the entries whose key is an
# actual field name in the template PDF.
FIELD_FILL_VALUES = {
    "f1040": _f1040_field_values,
}
|
|
|
|
|
|
def _field_fill_page(template_path: Path, output_path: Path, form_code: str, normalized: dict[str, Any]) -> bool:
    """Fill the template's form fields from the normalized case data.

    The candidate values come from FIELD_FILL_VALUES[form_code]; only the
    entries whose key is a field name reported by the template are used.

    Args:
        template_path: Template PDF to read.
        output_path: Destination of the filled PDF.
        form_code: Key into FIELD_FILL_VALUES (no entry means no candidates).
        normalized: Case data handed to the value builder.

    Returns:
        True when at least one candidate matched a template field and the
        filled PDF was written to output_path; False when nothing matched, in
        which case no file is written.
    """
    reader = PdfReader(str(template_path))
    fields = reader.get_fields() or {}
    build_values = FIELD_FILL_VALUES.get(form_code)
    values = build_values(normalized) if build_values is not None else {}
    matched = {key: value for key, value in values.items() if key in fields}
    if not matched:
        return False

    # Reuse the reader that was already parsed instead of reading the
    # template from disk a second time.
    writer = PdfWriter(clone_from=reader)
    # get_fields() reports fields for the whole document, so a match may live
    # on any page. Updating only the first page would report success without
    # filling a field located on a later page.
    for page in writer.pages:
        # Pages without annotations carry no form widgets to update.
        if "/Annots" in page:
            writer.update_page_form_field_values(page, matched, auto_regenerate=False)
    # Called after the updates, as in the original sequence, so the final
    # document asks viewers to regenerate field appearances.
    writer.set_need_appearances_writer()
    with output_path.open("wb") as handle:
        writer.write(handle)
    return True
|
|
|
|
|
|
def _overlay_page(template_path: Path, output_path: Path, form_code: str, normalized: dict[str, Any]) -> None:
    """Write a copy of the template with summary text drawn on its first page.

    The text lines come from OVERLAY_FIELDS[form_code]; each entry supplies a
    position and a getter that renders one string from the case data.

    Args:
        template_path: Template PDF to copy.
        output_path: Destination of the resulting PDF.
        form_code: Key into OVERLAY_FIELDS (no entry means nothing is drawn).
        normalized: Case data handed to each getter.
    """
    # The template is parsed once here; the original also built a separate
    # PdfReader for it that was never used.
    writer = PdfWriter(clone_from=str(template_path))

    overlay_fields = OVERLAY_FIELDS.get(form_code, [])
    # With nothing to draw there is no overlay worth merging, so the template
    # copy is written as is.
    # NOTE(review): an empty reportlab canvas may not produce a page to merge;
    # skipping avoids depending on that. Confirm against reportlab.
    if overlay_fields:
        page = writer.pages[0]
        # Size the overlay canvas to the target page so coordinates line up.
        width = float(page.mediabox.width)
        height = float(page.mediabox.height)
        buffer = BytesIO()
        pdf = canvas.Canvas(buffer, pagesize=(width, height))
        for x, y, getter in overlay_fields:
            pdf.drawString(x, y, getter(normalized))
        pdf.save()
        buffer.seek(0)
        page.merge_page(PdfReader(buffer).pages[0])

    with output_path.open("wb") as handle:
        writer.write(handle)
|
|
|
|
|
|
def render_case_forms(case_dir: Path, corpus: TaxYearCorpus, normalized: dict[str, Any]) -> dict[str, Any]:
    """Render the case's required forms to PDF and write an artifact manifest.

    For every code in normalized["requiredForms"] that has an entry in
    FORM_TEMPLATES, the template "<slug>.pdf" from the tax year's IRS
    directory is rendered to "<case_dir>/output/forms/<form_code>.pdf".
    Field filling is tried first; when it reports no match the overlay
    renderer is used instead and the artifact is flagged for review. Form
    codes without a template entry are skipped.

    Args:
        case_dir: Case directory; output goes under its "output" subdirectory.
        corpus: Provides paths_for_year(tax_year).irs_dir, the template folder.
        normalized: Case data; "taxYear" and "requiredForms" are read here and
            the whole mapping is passed on to the renderers.

    Returns:
        The manifest that is also written to "<case_dir>/output/artifacts.json",
        with keys "taxYear", "artifactCount" and "artifacts".
    """
    output_root = case_dir / "output"
    forms_dir = output_root / "forms"
    forms_dir.mkdir(parents=True, exist_ok=True)
    templates_dir = corpus.paths_for_year(normalized["taxYear"]).irs_dir

    rendered: list[dict[str, Any]] = []
    for form_code in normalized["requiredForms"]:
        slug = FORM_TEMPLATES.get(form_code)
        if slug is None:
            # No template is registered for this form code.
            continue

        source_pdf = templates_dir / f"{slug}.pdf"
        target_pdf = forms_dir / f"{form_code}.pdf"

        # Prefer real form fields; fall back to stamping text on the page.
        filled = _field_fill_page(source_pdf, target_pdf, form_code, normalized)
        if not filled:
            _overlay_page(source_pdf, target_pdf, form_code, normalized)

        rendered.append(
            {
                "formCode": form_code,
                "templatePath": str(source_pdf),
                "outputPath": str(target_pdf),
                "renderMethod": "field_fill" if filled else "overlay",
                # Overlay output always needs a human look.
                "reviewRequired": not filled,
            }
        )

    manifest = {
        "taxYear": normalized["taxYear"],
        "artifactCount": len(rendered),
        "artifacts": rendered,
    }
    manifest_path = output_root / "artifacts.json"
    manifest_path.write_text(json.dumps(manifest, indent=2))
    return manifest
|