Skip to content

milvus pdf 嵌入实战

一、环境准备

# 基础依赖
!pip install pymupdf openai sentence-transformers pymilvus
  • 推荐Python 3.8+环境(网页8)
  • 确保Milvus服务已启动(默认端口19530)
  • 建议配置8GB+内存(处理大规模PDF时)

二、PDF文本处理

1. 文本提取

python
def extract_pdf_text(path):
    """
    从PDF文件中提取文本内容
    
    参数:
        path: PDF文件路径
        
    返回:
        提取的文本字符串
    """
    try:
        doc = pymupdf.open(path)
        return " ".join([page.get_text() for page in doc])
    except Exception as e:
        raise RuntimeError(f"PDF解析失败: {str(e)}")

2. 文本重构增强

python
def reload_text(path):
    """
    使用大模型重构PDF提取的文本,提高文本质量
    
    参数:
        path: PDF文件路径
        
    返回:
        重构后的文本字符串
    """
    text = extract_pdf_text(path)

    # 初始化OpenAI客户端,连接到阿里云DashScope
    client = OpenAI(
        api_key = api_key,
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")
    
    try:
        # 调用DeepSeek-v3模型重构文本
        response = client.chat.completions.create(
            model="deepseek-v3",
            messages=[
                {"role": "user", 
                "content": "这是pdf 解析出来的文档 请你仔细阅读,将内容调整为完整的句子。同时需要确保不能丢失原文的信息。解析文档内容如下:{}".format(text)
                }
                ]
        )
        reload_text=response.choices[0].message.content
        return reload_text
    except Exception as e:
        print(f"API调用失败,启用降级处理: {str(e)}")
        # 降级处理:清理特殊字符并合并空格
        cleaned_text = re.sub(r'[^\w\u4e00-\u9fff\s。?!;,]', '', text)
        return re.sub(r'\s+', ' ', cleaned_text)

注意事项

  • 添加异常降级处理逻辑

三、向量化处理

1. 智能分块策略

python
def split_text(path):
    """
    对重构后的文本进行分段并生成向量表示
    
    参数:
        path: PDF文件路径
        
    返回:
        tuple: (分段文本列表, 对应的向量表示)
    """
    text=reload_text(path)
    
    # 使用正则表达式按照标点符号分割文本
    split_pattern = r'(?<=[。!?;.?!!;\n])\s*'
    chunks = [c.strip() for c in re.split(split_pattern, text) if len(c.strip()) > 20]

    # 滑动窗口合并短文本,每5个句子为一组
    final_chunks = []
    for i in range(0, len(chunks), 3):
        window = chunks[i:i+5]
        final_chunks.append(" ".join(window))

    # 打印分段结果
    for index,value in enumerate(final_chunks):
        print(f"第{index}个句子-->{value}")
    
    # 加载本地预训练的向量模型
    model = SentenceTransformer('C:/Users/Administrator/Desktop/easy-vectordb/model/bge-small-zh-v1.5')
    
    # 生成文本向量并进行L2归一化
    embeddings = model.encode(final_chunks)
    embeddings = normalize(embeddings, norm='l2')
    
    return final_chunks, embeddings

最佳实践

  • 采用滑动窗口重叠分块/单独一个句子也是可以的
  • 添加文本清洗步骤:去除特殊符号/乱码
  • 对短文本进行合并处理

四、Milvus集成

1. 集合配置优化

python
# 增强型schema配置(网页3)
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),  # 主键字段
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=512),  # 512维浮点向量
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000)  # 原始文本内容
]

schema = CollectionSchema(fields, 
    description="PDF知识库",
    enable_dynamic_field=True)  # 开启动态字段
collection = Collection("pdf_test", schema)

索引策略

python
# 复合索引配置(网页5)
vector_index = {
    "index_type": "IVF_PQ",
    "metric_type": "IP", 
    "params": {"nlist": 2048, "m": 32}
}

collection.create_index("vector", vector_index)
collection.create_index("content", {"index_type": "TRIE"})

2. 批量插入

python
data = [
    [i for i in range(len(texts_list))], # 主键ID列表
    embeddings.tolist(), # 向量列表(转换为Python列表)
    texts_list # 原始文本列表
]

# 执行插入并刷新内存
collection.insert(data)
collection.flush()
collection.load()  # 将索引加载到内存

五、完整工作流

