Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Monitor Inference

Attach an observer to inspect every output batch during inference. Use this for logging, metrics collection, quality checks, or progress tracking.

Attach an observer

Rust

#![allow(unused)]
fn main() {
extern crate jammi_db;
extern crate jammi_ai;
extern crate arrow;
extern crate tokio;
use jammi_ai::session::InferenceSession;
use jammi_db::config::JammiConfig;
async fn ex(config: JammiConfig) -> jammi_db::error::Result<()> {
use jammi_ai::inference::observer::InferenceObserver;
use std::sync::Arc;

struct MetricsCollector;

impl InferenceObserver for MetricsCollector {
    fn on_batch(
        &self,
        batch: &arrow::record_batch::RecordBatch,
        model_id: &str,
        latency: std::time::Duration,
    ) {
        println!(
            "Batch: {} rows from {model_id} in {latency:?}",
            batch.num_rows()
        );
    }
}

let session = InferenceSession::with_observer(
    config,
    Some(Arc::new(MetricsCollector) as Arc<dyn InferenceObserver>),
).await?;
Ok(()) }
}

The observer is called once per output batch. When no observer is attached, the overhead is a single Option branch — effectively zero.

Use cases

Progress logging

#![allow(unused)]
fn main() {
extern crate jammi_ai;
extern crate arrow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use arrow::record_batch::RecordBatch;
use jammi_ai::inference::observer::InferenceObserver;
struct ProgressLogger { total: AtomicUsize }

impl InferenceObserver for ProgressLogger {
    fn on_batch(&self, batch: &RecordBatch, _model_id: &str, _latency: Duration) {
        let count = self.total.fetch_add(batch.num_rows(), Ordering::Relaxed) + batch.num_rows();
        eprintln!("Processed {count} rows...");
    }
}
}

Quality checks

#![allow(unused)]
fn main() {
extern crate jammi_ai;
extern crate arrow;
use std::time::Duration;
use arrow::array::StringArray;
use arrow::record_batch::RecordBatch;
use jammi_ai::inference::observer::InferenceObserver;
struct QualityChecker;

impl InferenceObserver for QualityChecker {
    fn on_batch(&self, batch: &RecordBatch, model_id: &str, latency: Duration) {
        // Check for high error rates
        let status = batch.column_by_name("_status").unwrap();
        let errors = status.as_any().downcast_ref::<StringArray>().unwrap()
            .iter().filter(|s| s == &Some("error")).count();

        if errors > batch.num_rows() / 2 {
            eprintln!("WARNING: {model_id} batch has {errors}/{} errors", batch.num_rows());
        }
    }
}
}

Latency tracking

#![allow(unused)]
fn main() {
extern crate jammi_ai;
extern crate arrow;
use std::time::Duration;
use arrow::record_batch::RecordBatch;
use jammi_ai::inference::observer::InferenceObserver;
struct LatencyTracker { slow_threshold: Duration }

impl InferenceObserver for LatencyTracker {
    fn on_batch(&self, batch: &RecordBatch, model_id: &str, latency: Duration) {
        if latency > self.slow_threshold {
            eprintln!(
                "SLOW: {model_id} took {latency:?} for {} rows ({:?}/row)",
                batch.num_rows(),
                latency / batch.num_rows() as u32,
            );
        }
    }
}
}

Pipeline architecture

Source (Parquet/CSV/DB)
    |
    v  DataFusion scan
    |
InferenceExec operator
    |-- Loads model (or cache hit)
    |-- Bounded channel (capacity=2, backpressure)
    |-- InferenceRunner (async task)
    |     |-- Reads input batches
    |     |-- Extracts text from content columns
    |     |-- Tokenizes with model's tokenizer
    |     |-- BERT forward pass
    |     |-- Mean pooling + L2 normalization
    |     |-- Constructs prefix + vector columns
    |     |-- ** Observer called here **
    |     '-- Sends to output channel
    |
    v  RecordBatch stream
    |
Results

Model caching

Models are loaded once and cached with LRU eviction:

  • First load: downloads from HF Hub (or reads from local path), loads weights into memory
  • Subsequent calls: cache hit, returns immediately
  • Ref counting: model stays in memory while any inference is running
  • Eviction: when the LRU limit is reached, the least-recently-used model with no active references is evicted