Appearance
数据备份与恢复
备份策略概述
为什么需要备份
- 数据安全: 防止数据丢失
- 灾难恢复: 应对硬件故障、误操作
- 版本回滚: 支持数据版本管理
- 迁移需求: 支持数据迁移和复制
Milvus 数据组成
Milvus 数据
├── 元数据 (Metadata) - 存储在 etcd
│   ├── Collection Schema
│   ├── Index 信息
│   └── 分区信息
│
├── 向量数据 (Vector Data) - 存储在对象存储
│   ├── 插入的数据
│   └── 索引文件
│
└── 日志数据 (Log) - 存储在消息队列
    └── 操作日志
Milvus Backup 工具
安装 Milvus Backup
bash
# 下载 Milvus Backup
git clone https://github.com/zilliztech/milvus-backup.git
cd milvus-backup
# 编译
make
# 或者使用 Docker
docker pull milvusdb/milvus-backup:latest
配置文件
yaml
# backup.yaml
milvus:
  address: localhost
  port: 19530
  authorization: "root:Milvus"
  tlsMode: 0
minio:
  address: localhost
  port: 9000
  accessKeyID: minioadmin
  secretAccessKey: minioadmin
  useSSL: false
  bucketName: a-bucket
  rootPath: files
backup:
  storageType: local
  rootPath: /var/lib/milvus-backup
