1. 环境准备¶
In [ ]:
Copied!
# Import the required libraries (the exported notebook duplicated this cell;
# a single copy is sufficient).
import re
import numpy as np
from typing import List, Dict, Any
from dataclasses import dataclass

print("检查环境...")
print(f"NumPy版本: {np.__version__}")
print("\n环境准备完成!")
In [ ]:
Copied!
# Example document used to compare the different chunking strategies.
# (De-duplicated: the notebook export contained this cell twice.)
sample_text = """
人工智能(AI)是计算机科学的一个分支。它致力于创建能够模拟人类智能的系统。
机器学习是AI的子领域,专注于从数据中学习模式。
深度学习则是机器学习的进一步发展,使用多层神经网络。
这些技术已经广泛应用于图像识别、自然语言处理等领域。
"""

print("示例文档:")
print("=" * 60)
print(sample_text)
print("=" * 60)
print(f"总字符数: {len(sample_text)}")
In [ ]:
Copied!
@dataclass
class Chunk:
    """A single document chunk: its text plus bookkeeping metadata."""
    content: str            # the chunk text
    metadata: Dict[str, Any]  # chunk_id plus strategy-specific fields


class FixedLengthChunker:
    """Split text into fixed-size character windows with overlap."""

    def __init__(self, chunk_size: int = 200, chunk_overlap: int = 50):
        """
        Args:
            chunk_size: target number of characters per chunk.
            chunk_overlap: characters shared between consecutive chunks.
                Must be smaller than chunk_size, otherwise the scan
                position would never advance (infinite loop).
        """
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split(self, text: str) -> List[Chunk]:
        """Chunk `text` by fixed length; returns non-empty chunks in order."""
        chunks: List[Chunk] = []
        start = 0
        chunk_id = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk_text = text[start:end].strip()
            if chunk_text:  # skip windows that are pure whitespace
                chunks.append(Chunk(
                    content=chunk_text,
                    metadata={
                        "chunk_id": chunk_id,
                        "start": start,
                        # Clamp so the recorded end never points past the text.
                        "end": min(end, len(text)),
                        "length": len(chunk_text),
                    },
                ))
                chunk_id += 1
            # Advance by chunk_size - chunk_overlap (> 0, guaranteed above).
            start = end - self.chunk_overlap
        return chunks


# Demo: fixed-length chunking on the sample document.
chunker = FixedLengthChunker(chunk_size=100, chunk_overlap=20)
chunks = chunker.split(sample_text)
print("\n固定长度分块结果:")
print("-" * 60)
print(f"分块数量: {len(chunks)}\n")
for chunk in chunks:
    print(f"块 {chunk.metadata['chunk_id']}: ")
    print(f" 长度: {chunk.metadata['length']} 字符")
    print(f" 内容: {chunk.content[:80]}...\n")
In [ ]:
Copied!
class ParagraphChunker:
    """Split text into paragraph chunks (paragraphs separated by blank lines)."""

    def __init__(self, min_length: int = 50):
        # Paragraphs shorter than this many characters are dropped.
        self.min_length = min_length

    def split(self, text: str) -> List[Chunk]:
        """Split on blank-line boundaries; keep paragraphs >= min_length chars."""
        # A paragraph break is a newline, optional whitespace, another newline.
        paragraphs = re.split(r'\n\s*\n', text.strip())
        chunks: List[Chunk] = []
        for i, para in enumerate(paragraphs):
            para = para.strip()
            if len(para) >= self.min_length:
                # NOTE: chunk_id is the paragraph's position in the document,
                # so ids may have gaps when short paragraphs are filtered out.
                chunks.append(Chunk(
                    content=para,
                    metadata={
                        "chunk_id": i,
                        "type": "paragraph",
                        "length": len(para),
                    },
                ))
        return chunks


# Demo: paragraph chunking on the sample document.
para_chunker = ParagraphChunker(min_length=30)
para_chunks = para_chunker.split(sample_text)
print("\n按段落分块结果:")
print("-" * 60)
print(f"分块数量: {len(para_chunks)}\n")
for chunk in para_chunks:
    print(f"块 {chunk.metadata['chunk_id']}:")
    print(f" 类型: {chunk.metadata['type']}")
    print(f" 长度: {chunk.metadata['length']} 字符")
    print(f" 内容: {chunk.content}\n")
4.2 按句子分块¶
In [ ]:
Copied!
class SentenceChunker:
    """Group sentences into chunks, optionally overlapping between chunks."""

    def __init__(self, sentences_per_chunk: int = 3, overlap: int = 1):
        """
        Args:
            sentences_per_chunk: number of sentences per chunk.
            overlap: sentences shared by consecutive chunks. Must be
                smaller than sentences_per_chunk so the window advances.
        """
        if overlap >= sentences_per_chunk:
            raise ValueError("overlap must be smaller than sentences_per_chunk")
        self.sentences_per_chunk = sentences_per_chunk
        self.overlap = overlap

    def split(self, text: str) -> List[Chunk]:
        """Split `text` into sentences, then window them into chunks."""
        # Naive sentence split on CJK/ASCII terminators. The capture group
        # keeps each terminator so it can be re-attached to its sentence.
        sentences = re.split(r'([。!?.!?])', text)
        # Re-attach terminators: parts alternate [text, punct, text, punct, ...].
        full_sentences = []
        for i in range(0, len(sentences) - 1, 2):
            sentence = sentences[i] + (sentences[i+1] if i+1 < len(sentences) else '')
            sentence = sentence.strip()
            if sentence:
                full_sentences.append(sentence)
        # Bug fix: a trailing fragment with no terminator punctuation used to
        # be silently dropped by the pair loop above; keep it if non-empty.
        if len(sentences) % 2 == 1:
            tail = sentences[-1].strip()
            if tail:
                full_sentences.append(tail)
        # Window the sentences into chunks of the requested size.
        chunks: List[Chunk] = []
        chunk_id = 0
        step = self.sentences_per_chunk - self.overlap  # > 0, guaranteed above
        for i in range(0, len(full_sentences), step):
            chunk_sentences = full_sentences[i:i + self.sentences_per_chunk]
            if chunk_sentences:
                chunks.append(Chunk(
                    content=' '.join(chunk_sentences),
                    metadata={
                        "chunk_id": chunk_id,
                        "type": "sentence_group",
                        "num_sentences": len(chunk_sentences),
                    },
                ))
                chunk_id += 1
        return chunks


# Demo: sentence chunking on the sample document.
sent_chunker = SentenceChunker(sentences_per_chunk=2, overlap=0)
sent_chunks = sent_chunker.split(sample_text)
print("\n按句子分块结果:")
print("-" * 60)
print(f"分块数量: {len(sent_chunks)}\n")
for chunk in sent_chunks[:3]:  # show only the first 3
    print(f"块 {chunk.metadata['chunk_id']}:")
    print(f" 句子数: {chunk.metadata['num_sentences']}")
    print(f" 内容: {chunk.content}\n")
