Appearance
分区与别名管理
分区(Partition)概述
分区是 Collection 的逻辑子集,用于组织和管理数据。通过合理使用分区,可以提升查询性能并简化数据管理。
分区的优势
- 查询优化: 只在相关分区中搜索,减少数据扫描量
- 数据隔离: 按业务逻辑隔离数据
- 生命周期管理: 可以单独删除过期分区
- 并行操作: 不同分区可以并行处理
分区操作
创建分区
python
from pymilvus import Collection
collection = Collection("article_search")
# 创建分区
collection.create_partition("tech_articles")
collection.create_partition("life_articles")
collection.create_partition("entertainment_articles")
print("分区创建成功")列出分区
python
# 获取所有分区
partitions = collection.partitions
for partition in partitions:
print(f"分区名: {partition.name}")
print(f"描述: {partition.description}")
print(f"是否已加载: {partition.is_loaded}")
print("---")检查分区是否存在
python
# 检查分区是否存在
exists = collection.has_partition("tech_articles")
print(f"分区是否存在: {exists}")删除分区
python
# 删除分区(会删除分区中的所有数据)
collection.drop_partition("old_articles")
# 删除前确认
if collection.has_partition("old_articles"):
collection.drop_partition("old_articles")
print("分区已删除")加载和释放分区
python
# 加载指定分区
collection.load(partition_names=["tech_articles"])
# 加载多个分区
collection.load(partition_names=["tech_articles", "life_articles"])
# 释放指定分区
collection.release_partition("tech_articles")
# 释放多个分区
collection.release_partition(["tech_articles", "life_articles"])分区数据操作
向分区插入数据
python
# 插入到指定分区
data = {
"article_vector": [0.1, 0.2, ..., 0.128],
"title": "技术文章标题",
"category": "技术"
}
collection.insert(data, partition_name="tech_articles")
# 批量插入到分区
entities = [...] # 多条数据
collection.insert(entities, partition_name="tech_articles")分区查询
python
# 查询指定分区的数据
results = collection.query(
expr="",
partition_names=["tech_articles"],
output_fields=["title", "category"],
limit=10
)
# 搜索指定分区
results = collection.search(
data=[query_vector],
anns_field="article_vector",
param=search_params,
partition_names=["tech_articles"],
limit=10
)分区策略
按时间分区
python
def create_time_partitions(collection, years):
"""按年份创建分区"""
for year in years:
partition_name = f"articles_{year}"
if not collection.has_partition(partition_name):
collection.create_partition(partition_name)
print(f"创建分区: {partition_name}")
# 使用示例
create_time_partitions(collection, [2022, 2023, 2024])按类别分区
python
def create_category_partitions(collection, categories):
"""按类别创建分区"""
for category in categories:
partition_name = f"category_{category}"
if not collection.has_partition(partition_name):
collection.create_partition(partition_name)
# 使用示例
categories = ["科技", "生活", "娱乐", "教育", "体育"]
create_category_partitions(collection, categories)按数据热度分区
python
# 创建热数据和冷数据分区
collection.create_partition("hot_data") # 频繁访问的数据
collection.create_partition("warm_data") # 偶尔访问的数据
collection.create_partition("cold_data") # 很少访问的数据
# 根据访问频率将数据插入不同分区
def insert_by_heat(data, access_frequency):
if access_frequency > 1000:
collection.insert(data, partition_name="hot_data")
elif access_frequency > 100:
collection.insert(data, partition_name="warm_data")
else:
collection.insert(data, partition_name="cold_data")别名(Alias)管理
什么是别名
别名是集合的替代名称,可以在不修改代码的情况下切换底层集合。常用于:
- 蓝绿部署
- A/B 测试
- 版本切换
创建别名
python
from pymilvus import utility
# 为集合创建别名
utility.create_alias(
collection_name="article_search_v1",
alias="article_search"
)
print("别名创建成功")使用别名
python
from pymilvus import Collection
# 通过别名访问集合(与直接使用集合名相同)
collection = Collection("article_search")
# 执行操作...
results = collection.search(...)列出别名
python
# 获取集合的所有别名
aliases = utility.list_aliases(collection_name="article_search_v1")
print(f"别名列表: {aliases}")删除别名
python
# 删除别名
utility.drop_alias(alias="article_search")
# 删除前确认
aliases = utility.list_aliases("article_search_v1")
if "article_search" in aliases:
utility.drop_alias("article_search")别名切换(蓝绿部署)
python
def switch_alias(alias_name, new_collection_name):
"""切换别名到新的集合"""
# 获取当前别名指向的集合
old_aliases = utility.list_aliases()
# 删除旧别名
if alias_name in old_aliases:
utility.drop_alias(alias_name)
# 创建新别名
utility.create_alias(new_collection_name, alias_name)
print(f"别名 '{alias_name}' 已切换到 '{new_collection_name}'")
# 蓝绿部署示例
# 1. 准备新版本集合
# utility.create_collection("article_search_v2", schema)
# ... 插入数据,创建索引 ...
# 2. 切换别名
switch_alias("article_search", "article_search_v2")
# 3. 验证无误后,可以删除旧版本
# utility.drop_collection("article_search_v1")完整示例
python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import random
def partition_and_alias_demo():
"""分区和别名管理完整示例"""
# 连接 Milvus
connections.connect(host="localhost", port="19530")
# 清理旧数据
if utility.has_collection("news_collection"):
utility.drop_collection("news_collection")
# 创建集合
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128),
FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=256),
FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="publish_date", dtype=DataType.INT64)
]
schema = CollectionSchema(fields, "新闻集合")
collection = Collection("news_collection", schema)
print("=== 创建分区 ===")
# 按类别创建分区
categories = ["科技", "财经", "体育", "娱乐"]
for category in categories:
partition_name = f"news_{category}"
collection.create_partition(partition_name)
print(f"创建分区: {partition_name}")
print("\n=== 向分区插入数据 ===")
# 向不同分区插入数据
for i, category in enumerate(categories):
partition_name = f"news_{category}"
data = []
for j in range(100):
data.append({
"vector": [random.random() for _ in range(128)],
"title": f"{category}新闻_{j}",
"category": category,
"publish_date": 1704067200 + i * 86400 + j * 3600
})
collection.insert(data, partition_name=partition_name)
print(f"向分区 '{partition_name}' 插入 {len(data)} 条数据")
print("\n=== 列出所有分区 ===")
partitions = collection.partitions
for partition in partitions:
print(f"分区: {partition.name}")
print("\n=== 创建索引并加载 ===")
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 128}
}
collection.create_index("vector", index_params)
utility.wait_for_index_building_complete("news_collection")
# 只加载科技新闻分区
collection.load(partition_names=["news_科技"])
print("\n=== 分区搜索 ===")
query_vector = [random.random() for _ in range(128)]
# 在科技分区搜索
results = collection.search(
data=[query_vector],
anns_field="vector",
param={"metric_type": "L2", "params": {"nprobe": 16}},
partition_names=["news_科技"],
limit=5,
output_fields=["title", "category"]
)
print("科技分区搜索结果:")
for hit in results[0]:
print(f" {hit.entity.get('title')} - {hit.entity.get('category')}")
print("\n=== 别名管理 ===")
# 创建别名
utility.create_alias("news_collection", "news")
print("创建别名: news -> news_collection")
# 通过别名访问
alias_collection = Collection("news")
print(f"通过别名访问,集合名称: {alias_collection.name}")
# 列出别名
aliases = utility.list_aliases("news_collection")
print(f"别名列表: {aliases}")
# 清理
utility.drop_alias("news")
utility.drop_collection("news_collection")
print("\n演示完成!")
if __name__ == "__main__":
partition_and_alias_demo()最佳实践
分区设计原则
- 避免过多分区: 建议每个集合不超过 4096 个分区
- 合理选择分区键: 选择查询时经常用于过滤的字段
- 分区大小均衡: 避免某些分区过大或过小
- 定期清理: 及时删除过期分区释放资源
别名使用建议
- 生产环境使用别名: 便于无缝切换
- 版本管理: 使用版本号命名集合,别名指向当前版本
- 备份策略: 保留旧版本直到确认新版本稳定
性能优化
python
# 只加载需要的分区
collection.load(partition_names=["hot_data", "recent_data"])
# 搜索时指定分区
results = collection.search(
data=[query_vector],
anns_field="vector",
param=search_params,
partition_names=["partition_2024"], # 只搜索最新分区
limit=10
)下一步
掌握分区和别名管理后,你可以: