RAG 实战:从原理到生产环境的七个踩坑点
背景
RAG 的原理听起来简单:让 LLM 回答问题前,先从知识库里检索相关内容,把相关内容当作 context 一起发给 LLM。
用户问题 → 检索 → 相关文档 → LLM → 回答但原理和工程实现之间,隔着一百个大坑。
这篇文章是我的踩坑笔记,不是 RAG 入门教程。我假设你知道 RAG 是什么,重点讲生产环境真正遇到了什么问题。
坑 1: Embedding 模型选错了
Embedding 模型决定了你检索的质量上限。
常见的错误:直接用 OpenAI 的 text-embedding-ada-002。
ada-002 在 2023 年底已经不是最好的了。新出的模型在中文理解和代码检索上有显著提升。
# ❌ 直接用 ada-002
from openai import OpenAI
response = client.embeddings.create(
model="text-embedding-ada-002",
input="你的文本"
)
# ✅ 考虑换成这些(根据你的场景)
# 中文 embedding: multilingual-e5-large, BGE
# 代码检索: codellama embedding, GTE-code实际测试结果(我们内部数据):
| Embedding 模型 | 中文语义检索 | 代码检索 |
|---|---|---|
| ada-002 | 72% | 65% |
| text-embedding-3-small | 78% | 71% |
| BGE-large-zh | 89% | 68% |
| GTE-code-large | 81% | 91% |
选什么取决于你的语料。如果是技术文档,代码检索能力很重要。
坑 2: Chunk 策略拍脑袋
大多数教程教你:
# 按固定长度分块
texts = text_splitter.split_text(document)这是最 naive 的方案。固定长度 chunk 会把一句话从中间切开,检索时拿出一半没头没尾的句子,LLM 根本没法理解。
更好的策略:语义分块
from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 目标块大小
chunk_overlap=50, # 块之间重叠,保持上下文
separators=["。\n", ",\n", "\n", " "] # 按句子/段落分割
)但更好的做法是按语义单元分块:
def semantic_chunk(text):
"""
按语义单元分块:每块是一个完整的问答对或主题段落
"""
sections = []
for section in text.split("\n## "): # 按 Markdown 标题分割
if len(section) < 100:
continue # 跳过太短的标题行
# 如果块太大,按子标题继续拆分
if len(section) > 1000:
subsections = section.split("\n### ")
for sub in subsections:
if len(sub) > 50:
sections.append(sub.strip())
else:
sections.append(section.strip())
return sections评估你的分块策略
def evaluate_chunking(chunks, top_k=5):
"""
简单评估:对于每个测试问题,看检索出来的块是否真的相关
"""
test_questions = [
"这个文档讲了什么?",
"如何部署到生产环境?",
"支持的配置参数有哪些?"
]
for q in test_questions:
results = vector_store.similarity_search(q, k=top_k)
print(f"\nQ: {q}")
for i, r in enumerate(results):
print(f" [{i}] {r.page_content[:100]}...")坑 3: 只用向量检索
大多数人只做了向量相似度检索:
# 这是不够的
results = vector_store.similarity_search(query, k=5)当用户的 query 是结构化的,比如"查找配置项 timeout > 30s 的所有日志",向量检索根本没用。你需要关键词检索 + 向量检索的混合。
from langchain.retrievers import EnsembleRetriever
# BM25 关键词检索
bm25_retriever = BM25Retriever.from_texts(chunks)
# 向量检索
vector_retriever = vector_store.as_retriever(search_kwargs={"k": 5})
# 混合检索
ensemble_retriever = EnsembleRetriever(
retrievers=[bm25_retriever, vector_retriever],
weights=[0.3, 0.7] # 关键词权重低,向量权重高
)坑 4: 没有做 Query 改写
用户的问题和知识库里的内容,表达方式往往不同。
用户问:"这个服务的 CPU 占用很高怎么办?"
知识库写:"高负载排查指南 - CPU使用率异常处理"这两个意思一样,但文字表达完全不同。直接检索可能匹配不到。
Query 改写(Query Rewriting):
def rewrite_query(query, llm):
"""
用 LLM 把用户的问题改写成更适合检索的形式
"""
prompt = f"""把下面的自然语言问题,改写成更适合知识库检索的表达形式。
保持原意,但使用更正式、更接近文档表述方式的语言。
原始问题: {query}
改写后:"""
response = llm.complete(prompt)
return response.text.strip()实际使用:
# 用户问:我服务起不来
original = "我服务起不来"
rewritten = rewrite_query(original, llm) # 可能是 "服务启动失败 故障排查"
# 用改写后的 query 检索
results = ensemble_retriever.get_relevant_documents(rewritten)坑 5: Context 塞太满
检索回来 5 个块,每个块 1000 tokens,加起来 5000 tokens。加上你的 system prompt 和对话历史,context 窗口很快就满了。
几个处理策略:
5.1 只取最相关的块
def rerank_and_truncate(query, docs, llm, max_tokens=3000):
"""
用 LLM 做重排序,然后截断到 max_tokens
"""
# 先检索更多候选
candidates = vector_store.similarity_search(query, k=10)
# 用 LLM 重排序(简单做法:让 LLM 逐个评分)
scored = []
for doc in candidates:
score_prompt = f"""判断这个文档片段对回答问题的相关程度。
问题: {query}
文档: {doc.page_content}
相关程度: 1-5 分"""
score = int(llm.complete(score_prompt).text.strip())
scored.append((score, doc))
# 取最高的几个
top_docs = sorted(scored, key=lambda x: x[0], reverse=True)[:5]
# 截断到 max_tokens
total_tokens = 0
selected = []
for score, doc in top_docs:
doc_tokens = len(doc.page_content) // 4 # 粗略估算
if total_tokens + doc_tokens <= max_tokens:
selected.append(doc)
total_tokens += doc_tokens
return selected5.2 用摘要而不是完整块
def summarize_chunks(chunks, llm, max_per_chunk=200):
"""
先把每个块摘要,再塞进 context
"""
summarized = []
for chunk in chunks:
if len(chunk.page_content) > max_per_chunk * 4: # 超过约 200 tokens
summary = llm.complete(
f"摘要以下内容,保留关键信息,不超过{max_per_chunk}字:\n\n{chunk.page_content}"
)
summarized.append(summary.text.strip())
else:
summarized.append(chunk.page_content)
return summarized坑 6: 不知道 Embedding 需要定期更新
知识库是动态的,但 embedding 数据库不会自动更新。
问题:
- 文档更新了,embedding 还是旧的,检索结果不准确
- 产品迭代了,术语变了,用户用新术语搜不到
解决方案:
# 定期重建 embedding
import schedule
def rebuild_index_if_needed():
"""
检查文档变更时间,有更新就重建 embedding
"""
docs_version = get_docs_version() # 从数据库或文件 hash 获取
if docs_version != cached_version:
print("文档有更新,重新构建 index...")
# 增量更新:只处理变更的文档
changed_docs = get_changed_documents(cached_version, docs_version)
for doc in changed_docs:
# 删除旧 embedding
vector_store.delete_by_doc_id(doc.id)
# 重建新 embedding
new_embedding = embed_text(doc.content)
vector_store.add_texts([doc.content], ids=[doc.id])
update_cached_version(docs_version)
# 每天检查一次
schedule.every().day.at("02:00").do(rebuild_index_if_needed)坑 7: 没有评估指标
上线之后怎么知道 RAG 系统好不好?大多数团队没有这个意识。
基础指标
def evaluate_rag_system(test_questions, ground_truth, rag_pipeline):
"""
基础评估:检索 Recall 和 Answer 准确率
"""
results = {
'retrieval_recall': [],
'answer_relevance': []
}
for question, expected_docs in zip(test_questions, ground_truth):
# 检索评估
retrieved = rag_pipeline.retrieve(question)
retrieved_ids = {doc.id for doc in retrieved}
expected_ids = {doc.id for doc in expected_docs}
recall = len(retrieved_ids & expected_ids) / len(expected_ids)
results['retrieval_recall'].append(recall)
# 回答质量评估(用 LLM 打分)
answer = rag_pipeline.answer(question)
relevance_score = llm.evaluate(
f"问题: {question}\n回答: {answer}\n打分 1-5,5分最好"
)
results['answer_relevance'].append(int(relevance_score))
return {
'avg_recall': sum(results['retrieval_recall']) / len(results['retrieval_recall']),
'avg_relevance': sum(results['answer_relevance']) / len(results['answer_relevance'])
}总结
RAG 真正落地生产的难点:
- Embedding 选型 — 不是 ada-002 一刀切,按场景选
- 分块策略 — 语义分块远优于固定长度
- 混合检索 — 关键词 + 向量,别只靠向量
- Query 改写 — 用户语言和文档语言的 gap
- Context 管理 — 截断、摘要、重排序
- 索引更新 — 知识库变了 embedding 要同步
- 评估体系 — 没有指标就没法优化
RAG 不是一个"搭起来就能用"的东西,需要持续的调优和监控。