1. 环境准备
In [ ]:
Copied!
# 导入必要的库
import numpy as np
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
# Environment sanity check: announce the check, report the NumPy version,
# then confirm that setup finished.
print(
    "检查环境...",
    f"NumPy版本: {np.__version__}",
    "\n环境准备完成!",
    sep="\n",
)
# 导入必要的库
import numpy as np
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
# Environment sanity check: announce the check, report the NumPy version,
# then confirm that setup finished.
print(
    "检查环境...",
    f"NumPy版本: {np.__version__}",
    "\n环境准备完成!",
    sep="\n",
)
In [ ]:
Copied!
# Comparison of RAG maturity levels.
# Each entry names the level, its distinguishing features, the kind of
# question it suits, and the course module that covers it.
rag_levels = [
    {
        "level": "Level 1: Naive RAG",
        "features": ["一次检索", "一次生成"],
        "use_case": "简单问答",
        "module": "模块1",
    },
    {
        "level": "Level 2: RAG + 优化",
        "features": ["更好的嵌入模型", "高级分块", "查询增强", "混合检索+重排序"],
        "use_case": "中等复杂度问答",
        "module": "模块2",
    },
    {
        "level": "Level 3: Advanced RAG",
        "features": ["迭代检索", "自适应检索", "跳跃读取", "元数据过滤"],
        "use_case": "复杂多跳问答",
        "module": "本章",
    },
    {
        "level": "Level 4: Agentic RAG",
        "features": ["Agent自主决策", "工具调用", "多Agent协作"],
        "use_case": "高度复杂任务",
        "module": "模块3",
    },
]

# Print one three-line summary per level.
print("RAG模式演进路径:")
print("=" * 80)
for rag in rag_levels:
    print(f"\n{rag['level']} ({rag['module']}):")
    print(f" 特性: {', '.join(rag['features'])}")
    print(f" 适用场景: {rag['use_case']}")
# Comparison of RAG maturity levels.
# Each entry names the level, its distinguishing features, the kind of
# question it suits, and the course module that covers it.
rag_levels = [
    {
        "level": "Level 1: Naive RAG",
        "features": ["一次检索", "一次生成"],
        "use_case": "简单问答",
        "module": "模块1",
    },
    {
        "level": "Level 2: RAG + 优化",
        "features": ["更好的嵌入模型", "高级分块", "查询增强", "混合检索+重排序"],
        "use_case": "中等复杂度问答",
        "module": "模块2",
    },
    {
        "level": "Level 3: Advanced RAG",
        "features": ["迭代检索", "自适应检索", "跳跃读取", "元数据过滤"],
        "use_case": "复杂多跳问答",
        "module": "本章",
    },
    {
        "level": "Level 4: Agentic RAG",
        "features": ["Agent自主决策", "工具调用", "多Agent协作"],
        "use_case": "高度复杂任务",
        "module": "模块3",
    },
]

# Print one three-line summary per level.
print("RAG模式演进路径:")
print("=" * 80)
for rag in rag_levels:
    print(f"\n{rag['level']} ({rag['module']}):")
    print(f" 特性: {', '.join(rag['features'])}")
    print(f" 适用场景: {rag['use_case']}")
In [ ]:
Copied!
@dataclass
class Document:
    """A piece of text plus free-form metadata describing it.

    Attributes:
        content: The document text.
        metadata: Arbitrary key/value annotations attached to the text,
            e.g. ``{"source": "doc1", "entity": "SpaceX"}``.
    """

    content: str
    metadata: Dict[str, Any]
