HTTP transport¶
Most connectors fetch from an HTTP API, and every one of them needs the same
unglamorous plumbing: a base URL, default credentials, transient-retry logic,
secret redaction in logs, and a translation from httpx failures into the
typed errors that agents understand. Parsimony's parsimony.transport
package provides that layer so connector authors write the fetch, not the
plumbing.
These symbols are not re-exported from the top-level parsimony package.
Import the primitives from parsimony.transport and the convenience constructors
from parsimony.transport.helpers:
from parsimony.transport import (
    HttpClient,
    HttpRetryPolicy,
    DEFAULT_HTTP_RETRY_POLICY,
    map_http_error,
    map_timeout_error,
    parse_retry_after,
    pooled_client,
    redact_url,
    redact_params_for_logging,
    redact_sensitive_text,
)
from parsimony.transport.helpers import fetch_json, make_http_client, make_api_key_client
The layer is built on httpx, a base dependency
of parsimony-core, so everything on this page runs with only the core install
(no standard/litellm extras and no plugin).
HttpClient¶
HttpClient is an async wrapper around httpx.AsyncClient. It holds the
provider's base URL, default headers and query params, a timeout, TLS settings,
redirect policy, and a retry policy. Its one coroutine, request(), issues a
single logical request and returns the raw httpx.Response.
class HttpClient:
    def __init__(
        self,
        base_url: str,
        *,
        timeout: float = 30.0,
        verify_ssl: bool = True,
        headers: dict[str, Any] | None = None,
        query_params: dict[str, Any] | None = None,
        follow_redirects: bool = True,
        max_redirects: int = 5,
        _transport: httpx.AsyncBaseTransport | None = None,
        shared_client: httpx.AsyncClient | None = None,
        retry_policy: HttpRetryPolicy | None = DEFAULT_HTTP_RETRY_POLICY,
    ) -> None: ...
| Parameter | Default | Behavior |
|---|---|---|
| base_url | required | Provider root. Its trailing slash is stripped on construction. |
| timeout | 30.0 | Per-request timeout in seconds. |
| verify_ssl | True | TLS certificate verification. |
| headers | None | Default headers merged into every request. |
| query_params | None | Default query params merged into every request (the API-key idiom). |
| follow_redirects | True | Whether httpx follows redirects. |
| max_redirects | 5 | Redirect chain cap. |
| _transport | None | Inject an httpx.AsyncBaseTransport (e.g. httpx.MockTransport) for tests. |
| shared_client | None | Reuse one pooled httpx.AsyncClient instead of opening a fresh one per request. |
| retry_policy | DEFAULT_HTTP_RETRY_POLICY | An HttpRetryPolicy, or None to disable retries. Validated on construction. |
The base_url property returns the stored URL with its trailing slash already
removed:
from parsimony.transport import HttpClient
http = HttpClient("https://api.example.com/v1/")
assert http.base_url == "https://api.example.com/v1"
Sending a request¶
request() builds the URL as base_url + "/" + path.lstrip("/"), merges the
default query params and headers with the per-call ones (the per-call values
win), emits a redacted structured log line, runs the retry loop, logs any
redirect chain and the final response, and returns the response object.
async def request(
    self,
    method: str,
    path: str,
    params: dict[str, Any] | None = None,
    json: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
) -> httpx.Response: ...
Because it is a coroutine, drive it from an async def under asyncio.run.
The example below injects an httpx.MockTransport so it runs offline:
import asyncio
import httpx
from parsimony.transport import HttpClient, HttpRetryPolicy

async def handler(request: httpx.Request) -> httpx.Response:
    return httpx.Response(200, json={"ok": True}, request=request)

async def main() -> int:
    http = HttpClient(
        "https://api.example.com",
        timeout=5.0,
        headers={"X-App": "demo"},
        query_params={"apikey": "secret"},  # merged into every request
        retry_policy=HttpRetryPolicy(max_attempts=2, base_delay_s=0.0, jitter_s=0.0),
        _transport=httpx.MockTransport(handler),
    )
    response = await http.request("GET", "/status")
    return response.status_code

assert asyncio.run(main()) == 200
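The URL-joining and merge rules that request() applies can be sketched in plain Python. This is a minimal illustration of the behavior described above, not the library's implementation; join_url and merge_defaults are hypothetical names used only here.

```python
from __future__ import annotations

from typing import Any


def join_url(base_url: str, path: str) -> str:
    # Hypothetical helper: mirrors base_url + "/" + path.lstrip("/").
    # rstrip here stands in for the trailing-slash strip done at construction.
    return base_url.rstrip("/") + "/" + path.lstrip("/")


def merge_defaults(
    defaults: dict[str, Any] | None,
    per_call: dict[str, Any] | None,
) -> dict[str, Any]:
    # Hypothetical helper: start from the client defaults, then let
    # per-call values override them on key collisions.
    merged = dict(defaults or {})
    merged.update(per_call or {})
    return merged


url = join_url("https://api.example.com/v1/", "/status")
params = merge_defaults({"apikey": "k", "limit": 10}, {"limit": 50})
print(url)     # https://api.example.com/v1/status
print(params)  # {'apikey': 'k', 'limit': 50}
```

The same override rule applies to headers: a per-call header replaces a default header of the same name.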
request() never calls raise_for_status()
request() returns the raw response for any status, including 4xx and
5xx, and even after retries are exhausted. A 503 looks just like a 200 unless
you check. Call response.raise_for_status() yourself and feed the resulting
httpx.HTTPStatusError to map_http_error, or use the
fetch_json helper, which does both for you.
One client per request, by default¶
By default each request() call opens a short-lived httpx.AsyncClient inside
an async with block and closes it when the call returns. This deliberately
avoids sharing TCP connections across distinct asyncio.run() event loops,
which httpx does not support. The cost is that a tight fan-out loop pays for a
fresh connection each time — use pooled_client to opt
into pooling when one logical operation issues many requests.
Retries and backoff¶
HttpRetryPolicy is a frozen dataclass describing when and how HttpClient
retries transient failures.
@dataclass(frozen=True)
class HttpRetryPolicy:
    max_attempts: int = 3
    base_delay_s: float = 0.25
    max_delay_s: float = 8.0
    jitter_s: float = 0.1
    retryable_methods: frozenset[str] = frozenset({"GET", "HEAD", "OPTIONS"})
    retryable_statuses: frozenset[int] = frozenset({429, 500, 502, 503, 504})
| Field | Default | Meaning |
|---|---|---|
| max_attempts | 3 | Total attempts for a retryable method (1 = no retry). |
| base_delay_s | 0.25 | Base of the exponential backoff. |
| max_delay_s | 8.0 | Hard cap on any single delay. Must be > 0. |
| jitter_s | 0.1 | Upper bound of the uniform random jitter added per delay. |
| retryable_methods | {GET, HEAD, OPTIONS} | Only these idempotent methods are retried. |
| retryable_statuses | {429, 500, 502, 503, 504} | Response statuses that trigger a retry. |
DEFAULT_HTTP_RETRY_POLICY is the module-level validated instance with exactly
these defaults; it is the default value of HttpClient(retry_policy=...).
The retry rules:
- Methods. A non-idempotent method (e.g. POST) always runs exactly one attempt regardless of the policy. Only methods in retryable_methods retry.
- Responses. A response is retried only when its status is in retryable_statuses, the method is retryable, and there are attempts left.
- Exceptions. A raised exception is retried only when it is one of httpx.TimeoutException, httpx.ConnectError, httpx.ReadError, or httpx.RemoteProtocolError and there are attempts left; any other exception re-raises immediately.
- Backoff. For a normal retryable status the delay is base_delay_s * 2 ** (attempt - 1) plus uniform jitter in [0, jitter_s), capped at max_delay_s. For a 429, the delay instead comes from the server's Retry-After via parse_retry_after, still clamped to max_delay_s.
- Disabling. retry_policy=None collapses max_attempts to 1 and turns off both exception and status retries.
When attempts run out, request() returns the last response unchanged so you can
still map its status to a typed error.
Make retry tests deterministic
The first retry under the defaults waits ~0.25s plus jitter. In tests, set
base_delay_s=0.0 and jitter_s=0.0 so retries fire instantly.
HttpClient validates the policy it is given on construction (the module default
is validated at import). validate() raises ValueError for max_attempts < 1,
base_delay_s < 0, max_delay_s <= 0, or jitter_s < 0. Note that max_delay_s
must be strictly positive while the other delays may be 0.
from parsimony.transport import HttpRetryPolicy
# Honor Retry-After on a 429, but never wait longer than max_delay_s.
policy = HttpRetryPolicy(max_delay_s=8.0)
assert policy.backoff_seconds(1, retry_after=30.0) == 8.0 # clamped down
assert policy.backoff_seconds(1, retry_after=2.0) == 2.0
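The validation bounds described above can be written out as a short stand-in. RetryPolicySketch and is_valid are hypothetical names, and the error messages are invented for illustration; only the four bounds come from the text.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RetryPolicySketch:
    """Stand-in with the documented numeric fields; not the library's class."""
    max_attempts: int = 3
    base_delay_s: float = 0.25
    max_delay_s: float = 8.0
    jitter_s: float = 0.1

    def validate(self) -> None:
        # Each check mirrors one documented bound.
        if self.max_attempts < 1:
            raise ValueError("max_attempts must be >= 1")
        if self.base_delay_s < 0:
            raise ValueError("base_delay_s must be >= 0")
        if self.max_delay_s <= 0:
            raise ValueError("max_delay_s must be > 0")
        if self.jitter_s < 0:
            raise ValueError("jitter_s must be >= 0")


def is_valid(policy: RetryPolicySketch) -> bool:
    # Convenience wrapper so the checks below stay one-liners.
    try:
        policy.validate()
    except ValueError:
        return False
    return True


print(is_valid(RetryPolicySketch(base_delay_s=0.0, jitter_s=0.0)))  # True
print(is_valid(RetryPolicySketch(max_delay_s=0.0)))                 # False
```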
Mapping errors¶
request() returns the raw response and surfaces raw httpx exceptions on
transport failure; it is your job to turn those into typed errors. Three free
functions translate them into the typed parsimony.errors hierarchy so the rest
of your connector — and any agent consuming it — sees consistent, class-aware
errors. (The fetch_json helper applies these mappers for you, so
it raises the typed errors directly rather than raw httpx exceptions.) Each
function is NoReturn (it always raises) and chains the original via
raise ... from exc, so the traceback and __cause__ are preserved.
map_http_error¶
| Upstream status | Raised error |
|---|---|
| 401, 403 | UnauthorizedError |
| 402 | PaymentRequiredError |
| 429 | RateLimitError with retry_after from parse_retry_after |
| any other | ProviderError carrying status_code |
import httpx
from parsimony.errors import RateLimitError
from parsimony.transport import map_http_error

request = httpx.Request("GET", "https://api.example.com/v1/data?api_key=secret")
response = httpx.Response(429, headers={"Retry-After": "30"}, request=request)
exc = httpx.HTTPStatusError("rate limited", request=request, response=response)
try:
    map_http_error(exc, provider="example", op_name="get_data")
except RateLimitError as err:
    assert err.provider == "example"
    assert err.retry_after == 30.0
    assert err.__cause__ is exc  # original chained
    assert "secret" not in str(err)  # message never leaks the API key
Messages never embed the URL or credentials
The default messages from map_http_error/map_timeout_error name only the
provider and op_name; they deliberately omit the request URL and any
secret. If you want a URL in a message you build yourself, redact it first
with redact_url. See Errors for the
message= override escape hatch and its security caveat.
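The status dispatch and the raise ... from exc chaining can be sketched with stand-in classes. Everything named here (UpstreamErrorSketch, MappedErrorSketch, error_name_for_status, raise_mapped) is hypothetical, as is the message format; only the status-to-error table and the chaining behavior come from the text above.

```python
from typing import NoReturn


class UpstreamErrorSketch(Exception):
    """Stand-in for an upstream HTTP failure carrying a status and a URL."""

    def __init__(self, status: int, url: str) -> None:
        super().__init__(f"upstream {status} for {url}")
        self.status = status
        self.url = url


class MappedErrorSketch(Exception):
    """Stand-in for a typed error; the real hierarchy lives in parsimony.errors."""


def error_name_for_status(status: int) -> str:
    # Mirrors the documented table of status codes to error classes.
    if status in (401, 403):
        return "UnauthorizedError"
    if status == 402:
        return "PaymentRequiredError"
    if status == 429:
        return "RateLimitError"
    return "ProviderError"


def raise_mapped(exc: UpstreamErrorSketch, *, provider: str, op_name: str) -> NoReturn:
    name = error_name_for_status(exc.status)
    # The message names only provider and op_name; the URL is deliberately omitted.
    raise MappedErrorSketch(f"{name}: {provider}.{op_name} failed") from exc


original = UpstreamErrorSketch(403, "https://api.example.com/v1/data?api_key=secret")
try:
    raise_mapped(original, provider="example", op_name="get_data")
except MappedErrorSketch as err:
    caught = err

print(caught)                        # UnauthorizedError: example.get_data failed
print(caught.__cause__ is original)  # True
```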
map_timeout_error¶
Always raises ProviderError with status_code=408 — the HTTP "request
timeout" semantic — so downstream code can treat a timeout uniformly with other
transport failures. There is no dedicated timeout exception class; key off
status_code == 408.
import httpx
from parsimony.errors import ProviderError
from parsimony.transport import map_timeout_error

try:
    map_timeout_error(httpx.TimeoutException("slow"), provider="example", op_name="get_data")
except ProviderError as err:
    assert err.status_code == 408
parse_retry_after¶
Extracts a retry delay (in seconds) from a 429 response, in order:
- the numeric Retry-After header (seconds);
- the X-Ratelimit-Reset header read as a Unix epoch, returning max(1.0, reset - now);
- the default (60.0 seconds).
Every candidate must fall in (0, 86400]; anything outside — for instance a raw
epoch timestamp mistakenly placed in Retry-After — is skipped and the next
source is tried. This 24-hour cap is a paired invariant with RateLimitError,
which raises ValueError for a retry_after above 86400 on the theory that such
a value is a mis-encoded timestamp, not a duration.
import httpx
from parsimony.transport import parse_retry_after
resp = httpx.Response(429, headers={"Retry-After": "42"}, request=httpx.Request("GET", "https://x.test"))
assert parse_retry_after(resp) == 42.0
no_header = httpx.Response(429, headers={}, request=httpx.Request("GET", "https://x.test"))
assert parse_retry_after(no_header, default=30.0) == 30.0
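The fall-through order, including the case where an out-of-range Retry-After is skipped in favor of X-Ratelimit-Reset, can be sketched over a plain header dict. retry_after_sketch is a hypothetical stand-in, not parse_retry_after itself; in particular, applying the (0, 86400] check to the reset delay after the max(1.0, ...) floor is an assumption.

```python
from __future__ import annotations

import time

MAX_RETRY_AFTER_S = 86400.0  # the 24-hour cap described above


def _in_range(seconds: float) -> bool:
    # NaN and infinity fail this comparison and are therefore skipped.
    return 0 < seconds <= MAX_RETRY_AFTER_S


def _to_float(raw: str | None) -> float | None:
    if raw is None:
        return None
    try:
        return float(raw)
    except ValueError:
        return None


def retry_after_sketch(
    headers: dict[str, str],
    *,
    now: float | None = None,
    default: float = 60.0,
) -> float:
    now = time.time() if now is None else now
    lowered = {key.lower(): value for key, value in headers.items()}

    # 1. Numeric Retry-After, in seconds.
    seconds = _to_float(lowered.get("retry-after"))
    if seconds is not None and _in_range(seconds):
        return seconds

    # 2. X-Ratelimit-Reset as a Unix epoch, floored at one second.
    reset = _to_float(lowered.get("x-ratelimit-reset"))
    if reset is not None:
        delay = max(1.0, reset - now)
        if _in_range(delay):  # assumption: range check applies after the floor
            return delay

    # 3. Fall back to the default.
    return default


# A raw epoch mistakenly placed in Retry-After is out of range and skipped.
headers = {"Retry-After": "1700000000", "X-Ratelimit-Reset": "1700000010"}
print(retry_after_sketch(headers, now=1700000000.0))  # 10.0
```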
Redaction¶
Secret redaction is the security backbone of the transport layer — it is what
keeps API keys out of your logs and exception messages. HttpClient.request
applies it automatically to its own log lines; the functions are also exported
for you to use when you build messages or log statements yourself.
| Function | Returns | Marker | Notes |
|---|---|---|---|
| redact_url(url) | str | *** | Masks sensitive query-param values; leaves non-sensitive params intact; returns the URL unchanged if it has no query string. |
| redact_params_for_logging(params) | dict | ***REDACTED*** | Shallow copy of a params dict with sensitive values masked. |
| redact_sensitive_text(text) | str | *** | Scans arbitrary text for embedded http(s) URLs and applies redact_url to each. |
A parameter is "sensitive" when its name (lowercased, with hyphens normalized to
underscores) matches the built-in set: api_key, apikey, api_token, token,
access_token, refresh_token, id_token, client_secret, secret,
password, authorization.
from parsimony.transport import redact_url, redact_params_for_logging, redact_sensitive_text

assert redact_url("https://x.test/p?api_key=abc&series=UNRATE") == \
    "https://x.test/p?api_key=%2A%2A%2A&series=UNRATE"
msg = redact_sensitive_text("failed at https://x.test/p?token=t1&series=A")
assert "t1" not in msg and "series=A" in msg
log_params = redact_params_for_logging(
    {"series_id": "UNRATE", "api_key": "sk", "session_token": "x"}
)
assert log_params == {
    "series_id": "UNRATE",
    "api_key": "***REDACTED***",
    "session_token": "***REDACTED***",  # any *_token key is caught too
}
Two redactors, two rules, two markers
redact_params_for_logging masks any key ending in _token (e.g.
session_token) in addition to the explicit set, and uses the marker
***REDACTED***. redact_url uses the explicit name set only — it does
not apply the _token suffix rule — and its marker is ***. So a
?session_token=x query param is masked in structured logs but not by
redact_url unless its exact name is in the set. redact_url also only
touches the query string: secrets in the path or userinfo are left alone.
When HttpClient.request follows a redirect, it logs the final URL with the
query string stripped entirely (scheme, host, and path only) so no redirect-chain
secret leaks into the logs.
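The name-matching rule, the query-only scope of redact_url, and the query stripping applied to redirect targets can be sketched with urllib.parse. This is a stdlib approximation under stated assumptions, not the library's implementation: is_sensitive, redact_url_sketch, and strip_query are hypothetical names, and re-encoding the query with urlencode is a choice made for this sketch.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

# The built-in sensitive-name set listed above.
SENSITIVE_NAMES = frozenset({
    "api_key", "apikey", "api_token", "token", "access_token",
    "refresh_token", "id_token", "client_secret", "secret",
    "password", "authorization",
})


def is_sensitive(name: str) -> bool:
    # Lowercase and normalize hyphens to underscores before matching.
    return name.lower().replace("-", "_") in SENSITIVE_NAMES


def redact_url_sketch(url: str, marker: str = "***") -> str:
    parts = urlsplit(url)
    if not parts.query:
        return url  # nothing to redact; path and userinfo are never touched
    pairs = [
        (key, marker if is_sensitive(key) else value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
    ]
    # urlencode percent-encodes the marker, so "***" becomes "%2A%2A%2A".
    return urlunsplit(parts._replace(query=urlencode(pairs)))


def strip_query(url: str) -> str:
    # Keep scheme, host, and path only, as done for logged redirect targets.
    parts = urlsplit(url)
    return urlunsplit((parts.scheme, parts.netloc, parts.path, "", ""))


print(redact_url_sketch("https://x.test/p?api_key=abc&series=UNRATE"))
# https://x.test/p?api_key=%2A%2A%2A&series=UNRATE
print(strip_query("https://x.test/a/b?token=t1#frag"))
# https://x.test/a/b
```

Because this sketch matches exact names only, a session_token parameter passes through unredacted, which is the gap the note above warns about.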
Connection pooling¶
For a single logical operation that issues many requests — an enumerator loop, a
screener fan-out — open one underlying httpx.AsyncClient and reuse it.
pooled_client is an async context manager that does this: it builds one client
from the source HttpClient's configuration (base URL, headers, query params,
timeout, TLS, transport) and yields a new HttpClient that routes every request
through it.
import asyncio
import httpx
from parsimony.transport import HttpClient, pooled_client

async def handler(request: httpx.Request) -> httpx.Response:
    return httpx.Response(200, json={"path": request.url.path}, request=request)

async def main() -> list[int]:
    http = HttpClient(
        "https://api.example.com",
        query_params={"apikey": "k"},
        _transport=httpx.MockTransport(handler),
    )
    statuses: list[int] = []
    async with pooled_client(http) as shared:  # one httpx.AsyncClient reused
        for key in ("a", "b", "c"):
            response = await shared.request("GET", f"/data/{key}")
            statuses.append(response.status_code)
    return statuses

assert asyncio.run(main()) == [200, 200, 200]
The yielded client shares one connection pool for the whole async with block.
Do not hold onto it past the block, and do not use it across event loops. You can
also create a pooled client directly with client.with_shared_client(httpx_client),
which returns a new HttpClient reusing the supplied httpx.AsyncClient.
Convenience constructors and fetch_json¶
The parsimony.transport.helpers submodule provides thin constructors and a
one-call GET-and-parse helper. These are the symbols most connector bodies
actually touch.
Helper timeout differs from HttpClient
make_http_client and make_api_key_client default timeout to 15.0
seconds, shorter than HttpClient's own default of 30.0.
make_http_client and make_api_key_client¶
def make_http_client(
    base_url: str,
    *,
    query_params: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
    timeout: float = 15.0,
) -> HttpClient: ...

def make_api_key_client(
    base_url: str,
    *,
    api_key: str,
    api_key_param: str = "apikey",
    timeout: float = 15.0,
) -> HttpClient: ...
make_api_key_client is the common case: it pre-sets the API key as a default
query parameter (named apikey unless you override api_key_param), so every
request the returned client makes carries the key without you threading it
through each call. This pairs naturally with Connector.bind — bind the key once
at provider-setup time so it never appears on the connector's call surface. See
Calling, binding, and composing.
fetch_json¶
async def fetch_json(
    http: HttpClient,
    *,
    path: str,
    params: dict[str, Any] | None = None,
    provider: str,
    op_name: str,
) -> Any: ...
fetch_json is the recommended one-liner for a JSON GET. It:
- drops any param whose value is None (so optional connector arguments map to "omit this query param" rather than "send None");
- issues GET /{path} through http.request;
- calls response.raise_for_status();
- maps httpx.HTTPStatusError via map_http_error and httpx.TimeoutException via map_timeout_error, turning transport failures into typed errors;
- returns response.json().
import asyncio
import httpx
from parsimony.transport import HttpClient, HttpRetryPolicy
from parsimony.transport.helpers import fetch_json

async def handler(request: httpx.Request) -> httpx.Response:
    # `start=None` was dropped; `series_id` was sent.
    assert b"start" not in request.url.query
    assert b"series_id" in request.url.query
    return httpx.Response(200, json={"series_id": "UNRATE", "value": 3.9}, request=request)

async def main() -> dict:
    http = HttpClient(
        "https://api.example.com/v1",
        retry_policy=HttpRetryPolicy(max_attempts=1),
        _transport=httpx.MockTransport(handler),
    )
    return await fetch_json(
        http,
        path="series",
        params={"series_id": "UNRATE", "start": None},  # None is dropped
        provider="example",
        op_name="get_series",
    )

print(asyncio.run(main()))  # {'series_id': 'UNRATE', 'value': 3.9}
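The None-dropping step is the part most likely to surprise, so here it is in isolation. drop_none is a hypothetical stand-in for that one step; the text above only says that None values are dropped, so keeping other falsy values such as 0 or an empty string is an assumption of this sketch.

```python
from __future__ import annotations

from typing import Any


def drop_none(params: dict[str, Any] | None) -> dict[str, Any]:
    # Filter on identity with None, not on truthiness, so that 0, False,
    # and "" are still sent as query params (assumed behavior).
    return {key: value for key, value in (params or {}).items() if value is not None}


cleaned = drop_none({"series_id": "UNRATE", "start": None, "limit": 0})
print(cleaned)  # {'series_id': 'UNRATE', 'limit': 0}
```

This lets a connector pass its optional arguments straight through without building the params dict conditionally.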
In a real provider plugin you would build the client once with
make_api_key_client, pass the live API key, and call fetch_json from inside
your @connector body — letting the framework wrap the returned data in a
Result/TabularResult and surfacing any transport failure as a
typed error.
Logging¶
The module logs through logging.getLogger("parsimony.transport"). At INFO it
emits the request line (with a redacted URL and redacted params), a redirect
summary, and the response status and size; at WARNING it emits retry notices.
Structured extra fields include http_method, http_url, http_path,
http_params, http_status_code, http_response_size, http_redirect_hops,
and http_redirect_target. No environment variables configure this layer
directly — all tuning is per HttpClient / HttpRetryPolicy instance.
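Since the layer logs through the standard logging module, the usual stdlib setup applies. The sketch below attaches a handler to the parsimony.transport logger and reads two of the structured fields from a record. The record is simulated by a direct logger.info call so the example runs without any network traffic, and ListHandler is a hypothetical helper.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that keeps records in memory for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


logger = logging.getLogger("parsimony.transport")
logger.setLevel(logging.INFO)  # INFO also lets WARNING retry notices through
handler = ListHandler()
logger.addHandler(handler)

# Simulated record standing in for what the transport layer would emit;
# fields passed via extra= become attributes on the LogRecord.
logger.info("request", extra={"http_method": "GET", "http_path": "/status"})

record = handler.records[-1]
print(record.http_method, record.http_path)  # GET /status
```

A JSON formatter or log shipper can pick up the same attributes by name.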
See also¶
- Errors: the typed exception hierarchy these mappers raise, and the message= override caveat.
- Authoring a provider plugin: building a parsimony-<name> distribution that uses this transport layer.
- Calling, binding, and composing: bind an API key once so it stays off the connector's call surface.
- Results and output schemas: how the framework wraps connector return values into Result/TabularResult.