In [ ]:
# Example: the problem with naive retrieval
query = "Python的异步编程如何使用?"

# Traditional retrieval results (placeholders standing in for long documents)
retrieved_docs = [
    """Python异步编程使用async/await语法...
    [此处省略2000字详细说明]
    """,
    """asyncio是Python的标准库...
    [此处省略2000字API文档]
    """,
    """异步编程可以提高性能...
    [此处省略2000字性能分析]
    """
]

# Rough token count via whitespace splitting
total_tokens = sum(len(doc.split()) for doc in retrieved_docs)
print(f"Retrieved {len(retrieved_docs)} documents")
print(f"Total length: {total_tokens} tokens")
print("\nProblems:")
print("  ✗ May exceed the model's context window")
print("  ✗ Lots of redundant and irrelevant content")
print("  ✗ High API cost")
print("  ✗ Slow responses")
In [ ]:
def truncate_by_length(text: str, max_length: int) -> str:
    """
    Truncate to a fixed length.
    Keeps the first 80% and the last 20% of the budget.
    """
    if len(text) <= max_length:
        return text
    head_length = int(max_length * 0.8)
    tail_length = max_length - head_length
    head = text[:head_length]
    tail = text[-tail_length:] if tail_length > 0 else ""
    return f"{head}\n...[省略]...\n{tail}"

# Example
long_text = "Python是一门高级编程语言。" * 100
compressed = truncate_by_length(long_text, max_length=200)
print(f"Original length: {len(long_text)} characters")
print(f"Compressed: {len(compressed)} characters")
print(f"Compression ratio: {len(compressed)/len(long_text)*100:.1f}%")
print("\nCompressed result:")
print(compressed)
2.2 Removing formatting and redundancy
In [ ]:
import re

def clean_text(text: str) -> str:
    """
    Clean text: strip formatting markup and redundant whitespace.
    """
    # Collapse extra whitespace
    text = re.sub(r'\s+', ' ', text)
    # Strip Markdown formatting
    text = re.sub(r'#{1,6}\s', '', text)          # headings
    text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)  # bold
    text = re.sub(r'\*(.*?)\*', r'\1', text)      # italics
    text = re.sub(r'`(.*?)`', r'\1', text)        # inline code
    # Strip HTML tags
    text = re.sub(r'<[^>]+>', '', text)
    return text.strip()

def remove_redundant_sentences(text: str) -> str:
    """
    Remove redundant sentences (simplified version).
    Sentences are split on the Chinese full stop '。'.
    """
    sentences = [s.strip() for s in text.split('。') if s.strip()]
    unique_sentences = []
    for sent in sentences:
        # A sentence is redundant if it contains, or is contained in, an already kept sentence
        is_duplicate = any(
            sent in existing or existing in sent
            for existing in unique_sentences
        )
        if not is_duplicate:
            unique_sentences.append(sent)
    return '。'.join(unique_sentences) + '。'

# Example
text_with_format = """
## Python简介
**Python**是一门*高级*编程语言。
Python的语法非常简洁。
Python的代码很容易阅读。
Python的语法非常简洁。
"""
cleaned = clean_text(text_with_format)
deduped = remove_redundant_sentences(cleaned)
print("Original text:")
print(text_with_format)
print("\nAfter cleaning and deduplication:")
print(deduped)
In [ ]:
from typing import List, Dict, Tuple

class SemanticCompressor:
    """
    Semantic compressor.

    Goals:
    1. Preserve key information
    2. Simplify wording
    3. Remove redundancy
    4. Stay readable
    """

    def __init__(self, llm=None):
        self.llm = llm

    def compress_single_document(self,
                                 document: str,
                                 compression_ratio: float = 0.25) -> str:
        """
        Compress a single document.

        Args:
            document: the original document
            compression_ratio: target compression ratio (0-1)

        Returns:
            The compressed document.
        """
        target_length = int(len(document) * compression_ratio)

        # Without an LLM, fall back to simple rule-based compression
        if not self.llm:
            return self._simple_compress(document, target_length)

        # Compress with the LLM
        prompt = f"""Please compress the following document to roughly {target_length} characters.
Requirements:
1. Keep all key information
2. Remove redundancy and filler
3. Simplify the wording without changing the meaning
4. Keep the logical structure clear

Original document:
{document}

Compressed document:"""
        return self.llm.predict(prompt)

    def compress_multiple_documents(self,
                                    documents: List[str],
                                    query: str,
                                    max_total_length: int = 1000) -> str:
        """
        Compress and merge multiple documents.
        """
        print(f"Compressing {len(documents)} documents, target length: {max_total_length}")

        # Compress each document within an equal share of the total budget
        budget = max_total_length // len(documents)
        compressed_docs = []
        for doc in documents:
            compressed = self._compress_with_context(doc, query, budget)
            compressed_docs.append(compressed)

        # Merge
        merged = "\n\n".join(compressed_docs)

        # If still too long, compress the merged text as a whole
        if len(merged) > max_total_length:
            merged = self._simple_compress(merged, max_total_length)

        return merged

    def _compress_with_context(self, document: str, query: str, max_length: int) -> str:
        """Compress a document using the query as context."""
        if not self.llm:
            return self._simple_compress(document, max_length)

        prompt = f"""User query: {query}

Please compress the following document to at most {max_length} characters:
1. Prioritize content relevant to the query
2. Keep key details and figures
3. Drop content unrelated to the query

Document:
{document}

Compressed result:"""
        return self.llm.predict(prompt)

    def _simple_compress(self, text: str, max_length: int) -> str:
        """Simple compression (no LLM)."""
        if len(text) <= max_length:
            return text

        # Split into sentences on the Chinese full stop '。'
        sentences = [s.strip() for s in text.split('。') if s.strip()]

        # Simple strategy: keep leading sentences (they usually carry the key information)
        result_sentences = []
        current_length = 0
        for sent in sentences:
            if current_length + len(sent) <= max_length * 0.9:
                result_sentences.append(sent)
                current_length += len(sent)
            else:
                break

        return '。'.join(result_sentences) + '。'

# Create the compressor
compressor = SemanticCompressor()
print("Semantic compressor created!")
In [ ]:
# Test semantic compression
documents = [
    "Python异步编程使用async/await语法,可以显著提高IO密集型任务的性能。asyncio是Python的标准库,提供了事件循环、协程、Future等组件。",
    "异步编程允许程序在等待IO操作时执行其他任务,从而提高并发性能。FastAPI、aiohttp等框架充分利用了异步特性。",
    "Python的异步生态系统包括aiohttp、asyncpg等库。异步编程的挑战包括调试困难、并发控制等。"
]
query = "Python异步编程的优势是什么?"

compressed = compressor.compress_multiple_documents(
    documents=documents,
    query=query,
    max_total_length=200
)

print("Original total length:", sum(len(d) for d in documents))
print("Compressed length:", len(compressed))
print(f"Compression ratio: {len(compressed)/sum(len(d) for d in documents)*100:.1f}%")
print("\nCompressed result:")
print(compressed)
In [ ]:
class ParagraphCompressor:
    """
    Paragraph-level compressor.

    Features:
    1. Fine-grained control at the paragraph level
    2. Relevance filtering
    3. Smart merging
    """

    def __init__(self, llm=None):
        self.llm = llm

    def compress(self,
                 documents: List[str],
                 query: str,
                 target_length: int) -> Tuple[str, Dict]:
        """
        Main paragraph-level compression method.

        Returns:
            (compressed document, statistics)
        """
        stats = {
            "original_length": sum(len(d) for d in documents),
            "num_paragraphs": 0,
            "num_kept": 0,
            "num_discarded": 0,
        }

        # Step 1: split into paragraphs
        paragraphs = self._split_into_paragraphs(documents)
        stats["num_paragraphs"] = len(paragraphs)
        print(f"Split into {len(paragraphs)} paragraphs")

        # Step 2: score relevance
        scored_paragraphs = self._score_relevance(paragraphs, query)

        # Step 3: select paragraphs
        selected = self._select_paragraphs(scored_paragraphs, target_length)
        stats["num_kept"] = len(selected)
        stats["num_discarded"] = len(paragraphs) - len(selected)

        # Step 4: compress the selected paragraphs
        compressed_paragraphs = [
            self._compress_paragraph(para, query)
            for para, score in selected
        ]

        # Step 5: merge
        final_doc = "\n\n".join(compressed_paragraphs)
        stats["final_length"] = len(final_doc)
        stats["compression_ratio"] = stats["final_length"] / stats["original_length"]

        return final_doc, stats

    def _split_into_paragraphs(self, documents: List[str]) -> List[str]:
        """Split documents into paragraphs."""
        paragraphs = []
        for doc in documents:
            doc_paragraphs = doc.split('\n\n')
            paragraphs.extend([p.strip() for p in doc_paragraphs if p.strip()])
        return paragraphs

    def _score_relevance(self, paragraphs: List[str], query: str) -> List[Tuple[str, float]]:
        """Score paragraph relevance."""
        scored = []
        for para in paragraphs:
            score = self._calculate_relevance(para, query)
            scored.append((para, score))
        # Sort by relevance, most relevant first
        scored.sort(key=lambda x: x[1], reverse=True)
        return scored

    def _calculate_relevance(self, paragraph: str, query: str) -> float:
        """
        Relevance between a paragraph and the query (simplified version).
        Uses whitespace word overlap, which is crude for unsegmented Chinese text;
        exercise 3 below swaps in embedding similarity.
        """
        query_words = set(query.lower().split())
        para_words = set(paragraph.lower().split())
        overlap = len(query_words & para_words)
        return min(overlap / len(query_words), 1.0) if query_words else 0.0

    def _select_paragraphs(self,
                           scored_paragraphs: List[Tuple[str, float]],
                           target_length: int) -> List[Tuple[str, float]]:
        """Greedily select paragraphs that fit the budget."""
        selected = []
        current_length = 0
        for para, score in scored_paragraphs:
            # Assume each paragraph shrinks to roughly 40% of its length after compression
            estimated_length = len(para) * 0.4
            if current_length + estimated_length <= target_length:
                selected.append((para, score))
                current_length += estimated_length
            else:
                break
        return selected

    def _compress_paragraph(self, paragraph: str, query: str) -> str:
        """Compress a single paragraph."""
        if not self.llm:
            # Simple compression: keep leading sentences within the budget
            target_length = max(len(paragraph) * 0.5, 50)
            sentences = paragraph.split('。')
            result = []
            current_len = 0
            for sent in sentences:
                if current_len + len(sent) <= target_length:
                    result.append(sent.strip())
                    current_len += len(sent)
                else:
                    break
            return '。'.join(result) + '。'

        # Compress with the LLM
        target_length = max(len(paragraph) * 0.4, 50)
        prompt = f"""User query: {query}\n\nPlease compress the following paragraph to {int(target_length)} characters:\n\n{paragraph}"""
        return self.llm.predict(prompt)

# Create the paragraph compressor
para_compressor = ParagraphCompressor()
print("Paragraph compressor created!")
In [ ]:
# Test paragraph-level compression
docs = [
    """Python的异步编程是一个重要特性。
async/await语法是Python 3.5引入的,用于简化异步代码的编写。
asyncio是Python的标准库,提供了事件循环、协程、Future等组件。""",
    """异步编程可以显著提高IO密集型应用的性能。
在Web开发中,异步框架如FastAPI、aiohttp可以利用异步特性处理大量并发请求。"""
]
query = "Python异步编程的特点和优势"

result, stats = para_compressor.compress(docs, query, target_length=300)

print("Compressed result:")
print(result)
print("\nStatistics:")
for k, v in stats.items():
    print(f"  {k}: {v}")
In [ ]:
class CompressionEvaluator:
    """
    Evaluator for compression quality.
    """

    def evaluate(self, original: str, compressed: str, query: str) -> Dict:
        """
        Evaluate the effect of compression.
        """
        return {
            "compression_ratio": self._compression_ratio(original, compressed),
            "token_savings": self._token_savings(original, compressed),
            "semantic_similarity": self._semantic_similarity(original, compressed),
            "query_relevance": self._query_relevance(compressed, query),
        }

    def _compression_ratio(self, original: str, compressed: str) -> float:
        """Compression ratio (compressed length / original length)."""
        return len(compressed) / len(original)

    def _token_savings(self, original: str, compressed: str) -> Dict:
        """Token savings (rough estimate: about 2 characters per token for Chinese text)."""
        original_tokens = len(original) / 2
        compressed_tokens = len(compressed) / 2
        return {
            "original_tokens": int(original_tokens),
            "compressed_tokens": int(compressed_tokens),
            "saved_tokens": int(original_tokens - compressed_tokens),
            "savings_ratio": (original_tokens - compressed_tokens) / original_tokens,
        }

    def _semantic_similarity(self, original: str, compressed: str) -> float:
        """Semantic similarity (simplified: Jaccard overlap of whitespace tokens)."""
        orig_words = set(original.lower().split())
        comp_words = set(compressed.lower().split())
        overlap = len(orig_words & comp_words)
        union = len(orig_words | comp_words)
        return overlap / union if union > 0 else 0.0

    def _query_relevance(self, compressed: str, query: str) -> float:
        """Fraction of query words covered by the compressed text."""
        query_words = set(query.lower().split())
        comp_words = set(compressed.lower().split())
        overlap = len(query_words & comp_words)
        return overlap / len(query_words) if query_words else 0.0

# Test the evaluator
evaluator = CompressionEvaluator()

original = "Python异步编程使用async/await语法。asyncio是Python的标准库,提供了事件循环机制。异步编程可以提高程序的并发性能,特别适合IO密集型任务。"
compressed = "Python异步编程用async/await语法,通过asyncio的事件循环机制提升IO密集型任务的并发性能。"
query = "Python异步编程"

results = evaluator.evaluate(original, compressed, query)

print("Compression quality:")
for metric, value in results.items():
    if isinstance(value, dict):
        print(f"\n{metric}:")
        for k, v in value.items():
            print(f"  {k}: {v:.2f}" if isinstance(v, float) else f"  {k}: {v}")
    else:
        print(f"{metric}: {value:.2%}")
6. Hands-on exercises

Exercise 1: Compare compression strategies

Try comparing:
- fixed-length truncation
- semantic compression
- paragraph-level compression

Evaluate each one's:
- compression ratio
- semantic retention
- query relevance

A starting point is sketched below.
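A minimal comparison sketch, assuming the cells above have been run (so truncate_by_length, compressor, para_compressor and evaluator all exist); the sample documents and query here are made up for illustration:

# Compare the three strategies on the same input and score them with the evaluator
sample_docs = [
    "Python异步编程使用async/await语法,可以显著提高IO密集型任务的性能。asyncio是Python的标准库。",
    "异步编程允许程序在等待IO操作时执行其他任务,从而提高并发性能。FastAPI、aiohttp等框架充分利用了异步特性。",
]
sample_query = "Python异步编程的优势"
original_text = "\n\n".join(sample_docs)

candidates = {
    "truncation": truncate_by_length(original_text, max_length=150),
    "semantic": compressor.compress_multiple_documents(sample_docs, sample_query, max_total_length=150),
    "paragraph": para_compressor.compress(sample_docs, sample_query, target_length=150)[0],
}

for name, compressed_text in candidates.items():
    metrics = evaluator.evaluate(original_text, compressed_text, sample_query)
    print(f"\n=== {name} ===")
    print(f"compression_ratio:   {metrics['compression_ratio']:.2%}")
    print(f"semantic_similarity: {metrics['semantic_similarity']:.2%}")
    print(f"query_relevance:     {metrics['query_relevance']:.2%}")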
Exercise 2: Integrate a real LLM
- Integrate OpenAI or another LLM
- Implement full semantic compression
- Compare the results with and without the LLM

A minimal adapter is sketched below.
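One possible way to wire an OpenAI-compatible chat model into SemanticCompressor, assuming the openai Python package (v1+) is installed and OPENAI_API_KEY is set; the adapter class and the model name are illustrative choices, not part of this notebook's API:

# Thin adapter exposing the .predict(prompt) interface SemanticCompressor expects
from openai import OpenAI

class OpenAIAdapter:
    def __init__(self, model: str = "gpt-4o-mini"):  # example model name, substitute your own
        self.client = OpenAI()
        self.model = model

    def predict(self, prompt: str) -> str:
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
        )
        return response.choices[0].message.content

# With an LLM attached, the prompt-based compression paths defined above are used
# instead of the rule-based fallback.
llm_compressor = SemanticCompressor(llm=OpenAIAdapter())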
Exercise 3: Improve the relevance scoring
- Compute similarity with embeddings
- Implement smarter paragraph selection
- Measure the impact on compression quality

One way to swap in embeddings is sketched below.
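A sketch under the assumption that the sentence-transformers package is installed; the subclass and model name are illustrative, and it reuses the docs list from the paragraph-compression test cell above:

# Replace the word-overlap relevance score with embedding cosine similarity
from sentence_transformers import SentenceTransformer, util

class EmbeddingParagraphCompressor(ParagraphCompressor):
    def __init__(self, llm=None, model_name: str = "paraphrase-multilingual-MiniLM-L12-v2"):
        super().__init__(llm=llm)
        self.encoder = SentenceTransformer(model_name)

    def _calculate_relevance(self, paragraph: str, query: str) -> float:
        # Cosine similarity between query and paragraph embeddings, mapped to [0, 1]
        query_vec, para_vec = self.encoder.encode([query, paragraph])
        similarity = float(util.cos_sim(query_vec, para_vec))
        return (similarity + 1) / 2

emb_compressor = EmbeddingParagraphCompressor()
result, stats = emb_compressor.compress(docs, "Python异步编程的特点和优势", target_length=300)
print(stats)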
Summary

This section covered the core techniques of retrieval compression:
- Why compress: lower cost, faster responses, higher answer quality
- Compression strategies: simple truncation, semantic compression, paragraph-level compression
- Implementations: SemanticCompressor, ParagraphCompressor
- Evaluation: assessing compression quality along multiple dimensions

Practical advice (a small dispatch sketch follows):
- Simple scenarios: rule-based compression
- Complex scenarios: semantic compression
- High-quality requirements: paragraph-level compression
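A tiny illustrative dispatcher mapping these scenarios onto the classes above; the scenario labels are arbitrary, not a fixed API:

def compress_for_scenario(documents: List[str], query: str, budget: int, scenario: str = "simple") -> str:
    # Simple scenarios: cheap rule-based truncation
    if scenario == "simple":
        return truncate_by_length("\n\n".join(documents), max_length=budget)
    # Complex scenarios: query-aware semantic compression
    if scenario == "complex":
        return compressor.compress_multiple_documents(documents, query, max_total_length=budget)
    # High-quality requirements: paragraph-level selection and compression
    return para_compressor.compress(documents, query, target_length=budget)[0]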
Next steps:
- Apply retrieval compression in a real RAG system
- Tune the compression strategy for your specific scenario
- Monitor the impact of compression on final answer quality