class IterativeRetriever:
    """Toy keyword retriever that backs the iterative RAG examples.

    Documents are ranked by how many query terms occur in their text as
    case-insensitive substrings; no embeddings or index are involved.
    """

    def __init__(self, max_iterations: int = 3):
        """Create a retriever with an empty knowledge base.

        Args:
            max_iterations: Iteration budget stored on the instance; it is
                not read by any method of this class.
        """
        self.max_iterations = max_iterations
        self.knowledge_base = []

    def add_documents(self, docs: List[Document]):
        """Set the knowledge base to ``docs``.

        Note that this replaces any previously stored documents and keeps a
        reference to the caller's list rather than a copy.
        """
        self.knowledge_base = docs

    @staticmethod
    def _query_terms(query: str) -> List[str]:
        """Split ``query`` into lowercase terms used for substring matching.

        Whitespace-separated tokens that contain no CJK ideograph are kept
        whole. A token that does contain CJK text is broken into runs: every
        alphanumeric non-CJK run becomes one term, and every CJK run
        contributes its overlapping two-character windows (or the single
        character when the run has length one). Punctuation inside such a
        token only separates runs.

        Chinese is written without spaces, so plain whitespace splitting
        would turn an entire Chinese question into one term that never
        occurs verbatim in any document.
        """
        def is_cjk(ch: str) -> bool:
            # CJK Unified Ideographs block.
            return "\u4e00" <= ch <= "\u9fff"

        terms: List[str] = []
        for token in query.lower().split():
            if not any(is_cjk(ch) for ch in token):
                terms.append(token)
                continue
            run, run_is_cjk = "", False
            # The trailing space is a sentinel that flushes the final run.
            for ch in token + " ":
                ch_is_cjk = is_cjk(ch)
                if ch.isalnum() and (not run or ch_is_cjk == run_is_cjk):
                    run, run_is_cjk = run + ch, ch_is_cjk
                    continue
                if run_is_cjk and len(run) > 1:
                    terms.extend(run[i:i + 2] for i in range(len(run) - 1))
                elif run:
                    terms.append(run)
                run, run_is_cjk = (ch, ch_is_cjk) if ch.isalnum() else ("", False)
        return terms

    def retrieve(self, query: str, top_k: int = 2) -> List[Document]:
        """Return up to ``top_k`` documents that share terms with ``query``.

        Args:
            query: Free-text query; matching is case-insensitive.
            top_k: Maximum number of documents to return.

        Returns:
            Documents with at least one matching term, highest score first.
            Documents with equal scores keep their knowledge-base order
            because the sort is stable.
        """
        terms = self._query_terms(query)
        results = []
        for doc in self.knowledge_base:
            # Lowercase once per document instead of once per term.
            content_lower = doc.content.lower()
            score = sum(1 for term in terms if term in content_lower)
            if score > 0:
                results.append((doc, score))
        results.sort(key=lambda pair: pair[1], reverse=True)
        return [doc for doc, _ in results[:top_k]]
class IterativeRAG:
    """Multi-round retrieval loop that refines the query between rounds."""

    def __init__(self, retriever: IterativeRetriever):
        """Store the retriever that supplies ``retrieve`` and ``max_iterations``."""
        self.retriever = retriever

    @staticmethod
    def _preview(text: str, limit: int = 50) -> str:
        """Return ``text`` cut to ``limit`` characters, with "..." only if cut."""
        return text if len(text) <= limit else text[:limit] + "..."

    def query(self, initial_query: str) -> Dict[str, Any]:
        """Run up to ``retriever.max_iterations`` retrieval rounds.

        Each round retrieves documents for the current query, records a
        summary of the round, and asks ``_generate_next_query`` for a
        follow-up query. The loop stops early when a round retrieves nothing
        or when no follow-up query is produced. Progress is printed to
        stdout.

        Args:
            initial_query: The query used for the first round.

        Returns:
            A dict with ``initial_query``, ``iterations`` (one summary dict
            per completed round), ``all_documents`` (every distinct
            retrieved document in first-seen order) and ``num_iterations``.
        """
        current_query = initial_query
        all_retrieved = []
        iterations = []
        max_iterations = self.retriever.max_iterations

        for i in range(max_iterations):
            print(f"\n第{i+1}次迭代查询: {current_query}")

            docs = self.retriever.retrieve(current_query)
            if not docs:
                print("未找到相关文档,停止迭代")
                break

            # Later rounds often return documents already seen; keep each
            # one once so the document count is not inflated.
            for doc in docs:
                if doc not in all_retrieved:
                    all_retrieved.append(doc)

            iterations.append({
                "iteration": i + 1,
                "query": current_query,
                "docs": [self._preview(d.content) for d in docs],
            })

            # No follow-up query is needed after the final permitted round.
            if i < max_iterations - 1:
                # A real system would have an LLM write the next query.
                current_query = self._generate_next_query(current_query, docs)
                if current_query is None:
                    print("已获得足够信息,停止迭代")
                    break

        return {
            "initial_query": initial_query,
            "iterations": iterations,
            "all_documents": all_retrieved,
            "num_iterations": len(iterations),
        }

    def _generate_next_query(self, current_query: str, docs: List[Document]) -> Optional[str]:
        """Derive a follow-up query from the retrieved documents.

        Rule-based placeholder for an LLM: scans the documents in order and,
        for the first one that mentions a known entity absent from the
        current query, returns a canned query about that entity.

        Returns:
            The follow-up query, or ``None`` when no document introduces a
            new known entity.
        """
        query_lower = current_query.lower()
        for doc in docs:
            content = doc.content.lower()
            if "spacex" in content and "spacex" not in query_lower:
                return "SpaceX最近一次发射"
            if "openai" in content and "openai" not in query_lower:
                return "OpenAI最新产品"
        return None
