|
|
|
# nyc_tickets.py
|
|
# Scrape NYC CityPay "Search by License Plate" for tickets, expand groups,
|
|
# collect EVERY individual row, and export to Excel.
|
|
#
|
|
# Column policy (per user):
|
|
# - Always drop Column C (3rd column) before saving each plate sheet.
|
|
# - Drop columns with headers: Pending Payment, Total Amount Due, Payment Amount,
|
|
# and any "Payment Amount: $..." variant (even if split across lines/spaces).
|
|
# - In "Violation #", remove "View Ticket" and keep only the numeric ticket number.
|
|
# - The wait for search results is capped at 10 seconds per plate.
|
|
|
|
import argparse
|
|
import csv
|
|
import time
|
|
import re
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
import pandas as pd
|
|
from playwright.sync_api import sync_playwright, Page
|
|
|
|
# Page that hosts the plate-search form; every search starts by navigating here.
CITYPAY_URL = "https://a836-citypay.nyc.gov/citypay/Parking"
|
|
|
|
|
|
# -------------------- small helpers --------------------
|
|
|
|
def log(msg: str) -> None:
    """Write *msg* to stdout behind the ``[tickets]`` tag."""
    line = "[tickets] {}".format(msg)
    print(line)
|
|
|
|
def to_money(s: object) -> float:
    """Convert a currency cell such as ``"$1,234.56"`` to a float.

    Args:
        s: Cell value. Normally a string; ``None``, empty strings and
            already-numeric values (``int`` / ``float``) are accepted too.

    Returns:
        The parsed amount. ``0.0`` is returned for blank, unparseable or
        non-finite (NaN / infinity) input so that column sums stay usable.
    """
    import math  # local import: the module header does not import math

    # bool is an int subclass; a True/False cell is not an amount.
    if s is None or isinstance(s, bool):
        return 0.0

    try:
        if isinstance(s, (int, float)):
            value = float(s)
        else:
            text = str(s).replace("$", "").replace(",", "").strip()
            value = float(text) if text else 0.0
    except (ValueError, OverflowError):
        return 0.0

    # "nan" / "inf" parse successfully but would poison any later sum().
    return value if math.isfinite(value) else 0.0
|
|
|
|
def read_plates_csv(path: Path) -> List[Tuple[str, str, str]]:
    """Load the plates to search from a CSV file.

    The file needs a header row with the columns ``plate``, ``state`` and
    ``plate_type``. Header names are matched case-insensitively and with
    surrounding whitespace ignored.

    Args:
        path: Location of the CSV file (a UTF-8 BOM is tolerated).

    Returns:
        One ``(plate, state, plate_type)`` tuple per usable row. Rows without
        a plate are skipped. A blank ``state`` defaults to ``"NY"`` and a
        blank ``plate_type`` to ``"--ALL--"``.
    """
    rows: List[Tuple[str, str, str]] = []
    with open(path, newline="", encoding="utf-8-sig") as f:
        for raw in csv.DictReader(f):
            # Normalise header spelling; skip DictReader's overflow/missing
            # entries (None key or non-string value).
            record = {
                key.strip().lower(): value
                for key, value in raw.items()
                if isinstance(key, str) and isinstance(value, str)
            }
            plate = record.get("plate", "").strip()
            # Strip BEFORE defaulting so a whitespace-only cell still gets
            # the default instead of becoming an empty string.
            state = record.get("state", "").strip() or "NY"
            ptype = record.get("plate_type", "").strip() or "--ALL--"
            if plate:
                rows.append((plate, state, ptype))
    return rows
