Skip to content

应用案例

案例一:以图搜图系统

系统架构

用户上传图片


┌─────────────────┐
│  图片预处理      │  ← 尺寸调整、格式转换
└────────┬────────┘


┌─────────────────┐
│  特征提取        │  ← ResNet/CLIP 模型
│  (向量化)        │
└────────┬────────┘


┌─────────────────┐
│  Milvus 向量搜索 │  ← 相似图片检索
└────────┬────────┘


┌─────────────────┐
│  结果返回        │  ← 相似图片列表
└─────────────────┘

完整代码实现

python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
from PIL import Image
import torch
import torchvision.transforms as transforms
from torchvision import models
import numpy as np
import io
import base64

class ImageSearchSystem:
    """Image-to-image search system backed by Milvus.

    A pretrained ResNet-50 with its classification head removed turns each
    image into a 2048-d feature vector; vectors are stored and searched in a
    Milvus collection using an HNSW index with L2 distance.
    """

    def __init__(self, collection_name="image_search", host="localhost", port="19530"):
        """
        Args:
            collection_name: Milvus collection to open or create.
            host: Milvus server host (previously hard-coded).
            port: Milvus server port (previously hard-coded).
        """
        self.collection_name = collection_name
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Drop the final FC layer so the forward pass yields the pooled
        # 2048-d feature instead of class logits.
        # NOTE: `pretrained=True` is deprecated in torchvision >= 0.13
        # (use `weights=ResNet50_Weights.DEFAULT`); kept for compatibility.
        self.model = models.resnet50(pretrained=True)
        self.model = torch.nn.Sequential(*list(self.model.children())[:-1])
        self.model.to(self.device)
        self.model.eval()

        # Standard ImageNet preprocessing matching the pretrained weights.
        self.transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])

        # Connect to Milvus and make sure the collection exists.
        connections.connect(host=host, port=port)
        self._init_collection()

    def _init_collection(self):
        """Open the collection if it exists; otherwise create schema + index."""
        if utility.has_collection(self.collection_name):
            self.collection = Collection(self.collection_name)
            return

        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="image_vector", dtype=DataType.FLOAT_VECTOR, dim=2048),
            FieldSchema(name="image_url", dtype=DataType.VARCHAR, max_length=1024),
            FieldSchema(name="image_name", dtype=DataType.VARCHAR, max_length=256),
            FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
            FieldSchema(name="upload_time", dtype=DataType.INT64)
        ]

        schema = CollectionSchema(fields, "图片搜索集合")
        self.collection = Collection(self.collection_name, schema)

        # HNSW index; vectors are L2-normalized, so L2 ranking matches cosine.
        index_params = {
            "index_type": "HNSW",
            "metric_type": "L2",
            "params": {"M": 16, "efConstruction": 200}
        }
        self.collection.create_index("image_vector", index_params)

    def extract_features(self, image_input):
        """Extract an L2-normalized 2048-d feature vector from an image.

        Args:
            image_input: File path (str), raw image bytes, or a PIL image.

        Returns:
            list[float]: Normalized feature vector of length 2048.
        """
        # Accept a path, raw bytes, or an already-open PIL image.
        if isinstance(image_input, str):
            image = Image.open(image_input).convert('RGB')
        elif isinstance(image_input, bytes):
            image = Image.open(io.BytesIO(image_input)).convert('RGB')
        else:
            image = image_input.convert('RGB')

        input_tensor = self.transform(image).unsqueeze(0).to(self.device)

        # Inference only; no gradients needed.
        with torch.no_grad():
            features = self.model(input_tensor)

        # Normalize so L2 distance behaves like cosine similarity.
        features = features.squeeze().cpu().numpy()
        features = features / np.linalg.norm(features)

        return features.tolist()

    def add_image(self, image_input, image_url, image_name, category=""):
        """Index a single image.

        Args:
            image_input: Image source accepted by extract_features().
            image_url: URL stored alongside the vector.
            image_name: Display name of the image.
            category: Optional category label used for filtered search.

        Returns:
            The auto-generated primary key of the inserted row.
        """
        import time

        features = self.extract_features(image_input)

        # Row-based insert: pymilvus expects a list of row dicts.
        row = {
            "image_vector": features,
            "image_url": image_url,
            "image_name": image_name,
            "category": category,
            "upload_time": int(time.time())
        }

        insert_result = self.collection.insert([row])
        return insert_result.primary_keys[0]

    def search(self, image_input, top_k=10, category=None):
        """Search for images similar to the query image.

        Args:
            image_input: Query image (path, bytes, or PIL image).
            top_k: Maximum number of hits to return.
            category: Optional exact-match category filter.

        Returns:
            list[dict]: Hits with id, L2 distance, and stored metadata,
            ordered from most to least similar.
        """
        query_vector = self.extract_features(image_input)

        expr = f'category == "{category}"' if category else None

        # The collection must be loaded into memory before searching.
        self.collection.load()

        search_params = {
            "metric_type": "L2",
            "params": {"ef": 64}
        }

        results = self.collection.search(
            data=[query_vector],
            anns_field="image_vector",
            param=search_params,
            limit=top_k,
            expr=expr,
            output_fields=["image_url", "image_name", "category"]
        )

        return [
            {
                "id": hit.id,
                "distance": hit.distance,
                "image_url": hit.entity.get("image_url"),
                "image_name": hit.entity.get("image_name"),
                "category": hit.entity.get("category")
            }
            for hit in results[0]
        ]

    def batch_add_images(self, image_list):
        """Index many images at once.

        Args:
            image_list: Iterable of dicts with keys "image", "url", "name",
                and optional "category".

        Returns:
            Primary keys of the inserted rows.
        """
        import time

        rows = []
        for img_info in image_list:
            rows.append({
                "image_vector": self.extract_features(img_info["image"]),
                "image_url": img_info["url"],
                "image_name": img_info["name"],
                "category": img_info.get("category", ""),
                "upload_time": int(time.time())
            })

        insert_result = self.collection.insert(rows)
        return insert_result.primary_keys

# Usage example
def demo_image_search():
    """Walk through the image-search workflow: index, then query."""
    search_system = ImageSearchSystem()

    # Index a sample image (uncomment with a real file path).
    print("添加图片到索引...")
    # image_id = search_system.add_image(
    #     "path/to/image.jpg",
    #     "https://example.com/image.jpg",
    #     "示例图片",
    #     "风景"
    # )

    # Query by image and print the closest matches (uncomment to run).
    print("\n搜索相似图片...")
    # results = search_system.search("path/to/query.jpg", top_k=5)
    # for result in results:
    #     print(f"图片: {result['image_name']}, 相似度: {1/(1+result['distance']):.4f}")

if __name__ == "__main__":
    demo_image_search()

案例二:智能问答系统

系统架构

用户问题


┌─────────────────┐
│  问题理解        │  ← 意图识别、实体提取
└────────┬────────┘


┌─────────────────┐
│  问题向量化      │  ← Embedding 模型
└────────┬────────┘


┌─────────────────┐
│  知识库检索      │  ← Milvus 语义搜索
└────────┬────────┘


┌─────────────────┐
│  答案生成        │  ← LLM 生成回答
└─────────────────┘

完整代码实现

python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
from sentence_transformers import SentenceTransformer
import numpy as np
import openai

