# Files
#
# 117 lines
# 3.9 KiB
# Python

"""Shared httpx client factory and a small bounded-retry helper.
Every outbound HTTP call in the pipeline should go through a client built here
so timeouts, headers, and bounded retries are applied consistently. Connection-
level retries are handled by the transport; request_with_retries adds bounded
retries for transient HTTP status codes.
"""
from __future__ import annotations
import logging
import time
from collections.abc import Iterable
import httpx
from .config import get_settings
# Module-level logger; retry warnings from request_with_retries are emitted here.
logger = logging.getLogger(__name__)
# Status codes treated as transient by default (429 Too Many Requests and the
# 500/502/503/504 server-side failures); this is the default value of the
# ``retry_status`` parameter of request_with_retries.
_RETRY_STATUS = frozenset({429, 500, 502, 503, 504})
# Status codes (405 Method Not Allowed, 501 Not Implemented) that make
# probe_url repeat its HEAD probe as a GET.
_HEAD_NOT_SUPPORTED = frozenset({405, 501})
def default_headers() -> dict[str, str]:
    """Return the baseline request headers applied to every client.

    The User-Agent value is read from the application settings; the Accept
    and Accept-Language values are fixed.
    """
    user_agent = get_settings().user_agent
    headers: dict[str, str] = {}
    headers["User-Agent"] = user_agent
    headers["Accept"] = (
        "text/html,application/xhtml+xml,application/json;q=0.9,*/*;q=0.8"
    )
    headers["Accept-Language"] = "en-US,en;q=0.9"
    return headers
def build_client(**overrides: object) -> httpx.Client:
    """Create a configured sync httpx client.

    Timeout and connection-level retries come from settings; callers may pass
    httpx.Client kwargs as overrides (e.g. base_url, extra headers).

    Args:
        **overrides: Keyword arguments forwarded to ``httpx.Client``. They
            take precedence over the defaults built here, with two special
            cases:

            * ``headers`` is merged on top of ``default_headers()`` (caller
              values win, matched case-insensitively) instead of replacing
              the defaults, so the configured User-Agent is kept unless the
              caller explicitly overrides it.
            * When no ``transport`` is supplied, transport-level options
              (``verify``, ``cert``, ``trust_env``, ``http1``, ``http2``,
              ``limits``) are also passed to the retrying default transport.
              httpx ignores these client-level options once an explicit
              transport is given, so without forwarding them they would be
              silently dropped.

    Returns:
        A new ``httpx.Client``. The caller owns it and is responsible for
        closing it.
    """
    settings = get_settings()
    kwargs: dict[str, object] = {
        "timeout": httpx.Timeout(settings.http_timeout),
        "follow_redirects": True,
    }
    kwargs.update(overrides)

    # Layer caller-supplied headers over the defaults rather than discarding
    # the defaults. httpx.Headers.update replaces same-named keys
    # case-insensitively and accepts None as "nothing to add".
    merged_headers = httpx.Headers(default_headers())
    merged_headers.update(overrides.get("headers"))  # type: ignore[arg-type]
    kwargs["headers"] = merged_headers

    # Only build the retrying transport when the caller did not bring their
    # own; an explicit ``transport`` override (even None) is passed through.
    if "transport" not in overrides:
        transport_level = ("verify", "cert", "trust_env", "http1", "http2", "limits")
        transport_kwargs = {
            name: overrides[name] for name in transport_level if name in overrides
        }
        kwargs["transport"] = httpx.HTTPTransport(
            retries=settings.http_max_retries,
            **transport_kwargs,  # type: ignore[arg-type]
        )

    return httpx.Client(**kwargs)  # type: ignore[arg-type]
def request_with_retries(
    client: httpx.Client,
    method: str,
    url: str,
    *,
    max_retries: int | None = None,
    retry_status: Iterable[int] = _RETRY_STATUS,
    **kwargs: object,
) -> httpx.Response:
    """Issue a request, retrying on transient status codes with exponential backoff.

    Args:
        client: The httpx client used to send the request.
        method: HTTP method name, passed straight to ``client.request``.
        url: Target URL.
        max_retries: Number of retries after the first attempt. ``None`` uses
            ``settings.http_max_retries``. Negative values are treated as 0,
            so at least one request is always sent.
        retry_status: Status codes that trigger a retry.
        **kwargs: Extra keyword arguments forwarded to ``client.request``.

    Returns:
        The first response whose status is not in ``retry_status``, or the
        response from the final attempt regardless of its status.

    Raises:
        httpx.HTTPError: If the final attempt fails with an httpx error.

    NOTE(review): retries are applied regardless of ``method``; callers
    sending non-idempotent requests should pass ``max_retries=0`` if a
    repeated request is not acceptable.
    """
    settings = get_settings()
    retries = settings.http_max_retries if max_retries is None else max_retries
    # Clamp so a negative value still performs the initial attempt instead of
    # skipping the loop entirely.
    retries = max(retries, 0)
    backoff = settings.http_backoff_factor
    statuses = frozenset(retry_status)
    total_attempts = retries + 1

    for attempt in range(total_attempts):
        can_retry = attempt < retries
        # Exponential backoff: factor, 2*factor, 4*factor, ...
        sleep_for = backoff * (2**attempt)
        try:
            response = client.request(method, url, **kwargs)  # type: ignore[arg-type]
        except httpx.HTTPError as exc:
            if not can_retry:
                raise
            logger.warning(
                "HTTP error on %s (attempt %d/%d): %s; retrying in %.1fs",
                url,
                attempt + 1,
                total_attempts,
                exc,
                sleep_for,
            )
        else:
            if not (can_retry and response.status_code in statuses):
                # Either a non-retryable status or the last attempt: hand the
                # response back and let the caller inspect the status.
                return response
            logger.warning(
                "HTTP %s on %s (attempt %d/%d); retrying in %.1fs",
                response.status_code,
                url,
                attempt + 1,
                total_attempts,
                sleep_for,
            )
        time.sleep(sleep_for)

    # Unreachable: the final loop iteration always returns or raises. Kept so
    # the function has an explicit terminal statement for type checkers.
    raise RuntimeError("request_with_retries exhausted without a response")
def probe_url(client: httpx.Client, url: str) -> str | None:
    """Probe a URL with HEAD (fallback GET on 405/501); return final URL or None.

    Returns the str representation of the final URL after redirects when the
    server responds with a non-error status (<400). Returns None on any
    network error or error status.

    This is a best-effort check and never raises: any exception is logged at
    debug level and reported as None. Each request is retried at most once.

    Args:
        client: The httpx client used for the probe.
        url: The URL to check.

    Returns:
        The final URL as a string, or None if the probe failed or the server
        answered with a status of 400 or above.
    """
    try:
        resp = request_with_retries(client, "HEAD", url, max_retries=1)
        if resp.status_code in _HEAD_NOT_SUPPORTED:
            # The server rejects HEAD; repeat the probe with GET.
            resp = request_with_retries(client, "GET", url, max_retries=1)
        if resp.status_code < 400:
            return str(resp.url)
        return None
    except Exception as exc:
        # Deliberately broad to keep the "never raises" contract, but record
        # the cause so failed probes can be diagnosed.
        logger.debug("Probe of %s failed: %s", url, exc)
        return None