diff --git a/CLAUDE.md b/CLAUDE.md index c45b287..d9a4c2c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -178,7 +178,9 @@ pytest -q ## Gotchas (append confirmed findings here as you build — this section is durable memory across /clear) - Verify ATS JSON field names against live responses before trusting them: Greenhouse `jobs[].absolute_url`; Lever `[].hostedUrl`; Ashby `jobs[].jobUrl`; Workday varies by tenant. Fix in code AND note the confirmed shape here. -- JobSpy populates the company's own site (`company_url_direct`) only sometimes; `resolve.py` must cover the gap. Record the observed fill rate here after the first live fetch. +- **JobSpy `company_url_direct` fill rate: 0% observed** (5/5 jobs had `website=None` in a live fetch on 2026-06-17, search: "software engineer", United States, `linkedin_fetch_description=False`). `resolve.py` is essential for **every** job, not just a gap-filler. Do not assume any job arrives with a website pre-populated. +- **JobSpy `date_posted` / `listed_at` fill rate: ~40% observed** (2/5 jobs had a date; 3/5 were `None`). This is because `linkedin_fetch_description=False` (our default for speed) means LinkedIn's posted date is often absent. `listed_at` is best-effort metadata only; do not gate pipeline logic on it. +- **JobSpy confirmed column names** (verified 2026-06-17): `job_url` (full LinkedIn URL incl. tracking params), `company` (display name), `company_url_direct` (company own site — always `None` in practice so far), `date_posted` (sparse when `linkedin_fetch_description=False`), `title`, `location`, `id` (may be `None`; always parse job_id from `job_url` instead). `company_url` is the LinkedIn *company page* URL — never use it as the company website. - LinkedIn parses the numeric job id from `/jobs/view/{id}`; strip tracking query params. - Browser Use needs Chromium installed (`playwright install chromium`) and an LLM key; without them the tier must degrade gracefully. 
logger = logging.getLogger(__name__)

# Legal-entity words dropped from the END of a company name when building the
# domain slug. Only trailing words are removed so that names such as
# "Co-Star Group" or "SA Recycling" keep their leading word.
_LEGAL_SUFFIX_WORDS = frozenset(
    {"inc", "llc", "ltd", "corp", "co", "gmbh", "plc", "sa", "ag", "pbc", "lp", "llp"}
)
_NON_ALNUM = re.compile(r"[^a-z0-9]+")

# HEAD responses that indicate the server doesn't support HEAD — retry with GET.
_HEAD_NOT_SUPPORTED = frozenset({405, 501})


def resolve_website(
    company_name: str,
    website: str | None = None,
    *,
    client: httpx.Client | None = None,
) -> str | None:
    """Return the company's own website URL, or None if unresolvable.

    Tiers are tried in order and the first hit wins:
      1. ``website`` as supplied by the job provider (no network call).
      2. ``https://{slug}.com`` when the probe in ``_verify`` succeeds.
      3. ``_search_api_lookup`` when settings enable it and a real key is set.

    Args:
        company_name: Display name of the company.
        website: Provider-supplied site. Ignored when empty, when it starts
            with ``PLACEHOLDER``, or when it points at linkedin.com (a
            LinkedIn company page is not the company's own site).
        client: Optional httpx.Client to reuse across many calls. When
            omitted, a client is built only if a network tier is reached and
            is closed before returning.

    Returns:
        The resolved URL string, or None when every tier misses.
    """
    # Tier 1 — provider-supplied website (trusted, no network, no client needed).
    supplied = (website or "").strip()
    if supplied and not supplied.startswith("PLACEHOLDER"):
        resolved = _normalize_scheme(supplied)
        if not _is_linkedin_url(resolved):
            logger.info("resolve(%s): tier=provider url=%s", company_name, resolved)
            return resolved
        logger.debug(
            "resolve(%s): ignoring LinkedIn URL supplied as website: %s",
            company_name,
            resolved,
        )

    # Network tiers: create the client lazily so tier-1 hits stay free.
    owns_client = client is None
    if owns_client:
        client = build_client()

    try:
        # Tier 2 — {slug}.com guess with HTTP verification.
        slug = _slug(company_name)
        if slug:
            verified = _verify(client, f"https://{slug}.com")
            if verified:
                logger.info("resolve(%s): tier=slug_guess url=%s", company_name, verified)
                return verified

        # Tier 3 — optional search API (gated; stub by default). An empty or
        # missing key is treated the same as a placeholder key.
        settings = get_settings()
        api_key = settings.search_api_key or ""
        if settings.search_api_enabled and api_key and not api_key.startswith("PLACEHOLDER"):
            result = _search_api_lookup(company_name, client)
            if result:
                logger.info("resolve(%s): tier=search_api url=%s", company_name, result)
                return result

        logger.info("resolve(%s): unresolvable (all tiers missed)", company_name)
        return None
    finally:
        if owns_client:
            client.close()


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _normalize_scheme(url: str) -> str:
    """Return ``url`` with an explicit scheme, defaulting to https.

    Surrounding whitespace is removed. An existing http/https scheme is
    recognised case-insensitively and left as written; a protocol-relative
    ``//host`` form gets ``https:`` without doubling the slashes.
    """
    cleaned = url.strip()
    if cleaned.lower().startswith(("http://", "https://")):
        return cleaned
    return f"https://{cleaned.lstrip('/')}"


def _is_linkedin_url(url: str) -> bool:
    """Return True when ``url``'s host is linkedin.com or one of its subdomains."""
    from urllib.parse import urlsplit

    try:
        host = (urlsplit(url).hostname or "").lower()
    except ValueError:
        # Malformed netloc (e.g. an unbalanced IPv6 bracket): not LinkedIn.
        return False
    return host == "linkedin.com" or host.endswith(".linkedin.com")


def _slug(name: str) -> str | None:
    """Normalize a company name to a domain slug, or None if nothing is left.

    Steps: fold accented letters to ASCII, lowercase, split on anything that
    is not ``a-z0-9``, drop trailing legal-entity words, join the rest.
    """
    import unicodedata

    if not name:
        return None
    # NFKD splits "é" into "e" + combining accent; the ASCII encode drops the accent.
    folded = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode("ascii")
    words = [word for word in _NON_ALNUM.split(folded.lower()) if word]
    while words and words[-1] in _LEGAL_SUFFIX_WORDS:
        words.pop()
    return "".join(words) or None


def _verify(client: httpx.Client, url: str) -> str | None:
    """Probe ``url`` with HEAD (GET on 405/501); return ``str(resp.url)`` or None.

    Any status below 400 counts as verified. The probe is best-effort: every
    exception is treated as "not verified" and logged at debug level.
    NOTE(review): whether ``resp.url`` is the post-redirect URL depends on how
    build_client() configures redirects — confirm.
    """
    try:
        resp = request_with_retries(client, "HEAD", url, max_retries=1)
        if resp.status_code in _HEAD_NOT_SUPPORTED:
            resp = request_with_retries(client, "GET", url, max_retries=1)
        if resp.status_code < 400:
            return str(resp.url)
        return None
    except Exception as exc:
        logger.debug("verify(%s): probe failed: %s", url, exc)
        return None
def get_job_source(settings=None) -> JobSource:
    """Build the JobSource selected by ``settings.job_source``.

    When ``settings`` is omitted it is obtained from get_settings(). The
    provider name is matched case-insensitively. Provider modules are imported
    only after the name has been matched, so the provider that was not chosen
    is never imported.

    Raises:
        ValueError: if the configured name is neither 'jobspy' nor 'apify'.
    """
    if settings is None:
        from ..config import get_settings

        settings = get_settings()

    configured = settings.job_source
    key = configured.lower()

    # Guard clause first: reject anything we do not know how to build.
    if key not in ("jobspy", "apify"):
        raise ValueError(
            f"Unknown job_source {configured!r}. Valid values: 'jobspy', 'apify'."
        )

    if key == "apify":
        from .apify_source import ApifySource

        return ApifySource()

    from .jobspy_source import JobSpySource

    return JobSpySource()
# Field aliases tried in order when mapping actor output → RawJob.
# Extend this list when the actor schema is known; first match wins.
_COMPANY_KEYS = ("company", "companyName", "company_name")
_URL_KEYS = ("jobUrl", "job_url", "url", "link", "applyUrl")
_WEBSITE_KEYS = ("companyWebsite", "website", "company_url_direct", "companyUrl")
_POSTED_KEYS = ("postedAt", "listedAt", "date_posted", "postedDate", "publishedAt")
_TITLE_KEYS = ("title", "jobTitle", "position")
_LOCATION_KEYS = ("location", "jobLocation")