class QA_System:
    """Knowledge-base Q&A system: semantic retrieval plus optional LLM generation."""

    def __init__(self, collection_name="knowledge_base",
                 host="localhost", port="19530",
                 embedding_model="BAAI/bge-large-zh"):
        """
        Args:
            collection_name: Milvus collection to open or create.
            host: Milvus server host (previously hard-coded).
            port: Milvus server port (previously hard-coded).
            embedding_model: SentenceTransformer model name (previously hard-coded).
        """
        self.collection_name = collection_name

        # Embedding model used for both questions and answers (1024-d).
        self.embedding_model = SentenceTransformer(embedding_model)

        # Configure the OpenAI key here to enable real answer generation.
        # openai.api_key = "your-api-key"

        connections.connect(host=host, port=port)
        self._init_collection()

    def _init_collection(self):
        """Open the knowledge-base collection, creating schema + indexes if missing."""
        if utility.has_collection(self.collection_name):
            self.collection = Collection(self.collection_name)
            return

        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="question_vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
            FieldSchema(name="answer_vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
            FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=1024),
            FieldSchema(name="answer", dtype=DataType.VARCHAR, max_length=8192),
            FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
            FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256)
        ]

        schema = CollectionSchema(fields, "知识库问答集合")
        self.collection = Collection(self.collection_name, schema)

        # Same HNSW/COSINE index on both vector fields.
        # NOTE(review): multiple vector fields per collection require a
        # sufficiently recent Milvus version — verify against the deployment.
        index_params = {
            "index_type": "HNSW",
            "metric_type": "COSINE",
            "params": {"M": 16, "efConstruction": 200}
        }
        for field in ["question_vector", "answer_vector"]:
            self.collection.create_index(field, index_params)

    def get_embedding(self, text):
        """Return the normalized embedding of *text* as a list of floats."""
        embedding = self.embedding_model.encode(text, normalize_embeddings=True)
        return embedding.tolist()

    def add_knowledge(self, question, answer, category="general", source=""):
        """Add one Q&A pair to the knowledge base.

        Returns:
            The auto-generated primary key of the inserted row.
        """
        # Row-based insert: pymilvus expects a list of row dicts.
        row = {
            "question_vector": self.get_embedding(question),
            "answer_vector": self.get_embedding(answer),
            "question": question,
            "answer": answer,
            "category": category,
            "source": source
        }

        insert_result = self.collection.insert([row])
        return insert_result.primary_keys[0]

    def search_similar_questions(self, query, top_k=5, category=None):
        """Retrieve knowledge entries whose questions are semantically close to *query*.

        Args:
            query: Natural-language question.
            top_k: Maximum number of entries to return.
            category: Optional exact-match category filter.

        Returns:
            list[dict]: Entries with id, cosine similarity, and stored fields.
        """
        query_vector = self.get_embedding(query)

        expr = f'category == "{category}"' if category else None

        # The collection must be loaded into memory before searching.
        self.collection.load()

        search_params = {
            "metric_type": "COSINE",
            "params": {"ef": 64}
        }

        results = self.collection.search(
            data=[query_vector],
            anns_field="question_vector",
            param=search_params,
            limit=top_k,
            expr=expr,
            output_fields=["question", "answer", "category", "source"]
        )

        return [
            {
                "id": hit.id,
                "similarity": hit.distance,
                "question": hit.entity.get("question"),
                "answer": hit.entity.get("answer"),
                "category": hit.entity.get("category"),
                "source": hit.entity.get("source")
            }
            for hit in results[0]
        ]

    def generate_answer(self, query, use_rag=True):
        """Generate an answer, optionally grounded in retrieved knowledge (RAG).

        Args:
            query: User question.
            use_rag: When True, retrieve the top-3 similar Q&A pairs and
                build a grounded prompt; otherwise the prompt is the raw query.

        Returns:
            str: Placeholder answer (real LLM call is commented out below).
        """
        if use_rag:
            # Retrieval-augmented generation: ground the prompt in the KB.
            relevant_knowledge = self.search_similar_questions(query, top_k=3)

            context = "\n\n".join(
                f"Q: {item['question']}\nA: {item['answer']}"
                for item in relevant_knowledge
            )

            prompt = f"""基于以下相关知识回答问题:

相关知识:
{context}

用户问题:{query}

请根据相关知识回答用户问题:"""
        else:
            prompt = query

        # Call an LLM here to produce the real answer (OpenAI example):
        # response = openai.ChatCompletion.create(
        #     model="gpt-3.5-turbo",
        #     messages=[
        #         {"role": "system", "content": "你是一个 helpful 的助手。"},
        #         {"role": "user", "content": prompt}
        #     ]
        # )
        # return response.choices[0].message.content

        # Placeholder answer until an LLM backend is wired in.
        return f"基于检索到的 {len(relevant_knowledge) if use_rag else 0} 条知识生成的回答..."

    def batch_import_knowledge(self, knowledge_list):
        """Bulk-import Q&A pairs.

        Args:
            knowledge_list: Iterable of dicts with keys "question", "answer",
                and optional "category", "source".
        """
        data_batch = [
            {
                "question_vector": self.get_embedding(item["question"]),
                "answer_vector": self.get_embedding(item["answer"]),
                "question": item["question"],
                "answer": item["answer"],
                "category": item.get("category", "general"),
                "source": item.get("source", "")
            }
            for item in knowledge_list
        ]

        # Insert in chunks to keep individual requests small.
        batch_size = 100
        for i in range(0, len(data_batch), batch_size):
            self.collection.insert(data_batch[i:i + batch_size])
            print(f"已导入 {min(i + batch_size, len(data_batch))}/{len(data_batch)} 条知识")

