"""JobSpy ingestion provider (default, free) — implements JobSource. Uses python-jobspy to search LinkedIn. The boundary between JobSpy's API and this module is _scrape(); everything else is plain mapping logic. JobSpy column names confirmed against live responses (update CLAUDE.md Gotchas when first verified): id, job_url, company, company_url_direct, date_posted, title, location. company_url_direct is the company's own site (not the LinkedIn company page); fill rate observed as low — resolve.py covers the gap. """ from __future__ import annotations import logging from datetime import date, datetime from ..models import RawJob from .base import JobSource, canonical_linkedin_url, clean_value, parse_linkedin_job_id logger = logging.getLogger(__name__) class JobSpySource(JobSource): """Fetches recent LinkedIn jobs via python-jobspy (no authentication required).""" def fetch_recent_jobs( self, search_terms: list[str], location: str, hours_old: int, results_wanted: int, ) -> list[RawJob]: seen: dict[str, RawJob] = {} total_records = 0 with_website = 0 for term in search_terms: try: records = self._scrape(term, location, hours_old, results_wanted) except Exception: logger.exception("JobSpy scrape failed for term %r", term) continue for record in records: total_records += 1 raw = self._to_raw_job(record) if raw is None: continue if raw.website: with_website += 1 if raw.job_id not in seen: seen[raw.job_id] = raw fill_rate = (with_website / total_records * 100) if total_records else 0.0 logger.info( "JobSpy: %d unique jobs from %d terms; company_url_direct fill rate %.0f%%", len(seen), len(search_terms), fill_rate, ) return list(seen.values()) # ------------------------------------------------------------------ # Isolated JobSpy boundary — swap provider here and in the import only. # ------------------------------------------------------------------ def _scrape( self, term: str, location: str, hours_old: int, results_wanted: int ) -> list[dict]: """Call python-jobspy and return raw records as plain dicts.""" from jobspy import scrape_jobs # type: ignore[import-untyped] df = scrape_jobs( site_name=["linkedin"], search_term=term, location=location, results_wanted=results_wanted, hours_old=hours_old, linkedin_fetch_description=False, ) if df is None or df.empty: return [] return df.to_dict("records") # type: ignore[return-value] # ------------------------------------------------------------------ # Field mapping # ------------------------------------------------------------------ def _to_raw_job(self, record: dict) -> RawJob | None: """Map one JobSpy record dict to RawJob; return None to skip.""" raw_url = clean_value(record.get("job_url")) job_id = parse_linkedin_job_id(raw_url) if not job_id: # Fallback: JobSpy sometimes exposes a bare id column job_id = clean_value(record.get("id")) if not job_id: logger.debug("Skipping record with no parseable job_id: %s", raw_url) return None company = clean_value(record.get("company")) if not company: logger.debug("Skipping job %s: no company name", job_id) return None # company_url_direct is the company's own site; company_url is the LinkedIn page. website = clean_value(record.get("company_url_direct")) return RawJob( job_id=job_id, company=company, linkedin_url=canonical_linkedin_url(job_id), website=website, listed_at=_to_datetime(record.get("date_posted")), title=clean_value(record.get("title")), location=clean_value(record.get("location")), ) def _to_datetime(value: object) -> datetime | None: """Coerce JobSpy date/datetime/string cells to datetime | None.""" if value is None: return None # float NaN (pandas sentinel) — NaN != NaN if isinstance(value, float) and value != value: return None if isinstance(value, datetime): return value if isinstance(value, date): return datetime(value.year, value.month, value.day) # pandas NaT has isoformat but raises when compared; check type name to avoid import if type(value).__name__ == "NaTType": return None if isinstance(value, str): try: return datetime.fromisoformat(value) except ValueError: return None return None