# Demo: iterative retrieval over a four-document in-memory knowledge base.
# Each tuple is (text, source id, entity) and is expanded into a Document.
docs = [
    Document(text, {"source": source, "entity": entity})
    for text, source, entity in (
        ("SpaceX是埃隆·马斯克创立的太空探索技术公司", "doc1", "SpaceX"),
        ("SpaceX于2024年1月成功发射了星舰系统", "doc2", "SpaceX"),
        ("OpenAI是一家人工智能研究公司", "doc3", "OpenAI"),
        ("OpenAI发布了GPT-4 Turbo模型", "doc4", "OpenAI"),
    )
]

retriever = IterativeRetriever(max_iterations=3)
retriever.add_documents(docs)
iterative_rag = IterativeRAG(retriever)

# Ask a question that names the company only indirectly.
query = "马斯克的火箭公司最近一次发射是什么时候?"
print("\n迭代检索示例:", "=" * 80, f"\n初始查询: {query}", sep="\n")

result = iterative_rag.query(query)

# Summarise how many rounds ran and how many documents were collected.
print(
    "\n\n检索结果汇总:",
    "-" * 60,
    f"迭代次数: {result['num_iterations']}",
    f"检索到的文档数: {len(result['all_documents'])}",
    sep="\n",
)
@dataclass
class Document:
    """A piece of text plus free-form metadata describing it.

    Attributes:
        content: The document text.
        metadata: Arbitrary key/value annotations attached to the text,
            e.g. ``{"source": "doc1", "entity": "SpaceX"}``.
    """

    content: str
    metadata: Dict[str, Any]
class IterativeRetriever:
    """Toy keyword retriever that backs the iterative RAG examples.

    Documents are ranked by how many query terms occur in their text as
    case-insensitive substrings; no embeddings or index are involved.
    """

    def __init__(self, max_iterations: int = 3):
        """Create a retriever with an empty knowledge base.

        Args:
            max_iterations: Iteration budget stored on the instance; it is
                not read by any method of this class.
        """
        self.max_iterations = max_iterations
        self.knowledge_base = []

    def add_documents(self, docs: List[Document]):
        """Set the knowledge base to ``docs``.

        Note that this replaces any previously stored documents and keeps a
        reference to the caller's list rather than a copy.
        """
        self.knowledge_base = docs

    @staticmethod
    def _query_terms(query: str) -> List[str]:
        """Split ``query`` into lowercase terms used for substring matching.

        Whitespace-separated tokens that contain no CJK ideograph are kept
        whole. A token that does contain CJK text is broken into runs: every
        alphanumeric non-CJK run becomes one term, and every CJK run
        contributes its overlapping two-character windows (or the single
        character when the run has length one). Punctuation inside such a
        token only separates runs.

        Chinese is written without spaces, so plain whitespace splitting
        would turn an entire Chinese question into one term that never
        occurs verbatim in any document.
        """
        def is_cjk(ch: str) -> bool:
            # CJK Unified Ideographs block.
            return "\u4e00" <= ch <= "\u9fff"

        terms: List[str] = []
        for token in query.lower().split():
            if not any(is_cjk(ch) for ch in token):
                terms.append(token)
                continue
            run, run_is_cjk = "", False
            # The trailing space is a sentinel that flushes the final run.
            for ch in token + " ":
                ch_is_cjk = is_cjk(ch)
                if ch.isalnum() and (not run or ch_is_cjk == run_is_cjk):
                    run, run_is_cjk = run + ch, ch_is_cjk
                    continue
                if run_is_cjk and len(run) > 1:
                    terms.extend(run[i:i + 2] for i in range(len(run) - 1))
                elif run:
                    terms.append(run)
                run, run_is_cjk = (ch, ch_is_cjk) if ch.isalnum() else ("", False)
        return terms

    def retrieve(self, query: str, top_k: int = 2) -> List[Document]:
        """Return up to ``top_k`` documents that share terms with ``query``.

        Args:
            query: Free-text query; matching is case-insensitive.
            top_k: Maximum number of documents to return.

        Returns:
            Documents with at least one matching term, highest score first.
            Documents with equal scores keep their knowledge-base order
            because the sort is stable.
        """
        terms = self._query_terms(query)
        results = []
        for doc in self.knowledge_base:
            # Lowercase once per document instead of once per term.
            content_lower = doc.content.lower()
            score = sum(1 for term in terms if term in content_lower)
            if score > 0:
                results.append((doc, score))
        results.sort(key=lambda pair: pair[1], reverse=True)
        return [doc for doc, _ in results[:top_k]]
