Skip to content

数据插入与管理

数据准备

单条数据插入

python
from pymilvus import Collection

# Get a handle to an existing collection by name.
collection = Collection("article_search")

# Prepare a single row of data.
# NOTE(review): the `...` inside the vector literal is a tutorial
# placeholder — a real insert needs a full 128-element float list
# matching the schema's dim.
data = {
    "article_vector": [0.1, 0.2, 0.3, ..., 0.128],  # 128-dim vector (placeholder)
    "title": "Milvus 向量数据库入门教程",
    "content": "Milvus 是一个开源的向量数据库...",
    "category": "技术",
    "publish_time": 1704067200,
    "read_count": 1000
}

# Insert the row.
# NOTE(review): row-based insert of a bare dict is accepted only by
# newer pymilvus; older versions expect a list of rows ([data]) —
# verify against your client version.
insert_result = collection.insert(data)
print(f"插入成功,ID: {insert_result.primary_keys}")

批量数据插入

python
import random

# Build 1000 rows, one dict per entity.
entities = []
for i in range(1000):
    entity = {
        "article_vector": [random.random() for _ in range(128)],  # random 128-dim vector
        "title": f"文章标题_{i}",
        "content": f"文章内容_{i}",
        "category": random.choice(["技术", "生活", "娱乐", "新闻"]),
        "publish_time": 1704067200 + i * 3600,  # one article per hour from the base timestamp
        "read_count": random.randint(100, 10000)
    }
    entities.append(entity)

# Insert the whole batch in a single call.
insert_result = collection.insert(entities)
print(f"成功插入 {len(insert_result.primary_keys)} 条数据")

按字段批量插入

python
import random

# Column-oriented data: one list per field, all of the same length.
article_vectors = [[random.random() for _ in range(128)] for _ in range(1000)]
titles = [f"文章标题_{i}" for i in range(1000)]
contents = [f"文章内容_{i}" for i in range(1000)]
categories = [random.choice(["技术", "生活", "娱乐"]) for _ in range(1000)]
publish_times = [1704067200 + i * 3600 for i in range(1000)]
read_counts = [random.randint(100, 10000) for _ in range(1000)]

# Insert as a list of columns.
# NOTE(review): the column order must match the field order declared in
# the collection schema (excluding an auto_id primary key) — confirm
# against the schema definition.
insert_result = collection.insert([
    article_vectors,
    titles,
    contents,
    categories,
    publish_times,
    read_counts
])

分区插入

插入到指定分区

python
# Insert into the default partition.
collection.insert(data)

# Insert into a named partition (the partition must already exist).
collection.insert(data, partition_name="tech_articles")

# Batch insert into the same named partition.
collection.insert(entities, partition_name="tech_articles")

数据查询

基本查询

python
# Query rows with a cap on the number returned.
# NOTE(review): the collection must be loaded (collection.load()) before
# query(); also, an empty expr combined with limit is only accepted by
# newer Milvus releases — verify against your server version.
results = collection.query(
    expr="",  # empty expression = match all entities
    output_fields=["id", "title", "category"],
    limit=10
)

for result in results:
    print(f"ID: {result['id']}, 标题: {result['title']}")

条件查询

python
# Query by primary-key values.
results = collection.query(
    expr="id in [1, 2, 3, 4, 5]",
    output_fields=["id", "title", "category", "read_count"]
)

# Filter on a scalar (VARCHAR) field.
results = collection.query(
    expr='category == "技术"',
    output_fields=["id", "title", "read_count"],
    limit=20
)

# Numeric range filter (exclusive bounds on both sides).
results = collection.query(
    expr="read_count > 5000 and read_count < 8000",
    output_fields=["id", "title", "read_count"]
)

# Unix-timestamp range filter (1704067200 = 2024-01-01T00:00:00 UTC,
# 1706745600 = 2024-02-01T00:00:00 UTC).
results = collection.query(
    expr="publish_time >= 1704067200 and publish_time <= 1706745600",
    output_fields=["id", "title", "publish_time"]
)

表达式语法

| 操作符 | 说明 | 示例 |
| --- | --- | --- |
| `==` | 等于 | `category == "技术"` |
| `!=` | 不等于 | `category != "广告"` |
| `>` | 大于 | `read_count > 1000` |
| `<` | 小于 | `read_count < 5000` |
| `>=` | 大于等于 | `publish_time >= 1704067200` |
| `<=` | 小于等于 | `publish_time <= 1706745600` |
| `in` | 在列表中 | `id in [1, 2, 3]` |
| `not in` | 不在列表中 | `id not in [4, 5, 6]` |
| `and` | 逻辑与 | `read_count > 1000 and category == "技术"` |
| `or` | 逻辑或 | `category == "技术" or category == "科学"` |

