Skip to content

数据备份与恢复

备份策略概述

为什么需要备份

  • 数据安全: 防止数据丢失
  • 灾难恢复: 应对硬件故障、误操作
  • 版本回滚: 支持数据版本管理
  • 迁移需求: 支持数据迁移和复制

Milvus 数据组成

Milvus 数据
├── 元数据 (Metadata) - 存储在 etcd
│   ├── Collection Schema
│   ├── Index 信息
│   └── 分区信息

├── 向量数据 (Vector Data) - 存储在对象存储
│   ├── 插入的数据
│   └── 索引文件

└── 日志数据 (Log) - 存储在消息队列
    └── 操作日志

Milvus Backup 工具

安装 Milvus Backup

bash
# 下载 Milvus Backup
git clone https://github.com/zilliztech/milvus-backup.git
cd milvus-backup

# 编译
make

# 或者使用 Docker
docker pull milvusdb/milvus-backup:latest

配置文件

yaml
# backup.yaml
milvus:
  address: localhost
  port: 19530
  authorization: "root:Milvus"
  tlsMode: 0

minio:
  address: localhost
  port: 9000
  accessKeyID: minioadmin
  secretAccessKey: minioadmin
  useSSL: false
  bucketName: a-bucket
  rootPath: files

backup:
  storageType: local
  rootPath: /var/lib/milvus-backup

备份操作

完整备份

bash
# 备份所有集合
./milvus-backup create -n full_backup_$(date +%Y%m%d)

# 查看备份列表
./milvus-backup list

指定集合备份

bash
# 备份单个集合
./milvus-backup create -n article_backup -c article_search

# 备份多个集合
./milvus-backup create -n multi_backup -c "collection1,collection2,collection3"

Python 备份脚本

python
import subprocess
import datetime
import json

def backup_collection(collection_name, backup_name=None):
    """Back up a single collection via the milvus-backup CLI.

    Args:
        collection_name: name of the collection to back up.
        backup_name: explicit backup name; when None, a timestamped name
            derived from the collection name is generated.

    Returns:
        The backup name on success, otherwise None.
    """
    if backup_name is None:
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"{collection_name}_backup_{stamp}"

    proc = subprocess.run(
        ["./milvus-backup", "create", "-n", backup_name, "-c", collection_name],
        capture_output=True,
        text=True,
    )

    if proc.returncode != 0:
        print(f"备份失败: {proc.stderr}")
        return None

    print(f"备份成功: {backup_name}")
    return backup_name

def backup_all(backup_name=None):
    """Back up every collection via the milvus-backup CLI.

    Args:
        backup_name: explicit backup name; when None a timestamped
            ``full_backup_*`` name is generated.

    Returns:
        The backup name on success, otherwise None.
    """
    if backup_name is None:
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"full_backup_{stamp}"

    proc = subprocess.run(
        ["./milvus-backup", "create", "-n", backup_name],
        capture_output=True,
        text=True,
    )

    if proc.returncode != 0:
        print(f"备份失败: {proc.stderr}")
        return None

    print(f"全量备份成功: {backup_name}")
    return backup_name

def list_backups():
    """List all backups known to the milvus-backup CLI.

    Returns:
        The parsed JSON payload from ``milvus-backup list`` on success,
        or an empty list when the command fails or its output cannot be
        parsed as JSON.
    """
    cmd = ["./milvus-backup", "list"]

    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode != 0:
        print(f"获取备份列表失败: {result.stderr}")
        return []

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # NOTE(review): depending on the milvus-backup version, `list` may
        # emit human-readable text rather than JSON — confirm the format.
        # Returning [] keeps callers working instead of crashing here.
        print(f"获取备份列表失败: {result.stdout}")
        return []

恢复操作

从备份恢复

bash
# 恢复整个备份
./milvus-backup restore -n full_backup_20240115

# 恢复指定集合
./milvus-backup restore -n article_backup -c article_search

# 恢复到新名称
./milvus-backup restore -n article_backup -c article_search -s article_search_restored

Python 恢复脚本

