Jammi AI
Jammi is an embeddable AI engine that brings model inference into your data pipeline. Register data sources, run SQL queries, generate embeddings, search with vector similarity, fine-tune models on your domain, and evaluate results — all without leaving your application.
What Jammi does
- Query local data with SQL — register Parquet, CSV, and JSON files, run full SQL via DataFusion
- Federate external databases — query PostgreSQL and MySQL alongside local files
- Generate embeddings — load any BERT-family model from HuggingFace Hub (or local safetensors / ONNX), persist results to Parquet with sidecar ANN indexes
- Vector search — ANN similarity search over embedding tables with automatic fallback to brute-force
- Search builder — fluent API for .join(), .annotate(), .filter(), .sort(), .limit(), .select(), .run()
- Evidence provenance — retrieved_by and annotated_by tracking on every search result
- Fine-tuning — LoRA adapters with contrastive loss to improve embeddings for your domain
- Evaluation — retrieval metrics (recall@k, precision@k, MRR, nDCG), classification (accuracy, F1), and A/B model comparison
- Per-row error handling — null or invalid text produces error status per row, not a batch failure
- Model caching — LRU eviction, ref-counted guards, single-flight loading
- GPU scheduling — memory-budget admission control with RAII permits
- Crash recovery — on restart, recovers result tables stuck in “building” state
- Inference observability — attach observers to hook into every output batch
Three ways to use Jammi
| Interface | Best for | Install |
|---|---|---|
| Rust library | Embedding Jammi into Rust applications | cargo add jammi-ai |
| Python package | Data science, notebooks, scripts | pip install jammi-ai |
| CLI | Shell workflows, quick queries, ops | cargo install jammi-cli |
All three interfaces share the same engine, configuration, and storage format. Embeddings generated from Python are queryable from the CLI, and vice versa.
For multi-language access or BI tool integration, jammi serve starts an Arrow Flight SQL server that any Flight SQL client can connect to and query with standard SQL.
Crates
| Crate | Purpose |
|---|---|
| jammi-db | Query engine, configuration, catalog, source management, Parquet storage, ANN indexes |
| jammi-ai | Model loading, inference execution, embedding pipeline, vector search, evidence model, fine-tuning, evaluation |
| jammi-server | Arrow Flight SQL server and HTTP health endpoint |
| jammi-cli | Command-line interface |
| jammi-python | Python bindings via PyO3 |
jammi-db has no dependency on jammi-ai. You can use it standalone for SQL queries over local data without pulling in the AI layer.
Installation
Rust
Add Jammi to your Cargo.toml:
[dependencies]
jammi-db = "0.1"
jammi-ai = "0.1"
tokio = { version = "1", features = ["full"] }
Or install the CLI:
cargo install jammi-cli
Build dependencies (Linux)
If building from source, you need a C compiler and protoc:
# Debian/Ubuntu
apt-get install protobuf-compiler gcc g++ pkg-config
# RHEL/AlmaLinux
yum install protobuf-compiler gcc gcc-c++ pkg-config
All other native libraries (lzma, zstd, zlib, sqlite) are vendored and compiled from source automatically. These tools are pre-installed in the devcontainer and CI images.
Python
pip install jammi-ai
Requires Python 3.8+. Pre-built wheels are available for Linux, macOS, and Windows.
From source
git clone https://github.com/f-inverse/jammi-ai.git
cd jammi-ai
cargo build --release
The CLI binary is at target/release/jammi.
For the Python package from source:
pip install maturin
maturin develop --release
Runtime requirements
Jammi has no mandatory runtime dependencies beyond the binary itself.
Optional:
- CUDA toolkit + cuDNN for GPU inference (CPU works out of the box)
- HuggingFace Hub access for downloading models (first run downloads ~90MB for MiniLM, cached thereafter)
- PostgreSQL / MySQL client libraries if using federated database sources
Set HF_TOKEN for gated models, or HF_HOME to control the cache location.
Quickstart: Rust
This walkthrough registers a local data file, runs a SQL query, generates embeddings, and performs a semantic search — all in one program.
Full example
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::config::JammiConfig;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = JammiConfig::load(None)?;
    let session = Arc::new(InferenceSession::new(config).await?);
    // 1. Register a data source
    session.add_source("patents", SourceType::File, SourceConnection {
        url: Some("file:///path/to/patents.parquet".into()),
        format: Some(FileFormat::Parquet),
        ..Default::default()
    }).await?;
    // 2. Query with SQL
    let rows = session.sql(
        "SELECT id, title, year FROM patents.public.patents WHERE year > 2020 LIMIT 5"
    ).await?;
    for batch in &rows {
        println!("{batch:?}");
    }
    // 3. Generate embeddings
    let record = session.generate_text_embeddings(
        "patents",
        "sentence-transformers/all-MiniLM-L6-v2",
        &["title".to_string()],
        "id",
    ).await?;
    println!("Embedded {} rows", record.row_count);
    // 4. Semantic search
    let query = session.encode_text_query(
        "sentence-transformers/all-MiniLM-L6-v2",
        "quantum computing applications",
    ).await?;
    let results = session.search("patents", query, 5).await?
        .sort("similarity", true)?
        .run().await?;
    for batch in &results {
        println!("{batch:?}");
    }
    Ok(())
}
The first run downloads the model from HuggingFace Hub (~90MB). Subsequent runs load from cache.
What’s happening
- JammiConfig::load(None) loads config from jammi.toml, $JAMMI_CONFIG, or defaults
- InferenceSession wraps the query engine with model loading, caching, and GPU scheduling
- add_source registers a file in the catalog; the registration survives session restarts
- sql runs any SQL query via DataFusion and returns Vec<RecordBatch>
- generate_text_embeddings runs the model over every row and persists the vectors to Parquet with a sidecar ANN index
- encode_text_query encodes a text string into the same vector space
- search finds the nearest neighbors, hydrates all source columns, and returns results with similarity scores
Next steps
- Query Your Data with SQL — SQL features, joins, aggregations
- Generate Embeddings — persistence, multiple models, crash recovery
- Semantic Search — SearchBuilder API, filtering, evidence provenance
Quickstart: Python
The full quickstart (install, connect, register, search) lives in the repo’s cookbook tree under cookbook/quickstart/, with a runnable quickstart.py that tests/cookbook_smoke.py exercises end-to-end on every PR.
This page mirrors the cookbook’s overview so the mdBook site renders a self-contained quickstart; the cookbook is the source of truth.
Goal: a fresh user goes from pip install jammi-ai to a successful vector query in five minutes. The end-to-end script is cookbook/quickstart/quickstart.py: copy-paste it, run it, then read the four step-by-step pages for the explanation.
Steps
- Install — pip install jammi-ai
- Connect — open a session against a local artifact dir
- Register a source — attach a Parquet file
- Generate embeddings + search — build a vector index and run a similarity query
Run it
python cookbook/quickstart/quickstart.py
Expected output: a header row followed by the top-3 matches, each with its cosine similarity score. The script exits 0 in under 30 seconds on CPU.
Production substitution
The script uses the local cookbook/fixtures/tiny_bert/ model (32-dim, 88 KB,
single-layer) so the example needs no network access. In a real workload
you would swap in a Hub model — for example
sentence-transformers/all-MiniLM-L6-v2 (384-dim, English) — by changing
the MODEL constant. Everything else stays the same.
Quickstart: CLI
The jammi CLI lets you manage sources, run queries, and start the server from your terminal.
Register a source and query it
# Register a local Parquet file
jammi sources add patents --path /path/to/patents.parquet --format parquet
# List registered sources
jammi sources list
# Run a SQL query
jammi query "SELECT id, title, year FROM patents.public.patents WHERE year > 2020 LIMIT 5"
# Show the execution plan
jammi explain "SELECT * FROM patents.public.patents WHERE year > 2020"
Start the server
# Start Flight SQL (port 8081) + health probe (port 8080)
jammi serve
# With a custom config
jammi --config jammi.toml serve
Once the server is running, any Arrow Flight SQL client can connect on port 8081 to query your data.
Available commands
| Command | Description |
|---|---|
| jammi sources list | List registered data sources |
| jammi sources add <NAME> --path <PATH> --format <FMT> | Register a local file |
| jammi models list | List registered models |
| jammi query "<SQL>" | Run a SQL query and print results |
| jammi explain "<SQL>" | Show the execution plan for a query |
| jammi serve | Start the Flight SQL server + health probe |
Global options
jammi --config <PATH> <command> # Use a specific config file
Without --config, Jammi looks for configuration in this order:
- $JAMMI_CONFIG environment variable
- ./jammi.toml in the current directory
- ~/.config/jammi/config.toml
- Built-in defaults
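The lookup order above can be sketched in Python as follows. It illustrates the documented precedence and is not Jammi's implementation. The function name resolve_config_path is hypothetical. This page does not say whether Jammi checks that $JAMMI_CONFIG points at an existing file, so the sketch simply returns it.

```python
import os
from pathlib import Path
from typing import Optional


def resolve_config_path(explicit: Optional[str] = None) -> Optional[Path]:
    """Return the config file to load, or None to use built-in defaults.

    `explicit` plays the role of --config; the function name is hypothetical.
    """
    if explicit is not None:
        return Path(explicit)  # --config always wins
    env_path = os.environ.get("JAMMI_CONFIG")
    if env_path:
        return Path(env_path)  # assumption: used as-is, without an existence check
    candidates = (
        Path.cwd() / "jammi.toml",
        Path.home() / ".config" / "jammi" / "config.toml",
    )
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return None  # fall back to built-in defaults
```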
Next steps
- Deploy as a Server — Flight SQL server, configuration, preloading models
- Configuration — full config reference
Cookbook Recipes (runnable)
Every recipe under cookbook/
ships as a runnable example.py next to a markdown README and is wired
into CI via tests/cookbook_smoke.py — a broken recipe blocks the
merge. The cookbook is the OSS source of truth; this page mirrors each
README below.
The recipes shipped at MVP:
| Recipe | Demonstrates |
|---|---|
| mutable_tables | Create/insert/select/drop on a mutable companion table |
| trigger_streams | Publish + subscribe on a topic via the in-process broker |
| eval_embeddings | recall@k, MRR, nDCG against a golden set |
| eval_inference | Accuracy + macro F1 against gold labels |
| fine_tune | LoRA fine-tune end-to-end |
| flight_sql | Query a remote jammi serve over Arrow Flight SQL |
Mutable tables
End-to-end create / insert / select / drop on a Jammi mutable table — the OSS primitive for state that needs to live alongside read-only result tables.
When to use this pattern. You need a writable table that sits in the same SQL catalog as your registered sources and embedding tables — for caching enriched rows, holding cursor state, recording user feedback, or any “small table I want to UPDATE / DELETE / INSERT from SQL” workload — without standing up an external Postgres.
What example.py does
- Connects to a temporary artifact dir
- Creates a notes mutable table with an int64 primary key and a utf8 body column
- Inserts three rows through DataFusion DML (INSERT INTO ...)
- Verifies count and ordering via SELECT
- Drops the table, then asserts that a SELECT after the drop raises
- Demonstrates the idempotent drop_mutable_table(..., if_exists=True)
API surface exercised
- Database.create_mutable_table(name, *, schema, primary_key, ...)
- Database.sql("INSERT INTO mutable.public.<name> ...")
- Database.sql("SELECT ... FROM mutable.public.<name>")
- Database.drop_mutable_table(name, *, if_exists=False)
The DataFusion namespace for mutable tables is always
mutable.public.<name> — distinct from registered sources, which live
under <source>.public.<source>.
Run it
python cookbook/recipes/mutable_tables/example.py
Exits 0 on success, prints mutable_tables: OK on the last line.
Trigger streams
End-to-end publish + subscribe on a Jammi topic, plus the registration and listing surface. Uses the embedded in-process broker — no NATS or external broker needed.
When to use this pattern. You need a low-friction event bus inside your application — for fan-out to downstream consumers, fan-in from batch jobs, or replay-from-offset semantics — without bringing up Kafka or NATS in dev/test. The same surface scales out to NATS JetStream by flipping a config flag at deploy time.
What example.py does
- Connects to a temporary artifact dir
- Registers a topic events.demo with a typed schema and broker metadata
- Confirms list_topics() returns the new topic
- Publishes a 3-row batch through publish_topic and captures the broker-assigned offset
- Subscribes from from_offset=0 and round-trips the same rows back
- Drops the topic and confirms it’s gone from list_topics()
- Demonstrates idempotent drop_topic(..., if_exists=True) and strict-mode failure when dropping a missing topic
API surface exercised
- Database.register_topic(name, *, schema, broker_metadata=None)
- Database.list_topics()
- Database.publish_topic(name, *, batch), which returns the assigned offset
- Database.subscribe_collect(name, *, from_offset, max_batches)
- Database.drop_topic(name, *, if_exists=False)
The subscribe_collect path drives the replay-from-backing-table flow
when from_offset=0; the live-tail flow is exercised in the broker
integration suite.
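To make the offset semantics concrete, here is a minimal in-memory sketch of an append-only topic. It assumes, hypothetically, that the broker assigns one offset per published batch, starting at 0. Jammi's real broker persists batches to a backing table and may number offsets differently. InMemoryTopic is an illustrative name, not part of the API.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

Row = Dict[str, Any]


@dataclass
class InMemoryTopic:
    """Append-only log; each published batch gets the next integer offset (assumed)."""
    batches: List[List[Row]] = field(default_factory=list)

    def publish(self, batch: List[Row]) -> int:
        self.batches.append(list(batch))
        return len(self.batches) - 1  # offset assigned to this batch

    def subscribe_collect(self, from_offset: int, max_batches: int) -> List[List[Row]]:
        # Replay: return stored batches starting at from_offset, oldest first.
        return self.batches[from_offset:from_offset + max_batches]
```

Subscribing from offset 0 replays everything ever published. A consumer that remembers the last offset it processed can resume from the next one.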
Run it
python cookbook/recipes/trigger_streams/example.py
Exits 0 on success, prints trigger_streams: OK on the last line.
Evaluate retrieval quality
Measure recall@k, precision@k, MRR, and nDCG of an embedding index against a golden relevance set.
When to use this pattern. You have a corpus and a small set of (query, expected document) judgments, and you need a number that tells you “is my new encoder better than the one I shipped last month?” The same loop powers nightly regression dashboards and A/B model comparison.
What example.py does
- Connects to a temporary artifact dir
- Registers the tiny corpus as a Parquet source
- Builds 32-dim embeddings over the content column with the local tiny_bert fixture
- Reads cookbook/fixtures/tiny_golden.json, expands it into the (query_id, query_text, relevant_id) CSV shape that eval_embeddings consumes, and registers it as a golden source
- Calls db.eval_embeddings(source="corpus", golden_source="golden.public.golden", k=5)
- Asserts each aggregate metric is in [0.0, 1.0] and the per-query records carry their golden-set query_id
API surface exercised
- Database.generate_text_embeddings(source, *, model, columns, key)
- Database.eval_embeddings(*, source, golden_source, model=None, k=10)
The returned dict carries aggregate (mean across queries — recall_at_k,
precision_at_k, mrr, ndcg) and per_query (one entry per query with
query_id and a metrics sub-dict of the same four names, un-averaged).
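For reference, here are the standard binary-relevance definitions of the four metrics for a single query, plus a mean over queries. This is a sketch. Jammi's exact conventions are not documented here, for example whether precision@k divides by k or by the number of results returned. The function names are hypothetical.

```python
import math
from typing import Dict, List, Sequence, Set


def retrieval_metrics(ranked_ids: Sequence[str], relevant: Set[str], k: int) -> Dict[str, float]:
    """Binary-relevance recall@k, precision@k, MRR and nDCG@k for one query."""
    hits = [doc_id in relevant for doc_id in list(ranked_ids)[:k]]
    n_hits = sum(hits)
    recall = n_hits / len(relevant) if relevant else 0.0
    precision = n_hits / k
    # MRR: reciprocal rank of the first relevant document in the top k (0 if none).
    mrr = next((1.0 / (i + 1) for i, hit in enumerate(hits) if hit), 0.0)
    # nDCG: discounted gain of this ranking divided by that of an ideal ranking.
    dcg = sum(1.0 / math.log2(i + 2) for i, hit in enumerate(hits) if hit)
    ideal = sum(1.0 / math.log2(i + 2) for i in range(min(len(relevant), k)))
    ndcg = dcg / ideal if ideal > 0 else 0.0
    return {"recall_at_k": recall, "precision_at_k": precision, "mrr": mrr, "ndcg": ndcg}


def aggregate(per_query: List[Dict[str, float]]) -> Dict[str, float]:
    """Mean of each metric across queries."""
    names = per_query[0].keys()
    return {name: sum(q[name] for q in per_query) / len(per_query) for name in names}
```

The per-query dicts correspond to the metrics sub-dicts in per_query, and averaging them corresponds to aggregate.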
Golden source shape
eval_embeddings requires a registered source with these columns:
| column | type | example |
|---|---|---|
| query_id | utf8 | q1 |
| query_text | utf8 | quantum computing applications |
| relevant_id | utf8 | 1 (matches corpus.id as a string) |
Image queries are supported via a query_image BLOB column instead of
query_text; cross-modal eval is out of scope for this recipe.
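The recipe's expansion step can be sketched like this. This document does not show the layout of tiny_golden.json, so the input shape below is a hypothetical stand-in: one object per query with a relevant_ids list. The output matches the three-column table above, with ids written as strings.

```python
import csv
import io
import json


def golden_json_to_csv(golden_json: str) -> str:
    """Flatten one-object-per-query judgments into one CSV row per (query, relevant doc).

    Hypothetical input: [{"query_id": ..., "query": ..., "relevant_ids": [...]}].
    """
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["query_id", "query_text", "relevant_id"])
    for item in json.loads(golden_json):
        for relevant_id in item["relevant_ids"]:
            # relevant_id is utf8, so integer corpus ids are written as strings.
            writer.writerow([item["query_id"], item["query"], str(relevant_id)])
    return out.getvalue()
```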
Run it
python cookbook/recipes/eval_embeddings/example.py
Exits 0 on success, prints the metrics dict + eval_embeddings: OK.
Evaluate inference (classification)
Run a classifier over a registered source and score its predictions against gold labels.
When to use this pattern. You have a labelled holdout set and you want a single number — accuracy, macro F1, per-class F1 — to compare two classifiers, or to track drift over time on the same classifier.
What example.py does
- Connects to a temporary artifact dir
- Registers the tiny corpus as corpus (parquet)
- Registers tiny_labels.csv as golden (csv), with (id, label) rows
- Runs db.eval_inference with the local tiny_modernbert_classifier fixture against the content column
- Prints the returned aggregate accuracy, macro f1, per-class metrics, and the count of per-record predictions
- Asserts every reported rate is in [0.0, 1.0]
API surface exercised
Database.eval_inference(*, model, source, columns, task, golden_source, label_column)
The returned dict carries aggregate (tagged by "task" — currently
"classification") with accuracy, f1, and per_class, plus
per_record (one entry per aligned {record_id, predicted, gold}).
The task argument is the string form of the inference task —
"classification" here. NER is recognized but not yet supported via this
entrypoint (see the runner’s EvalTask::Ner branch); for token-level
eval, call the jammi-numerics NER kernels directly.
Golden source shape
eval_inference requires a registered source with these columns:
| column | type | example |
|---|---|---|
| id | utf8 | "1" |
| <label_column> | utf8 | physics |
label_column is the kwarg you pass at call time (label in this recipe). Every id in the golden source must resolve to a row in the input source; input rows without a gold label are silently dropped from the metric.
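Here is a minimal sketch of how aligned predictions turn into accuracy, macro F1 and per-class metrics. It assumes predictions and gold labels are keyed by id. Only ids present on both sides are scored, matching the rule that rows without a gold label are dropped. The sketch also assumes the macro average covers every label seen on either side, the usual convention (scikit-learn's default, for example); this page does not specify it.

```python
from collections import Counter
from typing import Dict


def classification_metrics(predictions: Dict[str, str], gold: Dict[str, str]) -> Dict[str, object]:
    """Accuracy, macro F1 and per-class metrics over ids that have a gold label."""
    aligned = [(rid, predictions[rid], gold[rid]) for rid in predictions if rid in gold]
    if not aligned:
        raise ValueError("no records could be aligned with gold labels")
    tp, fp, fn = Counter(), Counter(), Counter()
    for _, pred, true in aligned:
        if pred == true:
            tp[true] += 1
        else:
            fp[pred] += 1  # wrongly predicted as `pred`
            fn[true] += 1  # missed the true class
    per_class = {}
    for label in sorted(set(tp) | set(fp) | set(fn)):
        p = tp[label] / (tp[label] + fp[label]) if tp[label] + fp[label] else 0.0
        r = tp[label] / (tp[label] + fn[label]) if tp[label] + fn[label] else 0.0
        per_class[label] = {"precision": p, "recall": r, "f1": 2 * p * r / (p + r) if p + r else 0.0}
    return {
        "accuracy": sum(tp.values()) / len(aligned),
        "f1": sum(c["f1"] for c in per_class.values()) / len(per_class),  # macro average
        "per_class": per_class,
        "per_record": [{"record_id": rid, "predicted": p, "gold": g} for rid, p, g in aligned],
    }
```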
Run it
python cookbook/recipes/eval_inference/example.py
Exits 0 on success, prints the metrics dict + eval_inference: OK.
Fine-tune an encoder
Run a LoRA fine-tune on top of an existing text encoder, poll the job to completion, and use the resulting checkpoint to encode a query.
When to use this pattern. Your domain (legal contracts, medical abstracts, patent claims, internal product docs) doesn’t match the distribution the base encoder was trained on, and you have a few hundred to a few thousand labelled or contrastive pairs. LoRA gets you ~80% of the lift of a full fine-tune at a fraction of the cost; the resulting adapter is small enough to ship as an attachment to the base model rather than a re-distributed full checkpoint.
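The small size of the adapter comes from LoRA's low-rank factorization. Training does not update a d_out × d_in weight matrix W directly. It learns B (d_out × rank) and A (rank × d_in) and computes W x + (alpha / rank) · B A x. The sketch below shows that arithmetic and a forward pass with plain lists. It illustrates the standard LoRA formulation; this page does not specify which layers Jammi adapts.

```python
from typing import List, Tuple

Matrix = List[List[float]]


def lora_param_counts(d_out: int, d_in: int, rank: int) -> Tuple[int, int]:
    """Trainable parameters for one weight matrix: full update vs LoRA factors B and A."""
    return d_out * d_in, rank * (d_out + d_in)


def lora_forward(x: List[float], weight: Matrix, a: Matrix, b: Matrix,
                 alpha: float, rank: int) -> List[float]:
    """y = W x + (alpha / rank) * B (A x); W stays frozen, only A and B are trained."""
    def matvec(m: Matrix, v: List[float]) -> List[float]:
        return [sum(mi * vi for mi, vi in zip(row, v)) for row in m]

    base = matvec(weight, x)
    delta = matvec(b, matvec(a, x))
    scale = alpha / rank
    return [y + scale * dy for y, dy in zip(base, delta)]
```

Take a 384 × 384 projection (MiniLM's hidden size) at rank 8. LoRA trains 6,144 parameters instead of 147,456, about 4% of the full matrix. When B starts at zero, the adapted layer initially reproduces the base model exactly.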
What example.py does
- Connects to a temporary artifact dir
- Registers tiny_pairs.csv (30 contrastive pairs) as training
- Calls db.fine_tune(...) with the local tiny_bert base, a small LoRA rank, and one epoch, kept fast for CI
- Waits for terminal status via job.wait()
- Asserts the resulting model_id starts with jammi:fine-tuned:
- Encodes a query through the fine-tuned model to confirm it loads
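job.wait() blocks until the job reaches a terminal state. The sketch below shows a generic polling loop with a timeout. The status strings in TERMINAL and the get_status callback are hypothetical, since this page does not list FineTuneJob's status values.

```python
import time
from typing import Callable

TERMINAL = {"completed", "failed", "cancelled"}  # hypothetical status names


def wait_for_terminal(get_status: Callable[[], str], poll_interval_s: float = 0.5,
                      timeout_s: float = 600.0) -> str:
    """Poll get_status() until it reports a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout_s}s")
        time.sleep(poll_interval_s)
```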
API surface exercised
- Database.fine_tune(*, source, base_model, columns, method, task=..., ...)
- FineTuneJob.wait()
- FineTuneJob.job_id, FineTuneJob.model_id
- Database.encode_text_query(model_id, text)
The full keyword list on fine_tune covers LoRA rank/alpha/dropout,
learning rate, epochs, batch size, max sequence length, validation
fraction, early-stopping patience/metric, warmup, gradient accumulation,
backbone dtype, weight decay, and gradient clipping — the recipe uses
the defaults for everything except rank and epochs.
Performance note
This recipe is excluded from the per-PR smoke matrix because even at one
epoch it runs ~30 seconds on CPU. The nightly cron with
JAMMI_COOKBOOK_SLOW=1 includes it. Override the gate locally:
JAMMI_COOKBOOK_SLOW=1 python tests/cookbook_smoke.py
Run it
python cookbook/recipes/fine_tune/example.py
Exits 0 on success, prints job_id, model_id, and fine_tune: OK.
Connect via Flight SQL
Run a query against a remote jammi server over Arrow Flight SQL.
When to use this pattern. You’re connecting from a non-Python client
(Tableau, dbt, JDBC tools, Rust binaries), or you want to expose Jammi
to multiple readers without each one holding an embedded session. The
same protocol is what dbt-flightsql, the official Flight SQL JDBC
driver, and BI tools speak natively.
What example.py does
- Spawns target/release/jammi serve as a child process pointed at a temp artifact_dir
- Polls the health endpoint (http://127.0.0.1:8080/health) until the server is ready (5 s budget)
- Opens a pyarrow.flight.FlightClient against grpc://127.0.0.1:8081
- Submits SELECT 1 AS one over Flight SQL and confirms the response
- Tears down the server process cleanly
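The readiness check in step two is a poll-with-deadline loop. This sketch uses only the standard library. It assumes the health endpoint answers HTTP 200 once the server is up; this page implies that but does not state it. The helper names are illustrative.

```python
import time
import urllib.request
from typing import Callable


def http_ok(url: str, timeout_s: float = 0.5) -> bool:
    """True if GET url returns HTTP 200; refused connections and timeouts mean 'not ready'."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            return resp.status == 200
    except OSError:  # URLError, HTTPError and socket timeouts all derive from OSError
        return False


def wait_until_ready(probe: Callable[[], bool], budget_s: float = 5.0,
                     interval_s: float = 0.1) -> None:
    """Call probe() until it succeeds, raising TimeoutError once the budget is spent."""
    deadline = time.monotonic() + budget_s
    while not probe():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"server not ready within {budget_s}s")
        time.sleep(interval_s)
```

In the recipe's setting this would be called as wait_until_ready(lambda: http_ok("http://127.0.0.1:8080/health")).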
This recipe is gated out of the per-PR CI matrix — it depends on the
jammi binary being built (cargo build --release -p jammi-cli), and
the build cost dominates the test wall-clock. The nightly cookbook job
builds the binary and runs the recipe behind JAMMI_COOKBOOK_SLOW=1.
Prerequisites
- cargo build --release -p jammi-cli (produces target/release/jammi)
- pip install pyarrow (already a jammi-ai dependency)
The script auto-detects JAMMI_BIN (env var) or falls back to the
workspace’s target/release/jammi.
API surface exercised
- pyarrow.flight.FlightClient.execute(query) over the Flight SQL command dialect
- jammi serve, the OSS deployment-shape binary entrypoint
Run it
cargo build --release -p jammi-cli # one-time build
python cookbook/recipes/flight_sql/example.py
Exits 0 on success, prints the query result + flight_sql: OK.
Query Your Data with SQL
Register data files as named sources, then query them with full SQL. Sources are persisted in the catalog and survive session restarts.
Register a source
Rust
use jammi_ai::session::InferenceSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
async fn register_source(session: &InferenceSession) -> jammi_db::error::Result<()> {
    session.add_source("patents", SourceType::File, SourceConnection {
        url: Some("file:///data/patents.parquet".into()),
        format: Some(FileFormat::Parquet),
        ..Default::default()
    }).await?;
    Ok(())
}
Python
db.add_source("patents", path="/data/patents.parquet", format="parquet")
CLI
jammi sources add patents --path /data/patents.parquet --format parquet
Supported formats
| Format | Rust | Python/CLI | Notes |
|---|---|---|---|
| Parquet | FileFormat::Parquet | "parquet" | Columnar, compressed, recommended for large datasets |
| CSV | FileFormat::Csv | "csv" | Auto-detected schema |
| JSON | FileFormat::Json | "json" | Line-delimited JSON |
Run a SQL query
Sources are accessible via three-part SQL names: <source_id>.public.<table_name>. The table name is derived from the file name (e.g., patents.parquet becomes patents).
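The naming rule fits in a one-line helper. This is a sketch, and sql_table_ref is a hypothetical name. This page does not document how Jammi derives the table name for files with several dots (for example data.v2.parquet), so the use of Path.stem here is an assumption.

```python
from pathlib import Path


def sql_table_ref(source_id: str, file_path: str) -> str:
    """Build <source_id>.public.<table_name>, taking the table name from the file stem."""
    table_name = Path(file_path).stem  # "patents.parquet" -> "patents" (strips only the last suffix)
    return f"{source_id}.public.{table_name}"
```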
Rust
use jammi_ai::session::InferenceSession;
async fn run_query(session: &InferenceSession) -> jammi_db::error::Result<()> {
    let results = session.sql(
        "SELECT id, title, year FROM patents.public.patents WHERE year > 2020 ORDER BY year"
    ).await?;
    for batch in &results {
        println!("{batch:?}");
    }
    Ok(())
}
Python
table = db.sql("SELECT id, title, year FROM patents.public.patents WHERE year > 2020 ORDER BY year")
print(table.to_pandas())
CLI
jammi query "SELECT id, title, year FROM patents.public.patents WHERE year > 2020 ORDER BY year"
Aggregations
SELECT category, COUNT(*) as count, AVG(citation_count) as avg_citations
FROM patents.public.patents
WHERE year > 2020
GROUP BY category
ORDER BY count DESC
Joins across sources
Register multiple sources and join them in a single query:
Rust
use jammi_ai::session::InferenceSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
async fn join_sources(session: &InferenceSession) -> jammi_db::error::Result<()> {
    session.add_source("companies", SourceType::File, SourceConnection {
        url: Some("file:///data/companies.csv".into()),
        format: Some(FileFormat::Csv),
        ..Default::default()
    }).await?;
    let results = session.sql("
        SELECT p.title, c.company_name
        FROM patents.public.patents p
        JOIN companies.public.companies c ON p.assignee_id = c.id
    ").await?;
    for batch in &results {
        println!("{batch:?}");
    }
    Ok(())
}
Python
db.add_source("companies", path="/data/companies.csv", format="csv")
table = db.sql("""
SELECT p.title, c.company_name
FROM patents.public.patents p
JOIN companies.public.companies c ON p.assignee_id = c.id
""")
Source lifecycle
Rust
use jammi_ai::session::InferenceSession;
async fn manage_sources(session: &InferenceSession) -> jammi_db::error::Result<()> {
    // List registered sources
    let sources = session.catalog().list_sources().await?;
    // Remove a source
    session.remove_source("patents").await?;
    Ok(())
}
CLI
jammi sources list
Sources persist in the SQLite catalog at <artifact_dir>/catalog.db. Registering the same source ID twice returns an error — remove it first.
Execution plans
Use EXPLAIN (or the CLI explain command) to see how DataFusion will execute your query:
jammi explain "SELECT * FROM patents.public.patents WHERE year > 2020"
Generate Embeddings
Generate vector embeddings by running a model over text columns from a registered source. Results are persisted to Parquet with sidecar ANN indexes for fast similarity search.
Basic usage
Rust
use jammi_ai::session::InferenceSession;
async fn embed_abstracts(session: &InferenceSession) -> jammi_db::error::Result<()> {
    let record = session.generate_text_embeddings(
        "patents",
        "sentence-transformers/all-MiniLM-L6-v2",
        &["abstract".to_string()],
        "id",
    ).await?;
    println!("Embedded {} rows, {} dimensions", record.row_count, record.dimensions.unwrap());
    Ok(())
}
Python
db.generate_text_embeddings(
source="patents",
model="sentence-transformers/all-MiniLM-L6-v2",
columns=["abstract"],
key="id",
)
What gets created
Each call creates a timestamped Parquet file plus a sidecar ANN index bundle:
{artifact_dir}/jammi_db/
├── patents__embedding__all-MiniLM-L6-v2__20260325T120000.parquet
├── patents__embedding__all-MiniLM-L6-v2__20260325T120000.usearch
├── patents__embedding__all-MiniLM-L6-v2__20260325T120000.rowmap
└── patents__embedding__all-MiniLM-L6-v2__20260325T120000.manifest.json
- Parquet file — source of truth. Contains _row_id, _source_id, _model_id, vector. Readable by external tools (DuckDB, Polars, pandas).
- .usearch — USearch HNSW graph for ANN search.
- .rowmap — maps internal USearch keys to _row_id strings.
- .manifest.json — metadata (dimensions, count, metric, backend).
The sidecar files are disposable — deleting them falls back to brute-force exact search. The Parquet file is the only thing that matters.
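Brute-force exact search scores every stored vector against the query and keeps the k best. A minimal sketch follows; it is not Jammi's code. The embedding table schema says stored vectors are already L2-normalized, so re-normalizing them here is redundant but harmless.

```python
import math
from typing import Dict, List, Tuple


def l2_normalize(v: List[float]) -> List[float]:
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v] if norm > 0 else list(v)


def brute_force_search(query: List[float], rows: Dict[str, List[float]],
                       k: int) -> List[Tuple[str, float]]:
    """Exact top-k by cosine similarity: score every row, sort, keep the k best."""
    q = l2_normalize(query)
    scored = []
    for row_id, vec in rows.items():
        # With both sides L2-normalized, the dot product equals cosine similarity.
        scored.append((row_id, sum(a * b for a, b in zip(q, l2_normalize(vec)))))
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]
```

The cost is one dot product per stored row, so this is exact but linear in table size. That is why the HNSW sidecar is worth keeping for large tables.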
Embedding table schema
| Column | Type | Description |
|---|---|---|
| _row_id | Utf8 | Key column value cast to string |
| _source_id | Utf8 | Source identifier |
| _model_id | Utf8 | Model identifier |
| vector | FixedSizeList(Float32, N) | L2-normalized embedding vector |
Failed rows (null or empty text) are excluded — only successfully embedded rows appear in the output.
Multiple text columns
Pass multiple column names to concatenate them (space-separated) before embedding:
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
session.generate_text_embeddings(
"papers",
"sentence-transformers/all-MiniLM-L6-v2",
&["title".to_string(), "abstract".to_string()],
"doi",
).await?;
Ok(()) }
}
Python
db.generate_text_embeddings(
source="papers",
model="sentence-transformers/all-MiniLM-L6-v2",
columns=["title", "abstract"],
key="doi",
)
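The concatenation step for several columns might look like the sketch below. This page does not document how Jammi treats a null in one of several columns. The sketch assumes null or blank columns are skipped and that a row with no usable text yields None, which would then surface as an error row.

```python
from typing import List, Mapping, Optional


def concat_text(row: Mapping[str, Optional[str]], columns: List[str]) -> Optional[str]:
    """Join the chosen columns with single spaces; None if no column has usable text."""
    parts = [row[c] for c in columns if row.get(c) and row[c].strip()]  # assumption: skip null/blank
    return " ".join(parts) if parts else None
```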
Multiple embedding tables
Each call creates a new table. Multiple tables can coexist for the same source (different models, different columns):
use jammi_ai::session::InferenceSession;
async fn embed_with_two_models(session: &InferenceSession) -> jammi_db::error::Result<()> {
    // Full Hub repo IDs (organization/name) for each model
    session.generate_text_embeddings("patents", "sentence-transformers/all-MiniLM-L6-v2", &["abstract".into()], "id").await?;
    session.generate_text_embeddings("patents", "BAAI/bge-small-en-v1.5", &["title".into()], "id").await?;
    Ok(())
}
When searching, the latest ready embedding table is used by default.
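One way to pick "the latest ready table" is to parse the timestamp suffix from the file naming scheme shown earlier. The sketch below does that. It is an assumption whether Jammi orders by that suffix or by a creation time stored in its catalog, and the status strings are assumed as well. latest_ready_table is a hypothetical helper.

```python
from datetime import datetime
from typing import List, Optional, Tuple


def latest_ready_table(tables: List[Tuple[str, str]], source: str) -> Optional[str]:
    """Newest 'ready' table named <source>__embedding__<model>__<YYYYMMDDTHHMMSS>."""
    best: Optional[Tuple[datetime, str]] = None
    for name, status in tables:
        parts = name.split("__")
        if len(parts) != 4 or parts[0] != source or parts[1] != "embedding" or status != "ready":
            continue  # other source, other table kind, or not ready yet
        created = datetime.strptime(parts[3], "%Y%m%dT%H%M%S")
        if best is None or created > best[0]:
            best = (created, name)
    return best[1] if best else None
```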
Supported models
Any encoder model on HuggingFace Hub with safetensors weights. Supported architectures:
BERT family — BERT, RoBERTa, DistilBERT, CamemBERT, XLM-RoBERTa:
- sentence-transformers/all-MiniLM-L6-v2 (384-dim, fast)
- sentence-transformers/all-mpnet-base-v2 (768-dim, higher quality)
- BAAI/bge-small-en-v1.5, BAAI/bge-base-en-v1.5
ModernBERT — modernized encoder with rotary embeddings, 8192-token context, GeGLU:
- answerdotai/ModernBERT-base (768-dim)
- answerdotai/ModernBERT-large (1024-dim)
Or any local directory with config.json + model.safetensors + tokenizer.json. The architecture is detected automatically from model_type in config.json.
Use a local model:
use jammi_ai::model::ModelSource;
fn main() {
    // Directory containing config.json, model.safetensors and tokenizer.json
    let model = ModelSource::local("/path/to/my-model");
}
Raw inference (no persistence)
To get embeddings as RecordBatch without writing to disk:
Rust
use jammi_ai::model::{ModelSource, ModelTask};
use jammi_ai::session::InferenceSession;
async fn raw_inference(session: &InferenceSession) -> jammi_db::error::Result<()> {
    let model = ModelSource::hf("sentence-transformers/all-MiniLM-L6-v2");
    let results = session.infer("patents", &model, ModelTask::TextEmbedding, &["abstract".into()], "id").await?;
    Ok(())
}
Python
results = db.infer(
source="patents",
model="sentence-transformers/all-MiniLM-L6-v2",
columns=["abstract"],
task="text_embedding",
key="id",
)
Each RecordBatch has prefix columns (_row_id, _source, _model, _status, _error, _latency_ms) plus task-specific columns (e.g., vector for embeddings).
Error handling
Inference never panics on bad input. Errors are tracked per-row:
| Condition | _status | _error | vector |
|---|---|---|---|
| Valid text | "ok" | null | 384-dim float vector |
| Null text | "error" | "Empty or null text input" | null |
| Empty text | "error" | "Empty or null text input" | null |
The batch continues processing even when individual rows fail.
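The per-row contract in the table above turns bad input into an error row instead of raising. Here is a minimal sketch with a stand-in encode callback. The column names follow the prefix columns listed earlier. Catching encoder exceptions per row is an assumption about how other failures are reported.

```python
from typing import Callable, Iterable, List, Optional, Tuple


def embed_rows(rows: Iterable[Tuple[str, Optional[str]]],
               encode: Callable[[str], List[float]]) -> List[dict]:
    """One output record per input row; failures become error records, never exceptions."""
    results = []
    for row_id, text in rows:
        if not text:  # None or ""
            results.append({"_row_id": row_id, "_status": "error",
                            "_error": "Empty or null text input", "vector": None})
            continue
        try:
            results.append({"_row_id": row_id, "_status": "ok", "_error": None,
                            "vector": encode(text)})
        except Exception as exc:  # assumption: other failures are also reported per row
            results.append({"_row_id": row_id, "_status": "error",
                            "_error": str(exc), "vector": None})
    return results
```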
Dynamic batch sizing
The runner starts with the configured inference.batch_size (default: 32). If an out-of-memory error occurs:
- Halve the batch size
- Retry (up to 3 times)
- If OOM persists at batch size 1, mark the row as error and continue
The reduced batch size is sticky for the remainder of the stream.
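The back-off rules above can be sketched as follows. OutOfMemory stands in for whatever error the runtime raises. The page does not say how the three-retry cap interacts with reaching batch size 1. This sketch assumes that once either limit is hit, every row in the failing chunk is marked as an error; at batch size 1 that is exactly one row.

```python
from typing import Callable, List, Sequence


class OutOfMemory(Exception):
    """Stand-in for a device out-of-memory error (hypothetical)."""


def run_with_backoff(items: Sequence[str],
                     run_batch: Callable[[Sequence[str]], List[str]],
                     batch_size: int = 32, max_retries: int = 3) -> List[str]:
    """Process items in chunks, halving the chunk size on OOM; the smaller size is kept."""
    results: List[str] = []
    i = 0
    retries = 0
    while i < len(items):
        chunk = items[i:i + batch_size]
        try:
            results.extend(run_batch(chunk))
            i += len(chunk)
            retries = 0
        except OutOfMemory:
            if batch_size == 1 or retries >= max_retries:
                # Give up on this chunk: mark its rows as errors and keep streaming.
                results.extend(["error"] * len(chunk))
                i += len(chunk)
                retries = 0
            else:
                batch_size = max(1, batch_size // 2)  # sticky: never grows back
                retries += 1
    return results
```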
Crash recovery
If the process dies mid-generation, the table is left in “building” status. On the next session start, recovery runs automatically:
- Parquet missing — mark as failed
- Parquet corrupt — delete file, mark as failed
- Parquet valid but stuck in “building” — promote to “ready”, rebuild ANN index
No data is lost if the Parquet file was fully written.
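The three recovery cases map onto a small decision function. This sketch uses a cheap structural test as a stand-in for "corrupt": Parquet files begin and end with the 4-byte magic PAR1. This page does not describe Jammi's actual validity check, which is likely stricter (for example, reading the footer metadata).

```python
import os
from pathlib import Path

MAGIC = b"PAR1"


def looks_like_parquet(path: Path) -> bool:
    """Cheap check: header magic, 4-byte footer length and footer magic need >= 12 bytes."""
    if path.stat().st_size < 12:
        return False
    with path.open("rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head == MAGIC and tail == MAGIC


def recovery_action(status: str, parquet_path: Path) -> str:
    """Decide what to do with one result table found at session start."""
    if status != "building":
        return "leave as is"
    if not parquet_path.exists():
        return "mark failed"
    if not looks_like_parquet(parquet_path):
        return "delete file, mark failed"
    return "promote to ready, rebuild ANN index"
```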
DataFusion integration
Result tables are automatically registered in DataFusion and queryable via SQL:
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::result_repo::ResultTableRecord;
async fn query_result_table(session: &InferenceSession, record: &ResultTableRecord) -> jammi_db::error::Result<()> {
    let results = session.sql(&format!(
        "SELECT _row_id, _source_id FROM \"jammi.{}\" LIMIT 10",
        record.table_name
    )).await?;
    for batch in &results {
        println!("{batch:?}");
    }
    Ok(())
}
Generate Image Embeddings
Generate vector embeddings from images using an OpenCLIP-compatible vision model. Results are persisted to Parquet with sidecar ANN indexes, identical to text embeddings — the same search(), evaluation, and SQL tools work on both.
The OpenCLIP family is cross-modal: the vision tower and the text tower in the same checkpoint produce embeddings in a shared latent space, so a text query encoded with the same model can search image embeddings directly. See Search Text Against Images (Cross-Modal) for the full text-to-image recipe.
Basic usage
Rust
use jammi_ai::session::InferenceSession;
async fn embed_figures(session: &InferenceSession) -> jammi_db::error::Result<()> {
    let record = session.generate_image_embeddings(
        "figures",
        "laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
        "image",     // column containing image data
        "figure_id", // key column
    ).await?;
    println!("Embedded {} images, {} dimensions", record.row_count, record.dimensions.unwrap());
    Ok(())
}
Python
db.generate_image_embeddings(
source="figures",
model="laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
image_column="image",
key="figure_id",
)
Image column format
The image column can be either:
- Binary — inline image bytes (PNG, JPEG, TIFF) stored directly in Parquet
- Utf8 — file paths pointing to images on disk
Image preprocessing
Each image is automatically preprocessed before embedding:
- Pad to square — white canvas, image centered (preserves aspect ratio)
- Resize — bicubic interpolation to the model’s input size (224x224 for CLIP)
- Normalize — per-channel normalization using constants from the model’s config
Preprocessing parameters (mean, std, image size) are model-driven — parsed from the model’s config file, not hardcoded.
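The pad-to-square and normalization arithmetic described above can be illustrated with a minimal pure-Python sketch. The helper names are hypothetical and not part of Jammi. The mean/std values are the widely published OpenAI CLIP defaults, used here only as example inputs, since Jammi reads the real values from the model's config. The bicubic resize step is left to an image library.

```python
# Hypothetical helpers illustrating CLIP-style preprocessing arithmetic.
# Not part of the Jammi API; Jammi reads mean/std/image size from the model config.

# Example constants: the commonly published OpenAI CLIP defaults (assumption).
EXAMPLE_MEAN = (0.48145466, 0.4578275, 0.40821073)
EXAMPLE_STD = (0.26862954, 0.26130258, 0.27577711)
WHITE = (255, 255, 255)  # padding colour for the square canvas


def pad_to_square_box(width, height):
    """Return (side, x_offset, y_offset) that centers a width x height image
    on a square canvas whose side equals the longer edge (aspect ratio kept)."""
    side = max(width, height)
    return side, (side - width) // 2, (side - height) // 2


def normalize_pixel(rgb, mean=EXAMPLE_MEAN, std=EXAMPLE_STD):
    """Scale 8-bit RGB to [0, 1], then apply per-channel (value - mean) / std."""
    return tuple((c / 255.0 - m) / s for c, m, s in zip(rgb, mean, std))


# A 640x480 photo lands on a 640x640 white canvas, shifted down by 80 px;
# the canvas would then be resized (bicubic) to the model's input size.
side, x_off, y_off = pad_to_square_box(640, 480)
padding_value = normalize_pixel(WHITE)
```

The padding pixels go through the same normalization as real pixels, so a white border does not turn into zeros after normalization.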
Encode a single image
To embed one image without persistence (e.g., for a query):
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> Result<(), Box<dyn std::error::Error>> {
let image_bytes = std::fs::read("query.png")?;
let vector = session
.encode_image_query("laion/CLIP-ViT-B-32-laion2B-s34B-b79K", &image_bytes)
.await?;
// vector: Vec<f32>, L2-normalized, dimensionality = model's embed_dim
Ok(()) }
}
Python
with open("query.png", "rb") as f:
image_bytes = f.read()
vector = db.encode_image_query("laion/CLIP-ViT-B-32-laion2B-s34B-b79K", image_bytes)
Supported models
OpenCLIP-compatible models with safetensors weights. The repo must carry:
- open_clip_config.json with model_cfg.vision_cfg (and model_cfg.text_cfg if you want cross-modal text queries)
- open_clip_model.safetensors with OpenCLIP weight key naming (visual.* for vision, root-level for text)
- Either a tokenizer.json or the OpenCLIP-native bpe_simple_vocab_16e6.txt.gz (only required for text-side queries)
The architecture (ViT width, layers, heads, patch size, pooling strategy), the shared latent dimensionality (embed_dim), and the preprocessing config (mean, std, image size) are detected automatically from the config — no per-model code path.
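The config-driven detection can be pictured with a small Python sketch that reads the fields named above. It assumes the open_clip_config.json layout used by OpenCLIP checkpoints on the Hugging Face Hub: a model_cfg holding embed_dim, vision_cfg and text_cfg, plus a preprocess_cfg holding mean and std. The function name, the example config values, and the head-count fallback (width // 64) are illustrative assumptions, not Jammi's code.

```python
import json


def describe_open_clip_config(raw):
    """Summarize architecture and preprocessing fields of an
    open_clip_config.json document (hypothetical helper)."""
    cfg = json.loads(raw)
    model = cfg["model_cfg"]
    vision = model["vision_cfg"]
    preprocess = cfg.get("preprocess_cfg", {})
    width = vision["width"]
    return {
        "embed_dim": model["embed_dim"],          # shared latent size
        "vision_width": width,
        "vision_layers": vision["layers"],
        # Assumption: fall back to 64-wide attention heads when heads is absent.
        "vision_heads": vision.get("heads", width // 64),
        "patch_size": vision["patch_size"],
        "image_size": vision["image_size"],
        "mean": preprocess.get("mean"),
        "std": preprocess.get("std"),
        "cross_modal": "text_cfg" in model,       # text tower declared?
    }


# Illustrative config resembling a ViT-B/32 checkpoint.
EXAMPLE = """{
  "model_cfg": {
    "embed_dim": 512,
    "vision_cfg": {"image_size": 224, "layers": 12, "width": 768, "patch_size": 32},
    "text_cfg": {"context_length": 77, "vocab_size": 49408, "width": 512, "heads": 8, "layers": 12}
  },
  "preprocess_cfg": {"mean": [0.48145466, 0.4578275, 0.40821073],
                     "std": [0.26862954, 0.26130258, 0.27577711]}
}"""

summary = describe_open_clip_config(EXAMPLE)
```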
Output schema
Same as text embeddings:
| Column | Type | Description |
|---|---|---|
| _row_id | Utf8 | Key value |
| _source_id | Utf8 | Source identifier |
| _model_id | Utf8 | Model identifier |
| vector | FixedSizeList(Float32, N) | L2-normalized embedding vector (N = embed_dim) |
Search
Image embeddings work with the same search() API as text embeddings:
vector = db.encode_image_query("laion/CLIP-ViT-B-32-laion2B-s34B-b79K", query_bytes)
results = db.search("figures", query=vector, k=10).run()
All SearchBuilder operations (join, filter, sort, limit, annotate) compose identically.
Error handling
| Condition | _status | _error |
|---|---|---|
| Valid image | "ok" | null |
| Null image | "error" | "Null or missing image input" |
| Corrupt image | "error" | "Failed to decode image at row N: ..." |
Search Text Against Images (Cross-Modal)
OpenCLIP-family models carry both a vision tower and a text tower in the same checkpoint, with both towers projecting into a shared latent space. That means a text query embedded with the text tower lives in the same vector space as image embeddings produced by the vision tower — vector search against an image corpus accepts a text query directly, no separate text encoder, no projection bridge.
This recipe shows the full path: index images with the vision tower, embed a text query with the text tower, run search().
1. Index the image corpus with the vision tower
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
session.generate_image_embeddings(
"figures",
"laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
"image", // column containing image data
"figure_id", // key column
).await?;
Ok(()) }
}
Python
db.generate_image_embeddings(
source="figures",
model="laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
image_column="image",
key="figure_id",
)
2. Embed a text query with the same model’s text tower
encode_text_query dispatches to the OpenCLIP text tower when the model ID resolves to an OpenCLIP checkpoint. The output vector dimensionality matches embed_dim — the same dim the image embeddings carry.
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
let query_vec = session
.encode_text_query(
"laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
"a red circle on a white background",
)
.await?;
// query_vec: Vec<f32>, L2-normalized, same length as the image embedding vector
Ok(()) }
}
Python
query_vec = db.encode_text_query(
"laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
"a red circle on a white background",
)
3. Search image embeddings with the text vector
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
async fn ex(session: Arc<InferenceSession>, query_vec: Vec<f32>) -> jammi_db::error::Result<()> {
let results = session.search("figures", query_vec, 10).await?.run().await?;
Ok(()) }
}
Python
results = db.search("figures", query=query_vec, k=10).run()
The hydrated results carry your source’s columns (figure_id, plus any joined / annotated columns) alongside the similarity score. All SearchBuilder operations — join, filter, sort, limit, annotate — compose identically to text-against-text search.
Why this works
Both towers in an OpenCLIP checkpoint emit vectors of size embed_dim (the shared latent dimensionality declared in model_cfg of open_clip_config.json). The vision tower applies a visual.proj matrix after pooling its patch tokens; the text tower applies a text_projection matrix after pooling at the <|endoftext|> token. The two projections are jointly trained, so the cosine similarity between a text vector and an image vector reflects semantic alignment.
If you embed text and images with separate models (e.g. a BERT encoder + a vision model that wasn’t jointly trained with it), the resulting vectors don’t share a latent space and the similarities are meaningless. Cross-modal search only works when both modalities are projected by the same CLIP-style joint training.
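To make the shared-space argument concrete: once both vectors are L2-normalized, cosine similarity reduces to a dot product, and a comparison only makes sense between vectors of the same embed_dim. The sketch below uses tiny made-up 3-dimensional vectors in place of real CLIP outputs. The function names are hypothetical.

```python
import math


def l2_normalize(v):
    """Scale v to unit length so that dot products equal cosine similarity."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def cosine(a, b):
    """Cosine similarity between two vectors that should share a latent space."""
    if len(a) != len(b):
        raise ValueError("vectors have different dimensionality; not the same embedding space")
    a, b = l2_normalize(a), l2_normalize(b)
    return sum(x * y for x, y in zip(a, b))


# Made-up vectors standing in for text-tower and vision-tower outputs.
text_query = [0.9, 0.1, 0.0]
images = {"fig-1": [1.0, 0.0, 0.0], "fig-2": [0.0, 1.0, 0.0]}
ranked = sorted(images, key=lambda k: cosine(text_query, images[k]), reverse=True)
```

A matching length is necessary but not sufficient: two unrelated 512-dimensional models pass the length check and still produce meaningless scores, which is why both towers must come from the same jointly trained checkpoint.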
Model requirements
Same as Generate Image Embeddings, plus:
- open_clip_config.json must contain a populated model_cfg.text_cfg (with width, layers, and either heads or a width that is a multiple of 64).
- The safetensors checkpoint must contain the text-tower keys: token_embedding.weight, positional_embedding, transformer.resblocks.*, ln_final.*, and text_projection.
- A tokenizer must be available: either an HF-converted tokenizer.json or the OpenCLIP-native bpe_simple_vocab_16e6.txt.gz.
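A pre-flight check for these requirements might look like the Python sketch below. It works on a plain list of tensor names (with the safetensors package, safe_open(path, framework="pt").keys() returns such a list) and on a parsed text_cfg dict. The helper names are hypothetical, and the checks mirror only the rules listed above.

```python
# Hypothetical pre-flight checks mirroring the text-tower requirements above.
REQUIRED_TEXT_KEYS = ("token_embedding.weight", "positional_embedding", "text_projection")
REQUIRED_TEXT_PREFIXES = ("transformer.resblocks.", "ln_final.")


def missing_text_tower_keys(tensor_names):
    """Return the text-tower keys (or key prefixes) absent from a checkpoint."""
    names = set(tensor_names)
    missing = [k for k in REQUIRED_TEXT_KEYS if k not in names]
    missing += [p + "*" for p in REQUIRED_TEXT_PREFIXES
                if not any(n.startswith(p) for n in names)]
    return missing


def text_heads(text_cfg):
    """Head count from text_cfg: explicit heads, else width / 64."""
    for field in ("width", "layers"):
        if field not in text_cfg:
            raise ValueError(f"text_cfg is missing {field!r}")
    if "heads" in text_cfg:
        return text_cfg["heads"]
    if text_cfg["width"] % 64 != 0:
        raise ValueError("text_cfg has no heads and width is not a multiple of 64")
    return text_cfg["width"] // 64
```

Vision-tower blocks live under visual.transformer.resblocks.*, so they do not satisfy the root-level transformer.resblocks. prefix check.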
Classify Text
Run a classification model over text columns to assign labels and confidence scores. Any HuggingFace model with id2label in its config works out of the box.
Basic usage
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::model::{ModelSource, ModelTask};
let model = ModelSource::hf("answerdotai/ModernBERT-base-classification");
let results = session.infer(
"patents",
&model,
ModelTask::Classification,
&["abstract".to_string()],
"id",
).await?;
Ok(()) }
}
Python
results = db.infer(
source="patents",
model="answerdotai/ModernBERT-base-classification",
columns=["abstract"],
task="classification",
key="id",
)
Output schema
Each RecordBatch has prefix columns plus classification-specific columns:
| Column | Type | Description |
|---|---|---|
| _row_id | Utf8 | Key column value |
| _source | Utf8 | Source identifier |
| _model | Utf8 | Model identifier |
| _status | Utf8 | "ok" or "error" |
| _error | Utf8 (nullable) | Error message if failed |
| _latency_ms | Float32 | Inference latency (milliseconds) |
| label | Utf8 (nullable) | Predicted class label |
| confidence | Float32 (nullable) | Confidence score (0-1) |
| all_scores_json | Utf8 (nullable) | JSON with all class scores |
Supported model architectures
Classification models must have id2label in their config.json. Supported architectures:
BERT family (BERT, RoBERTa, DistilBERT, CamemBERT, XLM-RoBERTa):
- Loads classifier.weight + classifier.bias from safetensors
- CLS token pooling + linear classifier + softmax
ModernBERT, using the built-in ModernBertForSequenceClassification:
- CLS or MEAN pooling (configured via classifier_pooling in config)
- Head (dense + GELU + LayerNorm) + classifier + softmax
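Both pipelines end the same way: a softmax over the classifier logits followed by an id2label lookup. That shared tail can be sketched in Python as follows. The logits are made-up numbers, and id2label uses string keys as Hugging Face config.json files do. The exact JSON layout Jammi writes to all_scores_json is not specified here, so the label-to-score mapping below is an assumption.

```python
import json
import math


def decode_classification(logits, id2label):
    """Turn one row of classifier logits into label, confidence and all scores
    (hypothetical helper; the all_scores_json layout is an assumption)."""
    peak = max(logits)                          # subtract max for numerical stability
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    probs = [e / total for e in exps]
    best = max(range(len(probs)), key=probs.__getitem__)   # argmax
    return {
        "label": id2label[str(best)],
        "confidence": probs[best],
        "all_scores_json": json.dumps({id2label[str(i)]: p for i, p in enumerate(probs)}),
    }


row = decode_classification([0.2, 2.3], {"0": "physics", "1": "biology"})
```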
Fine-tuning for classification
Train a LoRA adapter with a classification head on your labeled data:
Prepare training data
text,label
"quantum error correction","physics"
"CRISPR gene editing","biology"
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::fine_tune::FineTuneMethod;
use jammi_db::ModelTask;
let job = session.fine_tune(
"training",
"sentence-transformers/all-MiniLM-L6-v2",
&["text".into(), "label".into()],
FineTuneMethod::Lora,
ModelTask::Classification,
None,
).await?;
job.wait().await?;
Ok(()) }
}
Python
job = db.fine_tune(
source="training",
base_model="sentence-transformers/all-MiniLM-L6-v2",
columns=["text", "label"],
method="lora",
task="classification",
)
job.wait()
The fine-tuned model trains a LoRA projection plus a linear classification head using cross-entropy loss. Both are saved to adapter.safetensors.
Error handling
Same per-row error tracking as embeddings:
| Condition | _status | label | confidence |
|---|---|---|---|
| Valid text | "ok" | Predicted label | 0-1 score |
| Null/empty text | "error" | null | null |
Extract Entities (NER)
Run a Named Entity Recognition model over text columns to extract person names, organizations, locations, and other entities. Results are returned as JSON arrays of entity spans with character positions and confidence scores.
Basic usage
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::model::{ModelSource, ModelTask};
let model = ModelSource::hf("dslim/bert-base-NER");
let results = session.infer(
"patents",
&model,
ModelTask::Ner,
&["abstract".to_string()],
"id",
).await?;
Ok(()) }
}
Python
results = db.infer(
source="patents",
model="dslim/bert-base-NER",
columns=["abstract"],
task="ner",
key="id",
)
Output schema
| Column | Type | Description |
|---|---|---|
| _row_id | Utf8 | Key column value |
| _source | Utf8 | Source identifier |
| _model | Utf8 | Model identifier |
| _status | Utf8 | "ok" or "error" |
| _error | Utf8 (nullable) | Error message if failed |
| _latency_ms | Float32 | Inference latency (milliseconds) |
| entities | Utf8 (nullable) | JSON array of entity spans |
Entity span format
Each entity in the JSON array has:
{
"text": "Google",
"label": "ORG",
"start": 15,
"end": 21,
"confidence": 0.97
}
| Field | Type | Description |
|---|---|---|
| text | string | The entity text extracted from the input |
| label | string | Entity type (PER, ORG, LOC, etc.) without B-/I- prefix |
| start | integer | Character start position (inclusive) |
| end | integer | Character end position (exclusive) |
| confidence | float | Average softmax confidence across entity tokens |
Supported models
NER models must have id2label with BIO-tagged labels (e.g., B-PER, I-PER, O) in their config.json.
BERT family: loads classifier.weight + classifier.bias on top of the encoder. Examples:
- dslim/bert-base-NER (English, 4 entity types)
- dbmdz/bert-large-cased-finetuned-conll03-english
ModernBERT: same pattern, modern encoder architecture.
How it works
text → tokenize (with character offsets)
→ encoder forward → hidden states [batch, seq_len, hidden]
→ Linear(hidden, num_labels) per token → logits
→ softmax → argmax → BIO tag per token
→ merge consecutive B-/I- tags into entity spans
→ map character offsets back to original text
The BIO decoding handles:
- B-TYPE: starts a new entity of that type
- I-TYPE: continues the current entity (must match type)
- O: outside any entity
- Special tokens ([CLS], [SEP], padding) are automatically skipped
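Here is a minimal Python sketch of the merge step. It assumes the tokenizer has already produced one (tag, start, end, confidence) tuple per token, with tag set to None for special tokens. The text above does not say how Jammi treats an I- tag that does not continue a same-type entity. This sketch starts a new entity in that case, which is one common convention.

```python
# Hypothetical BIO span merger; token tuples are (tag, start, end, confidence).

def _finish(entity, text):
    """Average the token confidences and slice the entity text from the input."""
    scores = entity.pop("scores")
    entity["confidence"] = sum(scores) / len(scores)
    entity["text"] = text[entity["start"]:entity["end"]]
    return entity


def merge_bio(text, tokens):
    """Merge per-token BIO tags into entity spans with character offsets."""
    entities, current = [], None
    for tag, start, end, conf in tokens:
        if tag is None:                          # [CLS], [SEP], padding
            continue
        prefix, _, label = tag.partition("-")
        if prefix == "I" and current is not None and current["label"] == label:
            current["end"] = end                 # extend the open entity
            current["scores"].append(conf)
            continue
        if current is not None:                  # any other tag closes the open entity
            entities.append(_finish(current, text))
            current = None
        if prefix in ("B", "I"):                 # B-, or an I- with no matching entity
            current = {"label": label, "start": start, "end": end, "scores": [conf]}
    if current is not None:
        entities.append(_finish(current, text))
    return entities


text = "Jane Smith joined Google"
tokens = [(None, 0, 0, 1.0), ("B-PER", 0, 4, 0.9), ("I-PER", 5, 10, 0.8),
          ("O", 11, 17, 0.99), ("B-ORG", 18, 24, 0.97), (None, 0, 0, 1.0)]
spans = merge_bio(text, tokens)
```

Because the span's end offset comes from the last token, the whitespace between "Jane" and "Smith" is included when the text is sliced from the original input.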
Semantic Search
Perform ANN vector similarity search over embedding tables. Results include all original source columns, similarity scores, and evidence provenance.
Basic search
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_db::config::JammiConfig;
async fn ex(config: JammiConfig) -> jammi_db::error::Result<()> {
use std::sync::Arc;
let session = Arc::new(InferenceSession::new(config).await?);
// Encode a query
let query = session.encode_text_query(
"sentence-transformers/all-MiniLM-L6-v2",
"quantum computing applications",
).await?;
// Search — returns top 10 results
let results = session.search("patents", query, 10).await?
.run().await?;
Ok(()) }
}
Python
query_vec = db.encode_text_query("sentence-transformers/all-MiniLM-L6-v2", "quantum computing applications")
search = db.search("patents", query=query_vec, k=10)
results = search.run()
print(results.to_pandas())
What search returns
Results are RecordBatch / pyarrow.Table with:
- All original source columns (e.g., id, title, abstract, year)
- _row_id: the source key
- _source_id: which source the row came from
- similarity: cosine similarity score (1.0 = identical, 0.0 = orthogonal)
- retrieved_by: List<Utf8> provenance recording which channels found this row
- annotated_by: List<Utf8> provenance recording which channels added evidence post-retrieval
SearchBuilder
The SearchBuilder composes query operations. Each method adds a node to a DataFusion execution plan — no data is processed until .run().
Filter
Apply a SQL WHERE clause to hydrated columns:
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.search("patents", query, 20).await?
.filter("year > 2020")?
.run().await?;
Ok(()) }
}
Python
search = db.search("patents", query=query_vec, k=20)
search.filter("year > 2020")
results = search.run()
Sort
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.search("patents", query, 20).await?
.sort("similarity", true)? // descending
.run().await?;
Ok(()) }
}
Python
search = db.search("patents", query=query_vec, k=20)
search.sort("similarity", descending=True)
results = search.run()
Limit
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.search("patents", query, 20).await?
.sort("similarity", true)?
.limit(5)
.run().await?;
Ok(()) }
}
Python
search = db.search("patents", query=query_vec, k=20)
search.sort("similarity", descending=True)
search.limit(5)
results = search.run()
Select
Project specific columns. Note that retrieved_by and annotated_by evidence columns are always appended to the output regardless of your selection:
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.search("patents", query, 10).await?
.select(&["_row_id".into(), "title".into(), "similarity".into()])?
.run().await?;
Ok(()) }
}
Python
search = db.search("patents", query=query_vec, k=10)
search.select(["_row_id", "title", "similarity"])
results = search.run()
Chaining everything
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
let results = session.search("patents", query, 100).await?
.filter("year > 2020")?
.sort("similarity", true)?
.limit(10)
.select(&["title".into(), "similarity".into()])?
.run().await?;
Ok(()) }
}
Python
search = db.search("patents", query=query_vec, k=100)
search.filter("year > 2020")
search.sort("similarity", descending=True)
search.limit(10)
search.select(["title", "similarity"])
results = search.run()
ANN vs exact search
Search automatically selects the best path:
- ANN (fast): when sidecar index files (.usearch + .rowmap + .manifest.json) exist and load successfully
- Exact (brute-force): fallback when sidecar files are missing or corrupt
The choice is transparent to the caller: both paths return the same result schema, so deleting sidecar files degrades performance but not correctness.
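The fallback logic can be pictured with the Python sketch below. load_ann_index is a hypothetical loader standing in for reading the .usearch/.rowmap/.manifest.json sidecars. Here it either returns a search callable or raises. The exact path is a plain brute-force cosine top-k. This illustrates the control flow, not Jammi's implementation.

```python
import math


def _unit(v):
    """L2-normalize a vector."""
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]


def exact_top_k(query, rows, k):
    """Brute-force cosine top-k over (row_id, vector) pairs."""
    q = _unit(query)
    scored = [(row_id, sum(a * b for a, b in zip(q, _unit(v)))) for row_id, v in rows]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


def search(query, rows, k, load_ann_index):
    """Use the ANN index when it loads; otherwise fall back to exact search.
    load_ann_index is a hypothetical stand-in for loading the sidecar files."""
    try:
        ann_search = load_ann_index()        # raises if sidecars are missing/corrupt
    except (FileNotFoundError, ValueError):
        return "exact", exact_top_k(query, rows, k)
    return "ann", ann_search(query, k)
```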
Embedding table resolution
When multiple embedding tables exist for a source, search uses the most recently created “ready” table. The resolution order:
- Explicit table name (if provided)
- Latest ready embedding table for the source (by created_at)
- Error if no embedding table exists
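The resolution order reads naturally as a small function. The sketch below assumes a hypothetical list of catalog records with name, source, status and created_at fields. It uses ISO-8601 strings for created_at because those sort chronologically. Jammi's actual catalog schema may differ. This sketch also raises when tables exist but none is ready, which is an assumption.

```python
def resolve_embedding_table(tables, source, explicit=None):
    """Pick the embedding table a search should use (hypothetical helper)."""
    if explicit is not None:                 # 1. explicit table name wins
        return explicit
    ready = [t for t in tables
             if t["source"] == source and t["status"] == "ready"]
    if not ready:                            # 3. nothing searchable for this source
        raise LookupError(f"no ready embedding table for source {source!r}")
    return max(ready, key=lambda t: t["created_at"])["name"]   # 2. newest ready table
```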
Enrich Results with Joins and Annotations
Search results can be enriched by joining with other data sources and annotating with additional model inference. Every enrichment step is tracked in the evidence provenance columns.
Join with another source
Join search results with a registered source to add context columns (e.g., company name, category labels):
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
async fn ex(session: &Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.add_source("assignees", SourceType::File, SourceConnection {
url: Some("file:///data/assignees.csv".into()),
format: Some(FileFormat::Csv),
..Default::default()
}).await?;
let results = session.search("patents", query, 10).await?
.join("assignees", "assignee_id=id", None).await? // left join by default
.run().await?;
// Results now include company_name, country from assignees
Ok(()) }
}
Python
db.add_source("assignees", path="/data/assignees.csv", format="csv")
search = db.search("patents", query=query_vec, k=10)
search.join("assignees", on="assignee_id=id")
results = search.run()
# Results now include company_name, country from assignees
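The on parameter takes the form "left_col=right_col", where left_col is a column of the search results and right_col a column of the joined source. The optional join type is "inner" or "left" (the default, selected in Rust by passing None).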
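The semantics of on and the join type can be sketched over plain Python dicts. parse_join_on and join_rows are hypothetical helpers. The sketch ignores column-name collisions and row-ordering guarantees, which the real DataFusion plan handles on its own terms.

```python
def parse_join_on(on, how=None):
    """Split "left_col=right_col" and validate the join type (default "left")."""
    how = how or "left"
    if how not in ("left", "inner"):
        raise ValueError(f"unsupported join type: {how!r}")
    left_col, sep, right_col = on.partition("=")
    if not sep or not left_col.strip() or not right_col.strip():
        raise ValueError(f"expected 'left_col=right_col', got {on!r}")
    return left_col.strip(), right_col.strip(), how


def join_rows(left_rows, right_rows, on, how=None):
    """Join search-result rows with a source's rows (hypothetical helper)."""
    left_col, right_col, how = parse_join_on(on, how)
    by_key = {}
    for row in right_rows:
        by_key.setdefault(row[right_col], []).append(row)
    right_columns = {c for row in right_rows for c in row}
    out = []
    for left in left_rows:
        matches = by_key.get(left.get(left_col), [])
        for right in matches:
            out.append({**left, **right})
        if not matches and how == "left":    # keep unmatched rows, filled with NULLs
            out.append({**left, **{c: None for c in right_columns}})
    return out
```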
Annotate with model inference
Run a model over search results to add new columns:
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::ModelTask;
async fn ex(session: &Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
let results = session.search("patents", query, 10).await?
.annotate(
"sentence-transformers/all-MiniLM-L6-v2",
ModelTask::TextEmbedding,
&["abstract".to_string()],
).await?
.run().await?;
Ok(()) }
}
Python
search = db.search("patents", query=query_vec, k=10)
search.annotate(
model="sentence-transformers/all-MiniLM-L6-v2",
task="text_embedding",
columns=["abstract"],
)
results = search.run()
Evidence provenance
Every search result carries provenance tracking that records how each row was found and enriched:
| Scenario | retrieved_by | annotated_by |
|---|---|---|
| Plain search | ["vector"] | [] |
| Search + annotate | ["vector"] | ["inference"] |
These are List<Utf8> columns — each row has its own list of contributing channels.
Composing everything
All operations compose freely:
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::ModelTask;
async fn ex(session: &Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
let results = session.search("patents", query, 100).await?
.join("assignees", "assignee_id=id", None).await?
.annotate("sentence-transformers/all-MiniLM-L6-v2", ModelTask::TextEmbedding, &["abstract".into()]).await?
.filter("country = 'US'")?
.sort("similarity", true)?
.limit(10)
.select(&["title".into(), "company_name".into(), "similarity".into()])?
.run().await?;
Ok(()) }
}
Python
search = db.search("patents", query=query_vec, k=100)
search.join("assignees", on="assignee_id=id")
search.annotate(model="sentence-transformers/all-MiniLM-L6-v2", task="text_embedding", columns=["abstract"])
search.filter("country = 'US'")
search.sort("similarity", descending=True)
search.limit(10)
search.select(["title", "company_name", "similarity"])
results = search.run()
The pipeline builds a DataFusion execution plan under the hood. No data is processed until .run() — so adding more steps doesn’t cost anything until execution.
Declare a Custom Provenance Channel
Every row that flows through Jammi carries provenance — retrieved_by and annotated_by lists that record how the row was found and what was added after retrieval. Jammi ships two built-in channels — vector (declares similarity) and inference (declares inference_model, inference_task, inference_confidence) — but the catalog accepts any channel a consumer wants to register. Each channel declares the columns it contributes; the engine merges those columns into every result RecordBatch at query time.
This recipe walks through registering a third channel, scored_by, for a multi-stage retrieval pipeline where a federated reranker rescores the vector hits. The same shape applies to any non-built-in provenance signal: a citation graph, an attribution chain, a quality-grading pass.
Setup
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate arrow;
extern crate tokio;
use jammi_db::config::JammiConfig;
async fn ex(config: JammiConfig) -> jammi_db::error::Result<()> {
use std::sync::Arc;
use arrow::array::{ArrayRef, Float32Array, StringArray};
use jammi_ai::evidence::{merge_channels, ChannelContribution};
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::channel_repo::{ChannelColumn, ChannelColumnType, ChannelSpec};
use jammi_db::ChannelId;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
let session = Arc::new(InferenceSession::new(config).await?);
session.add_source("patents", SourceType::File, SourceConnection {
url: Some("file:///data/patents.parquet".into()),
format: Some(FileFormat::Parquet),
..Default::default()
}).await?;
Ok(()) }
}
Python
import jammi_ai
db = jammi_ai.connect(artifact_dir="/var/lib/jammi")
db.add_source("patents", path="/data/patents.parquet", format="parquet")
Declare the channel
Channel declarations are catalog rows. Each declared column has a name, an Arrow type, and an ordinal (its position in the columns list). The set is append-only: once ranker: Utf8 is declared, the engine refuses to redeclare it as Int32 or drop it.
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::channel_repo::{ChannelColumn, ChannelColumnType, ChannelSpec};
use jammi_db::ChannelId;
async fn ex(session: &Arc<InferenceSession>) -> jammi_db::error::Result<()> {
session.catalog().channels().register(&ChannelSpec {
id: ChannelId::new("scored_by")?,
priority: 3,
columns: vec![
ChannelColumn {
name: "ranker".into(),
data_type: ChannelColumnType::Utf8,
},
ChannelColumn {
name: "rank_score".into(),
data_type: ChannelColumnType::Float32,
},
],
}).await?;
Ok(()) }
}
Python
db.register_channel(
"scored_by",
priority=3,
columns=[("ranker", "Utf8"), ("rank_score", "Float32")],
)
priority controls the order columns appear in the merged output — vector (1) and inference (2) come first, then scored_by (3).
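One way to picture how priority could order the merged columns: sort channels by ascending priority, then concatenate their declared columns in declaration order. The built-in column lists come from the introduction to this recipe. The function name and the tie-breaking behavior (a stable sort, so ties keep input order) are assumptions.

```python
def merged_column_order(channels):
    """channels: iterable of (channel_id, priority, [declared column names]).
    Hypothetical helper: lower priority values come first."""
    ordered = sorted(channels, key=lambda ch: ch[1])
    return [column for _, _, columns in ordered for column in columns]


channels = [
    ("scored_by", 3, ["ranker", "rank_score"]),
    ("vector", 1, ["similarity"]),
    ("inference", 2, ["inference_model", "inference_task", "inference_confidence"]),
]
order = merged_column_order(channels)
```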
To add more columns to an already-registered channel:
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::channel_repo::{ChannelColumn, ChannelColumnType};
use jammi_db::ChannelId;
async fn ex(session: &Arc<InferenceSession>) -> jammi_db::error::Result<()> {
session.catalog().channels().add_columns(
&ChannelId::new("scored_by")?,
&[ChannelColumn { name: "scored_at".into(), data_type: ChannelColumnType::Utf8 }],
).await?;
Ok(()) }
}
db.add_channel_columns("scored_by", columns=[("scored_at", "Utf8")])
add_columns is append-only by construction. Trying to redeclare ranker with the same or a different type returns JammiError::EvidenceChannel(_).
Use the channel
Build a ChannelContribution for each batch your reranker produces. The arrays must align 1:1 with the channel’s declared columns (ranker first, rank_score second) and have the same length as the batch’s row count.
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate arrow;
extern crate tokio;
use std::sync::Arc;
use arrow::array::{ArrayRef, Float32Array, RecordBatch, StringArray};
use jammi_ai::evidence::{merge_channels, ChannelContribution};
use jammi_ai::session::InferenceSession;
use jammi_db::ChannelId;
fn rerank_scores(batch: &RecordBatch) -> Vec<f32> { vec![0.0; batch.num_rows()] } // placeholder: one score per row
async fn ex(session: &Arc<InferenceSession>, batches: Vec<RecordBatch>) -> jammi_db::error::Result<()> {
let scored_by = ChannelId::new("scored_by")?;
let vector = ChannelId::new("vector")?;
let mut contributions = Vec::with_capacity(batches.len());
for batch in &batches {
let n = batch.num_rows();
let ranker: ArrayRef = Arc::new(StringArray::from(vec!["bm25"; n]));
let rank_score: ArrayRef = Arc::new(Float32Array::from(rerank_scores(batch)));
contributions.push(vec![ChannelContribution {
channel: scored_by.clone(),
columns: vec![ranker, rank_score],
}]);
}
let merged = merge_channels(
session.catalog(),
&batches,
&[vector.clone(), scored_by.clone()],
&[vector, scored_by], // retrieved_by
&[], // annotated_by
&contributions,
).await?;
Ok(()) }
}
Verify
The merged output schema includes the declared columns. Rows where the channel did not supply a value carry NULL.
Rust
#![allow(unused)]
fn main() {
extern crate arrow;
use arrow::array::RecordBatch;
fn ex(merged: Vec<RecordBatch>) {
let schema = merged[0].schema();
assert!(schema.field_with_name("ranker").is_ok());
assert!(schema.field_with_name("rank_score").is_ok());
for batch in &merged {
let ranker = batch.column_by_name("ranker").unwrap();
println!("ranker column: {:?}", ranker);
}
}
}
Python
From the SQL surface, the declared columns show up in any query that touches the result table — Python sees them as plain Arrow columns:
table = db.sql(
"SELECT _row_id, similarity, ranker, rank_score FROM results LIMIT 3"
)
for row in table.to_pylist():
print(row["ranker"], row["rank_score"])
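This recipe states two rules: a contribution must match the declared columns and the batch row count, and values a channel did not supply surface as NULL. Both can be sketched in Python, with lists standing in for Arrow arrays. merge_contributions is a hypothetical helper, not merge_channels itself.

```python
def merge_contributions(num_rows, declared_columns, contribution):
    """Return {column: values} for every declared column (hypothetical helper).

    contribution: list of value lists aligned 1:1 with declared_columns, or
    None when the channel supplied nothing for this batch (all NULL)."""
    if contribution is None:
        return {col: [None] * num_rows for col in declared_columns}
    if len(contribution) != len(declared_columns):
        raise ValueError("contribution must supply one array per declared column")
    merged = {}
    for col, values in zip(declared_columns, contribution):
        if len(values) != num_rows:
            raise ValueError(f"column {col!r}: expected {num_rows} values, got {len(values)}")
        merged[col] = list(values)
    return merged
```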
What you cannot do
The channel declaration is append-only. Once scored_by ships with ranker: Utf8, you cannot:
- Redeclare ranker as Int32: add_columns rejects with JammiError::EvidenceChannel("channel 'scored_by': column 'ranker' was declared Utf8, cannot redeclare as Int32"). From Python, the same call raises RuntimeError carrying the identical message:
db.add_channel_columns("scored_by", columns=[("ranker", "Int32")])
# RuntimeError: channel 'scored_by': column 'ranker' was declared Utf8, cannot redeclare as Int32
- Add a second column with the same name: add_columns rejects with JammiError::EvidenceChannel("channel 'scored_by': column 'ranker' already declared").
- Drop ranker from the channel: there is no drop_column method by design.
If a column needs to change shape, declare a new column under a new name and migrate consumers. This preserves byte-for-byte readability of any backing table or downstream artifact that already references the original column.
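The append-only rule can be modeled in a few lines of Python. The error messages are the ones quoted in the list above. The function and exception class are hypothetical stand-ins for the catalog's add_columns and JammiError::EvidenceChannel. Validating against a copy, so that a rejected call leaves the declaration untouched, is an assumption about how the real catalog behaves.

```python
class EvidenceChannelError(RuntimeError):
    """Hypothetical stand-in for JammiError::EvidenceChannel."""


def add_columns(channel, declared, new_columns):
    """Append (name, type) pairs to a channel's declared columns.

    declared maps column name to type in declaration order. Returns a new
    dict and raises instead of changing or duplicating an existing column."""
    updated = dict(declared)
    for name, dtype in new_columns:
        if name in updated:
            if updated[name] != dtype:
                raise EvidenceChannelError(
                    f"channel '{channel}': column '{name}' was declared "
                    f"{updated[name]}, cannot redeclare as {dtype}")
            raise EvidenceChannelError(
                f"channel '{channel}': column '{name}' already declared")
        updated[name] = dtype
    return updated   # deliberately no way to drop a column
```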
Fine-Tune for Your Domain
Train LoRA adapters on your data to improve embedding quality for your domain. The base model stays frozen — only a small projection layer is trained and saved.
Prepare training data
Create contrastive pairs with a similarity score:
text_a,text_b,score
"quantum error correction","superconducting qubit stabilization",0.88
"quantum error correction","medieval poetry analysis",0.08
High scores mean similar; low scores mean dissimilar.
Register the training data as a source:
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
session.add_source("training", SourceType::File, SourceConnection {
url: Some("file:///data/training_pairs.csv".into()),
format: Some(FileFormat::Csv),
..Default::default()
}).await?;
Ok(()) }
}
Python
db.add_source("training", path="/data/training_pairs.csv", format="csv")
Start a fine-tuning job
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::fine_tune::FineTuneMethod;
use jammi_db::ModelTask;
let job = session.fine_tune(
"training",
"sentence-transformers/all-MiniLM-L6-v2",
&["text_a".into(), "text_b".into(), "score".into()],
FineTuneMethod::Lora,
ModelTask::TextEmbedding,
None, // default config
).await?;
println!("Job: {}", job.job_id);
job.wait().await?;
println!("Model: {}", job.model_id());
Ok(()) }
}
Python
job = db.fine_tune(
source="training",
base_model="sentence-transformers/all-MiniLM-L6-v2",
columns=["text_a", "text_b", "score"],
method="lora",
task="text_embedding",
)
job.wait()
print(f"Model: {job.model_id}")
Custom configuration
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_ai::fine_tune::{FineTuneMethod, LrSchedule};
async fn ex(session: &InferenceSession, model: &str, columns: Vec<String>) -> jammi_db::error::Result<()> {
use jammi_ai::fine_tune::FineTuneConfig;
use jammi_db::ModelTask;
let config = FineTuneConfig {
lora_rank: 4,
learning_rate: 5e-4,
epochs: 5,
batch_size: 4,
warmup_steps: 10,
lr_schedule: LrSchedule::CosineDecay,
early_stopping_patience: 2,
validation_fraction: 0.2,
gradient_accumulation_steps: 4, // effective batch = 4 x 4 = 16
..Default::default()
};
let job = session.fine_tune(
"training", model, &columns, FineTuneMethod::Lora, ModelTask::TextEmbedding, Some(config),
).await?;
Ok(()) }
}
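To see how warmup_steps, lr_schedule and gradient_accumulation_steps interact, here is a Python sketch of a conventional linear-warmup-then-decay schedule. It assumes the schedule is indexed by optimizer updates (not micro-batches) and decays to zero at the final step. Jammi's exact formulas are not given in this section, so treat the numbers as illustrative. The example count of 800 training examples is made up.

```python
import math


def optimizer_updates(num_examples, batch_size, grad_accum_steps, epochs):
    """Optimizer updates per run: micro-batches per epoch / accumulation steps."""
    micro_batches = math.ceil(num_examples / batch_size)
    return epochs * math.ceil(micro_batches / grad_accum_steps)


def learning_rate(step, total_steps, base_lr, warmup_steps, schedule="CosineDecay"):
    """Learning rate at a 0-based optimizer update (assumed schedule shape)."""
    if step < warmup_steps:                              # linear warmup from 0
        return base_lr * step / warmup_steps
    if schedule == "Constant":
        return base_lr
    progress = min(1.0, (step - warmup_steps) / max(1, total_steps - warmup_steps))
    if schedule == "LinearDecay":
        return base_lr * (1.0 - progress)
    if schedule == "CosineDecay":
        return base_lr * 0.5 * (1.0 + math.cos(math.pi * progress))
    raise ValueError(f"unknown schedule: {schedule}")


# The config above: micro-batches of 4, 4 accumulation steps, 5 epochs,
# and a made-up 800 training examples left after the validation split.
total = optimizer_updates(num_examples=800, batch_size=4, grad_accum_steps=4, epochs=5)
```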
Configuration reference
| Field | Default | Description |
|---|---|---|
| lora_rank | 8 | Low-rank dimension |
| lora_alpha | 16.0 | Scaling factor |
| lora_dropout | 0.05 | Dropout probability |
| learning_rate | 2e-4 | Base learning rate |
| epochs | 3 | Training epochs |
| batch_size | 8 | Micro-batch size |
| max_seq_length | 512 | Max tokens per text |
| gradient_accumulation_steps | 1 | Micro-batches accumulated per optimizer update |
| validation_fraction | 0.1 | Holdout fraction for early stopping |
| early_stopping_patience | 3 | Epochs without improvement before stopping |
| warmup_steps | 100 | Linear warmup from 0 to base LR |
| lr_schedule | CosineDecay | Decay after warmup: Constant, CosineDecay, LinearDecay |
| embedding_loss | auto | CoSent (pairs+scores), Triplet, MultipleNegativesRanking |
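The table indicates that CoSent is the loss for pairs-plus-scores data like the training set in this guide. Below is a Python sketch of the CoSENT objective. For every two training pairs where one gold score exceeds the other, it adds a penalty when the predicted cosine of the lower-scored pair is higher. The scale of 20 follows the sentence-transformers default. The value Jammi uses is not stated here.

```python
import math


def cosent_loss(pred_cos, gold_scores, scale=20.0):
    """CoSENT: log(1 + sum over gold_i > gold_j of exp(scale * (cos_j - cos_i))).
    scale=20 mirrors the sentence-transformers default (assumption for Jammi)."""
    terms = [0.0]                     # exp(0) = 1 supplies the "1 +" inside the log
    for i, gi in enumerate(gold_scores):
        for j, gj in enumerate(gold_scores):
            if gi > gj:
                terms.append(scale * (pred_cos[j] - pred_cos[i]))
    peak = max(terms)                 # log-sum-exp for numerical stability
    return peak + math.log(sum(math.exp(t - peak) for t in terms))


# Gold scores from the example CSV: the related pair should score higher.
gold = [0.88, 0.08]
well_ordered = cosent_loss([0.9, 0.1], gold)
misordered = cosent_loss([0.1, 0.9], gold)
```

Only the relative order of the predicted cosines matters, which suits graded similarity scores better than forcing each cosine to equal its label.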
Use the fine-tuned model
The fine-tuned model is automatically registered and can be used anywhere a model ID is accepted:
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_ai::fine_tune::job::FineTuneJob;
async fn ex(session: &InferenceSession, job: &FineTuneJob) -> jammi_db::error::Result<()> {
let model_id = job.model_id();
let embedding = session.encode_text_query(model_id, "quantum computing").await?;
session.generate_text_embeddings("patents", model_id, &["abstract".into()], "id").await?;
Ok(()) }
}
Python
model_id = job.model_id
query_vec = db.encode_text_query(model_id, "quantum computing")
db.generate_text_embeddings(source="patents", model=model_id, columns=["abstract"], key="id")
How it works
text -> encoder (frozen) -> base embedding -> LoRA projection (trained) -> output
- The base encoder model (BERT, ModernBERT, etc.) is loaded and frozen
- A LoRA projection layer (identity + low-rank A/B matrices) is added after pooling
- For each batch: text is encoded, projected through LoRA, and loss is computed
- Only the A/B matrices receive gradients
- The adapter is saved as adapter.safetensors in the artifact directory
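The projection step can be sketched in plain Rust. This is an illustration, not Jammi's Candle code. It assumes the standard LoRA conventions of scaling the low-rank update by lora_alpha / lora_rank and initialising B to zero, so that an untrained adapter leaves the base embedding unchanged. LoraProjection and matvec are hypothetical names.

```rust
/// Dense matrix-vector product: `m` is rows x cols, `v` has length cols.
fn matvec(m: &[Vec<f32>], v: &[f32]) -> Vec<f32> {
    m.iter()
        .map(|row| row.iter().zip(v).map(|(w, x)| w * x).sum::<f32>())
        .collect()
}

/// y = x + (alpha / r) * B (A x): identity path plus a trained low-rank path.
struct LoraProjection {
    a: Vec<Vec<f32>>, // r x d, trained
    b: Vec<Vec<f32>>, // d x r, trained
    alpha: f32,
}

impl LoraProjection {
    fn new(a: Vec<Vec<f32>>, d: usize, alpha: f32) -> Self {
        let r = a.len();
        // B starts at zero, so the untrained projection is exactly the identity.
        Self { a, b: vec![vec![0.0; r]; d], alpha }
    }

    fn rank(&self) -> usize {
        self.a.len()
    }

    fn forward(&self, x: &[f32]) -> Vec<f32> {
        let scale = self.alpha / self.rank() as f32;
        let low = matvec(&self.a, x); // down-project to r dimensions
        let up = matvec(&self.b, &low); // back up to d dimensions
        x.iter().zip(&up).map(|(xi, ui)| xi + scale * ui).collect()
    }
}
```

Only a and b would receive gradients; the pooled base embedding x comes from the frozen encoder.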
Encoder-adapters fine-tuning (PEFT-style adapter injection)
The default flow above trains a single low-rank projection head sitting outside the frozen encoder. For higher capacity at the same parameter budget, Jammi also supports encoder adapters — LoRA injected into named linear layers inside the encoder stack, matching the PEFT convention.
Switch to encoder adapters by populating target_modules on FineTuneConfig:
#![allow(unused)]
fn main() {
extern crate jammi_ai;
use jammi_ai::fine_tune::FineTuneConfig;
fn make() -> FineTuneConfig {
let config = FineTuneConfig {
lora_rank: 8,
lora_alpha: 16.0,
// Inject LoRA into BERT's attention query and value projections.
target_modules: vec!["query".to_string(), "value".to_string()],
..Default::default()
};
config }
}
job = db.fine_tune(
source="training",
base_model="sentence-transformers/all-MiniLM-L6-v2",
columns=["text_a", "text_b", "score"],
method="lora",
task="text_embedding",
target_modules=["query", "value"],
)
Target-module conventions
Pick target_modules per the architecture you’re fine-tuning:
| Architecture | Common target_modules |
|---|---|
| BERT / RoBERTa / CamemBERT / XLM-RoBERTa | ["query", "value"] (recommended) or ["query", "key", "value", "dense"] |
| DistilBERT | ["q_lin", "v_lin"] or ["q_lin", "k_lin", "v_lin", "out_lin"] |
| ModernBERT | ["Wqkv", "Wo"] (fused QKV + output) |
| Any encoder | ["all-linear"] — every linear layer gets an adapter (largest capacity) |
Names match the trailing module-name segment in the HuggingFace weight layout. Suffix matching is the rule, so "query" matches "attention.self.query".
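The suffix rule can be expressed as a small predicate. This sketch assumes matching happens on whole dot-separated segments (so "query" would not match a hypothetical "subquery" module) and that "all-linear" is only applied to names the caller already knows are linear layers. targets is a hypothetical helper, not part of the Jammi API.

```rust
/// Hypothetical helper: does `module_name` receive an adapter under `target_modules`?
fn targets(module_name: &str, target_modules: &[&str]) -> bool {
    target_modules.iter().any(|&t| {
        t == "all-linear" // assumes the caller only passes linear-layer names
            || module_name == t
            // Suffix match on a segment boundary: "...self.query" ends with ".query".
            || module_name
                .strip_suffix(t)
                .map_or(false, |rest| rest.ends_with('.'))
    })
}
```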
Layer ranges and per-module ranks
Two optional refinements:
- layers_to_transform: restrict injection to specific 0-based layer indices. None (default) applies to every layer.
- rank_pattern: override lora_rank for individual modules. Keys are substring matches against the module name; values are the override rank.
#![allow(unused)]
fn main() {
extern crate jammi_ai;
use jammi_ai::fine_tune::FineTuneConfig;
fn make() -> FineTuneConfig {
let mut rank_pattern = std::collections::HashMap::new();
rank_pattern.insert("query".to_string(), 16); // higher capacity on Q
rank_pattern.insert("value".to_string(), 4); // lower on V
let config = FineTuneConfig {
lora_rank: 8, // default rank
target_modules: vec!["query".into(), "value".into()],
layers_to_transform: Some(vec![6, 7, 8, 9, 10, 11]), // top half of a 12-layer encoder such as BERT-base
rank_pattern,
..Default::default()
};
config }
}
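Putting layers_to_transform and rank_pattern together, the effective rank for one module might be resolved as below. This is a sketch under stated assumptions: the layer index is parsed from a HuggingFace-style name such as encoder.layer.6.attention.self.query, and when several rank_pattern keys match, the longest key wins (the text above does not say how overlaps are resolved). Both helpers are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical: extract the 0-based layer index from "encoder.layer.6.attention.self.query".
fn layer_index(module_name: &str) -> Option<usize> {
    let mut segments = module_name.split('.');
    while let Some(seg) = segments.next() {
        if seg == "layer" || seg == "layers" {
            return segments.next()?.parse().ok();
        }
    }
    None
}

/// Returns None when the module's layer is excluded, otherwise the rank to use.
fn effective_rank(
    module_name: &str,
    default_rank: usize,
    layers_to_transform: Option<&[usize]>,
    rank_pattern: &HashMap<String, usize>,
) -> Option<usize> {
    // None means "every layer"; Some(list) restricts injection to those indices.
    if let Some(layers) = layers_to_transform {
        let idx = layer_index(module_name)?;
        if !layers.contains(&idx) {
            return None;
        }
    }
    // Substring match against the module name; longest matching key wins (assumption).
    let rank = rank_pattern
        .iter()
        .filter(|(key, _)| module_name.contains(key.as_str()))
        .max_by_key(|(key, _)| key.len())
        .map(|(_, &rank)| rank)
        .unwrap_or(default_rank);
    Some(rank)
}
```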
On-disk artifact
Every fine-tuned model writes adapter.safetensors plus an
adapter_config.json whose adapter_type tag discriminates between the two
adapter shapes Jammi produces.
Encoder-adapters example:
{
"adapter_type": "encoder_adapters",
"model_type": "bert",
"lora_rank": 8,
"lora_alpha": 16.0,
"use_rslora": false,
"target_modules": ["query", "value"],
"layers_to_transform": [6, 7, 8, 9, 10, 11],
"rank_pattern": {"query": 16, "value": 4},
"backbone_dtype": "f32"
}
Projection-head example:
{
"adapter_type": "projection_head",
"lora_rank": 8,
"lora_alpha": 16.0,
"head_layers": ["projection"]
}
The Candle inference backend reads adapter_config.json on model load and
dispatches on adapter_type: encoder_adapters rebuilds the encoder with
frozen backbone weights plus the LoRA A/B from adapter.safetensors;
projection_head loads the saved projection weights as a LoraLinear
applied after pooling.
When to use each
- Projection head: fastest training, smallest artifact, lowest memory. The default when target_modules is empty. Best for adapting embedding direction without changing per-token attention behaviour.
- Encoder adapters: higher representational ceiling per adapter parameter; required if the task needs to reshape attention behaviour (e.g. a domain where the base attention pattern mismatches the query distribution). Costs a slightly slower forward pass, since the LoRA path runs in every targeted layer.
Training safety
- Divergence detection: if the loss is NaN or exceeds 100 for 3 consecutive batches, the job fails with a clear error.
- Early stopping: training stops when validation loss does not improve for early_stopping_patience epochs, and the best checkpoint weights are restored.
- Checkpoints: saved at roughly 10% intervals for crash recovery.
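The divergence and early-stopping rules can be modelled as two small state machines. This is a sketch of the stated thresholds (NaN or loss above 100 for 3 consecutive batches; early_stopping_patience epochs without improvement), not Jammi's trainer. The type names are hypothetical, and restoring the best checkpoint is left to the caller via best_epoch.

```rust
/// Fails after `limit` consecutive bad batches (NaN or loss above `threshold`).
struct DivergenceGuard {
    threshold: f32,
    limit: usize,
    streak: usize,
}

impl DivergenceGuard {
    fn new() -> Self {
        Self { threshold: 100.0, limit: 3, streak: 0 }
    }

    fn observe(&mut self, loss: f32) -> Result<(), String> {
        if loss.is_nan() || loss > self.threshold {
            self.streak += 1;
            if self.streak >= self.limit {
                return Err(format!("training diverged: loss {loss} for {} consecutive batches", self.streak));
            }
        } else {
            self.streak = 0; // a healthy batch resets the streak
        }
        Ok(())
    }
}

/// Tracks the best validation loss and signals when patience runs out.
struct EarlyStopping {
    patience: usize,
    best: f32,
    best_epoch: usize,
    epochs_without_improvement: usize,
}

impl EarlyStopping {
    fn new(patience: usize) -> Self {
        Self { patience, best: f32::INFINITY, best_epoch: 0, epochs_without_improvement: 0 }
    }

    /// Returns true when training should stop.
    fn observe(&mut self, epoch: usize, val_loss: f32) -> bool {
        if val_loss < self.best {
            self.best = val_loss;
            self.best_epoch = epoch;
            self.epochs_without_improvement = 0;
        } else {
            self.epochs_without_improvement += 1;
        }
        self.epochs_without_improvement >= self.patience
    }
}
```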
Evaluate and Compare Models
Measure embedding quality and classification accuracy against golden datasets. Results are recorded in the catalog for tracking over time.
Prepare a golden dataset
A golden dataset is any registered source with the right columns. No special format required.
Retrieval golden set
query_id,query_text,relevant_id
q1,quantum computing applications,1
q1,quantum computing applications,4
q2,machine learning for science,2
| Column | Type | Required |
|---|---|---|
| query_id | Utf8 | yes |
| query_text | Utf8 | yes |
| relevant_id | Utf8 or Int | yes |
| relevance_grade | Int32 | no (default: 1 = binary) |
Register it as a source:
db.add_source("golden", path="/data/golden_relevance.csv", format="csv")
Evaluate embedding quality
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
let report = session.eval_embeddings(
"patents",
None, // use latest embedding table
"golden.public.golden_relevance", // golden dataset
10, // k for recall@k, precision@k
).await?;
println!("recall@10: {}", report.aggregate.recall_at_k);
println!("precision@10: {}", report.aggregate.precision_at_k);
println!("MRR: {}", report.aggregate.mrr);
println!("nDCG: {}", report.aggregate.ndcg);
Ok(()) }
}
Python
metrics = db.eval_embeddings(
source="patents",
golden_source="golden.public.golden_relevance",
k=10,
)
agg = metrics["aggregate"]
print(f"recall@10: {agg['recall_at_k']:.3f}")
print(f"precision@10: {agg['precision_at_k']:.3f}")
print(f"MRR: {agg['mrr']:.3f}")
print(f"nDCG: {agg['ndcg']:.3f}")
Per-query drill-down
The report also carries a per_query array — one record per golden-set query, in golden order. This is what sample-based statistical rules (Welch’s t, Mann-Whitney U) consume at gate time.
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
let report = session.eval_embeddings("patents", None, "golden.public.golden_relevance", 10).await?;
for record in &report.per_query {
println!("{}: recall={:.3} ndcg={:.3}",
record.query_id, record.metrics.recall, record.metrics.ndcg);
}
Ok(()) }
}
for record in metrics["per_query"]:
m = record["metrics"]
print(f"{record['query_id']}: recall={m['recall']:.3f} ndcg={m['ndcg']:.3f}")
Retrieval metrics
| Metric | What it measures |
|---|---|
| recall_at_k | Fraction of relevant docs found in top-k |
| precision_at_k | Fraction of top-k that are relevant |
| mrr | Reciprocal rank of the first relevant result, averaged over queries |
| ndcg | Normalized discounted cumulative gain (uses graded relevance if provided) |
All metrics are in [0, 1]. Higher is better.
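For reference, the four metrics can be computed per query as follows. This is a sketch, not Jammi's evaluator. It assumes ranked holds retrieved ids in rank order, grades maps each relevant id to its relevance_grade (1 for binary sets), MRR only counts hits within the top k, and nDCG uses linear gain with a log2(rank + 1) discount, which is one common variant. Aggregates would then be means over queries.

```rust
use std::collections::HashMap;

/// Number of relevant ids among the top k results.
fn hits(ranked: &[&str], grades: &HashMap<&str, u32>, k: usize) -> usize {
    ranked.iter().take(k).filter(|id| grades.contains_key(*id)).count()
}

fn recall_at_k(ranked: &[&str], grades: &HashMap<&str, u32>, k: usize) -> f64 {
    if grades.is_empty() {
        return 0.0;
    }
    hits(ranked, grades, k) as f64 / grades.len() as f64
}

fn precision_at_k(ranked: &[&str], grades: &HashMap<&str, u32>, k: usize) -> f64 {
    if k == 0 {
        return 0.0;
    }
    hits(ranked, grades, k) as f64 / k as f64
}

/// 1 / rank of the first relevant result within the top k, or 0 if none.
fn reciprocal_rank(ranked: &[&str], grades: &HashMap<&str, u32>, k: usize) -> f64 {
    ranked
        .iter()
        .take(k)
        .position(|id| grades.contains_key(id))
        .map_or(0.0, |pos| 1.0 / (pos + 1) as f64)
}

/// DCG of the ranking divided by the DCG of the ideal (grade-sorted) ranking.
fn ndcg_at_k(ranked: &[&str], grades: &HashMap<&str, u32>, k: usize) -> f64 {
    let dcg: f64 = ranked
        .iter()
        .take(k)
        .enumerate()
        .map(|(i, id)| grades.get(id).copied().unwrap_or(0) as f64 / ((i + 2) as f64).log2())
        .sum();
    let mut ideal: Vec<u32> = grades.values().copied().collect();
    ideal.sort_unstable_by(|a, b| b.cmp(a));
    let idcg: f64 = ideal
        .iter()
        .take(k)
        .enumerate()
        .map(|(i, &g)| g as f64 / ((i + 2) as f64).log2())
        .sum();
    if idcg == 0.0 { 0.0 } else { dcg / idcg }
}
```

For query q1 in the sample golden set (relevant ids 1 and 4) and a ranking of 3, 1, 4 with k = 3, recall is 1.0, precision is 2/3, and the reciprocal rank is 0.5.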
Compare models (A/B)
Compare a base model against a fine-tuned model:
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession, base_table: String, finetuned_table: String) -> jammi_db::error::Result<()> {
let comparison = session.eval_compare(
&[base_table.clone(), finetuned_table.clone()],
"patents",
"golden.public.golden_relevance",
10,
).await?;
// The first entry is the baseline (`delta: None`); every subsequent entry
// carries a delta against it.
for entry in comparison.per_table.iter().skip(1) {
let delta = entry.delta.as_ref().expect("non-baseline entries carry a delta");
println!(
"{}: recall@10 delta {:+.3} ({:+.1}%)",
entry.table_name,
delta.recall_at_k.absolute,
delta.recall_at_k.relative * 100.0,
);
}
Ok(()) }
}
Python
comparison = db.eval_compare(
embedding_tables=[base_table, finetuned_table],
source="patents",
golden_source="golden.public.golden_relevance",
k=10,
)
# `per_table[0]` is the baseline (`delta` is None); subsequent entries
# carry a `delta` dict keyed by metric name (recall_at_k, precision_at_k,
# mrr, ndcg) with `absolute` and `relative` sub-keys.
for entry in comparison["per_table"][1:]:
d = entry["delta"]["recall_at_k"]
print(f"{entry['table_name']}: recall@10 delta {d['absolute']:+.3f} ({d['relative']*100:+.1f}%)")
The first table is the baseline. Deltas (absolute and relative) are computed for all subsequent tables.
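A minimal sketch of the delta computation, assuming relative means (candidate - baseline) / baseline, which is consistent with the Rust example above multiplying relative by 100 to print a percentage. The zero-baseline behaviour and the helper names are assumptions.

```rust
#[derive(Debug)]
struct MetricDelta {
    absolute: f64,
    relative: f64,
}

fn delta(baseline: f64, candidate: f64) -> MetricDelta {
    let absolute = candidate - baseline;
    // Relative change vs. the baseline; defined as 0 when the baseline is 0 (assumption).
    let relative = if baseline == 0.0 { 0.0 } else { absolute / baseline };
    MetricDelta { absolute, relative }
}

/// One metric across tables: entry 0 is the baseline (None), the rest carry deltas.
fn deltas(per_table: &[f64]) -> Vec<Option<MetricDelta>> {
    let Some(&base) = per_table.first() else {
        return Vec::new();
    };
    per_table
        .iter()
        .enumerate()
        .map(|(i, &v)| if i == 0 { None } else { Some(delta(base, v)) })
        .collect()
}
```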
Evaluate classification
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::eval::{EvalTask, InferenceAggregate};
let report = session.eval_inference(
"facebook/bart-large-mnli",
"test_data",
&["text".into()],
EvalTask::Classification,
"golden.public.labels",
"category",
).await?;
match &report.aggregate {
InferenceAggregate::Classification(c) => {
println!("Accuracy: {}", c.accuracy);
println!("Macro F1: {}", c.f1);
}
InferenceAggregate::Ner(n) => {
println!("NER F1: {}", n.f1);
}
}
println!("per_record predictions: {}", report.per_record.len());
Ok(()) }
}
Python
metrics = db.eval_inference(
model="facebook/bart-large-mnli",
source="test_data",
columns=["text"],
task="classification",
golden_source="golden.public.labels",
label_column="category",
)
# `aggregate` is tagged by `task`; for classification it carries
# `accuracy`, `f1`, and `per_class`.
agg = metrics["aggregate"]
print(f"Accuracy: {agg['accuracy']:.3f}")
print(f"Macro F1: {agg['f1']:.3f}")
# `per_record` is one entry per aligned predicted/gold pair.
print(f"per_record predictions: {len(metrics['per_record'])}")
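Accuracy and macro F1 reduce to counting over the aligned predicted/gold pairs. The sketch below is not Jammi's implementation. It assumes macro averaging over the union of labels that appear in either column, which is an assumption about how the label set is chosen.

```rust
use std::collections::BTreeSet;

/// Fraction of aligned pairs where prediction equals gold.
fn accuracy(pred: &[&str], gold: &[&str]) -> f64 {
    assert_eq!(pred.len(), gold.len(), "predictions and gold must be aligned");
    if gold.is_empty() {
        return 0.0;
    }
    pred.iter().zip(gold).filter(|(p, g)| p == g).count() as f64 / gold.len() as f64
}

/// Unweighted mean of per-label F1 = 2TP / (2TP + FP + FN).
fn macro_f1(pred: &[&str], gold: &[&str]) -> f64 {
    let labels: BTreeSet<&str> = pred.iter().chain(gold.iter()).copied().collect();
    if labels.is_empty() {
        return 0.0;
    }
    let mut total = 0.0;
    for &label in &labels {
        let (mut tp, mut fp, mut fn_) = (0usize, 0usize, 0usize);
        for (&p, &g) in pred.iter().zip(gold) {
            match (p == label, g == label) {
                (true, true) => tp += 1,
                (true, false) => fp += 1,
                (false, true) => fn_ += 1,
                (false, false) => {}
            }
        }
        let denom = 2 * tp + fp + fn_;
        total += if denom == 0 { 0.0 } else { 2.0 * tp as f64 / denom as f64 };
    }
    total / labels.len() as f64
}
```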
Eval runs in the catalog
Every evaluation is recorded automatically:
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
let runs = session.catalog().list_eval_runs().await?;
for run in &runs {
println!("{}: {} on {} (k={:?})", run.eval_run_id, run.eval_type, run.golden_source, run.k);
}
Ok(()) }
}
Schema validation
Golden datasets are validated before evaluation starts. Missing or wrong-type columns produce clear errors:
Eval error: Golden dataset missing required column 'query_text'
Eval error: Golden dataset column 'query_id' has type Boolean, expected Utf8
Integer ID columns (Int32, Int64) are accepted where Utf8 is expected.
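The validation rules can be sketched as a list of expected columns checked against the golden schema. ColType is a hypothetical stand-in for Arrow's DataType, and treating both query_id and relevant_id as ID columns that accept Int32/Int64 is an assumption drawn from the rule above. The error strings mirror the examples shown.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColType {
    Utf8,
    Int32,
    Int64,
    Boolean,
}

struct Expected {
    name: &'static str,
    ty: ColType,
    required: bool,
    is_id: bool, // ID columns also accept Int32 / Int64 where Utf8 is expected
}

const RETRIEVAL_GOLDEN: [Expected; 4] = [
    Expected { name: "query_id", ty: ColType::Utf8, required: true, is_id: true },
    Expected { name: "query_text", ty: ColType::Utf8, required: true, is_id: false },
    Expected { name: "relevant_id", ty: ColType::Utf8, required: true, is_id: true },
    Expected { name: "relevance_grade", ty: ColType::Int32, required: false, is_id: false },
];

fn validate(schema: &[(&str, ColType)], expected: &[Expected]) -> Result<(), String> {
    for e in expected {
        match schema.iter().find(|(n, _)| *n == e.name) {
            None if e.required => {
                return Err(format!("Eval error: Golden dataset missing required column '{}'", e.name))
            }
            None => {} // optional column absent: fine
            Some(&(_, actual)) => {
                let id_ok = e.is_id && e.ty == ColType::Utf8 && matches!(actual, ColType::Int32 | ColType::Int64);
                if actual != e.ty && !id_ok {
                    return Err(format!(
                        "Eval error: Golden dataset column '{}' has type {:?}, expected {:?}",
                        e.name, actual, e.ty
                    ));
                }
            }
        }
    }
    Ok(())
}
```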
Connect to PostgreSQL / MySQL
Jammi federates external databases alongside local files. Register a database as a source and query it with the same SQL interface; a single query can join local files with database tables.
PostgreSQL
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_db::source::{SourceConnection, SourceType};
session.add_source("pg_data", SourceType::Postgres, SourceConnection {
url: Some("postgresql://user:pass@localhost:5432/mydb".into()),
..Default::default()
}).await?;
let results = session.sql(
"SELECT id, title FROM pg_data.public.articles WHERE published = true LIMIT 10"
).await?;
Ok(()) }
}
MySQL
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_db::source::{SourceConnection, SourceType};
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
session.add_source("mysql_data", SourceType::Mysql, SourceConnection {
url: Some("mysql://user:pass@localhost:3306/mydb".into()),
..Default::default()
}).await?;
Ok(()) }
}
Cross-source joins
Once registered, external databases are queryable with the same three-part naming convention and can be joined with local files:
SELECT p.title, a.author_name
FROM local_data.public.papers p
JOIN pg_data.public.authors a ON p.author_id = a.id
WHERE a.institution = 'MIT'
Generate embeddings from external sources
External databases work as sources for embedding generation:
# Note: external databases must be registered through the Rust API,
# which exposes the typed SourceType::Postgres / SourceType::Mysql variants.
# The Python `add_source(url=…, format=…)` surface is for file-shaped sources.
db.generate_text_embeddings(
source="pg_articles",
model="sentence-transformers/all-MiniLM-L6-v2",
columns=["title", "abstract"],
key="id",
)
Feature flags
External source support requires feature flags when building from source:
| Source | Feature flag |
|---|---|
| PostgreSQL | postgres |
| MySQL | mysql |
These are enabled by default in published crates and pre-built binaries.
Supported source types
| Type | Description | Status |
|---|---|---|
| File (file://) | Parquet, CSV, JSON on local disk | Always available |
| File (s3:// / gs:// / azure://) | Same formats over cloud object stores | Feature-gated; see Cloud Storage |
| PostgreSQL | Any PostgreSQL-compatible database | Available |
| MySQL | MySQL / MariaDB | Available |
| SQLite | SQLite databases | Not supported (rusqlite version conflict) |
Store Sources and Results in Cloud Object Storage
Jammi treats local disk, S3, GCS, and Azure Blob as interchangeable backends. Any place the engine accepts a local file path it also accepts a storage URL — file://, s3://, gs://, or azure:// — including registered file-shaped sources and the result-table Parquet that embedding and inference jobs write.
Build with the cloud features you need
The default build ships only file:// and the in-memory test driver. Cloud schemes are opt-in per provider so a deployment that only uses S3 does not pull in the GCS and Azure SDK chains:
| Feature | Schemes it enables |
|---|---|
| storage-s3 | s3:// (AWS S3 and S3-compatible: MinIO, R2, LocalStack) |
| storage-gcs | gs:// |
| storage-azure | azure://, abfss:// |
| storage-cloud | All three (umbrella) |
[dependencies]
jammi-db = { version = "0.5", features = ["storage-s3", "storage-gcs"] }
Live integration tests live behind matching live-s3-tests, live-gcs-tests, live-azure-tests features so the hermetic cargo test lane never reaches the network.
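Because each scheme maps to exactly one feature, a deployment could check storage URLs up front and report which feature is missing. required_feature is a hypothetical helper written from the feature table, not a Jammi function.

```rust
/// Cargo feature needed for a storage URL's scheme (None = always available).
fn required_feature(url: &str) -> Result<Option<&'static str>, String> {
    let (scheme, _) = url
        .split_once("://")
        .ok_or_else(|| format!("not a storage URL: {url}"))?;
    match scheme {
        "file" => Ok(None),
        "s3" => Ok(Some("storage-s3")),
        "gs" => Ok(Some("storage-gcs")),
        "azure" | "abfss" => Ok(Some("storage-azure")),
        other => Err(format!("unsupported scheme: {other}")),
    }
}
```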
Register an S3-backed source
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> Result<(), Box<dyn std::error::Error>> {
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
use jammi_db::storage::{CloudConfig, S3Config, StorageUrl};
let url = StorageUrl::parse("s3://benchmarks/snapshots/2026/papers.parquet")?;
let conn = SourceConnection {
url: Some(url.to_string()),
format: Some(FileFormat::Parquet),
cloud: Some(CloudConfig::S3(S3Config {
region: Some("us-east-1".into()),
..Default::default()
})),
..Default::default()
};
session.add_source("papers", SourceType::File, conn).await?;
let rows = session
.sql("SELECT id, title FROM papers.public.papers LIMIT 10")
.await?;
Ok(()) }
}
If the cloud field is None and the URL is a cloud scheme, the driver falls back to the SDK’s ambient credential chain — env vars, instance profile, IRSA, ADC, Managed Identity.
Python
from jammi_ai import Database
db = Database()
db.add_source("papers", url="s3://benchmarks/snapshots/2026/papers.parquet", format="parquet")
db.sql("SELECT id, title FROM papers.public.papers LIMIT 10")
The Python binding accepts the same URL forms as the Rust API; per-source cloud credentials are read from process environment.
CLI
jammi sources add papers \
--url s3://benchmarks/snapshots/2026/papers.parquet \
--format parquet
GCS and Azure
The pattern is identical — only the URL prefix and the CloudConfig variant change:
#![allow(unused)]
fn main() {
extern crate jammi_db;
use jammi_db::source::{FileFormat, SourceConnection};
fn make() -> SourceConnection {
use jammi_db::storage::{CloudConfig, GcsConfig};
let conn = SourceConnection {
url: Some("gs://archives/2026/jan.parquet".into()),
format: Some(FileFormat::Parquet),
cloud: Some(CloudConfig::Gcs(GcsConfig {
service_account_path: Some("/etc/jammi/sa.json".into()),
..Default::default()
})),
..Default::default()
};
conn }
}
#![allow(unused)]
fn main() {
extern crate jammi_db;
use jammi_db::source::{FileFormat, SourceConnection};
fn make() -> Result<SourceConnection, Box<dyn std::error::Error>> {
use jammi_db::storage::{AzureConfig, CloudConfig};
let conn = SourceConnection {
url: Some("azure://snapshots/model_outputs.parquet".into()),
format: Some(FileFormat::Parquet),
cloud: Some(CloudConfig::Azure(AzureConfig {
account_name: Some("mystorage".into()),
sas_token: Some(std::env::var("AZURE_SAS_TOKEN")?),
..Default::default()
})),
..Default::default()
};
Ok(conn) }
}
Persist result tables to the cloud
ResultStore accepts a StorageUrl root, so embedding and inference outputs land in the same bucket as the source data:
#![allow(unused)]
fn main() {
extern crate jammi_db;
use std::sync::Arc;
use jammi_db::catalog::Catalog;
fn ex(catalog: Arc<Catalog>) -> jammi_db::error::Result<()> {
use jammi_db::storage::{StorageRegistry, StorageUrl};
use jammi_db::store::ResultStore;
let root = StorageUrl::parse("s3://benchmarks/jammi_db")?;
let registry = StorageRegistry::new();
let result_store = Arc::new(ResultStore::with_root(root, registry, catalog)?);
Ok(()) }
}
Every result table the session creates writes its Parquet and sidecar ANN index to that prefix; delete_table_files and the crash-recovery pass operate against the same backend.
How the layout maps onto buckets
For a result table named papers__text_embedding__bge-m3__20260520T120000Z_abc12345, the engine writes the Parquet file plus three ANN-index sidecars as siblings:
s3://benchmarks/jammi_db/papers__text_embedding__bge-m3__….parquet
s3://benchmarks/jammi_db/papers__text_embedding__bge-m3__….idx.usearch
s3://benchmarks/jammi_db/papers__text_embedding__bge-m3__….idx.rowmap
s3://benchmarks/jammi_db/papers__text_embedding__bge-m3__….idx.manifest.json
The sidecar layout is the same on every backend; the only difference is the driver under the hood. USearch’s path-based FFI is bridged through a tempfile for cloud schemes so its save / load calls work unchanged.
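Given a root and a result-table name, the four object keys follow mechanically from the listing above. result_table_objects is a hypothetical helper that assumes the layout is exactly root/table plus the four suffixes shown.

```rust
/// Hypothetical: the Parquet file and its three ANN sidecars for one result table.
fn result_table_objects(root: &str, table: &str) -> [String; 4] {
    let root = root.trim_end_matches('/'); // tolerate a trailing slash on the root
    [
        format!("{root}/{table}.parquet"),
        format!("{root}/{table}.idx.usearch"),
        format!("{root}/{table}.idx.rowmap"),
        format!("{root}/{table}.idx.manifest.json"),
    ]
}
```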
Register a Mutable Companion Table
A mutable companion table lives in the same backend database as the
Jammi catalog (SQLite by default, Postgres in shared deployments), supports
transactional INSERT / UPDATE / DELETE through DataFusion DML, and
federates with Parquet result tables and external sources in one SQL
surface. Reach for it when a tenant needs a relation it can edit row by row
— a feature-store slowly-changing dimension table, a per-user state table,
a config-driven lookup — that still has to participate in the same JOINs as
your immutable result tables.
The primitive carries only what every consumer needs: a schema, a primary key, optional tenant scope, optional secondary indexes, optional ordering column. No history semantics, no lifecycle vocabulary, no audit columns.
Goal
This recipe walks through registering one mutable companion table for a neutral third-party use case (a feature-store team called Polaris Features maintaining slowly-changing dimensions for their recommender) and shows the equivalent Rust / Python / CLI surface.
Setup
Assumes a working JammiSession. The session opens the catalog at the
configured artifact directory; nothing else is needed.
Define the schema
Polaris keeps one row per (item_id, valid_from, valid_to) interval:
#![allow(unused)]
fn main() {
extern crate arrow_schema;
fn make() {
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};
let schema = Arc::new(Schema::new(vec![
Field::new("item_id", DataType::Utf8, false),
Field::new("price_tier", DataType::Utf8, false),
Field::new("availability", DataType::Utf8, false),
Field::new("valid_from", DataType::Int64, false), // epoch milliseconds
Field::new("valid_to", DataType::Int64, true), // epoch milliseconds; NULL = open
]));
}
}
The catalog encoder accepts the closed primitive subset enforced by every
MutableBackend impl — Boolean, the integer family, Float32 / Float64,
Utf8, Binary. Wider types (e.g. Timestamp, Decimal) round-trip via
their natural numeric encoding (Int64 epoch milliseconds, scaled Int64)
so the schema stays narrow and the rule stays one-line at the boundary.
The engine reserves tenant_id and any column whose name starts with _
— the schema builder rejects them at build time per ADR-00. (The
tenant_id column is always present on the storage table; the engine
appends it implicitly.)
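A column-level check for these rules might look like the following. ColType is a hypothetical stand-in for arrow_schema::DataType, counting the unsigned integers as part of "the integer family" is an assumption, and the real builder's error messages will differ.

```rust
/// Hypothetical stand-in for arrow_schema::DataType.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ColType {
    Boolean, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64,
    Float32, Float64, Utf8, Binary, TimestampMs, Decimal128,
}

fn check_column(name: &str, ty: ColType) -> Result<(), String> {
    // Reserved: the implicit tenant column and every leading-underscore name.
    if name == "tenant_id" || name.starts_with('_') {
        return Err(format!("column '{name}' uses a reserved name"));
    }
    use ColType::*;
    match ty {
        Boolean | Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64
        | Float32 | Float64 | Utf8 | Binary => Ok(()),
        // Wider types are declared in their numeric encoding instead.
        TimestampMs | Decimal128 => Err(format!(
            "column '{name}': declare {ty:?} as Int64 (epoch ms or scaled integer)"
        )),
    }
}
```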
Build the definition
MutableTableDefinitionBuilder chains the field validations:
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow_schema;
use std::sync::Arc;
use arrow_schema::Schema;
use jammi_db::store::mutable::definition::{
MutableIndexDef, MutableTableDefinitionBuilder, MutableTableId,
};
fn make(schema: Arc<Schema>) -> jammi_db::store::mutable::definition::MutableTableDefinition {
let def = MutableTableDefinitionBuilder::new(
MutableTableId::new("item_dimensions").unwrap(),
schema,
)
.primary_key(vec!["item_id".into(), "valid_from".into()])
.index(MutableIndexDef {
name: "idx_item_dim_active".into(),
columns: vec!["item_id".into(), "valid_to".into()],
unique: false,
})
.build()
.unwrap();
def
}
}
The primary key must be a non-empty subset of the schema; secondary indexes
are optional but persisted on the storage table so the backend can use
them for WHERE clauses.
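The key rules reduce to membership checks against the schema's column names. validate_keys is a hypothetical helper mirroring what the builder is described as enforcing (a non-empty primary key drawn from the schema); checking index columns against the schema as well is an assumption.

```rust
/// Hypothetical mirror of the builder's key checks.
fn validate_keys(columns: &[&str], primary_key: &[&str], indexes: &[&[&str]]) -> Result<(), String> {
    if primary_key.is_empty() {
        return Err("primary key must not be empty".into());
    }
    // Every primary-key and index column must exist in the schema.
    for col in primary_key.iter().chain(indexes.iter().flat_map(|ix| ix.iter())) {
        if !columns.contains(col) {
            return Err(format!("unknown column '{col}'"));
        }
    }
    Ok(())
}
```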
Register
The registration is atomic: catalog row + storage CREATE TABLE + every
secondary CREATE INDEX commit together. If any step fails, nothing lands.
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use jammi_db::store::mutable::definition::MutableTableDefinition;
use jammi_db::session::JammiSession;
async fn ex(session: &JammiSession, def: MutableTableDefinition) -> jammi_db::error::Result<()> {
let id = session.create_mutable_table(def).await?;
// The table is now queryable as `mutable.public.item_dimensions` in the
// same SQL surface that federates result tables and external sources.
Ok(())
}
}
Python
import pyarrow as pa
import jammi_ai
db = jammi_ai.connect(artifact_dir="/var/lib/jammi")
# The Python wrapper exposes mutable-table registration through the
# `create_mutable_table` accessor (see `jammi.mutable`). The recipe below
# is illustrative; consult the API reference for the binding shape your
# version ships.
CLI
The jammi CLI exposes mutable-table registration through the lower-level
sources surface for now; programmatic clients should use the Rust or
Python APIs.
Verify
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
async fn ex(session: &jammi_db::session::JammiSession) -> jammi_db::error::Result<()> {
let zero_rows = session
.sql("SELECT * FROM mutable.public.item_dimensions LIMIT 0")
.await?;
assert_eq!(zero_rows[0].schema().fields().len(), 5);
Ok(())
}
}
The query returns a zero-row batch with the declared schema — confirmation
that the table is registered and DataFusion can route mutable.public.<id>
correctly.
Federation tease
The mutable table now JOINs with your existing result tables and sources:
SELECT d.item_id, d.price_tier, e.embedding
FROM mutable.public.item_dimensions d
JOIN itemembs.public.item_embeddings e ON e.item_id = d.item_id
WHERE d.valid_to IS NULL
AND d.price_tier = 'premium'
LIMIT 10;
See the Run Transactional Updates on a Mutable Table recipe for INSERT / UPDATE / DELETE round-trips and the SCD Type 2 close-and-open pattern.
Run Transactional Updates on a Mutable Table
Once a mutable companion table is registered (see
Register a Mutable Companion Table), you
update its rows with the same SQL surface that runs your read queries.
Every INSERT / UPDATE / DELETE lands in one backend transaction —
either every row commits or none does — and federates with your immutable
result tables in subsequent SELECTs.
Goal
Walk through the three DML verbs against the item_dimensions table from
the previous recipe, then demonstrate the Slowly-Changing Dimension Type
2 close-and-open pattern Polaris uses to record a price-tier change.
Insert
-- valid_from is Int64 epoch milliseconds: 1775001600000 = 2026-04-01T00:00:00Z
INSERT INTO mutable.public.item_dimensions
(item_id, price_tier, availability, valid_from)
VALUES
('sku-1842', 'standard', 'in_stock', 1775001600000),
('sku-2901', 'premium', 'in_stock', 1775001600000),
('sku-3457', 'standard', 'out_of_stock', 1775001600000);
The RecordBatch returned by session.sql(...) carries a single-row
UInt64 column called count per DataFusion’s TableProvider::insert_into
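contract. Three rows landed and are visible to subsequent SELECTs. The
federated JOIN from the previous recipe keeps only open premium rows, so of
these three it matches sku-2901 (provided item_embeddings has a row for that item).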
Update
UPDATE mutable.public.item_dimensions
SET availability = 'low_stock'
WHERE item_id = 'sku-2901';
Predicate columns that participate in an index pushdown become a backend
WHERE clause; the rest filter above the scan node. The update commits
in one transaction; if the predicate matches zero rows, the call succeeds
with rows_affected = 0.
Delete
DELETE FROM mutable.public.item_dimensions
WHERE item_id = 'sku-3457';
DELETE follows the same shape. Row-level cascades are the backend's job
(SQLite or Postgres, via the foreign-key declarations on the storage
table); the engine does not model cascades above the backend.
SCD Type 2 — close-and-open
Polaris records a price-tier change by closing the active row’s
valid_to and inserting a new row with the new tier. Both statements
must land atomically; today the supported pattern is to issue them as a
single multi-statement SQL string through session.sql, which DataFusion
plans as one DML batch under one transaction:
-- Single sql() call so both statements land in one transaction.
-- Timestamps are Int64 epoch milliseconds: 1778846400000 = 2026-05-15T12:00:00Z.
UPDATE mutable.public.item_dimensions
SET valid_to = 1778846400000
WHERE item_id = 'sku-1842' AND valid_to IS NULL;
INSERT INTO mutable.public.item_dimensions
(item_id, price_tier, availability, valid_from)
VALUES
('sku-1842', 'premium', 'in_stock', 1778846400000);
A future JammiSession::transaction(|tx| async { … }) API will make
multi-statement DML atomicity explicit; today the multi-statement SQL
string is the supported surface.
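The close-and-open step can be modelled in memory to make the invariant explicit: at most one open row (valid_to NULL) per item, and the change applies as a unit. This sketch uses hypothetical names, carries the existing availability forward (the SQL above sets it explicitly instead), and stages the change on a copy so a failure leaves the rows untouched, standing in for the backend transaction.

```rust
#[derive(Debug, Clone, PartialEq)]
struct DimRow {
    item_id: String,
    price_tier: String,
    availability: String,
    valid_from: i64,        // epoch milliseconds
    valid_to: Option<i64>,  // None = open interval
}

/// Close the open row for `item_id` at `at_ms` and open a new one with `new_tier`.
fn close_and_open(rows: &mut Vec<DimRow>, item_id: &str, new_tier: &str, at_ms: i64) -> Result<(), String> {
    let mut next = rows.clone(); // staged copy: committed only if every step succeeds
    let open = next
        .iter_mut()
        .find(|r| r.item_id == item_id && r.valid_to.is_none())
        .ok_or_else(|| format!("no open row for {item_id}"))?;
    if at_ms <= open.valid_from {
        return Err("change must come after the open row's valid_from".into());
    }
    open.valid_to = Some(at_ms); // close the current interval
    let availability = open.availability.clone();
    next.push(DimRow {
        item_id: item_id.into(),
        price_tier: new_tier.into(),
        availability,
        valid_from: at_ms,
        valid_to: None, // the new row is the only open one
    });
    *rows = next; // commit
    Ok(())
}
```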
Federation join
The mutable table now joins with the embedding table to surface recommender candidates filtered by current tier:
SELECT d.item_id, d.price_tier, e.embedding
FROM mutable.public.item_dimensions d
JOIN itemembs.public.item_embeddings e ON e.item_id = d.item_id
WHERE d.valid_to IS NULL
AND d.price_tier = 'premium'
LIMIT 10;
The federation is the engine’s existing FederationOptimizerRule work
— no special integration needed; mutable tables register under the same
SessionContext as your Parquet result tables and external sources.
Crash recovery
If the process dies mid-write, no partial commit is visible on restart.
SQLite’s WAL mode
and Postgres’s MVCC each guarantee that an open transaction either
commits as a whole or is rolled back on connection loss. The engine
inherits that guarantee through the CatalogBackend::transaction
closure shape: when the closure returns Err(_), the backend rolls
back; when the process is killed mid-execution, the backend rolls back
the in-flight transaction.
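The closure shape can be illustrated with a toy in-memory version: the closure mutates a staged copy, Ok swaps it in, and Err drops it. This is not the CatalogBackend::transaction signature, and a toy cannot model rollback on process death, which the text above attributes to SQLite and Postgres themselves.

```rust
/// Toy model of a closure-shaped transaction (hypothetical, not CatalogBackend).
fn run_in_transaction<T, R, E, F>(state: &mut T, f: F) -> Result<R, E>
where
    T: Clone,
    F: FnOnce(&mut T) -> Result<R, E>,
{
    let mut staged = state.clone();
    let out = f(&mut staged)?; // on Err, `staged` is dropped: nothing lands
    *state = staged; // commit
    Ok(out)
}
```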
Direct-access append + replay (Phase 4 trigger streams)
Two lower-level methods bypass DataFusion’s planner for high-throughput event paths:
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow;
extern crate tokio;
async fn ex(
session: &jammi_db::session::JammiSession,
batch: arrow::array::RecordBatch,
) -> jammi_db::error::Result<()> {
use jammi_db::store::mutable::definition::MutableTableId;
use jammi_db::catalog::backend::TxOptions;
let id = MutableTableId::new("events").unwrap();
let registry = session.mutable_tables_arc();
let backend = session.catalog().backend_arc();
// Direct INSERT via insert_batch — caller owns the transaction.
backend
.transaction(TxOptions::default(), move |tx| {
let registry = registry.clone();
let id = id.clone();
let batch = batch.clone();
Box::pin(async move {
registry
.insert_batch(tx, &id, &batch)
.await
.map_err(|e| jammi_db::BackendError::Execution(e.to_string()))?;
Ok::<(), jammi_db::BackendError>(())
})
})
.await?;
Ok(())
}
}
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate futures;
extern crate tokio;
use futures::StreamExt;
async fn ex(
session: &jammi_db::session::JammiSession,
) -> jammi_db::error::Result<()> {
use jammi_db::store::mutable::definition::MutableTableId;
let id = MutableTableId::new("events").unwrap();
// Stream rows where the registered `order_column` value > 100.
let mut stream = session
.mutable_tables()
.scan_after(&id, 100)
.await
.map_err(|e| jammi_db::error::JammiError::Catalog(e.to_string()))?;
while let Some(batch) = stream.next().await {
let _batch = batch
.map_err(|e| jammi_db::error::JammiError::Catalog(e.to_string()))?;
// …
}
Ok(())
}
}
These methods are what Phase 4’s trigger broker uses to publish events into a backing table and to replay them to subscribers; general consumers should prefer the SQL surface.
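For intuition, replay reduces to "rows whose order column is strictly greater than a watermark, in ascending order". The in-memory sketch below is a hypothetical model of scan_after, not its implementation. A subscriber could persist the last order value it processed and pass it as after when it reconnects.

```rust
/// Hypothetical model of replay: rows whose order-column value is strictly
/// greater than `after`, returned in ascending order.
fn scan_after<T>(rows: &[(i64, T)], after: i64) -> Vec<&(i64, T)> {
    let mut out: Vec<&(i64, T)> = rows.iter().filter(|(order, _)| *order > after).collect();
    out.sort_by_key(|(order, _)| *order);
    out
}
```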
Publish Events to a Topic
A trigger-stream topic is a catalog-registered Arrow schema plus a
backing mutable table. Publishers append RecordBatches; subscribers
filter and receive them. The engine owns the offset counter and the
durable event log; the broker (in-memory by default, NATS JetStream
in clustered deployments) fans live deliveries out to attached
subscribers.
Reach for the trigger stream when a tenant needs event semantics — a CDC pipeline, a feature-store update bus, a job-completion notification fan-out — that has to coexist with the SQL surface the rest of the platform already uses. Every published event lands as a row in the topic’s backing mutable table; that table is queryable with the same Flight SQL surface as any other mutable companion table, so ad-hoc analytics on the event log come for free.
Goal
Walk through registering one topic for a neutral third-party use case (a small CDC pipeline pulling Postgres change events into a downstream search index) and publish a batch of events from Rust.
Setup
Assumes a JammiSession whose JammiConfig.trigger_broker is left at
its default — the embedded InMemoryBroker. Production deployments
swap in JetStreamBroker via configuration; the publisher API does
not change.
Define the topic schema
#![allow(unused)]
fn main() {
extern crate arrow_schema;
fn make() {
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};
let schema = Arc::new(Schema::new(vec![
Field::new("op", DataType::Utf8, false),
Field::new("ts_ms", DataType::Int64, false),
Field::new("key", DataType::Utf8, false),
Field::new("after", DataType::Utf8, true),
]));
}
}
The schema is the contract every published batch must satisfy. The
engine reserves the _offset, _row_idx, and _produced_at column
names (all leading-underscore names are reserved); user schemas must
not include them.
Register the topic
The simplest path is the SQL surface — session.sql("CREATE TOPIC …")
parses the same statement Flight SQL clients send across the wire:
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use jammi_db::session::JammiSession;
async fn ex(session: &JammiSession) -> jammi_db::error::Result<()> {
session
.sql(
"CREATE TOPIC cdc.orders (\
op TEXT NOT NULL, ts_ms BIGINT NOT NULL, \
key TEXT NOT NULL, after TEXT) \
WITH (retention_seconds = '604800')",
)
.await?;
Ok(())
}
}
The CLI exposes the same shape via jammi trigger register --name … --schema ….
import jammi_ai
db = jammi_ai.connect(artifact_dir="/var/lib/jammi")
db.sql(
"""
CREATE TOPIC cdc.orders (
op TEXT NOT NULL,
ts_ms BIGINT NOT NULL,
key TEXT NOT NULL,
after TEXT
) WITH (retention_seconds = '604800')
"""
)
For callers that build the topic programmatically (rather than via SQL), the Rust API surface is equivalent:
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow_schema;
use std::collections::BTreeMap;
use std::sync::Arc;
use arrow_schema::SchemaRef;
use jammi_db::trigger::{TopicDefinition, TopicId};
fn make(schema: SchemaRef) -> TopicDefinition {
let topic = TopicDefinition {
id: TopicId::new(),
name: "cdc.orders".into(),
schema,
tenant: None, // None = global; Some(t) scopes to t
broker_metadata: BTreeMap::new(), // driver-specific opts (e.g. retention)
};
topic
}
}
The id is a UUIDv7 minted at construction — time-ordered so the
catalog index keeps insert locality. The name is opaque to the
engine beyond catalog lookup; pick a hierarchical namespace that suits
your platform (e.g. cdc.orders, feature_store.user_features).
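The time ordering comes from the UUIDv7 layout (RFC 9562): the first 48 bits hold a big-endian Unix millisecond timestamp, so an ID minted later sorts after an earlier one. A sketch of that layout, with caller-supplied entropy standing in for a random source to keep it dependency-free; uuid_v7 is hypothetical, and TopicId::new() mints its own IDs:

```rust
/// Build a UUIDv7 string (RFC 9562 layout) from a Unix millisecond timestamp
/// and 10 bytes of caller-supplied entropy. Hypothetical helper for illustration.
fn uuid_v7(unix_ms: u64, entropy: [u8; 10]) -> String {
    let mut b = [0u8; 16];
    // Bytes 0..6: low 48 bits of the timestamp, big-endian, so later IDs sort later.
    b[..6].copy_from_slice(&unix_ms.to_be_bytes()[2..]);
    // Bytes 6..16: entropy, with the version and variant bits overwritten below.
    b[6..].copy_from_slice(&entropy);
    b[6] = (b[6] & 0x0f) | 0x70; // version 7 in the high nibble
    b[8] = (b[8] & 0x3f) | 0x80; // RFC variant bits 10xx
    let hex: String = b.iter().map(|x| format!("{x:02x}")).collect();
    format!(
        "{}-{}-{}-{}-{}",
        &hex[0..8],
        &hex[8..12],
        &hex[12..16],
        &hex[16..20],
        &hex[20..32]
    )
}

fn main() {
    let first = uuid_v7(1_700_000_000_000, [0xff; 10]);
    let second = uuid_v7(1_700_000_000_001, [0x00; 10]);
    // String order follows mint time whenever the millisecond timestamps differ.
    println!("{first}\n{second}\nordered: {}", first < second);
}
```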
Registration is atomic: the topics row, the backing mutable table,
and any broker-side state commit together; nothing lands on failure.
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use std::sync::Arc;
use jammi_db::trigger::{TopicDefinition, TriggerBroker};
async fn ex(
topic_repo: &jammi_db::catalog::topic_repo::TopicRepo,
broker: Arc<dyn TriggerBroker>,
topic: &TopicDefinition,
) -> Result<(), jammi_db::trigger::TriggerError> {
broker.register_topic(topic).await?;
topic_repo.register_topic(topic).await?;
Ok(())
}
}
Publish a batch
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow;
extern crate arrow_schema;
extern crate tokio;
use std::sync::Arc;
use arrow::array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use jammi_db::trigger::{Publisher, TopicDefinition};
use jammi_db::TenantId;
async fn ex(
publisher: &Publisher,
topic: &TopicDefinition,
schema: SchemaRef,
tenant: Option<TenantId>,
) -> Result<(), jammi_db::trigger::TriggerError> {
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["c", "u", "d"])),
Arc::new(Int64Array::from(vec![1700_000_000_000, 1700_000_000_100, 1700_000_000_200])),
Arc::new(StringArray::from(vec!["order-1", "order-2", "order-3"])),
Arc::new(StringArray::from(vec![Some("{...}"), Some("{...}"), None])),
],
)
.unwrap();
let offset = publisher.publish_scoped(topic, tenant, batch).await?;
println!("published offset = {}", offset.value());
Ok(())
}
}
publish_scoped tags every row’s tenant_id column from the explicit
tenant: Option<TenantId> argument — no silent dependency on session
state at publish time. Pass None for global topics; pass the session’s
current tenant (session.tenant()) for tenant-scoped publishes.
Python equivalent — publish_topic accepts a pyarrow.Table via the
Arrow C Stream Interface so the conversion is zero-copy:
import pyarrow as pa
table = pa.table({
"op": ["c", "u", "d"],
"ts_ms": [1700_000_000_000, 1700_000_000_100, 1700_000_000_200],
"key": ["order-1", "order-2", "order-3"],
"after": ["{...}", "{...}", None],
})
offset = db.publish_topic("cdc.orders", batch=table)
print(f"published offset = {offset}")
publish_scoped validates the batch schema against the topic schema
before opening a transaction. A mismatch returns BatchSchemaMismatch
and nothing lands in the backing table. If the topic is tenant-pinned
(TopicDefinition::tenant = Some(t)) and the tenant argument doesn’t
match, the publish is rejected up front with
PublishTenantMismatch.
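The two up-front checks can be modelled with plain data. This sketch is hypothetical: it compares (name, type, nullable) tuples rather than Arrow schemas, and the order in which the two checks run is an assumption:

```rust
/// Hypothetical mirror of the two publish-time rejections described above.
#[derive(Debug, PartialEq)]
enum PublishError {
    BatchSchemaMismatch,
    PublishTenantMismatch,
}

/// Simplified field: (column name, type name, nullable).
type Field = (&'static str, &'static str, bool);

fn validate_publish(
    topic_schema: &[Field],
    topic_tenant: Option<&str>, // Some(t) when the topic is tenant-pinned
    batch_schema: &[Field],
    tenant: Option<&str>, // the explicit publish argument
) -> Result<(), PublishError> {
    if topic_schema != batch_schema {
        return Err(PublishError::BatchSchemaMismatch);
    }
    // A tenant-pinned topic only accepts publishes for that exact tenant.
    if let Some(pinned) = topic_tenant {
        if tenant != Some(pinned) {
            return Err(PublishError::PublishTenantMismatch);
        }
    }
    Ok(())
}

fn main() {
    let topic: [Field; 4] = [
        ("op", "Utf8", false),
        ("ts_ms", "Int64", false),
        ("key", "Utf8", false),
        ("after", "Utf8", true),
    ];
    println!("{:?}", validate_publish(&topic, None, &topic, None));
    println!("{:?}", validate_publish(&topic, None, &topic[..2], None));
}
```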
What just happened
- The Publisher minted the next monotonic offset for the topic (seeded lazily from MAX(_offset) on the backing table the first time the topic is touched).
- The augmented batch (user columns plus _offset, _row_idx, and _produced_at) was inserted into the topic’s backing mutable table inside one CatalogBackend::transaction. On commit the offset advances; on rollback it is reused for the next attempt, so no gaps appear in the log.
- The broker received the batch for best-effort fan-out to any live subscribers. A broker fan-out failure after commit is logged but does not fail the publish; subscribers replay missed offsets from the backing table on their next reconnect.
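The gap-free offset behaviour in the first two bullets fits in a few lines. A minimal sketch assuming a single writer per topic; OffsetAllocator is hypothetical, whereas the real Publisher works against the backing table inside a transaction:

```rust
/// Hypothetical per-topic offset allocator: lazily seeded, advanced only on commit.
struct OffsetAllocator {
    next: Option<u64>, // None until the topic is first touched
}

impl OffsetAllocator {
    fn new() -> Self {
        Self { next: None }
    }

    /// Offset the next publish will use. On first use, seed from MAX(_offset)
    /// in the backing table (None means the table is empty, so start at 0).
    fn reserve(&mut self, max_offset_in_table: impl FnOnce() -> Option<u64>) -> u64 {
        *self
            .next
            .get_or_insert_with(|| max_offset_in_table().map_or(0, |m| m + 1))
    }

    /// After the transaction commits, the reserved offset is consumed.
    fn commit(&mut self, offset: u64) {
        self.next = Some(offset + 1);
    }
    // On rollback nothing changes, so the same offset is handed out again.
}

fn main() {
    let mut alloc = OffsetAllocator::new();
    let first = alloc.reserve(|| Some(41)); // table already holds offsets up to 41
    // Simulate a rolled-back publish: no commit, so the next attempt reuses `first`.
    let retry = alloc.reserve(|| None);
    alloc.commit(retry);
    println!("first={first} retry={retry} next={}", alloc.reserve(|| None));
}
```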
See also
- Subscribe with a SQL Predicate Filter — open a subscription with a WHERE-style predicate and consume the matching batches.
- Replay Events from the Backing Table — run ad-hoc Flight SQL over the durable event log.
Subscribe to a Topic with a SQL Predicate Filter
A subscription tails a topic and yields only the batches whose rows
satisfy a SQL WHERE-clause predicate. The predicate is parsed once
through DataFusion at subscribe time; the broker delivers each batch,
the engine evaluates the predicate against it, and rows that match
flow to the consumer.
Reach for predicate-filtered subscriptions when different downstream
consumers need different selectivity on the same event stream: a
search indexer that only cares about op = 'c', an audit log that
wants every event, a cache invalidator that wants op IN ('u', 'd').
Goal
Open a subscription on the cdc.orders topic with a predicate that
matches only deletes, and consume the stream from Rust.
Setup
Assumes the topic was registered (see
Publish Events to a Topic) and you have a
Subscriber constructed against the same broker the publisher uses.
Open the subscription
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow;
extern crate arrow_schema;
extern crate datafusion;
extern crate futures;
extern crate tokio;
use std::sync::Arc;
use arrow_schema::SchemaRef;
use datafusion::execution::context::SessionContext;
use futures::StreamExt;
use jammi_db::trigger::{Predicate, Subscriber, TopicDefinition};
async fn ex(
subscriber: &Subscriber,
session: &SessionContext,
topic: &TopicDefinition,
) -> Result<(), jammi_db::trigger::TriggerError> {
let predicate = Predicate::from_sql(session, Arc::clone(&topic.schema), "op = 'd'")?;
let mut stream = subscriber
.subscribe(topic, predicate, None /* from_offset: None = live tail */)
.await?;
while let Some(item) = stream.next().await {
// Each item is a Result wrapping a DeliveredBatch; the Arrow batch is its `.batch` field.
let delivered = item?;
handle_deletes(delivered.batch);
}
Ok(())
}
fn handle_deletes(_: arrow::array::RecordBatch) {}
}
from_offset = None starts the subscription at the broker’s live tail
(no replay). Some(0) starts from the earliest retained event; the
engine joins backing-table replay with the live broker stream so the
client sees one continuous sequence of DeliveredBatch.
Predicate dialect
Predicates are a subset of DataFusion SQL. The whitelist:
| Supported | Rejected |
|---|---|
| Column references (col) | Subqueries (SELECT …) |
| Literal scalars (1, 'foo', true) | Aggregates (SUM, COUNT, …) |
| Comparison ops (=, <, >, <=, >=, !=) | Window functions |
| Boolean ops (AND, OR, NOT) | Joins |
| IS NULL, IS NOT NULL | CASE WHEN |
| IN (literal, literal, …) | Functions outside the whitelist |
| LIKE, BETWEEN | |
| Whitelisted string functions | |
The string-function whitelist is lower, upper, length,
starts_with, ends_with. Anything outside this list returns
PredicateUnsupported at subscribe time — the stream never opens.
An unparseable predicate returns PredicateParse for the same
reason.
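A rough model of the function check, assuming the parser has already collected the names of the functions a predicate calls. check_functions is hypothetical, and the case-insensitive comparison is an assumption borrowed from SQL’s handling of unquoted identifiers:

```rust
/// String functions the predicate dialect accepts, per the whitelist above.
const STRING_FN_WHITELIST: [&str; 5] = ["lower", "upper", "length", "starts_with", "ends_with"];

/// Hypothetical error mirroring the PredicateUnsupported outcome.
#[derive(Debug, PartialEq)]
enum CheckError {
    PredicateUnsupported(String),
}

/// Reject the predicate if it calls any function outside the whitelist.
fn check_functions(called: &[&str]) -> Result<(), CheckError> {
    for name in called {
        let lower = name.to_ascii_lowercase();
        if !STRING_FN_WHITELIST.contains(&lower.as_str()) {
            return Err(CheckError::PredicateUnsupported(lower));
        }
    }
    Ok(())
}

fn main() {
    println!("{:?}", check_functions(&["lower", "starts_with"]));
    println!("{:?}", check_functions(&["regexp_match"]));
}
```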
Reconnection and replay
If your consumer disconnects and reconnects, pass the last-seen
offset as from_offset to resume without missing events:
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate chrono;
extern crate tokio;
use chrono::Utc;
use std::sync::Arc;
use jammi_db::trigger::{Offset, Predicate, Subscriber, TopicDefinition};
async fn ex(
subscriber: &Subscriber,
topic: &TopicDefinition,
last_seen: u64,
) -> Result<(), jammi_db::trigger::TriggerError> {
let resume_from = Offset::new(last_seen + 1, Utc::now());
let _stream = subscriber
.subscribe(topic, Predicate::match_all(), Some(resume_from))
.await?;
Ok(())
}
}
The engine reads the backing table for offsets >= resume_from, then
attaches the broker live stream starting strictly above the last
replayed offset — the two halves never deliver the same offset twice.
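The no-duplicate guarantee comes from where the live half starts. A sketch over plain offset lists, assuming the broker may redeliver offsets that replay already covered; resume is a hypothetical function, not the engine’s implementation:

```rust
/// Hypothetical resume path: replay offsets >= `from` from the backing table,
/// then attach the live stream strictly above the last replayed offset.
fn resume(backing_table: &[u64], live: &[u64], from: u64) -> Vec<u64> {
    let replayed: Vec<u64> = backing_table.iter().copied().filter(|&o| o >= from).collect();
    // If nothing was replayed, the live half starts at `from` itself.
    let live_floor = replayed.last().map_or(from, |&last| last + 1);
    let live_tail = live.iter().copied().filter(|&o| o >= live_floor);
    replayed.iter().copied().chain(live_tail).collect()
}

fn main() {
    let table: Vec<u64> = (0..=5).collect();
    // The broker also carries 4 and 5, which replay already delivered.
    let live = [4, 5, 6, 7];
    println!("{:?}", resume(&table, &live, 3));
}
```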
Backpressure
A slow consumer slows the producer; events are not dropped. The
chain runs as follows: the broker tail backs up into a bounded
mpsc::channel, the channel’s send() future waits for capacity, the
broker poll loop pauses, and publishers awaiting the broker fan-out
feel matching backpressure. The backing table (the authoritative log)
is still written without delay, because each publish commits before
fan-out, so a consumer that disconnects under load can always catch up via replay.
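The same no-drop, slow-the-producer behaviour can be seen with the standard library’s bounded channel. This sketch swaps in std::sync::mpsc::sync_channel and OS threads as a synchronous analogue of the async bounded mpsc::channel; the capacity of 2 is arbitrary:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

/// Push `n` offsets through a bounded channel to a deliberately slow consumer.
fn run_slow_consumer(n: u64) -> Vec<u64> {
    let (tx, rx) = sync_channel::<u64>(2);
    let producer = thread::spawn(move || {
        for offset in 0..n {
            // Blocks while two items are buffered: the producer slows, nothing is dropped.
            tx.send(offset).expect("consumer disconnected");
        }
        // `tx` is dropped here, which ends the consumer loop below.
    });
    let mut received = Vec::new();
    for offset in rx {
        thread::sleep(Duration::from_millis(2)); // simulate a slow downstream consumer
        received.push(offset);
    }
    producer.join().expect("producer panicked");
    received
}

fn main() {
    let got = run_slow_consumer(10);
    println!("received {} offsets in order: {:?}", got.len(), got);
}
```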
See also
- Publish Events to a Topic — the publisher side.
- Replay Events from the Backing Table — bypass the broker entirely and read the event log via Flight SQL.
Replay Events from the Backing Table
Every topic’s event log is a Phase-2 mutable companion table named
__topic_<topic_id>. The double-underscore prefix is reserved for
engine-controlled tables; consumers do not register tables under that
namespace. Flight SQL queries against the backing table compose with
the same federation surface the rest of Jammi exposes — joins with
result tables, predicate pushdown, aggregates over event history.
Reach for direct replay when a tenant needs ad-hoc analytics on the event log that would be awkward through the subscribe surface — counting events per key, computing per-day rollups, joining the event stream against a Parquet result table.
Goal
Run an ad-hoc query that returns the count of op = 'd' events per
hour over the durable log for cdc.orders.
Backing table naming
Every registered topic has a backing table named __topic_<topic_id>,
where <topic_id> is derived from the hyphenated lowercase TopicId::Display
form (the example query below spells it with underscores). To get the exact name, query topics:
SELECT topic_id, name, backing_table FROM topics WHERE name = 'cdc.orders';
Schema
The backing table’s columns are the topic’s user schema with three engine-controlled columns prepended:
| Column | Type | Purpose |
|---|---|---|
| _offset | BIGINT NOT NULL | Monotonic offset; stable across rows of one publish. |
| _row_idx | BIGINT NOT NULL | Position within a publish, for the composite PK. |
| _produced_at | BIGINT NOT NULL (UTC microseconds) | Publisher-side timestamp, single value per offset. |
| …user cols… | per TopicDefinition.schema | Payload columns. |
| tenant_id | TEXT (nullable, added by Phase 2) | Tenant scope per ADR-00. |
The primary key is (_offset, _row_idx); _offset is the order column
so scan_after and ORDER BY _offset agree.
Query
SELECT
DATE_TRUNC('hour', to_timestamp_micros(_produced_at)) AS hour,
COUNT(*) AS deletes
FROM mutable.public.__topic_019088da_1234_7890_abcd_ef1234567890
WHERE op = 'd'
GROUP BY hour
ORDER BY hour;
Substitute your topic’s backing_table (looked up from the topics
catalog row) for the literal name in the example. The query runs
through Flight SQL like any other federated query — predicate
pushdown applies, joins compose, aggregates run.
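DATE_TRUNC('hour', …) over a microsecond timestamp amounts to flooring to a multiple of 3,600,000,000. An in-memory sketch of the same aggregation, assuming events arrive as (op, _produced_at) pairs; nothing here calls Jammi:

```rust
use std::collections::BTreeMap;

const MICROS_PER_HOUR: i64 = 3_600_000_000;

/// Count `op = 'd'` events per UTC hour, keyed by the hour's start in microseconds.
/// `div_euclid` keeps pre-1970 (negative) timestamps in the correct bucket.
fn deletes_per_hour(events: &[(&str, i64)]) -> BTreeMap<i64, u64> {
    let mut buckets = BTreeMap::new();
    for &(op, produced_at) in events {
        if op == "d" {
            let hour_start = produced_at.div_euclid(MICROS_PER_HOUR) * MICROS_PER_HOUR;
            *buckets.entry(hour_start).or_insert(0) += 1;
        }
    }
    buckets // BTreeMap iterates in key order, like ORDER BY hour
}

fn main() {
    let events = [("d", 10), ("c", 20), ("d", MICROS_PER_HOUR + 5)];
    for (hour_start, deletes) in deletes_per_hour(&events) {
        println!("{hour_start}: {deletes}");
    }
}
```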
Tenant scoping
The backing table carries the tenant_id column added by the Phase-2
mutable backend. Sessions bound to a tenant see only rows whose
tenant_id matches or is NULL, per Phase 3’s predicate-injection
analyzer rule — the same guard that scopes the rest of the catalog.
See also
- Publish Events to a Topic — the publisher side.
- Subscribe with a SQL Predicate Filter — the live-tail surface for these same events.
- Register a Mutable Companion Table — the substrate the backing table reuses; same schema validation, same federation properties.
Scope a Session to a Tenant
When more than one logical tenant shares a Jammi engine — a SaaS feature
store serving two ML teams, a research workbench shared across three labs,
a notebook product hosting one project per student — every catalog read and
write needs to belong to the right tenant. Jammi’s session-scoped tenant
binding does this without the caller having to spell a WHERE tenant_id = …
clause on every query.
Goal
After this recipe you can:
- Bind a tenant to a session in Rust, Python, and on the CLI.
- Verify that two sessions on the same process see disjoint rows.
- Bind a tenant on a remote client via the gRPC SessionService so subsequent Flight SQL queries from the same connection observe the tenant.
Setup
Every example below assumes a configured JammiConfig (defaults are fine
for the recipe). The tenant identifier is a UUID v4 or v7 string — the
engine refuses the nil UUID (00000000-…) at the TenantId newtype
boundary.
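If tenant strings come from user input, you can reject malformed or nil values before they reach the engine. A sketch assuming only the canonical 8-4-4-4-12 hex form; ParsedTenant is a hypothetical stand-in for jammi_db::TenantId, and it does not check the version nibble:

```rust
use std::str::FromStr;

/// Hypothetical tenant identifier: canonical 8-4-4-4-12 hex, lowercased, non-nil.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ParsedTenant(String);

#[derive(Debug, PartialEq, Eq)]
enum TenantParseError {
    Malformed,
    Nil,
}

impl FromStr for ParsedTenant {
    type Err = TenantParseError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lower = s.to_ascii_lowercase();
        let groups: Vec<&str> = lower.split('-').collect();
        let expected_lens = [8usize, 4, 4, 4, 12];
        let well_formed = groups.len() == expected_lens.len()
            && groups
                .iter()
                .zip(expected_lens)
                .all(|(g, n)| g.len() == n && g.bytes().all(|c| c.is_ascii_hexdigit()));
        if !well_formed {
            return Err(TenantParseError::Malformed);
        }
        // The nil UUID is refused outright.
        if groups.iter().all(|g| g.bytes().all(|c| c == b'0')) {
            return Err(TenantParseError::Nil);
        }
        Ok(ParsedTenant(lower))
    }
}

fn main() {
    for input in ["018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a", "00000000-0000-0000-0000-000000000000"] {
        println!("{input}: {:?}", input.parse::<ParsedTenant>());
    }
}
```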
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use std::str::FromStr;
use jammi_db::TenantId;
use jammi_db::session::JammiSession;
use jammi_db::config::JammiConfig;
async fn ex() -> jammi_db::error::Result<()> {
let config = JammiConfig::default();
let alice = TenantId::from_str("018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a")?;
let session = JammiSession::new(config).await?.with_tenant(alice);
// Every catalog read and write on `session` now scopes to Alice.
Ok(())
}
}
with_tenant is a builder that consumes self and returns Self, so it
chains naturally. If you hold a session behind Arc, use bind_tenant(&t)
to update the binding in place — the session shares one TenantBinding
across all references.
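The difference between the two forms is ownership: with_tenant takes self, while bind_tenant only needs &self and mutates through a lock. A hypothetical SketchSession showing that shape (the real session stores a TenantBinding, not a bare RwLock):

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical session with one interior-mutable tenant binding.
#[derive(Default)]
struct SketchSession {
    binding: RwLock<Option<String>>,
}

impl SketchSession {
    /// Builder form: consumes `self` and returns it, so it chains after construction.
    fn with_tenant(self, tenant: &str) -> Self {
        self.bind_tenant(tenant);
        self
    }

    /// In-place form: only needs `&self`, so it works through an `Arc<SketchSession>`.
    fn bind_tenant(&self, tenant: &str) {
        *self.binding.write().expect("binding lock poisoned") = Some(tenant.to_string());
    }

    fn tenant(&self) -> Option<String> {
        self.binding.read().expect("binding lock poisoned").clone()
    }
}

fn main() {
    let session = Arc::new(SketchSession::default().with_tenant("alice"));
    let another_ref = Arc::clone(&session);
    another_ref.bind_tenant("bob");
    // Both references observe the update because they point at one binding.
    println!("{:?}", session.tenant());
}
```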
Python
import jammi_ai
db = jammi_ai.connect(artifact_dir="/tmp/jammi")
db.with_tenant("018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a")
# Subsequent calls observe Alice's tenant scope.
db.add_source("inbox", path="/data/alice/inbox.parquet", format="parquet")
db.sql("SELECT * FROM inbox.public.inbox")
Pass an empty string to clear: db.with_tenant("").
CLI
The --tenant flag is global; it applies to every subcommand.
jammi --tenant 018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a sources list
jammi --tenant 018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9b query "SELECT * FROM models"
Remote clients (gRPC + Flight SQL)
A programmatic client (Python, Go, Java) binds the tenant once per
connection via the jammi.v1.session.SessionService.SetTenant RPC. The
server records the tenant against the jammi-session-id request metadata
header; every Flight SQL query the same connection issues afterwards
inherits the binding through the TenantInterceptor that fronts both
services. Browser clients reach the same SessionService over HTTP/1.1
via the gRPC-Web shim (application/grpc-web+proto) — no separate REST
surface, same jammi-session-id header semantics.
import grpc
from jammi.v1.session.session_pb2 import SetTenantRequest, Tenant
from jammi.v1.session.session_pb2_grpc import SessionServiceStub
channel = grpc.insecure_channel("jammi.example.com:50051")
metadata = [("jammi-session-id", "my-client-uuid")]
session = SessionServiceStub(channel)
session.SetTenant(
SetTenantRequest(tenant=Tenant(id="018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a")),
metadata=metadata,
)
# Subsequent Flight SQL queries on the same channel + jammi-session-id
# observe Alice's tenant scope.
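On the server side, the binding amounts to a table keyed by the jammi-session-id header. A sketch of that bookkeeping; SessionRegistry is hypothetical, and treating a missing or unknown session id as unscoped is an assumption:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical server-side map from a `jammi-session-id` header value to a tenant.
#[derive(Default)]
struct SessionRegistry {
    tenants: RwLock<HashMap<String, String>>,
}

impl SessionRegistry {
    /// What a SetTenant handler would record for this session id.
    fn set_tenant(&self, session_id: &str, tenant: &str) {
        self.tenants
            .write()
            .expect("registry lock poisoned")
            .insert(session_id.to_string(), tenant.to_string());
    }

    /// What an interceptor would look up on each later request.
    /// No header, or an id never bound, means the request runs unscoped (None).
    fn tenant_for(&self, session_id: Option<&str>) -> Option<String> {
        let id = session_id?;
        self.tenants.read().expect("registry lock poisoned").get(id).cloned()
    }
}

fn main() {
    let registry = SessionRegistry::default();
    registry.set_tenant("my-client-uuid", "018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a");
    println!("{:?}", registry.tenant_for(Some("my-client-uuid")));
}
```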
Disjoint views — what to expect
Two sessions on the same process, bound to different tenants, will:
- Not see each other’s rows: list_sources() returns the calling tenant’s sources plus any globally-scoped (tenant_id IS NULL) sources.
- Write into different lanes: a register_source from Alice produces a row tagged tenant_id = alice; Bob’s list_sources does not see it.
- Share globally-scoped rows: an unscoped (tenant_id IS NULL) registration, typically a public reference dataset, is visible to every tenant.
The engine enforces the binding at three layers (the SPEC-03 defence-in-depth discipline):
- Read-side predicate injection: TenantScopeAnalyzerRule injects tenant_id = $current OR tenant_id IS NULL on every TableScan whose schema declares the column.
- Write-side guard: every catalog register_* call and the mutable-table sink call Transaction::assert_tenant_matches before INSERT.
- Storage-side filter: catalog repo reads also pass the predicate to the backend SQL layer, so the wrong tenant’s rows never leave the database.
A buggy caller that constructs a row with the wrong tenant_id gets
BackendError::TenantMismatch from the guard layer.
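The read-side predicate and the write-side guard reduce to two small functions over a nullable tenant column. A hypothetical sketch; in particular, the guard here rejects any difference, including a bound session writing a global row, which the real Transaction::assert_tenant_matches may handle differently:

```rust
/// Hypothetical row carrying the engine's nullable tenant column.
struct Row {
    name: &'static str,
    tenant_id: Option<&'static str>,
}

/// Read side: `tenant_id = $current OR tenant_id IS NULL`.
/// An unscoped session (current = None) therefore sees only NULL-tenant rows.
fn visible(row: &Row, current: Option<&str>) -> bool {
    row.tenant_id.is_none() || row.tenant_id == current
}

/// Write side: refuse a row tagged for a tenant other than the session's.
fn assert_tenant_matches(row: &Row, current: Option<&str>) -> Result<(), String> {
    if row.tenant_id == current {
        Ok(())
    } else {
        Err(format!(
            "TenantMismatch: row tagged {:?}, session bound to {:?}",
            row.tenant_id, current
        ))
    }
}

fn main() {
    let rows = [
        Row { name: "alice-inbox", tenant_id: Some("alice") },
        Row { name: "bob-inbox", tenant_id: Some("bob") },
        Row { name: "public-ref", tenant_id: None },
    ];
    let for_alice: Vec<&str> = rows.iter().filter(|r| visible(r, Some("alice"))).map(|r| r.name).collect();
    println!("alice sees {for_alice:?}");
    println!("{:?}", assert_tenant_matches(&rows[1], Some("alice")));
}
```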
When the binding doesn’t apply
- External federated sources without a tenant_id column: Jammi’s analyzer rule has no column to inject against, so those sources show every row to every tenant unless the source declaration registers a tenant_column override. Catalog tables and mutable companion tables always carry the column.
- Cross-tenant WHERE clauses the caller writes by hand: a query that contains WHERE tenant_id = 'other-tenant' runs against the injected predicate plus the user’s clause; the analyzer rule does not remove user-written predicates.
- Single-tenant deployments: bind nothing and every row is global; no predicate is injected beyond tenant_id IS NULL.
See also
- The discipline test in SPEC-03 and Register a Mutable Companion Table, for how a mutable companion table also honours the tenant binding on write.
Scope a Federated Source by Tenant
The session-scoped tenant binding (multi-tenant.md)
relies on every table the engine reads carrying a tenant_id column. That
works for mutable companion tables and Parquet result tables Jammi
produced itself; both emit the column per ADR-00. But a federated source
(a remote Postgres warehouse, an S3 Parquet lake, a CSV from someone
else’s pipeline) usually doesn’t. It may carry a customer_id, an
organization, a workspace column, or no tenant discriminator at all.
This recipe shows how to tell Jammi which column on a federated source
plays the role of the tenant discriminator, so the predicate-injection
analyzer rule scopes scans against that column instead of looking for the
engine’s built-in tenant_id name.
Goal
After this recipe you can:
- Register a federated source whose tenant discriminator is named differently from tenant_id.
- Tell the analyzer rule which column to use.
- Verify two tenants get disjoint rows from the same physical source.
- Recognise what set_source_tenant_column does not do.
Setup
The recipe assumes you have a Parquet file (or any other federated
source) whose schema includes a column that already carries the tenant
identifier — for example a customer_id column populated with the UUID
of the customer who owns each row. The column’s value must be the same
canonical hyphenated lowercase form TenantId::Display emits; the
analyzer rule does a string comparison after coercing the column to
Utf8.
Register a federated source
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use jammi_db::session::JammiSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
async fn ex(session: &JammiSession) -> jammi_db::error::Result<()> {
session
.add_source(
"notes",
SourceType::File,
SourceConnection {
url: Some("file:///data/notes.parquet".into()),
format: Some(FileFormat::Parquet),
..Default::default()
},
)
.await?;
Ok(())
}
}
Python
db.add_source("notes", path="/data/notes.parquet", format="parquet")
Declare its tenant column
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
use jammi_db::session::JammiSession;
fn ex(session: &JammiSession) {
session.set_source_tenant_column("notes", Some("customer_id".into()));
}
}
set_source_tenant_column registers the override on the session’s
SourceTenantColumns map. The next time the analyzer rule sees a scan
against notes.public.notes, it discovers the override and injects
WHERE CAST(customer_id AS Utf8) = $current_tenant OR CAST(customer_id AS Utf8) IS NULL
(or IS NULL only when the session is unscoped).
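For reference, the injected filter for a given override column has the following shape. injected_filter is a hypothetical helper; the analyzer rule builds a DataFusion expression rather than splicing strings, so this only pictures the result:

```rust
/// Hypothetical rendering of the filter injected for an override column.
/// Bound session: match the tenant or NULL. Unscoped session: NULL only.
fn injected_filter(column: &str, current_tenant: Option<&str>) -> String {
    let cast = format!("CAST({column} AS Utf8)");
    match current_tenant {
        // String comparison: stored values must use the same canonical form.
        Some(t) => format!("{cast} = '{t}' OR {cast} IS NULL"),
        None => format!("{cast} IS NULL"),
    }
}

fn main() {
    println!("{}", injected_filter("customer_id", Some("018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a")));
    println!("{}", injected_filter("customer_id", None));
}
```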
The Python and CLI surfaces do not expose this method today; it lives on the Rust JammiSession only. If you embed Jammi as a library, this is the right hook; if you reach Jammi over Flight SQL / gRPC, the source registration and tenant-column declaration happen on the server side before the server starts accepting client connections.
A schema column named tenant_id always wins. Call
set_source_tenant_column only when your federated source actually
carries a tenant discriminator under a name other than tenant_id;
sources without any tenant column remain globally visible.
Verify the predicate
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use std::str::FromStr;
use jammi_db::TenantId;
use jammi_db::session::JammiSession;
use jammi_db::config::JammiConfig;
async fn ex() -> jammi_db::error::Result<()> {
let alice = TenantId::from_str("018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a")?;
let bob = TenantId::from_str("018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9b")?;
let session_a = JammiSession::new(JammiConfig::default()).await?.with_tenant(alice);
session_a.set_source_tenant_column("notes", Some("customer_id".into()));
let session_b = JammiSession::new(JammiConfig::default()).await?.with_tenant(bob);
session_b.set_source_tenant_column("notes", Some("customer_id".into()));
let count_a = session_a.sql("SELECT COUNT(*) FROM notes.public.notes").await?;
let count_b = session_b.sql("SELECT COUNT(*) FROM notes.public.notes").await?;
// Each session counts only its own rows: `count_a` and `count_b` cover
// disjoint subsets of the on-disk file.
Ok(())
}
}
For a file with 10 rows split 6 (customer_id = alice) + 4
(customer_id = bob), the two sessions get 6 and 4 respectively.
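The resolution order and the 6 + 4 split can be reproduced in memory. A sketch, assuming rows reduced to their discriminator values; scope_column and count_visible are hypothetical helpers that mirror the rules in this recipe:

```rust
/// Hypothetical resolution order: a built-in `tenant_id` column wins, otherwise
/// the registered override, otherwise the source is unscoped (globally visible).
/// Like the real override, nothing here checks that the column exists.
fn scope_column<'a>(schema: &[&'a str], override_col: Option<&'a str>) -> Option<&'a str> {
    if schema.contains(&"tenant_id") {
        Some("tenant_id")
    } else {
        override_col
    }
}

/// Count rows whose discriminator equals the session tenant or is NULL.
fn count_visible(values: &[Option<&str>], current: &str) -> usize {
    values.iter().filter(|v| v.is_none() || **v == Some(current)).count()
}

fn main() {
    let schema = ["id", "body", "customer_id"];
    println!("scope column: {:?}", scope_column(&schema, Some("customer_id")));
    let mut values = vec![Some("alice"); 6];
    values.extend(vec![Some("bob"); 4]);
    println!("alice={} bob={}", count_visible(&values, "alice"), count_visible(&values, "bob"));
}
```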
What you cannot do
- You cannot point set_source_tenant_column at a column that doesn’t exist on the source. The analyzer rule emits a column reference that DataFusion later fails to resolve at execution time, surfacing as a DataFusionError::SchemaError. The override is a trust contract: the engine does not validate the column’s presence at registration time.
- You cannot mix tenant_id and a non-tenant_id column on the same source. When the source’s schema already declares tenant_id, the built-in column wins and the override is ignored.
- You cannot safely remove the discriminator at runtime once tenants are actively querying. Calling set_source_tenant_column("notes", None) drops the override, and subsequent queries on notes.public.notes are not tenant-scoped at all.
If the federated source you are wrapping carries no tenant discriminator, two options are open: (1) re-shape upstream so each tenant lands in its own table, registered as a separate source, or (2) accept that the source is globally visible to every session and gate access at a higher layer (Flight SQL session interceptor, gRPC auth middleware). The engine itself does not authenticate; ADR-00 § Engine does not invent tenants applies.
See also
- Scope a Session to a Tenant — the broader session-binding recipe this one extends.
- Register a Mutable Companion Table — for sources Jammi owns, the tenant_id column comes from ADR-00 by default.
Deploy as a Server
Jammi can run as an Arrow Flight SQL server, making all registered sources and embedding tables queryable from any Arrow-compatible client. Use this when multiple services, BI tools, or non-Rust/Python consumers need to query Jammi’s data.
The workflow
The server is a read path. Set up your data with the library or CLI, then deploy the server so other systems can query it:
# 1. Register sources (CLI or library)
jammi sources add patents --path /data/patents.parquet --format parquet
# 2. Generate embeddings (library or Python — not available over Flight SQL)
python3 -c "
import jammi_ai
db = jammi_ai.connect()
db.generate_embeddings(source='patents', model='sentence-transformers/all-MiniLM-L6-v2', columns=['abstract'], key='id')
"
# 3. Start the server
jammi serve
Connecting with Arrow Flight SQL
Python (pyarrow)
from pyarrow.flight import FlightClient, FlightDescriptor
client = FlightClient("grpc://localhost:8081")
# Run a SQL query
info = client.get_flight_info(
FlightDescriptor.for_command(b"SELECT id, title, year FROM patents.public.patents WHERE year > 2020")
)
reader = client.do_get(info.endpoints[0].ticket)
table = reader.read_all()
print(table.to_pandas())
Query embedding tables
Embedding tables are registered in DataFusion and queryable via SQL:
# List all embedding tables
info = client.get_flight_info(
FlightDescriptor.for_command(b"SELECT table_name FROM information_schema.tables WHERE table_schema = 'jammi'")
)
# Query an embedding table directly (schema jammi, quoted table name)
info = client.get_flight_info(
FlightDescriptor.for_command(b'SELECT _row_id, _model_id FROM jammi."patents__embedding__all-MiniLM-L6-v2__20260325" LIMIT 10')
)
JDBC
Flight SQL is compatible with JDBC drivers that support the Arrow Flight SQL protocol, enabling access from Java applications, BI tools (Superset, DBeaver, Tableau), and SQL editors.
Server configuration
[server]
flight_listen = "0.0.0.0:8081"
preload_models = ["sentence-transformers/all-MiniLM-L6-v2"]
[logging]
level = "info"
format = "json" # structured logging for production
Preloading models
Models listed in preload_models are downloaded and loaded into memory at startup. This ensures the session is warm before the server accepts connections.
[server]
preload_models = [
"sentence-transformers/all-MiniLM-L6-v2",
"BAAI/bge-small-en-v1.5",
]
GPU configuration
For GPU-accelerated inference in production:
[gpu]
device = 0 # CUDA device index
memory_limit = "auto"
memory_fraction = 0.9
[inference]
batch_size = 64
max_loaded_models = 3
Set gpu.device = -1 for CPU-only deployment.
Environment variable overrides
Every config field can be overridden with environment variables, useful for containerized deployments:
JAMMI_SERVER__FLIGHT_LISTEN=0.0.0.0:9081 \
JAMMI_GPU__DEVICE=-1 \
JAMMI_LOGGING__FORMAT=json \
jammi serve
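The variable names follow a visible pattern: the JAMMI_ prefix, then config sections separated by a double underscore. A hypothetical helper that spells out that mapping as inferred from these examples, not taken from Jammi’s loader:

```rust
/// Hypothetical mapping from an override variable to a dotted config path:
/// strip `JAMMI_`, split sections on `__`, lowercase each part.
/// Single underscores stay inside a field name (FLIGHT_LISTEN -> flight_listen).
fn env_to_config_path(var: &str) -> Option<String> {
    // JAMMI_CONFIG names the config file itself (see Configuration), not a field.
    if var == "JAMMI_CONFIG" {
        return None;
    }
    let rest = var.strip_prefix("JAMMI_")?;
    if rest.is_empty() {
        return None;
    }
    let parts: Vec<String> = rest.split("__").map(|p| p.to_ascii_lowercase()).collect();
    Some(parts.join("."))
}

fn main() {
    for var in ["JAMMI_SERVER__FLIGHT_LISTEN", "JAMMI_GPU__DEVICE", "JAMMI_LOGGING__FORMAT"] {
        println!("{var} -> {:?}", env_to_config_path(var));
    }
}
```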
Health, readiness, and metrics
The server exposes three HTTP side-channel endpoints on port 8080:
curl http://localhost:8080/healthz
# {"status":"ok","version":"0.8.0"}
curl http://localhost:8080/readyz
# {"status":"ready"}
curl http://localhost:8080/metrics
# jammi_grpc_requests_total 0
# jammi_flight_queries_total 0
# jammi_eval_invocations_total 0
# jammi_search_latency_seconds_bucket{...} 0
/healthz is a liveness probe — a 200 means the process is running.
/readyz is a readiness probe — 200 means the catalog backend
responded; 503 means it didn’t and traffic should be drained from
this instance. Point your load balancer at /readyz.
/metrics exposes a small, substrate-level set of Prometheus counters
(gRPC requests, Flight SQL queries, eval invocations) plus a search-
latency histogram. Wider observability lives in the commercial server.
What the server can and cannot do
| Operation | Available over Flight SQL? |
|---|---|
| SQL queries on source tables | Yes |
| SQL queries on embedding tables | Yes |
| Joins, aggregations, filters | Yes |
| Generate embeddings | No — use library or CLI |
| Semantic vector search | No — use library or CLI |
| Fine-tuning | No — use library or CLI |
| Evaluation | No — use library or CLI |
The server is a query interface. ML operations (embeddings, search, fine-tuning) are done through the Rust library, Python package, or CLI — then the results are queryable over Flight SQL.
Graceful shutdown
The server drains active connections on SIGTERM / Ctrl+C before exiting. In-flight queries complete; long-running operations started via the library are unaffected.
Deploying as a container
The OSS server ships as a public Docker image at ghcr.io/f-inverse/jammi-ai-server. The image is built from a distroless base, runs as the nonroot user (uid 65532), and exposes the same 8080 / 8081 ports the local binary listens on.
docker run --rm \
-p 8080:8080 -p 8081:8081 \
-v jammi_data:/var/lib/jammi \
-v $(pwd)/jammi.toml:/etc/jammi/jammi.toml:ro \
ghcr.io/f-inverse/jammi-ai-server:latest
A minimal compose file lives in the workspace at examples/docker-compose/oss-server.yml:
cd examples/docker-compose
docker compose -f oss-server.yml up
Persistence
/var/lib/jammi holds the catalog DB, model weights, and indices. The Dockerfile declares it as a VOLUME — bind mounts work, but the host directory must be writable by uid 65532:
# Bind mount on the host.
sudo chown -R 65532:65532 /opt/jammi/data
docker run -v /opt/jammi/data:/var/lib/jammi ...
A named Docker volume (the compose default) sidesteps that step because Docker provisions ownership for the container’s user automatically.
Configuration
The container’s entrypoint expects /etc/jammi/jammi.toml. Bind-mount your config there:
# oss-server.yml
services:
  jammi-server:
    image: ghcr.io/f-inverse/jammi-ai-server:latest
    volumes:
      - ./jammi.toml:/etc/jammi/jammi.toml:ro
      - jammi_data:/var/lib/jammi
    ports:
      - "8080:8080"
      - "8081:8081"
volumes:
  jammi_data:
Building from source
The Dockerfile lives at the workspace root and uses BuildKit cache mounts for the cargo registry and target directory:
DOCKER_BUILDKIT=1 docker build -t jammi-ai-server:dev -f Dockerfile .
Cold builds take ~30 minutes (the workspace is large); warm builds with cache hits land at ~3 minutes.
Monitor Inference
Attach an observer to inspect every output batch during inference. Use this for logging, metrics collection, quality checks, or progress tracking.
Attach an observer
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate arrow;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_db::config::JammiConfig;
async fn ex(config: JammiConfig) -> jammi_db::error::Result<()> {
use jammi_ai::inference::observer::InferenceObserver;
use std::sync::Arc;
struct MetricsCollector;
impl InferenceObserver for MetricsCollector {
fn on_batch(
&self,
batch: &arrow::record_batch::RecordBatch,
model_id: &str,
latency: std::time::Duration,
) {
println!(
"Batch: {} rows from {model_id} in {latency:?}",
batch.num_rows()
);
}
}
let session = InferenceSession::with_observer(
config,
Some(Arc::new(MetricsCollector) as Arc<dyn InferenceObserver>),
).await?;
Ok(()) }
}
The observer is called once per output batch. When no observer is attached, the overhead is a single Option branch — effectively zero.
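The no-observer fast path is one Option check per batch. A simplified sketch with a hypothetical BatchObserver trait that receives a row count instead of an Arrow RecordBatch, so it runs without arrow:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Simplified stand-in for InferenceObserver (the real hook receives the RecordBatch).
trait BatchObserver: Send + Sync {
    fn on_batch(&self, num_rows: usize, model_id: &str, latency: Duration);
}

/// Hypothetical runner stage: without an observer, the per-batch cost is the `if let`.
struct RunnerStage {
    observer: Option<Arc<dyn BatchObserver>>,
}

impl RunnerStage {
    /// Notify the observer (if any), then hand the batch on; returns rows forwarded.
    fn emit(&self, num_rows: usize, model_id: &str, latency: Duration) -> usize {
        if let Some(observer) = &self.observer {
            observer.on_batch(num_rows, model_id, latency);
        }
        num_rows // the real runner sends the batch to its output channel here
    }
}

/// Example observer: running total of rows seen.
#[derive(Default)]
struct RowCounter(AtomicUsize);

impl BatchObserver for RowCounter {
    fn on_batch(&self, num_rows: usize, _model_id: &str, _latency: Duration) {
        self.0.fetch_add(num_rows, Ordering::Relaxed);
    }
}

fn main() {
    let counter = Arc::new(RowCounter::default());
    let stage = RunnerStage { observer: Some(counter.clone() as Arc<dyn BatchObserver>) };
    stage.emit(32, "all-MiniLM-L6-v2", Duration::from_millis(4));
    println!("rows observed: {}", counter.0.load(Ordering::Relaxed));
}
```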
Use cases
Progress logging
#![allow(unused)]
fn main() {
extern crate jammi_ai;
extern crate arrow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use arrow::record_batch::RecordBatch;
use jammi_ai::inference::observer::InferenceObserver;
struct ProgressLogger { total: AtomicUsize }
impl InferenceObserver for ProgressLogger {
fn on_batch(&self, batch: &RecordBatch, _model_id: &str, _latency: Duration) {
let count = self.total.fetch_add(batch.num_rows(), Ordering::Relaxed) + batch.num_rows();
eprintln!("Processed {count} rows...");
}
}
}
Quality checks
#![allow(unused)]
fn main() {
extern crate jammi_ai;
extern crate arrow;
use std::time::Duration;
use arrow::array::StringArray;
use arrow::record_batch::RecordBatch;
use jammi_ai::inference::observer::InferenceObserver;
struct QualityChecker;
impl InferenceObserver for QualityChecker {
fn on_batch(&self, batch: &RecordBatch, model_id: &str, latency: Duration) {
// Check for high error rates
let status = batch.column_by_name("_status").unwrap();
let errors = status.as_any().downcast_ref::<StringArray>().unwrap()
.iter().filter(|s| s == &Some("error")).count();
if errors > batch.num_rows() / 2 {
eprintln!("WARNING: {model_id} batch has {errors}/{} errors", batch.num_rows());
}
}
}
}
Latency tracking
#![allow(unused)]
fn main() {
extern crate jammi_ai;
extern crate arrow;
use std::time::Duration;
use arrow::record_batch::RecordBatch;
use jammi_ai::inference::observer::InferenceObserver;
struct LatencyTracker { slow_threshold: Duration }
impl InferenceObserver for LatencyTracker {
fn on_batch(&self, batch: &RecordBatch, model_id: &str, latency: Duration) {
if latency > self.slow_threshold {
eprintln!(
"SLOW: {model_id} took {latency:?} for {} rows ({:?}/row)",
batch.num_rows(),
latency / batch.num_rows().max(1) as u32, // max(1): dividing a Duration by 0 would panic on an empty batch
);
}
}
}
}
Pipeline architecture
Source (Parquet/CSV/DB)
|
v DataFusion scan
|
InferenceExec operator
|-- Loads model (or cache hit)
|-- Bounded channel (capacity=2, backpressure)
|-- InferenceRunner (async task)
| |-- Reads input batches
| |-- Extracts text from content columns
| |-- Tokenizes with model's tokenizer
| |-- BERT forward pass
| |-- Mean pooling + L2 normalization
| |-- Constructs prefix + vector columns
| |-- ** Observer called here **
| '-- Sends to output channel
|
v RecordBatch stream
|
Results
Model caching
Models are loaded once and cached with LRU eviction:
- First load: downloads from HF Hub (or reads from local path), loads weights into memory
- Subsequent calls: cache hit, returns immediately
- Ref counting: model stays in memory while any inference is running
- Eviction: when the LRU limit is reached, the least-recently-used model with no active references is evicted
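The caching rules above can be modelled with a map and a logical clock. A sketch, assuming plain acquire/release calls where the real cache uses ref-counted guards; ModelCache is hypothetical, and capacity 0 meaning unlimited mirrors max_loaded_models = 0 in the full reference:

```rust
use std::collections::HashMap;

/// Hypothetical model cache: LRU eviction that never evicts a model in use.
struct ModelCache {
    capacity: usize, // 0 = unlimited
    clock: u64,
    entries: HashMap<String, Entry>,
}

struct Entry {
    last_used: u64, // logical timestamp of the most recent acquire
    refs: usize,    // in-flight inferences holding this model
}

impl ModelCache {
    fn new(capacity: usize) -> Self {
        Self { capacity, clock: 0, entries: HashMap::new() }
    }

    /// Returns true on a cache hit, false when the model had to be "loaded".
    fn acquire(&mut self, model_id: &str) -> bool {
        self.clock += 1;
        if let Some(entry) = self.entries.get_mut(model_id) {
            entry.last_used = self.clock;
            entry.refs += 1;
            return true;
        }
        if self.capacity > 0 && self.entries.len() >= self.capacity {
            self.evict_idle_lru();
        }
        self.entries.insert(model_id.to_string(), Entry { last_used: self.clock, refs: 1 });
        false
    }

    /// Called when an inference finishes (a guard's `Drop` in the real design).
    fn release(&mut self, model_id: &str) {
        if let Some(entry) = self.entries.get_mut(model_id) {
            entry.refs = entry.refs.saturating_sub(1);
        }
    }

    /// Evict the least-recently-used idle model. If every model is busy, nothing
    /// is evicted and the cache temporarily exceeds its limit (a sketch choice).
    fn evict_idle_lru(&mut self) {
        let victim = self
            .entries
            .iter()
            .filter(|(_, e)| e.refs == 0)
            .min_by_key(|(_, e)| e.last_used)
            .map(|(id, _)| id.clone());
        if let Some(id) = victim {
            self.entries.remove(&id);
        }
    }

    fn is_loaded(&self, model_id: &str) -> bool {
        self.entries.contains_key(model_id)
    }
}

fn main() {
    let mut cache = ModelCache::new(2);
    cache.acquire("minilm");
    cache.acquire("bge-small");
    cache.release("minilm"); // minilm idle, bge-small still in use
    cache.acquire("e5-base"); // evicts minilm, the only idle model
    println!("minilm loaded: {}", cache.is_loaded("minilm"));
}
```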
Configuration
Jammi loads configuration from three sources, in priority order:
- Config file (TOML): explicit path, $JAMMI_CONFIG env var, ./jammi.toml, or ~/.config/jammi/config.toml
- Environment variables: JAMMI_GPU__DEVICE=0, JAMMI_INFERENCE__BATCH_SIZE=64
- Defaults: sensible defaults for all fields
use std::path::Path;
use jammi_db::config::JammiConfig;

fn load_config() -> jammi_db::error::Result<()> {
    // No explicit path: defaults, plus any config file or env vars Jammi finds
    let _config = JammiConfig::load(None)?;

    // Load from a specific file
    let _config = JammiConfig::load(Some(Path::new("/path/to/jammi.toml")))?;
    Ok(())
}
Full reference
# Where Jammi stores artifacts (catalog DB, model cache, embeddings)
# Default: platform-specific data directory (~/.local/share/jammi on Linux)
artifact_dir = "/path/to/artifacts"
[engine]
# Number of DataFusion execution threads. Default: number of CPUs.
execution_threads = 8
# Memory limit for the query engine. Default: "75%".
memory_limit = "75%"
# Maximum rows per DataFusion batch. Default: 8192.
batch_size = 8192
[gpu]
# GPU device index. -1 for CPU only. Default: 0.
device = -1
# GPU memory limit. Default: "auto".
memory_limit = "auto"
# Fraction of GPU memory Jammi may use. Default: 0.9.
memory_fraction = 0.9
[inference]
# Default backend selection strategy. Default: "auto".
default_backend = "auto"
# Maximum rows per inference batch. Default: 32.
batch_size = 32
# Timeout for batch accumulation in server mode (seconds). Default: 300.
batch_timeout_secs = 300
# Maximum models kept loaded simultaneously. 0 = unlimited. Default: 0.
max_loaded_models = 0
[inference.http]
# HTTP request timeout (seconds). Default: 60.
timeout_secs = 60
# Custom headers for HTTP model endpoints.
[inference.http.headers]
# Authorization = "Bearer sk-..."
[embedding]
# Distance metric for vector indices. Default: "cosine".
default_distance_metric = "cosine"
# Index type for vector storage. Default: "ivf_hnsw_sq".
default_index_type = "ivf_hnsw_sq"
# Rows between embedding index checkpoints. Default: 1000.
checkpoint_interval = 1000
[fine_tuning]
# LoRA rank for fine-tuning. Default: 8.
default_lora_rank = 8
# Learning rate. Default: 0.0002.
default_learning_rate = 0.0002
# Training epochs. Default: 3.
default_epochs = 3
# Training batch size. Default: 8.
default_batch_size = 8
# Checkpoint every N fraction of training. Default: 0.1.
checkpoint_fraction = 0.1
[cache]
# Enable ANN query cache. Default: true.
ann_cache_enabled = true
# Max cached ANN queries. Default: 10000.
ann_cache_max_entries = 10000
# Enable embedding cache. Default: true.
embedding_cache_enabled = true
# Embedding cache size. Default: "1GB".
embedding_cache_size = "1GB"
[server]
# Health probe listen address. Default: "0.0.0.0:8080".
health_listen = "0.0.0.0:8080"
# Arrow Flight SQL listen address. Default: "0.0.0.0:8081".
flight_listen = "0.0.0.0:8081"
# Models to preload on server start. Default: [].
preload_models = ["sentence-transformers/all-MiniLM-L6-v2"]
[logging]
# Log level: "trace", "debug", "info", "warn", "error". Default: "info".
level = "info"
# Log format: "text" or "json". Default: "text".
format = "text"
Environment variable overrides
Every config field can be overridden with an environment variable named JAMMI_<SECTION>__<FIELD> (top-level fields such as artifact_dir use JAMMI_<FIELD>):
| Variable | Overrides |
|---|---|
| JAMMI_ARTIFACT_DIR | artifact_dir |
| JAMMI_ENGINE__BATCH_SIZE | engine.batch_size |
| JAMMI_GPU__DEVICE | gpu.device |
| JAMMI_INFERENCE__BATCH_SIZE | inference.batch_size |
| JAMMI_LOGGING__LEVEL | logging.level |
Note the double underscore (__) separating section and field.
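The naming rule is mechanical enough to sketch: strip the JAMMI_ prefix, split on the double underscore, and lower-case each part. The helper below is hypothetical (not part of Jammi's API) and only encodes the names shown in the table; it does not special-case JAMMI_CONFIG, which selects a config file rather than overriding a field.

```rust
/// Hypothetical helper: maps a JAMMI_* environment variable name to the
/// dotted config key it overrides, e.g. JAMMI_GPU__DEVICE -> "gpu.device".
/// Returns None for variables outside the JAMMI_ namespace.
fn env_var_to_key(var: &str) -> Option<String> {
    let rest = var.strip_prefix("JAMMI_")?;
    if rest.is_empty() {
        return None;
    }
    // "__" separates section from field; a single "_" stays inside a name
    let key = rest
        .split("__")
        .map(|part| part.to_ascii_lowercase())
        .collect::<Vec<_>>()
        .join(".");
    Some(key)
}

fn main() {
    for var in ["JAMMI_ARTIFACT_DIR", "JAMMI_INFERENCE__BATCH_SIZE", "PATH"] {
        println!("{var} -> {:?}", env_var_to_key(var));
    }
}
```

Using a double underscore as the separator is what lets field names such as batch_size keep their single underscores.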
Catalog Backend and Trigger Broker
Jammi’s catalog (models, sources, eval runs, mutable companion tables) and
trigger broker (provenance channels, evidence streams) are selected through
two fields on JammiConfig: catalog and broker. The dev-laptop default
is SQLite + an in-process broker; production deployments swap one or both
for Postgres + JetStream.
TOML schema
The catalog stanza is a tagged enum keyed by kind; a config file contains exactly one of these two forms:
[catalog]
kind = "sqlite"
# path = "/var/lib/jammi/catalog.db" # optional; defaults to {artifact_dir}/catalog.db
[catalog]
kind = "postgres"
url = "postgres://user:pass@host:5432/jammi"
pool_size = 16
max_lifetime_secs = 1800
The broker stanza follows the same shape:
[broker]
kind = "in_memory"
[broker]
kind = "jet_stream"
url = "nats://nats.svc:4222"
retention_seconds = 604800
credentials_path = "/var/run/secrets/nats.creds"
broker.kind = "jet_stream" requires the jetstream-broker cargo feature
on jammi-db; selecting it without the feature returns
JammiError::Config rather than panicking at session construction time.
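The fail-early pattern described here (checking the selected broker against what the binary was compiled with, and returning a config error instead of panicking later) can be sketched as follows. BrokerConfig loosely mirrors the [broker] stanzas above with most fields omitted; ConfigError and validate_broker are hypothetical stand-ins for Jammi's real types, and the compiled-in check is a parameter so the function is testable. Inside a real crate it would come from cfg!(feature = "jetstream-broker").

```rust
/// Loosely mirrors the two [broker] stanza shapes (tagged by `kind`);
/// retention_seconds and credentials_path are omitted for brevity.
#[derive(Debug)]
enum BrokerConfig {
    InMemory,
    JetStream { url: String },
}

/// Hypothetical stand-in for JammiError::Config.
#[derive(Debug, PartialEq)]
enum ConfigError {
    Config(String),
}

/// Rejects a broker this binary cannot provide, at config-validation time.
fn validate_broker(cfg: &BrokerConfig, jetstream_compiled_in: bool) -> Result<(), ConfigError> {
    match cfg {
        BrokerConfig::InMemory => Ok(()),
        BrokerConfig::JetStream { url } if !jetstream_compiled_in => Err(ConfigError::Config(format!(
            "broker.kind = \"jet_stream\" (url {url}) requires the jetstream-broker cargo feature"
        ))),
        BrokerConfig::JetStream { .. } => Ok(()),
    }
}

fn main() {
    let cfg = BrokerConfig::JetStream { url: "nats://nats.svc:4222".to_string() };
    // In a standalone build no cargo features are enabled, so this is false
    match validate_broker(&cfg, cfg!(feature = "jetstream-broker")) {
        Ok(()) => println!("broker config accepted"),
        Err(ConfigError::Config(msg)) => eprintln!("config error: {msg}"),
    }
}
```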
Environment variable interpolation
JammiConfig::load substitutes ${NAME} patterns from the process
environment before TOML parsing. The rules:
- ${NAME} is replaced by the value of std::env::var("NAME").
- A missing variable is an error. The loader never silently substitutes an empty string, a common source of "deployed config has an empty Postgres URL" outages.
- $$ escapes a literal $.
- A bare $ not followed by $ or { is preserved verbatim, so passwords containing a single $ pass through unchanged.
- An unterminated ${ returns JammiError::Config.
- Interpolation is one-pass and not recursive: ${X}’s value is not re-scanned.
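These rules fit in one small scanning function. The sketch below is a self-contained reimplementation for illustration, not Jammi's loader: interpolate is a hypothetical name, errors are plain Strings rather than JammiError::Config, and the environment is passed in as a lookup closure so the rules can be exercised without touching real environment variables.

```rust
/// Sketch of the documented ${NAME} rules. `lookup` abstracts the
/// environment; pass `|name: &str| std::env::var(name).ok()` to read the
/// real process environment.
fn interpolate<F>(input: &str, lookup: F) -> Result<String, String>
where
    F: Fn(&str) -> Option<String>,
{
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(pos) = rest.find('$') {
        out.push_str(&rest[..pos]);
        let after = &rest[pos + 1..];
        if let Some(tail) = after.strip_prefix('$') {
            // "$$" escapes a literal "$"
            out.push('$');
            rest = tail;
        } else if let Some(tail) = after.strip_prefix('{') {
            let end = tail.find('}').ok_or_else(|| "unterminated ${".to_string())?;
            let name = &tail[..end];
            // A missing variable is an error, never an empty string
            let value = lookup(name).ok_or_else(|| format!("environment variable {name} is not set"))?;
            // Appended to the output and never re-scanned: one pass, no recursion
            out.push_str(&value);
            rest = &tail[end + 1..];
        } else {
            // A bare "$" is kept verbatim
            out.push('$');
            rest = after;
        }
    }
    out.push_str(rest);
    Ok(out)
}

fn main() {
    let line = r#"url = "nats://${NATS_HOST}:4222""#;
    let env = |name: &str| (name == "NATS_HOST").then(|| "nats.svc".to_string());
    println!("{}", interpolate(line, env).unwrap());
}
```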
Combined with the tagged-enum shape:
artifact_dir = "/var/lib/jammi"
[catalog]
kind = "postgres"
url = "${POSTGRES_URL}"
pool_size = 16
max_lifetime_secs = 1800
[broker]
kind = "jet_stream"
url = "nats://${NATS_HOST}:4222"
retention_seconds = 604800
credentials_path = "/var/run/secrets/nats.creds"
A working copy of this file ships at
crates/jammi-db/examples/sample-postgres.toml.
SQLite vs Postgres trade-offs
| Concern | SQLite | Postgres |
|---|---|---|
| Operational footprint | One file under artifact_dir. No daemon. | Externally-managed Postgres cluster. |
| Concurrent writers | One; WAL mode lets many readers run alongside one writer. | Many. |
| Multi-process deployment | Single-process only — sharing the file across jammi-server replicas corrupts WAL. | Multi-replica safe. |
| Failure recovery | File restore from backup. | Standard Postgres point-in-time-recovery. |
| Pool tuning | None — opens one pool of 8 connections. | pool_size + max_lifetime_secs honour sqlx::PgPool knobs. |
For laptop / single-tenant deployments, SQLite is the right answer; the
trade-off table tilts to Postgres the moment a second jammi-server
replica enters the picture.
In-memory vs JetStream broker
| Concern | InMemory | JetStream |
|---|---|---|
| Persistence | In-process only; lost on restart. | NATS server retains streams per retention_seconds. |
| Cross-process delivery | None — a publish in process A is invisible to a subscriber in process B. | All subscribers (any process, any host) see every published batch within the retention window. |
| Auth | None. | Anonymous or NATS .creds file via credentials_path. |
| Operational footprint | None. | One NATS server (or cluster). |
In-memory is fine for tests, local development, and single-process server
deployments where every consumer lives in the same jammi-server process.
JetStream is required for any deployment that wants replay across
restarts or fan-out across multiple jammi-server replicas.
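To make the in-memory column concrete, here is a minimal sketch of an in-process fan-out broker built on std::sync::mpsc. InMemoryBrokerSketch is hypothetical, not Jammi's broker type, but it shows why the trade-offs in the table hold: subscribers are just channel senders held in this process's memory, so nothing survives a restart and no other process can subscribe.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical in-process broker: each channel name fans out to every
/// subscriber registered in this process. Nothing is persisted.
struct InMemoryBrokerSketch<T: Clone> {
    subscribers: HashMap<String, Vec<Sender<T>>>,
}

impl<T: Clone> InMemoryBrokerSketch<T> {
    fn new() -> Self {
        Self { subscribers: HashMap::new() }
    }

    fn subscribe(&mut self, channel_name: &str) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.entry(channel_name.to_string()).or_default().push(tx);
        rx
    }

    /// Sends a clone of `msg` to every live subscriber, drops senders whose
    /// receiver has gone away, and returns how many subscribers got it.
    fn publish(&mut self, channel_name: &str, msg: T) -> usize {
        let Some(subs) = self.subscribers.get_mut(channel_name) else {
            return 0; // no subscribers: the message is simply dropped
        };
        subs.retain(|tx| tx.send(msg.clone()).is_ok());
        subs.len()
    }
}

fn main() {
    let mut broker = InMemoryBrokerSketch::new();
    let a = broker.subscribe("evidence");
    let b = broker.subscribe("evidence");
    let delivered = broker.publish("evidence", "batch-1".to_string());
    println!("delivered to {delivered}: {:?} {:?}", a.recv().unwrap(), b.recv().unwrap());
}
```

A durable broker such as JetStream replaces the in-memory sender list with server-side streams, which is what enables replay and cross-replica delivery.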
Health probe
CatalogBackend::ping runs SELECT 1 against the underlying pool and
classifies pool failures as BackendError::Unavailable. The
/readyz endpoint on jammi-server (when wired) reaches this via
session.catalog().ping().await. The primitive is cheap — microseconds
against a warm pool — and never opens a transaction.
Architecture
Crate dependency graph
jammi-db (foundation)
|
|-- Config, Catalog, Sources, SQL execution
|-- Parquet storage, ANN indexes, crash recovery
|
v
jammi-ai (intelligence)
|
|-- Model resolution, loading, caching
|-- InferenceExec, AnnSearchExec operators
|-- Embedding pipeline, result persistence
|-- SearchBuilder, evidence provenance
|-- Fine-tuning, evaluation
|-- GPU scheduling
|-- InferenceSession (wraps JammiSession)
|
+-------+-------+
| |
v v
jammi-server jammi-python
| |
|-- Flight SQL |-- PyO3 bindings
'-- Health API '-- pyarrow interop
jammi-cli
|
'-- Clap CLI wrapping InferenceSession
jammi-db has no dependency on jammi-ai. The intelligence layer is an optional addition — you can use jammi-db standalone for SQL queries over local data.
Key types and their responsibilities
Engine layer (jammi-db)
| Type | Responsibility |
|---|---|
| JammiConfig | TOML + env config loading with defaults |
| Catalog | Persistence for sources, models, result tables, eval runs, evidence channels (SQLite by default, Postgres optional) |
| JammiSession | DataFusion session + source registration + SQL execution |
| SourceCatalog / JammiSchemaProvider | DataFusion catalog integration |
| ResultStore | Parquet storage coordinator: create, finalize, recover, register |
| ParquetResultWriter | ZSTD-compressed Parquet file writer (64K row groups) |
| VectorIndex / SidecarIndex | ANN index trait + USearch implementation with row_id mapping |
AI layer (jammi-ai)
| Type | Responsibility |
|---|---|
| InferenceSession | Wraps JammiSession + ModelCache + ResultStore. Entry point for all operations |
| ModelResolver | Resolves model ID to file paths + backend. Chain: catalog -> local -> HF Hub |
| ModelCache | LRU cache with single-flight loading, ref-counted guards |
| CandleBackend / OrtBackend | Model backends: Candle (safetensors, BERT + ModernBERT + DistilBERT + OpenCLIP ViT), ONNX Runtime |
| HttpBackend | Remote backend: HTTP endpoint for embeddings |
| InferenceExec | DataFusion ExecutionPlan operator for inference with backpressure |
| AnnSearchExec | DataFusion ExecutionPlan leaf node for ANN vector search |
| EmbeddingPipeline | Orchestrates embedding generation (text or image): model -> InferenceExec -> ResultSink -> index. Parameterized by ModelTask |
| ResultSink | Streams inference output to Parquet + sidecar index, filters failed rows |
| SearchBuilder | Fluent API: join, annotate, filter, sort, limit, select, run |
| EvidenceRow / RowProvenance | Evidence model types for provenance tracking |
| OutputAdapter | Trait that converts raw model output to Arrow arrays per task |
| GpuScheduler | GPU memory permit system with budget-based admission control |
| FineTuneJob | LoRA fine-tuning with contrastive loss, checkpointing, early stopping |
| EvalRunner | Retrieval and classification evaluation |
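GpuScheduler is described as a memory-permit system with budget-based admission control, released through RAII permits. A minimal sketch of that idea with Mutex and Condvar: acquire blocks until the requested bytes fit in the remaining budget, and the returned permit gives the bytes back when dropped. BudgetScheduler, MemoryPermit, and the blocking (rather than async) API are assumptions for illustration, not GpuScheduler's actual interface.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical memory-budget scheduler: admits work only while the sum of
/// outstanding reservations stays within `budget_bytes`.
struct BudgetScheduler {
    budget_bytes: u64,
    in_use: Mutex<u64>,
    freed: Condvar,
}

/// RAII permit: holding it reserves `bytes`; dropping it releases them.
struct MemoryPermit {
    scheduler: Arc<BudgetScheduler>,
    bytes: u64,
}

impl BudgetScheduler {
    fn new(budget_bytes: u64) -> Arc<Self> {
        Arc::new(Self { budget_bytes, in_use: Mutex::new(0), freed: Condvar::new() })
    }

    /// Blocks until `bytes` fit in the remaining budget. Returns None for a
    /// request larger than the whole budget instead of waiting forever.
    fn acquire(self: &Arc<Self>, bytes: u64) -> Option<MemoryPermit> {
        if bytes > self.budget_bytes {
            return None;
        }
        let mut used = self.in_use.lock().unwrap();
        while *used + bytes > self.budget_bytes {
            used = self.freed.wait(used).unwrap();
        }
        *used += bytes;
        Some(MemoryPermit { scheduler: Arc::clone(self), bytes })
    }

    /// Non-blocking variant: None if the budget cannot admit `bytes` right now.
    fn try_acquire(self: &Arc<Self>, bytes: u64) -> Option<MemoryPermit> {
        let mut used = self.in_use.lock().unwrap();
        if *used + bytes > self.budget_bytes {
            return None;
        }
        *used += bytes;
        Some(MemoryPermit { scheduler: Arc::clone(self), bytes })
    }

    fn used(&self) -> u64 {
        *self.in_use.lock().unwrap()
    }
}

impl Drop for MemoryPermit {
    fn drop(&mut self) {
        *self.scheduler.in_use.lock().unwrap() -= self.bytes;
        // Wake all waiters: a smaller request may fit even if a larger one does not
        self.scheduler.freed.notify_all();
    }
}

fn main() {
    let gpu = BudgetScheduler::new(1_000); // illustrative budget, in MB
    let first = gpu.acquire(600).expect("fits in an empty budget");
    assert!(gpu.try_acquire(600).is_none()); // 600 + 600 > 1000: not admitted

    // A second job blocks in acquire() until the first permit is dropped
    let waiter = {
        let gpu = Arc::clone(&gpu);
        thread::spawn(move || gpu.acquire(600).map(|permit| permit.bytes))
    };
    drop(first); // releases 600 MB and wakes the waiter
    println!("second job admitted with {:?} MB", waiter.join().unwrap());
    println!("in use after both finished: {}", gpu.used());
}
```

Tying release to Drop means a permit is returned even if the job holding it exits early or panics, which is the usual motivation for RAII permits.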
Server layer (jammi-server)
| Type | Responsibility |
|---|---|
| AppState | Shared state: Arc<InferenceSession> + ANN cache |
| FlightSqlService | Arrow Flight SQL server backed by DataFusion |
| Health endpoint | HTTP /health for container liveness probes |
Python layer (jammi-python)
| Type | Responsibility |
|---|---|
| Database | PyO3 class wrapping Arc<InferenceSession> with shared tokio runtime |
| SearchBuilder | PyO3 class with imperative-style search composition |
| FineTuneJob | PyO3 class for monitoring fine-tuning jobs |
| connect() | Module-level function to create a Database |
Data flow
SQL query path
JammiSession::sql("SELECT ...")
-> DataFusion parses SQL
-> Resolves table from SourceCatalog/JammiSchemaProvider
-> Creates ListingTable scan from Parquet/CSV/JSON or federated source
-> Executes plan
-> Returns Vec<RecordBatch>
Embedding generation path (text and image)
InferenceSession::generate_text_embeddings(source, model, columns, key)
InferenceSession::generate_image_embeddings(source, model, image_column, key)
-> EmbeddingPipeline::run(task = TextEmbedding | ImageEmbedding)
-> Register result_table (status = "building")
-> Build plan: SourceScan -> InferenceExec(task)
-> InferenceExec dispatches to CandleModel::forward(content, task):
| TextEmbedding: arrow_to_texts -> tokenize -> BERT/ModernBERT -> mean_pool -> L2_normalize
| ImageEmbedding: arrow_to_images -> preprocess (model-driven) -> ViT forward -> L2_normalize
-> Stream batches through ResultSink
| |-- Filter _status = "ok"
| |-- Transform to embedding schema
| |-- Write to Parquet via ParquetResultWriter
| '-- Feed vectors to SidecarIndex::add()
-> Close writer, build ANN index, save sidecar bundle
-> Register as DataFusion table, update catalog to "ready"
-> Return ResultTableRecord
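The text branch ends in mean pooling followed by L2 normalization. A minimal sketch of those two steps on plain Vecs: token vectors are averaged over positions where the attention mask is 1 (so padding does not dilute the result), then the average is scaled to unit length. Plain slices stand in for Candle tensors, and mean_pool and l2_normalize are hypothetical names.

```rust
/// Averages token embeddings over real (non-padding) tokens.
/// `token_embeddings[i]` is the hidden state for token i; `attention_mask[i]`
/// is 1 for a real token and 0 for padding.
fn mean_pool(token_embeddings: &[Vec<f32>], attention_mask: &[u8]) -> Vec<f32> {
    let dim = token_embeddings.first().map_or(0, |v| v.len());
    let mut sum = vec![0.0f32; dim];
    let mut count = 0usize;
    for (embedding, &mask) in token_embeddings.iter().zip(attention_mask) {
        if mask == 1 {
            for (acc, x) in sum.iter_mut().zip(embedding) {
                *acc += *x;
            }
            count += 1;
        }
    }
    // Clamp the divisor so an all-padding input does not divide by zero
    let denom = count.max(1) as f32;
    sum.iter().map(|s| s / denom).collect()
}

/// Scales `v` to unit Euclidean length; a zero vector is returned unchanged.
fn l2_normalize(mut v: Vec<f32>) -> Vec<f32> {
    let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm > 0.0 {
        for x in &mut v {
            *x /= norm;
        }
    }
    v
}

fn main() {
    // Two real tokens and one padding token, hidden size 2
    let tokens = vec![vec![1.0, 2.0], vec![3.0, 2.0], vec![9.0, 9.0]];
    let mask = [1, 1, 0];
    let pooled = mean_pool(&tokens, &mask); // [2.0, 2.0]
    let embedding = l2_normalize(pooled);   // roughly [0.7071, 0.7071]
    println!("{embedding:?}");
}
```

Because every stored vector has unit length, cosine similarity between two embeddings reduces to a dot product.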
Vector search path
InferenceSession::search(source, query_vec, k)
-> Resolve embedding table from catalog
-> AnnSearchExec: SidecarIndex (ANN) or exact_vector_search (fallback)
-> Hydration: join ANN results back to source table
-> SearchBuilder: .join() .annotate() .filter() .sort() .limit() .select()
-> .run(): execute DataFusion plan, add provenance columns
-> Returns Vec<RecordBatch> with similarity + original columns + evidence
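When no sidecar index is usable, search falls back to exact (brute-force) scoring. A minimal sketch of such a fallback: score every stored vector by cosine similarity to the query and keep the top k. It assumes vectors fit in memory as Vec<f32> with a row_id per vector; exact_top_k is a hypothetical name, not the signature of Jammi's exact_vector_search.

```rust
/// Cosine similarity; returns 0.0 if either vector has zero length.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let na = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let nb = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if na == 0.0 || nb == 0.0 { 0.0 } else { dot / (na * nb) }
}

/// Brute-force top-k: scores every (row_id, vector) pair against the query
/// and returns the k best as (row_id, similarity), highest first.
fn exact_top_k(rows: &[(u64, Vec<f32>)], query: &[f32], k: usize) -> Vec<(u64, f32)> {
    let mut scored: Vec<(u64, f32)> = rows
        .iter()
        .map(|(row_id, v)| (*row_id, cosine(v, query)))
        .collect();
    // Descending by similarity; total_cmp gives a total order over f32
    scored.sort_by(|a, b| b.1.total_cmp(&a.1));
    scored.truncate(k);
    scored
}

fn main() {
    let rows: Vec<(u64, Vec<f32>)> = vec![
        (10, vec![1.0, 0.0]),
        (11, vec![0.0, 1.0]),
        (12, vec![0.7, 0.7]),
    ];
    let hits = exact_top_k(&rows, &[1.0, 0.1], 2);
    println!("{hits:?}"); // row 10 first, then row 12
}
```

Sorting every score costs O(n log n); a bounded heap of size k avoids materializing the full ranking, which matters more as tables grow.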
Module layout
crates/jammi-db/src/
|-- config.rs # Configuration loading
|-- error.rs # Unified error type
|-- session.rs # JammiSession (DataFusion wrapper)
|-- catalog/ # Catalog backends (SQLite default, Postgres optional)
|-- source/ # Source types, registry, schema provider
|-- store/ # ResultStore, Parquet writer/reader
'-- index/ # VectorIndex trait, sidecar, exact search
crates/jammi-ai/src/
|-- session.rs # InferenceSession
|-- model/ # ModelResolver, ModelCache, backends
|-- operator/ # InferenceExec, AnnSearchExec
|-- inference/ # Runner, observer, output adapters, image preprocessing
|-- pipeline/ # EmbeddingPipeline, ResultSink
|-- evidence/ # Provenance types and columns
|-- search/ # SearchBuilder
|-- fine_tune/ # LoRA training, config, jobs
|-- eval/ # Retrieval and classification eval
'-- concurrency/ # GpuScheduler, permits
crates/jammi-server/src/
|-- lib.rs # Health server startup, signal handling
|-- routes/health.rs # GET /health liveness probe
|-- error.rs # 404 fallback
'-- flight.rs # Arrow Flight SQL service
crates/jammi-cli/src/
|-- main.rs # Clap CLI entry point
'-- commands/ # serve, query, sources, models, explain
crates/jammi-python/src/
|-- lib.rs # PyO3 module, connect()
|-- database.rs # Database class
|-- search.rs # SearchBuilder class
|-- job.rs # FineTuneJob class
|-- convert.rs # Arrow <-> PyArrow conversion
'-- error.rs # Error conversion