数据删除

根据 ID 删除

python
# Delete a single row by primary key.
collection.delete("id in [1]")

# Delete several rows by primary key in one call.
collection.delete("id in [1, 2, 3, 4, 5]")

条件删除

python
# Delete by a scalar filter.
# NOTE(review): delete with non-primary-key expressions requires
# Milvus 2.3+; older servers only accept "id in [...]" — verify your
# server version before relying on these examples.
collection.delete('category == "测试"')

# Delete with a compound condition.
collection.delete('read_count < 100 and category == "草稿"')

# Remove expired data by timestamp.
collection.delete("publish_time < 1704067200")

清空集合

python
# Delete every entity while keeping the collection schema.
# NOTE(review): assumes all IDs are >= 0; deleted rows still occupy
# storage until compaction runs.
collection.delete("id >= 0")

# Alternatively, drop and recreate the collection entirely.
from pymilvus import utility
utility.drop_collection("article_search")
# Then recreate the collection...

数据更新

Milvus 早期版本不支持直接更新操作,需要通过删除后重新插入实现(较新版本的 Milvus/pymilvus 提供 upsert 接口,可优先使用):

python
def update_entity(collection, entity_id, new_data):
    """Replace an entity: delete the old row, then insert a new one with the same ID.

    NOTE(review): this pattern requires a primary key that is NOT
    auto_id (an explicit "id" value must be insertable) — verify the
    schema. Newer pymilvus versions expose upsert(), which is
    preferable. The two steps are not atomic: a failure between delete
    and insert loses the entity. Also mutates new_data in place by
    adding the "id" key.
    """
    # 1. Delete the existing row by primary key.
    collection.delete(f"id in [{entity_id}]")

    # 2. Prepare the replacement row, keeping the original ID.
    new_data["id"] = entity_id

    # 3. Insert the replacement row.
    collection.insert(new_data)

    print(f"实体 {entity_id} 更新成功")

# Usage example — `...` in the vector is a tutorial placeholder; a real
# call needs a full 128-element vector plus every field the schema requires.
new_data = {
    "article_vector": [0.5, 0.6, 0.7, ...],
    "title": "更新后的标题",
    "category": "技术",
    "read_count": 2000
}
update_entity(collection, 1, new_data)

数据统计

获取集合统计信息

python
# Entity count.
# NOTE(review): num_entities reflects flushed (sealed) data and does not
# subtract pending deletes — call collection.flush() first for a figure
# that includes recent inserts; treat it as approximate after deletes.
count = collection.num_entities
print(f"集合中的实体数量: {count}")

# Raw collection statistics.
# NOTE(review): confirm get_stats() exists in your pymilvus version;
# some versions expose this via utility / MilvusClient instead.
stats = collection.get_stats()
print(f"集合统计: {stats}")

查询统计

python
# Count articles in one category by fetching their IDs and measuring
# the result set.
# NOTE(review): query() returns at most the server-side result window
# (16384 by default), so this undercounts large categories — verify.
results = collection.query(
    expr='category == "技术"',
    output_fields=["id"]
)
tech_count = len(results)
print(f"技术类文章数量: {tech_count}")

数据导入导出

导出数据

python
import json

# Fetch up to 10000 rows to export.
# NOTE(review): requires the collection to be loaded; an empty expr with
# limit is only valid on newer Milvus versions — verify.
results = collection.query(
    expr="",
    output_fields=["id", "title", "category", "read_count"],
    limit=10000
)

# Write the rows out as pretty-printed JSON, keeping non-ASCII text readable.
with open("articles.json", "w", encoding="utf-8") as f:
    json.dump(results, f, ensure_ascii=False, indent=2)

print(f"已导出 {len(results)} 条数据")

导入数据

python
import json

# Load the previously exported rows.
with open("articles.json", "r", encoding="utf-8") as f:
    data = json.load(f)

# Insert in fixed-size chunks to bound per-request payload size.
# NOTE(review): exported rows include the "id" field — re-inserting them
# only works if the primary key is NOT auto_id; verify the schema, and
# note the export above did not include the vector field.
batch_size = 1000
for i in range(0, len(data), batch_size):
    batch = data[i:i + batch_size]
    collection.insert(batch)
    print(f"已导入第 {i//batch_size + 1} 批数据")

print(f"总共导入 {len(data)} 条数据")

批量操作最佳实践

高效批量插入