|
|
|
|
|
|
# -------------------- debugging --------------------
|
|
|
|
def save_debug(page: Page, plate: str, tag: str) -> None:
    """Dump the current page as HTML and a full-page PNG under ``debug/``.

    Saving is best-effort: a failure to write either artifact is logged and
    never raised, so debugging output cannot abort a scrape.

    Args:
        page: Page whose content and screenshot are captured.
        plate: Plate being processed; used in the file names.
        tag: Short reason label (e.g. ``"timeout"``); used in the file names.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    outdir = Path("debug")
    outdir.mkdir(parents=True, exist_ok=True)

    # plate/tag originate from user input; keep only filename-safe characters
    # so a stray "/" or ":" cannot redirect or break the write.
    stem = re.sub(r"[^A-Za-z0-9._-]+", "_", f"{plate}_{tag}_{ts}")
    html = outdir / f"{stem}.html"
    png = outdir / f"{stem}.png"

    saved = []
    try:
        html.write_text(page.content(), encoding="utf-8", errors="ignore")
        saved.append(str(html))
    except Exception as exc:
        log(f"Could not save debug HTML for {plate}: {exc}")
    try:
        page.screenshot(path=str(png), full_page=True)
        saved.append(str(png))
    except Exception as exc:
        log(f"Could not save debug screenshot for {plate}: {exc}")

    # Only report the files that were actually written.
    if saved:
        log(f"Saved debug -> {' '.join(saved)}")
|
|
|
|
|
|
# -------------------- page/form ops --------------------
|
|
|
|
def goto_plate_form(page: Page) -> None:
    """Navigate to the CityPay page and wait until the plate form is present.

    Raises whatever Playwright raises if navigation fails or if the form or
    the plate input does not appear within the timeouts.
    """
    plate_input_css = "input#plate-number[name='PLATE_NUMBER']"

    page.goto(CITYPAY_URL, timeout=120_000, wait_until="domcontentloaded")

    # Switching to the plate tab is best-effort; a failed click is ignored
    # and the form wait below decides whether the page is usable.
    try:
        plate_tab = page.get_by_role("tab", name="Search By License Plate")
        plate_tab.click(timeout=3000)
    except Exception:
        pass

    page.wait_for_selector("#by-plate-form", timeout=10_000)

    plate_input = page.locator(plate_input_css).first
    plate_input.wait_for(state="attached", timeout=8000)
|
|
|
|
def set_plate_value(page: Page, value: str) -> None:
    """Enter *value* into the plate-number input.

    Typing through Playwright is tried first. If clearing or typing raises,
    the value is written straight into the DOM element and ``input`` /
    ``change`` events are dispatched by script instead.
    """
    field = page.locator("input#plate-number[name='PLATE_NUMBER']").first

    try:
        field.fill("", timeout=1500)
        field.type(value, delay=15)
    except Exception:
        # Scripted fallback: clear, set, and fire the events by hand.
        set_value_js = """(val) => {
            const el = document.querySelector("input#plate-number[name='PLATE_NUMBER']");
            if (!el) return;
            el.value = "";
            el.dispatchEvent(new Event('input', {bubbles:true}));
            el.value = val;
            el.dispatchEvent(new Event('input', {bubbles:true}));
            el.dispatchEvent(new Event('change', {bubbles:true}));
        }"""
        page.evaluate(set_value_js, value)
|
|
|
|
def pick_state(page: Page, code: str) -> None:
    """Select the plate state *code* in the form's state dropdown.

    Three strategies run in order; failures in the first and third are
    swallowed, so this function reports nothing about whether the
    selection actually took effect:

    1. Drive the ``#PLATE_STATE_chosen`` widget: open it, type the code
       into its search box and press Enter.
    2. Always: set ``#PLATE_STATE``'s value by script, dispatch ``change``,
       and rewrite the widget's visible label to start with *code*.
    3. If the visible label still lacks *code*: temporarily un-hide the
       ``<select>`` and call ``select_option`` on it.
    """
    # Strategy 1: interact with the widget as a user would (best effort).
    try:
        opener = page.locator("#PLATE_STATE_chosen .chosen-single").first
        opener.click(timeout=1500)
        search = page.locator("#PLATE_STATE_chosen .chosen-search input").first
        search.wait_for(state="visible", timeout=1500)
        search.fill("")
        search.type(code, delay=25)
        page.keyboard.press("Enter")
        page.wait_for_timeout(120)
    except Exception:
        pass

    # Strategy 2: force the underlying <select> value and patch the label.
    # The label keeps whatever followed the last '-' in its previous text.
    page.evaluate(
        """(code) => {
            const sel = document.querySelector('#PLATE_STATE');
            if (sel) {
                sel.value = code;
                sel.dispatchEvent(new Event('change', {bubbles:true}));
            }
            const span = document.querySelector('#PLATE_STATE_chosen .chosen-single span');
            if (span && code) {
                const t = (span.textContent || '');
                const tail = t.includes('-') ? t.split('-').pop().trim() : t.trim();
                span.textContent = code + ' - ' + tail;
            }
        }""",
        code,
    )

    # Strategy 3: verify via the visible label and fall back to select_option.
    # NOTE(review): the script above has just rewritten this same label to
    # start with `code`, so this check looks like it passes whenever the span
    # exists and `code` is non-empty — confirm whether the fallback below is
    # reachable in practice, or verify against the <select> value instead.
    try:
        label = page.locator("#PLATE_STATE_chosen .chosen-single span").inner_text(timeout=800).strip()
        if code not in label:
            # Un-hide the native <select> so select_option can act on it.
            page.evaluate(
                """() => {
                    const s = document.querySelector('#PLATE_STATE');
                    if (s) s.style.display='block';
                }"""
            )
            try:
                page.locator("select#PLATE_STATE").select_option(value=code)
            finally:
                # Always hide the native <select> again, even on failure.
                page.evaluate(
                    """() => {
                        const s = document.querySelector('#PLATE_STATE');
                        if (s) s.style.display='none';
                    }"""
                )
            page.wait_for_timeout(100)
    except Exception:
        pass
|
|
|
|
def pick_plate_type(page: Page, label: str) -> None:
    """Choose the plate type whose option text is *label*.

    A falsy *label* or the sentinel ``"--ALL--"`` leaves the dropdown
    untouched. Otherwise Playwright's ``select_option`` is tried first; if
    it raises, a script picks the first option whose text starts with
    *label* and dispatches ``change``.
    """
    wants_specific_type = bool(label) and label != "--ALL--"
    if not wants_specific_type:
        return

    try:
        dropdown = page.locator("select#PLATE_TYPE[name='PLATE_TYPE']")
        dropdown.select_option(label=label)
    except Exception:
        pass
    else:
        return

    # Scripted fallback: prefix-match on the option text.
    prefix_match_js = """(label) => {
        const sel = document.querySelector('#PLATE_TYPE');
        if (!sel) return;
        const want = (label || '').trim();
        for (const o of sel.options) {
            const t = (o.textContent || '').trim();
            if (t.startsWith(want)) { sel.value = o.value; break; }
        }
        sel.dispatchEvent(new Event('change', {bubbles:true}));
    }"""
    page.evaluate(prefix_match_js, label)
|
|
|
|
def submit_search(page: Page) -> None:
    """Click the button with the accessible name "Search" to submit the form."""
    search_button = page.get_by_role("button", name="Search")
    search_button.click()
|
|
|
|
|
|
# -------------------- navigation-safe waiting --------------------
|
|
|
|
def wait_for_results_or_error(page: Page, max_wait_s: int = 10) -> str:
    """Poll until the search lands on results, shows a form error, or times out.

    Polling (rather than a single Playwright wait) keeps this safe while the
    page is navigating: any exception from reading the URL or evaluating
    script is treated as "not ready yet".

    Args:
        page: Page on which the search was just submitted.
        max_wait_s: Upper bound on the total wait, in seconds.

    Returns:
        ``"results"`` once the URL contains ``/searchResults``, ``"error"``
        if a visible, non-empty inline error label is found in the plate
        form, otherwise ``"timeout"``.
    """
    poll_ms = 200
    error_probe = """() => {
        const el = document.querySelector('form#by-plate-form .error-msg label.error');
        return !!(el && el.offsetParent !== null && el.textContent.trim().length);
    }"""

    def _pause() -> None:
        # Prefer Playwright's own wait so its dispatcher keeps processing
        # events; fall back to a plain sleep if the page cannot service it.
        try:
            page.wait_for_timeout(poll_ms)
        except Exception:
            time.sleep(poll_ms / 1000)

    # Monotonic clock: immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + max_wait_s
    while time.monotonic() < deadline:
        # Did we reach results?
        try:
            on_results = "/searchResults" in page.url
        except Exception:
            _pause()
            continue
        if on_results:
            return "results"

        # Is the inline error visible on the form?
        try:
            if page.evaluate(error_probe):
                return "error"
        except Exception:
            pass  # execution context destroyed mid-navigation; retry

        _pause()
    return "timeout"
|
|
|
|
|
|
# -------------------- results scraping --------------------
|
|
|
|
def expand_all_groups(page: Page) -> None:
    """Click every group-header toggle on the results page, three passes.

    Each pass clicks all elements matching the group row, its icon wrapper
    and its underlined text, ignoring clicks that fail, then pauses briefly.
    """
    # NOTE(review): the selector can match several elements inside the same
    # group row, so one group may receive more than one click per pass —
    # confirm the site's toggle behaviour makes that harmless.
    toggle_css = "tr.dtrg-group, tr.dtrg-group .ico-wrapper, tr.dtrg-group u"

    passes_left = 3
    while passes_left > 0:
        for toggle in page.locator(toggle_css).all():
            try:
                toggle.click(timeout=150)
            except Exception:
                pass
        page.wait_for_timeout(120)
        passes_left -= 1
|
|
|
|
def scrape_all_ticket_rows(page: Page) -> List[Dict[str, str]]:
    """Collect every data row from every ``<table>`` on the page.

    The extraction runs in the browser. For each table, headers come from
    ``thead th`` (or from the first row's cells when there is no such
    header). Rows carrying the ``dtrg-group`` class and rows whose cells are
    all empty are skipped.

    Returns:
        One dict per row, mapping header text to trimmed cell text. Cells
        missing for a header are filled with ``''``.
    """
    extract_rows_js = """
    () => {
        const out = [];
        const tables = Array.from(document.querySelectorAll('table'));
        for (const t of tables) {
            let headers = Array.from(t.querySelectorAll('thead th')).map(th => (th.textContent||'').trim());
            if (!headers.length) {
                const first = t.querySelector('tr');
                if (!first) continue;
                headers = Array.from(first.querySelectorAll('th,td')).map(c => (c.textContent||'').trim());
            }
            if (!headers.length) continue;

            const bodyRows = t.querySelectorAll('tbody tr');
            const rows = bodyRows.length ? Array.from(bodyRows) : Array.from(t.querySelectorAll('tr')).slice(1);

            for (const tr of rows) {
                const cls = (tr.getAttribute('class')||'');
                if (cls.includes('dtrg-group')) continue;
                const cells = Array.from(tr.querySelectorAll('th,td')).map(td => (td.textContent||'').trim());
                if (!cells.some(Boolean)) continue;

                const row = {};
                for (let i=0;i<headers.length;i++) row[headers[i]] = cells[i] ?? '';
                out.push(row);
            }
        }
        return out;
    }
    """
    scraped = page.evaluate(extract_rows_js)
    return scraped
|
|
|
|
|
|
# -------------------- run one plate --------------------
|
|
|
|
def run_one_plate(page: Page, plate: str, state: str, ptype: str, first_plate: bool, debug: bool) -> pd.DataFrame:
    """Search one plate and return its scraped ticket rows.

    Args:
        page: Browser page to drive.
        plate: Plate number to search for.
        state: State code to select.
        ptype: Plate-type label, or ``"--ALL--"`` for no filter.
        first_plate: When true, pause for a manual CAPTCHA solve if the page
            HTML mentions "recaptcha".
        debug: When true, save HTML/PNG snapshots on error, timeout, or an
            empty result.

    Returns:
        A frame with one row per scraped table row. The frame is empty when
        the form reported an error, the wait timed out, or nothing was found.
    """
    # Fill in and submit the search form.
    goto_plate_form(page)
    set_plate_value(page, plate)
    pick_state(page, state)
    pick_plate_type(page, ptype)
    submit_search(page)

    if first_plate:
        # If a captcha shows up the first time, let the user solve it
        time.sleep(1)
        try:
            page_html = page.content().lower()
            if "recaptcha" in page_html:
                input("\nSolve any CAPTCHA in the browser, wait for results, then press Enter here...")
        except Exception:
            pass

    outcome = wait_for_results_or_error(page, max_wait_s=10)

    # Anything other than "results" yields an empty frame; the debug tag
    # records whether it was a form error or a timeout.
    if outcome != "results":
        if debug:
            failure_tag = "form_error" if outcome == "error" else "timeout"
            save_debug(page, plate, failure_tag)
        return pd.DataFrame()

    # On results page
    expand_all_groups(page)
    tickets = pd.DataFrame(scrape_all_ticket_rows(page))

    if debug and tickets.empty:
        save_debug(page, plate, "no_rows")

    return tickets
|
|
|
|
|
|
# -------------------- main --------------------
|
|
|
|
# Normalised (whitespace-collapsed, lowercased) headers that are never exported.
_DROP_HEADER_NAMES = frozenset({"pending payment", "total amount due", "payment amount"})
# "Payment Amount: $..." header variants; matched against the normalised header.
_DROP_HEADER_RE = re.compile(r"(?i)^\s*payment\s*amount\s*:\s*\$[0-9,.\s]+\s*$")
# Characters that are not allowed in an Excel worksheet title.
_BAD_SHEET_CHARS_RE = re.compile(r"[\[\]:*?/\\]")


def _norm_hdr(s: object) -> str:
    """Collapse whitespace runs, trim, and lowercase a column header."""
    return re.sub(r"\s+", " ", str(s)).strip().lower()


def _total_amount_due(df: pd.DataFrame) -> float:
    """Sum the first column whose normalised header contains 'total amount due'."""
    for col in df.columns:
        if "total amount due" in _norm_hdr(col):
            return float(df[col].sum())
    return 0.0


def _unique_sheet_name(plate: str, used: set) -> str:
    """Return a valid worksheet title for *plate* and record it in *used*.

    *used* holds lowercased titles, because Excel compares sheet names
    case-insensitively.
    """
    base = _BAD_SHEET_CHARS_RE.sub("_", plate).strip() or "plate"
    sheet = base[:28]
    n = 2
    while sheet.lower() in used:
        sheet = f"{base[:26]}-{n}"
        n += 1
    used.add(sheet.lower())
    return sheet


def _clean_plate_sheet(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the export column policy to one plate's frame and return a copy.

    Drops the payment-related columns and always the third column
    ("Column C"), then reduces "Violation #" cells to the ticket number.
    """
    to_drop = []
    for c in df.columns:
        h = _norm_hdr(c)
        if h in _DROP_HEADER_NAMES or _DROP_HEADER_RE.match(h):
            to_drop.append(c)

    # Always remove Column C (3rd column)
    if len(df.columns) >= 3:
        col_c = df.columns[2]
        if col_c not in to_drop:
            to_drop.append(col_c)

    # drop() returns a new frame, so the caller's frame is never mutated.
    df = df.drop(columns=to_drop)

    # Clean 'Violation #': remove 'View Ticket', keep only the numeric ticket
    for col in df.columns:
        if _norm_hdr(col) == "violation #":
            s = df[col].astype(str)
            s = s.str.replace(r"\s*view\s*ticket\s*", "", regex=True, flags=re.IGNORECASE)
            digits = s.str.extract(r"(\d{5,})", expand=False)
            # Cells without a 5+ digit run keep their trimmed text.
            df[col] = digits.fillna(s.str.strip())
            break
    return df


