Connector Implementation Guide¶
How to build a parsimony connector from scratch. Covers provider research, the plugin skeleton, schema design, error mapping, testing, and catalog integration.
For the authoritative plugin contract, see contract.md.
For private / internal connectors (Postgres, Snowflake, S3), see
internal-connectors.md. To contribute a public
connector to the official monorepo, start with
ockham-sh/parsimony-connectors CONTRIBUTING.md.
Overview¶
Every parsimony connector ships as its own PyPI distribution —
parsimony-<name> — registered via the parsimony.providers entry-point
group. The kernel has no in-tree connectors; the package layout below
matches every plugin, from the minimal parsimony-treasury (no
credentials, flat REST) up to parsimony-sdmx (multi-agency SDMX with
catalog publishing).
parsimony-<name>/
├── parsimony_<name>/
│ ├── __init__.py CONNECTORS (+ optional CATALOGS / RESOLVE_CATALOG)
│ ├── connectors.py @connector / @enumerator / @loader functions
│ └── py.typed
├── tests/
│ ├── test_conformance.py assert_plugin_valid — release-blocking
│ └── test_<name>_connectors.py happy path + error mapping (respx mocks)
├── .github/workflows/
│ ├── ci.yml lint + type + test + conformance
│ └── release.yml OIDC PyPI publish on release
├── pyproject.toml entry-point registration + [project.urls] Homepage
├── README.md
├── CHANGELOG.md
└── LICENSE
Tags and MCP exposure. Every @connector and @enumerator takes a tags= argument. Tagging a connector with "tool" opts it into the MCP server's tool surface — that means the agent invokes it interactively, and its result must fit in a context window. Read Phase 2 — Tags and MCP exposure before writing your first decorator.
Phase 0 — Provider research¶
Core principle: docs lie. Test everything live before trusting it.
Spend 30–60 minutes researching the provider before writing any connector code. Skipping live exploration saves an hour now and costs days later when documented claims turn out to be wrong.
1. Documentation scan (15 min max)¶
- [ ] Find the official API documentation
- [ ] Identify the claimed protocol: REST, SDMX, GraphQL, bulk download
- [ ] Note the base URL and API version
- [ ] Check for an OpenAPI/Swagger spec
- [ ] Note what docs claim about: auth, rate limits, search, response formats
Do not trust any of the above. Every claim gets verified in step 2.
2. Authentication setup¶
A human has to do this before hitting any endpoint. For commercial APIs this step is mandatory — skip it and every test below is invalid.
Verify credentials load correctly with one authenticated request before proceeding.
3. Live API exploration (30–45 min — the critical step)¶
Open a terminal. Use curl or httpx in a REPL:
import httpx, json
r = httpx.get("https://api.example.com/v1/series", params={"id": "CPI"})
print(r.status_code, r.headers.get("content-type"))
print(json.dumps(r.json(), indent=2)[:2000])
Verify each documented endpoint:
- [ ] Does the endpoint exist and respond?
- [ ] Does the response structure match the docs? (Field names, nesting, types — all frequently differ.)
- [ ] What pagination method? (offset/limit, cursor, Link headers.)
- [ ] What rate-limit signals? (X-RateLimit-Remaining, 429 bodies.)
- [ ] What do nulls look like in actual responses (null, "NaN", ".", empty string, or just missing keys)?
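Rather than eyeballing responses, you can audit null representations mechanically. A stand-alone sketch over a captured sample payload (the payload, field names, and sentinel set here are invented for illustration):

```python
import json

# A captured sample response (invented for illustration).
sample = json.loads("""
{"observations": [
  {"date": "2024-01-01", "value": "100.0"},
  {"date": "2024-02-01", "value": "."},
  {"date": "2024-03-01", "value": null},
  {"date": "2024-04-01", "value": "NaN"},
  {"date": "2024-05-01"}
]}
""")

# Null-ish representations commonly seen in the wild.
SENTINELS = {None, ".", "", "NaN", "None", "-"}

def audit_nulls(rows: list[dict], field: str) -> set[str]:
    """Collect every null-ish representation observed for one field."""
    seen = set()
    for row in rows:
        if field not in row:
            seen.add("<missing key>")
        elif row[field] in SENTINELS:
            seen.add(repr(row[field]))
    return seen

print(audit_nulls(sample["observations"], "value"))
```

Run this against a few real responses before deciding how the connector normalizes missing data.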
Discover undocumented endpoints:
- Browser dev tools. Open the provider's data-explorer UI, filter Network by XHR/Fetch, and watch what calls the frontend makes. This often reveals search, catalog, and filter endpoints that aren't in the public docs.
- Common URL patterns. /search, /query, /series, /datasets, /metadata, /v2/, /sdmx/v2.1/dataflow/all/all/latest.
- SDMX discovery. Even if the provider doesn't mention SDMX, try {base_url}/sdmx/v2.1/dataflow/all/all/latest. If it responds with XML, you've found an SDMX endpoint — hand the agency off to the parsimony-sdmx plugin (add the agency to its ALL_AGENCIES set if not already supported).
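The common-pattern sweep is easy to script. A sketch that only builds the candidate URLs to probe with curl or httpx (candidate_urls is a throwaway helper, not part of parsimony):

```python
# The common paths listed above.
COMMON_PATHS = [
    "/search", "/query", "/series", "/datasets", "/metadata",
    "/v2/", "/sdmx/v2.1/dataflow/all/all/latest",
]

def candidate_urls(base_url: str) -> list[str]:
    """Build the discovery URLs to try against one provider."""
    base = base_url.rstrip("/")
    return [base + path for path in COMMON_PATHS]

for url in candidate_urls("https://api.example.com"):
    print(url)  # probe each by hand and note which respond
```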
4. Search-capability tier¶
Classify the provider from live testing, not from docs:
| Tier | Capability | Catalog strategy |
|---|---|---|
| 1 | Native search endpoint that returns good results | Use directly as @connector(tags=["tool"]) |
| 2 | Structured list endpoint (paginated, or SDMX DSD/JSON schema) | @enumerator → Catalog.add_from_result → catalog.search |
| 3 | Website browsable, no API search | Scrape or curate the catalog, then index |
| 4 | Bulk files only | Parse files into @enumerator output |
5. Document findings¶
Before writing code, write down what you learned — what differs from the docs, what rate limits you actually observed, which endpoints are dead, which shape the response takes. Future-you (and reviewers) will need it.
Phase 1 — Scaffold the plugin¶
Use the parsimony plugin template:
Answer the prompts (provider_name, description, author info). The
scaffold produces the structure shown at the top of this doc.
pyproject.toml¶
[project]
name = "parsimony-<your-name>"
version = "0.1.0"
license = "Apache-2.0"
requires-python = ">=3.11"
dependencies = [
"parsimony-core>=0.4,<0.5",
"pydantic>=2.11,<3",
"pandas>=2.3,<3",
"httpx>=0.27,<1",
]
[project.urls]
Homepage = "https://your-provider.example"
[project.optional-dependencies]
dev = [
"pytest>=9.0",
"pytest-asyncio>=1.3",
"respx>=0.22",
"ruff>=0.15",
"mypy>=1.10",
]
[project.entry-points."parsimony.providers"]
<your-name> = "parsimony_<your_name>"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["parsimony_<your_name>"]
The entry-point registration is what makes your plugin discoverable by
the kernel. Exactly one entry per provider module. [project.urls] Homepage
is what the kernel surfaces as Provider.homepage.
Minimum parsimony_<your_name>/__init__.py¶
from parsimony import Connectors
from parsimony_<your_name>.connectors import <your_name>_search, <your_name>_fetch
CONNECTORS = Connectors([<your_name>_search, <your_name>_fetch])
- CONNECTORS — required. The immutable Connectors collection containing every decorated function in this plugin.
- Per-connector env vars live on the @connector(env={...}) decorator (see Phase 6). The consumer resolves them via Connectors.bind_env().
- Provider metadata (homepage, version, description) lives in pyproject.toml ([project.urls] Homepage, [project] description). The kernel reads it on demand via importlib.metadata. There is no module-level ENV_VARS, PROVIDER_METADATA, or __version__.
See Phase 8 — Catalog integration below if your plugin ships catalog bundles.
Phase 2 — Tags and MCP exposure¶
Tags control two things: (1) whether the connector is exposed as an interactive MCP tool, and (2) which domain category it belongs to for filtering and catalog organization.
@connector(tags=["macro", "tool"]) # MCP tool + macro category
@connector(tags=["macro"]) # fetch-only, not an MCP tool
@connector(tags=["equity", "tool"]) # equity MCP tool
@enumerator(tags=["macro", "us"]) # US macro enumerator
- "tool" — marks connectors exposed as interactive MCP tools (search, discovery, screener). Fetch connectors typically omit "tool" because the agent invokes them programmatically after catalog discovery.
- Domain tags ("macro", "equity", "us", "global") support filtering and catalog organization.
Rule of thumb: if the agent calls it interactively to discover or
search data, add "tool". If the agent calls it programmatically
(after finding what it needs via the catalog), omit "tool".
Phase 3 — Write the params models¶
One Pydantic model per connector function. The framework reads the JSON
Schema from the type annotation and uses it for LLM tool descriptions
via to_llm().
from typing import Annotated
from pydantic import BaseModel, Field, field_validator
class MySourceFetchParams(BaseModel):
    """Parameters for fetching a time series from MySource."""

    series_id: Annotated[str, "ns:my_source"] = Field(
        ..., description="Series identifier (e.g. CPI.TOTAL)"
    )
    start_date: str | None = Field(
        default=None, description="Start date (YYYY-MM-DD)"
    )
    end_date: str | None = Field(
        default=None, description="End date (YYYY-MM-DD)"
    )

    @field_validator("series_id")
    @classmethod
    def _non_empty(cls, v: str) -> str:
        v = v.strip()
        if not v:
            raise ValueError("series_id must be non-empty")
        return v
Conventions:
- Field(...) for required; Field(default=...) for optional.
- description= on every field — these appear verbatim in the agent's system prompt.
- Annotated[str, "ns:my_source"] — sentinel string telling the kernel this parameter is the entity code in the my_source catalog namespace. Replaces the older Namespace("my_source") annotation class.
- @field_validator for input sanitization.
Aliasing reserved Python keywords¶
Some APIs use Python keywords as parameter names (from, type, in,
class). Use alias= with populate_by_name=True to give the field a
legal Python name while serializing with the API's name:
from pydantic import ConfigDict
class MyParams(BaseModel):
    model_config = ConfigDict(populate_by_name=True)

    from_date: str | None = Field(
        default=None,
        alias="from",
        description="Start date ISO 8601. Use as from_date='2024-01-15'",
    )
    to_date: str | None = Field(
        default=None,
        alias="to",
        description="End date ISO 8601. Use as to_date='2024-12-31'",
    )
The alias= controls how the field serializes to the API query string;
the Python-facing name (from_date) is what callers and LLM agents use.
Critically, the description= must include Use as from_date=
— without it, agents attempt from="2024-01-15" and hit a SyntaxError.
Sharing models across connectors¶
Share a param model across connectors only when the parameters are
truly identical. For example, FMP's income_statement,
balance_sheet, and cash_flow_statement all accept the same
(symbol, period, limit) — one model is correct. If the parameters
differ even slightly, use separate models.
Phase 4 — Design the OutputConfig¶
OutputConfig declares the semantic meaning of each column. Four roles:
| Role | Purpose | Constraint |
|---|---|---|
| KEY | Entity identifier (series_id, ticker) | Exactly one. Namespace optional — defaults to catalog name. |
| TITLE | Human-readable name | Exactly one when KEY is present. |
| DATA | Observation values (date, value, price) | The actual data columns. |
| METADATA | Supplementary context (frequency, units) | Optional. |
from parsimony import OutputConfig, Column, ColumnRole
ENUMERATE_OUTPUT = OutputConfig(columns=[
    Column(name="series_id", role=ColumnRole.KEY, namespace="my_source"),
    Column(name="title", role=ColumnRole.TITLE),
    Column(name="category", role=ColumnRole.METADATA),
    Column(name="frequency", role=ColumnRole.METADATA),
])

FETCH_OUTPUT = OutputConfig(columns=[
    Column(name="series_id", role=ColumnRole.KEY,
           param_key="series_id", namespace="my_source"),
    Column(name="title", role=ColumnRole.TITLE),
    Column(name="date", dtype="datetime", role=ColumnRole.DATA),
    Column(name="value", dtype="numeric", role=ColumnRole.DATA),
])
Column options:
- dtype — coercion hint: "auto", "datetime", "date", "numeric", "timestamp", "bool", "str", or a pandas dtype string.
- param_key — seed this column with the value from a named param field (e.g. param_key="series_id" fills every row with the param value).
- mapped_name — rename the upstream column to match a canonical name.
- exclude_from_llm_view — hide the column from agent-facing tool schemas (METADATA only).
- namespace — catalog namespace for KEY columns. Optional. When omitted, Catalog.add_from_result uses the catalog's own name as the default.
Columns in the DataFrame not declared in OutputConfig automatically
become DATA columns.
dtype reference¶
| dtype | Coercion pipeline | Expected input | Failure mode |
|---|---|---|---|
| "auto" | pandas infers | Any | No validation |
| "timestamp" | pd.to_numeric(errors="coerce") → scale ms→s if >1e11 → pd.to_datetime(unit="s") | Unix epoch seconds or milliseconds | ParseError if all values NaT |
| "date" | pd.to_datetime(series).dt.normalize() | ISO 8601 date string or epoch | Raises on unparseable |
| "datetime" | pd.to_datetime(series) | ISO 8601 datetime or epoch | Raises on unparseable |
| "numeric" | pd.to_numeric(errors="coerce") | Numeric string or number | ParseError if all NaN |
| "bool" | .astype(bool) | Truthy/falsy | ParseError on astype failure |
| "str" | .astype(str) | Any | Never fails |
| custom (e.g. "category") | .astype(dtype) fallback | Must be a valid pandas dtype | ParseError on astype failure |
"timestamp" vs "date" — the most common coercion trap:
- Use "timestamp" when the API returns unix epoch values (integers like 1704067200 or 1704067200000).
- Use "date" when the API returns ISO 8601 date strings (like "2024-01-01").
Mixing them up produces all-NaT values and raises ParseError. The
error message names the failing column.
Missing-data sentinels. APIs use ".", "None" (string), "-", or
empty strings. The "numeric" dtype handles these via
pd.to_numeric(errors="coerce"); "date" and "datetime" crash.
Replace sentinels with None in the row-building loop for date/datetime
columns.
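Both traps reproduce in a few lines of pandas. The pipelines below mirror the dtype table; the values are invented, and where() stands in for the recommended row-loop sentinel replacement:

```python
import pandas as pd

epochs = pd.Series([1704067200, 1704153600])           # unix seconds
dates = pd.Series(["2024-01-01", ".", "2024-03-01"])   # ISO strings with a "." sentinel

# "timestamp"-style coercion: numeric first, then epoch-seconds parsing.
ts = pd.to_datetime(pd.to_numeric(epochs, errors="coerce"), unit="s")
print(ts[0])  # 2024-01-01 00:00:00

# Applying the same epoch pipeline to ISO strings yields all-NaT:
bad = pd.to_datetime(pd.to_numeric(dates, errors="coerce"), unit="s")
print(bad.isna().all())  # True

# Sentinels must be nulled out before "date"/"datetime" coercion:
clean = pd.to_datetime(dates.where(dates != "."))
print(int(clean.isna().sum()))  # 1
```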
Multi-namespace providers¶
Some providers serve multiple asset classes under one API. When identifier
spaces are disjoint (equity ticker AAPL is meaningless in the crypto
endpoint, crypto pair btcusd is meaningless in equity), use separate
namespaces:
# Equities
Column(name="ticker", role=ColumnRole.KEY, namespace="my_source_equity")
# Crypto
Column(name="ticker", role=ColumnRole.KEY, namespace="my_source_crypto")
# Forex
Column(name="ticker", role=ColumnRole.KEY, namespace="my_source_fx")
Annotated[str, "ns:my_source_crypto"] on parameter models restricts
which connectors accept which identifiers — preventing an agent from
passing a crypto pair to an equity endpoint.
OutputConfig pitfalls¶
Match actual response columns, not docs. Column names in
OutputConfig must match what pd.DataFrame(response.json()) actually
produces, not what documentation claims. Make a real API call and
inspect df.columns before defining the config.
Missing columns log a warning. OutputConfig.build_table_result()
matches declared columns against the DataFrame. A typo logs a WARNING
listing unmatched config columns and the available DataFrame columns; it
doesn't crash. Watch logs during development.
You can also assert column match in tests:
assert not MY_OUTPUT.validate_columns(sample_df), (
    f"Unmatched: {MY_OUTPUT.validate_columns(sample_df)}"
)
Per-resource OutputConfig mapping¶
When a single connector serves multiple resource types with different schemas, use a mapping:
_OUTPUT_MAP = {
    "filing_types": FILING_TYPES_OUTPUT,
    "countries": COUNTRIES_OUTPUT,
    "languages": GENERIC_OUTPUT,
}

output = _OUTPUT_MAP.get(params.resource, GENERIC_OUTPUT)
return output.build_table_result(df, provenance=..., params=params.model_dump())
Phase 5 — HTTP client¶
Use parsimony.transport.HttpClient. It wraps httpx.AsyncClient and
redacts sensitive query-param values in structured logs (api_key,
token, password, anything ending _token, etc.).
from parsimony.transport import HttpClient
_BASE_URL = "https://api.my-source.example.com/v1"
def _make_http(api_key: str) -> HttpClient:
    return HttpClient(
        _BASE_URL,
        default_params={"api_key": api_key},
        timeout=30.0,
    )
If your credential query-param name isn't in the default redaction list, file an issue against the kernel to add it.
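To understand what ends up in logs, here is a simplified sketch of that kind of redaction, not the kernel's actual implementation; the rule set is assumed from the list above:

```python
# Assumed rule set: exact-match names plus a *_token suffix rule.
REDACTED_KEYS = {"api_key", "token", "password"}

def redact_params(params: dict[str, str]) -> dict[str, str]:
    """Mask sensitive query-param values the way a logging layer might."""
    out = {}
    for key, value in params.items():
        if key in REDACTED_KEYS or key.endswith("_token"):
            out[key] = "***"
        else:
            out[key] = value
    return out

print(redact_params({"api_key": "secret", "series_id": "CPI", "refresh_token": "abc"}))
# {'api_key': '***', 'series_id': 'CPI', 'refresh_token': '***'}
```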
Phase 6 — Write the connectors¶
import httpx
import pandas as pd
from parsimony import connector, enumerator, Result, Provenance
from parsimony.transport import HttpClient, map_http_error

@enumerator(
    output=ENUMERATE_OUTPUT,
    env={"api_key": "MY_SOURCE_API_KEY"},
    tags=["my_source"],
)
async def enumerate_my_source(
    params: MySourceEnumerateParams,
    *,
    api_key: str,
) -> pd.DataFrame:
    """Enumerate every series in the MySource catalog."""
    async with HttpClient(_BASE_URL, default_params={"api_key": api_key}) as http:
        try:
            response = await http.get("/series")
        except httpx.HTTPStatusError as exc:
            raise map_http_error(exc, provider="my_source", op_name="enumerate") from exc
        data = response.json()
        return pd.DataFrame(data.get("series", []))
@connector(
    output=FETCH_OUTPUT,
    env={"api_key": "MY_SOURCE_API_KEY"},
    tags=["my_source", "tool"],
)
async def my_source_fetch(
    params: MySourceFetchParams,
    *,
    api_key: str,
) -> pd.DataFrame:
    """Fetch time series observations by series_id from MySource."""
    async with HttpClient(_BASE_URL, default_params={"api_key": api_key}) as http:
        try:
            response = await http.get(f"/series/{params.series_id}/observations")
        except httpx.HTTPStatusError as exc:
            raise map_http_error(exc, provider="my_source", op_name="fetch") from exc
        return pd.DataFrame(response.json().get("observations", []))
The env={"api_key": "MY_SOURCE_API_KEY"} argument tells the kernel
which environment variable backs the api_key keyword-only dep. Both
decorators accept the same env= kwarg; multi-credential providers
(username + password, etc.) use one entry per dep.
The decorator wraps the returned DataFrame in a Result with the
provenance generated from the params model and the declared
OutputConfig. You return a DataFrame; the framework handles the rest.
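Conceptually, the wrapping behaves like the sketch below. This is not the kernel's real code: Result and Provenance are stand-in dataclasses, and the decorator is stripped down to the one idea (call the function, attach provenance):

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

@dataclass
class Provenance:          # stand-in for parsimony's Provenance
    source: str
    params: dict[str, Any]

@dataclass
class Result:              # stand-in; real connectors carry a DataFrame
    data: list[dict]
    provenance: Provenance

def connector(source: str):
    """Wrap a coroutine returning raw rows into one returning a Result."""
    def decorate(fn: Callable[..., Awaitable[list[dict]]]):
        async def wrapper(**params: Any) -> Result:
            rows = await fn(**params)
            return Result(data=rows, provenance=Provenance(source, params))
        return wrapper
    return decorate

@connector(source="my_source")
async def fetch(series_id: str) -> list[dict]:
    return [{"series_id": series_id, "value": 100.0}]

result = asyncio.run(fetch(series_id="CPI"))
print(result.provenance.source, len(result.data))  # my_source 1
```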
Error mapping¶
Every connector should funnel upstream HTTP failures through
map_http_error. The mapping:
- 401/403 → UnauthorizedError
- 402 → PaymentRequiredError
- 429 → RateLimitError (carries retry_after: float)
- anything else → ProviderError
For timeouts: from parsimony.transport import map_timeout_error and
wrap httpx.TimeoutException similarly.
Empty results are a signal, not an error shape. Raise EmptyDataError
when the upstream clearly returned "no data for this input" (an empty
list, status: "no_data", etc.) rather than letting a zero-row DataFrame
propagate silently.
Pagination¶
Document what the provider supports; choose one pattern:
Offset/limit:
rows = []
offset = 0
while True:
    response = await http.get("/series", params={"limit": 100, "offset": offset})
    batch = response.json().get("series", [])
    if not batch:
        break
    rows.extend(batch)
    offset += 100
Cursor:
rows = []
cursor = None
while True:
    params = {"limit": 100}
    if cursor:
        params["cursor"] = cursor
    response = await http.get("/series", params=params)
    payload = response.json()
    rows.extend(payload["data"])
    cursor = payload.get("next_cursor")
    if not cursor:
        break
Link header:
rows = []
url = "/series?limit=100"
while url:
    response = await http.get(url)
    rows.extend(response.json())
    url = _next_link(response.headers.get("link"))
For burst-heavy enumerators (fan-out enrichment, screener joins), use
parsimony.transport.pooled_client instead of HttpClient to share a
TCP connection pool.
Phase 7 — Test¶
Conformance (release-blocking)¶
# tests/test_conformance.py
import parsimony_my_source
from parsimony.testing import assert_plugin_valid

def test_plugin_conforms() -> None:
    assert_plugin_valid(parsimony_my_source)
Or pytest-class style:
from parsimony.testing import ProviderTestSuite
import parsimony_my_source

class TestMySource(ProviderTestSuite):
    module = parsimony_my_source
Happy path + error mapping¶
Use respx to mock HTTP responses:
import httpx
import pytest
import respx
from parsimony import UnauthorizedError, RateLimitError
from parsimony_my_source import CONNECTORS
@respx.mock
@pytest.mark.asyncio
async def test_fetch_happy_path():
    respx.get("https://api.my-source.example.com/v1/series/CPI/observations").mock(
        return_value=httpx.Response(200, json={"observations": [
            {"series_id": "CPI", "date": "2024-01-01", "value": 100.0},
        ]})
    )
    bound = CONNECTORS.bind(api_key="test-key")
    result = await bound["my_source_fetch"](series_id="CPI")
    assert result.provenance.source == "my_source"
    assert len(result.data) == 1
@respx.mock
@pytest.mark.asyncio
async def test_fetch_401_maps_to_unauthorized():
    respx.get("https://api.my-source.example.com/v1/series/X/observations").mock(
        return_value=httpx.Response(401, json={"error": "bad key"})
    )
    bound = CONNECTORS.bind(api_key="live-looking-key")
    with pytest.raises(UnauthorizedError) as exc_info:
        await bound["my_source_fetch"](series_id="X")
    # Ensure the key doesn't leak into the exception message:
    assert "live-looking-key" not in str(exc_info.value)
Required error tests for any connector with an api_key / token
dep:
- 401 → UnauthorizedError, with an assertion that the key doesn't appear in the exception message.
- 429 → RateLimitError, same key-leak assertion.
Run locally¶
All four CI checks (lint, type check, tests, conformance) must pass before you cut a release.
Phase 8 — Catalog integration¶
If your plugin publishes catalog bundles (reusable Hugging Face FAISS
snapshots for agents to load), export CATALOGS on the module.
Static CATALOGS (simple case)¶
When the namespace set is known at import time:
from parsimony_my_source.connectors import my_source_enumerate
CATALOGS = [("my_source", my_source_enumerate)]
parsimony publish --provider my_source --target 'hf://org/catalog-{namespace}'
runs my_source_enumerate, ingests the result into a fresh Catalog,
and pushes it to hf://org/catalog-my_source.
Dynamic CATALOGS (async generator)¶
When namespaces are discovered at build time (e.g. SDMX fans out across live agencies / dataflows):
from functools import partial
from typing import AsyncIterator, Awaitable, Callable
async def CATALOGS() -> AsyncIterator[tuple[str, Callable[[], Awaitable]]]:
    yield "my_source_datasets", enumerate_datasets
    async for family in _fetch_families():
        ns = f"my_source_family_{family.code.lower()}"
        yield ns, partial(enumerate_family, family_code=family.code)
RESOLVE_CATALOG (optional reverse lookup)¶
For large dynamic CATALOGS generators, plugins can supply a reverse
lookup so --only NS can build a single catalog without walking the
generator:
def RESOLVE_CATALOG(namespace: str) -> Callable | None:
    if namespace == "my_source_datasets":
        return enumerate_datasets
    prefix = "my_source_family_"
    if namespace.startswith(prefix):
        family_code = namespace.removeprefix(prefix).upper()
        return partial(enumerate_family, family_code=family_code)
    return None
When the user runs parsimony publish --only my_source_family_gdp, the
publisher calls RESOLVE_CATALOG("my_source_family_gdp") first and
skips the CATALOGS walk entirely if the resolver returns a callable.
See contract.md §6 for the full spec.
Publishing the plugin¶
- Configure PyPI trusted publishing for your GitHub repo — one-time setup.
- Copy the release workflow from an existing plugin (e.g. parsimony-fred/.github/workflows/release.yml) into yours.
- Tag a release: GitHub Actions publishes to PyPI via OIDC trusted publishing — no tokens in GitHub secrets.
- Verify discovery in a fresh venv:
Your plugin should appear.
parsimony list --strict runs the conformance suite and exits non-zero on any failure — the bar every release must clear.
Checklist before cutting v0.1.0¶
- [ ] parsimony_<your_name> module exports CONNECTORS; optionally CATALOGS, RESOLVE_CATALOG.
- [ ] Per-connector @connector(env={...}) declarations cover every required keyword-only dep.
- [ ] [project.urls] Homepage set in pyproject.toml.
- [ ] Entry point registered in pyproject.toml under parsimony.providers.
- [ ] parsimony.testing.assert_plugin_valid(module) passes.
- [ ] Tool-tagged connectors have ≥40-char descriptions.
- [ ] Unit tests cover happy path + at least one error path (401, 429, empty).
- [ ] parsimony list --strict exits 0.
- [ ] ruff check + mypy green.
- [ ] README.md documents install, setup, example usage.
- [ ] LICENSE file present (Apache-2.0 for official plugins).
- [ ] CI workflows green on main.
When to create a per-provider vs protocol-grouped plugin¶
- Per-provider (parsimony-<provider>) when the API is bespoke. Default.
- Protocol-grouped (parsimony-<protocol>) only when multiple providers share a wire protocol, >60% of the implementation, dependency tree, and maintenance cadence. Examples: parsimony-sdmx, parsimony-pxweb.