class ApifySource(JobSource):
    """Fetches recent LinkedIn jobs via an Apify actor (paid; actor-agnostic)."""

    def fetch_recent_jobs(
        self,
        search_terms: list[str],
        location: str,
        hours_old: int,
        results_wanted: int,
    ) -> list[RawJob]:
        """Run the configured actor once and return de-duplicated RawJobs.

        Returns an empty list when the token is missing/placeholder or when
        the actor run raises; the failure is logged instead of propagated.

        NOTE(review): ``hours_old`` is not forwarded to the actor — the input
        key for a recency filter is actor-specific and is not known here.
        """
        settings = get_settings()
        token = settings.apify_token
        if not token or token.startswith("PLACEHOLDER"):
            logger.error(
                "Apify token is not configured (APIFY_TOKEN). "
                "Set a real token or switch JOB_SOURCE=jobspy."
            )
            return []

        run_input = {
            "queries": search_terms,
            "location": location,
            "maxItems": results_wanted,
        }
        try:
            items = self._run_actor(token, settings.apify_actor, run_input)
        except Exception:
            logger.exception("Apify actor run failed")
            return []

        # First occurrence of each job_id wins.
        unique: dict[str, RawJob] = {}
        for item in items:
            raw = self._to_raw_job(item)
            if raw is not None:
                unique.setdefault(raw.job_id, raw)

        logger.info("Apify: %d unique jobs returned", len(unique))
        return list(unique.values())

    # ------------------------------------------------------------------
    # Isolated Apify boundary — the one-line actor swap point.
    # ------------------------------------------------------------------

    def _run_actor(self, token: str, actor_id: str, run_input: dict) -> list[dict]:
        """Call the Apify actor and return all dataset items as plain dicts.

        Raises:
            RuntimeError: if the client returns no run object.
        """
        from apify_client import ApifyClient  # type: ignore[import-untyped]

        client = ApifyClient(token)
        run = client.actor(actor_id).call(run_input=run_input)
        if not run:
            # Fail with a readable message instead of a TypeError on run[...].
            raise RuntimeError(f"Apify actor {actor_id!r} returned no run object")
        return list(client.dataset(run["defaultDatasetId"]).iterate_items())

    # ------------------------------------------------------------------
    # Field mapping
    # ------------------------------------------------------------------

    def _to_raw_job(self, item: dict) -> RawJob | None:
        """Map one actor output item to RawJob; return None to skip.

        An item is skipped when none of its URL aliases yields a LinkedIn job
        id, or when it has no company name.
        """
        # Try every URL alias: an earlier alias may hold an external apply link
        # while a later one holds the LinkedIn job URL.
        job_id = None
        for key in _URL_KEYS:
            job_id = parse_linkedin_job_id(clean_value(item.get(key)))
            if job_id:
                break
        if not job_id:
            logger.debug(
                "Skipping Apify item with no LinkedIn job_id: %s",
                _first(item, _URL_KEYS),
            )
            return None

        company = _first(item, _COMPANY_KEYS)
        if not company:
            logger.debug("Skipping Apify job %s: no company name", job_id)
            return None

        return RawJob(
            job_id=job_id,
            company=company,
            linkedin_url=canonical_linkedin_url(job_id),
            website=_first_website(item),
            listed_at=_parse_posted(_first(item, _POSTED_KEYS, coerce=False)),
            title=_first(item, _TITLE_KEYS),
            location=_first(item, _LOCATION_KEYS),
        )


def _first_website(item: dict) -> str | None:
    """Return the first website alias whose value is not a linkedin.com URL.

    Presumably some actors put the LinkedIn company page under ``companyUrl``
    (verify against the actor's output); that page is not the company's site.
    """
    for key in _WEBSITE_KEYS:
        candidate = clean_value(item.get(key))
        if candidate and not _is_linkedin_url(candidate):
            return candidate
    return None


def _is_linkedin_url(url: str) -> bool:
    """Return True when ``url``'s host is linkedin.com or one of its subdomains."""
    from urllib.parse import urlsplit

    candidate = url if "://" in url else f"https://{url}"
    try:
        host = (urlsplit(candidate).hostname or "").lower()
    except ValueError:
        return False
    return host == "linkedin.com" or host.endswith(".linkedin.com")


def _parse_posted(value: str | None):
    """Parse an ISO-8601 timestamp string to datetime; None when absent or invalid.

    A trailing ``Z`` is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` rejects it before Python 3.11.
    """
    from datetime import datetime

    if not value:
        return None
    text = str(value).strip()
    if text.endswith(("Z", "z")):
        text = f"{text[:-1]}+00:00"
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None


def _first(item: dict, keys: tuple[str, ...], *, coerce: bool = True) -> str | None:
    """Return the first non-empty value found under any of keys.

    With ``coerce=True`` each value goes through clean_value(); with
    ``coerce=False`` the raw value is only stringified. Falsy values are
    skipped in both modes.
    """
    for k in keys:
        v = item.get(k)
        if coerce:
            v = clean_value(v)
        elif v is None:
            continue
        if v:
            return v if coerce else str(v)
    return None
# Numeric id as the last part of the path segment after /jobs/view/, optionally
# preceded by a title slug ("/jobs/view/software-engineer-at-acme-3812345678").
# NOTE(review): the slug form is what public LinkedIn job pages appear to use —
# confirm against live provider output.
_LINKEDIN_JOB_URL_RE = re.compile(r"/jobs/view/(?:[^/?#]*-)?(\d+)(?=[/?#]|$)")
# Fallback kept for backward compatibility: digits directly after /jobs/view/.
_LINKEDIN_JOB_URL_PREFIX_RE = re.compile(r"/jobs/view/(\d+)")

# Type names of pandas' missing-value singletons, matched by name so this
# module does not import pandas.
_MISSING_TYPE_NAMES = frozenset({"NaTType", "NAType"})


class JobSource(ABC):
    """Abstract base for all ingestion providers."""

    @abstractmethod
    def fetch_recent_jobs(
        self,
        search_terms: list[str],
        location: str,
        hours_old: int,
        results_wanted: int,
    ) -> list[RawJob]:
        """Return recent jobs matching search_terms in location.

        Never raises — callers expect a list (possibly empty) on any error.
        """


def parse_linkedin_job_id(url: str | None) -> str | None:
    """Extract the numeric jobPostingId from a LinkedIn job-view URL.

    Accepts both ``/jobs/view/{id}`` and ``/jobs/view/{title-slug}-{id}``.
    Query string and fragment are ignored; only the path segment is inspected.

    Returns:
        The id as a string of digits, or None for None/empty input and for
        URLs without a ``/jobs/view/`` segment carrying an id.
    """
    if not url:
        return None
    match = _LINKEDIN_JOB_URL_RE.search(url) or _LINKEDIN_JOB_URL_PREFIX_RE.search(url)
    return match.group(1) if match else None


def canonical_linkedin_url(job_id: str) -> str:
    """Return the clean LinkedIn job URL with no tracking params."""
    return f"https://www.linkedin.com/jobs/view/{job_id}"


def clean_value(value: object) -> str | None:
    """Normalize a source cell to str | None.

    Treated as missing (None): None, float NaN, objects whose type is named
    ``NaTType`` or ``NAType`` (pandas sentinels), and strings that are empty
    after stripping whitespace. Everything else is returned as a stripped
    ``str(value)``.
    """
    if value is None:
        return None
    # float NaN check without pandas: NaN is the only float where v != v
    if isinstance(value, float) and value != value:
        return None
    # Without this, str() would yield the literal text "NaT" / "<NA>".
    if type(value).__name__ in _MISSING_TYPE_NAMES:
        return None
    text = str(value).strip()
    return text or None
class JobSpySource(JobSource):
    """Fetches recent LinkedIn jobs via python-jobspy (no authentication required)."""

    def fetch_recent_jobs(
        self,
        search_terms: list[str],
        location: str,
        hours_old: int,
        results_wanted: int,
    ) -> list[RawJob]:
        """Scrape each search term and return RawJobs de-duplicated by job_id.

        A failing scrape for one term, or one record that cannot be mapped, is
        logged and skipped so the remaining results are still returned.
        """
        unique: dict[str, RawJob] = {}

        for term in search_terms:
            try:
                records = self._scrape(term, location, hours_old, results_wanted)
            except Exception:
                logger.exception("JobSpy scrape failed for term %r", term)
                continue

            for record in records:
                try:
                    raw = self._to_raw_job(record)
                except Exception:
                    # Keep the "never raises" contract for a single bad row.
                    logger.exception("JobSpy: skipping malformed record for term %r", term)
                    continue
                if raw is not None:
                    # First occurrence wins when several terms return the same job.
                    unique.setdefault(raw.job_id, raw)

        # Measured over unique mapped jobs so duplicates and skipped rows do
        # not distort the percentage.
        with_website = sum(1 for job in unique.values() if job.website)
        fill_rate = (with_website / len(unique) * 100) if unique else 0.0
        logger.info(
            "JobSpy: %d unique jobs from %d terms; company_url_direct fill rate %.0f%%",
            len(unique),
            len(search_terms),
            fill_rate,
        )
        return list(unique.values())

    # ------------------------------------------------------------------
    # Isolated JobSpy boundary — swap provider here and in the import only.
    # ------------------------------------------------------------------

    def _scrape(
        self, term: str, location: str, hours_old: int, results_wanted: int
    ) -> list[dict]:
        """Call python-jobspy and return raw records as plain dicts."""
        from jobspy import scrape_jobs  # type: ignore[import-untyped]

        df = scrape_jobs(
            site_name=["linkedin"],
            search_term=term,
            location=location,
            results_wanted=results_wanted,
            hours_old=hours_old,
            linkedin_fetch_description=False,
        )
        if df is None or df.empty:
            return []
        return df.to_dict("records")  # type: ignore[return-value]

    # ------------------------------------------------------------------
    # Field mapping
    # ------------------------------------------------------------------

    def _to_raw_job(self, record: dict) -> RawJob | None:
        """Map one JobSpy record dict to RawJob; return None to skip.

        The job id is parsed from ``job_url``; the ``id`` column is used only
        as a fallback and only when it is numeric, because the id is
        interpolated into the canonical LinkedIn URL.
        """
        raw_url = clean_value(record.get("job_url"))
        job_id = parse_linkedin_job_id(raw_url) or _numeric_job_id(
            clean_value(record.get("id"))
        )
        if not job_id:
            logger.debug("Skipping record with no parseable job_id: %s", raw_url)
            return None

        company = clean_value(record.get("company"))
        if not company:
            logger.debug("Skipping job %s: no company name", job_id)
            return None

        # company_url_direct is the company's own site; company_url is the LinkedIn page.
        website = clean_value(record.get("company_url_direct"))

        return RawJob(
            job_id=job_id,
            company=company,
            linkedin_url=canonical_linkedin_url(job_id),
            website=website,
            listed_at=_to_datetime(record.get("date_posted")),
            title=clean_value(record.get("title")),
            location=clean_value(record.get("location")),
        )