备份操作
完整备份
bash
# 备份所有集合
./milvus-backup create -n full_backup_$(date +%Y%m%d)
# 查看备份列表
./milvus-backup list
指定集合备份
bash
# 备份单个集合
./milvus-backup create -n article_backup -c article_search
# 备份多个集合
./milvus-backup create -n multi_backup -c "collection1,collection2,collection3"
Python 备份脚本
python
import subprocess
import datetime
import json
def backup_collection(collection_name, backup_name=None):
    """Back up a single collection with the ``milvus-backup`` CLI.

    Args:
        collection_name: Collection to back up; passed to ``-c``.
        backup_name: Name of the backup; passed to ``-n``. When ``None``, a
            name of the form ``<collection>_backup_<YYYYmmdd_HHMMSS>`` is
            built from the current local time.

    Returns:
        The backup name on success, or ``None`` when the CLI could not be
        started or exited with a non-zero status.
    """
    if backup_name is None:
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"{collection_name}_backup_{timestamp}"

    cmd = [
        "./milvus-backup", "create",
        "-n", backup_name,
        "-c", collection_name,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # A missing or non-executable binary raises instead of producing a
        # result; report it through the same failure path as a bad exit code.
        print(f"备份失败: {exc}")
        return None

    if result.returncode != 0:
        print(f"备份失败: {result.stderr}")
        return None

    print(f"备份成功: {backup_name}")
    return backup_name
def backup_all(backup_name=None):
    """Back up every collection with the ``milvus-backup`` CLI.

    Args:
        backup_name: Name of the backup; passed to ``-n``. When ``None``, a
            name of the form ``full_backup_<YYYYmmdd_HHMMSS>`` is built from
            the current local time.

    Returns:
        The backup name on success, or ``None`` when the CLI could not be
        started or exited with a non-zero status.
    """
    if backup_name is None:
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"full_backup_{timestamp}"

    cmd = ["./milvus-backup", "create", "-n", backup_name]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # A missing or non-executable binary raises instead of producing a
        # result; report it through the same failure path as a bad exit code.
        print(f"备份失败: {exc}")
        return None

    if result.returncode != 0:
        print(f"备份失败: {result.stderr}")
        return None

    print(f"全量备份成功: {backup_name}")
    return backup_name
def list_backups():
"""列出所有备份"""
cmd = ["./milvus-backup", "list"]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
backups = json.loads(result.stdout)
return backups
else:
print(f"获取备份列表失败: {result.stderr}")
return []
恢复操作
从备份恢复
bash
# 恢复整个备份
./milvus-backup restore -n full_backup_20240115
# 恢复指定集合
./milvus-backup restore -n article_backup -c article_search
# 恢复到新名称
./milvus-backup restore -n article_backup -c article_search -s article_search_restored
Python 恢复脚本
python
def restore_backup(backup_name, collections=None, suffix=""):
    """Restore a backup with the ``milvus-backup`` CLI.

    Args:
        backup_name: Name of the backup to restore; passed to ``-n``.
        collections: Optional collections to restore; passed to ``-c``.
            Either a comma-separated string or an iterable of names, which
            is joined with commas. Falsy values omit the option.
        suffix: Optional value passed to ``-s``. An empty string omits the
            option.

    Returns:
        ``True`` when the CLI exits with status 0, ``False`` when it could
        not be started or exited with a non-zero status.
    """
    cmd = ["./milvus-backup", "restore", "-n", backup_name]
    if collections:
        if not isinstance(collections, str):
            # A list or tuple cannot be placed in argv as-is; the CLI expects
            # one comma-separated value.
            collections = ",".join(collections)
        cmd.extend(["-c", collections])
    if suffix:
        cmd.extend(["-s", suffix])

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # A missing or non-executable binary raises instead of producing a
        # result; report it through the same failure path as a bad exit code.
        print(f"恢复失败: {exc}")
        return False

    if result.returncode != 0:
        print(f"恢复失败: {result.stderr}")
        return False

    print(f"恢复成功: {backup_name}")
    return True
def restore_to_new_name(backup_name, collection_name, new_name):
"""恢复到新集合名称"""
return restore_backup(
backup_name,
collections=collection_name,
suffix=f"_restored_{new_name}"
)
手动备份方案
导出集合 Schema
python
from pymilvus import connections, Collection, utility
import json
def export_schema(collection_name, output_file):
    """Write a collection's schema to a JSON file.

    Connects to Milvus at ``localhost:19530``, reads the collection's name,
    description and fields, and dumps them as UTF-8 JSON.

    Args:
        collection_name: Name of the collection to describe.
        output_file: Path of the JSON file to create or overwrite.
    """
    connections.connect(host="localhost", port="19530")
    collection = Collection(collection_name)

    fields = []
    for field in collection.schema.fields:
        dtype = field.dtype
        fields.append({
            "name": field.name,
            # Store the enum member name (e.g. "INT64") rather than
            # str(dtype). For IntEnum members str() gives the bare number
            # from Python 3.11 on, which cannot be looked up by name when the
            # schema is rebuilt. Presumably dtype is an IntEnum member;
            # fall back to str() if it has no ``name``.
            "dtype": getattr(dtype, "name", str(dtype)),
            "is_primary": field.is_primary,
            "auto_id": field.auto_id,
            "params": field.params,
        })

    schema_info = {
        "name": collection.name,
        "description": collection.description,
        "fields": fields,
    }

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(schema_info, f, ensure_ascii=False, indent=2)
    print(f"Schema 已导出到: {output_file}")
# 使用示例
export_schema("article_search", "article_search_schema.json")
导出数据
python
def export_data(collection_name, output_file, batch_size=1000):
    """Export every row of a collection to a JSON file.

    Connects to Milvus at ``localhost:19530``, loads the collection, reads
    all fields of all rows in batches, and dumps them as one UTF-8 JSON
    array.

    Args:
        collection_name: Name of the collection to export.
        output_file: Path of the JSON file to create or overwrite.
        batch_size: Number of rows fetched per round trip.
    """
    connections.connect(host="localhost", port="19530")
    collection = Collection(collection_name)
    collection.load()

    field_names = [field.name for field in collection.schema.fields]

    # Page with a query iterator instead of offset/limit: Milvus caps
    # offset + limit for plain queries, so offset paging fails on
    # collections larger than that window.
    all_data = []
    iterator = collection.query_iterator(
        batch_size=batch_size,
        output_fields=field_names,
    )
    try:
        while True:
            batch = iterator.next()
            if not batch:
                break
            all_data.extend(batch)
    finally:
        iterator.close()

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(all_data, f, ensure_ascii=False, indent=2)
    print(f"数据已导出到: {output_file}")
    print(f"共导出 {len(all_data)} 条记录")
# 使用示例
export_data("article_search", "article_search_data.json")
导入数据
python
def import_data(collection_name, schema_file, data_file):
    """Create a collection from an exported schema and load exported rows.

    Connects to Milvus at ``localhost:19530``. If the collection already
    exists, prints a notice and returns without touching it. Otherwise
    rebuilds the schema from ``schema_file``, creates the collection and
    inserts the rows from ``data_file`` in batches of 1000.

    Args:
        collection_name: Name of the collection to create.
        schema_file: Path of a JSON schema file with a ``fields`` list and an
            optional ``description``.
        data_file: Path of a JSON file holding a list of rows.
    """
    from pymilvus import FieldSchema, CollectionSchema, Collection, DataType

    connections.connect(host="localhost", port="19530")

    with open(schema_file, "r", encoding="utf-8") as f:
        schema_info = json.load(f)

    if utility.has_collection(collection_name):
        print(f"集合 {collection_name} 已存在")
        return

    fields = []
    for field_info in schema_info["fields"]:
        # Accept "DataType.INT64", "INT64" and a bare numeric value; the
        # last is what str() of an IntEnum member yields on Python 3.11+.
        dtype_token = str(field_info["dtype"]).split(".")[-1]
        if dtype_token.isdigit():
            dtype = DataType(int(dtype_token))
        else:
            dtype = getattr(DataType, dtype_token)

        fields.append(FieldSchema(
            name=field_info["name"],
            dtype=dtype,
            is_primary=field_info.get("is_primary", False),
            auto_id=field_info.get("auto_id", False),
            # "params" may be present but null in the JSON file.
            **(field_info.get("params") or {})
        ))

    schema = CollectionSchema(
        fields=fields,
        description=schema_info.get("description", "")
    )
    collection = Collection(collection_name, schema)

    with open(data_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    batch_size = 1000
    total = len(data)
    for start in range(0, total, batch_size):
        collection.insert(data[start:start + batch_size])
        print(f"已插入 {min(start + batch_size, total)}/{total} 条数据")

    if data:
        # Flush once after the last batch so the inserted rows are sealed
        # before the import is reported as complete.
        collection.flush()
    print(f"数据导入完成,共 {total} 条记录")
# 使用示例
import_data("article_search_new", "article_search_schema.json", "article_search_data.json")
定时备份策略
使用 Cron 定时备份
bash
# 编辑 crontab
crontab -e
# 每天凌晨 2 点执行全量备份
0 2 * * * /path/to/milvus-backup create -n full_backup_$(date +\%Y\%m\%d)
# 每周日执行清理,保留最近 30 天的备份
0 3 * * 0 find /path/to/backups -name "full_backup_*" -mtime +30 -delete
Python 定时备份脚本
python
import schedule
import time
import datetime
import os
def daily_backup():
    """Run the daily full backup, then prune backups older than 7 days.

    The backup is named ``daily_backup_<YYYYmmdd>`` from the current local
    date. Pruning is skipped when the backup did not succeed.
    """
    backup_name = f"daily_backup_{datetime.datetime.now().strftime('%Y%m%d')}"
    if backup_all(backup_name) is None:
        # backup_all returns None on failure. Keep the existing backups so a
        # run of failing jobs cannot age out the last good copy.
        return
    cleanup_old_backups(days=7)
def weekly_backup():
    """Run the weekly full backup, then prune backups older than 30 days.

    The backup is named ``weekly_backup_<YYYYmmdd>`` from the current local
    date. Pruning is skipped when the backup did not succeed.
    """
    backup_name = f"weekly_backup_{datetime.datetime.now().strftime('%Y%m%d')}"
    if backup_all(backup_name) is None:
        # backup_all returns None on failure. Keep the existing backups so a
        # run of failing jobs cannot age out the last good copy.
        return
    cleanup_old_backups(days=30)
def cleanup_old_backups(days, backup_dir="/var/lib/milvus-backup"):
    """Delete entries in *backup_dir* last modified more than *days* days ago.

    Both plain files and directory trees are removed. A missing
    *backup_dir* is treated as "nothing to clean".

    Args:
        days: Retention period in days.
        backup_dir: Directory whose direct children are the backups.
    """
    # Local import so the function does not rely on a module-level import.
    import shutil

    try:
        entries = os.listdir(backup_dir)
    except FileNotFoundError:
        # No backup has been written yet, so there is nothing to prune.
        return

    cutoff = datetime.datetime.now() - datetime.timedelta(days=days)
    for filename in entries:
        filepath = os.path.join(backup_dir, filename)
        # Use mtime, not ctime: on Unix ctime is the last metadata change,
        # not the creation time, so it is unreliable as a backup age.
        modified = datetime.datetime.fromtimestamp(os.path.getmtime(filepath))
        if modified >= cutoff:
            continue

        if os.path.isdir(filepath) and not os.path.islink(filepath):
            # os.remove cannot delete a directory; a backup may be a tree.
            shutil.rmtree(filepath)
        else:
            os.remove(filepath)
        print(f"删除旧备份: {filename}")
# 设置定时任务
schedule.every().day.at("02:00").do(daily_backup)
schedule.every().sunday.at("03:00").do(weekly_backup)
# 运行调度器
while True:
schedule.run_pending()
time.sleep(60)
备份验证
验证备份完整性
python
def verify_backup(backup_name):
"""验证备份完整性"""
cmd = ["./milvus-backup", "check", "-n", backup_name]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"备份 {backup_name} 验证通过")
return True
else:
print(f"备份验证失败: {result.stderr}")
return False
测试恢复流程
python
def test_restore(backup_name, test_collection_name):
"""测试恢复流程"""
# 恢复到临时集合
temp_name = f"test_restore_{int(time.time())}"
success = restore_backup(backup_name, test_collection_name, suffix=f"_{temp_name}")
if success:
# 验证数据
connections.connect(host="localhost", port="19530")
collection = Collection(f"{test_collection_name}_{temp_name}")
count = collection.num_entities
print(f"恢复后的数据量: {count}")
# 清理测试集合
utility.drop_collection(f"{test_collection_name}_{temp_name}")
return True
return False
灾难恢复流程
python
def disaster_recovery(backup_name, target_collections=None):
"""灾难恢复流程"""
print("=== 开始灾难恢复 ===")
# 1. 停止写入服务
print("1. 停止写入服务...")
# stop_write_services()
# 2. 备份当前状态(如果可能)
print("2. 备份当前状态...")
emergency_backup = f"emergency_backup_{int(time.time())}"
backup_all(emergency_backup)
# 3. 清理损坏的数据
print("3. 清理损坏的数据...")
connections.connect(host="localhost", port="19530")
if target_collections:
collections = target_collections.split(",")
for collection in collections:
if utility.has_collection(collection):
utility.drop_collection(collection)
print(f" 删除集合: {collection}")
else:
# 删除所有集合
all_collections = utility.list_collections()
for collection in all_collections:
utility.drop_collection(collection)
print(f" 删除集合: {collection}")
# 4. 从备份恢复
print("4. 从备份恢复数据...")
restore_backup(backup_name, target_collections)
# 5. 验证恢复结果
print("5. 验证恢复结果...")
# verify_recovery()
# 6. 恢复写入服务
print("6. 恢复写入服务...")
# start_write_services()
print("=== 灾难恢复完成 ===")
最佳实践
备份策略建议
- 定期全量备份: 每天执行全量备份
- 保留策略:
- 日备份保留 7 天
- 周备份保留 4 周
- 月备份保留 12 个月
- 异地备份: 将备份复制到异地存储
- 定期测试: 定期测试恢复流程
备份存储建议
python
# 多副本存储
import shutil
def backup_to_multiple_locations(backup_name, locations,
                                 backup_dir="/var/lib/milvus-backup"):
    """Copy one backup directory tree into several destination roots.

    Each destination is attempted independently: a failure on one location
    is reported and the remaining locations are still processed.

    Args:
        backup_name: Name of the backup directory inside *backup_dir*.
        locations: Iterable of destination root directories; the backup is
            copied to ``<location>/<backup_name>``.
        backup_dir: Directory that holds the source backup.
    """
    backup_path = os.path.join(backup_dir, backup_name)

    for location in locations:
        dest_path = os.path.join(location, backup_name)
        try:
            shutil.copytree(backup_path, dest_path)
        except OSError as exc:
            # Covers an existing destination, an unreachable mount and
            # shutil.Error (an OSError subclass). One bad target must not
            # stop the other replicas from being written.
            print(f"备份复制失败: {dest_path}: {exc}")
            continue
        print(f"备份已复制到: {dest_path}")
# 使用示例
locations = [
"/backup/local",
"/backup/nas",
"/mnt/cloud-backup"
]
backup_to_multiple_locations("full_backup_20240115", locations)
下一步
掌握数据备份与恢复后,你可以: