目录

RAG 实战:从原理到生产环境的七个踩坑点

背景

RAG 的原理听起来简单:让 LLM 回答问题前,先从知识库里检索相关内容,把相关内容当作 context 一起发给 LLM。

用户问题 → 检索 → 相关文档 → LLM → 回答

但原理和工程实现之间,隔着一百个大坑。

这篇文章是我的踩坑笔记,不是 RAG 入门教程。我假设你知道 RAG 是什么,重点讲生产环境真正遇到了什么问题

坑 1: Embedding 模型选错了

Embedding 模型决定了你检索的质量上限。

常见的错误:直接用 OpenAI 的 text-embedding-ada-002

ada-002 在 2023 年底已经不是最好的了。新出的模型在中文理解和代码检索上有显著提升。

# ❌ 直接用 ada-002
from openai import OpenAI
response = client.embeddings.create(
    model="text-embedding-ada-002",
    input="你的文本"
)

# ✅ 考虑换成这些(根据你的场景)
# 中文 embedding: multilingual-e5-large, BGE
# 代码检索: codellama embedding, GTE-code

实际测试结果(我们内部数据):

Embedding 模型 中文语义检索 代码检索
ada-002 72% 65%
text-embedding-3-small 78% 71%
BGE-large-zh 89% 68%
GTE-code-large 81% 91%

选什么取决于你的语料。如果是技术文档,代码检索能力很重要。

坑 2: Chunk 策略拍脑袋

大多数教程教你:

# 按固定长度分块
texts = text_splitter.split_text(document)

这是最 naive 的方案。固定长度 chunk 会把一句话从中间切开,检索时拿出一半没头没尾的句子,LLM 根本没法理解。

更好的策略:语义分块

from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,      # 目标块大小
    chunk_overlap=50,    # 块之间重叠,保持上下文
    separators=["。\n", ",\n", "\n", " "]  # 按句子/段落分割
)

但更好的做法是按语义单元分块

def semantic_chunk(text):
    """
    按语义单元分块:每块是一个完整的问答对或主题段落
    """
    sections = []
    
    for section in text.split("\n## "):  # 按 Markdown 标题分割
        if len(section) < 100:
            continue  # 跳过太短的标题行
        
        # 如果块太大,按子标题继续拆分
        if len(section) > 1000:
            subsections = section.split("\n### ")
            for sub in subsections:
                if len(sub) > 50:
                    sections.append(sub.strip())
        else:
            sections.append(section.strip())
    
    return sections

评估你的分块策略

def evaluate_chunking(chunks, top_k=5):
    """
    简单评估:对于每个测试问题,看检索出来的块是否真的相关
    """
    test_questions = [
        "这个文档讲了什么?",
        "如何部署到生产环境?",
        "支持的配置参数有哪些?"
    ]
    
    for q in test_questions:
        results = vector_store.similarity_search(q, k=top_k)
        print(f"\nQ: {q}")
        for i, r in enumerate(results):
            print(f"  [{i}] {r.page_content[:100]}...")

坑 3: 只用向量检索

大多数人只做了向量相似度检索:

# 这是不够的
results = vector_store.similarity_search(query, k=5)

当用户的 query 是结构化的,比如"查找配置项 timeout > 30s 的所有日志",向量检索根本没用。你需要关键词检索 + 向量检索的混合

from langchain.retrievers import EnsembleRetriever

# BM25 关键词检索
bm25_retriever = BM25Retriever.from_texts(chunks)

# 向量检索
vector_retriever = vector_store.as_retriever(search_kwargs={"k": 5})

# 混合检索
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.3, 0.7]  # 关键词权重低,向量权重高
)

坑 4: 没有做 Query 改写

用户的问题和知识库里的内容,表达方式往往不同。

用户问:"这个服务的 CPU 占用很高怎么办?"
知识库写:"高负载排查指南 - CPU使用率异常处理"

这两个意思一样,但文字表达完全不同。直接检索可能匹配不到。

Query 改写(Query Rewriting):

def rewrite_query(query, llm):
    """
    用 LLM 把用户的问题改写成更适合检索的形式
    """
    prompt = f"""把下面的自然语言问题,改写成更适合知识库检索的表达形式。
    保持原意,但使用更正式、更接近文档表述方式的语言。
    
    原始问题: {query}
    改写后:"""
    
    response = llm.complete(prompt)
    return response.text.strip()

实际使用:

# 用户问:我服务起不来
original = "我服务起不来"
rewritten = rewrite_query(original, llm)  # 可能是 "服务启动失败 故障排查"

# 用改写后的 query 检索
results = ensemble_retriever.get_relevant_documents(rewritten)

坑 5: Context 塞太满

检索回来 5 个块,每个块 1000 tokens,加起来 5000 tokens。加上你的 system prompt 和对话历史,context 窗口很快就满了。

几个处理策略:

5.1 只取最相关的块

def rerank_and_truncate(query, docs, llm, max_tokens=3000):
    """
    用 LLM 做重排序,然后截断到 max_tokens
    """
    # 先检索更多候选
    candidates = vector_store.similarity_search(query, k=10)
    
    # 用 LLM 重排序(简单做法:让 LLM 逐个评分)
    scored = []
    for doc in candidates:
        score_prompt = f"""判断这个文档片段对回答问题的相关程度。
        问题: {query}
        文档: {doc.page_content}
        相关程度: 1-5 分"""
        
        score = int(llm.complete(score_prompt).text.strip())
        scored.append((score, doc))
    
    # 取最高的几个
    top_docs = sorted(scored, key=lambda x: x[0], reverse=True)[:5]
    
    # 截断到 max_tokens
    total_tokens = 0
    selected = []
    for score, doc in top_docs:
        doc_tokens = len(doc.page_content) // 4  # 粗略估算
        if total_tokens + doc_tokens <= max_tokens:
            selected.append(doc)
            total_tokens += doc_tokens
    
    return selected

5.2 用摘要而不是完整块

def summarize_chunks(chunks, llm, max_per_chunk=200):
    """
    先把每个块摘要,再塞进 context
    """
    summarized = []
    for chunk in chunks:
        if len(chunk.page_content) > max_per_chunk * 4:  # 超过约 200 tokens
            summary = llm.complete(
                f"摘要以下内容,保留关键信息,不超过{max_per_chunk}字:\n\n{chunk.page_content}"
            )
            summarized.append(summary.text.strip())
        else:
            summarized.append(chunk.page_content)
    
    return summarized

坑 6: 不知道 Embedding 需要定期更新

知识库是动态的,但 embedding 数据库不会自动更新。

问题

  • 文档更新了,embedding 还是旧的,检索结果不准确
  • 产品迭代了,术语变了,用户用新术语搜不到

解决方案:

# 定期重建 embedding
import schedule

def rebuild_index_if_needed():
    """
    检查文档变更时间,有更新就重建 embedding
    """
    docs_version = get_docs_version()  # 从数据库或文件 hash 获取
    
    if docs_version != cached_version:
        print("文档有更新,重新构建 index...")
        
        # 增量更新:只处理变更的文档
        changed_docs = get_changed_documents(cached_version, docs_version)
        
        for doc in changed_docs:
            # 删除旧 embedding
            vector_store.delete_by_doc_id(doc.id)
            
            # 重建新 embedding
            new_embedding = embed_text(doc.content)
            vector_store.add_texts([doc.content], ids=[doc.id])
        
        update_cached_version(docs_version)

# 每天检查一次
schedule.every().day.at("02:00").do(rebuild_index_if_needed)

坑 7: 没有评估指标

上线之后怎么知道 RAG 系统好不好?大多数团队没有这个意识。

基础指标

def evaluate_rag_system(test_questions, ground_truth, rag_pipeline):
    """
    基础评估:检索 Recall 和 Answer 准确率
    """
    results = {
        'retrieval_recall': [],
        'answer_relevance': []
    }
    
    for question, expected_docs in zip(test_questions, ground_truth):
        # 检索评估
        retrieved = rag_pipeline.retrieve(question)
        retrieved_ids = {doc.id for doc in retrieved}
        expected_ids = {doc.id for doc in expected_docs}
        
        recall = len(retrieved_ids & expected_ids) / len(expected_ids)
        results['retrieval_recall'].append(recall)
        
        # 回答质量评估(用 LLM 打分)
        answer = rag_pipeline.answer(question)
        relevance_score = llm.evaluate(
            f"问题: {question}\n回答: {answer}\n打分 1-5,5分最好"
        )
        results['answer_relevance'].append(int(relevance_score))
    
    return {
        'avg_recall': sum(results['retrieval_recall']) / len(results['retrieval_recall']),
        'avg_relevance': sum(results['answer_relevance']) / len(results['answer_relevance'])
    }

总结

RAG 真正落地生产的难点:

  1. Embedding 选型 — 不是 ada-002 一刀切,按场景选
  2. 分块策略 — 语义分块远优于固定长度
  3. 混合检索 — 关键词 + 向量,别只靠向量
  4. Query 改写 — 用户语言和文档语言的 gap
  5. Context 管理 — 截断、摘要、重排序
  6. 索引更新 — 知识库变了 embedding 要同步
  7. 评估体系 — 没有指标就没法优化

RAG 不是一个"搭起来就能用"的东西,需要持续的调优和监控。