def _numeric_job_id(value: str | None) -> str | None:
    """Return the digits of a bare JobSpy id, or None when it is not numeric.

    An optional ``li-`` prefix is removed first.
    NOTE(review): recent python-jobspy releases appear to emit LinkedIn ids as
    ``li-<digits>`` — confirm against a live fetch.
    """
    if not value:
        return None
    candidate = value[3:] if value.lower().startswith("li-") else value
    if candidate.isascii() and candidate.isdigit():
        return candidate
    return None
def _to_datetime(value: object) -> datetime | None:
    """Coerce JobSpy date/datetime/string cells to datetime | None.

    Returns None for None, float NaN, pandas missing-value sentinels, strings
    that are not ISO-8601, and any other unsupported type. A ``date`` becomes
    midnight of that day; a ``datetime`` is returned unchanged.
    """
    if value is None:
        return None
    # pandas NaT is a datetime subclass, so this check must run BEFORE the
    # isinstance(datetime) branch or NaT would be returned as a real value.
    # Matched by type name to avoid importing pandas.
    if type(value).__name__ in ("NaTType", "NAType"):
        return None
    # float NaN (pandas sentinel) — NaN != NaN
    if isinstance(value, float) and value != value:
        return None
    if isinstance(value, datetime):
        return value
    if isinstance(value, date):
        return datetime(value.year, value.month, value.day)
    if isinstance(value, str):
        text = value.strip()
        # fromisoformat() rejects a trailing "Z" before Python 3.11.
        if text.endswith(("Z", "z")):
            text = f"{text[:-1]}+00:00"
        try:
            return datetime.fromisoformat(text)
        except ValueError:
            return None
    return None
@pytest.fixture(autouse=True)
def _default_search_settings(monkeypatch):
    """Run every test with the search-API env vars unset and a fresh settings cache.

    The cache is cleared again on teardown, so a failing assertion cannot leak
    settings into later tests.
    """
    monkeypatch.delenv("SEARCH_API_ENABLED", raising=False)
    monkeypatch.delenv("SEARCH_API_KEY", raising=False)
    get_settings.cache_clear()
    yield
    get_settings.cache_clear()


# ---------------------------------------------------------------------------
# _slug
# ---------------------------------------------------------------------------


class TestSlug:
    @pytest.mark.parametrize(
        ("name", "expected"),
        [
            ("GitHub", "github"),
            ("Acme Inc", "acme"),
            ("Widgets LLC", "widgets"),
            ("MegaCorp Corp", "megacorp"),
            ("Some Company Ltd", "somecompany"),
            ("Foo & Bar", "foobar"),
            ("Acme GmbH", "acme"),
        ],
    )
    def test_slug(self, name, expected):
        assert _slug(name) == expected

    @pytest.mark.parametrize("name", ["LLC", ""])
    def test_nothing_left_returns_none(self, name):
        assert _slug(name) is None


# ---------------------------------------------------------------------------
# resolve_website — tier 1: provider-supplied
# ---------------------------------------------------------------------------


class TestResolveWebsiteTier1:
    def test_returns_provider_website_unchanged_if_has_scheme(self, monkeypatch):
        probed = []
        monkeypatch.setattr("jobsource.resolve._verify", lambda c, u: probed.append(u) or None)
        assert resolve_website("Acme", "https://acme.com") == "https://acme.com"
        assert probed == []  # no network call

    def test_adds_https_if_no_scheme(self, monkeypatch):
        monkeypatch.setattr("jobsource.resolve._verify", lambda c, u: None)
        assert resolve_website("Acme", "acme.com") == "https://acme.com"

    def test_placeholder_website_skips_to_next_tier(self, monkeypatch):
        probed = []
        monkeypatch.setattr("jobsource.resolve._verify", lambda c, u: probed.append(u) or None)
        assert resolve_website("Acme", "PLACEHOLDER_URL") is None
        assert len(probed) >= 1  # fell through to tier 2