# Usage example
def demo_qa_system():
    """Q&A system demo: seed the knowledge base, retrieve, and answer."""
    qa_system = QA_System()

    # Seed the knowledge base with a few sample entries.
    print("添加知识到知识库...")

    seed_rows = [
        ("什么是 Milvus?",
         "Milvus 是一个开源的向量数据库,专门用于存储、索引和管理大规模的向量数据。",
         "技术", "官方文档"),
        ("Milvus 支持哪些索引类型?",
         "Milvus 支持 FLAT、IVF_FLAT、IVF_SQ8、IVF_PQ、HNSW、ANNOY 等多种索引类型。",
         "技术", "官方文档"),
        ("如何安装 Milvus?",
         "可以使用 Docker Compose 快速安装 Milvus,或者使用 Kubernetes 进行分布式部署。",
         "技术", "安装指南"),
    ]
    knowledge_data = [
        {"question": q, "answer": a, "category": c, "source": s}
        for q, a, c, s in seed_rows
    ]

    qa_system.batch_import_knowledge(knowledge_data)

    # Ask a question.
    print("\n测试问答...")
    query = "Milvus 是什么?"

    # Retrieve semantically similar questions from the knowledge base.
    similar = qa_system.search_similar_questions(query, top_k=3)
    print(f"\n相似问题:")
    for item in similar:
        print(f"  - {item['question']} (相似度: {item['similarity']:.4f})")

    # Generate an answer grounded in the retrieved knowledge.
    answer = qa_system.generate_answer(query, use_rag=True)
    print(f"\n生成的回答: {answer}")

if __name__ == "__main__":
    demo_qa_system()

案例三:推荐系统

系统架构

用户行为数据


┌─────────────────┐
│  用户画像构建    │  ← 行为分析、特征提取
└────────┬────────┘


┌─────────────────┐
│  物品向量化      │  ← 内容特征、协同过滤
└────────┬────────┘


┌─────────────────┐
│  相似度计算      │  ← Milvus 向量搜索
└────────┬────────┘


┌─────────────────┐
│  推荐生成        │  ← 个性化推荐列表
└─────────────────┘

完整代码实现

python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import numpy as np
from sklearn.decomposition import TruncatedSVD
from scipy.sparse import csr_matrix
import pandas as pd