class IterativeRAG:
    """Multi-round retrieval loop that refines the query between rounds."""

    def __init__(self, retriever: IterativeRetriever):
        """Store the retriever that supplies ``retrieve`` and ``max_iterations``."""
        self.retriever = retriever

    @staticmethod
    def _preview(text: str, limit: int = 50) -> str:
        """Return ``text`` cut to ``limit`` characters, with "..." only if cut."""
        return text if len(text) <= limit else text[:limit] + "..."

    def query(self, initial_query: str) -> Dict[str, Any]:
        """Run up to ``retriever.max_iterations`` retrieval rounds.

        Each round retrieves documents for the current query, records a
        summary of the round, and asks ``_generate_next_query`` for a
        follow-up query. The loop stops early when a round retrieves nothing
        or when no follow-up query is produced. Progress is printed to
        stdout.

        Args:
            initial_query: The query used for the first round.

        Returns:
            A dict with ``initial_query``, ``iterations`` (one summary dict
            per completed round), ``all_documents`` (every distinct
            retrieved document in first-seen order) and ``num_iterations``.
        """
        current_query = initial_query
        all_retrieved = []
        iterations = []
        max_iterations = self.retriever.max_iterations

        for i in range(max_iterations):
            print(f"\n第{i+1}次迭代查询: {current_query}")

            docs = self.retriever.retrieve(current_query)
            if not docs:
                print("未找到相关文档,停止迭代")
                break

            # Later rounds often return documents already seen; keep each
            # one once so the document count is not inflated.
            for doc in docs:
                if doc not in all_retrieved:
                    all_retrieved.append(doc)

            iterations.append({
                "iteration": i + 1,
                "query": current_query,
                "docs": [self._preview(d.content) for d in docs],
            })

            # No follow-up query is needed after the final permitted round.
            if i < max_iterations - 1:
                # A real system would have an LLM write the next query.
                current_query = self._generate_next_query(current_query, docs)
                if current_query is None:
                    print("已获得足够信息,停止迭代")
                    break

        return {
            "initial_query": initial_query,
            "iterations": iterations,
            "all_documents": all_retrieved,
            "num_iterations": len(iterations),
        }

    def _generate_next_query(self, current_query: str, docs: List[Document]) -> Optional[str]:
        """Derive a follow-up query from the retrieved documents.

        Rule-based placeholder for an LLM: scans the documents in order and,
        for the first one that mentions a known entity absent from the
        current query, returns a canned query about that entity.

        Returns:
            The follow-up query, or ``None`` when no document introduces a
            new known entity.
        """
        query_lower = current_query.lower()
        for doc in docs:
            content = doc.content.lower()
            if "spacex" in content and "spacex" not in query_lower:
                return "SpaceX最近一次发射"
            if "openai" in content and "openai" not in query_lower:
                return "OpenAI最新产品"
        return None
# Demo: iterative retrieval over a four-document in-memory knowledge base.
# Each tuple is (text, source id, entity) and is expanded into a Document.
docs = [
    Document(text, {"source": source, "entity": entity})
    for text, source, entity in (
        ("SpaceX是埃隆·马斯克创立的太空探索技术公司", "doc1", "SpaceX"),
        ("SpaceX于2024年1月成功发射了星舰系统", "doc2", "SpaceX"),
        ("OpenAI是一家人工智能研究公司", "doc3", "OpenAI"),
        ("OpenAI发布了GPT-4 Turbo模型", "doc4", "OpenAI"),
    )
]

retriever = IterativeRetriever(max_iterations=3)
retriever.add_documents(docs)
iterative_rag = IterativeRAG(retriever)

# Ask a question that names the company only indirectly.
query = "马斯克的火箭公司最近一次发射是什么时候?"
print("\n迭代检索示例:", "=" * 80, f"\n初始查询: {query}", sep="\n")

result = iterative_rag.query(query)

# Summarise how many rounds ran and how many documents were collected.
print(
    "\n\n检索结果汇总:",
    "-" * 60,
    f"迭代次数: {result['num_iterations']}",
    f"检索到的文档数: {len(result['all_documents'])}",
    sep="\n",
)
In [ ]:
Copied!
class AdaptiveRAG:
    """RAG front end that picks a retrieval strategy from query complexity."""

    def __init__(self, retriever: IterativeRetriever):
        """Store the retriever used for both single-shot and iterative lookups."""
        self.retriever = retriever

    def assess_complexity(self, query: str) -> str:
        """Classify ``query`` as ``'simple'``, ``'medium'`` or ``'complex'``.

        Rule-based heuristic: each indicator whose parts all occur in the
        query adds one point. Two or more points mean complex, exactly one
        means medium, none means simple.
        """
        # Every indicator is a tuple of substrings that must ALL be present.
        # NOTE: "最新一次" contains "最新", so that phrase alone scores 2.
        complex_indicators = [
            ("比较",), ("对比",), ("差异",),      # comparison
            ("为什么",), ("原因",), ("如何", "和"),  # multi-hop: "how ... and ..."
            ("最近",), ("最新",), ("最新一次",),  # temporal
        ]
        complexity_score = sum(
            1
            for parts in complex_indicators
            if all(part in query for part in parts)
        )
        if complexity_score >= 2:
            return "complex"
        if complexity_score == 1:
            return "medium"
        return "simple"

    def query(self, query: str) -> Dict[str, Any]:
        """Answer ``query`` with a strategy matched to its complexity.

        Complex queries are delegated to iterative retrieval. Simple and
        medium queries do a single retrieval with ``top_k`` of 2 and 4
        respectively. ``use_reranking`` is only reported in the result; no
        reranking step is performed here. The assessed complexity is printed
        to stdout.

        Returns:
            For simple/medium queries, a dict with ``query``, ``complexity``,
            ``strategy`` (``"single"``), ``top_k``, ``use_reranking``,
            ``retrieved_docs`` and ``num_docs``. For complex queries, the
            dict produced by ``_complex_query``.
        """
        complexity = self.assess_complexity(query)
        print(f"\n查询复杂度评估: {complexity}")

        if complexity == "complex":
            return self._complex_query(query)

        if complexity == "simple":
            top_k, use_reranking = 2, False
        else:  # medium
            top_k, use_reranking = 4, True

        docs = self.retriever.retrieve(query, top_k=top_k)
        return {
            "query": query,
            "complexity": complexity,
            "strategy": "single",
            "top_k": top_k,
            "use_reranking": use_reranking,
            "retrieved_docs": [d.content for d in docs],
            "num_docs": len(docs),
        }

    def _complex_query(self, query: str) -> Dict[str, Any]:
        """Handle a complex query through ``IterativeRAG``.

        Returns:
            The iterative result dict, extended with ``complexity``,
            ``strategy`` (``"iterative"``) and the same ``query``,
            ``retrieved_docs`` and ``num_docs`` keys that single-shot
            results carry.
        """
        iterative_rag = IterativeRAG(self.retriever)
        result = iterative_rag.query(query)
        result["complexity"] = "complex"
        result["strategy"] = "iterative"
        result["query"] = query
        result["retrieved_docs"] = [d.content for d in result["all_documents"]]
        result["num_docs"] = len(result["all_documents"])
        return result
# Demo: adaptive retrieval using the module-level `retriever`.
adaptive_rag = AdaptiveRAG(retriever)

# One query per expected complexity class.
test_queries = [
    "什么是SpaceX",  # simple
    "SpaceX最近一次发射",  # medium
    "比较SpaceX和Blue Origin的差异",  # complex
]

print("\n自适应检索示例:")
print("=" * 80)
for query in test_queries:
    print(f"\n查询: {query}")
    print("-" * 60)
    result = adaptive_rag.query(query)
    # Iterative results are tagged with a "strategy" key; report each kind
    # with the fields that result shape actually carries.
    if result.get("strategy") == "iterative":
        print("策略: 迭代检索")
        print(f"迭代次数: {result['num_iterations']}")
    else:
        print("策略: 单次检索")
        print(f"检索参数: top_k={result['top_k']}, reranking={result['use_reranking']}")
        print(f"检索文档数: {result['num_docs']}")