# ---------------------------------------------------------------------------
# resolve_website — tier 2: slug guess
# ---------------------------------------------------------------------------


class TestResolveWebsiteTier2:
    def test_verified_slug_returned(self, monkeypatch):
        monkeypatch.setattr(
            "jobsource.resolve._verify",
            lambda c, u: "https://github.com/" if "github" in u else None,
        )
        assert resolve_website("GitHub") == "https://github.com/"

    def test_miss_returns_none_when_search_disabled(self, monkeypatch):
        monkeypatch.setattr("jobsource.resolve._verify", lambda c, u: None)
        assert resolve_website("Acme Corp") is None

    def test_unslugable_name_skips_tier2(self, monkeypatch):
        probed = []
        monkeypatch.setattr("jobsource.resolve._verify", lambda c, u: probed.append(u) or None)
        assert resolve_website("LLC") is None  # slug → None
        assert probed == []


# ---------------------------------------------------------------------------
# resolve_website — tier 3: search API (gated stub)
# ---------------------------------------------------------------------------


class TestResolveWebsiteTier3:
    def test_search_api_stub_returns_none(self, monkeypatch):
        monkeypatch.setattr("jobsource.resolve._verify", lambda c, u: None)
        monkeypatch.setenv("SEARCH_API_ENABLED", "true")
        monkeypatch.setenv("SEARCH_API_KEY", "real-key-abc")
        get_settings.cache_clear()  # make the next get_settings() read the env above

        looked_up = []

        def fake_lookup(name, client):
            looked_up.append(name)
            return None  # stub

        monkeypatch.setattr("jobsource.resolve._search_api_lookup", fake_lookup)
        assert resolve_website("Some Obscure Co") is None
        assert looked_up == ["Some Obscure Co"]

    def test_search_api_disabled_by_default(self, monkeypatch):
        monkeypatch.setattr("jobsource.resolve._verify", lambda c, u: None)
        looked_up = []
        monkeypatch.setattr(
            "jobsource.resolve._search_api_lookup",
            lambda name, client: looked_up.append(name) or None,
        )
        resolve_website("Acme")
        assert looked_up == []


# ---------------------------------------------------------------------------
# _search_api_lookup stub contract
# ---------------------------------------------------------------------------


class TestSearchApiLookupStub:
    def test_returns_none(self):
        assert _search_api_lookup("Acme", None) is None  # type: ignore[arg-type]
# ---------------------------------------------------------------------------
# base helpers
# ---------------------------------------------------------------------------


class TestParseLinkedinJobId:
    @pytest.mark.parametrize(
        ("url", "expected"),
        [
            ("https://www.linkedin.com/jobs/view/1234567890", "1234567890"),
            ("https://www.linkedin.com/jobs/view/999/", "999"),
            ("https://www.linkedin.com/jobs/view/42?refId=abc&trackingId=xyz", "42"),
        ],
        ids=["clean_url", "trailing_slash", "tracking_params_ignored"],
    )
    def test_extracts_id(self, url, expected):
        assert parse_linkedin_job_id(url) == expected

    @pytest.mark.parametrize(
        "url",
        [None, "https://www.linkedin.com/company/acme", ""],
        ids=["none_input", "non_job_url", "empty_string"],
    )
    def test_returns_none(self, url):
        assert parse_linkedin_job_id(url) is None


class TestCanonicalLinkedinUrl:
    def test_formats_correctly(self):
        assert canonical_linkedin_url("123") == "https://www.linkedin.com/jobs/view/123"


class TestCleanValue:
    @pytest.mark.parametrize(
        "cell",
        [None, "", " ", float("nan")],
        ids=["none", "empty_string", "whitespace", "nan"],
    )
    def test_missing_becomes_none(self, cell):
        assert clean_value(cell) is None

    @pytest.mark.parametrize(
        ("cell", "expected"),
        [(" Acme Corp ", "Acme Corp"), (42, "42"), (0, "0")],
        ids=["normal_string", "non_string_coerced", "zero_is_kept"],
    )
    def test_value_is_stripped_string(self, cell, expected):
        assert clean_value(cell) == expected


# ---------------------------------------------------------------------------
# _to_datetime (module-level helper in jobspy_source)
# ---------------------------------------------------------------------------


class TestToDatetime:
    @pytest.mark.parametrize(
        "cell",
        [None, float("nan"), "not a date"],
        ids=["none", "nan", "bad_string"],
    )
    def test_unparseable_becomes_none(self, cell):
        assert _to_datetime(cell) is None

    @pytest.mark.parametrize(
        ("cell", "expected"),
        [
            (datetime(2024, 1, 15, 12, 0), datetime(2024, 1, 15, 12, 0)),
            (date(2024, 1, 15), datetime(2024, 1, 15)),
            ("2024-01-15", datetime(2024, 1, 15)),
        ],
        ids=["datetime", "date", "iso_string"],
    )
    def test_coerced_to_datetime(self, cell, expected):
        assert _to_datetime(cell) == expected

    def test_nat_type_name(self):
        # A stand-in whose *type* is named "NaTType"; pandas is not imported.
        fake_nat = type("NaTType", (), {})()
        assert _to_datetime(fake_nat) is None
# ---------------------------------------------------------------------------
# JobSpySource._to_raw_job
# ---------------------------------------------------------------------------


class TestJobSpyToRawJob:
    def _src(self):
        return JobSpySource()

    def _record(self, **overrides) -> dict:
        """Return a complete JobSpy-shaped record with selected fields replaced."""
        record = {
            "job_url": "https://www.linkedin.com/jobs/view/100",
            "company": "Acme Corp",
            "company_url_direct": "https://acme.com",
            "date_posted": "2024-06-01",
            "title": "Engineer",
            "location": "Remote",
            "id": None,
        }
        record.update(overrides)
        return record

    def test_basic_mapping(self):
        raw = self._src()._to_raw_job(self._record())
        assert raw is not None
        assert raw.job_id == "100"
        assert raw.company == "Acme Corp"
        assert raw.website == "https://acme.com"
        assert raw.linkedin_url == "https://www.linkedin.com/jobs/view/100"
        assert raw.listed_at == datetime(2024, 6, 1)
        assert raw.title == "Engineer"
        assert raw.location == "Remote"

    def test_website_from_company_url_direct_not_company_url(self):
        record = self._record(
            company_url_direct=None, company_url="https://linkedin.com/company/acme"
        )
        raw = self._src()._to_raw_job(record)
        assert raw is not None
        assert raw.website is None  # company_url (LinkedIn page) must NOT be used

    def test_nan_website_becomes_none(self):
        raw = self._src()._to_raw_job(self._record(company_url_direct=float("nan")))
        assert raw is not None
        assert raw.website is None

    def test_missing_job_id_returns_none(self):
        record = self._record(job_url="https://example.com/not-a-linkedin-url", id=None)
        assert self._src()._to_raw_job(record) is None

    def test_bare_id_fallback(self):
        raw = self._src()._to_raw_job(self._record(job_url=None, id="999"))
        assert raw is not None
        assert raw.job_id == "999"

    def test_missing_company_returns_none(self):
        assert self._src()._to_raw_job(self._record(company=None)) is None

    def test_linkedin_url_is_canonical(self):
        record = self._record(job_url="https://www.linkedin.com/jobs/view/55?tracking=abc")
        raw = self._src()._to_raw_job(record)
        assert raw is not None
        assert raw.linkedin_url == "https://www.linkedin.com/jobs/view/55"
        assert "tracking" not in raw.linkedin_url


# ---------------------------------------------------------------------------
# JobSpySource.fetch_recent_jobs (monkeypatched _scrape)
# ---------------------------------------------------------------------------


class TestJobSpyFetchRecentJobs:
    def _make_record(self, job_id: str, term_suffix: str = "") -> dict:
        return {
            "job_url": f"https://www.linkedin.com/jobs/view/{job_id}",
            "company": f"Acme{term_suffix}",
            "company_url_direct": None,
            "date_posted": None,
            "title": "Eng",
            "location": "Remote",
            "id": None,
        }

    def test_dedup_across_terms(self, monkeypatch):
        src = JobSpySource()
        calls = []

        def fake_scrape(term, location, hours_old, results_wanted):
            calls.append(term)
            if term == "engineer":
                return [self._make_record("1"), self._make_record("2")]
            # "developer" returns job 2 again + a new job 3
            return [self._make_record("2"), self._make_record("3")]

        monkeypatch.setattr(src, "_scrape", fake_scrape)
        results = src.fetch_recent_jobs(["engineer", "developer"], "US", 72, 10)
        assert {r.job_id for r in results} == {"1", "2", "3"}  # "2" not duplicated
        assert len(calls) == 2

    def test_failing_scrape_returns_empty(self, monkeypatch):
        def boom(*args, **kwargs):
            raise RuntimeError("boom")

        src = JobSpySource()
        monkeypatch.setattr(src, "_scrape", boom)
        assert src.fetch_recent_jobs(["engineer"], "US", 72, 10) == []

    def test_empty_scrape(self, monkeypatch):
        src = JobSpySource()
        monkeypatch.setattr(src, "_scrape", lambda *a, **k: [])
        assert src.fetch_recent_jobs(["engineer"], "US", 72, 10) == []