python
def batch_insert(collection, data_list, batch_size=1000):
    """Insert the rows in data_list into collection in fixed-size chunks.

    Splitting the payload keeps each insert request small; progress is
    printed after every chunk.
    """
    total = len(data_list)
    start = 0
    while start < total:
        collection.insert(data_list[start:start + batch_size])
        done = min(start + batch_size, total)
        print(f"已插入 {done}/{total} 条数据")
        start += batch_size

# Usage example — `...` is a tutorial placeholder for real rows.
data_list = [...]  # large dataset
batch_insert(collection, data_list, batch_size=1000)

批量删除

python
def batch_delete(collection, ids, batch_size=100):
    """Delete entities by primary key, issuing one delete per chunk of ids."""
    chunks = (ids[pos:pos + batch_size] for pos in range(0, len(ids), batch_size))
    for batch_no, chunk in enumerate(chunks, start=1):
        # Build an "id in [...]" expression for this chunk of keys.
        expr = "id in [{}]".format(",".join(str(v) for v in chunk))
        collection.delete(expr)
        print(f"已删除第 {batch_no} 批数据")

# Usage example — `...` is a tutorial placeholder, not valid ID data.
ids_to_delete = [1, 2, 3, ..., 1000]
batch_delete(collection, ids_to_delete)

完整示例

python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import random
import time

def demo_data_management():
    """End-to-end data-management demo: create, insert, query, count, delete, drop.

    Requires a Milvus server listening on localhost:19530. The demo
    collection is dropped at the end, so the run leaves no state behind.
    """
    from pymilvus import utility

    # Connect to the Milvus server.
    connections.connect(host="localhost", port="19530")

    # Drop any leftover collection from a previous run so creation
    # cannot fail with a name conflict.
    if utility.has_collection("demo_collection"):
        utility.drop_collection("demo_collection")

    # Schema: auto-generated INT64 primary key, 128-dim float vector,
    # two VARCHAR scalars and a FLOAT score.
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128),
        FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
        FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="score", dtype=DataType.FLOAT)
    ]

    schema = CollectionSchema(fields, "示例集合")
    collection = Collection("demo_collection", schema)

    print("=== 插入数据 ===")
    # Build 100 random example rows.
    categories = ["科技", "生活", "娱乐", "教育"]
    data = [
        {
            "vector": [random.random() for _ in range(128)],
            "title": f"文章_{i}",
            "category": random.choice(categories),
            "score": random.uniform(0, 100)
        }
        for i in range(100)
    ]

    insert_result = collection.insert(data)
    # Flush so the inserted rows are sealed and counted by num_entities.
    collection.flush()
    print(f"插入 {len(insert_result.primary_keys)} 条数据")

    # BUGFIX: the collection must be indexed and loaded before query() —
    # querying an unloaded collection raises an exception in Milvus 2.x.
    collection.create_index(
        field_name="vector",
        index_params={"index_type": "FLAT", "metric_type": "L2", "params": {}}
    )
    collection.load()

    print("\n=== 查询数据 ===")
    # Fetch up to five rows from the "科技" category.
    results = collection.query(
        expr='category == "科技"',
        output_fields=["id", "title", "score"],
        limit=5
    )
    for r in results:
        print(f"ID: {r['id']}, 标题: {r['title']}, 评分: {r['score']:.2f}")

    print("\n=== 统计数据 ===")
    print(f"总数量: {collection.num_entities}")

    print("\n=== 删除数据 ===")
    # Delete rows scoring below 20.
    # NOTE(review): delete by a non-primary-key expression requires
    # Milvus 2.3+; older servers only accept "id in [...]".
    collection.delete("score < 20")
    collection.flush()
    # num_entities counts rows in sealed segments and may still include
    # deleted rows until compaction, so treat this figure as approximate.
    print(f"删除后数量: {collection.num_entities}")

    # Clean up: release loaded data, then drop the demo collection.
    collection.release()
    utility.drop_collection("demo_collection")
    print("\n演示完成!")

if __name__ == "__main__":
    demo_data_management()

注意事项

  1. 批量大小: 建议每次插入 1000-10000 条数据,过大可能导致内存问题
  2. 主键冲突: 使用 auto_id=True 避免主键冲突
  3. 数据类型: 确保插入数据的类型与 Schema 定义一致
  4. 向量维度: 向量维度必须与 Schema 中定义的 dim 一致
  5. 字符编码: 字符串字段使用 UTF-8 编码

下一步

掌握数据插入与管理后,你可以:

  1. 学习向量搜索
  2. 了解索引优化
  3. 探索高级功能