class RecommendationSystem:
    """Recommendation system: collaborative-filtering item factors + Milvus ANN search."""

    def __init__(self, collection_name="recommendation_items",
                 host="localhost", port="19530"):
        """
        Args:
            collection_name: Milvus collection holding item vectors.
            host: Milvus server host (previously hard-coded).
            port: Milvus server port (previously hard-coded).
        """
        self.collection_name = collection_name

        connections.connect(host=host, port=port)
        self._init_collection()

        # Populated by build_user_item_matrix().
        self.user_item_matrix = None  # sparse users x items rating matrix
        self.item_factors = None      # dense item latent factors from SVD

    def _init_collection(self):
        """Open the item collection, creating schema and index if missing."""
        if utility.has_collection(self.collection_name):
            self.collection = Collection(self.collection_name)
            return

        fields = [
            FieldSchema(name="item_id", dtype=DataType.INT64, is_primary=True),
            FieldSchema(name="item_vector", dtype=DataType.FLOAT_VECTOR, dim=128),
            FieldSchema(name="item_name", dtype=DataType.VARCHAR, max_length=256),
            FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
            FieldSchema(name="price", dtype=DataType.FLOAT),
            FieldSchema(name="tags", dtype=DataType.ARRAY,
                        element_type=DataType.VARCHAR, max_length=32, max_capacity=10)
        ]

        schema = CollectionSchema(fields, "推荐系统物品集合")
        self.collection = Collection(self.collection_name, schema)

        # Inner product: user-preference dot item-factor is the ranking score.
        index_params = {
            "index_type": "HNSW",
            "metric_type": "IP",
            "params": {"M": 16, "efConstruction": 200}
        }
        self.collection.create_index("item_vector", index_params)

    def build_user_item_matrix(self, interactions_df):
        """Build the sparse user-item rating matrix and factorize it.

        Args:
            interactions_df: DataFrame with columns [user_id, item_id, rating].
        """
        users = interactions_df['user_id'].unique()
        items = interactions_df['item_id'].unique()

        # Map raw ids to contiguous matrix indices.
        user_map = {u: i for i, u in enumerate(users)}
        item_map = {item: i for i, item in enumerate(items)}

        row = [user_map[u] for u in interactions_df['user_id']]
        col = [item_map[item] for item in interactions_df['item_id']]
        data = interactions_df['rating'].values

        self.user_item_matrix = csr_matrix(
            (data, (row, col)),
            shape=(len(users), len(items))
        )

        self.user_map = user_map
        self.item_map = item_map
        self.reverse_item_map = {v: k for k, v in item_map.items()}

        # SVD over the transposed (item x user) matrix yields 128-d item
        # factors — must match the Milvus vector dim above.
        # NOTE(review): TruncatedSVD requires n_components < n_features;
        # verify the interaction data is large enough before calling.
        svd = TruncatedSVD(n_components=128)
        self.item_factors = svd.fit_transform(self.user_item_matrix.T)

        print(f"用户-物品矩阵形状: {self.user_item_matrix.shape}")
        print(f"物品因子矩阵形状: {self.item_factors.shape}")

    def add_items(self, items_df):
        """Insert items (with their latent vectors) into Milvus.

        Args:
            items_df: DataFrame with at least an 'item_id' column; optional
                'item_name', 'category', 'price', 'tags'.

        Raises:
            ValueError: If build_user_item_matrix() has not been called yet.
        """
        if self.item_factors is None:
            raise ValueError("请先构建用户-物品矩阵")

        data_batch = []
        for _, row in items_df.iterrows():
            item_id = row['item_id']

            if item_id in self.item_map:
                vector = self.item_factors[self.item_map[item_id]].tolist()
            else:
                # Cold-start item with no interactions: fall back to a random
                # vector (content-based features would be better here).
                vector = np.random.randn(128).tolist()

            data_batch.append({
                "item_id": item_id,
                "item_vector": vector,
                "item_name": row.get('item_name', ''),
                "category": row.get('category', ''),
                "price": float(row.get('price', 0)),
                "tags": row.get('tags', [])
            })

        # Insert in chunks to keep individual requests small.
        batch_size = 1000
        for i in range(0, len(data_batch), batch_size):
            self.collection.insert(data_batch[i:i + batch_size])
            print(f"已添加 {min(i + batch_size, len(data_batch))}/{len(data_batch)} 个物品")

    def get_user_vector(self, user_id):
        """Return the user's normalized preference vector, or None if unknown."""
        if user_id not in self.user_map:
            return None

        user_idx = self.user_map[user_id]
        user_vector = self.user_item_matrix[user_idx].toarray()[0]

        # Project the rating row onto the item factors: a rating-weighted
        # sum of the factors of items the user interacted with.
        user_pref = np.dot(user_vector, self.item_factors)

        # Normalize, guarding against an all-zero row.
        norm = np.linalg.norm(user_pref)
        if norm > 0:
            user_pref = user_pref / norm

        return user_pref.tolist()

    def recommend_for_user(self, user_id, top_k=10, filters=None):
        """Generate personalized recommendations for a user.

        Args:
            user_id: Raw user id (as in the interactions data).
            top_k: Number of items to recommend.
            filters: Optional dict with 'category' and/or 'max_price'.

        Returns:
            list[dict]: Recommended items with metadata and an IP score,
            or popular items when the user is unknown (cold start).
        """
        user_vector = self.get_user_vector(user_id)

        if user_vector is None:
            # Cold-start user: fall back to globally popular items.
            return self.get_popular_items(top_k)

        # Translate the filter dict into a Milvus boolean expression.
        expr = None
        if filters:
            conditions = []
            if 'category' in filters:
                conditions.append(f'category == "{filters["category"]}"')
            if 'max_price' in filters:
                conditions.append(f'price <= {filters["max_price"]}')
            expr = ' and '.join(conditions)

        # The collection must be loaded into memory before searching.
        self.collection.load()

        search_params = {
            "metric_type": "IP",
            "params": {"ef": 64}
        }

        results = self.collection.search(
            data=[user_vector],
            anns_field="item_vector",
            param=search_params,
            limit=top_k,
            expr=expr,
            output_fields=["item_id", "item_name", "category", "price", "tags"]
        )

        return [
            {
                "item_id": hit.entity.get("item_id"),
                "item_name": hit.entity.get("item_name"),
                "category": hit.entity.get("category"),
                "price": hit.entity.get("price"),
                "tags": hit.entity.get("tags"),
                "score": hit.distance  # IP distance doubles as the score
            }
            for hit in results[0]
        ]

    def get_similar_items(self, item_id, top_k=10):
        """Return up to *top_k* items most similar to the given item."""
        # Fetch the stored vector of the query item.
        results = self.collection.query(
            expr=f"item_id == {item_id}",
            output_fields=["item_vector"],
            limit=1
        )

        if not results:
            return []

        item_vector = results[0]["item_vector"]

        # The expr already excludes the query item, so limit can be top_k
        # directly (the original top_k + 1 returned one extra hit).
        search_results = self.collection.search(
            data=[item_vector],
            anns_field="item_vector",
            param={"metric_type": "IP", "params": {"ef": 64}},
            limit=top_k,
            expr=f"item_id != {item_id}",
            output_fields=["item_id", "item_name", "category", "price"]
        )

        return [
            {
                "item_id": hit.entity.get("item_id"),
                "item_name": hit.entity.get("item_name"),
                "category": hit.entity.get("category"),
                "price": hit.entity.get("price"),
                "similarity": hit.distance
            }
            for hit in search_results[0]
        ]

    def get_popular_items(self, top_k=10):
        """Cold-start fallback: items with the highest total rating."""
        # Column sums of the rating matrix = total rating mass per item.
        item_scores = np.array(self.user_item_matrix.sum(axis=0))[0]
        popular_indices = np.argsort(item_scores)[-top_k:][::-1]

        return [
            {
                "item_id": self.reverse_item_map[idx],
                "score": item_scores[idx],
                "reason": "热门物品"
            }
            for idx in popular_indices
        ]

# Usage example
def demo_recommendation_system():
    """Recommendation system demo on synthetic interaction data."""

    # Deterministic synthetic data.
    np.random.seed(42)

    # Random user-item interactions.
    n_users = 1000
    n_items = 500

    interactions = []
    for _ in range(10000):
        user_id = np.random.randint(0, n_users)
        item_id = np.random.randint(0, n_items)
        rating = np.random.randint(1, 6)
        interactions.append({
            "user_id": user_id,
            "item_id": item_id,
            "rating": rating
        })

    interactions_df = pd.DataFrame(interactions)

    # Item metadata.
    categories = ["电子产品", "服装", "食品", "图书", "家居"]
    items = []
    for i in range(n_items):
        items.append({
            "item_id": i,
            "item_name": f"商品_{i}",
            "category": np.random.choice(categories),
            "price": np.random.uniform(10, 1000),
            "tags": [f"标签_{j}" for j in range(np.random.randint(1, 5))]
        })

    items_df = pd.DataFrame(items)

    # Initialize the recommendation system.
    rec_system = RecommendationSystem()

    # Build the interaction matrix and index the items.
    print("构建用户-物品矩阵...")
    rec_system.build_user_item_matrix(interactions_df)

    print("\n添加物品到 Milvus...")
    rec_system.add_items(items_df)

    # Personalized recommendations for one user.
    print("\n为用户生成推荐...")
    user_id = 42
    recommendations = rec_system.recommend_for_user(user_id, top_k=5)

    print(f"\n为用户 {user_id} 推荐:")
    for i, rec in enumerate(recommendations, 1):
        # Fixed: the price segment was a malformed f-string (`f{...}` with
        # no quotes), which was a syntax error in the original.
        print(f"{i}. {rec['item_name']} ({rec['category']}) - "
              f"{rec['price']:.2f} - 分数: {rec['score']:.4f}")

    # Item-to-item similarity.
    print("\n查找相似物品...")
    similar = rec_system.get_similar_items(item_id=10, top_k=5)

    print(f"\n与商品 10 相似的物品:")
    for i, item in enumerate(similar, 1):
        print(f"{i}. {item['item_name']} ({item['category']}) - "
              f"相似度: {item['similarity']:.4f}")