# ---------------------------------------------------------------------------
# ApifySource._to_raw_job
# ---------------------------------------------------------------------------


class TestApifyToRawJob:
    def _src(self):
        return ApifySource()

    def test_camel_case_keys(self):
        item = {
            "jobUrl": "https://www.linkedin.com/jobs/view/77",
            "companyName": "BigCo",
            "companyWebsite": "https://bigco.com",
            "postedAt": "2024-03-01T00:00:00",
            "title": "PM",
            "location": "NYC",
        }
        raw = self._src()._to_raw_job(item)
        assert raw is not None
        assert raw.job_id == "77"
        assert raw.company == "BigCo"
        assert raw.website == "https://bigco.com"
        assert raw.title == "PM"

    def test_snake_case_keys(self):
        item = {
            "job_url": "https://www.linkedin.com/jobs/view/88",
            "company": "LilCo",
            "website": "https://lilco.io",
            "date_posted": "2024-04-01",
            "title": "SWE",
            "location": "SF",
        }
        raw = self._src()._to_raw_job(item)
        assert raw is not None
        assert raw.job_id == "88"
        assert raw.website == "https://lilco.io"

    def test_no_linkedin_url_returns_none(self):
        item = {"url": "https://example.com/job/99", "company": "X"}
        assert self._src()._to_raw_job(item) is None

    def test_no_company_returns_none(self):
        item = {"jobUrl": "https://www.linkedin.com/jobs/view/55"}
        assert self._src()._to_raw_job(item) is None


# ---------------------------------------------------------------------------
# ApifySource.fetch_recent_jobs (monkeypatched _run_actor)
# ---------------------------------------------------------------------------


class TestApifyFetchRecentJobs:
    @pytest.fixture(autouse=True)
    def _fresh_settings(self):
        """Give each test its own settings object, even when an assertion fails."""
        get_settings.cache_clear()
        yield
        get_settings.cache_clear()

    def test_placeholder_token_returns_empty(self, monkeypatch):
        src = ApifySource()
        # Pin the token so a real APIFY_TOKEN in the developer's environment
        # can never trigger a paid actor run from this test.
        monkeypatch.setattr(get_settings(), "apify_token", "PLACEHOLDER_APIFY_TOKEN")
        actor_calls = []
        monkeypatch.setattr(src, "_run_actor", lambda *a, **k: actor_calls.append(a) or [])
        assert src.fetch_recent_jobs(["engineer"], "US", 72, 10) == []
        assert actor_calls == []

    def test_run_actor_failure_returns_empty(self, monkeypatch):
        def api_error(*args, **kwargs):
            raise RuntimeError("api error")

        src = ApifySource()
        monkeypatch.setattr(get_settings(), "apify_token", "real-token-abc")
        monkeypatch.setattr(src, "_run_actor", api_error)
        assert src.fetch_recent_jobs(["engineer"], "US", 72, 10) == []

    def test_dedup(self, monkeypatch):
        src = ApifySource()
        monkeypatch.setattr(get_settings(), "apify_token", "real-token")
        items = [
            {"jobUrl": f"https://www.linkedin.com/jobs/view/{i}", "company": "Co"}
            for i in [10, 10, 20]
        ]
        monkeypatch.setattr(src, "_run_actor", lambda *a, **k: items)
        results = src.fetch_recent_jobs(["eng"], "US", 72, 10)
        assert {r.job_id for r in results} == {"10", "20"}
# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------


class TestGetJobSource:
    @pytest.fixture(autouse=True)
    def _fresh_settings(self, monkeypatch):
        """Start each test without JOB_SOURCE in the environment and with an empty cache.

        The cache is cleared again on teardown, so a failing assertion cannot
        leave another provider's settings cached for later tests.
        """
        monkeypatch.delenv("JOB_SOURCE", raising=False)
        get_settings.cache_clear()
        yield
        get_settings.cache_clear()

    def test_default_returns_jobspy(self):
        assert isinstance(get_job_source(), JobSpySource)

    def test_apify_returns_apify(self, monkeypatch):
        monkeypatch.setenv("JOB_SOURCE", "apify")
        get_settings.cache_clear()  # make the next get_settings() read the env above
        assert isinstance(get_job_source(get_settings()), ApifySource)

    def test_unknown_raises(self, monkeypatch):
        monkeypatch.setenv("JOB_SOURCE", "indeed")
        get_settings.cache_clear()
        with pytest.raises(ValueError, match="indeed"):
            get_job_source(get_settings())

    def test_returns_job_source_abc(self):
        assert isinstance(get_job_source(), JobSource)