4.3 递归分块¶
In [ ]:
Copied!
class RecursiveChunker:
    """
    Recursive chunker: try separators from coarsest to finest, recursing
    with the next separator whenever a piece is still too long.
    """

    def __init__(self, chunk_size: int = 200, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        # Overlap is only applied by the fixed-length fallback.
        self.chunk_overlap = chunk_overlap
        # Separators ordered from highest to lowest structural level.
        # Bug fix: the empty string must come LAST — str.split("") raises
        # ValueError, so "" is treated as "no separator left: fall back to
        # fixed-length chunking" (it also made " " unreachable before).
        self.separators = ["\n\n", "\n", "。", " ", ""]

    def split(self, text: str, separators: List[str] = None) -> List[Chunk]:
        """Recursively chunk `text`, preferring higher-level separators."""
        if separators is None:
            separators = self.separators
        # Final fallback: no usable separator left -> fixed-length chunking.
        # An empty-string separator is routed here instead of crashing.
        if not separators or separators[0] == "":
            return FixedLengthChunker(self.chunk_size, self.chunk_overlap).split(text)
        separator = separators[0]
        remaining_separators = separators[1:]
        pieces = text.split(separator)
        chunks: List[Chunk] = []
        current_chunk = ""
        chunk_id = 0
        for piece in pieces:
            # Re-insert the separator we split on (except before the first piece).
            piece_with_sep = separator + piece if current_chunk else piece
            if len(current_chunk) + len(piece_with_sep) < self.chunk_size:
                current_chunk += piece_with_sep
            else:
                # Flush the chunk accumulated so far.
                if current_chunk:
                    chunks.append(Chunk(
                        content=current_chunk.strip(),
                        metadata={"chunk_id": chunk_id, "separator": separator}
                    ))
                    chunk_id += 1
                if len(piece) > self.chunk_size:
                    # Piece is itself too long: recurse with finer separators.
                    chunks.extend(self.split(piece, remaining_separators))
                    current_chunk = ""
                else:
                    current_chunk = piece
        # Flush the final chunk.
        if current_chunk:
            chunks.append(Chunk(
                content=current_chunk.strip(),
                metadata={"chunk_id": chunk_id, "separator": separator}
            ))
        return chunks


# Demo: recursive chunking on the sample document.
recursive_chunker = RecursiveChunker(chunk_size=150, chunk_overlap=30)
recursive_chunks = recursive_chunker.split(sample_text)
print("\n递归分块结果:")
print("-" * 60)
print(f"分块数量: {len(recursive_chunks)}\n")
for chunk in recursive_chunks:
    print(f"块 {chunk.metadata['chunk_id']}:")
    print(f" 分隔符: {repr(chunk.metadata['separator'])}")
    print(f" 内容: {chunk.content[:80]}...\n")
5. 分块策略对比¶
In [ ]:
Copied!
# Compare the chunking strategies side by side (de-duplicated cell).
chunkers = {
    "固定长度": FixedLengthChunker(chunk_size=100, chunk_overlap=20),
    "按段落": ParagraphChunker(min_length=30),
    "按句子": SentenceChunker(sentences_per_chunk=2, overlap=0),
    "递归分块": RecursiveChunker(chunk_size=150, chunk_overlap=30),
}

print("\n分块策略对比:")
print("=" * 80)
print(f"{'策略':<15} {'块数量':<10} {'平均长度':<15} {'最小长度':<15} {'最大长度':<15}")
print("=" * 80)
for name, chunker in chunkers.items():
    chunks = chunker.split(sample_text)
    if chunks:
        # Fall back to len(content) for chunkers whose metadata lacks "length".
        lengths = [c.metadata.get('length', len(c.content)) for c in chunks]
        avg_length = np.mean(lengths)
        min_length = np.min(lengths)
        max_length = np.max(lengths)
        print(f"{name:<15} {len(chunks):<10} {avg_length:<15.1f} {min_length:<15.0f} {max_length:<15.0f}")
    else:
        print(f"{name:<15} {'无块':<10} {'-':<15} {'-':<15} {'-':<15}")
6. 实战:选择最佳分块策略¶
In [ ]:
Copied!
def recommend_chunking_strategy(
    text_type: str,
    query_type: str,
    doc_length: int
) -> str:
    """
    Recommend a chunking strategy for the given scenario.

    Args:
        text_type: kind of text (article/code/conversation/report)
        query_type: kind of query (specific/broad)
        doc_length: document length in characters (currently unused;
            kept for interface stability and future tuning)

    Returns:
        A human-readable recommendation string.
    """
    # Code: split along syntactic units (functions/classes).
    if text_type == "code":
        return "使用代码特定的AST分块"
    # Conversations: split by dialogue turn.
    if text_type == "conversation":
        return "按对话轮次分块"
    # Technical reports: split by section.
    if text_type == "report":
        return "按章节/段落分块(递归分块)"
    # Articles: chunk size depends on how focused the queries are.
    if text_type == "article":
        if query_type == "specific":
            # Narrow queries: small chunks, a few sentences each.
            return "按句子分块,每块2-3句"
        # Broad queries: larger chunks preserve context.
        return "按段落分块,保持上下文"
    # Default: recursive chunking with moderate size and overlap.
    return "递归分块,chunk_size=512, overlap=50"


# Demo: strategy recommendations for a few scenarios (de-duplicated cell).
scenarios = [
    {"text_type": "article", "query_type": "specific", "doc_length": 2000},
    {"text_type": "code", "query_type": "specific", "doc_length": 5000},
    {"text_type": "report", "query_type": "broad", "doc_length": 10000},
]

print("\n分块策略推荐:")
print("-" * 60)
for i, scenario in enumerate(scenarios, 1):
    recommendation = recommend_chunking_strategy(**scenario)
    print(f"\n场景{i}: {scenario}")
    print(f"推荐: {recommendation}")