class AdaptiveRAG:
    """RAG front end that picks a retrieval strategy from query complexity."""

    def __init__(self, retriever: IterativeRetriever):
        """Store the retriever used for both single-shot and iterative lookups."""
        self.retriever = retriever

    def assess_complexity(self, query: str) -> str:
        """Classify ``query`` as ``'simple'``, ``'medium'`` or ``'complex'``.

        Rule-based heuristic: each indicator whose parts all occur in the
        query adds one point. Two or more points mean complex, exactly one
        means medium, none means simple.
        """
        # Every indicator is a tuple of substrings that must ALL be present.
        # NOTE: "最新一次" contains "最新", so that phrase alone scores 2.
        complex_indicators = [
            ("比较",), ("对比",), ("差异",),      # comparison
            ("为什么",), ("原因",), ("如何", "和"),  # multi-hop: "how ... and ..."
            ("最近",), ("最新",), ("最新一次",),  # temporal
        ]
        complexity_score = sum(
            1
            for parts in complex_indicators
            if all(part in query for part in parts)
        )
        if complexity_score >= 2:
            return "complex"
        if complexity_score == 1:
            return "medium"
        return "simple"

    def query(self, query: str) -> Dict[str, Any]:
        """Answer ``query`` with a strategy matched to its complexity.

        Complex queries are delegated to iterative retrieval. Simple and
        medium queries do a single retrieval with ``top_k`` of 2 and 4
        respectively. ``use_reranking`` is only reported in the result; no
        reranking step is performed here. The assessed complexity is printed
        to stdout.

        Returns:
            For simple/medium queries, a dict with ``query``, ``complexity``,
            ``strategy`` (``"single"``), ``top_k``, ``use_reranking``,
            ``retrieved_docs`` and ``num_docs``. For complex queries, the
            dict produced by ``_complex_query``.
        """
        complexity = self.assess_complexity(query)
        print(f"\n查询复杂度评估: {complexity}")

        if complexity == "complex":
            return self._complex_query(query)

        if complexity == "simple":
            top_k, use_reranking = 2, False
        else:  # medium
            top_k, use_reranking = 4, True

        docs = self.retriever.retrieve(query, top_k=top_k)
        return {
            "query": query,
            "complexity": complexity,
            "strategy": "single",
            "top_k": top_k,
            "use_reranking": use_reranking,
            "retrieved_docs": [d.content for d in docs],
            "num_docs": len(docs),
        }

    def _complex_query(self, query: str) -> Dict[str, Any]:
        """Handle a complex query through ``IterativeRAG``.

        Returns:
            The iterative result dict, extended with ``complexity``,
            ``strategy`` (``"iterative"``) and the same ``query``,
            ``retrieved_docs`` and ``num_docs`` keys that single-shot
            results carry.
        """
        iterative_rag = IterativeRAG(self.retriever)
        result = iterative_rag.query(query)
        result["complexity"] = "complex"
        result["strategy"] = "iterative"
        result["query"] = query
        result["retrieved_docs"] = [d.content for d in result["all_documents"]]
        result["num_docs"] = len(result["all_documents"])
        return result
# Demo: adaptive retrieval using the module-level `retriever`.
adaptive_rag = AdaptiveRAG(retriever)

# One query per expected complexity class.
test_queries = [
    "什么是SpaceX",  # simple
    "SpaceX最近一次发射",  # medium
    "比较SpaceX和Blue Origin的差异",  # complex
]

print("\n自适应检索示例:")
print("=" * 80)
for query in test_queries:
    print(f"\n查询: {query}")
    print("-" * 60)
    result = adaptive_rag.query(query)
    # Iterative results are tagged with a "strategy" key; report each kind
    # with the fields that result shape actually carries.
    if result.get("strategy") == "iterative":
        print("策略: 迭代检索")
        print(f"迭代次数: {result['num_iterations']}")
    else:
        print("策略: 单次检索")
        print(f"检索参数: top_k={result['top_k']}, reranking={result['use_reranking']}")
        print(f"检索文档数: {result['num_docs']}")