python
def restore_backup(backup_name, collections=None, suffix=""):
    """Restore collections from a backup via the milvus-backup CLI.

    Args:
        backup_name: name of the backup to restore from.
        collections: optional comma-separated collection names; when None
            the whole backup is restored.
        suffix: optional suffix appended to restored collection names.

    Returns:
        True on success, False otherwise.
    """
    args = ["./milvus-backup", "restore", "-n", backup_name]
    if collections:
        args += ["-c", collections]
    if suffix:
        args += ["-s", suffix]

    proc = subprocess.run(args, capture_output=True, text=True)

    if proc.returncode == 0:
        print(f"恢复成功: {backup_name}")
        return True

    print(f"恢复失败: {proc.stderr}")
    return False

def restore_to_new_name(backup_name, collection_name, new_name):
    """Restore one collection from a backup under a suffixed name.

    NOTE(review): because ``-s`` is a suffix (not a rename), the restored
    collection ends up named ``<collection_name>_restored_<new_name>`` —
    confirm this naming is what callers expect.

    Returns:
        True on success, False otherwise (see restore_backup).
    """
    suffix = f"_restored_{new_name}"
    return restore_backup(backup_name, collections=collection_name, suffix=suffix)

手动备份方案

导出集合 Schema

python
from pymilvus import connections, Collection, utility
import json

def export_schema(collection_name, output_file):
    """Export a collection's schema to a JSON file.

    Args:
        collection_name: collection whose schema is exported.
        output_file: path of the JSON file to write.
    """
    connections.connect(host="localhost", port="19530")
    collection = Collection(collection_name)

    # Serialize each field; str(dtype) yields names like "DataType.INT64",
    # which import_data later maps back onto the DataType enum.
    exported_fields = [
        {
            "name": field.name,
            "dtype": str(field.dtype),
            "is_primary": field.is_primary,
            "auto_id": field.auto_id,
            "params": field.params,
        }
        for field in collection.schema.fields
    ]

    schema_info = {
        "name": collection.name,
        "description": collection.description,
        "fields": exported_fields,
    }

    # Write UTF-8 JSON so non-ASCII descriptions survive round-tripping.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(schema_info, f, ensure_ascii=False, indent=2)

    print(f"Schema 已导出到: {output_file}")

# Usage example: dump the schema of "article_search" to a JSON file.
export_schema("article_search", "article_search_schema.json")

导出数据

python
def export_data(collection_name, output_file, batch_size=1000):
    """Export every row of a collection to a JSON file, page by page.

    Args:
        collection_name: collection to export.
        output_file: destination JSON file path.
        batch_size: number of rows fetched per query page.
    """
    connections.connect(host="localhost", port="19530")
    collection = Collection(collection_name)
    collection.load()

    # Export all fields defined by the schema.
    field_names = [field.name for field in collection.schema.fields]

    rows = []
    offset = 0
    while True:
        # NOTE(review): Milvus limits offset+limit pagination (16384 by
        # default) — confirm this covers the collection's full size.
        page = collection.query(
            expr="",
            output_fields=field_names,
            offset=offset,
            limit=batch_size,
        )
        if not page:
            break
        rows.extend(page)
        # A short page means we have reached the end of the collection.
        if len(page) < batch_size:
            break
        offset += batch_size

    # Persist everything as UTF-8 JSON.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(rows, f, ensure_ascii=False, indent=2)

    print(f"数据已导出到: {output_file}")
    print(f"共导出 {len(rows)} 条记录")

# Usage example: export all rows of "article_search" to a JSON file.
export_data("article_search", "article_search_data.json")

导入数据