if __name__ == "__main__":
    demo_recommendation_system()

案例四:文本语义搜索

完整代码实现

python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
from sentence_transformers import SentenceTransformer
import numpy as np

class SemanticSearchEngine:
    """Text semantic search engine backed by Milvus and sentence embeddings."""

    def __init__(self, collection_name="semantic_search",
                 host="localhost", port="19530",
                 model_name="BAAI/bge-m3"):
        """
        Args:
            collection_name: Milvus collection to open or create.
            host: Milvus server host (previously hard-coded).
            port: Milvus server port (previously hard-coded).
            model_name: SentenceTransformer model name (previously hard-coded).
        """
        self.collection_name = collection_name

        # Multilingual embedding model (1024-d output).
        self.model = SentenceTransformer(model_name)

        connections.connect(host=host, port=port)
        self._init_collection()

    def _init_collection(self):
        """Open the document collection, creating schema and index if missing."""
        if utility.has_collection(self.collection_name):
            self.collection = Collection(self.collection_name)
            return

        fields = [
            FieldSchema(name="doc_id", dtype=DataType.INT64, is_primary=True),
            FieldSchema(name="doc_vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
            FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=512),
            FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=8192),
            FieldSchema(name="author", dtype=DataType.VARCHAR, max_length=128),
            FieldSchema(name="publish_date", dtype=DataType.INT64),
            FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
            FieldSchema(name="keywords", dtype=DataType.ARRAY,
                        element_type=DataType.VARCHAR, max_length=32, max_capacity=20)
        ]

        schema = CollectionSchema(fields, "语义搜索文档集合")
        self.collection = Collection(self.collection_name, schema)

        index_params = {
            "index_type": "HNSW",
            "metric_type": "COSINE",
            "params": {"M": 16, "efConstruction": 200}
        }
        self.collection.create_index("doc_vector", index_params)

    def encode_text(self, text):
        """Encode text into a normalized embedding (list of floats)."""
        embedding = self.model.encode(text, normalize_embeddings=True)
        return embedding.tolist()

    def add_document(self, doc_id, title, content, author="",
                     publish_date=0, category="", keywords=None):
        """Index a document; the vector encodes the title + first 500 chars."""
        full_text = f"{title} {content[:500]}"
        doc_vector = self.encode_text(full_text)

        # Row-based insert: pymilvus expects a list of row dicts.
        row = {
            "doc_id": doc_id,
            "doc_vector": doc_vector,
            "title": title,
            "content": content[:8192],  # stay within the VARCHAR limit
            "author": author,
            "publish_date": publish_date,
            "category": category,
            "keywords": keywords or []
        }

        self.collection.insert([row])

    def search(self, query, top_k=10, filters=None):
        """Semantic search over the indexed documents.

        Args:
            query: Natural-language query.
            top_k: Maximum number of hits.
            filters: Optional dict with 'category', 'author', and/or
                'date_range' (a (start, end) pair of publish_date ints).

        Returns:
            list[dict]: Hits with metadata, a content preview, and cosine similarity.
        """
        query_vector = self.encode_text(query)

        # Translate the filter dict into a Milvus boolean expression.
        expr = None
        if filters:
            conditions = []
            if 'category' in filters:
                conditions.append(f'category == "{filters["category"]}"')
            if 'author' in filters:
                conditions.append(f'author == "{filters["author"]}"')
            if 'date_range' in filters:
                start, end = filters['date_range']
                conditions.append(f'publish_date >= {start} and publish_date <= {end}')
            expr = ' and '.join(conditions)

        # The collection must be loaded into memory before searching.
        self.collection.load()

        search_params = {
            "metric_type": "COSINE",
            "params": {"ef": 64}
        }

        results = self.collection.search(
            data=[query_vector],
            anns_field="doc_vector",
            param=search_params,
            limit=top_k,
            expr=expr,
            output_fields=["doc_id", "title", "content", "author",
                           "category", "keywords"]
        )

        search_results = []
        for hit in results[0]:
            content = hit.entity.get("content")
            # Only append an ellipsis when the preview is actually truncated
            # (the original always appended "...").
            preview = content[:200] + ("..." if len(content) > 200 else "")
            search_results.append({
                "doc_id": hit.entity.get("doc_id"),
                "title": hit.entity.get("title"),
                "content_preview": preview,
                "author": hit.entity.get("author"),
                "category": hit.entity.get("category"),
                "keywords": hit.entity.get("keywords"),
                "similarity": hit.distance
            })

        return search_results

    def hybrid_search(self, query, keywords=None, top_k=10):
        """Hybrid search: semantic similarity boosted by keyword matches.

        Args:
            query: Natural-language query.
            keywords: Optional list of keywords; each match adds +0.1 for a
                title hit and +0.05 for a content-preview hit.
            top_k: Maximum number of results.

        Returns:
            list[dict]: Results sorted by 'final_score' (descending).
        """
        # Over-fetch so the keyword re-ranking has candidates to promote.
        semantic_results = self.search(query, top_k=top_k * 2)

        if not keywords:
            return semantic_results[:top_k]

        scored_results = []
        for result in semantic_results:
            score = result['similarity']

            content = result.get('content_preview', '').lower()
            title = result.get('title', '').lower()

            # Title matches weigh more than content matches.
            for keyword in keywords:
                keyword = keyword.lower()
                if keyword in title:
                    score += 0.1
                if keyword in content:
                    score += 0.05

            result['final_score'] = score
            scored_results.append(result)

        scored_results.sort(key=lambda x: x['final_score'], reverse=True)

        return scored_results[:top_k]

# Usage example
def demo_semantic_search():
    """Semantic search demo: index sample documents, then run two queries."""
    search_engine = SemanticSearchEngine()

    # Index a few sample documents.
    print("添加文档...")

    sample_docs = [
        dict(
            doc_id=1,
            title="Milvus 向量数据库入门指南",
            content="Milvus 是一个开源的向量数据库,专门用于存储和检索大规模向量数据...",
            author="张三",
            category="技术",
            keywords=["Milvus", "向量数据库", "AI"],
        ),
        dict(
            doc_id=2,
            title="深度学习在图像识别中的应用",
            content="深度学习技术已经在图像识别领域取得了巨大成功...",
            author="李四",
            category="技术",
            keywords=["深度学习", "图像识别", "AI"],
        ),
        dict(
            doc_id=3,
            title="自然语言处理最新进展",
            content="近年来,自然语言处理技术发展迅速,大语言模型成为研究热点...",
            author="王五",
            category="技术",
            keywords=["NLP", "大语言模型", "AI"],
        ),
    ]

    for doc in sample_docs:
        search_engine.add_document(**doc)

    # Pure semantic search.
    print("\n语义搜索演示...")
    query = "如何存储和搜索向量数据"
    results = search_engine.search(query, top_k=3)

    print(f"\n查询: {query}")
    print("搜索结果:")
    for rank, result in enumerate(results, 1):
        print(f"{rank}. {result['title']} (相似度: {result['similarity']:.4f})")
        print(f"   作者: {result['author']}, 分类: {result['category']}")

    # Hybrid search: semantic score boosted by keyword matches.
    print("\n混合搜索演示...")
    query = "AI 技术应用"
    results = search_engine.hybrid_search(query, keywords=["深度学习", "图像"], top_k=3)

    print(f"\n查询: {query}")
    print("混合搜索结果:")
    for rank, result in enumerate(results, 1):
        print(f"{rank}. {result['title']} (最终分数: {result['final_score']:.4f})")

if __name__ == "__main__":
    demo_semantic_search()

总结

以上案例展示了 Milvus 在不同场景下的应用:

  1. 以图搜图: 使用深度学习模型提取图像特征,实现视觉相似度搜索
  2. 智能问答: 结合 Embedding 模型和大语言模型,实现语义理解和回答生成
  3. 推荐系统: 基于协同过滤和向量搜索,实现个性化推荐
  4. 文本语义搜索: 利用语义向量实现超越关键词匹配的搜索体验

这些案例可以根据实际业务需求进行扩展和定制。