In [ ]:
Copied!
class MetadataFilteredRetriever:
    """Keyword retriever that can pre-filter documents by exact metadata match."""

    def __init__(self):
        """Create a retriever with no documents."""
        self.documents = []

    def add_documents(self, docs: List[Document]):
        """Set the document store to ``docs``.

        Note that this replaces any previously stored documents and keeps a
        reference to the caller's list rather than a copy.
        """
        self.documents = docs

    @staticmethod
    def _query_terms(query: str) -> List[str]:
        """Split ``query`` into lowercase terms used for substring matching.

        Whitespace-separated tokens that contain no CJK ideograph are kept
        whole. A token that does contain CJK text is broken into runs: every
        alphanumeric non-CJK run becomes one term, and every CJK run
        contributes its overlapping two-character windows (or the single
        character when the run has length one). Punctuation inside such a
        token only separates runs.

        Chinese is written without spaces, so plain whitespace splitting
        would turn an entire Chinese question into one term that never
        occurs verbatim in any document.
        """
        def is_cjk(ch: str) -> bool:
            # CJK Unified Ideographs block.
            return "\u4e00" <= ch <= "\u9fff"

        terms: List[str] = []
        for token in query.lower().split():
            if not any(is_cjk(ch) for ch in token):
                terms.append(token)
                continue
            run, run_is_cjk = "", False
            # The trailing space is a sentinel that flushes the final run.
            for ch in token + " ":
                ch_is_cjk = is_cjk(ch)
                if ch.isalnum() and (not run or ch_is_cjk == run_is_cjk):
                    run, run_is_cjk = run + ch, ch_is_cjk
                    continue
                if run_is_cjk and len(run) > 1:
                    terms.extend(run[i:i + 2] for i in range(len(run) - 1))
                elif run:
                    terms.append(run)
                run, run_is_cjk = (ch, ch_is_cjk) if ch.isalnum() else ("", False)
        return terms

    def retrieve(
        self,
        query: str,
        top_k: int = 3,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """Return up to ``top_k`` matching documents that pass ``filters``.

        Args:
            query: Free-text query; matching is case-insensitive.
            top_k: Maximum number of documents to return.
            filters: Metadata constraints. A document passes only if, for
                every key, ``doc.metadata.get(key)`` equals the given value.
                ``None`` or an empty dict disables filtering.

        Returns:
            Passing documents with at least one matching query term, highest
            score first; equal scores keep their stored order.
        """
        # Step 1: narrow the candidates by exact metadata equality.
        filtered_docs = self.documents
        if filters:
            filtered_docs = [
                doc for doc in self.documents
                if all(
                    doc.metadata.get(key) == value
                    for key, value in filters.items()
                )
            ]

        # Step 2: keyword scoring (a stand-in for semantic retrieval).
        terms = self._query_terms(query)
        results = []
        for doc in filtered_docs:
            # Lowercase once per document instead of once per term.
            content_lower = doc.content.lower()
            score = sum(1 for term in terms if term in content_lower)
            if score > 0:
                results.append((doc, score))

        results.sort(key=lambda pair: pair[1], reverse=True)
        return [doc for doc, _ in results[:top_k]]
# Demo: metadata-filtered retrieval.
# Documents carry a category and a year that the filters below select on.
docs_with_metadata = [
    Document("Python是一种编程语言", {"category": "programming", "year": 2023}),
    Document("Python 3.12发布了新特性", {"category": "programming", "year": 2024}),
    Document("JavaScript也是一种编程语言", {"category": "programming", "year": 2023}),
    Document("AI技术正在快速发展", {"category": "ai", "year": 2024}),
]
filtered_retriever = MetadataFilteredRetriever()
filtered_retriever.add_documents(docs_with_metadata)

print("\n元数据过滤示例:")
print("=" * 80)

query = "Python"

# No filter: every document mentioning the query is eligible.
print(f"\n查询: {query} (无过滤)")
results = filtered_retriever.retrieve(query, top_k=3)
for doc in results:
    print(f" - {doc.content} (年份: {doc.metadata.get('year')})")

# Filter: only documents from 2024.
print(f"\n查询: {query} (年份=2024)")
results = filtered_retriever.retrieve(query, top_k=3, filters={"year": 2024})
for doc in results:
    print(f" - {doc.content} (年份: {doc.metadata.get('year')})")

# Filter: only the "ai" category, whose single document does not mention
# the query, so the empty-result message is what gets printed.
print(f"\n查询: {query} (类别=ai)")
results = filtered_retriever.retrieve(query, top_k=3, filters={"category": "ai"})
for doc in results:
    print(f" - {doc.content} (类别: {doc.metadata.get('category')})")
if not results:
    print(" (无结果)")
class MetadataFilteredRetriever:
    """Keyword retriever that can pre-filter documents by exact metadata match."""

    def __init__(self):
        """Create a retriever with no documents."""
        self.documents = []

    def add_documents(self, docs: List[Document]):
        """Set the document store to ``docs``.

        Note that this replaces any previously stored documents and keeps a
        reference to the caller's list rather than a copy.
        """
        self.documents = docs

    @staticmethod
    def _query_terms(query: str) -> List[str]:
        """Split ``query`` into lowercase terms used for substring matching.

        Whitespace-separated tokens that contain no CJK ideograph are kept
        whole. A token that does contain CJK text is broken into runs: every
        alphanumeric non-CJK run becomes one term, and every CJK run
        contributes its overlapping two-character windows (or the single
        character when the run has length one). Punctuation inside such a
        token only separates runs.

        Chinese is written without spaces, so plain whitespace splitting
        would turn an entire Chinese question into one term that never
        occurs verbatim in any document.
        """
        def is_cjk(ch: str) -> bool:
            # CJK Unified Ideographs block.
            return "\u4e00" <= ch <= "\u9fff"

        terms: List[str] = []
        for token in query.lower().split():
            if not any(is_cjk(ch) for ch in token):
                terms.append(token)
                continue
            run, run_is_cjk = "", False
            # The trailing space is a sentinel that flushes the final run.
            for ch in token + " ":
                ch_is_cjk = is_cjk(ch)
                if ch.isalnum() and (not run or ch_is_cjk == run_is_cjk):
                    run, run_is_cjk = run + ch, ch_is_cjk
                    continue
                if run_is_cjk and len(run) > 1:
                    terms.extend(run[i:i + 2] for i in range(len(run) - 1))
                elif run:
                    terms.append(run)
                run, run_is_cjk = (ch, ch_is_cjk) if ch.isalnum() else ("", False)
        return terms

    def retrieve(
        self,
        query: str,
        top_k: int = 3,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[Document]:
        """Return up to ``top_k`` matching documents that pass ``filters``.

        Args:
            query: Free-text query; matching is case-insensitive.
            top_k: Maximum number of documents to return.
            filters: Metadata constraints. A document passes only if, for
                every key, ``doc.metadata.get(key)`` equals the given value.
                ``None`` or an empty dict disables filtering.

        Returns:
            Passing documents with at least one matching query term, highest
            score first; equal scores keep their stored order.
        """
        # Step 1: narrow the candidates by exact metadata equality.
        filtered_docs = self.documents
        if filters:
            filtered_docs = [
                doc for doc in self.documents
                if all(
                    doc.metadata.get(key) == value
                    for key, value in filters.items()
                )
            ]

        # Step 2: keyword scoring (a stand-in for semantic retrieval).
        terms = self._query_terms(query)
        results = []
        for doc in filtered_docs:
            # Lowercase once per document instead of once per term.
            content_lower = doc.content.lower()
            score = sum(1 for term in terms if term in content_lower)
            if score > 0:
                results.append((doc, score))

        results.sort(key=lambda pair: pair[1], reverse=True)
        return [doc for doc, _ in results[:top_k]]
# Demo: metadata-filtered retrieval.
# Documents carry a category and a year that the filters below select on.
docs_with_metadata = [
    Document("Python是一种编程语言", {"category": "programming", "year": 2023}),
    Document("Python 3.12发布了新特性", {"category": "programming", "year": 2024}),
    Document("JavaScript也是一种编程语言", {"category": "programming", "year": 2023}),
    Document("AI技术正在快速发展", {"category": "ai", "year": 2024}),
]
filtered_retriever = MetadataFilteredRetriever()
filtered_retriever.add_documents(docs_with_metadata)

print("\n元数据过滤示例:")
print("=" * 80)

query = "Python"

# No filter: every document mentioning the query is eligible.
print(f"\n查询: {query} (无过滤)")
results = filtered_retriever.retrieve(query, top_k=3)
for doc in results:
    print(f" - {doc.content} (年份: {doc.metadata.get('year')})")

# Filter: only documents from 2024.
print(f"\n查询: {query} (年份=2024)")
results = filtered_retriever.retrieve(query, top_k=3, filters={"year": 2024})
for doc in results:
    print(f" - {doc.content} (年份: {doc.metadata.get('year')})")

# Filter: only the "ai" category, whose single document does not mention
# the query, so the empty-result message is what gets printed.
print(f"\n查询: {query} (类别=ai)")
results = filtered_retriever.retrieve(query, top_k=3, filters={"category": "ai"})
for doc in results:
    print(f" - {doc.content} (类别: {doc.metadata.get('category')})")
if not results:
    print(" (无结果)")