1. Environment Setup¶
In [ ]:
# Import the required libraries
import os
import sys
from pathlib import Path
from typing import List, Dict, Any

# Check the environment
print("Checking environment...")
print(f"Python version: {sys.version}")

# Fall back to a mock-data demo if LlamaIndex / ChromaDB are not installed
try:
    import llama_index
    import chromadb
    print("✅ LlamaIndex and ChromaDB are installed")
except ImportError:
    print("⚠️ Some libraries are not installed; the demo will use mock data")
    print("   Please run: pip install llama-index-core chromadb")

# Create the sample data directory
DATA_DIR = Path("./data/sample_documents")
DATA_DIR.mkdir(parents=True, exist_ok=True)
print(f"\nData directory: {DATA_DIR.absolute()}")
In [ ]:
# Create the sample documents
sample_docs = {
    "doc1.txt": """
Python is a high-level programming language
Python was created by Guido van Rossum in 1991.
Its design philosophy emphasizes code readability.
Python is widely used in web development, data science, and artificial intelligence.
""",
    "doc2.txt": """
RAG explained
RAG (Retrieval-Augmented Generation) is an AI technique.
It combines information retrieval with generative models.
RAG can reduce LLM hallucinations and improve answer accuracy.
""",
    "doc3.txt": """
A guide to vector databases
Vector databases are built to store and retrieve high-dimensional vectors.
Popular vector databases include Chroma, Pinecone, and Milvus.
They use algorithms such as HNSW for fast approximate search.
"""
}

# Write the files to disk
for filename, content in sample_docs.items():
    file_path = DATA_DIR / filename
    file_path.write_text(content, encoding='utf-8')
    print(f"✅ Created: {filename}")

print(f"\nCreated {len(sample_docs)} sample documents in total")
2.2 Document Loader¶
In [ ]:
class SimpleDocumentLoader:
    """
    A minimal document loader.
    """
    def __init__(self, directory: Path):
        self.directory = directory

    def load(self) -> List[Dict[str, Any]]:
        """
        Load all text documents in the directory.

        Returns:
            A list of documents, each containing 'content' and 'metadata'.
        """
        documents = []
        # Supported file extensions
        supported_extensions = ['.txt', '.md']
        # Walk the directory
        for file_path in self.directory.iterdir():
            if file_path.is_file() and file_path.suffix in supported_extensions:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                documents.append({
                    'content': content,
                    'metadata': {
                        'source': str(file_path),
                        'filename': file_path.name
                    }
                })
        return documents

# Load the documents
loader = SimpleDocumentLoader(DATA_DIR)
documents = loader.load()

print(f"Loaded {len(documents)} documents")
for i, doc in enumerate(documents, 1):
    print(f"\nDocument {i}:")
    print(f"  Source: {doc['metadata']['source']}")
    print(f"  Preview: {doc['content'][:50]}...")
In [ ]:
class TextSplitter:
    """
    A fixed-size text splitter with overlap.
    """
    def __init__(self, chunk_size: int = 100, chunk_overlap: int = 20):
        # The overlap must be smaller than the chunk size,
        # otherwise the sliding window below would never advance.
        assert chunk_overlap < chunk_size, "chunk_overlap must be smaller than chunk_size"
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split(self, text: str) -> List[str]:
        """
        Split text into chunks.

        Args:
            text: the input text

        Returns:
            A list of text chunks.
        """
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            # Slide the window forward, keeping the overlap
            start = end - self.chunk_overlap
        return chunks

# Try out the splitter
splitter = TextSplitter(chunk_size=150, chunk_overlap=30)

print("Testing text splitting:\n")
test_text = documents[0]['content']
chunks = splitter.split(test_text)

print(f"Original length: {len(test_text)} characters")
print(f"Number of chunks: {len(chunks)}\n")
for i, chunk in enumerate(chunks, 1):
    print(f"Chunk {i}: {chunk}\n")
In [ ]:
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

class SimpleVectorStore:
    """
    A simplified vector store (backed by TF-IDF).
    """
    def __init__(self):
        self.vectorizer = TfidfVectorizer()
        self.vectors = None
        self.documents = []

    def add_documents(self, docs: List[Dict[str, Any]]):
        """
        Add documents to the vector store.
        """
        texts = [doc['content'] for doc in docs]
        self.documents = docs
        self.vectors = self.vectorizer.fit_transform(texts)

    def search(self, query: str, top_k: int = 3) -> List[Dict]:
        """
        Search for relevant documents.
        """
        if self.vectors is None:
            return []
        # Vectorize the query
        query_vector = self.vectorizer.transform([query])
        # Compute cosine similarity against every stored document
        similarities = cosine_similarity(query_vector, self.vectors)[0]
        # Sort by similarity and keep the top_k results
        top_indices = np.argsort(similarities)[::-1][:top_k]

        results = []
        for idx in top_indices:
            results.append({
                'document': self.documents[idx]['content'],
                'metadata': self.documents[idx]['metadata'],
                'score': float(similarities[idx])
            })
        return results

# Create the vector store and add the documents
vector_store = SimpleVectorStore()
vector_store.add_documents(documents)

print("Vector index built!")
4.2 Test Retrieval¶
In [ ]:
# Test retrieval
test_queries = [
    "What is Python?",
    "Advantages of RAG",
    "Vector databases"
]

for query in test_queries:
    print(f"\n{'='*60}")
    print(f"Query: {query}")
    print(f"{'='*60}")

    results = vector_store.search(query, top_k=2)
    if results:
        for i, result in enumerate(results, 1):
            print(f"\nResult {i} (relevance: {result['score']:.3f}):")
            print(f"{result['document'][:100]}...")
    else:
        print("No relevant documents found")
5. A Complete RAG Query System¶
In [ ]:
class SimpleRAG:
    """
    A simple RAG system.
    """
    def __init__(self, documents: List[Dict[str, Any]]):
        self.vector_store = SimpleVectorStore()
        self.vector_store.add_documents(documents)
        # Prompt template for the (mocked) LLM call
        self.llm_prompt_template = """
Answer the question based on the following documents:

{context}

Question: {question}

Answer:
"""

    def query(self, question: str, top_k: int = 2) -> Dict:
        """
        Run a RAG query.

        Args:
            question: the user's question
            top_k: number of documents to retrieve

        Returns:
            A dict containing the answer and its sources.
        """
        # Step 1: retrieve
        print(f"\n🔍 Step 1: retrieve relevant documents")
        retrieved_docs = self.vector_store.search(question, top_k=top_k)
        if not retrieved_docs:
            return {
                'question': question,
                'answer': 'Sorry, no relevant information was found in the knowledge base.',
                'sources': []
            }
        print(f"Found {len(retrieved_docs)} relevant documents")

        # Step 2: build the context
        print(f"\n📝 Step 2: build the context")
        context = "\n\n".join([
            f"Document {i+1}: {doc['document'][:100]}..."
            for i, doc in enumerate(retrieved_docs)
        ])

        # Step 3: generate the answer
        print(f"\n🤖 Step 3: generate the answer")
        answer = self._generate_answer(question, context)

        return {
            'question': question,
            'answer': answer,
            'sources': [
                {
                    'file': doc['metadata']['filename'],
                    'score': doc['score']
                }
                for doc in retrieved_docs
            ]
        }

    def _generate_answer(self, question: str, context: str) -> str:
        """
        Generate an answer (simplified).
        """
        # Simplified implementation: rule-based answers stand in for an LLM
        if "Python" in question and "Python" in context:
            return ("According to the documents, Python is a high-level programming language "
                    "created by Guido van Rossum in 1991, widely used in web development, "
                    "data science, and artificial intelligence.")
        elif "RAG" in question:
            return ("RAG (Retrieval-Augmented Generation) combines information retrieval with "
                    "generative models; it can reduce LLM hallucinations and improve answer accuracy.")
        else:
            return f"Based on the retrieved information: {context[:100]}..."

# Create the RAG system
rag_system = SimpleRAG(documents)
print("RAG system initialized!")
6. Testing the RAG System¶
In [ ]:
# Test the RAG system
test_questions = [
    "What are Python's characteristics?",
    "What is RAG?",
    "How do I use a vector database?"
]

for q in test_questions:
    result = rag_system.query(q)
    print(f"\n{'='*60}")
    print(f"Question: {result['question']}")
    print(f"\nAnswer: {result['answer']}")
    print(f"\nSources: {len(result['sources'])} documents")
    for source in result['sources']:
        print(f"  - {source['file']} (relevance: {source['score']:.3f})")
    print()
    print("-" * 40)
In [ ]:
# TODO: implement an improved splitter
class SmartTextSplitter(TextSplitter):
    def split_by_sentence(self, text: str) -> List[str]:
        # Split by sentence
        # TODO: implement
        pass

# Try the new splitter
# smart_splitter = SmartTextSplitter()
# chunks = smart_splitter.split_by_sentence(test_text)
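One possible reference sketch for this exercise, splitting on sentence-ending punctuation with a regular expression; the class name RegexSentenceSplitter is an illustrative choice, not part of the exercise template:

# A possible reference implementation: regex-based sentence splitting.
import re

class RegexSentenceSplitter(TextSplitter):
    def split_by_sentence(self, text: str) -> List[str]:
        # Split right after ., !, ? (and their full-width forms), dropping empty pieces
        sentences = re.split(r'(?<=[.!?。!?])\s*', text)
        return [s.strip() for s in sentences if s.strip()]

sentence_chunks = RegexSentenceSplitter().split_by_sentence(test_text)
for s in sentence_chunks:
    print(s)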
Exercise 2: Add Metadata¶
Add richer metadata to each document (such as category, date, and author) and support metadata filtering during retrieval; a possible reference sketch follows the starter cell below.
In [ ]:
# TODO: add metadata support
class DocumentWithMetadata:
    def __init__(self, content: str, category: str, author: str):
        self.content = content
        self.category = category
        self.author = author
    # ... implement the rest
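A possible reference sketch for the filtering part, layered on top of the SimpleVectorStore defined earlier; the filters parameter and the class name are illustrative choices:

# A possible reference sketch: post-filter search results by metadata.
class FilterableVectorStore(SimpleVectorStore):
    def search(self, query: str, top_k: int = 3, filters: Dict[str, Any] = None) -> List[Dict]:
        # Over-fetch candidates, then keep only those whose metadata matches
        # every key/value pair requested in 'filters'.
        candidates = super().search(query, top_k=top_k * 3)
        if filters:
            candidates = [
                r for r in candidates
                if all(r['metadata'].get(k) == v for k, v in filters.items())
            ]
        return candidates[:top_k]

# Usage (a 'category' field would first have to be added to each document's metadata):
# store = FilterableVectorStore()
# store.add_documents(documents)
# store.search("vector databases", top_k=2, filters={"category": "database"})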