---
title: "How to Build Production Embedding Pipelines"
subtitle: "Infrastructure, Latency, and Reliability for Teams That Can't Afford to Guess"
author: "Kelly Price"
date: "2026-04-21"
description: "A hands-on guide to building embedding pipelines that hold up in production — covering infrastructure decisions, chunking, vector storage, serving patterns, drift monitoring, and incremental re-indexing."
tags: [embeddings, ai, developer-tools, infrastructure]
---

# How to Build Production Embedding Pipelines
## Infrastructure, Latency, and Reliability for Teams That Can't Afford to Guess

*Kelly Price*

---

## About This Guide

Most embedding pipeline tutorials end where the hard problems begin. They show you how to call `model.encode()`, store the result in a list, and retrieve the top-k nearest neighbors — then declare victory. What they skip is the part where your pipeline has to handle 80,000 documents at startup, serve queries in under 20ms on a CPU-only server, and stay coherent after 6,000 incremental updates over 18 months.

This guide covers what comes after the notebook. It is written for engineers who have already shipped one embedding-based feature — a semantic search bar, a recommendation system, a code assistant — and are now staring down the reliability, cost, and performance problems that come with keeping that feature alive in production.

The advice here comes from building and operating a real embedding pipeline for a production code intelligence system. That system indexes codebases of up to 100,000 files, handles per-file incremental updates triggered by Git webhooks, serves sub-10ms semantic queries, and runs on CPU hardware with a memory footprint small enough to coexist with the rest of a production server. The specific numbers throughout this book — 6ms embedding latency, 820MB to 197MB RSS reduction from switching to ONNX, 79% monthly cost reduction from replacing Pinecone with self-hosted pgvector — are measurements from that system, not estimates.

You will find no GPU requirements in this book. The default assumption is CPU-only infrastructure, because that is what most teams have access to, and because it forces you to make smart decisions about model size, batching, and serving architecture. The patterns here work on GPU hardware too, and a tuned CPU setup is often faster than a GPU setup that was never tuned.

Each chapter covers one concrete problem. You can read them in order or jump to the chapter that matches your current pain. The code examples are Python and SQL, and they are real — they run, they have edge cases handled, and they reflect decisions I would make again.

By the end, you will have the mental model and the implementation patterns to build an embedding pipeline that does not degrade quietly, does not surprise you with a $3,000 monthly bill, and does not require a full rebuild every time your data changes.

---

## Table of Contents

1. The Production Gap: Why Notebook Embeddings Don't Scale
2. Infrastructure Decisions: GPU vs CPU, ONNX vs PyTorch
3. Chunking Strategies That Survive Real Data
4. Vector Store Selection: pgvector vs ChromaDB vs Pinecone
5. Serving Patterns and Latency SLOs
6. Monitoring Embedding Drift in Production
7. Incremental Re-Embedding: Staying Fresh Without Full Rebuilds
8. Testing Your Pipeline End-to-End
9. Operating a Production Embedding System

- Conclusion
- Appendix A: Glossary
- Appendix B: Tools and Resources
- Appendix C: Further Reading

---

## Chapter 1: The Production Gap: Why Notebook Embeddings Don't Scale

There is a specific kind of confidence that comes from getting a Jupyter notebook to return the right document for a query. You embed ten sentences, compute cosine similarities, and the results look good. Maybe you bump it to a hundred documents and it still works. Then you ship it, and everything that was hidden in the notebook's happy path comes due at once.

The production gap is not a performance gap. It is a design gap. Notebook code is written to answer the question "does this approach work?" Production code has to answer "does this approach work, at the volume and latency my users need, while sharing a server with five other services, and while the data underneath it changes continuously?" Those are different questions, and the architecture that answers the first one well often answers the second one badly.

### What Breaks First

The first thing that breaks is memory. A typical transformer-based embedding model loaded with PyTorch occupies somewhere between 400MB and 600MB of RAM just for weights. Load it in a Jupyter notebook and you never notice, because the notebook is the only thing on the machine. Load it inside a production server process and you have immediately consumed most of the memory budget that was supposed to be shared with your API layer, your database connection pool, and your caching layer.

The second thing that breaks is latency. Embedding latency is not just the time it takes to run the model. It includes tokenization, padding to sequence length, the forward pass, normalization, and transfer to CPU if you were running on GPU. For a single document, that might be 20ms. For 10,000 documents at startup, that is — if you are lucky and do it in batches — several minutes of blocking initialization.

The third thing that breaks is consistency. Notebook code typically encodes queries and documents using the same function at different times. It almost never checks whether the model that produced the stored embeddings is the same model being used to encode the query. When you update the model — even a minor version bump that changes the tokenizer — your retrieval quality silently collapses.
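
One way to make that mismatch fail loudly is to store a fingerprint of the model next to the embeddings and compare it before serving a query. The sketch below is a minimal illustration under that assumption; `ModelFingerprint`, `ModelMismatchError`, `assert_compatible`, and the field names are hypothetical and not part of any library.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class ModelFingerprint:
    """Hypothetical record of what produced a set of embeddings."""
    model_name: str
    model_revision: str   # e.g. a commit hash or release tag of the weights
    tokenizer_hash: str   # e.g. a hash of the tokenizer files
    dim: int


class ModelMismatchError(RuntimeError):
    """Raised when the serving model differs from the indexing model."""


def assert_compatible(stored: ModelFingerprint, serving: ModelFingerprint) -> None:
    """Raise with a field-by-field diff if the two fingerprints differ."""
    diffs = [
        f"{f.name}: index={getattr(stored, f.name)!r} serving={getattr(serving, f.name)!r}"
        for f in fields(stored)
        if getattr(stored, f.name) != getattr(serving, f.name)
    ]
    if diffs:
        raise ModelMismatchError("; ".join(diffs))


# Illustrative values only
index_fp = ModelFingerprint("example-model", "rev-a", "tok-123", 384)
serving_fp = ModelFingerprint("example-model", "rev-b", "tok-456", 384)

try:
    assert_compatible(index_fp, serving_fp)
except ModelMismatchError as err:
    print(f"refusing to serve: {err}")
```

Running the check once at process startup, rather than per query, keeps it off the hot path while still turning a silent quality collapse into a visible error.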

The fourth thing that breaks is the index. A flat cosine search over 1,000 documents is fine. Over 100,000 documents, it becomes the dominant cost in your query path. Approximate nearest neighbor structures like HNSW help, but they introduce their own failure mode: they can degrade silently as the index grows without maintenance, with latency staying stable while retrieval quality drops. You will not notice this from your dashboards unless you instrument for it specifically.

### The Architecture You Actually Need

A production embedding pipeline has at least four distinct components: an **ingestion pipeline** that processes and embeds documents, a **vector store** that persists embeddings and handles retrieval, a **serving layer** that encodes queries and translates vector search results into API responses, and a **maintenance loop** that keeps the index fresh and detects when its quality has degraded.

None of these components can be the same Python function called in slightly different contexts. They have different performance characteristics, different failure modes, and different operational requirements. The ingestion pipeline is a batch job. The serving layer is a latency-sensitive hot path. The maintenance loop is a background process with different resource constraints than both.

> **Key Insight:** The decision to separate ingestion from serving is not about code cleanliness — it is about resource isolation. An ingestion job that suddenly has to re-embed 10,000 documents should not be competing for the same thread pool as a live query.

The chapters that follow go through each of these components in detail. But before getting into specifics, there is one mental model shift that matters more than any implementation detail: stop thinking about your embedding pipeline as a step in your application and start thinking about it as a service with an SLO. What is the maximum acceptable latency for a query? What is the maximum acceptable staleness for an index? What is the acceptable rate of false negatives in retrieval? Answering those questions before you write code determines most of the architecture decisions automatically.
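
Writing the SLO down as data makes it something you can check mechanically. The following is a minimal sketch, not a monitoring system: `PipelineSLO`, `percentile`, and `slo_violations` are hypothetical names, and the target values are placeholders rather than recommendations.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineSLO:
    """Hypothetical SLO targets; pick numbers that match your own product."""
    p95_query_latency_ms: float
    max_index_staleness_s: float
    min_recall_at_k: float


def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile of a non-empty sample list."""
    if not samples:
        raise ValueError("need at least one sample")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def slo_violations(
    slo: PipelineSLO,
    latencies_ms: list[float],
    index_age_s: float,
    recall_at_k: float,
) -> list[str]:
    """Return one human-readable line for every target that is currently missed."""
    problems = []
    p95 = percentile(latencies_ms, 95)
    if p95 > slo.p95_query_latency_ms:
        problems.append(f"p95 latency {p95:.1f}ms > {slo.p95_query_latency_ms:.1f}ms")
    if index_age_s > slo.max_index_staleness_s:
        problems.append(f"index age {index_age_s:.0f}s > {slo.max_index_staleness_s:.0f}s")
    if recall_at_k < slo.min_recall_at_k:
        problems.append(f"recall {recall_at_k:.2f} < {slo.min_recall_at_k:.2f}")
    return problems


# Placeholder targets and measurements, for illustration only
slo = PipelineSLO(p95_query_latency_ms=20.0, max_index_staleness_s=300.0, min_recall_at_k=0.95)
print(slo_violations(slo, [5.0] * 18 + [40.0, 40.0], index_age_s=120.0, recall_at_k=0.97))
```

The three fields map directly onto the three questions above: query latency, index staleness, and retrieval false negatives.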

> **Warning:** Teams that skip SLO definition end up in a constant state of reactive optimization — speeding up the part that just complained, without knowing whether they are addressing the right bottleneck or making the right tradeoff.

### The Quiet Failure Mode

The most dangerous property of embedding pipelines is that they fail quietly. When a database goes down, you get errors. When an embedding pipeline degrades — whether from index drift, model version mismatch, or threshold miscalibration — your users get worse results. If you are not measuring retrieval quality continuously, you will not know until someone complains, and by then the degradation may be months old.

This is why monitoring is not optional. Chapter 6 covers it in detail. But the key point to carry forward from this chapter is that the production gap is not closed by getting the pipeline to work. It is closed by getting the pipeline to fail loudly, degrade visibly, and recover automatically.

**Key Takeaways**

- Notebook embedding code fails in production because it was never designed to handle memory constraints, startup latency, model version consistency, or index maintenance.
- A production pipeline has four distinct components: ingestion, vector store, serving, and maintenance loop.
- The most dangerous failure mode in embedding systems is silent quality degradation — caught only by active retrieval quality monitoring.
- Define latency and staleness SLOs before writing any infrastructure code.
- Separating ingestion from serving is a resource isolation decision, not a code organization preference.

**Practical Exercise**

Take an embedding pipeline you currently have running in production or staging. Write down, without looking at dashboards: (1) the p95 query latency, (2) how long a full re-embed of your corpus takes, (3) what model version produced your stored embeddings, and (4) the last time you verified that top-1 retrieval was returning the correct result. If you cannot answer all four from memory, you have gaps in your observability that this book will help you close.

---

## Chapter 2: Infrastructure Decisions: GPU vs CPU, ONNX vs PyTorch

The first infrastructure decision most teams make wrong is the GPU decision. The assumption is that embeddings require GPU, and that the goal is to get GPU access as cheaply as possible. Both halves of that assumption are worth questioning.

For inference — not training, inference — a properly optimized CPU-based embedding setup outperforms a naive GPU-based setup on latency at low-to-moderate concurrency, and it costs dramatically less at any scale that is not Google-scale. The key word is "properly optimized." An unoptimized CPU setup is slow. An optimized one, using the right runtime and the right model format, is fast enough for most production workloads and costs a fraction of GPU compute.

### The Case for ONNX Runtime

PyTorch is a training framework that can do inference. ONNX Runtime is an inference runtime. That distinction matters a lot when your embedding model is sitting inside a production server process that also has to handle HTTP requests, database queries, and cache operations.

A PyTorch model loaded in the standard way — `from transformers import AutoModel; model = AutoModel.from_pretrained(...)` — will occupy between 400MB and 600MB of RAM just for model weights on most sentence-transformer architectures. The full RSS of a process hosting that model is typically around 820MB once you account for PyTorch's memory allocator, gradient machinery, and the Python interpreter.

The same model exported to ONNX and loaded with ONNX Runtime uses approximately 30MB for model weights. The full process RSS drops to around 197MB. That is not a tuning improvement — it is a 4x reduction in memory footprint from switching runtimes. In a containerized environment where your pod has a 512MB memory limit, the difference is whether your service runs at all.

> **Key Insight:** ONNX Runtime achieves this reduction by stripping everything PyTorch carries for training — gradient buffers, optimizer state machinery, the autograd engine. For inference, you need none of that. ONNX gives you the computation graph with all the training scaffolding removed.

Here is the full export and load pattern:

```python
from sentence_transformers import SentenceTransformer
from optimum.onnxruntime import ORTModelForFeatureExtraction
from transformers import AutoTokenizer
import onnxruntime as ort
import numpy as np

# Export once — run this offline, commit the result
def export_to_onnx(model_name: str, output_dir: str):
    model = SentenceTransformer(model_name)
    model.save(output_dir)

    ort_model = ORTModelForFeatureExtraction.from_pretrained(
        output_dir, export=True
    )
    onnx_dir = output_dir + "_onnx"
    ort_model.save_pretrained(onnx_dir)
    # Save the tokenizer next to model.onnx so ONNXEmbedder can load both
    # from the same directory
    AutoTokenizer.from_pretrained(output_dir).save_pretrained(onnx_dir)

# Load at runtime — this is what your server process does
class ONNXEmbedder:
    def __init__(self, model_dir: str):
        sess_options = ort.SessionOptions()
        sess_options.intra_op_num_threads = 4
        sess_options.inter_op_num_threads = 1
        sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL

        self.session = ort.InferenceSession(
            f"{model_dir}/model.onnx",
            sess_options=sess_options,
            providers=["CPUExecutionProvider"],
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_dir)

    def embed(self, texts: list[str]) -> np.ndarray:
        encoded = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="np",
        )
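        # Feed only the inputs the exported graph declares: BERT-style
        # exports also expect token_type_ids, other architectures do not
        input_names = {i.name for i in self.session.get_inputs()}
        outputs = self.session.run(
            None,
            {k: v for k, v in encoded.items() if k in input_names},
        )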
        # Mean pooling over token dimension
        token_embeddings = outputs[0]
        attention_mask = encoded["attention_mask"]
        mask_expanded = attention_mask[:, :, np.newaxis].astype(np.float32)
        sum_embeddings = np.sum(token_embeddings * mask_expanded, axis=1)
        sum_mask = np.clip(mask_expanded.sum(axis=1), a_min=1e-9, a_max=None)
        embeddings = sum_embeddings / sum_mask
        # L2 normalize
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        return embeddings / np.clip(norms, a_min=1e-9, a_max=None)
```

With this setup on a standard 4-core CPU, embedding latency for a single 128-token document runs around 6ms at 384 embedding dimensions. That is fast enough for a synchronous query path without any caching layer.

### Thread Configuration

The `intra_op_num_threads` setting controls how many threads ONNX Runtime uses for operations within a single operator (like a matrix multiply). Setting it too high causes thread contention when you have concurrent requests. For a server handling moderate concurrency, 4 intra-op threads with 1 inter-op thread is a stable default.

If you are serving queries sequentially — one at a time, no concurrency — you can push `intra_op_num_threads` to match your core count. If you are handling concurrent requests, lower it to 2-4 and let the concurrency provide throughput rather than individual-request parallelism.
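
As a starting point, the rule above can be written as a small helper. This is a rough heuristic sketch, not something ONNX Runtime provides: `pick_intra_op_threads` and its `cap` parameter are hypothetical, and the result should be validated against your own latency measurements.

```python
def pick_intra_op_threads(physical_cores: int, concurrent_requests: int, cap: int = 4) -> int:
    """Suggest an intra-op thread count for one ONNX Runtime session.

    Sequential serving gets every core. Concurrent serving splits the cores
    across in-flight requests and never exceeds `cap`.
    """
    if physical_cores < 1 or concurrent_requests < 1:
        raise ValueError("physical_cores and concurrent_requests must be >= 1")
    if concurrent_requests == 1:
        return physical_cores
    return max(1, min(cap, physical_cores // concurrent_requests))


print(pick_intra_op_threads(physical_cores=8, concurrent_requests=1))
print(pick_intra_op_threads(physical_cores=8, concurrent_requests=4))
```

The returned value is what you would assign to `sess_options.intra_op_num_threads` when building the session.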

> **Warning:** By default, ONNX Runtime sizes each session's intra-op thread pool to the number of cores it detects. On a 32-core server, that is 32 threads per session. Two worker processes, each with its own session, put 64 threads in contention for 32 cores, causing severe latency spikes. Always set thread counts explicitly.

### GPU When It Actually Helps

GPU accelerates embedding when you have large batches and your bottleneck is throughput, not latency. If you are doing an initial indexing run over 500,000 documents and time-to-completion is the constraint, a GPU will outrun CPU by 10-20x on raw throughput.

For serving — where you are embedding individual queries or small batches in a hot path — GPU often does not help and sometimes hurts, because the overhead of CPU-GPU data transfer adds latency that dwarfs the compute savings for small inputs. The break-even point depends on batch size and model size, but for sub-100 document batches on most sentence transformer models, CPU with ONNX Runtime is competitive with or faster than GPU with PyTorch.

If you do use GPU for serving, use it with batching enabled and set a maximum queue wait time so that low-traffic periods do not result in single-item batches that gain nothing from GPU parallelism.
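
A maximum queue wait can be sketched with nothing more than the standard library. The helper below is an illustration, not a production batcher: `collect_batch` is a hypothetical name, and the batch size and wait time are placeholders you would tune against your latency SLO.

```python
import queue
import time


def collect_batch(requests: queue.Queue, max_batch: int, max_wait_s: float) -> list:
    """Block for the first request, then gather more until the batch is
    full or max_wait_s has elapsed since the first request arrived."""
    batch = [requests.get()]  # wait indefinitely for the first request
    deadline = time.monotonic() + max_wait_s
    while len(batch) < max_batch:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(requests.get(timeout=remaining))
        except queue.Empty:
            break
    return batch


pending = queue.Queue()
for text in ["first query", "second query", "third query"]:
    pending.put(text)

print(collect_batch(pending, max_batch=2, max_wait_s=0.05))  # full batch, returns immediately
print(collect_batch(pending, max_batch=8, max_wait_s=0.05))  # partial batch after the wait expires
```

The wait bounds the latency a lone request pays during quiet periods: it is never held longer than `max_wait_s` waiting for companions that may not arrive.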

**Key Takeaways**

- ONNX Runtime reduces model weight RAM from ~496MB to ~30MB compared to PyTorch; full process RSS drops from ~820MB to ~197MB on typical sentence transformer models.
- Switching to ONNX is not an optimization — it is a different runtime designed for inference. The memory reduction is structural, not incidental.
- Set ONNX thread counts explicitly; the default "all cores" setting causes severe contention under concurrent load.
- CPU with ONNX Runtime achieves ~6ms embedding latency at 384 dimensions, competitive with GPU for low-batch serving workloads.
- GPU provides real throughput gains for bulk indexing; for query-time serving, the break-even over optimized CPU is at larger batch sizes than most teams assume.

**Practical Exercise**

If you have a PyTorch-based embedding model in production, measure its current RSS with `psutil.Process().memory_info().rss`. Then export the model to ONNX using `optimum`, load it with ONNX Runtime, and measure again. Record both numbers. If you are running in a container, compare the peak RSS to your container memory limit. This measurement should inform your next infrastructure sizing decision.

---

## Chapter 3: Chunking Strategies That Survive Real Data

Chunking is the part of embedding pipeline design that gets the least attention and causes the most production problems. The standard advice — "split documents into 512-token chunks with 50-token overlap" — is a starting point, not a strategy. Real data is heterogeneous, and a chunking approach designed for a single document type will produce garbage on everything else.

### What Chunking Is Actually Doing

A chunk is the unit of retrieval. When a user queries your system, they get back chunks — not documents. The chunk boundaries determine what context accompanies each retrieved result. A chunk that cuts a function definition in half, or separates a code comment from the code it describes, or merges two unrelated sections of a document will produce embeddings that do not accurately represent any coherent unit of meaning.

The embedding model is not magic. It encodes whatever text it receives. If that text is incoherent — half a SQL query followed by an unrelated paragraph — the resulting vector will be similarly incoherent: not representative of either thing, not a good match for queries about either thing.

Good chunking is semantic. It tries to keep units of meaning together. The definition of "unit of meaning" varies by content type, which is why a single chunking strategy rarely works across a mixed corpus.
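
In a mixed corpus, this usually means routing each document to a strategy by content type before any splitting happens. The following is a minimal sketch of that routing; the mapping, the strategy names, and `pick_chunking_strategy` are all hypothetical, and unknown types are surfaced as `"unhandled"` instead of being chunked silently.

```python
from pathlib import Path

# Hypothetical mapping: extend it for the content types in your corpus
STRATEGY_BY_SUFFIX = {
    ".py": "structural_code",
    ".md": "markdown_headers",
    ".markdown": "markdown_headers",
    ".txt": "fixed_size",
}


def pick_chunking_strategy(path: str, default: str = "unhandled") -> str:
    """Choose a chunking strategy name from the file extension."""
    return STRATEGY_BY_SUFFIX.get(Path(path).suffix.lower(), default)


for example in ["src/auth/middleware.py", "docs/README.md", "exports/users.csv"]:
    print(example, "->", pick_chunking_strategy(example))
```

Counting how many files land in the `"unhandled"` bucket is a quick way to see which content types your chunking does not yet cover.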

### Fixed-Size Chunking and When It Works

Fixed-size chunking — splitting on character or token count, with optional overlap — is appropriate when your documents are dense and homogeneous. Long-form prose, legal documents, and support ticket bodies are reasonable candidates. The content is continuous enough that an arbitrary split usually lands mid-thought rather than mid-concept, and the overlap gives each chunk enough context to be self-explanatory.

```python
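def chunk_fixed(text: str, chunk_size: int = 400, overlap: int = 50) -> list[str]:
    # chunk_size and overlap are counted in whitespace-separated words, not model tokens
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    words = text.split()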
    chunks = []
    start = 0
    while start < len(words):
        end = min(start + chunk_size, len(words))
        chunks.append(" ".join(words[start:end]))
        if end == len(words):
            break
        start += chunk_size - overlap
    return chunks
```

Fixed-size chunking breaks down on structured data — code, markdown with headers, JSON, CSV. In those cases, arbitrary splits produce chunks that are syntactically invalid, contextually meaningless, or both.

### Structural Chunking for Code

For source code, the natural chunking unit is the syntactic structure: functions, classes, methods. A function is a meaningful unit of retrieval. Half a function is not.

```python
import ast
from dataclasses import dataclass

@dataclass
class CodeChunk:
    text: str
    start_line: int
    end_line: int
    chunk_type: str  # "function", "class", "module_docstring", "import_block"

def chunk_python_file(source: str, filepath: str) -> list[CodeChunk]:
    try:
        tree = ast.parse(source)
    except SyntaxError:
        # Fall back to fixed-size chunking for unparseable files
        lines = source.splitlines()
        return [
            CodeChunk(
                text="\n".join(lines[i:i+50]),
                start_line=i,
                end_line=min(i+50, len(lines)),
                chunk_type="raw"
            )
            for i in range(0, len(lines), 40)
        ]

    chunks = []
    lines = source.splitlines()

    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            start = node.lineno - 1
            end = node.end_lineno
            chunk_text = "\n".join(lines[start:end])
            # Prefix the file path so module context is part of the embedded text
            chunks.append(CodeChunk(
                text=f"# File: {filepath}\n{chunk_text}",
                start_line=start,
                end_line=end,
                chunk_type="function"
            ))
        elif isinstance(node, ast.ClassDef):
            start = node.lineno - 1
            # Keep the class signature and class-level docstring, not method
            # bodies (methods are chunked separately above)
            first_stmt = node.body[0]
            has_docstring = (
                isinstance(first_stmt, ast.Expr)
                and isinstance(first_stmt.value, ast.Constant)
                and isinstance(first_stmt.value.value, str)
            )
            # end_lineno is 1-indexed inclusive, so it doubles as a slice end
            header_end = first_stmt.end_lineno if has_docstring else first_stmt.lineno - 1
            header_end = max(header_end, start + 1)
            class_header_lines = lines[start:header_end]
            chunks.append(CodeChunk(
                text=f"# File: {filepath}\n" + "\n".join(class_header_lines),
                start_line=start,
                end_line=start + len(class_header_lines),
                chunk_type="class"
            ))

    return chunks
```

> **Key Insight:** Prepending the file path to each code chunk significantly improves retrieval for queries that include directory or module context ("the auth middleware", "the database migration utilities"). The model sees this as content and encodes it accordingly.

### Markdown and Document Chunking

For markdown documents, split on headers. A section under a `##` heading is a natural retrieval unit — it is the author's own declaration of where one topic ends and another begins.

```python
import re

def chunk_markdown(text: str, max_chunk_tokens: int = 500) -> list[str]:
    header_pattern = re.compile(r'^#{1,3}\s+.+$', re.MULTILINE)
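    header_positions = [m.start() for m in header_pattern.finditer(text)]
    # Keep text that precedes the first header, and documents with no headers at all
    if not header_positions or header_positions[0] != 0:
        header_positions.insert(0, 0)
    header_positions.append(len(text))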

    chunks = []
    for i in range(len(header_positions) - 1):
        section = text[header_positions[i]:header_positions[i+1]].strip()
        if not section:
            continue
        # If section is too long, subdivide at paragraph boundaries
        if len(section.split()) > max_chunk_tokens:
            paragraphs = re.split(r'\n\n+', section)
            current = []
            current_len = 0
            for para in paragraphs:
                para_len = len(para.split())
                if current_len + para_len > max_chunk_tokens and current:
                    chunks.append("\n\n".join(current))
                    current = [para]
                    current_len = para_len
                else:
                    current.append(para)
                    current_len += para_len
            if current:
                chunks.append("\n\n".join(current))
        else:
            chunks.append(section)

    return [c for c in chunks if len(c.strip()) > 30]
```

> **Warning:** Chunks shorter than about 20 words produce unreliable embeddings. The model has too little signal to encode anything meaningful, and the resulting vectors tend to cluster near the center of the embedding space, making them poor discriminators in retrieval. Filter out short chunks before embedding.
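
A word-count filter is enough to apply this rule. The sketch below assumes whitespace-separated words are a good enough proxy for length; `drop_short_chunks` is a hypothetical helper, and the 20-word default simply mirrors the guideline above.

```python
def drop_short_chunks(chunks: list[str], min_words: int = 20) -> tuple[list[str], int]:
    """Return the chunks with at least min_words words, plus how many were dropped."""
    kept = [c for c in chunks if len(c.split()) >= min_words]
    return kept, len(chunks) - len(kept)


sample = [
    "TODO",
    "See above.",
    " ".join(["word"] * 25),
]
kept, dropped = drop_short_chunks(sample)
print(f"kept {len(kept)} chunk(s), dropped {dropped}")
```

Logging the dropped count per document makes it easy to spot files where the chunker is producing mostly fragments.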

### The Overlap Question

Overlap between chunks serves one purpose: ensuring that content near a chunk boundary is represented in at least two chunks, so a query about that content can match at least one of them. The right overlap size depends on the typical query length and the typical length of the concept the query is looking for.

For prose, 10-15% overlap is a reasonable default. For code, overlap is usually wrong — you do not want half a function body appearing in two different chunks, because the half-chunks are misleading rather than helpful. Use no overlap for structural code chunks.
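
The cost of overlap is easy to estimate before you re-chunk anything. The sketch below assumes a sliding window that advances by `chunk_size - overlap` words per chunk, as in the fixed-size chunker earlier in this chapter; `overlap_words` and `expected_chunk_count` are hypothetical helper names, and 12.5% is just the midpoint of the 10-15% range.

```python
import math


def overlap_words(chunk_size: int, fraction: float = 0.125) -> int:
    """Overlap in words for a given chunk size and overlap fraction."""
    return round(chunk_size * fraction)


def expected_chunk_count(n_words: int, chunk_size: int, overlap: int) -> int:
    """Chunk count for a window that advances by chunk_size - overlap words."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be >= 0 and smaller than chunk_size")
    if n_words <= 0:
        return 0
    if n_words <= chunk_size:
        return 1
    step = chunk_size - overlap
    return 1 + math.ceil((n_words - chunk_size) / step)


overlap = overlap_words(400)
print(overlap, expected_chunk_count(1000, 400, overlap))
```

With a 400-word chunk and 50 words of overlap, each step covers 350 new words, so the index grows by roughly 400/350, or about 14%, compared with no overlap.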

**Key Takeaways**

- Chunking determines the unit of retrieval; chunks that cut semantic units in half produce embeddings that do not represent any coherent concept.
- Use structural chunking for code (functions, classes) and header-based chunking for markdown; reserve fixed-size chunking for dense, homogeneous prose.
- Prepend file path or section title to each chunk; the model encodes this metadata and it improves retrieval for context-specific queries.
- Filter out chunks shorter than ~20 words; they produce unreliable embeddings that hurt retrieval quality.
- Overlap is appropriate for prose, counterproductive for structural content types like code.

**Practical Exercise**

Take 50 representative documents from your corpus and your current chunking configuration. For each chunk, ask: "Could a user's query plausibly be about this chunk, and only this chunk?" Count the chunks where the answer is clearly no — because the chunk is half a sentence, or combines two unrelated topics, or is too short to have a coherent topic. That count is your chunking quality baseline. Improve your chunking until it drops by at least 50%.

---

## Chapter 4: Vector Store Selection: pgvector vs ChromaDB vs Pinecone

The vector store decision is where teams spend the most time researching and make the most avoidable mistakes. The research usually focuses on the wrong dimensions — features, managed vs self-hosted, ecosystem integrations — rather than the dimensions that actually differentiate production behavior: query latency, operational overhead, cost at your scale, and failure modes.

Here is what the numbers look like from real production traffic. Comparing pgvector on self-hosted Postgres against Pinecone for a corpus of ~80,000 embeddings at 384 dimensions: pgvector delivers 1.4x lower p95 latency than Pinecone, and 79% lower monthly cost when running on existing Postgres infrastructure. Those numbers will vary with your workload and infrastructure, but the direction is consistent — managed vector databases carry a significant cost premium, and the latency advantages they advertise are often offset by network round-trip overhead.

### pgvector

pgvector is a Postgres extension that adds vector similarity search. If you are already running Postgres, pgvector is the default choice until you have a concrete reason to look elsewhere.

Setup is straightforward:

```sql
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE embeddings (
    id          BIGSERIAL PRIMARY KEY,
    source_id   TEXT NOT NULL,
    source_hash TEXT NOT NULL,  -- SHA-256 of source content
    chunk_index INTEGER NOT NULL,
    content     TEXT NOT NULL,
    embedding   vector(384),
    metadata    JSONB,
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE (source_id, chunk_index)
);

-- HNSW index for approximate nearest neighbor
CREATE INDEX ON embeddings USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);

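-- Alternative approximate index: IVFFlat builds faster and uses less memory
-- than HNSW, usually at some cost in recall.
-- For exact search on small corpora (< 10K rows), skip the index entirely;
-- a sequential scan returns exact results.
-- CREATE INDEX ON embeddings USING ivfflat (embedding vector_cosine_ops)
-- WITH (lists = 100);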
```

A basic query:

```python
import psycopg2
import numpy as np
from typing import Optional

def search_embeddings(
    conn,
    query_embedding: np.ndarray,
    top_k: int = 5,
    threshold: Optional[float] = None,
    source_filter: Optional[str] = None,
) -> list[dict]:
    # pgvector accepts the text form '[x1,x2,...]' cast to vector
    params = {
        "q": "[" + ",".join(map(str, query_embedding.tolist())) + "]",
        "top_k": top_k,
    }

    where_clauses = []

    if threshold is not None:
        where_clauses.append("1 - (embedding <=> %(q)s::vector) >= %(threshold)s")
        params["threshold"] = threshold

    if source_filter is not None:
        where_clauses.append("source_id LIKE %(source_filter)s")
        params["source_filter"] = source_filter

    where_sql = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""

    # psycopg2 uses %(name)s placeholders, not Postgres-native $1, $2
    sql = f"""
        SELECT
            source_id,
            chunk_index,
            content,
            metadata,
            1 - (embedding <=> %(q)s::vector) AS similarity
        FROM embeddings
        {where_sql}
        ORDER BY embedding <=> %(q)s::vector
        LIMIT %(top_k)s
    """

    with conn.cursor() as cur:
        cur.execute(sql, params)
        rows = cur.fetchall()

    return [
        {
            "source_id": r[0],
            "chunk_index": r[1],
            "content": r[2],
            "metadata": r[3],
            "similarity": float(r[4]),
        }
        for r in rows
    ]
```

> **Warning:** The HNSW index in pgvector degrades silently. As you insert records after building the index, the index structure becomes increasingly suboptimal — retrieval recall drops while query latency remains stable. This means your dashboards will look fine while your users get worse results. Schedule periodic `REINDEX` operations and track retrieval quality separately from query time.
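
Tracking retrieval quality separately comes down to comparing what the index returns against ground truth for the same query. The sketch below assumes you can obtain an exact top-k list for a sample of queries, for example from a brute-force scan that bypasses the index; `recall_at_k` and `mean_recall` are hypothetical helper names.

```python
def recall_at_k(approx_ids: list[str], exact_ids: list[str]) -> float:
    """Fraction of the exact top-k results that the approximate search returned."""
    if not exact_ids:
        raise ValueError("exact_ids must not be empty")
    return len(set(approx_ids) & set(exact_ids)) / len(exact_ids)


def mean_recall(pairs: list[tuple[list[str], list[str]]]) -> float:
    """Average recall over (approx_ids, exact_ids) pairs from sampled queries."""
    if not pairs:
        raise ValueError("need at least one query")
    return sum(recall_at_k(approx, exact) for approx, exact in pairs) / len(pairs)


# Illustrative IDs only
sampled = [
    (["a", "b", "c"], ["a", "b", "c"]),   # index agrees with exact search
    (["a", "x", "y"], ["a", "b", "c"]),   # index missed two of three
]
print(f"mean recall@3: {mean_recall(sampled):.2f}")
```

Recording this number on a schedule gives you the signal that query latency cannot: a drop in recall while latency stays flat is the degradation pattern described above.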

### ChromaDB

ChromaDB is the right choice when you want an embedded vector store with zero infrastructure overhead — a single `pip install chromadb` and you are running. It works well for development, for small-to-medium corpora that fit comfortably in memory, and for teams that do not already run Postgres.

```python
import chromadb
from chromadb.config import Settings
import numpy as np

client = chromadb.PersistentClient(
    path="./chroma_db",
    settings=Settings(anonymized_telemetry=False)
)

collection = client.get_or_create_collection(
    name="documents",
    metadata={"hnsw:space": "cosine"}
)

def upsert_chunks(
    collection,
    ids: list[str],
    embeddings: list[list[float]],
    documents: list[str],
    metadatas: list[dict]
):
    collection.upsert(
        ids=ids,
        embeddings=embeddings,
        documents=documents,
        metadatas=metadatas
    )

def search(collection, query_embedding: np.ndarray, top_k: int = 5) -> list[dict]:
    results = collection.query(
        query_embeddings=[query_embedding.tolist()],
        n_results=top_k,
        include=["documents", "metadatas", "distances"]
    )

    output = []
    for i, doc_id in enumerate(results["ids"][0]):
        output.append({
            "id": doc_id,
            "content": results["documents"][0][i],
            "metadata": results["metadatas"][0][i],
            "similarity": 1 - results["distances"][0][i],  # cosine distance to similarity
        })
    return output
```

ChromaDB's limitations show at scale. Its write throughput is lower than pgvector for bulk ingestion, and its filtering capabilities are more limited. For corpora above 500,000 chunks or workloads with complex metadata filtering requirements, you will hit those limits.

### In-Memory NumPy Search

For corpora under approximately 100,000 documents, in-memory cosine search over a NumPy matrix beats both Pinecone and ChromaDB on latency and cost. The full matrix fits in a few hundred megabytes, search is a single batched matrix multiply, and there is no network round-trip or index maintenance overhead.
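
The memory claim is simple arithmetic: rows times dimensions times bytes per value. The helper below is a back-of-the-envelope sketch; `matrix_bytes` is a hypothetical name, and the figure covers only the float32 matrix, not the chunk text or metadata held alongside it.

```python
def matrix_bytes(n_chunks: int, dim: int, bytes_per_value: int = 4) -> int:
    """Size of a dense embedding matrix; float32 uses 4 bytes per value."""
    return n_chunks * dim * bytes_per_value


size = matrix_bytes(100_000, 384)
print(f"{size / 1e6:.1f} MB")  # 153.6 MB
```

At 384 dimensions, 100,000 chunks come to roughly 154MB, which is why the matrix can share a server with other services.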

```python
import numpy as np
from dataclasses import dataclass

@dataclass
class MemoryIndex:
    embeddings: np.ndarray   # shape: (n_chunks, dim)
    ids: list[str]
    contents: list[str]
    metadatas: list[dict]

def build_memory_index(chunks: list[dict]) -> MemoryIndex:
    embeddings = np.array([c["embedding"] for c in chunks], dtype=np.float32)
    # Normalize for cosine similarity via dot product
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    embeddings = embeddings / np.clip(norms, 1e-9, None)

    return MemoryIndex(
        embeddings=embeddings,
        ids=[c["id"] for c in chunks],
        contents=[c["content"] for c in chunks],
        metadatas=[c["metadata"] for c in chunks],
    )

def search_memory(
    index: MemoryIndex,
    query_embedding: np.ndarray,
    top_k: int = 5,
    threshold: float = 0.0,
) -> list[dict]:
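    # Guard against a zero-norm query vector
    q = query_embedding / max(float(np.linalg.norm(query_embedding)), 1e-9)
    scores = index.embeddings @ q  # dot product = cosine similarity after normalization

    # argpartition raises if top_k exceeds the number of rows, so clamp it
    k = min(top_k, len(scores))
    if k <= 0:
        return []
    top_indices = np.argpartition(scores, -k)[-k:]
    top_indices = top_indices[np.argsort(scores[top_indices])[::-1]]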

    results = []
    for idx in top_indices:
        sim = float(scores[idx])
        if sim >= threshold:
            results.append({
                "id": index.ids[idx],
                "content": index.contents[idx],
                "metadata": index.metadatas[idx],
                "similarity": sim,
            })
    return results
```

> **Key Insight:** For most codebases under 100,000 files, in-memory NumPy search beats Pinecone on both latency (no network round-trip) and cost (zero). Load the index from Postgres or a file at startup, serve queries from RAM. Reload on updates.
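
As a rough check on the "few hundred megabytes" figure, the matrix footprint is rows times dimensions times bytes per value. The sketch below assumes 384-dimensional float32 vectors (an assumption; substitute your model's output dimension) and counts only the embedding matrix, not the Python lists of ids, contents, and metadata held next to it. The function names are illustrative.

```python
def estimate_matrix_bytes(n_chunks: int, dim: int = 384, bytes_per_value: int = 4) -> int:
    """Bytes for a dense (n_chunks, dim) matrix; float32 uses 4 bytes per value."""
    return n_chunks * dim * bytes_per_value

def to_mib(n_bytes: int) -> float:
    """Convert a byte count to mebibytes."""
    return n_bytes / (1024 * 1024)

if __name__ == "__main__":
    for n in (10_000, 100_000, 1_000_000):
        print(f"{n:>9,} chunks: {to_mib(estimate_matrix_bytes(n)):,.1f} MiB")
```

At 100,000 rows this comes to roughly 146 MiB under those assumptions. Chunk text and metadata usually add more on top, so measure resident memory on a loaded index before committing to this approach.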

### When to Use Pinecone

Pinecone makes sense when: (1) your corpus is genuinely large (millions of documents), (2) you need multi-tenant isolation with separate namespaces, (3) you have no existing Postgres infrastructure and no ops capacity to run it, or (4) you need specific features like sparse-dense hybrid retrieval that Pinecone supports natively.

For most teams building their first or second production embedding system, none of those conditions apply. Start with pgvector. Move to Pinecone if and when you have a concrete reason to.

**Key Takeaways**

- pgvector delivers 1.4x lower p95 latency than Pinecone and 79% lower monthly cost when self-hosted on existing Postgres.
- HNSW index degradation in pgvector is silent — schedule `REINDEX` and monitor retrieval quality separately from query latency.
- For corpora under 100,000 chunks, in-memory NumPy search outperforms both Pinecone and ChromaDB on latency with zero infrastructure cost.
- ChromaDB is the right default for development and small-to-medium corpora with no existing Postgres infrastructure.
- Use Pinecone when corpus size is genuinely in the millions or when you need multi-tenant namespace isolation at scale.

**Practical Exercise**

If you are currently using a managed vector database, calculate your monthly cost. Then estimate the cost of running an equivalent Postgres instance on your existing infrastructure (or on a small dedicated VM) with pgvector. Factor in the p95 latency difference. If the cost difference is more than $100/month and your corpus is under 1 million chunks, schedule a migration spike.

---

## Chapter 5: Serving Patterns and Latency SLOs

Getting an embedding pipeline to serve at low latency requires answering a few questions that most teams defer until they have a problem: What is the budget for query embedding? What is the budget for vector retrieval? What fraction of queries can exceed the budget before the SLO is violated? And what happens when you exceed it?

These questions sound abstract until you are on-call at 2am because search response time spiked from 25ms to 800ms and you do not know whether the problem is in the embedding step, the vector store, or something upstream. Latency decomposition — knowing which component owns which portion of your end-to-end latency — is what turns that 2am incident into a 5-minute diagnosis.

### Latency Decomposition

A typical semantic search query has four latency components:

1. **Query embedding**: tokenization + model forward pass + normalization
2. **Vector retrieval**: index lookup in your vector store
3. **Result hydration**: fetching content for the matched IDs (if not stored inline)
4. **Post-processing**: threshold filtering, reranking, deduplication

With ONNX Runtime on a 4-core CPU, component 1 runs around 6ms for a typical query. Component 2 depends on your vector store and corpus size. Component 3 is a database lookup and should be under 2ms with a warm connection pool. Component 4 is usually under 1ms.

That means a well-tuned system can deliver end-to-end semantic search in under 20ms — entirely on CPU, no GPU required. The p95 number creeps up because of garbage collection pauses, connection pool exhaustion under load, and HNSW index variance. A 30ms p95 target is achievable. A 50ms p95 target is straightforward.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass

@dataclass
class QueryTrace:
    query: str
    embed_ms: float = 0.0
    retrieve_ms: float = 0.0
    hydrate_ms: float = 0.0
    postprocess_ms: float = 0.0
    total_ms: float = 0.0
    result_count: int = 0

@contextmanager
def timed(trace: QueryTrace, field_name: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        # Record the elapsed time even when the wrapped step raises
        elapsed = (time.perf_counter() - start) * 1000
        setattr(trace, field_name, elapsed)

def search_with_trace(
    query: str,
    embedder,
    vector_store,
    threshold: float = 0.5,
    top_k: int = 5,
) -> tuple[list[dict], QueryTrace]:
    trace = QueryTrace(query=query)
    start_total = time.perf_counter()

    with timed(trace, "embed_ms"):
        query_embedding = embedder.embed([query])[0]

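    with timed(trace, "retrieve_ms"):
        raw_results = vector_store.search(query_embedding, top_k=top_k * 2)

    # hydrate_ms stays at 0.0 here because the results carry their content inline.
    # If content is fetched separately, wrap that lookup in timed(trace, "hydrate_ms").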

    with timed(trace, "postprocess_ms"):
        results = [r for r in raw_results if r["similarity"] >= threshold][:top_k]

    trace.total_ms = (time.perf_counter() - start_total) * 1000
    trace.result_count = len(results)
    return results, trace
```

### Batching at the Serving Layer

Individual query embedding is fast, but concurrent queries can saturate CPU if each spawns its own model inference call. A batching queue at the serving layer collects concurrent queries and runs them through the model together, dramatically improving throughput without increasing per-query latency for the common case.

```python
import asyncio
from collections import deque

class BatchingEmbedder:
    def __init__(self, embedder, max_batch_size: int = 100, max_wait_ms: float = 5.0):
        self.embedder = embedder
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self._queue: deque = deque()
        self._lock = asyncio.Lock()

    async def embed_single(self, text: str) -> list[float]:
        future = asyncio.get_running_loop().create_future()
        async with self._lock:
            self._queue.append((text, future))
            batch_full = len(self._queue) >= self.max_batch_size

        if not batch_full:
            # Give other callers up to max_wait_ms to join this batch
            try:
                await asyncio.wait_for(
                    asyncio.shield(future),
                    timeout=self.max_wait_ms / 1000
                )
            except asyncio.TimeoutError:
                pass

        # Batch is full or the wait expired: flush unless another caller already did
        if not future.done():
            await self._flush()

        return await future

    async def _flush(self):
        async with self._lock:
            batch = []
            futures = []
            while self._queue and len(batch) < self.max_batch_size:
                text, future = self._queue.popleft()
                batch.append(text)
                futures.append(future)

        if not batch:
            return

        try:
            # Run inference in a worker thread so the event loop stays responsive
            embeddings = await asyncio.get_running_loop().run_in_executor(
                None, self.embedder.embed, batch
            )
        except Exception as exc:
            # Fail every waiter in the batch instead of leaving them hanging
            for future in futures:
                if not future.done():
                    future.set_exception(exc)
            return

        for future, embedding in zip(futures, embeddings):
            if not future.done():
                future.set_result(embedding.tolist())
```

The batch size of 100 is the throughput-latency sweet spot for most hardware — large enough to saturate vectorized operations, small enough that individual-request wait time stays within SLO.

> **Key Insight:** Batch size 100 is not a guess — it is the crossover point where the amortized cost of model overhead per item is minimized, while queuing delay is still within a 5ms SLO contribution budget. Measure your own crossover by timing batches of 10, 50, 100, 200, and 500 on your target hardware.
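
One way to run that measurement is a small timing harness like the sketch below. It assumes your embedder exposes a callable that takes a list of strings; `measure_batch_sizes` and the stand-in `fake_embed` are hypothetical names used only for illustration. It reports the best of several runs per batch size so that one-off pauses do not skew the comparison.

```python
import time
from typing import Callable, Sequence

def measure_batch_sizes(
    embed: Callable[[list[str]], Sequence],
    batch_sizes: Sequence[int] = (10, 50, 100, 200, 500),
    repeats: int = 5,
) -> list[dict]:
    """Time embed() at each batch size; report best-of-N batch and per-item latency."""
    rows = []
    for size in batch_sizes:
        batch = [f"representative query text number {i}" for i in range(size)]
        embed(batch)  # warm-up call, excluded from timing
        timings_ms = []
        for _ in range(repeats):
            start = time.perf_counter()
            embed(batch)
            timings_ms.append((time.perf_counter() - start) * 1000)
        batch_ms = min(timings_ms)
        rows.append({
            "batch_size": size,
            "batch_ms": batch_ms,
            "per_item_ms": batch_ms / size,
        })
    return rows

if __name__ == "__main__":
    # Stand-in embedder so the sketch runs on its own; swap in your real one.
    def fake_embed(texts: list[str]) -> list[list[float]]:
        return [[float(len(t))] for t in texts]

    for row in measure_batch_sizes(fake_embed):
        print(f"{row['batch_size']:>4}  batch={row['batch_ms']:.3f}ms  "
              f"per-item={row['per_item_ms']:.5f}ms")
```

Read the output as two curves: `per_item_ms` should flatten as batches grow, while `batch_ms` is the delay every request in that batch pays. The crossover is the largest batch whose `batch_ms` still fits your latency budget. Use texts of realistic length, since the synthetic strings here are short.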

### Similarity Threshold Calibration

Most tutorials suggest a static similarity threshold around 0.7. That number is wrong for most applications and it was never derived from data.

The right threshold is the point on the similarity score distribution that separates relevant from irrelevant results in your specific corpus with your specific queries. This is an empirical question, and the answer depends on your model, your content type, and your query distribution.

```python
import numpy as np
from sklearn.metrics import precision_recall_curve

def calibrate_threshold(
    eval_pairs: list[tuple[str, str, bool]],  # (query, chunk, is_relevant)
    embedder,
) -> float:
    queries = [p[0] for p in eval_pairs]
    chunks = [p[1] for p in eval_pairs]
    labels = [int(p[2]) for p in eval_pairs]

    query_embeddings = embedder.embed(queries)
    chunk_embeddings = embedder.embed(chunks)

    # Cosine similarity for each pair
    similarities = np.array([
        float(np.dot(q, c) / (np.linalg.norm(q) * np.linalg.norm(c)))
        for q, c in zip(query_embeddings, chunk_embeddings)
    ])

    precisions, recalls, thresholds = precision_recall_curve(labels, similarities)

    # Find threshold that maximizes F1
    f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-9)
    best_idx = np.argmax(f1_scores[:-1])
    best_threshold = float(thresholds[best_idx])

    print(f"Optimal threshold: {best_threshold:.3f}")
    print(f"  Precision: {precisions[best_idx]:.3f}")
    print(f"  Recall: {recalls[best_idx]:.3f}")
    print(f"  F1: {f1_scores[best_idx]:.3f}")

    return best_threshold
```

> **Warning:** Running with a threshold that is too low floods results with irrelevant content; too high, and relevant results are silently dropped. Neither failure mode produces an error — users just get bad results. The only way to catch this is with a labeled eval set and automated threshold calibration.

**Key Takeaways**

- Decompose end-to-end query latency into at least four components and instrument each separately; you cannot diagnose latency regressions without this.
- A well-tuned ONNX + pgvector stack can serve semantic search in under 20ms end-to-end on CPU-only hardware.
- Batch size 100 is the throughput-latency crossover point on most hardware; measure your own crossover rather than accepting the default.
- Similarity thresholds should be calibrated from a labeled eval set, not set statically at 0.7.
- Serving-layer batching queues significantly improve throughput under concurrent load without increasing p50 latency.

**Practical Exercise**

Instrument your current query path with the `QueryTrace` pattern above. Run 100 representative queries and record p50, p95, and p99 for each component. Identify which component dominates your p99. That component is where your next optimization effort should go.
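
A minimal sketch of the aggregation step for this exercise follows. It assumes each trace has been converted to a plain dict (for example with `dataclasses.asdict(trace)`); `percentile`, `summarize_traces`, and `dominant_component` are hypothetical helper names. It uses the nearest-rank percentile definition, which is one of several common conventions.

```python
import math

COMPONENTS = ("embed_ms", "retrieve_ms", "hydrate_ms", "postprocess_ms")

def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile: smallest sample with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("percentile() needs at least one sample")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]

def summarize_traces(traces: list[dict]) -> dict[str, dict[str, float]]:
    """traces: one dict per query holding the *_ms fields of a QueryTrace."""
    return {
        name: {
            "p50": percentile([t[name] for t in traces], 50),
            "p95": percentile([t[name] for t in traces], 95),
            "p99": percentile([t[name] for t in traces], 99),
        }
        for name in COMPONENTS
    }

def dominant_component(summary: dict[str, dict[str, float]], pct: str = "p99") -> str:
    """Name of the component with the largest latency at the given percentile."""
    return max(summary, key=lambda name: summary[name][pct])
```

With only 100 queries, the p99 rests on the one or two slowest samples, so treat it as a pointer to where to look, not a stable estimate. Rerun with a larger sample before setting an SLO from it.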

---

## Chapter 6: Monitoring Embedding Drift in Production

Embedding pipelines have a failure mode with no equivalent in traditional software: the model that produces your stored embeddings and the model that encodes your queries can silently diverge. This happens when you update the query-time model without re-embedding your stored documents, when the model is retrained, when a dependency changes tokenization behavior, or when you accidentally load a different model checkpoint. The result is a space mismatch — queries land in a different region of the embedding space than the documents they should match, and retrieval quality collapses.

Because retrieval quality is not directly observable from your infrastructure dashboards, this failure is indistinguishable from "users stopped searching" until you look at the right metrics.

### What to Monitor

There are four monitoring signals that together give you a complete picture of embedding pipeline health:

**1. Model version consistency.** Store the model name and version hash with every embedding. On each query, verify that the model currently loaded for query encoding matches the model that produced the stored embeddings you are searching against.

```python
import hashlib
import json
from pathlib import Path

def compute_model_fingerprint(model_dir: str) -> str:
    config_path = Path(model_dir) / "config.json"
    tokenizer_path = Path(model_dir) / "tokenizer_config.json"

    fingerprint_data = {}
    for path in [config_path, tokenizer_path]:
        if path.exists():
            fingerprint_data[path.name] = json.loads(path.read_text())

    serialized = json.dumps(fingerprint_data, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()[:16]

def store_embedding_with_provenance(
    conn, source_id: str, chunk_index: int,
    content: str, embedding: list[float],
    model_fingerprint: str
):
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO embeddings
                (source_id, chunk_index, content, embedding, model_fingerprint)
            VALUES (%s, %s, %s, %s::vector, %s)
            ON CONFLICT (source_id, chunk_index)
            DO UPDATE SET
                content = EXCLUDED.content,
                embedding = EXCLUDED.embedding,
                model_fingerprint = EXCLUDED.model_fingerprint,
                created_at = NOW()
        """, (source_id, chunk_index, content, embedding, model_fingerprint))
```
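
The verification step itself can be a small guard, sketched below under a few assumptions. The set of stored fingerprints is assumed to come from something like `SELECT DISTINCT model_fingerprint FROM embeddings`, cached at startup and refreshed after re-indexing so the check costs nothing per query. `ModelMismatchError`, `hash_file`, and `assert_fingerprint_consistent` are hypothetical names. Note that `compute_model_fingerprint` above hashes only the two config files, so a swapped weights file with an identical config would pass; `hash_file` shows one way to fold the weights into the fingerprint if you need that coverage.

```python
import hashlib

class ModelMismatchError(RuntimeError):
    """Raised when the query-time model differs from the model behind the index."""

def hash_file(path: str, chunk_size: int = 1 << 20) -> str:
    """SHA-256 prefix of a file, read in chunks so large weight files stay out of RAM."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for block in iter(lambda: fh.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()[:16]

def assert_fingerprint_consistent(loaded: str, stored: set[str]) -> None:
    """stored: distinct model_fingerprint values currently present in the index."""
    if not stored:
        return  # empty index: nothing to mismatch against
    if stored != {loaded}:
        raise ModelMismatchError(
            f"query model {loaded!r} does not match stored fingerprints {sorted(stored)!r}"
        )
```

A stored set with more than one value is also treated as a failure here, because it means the index mixes embeddings from different models.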

**2. Retrieval quality on a fixed eval set.** Maintain a set of query-document pairs with known relevance judgments. Run this eval on a schedule (daily at minimum) and track recall@k and MRR over time. A sudden drop indicates a pipeline problem. A gradual decline indicates index drift or threshold miscalibration.

```python
def run_retrieval_eval(
    eval_set: list[dict],  # {query, expected_source_id, expected_chunk_index}
    embedder,
    vector_store,
    threshold: float,
    top_k: int = 5,
) -> dict:
    hits = 0
    mrr_sum = 0.0

    for item in eval_set:
        query_embedding = embedder.embed([item["query"]])[0]
        results = vector_store.search(query_embedding, top_k=top_k)

        for rank, result in enumerate(results, 1):
            if (result["source_id"] == item["expected_source_id"] and
                    result["chunk_index"] == item["expected_chunk_index"] and
                    result["similarity"] >= threshold):
                hits += 1
                mrr_sum += 1.0 / rank
                break

    return {
        "recall_at_k": hits / len(eval_set),
        "mrr": mrr_sum / len(eval_set),
        "eval_set_size": len(eval_set),
    }
```

**3. Similarity score distribution.** Embed a fixed set of probe documents daily and record the mean and standard deviation of similarity scores for a fixed set of probe queries against them. A shift in the distribution indicates that the embedding space has changed — either from model drift or from a change in the index composition.

```python
import numpy as np

def compute_score_distribution(
    probe_queries: list[str],
    probe_documents: list[str],
    embedder,
) -> dict:
    query_embeddings = embedder.embed(probe_queries)
    doc_embeddings = embedder.embed(probe_documents)

    # Compute all pairwise cosine similarities
    q_norm = query_embeddings / np.linalg.norm(query_embeddings, axis=1, keepdims=True)
    d_norm = doc_embeddings / np.linalg.norm(doc_embeddings, axis=1, keepdims=True)
    similarity_matrix = q_norm @ d_norm.T

    return {
        "mean_similarity": float(similarity_matrix.mean()),
        "std_similarity": float(similarity_matrix.std()),
        "p25_similarity": float(np.percentile(similarity_matrix, 25)),
        "p75_similarity": float(np.percentile(similarity_matrix, 75)),
    }
```

> **Key Insight:** Store probe distribution snapshots daily. Alert when the mean similarity shifts by more than 2 standard deviations from the 30-day rolling average. This catches model version mismatches within one day of occurrence.
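
The alert rule can be sketched as a z-score against the rolling window. This version reads "2 standard deviations" as the standard deviation of the daily `mean_similarity` values over the window, which is one interpretation; `mean_shift_alert` and its `min_history` guard are assumptions added for illustration.

```python
import statistics

def mean_shift_alert(
    history: list[float],   # daily mean_similarity values, oldest first
    today: float,
    max_sigma: float = 2.0,
    window_days: int = 30,
    min_history: int = 7,
) -> dict:
    """Flag today's probe mean if it sits more than max_sigma deviations from the baseline."""
    window = history[-window_days:]
    if len(window) < min_history:
        # Too few snapshots to estimate a baseline; do not alert yet
        return {"alert": False, "z_score": None, "baseline": None}

    baseline = statistics.fmean(window)
    spread = statistics.stdev(window)
    if spread == 0:
        # Perfectly flat history: any change at all is a shift
        return {"alert": today != baseline, "z_score": None, "baseline": baseline}

    z_score = (today - baseline) / spread
    return {"alert": abs(z_score) > max_sigma, "z_score": z_score, "baseline": baseline}
```

Because the probe texts are fixed, day-to-day variation should be close to zero when nothing has changed, so even a small shift produces a large z-score. That is the behavior you want for catching a model mismatch.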

**4. Index staleness.** Track the timestamp of the most recently updated embedding and compare it to the timestamp of the most recently modified source document. If the gap exceeds your staleness SLO — whatever that is for your application — trigger a re-indexing check.

```sql
SELECT
    COUNT(*) FILTER (WHERE e.created_at < s.last_modified) AS stale_chunks,
    COUNT(*) AS total_chunks,
    MAX(s.last_modified - e.created_at) AS max_staleness_interval
FROM embeddings e
JOIN source_documents s ON s.id = e.source_id;
```

### Alerting Thresholds

The hard part is not collecting these metrics — it is setting alert thresholds that catch real problems without flooding you with false positives. A useful starting point: alert when recall@5 drops by more than 10 percentage points from its 7-day average, when mean similarity shifts by more than 2 standard deviations, or when model fingerprints diverge between query-time and stored embeddings. Tune from there based on your corpus volatility and your tolerance for false alerts.

> **Warning:** Do not set absolute thresholds for similarity metrics — they are model-specific and corpus-specific. Use relative thresholds (deviation from rolling baseline) so the alert adapts as your system changes.
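
The three starting-point rules can be combined into one evaluation step, as in the sketch below. It takes already-computed inputs (today's recall@5, the previous seven daily values, a similarity z-score, and the fingerprints) and returns the alerts that fire. `evaluate_alerts` and its parameter names are hypothetical, and the defaults mirror the starting values suggested above, not tuned ones.

```python
import statistics
from typing import Optional

def evaluate_alerts(
    recall_today: float,                 # recall@5 as a fraction in [0, 1]
    recall_last_7_days: list[float],     # previous daily recall@5 values
    similarity_z_score: Optional[float], # deviation of today's probe mean from baseline
    query_fingerprint: str,
    stored_fingerprints: set[str],
    max_recall_drop_points: float = 10.0,
    max_sigma: float = 2.0,
) -> list[str]:
    """Return a human-readable message for each rule that fires."""
    fired = []

    if recall_last_7_days:
        baseline = statistics.fmean(recall_last_7_days)
        drop_points = (baseline - recall_today) * 100  # percentage points
        if drop_points > max_recall_drop_points:
            fired.append(f"recall@5 is {drop_points:.1f} points below its 7-day average")

    if similarity_z_score is not None and abs(similarity_z_score) > max_sigma:
        fired.append(f"mean similarity shifted {similarity_z_score:+.1f} sigma from baseline")

    if stored_fingerprints and stored_fingerprints != {query_fingerprint}:
        fired.append("model fingerprint differs between query path and stored embeddings")

    return fired
```

Every rule except the fingerprint check is relative to a rolling baseline, so the same code keeps working after a deliberate model or corpus change once the window has refilled.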

**Key Takeaways**

- Model version mismatch between stored embeddings and query-time model produces silent, catastrophic retrieval quality degradation.
- Store model fingerprints with every embedding; verify fingerprint consistency on each query path.
- Run a fixed eval set on a daily schedule and track recall and MRR over time — the trend is as important as the absolute value.
- Monitor similarity score distributions from fixed probe pairs; distribution shifts indicate embedding space changes.
- Use relative alert thresholds, not absolute ones, for embedding quality metrics.

**Practical Exercise**

Build a minimal eval set: 20 queries from your real query logs, with the correct source document manually identified for each. Run it against your current system and record recall@5 and MRR. Run it again in one week. With 20 queries, a single query flipping moves recall@5 by exactly 5 percentage points, so a change larger than that means at least two queries changed outcome. If either metric has changed by more than 5 percentage points without a deliberate pipeline change, you have drift you did not know about.

---

## Chapter 7: Incremental Re-Embedding: Staying Fresh Without Full Rebuilds

The naive approach to keeping an embedding index fresh is to re-embed everything on a schedule. Run the full pipeline nightly, replace the index, done. For small corpora this works. For anything above a few thousand documents, it becomes the dominant cost of operating the pipeline — both in compute and in the latency window during which your index is being replaced.

The right approach is incremental: re-embed only the documents that have changed since the last indexing run. This requires tracking which documents have changed, being able to re-embed and update individual documents without rebuilding the whole index, and handling deletions cleanly.

### Content Hashing

The foundation of incremental re-embedding is a hash of each document's content stored alongside its embeddings. On each indexing cycle, compute the hash of the current document content and compare it to the stored hash. If they match, skip the document. If they differ, re-embed and update.

```python
import hashlib

def compute_content_hash(content: str) -> str:
    return hashlib.sha256(content.encode("utf-8")).hexdigest()

def get_stale_sources(conn, source_ids_and_hashes: list[tuple[str, str]]) -> list[str]:
    if not source_ids_and_hashes:
        return []

    with conn.cursor() as cur:
        cur.execute("""
            CREATE TEMP TABLE IF NOT EXISTS current_hashes (
                source_id TEXT,
                content_hash TEXT
            ) ON COMMIT DELETE ROWS
        """)

        cur.executemany(
            "INSERT INTO current_hashes VALUES (%s, %s)",
            source_ids_and_hashes
        )

        cur.execute("""
            SELECT ch.source_id
            FROM current_hashes ch
            LEFT JOIN (
                SELECT DISTINCT ON (source_id) source_id, source_hash
                FROM embeddings
                ORDER BY source_id, created_at DESC
            ) e ON e.source_id = ch.source_id
            WHERE e.source_id IS NULL
               -- IS DISTINCT FROM also flags rows whose stored hash is NULL
               OR e.source_hash IS DISTINCT FROM ch.content_hash
        """)

        return [row[0] for row in cur.fetchall()]
```
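
The query above reports new and changed sources, but a source that has disappeared from the corpus never appears in `current_hashes`, so it is not reported. A full scan needs a separate deletion pass. The sketch below shows the classification in plain Python, assuming both sides fit in memory as `source_id -> hash` dicts; `diff_sources` and `stale_fraction` are hypothetical helpers.

```python
def diff_sources(current: dict[str, str], stored: dict[str, str]) -> dict[str, list[str]]:
    """Classify sources by comparing content hashes.

    current: hashes computed from the corpus as it exists now.
    stored:  hashes recorded in the index at the last run.
    """
    current_ids, stored_ids = set(current), set(stored)
    return {
        "new": sorted(current_ids - stored_ids),
        "changed": sorted(
            sid for sid in current_ids & stored_ids if current[sid] != stored[sid]
        ),
        "deleted": sorted(stored_ids - current_ids),
    }

def stale_fraction(diff: dict[str, list[str]], stored_count: int) -> float:
    """Share of indexed sources whose embeddings no longer match the corpus."""
    if stored_count == 0:
        return 0.0
    return (len(diff["changed"]) + len(diff["deleted"])) / stored_count
```

Sources in `new` and `changed` go to the re-embedding step, and sources in `deleted` have their chunks removed from the index. The stale fraction is a useful number to log on every run.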

### Git Webhook Re-Indexing

For codebases, Git provides the exact list of changed files on every push. Integrating with Git webhooks eliminates the need to scan the entire corpus to find stale documents — you get the diff for free.

```python
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib
import json
import asyncio

app = FastAPI()

async def process_push_event(payload: dict, indexer):
    repo = payload.get("repository", {}).get("full_name", "unknown")
    commits = payload.get("commits", [])

    changed_files = set()
    removed_files = set()

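    # Track each file's final state across the push, assuming commits are listed
    # oldest first. A file added and then removed in one push must not be re-indexed.
    for commit in commits:
        for path in commit.get("added", []) + commit.get("modified", []):
            changed_files.add(path)
            removed_files.discard(path)
        for path in commit.get("removed", []):
            removed_files.add(path)
            changed_files.discard(path)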

    # Remove deleted files from index
    if removed_files:
        await indexer.delete_sources(list(removed_files))

    # Re-embed only changed files
    if changed_files:
        await indexer.reindex_files(
            repo=repo,
            file_paths=list(changed_files)
        )

    return {
        "reindexed": len(changed_files),
        "deleted": len(removed_files)
    }

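# Hold references to in-flight tasks. The event loop keeps only weak references,
# so an unreferenced task can be garbage-collected before it finishes.
background_tasks: set[asyncio.Task] = set()

@app.post("/webhook/github")
async def github_webhook(request: Request):
    secret = b"your_webhook_secret"  # placeholder: load from an env var or secret store
    body = await request.body()

    signature = request.headers.get("X-Hub-Signature-256", "")
    expected = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()

    if not hmac.compare_digest(signature, expected):
        raise HTTPException(status_code=403, detail="Invalid signature")

    payload = json.loads(body)
    event_type = request.headers.get("X-GitHub-Event", "")

    if event_type == "push":
        # Process in the background so the webhook response is not blocked
        task = asyncio.create_task(process_push_event(payload, request.app.state.indexer))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)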

    return {"status": "accepted"}
```

> **Key Insight:** Git webhook re-indexing keeps your code intelligence index perpetually fresh with zero polling overhead and minimal compute. Each push triggers re-embedding of only the changed files — for typical development workflows, that is 1-10 files per event, not thousands.
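
The signature check is the part of the handler most worth unit-testing, and pulling it into a pure function makes that possible without starting FastAPI. The sketch below uses only the standard library; `verify_github_signature` is a hypothetical helper name. It compares bytes, not strings, because `hmac.compare_digest` raises `TypeError` on non-ASCII `str` input, and a header value is attacker-controlled.

```python
import hashlib
import hmac

def verify_github_signature(secret: bytes, body: bytes, signature_header: str) -> bool:
    """Check an X-Hub-Signature-256 header value against the raw request body."""
    expected = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()
    # Constant-time comparison on bytes avoids timing leaks and non-ASCII errors
    return hmac.compare_digest(
        signature_header.encode("utf-8"),
        expected.encode("utf-8"),
    )

if __name__ == "__main__":
    secret = b"example-secret"          # illustrative value only
    body = b'{"ref": "refs/heads/main"}'
    good = "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()
    print(verify_github_signature(secret, body, good))         # valid signature
    print(verify_github_signature(secret, body + b" ", good))  # body was altered
```

Always verify against the raw bytes of the request body. Re-serializing the parsed JSON can change whitespace or key order and produce a different digest.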

### Batch Re-Embedding with Concurrency Control

When re-embedding a large number of changed files, you need to control concurrency to avoid overwhelming the embedding model or the vector store.

```python
import asyncio
from typing import Callable, Awaitable

async def reindex_files_batched(
    file_paths: list[str],
    read_file: Callable[[str], str],
    chunk_file: Callable[[str, str], list[dict]],
    embed_batch: Callable[[list[str]], list[list[float]]],
    upsert_chunks: Callable[[list[dict]], Awaitable[None]],
    batch_size: int = 100,
    max_concurrent_batches: int = 3,
) -> dict:
    semaphore = asyncio.Semaphore(max_concurrent_batches)
    results = {"indexed": 0, "failed": 0, "skipped": 0}

    async def process_file(path: str):
        async with semaphore:
            try:
                content = read_file(path)
                if not content.strip():
                    results["skipped"] += 1
                    return

                chunks = chunk_file(content, path)
                if not chunks:
                    results["skipped"] += 1
                    return

                # Embed in sub-batches
                all_embeddings = []
                texts = [c["text"] for c in chunks]
                for i in range(0, len(texts), batch_size):
                    batch_texts = texts[i:i + batch_size]
                    batch_embeddings = await asyncio.get_event_loop().run_in_executor(
                        None, embed_batch, batch_texts
                    )
                    all_embeddings.extend(batch_embeddings)

                records = []
                content_hash = compute_content_hash(content)
                for i, (chunk, embedding) in enumerate(zip(chunks, all_embeddings)):
                    records.append({
                        "source_id": path,
                        "source_hash": content_hash,
                        "chunk_index": i,
                        "content": chunk["text"],
                        "embedding": embedding,
                        "metadata": chunk.get("metadata", {}),
                    })

                await upsert_chunks(records)
                results["indexed"] += 1

            except Exception as e:
                print(f"Failed to index {path}: {e}")
                results["failed"] += 1

    await asyncio.gather(*[process_file(p) for p in file_paths])
    return results
```

> **Warning:** When upserting chunks for an updated file, delete all existing chunks for that source ID first, then insert the new chunks. If a file shrinks from 10 chunks to 7 chunks after an edit, you will be left with 3 stale chunks that match queries but point to content that no longer exists.

```python
import json

async def safe_upsert_file_chunks(conn, source_id: str, new_chunks: list[dict]):
    async with conn.transaction():
        # Delete all existing chunks for this source
        await conn.execute(
            "DELETE FROM embeddings WHERE source_id = $1", source_id
        )
        # Insert new chunks
        await conn.executemany("""
            INSERT INTO embeddings
                (source_id, source_hash, chunk_index, content, embedding, metadata)
            VALUES ($1, $2, $3, $4, $5::vector, $6)
        """, [
            (
                chunk["source_id"], chunk["source_hash"], chunk["chunk_index"],
                chunk["content"], chunk["embedding"], json.dumps(chunk["metadata"])
            )
            for chunk in new_chunks
        ])
```

**Key Takeaways**

- Store a SHA-256 hash of each source document's content alongside its embeddings; use hash comparison to identify stale documents without reading embeddings.
- Git webhook integration provides exact changed-file lists with zero polling overhead, enabling precise incremental re-indexing on every push.
- Always delete all existing chunks for a source before inserting updated chunks, or you will accumulate stale chunks from shrinking documents.
- Control re-indexing concurrency with a semaphore to avoid saturating the embedding model or vector store.
- Batch size of 100 texts per embedding call is the throughput-latency optimum on most hardware.

**Practical Exercise**

Implement content-hash tracking in your current pipeline. Modify your ingestion code to store a SHA-256 hash of each source document alongside its embeddings. Then write a script that scans your corpus, compares current content hashes to stored hashes, and reports what percentage of your index is stale. If the answer is more than 5%, your refresh cadence needs attention.

---

## Chapter 8: Testing Your Pipeline End-to-End

Embedding pipelines are uniquely difficult to test because their output is a real-valued vector space, and correctness is not binary. A test that checks whether the output of `embed("hello")` equals some expected array is useless — it is brittle to model updates, it does not tell you anything about retrieval quality, and it asserts the wrong thing. What you actually care about is whether the right documents come back for the right queries.

This chapter covers what to test, how to structure tests that are stable across model versions, and how to run end-to-end pipeline tests that catch integration failures before they reach production.

### Testing Retrieval Quality, Not Vector Values

The fundamental principle of embedding pipeline testing: test the retrieval behavior, not the vector values.

```python
import pytest

@pytest.fixture(scope="module")
def pipeline(tmp_path_factory):
    from your_package import EmbeddingPipeline
    db_path = tmp_path_factory.mktemp("db")
    return EmbeddingPipeline(db_path=str(db_path), model_dir="./models/minilm_onnx")

@pytest.fixture(scope="module")
def indexed_pipeline(pipeline):
    documents = [
        {"id": "doc_auth", "content": "Authentication uses JWT tokens with RS256 signing. Tokens expire after 24 hours."},
        {"id": "doc_db", "content": "Database connections use a pool of 10 connections. Timeouts are set to 30 seconds."},
        {"id": "doc_cache", "content": "Redis cache stores session data with a TTL of 3600 seconds."},
        {"id": "doc_api", "content": "The REST API follows OpenAPI 3.0 specification. Rate limiting is 100 requests per minute."},
        {"id": "doc_log", "content": "Application logs are written to stdout in JSON format. Log level is configurable via LOG_LEVEL env var."},
    ]
    pipeline.index_documents(documents)
    return pipeline

def test_exact_concept_retrieval(indexed_pipeline):
    results = indexed_pipeline.search("JWT authentication tokens", top_k=1)
    assert len(results) == 1
    assert results[0]["id"] == "doc_auth"
    assert results[0]["similarity"] >= 0.7

def test_paraphrase_retrieval(indexed_pipeline):
    # Query uses different words but same concept
    results = indexed_pipeline.search("how are user sessions authenticated", top_k=2)
    retrieved_ids = [r["id"] for r in results]
    assert "doc_auth" in retrieved_ids

def test_negative_retrieval(indexed_pipeline):
    # Unrelated query should not return high-similarity results
    results = indexed_pipeline.search(
        "quantum computing photon entanglement",
        top_k=5,
        threshold=0.6
    )
    assert len(results) == 0, "Unrelated query should return no results above threshold"

def test_ranking_order(indexed_pipeline):
    results = indexed_pipeline.search("database connection pool size", top_k=3)
    assert results[0]["id"] == "doc_db"
    # Verify descending similarity order
    similarities = [r["similarity"] for r in results]
    assert similarities == sorted(similarities, reverse=True)
```

### Integration Tests for the Ingestion Pipeline

```python
# These tests reuse the `pipeline` fixture defined above. The pipeline API is
# synchronous, so they are plain tests and need no asyncio marker.
def test_incremental_update(pipeline):
    doc_id = "test_incremental"

    # Index initial version
    pipeline.index_documents([{
        "id": doc_id,
        "content": "The timeout setting controls how long to wait for a response."
    }])

    initial_results = pipeline.search("response timeout duration", top_k=1)
    assert initial_results[0]["id"] == doc_id

    # Update the document
    pipeline.index_documents([{
        "id": doc_id,
        "content": "The retry policy defines how many times to attempt failed requests."
    }])

    # Old content should no longer match as well
    old_query_results = pipeline.search("response timeout duration", top_k=1)
    new_query_results = pipeline.search("retry failed requests policy", top_k=1)

    assert new_query_results[0]["id"] == doc_id
    # Old query should either not match or match with lower similarity
    if old_query_results:
        assert old_query_results[0]["similarity"] < new_query_results[0]["similarity"]

def test_stale_chunk_cleanup(pipeline):
    # Index a document that produces multiple chunks
    long_content = " ".join([f"Sentence {i} about database configuration." for i in range(200)])
    pipeline.index_documents([{"id": "long_doc", "content": long_content}])

    initial_chunk_count = pipeline.count_chunks("long_doc")
    assert initial_chunk_count > 1

    # Replace with a much shorter document
    pipeline.index_documents([{"id": "long_doc", "content": "Short replacement content."}])

    new_chunk_count = pipeline.count_chunks("long_doc")
    assert new_chunk_count < initial_chunk_count, "Stale chunks should be removed on update"
    assert new_chunk_count >= 1
```

> **Key Insight:** Test for stale chunk cleanup explicitly. It is the most common incremental update bug, and it is invisible to retrieval quality tests when the new content is still the best match for relevant queries.
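
To make the failure mode concrete, here is a minimal sketch of how stale chunks arise. It assumes a toy in-memory store keyed by `(doc_id, chunk_index)`; the `ToyChunkStore` class and its method names are hypothetical stand-ins, not the pipeline's actual storage layer.

```python
from typing import Dict, List, Tuple


class ToyChunkStore:
    """Hypothetical in-memory store keyed by (doc_id, chunk_index)."""

    def __init__(self) -> None:
        self.rows: Dict[Tuple[str, int], str] = {}

    def upsert_only(self, doc_id: str, chunks: List[str]) -> None:
        # Buggy update path: overwrites indexes 0..n-1 but leaves any
        # higher-numbered chunks from the previous version in place.
        for index, chunk in enumerate(chunks):
            self.rows[(doc_id, index)] = chunk

    def replace_document(self, doc_id: str, chunks: List[str]) -> None:
        # Correct update path: delete every chunk for the document first,
        # then insert the new chunks.
        for key in [k for k in self.rows if k[0] == doc_id]:
            del self.rows[key]
        self.upsert_only(doc_id, chunks)

    def count_chunks(self, doc_id: str) -> int:
        return sum(1 for key in self.rows if key[0] == doc_id)


store = ToyChunkStore()
store.upsert_only("long_doc", ["chunk a", "chunk b", "chunk c"])
store.upsert_only("long_doc", ["short replacement"])
print(store.count_chunks("long_doc"))  # 3: two stale chunks survive

store.replace_document("long_doc", ["short replacement"])
print(store.count_chunks("long_doc"))  # 1
```

In a SQL-backed store the same idea is usually a delete and insert for the document inside one transaction, so readers never see a half-updated document.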

### Performance Regression Tests

```python
import time

def test_embedding_latency():
    from your_package import ONNXEmbedder
    embedder = ONNXEmbedder("./models/minilm_onnx")

    test_text = "This is a representative query for performance testing purposes."

    # Warm up
    for _ in range(3):
        embedder.embed([test_text])

    # Measure
    times = []
    for _ in range(50):
        start = time.perf_counter()
        embedder.embed([test_text])
        times.append((time.perf_counter() - start) * 1000)

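    # Nearest-rank positions for 50 samples: index 25 is the upper median,
    # index 47 is the 48th-smallest value (the 95th percentile).
    ordered = sorted(times)
    p50 = ordered[25]
    p95 = ordered[47]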

    assert p50 < 15.0, f"p50 embedding latency {p50:.1f}ms exceeds 15ms budget"
    assert p95 < 30.0, f"p95 embedding latency {p95:.1f}ms exceeds 30ms budget"

def test_batch_embedding_throughput():
    from your_package import ONNXEmbedder
    embedder = ONNXEmbedder("./models/minilm_onnx")

    batch = [f"Document number {i} with some representative content length." for i in range(100)]

    start = time.perf_counter()
    embeddings = embedder.embed(batch)
    elapsed = time.perf_counter() - start

    assert len(embeddings) == 100
    throughput = 100 / elapsed
    assert throughput > 200, f"Batch throughput {throughput:.0f} docs/sec is below 200 docs/sec minimum"
```

> **Warning:** Performance tests that run against a shared embedding model instance in CI will produce wildly variable results depending on what else is running. Either run performance tests in isolation on dedicated hardware, or set your thresholds loose enough (2-3x your target) to account for CI noise and treat them as regression guards rather than SLO validators.
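
The percentile indices in `test_embedding_latency` are tied to a sample count of 50. If you change the number of runs, a small helper keeps the percentile math correct. The sketch below uses the nearest-rank definition; `percentile_nearest_rank` is a hypothetical helper name, not part of any library used in this guide.

```python
import math


def percentile_nearest_rank(samples: list[float], pct: float) -> float:
    """Return the smallest sample with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    # Nearest-rank uses a 1-based rank, so subtract 1 to index the list.
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


latencies_ms = [float(i) for i in range(1, 51)]  # stand-in for measured times
print(percentile_nearest_rank(latencies_ms, 95))  # 48.0
```

With 50 samples this selects index 47 for p95, which matches the hard-coded index in the test above.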

**Key Takeaways**

- Test retrieval behavior (which documents come back for which queries), not vector values; vector-equality tests are brittle and test the wrong thing.
- Include negative retrieval tests: verify that unrelated queries return no results above your similarity threshold.
- Test stale chunk cleanup explicitly — it is the most common incremental update bug and is invisible to other retrieval tests.
- Set performance regression test thresholds at 2-3x your actual SLO to account for CI environment noise.
- Integration tests should exercise the full pipeline path: chunking, embedding, storage, and retrieval in sequence.

**Practical Exercise**

Write five retrieval quality tests for your current pipeline: two positive tests (relevant queries that should return a specific document), one paraphrase test (a query that uses different words than the document but the same concept), one negative test (an unrelated query that should return nothing above threshold), and one ranking test (a query where you expect a specific document to be first). Run them. If any fail, your pipeline has a measurable quality gap.

---

## Chapter 9: Operating a Production Embedding System

The difference between a pipeline that works and a pipeline that you can operate is mostly about what happens when things go wrong. A well-operated system fails loudly, recovers automatically, and gives you enough information to understand what happened and prevent it from happening again. Getting there requires deliberate choices about deployment, resource management, graceful degradation, and runbooks.

### Deployment Patterns

Load the embedding model once per process and reuse it for every request. The ONNX model load is fast (under a second for most sentence transformers), but tokenizer initialization adds overhead, and in a high-churn deployment environment (frequent restarts from deploys, OOM kills, health check failures) every restart pays that cost again. Use a singleton pattern with a warm-up routine so each process pays it exactly once, ideally at startup before it takes traffic.

```python
import threading
from typing import Optional

class EmbedderSingleton:
    _instance: Optional["EmbedderSingleton"] = None
    _lock = threading.Lock()

    def __new__(cls, model_dir: str):
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                instance._init(model_dir)
                cls._instance = instance
        return cls._instance

    def _init(self, model_dir: str):
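        # Assumes the fingerprint helper is importable alongside the embedder;
        # `your_package` is a placeholder import path.
        from your_package import ONNXEmbedder, compute_model_fingerprint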
        self.embedder = ONNXEmbedder(model_dir)
        self.model_fingerprint = compute_model_fingerprint(model_dir)
        self._warm_up()

    def _warm_up(self):
        warmup_texts = [
            "warmup query one",
            "warmup query two",
            "warmup query three",
        ]
        self.embedder.embed(warmup_texts)

    def embed(self, texts: list[str]):
        return self.embedder.embed(texts)
```

The warm-up matters because ONNX Runtime's first inference call pays one-time costs that later calls do not, such as memory arena allocation and other lazy initialization. Without warm-up, your first real query takes 200-500ms. With warm-up, it runs at steady-state latency.
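
One way to check the cold-start penalty on your own hardware is to time the first call separately from the steady state. The helper below is a sketch: `cold_vs_warm_ms` is a hypothetical name, and it accepts any callable with the same shape as `embedder.embed`.

```python
import time
from statistics import median
from typing import Callable, Sequence


def cold_vs_warm_ms(
    embed: Callable[[Sequence[str]], object],
    text: str,
    warm_runs: int = 20,
) -> tuple[float, float]:
    """Return (first-call latency, median steady-state latency) in milliseconds."""
    start = time.perf_counter()
    embed([text])
    cold_ms = (time.perf_counter() - start) * 1000

    warm_times = []
    for _ in range(warm_runs):
        start = time.perf_counter()
        embed([text])
        warm_times.append((time.perf_counter() - start) * 1000)

    return cold_ms, median(warm_times)
```

Call it on a freshly constructed embedder, before any other inference has run, or the cold number will already be warm.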

### Graceful Degradation

When your vector store is unavailable, your application should not fail completely. Semantic search failing gracefully — returning an empty result set with a status indicator — is better than returning a 500 error that blocks the entire page load.

```python
import asyncio
import logging
import time
from dataclasses import dataclass
from enum import Enum

logger = logging.getLogger(__name__)

class SearchStatus(Enum):
    OK = "ok"
    DEGRADED_EMPTY = "degraded_empty"  # Vector store unavailable
    DEGRADED_STALE = "degraded_stale"  # Index is stale beyond SLO

@dataclass
class SearchResponse:
    results: list[dict]
    status: SearchStatus
    latency_ms: float

async def search_with_fallback(
    query: str,
    embedder,
    vector_store,
    timeout_seconds: float = 2.0,
) -> SearchResponse:
    start = time.perf_counter()

    try:
        # Embedding runs inline and is not covered by the timeout below.
        query_embedding = embedder.embed([query])[0]

        # Run the blocking store call in the default thread pool so the
        # event loop stays free and the timeout can fire.
        loop = asyncio.get_running_loop()
        results = await asyncio.wait_for(
            loop.run_in_executor(
                None, vector_store.search, query_embedding, 5
            ),
            timeout=timeout_seconds
        )

        return SearchResponse(
            results=results,
            status=SearchStatus.OK,
            latency_ms=(time.perf_counter() - start) * 1000
        )

    except asyncio.TimeoutError:
        logger.warning("Vector store search timed out after %.1fs", timeout_seconds)
        return SearchResponse(
            results=[],
            status=SearchStatus.DEGRADED_EMPTY,
            latency_ms=(time.perf_counter() - start) * 1000
        )
    except Exception:
        # Log with traceback so the degraded response is still a loud failure.
        logger.exception("Search failed; returning empty degraded response")
        return SearchResponse(
            results=[],
            status=SearchStatus.DEGRADED_EMPTY,
            latency_ms=(time.perf_counter() - start) * 1000
        )
```

### Resource Limits and Memory Management

Set explicit memory limits on your embedding service process. Long-running Python processes can leak memory through NumPy array accumulation, ChromaDB caching, or lingering database cursors. When that happens, you want the limit to stop that one process and let it restart cleanly, rather than letting the system-wide OOM killer pick a victim and leave your server in a partially degraded state.

In a systemd unit, set `MemoryMax` to 110% of your measured steady-state RSS; in Docker, the equivalent is the `--memory` flag. When the process exceeds the limit, the kernel kills it inside its own cgroup, and your process supervisor restarts it (`Restart=on-failure` in systemd, a restart policy in Docker). This is preferable to operating in a degraded state near the OOM boundary.
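
As a worked example of the 110% rule, the sketch below turns a measured steady-state RSS into a systemd directive. The function name `memory_max_directive` is hypothetical, and the 197MB input is the post-ONNX RSS figure quoted earlier in this guide; substitute your own measurement.

```python
import math


def memory_max_directive(steady_state_rss_mb: float, headroom_percent: int = 110) -> str:
    """Build a systemd MemoryMax= line sized at headroom_percent of steady-state RSS."""
    if steady_state_rss_mb <= 0:
        raise ValueError("steady_state_rss_mb must be positive")
    # Round up so the limit is never below the intended headroom.
    limit_mb = math.ceil(steady_state_rss_mb * headroom_percent / 100)
    # systemd interprets the M suffix as mebibytes (base 1024).
    return f"MemoryMax={limit_mb}M"


print(memory_max_directive(197))  # MemoryMax=217M
```

Measure RSS after warm-up and under representative load, since a limit derived from an idle process will trigger restarts during normal traffic.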

> **Key Insight:** In-memory indexes (NumPy arrays) should be rebuilt from the database on startup, not checkpointed and reloaded. Database persistence is more reliable than file-system state, and the rebuild time for a 100,000-chunk index is under 10 seconds on local hardware.

### Runbooks

Every production system needs a set of documented responses to predictable failure scenarios. For embedding pipelines, the minimum runbook set is:

**Scenario: Query latency spike**
1. Check `QueryTrace` logs for which component has elevated latency.
2. If `embed_ms` is high: check ONNX Runtime thread count, check for concurrent ingestion jobs saturating CPU.
3. If `retrieve_ms` is high: check Postgres connection pool saturation, check HNSW index size and trigger `REINDEX` if needed.
4. If `total_ms` is high but components look normal: check for Python GC pauses (tune collection thresholds with `gc.set_threshold()`, or run `gc.collect()` at scheduled quiet points).

**Scenario: Retrieval quality drop**
1. Run the fixed eval set immediately.
2. Check model fingerprint consistency between query-time model and stored embeddings.
3. Check the similarity score distribution against the 30-day baseline.
4. If fingerprints diverge: identify when the model was updated, trigger full re-indexing.
5. If fingerprints match but quality is down: check for index corruption, trigger `REINDEX`, re-run eval.

**Scenario: Index staleness alert**
1. Check Git webhook delivery logs for missed events.
2. Identify files with content hash mismatches.
3. Trigger targeted re-indexing for stale files.
4. Verify staleness metric returns to baseline within 15 minutes.
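
Step 2 of the staleness runbook can be sketched as a hash comparison. This assumes the index records one SHA-256 content hash per file path; the function names and the dictionary shapes below are illustrative assumptions, not the pipeline's actual schema.

```python
import hashlib


def content_hash(text: str) -> str:
    """SHA-256 hex digest of the file content."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def find_stale_files(
    current_files: dict[str, str],
    indexed_hashes: dict[str, str],
) -> list[str]:
    """Return paths whose current hash is missing from, or differs from, the index."""
    stale = [
        path
        for path, text in current_files.items()
        if indexed_hashes.get(path) != content_hash(text)
    ]
    return sorted(stale)


current = {"a.py": "print(1)", "b.py": "print(2)", "c.py": "print(3)"}
indexed = {"a.py": content_hash("print(1)"), "b.py": content_hash("print('old')")}
print(find_stale_files(current, indexed))  # ['b.py', 'c.py']
```

The returned paths are the input for the targeted re-indexing in step 3. Files that exist in the index but no longer on disk need a separate check in the opposite direction.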

> **Warning:** Document runbooks at the time you build the system, not after the first incident. After an incident, you document what you did, not what you should do. The difference matters for the next person on-call.

### Capacity Planning

Capacity planning for an embedding pipeline has three variables: corpus size, query rate, and update rate. Measure all three in production, then extrapolate:

- **Memory**: 384-dimensional float32 embeddings take 1.5KB per chunk. 100,000 chunks = 150MB for vectors alone. Add 2-3x for metadata, indexes, and overhead.
- **Disk**: Same calculation for pgvector. Budget 500MB per 100,000 chunks including HNSW index overhead.
- **CPU**: At 6ms per embedding with 4 ONNX Runtime threads, a single embedder instance can handle roughly 160 queries per second (1,000ms / 6ms ≈ 166). For 95th-percentile headroom, plan for 50% of theoretical max as sustained throughput.
- **Ingestion**: Batch re-indexing at batch size 100 processes roughly 1,500 chunks per minute on a 4-core CPU with ONNX Runtime.
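
The memory and CPU figures above reduce to simple arithmetic, sketched below so you can plug in your own corpus size and latency. The function names are hypothetical, and the model is deliberately simplified: it assumes one embedding is processed at a time and ignores metadata and index overhead.

```python
def vector_memory_mb(chunks: int, dims: int = 384, bytes_per_value: int = 4) -> float:
    """Raw float32 vector storage in megabytes (10^6 bytes), excluding overhead."""
    return chunks * dims * bytes_per_value / 1_000_000


def sustained_qps(embed_ms: float = 6.0, utilization: float = 0.5) -> float:
    """Planned query rate: theoretical max (1000 / embed_ms) scaled by target utilization."""
    theoretical_max = 1000.0 / embed_ms
    return theoretical_max * utilization


print(f"{vector_memory_mb(100_000):.1f} MB")  # 153.6 MB
print(f"{sustained_qps():.0f} qps")           # 83 qps
```

The 153.6 MB result is the "150MB for vectors alone" figure before rounding, and the sustained rate is half of the roughly 160 queries per second theoretical maximum.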

**Key Takeaways**

- Warm up the ONNX model at process start; the first inference call pays one-time initialization costs and takes 200-500ms without warm-up.
- Implement graceful degradation at the search path level so that vector store unavailability returns empty results rather than 500 errors.
- Set process memory limits to 110% of measured steady-state RSS; prefer clean restarts over operating near OOM.
- Write runbooks at build time for the three most predictable failure scenarios: latency spike, quality drop, and index staleness.
- Rebuild in-memory indexes from the database on startup rather than checkpointing and reloading from disk.

**Practical Exercise**

Write a runbook for the "retrieval quality drop" scenario for your current system. It should include: how to detect it (specific metric and threshold), how to diagnose the cause (the decision tree), and how to remediate each possible cause (specific commands or procedures). Share it with whoever else is on-call. If writing the runbook reveals steps you cannot currently execute — because you lack the metrics, or the diagnostic tooling, or the re-indexing mechanism — those gaps are your next build priority.

---

## Conclusion: The System That Holds

Building an embedding pipeline is not hard. You can have something working in an afternoon. Building one that holds up in production — one that you can monitor, debug, tune, and operate for 18 months without a surprise rebuild — is a different problem.

The gap between those two things is mostly about intentionality. A notebook embedding script becomes a production system by answering a specific set of questions that notebooks never have to answer: Where does the model live? What happens to the index when the model changes? How do you know when retrieval quality has dropped? What happens when the vector store is slow? How do you update 80 changed files without re-embedding 80,000?

The answers to those questions determine your architecture more than any choice of model, vector store, or framework. The specific choices matter less than having made them deliberately, with the constraints of production in mind.

### What You Now Have

The patterns in this book give you a complete, operable embedding pipeline:

- An ONNX Runtime serving layer that uses 30MB of model weight RAM instead of 496MB, runs embedding at 6ms per query on a CPU, and handles concurrent queries without thread contention.
- A chunking system that produces semantically coherent units rather than arbitrary text windows, handles multiple content types correctly, and filters out the short chunks that pollute embedding spaces.
- A vector store decision framework grounded in measured latency and cost numbers, with pgvector as the default for teams that already run Postgres and in-memory NumPy search as the overlooked winner for corpora under 100,000 chunks.
- A serving layer that instruments latency at component granularity, batches concurrent queries, and calibrates similarity thresholds from eval data rather than intuition.
- A monitoring system that tracks model version consistency, retrieval quality on a fixed eval set, and similarity score distributions. It alerts on deviation rather than absolute values, so the alerts remain calibrated as your system evolves.
- An incremental re-indexing system driven by content hashes and Git webhooks, so your index stays current with your codebase without full rebuilds or polling overhead.
- A test suite that validates retrieval behavior rather than vector values, catches stale chunk accumulation, and guards against performance regressions.
- An operational posture with warm-up routines, graceful degradation, memory limits, and documented runbooks.

### The One Thing Most Teams Skip

Of all the topics in this book, monitoring retrieval quality is the one most consistently skipped. Teams invest heavily in the ingestion pipeline and the serving layer, build comprehensive infrastructure dashboards, and then have no idea whether the system is actually returning the right documents until a user complains.

The minimum viable retrieval quality monitor is 20 labeled query-document pairs, a script that runs them and records recall@5, and a scheduled job that runs that script daily and alerts if recall drops by 10 percentage points. You can build it in an afternoon. Not building it means flying blind — and embedding pipelines are unusually good at hiding the fact that they have stopped working correctly.
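
The core of that monitor fits in a few lines. The sketch below assumes a `search(query, k)` callable that returns document IDs in rank order; that signature, the function names, and the stored baseline are illustrative assumptions rather than the pipeline's actual interface.

```python
from typing import Callable


def recall_at_k(
    eval_pairs: list[tuple[str, str]],
    search: Callable[[str, int], list[str]],
    k: int = 5,
) -> float:
    """Fraction of (query, expected_id) pairs whose expected_id is in the top-k results."""
    if not eval_pairs:
        raise ValueError("eval_pairs must not be empty")
    hits = sum(1 for query, expected_id in eval_pairs if expected_id in search(query, k))
    return hits / len(eval_pairs)


def should_alert(current: float, baseline: float, max_drop: float = 0.10) -> bool:
    """True when recall has fallen by max_drop (10 percentage points) or more."""
    # Small tolerance so that 0.90 -> 0.80 counts as a full 10-point drop
    # despite floating-point rounding.
    return (baseline - current) >= max_drop - 1e-9
```

A scheduled job would call `recall_at_k` with the 20 labeled pairs, compare the result against the stored baseline with `should_alert`, and page or post to a channel when it returns `True`.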

### What to Build Next

If you have shipped the patterns in this book, there are three directions worth exploring next:

**Hybrid retrieval** combines semantic search with BM25 keyword search, using reciprocal rank fusion to merge the result lists. It outperforms pure semantic search on queries that include specific technical terms, identifiers, or exact phrases that semantic similarity does not handle well.

**Reranking** adds a second-stage cross-encoder model that scores the top-k semantic search results for relevance to the query. Cross-encoders are slower than bi-encoders (they process query and document jointly, not separately) but more accurate. Running a cross-encoder over the top 20 results and returning the top 5 is a standard pattern for high-stakes retrieval applications.

**Domain adaptation** fine-tunes the embedding model on your specific domain's data. A model fine-tuned on your codebase or your documentation corpus will outperform a general-purpose sentence transformer on your specific queries. The fine-tuning process requires labeled pairs, but the threshold is lower than most teams expect — a few hundred high-quality examples can produce measurable improvements.
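
Of the three directions, the fusion step in hybrid retrieval is the quickest to prototype. The sketch below implements reciprocal rank fusion over ranked lists of document IDs. The function name is hypothetical, `k=60` is the constant conventionally used with this method, and the two input lists stand in for semantic and BM25 results from your own retrievers.

```python
from collections import defaultdict


def reciprocal_rank_fusion(result_lists: list[list[str]], k: int = 60) -> list[str]:
    """Merge ranked ID lists; each list contributes 1 / (k + rank) per document."""
    scores: dict[str, float] = defaultdict(float)
    for results in result_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first; ties are broken by ID for a stable order.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


semantic_results = ["a", "b", "c"]
bm25_results = ["b", "d", "a"]
print(reciprocal_rank_fusion([semantic_results, bm25_results]))
# ['b', 'a', 'd', 'c']
```

Because the method uses only ranks, it needs no score normalization between the semantic and keyword retrievers, which is the main reason it is a common first choice for merging.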

### The Maintenance Mindset

Production embedding systems require ongoing attention in a way that most production services do not. The data changes. The queries shift. The index grows. What worked at 10,000 chunks may need adjustment at 100,000. The threshold that was well-calibrated six months ago may have drifted. The chunking strategy that was designed for your original document set may not handle the new content types that have accumulated in your corpus.

Schedule a quarterly pipeline review. Run the eval set. Check the latency decomposition. Verify that the threshold is still calibrated. Check index health. It takes two hours and prevents the kind of silent degradation that takes two weeks to diagnose after a user complaint finally surfaces it.

The system that holds is the one that gets maintained. Everything else in this book is preparation for that.

---

## Appendix A: Glossary

**Approximate Nearest Neighbor (ANN)**: A retrieval method that finds vectors close to the query vector without guaranteeing an exact result. Faster than exact search at scale; used by HNSW and IVFFlat indexes.

**BM25**: A keyword-based ranking function that scores documents based on term frequency, inverse document frequency, and document length normalization. Effective for queries containing specific terms or identifiers.

**ChromaDB**: An open-source embedded vector database. Runs in-process; no separate server required. Well-suited for development and small-to-medium corpora.

**Chunk**: The unit of retrieval in an embedding pipeline. A portion of a source document that is embedded as a single vector and returned as a single result.

**Cosine Similarity**: A measure of similarity between two vectors defined as the dot product of their L2-normalized versions. Ranges from -1 (opposite) to 1 (identical direction). Standard for embedding similarity.

**Drift**: In embedding systems, drift refers to changes in the embedding space over time — either because the model changes, the data distribution shifts, or the index accumulates changes without maintenance.

**HNSW (Hierarchical Navigable Small World)**: A graph-based approximate nearest neighbor index. Provides fast query time and good recall. Standard in pgvector and most ANN libraries. Degrades silently without periodic maintenance.

**Incremental Re-indexing**: Re-embedding only documents that have changed since the last indexing run, rather than rebuilding the entire index.

**L2 Normalization**: Scaling a vector so that its Euclidean norm equals 1. Required for cosine similarity computed via dot product.

**Mean Pooling**: Averaging the token-level embeddings from a transformer model to produce a single document-level embedding. Standard for sentence transformer models.

**MRR (Mean Reciprocal Rank)**: A retrieval quality metric. For each query, records the reciprocal of the rank at which the first relevant result appears. Higher is better; maximum is 1.0.

**ONNX (Open Neural Network Exchange)**: A standardized format for representing machine learning models. ONNX Runtime is an inference engine that typically executes ONNX models faster and with lower memory overhead than the training framework they were exported from.

**pgvector**: A Postgres extension that adds vector data types and similarity search operators. Enables storing and querying embeddings directly in an existing Postgres database.

**Recall@K**: The fraction of queries for which the relevant document appears in the top-K results. Primary retrieval quality metric.

**RSS (Resident Set Size)**: The portion of a process's memory that is held in RAM. The relevant memory metric for production capacity planning.

**Sentence Transformer**: A class of transformer models fine-tuned to produce semantically meaningful sentence-level embeddings. Examples: `all-MiniLM-L6-v2`, `all-mpnet-base-v2`.

**Similarity Threshold**: A minimum similarity score below which results are filtered out. Should be calibrated from labeled eval data rather than set statically.

**Vector Store**: A database optimized for storing and querying high-dimensional vectors. Examples: pgvector, ChromaDB, Pinecone.

---

## Appendix B: Tools and Resources

### Embedding Runtimes

**ONNX Runtime** — `onnxruntime` (PyPI)
Inference runtime for ONNX models. Use `CPUExecutionProvider` for CPU inference.

**Optimum** — `optimum[onnxruntime]` (PyPI)
Hugging Face library for exporting transformer models to ONNX format.

**sentence-transformers** — `sentence-transformers` (PyPI)
High-level library for loading and using sentence embedding models. Handles pooling and normalization.

### Vector Stores

**pgvector** — `pgvector` (PostgreSQL extension)
Install via `apt install postgresql-16-pgvector` or build from source. Python client: `psycopg2` or `asyncpg`.

**ChromaDB** — `chromadb` (PyPI)
Embedded vector database. No separate server required for local or single-process use.

**Pinecone** — `pinecone` (PyPI; published as `pinecone-client` in older releases)
Managed vector database. API-based; no self-hosted option.

### Models

**all-MiniLM-L6-v2** — `sentence-transformers/all-MiniLM-L6-v2` (Hugging Face)
384-dimensional embeddings. Fast, small, well-suited for CPU inference. Good general-purpose baseline.

**all-mpnet-base-v2** — `sentence-transformers/all-mpnet-base-v2` (Hugging Face)
768-dimensional embeddings. Higher quality than MiniLM; 2-3x slower. Use when retrieval quality matters more than latency.

**bge-small-en-v1.5** — `BAAI/bge-small-en-v1.5` (Hugging Face)
384-dimensional embeddings. Strong retrieval quality for its size; competitive with MiniLM.

### Supporting Libraries

**psycopg2** / **asyncpg** — PostgreSQL clients for synchronous and async Python respectively.

**NumPy** — Required for in-memory index operations and embedding normalization.

**scikit-learn** — `sklearn.metrics.precision_recall_curve` for threshold calibration.

**FastAPI** — Recommended for building the serving layer; native async support with Pydantic validation.

**psutil** — For measuring process RSS in memory monitoring scripts.

---

## Appendix C: Further Reading

### Approximate Nearest Neighbor Algorithms

**"Efficient and Robust Approximate Nearest Neighbor Search Using Hierarchical Navigable Small World Graphs"** — Malkov and Yashunin, 2018. The HNSW paper. Understanding the algorithm helps you tune `m` and `ef_construction` parameters meaningfully.

**"Billion-scale similarity search with GPUs"** — Johnson, Douze, Jégou, 2017 (Facebook AI Research). The FAISS paper. Covers inverted-file (IVF) and product quantization (PQ) indexing with empirical comparisons at billion-vector scale.

### Embedding Models and Training

**"Sentence-BERT: Sentence Embeddings Using Siamese BERT-Networks"** — Reimers and Gurevych, 2019. The foundational paper for sentence transformer architecture. Explains why bi-encoder models are fast but cross-encoders are more accurate.

**"MTEB: Massive Text Embedding Benchmark"** — Muennighoff et al., 2022. The standard benchmark for embedding model evaluation. Use it to compare models on tasks similar to your retrieval use case before committing to a model.

### Production ML Systems

**"Designing Machine Learning Systems"** — Chip Huyen, 2022. Covers production ML infrastructure holistically, including data pipelines, monitoring, and deployment patterns that apply beyond embedding systems.

**"Designing Data-Intensive Applications"** — Martin Kleppmann. The database and distributed systems concepts in chapters 3, 7, and 11 are directly applicable to vector store selection and operational reliability.

### Retrieval and Search

**"Pretrained Transformers for Text Ranking: BERT and Beyond"** — Lin et al., 2021. Covers the academic state of the art in neural retrieval, including cross-encoder reranking, which is the natural next step after deploying a bi-encoder retrieval system.

**"Introduction to Information Retrieval"** — Manning, Raghavan, Schütze (available free online). The BM25 chapter and the evaluation metrics chapter are worth reading in full before designing your retrieval quality monitoring.

---

*© 2026 Pyckle. All rights reserved. This guide may be shared freely for personal and educational use. Commercial reproduction or redistribution requires written permission. Contact kellyprice@pyckle.co.*
