This commit is contained in:
ldy
2026-06-17 08:38:15 -04:00
commit f13b8fc1ca
28 changed files with 894 additions and 0 deletions

38
.env.example Normal file
View File

@@ -0,0 +1,38 @@
# == Job source / ingestion ==
JOB_SOURCE=jobspy # jobspy (default, free) | apify
SEARCH_TERMS=["software engineer"] # JSON list; CLI --search overrides
LOCATION=United States
HOURS_OLD=72
BATCH_SIZE=20
RESULTS_WANTED=50
# == Apify (only if JOB_SOURCE=apify) ==
APIFY_TOKEN=
APIFY_ACTOR=
# == Website resolution (optional search API) ==
SEARCH_API_ENABLED=false
SEARCH_API_KEY=
# == LLM / agent models ==
# Set REAL model identifiers here; code ships with inert placeholders.
# Pydantic AI is model-agnostic — you may also set the provider's native key var:
# Anthropic: ANTHROPIC_API_KEY=...
# OpenAI: OPENAI_API_KEY=...
LLM_API_KEY=
CLASSIFIER_MODEL= # cheap model for link classification
AGENT_MODEL= # stronger model for the browser agent
# == HTTP client ==
HTTP_TIMEOUT=20
HTTP_MAX_RETRIES=3
HTTP_BACKOFF_FACTOR=0.5
USER_AGENT=JobSourceAgent/0.1 (+https://example.com)
# == Storage / output ==
DB_PATH=output/jobsource.db
OUTPUT_CSV=output/results.csv
# == Browser agent ==
ENABLE_BROWSER_AGENT=true
BROWSER_HEADLESS=true

20
.gitignore vendored Normal file
View File

@@ -0,0 +1,20 @@
# Python
__pycache__/
*.py[cod]
.venv/
venv/
*.egg-info/
.pytest_cache/
# Env / secrets
.env
.env.local
# Output (keep the dir, ignore artifacts)
output/*
!output/.gitkeep
# OS / editor
.DS_Store
.idea/
.vscode/

186
CLAUDE.md Normal file
View File

@@ -0,0 +1,186 @@
# CLAUDE.md
Operating instructions for Claude Code on this project. Read this fully before planning or editing. These are decisions, not suggestions — do not re-derive or override them without asking.
---
## Project goal
Build the **AI Job Source Agent**: a Python pipeline that, for recently posted LinkedIn jobs, produces records of the form:
```
company_name, career_page_url, open_position_url
```
It runs in configurable batches, on a schedule, and is **incremental** — re-runs process only NEW jobs. The output is a CSV at `output/results.csv` plus rows in a local database.
The four logical steps:
1. From LinkedIn job listings, get **company name** and **company website URL**.
2. From the company website, find the **careers/jobs page URL**.
3. From the careers page, get **one open position URL**.
4. Emit `company_name, career_page_url, open_position_url`.
---
## Architecture decisions (non-negotiable)
**This is a WORKFLOW, not a multi-agent system.** The orchestrator is plain code (Prefect), not an LLM. Most stages are deterministic. Genuine LLM agency appears in exactly one place: the last-resort fallback for steps 23.
1. **Stage 1 (ingestion) uses a managed data API — NEVER browser automation on LinkedIn.** LinkedIn is a hostile anti-bot target and browser agents get blocked / require login (ToS + ban risk). Default provider is **JobSpy** (free); **Apify** is a drop-in alternative behind the same interface. No hand-built LinkedIn crawler.
2. **Company website is a separate, deterministic resolution step.** LinkedIn often does not expose the company's own site. Resolve `company name → website` via the provider field if present, else a verified domain guess, else an optional search API. This is plumbing, not an agent.
3. **Steps 2 and 3 share ONE cascade, cheapest tier first.** Each tier returns early on success. A full browser agent is the LAST tier only.
4. **When the browser-agent tier fires, it does steps 2 AND 3 in a single session** (find careers page + return one job URL). One agent run, not two.
5. **Dedup keys:** jobs are keyed on the LinkedIn numeric `jobPostingId` (parsed from the job URL); companies are keyed on normalized domain. Resolved careers pages are cached per company so a company is never re-resolved.
6. **Everything swappable lives behind an interface** (provider pattern): job sources, the careers cascade tiers, the extractor. Swapping JobSpy↔Apify, or heuristics↔agent, must not require touching neighbors.
7. **No fine-tuning.** The task is solved with tool use + prompting + the cascade. Use a small/cheap model for link classification and a stronger model for the browser agent; both configurable.
8. **Graceful degradation is mandatory.** If the LLM key or Browser Use / its browser is unavailable at runtime, the affected tier logs clearly and returns `None`, and the pipeline still completes (those records get status `needs_review`).
9. **Design for extension:** adding new ingestion sources (Indeed, Wellfound, ATS firehoses) and swapping SQLite→Postgres should drop in behind the existing interfaces without refactors. Cross-source dedup (later) will use a `(company_domain, normalized_title, location)` fingerprint.
---
## Pipeline stages (the cascade, in order)
**Stage 1 — Ingest (deterministic):** call the job source for recent postings (`hours_old` window) → list of `RawJob{job_id, company, website?, linkedin_url, listed_at}`. Dedup by `job_id`.
**Stage 1b — Resolve website (deterministic):** if `website` empty, resolve from company name (verified `{slug}.com` guess, optional search API).
**Stage 2 — Find careers page (cascade, return on first hit):**
1. **ATS detection** — detect Greenhouse / Lever / Ashby / Workday from the site and use their **public JSON APIs** (most reliable; also yields a job URL for Stage 3).
2. **URL patterns** — probe `/careers`, `/career`, `/jobs`, `/join-us`, `/join`, `careers.{domain}`, `jobs.{domain}`.
3. **Homepage link scan** — fetch homepage, rank anchors by career/job keywords in href/text.
4. **Sitemap** — parse `sitemap.xml` for career/job URLs.
5. **Cheap-LLM classification** — pass extracted anchors to a small model; pick the careers link (Pydantic AI, typed output).
6. **Browser-agent fallback** — Browser Use; fused with Stage 3 (see below).
**Stage 3 — Extract one open position (return on first hit):**
1. **ATS JSON** — if ATS known from Stage 2, return the first posting URL directly.
2. **JobPosting JSON-LD** — parse `application/ld+json` for a `url`.
3. **Job-like anchors** — first link matching `/job`, `/position`, `/opening`, `/vacancy`.
4. **Cheap-LLM classification** — pick the single-job link from anchors.
5. **Browser-agent fallback** — handled inside the fused Stage-2 agent call.
**Stage 4 — Persist & export:** write status to DB, export the 3-field CSV.
---
## Tech stack
- **Python 3.11+**
- **Orchestration/scheduling:** Prefect (`@flow`, retries, interval schedule). Cron documented as a no-daemon fallback.
- **HTTP:** httpx (shared client; timeouts + bounded retries).
- **HTML parsing:** BeautifulSoup + lxml.
- **Ingestion:** JobSpy (`python-jobspy`) default; Apify (`apify-client`) alternative.
- **Structured LLM extraction:** Pydantic AI (model-agnostic, typed).
- **Browser agent (fallback only):** Browser Use (`browser-use`) + Playwright/Chromium.
- **Config:** pydantic-settings (env-driven).
- **Data models:** Pydantic v2.
- **Storage:** SQLite via stdlib `sqlite3` (Postgres-ready behind the DB module).
- **Tests:** pytest.
Do not add other heavy dependencies without asking. (`uv` may be used instead of pip/venv if preferred.)
---
## Project structure
```
jobsource/
__init__.py
config.py # pydantic-settings; env-driven; model IDs/keys read from env with placeholder defaults (never hardcode or look up model IDs)
models.py # Pydantic: RawJob, JobResult; JobStatus enum
http.py # shared httpx client factory: timeout, headers, retry
db.py # SQLite: companies, jobs; dedup, company cache, CSV export
resolve.py # company name -> website (deterministic)
sources/
__init__.py
base.py # JobSource interface: fetch_recent_jobs() -> list[RawJob]
jobspy_source.py # default provider
apify_source.py # alternative provider (same interface)
careers/
__init__.py
cascade.py # find_careers_page() orchestrates the tiers
ats.py # ATS detect + public JSON (Greenhouse/Lever/Ashby/Workday)
heuristics.py # URL patterns, homepage scan, sitemap
classify_llm.py # Pydantic AI link classifier (careers link / job link)
extract.py # extract_open_position(): ATS -> JSON-LD -> anchors -> LLM
agent_fallback.py # Browser Use: fused find-careers + extract-job (last resort)
pipeline.py # run_batch(): dedup, per-record isolation, persistence, summary
flow.py # Prefect flow + schedule
main.py # CLI entry
tests/ # pytest
output/ # results.csv (gitignored)
.env.example
requirements.txt
README.md
```
---
## Data model
`JobStatus` enum: `new | website_resolved | careers_found | position_found | failed | needs_review`.
A record is **complete** when status is `position_found`.
`jobs` table: `job_id` (PK, LinkedIn numeric id), `company_key`, `linkedin_url`, `position_url`, `status`, `listed_at`, `first_seen`.
`companies` table: `company_key` (PK, normalized domain else lowercased name), `name`, `website`, `career_url` (cached), `first_seen`.
CSV columns, exactly: `company_name,career_page_url,open_position_url`. Empty cells allowed for incomplete rows; complete rows sorted first.
---
## Commands
```bash
# setup
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
playwright install chromium # for the browser-agent tier
cp .env.example .env # fill keys as available
# run a batch
python -m jobsource.main --batch-size 20 --search "software engineer" --location "United States"
# scheduled run (Prefect)
python -m jobsource.flow # serves the flow on an interval schedule
# cron fallback: */0 6 * * * cd <repo> && ./.venv/bin/python -m jobsource.main --batch-size 50
# tests
pytest -q
```
`--search` is repeatable. Provide `--help` from `main.py`.
---
## Coding conventions
- Full type hints; Pydantic models for all records crossing module boundaries.
- Every external call (job provider, HTTP fetch, ATS API, LLM, agent) wrapped with a timeout, bounded retry, and try/except. **One failing company must never abort the batch** — catch, record `failed`/`needs_review`, continue.
- Secrets only from env (pydantic-settings). Never hardcode keys; never commit `.env`.
- Each cascade/extract function returns a typed result including which tier/method resolved it (for observability and metrics).
- Keep functions small and independently testable. Pure functions where possible; side effects (DB, network) isolated.
- Log at INFO per stage with the method used; log failures with context.
- Prefer standard library and the listed stack; ask before introducing alternatives.
- Model identifiers are configurable env values with placeholder defaults; never hardcode specific model IDs or fetch model references, the operator sets real values in `.env`.
---
## Output contract & success criteria
- `python -m jobsource.main --batch-size 20` completes without an unhandled exception and writes `output/results.csv`.
- Every row has exactly the three contract columns.
- Re-running immediately processes **0 new jobs** and adds **0 rows** (dedup proven).
- A run summary prints per-stage counts and end-to-end coverage (% of new jobs reaching `position_found`).
- Spot-checked `career_page_url` and `open_position_url` resolve (HTTP 200, not a 404/login wall).
---
## Gotchas (append confirmed findings here as you build — this section is durable memory across /clear)
- Verify ATS JSON field names against live responses before trusting them: Greenhouse `jobs[].absolute_url`; Lever `[].hostedUrl`; Ashby `jobs[].jobUrl`; Workday varies by tenant. Fix in code AND note the confirmed shape here.
- JobSpy populates the company's own site (`company_url_direct`) only sometimes; `resolve.py` must cover the gap. Record the observed fill rate here after the first live fetch.
- LinkedIn parses the numeric job id from `/jobs/view/{id}`; strip tracking query params.
- Browser Use needs Chromium installed (`playwright install chromium`) and an LLM key; without them the tier must degrade gracefully.
- LinkedIn rate-limits aggressively; keep batches small while testing.
- Standard pip struggles with pydantic dependency resolution in this stack; always use uv pip install instead.
- The system Python is protected by PEP 668 (externally-managed-environment). Always use explicit virtual environment paths (e.g., .venv/bin/python, .venv/bin/pytest) for all terminal commands instead of relying on global commands.

44
README.md Normal file
View File

@@ -0,0 +1,44 @@
# AI Job Source Agent
For recently posted LinkedIn jobs, produces records of the form:
```
company_name, career_page_url, open_position_url
```
Runs in configurable batches, on a schedule, and is incremental — re-runs process only new jobs.
## Setup
```bash
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
playwright install chromium # for the browser-agent tier
cp .env.example .env # fill keys as available
```
## Run
```bash
# one batch
python -m jobsource.main --batch-size 20 --search "software engineer" --location "United States"
# scheduled run (Prefect)
python -m jobsource.flow
# cron fallback (no daemon):
# */0 6 * * * cd <repo> && ./.venv/bin/python -m jobsource.main --batch-size 50
```
`--search` is repeatable. Run `python -m jobsource.main --help` for all options.
## Tests
```bash
pytest -q
```
## Output
`output/results.csv` — three columns: `company_name`, `career_page_url`, `open_position_url`.
Complete rows (status `position_found`) are sorted first.

3
jobsource/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""AI Job Source Agent package."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,11 @@
"""Browser Use fused fallback: find careers page AND extract one job URL in one session.
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 2/3 last resort): implement per CLAUDE.md "Stage 2 — tier 6" and "Stage 3 — tier 5".
# This is the LAST tier of the cascade. Fires only when all cheaper tiers in cascade.py
# and extract.py have failed. One Browser Use agent session does both:
# 1. Navigate to the company website and locate the careers/jobs page.
# 2. From the careers page, return the URL of one open position.
# Graceful degradation: if Browser Use / Playwright / LLM key are unavailable, log clearly
# and return (careers_url=None, position_url=None) so the pipeline records needs_review.

View File

@@ -0,0 +1 @@
"""Careers page discovery sub-package (Stage 2 cascade)."""

17
jobsource/careers/ats.py Normal file
View File

@@ -0,0 +1,17 @@
"""ATS detection and public JSON API fetching (Stage 2, tier 1).
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 2, tier 1): implement per CLAUDE.md "Stage 2 — ATS detection".
# Detect Greenhouse / Lever / Ashby / Workday from the company website, then call
# their public JSON APIs (no login needed). On success, return both the careers page URL
# AND the first job posting URL (so Stage 3 can skip its own cascade for ATS companies).
#
# Confirmed ATS JSON field shapes (verify live before trusting — see CLAUDE.md Gotchas):
# Greenhouse: GET https://boards-api.greenhouse.io/v1/boards/{slug}/jobs
# → {"jobs": [{"absolute_url": "...", ...}, ...]}
# Lever: GET https://api.lever.co/v0/postings/{company}?mode=json
# → [{"hostedUrl": "...", ...}, ...]
# Ashby: POST https://api.ashbyhq.com/posting-api/job-board/{slug}
# → {"jobs": [{"jobUrl": "...", ...}, ...]}
# Workday: varies by tenant — needs per-tenant discovery logic

View File

@@ -0,0 +1,13 @@
"""find_careers_page(): orchestrate the Stage 2 tier cascade.
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 2): implement per CLAUDE.md "Stage 2 — Find careers page (cascade, return on first hit)".
# Cascade order (return early on first success):
# 1. ATS detection → ats.detect_and_fetch()
# 2. URL patterns → heuristics.probe_url_patterns()
# 3. Homepage scan → heuristics.scan_homepage_links()
# 4. Sitemap → heuristics.parse_sitemap()
# 5. Cheap-LLM → classify_llm.classify_careers_link()
# 6. Browser agent → agent_fallback.run_fused_agent() (also handles Stage 3)
# Returns (careers_url: str | None, method: str, ats_name: str | None).

View File

@@ -0,0 +1,13 @@
"""Cheap-LLM link classification for careers page and job links (Stage 2, tier 5 / Stage 3, tier 4).
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 2 tier 5 / Stage 3 tier 4): implement per CLAUDE.md "Cheap-LLM classification".
# Uses Pydantic AI (model-agnostic) with the `classifier_model` from config.
# Two typed tasks:
# 1. classify_careers_link(anchors: list[Anchor]) -> CareerLinkResult
# Given extracted <a> tags from a page, pick the careers/jobs page URL.
# 2. classify_job_link(anchors: list[Anchor]) -> JobLinkResult
# Given extracted <a> tags from a careers page, pick one open-position URL.
# Both return a typed Pydantic result including the chosen URL and confidence.
# Graceful degradation: if llm_api_key is placeholder or call fails, return None.

View File

@@ -0,0 +1,11 @@
"""Deterministic careers-page heuristics: URL probing, homepage scan, sitemap (Stage 2, tiers 24).
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 2, tiers 24): implement per CLAUDE.md "Stage 2 — URL patterns / homepage / sitemap".
# Tier 2 — URL patterns: probe /careers, /career, /jobs, /join-us, /join,
# careers.{domain}, jobs.{domain} via HTTP HEAD (or GET if HEAD fails).
# Tier 3 — Homepage link scan: fetch homepage HTML, parse with BeautifulSoup + lxml,
# rank <a> anchors by career/job keywords in href/text, return highest-ranked.
# Tier 4 — Sitemap: fetch sitemap.xml (and sitemap index if present), scan for career/job URLs.
# Each function returns (url: str | None) so cascade.py can return early on first hit.

64
jobsource/config.py Normal file
View File

@@ -0,0 +1,64 @@
"""Application configuration, loaded from the environment via pydantic-settings.
Every setting is env-driven. Model identifiers and API keys are read from the
environment with inert placeholder defaults — the operator supplies real values
in `.env`. Never hardcode real model IDs or secrets in this file.
"""
from __future__ import annotations
from functools import lru_cache
from pathlib import Path
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
case_sensitive=False,
)
# -- Job source / ingestion --------------------------------------------
job_source: str = Field(default="jobspy", description="Ingestion provider: 'jobspy' | 'apify'.")
search_terms: list[str] = Field(default_factory=lambda: ["software engineer"])
location: str = "United States"
hours_old: int = 72
batch_size: int = 20
results_wanted: int = 50
# -- Apify (only used when job_source == 'apify') ----------------------
apify_token: str = "PLACEHOLDER_APIFY_TOKEN"
apify_actor: str = "PLACEHOLDER_APIFY_ACTOR"
# -- Website resolution (optional search API) --------------------------
search_api_enabled: bool = False
search_api_key: str = "PLACEHOLDER_SEARCH_API_KEY"
# -- LLM / agent models (placeholders -- set real IDs in .env) ---------
# NEVER hardcode real model identifiers. These are inert placeholders.
llm_api_key: str = "PLACEHOLDER_LLM_API_KEY"
classifier_model: str = "PLACEHOLDER_CLASSIFIER_MODEL" # cheap model: link classification
agent_model: str = "PLACEHOLDER_AGENT_MODEL" # stronger model: browser agent
# -- HTTP client -------------------------------------------------------
http_timeout: float = 20.0
http_max_retries: int = 3
http_backoff_factor: float = 0.5
user_agent: str = "JobSourceAgent/0.1 (+https://example.com)"
# -- Storage / output --------------------------------------------------
db_path: Path = Path("output/jobsource.db")
output_csv: Path = Path("output/results.csv")
# -- Browser agent (fallback tier) -------------------------------------
enable_browser_agent: bool = True
browser_headless: bool = True
@lru_cache
def get_settings() -> Settings:
"""Return the cached Settings singleton (call get_settings.cache_clear() in tests)."""
return Settings()

10
jobsource/db.py Normal file
View File

@@ -0,0 +1,10 @@
"""SQLite persistence layer: companies table, jobs table, dedup, company cache, CSV export.
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 4): implement per CLAUDE.md "Stage 4 — Persist & export" and "Data model".
# Schema:
# companies(company_key PK, name, website, career_url, first_seen)
# jobs(job_id PK, company_key, linkedin_url, position_url, status, listed_at, first_seen)
# CSV export writes output/results.csv with columns: company_name, career_page_url, open_position_url
# (complete rows — status==position_found — sorted first; incomplete rows follow).

12
jobsource/extract.py Normal file
View File

@@ -0,0 +1,12 @@
"""Extract one open position URL from a careers page (Stage 3).
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 3): implement per CLAUDE.md "Stage 3 — Extract one open position (return on first hit)".
# Cascade order (return early on first hit):
# 1. ATS JSON — if ATS is already known from Stage 2, return first posting URL directly.
# 2. JobPosting JSON-LD — parse application/ld+json for a `url` field.
# 3. Job-like anchors — first <a> matching /job, /position, /opening, /vacancy in href.
# 4. Cheap-LLM classification — Pydantic AI typed output (classifier_model).
# 5. Browser-agent fallback — handled inside the fused Stage-2 agent call in agent_fallback.py.
# Returns (url: str | None, method: str) so callers know which tier resolved it.

10
jobsource/flow.py Normal file
View File

@@ -0,0 +1,10 @@
"""Prefect flow definition and interval schedule.
Scaffold stub -- not implemented yet.
"""
# TODO (scheduling): implement per CLAUDE.md "Orchestration/scheduling: Prefect".
# Wrap run_batch() in a @flow with:
# - Retries on the flow level.
# - An interval schedule (configurable; default daily).
# Run with: python -m jobsource.flow
# Cron fallback (no daemon): */0 6 * * * cd <repo> && ./.venv/bin/python -m jobsource.main --batch-size 50

97
jobsource/http.py Normal file
View File

@@ -0,0 +1,97 @@
"""Shared httpx client factory and a small bounded-retry helper.
Every outbound HTTP call in the pipeline should go through a client built here
so timeouts, headers, and bounded retries are applied consistently. Connection-
level retries are handled by the transport; request_with_retries adds bounded
retries for transient HTTP status codes.
"""
from __future__ import annotations
import logging
import time
from collections.abc import Iterable
import httpx
from .config import get_settings
logger = logging.getLogger(__name__)
_RETRY_STATUS = frozenset({429, 500, 502, 503, 504})
def default_headers() -> dict[str, str]:
settings = get_settings()
return {
"User-Agent": settings.user_agent,
"Accept": "text/html,application/xhtml+xml,application/json;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
def build_client(**overrides: object) -> httpx.Client:
"""Create a configured sync httpx client.
Timeout and connection-level retries come from settings; callers may pass
httpx.Client kwargs as overrides (e.g. base_url, extra headers).
"""
settings = get_settings()
kwargs: dict[str, object] = {
"timeout": httpx.Timeout(settings.http_timeout),
"headers": default_headers(),
"follow_redirects": True,
"transport": httpx.HTTPTransport(retries=settings.http_max_retries),
}
kwargs.update(overrides)
return httpx.Client(**kwargs) # type: ignore[arg-type]
def request_with_retries(
client: httpx.Client,
method: str,
url: str,
*,
max_retries: int | None = None,
retry_status: Iterable[int] = _RETRY_STATUS,
**kwargs: object,
) -> httpx.Response:
"""Issue a request, retrying on transient status codes with exponential backoff."""
settings = get_settings()
retries = settings.http_max_retries if max_retries is None else max_retries
backoff = settings.http_backoff_factor
statuses = frozenset(retry_status)
last_exc: Exception | None = None
for attempt in range(retries + 1):
try:
response = client.request(method, url, **kwargs) # type: ignore[arg-type]
if response.status_code in statuses and attempt < retries:
sleep_for = backoff * (2**attempt)
logger.warning(
"HTTP %s on %s (attempt %d/%d); retrying in %.1fs",
response.status_code,
url,
attempt + 1,
retries,
sleep_for,
)
time.sleep(sleep_for)
continue
return response
except httpx.HTTPError as exc:
last_exc = exc
if attempt < retries:
sleep_for = backoff * (2**attempt)
logger.warning(
"HTTP error on %s (attempt %d/%d): %s; retrying in %.1fs",
url,
attempt + 1,
retries,
exc,
sleep_for,
)
time.sleep(sleep_for)
continue
raise
if last_exc is not None: # pragma: no cover - defensive
raise last_exc
raise RuntimeError("request_with_retries exhausted without a response")

55
jobsource/main.py Normal file
View File

@@ -0,0 +1,55 @@
"""CLI entry point: `python -m jobsource.main`.
Scaffold stub. Argument parsing is wired so `--help` works; the actual batch
run lands in a later step (see jobsource/pipeline.py). Imports only stdlib so
`--help` works before the heavier dependencies are installed.
"""
from __future__ import annotations
import argparse
import sys
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="python -m jobsource.main",
description=(
"AI Job Source Agent -- emit company_name, career_page_url, "
"open_position_url for recently posted LinkedIn jobs."
),
)
parser.add_argument(
"--batch-size",
type=int,
default=None,
help="Number of new jobs to process this run (default from config).",
)
parser.add_argument(
"--search",
action="append",
metavar="TERM",
help="Search term; repeatable. Overrides config search terms.",
)
parser.add_argument(
"--location",
default=None,
help="Job location filter (default from config).",
)
parser.add_argument(
"--hours-old",
type=int,
default=None,
help="Only jobs posted within this many hours (default from config).",
)
return parser
def main(argv: list[str] | None = None) -> int:
args = build_parser().parse_args(argv)
print("jobsource: scaffold stub -- pipeline not implemented yet.", file=sys.stderr)
print(f"parsed args: {vars(args)}", file=sys.stderr)
return 0
if __name__ == "__main__":
raise SystemExit(main())

88
jobsource/models.py Normal file
View File

@@ -0,0 +1,88 @@
"""Pydantic data models shared across the pipeline.
RawJob is the normalized output of any job source (Stage 1). JobResult is the
per-job record that flows through the cascade and becomes one CSV row. The CSV
contract is exactly three columns: company_name, career_page_url,
open_position_url.
"""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, Field
class JobStatus(str, Enum):
"""Lifecycle of a single job record. Complete == position_found."""
new = "new"
website_resolved = "website_resolved"
careers_found = "careers_found"
position_found = "position_found"
failed = "failed"
needs_review = "needs_review"
class RawJob(BaseModel):
"""Normalized job posting from a source provider (Stage 1 output)."""
job_id: str = Field(..., description="LinkedIn numeric jobPostingId, parsed from the job URL.")
company: str = Field(..., description="Company name as reported by the source.")
linkedin_url: str = Field(..., description="Canonical LinkedIn job-view URL.")
website: str | None = Field(default=None, description="Company's own site, if provided.")
listed_at: datetime | None = Field(default=None, description="When the job was posted, if known.")
title: str | None = Field(default=None, description="Job title, if provided.")
location: str | None = Field(default=None, description="Job location, if provided.")
class JobResult(BaseModel):
"""Per-job record carried through the cascade; serializes to one CSV row."""
job_id: str
company_name: str
company_key: str | None = Field(
default=None, description="Normalized domain, else lowercased name."
)
website: str | None = None
career_page_url: str | None = None
open_position_url: str | None = None
status: JobStatus = JobStatus.new
linkedin_url: str | None = None
listed_at: datetime | None = None
title: str | None = None
location: str | None = None
# Observability: which cascade tier/method resolved each stage.
careers_method: str | None = None
position_method: str | None = None
@property
def is_complete(self) -> bool:
"""A record is complete once an open position has been found."""
return self.status == JobStatus.position_found
@classmethod
def from_raw(cls, raw: RawJob) -> "JobResult":
"""Seed a result from a raw job (status starts at `new`)."""
return cls(
job_id=raw.job_id,
company_name=raw.company,
website=raw.website,
linkedin_url=raw.linkedin_url,
listed_at=raw.listed_at,
title=raw.title,
location=raw.location,
status=JobStatus.new,
)
def to_csv_row(self) -> dict[str, str]:
"""Return exactly the three contract columns (empty string for None)."""
return {
"company_name": self.company_name or "",
"career_page_url": self.career_page_url or "",
"open_position_url": self.open_position_url or "",
}
# The CSV output contract — exactly these columns, in this order.
CSV_COLUMNS: tuple[str, str, str] = ("company_name", "career_page_url", "open_position_url")

12
jobsource/pipeline.py Normal file
View File

@@ -0,0 +1,12 @@
"""Batch orchestration: dedup, per-record isolation, cascade, persistence, summary.
Scaffold stub -- not implemented yet.
"""
# TODO (pipeline): implement run_batch() per CLAUDE.md "Pipeline stages".
# run_batch() contract:
# - Accept batch_size, search terms, location, hours_old overrides.
# - Call the job source, dedup by job_id against the DB (skip already-seen jobs).
# - For each new RawJob, run the full cascade (resolve -> careers -> extract) in isolation:
# one failing record must NEVER abort the batch — catch, record failed/needs_review, continue.
# - Persist each JobResult to the DB and export output/results.csv when done.
# - Print a run summary: per-stage counts + % of new jobs reaching position_found.

10
jobsource/resolve.py Normal file
View File

@@ -0,0 +1,10 @@
"""Resolve company name → company website URL (Stage 1b, deterministic).
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 1b): implement per CLAUDE.md "Stage 1b — Resolve website (deterministic)".
# Resolution order:
# 1. Use provider-supplied website if present.
# 2. Verified domain guess: normalize company name to {slug}.com and probe via HTTP HEAD.
# 3. Optional search API (SEARCH_API_ENABLED=true) as final fallback.
# Returns the resolved URL string, or None if unresolvable.

View File

@@ -0,0 +1 @@
"""Job source provider package."""

View File

@@ -0,0 +1,8 @@
"""Apify ingestion provider (alternative, paid) — implements JobSource.
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 1): implement ApifySource per CLAUDE.md "Stage 1 — Ingest".
# Drop-in alternative to JobSpySource; same JobSource interface.
# Uses apify-client; actor ID from config (APIFY_ACTOR env var).
# Map Apify actor output fields → RawJob; same dedup key (LinkedIn jobPostingId).

16
jobsource/sources/base.py Normal file
View File

@@ -0,0 +1,16 @@
"""JobSource interface: every ingestion provider must implement fetch_recent_jobs().
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 1): define the JobSource ABC per CLAUDE.md "Stage 1 — Ingest (deterministic)".
# Interface:
# class JobSource(ABC):
# @abstractmethod
# def fetch_recent_jobs(
# self,
# search_terms: list[str],
# location: str,
# hours_old: int,
# results_wanted: int,
# ) -> list[RawJob]: ...
# Implementations: jobspy_source.JobSpySource, apify_source.ApifySource.

View File

@@ -0,0 +1,10 @@
"""JobSpy ingestion provider (default, free) — implements JobSource.
Scaffold stub -- not implemented yet.
"""
# TODO (Stage 1): implement JobSpySource per CLAUDE.md "Stage 1 — Ingest".
# Uses python-jobspy (python_jobspy). Key notes:
# - Search LinkedIn via JobSpy; parse LinkedIn numeric jobPostingId from the job URL.
# - Map JobSpy result fields → RawJob (company, website from company_url_direct if present).
# - Strip tracking query params from linkedin_url; keep only /jobs/view/{id}.
# - Log observed fill rate of company_url_direct (see CLAUDE.md Gotchas).

0
output/.gitkeep Normal file
View File

25
requirements.txt Normal file
View File

@@ -0,0 +1,25 @@
# Core
httpx>=0.27
pydantic>=2.6
pydantic-settings>=2.2
# HTML parsing
beautifulsoup4>=4.12
lxml>=5
# Ingestion
python-jobspy
apify-client
# Structured LLM extraction
pydantic-ai
# Browser agent (fallback only)
browser-use
playwright
# Orchestration / scheduling
prefect
# Tests
pytest

0
tests/__init__.py Normal file
View File

119
tests/test_smoke.py Normal file
View File

@@ -0,0 +1,119 @@
"""Scaffold smoke tests — verify the package is importable and core models are correct.
No stage logic, no network calls, no heavy deps beyond pydantic/pydantic-settings.
"""
from __future__ import annotations
import pytest
def test_package_version() -> None:
import jobsource
assert isinstance(jobsource.__version__, str)
assert jobsource.__version__ # non-empty
def test_cli_help_exits_zero() -> None:
from jobsource.main import build_parser
with pytest.raises(SystemExit) as exc_info:
build_parser().parse_args(["--help"])
assert exc_info.value.code == 0
def test_cli_parser_flags() -> None:
from jobsource.main import build_parser
parser = build_parser()
args = parser.parse_args(
["--batch-size", "10", "--search", "engineer", "--search", "pm",
"--location", "Remote", "--hours-old", "48"]
)
assert args.batch_size == 10
assert args.search == ["engineer", "pm"]
assert args.location == "Remote"
assert args.hours_old == 48
def test_job_status_enum() -> None:
from jobsource.models import JobStatus
assert JobStatus.new == "new"
assert JobStatus.position_found == "position_found"
assert JobStatus.needs_review == "needs_review"
# All six values defined
assert len(JobStatus) == 6
def test_raw_job_model() -> None:
from jobsource.models import RawJob
job = RawJob(
job_id="123456789",
company="Acme Corp",
linkedin_url="https://www.linkedin.com/jobs/view/123456789",
)
assert job.job_id == "123456789"
assert job.website is None
def test_job_result_from_raw_and_csv_row() -> None:
from jobsource.models import CSV_COLUMNS, JobResult, JobStatus, RawJob
raw = RawJob(
job_id="987",
company="Globex",
linkedin_url="https://www.linkedin.com/jobs/view/987",
website="https://globex.example.com",
title="Software Engineer",
location="Remote",
)
result = JobResult.from_raw(raw)
assert result.status == JobStatus.new
assert result.company_name == "Globex"
assert result.website == "https://globex.example.com"
assert not result.is_complete
row = result.to_csv_row()
assert set(row.keys()) == set(CSV_COLUMNS)
assert row["company_name"] == "Globex"
assert row["career_page_url"] == ""
assert row["open_position_url"] == ""
def test_job_result_is_complete() -> None:
from jobsource.models import JobResult, JobStatus
result = JobResult(
job_id="1",
company_name="Initech",
status=JobStatus.position_found,
career_page_url="https://initech.com/careers",
open_position_url="https://initech.com/careers/jobs/42",
)
assert result.is_complete
row = result.to_csv_row()
assert row["career_page_url"] == "https://initech.com/careers"
assert row["open_position_url"] == "https://initech.com/careers/jobs/42"
def test_settings_load_defaults() -> None:
from jobsource.config import Settings
s = Settings()
assert s.job_source == "jobspy"
assert s.batch_size == 20
assert s.hours_old == 72
# Model IDs must remain as inert placeholders — never real identifiers.
assert s.classifier_model.startswith("PLACEHOLDER")
assert s.agent_model.startswith("PLACEHOLDER")
assert s.llm_api_key.startswith("PLACEHOLDER")
def test_csv_columns_constant() -> None:
from jobsource.models import CSV_COLUMNS
assert CSV_COLUMNS == ("company_name", "career_page_url", "open_position_url")
assert len(CSV_COLUMNS) == 3