Hybrid Retrieval and Reranking Experiment
This notebook demonstrates a complete implementation of hybrid retrieval (vector search + BM25 with RRF fusion) followed by CrossEncoder reranking.
1. Environment Setup
In [ ]:
# Install dependencies
!pip install sentence-transformers rank-bm25 chromadb

import numpy as np
from sentence_transformers import SentenceTransformer, CrossEncoder
from rank_bm25 import BM25Okapi
import chromadb
from chromadb.config import Settings

print('Environment ready!')
2. Prepare Test Data
In [ ]:
# Sample documents
documents = [
    "Python is a high-level programming language known for its concise syntax and powerful features. Guido van Rossum created Python in 1991.",
    "JavaScript is the standard language of web development, used mainly on the front end. Brendan Eich created JavaScript in 1995.",
    "Python performance can be improved with PyPy, Cython, or better algorithms. PyPy is a JIT compiler.",
    "JavaScript performance optimization includes reducing DOM operations and using event delegation. V8 is Chrome's JavaScript engine.",
    "Python's Global Interpreter Lock (GIL) limits multithreading performance, but the multiprocessing module offers an alternative.",
    "The V8 engine makes JavaScript execution dramatically faster, approaching compiled languages. It uses JIT compilation.",
    "Python has a rich library ecosystem, including data analysis tools such as NumPy and Pandas.",
    "Node.js brings JavaScript to the back end, enabling full-stack JavaScript development.",
    "Python decorators are a powerful feature that can be used for aspect-oriented programming (AOP).",
    "JavaScript closures let a function access the scope in which it was defined."
]

doc_ids = [f'doc_{i}' for i in range(len(documents))]

queries = [
    "How can I improve the performance of Python code?",
    "What is JavaScript's V8 engine?",
    "Differences between Python and JavaScript for web development"
]

print(f'Number of documents: {len(documents)}')
print(f'Number of queries: {len(queries)}')
3. Initialize the Models
In [ ]:
# Load the embedding model
embedding_model = SentenceTransformer('BAAI/bge-small-en-v1.5')

# Load the reranking model
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

print('Models loaded!')
print(f'Embedding model: {embedding_model.__class__.__name__}')
print(f'Reranking model: {reranker.__class__.__name__}')
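As a quick sanity check before indexing, we can confirm the embedding dimensionality reported by the loaded model (bge-small-en-v1.5 produces 384-dimensional vectors).
In [ ]:
# Optional sanity check: print the embedding dimensionality of the loaded model
dim = embedding_model.get_sentence_embedding_dimension()
print(f'Embedding dimension: {dim}')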
4. Implement Hybrid Retrieval
In [ ]:
class HybridRetriever:
    """Hybrid retriever combining vector search and BM25."""

    def __init__(self, documents, doc_ids):
        # Initialize the vector database
        self.chroma_client = chromadb.Client(Settings())
        self.collection = self.chroma_client.get_or_create_collection(name="test_docs")

        # Embed and store the documents
        embeddings = embedding_model.encode(documents).tolist()
        self.collection.add(
            embeddings=embeddings,
            documents=documents,
            ids=doc_ids
        )

        # Build the BM25 index (whitespace tokenization)
        self.documents = documents
        self.doc_ids = doc_ids
        tokenized_docs = [doc.split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

    def vector_retrieve(self, query, top_k=5):
        """Vector retrieval; returns (doc_id, distance) pairs, lower distance = more similar."""
        query_embedding = embedding_model.encode([query]).tolist()
        results = self.collection.query(
            query_embeddings=query_embedding,
            n_results=top_k
        )
        return list(zip(results['ids'][0], results['distances'][0]))

    def bm25_retrieve(self, query, top_k=5):
        """BM25 retrieval; returns (doc_id, score) pairs, higher score = more relevant."""
        tokenized_query = query.split()
        scores = self.bm25.get_scores(tokenized_query)
        top_indices = np.argsort(scores)[::-1][:top_k]
        return [(self.doc_ids[idx], scores[idx]) for idx in top_indices if scores[idx] > 0]

    def rrf_fusion(self, vector_results, bm25_results, k=60):
        """Reciprocal Rank Fusion (RRF)."""
        rrf_scores = {}

        # Contribution from vector retrieval
        for rank, (doc_id, _) in enumerate(vector_results, start=1):
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1.0 / (k + rank)

        # Contribution from BM25 retrieval
        for rank, (doc_id, _) in enumerate(bm25_results, start=1):
            rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + 1.0 / (k + rank)

        # Sort by fused score
        fused = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
        return fused


# Create the retriever
retriever = HybridRetriever(documents, doc_ids)
print('Retriever initialized!')
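To make the fusion step concrete: each retriever contributes 1/(k + rank) for every document it returns, and the contributions are summed. With the default k = 60, a document ranked 1st by vector search and 3rd by BM25 scores roughly 1/61 + 1/63 ≈ 0.0323, which the small check below verifies by hand.
In [ ]:
# Hand-computed RRF score for a document ranked 1st by vector search and 3rd by BM25 (k=60)
k = 60
score = 1.0 / (k + 1) + 1.0 / (k + 3)
print(f'RRF score: {score:.4f}')  # ~0.0164 + 0.0159 = 0.0323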
5. Test Hybrid Retrieval
In [ ]:
# Test query
query = queries[0]
print(f'Query: {query}\n')

# Vector retrieval (scores are distances; lower is better)
print('=== Vector retrieval ===')
vector_results = retriever.vector_retrieve(query, top_k=5)
for doc_id, score in vector_results:
    idx = int(doc_id.split('_')[1])
    print(f'{doc_id}: {score:.4f} - {documents[idx][:50]}...')

# BM25 retrieval (higher score is better)
print('\n=== BM25 retrieval ===')
bm25_results = retriever.bm25_retrieve(query, top_k=5)
for doc_id, score in bm25_results:
    idx = int(doc_id.split('_')[1])
    print(f'{doc_id}: {score:.4f} - {documents[idx][:50]}...')
In [ ]:
# RRF fusion
print('\n=== RRF fusion ===')
fused_results = retriever.rrf_fusion(vector_results, bm25_results)
for doc_id, score in fused_results[:5]:
    idx = int(doc_id.split('_')[1])
    print(f'{doc_id}: {score:.4f} - {documents[idx][:50]}...')
6. Reranking
In [ ]:
def rerank_results(query, candidates, top_k=5):
    """Rerank candidates with the CrossEncoder."""
    # Build (query, document) pairs
    pairs = [(query, retriever.documents[int(doc_id.split('_')[1])]) for doc_id, _ in candidates]

    # Compute relevance scores
    scores = reranker.predict(pairs)

    # Combine and sort by score
    reranked = [(candidates[i][0], float(scores[i])) for i in range(len(candidates))]
    reranked.sort(key=lambda x: x[1], reverse=True)
    return reranked[:top_k]


# Rerank the fused candidates
print('=== Reranked results ===')
reranked = rerank_results(query, fused_results[:10], top_k=5)
for doc_id, score in reranked:
    idx = int(doc_id.split('_')[1])
    print(f'{doc_id}: {score:.4f} - {documents[idx][:50]}...')
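To see why a second scoring pass helps, the sketch below compares, for the top reranked document, the bi-encoder cosine similarity with the CrossEncoder relevance score. This is only an illustrative check: the two numbers live on different scales (cosine similarity vs. an unnormalized relevance score), so only the ranking each method induces is comparable.
In [ ]:
# Illustrative comparison: bi-encoder cosine similarity vs. CrossEncoder score
# for the top reranked document (the two scores are on different scales by design).
top_doc = retriever.documents[int(reranked[0][0].split('_')[1])]
q_emb = embedding_model.encode(query, normalize_embeddings=True)
d_emb = embedding_model.encode(top_doc, normalize_embeddings=True)
print(f'Bi-encoder cosine similarity: {float(np.dot(q_emb, d_emb)):.4f}')
print(f'CrossEncoder relevance score: {float(reranker.predict([(query, top_doc)])[0]):.4f}')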
7. Method Comparison
In [ ]:
import pandas as pd

# Compare the top-1 result of each method across the test queries
comparison_data = []

for query in queries:
    vector = retriever.vector_retrieve(query, top_k=3)
    bm25 = retriever.bm25_retrieve(query, top_k=3)
    fused = retriever.rrf_fusion(vector, bm25)
    reranked = rerank_results(query, fused[:10], top_k=3)

    comparison_data.append({
        'query': query,
        'vector_top1': vector[0][0] if vector else '',
        'bm25_top1': bm25[0][0] if bm25 else '',
        'fused_top1': fused[0][0] if fused else '',
        'reranked_top1': reranked[0][0] if reranked else ''
    })

df = pd.DataFrame(comparison_data)
print(df)

# Save the results
df.to_csv('retrieval_comparison.csv', index=False)
print('\nResults saved to retrieval_comparison.csv')
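Inspecting top-1 results is useful for eyeballing, but a proper comparison needs labeled relevance judgments and a ranking metric. The sketch below computes Mean Reciprocal Rank (MRR) for the reranked pipeline under an assumed set of relevance labels for the first two queries; these labels are illustrative assumptions for demonstration, not ground truth shipped with the data.
In [ ]:
# Illustrative MRR computation. The relevance labels below are assumptions made
# for demonstration only; replace them with real judgments for your own data.
assumed_relevant = {
    queries[0]: {'doc_2', 'doc_4'},  # assumed relevant docs for the Python performance query
    queries[1]: {'doc_3', 'doc_5'},  # assumed relevant docs for the V8 engine query
}

def mean_reciprocal_rank(ranked_ids_per_query, relevant_per_query):
    """MRR = mean over queries of 1 / rank of the first relevant document (0 if none found)."""
    reciprocal_ranks = []
    for q, ranked_ids in ranked_ids_per_query.items():
        relevant = relevant_per_query.get(q, set())
        rr = 0.0
        for rank, doc_id in enumerate(ranked_ids, start=1):
            if doc_id in relevant:
                rr = 1.0 / rank
                break
        reciprocal_ranks.append(rr)
    return sum(reciprocal_ranks) / len(reciprocal_ranks) if reciprocal_ranks else 0.0

runs = {}
for q in assumed_relevant:
    fused = retriever.rrf_fusion(retriever.vector_retrieve(q, top_k=5),
                                 retriever.bm25_retrieve(q, top_k=5))
    runs[q] = [doc_id for doc_id, _ in rerank_results(q, fused[:10], top_k=5)]

print(f'MRR (reranked pipeline, assumed labels): {mean_reciprocal_rank(runs, assumed_relevant):.4f}')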
8. Summary
This experiment demonstrated:
- Vector retrieval: strong at semantic similarity, well suited to conceptual queries
- BM25 retrieval: strong at keyword matching, well suited to queries with proper nouns and exact terms
- RRF fusion: combines the strengths of both without requiring score normalization
- Reranking: further refines the candidate list and improves precision
Recommendations:
- Simple queries: vector retrieval alone is often enough
- Complex queries: hybrid retrieval plus reranking (see the pipeline sketch below)
- Production: tune candidate sizes, fusion parameters, and models on your own data
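As a wrap-up, here is a minimal sketch that chains the pieces above into a single helper, assuming the `retriever` and `rerank_results` objects defined earlier are still in scope; `candidate_k` and `top_k` are illustrative defaults rather than tuned values.
In [ ]:
# Minimal end-to-end sketch: hybrid retrieval -> RRF fusion -> CrossEncoder reranking.
# candidate_k and top_k are illustrative defaults, not tuned values.
def hybrid_search(query, top_k=3, candidate_k=10):
    vector_results = retriever.vector_retrieve(query, top_k=candidate_k)
    bm25_results = retriever.bm25_retrieve(query, top_k=candidate_k)
    fused = retriever.rrf_fusion(vector_results, bm25_results)
    return rerank_results(query, fused[:candidate_k], top_k=top_k)

for doc_id, score in hybrid_search(queries[2]):
    idx = int(doc_id.split('_')[1])
    print(f'{doc_id}: {score:.4f} - {documents[idx][:50]}...')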