Declare a Custom Provenance Channel
Every row that flows through Jammi carries provenance — retrieved_by and annotated_by lists that record how the row was found and what was added after retrieval. Jammi ships two built-in channels — vector (declares similarity) and inference (declares inference_model, inference_task, inference_confidence) — but the catalog accepts any channel a consumer wants to register. Each channel declares the columns it contributes; the engine merges those columns into every result RecordBatch at query time.
This recipe walks through registering a third channel, scored_by, for a multi-stage retrieval pipeline where a federated reranker rescores the vector hits. The same shape applies to any non-built-in provenance signal: a citation graph, an attribution chain, a quality-grading pass.
Setup
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate arrow;
extern crate tokio;
use jammi_db::config::JammiConfig;
async fn ex(config: JammiConfig) -> jammi_db::error::Result<()> {
use std::sync::Arc;
use arrow::array::{ArrayRef, Float32Array, StringArray};
use jammi_ai::evidence::{merge_channels, ChannelContribution};
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::channel_repo::{ChannelColumn, ChannelColumnType, ChannelSpec};
use jammi_db::ChannelId;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
let session = Arc::new(InferenceSession::new(config).await?);
session.add_source("patents", SourceType::File, SourceConnection {
url: Some("file:///data/patents.parquet".into()),
format: Some(FileFormat::Parquet),
..Default::default()
}).await?;
Ok(()) }
}
Python
import jammi_ai
db = jammi_ai.connect(artifact_dir="/var/lib/jammi")
db.add_source("patents", path="/data/patents.parquet", format="parquet")
Declare the channel
Channel declarations are catalog rows. Each declared column has a name, an Arrow type, and an ordinal. The set is append-only — once ranker: Utf8 is declared, the engine refuses to redeclare it as Int32 or drop it.
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::channel_repo::{ChannelColumn, ChannelColumnType, ChannelSpec};
use jammi_db::ChannelId;
async fn ex(session: &Arc<InferenceSession>) -> jammi_db::error::Result<()> {
session.catalog().channels().register(&ChannelSpec {
id: ChannelId::new("scored_by")?,
priority: 3,
columns: vec![
ChannelColumn {
name: "ranker".into(),
data_type: ChannelColumnType::Utf8,
},
ChannelColumn {
name: "rank_score".into(),
data_type: ChannelColumnType::Float32,
},
],
}).await?;
Ok(()) }
}
Python
db.register_channel(
"scored_by",
priority=3,
columns=[("ranker", "Utf8"), ("rank_score", "Float32")],
)
priority controls the order columns appear in the merged output — vector (1) and inference (2) come first, then scored_by (3).
To add more columns to an already-registered channel:
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate tokio;
use std::sync::Arc;
use jammi_ai::session::InferenceSession;
use jammi_db::catalog::channel_repo::{ChannelColumn, ChannelColumnType};
use jammi_db::ChannelId;
async fn ex(session: &Arc<InferenceSession>) -> jammi_db::error::Result<()> {
session.catalog().channels().add_columns(
&ChannelId::new("scored_by")?,
&[ChannelColumn { name: "scored_at".into(), data_type: ChannelColumnType::Utf8 }],
).await?;
Ok(()) }
}
db.add_channel_columns("scored_by", columns=[("scored_at", "Utf8")])
add_columns is append-only by construction. Trying to redeclare ranker with the same or a different type returns JammiError::EvidenceChannel(_).
Use the channel
Build a ChannelContribution for each batch your reranker produces. The arrays must align 1:1 with the channel’s declared columns (ranker first, rank_score second) and have the same length as the batch’s row count.
Rust
#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate arrow;
extern crate tokio;
use std::sync::Arc;
use arrow::array::{ArrayRef, Float32Array, RecordBatch, StringArray};
use jammi_ai::evidence::{merge_channels, ChannelContribution};
use jammi_ai::session::InferenceSession;
use jammi_db::ChannelId;
fn rerank_scores(_batch: &RecordBatch) -> Vec<f32> { vec![] }
async fn ex(session: &Arc<InferenceSession>, batches: Vec<RecordBatch>) -> jammi_db::error::Result<()> {
let scored_by = ChannelId::new("scored_by")?;
let vector = ChannelId::new("vector")?;
let mut contributions = Vec::with_capacity(batches.len());
for batch in &batches {
let n = batch.num_rows();
let ranker: ArrayRef = Arc::new(StringArray::from(vec!["bm25"; n]));
let rank_score: ArrayRef = Arc::new(Float32Array::from(rerank_scores(batch)));
contributions.push(vec![ChannelContribution {
channel: scored_by.clone(),
columns: vec![ranker, rank_score],
}]);
}
let merged = merge_channels(
session.catalog(),
&batches,
&[vector.clone(), scored_by.clone()],
&[vector, scored_by], // retrieved_by
&[], // annotated_by
&contributions,
).await?;
Ok(()) }
}
Verify
The merged output schema includes the declared columns. Rows where the channel did not supply a value carry NULL.
Rust
#![allow(unused)]
fn main() {
extern crate arrow;
use arrow::array::RecordBatch;
fn ex(merged: Vec<RecordBatch>) {
let schema = merged[0].schema();
assert!(schema.field_with_name("ranker").is_ok());
assert!(schema.field_with_name("rank_score").is_ok());
for batch in &merged {
let ranker = batch.column_by_name("ranker").unwrap();
println!("first ranker: {:?}", ranker);
}
}
}
Python
From the SQL surface, the declared columns show up in any query that touches the result table — Python sees them as plain Arrow columns:
table = db.sql(
"SELECT _row_id, similarity, ranker, rank_score FROM results LIMIT 3"
)
for row in table.to_pylist():
print(row["ranker"], row["rank_score"])
What you cannot do
The channel declaration is append-only. Once scored_by ships with ranker: Utf8, you cannot:
-
Redeclare
rankerasInt32—add_columnsrejects withJammiError::EvidenceChannel("channel 'scored_by': column 'ranker' was declared Utf8, cannot redeclare as Int32"). From Python, the same call raisesRuntimeErrorcarrying the identical message:db.add_channel_columns("scored_by", columns=[("ranker", "Int32")]) # RuntimeError: channel 'scored_by': column 'ranker' was declared Utf8, cannot redeclare as Int32 -
Add a second column with the same name —
add_columnsrejects withJammiError::EvidenceChannel("channel 'scored_by': column 'ranker' already declared"). -
Drop
rankerfrom the channel — there is nodrop_columnmethod by design.
If a column needs to change shape, declare a new column under a new name and migrate consumers. This preserves byte-for-byte readability of any backing table or downstream artifact that already references the original column.