Data stores
A data store is the "load" half of Parsimony's fetch/load split. After a
loader returns a
TabularResult, a data store extracts the DATA
columns and persists one DataFrame per entity, keyed by a canonical
(namespace, code) tuple. Where the catalog is the discovery layer
over entities, the data store is the observation layer that holds
the values those entities point at.
Parsimony ships one concrete implementation, InMemoryDataStore, importable from
the top level or from parsimony.stores:
from parsimony import InMemoryDataStore, LoadResult
# equivalently:
from parsimony.stores import InMemoryDataStore, LoadResult
There is no DataStore Protocol yet
The module deliberately names the single backend InMemoryDataStore rather
than exposing an abstract DataStore type. A structural Protocol will be
extracted from the public method set only when a second backend (SQLite,
Parquet, …) lands. Until then, type your code against InMemoryDataStore
directly — there is no DataStore symbol to import.
The CRUD surface
InMemoryDataStore wraps a process-local dict mapping each canonical
(namespace, code) key to a pandas DataFrame. Every method is a coroutine — even
though the in-memory backend performs no real I/O — so the interface already
matches a future I/O-backed store. You must await each call.
| Method | Signature | Behavior |
|---|---|---|
| upsert | async upsert(namespace, code, df) -> None | Insert or replace one entity's observations. Stores df.copy(). |
| get | async get(namespace, code) -> pd.DataFrame \| None | Return a copy of the stored frame, or None if the key is absent. |
| delete | async delete(namespace, code) -> None | Idempotently remove an entity. No error if the key is absent. |
| exists | async exists(keys) -> set[tuple[str, str]] | Given a list of (namespace, code) pairs, return the canonicalized subset that is present. |
| load_result | async load_result(table, *, force=False) -> LoadResult | Extract DATA columns from a TabularResult and persist each entity. |
Every method routes its key through entity_key(namespace, code), which
normalizes the namespace to lowercase snake_case (^[a-z][a-z0-9_]*$) and strips
the code. An invalid namespace or empty code therefore raises ValueError at call
time — keys never silently miss. See Entities for the
normalization rules.
import asyncio

import pandas as pd
from parsimony import InMemoryDataStore


async def main() -> None:
    store = InMemoryDataStore()
    df = pd.DataFrame({"value": [1, 2]})
    await store.upsert("fred", "gdp", df)
    assert await store.exists([("fred", "gdp")]) == {("fred", "gdp")}

    # get() returns a defensive copy: mutating it does not touch the store.
    got = await store.get("fred", "gdp")
    assert got is not None
    got.loc[0, "value"] = 999
    again = await store.get("fred", "gdp")
    assert again is not None and again["value"].iloc[0] == 1

    await store.delete("fred", "gdp")
    assert await store.get("fred", "gdp") is None


asyncio.run(main())
Copy semantics
upsert stores df.copy() and get returns stored.copy(). Callers can
neither mutate stored state through a returned frame nor leak a later mutation
of the input frame into the store. Each entity's stored frame is independent.
exists returns canonicalized keys, not the raw pairs you passed in. If your
inputs were not already normalized (for example a code with trailing whitespace),
comparing the result against your raw inputs can mismatch — compare against the
normalized form instead.
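The mismatch is easiest to see with a stand-in for the normalization step. The sketch below is not the library's entity_key: sketch_entity_key is a hypothetical helper, and its exact behavior (strip and lowercase the namespace, then validate it against the pattern quoted above; strip the code and reject an empty one) is an assumption made for illustration.

```python
import re

# Pattern quoted in the text above for a valid namespace.
_NAMESPACE_RE = re.compile(r"[a-z][a-z0-9_]*")


def sketch_entity_key(namespace: str, code: str) -> tuple[str, str]:
    """Hypothetical stand-in for entity_key; the real rules live in Entities."""
    ns = namespace.strip().lower()  # assumption: lowercased before validation
    if not _NAMESPACE_RE.fullmatch(ns):
        raise ValueError(f"invalid namespace: {namespace!r}")
    stripped = code.strip()  # the code is stripped, as described above
    if not stripped:
        raise ValueError("code must not be empty")
    return ns, stripped


# Raw pairs as a caller might hold them; the first code has trailing whitespace.
raw_pairs = [("fred", "GDP "), ("fred", "CPI")]

# What a store would report back: canonical keys, not the raw pairs.
present = {sketch_entity_key(ns, code) for ns, code in raw_pairs}

# Comparing raw pairs against the canonical set reports a false miss.
missing_by_raw = [pair for pair in raw_pairs if pair not in present]

# Normalizing first gives the right answer.
missing_by_key = [pair for pair in raw_pairs if sketch_entity_key(*pair) not in present]
```

In real code the same comparison would go through the library's own entity_key rather than a local copy of its rules.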
Loading a result
The bridge from connector output to storage is load_result. It reads the
table's output_schema, finds the single KEY column (which must declare a
namespace), keeps the DATA columns, and groups rows by the KEY value into
one DataFrame per distinct entity. The KEY column is consumed for identity;
TITLE, METADATA, and any unmapped extra columns are dropped from the stored
frame.
import asyncio

import pandas as pd
from parsimony import InMemoryDataStore
from parsimony.result import Column, ColumnRole, OutputConfig, Provenance, TabularResult

SCHEMA = OutputConfig(
    columns=[
        Column(name="series_id", role=ColumnRole.KEY, namespace="fred"),
        Column(name="value", role=ColumnRole.DATA),
    ]
)


async def main() -> None:
    store = InMemoryDataStore()
    table = TabularResult(
        data=pd.DataFrame(
            {
                "series_id": ["GDP", "GDP", "CPI"],
                "value": [1.0, 2.0, 3.0],
                "note": ["a", "b", "c"],  # unmapped extra column
            }
        ),
        provenance=Provenance(source="fred", source_description="FRED"),
        output_schema=SCHEMA,
    )
    stats = await store.load_result(table)
    assert (stats.total, stats.loaded, stats.skipped, stats.errors) == (2, 2, 0, 0)

    gdp = await store.get("fred", "GDP")
    assert gdp is not None
    assert list(gdp.columns) == ["value"]  # KEY consumed, extra dropped
    assert len(gdp) == 2  # two rows for GDP collapsed into one frame


asyncio.run(main())
The two GDP rows collapse into a single two-row frame under code GDP, while
CPI becomes its own one-row entity — so total counts distinct entities, not
input rows. The namespace is taken from the KEY column's namespace=..., not
passed to load_result; a KEY (or METADATA) column is the only place a
Column may set namespace.
Statistics: LoadResult
load_result returns a LoadResult, a small pydantic model tallying the run.
All fields default to 0.
| Field | Meaning |
|---|---|
| total | Distinct entities extracted from the table. |
| loaded | Entities actually upserted. |
| skipped | Entities already present and not forced (only with force=False). |
| errors | Entities whose upsert raised a caught exception. |
If the table yields no rows (an empty DataFrame, or no entities after grouping),
load_result returns early with total == 0 and nothing is written.
force and existing keys
By default (force=False) load_result calls exists once for all extracted
keys, skips any entity whose canonical key is already present, and upserts only
the rest. With force=True it treats nothing as pre-existing and upserts every
extracted entity unconditionally.
import asyncio

import pandas as pd
from parsimony import InMemoryDataStore
from parsimony.result import Column, ColumnRole, OutputConfig, Provenance, TabularResult

SCHEMA = OutputConfig(
    columns=[
        Column(name="series_id", role=ColumnRole.KEY, namespace="fred"),
        Column(name="value", role=ColumnRole.DATA),
    ]
)


async def main() -> None:
    store = InMemoryDataStore()
    await store.upsert("fred", "GDP", pd.DataFrame({"value": [0.0]}))
    table = TabularResult(
        data=pd.DataFrame({"series_id": ["GDP"], "value": [9.0]}),
        provenance=Provenance(source="fred", source_description="FRED"),
        output_schema=SCHEMA,
    )

    first = await store.load_result(table, force=False)
    assert (first.loaded, first.skipped) == (0, 1)  # GDP already present

    second = await store.load_result(table, force=True)
    assert (second.loaded, second.skipped) == (1, 0)  # overwritten

    gdp = await store.get("fred", "GDP")
    assert gdp is not None and gdp["value"].iloc[0] == 9.0


asyncio.run(main())
Idempotent backfills
force=False makes load_result a cheap incremental backfill: re-running the
same load only writes entities you have not stored yet, and skipped tells you
how many were already present. Use force=True only when you intend to
overwrite existing observations with fresh values.
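The skip-or-overwrite decision in this section reduces to a partition of the extracted keys. The sketch below shows that partition in isolation, assuming the set of present keys has already been fetched with a single exists call. plan_load is a hypothetical helper written for this page, not a function the library exposes.

```python
from typing import Iterable

Key = tuple[str, str]


def plan_load(
    extracted: Iterable[Key], present: set[Key], *, force: bool = False
) -> tuple[list[Key], list[Key]]:
    """Hypothetical helper: split extracted keys into (to_write, to_skip)."""
    to_write: list[Key] = []
    to_skip: list[Key] = []
    for key in extracted:
        # With force=True nothing counts as pre-existing, so every key is written.
        if not force and key in present:
            to_skip.append(key)
        else:
            to_write.append(key)
    return to_write, to_skip


stored = {("fred", "GDP")}
extracted = [("fred", "GDP"), ("fred", "CPI")]

incremental = plan_load(extracted, stored)            # default: skip what is stored
overwrite = plan_load(extracted, stored, force=True)  # forced: write everything
```

The lengths of the two lists correspond to what the text calls loaded (assuming every upsert succeeds) and skipped.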
End to end: loader output into a store
A loader already returns a
TabularResult carrying its output schema, so wiring its output into a store is
a two-line call. The loader's schema satisfies the extraction contract by
construction — exactly one namespaced KEY column and at least one DATA column,
and no TITLE/METADATA columns.
import asyncio

import pandas as pd
from parsimony import InMemoryDataStore
from parsimony.connector import loader
from parsimony.result import Column, ColumnRole, OutputConfig

LOAD_SCHEMA = OutputConfig(
    columns=[
        Column(name="series_id", role=ColumnRole.KEY, namespace="fred"),
        Column(name="value", role=ColumnRole.DATA),
    ]
)


@loader(output=LOAD_SCHEMA)
async def gdp_observations(series_id: str = "GDP") -> pd.DataFrame:
    """Return observation values for a FRED series."""
    return pd.DataFrame({"series_id": [series_id, series_id], "value": [1.0, 2.0]})


async def main() -> None:
    store = InMemoryDataStore()
    result = await gdp_observations(series_id="GDP")  # a TabularResult
    stats = await store.load_result(result)
    assert stats.loaded == 1

    df = await store.get("fred", "GDP")
    assert df is not None and list(df.columns) == ["value"] and len(df) == 2


asyncio.run(main())
Extraction errors vs. per-entity errors
load_result distinguishes two failure modes.
Extraction-time validation runs before any entity is written and raises —
it is never counted in errors. The table's schema must be well-formed:
| Condition | Raises |
|---|---|
| output_schema is None | ValueError |
| data is not a DataFrame/Series | TypeError |
| Not exactly one KEY column | ValueError |
| KEY column has no namespace | ValueError |
| KEY column name absent from the data | ValueError |
| Zero DATA columns | ValueError |
| A declared DATA column missing from the data | ValueError |
import asyncio

import pandas as pd
from parsimony import InMemoryDataStore
from parsimony.result import Column, ColumnRole, OutputConfig, Provenance, TabularResult


async def main() -> None:
    store = InMemoryDataStore()
    table = TabularResult(
        data=pd.DataFrame({"series_id": ["GDP"], "value": [1.0]}),
        provenance=Provenance(source="x", source_description="x"),
        output_schema=OutputConfig(
            columns=[
                Column(name="series_id", role=ColumnRole.KEY),  # no namespace
                Column(name="value", role=ColumnRole.DATA),
            ]
        ),
    )
    try:
        await store.load_result(table)
    except ValueError as exc:
        assert "namespace" in str(exc)
    else:
        # Without this branch the example would pass silently if nothing raised.
        raise AssertionError("expected ValueError for a KEY column without a namespace")


asyncio.run(main())
Per-entity upsert failures are different. Once extraction succeeds,
load_result upserts entities one by one; if an individual upsert raises
OSError, RuntimeError, ValueError, or TypeError, the failure is logged at
WARNING on the parsimony.stores logger, counted in errors, and the run
continues with the remaining entities. A single bad entity does not abort a load.
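The catch-and-continue behavior described above can be sketched as a plain loop. This is an illustration of the pattern, not the library's code: FlakyStore and load_all are hypothetical names, plain lists stand in for DataFrames, and only the exception types and the logger name are taken from the text.

```python
import asyncio
import logging

# Logger name taken from the paragraph above.
logger = logging.getLogger("parsimony.stores")


class FlakyStore:
    """Hypothetical stand-in store whose upsert fails for one code."""

    def __init__(self) -> None:
        self.frames: dict[tuple[str, str], object] = {}

    async def upsert(self, namespace: str, code: str, df: object) -> None:
        if code == "BAD":
            raise RuntimeError("simulated backend failure")
        self.frames[(namespace, code)] = df


async def load_all(store: FlakyStore, entities: dict) -> tuple[int, int]:
    """Upsert entities one by one, counting failures instead of aborting."""
    loaded = errors = 0
    for (namespace, code), df in entities.items():
        try:
            await store.upsert(namespace, code, df)
        except (OSError, RuntimeError, ValueError, TypeError) as exc:
            # Log, count, and move on to the next entity.
            logger.warning("upsert failed for %s:%s: %s", namespace, code, exc)
            errors += 1
            continue
        loaded += 1
    return loaded, errors


store = FlakyStore()
entities = {
    ("fred", "GDP"): [1.0, 2.0],
    ("fred", "BAD"): [0.0],  # this one raises inside upsert
    ("fred", "CPI"): [3.0],
}
loaded, errors = asyncio.run(load_all(store, entities))
```

Exceptions outside the four listed types are not caught by this loop and would still propagate.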
Process-local and not concurrency-safe
InMemoryDataStore keeps all state in an in-memory dict; it is lost when the
process exits and is not shared across processes. There is no lock, so
concurrent upsert calls (including those inside two overlapping
load_result runs) targeting the same key race on a plain dict. Drive a single
store from one task, or serialize access yourself.
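One way to serialize access yourself is to funnel every call through a single asyncio.Lock. The wrapper below is a minimal sketch under that assumption. SerializedStore and RecordingStore are hypothetical names introduced here, and only the method names upsert, get, and load_result come from this page. RecordingStore stands in for a real store so the example runs without the library.

```python
import asyncio


class SerializedStore:
    """Hypothetical wrapper: every call waits on one shared asyncio.Lock."""

    def __init__(self, inner) -> None:
        self._inner = inner
        self._lock = asyncio.Lock()

    async def upsert(self, namespace, code, df):
        async with self._lock:
            await self._inner.upsert(namespace, code, df)

    async def get(self, namespace, code):
        async with self._lock:
            return await self._inner.get(namespace, code)

    async def load_result(self, table, *, force=False):
        # Holding the lock for the whole run keeps the exists check and the
        # following upserts of one load from interleaving with another load.
        async with self._lock:
            return await self._inner.load_result(table, force=force)


class RecordingStore:
    """Hypothetical stand-in that records how many upserts overlap."""

    def __init__(self) -> None:
        self.frames = {}
        self.active = 0
        self.max_active = 0

    async def upsert(self, namespace, code, df):
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        await asyncio.sleep(0)  # yield control, as an I/O-backed store would
        self.frames[(namespace, code)] = df
        self.active -= 1

    async def get(self, namespace, code):
        return self.frames.get((namespace, code))


async def main():
    inner = RecordingStore()
    store = SerializedStore(inner)  # built inside the running event loop
    await asyncio.gather(*(store.upsert("fred", "GDP", [i]) for i in range(5)))
    return inner.max_active, await store.get("fred", "GDP")


max_active, final = asyncio.run(main())
```

The lock only coordinates tasks on one event loop in one process. It does nothing for access from other threads or processes, which the in-memory store does not support in any case.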
See also
- Loaders and enumerators: the loader verb that produces the TabularResult a store consumes.
- Results and output schemas: TabularResult, OutputConfig, Column, and ColumnRole.
- Entities: namespace/code normalization and the entity_key canonical key.
- The Catalog: the discovery layer; an enumerator's output feeds a catalog, a loader's feeds a data store.