Appearance
数据插入与管理
数据准备
单条数据插入
python
from pymilvus import Collection
# 获取集合
collection = Collection("article_search")
# 准备单条数据
data = {
"article_vector": [0.1, 0.2, 0.3, ..., 0.128], # 128维向量
"title": "Milvus 向量数据库入门教程",
"content": "Milvus 是一个开源的向量数据库...",
"category": "技术",
"publish_time": 1704067200,
"read_count": 1000
}
# 插入数据
insert_result = collection.insert(data)
print(f"插入成功,ID: {insert_result.primary_keys}")
批量数据插入
python
import random
# 准备批量数据
entities = []
for i in range(1000):
entity = {
"article_vector": [random.random() for _ in range(128)],
"title": f"文章标题_{i}",
"content": f"文章内容_{i}",
"category": random.choice(["技术", "生活", "娱乐", "新闻"]),
"publish_time": 1704067200 + i * 3600,
"read_count": random.randint(100, 10000)
}
entities.append(entity)
# 批量插入
insert_result = collection.insert(entities)
print(f"成功插入 {len(insert_result.primary_keys)} 条数据")
按字段批量插入
python
import random
# 准备数据(按字段组织)
article_vectors = [[random.random() for _ in range(128)] for _ in range(1000)]
titles = [f"文章标题_{i}" for i in range(1000)]
contents = [f"文章内容_{i}" for i in range(1000)]
categories = [random.choice(["技术", "生活", "娱乐"]) for _ in range(1000)]
publish_times = [1704067200 + i * 3600 for i in range(1000)]
read_counts = [random.randint(100, 10000) for _ in range(1000)]
# 插入数据
insert_result = collection.insert([
article_vectors,
titles,
contents,
categories,
publish_times,
read_counts
])
分区插入
插入到指定分区
python
# 插入到默认分区
collection.insert(data)
# 插入到指定分区
collection.insert(data, partition_name="tech_articles")
# 批量插入到分区
collection.insert(entities, partition_name="tech_articles")
数据查询
基本查询
python
# 查询所有数据(限制返回数量)
results = collection.query(
expr="", # 空表达式表示查询所有
output_fields=["id", "title", "category"],
limit=10
)
for result in results:
print(f"ID: {result['id']}, 标题: {result['title']}")
条件查询
python
# 根据 ID 查询
results = collection.query(
expr="id in [1, 2, 3, 4, 5]",
output_fields=["id", "title", "category", "read_count"]
)
# 根据分类查询
results = collection.query(
expr='category == "技术"',
output_fields=["id", "title", "read_count"],
limit=20
)
# 范围查询
results = collection.query(
expr="read_count > 5000 and read_count < 8000",
output_fields=["id", "title", "read_count"]
)
# 时间范围查询
results = collection.query(
expr="publish_time >= 1704067200 and publish_time <= 1706745600",
output_fields=["id", "title", "publish_time"]
)
表达式语法
| 操作符 | 说明 | 示例 |
|---|---|---|
| `==` | 等于 | `category == "技术"` |
| `!=` | 不等于 | `category != "广告"` |
| `>` | 大于 | `read_count > 1000` |
| `<` | 小于 | `read_count < 5000` |
| `>=` | 大于等于 | `publish_time >= 1704067200` |
| `<=` | 小于等于 | `publish_time <= 1706745600` |
| `in` | 在列表中 | `id in [1, 2, 3]` |
| `not in` | 不在列表中 | `id not in [4, 5, 6]` |
| `and` | 逻辑与 | `read_count > 1000 and category == "技术"` |
| `or` | 逻辑或 | `category == "技术" or category == "科学"` |
数据删除
根据 ID 删除
python
# 删除单条数据
collection.delete("id in [1]")
# 删除多条数据
collection.delete("id in [1, 2, 3, 4, 5]")
条件删除
python
# 根据条件删除
collection.delete('category == "测试"')
# 复杂条件删除
collection.delete('read_count < 100 and category == "草稿"')
# 删除过期数据
collection.delete("publish_time < 1704067200")
清空集合
python
# 删除所有数据(保留集合结构)
collection.delete("id >= 0")
# 或者重新创建集合
from pymilvus import utility
utility.drop_collection("article_search")
# 然后重新创建...
数据更新
Milvus 不支持直接更新操作,需要通过删除后重新插入实现:
python
def update_entity(collection, entity_id, new_data):
    """Replace an entity by deleting it and re-inserting it under the same ID.

    Args:
        collection: Object exposing ``delete(expr)`` and ``insert(data)``.
        entity_id: Primary-key value; interpolated into the
            ``id in [...]`` delete expression and stored under ``"id"``.
        new_data: Mapping of field names to the new values. It is left
            unmodified; the row that is inserted is a copy.

    Note:
        The delete and the insert are two separate calls, so the update is
        not atomic: if the insert fails, the old row is already gone.
    """
    # 1. Remove the old row.
    collection.delete(f"id in [{entity_id}]")
    # 2. Build the replacement on a copy (keeping the original ID) so the
    #    caller's dict does not silently gain an "id" key.
    entity = {**new_data, "id": entity_id}
    # 3. Insert the replacement row.
    collection.insert(entity)
    print(f"实体 {entity_id} 更新成功")
