Jammi AI

Jammi is an embeddable AI engine that brings model inference into your data pipeline. Register data sources, run SQL queries, generate embeddings, search with vector similarity, fine-tune models on your domain, and evaluate results — all without leaving your application.

What Jammi does

  • Query local data with SQL — register Parquet, CSV, and JSON files, run full SQL via DataFusion
  • Federate external databases — query PostgreSQL and MySQL alongside local files
  • Generate embeddings — load any BERT-family model from HuggingFace Hub (or local safetensors / ONNX), persist results to Parquet with sidecar ANN indexes
  • Vector search — ANN similarity search over embedding tables with automatic fallback to brute-force
  • Search builder — fluent API for .join(), .annotate(), .filter(), .sort(), .limit(), .select(), .run()
  • Evidence provenance — retrieved_by and annotated_by tracking on every search result
  • Fine-tuning — LoRA adapters with contrastive loss to improve embeddings for your domain
  • Evaluation — retrieval metrics (recall@k, precision@k, MRR, nDCG), classification (accuracy, F1), and A/B model comparison
  • Per-row error handling — null or invalid text produces error status per row, not a batch failure
  • Model caching — LRU eviction, ref-counted guards, single-flight loading
  • GPU scheduling — memory-budget admission control with RAII permits
  • Crash recovery — on restart, recovers result tables stuck in “building” state
  • Inference observability — attach observers to hook into every output batch

Three ways to use Jammi

Interface | Best for | Install
--- | --- | ---
Rust library | Embedding Jammi into Rust applications | cargo add jammi-ai
Python package | Data science, notebooks, scripts | pip install jammi-ai
CLI | Shell workflows, quick queries, ops | cargo install jammi-cli

All three interfaces share the same engine, configuration, and storage format. Embeddings generated from Python are queryable from the CLI, and vice versa.

For multi-language access or BI tool integration, jammi serve starts an Arrow Flight SQL server — any Arrow client can connect and query via standard SQL.

Crates

Crate | Purpose
--- | ---
jammi-db | Query engine, configuration, catalog, source management, Parquet storage, ANN indexes
jammi-ai | Model loading, inference execution, embedding pipeline, vector search, evidence model, fine-tuning, evaluation
jammi-server | Arrow Flight SQL server and HTTP health endpoint
jammi-cli | Command-line interface
jammi-python | Python bindings via PyO3

jammi-db has no dependency on jammi-ai. You can use it standalone for SQL queries over local data without pulling in the AI layer.

Installation

Rust

Add Jammi to your Cargo.toml:

[dependencies]
jammi-db = "0.1"
jammi-ai = "0.1"
tokio = { version = "1", features = ["full"] }

Or install the CLI:

cargo install jammi-cli

Build dependencies (Linux)

If building from source, you need a C compiler and protoc:

# Debian/Ubuntu
apt-get install protobuf-compiler gcc g++ pkg-config

# RHEL/AlmaLinux
yum install protobuf-compiler gcc gcc-c++ pkg-config

All other native libraries (lzma, zstd, zlib, sqlite) are vendored and compiled from source automatically. These tools are pre-installed in the devcontainer and CI images.

Python

pip install jammi-ai

Requires Python 3.8+. Pre-built wheels are available for Linux, macOS, and Windows.

From source

git clone https://github.com/f-inverse/jammi-ai.git
cd jammi-ai
cargo build --release

The CLI binary is at target/release/jammi.

For the Python package from source:

pip install maturin
maturin develop --release

Runtime requirements

Jammi has no mandatory runtime dependencies beyond the binary itself.

Optional:

  • CUDA toolkit + cuDNN for GPU inference (CPU works out of the box)
  • HuggingFace Hub access for downloading models (first run downloads ~90MB for MiniLM, cached thereafter)
  • PostgreSQL / MySQL client libraries if using federated database sources

Set HF_TOKEN for gated models, or HF_HOME to control the cache location.

Quickstart: Rust

This walkthrough registers a local data file, runs a SQL query, generates embeddings, and performs a semantic search — all in one program.

Full example

use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::config::JammiConfig;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = JammiConfig::load(None)?;
    let session = Arc::new(InferenceSession::new(config).await?);

    // 1. Register a data source
    session.add_source("patents", SourceType::File, SourceConnection {
        url: Some("file:///path/to/patents.parquet".into()),
        format: Some(FileFormat::Parquet),
        ..Default::default()
    }).await?;

    // 2. Query with SQL
    let rows = session.sql(
        "SELECT id, title, year FROM patents.public.patents WHERE year > 2020 LIMIT 5"
    ).await?;
    for batch in &rows {
        println!("{batch:?}");
    }

    // 3. Generate embeddings
    let record = session.generate_text_embeddings(
        "patents",
        "sentence-transformers/all-MiniLM-L6-v2",
        &["title".to_string()],
        "id",
    ).await?;
    println!("Embedded {} rows", record.row_count);

    // 4. Semantic search
    let query = session.encode_text_query(
        "sentence-transformers/all-MiniLM-L6-v2",
        "quantum computing applications",
    ).await?;

    let results = session.search("patents", query, 5).await?
        .sort("similarity", true)?
        .run().await?;

    for batch in &results {
        println!("{batch:?}");
    }

    Ok(())
}

The first run downloads the model from HuggingFace Hub (~90MB). Subsequent runs load from cache.

What’s happening

  1. JammiConfig::load(None) loads config from jammi.toml, $JAMMI_CONFIG, or defaults
  2. InferenceSession wraps the query engine with model loading, caching, and GPU scheduling
  3. add_source registers a file in the catalog — it survives session restarts
  4. sql runs any SQL query via DataFusion and returns Vec<RecordBatch>
  5. generate_text_embeddings runs the model over every row and persists the vectors to Parquet with a sidecar ANN index
  6. encode_text_query encodes a text string into the same vector space
  7. search finds the nearest neighbors, hydrates all source columns, and returns results with similarity scores

Quickstart: Python

The full quickstart — install, connect, register, search — lives in the repo’s cookbook tree under cookbook/quickstart/ with a runnable quickstart.py that’s exercised end-to-end on every PR by tests/cookbook_smoke.py. This page mirrors the cookbook’s overview so the mdBook site renders a self-contained quickstart; the cookbook is the source of truth.

Goal: a fresh user goes from pip install jammi-ai to a successful vector query in five minutes. The end-to-end script lives next to this file in quickstart.py — copy-paste it, run it, then read the four step-by-step pages for the explanation.

Steps

  1. Install — pip install jammi-ai
  2. Connect — open a session against a local artifact dir
  3. Register a source — attach a Parquet file
  4. Generate embeddings + search — build a vector index and run a similarity query

Run it

python cookbook/quickstart/quickstart.py

Expected output: a header row followed by the top-3 matches with their cosine similarity scores. The script exits 0 in under 30 seconds on CPU.

Production substitution

The script uses the local cookbook/fixtures/tiny_bert/ model (32-dim, 88 KB, single-layer) so the example needs no network access. In a real workload you would swap in a Hub model — for example sentence-transformers/all-MiniLM-L6-v2 (384-dim, English) — by changing the MODEL constant. Everything else stays the same.

Quickstart: CLI

The jammi CLI lets you manage sources, run queries, and start the server from your terminal.

Register a source and query it

# Register a local Parquet file
jammi sources add patents --path /path/to/patents.parquet --format parquet

# List registered sources
jammi sources list

# Run a SQL query
jammi query "SELECT id, title, year FROM patents.public.patents WHERE year > 2020 LIMIT 5"

# Show the execution plan
jammi explain "SELECT * FROM patents.public.patents WHERE year > 2020"

Start the server

# Start Flight SQL (port 8081) + health probe (port 8080)
jammi serve

# With a custom config
jammi --config jammi.toml serve

Once the server is running, any Arrow Flight SQL client can connect on port 8081 to query your data.

Available commands

Command | Description
--- | ---
jammi sources list | List registered data sources
jammi sources add <NAME> --path <PATH> --format <FMT> | Register a local file
jammi models list | List registered models
jammi query "<SQL>" | Run a SQL query and print results
jammi explain "<SQL>" | Show the execution plan for a query
jammi serve | Start the Flight SQL server + health probe

Global options

jammi --config <PATH> <command>    # Use a specific config file

Without --config, Jammi looks for configuration in this order:

  1. $JAMMI_CONFIG environment variable
  2. ./jammi.toml in the current directory
  3. ~/.config/jammi/config.toml
  4. Built-in defaults
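The lookup order above can be sketched as a small resolver. This is a hypothetical illustration, not Jammi's code: resolve_config_path is an invented name, None stands in for "use built-in defaults", and because the page does not say what happens when $JAMMI_CONFIG names a missing file, this sketch simply returns that path.

```python
import os
from pathlib import Path
from typing import Optional


def resolve_config_path(cli_path: Optional[str] = None,
                        cwd: Optional[Path] = None,
                        home: Optional[Path] = None) -> Optional[Path]:
    """Return the config file to load, or None to mean "use built-in defaults".

    Hypothetical sketch of the documented lookup order, not Jammi's own code.
    """
    if cli_path:                                   # --config <PATH> bypasses the search
        return Path(cli_path)
    env = os.environ.get("JAMMI_CONFIG")
    if env:                                        # 1. $JAMMI_CONFIG (trusted as-is here)
        return Path(env)
    local = (cwd or Path.cwd()) / "jammi.toml"     # 2. ./jammi.toml
    if local.is_file():
        return local
    user = (home or Path.home()) / ".config" / "jammi" / "config.toml"  # 3. user config
    if user.is_file():
        return user
    return None                                    # 4. built-in defaults
```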

Cookbook Recipes (runnable)

Every recipe under cookbook/ ships as a runnable example.py next to a markdown README and is wired into CI via tests/cookbook_smoke.py — a broken recipe blocks the merge. The cookbook is the OSS source of truth; this page mirrors each README below.

The recipes shipped at MVP:

Recipe | Demonstrates
--- | ---
mutable_tables | Create/insert/select/drop on a mutable companion table
trigger_streams | Publish + subscribe on a topic via the in-process broker
eval_embeddings | recall@k, MRR, nDCG against a golden set
eval_inference | Accuracy + macro F1 against gold labels
fine_tune | LoRA fine-tune end-to-end
flight_sql | Query a remote jammi serve over Arrow Flight SQL

Mutable tables

End-to-end create / insert / select / drop on a Jammi mutable table — the OSS primitive for state that needs to live alongside read-only result tables.

When to use this pattern. You need a writable table that sits in the same SQL catalog as your registered sources and embedding tables — for caching enriched rows, holding cursor state, recording user feedback, or any “small table I want to UPDATE / DELETE / INSERT from SQL” workload — without standing up an external Postgres.

What example.py does

  1. Connects to a temporary artifact dir
  2. Creates a notes mutable table with an int64 primary key + utf8 body column
  3. Inserts three rows through DataFusion DML (INSERT INTO ...)
  4. Verifies count and ordering via SELECT
  5. Drops the table, then asserts a SELECT after the drop raises
  6. Demonstrates the idempotent drop_mutable_table(..., if_exists=True)

API surface exercised

  • Database.create_mutable_table(name, *, schema, primary_key, ...)
  • Database.sql("INSERT INTO mutable.public.<name> ...")
  • Database.sql("SELECT ... FROM mutable.public.<name>")
  • Database.drop_mutable_table(name, *, if_exists=False)

The DataFusion namespace for mutable tables is always mutable.public.<name> — distinct from registered sources, which live under <source>.public.<source>.

Run it

python cookbook/recipes/mutable_tables/example.py

Exits 0 on success, prints mutable_tables: OK on the last line.


Trigger streams

End-to-end publish + subscribe on a Jammi topic, plus the registration and listing surface. Uses the embedded in-process broker — no NATS or external broker needed.

When to use this pattern. You need a low-friction event bus inside your application — for fan-out to downstream consumers, fan-in from batch jobs, or replay-from-offset semantics — without bringing up Kafka or NATS in dev/test. The same surface scales out to NATS JetStream by flipping a config flag at deploy time.

What example.py does

  1. Connects to a temporary artifact dir
  2. Registers a topic events.demo with a typed schema and broker metadata
  3. Confirms list_topics() returns the new topic
  4. Publishes a 3-row batch through publish_topic — captures the broker-assigned offset
  5. Subscribes from from_offset=0 and round-trips the same rows back
  6. Drops the topic, confirms it’s gone from list_topics()
  7. Demonstrates idempotent drop_topic(..., if_exists=True) and strict-mode failure when dropping a missing topic

API surface exercised

  • Database.register_topic(name, *, schema, broker_metadata=None)
  • Database.list_topics()
  • Database.publish_topic(name, *, batch) — returns the assigned offset
  • Database.subscribe_collect(name, *, from_offset, max_batches)
  • Database.drop_topic(name, *, if_exists=False)

The subscribe_collect path drives the replay-from-backing-table flow when from_offset=0; the live-tail flow is exercised in the broker integration suite.
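To make the publish/replay semantics concrete, here is a toy in-memory log. It is a hypothetical model, not Jammi's broker: InMemoryTopic is invented, and it assigns one offset per published batch, whereas this page does not say whether Jammi's offsets count batches or rows.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

Batch = List[Dict[str, Any]]


@dataclass
class InMemoryTopic:
    """Toy append-only log illustrating publish/replay (hypothetical, not Jammi's broker)."""
    batches: List[Batch] = field(default_factory=list)

    def publish(self, batch: Batch) -> int:
        # The offset is the batch's position in the log, assigned at publish time.
        self.batches.append(list(batch))
        return len(self.batches) - 1

    def subscribe_collect(self, from_offset: int, max_batches: int) -> List[Batch]:
        # Replay: return up to max_batches stored batches, starting at from_offset.
        return self.batches[from_offset:from_offset + max_batches]
```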

Run it

python cookbook/recipes/trigger_streams/example.py

Exits 0 on success, prints trigger_streams: OK on the last line.


Evaluate retrieval quality

Measure recall@k, precision@k, MRR, and nDCG of an embedding index against a golden relevance set.

When to use this pattern. You have a corpus and a small set of (query, expected document) judgments, and you need a number that tells you “is my new encoder better than the one I shipped last month?” The same loop powers nightly regression dashboards and A/B model comparison.

What example.py does

  1. Connects to a temporary artifact dir
  2. Registers the tiny corpus as a Parquet source
  3. Builds 32-dim embeddings over the content column with the local tiny_bert fixture
  4. Reads cookbook/fixtures/tiny_golden.json, expands it into the (query_id, query_text, relevant_id) CSV shape eval_embeddings consumes, and registers it as a golden source
  5. Calls db.eval_embeddings(source="corpus", golden_source="golden.public.golden", k=5)
  6. Asserts each aggregate metric is in [0.0, 1.0] and the per-query records carry their golden-set query_id

API surface exercised

  • Database.generate_text_embeddings(source, *, model, columns, key)
  • Database.eval_embeddings(*, source, golden_source, model=None, k=10)

The returned dict carries aggregate (mean across queries — recall_at_k, precision_at_k, mrr, ndcg) and per_query (one entry per query with query_id and a metrics sub-dict of the same four names, un-averaged).
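The four metrics can be computed per query and then averaged across queries as described. The sketch below uses the common binary-relevance definitions (precision divides by k, MRR only looks inside the top k); Jammi's conventions for edge cases such as fewer than k results are an assumption here, and the function names are hypothetical.

```python
import math
from statistics import mean
from typing import Dict, List, Sequence, Set


def query_metrics(ranked: Sequence[str], relevant: Set[str], k: int) -> Dict[str, float]:
    """Binary-relevance retrieval metrics for one query's ranked result ids."""
    hits = [doc in relevant for doc in ranked[:k]]
    n_hits = sum(hits)
    recall = n_hits / len(relevant) if relevant else 0.0
    precision = n_hits / k
    # Reciprocal rank of the first relevant hit within the top k (0 if none).
    rr = next((1.0 / (i + 1) for i, hit in enumerate(hits) if hit), 0.0)
    # nDCG: discounted gain of the actual ranking over that of an ideal ranking.
    dcg = sum(1.0 / math.log2(i + 2) for i, hit in enumerate(hits) if hit)
    ideal = sum(1.0 / math.log2(i + 2) for i in range(min(len(relevant), k)))
    ndcg = dcg / ideal if ideal > 0 else 0.0
    return {"recall_at_k": recall, "precision_at_k": precision, "mrr": rr, "ndcg": ndcg}


def aggregate(per_query: List[Dict[str, float]]) -> Dict[str, float]:
    # Mean of each metric across queries, like the "aggregate" entry.
    return {name: mean(m[name] for m in per_query) for name in per_query[0]}
```

For example, a query whose only relevant document is ranked second of three scores recall@3 = 1.0, precision@3 ≈ 0.33, MRR = 0.5 and nDCG@3 ≈ 0.63.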

Golden source shape

eval_embeddings requires a registered source with these columns:

column | type | example
--- | --- | ---
query_id | utf8 | q1
query_text | utf8 | quantum computing applications
relevant_id | utf8 | 1 (matches corpus.id as a string)

Image queries are supported via a query_image BLOB column instead of query_text; cross-modal eval is out of scope for this recipe.
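Step 4 of the recipe expands the golden JSON into this one-row-per-judgment shape. The layout of tiny_golden.json is not shown on this page, so the sketch assumes a hypothetical list of {query_id, query_text, relevant_ids} objects; expand_golden is likewise an invented name.

```python
import csv
import io
import json


def expand_golden(golden_json: str) -> str:
    """Flatten {query_id, query_text, relevant_ids} records (assumed layout)
    into the query_id/query_text/relevant_id CSV shape described above."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["query_id", "query_text", "relevant_id"])
    for entry in json.loads(golden_json):
        for rid in entry["relevant_ids"]:
            # relevant_id is utf8, so integer corpus ids are written as strings.
            writer.writerow([entry["query_id"], entry["query_text"], str(rid)])
    return out.getvalue()
```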

Run it

python cookbook/recipes/eval_embeddings/example.py

Exits 0 on success, prints the metrics dict + eval_embeddings: OK.


Evaluate inference (classification)

Run a classifier over a registered source and score its predictions against gold labels.

When to use this pattern. You have a labelled holdout set and you want a single number — accuracy, macro F1, per-class F1 — to compare two classifiers, or to track drift over time on the same classifier.

What example.py does

  1. Connects to a temporary artifact dir
  2. Registers the tiny corpus as corpus (parquet)
  3. Registers tiny_labels.csv as golden (csv) — (id, label) rows
  4. Runs db.eval_inference with the local tiny_modernbert_classifier fixture against the content column
  5. Prints the returned aggregate accuracy, macro f1, per-class metrics, and the count of per-record predictions
  6. Asserts every reported rate is in [0.0, 1.0]

API surface exercised

  • Database.eval_inference(*, model, source, columns, task, golden_source, label_column)

The returned dict carries aggregate (tagged by "task" — currently "classification") with accuracy, f1, and per_class, plus per_record (one entry per aligned {record_id, predicted, gold}).

The task argument is the string form of the inference task — "classification" here. NER is recognized but not yet supported via this entrypoint (see the runner’s EvalTask::Ner branch); for token-level eval, call the jammi-numerics NER kernels directly.

Golden source shape

eval_inference requires a registered source with these columns:

column | type | example
--- | --- | ---
id | utf8 | "1"
<label_column> | utf8 | physics

label_column is the kwarg you pass at call time — label in this recipe. Every id in the golden source must resolve to a row in the input source; rows without a gold label are silently dropped from the metric.
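The alignment rule and the aggregate metrics can be sketched with the standard library. This is an illustration under assumptions, not Jammi's runner: predictions and gold labels are plain dicts keyed by record id, macro F1 averages over every label seen in either gold or predictions, and all names are hypothetical.

```python
from collections import Counter
from typing import Dict, List, Tuple

Aligned = List[Tuple[str, str, str]]  # (record_id, predicted, gold)


def align(predictions: Dict[str, str], gold: Dict[str, str]) -> Aligned:
    # Keep only records that have a gold label; the rest are dropped from the metric.
    return [(rid, pred, gold[rid]) for rid, pred in predictions.items() if rid in gold]


def classification_metrics(aligned: Aligned) -> Dict[str, object]:
    labels = sorted({g for _, _, g in aligned} | {p for _, p, _ in aligned})
    tp, fp, fn = Counter(), Counter(), Counter()
    for _, pred, gold in aligned:
        if pred == gold:
            tp[gold] += 1
        else:
            fp[pred] += 1   # wrongly predicted as `pred`
            fn[gold] += 1   # missed an instance of `gold`
    per_class = {}
    for label in labels:
        p = tp[label] / (tp[label] + fp[label]) if tp[label] + fp[label] else 0.0
        r = tp[label] / (tp[label] + fn[label]) if tp[label] + fn[label] else 0.0
        per_class[label] = {"precision": p, "recall": r,
                            "f1": 2 * p * r / (p + r) if p + r else 0.0}
    accuracy = sum(tp.values()) / len(aligned) if aligned else 0.0
    macro_f1 = sum(c["f1"] for c in per_class.values()) / len(per_class) if per_class else 0.0
    return {"accuracy": accuracy, "f1": macro_f1, "per_class": per_class}
```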

Run it

python cookbook/recipes/eval_inference/example.py

Exits 0 on success, prints the metrics dict + eval_inference: OK.


Fine-tune an encoder

Run a LoRA fine-tune on top of an existing text encoder, poll the job to completion, and use the resulting checkpoint to encode a query.

When to use this pattern. Your domain (legal contracts, medical abstracts, patent claims, internal product docs) doesn’t match the distribution the base encoder was trained on, and you have a few hundred to a few thousand labelled or contrastive pairs. LoRA gets you ~80% of the lift of a full fine-tune at a fraction of the cost; the resulting adapter is small enough to ship as an attachment to the base model rather than a re-distributed full checkpoint.
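The size argument follows from the LoRA update itself: a frozen weight W is augmented with a trainable low-rank product B·A scaled by alpha / r. The sketch below shows that standard formulation with plain lists; which layers Jammi adapts and how it initializes A and B are not covered on this page, and the function names are hypothetical.

```python
from typing import List

Matrix = List[List[float]]


def matvec(m: Matrix, x: List[float]) -> List[float]:
    return [sum(a * b for a, b in zip(row, x)) for row in m]


def lora_forward(W: Matrix, A: Matrix, B: Matrix, x: List[float], alpha: float) -> List[float]:
    """h = W x + (alpha / r) * B (A x); W is frozen, only A (r x d_in) and B (d_out x r) train."""
    r = len(A)
    base = matvec(W, x)
    delta = matvec(B, matvec(A, x))
    return [b + (alpha / r) * d for b, d in zip(base, delta)]


def lora_param_count(d_in: int, d_out: int, r: int) -> int:
    # A contributes r * d_in trainable parameters, B contributes d_out * r.
    return r * (d_in + d_out)
```

For a 384 x 384 projection (MiniLM's hidden size) at rank 8, the adapter holds 8 x (384 + 384) = 6,144 parameters versus 147,456 for the full matrix, about 4%.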

What example.py does

  1. Connects to a temporary artifact dir
  2. Registers tiny_pairs.csv (30 contrastive pairs) as training
  3. Calls db.fine_tune(...) with the local tiny_bert base, a small LoRA rank, and one epoch — kept fast for CI
  4. Waits for terminal status via job.wait()
  5. Asserts the resulting model_id starts with jammi:fine-tuned:
  6. Encodes a query through the fine-tuned model to confirm it loads
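This page says only that fine-tuning uses a contrastive loss over pairs. One common choice for (anchor, positive) pairs is an in-batch softmax loss, where every other positive in the batch acts as a negative. The sketch below implements that loss as an assumption for illustration; it is not confirmed to be the loss Jammi uses, and the temperature value is arbitrary.

```python
import math
from typing import List, Sequence


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def in_batch_contrastive_loss(anchors: List[List[float]], positives: List[List[float]],
                              temperature: float = 0.05) -> float:
    """Mean cross-entropy where anchor i must pick positive i among all positives in the batch.

    Vectors are assumed L2-normalized, so the dot product is cosine similarity.
    """
    total = 0.0
    for i, a in enumerate(anchors):
        logits = [dot(a, p) / temperature for p in positives]
        m = max(logits)  # subtract the max for a numerically stable log-sum-exp
        log_denom = m + math.log(sum(math.exp(z - m) for z in logits))
        total += log_denom - logits[i]
    return total / len(anchors)
```

Training lowers this loss by pulling each anchor toward its own positive and away from the other pairs' positives.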

API surface exercised

  • Database.fine_tune(*, source, base_model, columns, method, task=..., ...)
  • FineTuneJob.wait()
  • FineTuneJob.job_id, FineTuneJob.model_id
  • Database.encode_text_query(model_id, text)

The full keyword list on fine_tune covers LoRA rank/alpha/dropout, learning rate, epochs, batch size, max sequence length, validation fraction, early-stopping patience/metric, warmup, gradient accumulation, backbone dtype, weight decay, and gradient clipping — the recipe uses the defaults for everything except rank and epochs.

Performance note

This recipe is excluded from the per-PR smoke matrix because even at one epoch it runs ~30 seconds on CPU. The nightly cron with JAMMI_COOKBOOK_SLOW=1 includes it. Override the gate locally:

JAMMI_COOKBOOK_SLOW=1 python tests/cookbook_smoke.py

Run it

python cookbook/recipes/fine_tune/example.py

Exits 0 on success, prints job_id, model_id, and fine_tune: OK.


Connect via Flight SQL

Run a query against a remote jammi server over Arrow Flight SQL.

When to use this pattern. You’re connecting from a non-Python client (Tableau, dbt, JDBC tools, Rust binaries), or you want to expose Jammi to multiple readers without each one holding an embedded session. The same protocol is what dbt-flightsql, the official Flight SQL JDBC driver, and BI tools speak natively.

What example.py does

  1. Spawns target/release/jammi serve as a child process pointed at a temp artifact_dir
  2. Polls the health endpoint (http://127.0.0.1:8080/health) until the server is ready (5 s budget)
  3. Opens a pyarrow.flight.FlightClient against grpc://127.0.0.1:8081
  4. Submits SELECT 1 AS one over Flight SQL and confirms the response
  5. Tears down the server process cleanly
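Step 2 can be sketched with the standard library. The URL and 5 s budget come from the steps above; wait_for_health and the polling interval are hypothetical, and the recipe's own helper may differ.

```python
import time
import urllib.request


def wait_for_health(url: str = "http://127.0.0.1:8080/health",
                    budget_s: float = 5.0, interval_s: float = 0.1) -> bool:
    """Poll url until it returns HTTP 200 or the time budget is spent."""
    deadline = time.monotonic() + budget_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=1.0) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            pass  # connection refused, timeout, or non-2xx (HTTPError): not ready yet
        time.sleep(interval_s)
    return False
```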

This recipe is gated out of the per-PR CI matrix — it depends on the jammi binary being built (cargo build --release -p jammi-cli), and the build cost dominates the test wall-clock. The nightly cookbook job builds the binary and runs the recipe behind JAMMI_COOKBOOK_SLOW=1.

Prerequisites

  • cargo build --release -p jammi-cli — produces target/release/jammi
  • pip install pyarrow (already a jammi-ai dependency)

The script auto-detects JAMMI_BIN (env var) or falls back to the workspace’s target/release/jammi.

API surface exercised

  • pyarrow.flight.FlightClient.execute(query) over the Flight SQL command dialect
  • jammi serve — the OSS deployment-shape binary entrypoint

Run it

cargo build --release -p jammi-cli      # one-time build
python cookbook/recipes/flight_sql/example.py

Exits 0 on success, prints the query result + flight_sql: OK.

Query Your Data with SQL

Register data files as named sources, then query them with full SQL. Sources are persisted in the catalog and survive session restarts.

Register a source

Rust

use jammi_ai::session::InferenceSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};

async fn register_patents(session: &InferenceSession) -> jammi_db::error::Result<()> {
    session.add_source("patents", SourceType::File, SourceConnection {
        url: Some("file:///data/patents.parquet".into()),
        format: Some(FileFormat::Parquet),
        ..Default::default()
    }).await?;
    Ok(())
}

Python

db.add_source("patents", path="/data/patents.parquet", format="parquet")

CLI

jammi sources add patents --path /data/patents.parquet --format parquet

Supported formats

Format | Rust | Python/CLI | Notes
--- | --- | --- | ---
Parquet | FileFormat::Parquet | "parquet" | Columnar, compressed, recommended for large datasets
CSV | FileFormat::Csv | "csv" | Auto-detected schema
JSON | FileFormat::Json | "json" | Line-delimited JSON

Run a SQL query

Sources are accessible via three-part SQL names: <source_id>.public.<table_name>. The table name is derived from the file name (e.g., patents.parquet becomes patents).
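The naming rule can be expressed as a short derivation from the file URL. This is a hypothetical helper that assumes the table name is simply the file stem; how Jammi names tables for files with several dots (for example data.v2.parquet) is not documented here.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def qualified_table_name(source_id: str, url: str) -> str:
    """Build <source_id>.public.<table_name>, deriving table_name from the file stem."""
    path = urlparse(url).path if "://" in url else url
    table_name = PurePosixPath(path).stem  # patents.parquet -> patents
    return f"{source_id}.public.{table_name}"
```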

Rust

use jammi_ai::session::InferenceSession;

async fn recent_patents(session: &InferenceSession) -> jammi_db::error::Result<()> {
    let results = session.sql(
        "SELECT id, title, year FROM patents.public.patents WHERE year > 2020 ORDER BY year"
    ).await?;

    // sql returns Vec<RecordBatch>; print each batch.
    for batch in &results {
        println!("{batch:?}");
    }
    Ok(())
}

Python

table = db.sql("SELECT id, title, year FROM patents.public.patents WHERE year > 2020 ORDER BY year")
print(table.to_pandas())

CLI

jammi query "SELECT id, title, year FROM patents.public.patents WHERE year > 2020 ORDER BY year"

Aggregations

SELECT category, COUNT(*) as count, AVG(citation_count) as avg_citations
FROM patents.public.patents
WHERE year > 2020
GROUP BY category
ORDER BY count DESC

Joins across sources

Register multiple sources and join them in a single query:

Rust

use jammi_ai::session::InferenceSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};

async fn join_companies(session: &InferenceSession) -> jammi_db::error::Result<()> {
    // Register a second source (CSV) next to the existing "patents" source.
    session.add_source("companies", SourceType::File, SourceConnection {
        url: Some("file:///data/companies.csv".into()),
        format: Some(FileFormat::Csv),
        ..Default::default()
    }).await?;

    let results = session.sql("
        SELECT p.title, c.company_name
        FROM patents.public.patents p
        JOIN companies.public.companies c ON p.assignee_id = c.id
    ").await?;
    for batch in &results {
        println!("{batch:?}");
    }
    Ok(())
}

Python

db.add_source("companies", path="/data/companies.csv", format="csv")

table = db.sql("""
    SELECT p.title, c.company_name
    FROM patents.public.patents p
    JOIN companies.public.companies c ON p.assignee_id = c.id
""")

Source lifecycle

Rust

use jammi_ai::session::InferenceSession;

async fn manage_sources(session: &InferenceSession) -> jammi_db::error::Result<()> {
    // List registered sources
    let sources = session.catalog().list_sources().await?;

    // Remove a source
    session.remove_source("patents").await?;
    Ok(())
}

CLI

jammi sources list

Sources persist in the SQLite catalog at <artifact_dir>/catalog.db. Registering the same source ID twice returns an error — remove it first.

Execution plans

Use EXPLAIN (or the CLI explain command) to see how DataFusion will execute your query:

jammi explain "SELECT * FROM patents.public.patents WHERE year > 2020"

Generate Embeddings

Generate vector embeddings by running a model over text columns from a registered source. Results are persisted to Parquet with sidecar ANN indexes for fast similarity search.

Basic usage

Rust

use jammi_ai::session::InferenceSession;

async fn embed_abstracts(session: &InferenceSession) -> jammi_db::error::Result<()> {
    let record = session.generate_text_embeddings(
        "patents",
        "sentence-transformers/all-MiniLM-L6-v2",
        &["abstract".to_string()],
        "id",
    ).await?;

    println!("Embedded {} rows, {} dimensions", record.row_count, record.dimensions.unwrap());
    Ok(())
}

Python

db.generate_text_embeddings(
    source="patents",
    model="sentence-transformers/all-MiniLM-L6-v2",
    columns=["abstract"],
    key="id",
)

What gets created

Each call creates a timestamped Parquet file plus a sidecar ANN index bundle:

{artifact_dir}/jammi_db/
├── patents__embedding__all-MiniLM-L6-v2__20260325T120000.parquet
├── patents__embedding__all-MiniLM-L6-v2__20260325T120000.usearch
├── patents__embedding__all-MiniLM-L6-v2__20260325T120000.rowmap
└── patents__embedding__all-MiniLM-L6-v2__20260325T120000.manifest.json

  • Parquet file — source of truth. Contains _row_id, _source_id, _model_id, vector. Readable by external tools (DuckDB, Polars, pandas).
  • .usearch — USearch HNSW graph for ANN search.
  • .rowmap — maps internal USearch keys to _row_id strings.
  • .manifest.json — metadata (dimensions, count, metric, backend).

The sidecar files are disposable — deleting them falls back to brute-force exact search. The Parquet file is the only thing that matters.
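When the sidecars are missing, search falls back to exact scoring of every row. A minimal sketch of that brute-force path, assuming the vectors have already been read from Parquet into a dict keyed by _row_id (the function names are hypothetical). Because stored vectors are L2-normalized, normalizing the query makes the dot product equal to cosine similarity.

```python
import heapq
import math
from typing import Dict, List, Sequence, Tuple


def l2_normalize(vec: Sequence[float]) -> List[float]:
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm else list(vec)


def brute_force_search(query: Sequence[float], table: Dict[str, Sequence[float]],
                       k: int) -> List[Tuple[str, float]]:
    """Exact top-k: score every stored vector and keep the k highest."""
    q = l2_normalize(query)
    # Stored vectors are already unit length, so the dot product is the cosine similarity.
    scored = ((row_id, sum(a * b for a, b in zip(q, vec))) for row_id, vec in table.items())
    return heapq.nlargest(k, scored, key=lambda item: item[1])
```

Each query costs O(N·d) for N rows of dimension d, which is the cost the HNSW sidecar avoids on large tables.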

Embedding table schema

Column | Type | Description
--- | --- | ---
_row_id | Utf8 | Key column value cast to string
_source_id | Utf8 | Source identifier
_model_id | Utf8 | Model identifier
vector | FixedSizeList(Float32, N) | L2-normalized embedding vector

Failed rows (null or empty text) are excluded — only successfully embedded rows appear in the output.

Multiple text columns

Pass multiple column names to concatenate them (space-separated) before embedding:

Rust

use jammi_ai::session::InferenceSession;

async fn embed_title_and_abstract(session: &InferenceSession) -> jammi_db::error::Result<()> {
    // "title" and "abstract" are joined with a space before embedding; "doi" is the key column.
    session.generate_text_embeddings(
        "papers",
        "sentence-transformers/all-MiniLM-L6-v2",
        &["title".to_string(), "abstract".to_string()],
        "doi",
    ).await?;
    Ok(())
}

Python

db.generate_text_embeddings(
    source="papers",
    model="sentence-transformers/all-MiniLM-L6-v2",
    columns=["title", "abstract"],
    key="doi",
)
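The concatenation rule can be sketched as follows. Joining with a single space comes from the text above; how Jammi treats a null in one of several columns is not documented here, so this hypothetical helper skips empty parts and returns None only when every selected column is empty.

```python
from typing import Mapping, Optional, Sequence


def embedding_input(row: Mapping[str, Optional[str]], columns: Sequence[str]) -> Optional[str]:
    """Join the selected text columns with single spaces, in the order given."""
    parts = [row[c] for c in columns if row.get(c)]  # assumption: null/empty parts are skipped
    return " ".join(parts) if parts else None
```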

Multiple embedding tables

Each call creates a new table. Multiple tables can coexist for the same source (different models, different columns):

use jammi_ai::session::InferenceSession;

async fn embed_with_two_models(session: &InferenceSession) -> jammi_db::error::Result<()> {
    // Two embedding tables over the same source: different models, different columns.
    session.generate_text_embeddings("patents", "sentence-transformers/all-MiniLM-L6-v2", &["abstract".into()], "id").await?;
    session.generate_text_embeddings("patents", "BAAI/bge-small-en-v1.5", &["title".into()], "id").await?;
    Ok(())
}

When searching, the latest ready embedding table is used by default.
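Choosing the default table can be sketched from the file naming convention shown earlier, where the last __-separated segment is a creation timestamp. ResultTable and latest_ready are hypothetical; Jammi tracks status in its catalog rather than in a Python object, and treating "latest" as "most recently created" is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class ResultTable:
    name: str    # e.g. "patents__embedding__all-MiniLM-L6-v2__20260325T120000"
    status: str  # "building", "ready", or "failed"


def created_at(name: str) -> datetime:
    # The trailing "__YYYYMMDDTHHMMSS" segment is the creation timestamp.
    return datetime.strptime(name.rsplit("__", 1)[1], "%Y%m%dT%H%M%S")


def latest_ready(tables: List[ResultTable], source: str) -> Optional[ResultTable]:
    candidates = [t for t in tables
                  if t.status == "ready" and t.name.startswith(f"{source}__embedding__")]
    return max(candidates, key=lambda t: created_at(t.name), default=None)
```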

Supported models

Any encoder model on HuggingFace Hub with safetensors weights. Supported architectures:

BERT family — BERT, RoBERTa, DistilBERT, CamemBERT, XLM-RoBERTa:

  • sentence-transformers/all-MiniLM-L6-v2 (384-dim, fast)
  • sentence-transformers/all-mpnet-base-v2 (768-dim, higher quality)
  • BAAI/bge-small-en-v1.5, BAAI/bge-base-en-v1.5

ModernBERT — modernized encoder with rotary embeddings, 8192-token context, GeGLU:

  • answerdotai/ModernBERT-base (768-dim)
  • answerdotai/ModernBERT-large (1024-dim)

Or any local directory with config.json + model.safetensors + tokenizer.json. The architecture is detected automatically from model_type in config.json.
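A sketch of that detection step, assuming a simple lookup on model_type. The strings are the standard Hugging Face model_type values for the listed architectures; the FAMILIES table and detect_family function are hypothetical and not Jammi's loader.

```python
import json
from typing import Dict

# Hypothetical mapping from Hugging Face model_type values to the two families listed above.
FAMILIES: Dict[str, str] = {
    "bert": "bert", "roberta": "bert", "distilbert": "bert",
    "camembert": "bert", "xlm-roberta": "bert",
    "modernbert": "modernbert",
}


def detect_family(config_json: str) -> str:
    """Pick the encoder family from the model_type field of config.json."""
    model_type = json.loads(config_json).get("model_type")
    try:
        return FAMILIES[model_type]
    except KeyError:
        raise ValueError(f"unsupported model_type: {model_type!r}") from None
```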

Use a local model:

use jammi_ai::model::ModelSource;

fn main() {
    // The directory must contain config.json, model.safetensors, and tokenizer.json.
    let model = ModelSource::local("/path/to/my-model");
}

Raw inference (no persistence)

To get embeddings as RecordBatch without writing to disk:

Rust

use jammi_ai::model::{ModelSource, ModelTask};
use jammi_ai::session::InferenceSession;

async fn raw_embeddings(session: &InferenceSession) -> jammi_db::error::Result<()> {
    let model = ModelSource::hf("sentence-transformers/all-MiniLM-L6-v2");
    // Returns RecordBatches in memory; nothing is written to disk.
    let results = session.infer("patents", &model, ModelTask::TextEmbedding, &["abstract".into()], "id").await?;
    Ok(())
}

Python

results = db.infer(
    source="patents",
    model="sentence-transformers/all-MiniLM-L6-v2",
    columns=["abstract"],
    task="text_embedding",
    key="id",
)

Each RecordBatch has prefix columns (_row_id, _source, _model, _status, _error, _latency_ms) plus task-specific columns (e.g., vector for embeddings).

Error handling

Inference never panics on bad input. Errors are tracked per-row:

Condition | _status | _error | vector
--- | --- | --- | ---
Valid text | "ok" | null | 384-dim float vector
Null text | "error" | "Empty or null text input" | null
Empty text | "error" | "Empty or null text input" | null

The batch continues processing even when individual rows fail.
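The per-row contract in the table can be sketched as a loop that never raises on bad input. embed_rows and the encode callback are hypothetical stand-ins for the model; the error string matches the table, and the sketch treats whitespace-only text as valid because this page does not say otherwise.

```python
from typing import Callable, Dict, List, Optional, Sequence


def embed_rows(texts: Sequence[Optional[str]],
               encode: Callable[[str], List[float]]) -> List[Dict[str, object]]:
    """One output record per input row; bad input becomes an error row, never an exception."""
    out: List[Dict[str, object]] = []
    for text in texts:
        if not text:  # None or ""
            out.append({"_status": "error", "_error": "Empty or null text input", "vector": None})
        else:
            out.append({"_status": "ok", "_error": None, "vector": encode(text)})
    return out
```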

Dynamic batch sizing

The runner starts with the configured inference.batch_size (default: 32). If an out-of-memory error occurs:

  1. Halve the batch size
  2. Retry (up to 3 times)
  3. If OOM persists at batch size 1, mark the row as error and continue

The reduced batch size is sticky for the remainder of the stream.
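A minimal sketch of the halving strategy, with Python's MemoryError standing in for a device OOM. It is hypothetical: run_with_backoff is invented, and it omits the documented three-retry cap, simply halving until it reaches batch size 1, where a row that still fails is marked as an error.

```python
from typing import Callable, List, Sequence, Tuple


def run_with_backoff(rows: Sequence[str],
                     infer: Callable[[Sequence[str]], List[str]],
                     batch_size: int = 32) -> Tuple[List[Tuple[str, str]], int]:
    """Return ((row, status) pairs, final batch size)."""
    results: List[Tuple[str, str]] = []
    i = 0
    while i < len(rows):
        batch = rows[i:i + batch_size]
        try:
            outputs = infer(batch)
        except MemoryError:
            if batch_size > 1:
                batch_size //= 2  # halve and retry the same rows; the smaller size sticks
                continue
            results.append((batch[0], "error"))  # OOM even at size 1: fail this row only
            i += 1
            continue
        results.extend(zip(batch, outputs))
        i += len(batch)
    return results, batch_size
```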

Crash recovery

If the process dies mid-generation, the table is left in “building” status. On the next session start, recovery runs automatically:

  • Parquet missing — mark as failed
  • Parquet corrupt — delete file, mark as failed
  • Parquet valid but stuck in “building” — promote to “ready”, rebuild ANN index

No data is lost if the Parquet file was fully written.
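The three recovery cases map onto a small decision function. This is a hypothetical sketch: it uses the Parquet format's 4-byte PAR1 magic at both ends of the file as a cheap validity check (the footer is written last, so a truncated file normally lacks the trailing magic), while a real validator would also parse the footer metadata. Jammi's actual checks are not described on this page.

```python
import os
from pathlib import Path

PARQUET_MAGIC = b"PAR1"


def looks_like_parquet(path: Path) -> bool:
    """Cheap structural check: Parquet files begin and end with the magic bytes PAR1."""
    if path.stat().st_size < 8:
        return False
    with path.open("rb") as f:
        head = f.read(4)
        f.seek(-4, os.SEEK_END)
        tail = f.read(4)
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC


def recover(status: str, parquet: Path) -> str:
    """Return the recovery action for one result table (hypothetical sketch)."""
    if status != "building":
        return "unchanged"
    if not parquet.exists():
        return "mark_failed"
    if not looks_like_parquet(parquet):
        parquet.unlink()  # corrupt: delete the partial file
        return "mark_failed"
    return "promote_ready_and_rebuild_index"
```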

DataFusion integration

Result tables are automatically registered in DataFusion and queryable via SQL:

use jammi_ai::session::InferenceSession;
use jammi_db::catalog::result_repo::ResultTableRecord;

async fn query_result_table(
    session: &InferenceSession,
    record: &ResultTableRecord,
) -> jammi_db::error::Result<()> {
    // The result table is addressed by the quoted identifier "jammi.<table_name>".
    let results = session.sql(&format!(
        "SELECT _row_id, _source_id FROM \"jammi.{}\" LIMIT 10",
        record.table_name
    )).await?;
    Ok(())
}

Generate Image Embeddings

Generate vector embeddings from images using an OpenCLIP-compatible vision model. Results are persisted to Parquet with sidecar ANN indexes, identical to text embeddings — the same search(), evaluation, and SQL tools work on both.

The OpenCLIP family is cross-modal: the vision tower and the text tower in the same checkpoint produce embeddings in a shared latent space, so a text query encoded with the same model can search image embeddings directly. See Search Text Against Images (Cross-Modal) for the full text-to-image recipe.

Basic usage

Rust

use jammi_ai::session::InferenceSession;

async fn embed_figures(session: &InferenceSession) -> jammi_db::error::Result<()> {
    let record = session.generate_image_embeddings(
        "figures",
        "laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
        "image",       // column containing image data
        "figure_id",   // key column
    ).await?;

    println!("Embedded {} images, {} dimensions", record.row_count, record.dimensions.unwrap());
    Ok(())
}

Python

db.generate_image_embeddings(
    source="figures",
    model="laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
    image_column="image",
    key="figure_id",
)

Image column format

The image column can be either:

  • Binary — inline image bytes (PNG, JPEG, TIFF) stored directly in Parquet
  • Utf8 — file paths pointing to images on disk

Image preprocessing

Each image is automatically preprocessed before embedding:

  1. Pad to square — white canvas, image centered (preserves aspect ratio)
  2. Resize — bicubic interpolation to the model’s input size (224x224 for CLIP)
  3. Normalize — per-channel normalization using constants from the model’s config

Preprocessing parameters (mean, std, image size) are model-driven — parsed from the model’s config file, not hardcoded.
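The first and third preprocessing steps can be illustrated with the minimal Python sketch below. It is not Jammi's implementation. It assumes an image held as a list of rows of (R, G, B) tuples with 0-255 values, and it skips the bicubic resize. Mean and std are passed in as arguments because the engine reads them from the model's config. The helper names pad_to_square and normalize are hypothetical.

```python
def pad_to_square(pixels, fill=(255, 255, 255)):
    """Center the image on a square canvas of `fill` pixels (white by default)."""
    height = len(pixels)
    width = len(pixels[0]) if height else 0
    side = max(height, width)
    top = (side - height) // 2
    left = (side - width) // 2
    # One list per row, so writing a pixel never aliases another row.
    canvas = [[fill] * side for _ in range(side)]
    for y, row in enumerate(pixels):
        for x, pixel in enumerate(row):
            canvas[top + y][left + x] = pixel
    return canvas


def normalize(pixels, mean, std):
    """Scale 0-255 values to [0, 1], then apply (x - mean) / std per channel."""
    return [
        [tuple((value / 255.0 - m) / s for value, m, s in zip(pixel, mean, std)) for pixel in row]
        for row in pixels
    ]


# A 1x3 strip of black pixels lands in the middle row of a 3x3 white canvas.
strip = [[(0, 0, 0), (0, 0, 0), (0, 0, 0)]]
square = pad_to_square(strip)
normalized = normalize(square, mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
```

Padding before resizing is what preserves the aspect ratio. The content is never stretched, and only the white border scales along with it.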

Encode a single image

To embed one image without persistence (e.g., for a query):

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> Result<(), Box<dyn std::error::Error>> {
let image_bytes = std::fs::read("query.png")?;
let vector = session
    .encode_image_query("laion/CLIP-ViT-B-32-laion2B-s34B-b79K", &image_bytes)
    .await?;
// vector: Vec<f32>, L2-normalized, dimensionality = model's embed_dim
Ok(()) }
}

Python

with open("query.png", "rb") as f:
    image_bytes = f.read()

vector = db.encode_image_query("laion/CLIP-ViT-B-32-laion2B-s34B-b79K", image_bytes)

Supported models

OpenCLIP-compatible models with safetensors weights. The repo must carry:

  • open_clip_config.json with model_cfg.vision_cfg (and model_cfg.text_cfg if you want cross-modal text queries)
  • open_clip_model.safetensors with OpenCLIP weight key naming (visual.* for vision, root-level for text)
  • Either a tokenizer.json or the OpenCLIP-native bpe_simple_vocab_16e6.txt.gz (only required for text-side queries)

The architecture (ViT width, layers, heads, patch size, pooling strategy), the shared latent dimensionality (embed_dim), and the preprocessing config (mean, std, image size) are detected automatically from the config — no per-model code path.
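The sketch below gives a rough idea of that detection, reading the fields from open_clip_config.json with the standard library. It assumes the OpenCLIP layout: model_cfg (holding embed_dim, vision_cfg, and text_cfg) and preprocess_cfg sit at the top level. read_open_clip_config is a hypothetical helper. The width // 64 head default is an assumption based on the text_cfg requirement listed under cross-modal search.

```python
import json


def read_open_clip_config(text):
    """Extract architecture and preprocessing fields from open_clip_config.json text."""
    cfg = json.loads(text)
    model_cfg = cfg["model_cfg"]
    vision = model_cfg["vision_cfg"]
    text_cfg = model_cfg.get("text_cfg")  # only required for cross-modal text queries
    preprocess = cfg.get("preprocess_cfg", {})
    info = {
        "embed_dim": model_cfg["embed_dim"],
        "vision_width": vision["width"],
        "vision_layers": vision["layers"],
        "patch_size": vision["patch_size"],
        "image_size": vision["image_size"],
        "mean": preprocess.get("mean"),
        "std": preprocess.get("std"),
        "has_text_tower": text_cfg is not None,
    }
    if text_cfg is not None:
        # Assumed default: one attention head per 64 channels of width.
        info["text_heads"] = text_cfg.get("heads", text_cfg["width"] // 64)
    return info
```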

Output schema

Same as text embeddings:

| Column | Type | Description |
|---|---|---|
| _row_id | Utf8 | Key value |
| _source_id | Utf8 | Source identifier |
| _model_id | Utf8 | Model identifier |
| vector | FixedSizeList(Float32, N) | L2-normalized embedding vector (N = embed_dim) |

Image embeddings work with the same search() API as text embeddings:

vector = db.encode_image_query("laion/CLIP-ViT-B-32-laion2B-s34B-b79K", query_bytes)
results = db.search("figures", query=vector, k=10).run()

All SearchBuilder operations (join, filter, sort, limit, annotate) compose identically.

Error handling

| Condition | _status | _error |
|---|---|---|
| Valid image | "ok" | null |
| Null image | "error" | "Null or missing image input" |
| Corrupt image | "error" | "Failed to decode image at row N: ..." |
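The per-row behavior in the table can be sketched as follows. This is a hypothetical stand-in: instead of a real decoder, it checks the PNG, JPEG, and TIFF magic bytes. It also assumes that row numbers in the error message are 0-based. The point is the shape of the result: one (_status, _error) pair per row rather than a batch-level exception.

```python
# Magic-byte prefixes for the inline formats listed above (illustrative decoder stand-in).
SIGNATURES = {
    b"\x89PNG\r\n\x1a\n": "png",
    b"\xff\xd8\xff": "jpeg",
    b"II*\x00": "tiff",  # little-endian TIFF
    b"MM\x00*": "tiff",  # big-endian TIFF
}


def sniff_format(data):
    for magic, fmt in SIGNATURES.items():
        if data.startswith(magic):
            return fmt
    return None


def check_rows(images):
    """Return one (_status, _error) pair per row; a bad row never fails the batch."""
    results = []
    for row, data in enumerate(images):
        if not data:  # None or empty bytes
            results.append(("error", "Null or missing image input"))
        elif sniff_format(data) is None:
            results.append(("error", f"Failed to decode image at row {row}: unrecognized format"))
        else:
            results.append(("ok", None))
    return results
```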

Search Text Against Images (Cross-Modal)

OpenCLIP-family models carry both a vision tower and a text tower in the same checkpoint, with both towers projecting into a shared latent space. That means a text query embedded with the text tower lives in the same vector space as image embeddings produced by the vision tower — vector search against an image corpus accepts a text query directly, no separate text encoder, no projection bridge.

This recipe shows the full path: index images with the vision tower, embed a text query with the text tower, run search().

1. Index the image corpus with the vision tower

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
session.generate_image_embeddings(
    "figures",
    "laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
    "image",       // column containing image data
    "figure_id",   // key column
).await?;
Ok(()) }
}

Python

db.generate_image_embeddings(
    source="figures",
    model="laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
    image_column="image",
    key="figure_id",
)

2. Embed a text query with the same model’s text tower

encode_text_query dispatches to the OpenCLIP text tower when the model ID resolves to an OpenCLIP checkpoint. The output vector has embed_dim dimensions, matching the image embeddings.

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
let query_vec = session
    .encode_text_query(
        "laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
        "a red circle on a white background",
    )
    .await?;
// query_vec: Vec<f32>, L2-normalized, same length as the image embedding vector
Ok(()) }
}

Python

query_vec = db.encode_text_query(
    "laion/CLIP-ViT-B-32-laion2B-s34B-b79K",
    "a red circle on a white background",
)

3. Search image embeddings with the text vector

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
async fn ex(session: Arc<InferenceSession>, query_vec: Vec<f32>) -> jammi_db::error::Result<()> {
let results = session.search("figures", query_vec, 10).await?.run().await?;
Ok(()) }
}

Python

results = db.search("figures", query=query_vec, k=10).run()

The hydrated results carry your source’s columns (figure_id, plus any joined / annotated columns) alongside the similarity score. All SearchBuilder operations — join, filter, sort, limit, annotate — compose identically to text-against-text search.

Why this works

Both towers in an OpenCLIP checkpoint emit vectors of size embed_dim (the shared latent dimensionality, declared as model_cfg.embed_dim in open_clip_config.json). The vision tower applies a visual.proj matrix after pooling its patch tokens; the text tower applies a text_projection matrix after pooling at the <|endoftext|> token. The two projections are jointly trained, so the cosine similarity between a text vector and an image vector reflects semantic alignment.

If you embed text and images with separate models (e.g. a BERT encoder + a vision model that wasn’t jointly trained with it), the resulting vectors don’t share a latent space and the similarities are meaningless. Cross-modal search only works when both modalities are projected by the same CLIP-style joint training.
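The small sketch below makes the geometry concrete: after L2 normalization, cosine similarity is just a dot product. The vectors are made-up stand-ins for tower outputs. The dimension check only catches the obvious failure. Two unrelated models with the same embed_dim still produce vectors whose similarities carry no meaning.

```python
import math


def l2_normalize(vector):
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0.0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in vector]


def cosine_similarity(a, b):
    if len(a) != len(b):
        raise ValueError(f"dimension mismatch: {len(a)} vs {len(b)}")
    a, b = l2_normalize(a), l2_normalize(b)
    # For unit vectors, the dot product equals the cosine of the angle between them.
    return sum(x * y for x, y in zip(a, b))


# Illustrative vectors only; real ones come from the text and vision towers.
text_vec = [0.2, 0.9, 0.1]
image_vec = [0.25, 0.85, 0.0]
score = cosine_similarity(text_vec, image_vec)
```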

Model requirements

Same as Generate Image Embeddings, plus:

  • open_clip_config.json must contain a populated model_cfg.text_cfg (with width, layers, and either heads or a width that is a multiple of 64).
  • The safetensors checkpoint must contain the text-tower keys: token_embedding.weight, positional_embedding, transformer.resblocks.*, ln_final.*, and text_projection.
  • A tokenizer must be available — either an HF-converted tokenizer.json or the OpenCLIP-native bpe_simple_vocab_16e6.txt.gz.
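You can check a checkpoint for the text-tower keys before loading it by reading only the safetensors header. The format starts with an 8-byte little-endian length, followed by a JSON header that maps tensor names to dtype, shape, and offsets. The sketch below assumes that layout. read_tensor_names and missing_text_tower_keys are hypothetical helpers, not part of Jammi.

```python
import io
import json
import struct

REQUIRED_KEYS = ["token_embedding.weight", "positional_embedding", "text_projection"]
REQUIRED_PREFIXES = ["transformer.resblocks.", "ln_final."]


def read_tensor_names(f):
    """Read tensor names from a binary file object without loading tensor data."""
    (header_len,) = struct.unpack("<Q", f.read(8))
    header = json.loads(f.read(header_len))
    return {name for name in header if name != "__metadata__"}


def missing_text_tower_keys(names):
    missing = [key for key in REQUIRED_KEYS if key not in names]
    missing += [p + "*" for p in REQUIRED_PREFIXES if not any(n.startswith(p) for n in names)]
    return missing


# In-memory demo; for a real checkpoint use open("open_clip_model.safetensors", "rb").
header = json.dumps({
    "text_projection": {"dtype": "F32", "shape": [512, 512], "data_offsets": [0, 1048576]},
}).encode()
demo_file = io.BytesIO(struct.pack("<Q", len(header)) + header)
demo_missing = missing_text_tower_keys(read_tensor_names(demo_file))
```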

Classify Text

Run a classification model over text columns to assign labels and confidence scores. Any HuggingFace model of a supported architecture (listed below) with id2label in its config works out of the box.

Basic usage

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::model::{ModelSource, ModelTask};

let model = ModelSource::hf("answerdotai/ModernBERT-base-classification");
let results = session.infer(
    "patents",
    &model,
    ModelTask::Classification,
    &["abstract".to_string()],
    "id",
).await?;
Ok(()) }
}

Python

results = db.infer(
    source="patents",
    model="answerdotai/ModernBERT-base-classification",
    columns=["abstract"],
    task="classification",
    key="id",
)

Output schema

Each RecordBatch has prefix columns plus classification-specific columns:

| Column | Type | Description |
|---|---|---|
| _row_id | Utf8 | Key column value |
| _source | Utf8 | Source identifier |
| _model | Utf8 | Model identifier |
| _status | Utf8 | "ok" or "error" |
| _error | Utf8 (nullable) | Error message if failed |
| _latency_ms | Float32 | Inference latency |
| label | Utf8 (nullable) | Predicted class label |
| confidence | Float32 (nullable) | Confidence score (0-1) |
| all_scores_json | Utf8 (nullable) | JSON with all class scores |

Supported model architectures

Classification models must have id2label in their config.json. Supported architectures:

BERT family — BERT, RoBERTa, DistilBERT, CamemBERT, XLM-RoBERTa:

  • Loads classifier.weight + classifier.bias from safetensors
  • CLS token pooling + linear classifier + softmax

ModernBERT — uses the built-in ModernBertForSequenceClassification:

  • CLS or MEAN pooling (configured via classifier_pooling in config)
  • Head (dense + GELU + LayerNorm) + classifier + softmax
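Both families end the same way: a softmax over the head's logits, then an argmax. That final step and the three output columns can be sketched like this. The JSON shape used for all_scores_json (a label-to-probability object) is an assumption, and so are the helper names. id2label keys are strings because they come from config.json.

```python
import json
import math


def softmax(logits):
    peak = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def classify(logits, id2label):
    """Map head logits to (label, confidence, all_scores_json)."""
    probs = softmax(logits)
    best = max(range(len(probs)), key=probs.__getitem__)
    all_scores = {id2label[str(i)]: p for i, p in enumerate(probs)}
    return id2label[str(best)], probs[best], json.dumps(all_scores)
```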

Fine-tuning for classification

Train a LoRA adapter with a classification head on your labeled data:

Prepare training data

text,label
"quantum error correction","physics"
"CRISPR gene editing","biology"

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::fine_tune::FineTuneMethod;
use jammi_db::ModelTask;

let job = session.fine_tune(
    "training",
    "sentence-transformers/all-MiniLM-L6-v2",
    &["text".into(), "label".into()],
    FineTuneMethod::Lora,
    ModelTask::Classification,
    None,
).await?;

job.wait().await?;
Ok(()) }
}

Python

job = db.fine_tune(
    source="training",
    base_model="sentence-transformers/all-MiniLM-L6-v2",
    columns=["text", "label"],
    method="lora",
    task="classification",
)
job.wait()

Fine-tuning trains a LoRA projection plus a linear classification head with cross-entropy loss. Both are saved to adapter.safetensors.

Error handling

Same per-row error tracking as embeddings:

| Condition | _status | label | confidence |
|---|---|---|---|
| Valid text | "ok" | Predicted label | 0-1 score |
| Null/empty text | "error" | null | null |

Extract Entities (NER)

Run a Named Entity Recognition model over text columns to extract person names, organizations, locations, and other entities. Results are returned as JSON arrays of entity spans with character positions and confidence scores.

Basic usage

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::model::{ModelSource, ModelTask};

let model = ModelSource::hf("dslim/bert-base-NER");
let results = session.infer(
    "patents",
    &model,
    ModelTask::Ner,
    &["abstract".to_string()],
    "id",
).await?;
Ok(()) }
}

Python

results = db.infer(
    source="patents",
    model="dslim/bert-base-NER",
    columns=["abstract"],
    task="ner",
    key="id",
)

Output schema

| Column | Type | Description |
|---|---|---|
| _row_id | Utf8 | Key column value |
| _source | Utf8 | Source identifier |
| _model | Utf8 | Model identifier |
| _status | Utf8 | "ok" or "error" |
| _error | Utf8 (nullable) | Error message if failed |
| _latency_ms | Float32 | Inference latency |
| entities | Utf8 (nullable) | JSON array of entity spans |

Entity span format

Each entity in the JSON array has:

{
  "text": "Google",
  "label": "ORG",
  "start": 15,
  "end": 21,
  "confidence": 0.97
}

| Field | Type | Description |
|---|---|---|
| text | string | The entity text extracted from the input |
| label | string | Entity type (PER, ORG, LOC, etc.) without B-/I- prefix |
| start | integer | Character start position (inclusive) |
| end | integer | Character end position (exclusive) |
| confidence | float | Average softmax confidence across entity tokens |

Supported models

NER models must have id2label with BIO-tagged labels (e.g., B-PER, I-PER, O) in their config.json.

BERT family — loads classifier.weight + classifier.bias on top of the encoder:

  • dslim/bert-base-NER (English, 4 entity types)
  • dbmdz/bert-large-cased-finetuned-conll03-english

ModernBERT — same pattern, modern encoder architecture.

How it works

text → tokenize (with character offsets)
     → encoder forward → hidden states [batch, seq_len, hidden]
     → Linear(hidden, num_labels) per token → logits
     → softmax → argmax → BIO tag per token
     → merge consecutive B-/I- tags into entity spans
     → map character offsets back to original text

The BIO decoding handles:

  • B-TYPE: starts a new entity of that type
  • I-TYPE: continues the current entity (must match type)
  • O: outside any entity
  • Special tokens ([CLS], [SEP], padding) are automatically skipped
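The span merging can be sketched as follows, assuming each token arrives as (tag, start, end, prob) and special tokens are marked by start=None. The rules above do not say how the engine treats an I- tag whose type does not match the open entity; this sketch starts a new entity in that case. decode_bio is a hypothetical name.

```python
def _finish(text, span):
    start, end = span["start"], span["end"]
    return {
        "text": text[start:end],
        "label": span["label"],
        "start": start,
        "end": end,
        "confidence": sum(span["probs"]) / len(span["probs"]),
    }


def decode_bio(text, tokens):
    """Merge per-token BIO tags into entity spans with character offsets."""
    entities, current = [], None
    for tag, start, end, prob in tokens:
        if start is None:  # [CLS], [SEP], padding
            continue
        prefix, _, label = tag.partition("-")
        if prefix == "I" and current is not None and current["label"] == label:
            current["end"] = end  # extend the open entity
            current["probs"].append(prob)
            continue
        if current is not None:  # any other tag closes the open entity
            entities.append(_finish(text, current))
            current = None
        if prefix in ("B", "I"):  # a stray I- opens a new entity (assumption)
            current = {"label": label, "start": start, "end": end, "probs": [prob]}
    if current is not None:
        entities.append(_finish(text, current))
    return entities
```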

Semantic Search

Perform ANN vector similarity search over embedding tables. Results include all original source columns, similarity scores, and evidence provenance.

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_db::config::JammiConfig;
async fn ex(config: JammiConfig) -> jammi_db::error::Result<()> {
use std::sync::Arc;

let session = Arc::new(InferenceSession::new(config).await?);

// Encode a query
let query = session.encode_text_query(
    "sentence-transformers/all-MiniLM-L6-v2",
    "quantum computing applications",
).await?;

// Search — returns top 10 results
let results = session.search("patents", query, 10).await?
    .run().await?;
Ok(()) }
}

Python

query_vec = db.encode_text_query("sentence-transformers/all-MiniLM-L6-v2", "quantum computing applications")

search = db.search("patents", query=query_vec, k=10)
results = search.run()
print(results.to_pandas())

What search returns

Results are RecordBatch / pyarrow.Table with:

  • All original source columns (e.g., id, title, abstract, year)
  • _row_id — the source key
  • _source_id — which source the row came from
  • similarity — cosine similarity score (1.0 = identical, 0.0 = orthogonal)
  • retrieved_byList<Utf8> provenance: which channels found this row
  • annotated_byList<Utf8> provenance: which channels added evidence post-retrieval

SearchBuilder

The SearchBuilder composes query operations. Each method adds a node to a DataFusion execution plan — no data is processed until .run().
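The toy model below shows that lazy behavior in plain Python rather than DataFusion. Each method records a step and returns the builder, and nothing executes until run(). MiniSearch is hypothetical, and its filter takes a Python predicate instead of a SQL string. Its select keeps retrieved_by and annotated_by, mirroring the evidence-column rule described for SearchBuilder.

```python
EVIDENCE = ["retrieved_by", "annotated_by"]


class MiniSearch:
    """Hypothetical lazy builder: steps are recorded, then applied in run()."""

    def __init__(self, rows):
        self._rows = rows
        self._steps = []

    def filter(self, predicate):
        self._steps.append(lambda rows: [r for r in rows if predicate(r)])
        return self

    def sort(self, column, descending=False):
        self._steps.append(lambda rows: sorted(rows, key=lambda r: r[column], reverse=descending))
        return self

    def limit(self, n):
        self._steps.append(lambda rows: rows[:n])
        return self

    def select(self, columns):
        # Evidence columns are always appended, whatever the caller selects.
        keep = list(columns) + [c for c in EVIDENCE if c not in columns]
        self._steps.append(lambda rows: [{c: r[c] for c in keep} for r in rows])
        return self

    def run(self):
        rows = list(self._rows)
        for step in self._steps:
            rows = step(rows)
        return rows
```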

Filter

Apply a SQL WHERE clause to hydrated columns:

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.search("patents", query, 20).await?
    .filter("year > 2020")?
    .run().await?;
Ok(()) }
}

Python

search = db.search("patents", query=query_vec, k=20)
search.filter("year > 2020")
results = search.run()

Sort

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.search("patents", query, 20).await?
    .sort("similarity", true)?  // descending
    .run().await?;
Ok(()) }
}

Python

search = db.search("patents", query=query_vec, k=20)
search.sort("similarity", descending=True)
results = search.run()

Limit

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.search("patents", query, 20).await?
    .sort("similarity", true)?
    .limit(5)
    .run().await?;
Ok(()) }
}

Python

search = db.search("patents", query=query_vec, k=20)
search.sort("similarity", descending=True)
search.limit(5)
results = search.run()

Select

Project specific columns. Note that retrieved_by and annotated_by evidence columns are always appended to the output regardless of your selection:

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.search("patents", query, 10).await?
    .select(&["_row_id".into(), "title".into(), "similarity".into()])?
    .run().await?;
Ok(()) }
}

Python

search = db.search("patents", query=query_vec, k=10)
search.select(["_row_id", "title", "similarity"])
results = search.run()

Chaining everything

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &std::sync::Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
let results = session.search("patents", query, 100).await?
    .filter("year > 2020")?
    .sort("similarity", true)?
    .limit(10)
    .select(&["title".into(), "similarity".into()])?
    .run().await?;
Ok(()) }
}

Python

search = db.search("patents", query=query_vec, k=100)
search.filter("year > 2020")
search.sort("similarity", descending=True)
search.limit(10)
search.select(["title", "similarity"])
results = search.run()

Search automatically selects the best path:

  • ANN (fast) — when sidecar index files (.usearch + .rowmap + .manifest.json) exist and load successfully
  • Exact (brute-force) — fallback when sidecar files are missing or corrupt

The caller never knows the difference. Deleting sidecar files degrades performance but not correctness.
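The dispatch can be pictured with the sketch below. It assumes the three sidecar files share a base path with the table; that exact file naming is an assumption. The sketch checks only whether the files are present, whereas the engine also falls back when loading fails. The exact path is a brute-force top-k by dot product over L2-normalized vectors. choose_search_path and exact_top_k are hypothetical names.

```python
import heapq
import os

SIDECAR_SUFFIXES = (".usearch", ".rowmap", ".manifest.json")


def choose_search_path(table_base):
    """Return "ann" when every sidecar file is present, otherwise "exact"."""
    if all(os.path.exists(table_base + suffix) for suffix in SIDECAR_SUFFIXES):
        return "ann"
    return "exact"


def exact_top_k(query, vectors, k):
    """Brute-force top-k: score every row, keep the k best as (score, row_id)."""
    scored = (
        (sum(q * x for q, x in zip(query, vec)), row_id)
        for row_id, vec in vectors.items()
    )
    return heapq.nlargest(k, scored)
```

The exact path returns the true nearest neighbors, so falling back costs time rather than correctness.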

Embedding table resolution

When multiple embedding tables exist for a source, search uses the most recently created “ready” table. The resolution order:

  1. Explicit table name (if provided)
  2. Latest ready embedding table for the source (by created_at)
  3. Error if no embedding table exists
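Here is the resolution order as a small sketch. The table-record fields (name, source, status, created_at) and the function name are hypothetical.

```python
def resolve_embedding_table(tables, source, explicit=None):
    """Pick the embedding table that search would use for `source`."""
    # 1. An explicit table name wins.
    if explicit is not None:
        return explicit
    # 2. Otherwise take the newest table in "ready" state for this source.
    ready = [t for t in tables if t["source"] == source and t["status"] == "ready"]
    if ready:
        return max(ready, key=lambda t: t["created_at"])["name"]
    # 3. Nothing usable: fail instead of guessing.
    raise LookupError(f"no ready embedding table for source {source!r}")
```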

Enrich Results with Joins and Annotations

Search results can be enriched by joining with other data sources and annotating with additional model inference. Every enrichment step is tracked in the evidence provenance columns.

Join with another source

Join search results with a registered source to add context columns (e.g., company name, category labels):

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
async fn ex(session: &Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
session.add_source("assignees", SourceType::File, SourceConnection {
    url: Some("file:///data/assignees.csv".into()),
    format: Some(FileFormat::Csv),
    ..Default::default()
}).await?;

let results = session.search("patents", query, 10).await?
    .join("assignees", "assignee_id=id", None).await?  // left join by default
    .run().await?;
// Results now include company_name, country from assignees
Ok(()) }
}

Python

db.add_source("assignees", path="/data/assignees.csv", format="csv")

search = db.search("patents", query=query_vec, k=10)
search.join("assignees", on="assignee_id=id")
results = search.run()
# Results now include company_name, country from assignees

The on parameter takes the form "left_col=right_col". The optional join type is "inner" or "left" (the default); in Rust it is the third argument to join(), where None selects the default.
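The following sketch illustrates the "left_col=right_col" convention and the two join types over lists of dicts. parse_on and join_rows are hypothetical. This sketch also drops the right-hand key column and lets right-hand values win on name collisions, which may not match the engine.

```python
def parse_on(on):
    """Split "left_col=right_col" into its two column names."""
    left, sep, right = on.partition("=")
    if not sep or not left.strip() or not right.strip():
        raise ValueError(f"expected 'left_col=right_col', got {on!r}")
    return left.strip(), right.strip()


def join_rows(left_rows, right_rows, on, how="left"):
    if how not in ("left", "inner"):
        raise ValueError(f"unsupported join type {how!r}")
    left_col, right_col = parse_on(on)
    # Index the right side by its key for O(1) lookups.
    index = {}
    for r in right_rows:
        index.setdefault(r[right_col], []).append(r)
    right_cols = [c for c in (right_rows[0] if right_rows else {}) if c != right_col]
    out = []
    for row in left_rows:
        matches = index.get(row[left_col], [])
        if matches:
            out.extend({**row, **{c: m[c] for c in right_cols}} for m in matches)
        elif how == "left":
            # Left join keeps unmatched rows, padding right-hand columns with None.
            out.append({**row, **{c: None for c in right_cols}})
    return out
```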

Annotate with model inference

Run a model over search results to add new columns:

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::ModelTask;
async fn ex(session: &Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
let results = session.search("patents", query, 10).await?
    .annotate(
        "sentence-transformers/all-MiniLM-L6-v2",
        ModelTask::TextEmbedding,
        &["abstract".to_string()],
    ).await?
    .run().await?;
Ok(()) }
}

Python

search = db.search("patents", query=query_vec, k=10)
search.annotate(
    model="sentence-transformers/all-MiniLM-L6-v2",
    task="text_embedding",
    columns=["abstract"],
)
results = search.run()

Evidence provenance

Every search result carries provenance tracking that records how each row was found and enriched:

| Scenario | retrieved_by | annotated_by |
|---|---|---|
| Plain search | ["vector"] | [] |
| Search + annotate | ["vector"] | ["inference"] |

These are List<Utf8> columns — each row has its own list of contributing channels.

Composing everything

All operations compose freely:

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::ModelTask;
async fn ex(session: &Arc<InferenceSession>, query: Vec<f32>) -> jammi_db::error::Result<()> {
let results = session.search("patents", query, 100).await?
    .join("assignees", "assignee_id=id", None).await?
    .annotate("sentence-transformers/all-MiniLM-L6-v2", ModelTask::TextEmbedding, &["abstract".into()]).await?
    .filter("country = 'US'")?
    .sort("similarity", true)?
    .limit(10)
    .select(&["title".into(), "company_name".into(), "similarity".into()])?
    .run().await?;
Ok(()) }
}

Python

search = db.search("patents", query=query_vec, k=100)
search.join("assignees", on="assignee_id=id")
search.annotate(model="sentence-transformers/all-MiniLM-L6-v2", task="text_embedding", columns=["abstract"])
search.filter("country = 'US'")
search.sort("similarity", descending=True)
search.limit(10)
search.select(["title", "company_name", "similarity"])
results = search.run()

The pipeline builds a DataFusion execution plan under the hood. No data is processed until .run() — so adding more steps doesn’t cost anything until execution.

Declare a Custom Provenance Channel

Every row that flows through Jammi carries provenance — retrieved_by and annotated_by lists that record how the row was found and what was added after retrieval. Jammi ships two built-in channels — vector (declares similarity) and inference (declares inference_model, inference_task, inference_confidence) — but the catalog accepts any channel a consumer wants to register. Each channel declares the columns it contributes; the engine merges those columns into every result RecordBatch at query time.

This recipe walks through registering a third channel, scored_by, for a multi-stage retrieval pipeline where a federated reranker rescores the vector hits. The same shape applies to any non-built-in provenance signal: a citation graph, an attribution chain, a quality-grading pass.

Setup

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate arrow;
extern crate tokio;
use jammi_db::config::JammiConfig;
async fn ex(config: JammiConfig) -> jammi_db::error::Result<()> {
use std::sync::Arc;
use arrow::array::{ArrayRef, Float32Array, StringArray};
use jammi_ai::evidence::{merge_channels, ChannelContribution};
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::channel_repo::{ChannelColumn, ChannelColumnType, ChannelSpec};
use jammi_db::ChannelId;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};

let session = Arc::new(InferenceSession::new(config).await?);
session.add_source("patents", SourceType::File, SourceConnection {
    url: Some("file:///data/patents.parquet".into()),
    format: Some(FileFormat::Parquet),
    ..Default::default()
}).await?;
Ok(()) }
}

Python

import jammi_ai

db = jammi_ai.connect(artifact_dir="/var/lib/jammi")
db.add_source("patents", path="/data/patents.parquet", format="parquet")

Declare the channel

Channel declarations are catalog rows. Each declared column has a name, an Arrow type, and an ordinal (its position in the declaration). The set is append-only: once ranker: Utf8 is declared, the engine refuses to redeclare it as Int32 or to drop it.

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::channel_repo::{ChannelColumn, ChannelColumnType, ChannelSpec};
use jammi_db::ChannelId;
async fn ex(session: &Arc<InferenceSession>) -> jammi_db::error::Result<()> {
session.catalog().channels().register(&ChannelSpec {
    id: ChannelId::new("scored_by")?,
    priority: 3,
    columns: vec![
        ChannelColumn {
            name: "ranker".into(),
            data_type: ChannelColumnType::Utf8,
        },
        ChannelColumn {
            name: "rank_score".into(),
            data_type: ChannelColumnType::Float32,
        },
    ],
}).await?;
Ok(()) }
}

Python

db.register_channel(
    "scored_by",
    priority=3,
    columns=[("ranker", "Utf8"), ("rank_score", "Float32")],
)

priority controls the order columns appear in the merged output — vector (1) and inference (2) come first, then scored_by (3).

To add more columns to an already-registered channel:

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::channel_repo::{ChannelColumn, ChannelColumnType};
use jammi_db::ChannelId;
async fn ex(session: &Arc<InferenceSession>) -> jammi_db::error::Result<()> {
session.catalog().channels().add_columns(
    &ChannelId::new("scored_by")?,
    &[ChannelColumn { name: "scored_at".into(), data_type: ChannelColumnType::Utf8 }],
).await?;
Ok(()) }
}

Python

db.add_channel_columns("scored_by", columns=[("scored_at", "Utf8")])

add_columns is append-only by construction. Trying to redeclare ranker with the same or a different type returns JammiError::EvidenceChannel(_).

Use the channel

Build a ChannelContribution for each batch your reranker produces. The arrays must align 1:1 with the channel’s declared columns (ranker first, rank_score second) and have the same length as the batch’s row count.

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate arrow;
extern crate tokio;
use std::sync::Arc;
use arrow::array::{ArrayRef, Float32Array, RecordBatch, StringArray};
use jammi_ai::evidence::{merge_channels, ChannelContribution};
use jammi_ai::session::InferenceSession;
use jammi_db::ChannelId;
fn rerank_scores(batch: &RecordBatch) -> Vec<f32> { vec![0.0; batch.num_rows()] } // placeholder: one score per row
async fn ex(session: &Arc<InferenceSession>, batches: Vec<RecordBatch>) -> jammi_db::error::Result<()> {
let scored_by = ChannelId::new("scored_by")?;
let vector = ChannelId::new("vector")?;

let mut contributions = Vec::with_capacity(batches.len());
for batch in &batches {
    let n = batch.num_rows();
    let ranker: ArrayRef = Arc::new(StringArray::from(vec!["bm25"; n]));
    let rank_score: ArrayRef = Arc::new(Float32Array::from(rerank_scores(batch)));
    contributions.push(vec![ChannelContribution {
        channel: scored_by.clone(),
        columns: vec![ranker, rank_score],
    }]);
}

let merged = merge_channels(
    session.catalog(),
    &batches,
    &[vector.clone(), scored_by.clone()],
    &[vector, scored_by],   // retrieved_by
    &[],                     // annotated_by
    &contributions,
).await?;
Ok(()) }
}
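The validation rules above (one array per declared column, each as long as the batch) and the NULL filling described in the next step can be sketched over plain Python lists. merge_contributions is a hypothetical helper, not Jammi's merge_channels. It also lays columns out by channel priority, so vector's columns come before scored_by's.

```python
def merge_contributions(num_rows, declared, contributions):
    """declared: {channel: (priority, [column names])}
    contributions: {channel: [one list of values per declared column]}
    Returns {column: values}, ordered by channel priority."""
    merged = {}
    for channel, (_, columns) in sorted(declared.items(), key=lambda kv: kv[1][0]):
        arrays = contributions.get(channel)
        if arrays is None:
            # Channel contributed nothing for this batch: every value is NULL.
            for col in columns:
                merged[col] = [None] * num_rows
            continue
        if len(arrays) != len(columns):
            raise ValueError(f"channel {channel!r}: expected {len(columns)} arrays, got {len(arrays)}")
        for col, values in zip(columns, arrays):
            if len(values) != num_rows:
                raise ValueError(f"channel {channel!r}: column {col!r} has {len(values)} values for {num_rows} rows")
            merged[col] = list(values)
    return merged
```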

Verify

The merged output schema includes the declared columns. Rows where the channel did not supply a value carry NULL.

Rust

#![allow(unused)]
fn main() {
extern crate arrow;
use arrow::array::RecordBatch;
fn ex(merged: Vec<RecordBatch>) {
let schema = merged[0].schema();
assert!(schema.field_with_name("ranker").is_ok());
assert!(schema.field_with_name("rank_score").is_ok());

for batch in &merged {
    let ranker = batch.column_by_name("ranker").unwrap();
    println!("ranker column: {:?}", ranker);
}
}
}

Python

From the SQL surface, the declared columns show up in any query that touches the result table — Python sees them as plain Arrow columns:

table = db.sql(
    "SELECT _row_id, similarity, ranker, rank_score FROM results LIMIT 3"
)
for row in table.to_pylist():
    print(row["ranker"], row["rank_score"])

What you cannot do

The channel declaration is append-only. Once scored_by ships with ranker: Utf8, you cannot:

  • Redeclare ranker as Int32add_columns rejects with JammiError::EvidenceChannel("channel 'scored_by': column 'ranker' was declared Utf8, cannot redeclare as Int32"). From Python, the same call raises RuntimeError carrying the identical message:

    db.add_channel_columns("scored_by", columns=[("ranker", "Int32")])
    # RuntimeError: channel 'scored_by': column 'ranker' was declared Utf8, cannot redeclare as Int32
    
  • Add a second column with the same name — add_columns rejects with JammiError::EvidenceChannel("channel 'scored_by': column 'ranker' already declared").

  • Drop ranker from the channel — there is no drop_column method by design.

If a column needs to change shape, declare a new column under a new name and migrate consumers. This preserves byte-for-byte readability of any backing table or downstream artifact that already references the original column.
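Below is a minimal sketch of these append-only rules, assuming an in-memory registry. The error messages follow the ones quoted above, and RuntimeError mirrors what the Python binding raises. The class name and the all-or-nothing validation are assumptions. There is deliberately no method for dropping a column.

```python
class AppendOnlyChannels:
    """Hypothetical in-memory registry enforcing append-only channel columns."""

    def __init__(self):
        self._declared = {}  # channel -> list of (name, type)

    def register(self, channel, columns):
        self._declared.setdefault(channel, [])
        self.add_columns(channel, columns)

    def add_columns(self, channel, columns):
        if channel not in self._declared:
            raise KeyError(f"unknown channel {channel!r}")
        existing = dict(self._declared[channel])
        for name, dtype in columns:
            if name in existing:
                if existing[name] != dtype:
                    raise RuntimeError(
                        f"channel '{channel}': column '{name}' was declared "
                        f"{existing[name]}, cannot redeclare as {dtype}"
                    )
                raise RuntimeError(f"channel '{channel}': column '{name}' already declared")
            existing[name] = dtype
        # Every column passed validation, so append them all at once.
        self._declared[channel].extend(columns)

    def columns(self, channel):
        return list(self._declared[channel])


channels = AppendOnlyChannels()
channels.register("scored_by", [("ranker", "Utf8"), ("rank_score", "Float32")])
channels.add_columns("scored_by", [("scored_at", "Utf8")])
try:
    channels.add_columns("scored_by", [("ranker", "Int32")])
except RuntimeError as err:
    redeclare_error = str(err)
```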

Fine-Tune for Your Domain

Train LoRA adapters on your data to improve embedding quality for your domain. The base model stays frozen — only a small projection layer is trained and saved.

Prepare training data

Create contrastive pairs with a similarity score:

text_a,text_b,score
"quantum error correction","superconducting qubit stabilization",0.88
"quantum error correction","medieval poetry analysis",0.08

High scores mean similar; low scores mean dissimilar.

Register the training data as a source:

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
session.add_source("training", SourceType::File, SourceConnection {
    url: Some("file:///data/training_pairs.csv".into()),
    format: Some(FileFormat::Csv),
    ..Default::default()
}).await?;
Ok(()) }
}

Python

db.add_source("training", path="/data/training_pairs.csv", format="csv")

Start a fine-tuning job

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::fine_tune::FineTuneMethod;
use jammi_db::ModelTask;

let job = session.fine_tune(
    "training",
    "sentence-transformers/all-MiniLM-L6-v2",
    &["text_a".into(), "text_b".into(), "score".into()],
    FineTuneMethod::Lora,
    ModelTask::TextEmbedding,
    None,  // default config
).await?;

println!("Job: {}", job.job_id);
job.wait().await?;
println!("Model: {}", job.model_id());
Ok(()) }
}

Python

job = db.fine_tune(
    source="training",
    base_model="sentence-transformers/all-MiniLM-L6-v2",
    columns=["text_a", "text_b", "score"],
    method="lora",
    task="embedding",
)

job.wait()
print(f"Model: {job.model_id}")

Custom configuration

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_ai::fine_tune::{FineTuneMethod, LrSchedule};
async fn ex(session: &InferenceSession, model: &str, columns: Vec<String>) -> jammi_db::error::Result<()> {
use jammi_ai::fine_tune::FineTuneConfig;
use jammi_db::ModelTask;

let config = FineTuneConfig {
    lora_rank: 4,
    learning_rate: 5e-4,
    epochs: 5,
    batch_size: 4,
    warmup_steps: 10,
    lr_schedule: LrSchedule::CosineDecay,
    early_stopping_patience: 2,
    validation_fraction: 0.2,
    gradient_accumulation_steps: 4,  // effective batch = 4 x 4 = 16
    ..Default::default()
};

let job = session.fine_tune(
    "training", model, &columns, FineTuneMethod::Lora, ModelTask::TextEmbedding, Some(config),
).await?;
Ok(()) }
}
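The sketch below shows how warmup_steps, lr_schedule, and gradient_accumulation_steps interact. It assumes that the schedule advances once per optimizer update, that decay runs from the end of warmup down to zero at the final step, and that a partial accumulation group still triggers an update. lr_at and optimizer_steps are hypothetical names.

```python
import math


def lr_at(step, total_steps, base_lr, warmup_steps, schedule="CosineDecay"):
    """Learning rate at an optimizer step: linear warmup, then the chosen decay."""
    if warmup_steps > 0 and step < warmup_steps:
        return base_lr * step / warmup_steps
    if schedule == "Constant":
        return base_lr
    decay_steps = max(1, total_steps - warmup_steps)
    progress = min(1.0, (step - warmup_steps) / decay_steps)
    if schedule == "LinearDecay":
        return base_lr * (1.0 - progress)
    if schedule == "CosineDecay":
        return base_lr * 0.5 * (1.0 + math.cos(math.pi * progress))
    raise ValueError(f"unknown schedule {schedule!r}")


def optimizer_steps(num_examples, batch_size, accumulation_steps, epochs):
    """Optimizer updates per run: micro-batches grouped by the accumulation factor."""
    micro_batches = math.ceil(num_examples / batch_size)
    return math.ceil(micro_batches / accumulation_steps) * epochs
```

With the configuration above (batch_size 4, gradient_accumulation_steps 4), each update sees 16 examples. As a result, 1,000 training rows over 5 epochs give far fewer optimizer steps than micro-batches.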

Configuration reference

| Field | Default | Description |
|---|---|---|
| lora_rank | 8 | Low-rank dimension |
| lora_alpha | 16.0 | Scaling factor |
| lora_dropout | 0.05 | Dropout probability |
| learning_rate | 2e-4 | Base learning rate |
| epochs | 3 | Training epochs |
| batch_size | 8 | Micro-batch size |
| max_seq_length | 512 | Max tokens per text |
| gradient_accumulation_steps | 1 | Steps before optimizer update |
| validation_fraction | 0.1 | Holdout fraction for early stopping |
| early_stopping_patience | 3 | Epochs without improvement before stopping |
| warmup_steps | 100 | Linear warmup from 0 to base LR |
| lr_schedule | CosineDecay | Decay after warmup: Constant, CosineDecay, LinearDecay |
| embedding_loss | auto | CoSent (pairs+scores), Triplet, MultipleNegativesRanking |
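In the pairs-plus-scores case, CoSent compares every pair of examples in a batch. Whenever example i has a higher label than example j, it penalizes cos_j exceeding cos_i. The sketch below uses a scale of 20, the default in sentence-transformers' CoSENTLoss. Whether Jammi uses the same scale is an assumption, and cosent_loss is a hypothetical name.

```python
import math


def cosent_loss(cos_scores, labels, scale=20.0):
    """log(1 + sum over pairs with labels[i] > labels[j] of exp(scale * (cos[j] - cos[i])))."""
    terms = [
        scale * (cos_scores[j] - cos_scores[i])
        for i in range(len(labels))
        for j in range(len(labels))
        if labels[i] > labels[j]
    ]
    if not terms:
        return 0.0
    # log(1 + sum(exp(t))) computed stably as logsumexp over [0] + terms.
    peak = max(0.0, *terms)
    return peak + math.log(math.exp(-peak) + sum(math.exp(t - peak) for t in terms))
```

Using the two training rows from the CSV above, a model that already ranks the 0.88 pair above the 0.08 pair gets a near-zero loss. Reversing the ranking makes the loss large.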

Use the fine-tuned model

The fine-tuned model is automatically registered and can be used anywhere a model ID is accepted:

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_ai::fine_tune::job::FineTuneJob;
async fn ex(session: &InferenceSession, job: &FineTuneJob) -> jammi_db::error::Result<()> {
let model_id = job.model_id();

let embedding = session.encode_text_query(model_id, "quantum computing").await?;
session.generate_text_embeddings("patents", model_id, &["abstract".into()], "id").await?;
Ok(()) }
}

Python

model_id = job.model_id

query_vec = db.encode_text_query(model_id, "quantum computing")
db.generate_text_embeddings(source="patents", model=model_id, columns=["abstract"], key="id")

How it works

text -> encoder (frozen) -> base embedding -> LoRA projection (trained) -> output
  1. The base encoder model (BERT, ModernBERT, etc.) is loaded and frozen
  2. A LoRA projection layer (identity + low-rank A/B matrices) is added after pooling
  3. For each batch: text is encoded, projected through LoRA, and loss is computed
  4. Only the A/B matrices receive gradients
  5. The adapter is saved as adapter.safetensors in the artifact directory
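Steps 2 to 4 can be sketched in plain Python. This assumes the standard LoRA formulation y = x + (alpha / r) * B(Ax) with B zero-initialized, so the adapter starts out as the identity; that matches the "identity + low-rank A/B" description above, but Jammi's exact initialization and scaling are assumptions here, and a real implementation works on tensors rather than lists.

```python
def matvec(matrix, vec):
    """Multiply a row-major matrix (list of rows) by a vector."""
    return [sum(m * v for m, v in zip(row, vec)) for row in matrix]


def lora_project(x, A, B, alpha):
    """Identity plus low-rank update: y = x + (alpha / r) * B @ (A @ x).

    x: pooled base embedding of length d (output of the frozen encoder)
    A: r x d down-projection (trainable)
    B: d x r up-projection (trainable)
    """
    r = len(A)
    scale = alpha / r
    update = matvec(B, matvec(A, x))
    return [xi + scale * ui for xi, ui in zip(x, update)]


x = [1.0, 2.0, 3.0]                      # d = 3
A = [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]   # r = 2
B_zero = [[0.0, 0.0] for _ in range(3)]  # zero-initialized B
print(lora_project(x, A, B_zero, alpha=16.0))  # identical to x at init
```

Only A and B would receive gradients during training; the encoder that produced `x` is never updated.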

Encoder-adapters fine-tuning (PEFT-style adapter injection)

The default flow above trains a single low-rank projection head sitting outside the frozen encoder. For higher capacity at the same parameter budget, Jammi also supports encoder adapters — LoRA injected into named linear layers inside the encoder stack, matching the PEFT convention.

Switch to encoder adapters by populating target_modules on FineTuneConfig:

#![allow(unused)]
fn main() {
extern crate jammi_ai;
use jammi_ai::fine_tune::FineTuneConfig;
fn make() -> FineTuneConfig {
let config = FineTuneConfig {
    lora_rank: 8,
    lora_alpha: 16.0,
    // Inject LoRA into BERT's attention query and value projections.
    target_modules: vec!["query".to_string(), "value".to_string()],
    ..Default::default()
};
config }
}

Python

job = db.fine_tune(
    source="training",
    base_model="sentence-transformers/all-MiniLM-L6-v2",
    columns=["text_a", "text_b", "score"],
    method="lora",
    task="text_embedding",
    target_modules=["query", "value"],
)

Target-module conventions

Pick target_modules per the architecture you’re fine-tuning:

| Architecture | Common target_modules |
|---|---|
| BERT / RoBERTa / CamemBERT / XLM-RoBERTa | ["query", "value"] (recommended) or ["query", "key", "value", "dense"] |
| DistilBERT | ["q_lin", "v_lin"] or ["q_lin", "k_lin", "v_lin", "out_lin"] |
| ModernBERT | ["Wqkv", "Wo"] (fused QKV + output) |
| Any encoder | ["all-linear"]: every linear layer gets an adapter (largest capacity) |

Names match the trailing module-name segment in the HuggingFace weight layout. Suffix matching is the rule, so "query" matches "attention.self.query".
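A minimal sketch of the suffix rule, assuming Jammi matches on whole dotted segments the way PEFT does (a module name matches when it equals the target or ends with "." plus the target). Whether Jammi also treats partial segments such as query_proj as a match is not stated, and "all-linear" is left out because it depends on layer types rather than names. `matches_target` and `select_modules` are hypothetical helpers.

```python
def matches_target(module_name: str, target: str) -> bool:
    """Whole-segment suffix match on a dotted HuggingFace module path."""
    return module_name == target or module_name.endswith("." + target)


def select_modules(module_names, target_modules):
    """Return the module paths that would receive a LoRA adapter."""
    return [
        name for name in module_names
        if any(matches_target(name, t) for t in target_modules)
    ]


names = [
    "encoder.layer.0.attention.self.query",
    "encoder.layer.0.attention.self.key",
    "encoder.layer.0.attention.self.value",
    "encoder.layer.0.attention.output.dense",
]
print(select_modules(names, ["query", "value"]))
```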

Layer ranges and per-module ranks

Two optional refinements:

  • layers_to_transform — restrict injection to specific 0-based layer indices. None (default) applies to every layer.
  • rank_pattern — override lora_rank for individual modules. Keys are substring matches against the module name; values are the override rank.
#![allow(unused)]
fn main() {
extern crate jammi_ai;
use jammi_ai::fine_tune::FineTuneConfig;
fn make() -> FineTuneConfig {
let mut rank_pattern = std::collections::HashMap::new();
rank_pattern.insert("query".to_string(), 16);  // higher capacity on Q
rank_pattern.insert("value".to_string(), 4);   // lower on V

let config = FineTuneConfig {
    lora_rank: 8,                                     // default rank
    target_modules: vec!["query".into(), "value".into()],
    layers_to_transform: Some(vec![6, 7, 8, 9, 10, 11]), // top half of a 12-layer encoder (e.g. BERT-base)
    rank_pattern,
    ..Default::default()
};
config }
}
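How target_modules, layers_to_transform, and rank_pattern combine can be sketched as a single lookup. This is an illustration under stated assumptions, not Jammi's resolver: the layer index is parsed from BERT-style paths, target matching uses whole-segment suffixes, and when several rank_pattern keys match, the longest key wins (the documentation does not say how overlaps are resolved). `resolve_rank` is a hypothetical helper.

```python
import re

# Assumes HuggingFace BERT-style paths such as "encoder.layer.7.attention.self.query".
LAYER_INDEX = re.compile(r"\.layer\.(\d+)\.")


def resolve_rank(name, target_modules, lora_rank,
                 layers_to_transform=None, rank_pattern=None):
    """Return the LoRA rank for module `name`, or None if it gets no adapter."""
    if not any(name == t or name.endswith("." + t) for t in target_modules):
        return None
    if layers_to_transform is not None:
        match = LAYER_INDEX.search(name)
        if match is None or int(match.group(1)) not in layers_to_transform:
            return None
    # rank_pattern keys are substring matches; try the longest key first.
    for key in sorted(rank_pattern or {}, key=len, reverse=True):
        if key in name:
            return rank_pattern[key]
    return lora_rank


pattern = {"query": 16, "value": 4}
top_half = list(range(6, 12))
for layer in (3, 7):
    for proj in ("query", "key", "value"):
        name = f"encoder.layer.{layer}.attention.self.{proj}"
        print(name, resolve_rank(name, ["query", "value"], 8, top_half, pattern))
```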

On-disk artifact

Every fine-tuned model writes adapter.safetensors plus an adapter_config.json whose adapter_type tag discriminates between the two adapter shapes Jammi produces.

Encoder-adapters example:

{
  "adapter_type": "encoder_adapters",
  "model_type": "bert",
  "lora_rank": 8,
  "lora_alpha": 16.0,
  "use_rslora": false,
  "target_modules": ["query", "value"],
  "layers_to_transform": [6, 7, 8, 9, 10, 11],
  "rank_pattern": {"query": 16, "value": 4},
  "backbone_dtype": "f32"
}

Projection-head example:

{
  "adapter_type": "projection_head",
  "lora_rank": 8,
  "lora_alpha": 16.0,
  "head_layers": ["projection"]
}

The Candle inference backend reads adapter_config.json on model load and dispatches on adapter_type: encoder_adapters rebuilds the encoder with frozen backbone weights plus the LoRA A/B from adapter.safetensors; projection_head loads the saved projection weights as a LoraLinear applied after pooling.
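Everything needed to pick a code path lives in adapter_config.json. The sketch below reads either example config and reports the per-module LoRA scaling factor, assuming Jammi follows the usual conventions: alpha / r for standard LoRA and alpha / sqrt(r) when use_rslora is true (as PEFT does). It loads no weights, it looks up rank_pattern by exact target name for simplicity, and `lora_scales` is a hypothetical helper.

```python
import json
import math


def lora_scales(config_json: str) -> dict:
    """Map each adapted module (or head layer) to its LoRA scaling factor."""
    cfg = json.loads(config_json)
    alpha, default_rank = cfg["lora_alpha"], cfg["lora_rank"]

    def scale(rank):
        # Standard LoRA uses alpha / r; rsLoRA uses alpha / sqrt(r).
        return alpha / math.sqrt(rank) if cfg.get("use_rslora") else alpha / rank

    kind = cfg["adapter_type"]
    if kind == "projection_head":
        return {layer: scale(default_rank) for layer in cfg["head_layers"]}
    if kind == "encoder_adapters":
        # Simplification: rank_pattern looked up by exact target name.
        ranks = cfg.get("rank_pattern", {})
        return {t: scale(ranks.get(t, default_rank)) for t in cfg["target_modules"]}
    raise ValueError(f"unknown adapter_type: {kind}")


encoder_cfg = """{"adapter_type": "encoder_adapters", "lora_rank": 8,
  "lora_alpha": 16.0, "use_rslora": false, "target_modules": ["query", "value"],
  "rank_pattern": {"query": 16, "value": 4}}"""
head_cfg = """{"adapter_type": "projection_head", "lora_rank": 8,
  "lora_alpha": 16.0, "head_layers": ["projection"]}"""
print(lora_scales(encoder_cfg))  # {'query': 1.0, 'value': 4.0}
print(lora_scales(head_cfg))     # {'projection': 2.0}
```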

When to use each

  • Projection head: fastest training, smallest artifact, lowest memory. This is the default when target_modules is empty. Best for adapting embedding direction without changing per-token attention behaviour.
  • Encoder adapters: higher representational ceiling per adapter parameter; required if the task needs to reshape attention behaviour (e.g. a domain where the base attention pattern mismatches the query distribution). The cost is a slightly slower forward pass, since the LoRA path runs in every adapted layer.
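A rough parameter count shows the size gap between the two options. The sketch assumes the projection head is a single hidden-by-hidden LoRA layer and that BERT-family query and value projections are hidden-by-hidden linears; it counts only the trainable A/B matrices. `lora_params` is a hypothetical helper.

```python
def lora_params(d_in: int, d_out: int, rank: int) -> int:
    """Trainable parameters in one LoRA pair: A is rank x d_in, B is d_out x rank."""
    return rank * (d_in + d_out)


hidden, layers, rank = 384, 6, 8   # all-MiniLM-L6-v2: 6 layers, hidden size 384

# Projection head: one hidden x hidden adapter after pooling.
head = lora_params(hidden, hidden, rank)

# Encoder adapters on "query" and "value" in every layer.
encoder = layers * 2 * lora_params(hidden, hidden, rank)

print(head, encoder)  # 6144 73728
```

At rank 8 on all-MiniLM-L6-v2, the encoder-adapter configuration trains 12 times as many parameters as the projection head, though both remain small next to the frozen backbone.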

Training safety

  • Divergence detection: if loss is NaN or >100 for 3 consecutive batches, the job fails with a clear error
  • Early stopping: training stops when validation loss has not improved for early_stopping_patience consecutive epochs, and the best checkpoint's weights are restored
  • Checkpoints: saved at roughly 10% intervals of training progress, for crash recovery
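The two stopping rules can be sketched as pure functions over loss histories. This assumes a strict "> 100" threshold, that any healthy batch resets the divergence streak, and that "improvement" means a strictly lower validation loss; Jammi's exact comparisons are not documented here, and both function names are hypothetical.

```python
import math


def divergence_index(batch_losses, limit=100.0, window=3):
    """Index of the batch that triggers a divergence failure, or None."""
    streak = 0
    for i, loss in enumerate(batch_losses):
        if math.isnan(loss) or loss > limit:
            streak += 1
            if streak >= window:
                return i
        else:
            streak = 0  # a healthy batch resets the streak
    return None


def early_stopping(val_losses, patience):
    """Return (epoch where training stops or None, epoch of the best checkpoint)."""
    best, best_epoch, stale = math.inf, None, 0
    for epoch, loss in enumerate(val_losses):
        if loss < best:
            best, best_epoch, stale = loss, epoch, 0
        else:
            stale += 1
            if stale >= patience:
                return epoch, best_epoch
    return None, best_epoch


print(divergence_index([1.2, float("nan"), 150.0, 230.0]))         # 3
print(early_stopping([0.90, 0.70, 0.72, 0.71, 0.75], patience=3))  # (4, 1)
```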

Evaluate and Compare Models

Measure embedding quality and classification accuracy against golden datasets. Results are recorded in the catalog for tracking over time.

Prepare a golden dataset

A golden dataset is any registered source with the right columns. No special format required.

Retrieval golden set

query_id,query_text,relevant_id
q1,quantum computing applications,1
q1,quantum computing applications,4
q2,machine learning for science,2

| Column | Type | Required |
|---|---|---|
| query_id | Utf8 | yes |
| query_text | Utf8 | yes |
| relevant_id | Utf8 or Int | yes |
| relevance_grade | Int32 | no (default: 1 = binary) |

Register it as a source:

db.add_source("golden", path="/data/golden_relevance.csv", format="csv")

Evaluate embedding quality

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
let report = session.eval_embeddings(
    "patents",
    None,                              // use latest embedding table
    "golden.public.golden_relevance",  // golden dataset
    10,                                // k for recall@k, precision@k
).await?;

println!("recall@10:    {}", report.aggregate.recall_at_k);
println!("precision@10: {}", report.aggregate.precision_at_k);
println!("MRR:          {}", report.aggregate.mrr);
println!("nDCG:         {}", report.aggregate.ndcg);
Ok(()) }
}

Python

metrics = db.eval_embeddings(
    source="patents",
    golden_source="golden.public.golden_relevance",
    k=10,
)

agg = metrics["aggregate"]
print(f"recall@10:    {agg['recall_at_k']:.3f}")
print(f"precision@10: {agg['precision_at_k']:.3f}")
print(f"MRR:          {agg['mrr']:.3f}")
print(f"nDCG:         {agg['ndcg']:.3f}")

Per-query drill-down

The report also carries a per_query array — one record per golden-set query, in golden order. This is what sample-based statistical rules (Welch’s t, Mann-Whitney U) consume at gate time.

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
let report = session.eval_embeddings("patents", None, "golden.public.golden_relevance", 10).await?;
for record in &report.per_query {
    println!("{}: recall={:.3} ndcg={:.3}",
        record.query_id, record.metrics.recall, record.metrics.ndcg);
}
Ok(()) }
}

Python

for record in metrics["per_query"]:
    m = record["metrics"]
    print(f"{record['query_id']}: recall={m['recall']:.3f} ndcg={m['ndcg']:.3f}")
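As an illustration of what a sample-based rule does with per_query, here is Welch's t statistic computed from two lists of per-query recall values. This is the textbook formula with Welch-Satterthwaite degrees of freedom, not Jammi's gate implementation; it omits the p-value (which needs the t distribution's CDF, absent from the standard library), and the metric values are made up for the example.

```python
import math
from statistics import mean, variance


def welch_t(baseline, candidate):
    """Welch's t statistic and Welch-Satterthwaite degrees of freedom."""
    va = variance(baseline) / len(baseline)
    vb = variance(candidate) / len(candidate)
    t = (mean(candidate) - mean(baseline)) / math.sqrt(va + vb)
    df = (va + vb) ** 2 / (va ** 2 / (len(baseline) - 1) + vb ** 2 / (len(candidate) - 1))
    return t, df


# Illustrative per-query recall values, e.g. collected from two reports' per_query arrays.
base_recall = [0.5, 0.6, 0.7]
tuned_recall = [0.7, 0.8, 0.9]
print(welch_t(base_recall, tuned_recall))  # roughly (2.449, 4.0)
```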

Retrieval metrics

| Metric | What it measures |
|---|---|
| recall_at_k | Fraction of relevant docs found in top-k |
| precision_at_k | Fraction of top-k that are relevant |
| mrr | Reciprocal rank of the first relevant result |
| ndcg | Normalized discounted cumulative gain (uses graded relevance if provided) |

All metrics are in [0, 1]. Higher is better.
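For intuition, here is a per-query sketch of the four metrics in plain Python. It assumes common definitions: precision divides by k, MRR only looks inside the top k, and nDCG uses linear gain (grade / log2(rank + 1)). Jammi may differ in such details (for example, exponential gain for graded relevance), and `retrieval_metrics` is a hypothetical helper.

```python
import math


def retrieval_metrics(ranked_ids, grades, k):
    """recall@k, precision@k, MRR and nDCG@k for one query.

    ranked_ids: document ids in ranked order (best first)
    grades: {doc_id: relevance_grade}; binary golden sets use grade 1
    """
    relevant = {d for d, g in grades.items() if g > 0}
    top_k = ranked_ids[:k]
    hits = sum(1 for d in top_k if d in relevant)
    recall = hits / len(relevant) if relevant else 0.0
    precision = hits / k
    # Reciprocal rank of the first relevant result within the top k.
    mrr = next((1.0 / (i + 1) for i, d in enumerate(top_k) if d in relevant), 0.0)
    # Linear gain discounted by log2(rank + 1), normalized by the ideal ordering.
    dcg = sum(grades.get(d, 0) / math.log2(i + 2) for i, d in enumerate(top_k))
    ideal = sorted(grades.values(), reverse=True)[:k]
    idcg = sum(g / math.log2(i + 2) for i, g in enumerate(ideal))
    ndcg = dcg / idcg if idcg > 0 else 0.0
    return {"recall_at_k": recall, "precision_at_k": precision, "mrr": mrr, "ndcg": ndcg}


# Query q1 from the golden set above: ids "1" and "4" are relevant.
print(retrieval_metrics(["3", "1", "7", "4"], {"1": 1, "4": 1}, k=3))
```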

Compare models (A/B)

Compare a base model against a fine-tuned model:

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession, base_table: String, finetuned_table: String) -> jammi_db::error::Result<()> {
let comparison = session.eval_compare(
    &[base_table.clone(), finetuned_table.clone()],
    "patents",
    "golden.public.golden_relevance",
    10,
).await?;

// The first entry is the baseline (`delta: None`); every subsequent entry
// carries a delta against it.
for entry in comparison.per_table.iter().skip(1) {
    let delta = entry.delta.as_ref().expect("non-baseline entries carry a delta");
    println!(
        "{}: recall@10 delta {:+.3} ({:+.1}%)",
        entry.table_name,
        delta.recall_at_k.absolute,
        delta.recall_at_k.relative * 100.0,
    );
}
Ok(()) }
}

Python

comparison = db.eval_compare(
    embedding_tables=[base_table, finetuned_table],
    source="patents",
    golden_source="golden.public.golden_relevance",
    k=10,
)
# `per_table[0]` is the baseline (`delta` is None); subsequent entries
# carry a `delta` dict keyed by metric name (recall_at_k, precision_at_k,
# mrr, ndcg) with `absolute` and `relative` sub-keys.
for entry in comparison["per_table"][1:]:
    d = entry["delta"]["recall_at_k"]
    print(f"{entry['table_name']}: recall@10 delta {d['absolute']:+.3f} ({d['relative']*100:+.1f}%)")

The first table is the baseline. Deltas (absolute and relative) are computed for all subsequent tables.
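The delta arithmetic is simple enough to sketch, assuming "relative" means the absolute change divided by the baseline value. How Jammi reports a relative delta against a zero baseline is not stated, so this sketch returns None there. The dictionary shapes loosely mirror the Python result, but `compare_metrics` is a hypothetical helper.

```python
def compare_metrics(aggregates):
    """First entry is the baseline (delta None); others get absolute/relative deltas."""
    baseline = aggregates[0]
    results = [{"delta": None}]
    for agg in aggregates[1:]:
        delta = {}
        for metric, base_value in baseline.items():
            absolute = agg[metric] - base_value
            # Relative change is undefined when the baseline is 0.
            relative = absolute / base_value if base_value else None
            delta[metric] = {"absolute": absolute, "relative": relative}
        results.append({"delta": delta})
    return results


base = {"recall_at_k": 0.50, "mrr": 0.40}
tuned = {"recall_at_k": 0.60, "mrr": 0.40}
d = compare_metrics([base, tuned])[1]["delta"]["recall_at_k"]
print(f"recall@10 delta {d['absolute']:+.3f} ({d['relative'] * 100:+.1f}%)")
# recall@10 delta +0.100 (+20.0%)
```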

Evaluate classification

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_ai::eval::{EvalTask, InferenceAggregate};

let report = session.eval_inference(
    "facebook/bart-large-mnli",
    "test_data",
    &["text".into()],
    EvalTask::Classification,
    "golden.public.labels",
    "category",
).await?;

match &report.aggregate {
    InferenceAggregate::Classification(c) => {
        println!("Accuracy: {}", c.accuracy);
        println!("Macro F1: {}", c.f1);
    }
    InferenceAggregate::Ner(n) => {
        println!("NER F1: {}", n.f1);
    }
}
println!("per_record predictions: {}", report.per_record.len());
Ok(()) }
}

Python

metrics = db.eval_inference(
    model="facebook/bart-large-mnli",
    source="test_data",
    columns=["text"],
    task="classification",
    golden_source="golden.public.labels",
    label_column="category",
)

# `aggregate` is tagged by `task`; for classification it carries
# `accuracy`, `f1`, and `per_class`.
agg = metrics["aggregate"]
print(f"Accuracy: {agg['accuracy']:.3f}")
print(f"Macro F1: {agg['f1']:.3f}")
# `per_record` is one entry per aligned predicted/gold pair.
print(f"per_record predictions: {len(metrics['per_record'])}")
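The classification aggregate can be reproduced by hand from aligned predicted/gold pairs, which is handy for sanity-checking a report. The sketch assumes macro F1 averages per-class F1 over every label that appears in either the gold or the predicted column; Jammi's label set and per_class layout may differ, and `classification_metrics` is hypothetical.

```python
from collections import Counter


def classification_metrics(predicted, gold):
    """Accuracy and macro-averaged F1 over aligned predicted/gold labels."""
    if len(predicted) != len(gold):
        raise ValueError("predicted and gold must be aligned")
    accuracy = sum(p == g for p, g in zip(predicted, gold)) / len(gold)
    tp, fp, fn = Counter(), Counter(), Counter()
    for p, g in zip(predicted, gold):
        if p == g:
            tp[g] += 1
        else:
            fp[p] += 1  # predicted label got a false positive
            fn[g] += 1  # gold label got a false negative
    per_class = {}
    for label in set(gold) | set(predicted):
        denom = 2 * tp[label] + fp[label] + fn[label]
        per_class[label] = 2 * tp[label] / denom if denom else 0.0
    macro_f1 = sum(per_class.values()) / len(per_class)
    return {"accuracy": accuracy, "f1": macro_f1, "per_class": per_class}


gold = ["science", "science", "law", "finance"]
pred = ["science", "law", "law", "finance"]
print(classification_metrics(pred, gold))
```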

Eval runs in the catalog

Every evaluation is recorded automatically:

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
let runs = session.catalog().list_eval_runs().await?;
for run in &runs {
    println!("{}: {} on {} (k={:?})", run.eval_run_id, run.eval_type, run.golden_source, run.k);
}
Ok(()) }
}

Schema validation

Golden datasets are validated before evaluation starts. Missing or wrong-type columns produce clear errors:

Eval error: Golden dataset missing required column 'query_text'
Eval error: Golden dataset column 'query_id' has type Boolean, expected Utf8

Integer ID columns (Int32, Int64) are accepted where Utf8 is expected.
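The rule is easy to mirror client-side, for example to fail fast before a long evaluation. This sketch checks a {column: type-name} mapping for the retrieval golden set using the message formats shown above. It skips the optional relevance_grade column, and it collects every problem rather than stopping at the first, which is an assumption about Jammi's behaviour; `validate_golden` is hypothetical.

```python
ID_TYPES = {"Utf8", "Int32", "Int64"}  # integer ids are accepted where Utf8 is expected

REQUIRED = {
    "query_id": ID_TYPES,
    "query_text": {"Utf8"},
    "relevant_id": ID_TYPES,
}


def validate_golden(schema):
    """Return error strings for a {column: arrow_type_name} mapping."""
    errors = []
    for column, allowed in REQUIRED.items():
        if column not in schema:
            errors.append(f"Golden dataset missing required column '{column}'")
        elif schema[column] not in allowed:
            errors.append(
                f"Golden dataset column '{column}' has type {schema[column]}, expected Utf8"
            )
    return errors


print(validate_golden({"query_id": "Boolean", "relevant_id": "Int64"}))
```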

Connect to PostgreSQL / MySQL

Jammi federates external databases alongside local files. Register a database as a source and query it through the same SQL interface; a single query can join local files with database tables.

PostgreSQL

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
use jammi_db::source::{SourceConnection, SourceType};

session.add_source("pg_data", SourceType::Postgres, SourceConnection {
    url: Some("postgresql://user:pass@localhost:5432/mydb".into()),
    ..Default::default()
}).await?;

let results = session.sql(
    "SELECT id, title FROM pg_data.public.articles WHERE published = true LIMIT 10"
).await?;
Ok(()) }
}

MySQL

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_db::source::{SourceConnection, SourceType};
async fn ex(session: &InferenceSession) -> jammi_db::error::Result<()> {
session.add_source("mysql_data", SourceType::Mysql, SourceConnection {
    url: Some("mysql://user:pass@localhost:3306/mydb".into()),
    ..Default::default()
}).await?;
Ok(()) }
}

Cross-source joins

Once registered, external databases are queryable with the same three-part naming convention and can be joined with local files:

SELECT p.title, a.author_name
FROM local_data.public.papers p
JOIN pg_data.public.authors a ON p.author_id = a.id
WHERE a.institution = 'MIT'

Generate embeddings from external sources

External databases work as sources for embedding generation:

# Note: external databases must be registered through the Rust API,
# which exposes the typed SourceType::Postgres / SourceType::Mysql variants.
# The Python `add_source(url=…, format=…)` surface is for file-shaped sources.

db.generate_text_embeddings(
    source="pg_articles",
    model="sentence-transformers/all-MiniLM-L6-v2",
    columns=["title", "abstract"],
    key="id",
)

Feature flags

External source support requires feature flags when building from source:

| Source | Feature flag |
|---|---|
| PostgreSQL | postgres |
| MySQL | mysql |

These are enabled by default in published crates and pre-built binaries.

Supported source types

| Type | Description | Status |
|---|---|---|
| File (file://) | Parquet, CSV, JSON on local disk | Always available |
| File (s3:// / gs:// / azure://) | Same formats over cloud object stores | Feature-gated (see Cloud Storage) |
| PostgreSQL | Any PostgreSQL-compatible database | Available |
| MySQL | MySQL / MariaDB | Available |
| SQLite | SQLite databases | Not supported (rusqlite version conflict) |

Store Sources and Results in Cloud Object Storage

Jammi treats local disk, S3, GCS, and Azure Blob as interchangeable backends. Any place the engine accepts a local file path it also accepts a storage URL — file://, s3://, gs://, or azure:// — including registered file-shaped sources and the result-table Parquet that embedding and inference jobs write.

Build with the cloud features you need

The default build ships only file:// and the in-memory test driver. Cloud schemes are opt-in per provider so a deployment that only uses S3 does not pull in the GCS and Azure SDK chains:

| Feature | Schemes it enables |
|---|---|
| storage-s3 | s3:// (AWS S3 and S3-compatible: MinIO, R2, LocalStack) |
| storage-gcs | gs:// |
| storage-azure | azure://, abfss:// |
| storage-cloud | All three (umbrella) |

[dependencies]
jammi-db = { version = "0.5", features = ["storage-s3", "storage-gcs"] }

Live integration tests are gated behind the matching live-s3-tests, live-gcs-tests, and live-azure-tests features, so the hermetic cargo test lane never reaches the network.

Register an S3-backed source

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use jammi_ai::session::InferenceSession;
async fn ex(session: &InferenceSession) -> Result<(), Box<dyn std::error::Error>> {
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
use jammi_db::storage::{CloudConfig, S3Config, StorageUrl};

let url = StorageUrl::parse("s3://benchmarks/snapshots/2026/papers.parquet")?;

let conn = SourceConnection {
    url: Some(url.to_string()),
    format: Some(FileFormat::Parquet),
    cloud: Some(CloudConfig::S3(S3Config {
        region: Some("us-east-1".into()),
        ..Default::default()
    })),
    ..Default::default()
};

session.add_source("papers", SourceType::File, conn).await?;

let rows = session
    .sql("SELECT id, title FROM papers.public.papers LIMIT 10")
    .await?;
Ok(()) }
}

If the cloud field is None and the URL uses a cloud scheme, the driver falls back to the provider SDK’s ambient credential chain: environment variables, plus instance profiles and IRSA on AWS, Application Default Credentials (ADC) on GCS, and Managed Identity on Azure.

Python

from jammi_ai import Database

db = Database()
db.add_source("papers", url="s3://benchmarks/snapshots/2026/papers.parquet", format="parquet")
db.sql("SELECT id, title FROM papers.public.papers LIMIT 10")

The Python binding accepts the same URL forms as the Rust API; per-source cloud credentials are read from process environment.

CLI

jammi sources add papers \
    --url s3://benchmarks/snapshots/2026/papers.parquet \
    --format parquet

GCS and Azure

The pattern is identical — only the URL prefix and the CloudConfig variant change:

#![allow(unused)]
fn main() {
extern crate jammi_db;
use jammi_db::source::{FileFormat, SourceConnection};
fn make() -> SourceConnection {
use jammi_db::storage::{CloudConfig, GcsConfig};

let conn = SourceConnection {
    url: Some("gs://archives/2026/jan.parquet".into()),
    format: Some(FileFormat::Parquet),
    cloud: Some(CloudConfig::Gcs(GcsConfig {
        service_account_path: Some("/etc/jammi/sa.json".into()),
        ..Default::default()
    })),
    ..Default::default()
};
conn }
}
#![allow(unused)]
fn main() {
extern crate jammi_db;
use jammi_db::source::{FileFormat, SourceConnection};
fn make() -> Result<SourceConnection, Box<dyn std::error::Error>> {
use jammi_db::storage::{AzureConfig, CloudConfig};

let conn = SourceConnection {
    url: Some("azure://snapshots/model_outputs.parquet".into()),
    format: Some(FileFormat::Parquet),
    cloud: Some(CloudConfig::Azure(AzureConfig {
        account_name: Some("mystorage".into()),
        sas_token: Some(std::env::var("AZURE_SAS_TOKEN")?),
        ..Default::default()
    })),
    ..Default::default()
};
Ok(conn) }
}

Persist result tables to the cloud

ResultStore accepts a StorageUrl root, so embedding and inference outputs can land in the same bucket as the source data:

#![allow(unused)]
fn main() {
extern crate jammi_db;
use std::sync::Arc;
use jammi_db::catalog::Catalog;
fn ex(catalog: Arc<Catalog>) -> jammi_db::error::Result<()> {
use jammi_db::storage::{StorageRegistry, StorageUrl};
use jammi_db::store::ResultStore;
use std::sync::Arc;

let root = StorageUrl::parse("s3://benchmarks/jammi_db")?;
let registry = StorageRegistry::new();
let result_store = Arc::new(ResultStore::with_root(root, registry, catalog)?);
Ok(()) }
}

Every result table the session creates writes its Parquet and sidecar ANN index to that prefix; delete_table_files and the crash-recovery pass operate against the same backend.

How the layout maps onto buckets

For a result table named papers__text_embedding__bge-m3__20260520T120000Z_abc12345, the engine writes the Parquet file plus three sidecar index files, all as siblings under the same prefix:

s3://benchmarks/jammi_db/papers__text_embedding__bge-m3__….parquet
s3://benchmarks/jammi_db/papers__text_embedding__bge-m3__….idx.usearch
s3://benchmarks/jammi_db/papers__text_embedding__bge-m3__….idx.rowmap
s3://benchmarks/jammi_db/papers__text_embedding__bge-m3__….idx.manifest.json

The sidecar layout is the same on every backend; the only difference is the driver under the hood. USearch’s path-based FFI is bridged through a tempfile for cloud schemes so its save / load calls work unchanged.

Register a Mutable Companion Table

A mutable companion table lives in the same backend database as the Jammi catalog (SQLite by default, Postgres in shared deployments), supports transactional INSERT / UPDATE / DELETE through DataFusion DML, and federates with Parquet result tables and external sources in one SQL surface. Reach for it when a tenant needs a relation it can edit row by row — a feature-store slowly-changing dimension table, a per-user state table, a config-driven lookup — that still has to participate in the same JOINs as your immutable result tables.

The primitive carries only what every consumer needs: a schema, a primary key, optional tenant scope, optional secondary indexes, optional ordering column. No history semantics, no lifecycle vocabulary, no audit columns.

Goal

This recipe walks through registering one mutable companion table for a neutral third-party use case (a feature-store team called Polaris Features maintaining slowly-changing dimensions for their recommender) and shows the equivalent Rust / Python / CLI surface.

Setup

Assumes a working JammiSession. The session opens the catalog at the configured artifact directory; nothing else is needed.

Define the schema

Polaris keeps one row per (item_id, valid_from, valid_to) interval:

#![allow(unused)]
fn main() {
extern crate arrow_schema;
fn make() {
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};

let schema = Arc::new(Schema::new(vec![
    Field::new("item_id",      DataType::Utf8,    false),
    Field::new("price_tier",   DataType::Utf8,    false),
    Field::new("availability", DataType::Utf8,    false),
    Field::new("valid_from",   DataType::Int64,   false),  // epoch milliseconds
    Field::new("valid_to",     DataType::Int64,   true),   // epoch milliseconds; NULL = open
]));
}
}

The catalog encoder accepts the closed primitive subset enforced by every MutableBackend impl: Boolean, the integer family, Float32 / Float64, Utf8, and Binary. Wider types (e.g. Timestamp, Decimal) round-trip via their natural numeric encoding (Int64 epoch milliseconds, scaled Int64), which keeps the schema narrow and the type rule at the storage boundary a single check.
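Because valid_from and valid_to are plain Int64 columns, the client does the Timestamp encoding itself. A minimal conversion pair using Python's standard datetime module (the helper names are hypothetical, not part of Jammi):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(iso_timestamp: str) -> int:
    """Convert an ISO-8601 UTC timestamp to Int64 epoch milliseconds."""
    dt = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        raise ValueError("timestamp must carry a UTC offset")
    # Integer division of timedeltas avoids float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def from_epoch_ms(ms: int) -> str:
    """Inverse of to_epoch_ms, rendered back as an ISO-8601 UTC string."""
    return (EPOCH + timedelta(milliseconds=ms)).isoformat().replace("+00:00", "Z")


print(to_epoch_ms("2026-04-01T00:00:00Z"))  # 1775001600000
```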

The engine reserves tenant_id and any column whose name starts with _ — the schema builder rejects them at build time per ADR-00. (The tenant_id column is always present on the storage table; the engine appends it implicitly.)

Build the definition

MutableTableDefinitionBuilder chains the field validations:

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow_schema;
use std::sync::Arc;
use arrow_schema::Schema;
use jammi_db::store::mutable::definition::{
    MutableIndexDef, MutableTableDefinitionBuilder, MutableTableId,
};

fn make(schema: Arc<Schema>) -> jammi_db::store::mutable::definition::MutableTableDefinition {
let def = MutableTableDefinitionBuilder::new(
        MutableTableId::new("item_dimensions").unwrap(),
        schema,
    )
    .primary_key(vec!["item_id".into(), "valid_from".into()])
    .index(MutableIndexDef {
        name: "idx_item_dim_active".into(),
        columns: vec!["item_id".into(), "valid_to".into()],
        unique: false,
    })
    .build()
    .unwrap();
def
}
}

The primary key must be a non-empty subset of the schema; secondary indexes are optional but persisted on the storage table so the backend can use them for WHERE clauses.
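A client-side pre-check can catch the same mistakes before a round trip to the engine. The sketch below encodes the rules stated on this page (reserved tenant_id and leading-underscore names, a non-empty primary key drawn from the schema); the index-column check is an additional assumption, and `validate_definition` is a hypothetical helper, not the builder's actual code.

```python
def validate_definition(columns, primary_key, indexes=None):
    """Raise ValueError if a mutable-table definition breaks the documented rules."""
    for col in columns:
        if col == "tenant_id" or col.startswith("_"):
            raise ValueError(f"column name '{col}' is reserved")
    if not primary_key:
        raise ValueError("primary key must not be empty")
    unknown = [c for c in primary_key if c not in columns]
    if unknown:
        raise ValueError(f"primary key columns not in schema: {unknown}")
    # Assumption: secondary indexes may only reference declared columns.
    for name, index_columns in (indexes or {}).items():
        missing = [c for c in index_columns if c not in columns]
        if missing:
            raise ValueError(f"index '{name}' references unknown columns: {missing}")


cols = ["item_id", "price_tier", "availability", "valid_from", "valid_to"]
validate_definition(cols, ["item_id", "valid_from"],
                    {"idx_item_dim_active": ["item_id", "valid_to"]})
```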

Register

The registration is atomic: catalog row + storage CREATE TABLE + every secondary CREATE INDEX commit together. If any step fails, nothing lands.

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use jammi_db::store::mutable::definition::MutableTableDefinition;
use jammi_db::session::JammiSession;
async fn ex(session: &JammiSession, def: MutableTableDefinition) -> jammi_db::error::Result<()> {
let id = session.create_mutable_table(def).await?;
// The table is now queryable as `mutable.public.item_dimensions` in the
// same SQL surface that federates result tables and external sources.
Ok(())
}
}

Python

import pyarrow as pa
import jammi_ai

db = jammi_ai.connect(artifact_dir="/var/lib/jammi")
# The Python wrapper exposes mutable-table registration through the
# `create_mutable_table` accessor (see `jammi.mutable`). The exact binding
# shape varies by version; consult the API reference for the one your
# version ships.

CLI

The jammi CLI exposes mutable-table registration through the lower-level sources surface for now; programmatic clients should use the Rust or Python APIs.

Verify

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
async fn ex(session: &jammi_db::session::JammiSession) -> jammi_db::error::Result<()> {
let zero_rows = session
    .sql("SELECT * FROM mutable.public.item_dimensions LIMIT 0")
    .await?;
assert_eq!(zero_rows[0].schema().fields().len(), 5);
Ok(())
}
}

The query returns a zero-row batch with the declared schema — confirmation that the table is registered and DataFusion can route mutable.public.<id> correctly.

Federation tease

The mutable table now JOINs with your existing result tables and sources:

SELECT  d.item_id, d.price_tier, e.embedding
FROM    mutable.public.item_dimensions d
JOIN    itemembs.public.item_embeddings e ON e.item_id = d.item_id
WHERE   d.valid_to IS NULL
  AND   d.price_tier = 'premium'
LIMIT 10;

See the Run Transactional Updates on a Mutable Table recipe for INSERT / UPDATE / DELETE round-trips and the SCD Type 2 close-and-open pattern.

Run Transactional Updates on a Mutable Table

Once a mutable companion table is registered (see Register a Mutable Companion Table), you update its rows with the same SQL surface that runs your read queries. Every INSERT / UPDATE / DELETE lands in one backend transaction — either every row commits or none does — and federates with your immutable result tables in subsequent SELECTs.

Goal

Walk through the three DML verbs against the item_dimensions table from the previous recipe, then demonstrate the Slowly-Changing Dimension Type 2 close-and-open pattern Polaris uses to record a price-tier change.

Insert

-- valid_from is Int64 epoch milliseconds: 1775001600000 = 2026-04-01T00:00:00Z
INSERT INTO mutable.public.item_dimensions
    (item_id, price_tier, availability, valid_from)
VALUES
    ('sku-1842', 'standard', 'in_stock',     1775001600000),
    ('sku-2901', 'premium',  'in_stock',     1775001600000),
    ('sku-3457', 'standard', 'out_of_stock', 1775001600000);

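The RecordBatch returned by session.sql(...) carries a single-row UInt64 column called count, per DataFusion’s TableProvider::insert_into contract; here count is 3. Of the three new rows only sku-2901 is premium, so the federation query from the previous recipe (which filters on price_tier = 'premium' and an open valid_to) now returns at most that one item, depending on which items have embeddings.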

Update

UPDATE mutable.public.item_dimensions
   SET availability = 'low_stock'
 WHERE item_id = 'sku-2901';

Predicate columns that participate in an index pushdown become a backend WHERE clause; the rest filter above the scan node. The update commits in one transaction; if the predicate matches zero rows, the call succeeds with rows_affected = 0.

Delete

DELETE FROM mutable.public.item_dimensions
 WHERE item_id = 'sku-3457';

DELETE follows the same shape. Row-level cascades are the backend’s job (SQLite or Postgres, via the foreign-key declarations on the storage table); the engine does not model cascades above the backend.

SCD Type 2 — close-and-open

Polaris records a price-tier change by closing the active row’s valid_to and inserting a new row with the new tier. Both statements must land atomically; today the supported pattern is to issue them as a single multi-statement SQL string through session.sql, which DataFusion plans as one DML batch under one transaction:

-- Single sql() call so both statements land in one transaction.
-- 1778846400000 = 2026-05-15T12:00:00Z in epoch milliseconds.
UPDATE mutable.public.item_dimensions
   SET valid_to = 1778846400000
 WHERE item_id = 'sku-1842' AND valid_to IS NULL;

INSERT INTO mutable.public.item_dimensions
    (item_id, price_tier, availability, valid_from)
VALUES
    ('sku-1842', 'premium', 'in_stock', 1778846400000);

A future JammiSession::transaction(|tx| async { … }) API will make multi-statement DML atomicity explicit; today the multi-statement SQL string is the supported surface.
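The atomicity requirement is easiest to see outside Jammi. The sketch below runs the same close-and-open pair against plain SQLite through Python's standard sqlite3 module (not through session.sql), using epoch-millisecond integers for valid_from and valid_to. If the INSERT fails, the UPDATE that closed the active row is rolled back too. `change_tier` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE item_dimensions (
           item_id TEXT NOT NULL, price_tier TEXT NOT NULL,
           availability TEXT NOT NULL, valid_from INTEGER NOT NULL,
           valid_to INTEGER, PRIMARY KEY (item_id, valid_from))"""
)
conn.execute(
    "INSERT INTO item_dimensions VALUES ('sku-1842', 'standard', 'in_stock', 1775001600000, NULL)"
)
conn.commit()


def change_tier(conn, item_id, new_tier, availability, at_ms):
    """Close the active row and open a new one in a single transaction."""
    with conn:  # commits on success, rolls back if anything raises
        closed = conn.execute(
            "UPDATE item_dimensions SET valid_to = ? WHERE item_id = ? AND valid_to IS NULL",
            (at_ms, item_id),
        ).rowcount
        if closed != 1:
            raise LookupError(f"expected one active row for {item_id}, found {closed}")
        conn.execute(
            "INSERT INTO item_dimensions (item_id, price_tier, availability, valid_from) "
            "VALUES (?, ?, ?, ?)",
            (item_id, new_tier, availability, at_ms),
        )


change_tier(conn, "sku-1842", "premium", "in_stock", 1778846400000)
for row in conn.execute("SELECT * FROM item_dimensions ORDER BY valid_from"):
    print(row)
```

Calling `change_tier` again with the same timestamp violates the (item_id, valid_from) primary key on the INSERT, and the context manager rolls back the UPDATE as well, so the premium row stays open.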

Federation join

The mutable table now joins with the embedding table to surface recommender candidates filtered by current tier:

SELECT  d.item_id, d.price_tier, e.embedding
  FROM  mutable.public.item_dimensions d
  JOIN  itemembs.public.item_embeddings e ON e.item_id = d.item_id
 WHERE  d.valid_to IS NULL
   AND  d.price_tier = 'premium'
 LIMIT 10;

The federation is the engine’s existing FederationOptimizerRule work — no special integration needed; mutable tables register under the same SessionContext as your Parquet result tables and external sources.

Crash recovery

If the process dies mid-write, no partial commit is visible on restart. SQLite’s WAL mode and Postgres’s MVCC each guarantee that an open transaction either commits as a whole or is rolled back on connection loss. The engine inherits that guarantee through the CatalogBackend::transaction closure shape: when the closure returns Err(_), the backend rolls back; when the process is killed mid-execution, the backend rolls back the in-flight transaction.

Direct-access append + replay (Phase 4 trigger streams)

Two lower-level methods bypass DataFusion’s planner for high-throughput event paths:

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow;
extern crate tokio;
async fn ex(
    session: &jammi_db::session::JammiSession,
    batch: arrow::array::RecordBatch,
) -> jammi_db::error::Result<()> {
use jammi_db::store::mutable::definition::MutableTableId;
use jammi_db::catalog::backend::TxOptions;

let id = MutableTableId::new("events").unwrap();
let registry = session.mutable_tables_arc();
let backend = session.catalog().backend_arc();

// Direct INSERT via insert_batch — caller owns the transaction.
backend
    .transaction(TxOptions::default(), move |tx| {
        let registry = registry.clone();
        let id = id.clone();
        let batch = batch.clone();
        Box::pin(async move {
            registry
                .insert_batch(tx, &id, &batch)
                .await
                .map_err(|e| jammi_db::BackendError::Execution(e.to_string()))?;
            Ok::<(), jammi_db::BackendError>(())
        })
    })
    .await?;
Ok(())
}
}
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate futures;
extern crate tokio;
use futures::StreamExt;
async fn ex(
    session: &jammi_db::session::JammiSession,
) -> jammi_db::error::Result<()> {
use jammi_db::store::mutable::definition::MutableTableId;

let id = MutableTableId::new("events").unwrap();
// Stream rows where the registered `order_column` value > 100.
let mut stream = session
    .mutable_tables()
    .scan_after(&id, 100)
    .await
    .map_err(|e| jammi_db::error::JammiError::Catalog(e.to_string()))?;
while let Some(batch) = stream.next().await {
    let _batch = batch
        .map_err(|e| jammi_db::error::JammiError::Catalog(e.to_string()))?;
    // …
}
Ok(())
}
}

These are the surfaces Phase 4’s trigger broker uses to publish events into a backing table and to replay them to subscribers; general consumers should prefer the SQL surface.
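The replay contract (return rows whose order column exceeds a caller-supplied value) can be modelled with an in-memory log. This is a toy illustration, not the trigger broker: it assumes offsets start at 1 and increase by one per row, and it borrows the _offset column name that the engine reserves for topics; `EventLog` is hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class EventLog:
    """Append-only log with an engine-owned, monotonically increasing offset."""
    next_offset: int = 1
    rows: list = field(default_factory=list)

    def append(self, batch):
        """Assign offsets to every row in the batch; return the last offset."""
        for row in batch:
            self.rows.append({"_offset": self.next_offset, **row})
            self.next_offset += 1
        return self.next_offset - 1

    def scan_after(self, offset):
        """Rows whose offset is strictly greater than `offset`, in order."""
        return [r for r in self.rows if r["_offset"] > offset]


log = EventLog()
log.append([{"op": "c", "key": "o-1"}, {"op": "u", "key": "o-1"}])
log.append([{"op": "d", "key": "o-2"}])

# A subscriber that acknowledged offset 1 replays everything after it.
print([r["_offset"] for r in log.scan_after(1)])  # [2, 3]
```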

Publish Events to a Topic

A trigger-stream topic is a catalog-registered Arrow schema plus a backing mutable table. Publishers append RecordBatches; subscribers filter and receive them. The engine owns the offset counter and the durable event log; the broker (in-memory by default, NATS JetStream in clustered deployments) fans live deliveries out to attached subscribers.

Reach for the trigger stream when a tenant needs event semantics — a CDC pipeline, a feature-store update bus, a job-completion notification fan-out — that has to coexist with the SQL surface the rest of the platform already uses. Every published event lands as a row in the topic’s backing mutable table; that table is queryable with the same Flight SQL surface as any other mutable companion table, so ad-hoc analytics on the event log come for free.

Goal

Walk through registering one topic for a neutral third-party use case (a small CDC pipeline pulling Postgres change events into a downstream search index) and publish a batch of events from Rust.

Setup

Assumes a JammiSession whose JammiConfig.trigger_broker is left at its default — the embedded InMemoryBroker. Production deployments swap in JetStreamBroker via configuration; the publisher API does not change.

Define the topic schema

#![allow(unused)]
fn main() {
extern crate arrow_schema;
fn make() {
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};

let schema = Arc::new(Schema::new(vec![
    Field::new("op",         DataType::Utf8,  false),
    Field::new("ts_ms",      DataType::Int64, false),
    Field::new("key",        DataType::Utf8,  false),
    Field::new("after",      DataType::Utf8,  true),
]));
}
}

The schema is the contract every published batch must satisfy. The engine reserves the _offset, _row_idx, and _produced_at column names (all leading-underscore names are reserved); user schemas must not include them.
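A publisher that builds schemas dynamically may want to catch this before registration. The following is a minimal sketch assuming only the rule stated above (every leading-underscore name is reserved); `validate_user_columns` is a hypothetical helper, not part of the jammi_db API, and the engine still performs its own check.

```rust
/// Hypothetical pre-flight check (not a jammi_db API): reject user column
/// names in the reserved leading-underscore namespace, which covers
/// _offset, _row_idx and _produced_at.
fn validate_user_columns(names: &[&str]) -> Result<(), String> {
    for name in names {
        if name.starts_with('_') {
            return Err(format!("column `{name}` uses the reserved leading-underscore namespace"));
        }
    }
    Ok(())
}

fn main() {
    // The cdc.orders schema passes.
    assert!(validate_user_columns(&["op", "ts_ms", "key", "after"]).is_ok());
    // A schema that tries to carry its own offset column is rejected.
    match validate_user_columns(&["op", "_offset"]) {
        Ok(()) => println!("accepted"),
        Err(e) => println!("rejected: {e}"),
    }
}
```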

Register the topic

The simplest path is the SQL surface — session.sql("CREATE TOPIC …") parses the same statement Flight SQL clients send across the wire:

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use jammi_db::session::JammiSession;
async fn ex(session: &JammiSession) -> jammi_db::error::Result<()> {
session
    .sql(
        "CREATE TOPIC cdc.orders (\
             op TEXT NOT NULL, ts_ms BIGINT NOT NULL, \
             key TEXT NOT NULL, after TEXT) \
         WITH (retention_seconds = '604800')",
    )
    .await?;
Ok(())
}
}

The CLI exposes the same shape via jammi trigger register --name … --schema …. From Python, pass the same statement to db.sql:

import jammi_ai

db = jammi_ai.connect(artifact_dir="/var/lib/jammi")
db.sql(
    """
    CREATE TOPIC cdc.orders (
        op    TEXT NOT NULL,
        ts_ms BIGINT NOT NULL,
        key   TEXT NOT NULL,
        after TEXT
    ) WITH (retention_seconds = '604800')
    """
)

For callers that build the topic programmatically (rather than via SQL), the Rust API surface is equivalent:

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow_schema;
use std::collections::BTreeMap;
use std::sync::Arc;
use arrow_schema::SchemaRef;
use jammi_db::trigger::{TopicDefinition, TopicId};

fn make(schema: SchemaRef) -> TopicDefinition {
let topic = TopicDefinition {
    id: TopicId::new(),
    name: "cdc.orders".into(),
    schema,
    tenant: None,                          // None = global; Some(t) scopes to t
    broker_metadata: BTreeMap::new(),      // driver-specific opts (e.g. retention)
};
topic
}
}

The id is a UUIDv7 minted at construction — time-ordered so the catalog index keeps insert locality. The name is opaque to the engine beyond catalog lookup; pick a hierarchical namespace that suits your platform (e.g. cdc.orders, feature_store.user_features).
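Why a UUIDv7 keeps insert locality follows from its byte layout (RFC 9562): the first 48 bits are a big-endian Unix-millisecond timestamp, so byte order tracks creation time regardless of the random bits. This sketch builds that layout by hand for illustration; it is not how TopicId::new is implemented, and the timestamp and random inputs are supplied by the caller rather than read from a clock or RNG.

```rust
/// Illustrative UUIDv7 layout (RFC 9562), not jammi_db's TopicId::new:
/// 48-bit big-endian Unix-ms timestamp, 4-bit version (0b0111),
/// 12 random bits, 2-bit variant (0b10), 62 random bits.
fn uuid_v7(unix_ms: u64, rand: [u8; 10]) -> [u8; 16] {
    let mut b = [0u8; 16];
    let ts = unix_ms.to_be_bytes(); // 8 bytes; the low 6 carry the 48-bit timestamp
    b[..6].copy_from_slice(&ts[2..]);
    b[6] = 0x70 | (rand[0] & 0x0F); // version 7 in the high nibble
    b[7] = rand[1];
    b[8] = 0x80 | (rand[2] & 0x3F); // variant bits 10
    b[9..].copy_from_slice(&rand[3..]);
    b
}

/// Canonical hyphenated lowercase form (8-4-4-4-12 hex digits).
fn to_hyphenated(b: &[u8; 16]) -> String {
    let hex: String = b.iter().map(|x| format!("{x:02x}")).collect();
    format!("{}-{}-{}-{}-{}", &hex[0..8], &hex[8..12], &hex[12..16], &hex[16..20], &hex[20..])
}

fn main() {
    // Adversarial random bits: the earlier ID gets all-ones randomness.
    let earlier = uuid_v7(1_700_000_000_000, [0xFF; 10]);
    let later = uuid_v7(1_700_000_000_001, [0x00; 10]);
    // Byte order (and therefore index order) still follows the timestamp.
    assert!(earlier < later);
    println!("{}\n{}", to_hyphenated(&earlier), to_hyphenated(&later));
}
```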

Registration is atomic: the topics row, the backing mutable table, and any broker-side state commit together; nothing lands on failure.

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use std::sync::Arc;
use jammi_db::trigger::{TopicDefinition, TriggerBroker};
async fn ex(
    topic_repo: &jammi_db::catalog::topic_repo::TopicRepo,
    broker: Arc<dyn TriggerBroker>,
    topic: &TopicDefinition,
) -> Result<(), jammi_db::trigger::TriggerError> {
broker.register_topic(topic).await?;
topic_repo.register_topic(topic).await?;
Ok(())
}
}

Publish a batch

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow;
extern crate arrow_schema;
extern crate tokio;
use std::sync::Arc;
use arrow::array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use jammi_db::trigger::{Publisher, TopicDefinition};
use jammi_db::TenantId;
async fn ex(
    publisher: &Publisher,
    topic: &TopicDefinition,
    schema: SchemaRef,
    tenant: Option<TenantId>,
) -> Result<(), jammi_db::trigger::TriggerError> {
let batch = RecordBatch::try_new(
    schema,
    vec![
        Arc::new(StringArray::from(vec!["c", "u", "d"])),
        Arc::new(Int64Array::from(vec![1700_000_000_000, 1700_000_000_100, 1700_000_000_200])),
        Arc::new(StringArray::from(vec!["order-1", "order-2", "order-3"])),
        Arc::new(StringArray::from(vec![Some("{...}"), Some("{...}"), None])),
    ],
)
.unwrap();
let offset = publisher.publish_scoped(topic, tenant, batch).await?;
println!("published offset = {}", offset.value());
Ok(())
}
}

publish_scoped tags every row’s tenant_id column from the explicit tenant: Option<TenantId> argument — no silent dependency on session state at publish time. Pass None for global topics; pass the session’s current tenant (session.tenant()) for tenant-scoped publishes.

Python equivalent — publish_topic accepts a pyarrow.Table via the Arrow C Stream Interface so the conversion is zero-copy:

import pyarrow as pa

table = pa.table({
    "op":    ["c", "u", "d"],
    "ts_ms": [1700_000_000_000, 1700_000_000_100, 1700_000_000_200],
    "key":   ["order-1", "order-2", "order-3"],
    "after": ["{...}", "{...}", None],
})
offset = db.publish_topic("cdc.orders", batch=table)
print(f"published offset = {offset}")

publish_scoped validates the batch schema against the topic schema before opening a transaction. A mismatch returns BatchSchemaMismatch and nothing lands in the backing table. If the topic is tenant-pinned (TopicDefinition::tenant = Some(t)) and the tenant argument doesn’t match, the publish is rejected up front with PublishTenantMismatch.

What just happened

  1. The Publisher minted the next monotonic offset for the topic (seeded lazily from MAX(_offset) on the backing table the first time the topic is touched).
  2. The augmented batch — user columns plus _offset, _row_idx, and _produced_at — was inserted into the topic’s backing mutable table inside one CatalogBackend::transaction. On commit the offset advances; on rollback it is reused for the next attempt so no gaps appear in the log.
  3. The broker received the batch for best-effort fan-out to any live subscribers. A broker fan-out failure after commit is logged but does not fail the publish — subscribers replay missed offsets from the backing table on next reconnect.
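The offset bookkeeping in steps 1 and 2 can be modelled as a counter that is seeded once and advanced only on commit. This is a single-threaded sketch under stated assumptions: `OffsetCounter` is hypothetical, offsets are assumed to start at 0 for an empty table, and a real implementation must also serialize concurrent publishers to the same topic.

```rust
/// Hypothetical per-topic offset counter: seeded lazily from MAX(_offset),
/// advanced only after commit, so a rolled-back publish leaves no gap.
struct OffsetCounter {
    next: Option<u64>, // None until the topic is first touched
}

impl OffsetCounter {
    fn new() -> Self {
        OffsetCounter { next: None }
    }

    /// Offset the next publish would use. `max_offset_in_table` stands in
    /// for `SELECT MAX(_offset)` and is evaluated only on first use.
    fn reserve(&mut self, max_offset_in_table: impl FnOnce() -> Option<u64>) -> u64 {
        *self.next.get_or_insert_with(|| max_offset_in_table().map_or(0, |m| m + 1))
    }

    /// Called only after the transaction commits.
    fn commit(&mut self, offset: u64) {
        self.next = Some(offset + 1);
    }
}

fn main() {
    let mut counter = OffsetCounter::new();
    let first = counter.reserve(|| Some(41)); // table already holds offsets up to 41
    assert_eq!(first, 42);
    // The transaction rolls back: commit() is never called, so 42 is reused.
    let retry = counter.reserve(|| unreachable!("seeded only once"));
    assert_eq!(retry, 42);
    counter.commit(retry);
    println!("next offset = {}", counter.reserve(|| unreachable!()));
}
```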


Subscribe to a Topic with a SQL Predicate Filter

A subscription tails a topic and yields only the batches whose rows satisfy a SQL WHERE-clause predicate. The predicate is parsed once through DataFusion at subscribe time; the broker delivers each batch, the engine evaluates the predicate against it, and rows that match flow to the consumer.

Reach for predicate-filtered subscriptions when different downstream consumers need different selectivity on the same event stream — a search index that only cares about op = 'c', an audit log that wants every event, a cache invalidator that wants op IN ('u', 'd').

Goal

Open a subscription on the cdc.orders topic with a predicate that matches only deletes, and consume the stream from Rust.

Setup

Assumes the topic was registered (see Publish Events to a Topic) and you have a Subscriber constructed against the same broker the publisher uses.

Open the subscription

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate arrow;
extern crate arrow_schema;
extern crate datafusion;
extern crate futures;
extern crate tokio;
use std::sync::Arc;
use arrow_schema::SchemaRef;
use datafusion::execution::context::SessionContext;
use futures::StreamExt;
use jammi_db::trigger::{Predicate, Subscriber, TopicDefinition};
async fn ex(
    subscriber: &Subscriber,
    session: &SessionContext,
    topic: &TopicDefinition,
) -> Result<(), jammi_db::trigger::TriggerError> {
let predicate = Predicate::from_sql(session, Arc::clone(&topic.schema), "op = 'd'")?;

let mut stream = subscriber
    .subscribe(topic, predicate, None /* from_offset: None = live tail */)
    .await?;

while let Some(delivered) = stream.next().await {
    let batch = delivered?;
    handle_deletes(batch.batch);
}
Ok(())
}
fn handle_deletes(_: arrow::array::RecordBatch) {}
}

from_offset = None starts the subscription at the broker’s live tail (no replay). Passing Some(offset) with an offset value of 0 (built with Offset::new, as in the reconnection example below) starts from the earliest retained event; the engine joins backing-table replay with the live broker stream so the client sees one continuous sequence of DeliveredBatch.

Predicate dialect

Predicates are a subset of DataFusion SQL. The whitelist:

| Supported | Rejected |
|---|---|
| Column references (col) | Subqueries (SELECT …) |
| Literal scalars (1, 'foo', true) | Aggregates (SUM, COUNT, …) |
| Comparison ops (=, <, >, <=, >=, !=) | Window functions |
| Boolean ops (AND, OR, NOT) | Joins |
| IS NULL, IS NOT NULL | CASE WHEN |
| IN (literal, literal, …) | Functions outside the whitelist |
| LIKE, BETWEEN | |
| Whitelisted string functions | |

The string-function whitelist is lower, upper, length, starts_with, ends_with. Anything outside this list returns PredicateUnsupported at subscribe time — the stream never opens. An unparseable predicate returns PredicateParse for the same reason.
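The whitelist amounts to a recursive walk that accepts allowed node kinds and rejects everything else. The sketch below runs that check over a hypothetical, heavily simplified expression enum; DataFusion's real `Expr` type is far richer, and this is not Jammi's validator.

```rust
/// Hypothetical, simplified predicate AST (not DataFusion's `Expr`).
#[allow(dead_code)]
enum Expr {
    Column(String),
    Literal(String),
    Compare(Box<Expr>, &'static str, Box<Expr>), // =, <, >, <=, >=, !=
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
    Not(Box<Expr>),
    IsNull(Box<Expr>, bool), // true = IS NOT NULL
    InList(Box<Expr>, Vec<Expr>),
    Like(Box<Expr>, Box<Expr>),
    Between(Box<Expr>, Box<Expr>, Box<Expr>),
    Function(String, Vec<Expr>),
    Subquery,
    Aggregate(String),
    Case,
}

const STRING_FUNCTIONS: [&str; 5] = ["lower", "upper", "length", "starts_with", "ends_with"];

/// Accept whitelisted node kinds, recursing into children; reject the rest.
fn check(e: &Expr) -> Result<(), String> {
    use Expr::*;
    match e {
        Column(_) | Literal(_) => Ok(()),
        Compare(l, _, r) | And(l, r) | Or(l, r) | Like(l, r) => {
            check(l)?;
            check(r)
        }
        Not(x) | IsNull(x, _) => check(x),
        InList(x, items) => {
            check(x)?;
            // IN accepts literal lists only.
            if items.iter().all(|i| matches!(i, Literal(_))) {
                Ok(())
            } else {
                Err("IN list must contain literals".into())
            }
        }
        Between(x, lo, hi) => {
            check(x)?;
            check(lo)?;
            check(hi)
        }
        Function(name, args) if STRING_FUNCTIONS.contains(&name.as_str()) => {
            args.iter().try_for_each(check)
        }
        Function(name, _) => Err(format!("function `{name}` is not whitelisted")),
        Subquery => Err("subqueries are rejected".into()),
        Aggregate(name) => Err(format!("aggregate `{name}` is rejected")),
        Case => Err("CASE WHEN is rejected".into()),
    }
}

fn main() {
    // op = 'd'
    let deletes = Expr::Compare(
        Box::new(Expr::Column("op".into())),
        "=",
        Box::new(Expr::Literal("d".into())),
    );
    println!("{:?}", check(&deletes));
    println!("{:?}", check(&Expr::Function("regexp_match".into(), vec![])));
}
```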

Reconnection and replay

If your consumer disconnects and reconnects, pass the last-seen offset as from_offset to resume without missing events:

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate chrono;
extern crate tokio;
use chrono::Utc;
use std::sync::Arc;
use jammi_db::trigger::{Offset, Predicate, Subscriber, TopicDefinition};
async fn ex(
    subscriber: &Subscriber,
    topic: &TopicDefinition,
    last_seen: u64,
) -> Result<(), jammi_db::trigger::TriggerError> {
let resume_from = Offset::new(last_seen + 1, Utc::now());
let _stream = subscriber
    .subscribe(topic, Predicate::match_all(), Some(resume_from))
    .await?;
Ok(())
}
}

The engine reads the backing table for offsets >= resume_from, then attaches the broker live stream starting strictly above the last replayed offset — the two halves never deliver the same offset twice.
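The replay-then-live handoff reduces to one rule: after replay, accept a live offset only if it is strictly above the last replayed one (or at least resume_from when replay returned nothing). This sketch works on bare offsets; `resume` is a hypothetical function, not part of the Subscriber API.

```rust
/// Hypothetical model of the resume handoff: replay the backing table from
/// `resume_from`, then append live offsets strictly above the last replayed
/// one, so no offset is delivered twice.
fn resume(replayed: &[u64], live: impl IntoIterator<Item = u64>, resume_from: u64) -> Vec<u64> {
    let mut out: Vec<u64> = replayed.iter().copied().filter(|&o| o >= resume_from).collect();
    // Empty replay: everything from resume_from onward is still wanted.
    let floor = out.last().map_or(resume_from, |&last| last + 1);
    out.extend(live.into_iter().filter(|&o| o >= floor));
    out
}

fn main() {
    // Last seen offset was 10, so resume from 11. The table holds 9..=13;
    // the live tail overlaps at 13 and continues past it.
    let delivered = resume(&[9, 10, 11, 12, 13], [13, 14, 15], 11);
    println!("{delivered:?}");
}
```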

Backpressure

A slow consumer slows the producer; events are not dropped. The chain is: the broker tail backs up onto a bounded mpsc::channel, the channel’s send() future awaits, the broker poll loop pauses, and publishers awaiting the broker fan-out experience matching backpressure. The backing table, the authoritative log, is still written without delay, so a consumer that disconnects under load can always catch up via replay.
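The key property of a bounded channel is that a full buffer makes the sender wait instead of discarding the event. This sketch uses the standard library's blocking `std::sync::mpsc::sync_channel` as a stand-in for the async bounded channel; the capacity of 2 is arbitrary, not Jammi's setting.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;
use std::time::Duration;

/// Returns whether a third delivery found the channel full, plus the order
/// in which the slow consumer received events.
fn demo() -> (bool, Vec<u64>) {
    // Capacity 2 is arbitrary; it only needs to be bounded.
    let (tx, rx) = sync_channel::<u64>(2);
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    // Full: try_send hands the event back to the caller instead of dropping it.
    let was_full = matches!(tx.try_send(3), Err(TrySendError::Full(3)));

    // A blocking send parks the producer until the consumer makes room.
    let producer = thread::spawn(move || tx.send(3).unwrap());
    thread::sleep(Duration::from_millis(50)); // the slow consumer
    let received: Vec<u64> = rx.iter().take(3).collect();
    producer.join().unwrap();
    (was_full, received)
}

fn main() {
    let (was_full, received) = demo();
    println!("channel was full: {was_full}; received {received:?}");
}
```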


Replay Events from the Backing Table

Every topic’s event log is a Phase-2 mutable companion table named __topic_<topic_id>. The double-underscore prefix is reserved for engine-controlled tables; consumers do not register tables under that namespace. Flight SQL queries against the backing table compose with the same federation surface the rest of Jammi exposes — joins with result tables, predicate pushdown, aggregates over event history.

Reach for direct replay when a tenant needs ad-hoc analytics on the event log that would be awkward through the subscribe surface — counting events per key, computing per-day rollups, joining the event stream against a Parquet result table.

Goal

Run an ad-hoc query that returns the count of op = 'd' events per hour over the durable log for cdc.orders.

Backing table naming

Every registered topic has a backing table whose name is __topic_<topic_id> where <topic_id> is the hyphenated lowercase TopicId::Display. To find the name, query topics:

SELECT topic_id, name, backing_table FROM topics WHERE name = 'cdc.orders';

Schema

The backing table’s columns are the topic’s user schema with three engine-controlled columns prepended:

| Column | Type | Purpose |
|---|---|---|
| _offset | BIGINT NOT NULL | Monotonic offset; stable across rows of one publish. |
| _row_idx | BIGINT NOT NULL | Position within a publish, for the composite PK. |
| _produced_at | BIGINT NOT NULL (UTC microseconds) | Publisher-side timestamp, single value per offset. |
| …user cols… | per TopicDefinition.schema | Payload columns. |
| tenant_id | TEXT (nullable, added by Phase 2) | Tenant scope per ADR-00. |

The primary key is (_offset, _row_idx); _offset is the order column so scan_after and ORDER BY _offset agree.
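Because the key is (_offset, _row_idx), ordering by the key groups each publish together and "rows after offset N" is a single range scan. This sketch models the table as an in-memory `BTreeMap`; `EventLog` is hypothetical and stores only one payload column.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};

/// Hypothetical stand-in for a backing table keyed by (_offset, _row_idx).
struct EventLog {
    rows: BTreeMap<(u64, u64), String>, // payload, e.g. the `key` column
}

impl EventLog {
    /// One publish = one offset; its rows get _row_idx 0..n.
    fn append(&mut self, offset: u64, payloads: &[&str]) {
        for (row_idx, p) in payloads.iter().enumerate() {
            self.rows.insert((offset, row_idx as u64), p.to_string());
        }
    }

    /// Rows with _offset strictly greater than `after`, in key order,
    /// mirroring `scan_after(&id, after)` and `ORDER BY _offset`.
    fn scan_after(&self, after: u64) -> Vec<(u64, u64, &str)> {
        self.rows
            .range((Excluded((after, u64::MAX)), Unbounded))
            .map(|(&(o, r), p)| (o, r, p.as_str()))
            .collect()
    }
}

fn main() {
    let mut log = EventLog { rows: BTreeMap::new() };
    log.append(100, &["order-1", "order-2"]);
    log.append(101, &["order-3", "order-4", "order-5"]);
    for (offset, row_idx, key) in log.scan_after(100) {
        println!("{offset}/{row_idx}: {key}");
    }
}
```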

Query

SELECT
    DATE_TRUNC('hour', TO_TIMESTAMP_MICROS(_produced_at))   AS hour,
    COUNT(*)                                                 AS deletes
FROM    mutable.public.__topic_019088da_1234_7890_abcd_ef1234567890
WHERE   op = 'd'
GROUP BY hour
ORDER BY hour;

Substitute your topic’s backing_table (looked up from the topics catalog row) for the literal name in the example. The query runs through Flight SQL like any other federated query — predicate pushdown applies, joins compose, aggregates run.

Tenant scoping

The backing table carries the tenant_id column added by the Phase-2 mutable backend. Sessions bound to a tenant see only rows whose tenant_id matches or is NULL, per Phase 3’s predicate-injection analyzer rule — the same guard that scopes the rest of the catalog.


Scope a Session to a Tenant

When more than one logical tenant shares a Jammi engine — a SaaS feature store serving two ML teams, a research workbench shared across three labs, a notebook product hosting one project per student — every catalog read and write needs to belong to the right tenant. Jammi’s session-scoped tenant binding does this without the caller having to spell a WHERE tenant_id = … clause on every query.

Goal

After this recipe you can:

  1. Bind a tenant to a session in Rust, Python, and on the CLI.
  2. Verify that two sessions on the same process see disjoint rows.
  3. Bind a tenant on a remote client via the gRPC SessionService so subsequent Flight SQL queries from the same connection observe the tenant.

Setup

Every example below assumes a configured JammiConfig (defaults are fine for the recipe). The tenant identifier is a UUID v4 or v7 string — the engine refuses the nil UUID (00000000-…) at the TenantId newtype boundary.
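The boundary check can be sketched as a small parser. This is a hypothetical stand-in for TenantId::from_str: it accepts the 8-4-4-4-12 hex form and rejects the nil UUID as described above. The version check (4 or 7) mirrors the "UUID v4 or v7" wording and is an assumption about how strictly the real type enforces it.

```rust
/// Hypothetical sketch of the TenantId boundary (not jammi_db's parser).
fn parse_tenant(s: &str) -> Result<[u8; 16], String> {
    let groups: Vec<&str> = s.split('-').collect();
    let lens: Vec<usize> = groups.iter().map(|g| g.len()).collect();
    if lens != [8, 4, 4, 4, 12] {
        return Err(format!("`{s}` is not an 8-4-4-4-12 UUID"));
    }
    let hex = groups.concat();
    if !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err(format!("`{s}` contains non-hex characters"));
    }
    let mut bytes = [0u8; 16];
    for (i, byte) in bytes.iter_mut().enumerate() {
        // Cannot fail: every character was checked to be a hex digit.
        *byte = u8::from_str_radix(&hex[2 * i..2 * i + 2], 16).unwrap();
    }
    if bytes == [0u8; 16] {
        return Err("the nil UUID is not a valid tenant".into());
    }
    // Assumed: only versions 4 and 7 are accepted.
    match bytes[6] >> 4 {
        4 | 7 => Ok(bytes),
        v => Err(format!("UUID version {v} is not accepted")),
    }
}

fn main() {
    for s in [
        "018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a",
        "00000000-0000-0000-0000-000000000000",
    ] {
        println!("{s}: {:?}", parse_tenant(s).map(|_| "accepted"));
    }
}
```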

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use std::str::FromStr;
use jammi_db::TenantId;
use jammi_db::session::JammiSession;
use jammi_db::config::JammiConfig;

async fn ex() -> jammi_db::error::Result<()> {
let config = JammiConfig::default();
let alice = TenantId::from_str("018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a")?;

let session = JammiSession::new(config).await?.with_tenant(alice);
// Every catalog read and write on `session` now scopes to Alice.
Ok(())
}
}

with_tenant is a builder that consumes self and returns Self, so it chains naturally. If you hold a session behind Arc, use bind_tenant(&t) to update the binding in place — the session shares one TenantBinding across all references.
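One way to get "one binding shared across all references" is an `Arc` around an interior-mutable cell: the consuming builder and the in-place `bind_tenant(&self)` both write the same cell, and every clone reads it. `Session` and `TenantBinding` below are hypothetical models, and the `Arc<RwLock<…>>` representation is an assumption, not Jammi's actual layout.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical shared binding: one cell behind an Arc.
#[derive(Default)]
struct TenantBinding(RwLock<Option<String>>);

/// Hypothetical session; clones share the same TenantBinding.
#[derive(Clone, Default)]
struct Session {
    tenant: Arc<TenantBinding>,
}

impl Session {
    /// Builder style: consumes self and returns Self, so it chains.
    fn with_tenant(self, t: &str) -> Self {
        self.bind_tenant(t);
        self
    }

    /// In-place update through &self, usable when the session is shared.
    fn bind_tenant(&self, t: &str) {
        *self.tenant.0.write().unwrap() = Some(t.to_string());
    }

    fn tenant(&self) -> Option<String> {
        self.tenant.0.read().unwrap().clone()
    }
}

fn main() {
    let alice = "018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a";
    let bob = "018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9b";
    let session = Session::default().with_tenant(alice);
    let shared = session.clone(); // e.g. handed to another task
    session.bind_tenant(bob);
    // The clone observes the rebinding.
    println!("{:?}", shared.tenant());
}
```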

Python

import jammi_ai

db = jammi_ai.connect(artifact_dir="/tmp/jammi")
db.with_tenant("018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a")

# Subsequent calls observe Alice's tenant scope.
db.add_source("inbox", path="/data/alice/inbox.parquet", format="parquet")
db.sql("SELECT * FROM inbox.public.inbox")

Pass an empty string to clear: db.with_tenant("").

CLI

The --tenant flag is global; it applies to every subcommand.

jammi --tenant 018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a sources list
jammi --tenant 018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9b query "SELECT * FROM models"

Remote clients (gRPC + Flight SQL)

A programmatic client (Python, Go, Java) binds the tenant once per connection via the jammi.v1.session.SessionService.SetTenant RPC. The server records the tenant against the jammi-session-id request metadata header; every Flight SQL query the same connection issues afterwards inherits the binding through the TenantInterceptor that fronts both services. Browser clients reach the same SessionService over HTTP/1.1 via the gRPC-Web shim (application/grpc-web+proto) — no separate REST surface, same jammi-session-id header semantics.

import grpc
from jammi.v1.session.session_pb2 import SetTenantRequest, Tenant
from jammi.v1.session.session_pb2_grpc import SessionServiceStub

channel = grpc.insecure_channel("jammi.example.com:50051")
metadata = [("jammi-session-id", "my-client-uuid")]

session = SessionServiceStub(channel)
session.SetTenant(
    SetTenantRequest(tenant=Tenant(id="018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a")),
    metadata=metadata,
)
# Subsequent Flight SQL queries on the same channel + jammi-session-id
# observe Alice's tenant scope.

Disjoint views — what to expect

Two sessions on the same process, bound to different tenants, will:

  1. Stay invisible to each other: list_sources() returns the calling tenant’s sources plus any globally-scoped (tenant_id IS NULL) sources.
  2. Write into different lanes: a register_source from Alice produces a row tagged tenant_id = alice; Bob’s list_sources does not see it.
  3. Share globally-scoped rows: an unscoped (tenant_id IS NULL) registration — typically a public reference dataset — is visible to every tenant.

The engine enforces the binding at three layers (the SPEC-03 defence-in-depth discipline):

  • Read-side predicate injection — TenantScopeAnalyzerRule injects tenant_id = $current OR tenant_id IS NULL on every TableScan whose schema declares the column.
  • Write-side guard — every catalog register_* and the mutable-table sink calls Transaction::assert_tenant_matches before INSERT.
  • Storage-side filter — catalog repo reads also pass the predicate to the backend SQL layer, so the wrong tenant’s rows never leave the database.
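The read-side and write-side rules can be written as two small functions. Both are hypothetical models, not engine code, and short strings stand in for tenant UUIDs. This sketch's write guard requires an exact match. Whether the real guard lets a scoped session write a global (NULL) row is not stated here, so that part is an assumption.

```rust
/// Read side: `tenant_id = $current OR tenant_id IS NULL`; an unscoped
/// session matches only `tenant_id IS NULL`.
fn visible(row_tenant: Option<&str>, current: Option<&str>) -> bool {
    match (row_tenant, current) {
        (None, _) => true,                    // global row
        (Some(row), Some(cur)) => row == cur, // same tenant
        (Some(_), None) => false,             // unscoped session sees globals only
    }
}

/// Write side (assumed exact-match semantics): refuse a row tagged for
/// a different tenant than the session's.
fn assert_tenant_matches(row_tenant: Option<&str>, current: Option<&str>) -> Result<(), String> {
    if row_tenant == current {
        Ok(())
    } else {
        Err(format!("TenantMismatch: row {row_tenant:?}, session {current:?}"))
    }
}

fn main() {
    let rows = [Some("alice"), Some("bob"), None];
    let alice_view: Vec<Option<&str>> =
        rows.iter().copied().filter(|&r| visible(r, Some("alice"))).collect();
    println!("alice sees {alice_view:?}");
    println!("{:?}", assert_tenant_matches(Some("bob"), Some("alice")));
}
```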

A buggy caller that constructs a row with the wrong tenant_id gets BackendError::TenantMismatch from the guard layer.

When the binding doesn’t apply

  • External federated sources without a tenant_id column — Jammi’s analyzer rule has no column to inject against, so those sources show every row to every tenant unless the source declaration registers a tenant_column override. Catalog tables and mutable companion tables always carry the column.
  • Cross-tenant WHERE clauses the caller writes by hand — a query that contains WHERE tenant_id = 'other-tenant' runs against the injected predicate plus the user’s clause; the analyzer rule does not remove user-written predicates.
  • Single-tenant deployments — bind nothing and every row is global; no predicate is injected beyond tenant_id IS NULL.


Scope a Federated Source by Tenant

The session-scoped tenant binding (multi-tenant.md) relies on every table the engine reads carrying a tenant_id column. That works for mutable companion tables and Parquet result tables Jammi produced itself — both emit the column per ADR-00. But a federated source — a remote Postgres warehouse, an S3 Parquet lake, a CSV from someone else’s pipeline — usually doesn’t. It may carry a customer_id, an organization, a workspace column, or no tenant discriminator at all.

This recipe shows how to tell Jammi which column on a federated source plays the role of the tenant discriminator, so the predicate-injection analyzer rule scopes scans against that column instead of looking for the engine’s built-in tenant_id name.

Goal

After this recipe you can:

  1. Register a federated source whose tenant discriminator is named differently from tenant_id.
  2. Tell the analyzer rule which column to use.
  3. Verify two tenants get disjoint rows from the same physical source.
  4. Recognise what set_source_tenant_column does not do.

Setup

The recipe assumes you have a Parquet file (or any other federated source) whose schema includes a column that already carries the tenant identifier — for example a customer_id column populated with the UUID of the customer who owns each row. The column’s value must be the same canonical hyphenated lowercase form TenantId::Display emits; the analyzer rule does a string comparison after coercing the column to Utf8.

Register a federated source

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use jammi_db::session::JammiSession;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};

async fn ex(session: &JammiSession) -> jammi_db::error::Result<()> {
session
    .add_source(
        "notes",
        SourceType::File,
        SourceConnection {
            url: Some("file:///data/notes.parquet".into()),
            format: Some(FileFormat::Parquet),
            ..Default::default()
        },
    )
    .await?;
Ok(())
}
}

Python

db.add_source("notes", path="/data/notes.parquet", format="parquet")

Declare its tenant column

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
use jammi_db::session::JammiSession;
fn ex(session: &JammiSession) {
session.set_source_tenant_column("notes", Some("customer_id".into()));
}
}

set_source_tenant_column registers the override on the session’s SourceTenantColumns map. The next time the analyzer rule sees a scan against notes.public.notes, it discovers the override and injects WHERE CAST(customer_id AS Utf8) = $current_tenant OR CAST(customer_id AS Utf8) IS NULL (or IS NULL only when the session is unscoped).
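The two shapes of the injected predicate can be rendered as text to make them easy to compare with query plans. `tenant_predicate` is a hypothetical helper. The real analyzer rule builds a DataFusion expression with the tenant bound as a value, not a SQL string, so the quoting here is for display only.

```rust
/// Hypothetical renderer for the predicate shapes described above.
/// `current` is the session tenant in canonical lowercase hyphenated form.
fn tenant_predicate(column: &str, current: Option<&str>) -> String {
    let col = format!("CAST({column} AS Utf8)");
    match current {
        // Display-only quoting; do not build real queries this way.
        Some(t) => format!("{col} = '{t}' OR {col} IS NULL"),
        None => format!("{col} IS NULL"),
    }
}

fn main() {
    let alice = "018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a";
    println!("{}", tenant_predicate("customer_id", Some(alice)));
    println!("{}", tenant_predicate("customer_id", None));
}
```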

The Python and CLI surfaces do not expose this method today — it lives on Rust JammiSession only. If you embed Jammi as a library this is the right hook; if you reach Jammi over Flight SQL / gRPC, the source registration and tenant-column declaration happen on the server side before the server starts accepting client connections.

A schema column named tenant_id always wins. Call set_source_tenant_column only when your federated source carries its tenant discriminator under a different name; sources without any tenant discriminator remain globally visible.

Verify the predicate

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate tokio;
use std::str::FromStr;
use jammi_db::TenantId;
use jammi_db::session::JammiSession;
use jammi_db::config::JammiConfig;
async fn ex() -> jammi_db::error::Result<()> {
let alice = TenantId::from_str("018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9a")?;
let bob = TenantId::from_str("018f5a0e-c4c8-7e10-9c4f-3b6f7c5a8e9b")?;

let session_a = JammiSession::new(JammiConfig::default()).await?.with_tenant(alice);
session_a.set_source_tenant_column("notes", Some("customer_id".into()));

let session_b = JammiSession::new(JammiConfig::default()).await?.with_tenant(bob);
session_b.set_source_tenant_column("notes", Some("customer_id".into()));

let count_a = session_a.sql("SELECT COUNT(*) FROM notes.public.notes").await?;
let count_b = session_b.sql("SELECT COUNT(*) FROM notes.public.notes").await?;

// Each session sees only its own rows — `count_a` and `count_b` are
// disjoint subsets of the on-disk file.
Ok(())
}
}

For a file with 10 rows split 6 (customer_id = alice) + 4 (customer_id = bob), the two sessions get 6 and 4 respectively.

What you cannot do

  • You cannot point set_source_tenant_column at a column that doesn’t exist on the source. The analyzer rule emits a column reference that DataFusion later fails to resolve at execution time, surfacing as a DataFusionError::SchemaError. The override is a trust contract — the engine does not validate the column’s presence at registration time.
  • You cannot mix tenant_id and a non-tenant_id column on the same source. When the source’s schema already declares tenant_id, the built-in column wins and the override is ignored.
  • You cannot safely remove the discriminator while tenants are actively querying. Calling set_source_tenant_column("notes", None) drops the override, and every subsequent query on notes.public.notes is no longer tenant-scoped at all: every tenant sees every row.

If the federated source you are wrapping carries no tenant discriminator, two options are open: (1) re-shape upstream so each tenant lands in its own table, registered as a separate source, or (2) accept that the source is globally visible to every session and gate access at a higher layer (Flight SQL session interceptor, gRPC auth middleware). The engine itself does not authenticate; ADR-00 § Engine does not invent tenants applies.


Deploy as a Server

Jammi can run as an Arrow Flight SQL server, making all registered sources and embedding tables queryable from any Arrow-compatible client. Use this when multiple services, BI tools, or non-Rust/Python consumers need to query Jammi’s data.

The workflow

The server is a read path. Set up your data with the library or CLI, then deploy the server so other systems can query it:

# 1. Register sources (CLI or library)
jammi sources add patents --path /data/patents.parquet --format parquet

# 2. Generate embeddings (library or Python — not available over Flight SQL)
python3 -c "
import jammi_ai
db = jammi_ai.connect()
db.generate_embeddings(source='patents', model='sentence-transformers/all-MiniLM-L6-v2', columns=['abstract'], key='id')
"

# 3. Start the server
jammi serve

Connecting with Arrow Flight SQL

Python (pyarrow)

from pyarrow.flight import FlightClient, FlightDescriptor

client = FlightClient("grpc://localhost:8081")

# Run a SQL query
info = client.get_flight_info(
    FlightDescriptor.for_command(b"SELECT id, title, year FROM patents.public.patents WHERE year > 2020")
)
reader = client.do_get(info.endpoints[0].ticket)
table = reader.read_all()
print(table.to_pandas())

Query embedding tables

Embedding tables are registered in DataFusion and queryable via SQL:

# List all embedding tables
info = client.get_flight_info(
    FlightDescriptor.for_command(b"SELECT table_name FROM information_schema.tables WHERE table_schema = 'jammi'")
)

# Query vectors directly
info = client.get_flight_info(
    FlightDescriptor.for_command(b"SELECT _row_id, _model_id FROM \"jammi.patents__embedding__all-MiniLM-L6-v2__20260325\" LIMIT 10")
)

JDBC

Flight SQL is compatible with JDBC drivers that support the Arrow Flight SQL protocol, enabling access from Java applications, BI tools (Superset, DBeaver, Tableau), and SQL editors.

Server configuration

[server]
flight_listen = "0.0.0.0:8081"
preload_models = ["sentence-transformers/all-MiniLM-L6-v2"]

[logging]
level = "info"
format = "json"    # structured logging for production

Preloading models

Models listed in preload_models are downloaded and loaded into memory at startup. This ensures the session is warm before the server accepts connections.

[server]
preload_models = [
    "sentence-transformers/all-MiniLM-L6-v2",
    "BAAI/bge-small-en-v1.5",
]

GPU configuration

For GPU-accelerated inference in production:

[gpu]
device = 0            # CUDA device index
memory_limit = "auto"
memory_fraction = 0.9

[inference]
batch_size = 64
max_loaded_models = 3

Set gpu.device = -1 for CPU-only deployment.

Environment variable overrides

Every config field can be overridden with environment variables, useful for containerized deployments:

JAMMI_SERVER__FLIGHT_LISTEN=0.0.0.0:9081 \
JAMMI_GPU__DEVICE=-1 \
JAMMI_LOGGING__FORMAT=json \
jammi serve
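The variable names in these examples follow a visible pattern: a JAMMI_ prefix, a double underscore between section and key, and uppercase for the lowercase TOML names. The sketch below maps a variable name to a dotted config path under that inferred convention. `env_to_config_path` is hypothetical, and JAMMI_CONFIG, which names the config file path, is a separate case it does not special-case.

```rust
/// Hypothetical mapping inferred from the examples above:
/// JAMMI_SERVER__FLIGHT_LISTEN -> server.flight_listen
fn env_to_config_path(var: &str) -> Option<String> {
    let rest = var.strip_prefix("JAMMI_")?;
    // "__" separates nesting levels; a single "_" stays inside a key name.
    let parts: Vec<String> = rest.split("__").map(|p| p.to_ascii_lowercase()).collect();
    if parts.iter().any(|p| p.is_empty()) {
        return None;
    }
    Some(parts.join("."))
}

fn main() {
    for var in ["JAMMI_SERVER__FLIGHT_LISTEN", "JAMMI_GPU__DEVICE", "JAMMI_LOGGING__FORMAT"] {
        println!("{var} -> {:?}", env_to_config_path(var));
    }
}
```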

Health, readiness, and metrics

The server exposes three HTTP side-channel endpoints on port 8080:

curl http://localhost:8080/healthz
# {"status":"ok","version":"0.8.0"}

curl http://localhost:8080/readyz
# {"status":"ready"}

curl http://localhost:8080/metrics
# jammi_grpc_requests_total 0
# jammi_flight_queries_total 0
# jammi_eval_invocations_total 0
# jammi_search_latency_seconds_bucket{...} 0

/healthz is a liveness probe — a 200 means the process is running. /readyz is a readiness probe — 200 means the catalog backend responded; 503 means it didn’t and traffic should be drained from this instance. Point your load balancer at /readyz.

/metrics exposes a small, substrate-level set of Prometheus counters (gRPC requests, Flight SQL queries, eval invocations) plus a search-latency histogram. Wider observability lives in the commercial server.

What the server can and cannot do

| Operation | Available over Flight SQL? |
|---|---|
| SQL queries on source tables | Yes |
| SQL queries on embedding tables | Yes |
| Joins, aggregations, filters | Yes |
| Generate embeddings | No — use library or CLI |
| Semantic vector search | No — use library or CLI |
| Fine-tuning | No — use library or CLI |
| Evaluation | No — use library or CLI |

The server is a query interface. ML operations (embeddings, search, fine-tuning) are done through the Rust library, Python package, or CLI — then the results are queryable over Flight SQL.

Graceful shutdown

The server drains active connections on SIGTERM / Ctrl+C before exiting. In-flight queries complete; long-running operations started via the library are unaffected.

Deploying as a container

The OSS server ships as a public Docker image at ghcr.io/f-inverse/jammi-ai-server. The image is built from a distroless base, runs as the nonroot user (uid 65532), and exposes the same 8080 / 8081 ports the local binary listens on.

docker run --rm \
  -p 8080:8080 -p 8081:8081 \
  -v jammi_data:/var/lib/jammi \
  -v $(pwd)/jammi.toml:/etc/jammi/jammi.toml:ro \
  ghcr.io/f-inverse/jammi-ai-server:latest

A minimal compose file lives in the workspace at examples/docker-compose/oss-server.yml:

cd examples/docker-compose
docker compose -f oss-server.yml up

Persistence

/var/lib/jammi holds the catalog DB, model weights, and indices. The Dockerfile declares it as a VOLUME — bind mounts work, but the host directory must be writable by uid 65532:

# Bind mount on the host.
sudo chown -R 65532:65532 /opt/jammi/data
docker run -v /opt/jammi/data:/var/lib/jammi ...

A named Docker volume (the compose default) sidesteps that step because Docker provisions ownership for the container’s user automatically.

Configuration

The container’s entrypoint expects /etc/jammi/jammi.toml. Bind-mount your config there:

# oss-server.yml
services:
  jammi-server:
    image: ghcr.io/f-inverse/jammi-ai-server:latest
    volumes:
      - ./jammi.toml:/etc/jammi/jammi.toml:ro
      - jammi_data:/var/lib/jammi
    ports:
      - "8080:8080"
      - "8081:8081"
# Named volumes must be declared at the top level.
volumes:
  jammi_data:

Building from source

The Dockerfile lives at the workspace root and uses BuildKit cache mounts for the cargo registry and target directory:

DOCKER_BUILDKIT=1 docker build -t jammi-ai-server:dev -f Dockerfile .

Cold builds take ~30 minutes (the workspace is large); warm builds with cache hits land at ~3 minutes.

Monitor Inference

Attach an observer to inspect every output batch during inference. Use this for logging, metrics collection, quality checks, or progress tracking.

Attach an observer

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate arrow;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_db::config::JammiConfig;
async fn ex(config: JammiConfig) -> jammi_db::error::Result<()> {
use jammi_ai::inference::observer::InferenceObserver;
use std::sync::Arc;

struct MetricsCollector;

impl InferenceObserver for MetricsCollector {
    fn on_batch(
        &self,
        batch: &arrow::record_batch::RecordBatch,
        model_id: &str,
        latency: std::time::Duration,
    ) {
        println!(
            "Batch: {} rows from {model_id} in {latency:?}",
            batch.num_rows()
        );
    }
}

let session = InferenceSession::with_observer(
    config,
    Some(Arc::new(MetricsCollector) as Arc<dyn InferenceObserver>),
).await?;
Ok(()) }
}

The observer is called once per output batch. When no observer is attached, the overhead is a single Option branch — effectively zero.
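The "single Option branch" claim corresponds to a runner loop that holds `Option<Arc<dyn Observer>>` and checks it once per batch. This sketch is a std-only model: `Batch`, `Observer`, and `run` are hypothetical stand-ins for RecordBatch, InferenceObserver, and the inference runner, and the "inference" step just counts rows.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Stand-in for an Arrow RecordBatch; only the row count matters here.
struct Batch {
    rows: usize,
}

/// Stand-in for InferenceObserver.
trait Observer: Send + Sync {
    fn on_batch(&self, batch: &Batch, model_id: &str, latency: Duration);
}

/// Hypothetical runner loop: with no observer, the per-batch cost of
/// observability is the `if let` branch.
fn run(batches: &[Batch], model_id: &str, observer: Option<&Arc<dyn Observer>>) -> usize {
    let mut produced = 0;
    for batch in batches {
        let start = Instant::now();
        produced += batch.rows; // placeholder for tokenize + forward pass
        if let Some(obs) = observer {
            obs.on_batch(batch, model_id, start.elapsed());
        }
    }
    produced
}

struct RowCounter(AtomicUsize);

impl Observer for RowCounter {
    fn on_batch(&self, batch: &Batch, _model_id: &str, _latency: Duration) {
        self.0.fetch_add(batch.rows, Ordering::Relaxed);
    }
}

fn main() {
    let counter = Arc::new(RowCounter(AtomicUsize::new(0)));
    let observer: Arc<dyn Observer> = counter.clone();
    let batches = [Batch { rows: 64 }, Batch { rows: 64 }, Batch { rows: 10 }];
    run(&batches, "all-MiniLM-L6-v2", Some(&observer));
    run(&batches, "all-MiniLM-L6-v2", None); // not observed
    println!("observed {} rows", counter.0.load(Ordering::Relaxed));
}
```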

Use cases

Progress logging

#![allow(unused)]
fn main() {
extern crate jammi_ai;
extern crate arrow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use arrow::record_batch::RecordBatch;
use jammi_ai::inference::observer::InferenceObserver;
struct ProgressLogger { total: AtomicUsize }

impl InferenceObserver for ProgressLogger {
    fn on_batch(&self, batch: &RecordBatch, _model_id: &str, _latency: Duration) {
        let count = self.total.fetch_add(batch.num_rows(), Ordering::Relaxed) + batch.num_rows();
        eprintln!("Processed {count} rows...");
    }
}
}

Quality checks

#![allow(unused)]
fn main() {
extern crate jammi_ai;
extern crate arrow;
use std::time::Duration;
use arrow::array::StringArray;
use arrow::record_batch::RecordBatch;
use jammi_ai::inference::observer::InferenceObserver;
struct QualityChecker;

impl InferenceObserver for QualityChecker {
    fn on_batch(&self, batch: &RecordBatch, model_id: &str, _latency: Duration) {
        // Skip batches without a Utf8 `_status` column instead of panicking
        // inside the inference loop.
        let Some(status) = batch
            .column_by_name("_status")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
        else {
            return;
        };
        // Check for high error rates
        let errors = status.iter().filter(|s| *s == Some("error")).count();

        if errors > batch.num_rows() / 2 {
            eprintln!("WARNING: {model_id} batch has {errors}/{} errors", batch.num_rows());
        }
    }
}
}

Latency tracking

use std::time::Duration;
use arrow::record_batch::RecordBatch;
use jammi_ai::inference::observer::InferenceObserver;

// Flags batches whose inference latency exceeds a threshold
struct LatencyTracker { slow_threshold: Duration }

impl InferenceObserver for LatencyTracker {
    fn on_batch(&self, batch: &RecordBatch, model_id: &str, latency: Duration) {
        if latency > self.slow_threshold {
            // max(1) avoids a divide-by-zero panic on an empty batch
            let rows = batch.num_rows().max(1) as u32;
            eprintln!(
                "SLOW: {model_id} took {latency:?} for {} rows ({:?}/row)",
                batch.num_rows(),
                latency / rows,
            );
        }
    }
}

Pipeline architecture

Source (Parquet/CSV/DB)
    |
    v  DataFusion scan
    |
InferenceExec operator
    |-- Loads model (or cache hit)
    |-- Bounded channel (capacity=2, backpressure)
    |-- InferenceRunner (async task)
    |     |-- Reads input batches
    |     |-- Extracts text from content columns
    |     |-- Tokenizes with model's tokenizer
    |     |-- BERT forward pass
    |     |-- Mean pooling + L2 normalization
    |     |-- Constructs prefix + vector columns
    |     |-- ** Observer called here **
    |     '-- Sends to output channel
    |
    v  RecordBatch stream
    |
Results
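The bounded channel (capacity 2) is what provides backpressure: when the consumer falls behind, the producer blocks instead of buffering an unbounded number of batches. The sketch below shows the same idea with std::sync::mpsc::sync_channel and a thread. It is a stand-in: the InferenceRunner above is an async task and presumably uses an async channel, and run_pipeline is a hypothetical name.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical pipeline: a producer thread sends "batches" through a channel bounded at 2.
fn run_pipeline(num_batches: u32, rows_per_batch: usize) -> usize {
    // Capacity 2: send() blocks once two batches are waiting to be consumed.
    let (tx, rx) = sync_channel::<Vec<u32>>(2);

    let producer = thread::spawn(move || {
        for batch_id in 0..num_batches {
            let batch = vec![batch_id; rows_per_batch]; // stands in for an inference output batch
            // Blocks while the channel is full, throttling the producer.
            if tx.send(batch).is_err() {
                break; // receiver dropped: stop producing
            }
        }
        // tx is dropped here, closing the channel so the consumer loop ends.
    });

    // Consumer: drains batches until the channel is closed.
    let total_rows: usize = rx.iter().map(|batch| batch.len()).sum();
    producer.join().expect("producer thread panicked");
    total_rows
}

fn main() {
    println!("rows received: {}", run_pipeline(5, 4));
}
```

A small capacity keeps at most a couple of output batches in memory at once, so a slow downstream stage (for example, a Parquet writer) naturally slows inference instead of letting results pile up.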

Model caching

Models are loaded once and cached with LRU eviction:

  • First load: downloads from the HF Hub (or reads from a local path) and loads the weights into memory
  • Subsequent calls: cache hit, returns immediately
  • Ref counting: a model stays in memory while any inference using it is running
  • Eviction: when the LRU limit (inference.max_loaded_models) is reached, the least-recently-used model with no active references is evicted
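A minimal sketch of the eviction rule, assuming that an Arc held by each caller plays the role of the ref-counted guard. ModelCache, Model, and get are hypothetical and single-threaded; single-flight loading (concurrent requests for the same model sharing one load) is not shown.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for a loaded model.
struct Model {
    id: String,
}

// Hypothetical LRU cache; max_loaded = 0 means unlimited.
struct ModelCache {
    max_loaded: usize,
    tick: u64,                                   // logical clock used to track recency
    entries: HashMap<String, (Arc<Model>, u64)>, // id -> (model, last-used tick)
}

impl ModelCache {
    fn new(max_loaded: usize) -> Self {
        Self { max_loaded, tick: 0, entries: HashMap::new() }
    }

    // Returns a shared handle; the caller's Arc acts as the ref-counted guard.
    fn get(&mut self, id: &str) -> Arc<Model> {
        self.tick += 1;
        if let Some((model, last_used)) = self.entries.get_mut(id) {
            *last_used = self.tick; // cache hit: refresh recency, no reload
            return Arc::clone(model);
        }
        // Cache miss: stands in for downloading and loading weights.
        let model = Arc::new(Model { id: id.to_string() });
        self.entries.insert(id.to_string(), (Arc::clone(&model), self.tick));
        self.evict_if_needed();
        model
    }

    fn evict_if_needed(&mut self) {
        while self.max_loaded > 0 && self.entries.len() > self.max_loaded {
            // Only models referenced solely by the cache (strong_count == 1) are evictable.
            let victim = self
                .entries
                .iter()
                .filter(|(_, (model, _))| Arc::strong_count(model) == 1)
                .min_by_key(|(_, (_, last_used))| *last_used)
                .map(|(id, _)| id.clone());
            match victim {
                Some(id) => {
                    self.entries.remove(&id);
                }
                None => break, // every model is in use: stay over the limit for now
            }
        }
    }
}

fn main() {
    let mut cache = ModelCache::new(2);
    let in_use = cache.get("model-a"); // held, so not evictable
    drop(cache.get("model-b"));        // released immediately
    drop(cache.get("model-c"));        // over the limit: evicts model-b, the LRU unreferenced model
    let mut ids: Vec<&String> = cache.entries.keys().collect();
    ids.sort();
    println!("still held: {}; cached: {:?}", in_use.id, ids);
}
```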

Configuration

Jammi loads configuration from three sources:

  1. Config file (TOML): an explicit path, the $JAMMI_CONFIG env var, ./jammi.toml, or ~/.config/jammi/config.toml
  2. Environment variables, which override individual fields, e.g. JAMMI_GPU__DEVICE=0, JAMMI_INFERENCE__BATCH_SIZE=64
  3. Defaults: sensible defaults for any field not set elsewhere

use std::path::Path;
use jammi_db::config::JammiConfig;

fn ex() -> jammi_db::error::Result<()> {
    // No explicit path: use the lookup order above, with defaults for unset fields
    let config = JammiConfig::load(None)?;

    // Load from a specific file
    let config = JammiConfig::load(Some(Path::new("/path/to/jammi.toml")))?;
    Ok(())
}

Full reference

# Where Jammi stores artifacts (catalog DB, model cache, embeddings)
# Default: platform-specific data directory (~/.local/share/jammi on Linux)
artifact_dir = "/path/to/artifacts"

[engine]
# Number of DataFusion execution threads. Default: number of CPUs.
execution_threads = 8
# Memory limit for the query engine. Default: "75%".
memory_limit = "75%"
# Maximum rows per DataFusion batch. Default: 8192.
batch_size = 8192

[gpu]
# GPU device index. -1 for CPU only. Default: 0.
device = -1
# GPU memory limit. Default: "auto".
memory_limit = "auto"
# Fraction of GPU memory Jammi may use. Default: 0.9.
memory_fraction = 0.9

[inference]
# Default backend selection strategy. Default: "auto".
default_backend = "auto"
# Maximum rows per inference batch. Default: 32.
batch_size = 32
# Timeout for batch accumulation in server mode (seconds). Default: 300.
batch_timeout_secs = 300
# Maximum models kept loaded simultaneously. 0 = unlimited. Default: 0.
max_loaded_models = 0

[inference.http]
# HTTP request timeout (seconds). Default: 60.
timeout_secs = 60
# Custom headers for HTTP model endpoints.
[inference.http.headers]
# Authorization = "Bearer sk-..."

[embedding]
# Distance metric for vector indices. Default: "cosine".
default_distance_metric = "cosine"
# Index type for vector storage. Default: "ivf_hnsw_sq".
default_index_type = "ivf_hnsw_sq"
# Rows between embedding index checkpoints. Default: 1000.
checkpoint_interval = 1000

[fine_tuning]
# LoRA rank for fine-tuning. Default: 8.
default_lora_rank = 8
# Learning rate. Default: 0.0002.
default_learning_rate = 0.0002
# Training epochs. Default: 3.
default_epochs = 3
# Training batch size. Default: 8.
default_batch_size = 8
# Fraction of training between checkpoints (0.1 = every 10%). Default: 0.1.
checkpoint_fraction = 0.1

[cache]
# Enable ANN query cache. Default: true.
ann_cache_enabled = true
# Max cached ANN queries. Default: 10000.
ann_cache_max_entries = 10000
# Enable embedding cache. Default: true.
embedding_cache_enabled = true
# Embedding cache size. Default: "1GB".
embedding_cache_size = "1GB"

[server]
# Health probe listen address. Default: "0.0.0.0:8080".
health_listen = "0.0.0.0:8080"
# Arrow Flight SQL listen address. Default: "0.0.0.0:8081".
flight_listen = "0.0.0.0:8081"
# Models to preload on server start. Default: [].
preload_models = ["sentence-transformers/all-MiniLM-L6-v2"]

[logging]
# Log level: "trace", "debug", "info", "warn", "error". Default: "info".
level = "info"
# Log format: "text" or "json". Default: "text".
format = "text"

Environment variable overrides

Every config field can be overridden with an environment variable using the pattern JAMMI_<SECTION>__<FIELD>:

| Variable | Overrides |
|---|---|
| JAMMI_ARTIFACT_DIR | artifact_dir |
| JAMMI_ENGINE__BATCH_SIZE | engine.batch_size |
| JAMMI_GPU__DEVICE | gpu.device |
| JAMMI_INFERENCE__BATCH_SIZE | inference.batch_size |
| JAMMI_LOGGING__LEVEL | logging.level |

Note the double underscore (__) separating section and field.
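The naming rule can be expressed as a small function: strip the JAMMI_ prefix, split on the double underscore, and lowercase each part. env_var_to_config_path is a hypothetical helper for illustration, not part of jammi-db; it covers only the section/field pattern shown in the table.

```rust
// Hypothetical helper: map a JAMMI_<SECTION>__<FIELD> variable name to its config path.
fn env_var_to_config_path(var: &str) -> Option<String> {
    // Only JAMMI_-prefixed variables are config overrides.
    let rest = var.strip_prefix("JAMMI_")?;
    // "__" separates section from field; single underscores stay part of the name.
    let parts: Vec<String> = rest.split("__").map(|p| p.to_ascii_lowercase()).collect();
    if parts.iter().any(|p| p.is_empty()) {
        return None; // e.g. "JAMMI_" or "JAMMI_GPU__"
    }
    Some(parts.join("."))
}

fn main() {
    for var in ["JAMMI_ARTIFACT_DIR", "JAMMI_GPU__DEVICE", "JAMMI_INFERENCE__BATCH_SIZE"] {
        println!("{var} -> {:?}", env_var_to_config_path(var));
    }
}
```

The double underscore is what lets field names such as batch_size keep their single underscores without being mistaken for a section boundary.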

Catalog Backend and Trigger Broker


Jammi’s catalog (models, sources, eval runs, mutable companion tables) and trigger broker (provenance channels, evidence streams) are selected through two fields on JammiConfig: catalog and broker. The default, suited to a development laptop, is SQLite plus an in-process broker; production deployments swap the catalog for Postgres, the broker for JetStream, or both.

TOML schema

The catalog stanza is a tagged enum keyed by kind:

[catalog]
kind = "sqlite"
# path = "/var/lib/jammi/catalog.db"   # optional; defaults to {artifact_dir}/catalog.db

A Postgres catalog instead uses:

[catalog]
kind = "postgres"
url = "postgres://user:pass@host:5432/jammi"
pool_size = 16
max_lifetime_secs = 1800

The broker stanza follows the same shape:

[broker]
kind = "in_memory"

A JetStream broker instead uses:

[broker]
kind = "jet_stream"
url = "nats://nats.svc:4222"
retention_seconds = 604800
credentials_path = "/var/run/secrets/nats.creds"

broker.kind = "jet_stream" requires the jetstream-broker cargo feature on jammi-db; selecting it without the feature returns JammiError::Config rather than panicking at session construction time.
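One way to get "error, not panic" behaviour for a compiled-out backend is to build two versions of the connect function, one per cfg. The sketch below is hypothetical (BrokerKind, BrokerError, connect_jetstream, and build_broker are illustrative names, not jammi-db's API); it only shows the pattern of returning a configuration error when the jetstream-broker feature is absent.

```rust
// Hypothetical config-level error, standing in for JammiError::Config.
#[derive(Debug)]
enum BrokerError {
    Config(String),
}

// Hypothetical mirror of the [broker] tagged enum.
enum BrokerKind {
    InMemory,
    JetStream { url: String },
}

#[cfg(feature = "jetstream-broker")]
fn connect_jetstream(url: &str) -> Result<String, BrokerError> {
    // A real build would connect to NATS here.
    Ok(format!("jetstream broker at {url}"))
}

#[cfg(not(feature = "jetstream-broker"))]
fn connect_jetstream(_url: &str) -> Result<String, BrokerError> {
    // Feature compiled out: report a config error instead of panicking.
    Err(BrokerError::Config(
        "broker.kind = \"jet_stream\" requires the jetstream-broker feature".to_string(),
    ))
}

fn build_broker(kind: &BrokerKind) -> Result<String, BrokerError> {
    match kind {
        BrokerKind::InMemory => Ok("in-process broker".to_string()),
        BrokerKind::JetStream { url } => connect_jetstream(url),
    }
}

fn main() {
    let kinds = [
        BrokerKind::InMemory,
        BrokerKind::JetStream { url: "nats://nats.svc:4222".to_string() },
    ];
    for kind in &kinds {
        match build_broker(kind) {
            Ok(broker) => println!("using {broker}"),
            Err(BrokerError::Config(msg)) => eprintln!("config error: {msg}"),
        }
    }
}
```

Because the feature-gated body is never compiled without the feature, the NATS client dependency can stay optional while the config surface keeps the jet_stream variant.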

Environment variable interpolation

JammiConfig::load substitutes ${NAME} patterns from the process environment before TOML parsing. The rules:

  • ${NAME} is replaced by the value of std::env::var("NAME").
  • A missing variable is an error. The loader never silently substitutes an empty string — that is a common source of “deployed config has an empty Postgres URL” outages.
  • $$ escapes a literal $.
  • A bare $ not followed by $ or { is preserved verbatim, so passwords containing a single $ slip through unchanged.
  • An unterminated ${ returns JammiError::Config.
  • Interpolation is one-pass and not recursive: ${X}’s value is not re-scanned.
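The rules above can be sketched as a single left-to-right scan. This is an illustrative implementation, not jammi-db's: interpolate and InterpError are hypothetical names, and lookup stands in for std::env::var so the behaviour can be tested without touching the process environment. In Jammi itself, an unterminated ${ surfaces as JammiError::Config.

```rust
// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum InterpError {
    MissingVar(String), // ${NAME} with no such variable
    Unterminated,       // "${" without a closing "}"
}

// One-pass ${NAME} substitution following the rules above.
// `lookup` stands in for std::env::var so the sketch can be tested deterministically.
fn interpolate<F>(input: &str, lookup: F) -> Result<String, InterpError>
where
    F: Fn(&str) -> Option<String>,
{
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(pos) = rest.find('$') {
        out.push_str(&rest[..pos]); // text before '$' is copied verbatim
        let after = &rest[pos + 1..];
        if let Some(tail) = after.strip_prefix('$') {
            out.push('$'); // "$$" escapes a literal '$'
            rest = tail;
        } else if let Some(tail) = after.strip_prefix('{') {
            let end = tail.find('}').ok_or(InterpError::Unterminated)?;
            let name = &tail[..end];
            // A missing variable is an error, never an empty string.
            let value = lookup(name).ok_or_else(|| InterpError::MissingVar(name.to_string()))?;
            out.push_str(&value); // appended as-is: the value is never re-scanned
            rest = &tail[end + 1..];
        } else {
            out.push('$'); // a bare '$' is preserved
            rest = after;
        }
    }
    out.push_str(rest);
    Ok(out)
}

fn main() {
    // Resolve against the real process environment.
    let line = "artifact_dir = \"${HOME}/jammi\"";
    match interpolate(line, |name| std::env::var(name).ok()) {
        Ok(resolved) => println!("{resolved}"),
        Err(err) => eprintln!("interpolation failed: {err:?}"),
    }
}
```

Appending each substituted value to the output and continuing the scan after the closing brace is what makes the pass non-recursive: a value that itself contains ${...} is emitted literally.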

Combined with the tagged-enum shape:

artifact_dir = "/var/lib/jammi"

[catalog]
kind = "postgres"
url = "${POSTGRES_URL}"
pool_size = 16
max_lifetime_secs = 1800

[broker]
kind = "jet_stream"
url = "nats://${NATS_HOST}:4222"
retention_seconds = 604800
credentials_path = "/var/run/secrets/nats.creds"

A working copy of this file ships at crates/jammi-db/examples/sample-postgres.toml.

SQLite vs Postgres trade-offs

| Concern | SQLite | Postgres |
|---|---|---|
| Operational footprint | One file under artifact_dir. No daemon. | Externally managed Postgres cluster. |
| Concurrent writers | One; WAL mode lets many readers run alongside one writer. | Many. |
| Multi-process deployment | Single-process only; sharing the file across jammi-server replicas corrupts the WAL. | Multi-replica safe. |
| Failure recovery | File restore from backup. | Standard Postgres point-in-time recovery. |
| Pool tuning | None; opens one pool of 8 connections. | pool_size and max_lifetime_secs configure the underlying sqlx::PgPool. |

For laptop and single-tenant deployments, SQLite is the right choice; once a second jammi-server replica enters the picture, the trade-offs tilt to Postgres.

In-memory vs JetStream broker

| Concern | InMemory | JetStream |
|---|---|---|
| Persistence | In-process only; lost on restart. | NATS server retains streams per retention_seconds. |
| Cross-process delivery | None; a publish in process A is invisible to a subscriber in process B. | All subscribers (any process, any host) see every published batch within the retention window. |
| Auth | None. | Anonymous, or a NATS .creds file via credentials_path. |
| Operational footprint | None. | One NATS server (or cluster). |

In-memory is fine for tests, local development, and single-process server deployments where every consumer lives in the same jammi-server process. JetStream is required for any deployment that wants replay across restarts or fan-out across multiple jammi-server replicas.

Health probe

CatalogBackend::ping runs SELECT 1 against the underlying pool and classifies pool failures as BackendError::Unavailable. The /readyz endpoint on jammi-server (when wired) reaches this via session.catalog().ping().await. The primitive is cheap — microseconds against a warm pool — and never opens a transaction.

Architecture

Crate dependency graph

jammi-db (foundation)
    |
    |-- Config, Catalog, Sources, SQL execution
    |-- Parquet storage, ANN indexes, crash recovery
    |
    v
jammi-ai (intelligence)
    |
    |-- Model resolution, loading, caching
    |-- InferenceExec, AnnSearchExec operators
    |-- Embedding pipeline, result persistence
    |-- SearchBuilder, evidence provenance
    |-- Fine-tuning, evaluation
    |-- GPU scheduling
    |-- InferenceSession (wraps JammiSession)
    |
    +-------+-------+
    |               |
    v               v
jammi-server    jammi-python
    |               |
    |-- Flight SQL  |-- PyO3 bindings
    '-- Health API  '-- pyarrow interop

jammi-cli
    |
    '-- Clap CLI wrapping InferenceSession

jammi-db has no dependency on jammi-ai. The intelligence layer is an optional addition — you can use jammi-db standalone for SQL queries over local data.

Key types and their responsibilities

Engine layer (jammi-db)

| Type | Responsibility |
|---|---|
| JammiConfig | TOML + env config loading with defaults |
| Catalog | Persistence for sources, models, result tables, eval runs, evidence channels (SQLite by default; Postgres via [catalog]) |
| JammiSession | DataFusion session + source registration + SQL execution |
| SourceCatalog / JammiSchemaProvider | DataFusion catalog integration |
| ResultStore | Parquet storage coordinator: create, finalize, recover, register |
| ParquetResultWriter | ZSTD-compressed Parquet file writer (64K row groups) |
| VectorIndex / SidecarIndex | ANN index trait + USearch implementation with row_id mapping |

AI layer (jammi-ai)

| Type | Responsibility |
|---|---|
| InferenceSession | Wraps JammiSession + ModelCache + ResultStore. Entry point for all operations |
| ModelResolver | Resolves model ID to file paths + backend. Chain: catalog -> local -> HF Hub |
| ModelCache | LRU cache with single-flight loading, ref-counted guards |
| CandleBackend / OrtBackend | Model backends: Candle (safetensors, BERT + ModernBERT + DistilBERT + OpenCLIP ViT), ONNX Runtime |
| HttpBackend | Remote backend: HTTP endpoint for embeddings |
| InferenceExec | DataFusion ExecutionPlan operator for inference with backpressure |
| AnnSearchExec | DataFusion ExecutionPlan leaf node for ANN vector search |
| EmbeddingPipeline | Orchestrates embedding generation (text or image): model -> InferenceExec -> ResultSink -> index. Parameterized by ModelTask |
| ResultSink | Streams inference output to Parquet + sidecar index, filters failed rows |
| SearchBuilder | Fluent API: join, annotate, filter, sort, limit, select, run |
| EvidenceRow / RowProvenance | Evidence model types for provenance tracking |
| OutputAdapter | Trait that converts raw model output to Arrow arrays per task |
| GpuScheduler | GPU memory permit system with budget-based admission control |
| FineTuneJob | LoRA fine-tuning with contrastive loss, checkpointing, early stopping |
| EvalRunner | Retrieval and classification evaluation |

Server layer (jammi-server)

| Type | Responsibility |
|---|---|
| AppState | Shared state: Arc<InferenceSession> + ANN cache |
| FlightSqlService | Arrow Flight SQL server backed by DataFusion |
| Health endpoint | HTTP /health for container liveness probes |

Python layer (jammi-python)

| Type | Responsibility |
|---|---|
| Database | PyO3 class wrapping Arc<InferenceSession> with shared tokio runtime |
| SearchBuilder | PyO3 class with imperative-style search composition |
| FineTuneJob | PyO3 class for monitoring fine-tuning jobs |
| connect() | Module-level function to create a Database |

Data flow

SQL query path

JammiSession::sql("SELECT ...")
    -> DataFusion parses SQL
    -> Resolves table from SourceCatalog/JammiSchemaProvider
    -> Creates ListingTable scan from Parquet/CSV/JSON or federated source
    -> Executes plan
    -> Returns Vec<RecordBatch>

Embedding generation path (text and image)

InferenceSession::generate_text_embeddings(source, model, columns, key)
InferenceSession::generate_image_embeddings(source, model, image_column, key)
    -> EmbeddingPipeline::run(task = TextEmbedding | ImageEmbedding)
    -> Register result_table (status = "building")
    -> Build plan: SourceScan -> InferenceExec(task)
    -> InferenceExec dispatches to CandleModel::forward(content, task):
    |   TextEmbedding:  arrow_to_texts -> tokenize -> BERT/ModernBERT -> mean_pool -> L2_normalize
    |   ImageEmbedding: arrow_to_images -> preprocess (model-driven) -> ViT forward -> L2_normalize
    -> Stream batches through ResultSink
    |   |-- Filter _status = "ok"
    |   |-- Transform to embedding schema
    |   |-- Write to Parquet via ParquetResultWriter
    |   '-- Feed vectors to SidecarIndex::add()
    -> Close writer, build ANN index, save sidecar bundle
    -> Register as DataFusion table, update catalog to "ready"
    -> Return ResultTableRecord
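The mean_pool -> L2_normalize step of the TextEmbedding path can be sketched in plain Rust. This assumes sentence-transformers-style mean pooling, which averages only over real tokens (attention mask = 1) and ignores padding; mean_pool_l2 is a hypothetical helper over Vec<f32>, whereas the Candle backend works on model tensors.

```rust
// Hypothetical helper: average token embeddings over real (non-padding) tokens, then L2-normalize.
// token_embeddings is [seq_len][hidden]; attention_mask is 1 for real tokens, 0 for padding.
fn mean_pool_l2(token_embeddings: &[Vec<f32>], attention_mask: &[u8]) -> Vec<f32> {
    let hidden = token_embeddings.first().map_or(0, |t| t.len());
    let mut pooled = vec![0.0f32; hidden];
    let mut count = 0.0f32;

    // Mean pooling: sum the masked-in token vectors, then divide by how many there were.
    for (token, &mask) in token_embeddings.iter().zip(attention_mask) {
        if mask == 0 {
            continue; // padding token
        }
        for (acc, &x) in pooled.iter_mut().zip(token) {
            *acc += x;
        }
        count += 1.0;
    }
    if count > 0.0 {
        pooled.iter_mut().for_each(|v| *v /= count);
    }

    // L2 normalization: divide by the Euclidean norm so the result has unit length.
    let norm = pooled.iter().map(|v| v * v).sum::<f32>().sqrt();
    if norm > 0.0 {
        pooled.iter_mut().for_each(|v| *v /= norm);
    }
    pooled
}

fn main() {
    // Two real tokens and one padding token, hidden size 2.
    let tokens: Vec<Vec<f32>> = vec![vec![2.0, 4.0], vec![4.0, 4.0], vec![9.0, 9.0]];
    let mask: [u8; 3] = [1, 1, 0];
    println!("{:?}", mean_pool_l2(&tokens, &mask));
}
```

Normalizing to unit length at generation time means cosine similarity at search time reduces to a plain dot product.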

Vector search path

InferenceSession::search(source, query_vec, k)
    -> Resolve embedding table from catalog
    -> AnnSearchExec: SidecarIndex (ANN) or exact_vector_search (fallback)
    -> Hydration: join ANN results back to source table
    -> SearchBuilder: .join() .annotate() .filter() .sort() .limit() .select()
    -> .run(): execute DataFusion plan, add provenance columns
    -> Returns Vec<RecordBatch> with similarity + original columns + evidence
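When no sidecar index is available, search falls back to exact_vector_search. Below is a minimal brute-force sketch of such a fallback, assuming cosine similarity (the default_distance_metric in the configuration reference). exact_top_k and l2_norm are hypothetical names, and the real fallback presumably operates on the embedding table's Arrow data rather than on Vec<f32>.

```rust
// Euclidean (L2) norm of a vector.
fn l2_norm(v: &[f32]) -> f32 {
    v.iter().map(|x| x * x).sum::<f32>().sqrt()
}

// Hypothetical brute-force search: score every row by cosine similarity and keep the k best.
// Returns (row_id, similarity) pairs, highest similarity first.
fn exact_top_k(vectors: &[Vec<f32>], query: &[f32], k: usize) -> Vec<(usize, f32)> {
    let query_norm = l2_norm(query);
    let mut scored: Vec<(usize, f32)> = vectors
        .iter()
        .enumerate()
        .map(|(row_id, v)| {
            let dot: f32 = v.iter().zip(query).map(|(a, b)| a * b).sum();
            let denom = l2_norm(v) * query_norm;
            (row_id, if denom > 0.0 { dot / denom } else { 0.0 })
        })
        .collect();
    // total_cmp gives a total order on f32, so sorting never panics on NaN.
    scored.sort_by(|a, b| b.1.total_cmp(&a.1));
    scored.truncate(k);
    scored
}

fn main() {
    let embeddings: Vec<Vec<f32>> = vec![vec![1.0, 0.0], vec![0.0, 1.0], vec![1.0, 1.0]];
    for (row_id, similarity) in exact_top_k(&embeddings, &[1.0, 0.0], 2) {
        println!("row {row_id}: {similarity:.3}");
    }
}
```

Exact search costs one dot product per row, so it is linear in table size; that is why it serves as a fallback rather than the default path once an ANN index exists.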

Module layout

crates/jammi-db/src/
|-- config.rs           # Configuration loading
|-- error.rs            # Unified error type
|-- session.rs          # JammiSession (DataFusion wrapper)
|-- catalog/            # SQLite-backed catalog
|-- source/             # Source types, registry, schema provider
|-- store/              # ResultStore, Parquet writer/reader
'-- index/              # VectorIndex trait, sidecar, exact search

crates/jammi-ai/src/
|-- session.rs          # InferenceSession
|-- model/              # ModelResolver, ModelCache, backends
|-- operator/           # InferenceExec, AnnSearchExec
|-- inference/          # Runner, observer, output adapters, image preprocessing
|-- pipeline/           # EmbeddingPipeline, ResultSink
|-- evidence/           # Provenance types and columns
|-- search/             # SearchBuilder
|-- fine_tune/          # LoRA training, config, jobs
|-- eval/               # Retrieval and classification eval
'-- concurrency/        # GpuScheduler, permits

crates/jammi-server/src/
|-- lib.rs              # Health server startup, signal handling
|-- routes/health.rs    # GET /health liveness probe
|-- error.rs            # 404 fallback
'-- flight.rs           # Arrow Flight SQL service

crates/jammi-cli/src/
|-- main.rs             # Clap CLI entry point
'-- commands/           # serve, query, sources, models, explain

crates/jammi-python/src/
|-- lib.rs              # PyO3 module, connect()
|-- database.rs         # Database class
|-- search.rs           # SearchBuilder class
|-- job.rs              # FineTuneJob class
|-- convert.rs          # Arrow <-> PyArrow conversion
'-- error.rs            # Error conversion