## RAG Pipeline Architecture
### Core Components
```python
import math
from collections import Counter
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import numpy as np
@dataclass
class Document:
    """A source document before it is split into chunks.

    Attributes:
        id: Identifier of the document; chunk ids are derived from it.
        content: Full text of the document.
        metadata: Arbitrary key/value data attached to the document.
        embedding: Optional embedding vector; ``None`` until one is assigned.
    """

    id: str
    content: str
    metadata: Dict[str, Any]
    # Defaults to None, so the annotation must be Optional (PEP 484 no
    # longer permits the implicit-Optional spelling ``List[float] = None``).
    embedding: Optional[List[float]] = None
@dataclass
class Chunk:
    """A piece of a document that is embedded and retrieved on its own.

    Attributes:
        id: Identifier of the chunk.
        text: Text content of the chunk.
        source_doc_id: Identifier of the document the chunk was cut from.
        embedding: Optional embedding vector; ``None`` until one is assigned.
        metadata: Optional key/value data. The default is ``None`` (not an
            empty dict), so code that unpacks or iterates it must guard
            against ``None`` first.
    """

    id: str
    text: str
    source_doc_id: str
    # Both fields default to None, so they are annotated as Optional rather
    # than relying on the deprecated implicit-Optional spelling.
    embedding: Optional[List[float]] = None
    metadata: Optional[Dict[str, Any]] = None
class RAGPipeline:
    """Complete RAG pipeline: chunk, embed, index, retrieve and generate.

    The collaborators are duck-typed. As used by this class they must offer:

    - ``embedding_model.embed(texts)``: awaitable returning one embedding per
      input text, in the same order.
    - ``vector_store.upsert(chunks)`` and
      ``vector_store.search(embedding, top_k=...)``: awaitables.
    - ``llm_client.complete(prompt)``: awaitable returning the answer.
    """

    def __init__(self, embedding_model, vector_store, llm_client):
        """Store the collaborators and the default chunking parameters."""
        self.embedder = embedding_model
        self.vector_store = vector_store
        self.llm = llm_client
        # Both values are measured in characters of ``Document.content``.
        self.chunk_size = 512
        self.chunk_overlap = 50

    def chunk_document(self, document: Document) -> List[Chunk]:
        """Split a document into overlapping, boundary-aligned chunks.

        Each window is ``chunk_size`` characters long. When more text
        follows, the window end is snapped to the last paragraph or sentence
        delimiter found, provided that delimiter lies past the middle of the
        window. Consecutive chunks overlap by ``chunk_overlap`` characters.

        Args:
            document: The document whose ``content`` is split.

        Returns:
            The non-empty chunks in document order, with ids of the form
            ``"<document.id>_chunk_<n>"``. Each chunk gets its own shallow
            copy of the document metadata.
        """
        text = document.content
        text_len = len(text)
        chunks = []
        start = 0
        chunk_id = 0

        while start < text_len:
            end = start + self.chunk_size

            if end < text_len:
                # Delimiters are tried in priority order: paragraph break
                # first, then sentence endings.
                for delimiter in ['\n\n', '. ', '! ', '? ']:
                    # Look up to 100 characters past the nominal end so a
                    # sentence finishing just after it can still be kept whole.
                    pos = text.rfind(delimiter, start, end + 100)
                    # Ignore boundaries in the first half of the window; they
                    # would produce very short chunks.
                    if pos != -1 and pos > start + self.chunk_size * 0.5:
                        end = pos + len(delimiter)
                        break

            chunk_text = text[start:end].strip()
            if chunk_text:
                # Copy the metadata so that mutating one chunk's metadata
                # does not silently change its siblings or the document.
                metadata = (
                    dict(document.metadata)
                    if document.metadata is not None
                    else None
                )
                chunks.append(Chunk(
                    id=f"{document.id}_chunk_{chunk_id}",
                    text=chunk_text,
                    source_doc_id=document.id,
                    metadata=metadata,
                ))
                chunk_id += 1

            if end >= text_len:
                # The text is exhausted. Stepping back by the overlap here
                # would emit a trailing chunk that only repeats the tail of
                # the chunk just produced.
                break

            # Always advance by at least one character so an overlap that is
            # large relative to the chunk size cannot loop forever.
            start = max(end - self.chunk_overlap, start + 1)

        return chunks

    async def index_documents(self, documents: List[Document], batch_size: int = 32):
        """Chunk, embed and store the given documents.

        Args:
            documents: Documents to index.
            batch_size: Number of chunk texts sent to the embedder per call.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        all_chunks = []
        for doc in documents:
            chunks = self.chunk_document(doc)
            for i in range(0, len(chunks), batch_size):
                batch = chunks[i:i + batch_size]
                embeddings = await self.embedder.embed([c.text for c in batch])
                for chunk, embedding in zip(batch, embeddings):
                    chunk.embedding = embedding
                    all_chunks.append(chunk)

        # Skip the store round-trip when there is nothing to write.
        if all_chunks:
            await self.vector_store.upsert(all_chunks)

    async def retrieve(self, query: str, top_k: int = 5) -> List[Chunk]:
        """Return the ``top_k`` chunks the vector store ranks for ``query``."""
        query_embedding = await self.embedder.embed([query])
        # ``embed`` takes a batch; the single query is its first element.
        return await self.vector_store.search(query_embedding[0], top_k=top_k)

    async def generate(self, query: str, context_chunks: List[Chunk]) -> Dict[str, Any]:
        """Ask the LLM to answer ``query`` from the supplied chunks.

        Args:
            query: The user question.
            context_chunks: Chunks to expose to the model, numbered from 1
                as ``[Source N]`` in the prompt.

        Returns:
            A dict with ``"answer"`` (whatever ``llm.complete`` returned) and
            ``"sources"``, one entry per chunk with its id, source document
            id and a preview of at most 200 characters.
        """
        context_text = "\n\n".join(
            f"[Source {number}]: {chunk.text}"
            for number, chunk in enumerate(context_chunks, start=1)
        )

        prompt = (
            "Answer the question based on the provided context.\n"
            "Context:\n"
            f"{context_text}\n"
            f"Question: {query}\n"
            "Instructions:\n"
            "- Answer using only the information in the context\n"
            "- If the context doesn't contain the answer, say \"I don't have enough information\"\n"
            "- Cite sources using [Source X] notation\n"
            "- Be concise but complete\n"
            "Answer:"
        )

        response = await self.llm.complete(prompt)

        preview_len = 200
        return {
            "answer": response,
            "sources": [
                {
                    "id": chunk.id,
                    "source_doc": chunk.source_doc_id,
                    # Only mark the preview as truncated when it really is.
                    "text_preview": chunk.text[:preview_len]
                    + ("..." if len(chunk.text) > preview_len else ""),
                }
                for chunk in context_chunks
            ],
        }

    async def query(self, query: str, top_k: int = 5) -> Dict[str, Any]:
        """Run retrieval followed by generation for ``query``.

        Args:
            query: The user question.
            top_k: Number of chunks to retrieve and pass to the model.

        Returns:
            The dict produced by :meth:`generate`.
        """
        chunks = await self.retrieve(query, top_k=top_k)
        return await self.generate(query, chunks)
```
### Advanced Retrieval with Hybrid Search
```python
class HybridRAG(RAGPipeline):
    """RAG that fuses keyword and semantic search results.

    Subclasses (or instances) must supply a working :meth:`keyword_search`;
    the pipeline itself only knows how to do semantic retrieval.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base pipeline and the default fusion weights."""
        super().__init__(*args, **kwargs)
        self.keyword_weight = 0.3
        self.semantic_weight = 0.7

    async def keyword_search(self, query: str, top_k: int = 5) -> List[Chunk]:
        """Return up to ``top_k`` chunks ranked by keyword relevance.

        This is an extension hook: no keyword backend is defined by the
        pipeline, so the default implementation fails loudly instead of
        letting ``retrieve_hybrid`` die with an ``AttributeError``.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError(
            "HybridRAG.keyword_search must be overridden to provide a "
            "keyword (e.g. BM25) search backend"
        )

    async def retrieve_hybrid(self, query: str, top_k: int = 5) -> List[Chunk]:
        """Combine keyword and semantic search into one ranked list.

        Both searches are asked for ``top_k * 2`` candidates. Every candidate
        earns ``(list_length - position) * weight`` points from each list it
        appears in, and the ``top_k`` highest totals are returned.

        Args:
            query: The user question.
            top_k: Number of chunks to return.

        Returns:
            Up to ``top_k`` chunks, best combined score first.
        """
        semantic_results = await self.retrieve(query, top_k=top_k * 2)
        keyword_results = await self.keyword_search(query, top_k=top_k * 2)

        # Keyed by chunk id so a chunk found by both searches is scored once.
        all_results = {}

        for i, chunk in enumerate(semantic_results):
            # Rank-based score: the first hit gets the most points.
            score = (len(semantic_results) - i) * self.semantic_weight
            all_results[chunk.id] = {"chunk": chunk, "score": score}

        for i, chunk in enumerate(keyword_results):
            score = (len(keyword_results) - i) * self.keyword_weight
            if chunk.id in all_results:
                all_results[chunk.id]["score"] += score
            else:
                all_results[chunk.id] = {"chunk": chunk, "score": score}

        # NOTE(review): scores scale with each list's length, so a search
        # that returns fewer hits contributes less than its weight suggests.
        ranked = sorted(
            all_results.values(),
            key=lambda x: x["score"],
            reverse=True,
        )

        return [r["chunk"] for r in ranked[:top_k]]
```
### Reranking with Cross-Encoder
```python
class RerankedRAG(RAGPipeline):
    """RAG variant that reorders retrieved chunks with a cross-encoder."""

    def __init__(self, *args, cross_encoder=None, **kwargs):
        """Set up the base pipeline and remember the optional reranker."""
        super().__init__(*args, **kwargs)
        self.cross_encoder = cross_encoder

    async def retrieve_with_rerank(self, query: str, top_k: int = 5) -> List[Chunk]:
        """Over-fetch candidates, then keep the best ``top_k`` after reranking.

        Four times ``top_k`` chunks are retrieved. With a cross-encoder
        configured, each (query, chunk text) pair is scored and the chunks
        are returned by descending score; without one, the first ``top_k``
        retrieved chunks are returned unchanged.
        """
        pool = await self.retrieve(query, top_k=top_k * 4)

        if self.cross_encoder:
            relevance = await self.cross_encoder.score(
                [(query, candidate.text) for candidate in pool]
            )
            by_relevance = sorted(
                zip(pool, relevance),
                key=lambda pair: pair[1],
                reverse=True,
            )
            return [candidate for candidate, _ in by_relevance[:top_k]]

        # No reranker available: fall back to the retrieval order.
        return pool[:top_k]
```
## Vector Store Implementations
### Pinecone
```python
from pinecone import Pinecone
class PineconeStore:
    """Vector store adapter backed by a Pinecone index.

    Chunk text and source document id are stored in the record metadata
    under the reserved keys ``"text"`` and ``"source"``.

    NOTE(review): the Pinecone client calls below are made synchronously
    inside ``async`` methods, so they block the event loop while they run.
    """

    # Number of vectors sent per upsert request. Pinecone caps request size;
    # 100 is a conservative value -- confirm against the current limits.
    UPSERT_BATCH_SIZE = 100

    def __init__(self, api_key, index_name, dimension=1536):
        """Connect to the named index.

        Args:
            api_key: Pinecone API key.
            index_name: Name of the index to read from and write to.
            dimension: Expected embedding dimension; kept on the instance
                for reference, not enforced here.
        """
        self.pc = Pinecone(api_key=api_key)
        self.index = self.pc.Index(index_name)
        self.dimension = dimension

    async def upsert(self, chunks: List[Chunk]):
        """Write chunks to the index in batches.

        Args:
            chunks: Chunks whose ``embedding`` has already been set.
        """
        vectors = [
            {
                "id": chunk.id,
                "values": chunk.embedding,
                "metadata": {
                    # ``Chunk.metadata`` defaults to None, so guard before
                    # unpacking. The reserved keys come last so user metadata
                    # can never overwrite the chunk text or source id.
                    **(chunk.metadata or {}),
                    "text": chunk.text,
                    "source": chunk.source_doc_id,
                },
            }
            for chunk in chunks
        ]

        # Batching keeps each request bounded; an empty list sends nothing.
        for start in range(0, len(vectors), self.UPSERT_BATCH_SIZE):
            batch = vectors[start:start + self.UPSERT_BATCH_SIZE]
            self.index.upsert(vectors=batch)

    async def search(self, query_embedding: List[float], top_k: int = 5):
        """Return the ``top_k`` nearest chunks for an embedding.

        Args:
            query_embedding: Embedding of the query.
            top_k: Maximum number of matches to return.

        Returns:
            A list of ``Chunk`` objects rebuilt from the match metadata, in
            the order returned by the index. Their ``embedding`` is not
            populated.
        """
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True,
        )

        reserved = ("text", "source")
        return [
            Chunk(
                id=match.id,
                text=match.metadata["text"],
                source_doc_id=match.metadata["source"],
                # Strip the reserved keys so the round-tripped metadata
                # matches what the caller originally attached.
                metadata={
                    k: v for k, v in match.metadata.items() if k not in reserved
                },
            )
            for match in results.matches
        ]
```
## Evaluation
```python
class RAGEvaluator:
    """Evaluate RAG system performance.

    Both evaluation methods are coroutines because the pipeline methods they
    call (``retrieve`` and ``query``) must be awaited.
    """

    def __init__(self, rag=None):
        """Remember the pipeline under evaluation.

        Args:
            rag: Object exposing awaitable ``retrieve(query)`` and
                ``query(question)`` methods. It may also be assigned to
                ``self.rag`` after construction.
        """
        self.rag = rag

    async def evaluate_retrieval(self, queries: List[dict]):
        """Measure retrieval quality.

        Args:
            queries: Dicts with a ``"query"`` string and the id of its single
                ``"relevant_doc"``.

        Returns:
            A dict with ``"hit_rate"``, ``"mrr"`` (mean reciprocal rank) and
            ``"ndcg"``, each averaged over all queries. All three are 0.0
            for an empty query list.
        """
        if not queries:
            # Nothing to average; avoid dividing by zero.
            return {"hit_rate": 0.0, "mrr": 0.0, "ndcg": 0.0}

        hits = 0
        reciprocal_rank_sum = 0.0
        ndcg_sum = 0.0

        for q in queries:
            results = await self.rag.retrieve(q["query"])
            result_ids = [r.source_doc_id for r in results]

            if q["relevant_doc"] in result_ids:
                hits += 1
                # ``index`` finds the first chunk from the relevant document.
                rank = result_ids.index(q["relevant_doc"]) + 1
                reciprocal_rank_sum += 1 / rank
                # With one relevant document the ideal DCG is 1, so NDCG
                # reduces to the discounted gain at the hit's rank.
                ndcg_sum += 1 / math.log2(rank + 1)

        n = len(queries)
        return {
            "hit_rate": hits / n,
            "mrr": reciprocal_rank_sum / n,
            "ndcg": ndcg_sum / n,
        }

    async def evaluate_generation(self, qa_pairs: List[dict], scorer=None,
                                  reference_key: str = "answer"):
        """Measure answer quality.

        Args:
            qa_pairs: Dicts with a ``"question"`` and a reference answer.
            scorer: Optional ``scorer(reference, generated) -> score``
                callable. Defaults to token-level F1.
            reference_key: Key holding the reference answer in each pair.
                NOTE(review): ``"answer"`` is an assumed default -- confirm
                against the evaluation dataset.

        Returns:
            One score per pair, in input order.
        """
        score_fn = scorer if scorer is not None else self._token_f1

        scores = []
        for qa in qa_pairs:
            result = await self.rag.query(qa["question"])
            scores.append(score_fn(qa[reference_key], result["answer"]))

        return scores

    @staticmethod
    def _token_f1(reference, candidate) -> float:
        """Return the F1 overlap of lower-cased whitespace tokens."""
        ref_tokens = str(reference).lower().split()
        cand_tokens = str(candidate).lower().split()

        if not ref_tokens or not cand_tokens:
            # Two empty answers agree; one empty answer cannot match.
            return 1.0 if ref_tokens == cand_tokens else 0.0

        # Multiset intersection counts each shared token at most as often
        # as it occurs in both answers.
        overlap = sum((Counter(ref_tokens) & Counter(cand_tokens)).values())
        if overlap == 0:
            return 0.0

        precision = overlap / len(cand_tokens)
        recall = overlap / len(ref_tokens)
        return 2 * precision * recall / (precision + recall)
```
## Best Practices
1. **Chunking Strategy**
- Size: 200-500 tokens typically optimal
- Overlap: 10-20% prevents context loss
- Boundaries: Respect semantic units (paragraphs, sections)
2. **Embedding Models**
- `text-embedding-3-large` (OpenAI) is a solid default for general use
- Fine-tune for domain-specific content
- Consider multilingual needs
3. **Metadata Filtering**
- Filter by date, source, category before search
- Improves precision and reduces latency
4. **Query Enhancement**
- Expand queries with synonyms
- Use HyDE (Hypothetical Document Embeddings): embed an LLM-generated hypothetical answer instead of the raw query
- Rewrite ambiguous queries