def _write_report(
    out_path: Path,
    summary: List[Dict[str, object]],
    per_plate: List[Tuple[str, pd.DataFrame]],
) -> None:
    """Write the Summary sheet plus one cleaned sheet per plate to *out_path*."""
    with pd.ExcelWriter(out_path, engine="openpyxl") as xl:
        pd.DataFrame(summary).sort_values(
            ["total_due", "plate"], ascending=[False, True]
        ).to_excel(xl, index=False, sheet_name="Summary")

        # Reserve "Summary" so a plate can never collide with it.
        used = {"summary"}
        for plate, df in per_plate:
            sheet = _unique_sheet_name(plate, used)
            _clean_plate_sheet(df).to_excel(xl, index=False, sheet_name=sheet)


def main(argv=None) -> None:
    """Scrape tickets for every plate in the CSV and write the Excel report.

    Args:
        argv: Optional argument list; ``None`` means ``sys.argv[1:]``.

    A plate whose scrape raises is logged and reported with zero rows, so one
    bad plate does not discard the data already collected. The browser is
    closed before the report is written.
    """
    ap = argparse.ArgumentParser(description="Scrape NYC CityPay tickets by plate into Excel")
    ap.add_argument("--plates", default="plates.csv", help="CSV with columns: plate,state,plate_type")
    ap.add_argument("--out", default="tickets_report.xlsx", help="Output Excel path")
    ap.add_argument("--debug", action="store_true", help="Save HTML/PNG for errors/no rows")
    args = ap.parse_args(argv)

    # A missing file gets the same hint as an empty one, not a traceback.
    plates_path = Path(args.plates)
    plate_rows = read_plates_csv(plates_path) if plates_path.is_file() else []
    if not plate_rows:
        print("No plates found. Create plates.csv with columns: plate,state,plate_type")
        return

    per_plate: List[Tuple[str, pd.DataFrame]] = []
    summary: List[Dict[str, object]] = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False, slow_mo=110)
        try:
            ctx = browser.new_context(viewport={"width": 1360, "height": 900})
            page = ctx.new_page()

            log(f"Opening {CITYPAY_URL}")
            page.goto(CITYPAY_URL, timeout=120_000, wait_until="domcontentloaded")

            for i, (plate, state, ptype) in enumerate(plate_rows, start=1):
                log(f"[{i}/{len(plate_rows)}] {plate} {state} …")
                # run_one_plate navigates back to the form itself, so no
                # extra page load is needed between plates.
                try:
                    df = run_one_plate(page, plate, state, ptype, i == 1, args.debug)
                except Exception as exc:
                    log(f"{plate}: scrape failed ({exc}); continuing with next plate")
                    df = pd.DataFrame()

                if df.empty:
                    summary.append({"plate": plate, "state": state, "rows": 0, "total_due": 0.0})
                    continue

                # normalize and compute totals
                df.insert(0, "PLATE", plate)
                df.insert(1, "STATE", state)
                for col in list(df.columns):
                    if "amount" in _norm_hdr(col):
                        df[col] = df[col].map(to_money)

                per_plate.append((plate, df))
                summary.append({
                    "plate": plate,
                    "state": state,
                    "rows": len(df),
                    "total_due": round(_total_amount_due(df), 2),
                })
        finally:
            # Closing the browser also closes its contexts and pages.
            browser.close()

    # Write Excel
    out_path = Path(args.out)
    _write_report(out_path, summary, per_plate)
    log(f"Saved report -> {out_path.resolve()}")
|
|
|
|
|
|
# Run the scraper only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|