# 使用示例
new_data = {
"article_vector": [0.5, 0.6, 0.7, ...],
"title": "更新后的标题",
"category": "技术",
"read_count": 2000
}
update_entity(collection, 1, new_data)
数据统计
获取集合统计信息
python
# 获取实体数量
count = collection.num_entities
print(f"集合中的实体数量: {count}")
# 获取集合统计信息
stats = collection.get_stats()
print(f"集合统计: {stats}")
查询统计
python
# 统计各分类文章数量
results = collection.query(
expr='category == "技术"',
output_fields=["id"]
)
tech_count = len(results)
print(f"技术类文章数量: {tech_count}")
数据导入导出
导出数据
python
import json
# 查询所有数据
results = collection.query(
expr="",
output_fields=["id", "title", "category", "read_count"],
limit=10000
)
# 保存为 JSON
with open("articles.json", "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print(f"已导出 {len(results)} 条数据")
导入数据
python
import json
# 从 JSON 文件加载
with open("articles.json", "r", encoding="utf-8") as f:
data = json.load(f)
# 批量插入
batch_size = 1000
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
collection.insert(batch)
print(f"已导入第 {i//batch_size + 1} 批数据")
print(f"总共导入 {len(data)} 条数据")
批量操作最佳实践
高效批量插入
python
def batch_insert(collection, data_list, batch_size=1000):
    """Insert ``data_list`` into ``collection`` in consecutive slices.

    Args:
        collection: Object exposing ``insert(rows)``.
        data_list: Sliceable sequence of rows supporting ``len()``.
        batch_size: Maximum number of rows per ``insert`` call.

    Raises:
        ValueError: If ``batch_size`` is not positive. A negative step would
            otherwise make the loop silently insert nothing.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    total = len(data_list)
    for start in range(0, total, batch_size):
        collection.insert(data_list[start:start + batch_size])
        # The last slice may be short, so cap the progress counter at total.
        print(f"已插入 {min(start + batch_size, total)}/{total} 条数据")
# 使用示例
data_list = [...] # 大量数据
batch_insert(collection, data_list, batch_size=1000)
批量删除
python
def batch_delete(collection, ids, batch_size=100):
    """Delete entities by primary key, ``batch_size`` IDs per request.

    Each batch is sent as a single ``id in [...]`` expression.

    Args:
        collection: Object exposing ``delete(expr)``.
        ids: Sliceable sequence of primary-key values. Each value is
            rendered with ``str()`` and is not quoted.
        batch_size: Maximum number of IDs per ``delete`` call.

    Raises:
        ValueError: If ``batch_size`` is not positive. A negative step would
            otherwise make the loop silently delete nothing.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    for start in range(0, len(ids), batch_size):
        id_str = ",".join(map(str, ids[start:start + batch_size]))
        collection.delete(f"id in [{id_str}]")
        print(f"已删除第 {start//batch_size + 1} 批数据")
# 使用示例
ids_to_delete = [1, 2, 3, ..., 1000]
batch_delete(collection, ids_to_delete)
完整示例
python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import random
import time
def demo_data_management():
    """Run an end-to-end data-management demo against a local Milvus server.

    Connects to ``localhost:19530``, creates ``demo_collection``, inserts 100
    random rows, queries one category, prints entity counts, deletes the
    low-score rows, and finally drops the collection.
    """
    from pymilvus import utility

    # Connect to Milvus.
    connections.connect(host="localhost", port="19530")

    # Create the collection.
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128),
        FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
        FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="score", dtype=DataType.FLOAT),
    ]
    schema = CollectionSchema(fields, "示例集合")
    collection = Collection("demo_collection", schema)

    try:
        print("=== 插入数据 ===")
        # Insert sample rows; the primary key is generated (auto_id=True).
        categories = ["科技", "生活", "娱乐", "教育"]
        data = [
            {
                "vector": [random.random() for _ in range(128)],
                "title": f"文章_{i}",
                "category": random.choice(categories),
                "score": random.uniform(0, 100),
            }
            for i in range(100)
        ]
        insert_result = collection.insert(data)
        # Seal the inserted rows so that num_entities below accounts for them.
        collection.flush()
        print(f"插入 {len(insert_result.primary_keys)} 条数据")

        # query() only works on a loaded collection, and loading requires an
        # index on the vector field.
        collection.create_index(
            field_name="vector",
            index_params={"index_type": "FLAT", "metric_type": "L2", "params": {}},
        )
        collection.load()

        print("\n=== 查询数据 ===")
        # Query articles in the "科技" category.
        results = collection.query(
            expr='category == "科技"',
            output_fields=["id", "title", "score"],
            limit=5,
        )
        for r in results:
            print(f"ID: {r['id']}, 标题: {r['title']}, 评分: {r['score']:.2f}")

        print("\n=== 统计数据 ===")
        print(f"总数量: {collection.num_entities}")

        print("\n=== 删除数据 ===")
        # Delete rows whose score is below 20.
        collection.delete("score < 20")
        collection.flush()
        # NOTE(review): num_entities may still include deleted rows until
        # compaction runs - confirm against the Milvus version in use.
        print(f"删除后数量: {collection.num_entities}")
    finally:
        # Clean up even when a step above fails, so a rerun starts fresh.
        utility.drop_collection("demo_collection")

    print("\n演示完成!")
if __name__ == "__main__":
demo_data_management()
注意事项
- 批量大小: 建议每次插入 1000-10000 条数据,过大可能导致内存问题
- 主键冲突: 使用 `auto_id=True` 避免主键冲突
- 数据类型: 确保插入数据的类型与 Schema 定义一致
- 向量维度: 向量维度必须与 Schema 中定义的 `dim` 一致
- 字符编码: 字符串字段使用 UTF-8 编码
下一步
掌握数据插入与管理后,你可以: