130 lines
4.6 KiB
Python
130 lines
4.6 KiB
Python
"""Apify ingestion provider (alternative, paid) — implements JobSource.

Drop-in replacement for JobSpySource; same interface, selected by config
(JOB_SOURCE=apify). Two adjustment points when swapping actors:
  1. _run_actor()  — the one-line Apify SDK call + actor ID from config.
  2. _to_raw_job() — the field-alias map (actor output schema varies).
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from ..config import get_settings
|
|
from ..models import RawJob
|
|
from .base import JobSource, canonical_linkedin_url, clean_value, parse_linkedin_job_id
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Field aliases tried in order when mapping actor output → RawJob.
|
|
# Extend this list when the actor schema is known; first match wins.
|
|
_COMPANY_KEYS = ("company", "companyName", "company_name")
|
|
_URL_KEYS = ("jobUrl", "job_url", "url", "link", "applyUrl")
|
|
_WEBSITE_KEYS = ("companyWebsite", "website", "company_url_direct", "companyUrl")
|
|
_POSTED_KEYS = ("postedAt", "listedAt", "date_posted", "postedDate", "publishedAt")
|
|
_TITLE_KEYS = ("title", "jobTitle", "position")
|
|
_LOCATION_KEYS = ("location", "jobLocation")
|
|
|
|
|
|
class ApifySource(JobSource):
    """Fetches recent LinkedIn jobs via an Apify actor (paid; actor-agnostic).

    The actor ID and API token are read from the application settings.
    Actor output items are mapped to ``RawJob`` through the module-level
    alias tuples, so swapping actors only requires touching
    ``_run_actor`` and those aliases.
    """

    def fetch_recent_jobs(
        self,
        search_terms: list[str],
        location: str,
        hours_old: int,
        results_wanted: int,
    ) -> list[RawJob]:
        """Run the configured actor and return de-duplicated jobs.

        Args:
            search_terms: Sent to the actor as ``queries``.
            location: Sent to the actor as ``location``.
            hours_old: Accepted for interface compatibility.
                NOTE(review): this value is currently neither forwarded in
                the actor input nor used to filter results — confirm whether
                the chosen actor has an equivalent input field.
            results_wanted: Sent to the actor as ``maxItems``.

        Returns:
            One ``RawJob`` per distinct ``job_id`` (first occurrence wins).
            An empty list when the token is missing/placeholder or when the
            actor run raises.
        """
        settings = get_settings()
        token = settings.apify_token
        if not token or token.startswith("PLACEHOLDER"):
            logger.error(
                "Apify token is not configured (APIFY_TOKEN). "
                "Set a real token or switch JOB_SOURCE=jobspy."
            )
            return []

        run_input = {
            "queries": search_terms,
            "location": location,
            "maxItems": results_wanted,
        }
        try:
            items = self._run_actor(token, settings.apify_actor, run_input)
        except Exception:
            # Best-effort boundary: a failed paid run must not crash the
            # caller; the traceback is logged and no jobs are reported.
            logger.exception("Apify actor run failed")
            return []

        seen: dict[str, RawJob] = {}
        for item in items:
            raw = self._to_raw_job(item)
            if raw is None:
                continue
            # Keep the first item seen for each job_id.
            seen.setdefault(raw.job_id, raw)

        logger.info("Apify: %d unique jobs returned", len(seen))
        return list(seen.values())

    # ------------------------------------------------------------------
    # Isolated Apify boundary — the one-line actor swap point.
    # ------------------------------------------------------------------

    def _run_actor(self, token: str, actor_id: str, run_input: dict) -> list[dict]:
        """Call the Apify actor and return all dataset items as plain dicts.

        Raises:
            RuntimeError: If the client returns no run object, so there is
                no default dataset to read.
        """
        # Imported lazily so the SDK is only required when this source is used.
        from apify_client import ApifyClient  # type: ignore[import-untyped]

        client = ApifyClient(token)
        run = client.actor(actor_id).call(run_input=run_input)
        if not run:
            # NOTE(review): apify-client annotates call() as possibly
            # returning None — confirm for the pinned version. Failing here
            # with a clear message beats a TypeError on the subscript below.
            raise RuntimeError(f"Apify actor {actor_id!r} returned no run object")
        return list(client.dataset(run["defaultDatasetId"]).iterate_items())

    # ------------------------------------------------------------------
    # Field mapping
    # ------------------------------------------------------------------

    def _to_raw_job(self, item: dict) -> RawJob | None:
        """Map one actor output item to RawJob; return None to skip.

        An item is skipped when no LinkedIn job id can be parsed from any of
        its URL aliases, or when it has no company name. The posting time is
        optional: an absent or unparseable value leaves ``listed_at`` as None.
        """
        from datetime import datetime

        raw_url = _first(item, _URL_KEYS)
        # Do not hand a missing URL to the parser; treat it as "no job id".
        job_id = parse_linkedin_job_id(raw_url) if raw_url else None
        if not job_id:
            logger.debug("Skipping Apify item with no LinkedIn job_id: %s", raw_url)
            return None

        company = _first(item, _COMPANY_KEYS)
        if not company:
            logger.debug("Skipping Apify job %s: no company name", job_id)
            return None

        listed_at: datetime | None = None
        posted_raw = _first(item, _POSTED_KEYS, coerce=False)
        if posted_raw:
            posted_text = str(posted_raw).strip()
            # datetime.fromisoformat() only accepts a trailing "Z" from
            # Python 3.11 on; normalise it to an explicit UTC offset so the
            # timestamp is not silently dropped on older interpreters.
            if posted_text.endswith("Z"):
                posted_text = posted_text[:-1] + "+00:00"
            try:
                listed_at = datetime.fromisoformat(posted_text)
            except (ValueError, TypeError):
                # Non-ISO values (relative dates, epochs, ...) stay unknown.
                logger.debug(
                    "Apify job %s: unparseable posted date %r", job_id, posted_raw
                )

        return RawJob(
            job_id=job_id,
            company=company,
            linkedin_url=canonical_linkedin_url(job_id),
            website=_first(item, _WEBSITE_KEYS),
            listed_at=listed_at,
            title=_first(item, _TITLE_KEYS),
            location=_first(item, _LOCATION_KEYS),
        )
|
|
|
|
|
|
def _first(item: dict, keys: tuple[str, ...], *, coerce: bool = True) -> str | None:
|
|
"""Return the first non-empty value found under any of keys."""
|
|
for k in keys:
|
|
v = item.get(k)
|
|
if coerce:
|
|
v = clean_value(v)
|
|
elif v is None:
|
|
continue
|
|
if v:
|
|
return str(v) if not coerce else v
|
|
return None
|