python
"""
PDF向量存储完整工作流
功能:解析PDF->文本重构->分块处理->向量生成->Milvus存储
环境要求:Python 3.10+,Milvus 2.4.x
"""
import pymupdf
from openai import OpenAI
from sklearn.preprocessing import normalize
from sentence_transformers import SentenceTransformer
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection
import re

# API配置 - 使用兼容模式连接到阿里云DashScope
api_key="sk-XXXXXXXX"
api_base="https://dashscope.aliyuncs.com/compatible-mode/v1"

def extract_pdf_text(path):
    """
    从PDF文件中提取文本内容
    
    参数:
        path: PDF文件路径
        
    返回:
        提取的文本字符串
    """
    try:
        doc = pymupdf.open(path)
        return " ".join([page.get_text() for page in doc])
    except Exception as e:
        raise RuntimeError(f"PDF解析失败: {str(e)}")

def reload_text(path):
    """
    使用大模型重构PDF提取的文本,提高文本质量
    
    参数:
        path: PDF文件路径
        
    返回:
        重构后的文本字符串
    """
    text = extract_pdf_text(path)

    # 初始化OpenAI客户端,连接到阿里云DashScope
    client = OpenAI(
        api_key = api_key,
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")
    
    try:
        # 调用DeepSeek-v3模型重构文本
        response = client.chat.completions.create(
            model="deepseek-v3",
            messages=[
                {"role": "user", 
                "content": "这是pdf 解析出来的文档 请你仔细阅读,将内容调整为完整的句子。同时需要确保不能丢失原文的信息。解析文档内容如下:{}".format(text)
                }
                ]
        )
        reload_text=response.choices[0].message.content
        return reload_text
    except Exception as e:
        print(f"API调用失败,启用降级处理: {str(e)}")
        # 降级处理:清理特殊字符并合并空格
        cleaned_text = re.sub(r'[^\w\u4e00-\u9fff\s。?!;,]', '', text)
        return re.sub(r'\s+', ' ', cleaned_text)

def split_text(path):
    """
    对重构后的文本进行分段并生成向量表示
    
    参数:
        path: PDF文件路径
        
    返回:
        tuple: (分段文本列表, 对应的向量表示)
    """
    text=reload_text(path)
    
    # 使用正则表达式按照标点符号分割文本
    split_pattern = r'(?<=[。!?;.?!!;\n])\s*'
    chunks = [c.strip() for c in re.split(split_pattern, text) if len(c.strip()) > 20]

    # 滑动窗口合并短文本,每5个句子为一组
    # 这里有很多处理的办法 文本随便演示了一种
    final_chunks = []
    for i in range(0, len(chunks), 3):
        window = chunks[i:i+5]
        final_chunks.append(" ".join(window))

    # 打印分段结果
    for index,value in enumerate(final_chunks):
        print(f"第{index}个句子-->{value}")
    
    # 加载本地预训练的向量模型
    model = SentenceTransformer('C:/Users/Administrator/Desktop/easy-vectordb/model/bge-small-zh-v1.5')
    
    # 生成文本向量并进行L2归一化
    embeddings = model.encode(final_chunks)
    embeddings = normalize(embeddings, norm='l2')
    
    return final_chunks, embeddings

# 主程序入口
if __name__ == "__main__":
    path="C:/Users/Administrator/Desktop/easy-vectordb/code/Datawhale社区介绍.pdf"
    texts_list,embeddings=split_text(path)
    
     # 连接到Milvus向量数据库
    connections.connect(alias="default", uri="http://127.0.0.1:19530")

    # 定义向量表结构
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),  # 主键字段
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=512),  # 512维浮点向量
        FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=2000)  # 原始文本内容
    ]

    # 创建集合(表)
    schema = CollectionSchema(
            fields,
            description="PDF知识库",
            enable_dynamic_field=True
        )
    collection = Collection("pdf_test", schema)

    # 创建混合索引
    vector_index = {
            "index_type": "IVF_FLAT",
            "metric_type": "IP",
            "params": {"nlist": 1024}
        }
        
    collection.create_index("vector", vector_index)
    collection.create_index("content", {"index_type": "TRIE"})

    # 批量插入(含动态字段)
    data = [
        [i for i in range(len(texts_list))], # 主键ID列表
        embeddings.tolist(), # 向量列表(转换为Python列表)
        texts_list # 原始文本列表
    ]

    # 执行插入并刷新内存
    collection.insert(data)
    collection.flush()
    collection.load()  # 将索引加载到内存

基于 MIT 许可发布