python
def import_data(collection_name, schema_file, data_file):
    """Recreate a collection from an exported schema and load its data.

    Args:
        collection_name: name of the collection to create and fill.
        schema_file: JSON schema file produced by ``export_schema``.
        data_file: JSON data file produced by ``export_data``.
    """
    connections.connect(host="localhost", port="19530")

    # Read the exported schema description.
    with open(schema_file, "r", encoding="utf-8") as f:
        schema_info = json.load(f)

    # Refuse to overwrite an existing collection.
    if utility.has_collection(collection_name):
        print(f"集合 {collection_name} 已存在")
        return

    # Rebuild the schema from the exported field descriptions.
    from pymilvus import FieldSchema, CollectionSchema, Collection, DataType

    fields = []
    for field_info in schema_info["fields"]:
        # "DataType.INT64" -> DataType.INT64 (matches export_schema's str()).
        dtype = getattr(DataType, field_info["dtype"].split(".")[-1])
        fields.append(
            FieldSchema(
                name=field_info["name"],
                dtype=dtype,
                is_primary=field_info.get("is_primary", False),
                auto_id=field_info.get("auto_id", False),
                **field_info.get("params", {}),
            )
        )

    schema = CollectionSchema(
        fields=fields,
        description=schema_info.get("description", "")
    )

    collection = Collection(collection_name, schema)

    # Read the exported rows.
    with open(data_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Insert in batches to keep each RPC small.
    batch_size = 1000
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        collection.insert(batch)
        print(f"已插入 {min(i + batch_size, len(data))}/{len(data)} 条数据")

    # Flush so inserted rows are sealed/persisted; without this,
    # num_entities and immediate queries may not reflect the new data.
    collection.flush()

    print(f"数据导入完成,共 {len(data)} 条记录")

# Usage example: rebuild "article_search_new" from the exported files.
import_data("article_search_new", "article_search_schema.json", "article_search_data.json")

定时备份策略

使用 Cron 定时备份

bash
# 编辑 crontab
crontab -e

# 每天凌晨 2 点执行全量备份
0 2 * * * /path/to/milvus-backup create -n full_backup_$(date +\%Y\%m\%d)

# 每周日执行清理,保留最近 30 天的备份
0 3 * * 0 find /path/to/backups -name "full_backup_*" -mtime +30 -delete

Python 定时备份脚本

python
import datetime
import os
import shutil
import time

import schedule

def daily_backup():
    """Run the daily full backup, then prune backups older than a week."""
    stamp = datetime.datetime.now().strftime("%Y%m%d")
    backup_all(f"daily_backup_{stamp}")

    # Keep seven days of daily backups.
    cleanup_old_backups(days=7)

def weekly_backup():
    """Run the weekly full backup, then prune backups older than 30 days."""
    stamp = datetime.datetime.now().strftime("%Y%m%d")
    backup_all(f"weekly_backup_{stamp}")

    # Keep thirty days of backups around the weekly run.
    cleanup_old_backups(days=30)

def cleanup_old_backups(days, backup_dir="/var/lib/milvus-backup"):
    """Delete backup entries older than *days* days.

    Args:
        days: retention window in days; entries last modified before the
            cutoff are removed.
        backup_dir: directory holding the backups (default matches
            ``backup.rootPath`` in backup.yaml above).
    """
    cutoff = datetime.datetime.now() - datetime.timedelta(days=days)

    for filename in os.listdir(backup_dir):
        filepath = os.path.join(backup_dir, filename)
        # Use mtime, not ctime: on Linux ctime is metadata-change time, and
        # mtime matches the `find -mtime` cron cleanup shown earlier.
        mtime = datetime.datetime.fromtimestamp(os.path.getmtime(filepath))
        if mtime < cutoff:
            if os.path.isdir(filepath):
                # milvus-backup stores local backups as directories.
                shutil.rmtree(filepath)
            else:
                os.remove(filepath)
            print(f"删除旧备份: {filename}")

# Register the recurring backup jobs (times are local to this process).
schedule.every().day.at("02:00").do(daily_backup)
schedule.every().sunday.at("03:00").do(weekly_backup)

# Scheduler loop: poll for due jobs once a minute, forever.
while True:
    schedule.run_pending()
    time.sleep(60)

备份验证

验证备份完整性

python
def verify_backup(backup_name):
    """Check a backup's integrity via the milvus-backup CLI.

    Args:
        backup_name: name of the backup to verify.

    Returns:
        True when the check passes, False otherwise.
    """
    proc = subprocess.run(
        ["./milvus-backup", "check", "-n", backup_name],
        capture_output=True,
        text=True,
    )

    if proc.returncode != 0:
        print(f"备份验证失败: {proc.stderr}")
        return False

    print(f"备份 {backup_name} 验证通过")
    return True

测试恢复流程

python
def test_restore(backup_name, test_collection_name):
    """Dry-run a restore into a throwaway collection and report its size.

    Args:
        backup_name: backup to restore from.
        test_collection_name: collection within the backup to restore.

    Returns:
        True when the restore succeeded (the temporary collection is
        dropped afterwards), False otherwise.
    """
    # Unique suffix so the restore never collides with live collections.
    temp_name = f"test_restore_{int(time.time())}"

    if not restore_backup(backup_name, test_collection_name, suffix=f"_{temp_name}"):
        return False

    restored_name = f"{test_collection_name}_{temp_name}"

    # Inspect the restored row count as a sanity check.
    connections.connect(host="localhost", port="19530")
    collection = Collection(restored_name)
    count = collection.num_entities
    print(f"恢复后的数据量: {count}")

    # Drop the throwaway collection once verified.
    utility.drop_collection(restored_name)
    return True

灾难恢复流程

python
def disaster_recovery(backup_name, target_collections=None):
    """Disaster-recovery workflow: snapshot, wipe, restore, verify.

    Args:
        backup_name: backup to restore from.
        target_collections: optional comma-separated collection names to
            recover; when None, every collection is dropped and restored.
    """
    print("=== 开始灾难恢复 ===")

    # 1. Stop writers so nothing races the restore.
    print("1. 停止写入服务...")
    # stop_write_services()

    # 2. Snapshot whatever state still exists, in case it is needed later.
    print("2. 备份当前状态...")
    emergency_backup = f"emergency_backup_{int(time.time())}"
    backup_all(emergency_backup)

    # 3. Drop the damaged collections.
    print("3. 清理损坏的数据...")
    connections.connect(host="localhost", port="19530")

    if target_collections:
        doomed = target_collections.split(",")
    else:
        # No explicit targets: wipe every collection before restoring.
        doomed = utility.list_collections()

    for name in doomed:
        if utility.has_collection(name):
            utility.drop_collection(name)
            print(f"  删除集合: {name}")

    # 4. Restore from the chosen backup.
    print("4. 从备份恢复数据...")
    restore_backup(backup_name, target_collections)

    # 5. Validate the restored data.
    print("5. 验证恢复结果...")
    # verify_recovery()

    # 6. Bring writers back online.
    print("6. 恢复写入服务...")
    # start_write_services()

    print("=== 灾难恢复完成 ===")

最佳实践

备份策略建议

  1. 定期全量备份: 每天执行全量备份
  2. 保留策略:
    • 日备份保留 7 天
    • 周备份保留 4 周
    • 月备份保留 12 个月
  3. 异地备份: 将备份复制到异地存储
  4. 定期测试: 定期测试恢复流程

备份存储建议

python
# 多副本存储
import shutil

def backup_to_multiple_locations(backup_name, locations, backup_dir="/var/lib/milvus-backup"):
    """Copy one backup directory to several destination roots.

    Args:
        backup_name: name of the backup directory under *backup_dir*.
        locations: destination root directories to copy the backup into.
        backup_dir: source root holding the backups (default matches
            ``backup.rootPath`` in backup.yaml above).

    Raises:
        FileExistsError: if a destination copy already exists —
            ``shutil.copytree`` does not overwrite.
    """
    backup_path = os.path.join(backup_dir, backup_name)

    for location in locations:
        dest_path = os.path.join(location, backup_name)
        shutil.copytree(backup_path, dest_path)
        print(f"备份已复制到: {dest_path}")

# Usage example: mirror one backup to local disk, NAS, and a cloud mount.
locations = [
    "/backup/local",
    "/backup/nas",
    "/mnt/cloud-backup"
]
backup_to_multiple_locations("full_backup_20240115", locations)

下一步

掌握数据备份与恢复后,你可以:

  1. 学习性能优化
  2. 了解监控与运维
  3. 探索实际应用案例