Monitoring and Operations

Monitoring and operations keep a RAG system running reliably. A well-built monitoring stack surfaces problems early, shortens recovery time, and improves the system's reliability and availability. This chapter covers monitoring strategies, tooling, and operational best practices for RAG systems.

1. Monitoring Metrics

System Metrics

  • CPU usage: CPU utilization of the host
  • Memory usage: memory utilization of the host
  • Disk usage: disk utilization of the host
  • Network traffic: inbound and outbound network traffic
  • Process status: run state of key system processes (a sampling sketch follows this list)
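
For host-level metrics, node-exporter (scraped in the Prometheus configuration below) is the usual collector. As a lightweight alternative, here is a minimal sketch that samples the same signals in-process with psutil and publishes them as Prometheus gauges; the metric names and the psutil dependency are assumptions, not part of any standard setup.

python
# system_metrics.py - minimal sketch; assumes `pip install psutil prometheus_client`
import psutil
from prometheus_client import Gauge

CPU_USAGE = Gauge('host_cpu_usage_percent', 'CPU utilization in percent')
MEM_USAGE = Gauge('host_memory_usage_percent', 'Memory utilization in percent')
DISK_USAGE = Gauge('host_disk_usage_percent', 'Disk utilization of / in percent')

def sample_system_metrics():
    """Take one sample of CPU, memory, and disk usage."""
    CPU_USAGE.set(psutil.cpu_percent(interval=1))   # CPU percent over a 1-second window
    MEM_USAGE.set(psutil.virtual_memory().percent)  # percent of RAM in use
    DISK_USAGE.set(psutil.disk_usage('/').percent)  # percent of the root filesystem in use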

Application Metrics

  • Response time: API response latency
  • Throughput: requests processed per second
  • Error rate: proportion of requests that fail
  • Concurrency: number of requests handled simultaneously
  • Queue length: number of requests waiting to be processed

Business Metrics

  • Query volume: number of queries the system processes
  • Answer quality: quality scores of generated answers
  • User satisfaction: how satisfied users are with the system
  • Knowledge coverage: how much of the target domain the knowledge base covers
  • Update frequency: how often the knowledge base is refreshed (an instrumentation sketch follows this list)
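
Business metrics usually require custom instrumentation. The sketch below shows one way to expose them with prometheus_client; the metric names, the label values, and the idea of a 0-1 quality score are illustrative assumptions.

python
# business_metrics.py - illustrative sketch; metric names are assumptions
from prometheus_client import Counter, Gauge, Histogram

ANSWER_QUALITY = Histogram(
    'rag_answer_quality_score',
    'Answer quality score (0-1) from an automated evaluator',
    buckets=[0.1, 0.25, 0.5, 0.75, 0.9, 1.0]
)

USER_FEEDBACK = Counter(
    'rag_user_feedback_total',
    'Explicit user feedback events',
    ['rating']  # e.g. 'thumbs_up' / 'thumbs_down'
)

KB_DOCUMENTS = Gauge(
    'rag_knowledge_base_documents',
    'Number of documents currently in the knowledge base'
)

def record_answer(quality_score, rating=None):
    """Record one answered query with its quality score and optional feedback."""
    ANSWER_QUALITY.observe(quality_score)
    if rating is not None:
        USER_FEEDBACK.labels(rating=rating).inc()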

2. Monitoring Tools

System Monitoring

  • Prometheus: open-source monitoring system and time-series database
  • Grafana: open-source visualization and dashboarding platform
  • Nagios: open-source network monitoring and alerting system
  • Zabbix: open-source enterprise-grade monitoring solution

Application Monitoring

  • Datadog: cloud-native monitoring platform
  • New Relic: application performance monitoring platform
  • AppDynamics: application performance management solution
  • ELK Stack: log analysis platform (Elasticsearch, Logstash, Kibana)

Custom Monitoring

  • Custom metrics: metrics defined around your own business needs
  • Health checks: periodic checks of system health (a minimal endpoint sketch follows this list)
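
A health check is usually exposed as a dedicated endpoint that load balancers and orchestrators can probe (the auto-recovery example in Section 6 polls such an endpoint). Below is a minimal Flask sketch; the per-component checks are placeholders you would replace with real probes against your vector store and LLM backend.

python
# health.py - minimal health-check endpoint; component checks are placeholders
from flask import Flask, jsonify

app = Flask(__name__)

def check_vector_db():
    # Placeholder: replace with a real ping against your vector store
    return True

def check_llm_backend():
    # Placeholder: replace with a cheap call against your LLM provider
    return True

@app.route('/health')
def health():
    checks = {
        'vector_db': check_vector_db(),
        'llm_backend': check_llm_backend(),
    }
    healthy = all(checks.values())
    status_code = 200 if healthy else 503  # 503 tells probes to stop routing traffic here
    return jsonify({'status': 'ok' if healthy else 'degraded', 'checks': checks}), status_code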

3. Monitoring with Prometheus + Grafana

Prometheus Configuration

yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'rag-app'
    static_configs:
      - targets: ['rag-app:5000']
    metrics_path: /metrics
    scrape_interval: 5s

  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']

Exposing Application Metrics

python
# metrics.py
from functools import wraps

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from flask import Flask, Response, request  # `request` is used by the decorator below
import time

app = Flask(__name__)

# Define metrics
REQUEST_COUNT = Counter(
    'rag_requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'rag_request_duration_seconds',
    'Request latency',
    ['method', 'endpoint']
)

ACTIVE_REQUESTS = Gauge(
    'rag_active_requests',
    'Number of active requests'
)

QUERY_COUNT = Counter(
    'rag_queries_total',
    'Total queries processed'
)

RETRIEVAL_LATENCY = Histogram(
    'rag_retrieval_duration_seconds',
    'Retrieval latency'
)

GENERATION_LATENCY = Histogram(
    'rag_generation_duration_seconds',
    'Generation latency'
)

@app.route('/metrics')
def metrics():
    """Prometheus指标端点"""
    return Response(generate_latest(), mimetype='text/plain')

# Decorator that records request metrics automatically
def track_metrics(f):
    @wraps(f)  # preserve the wrapped function's name so Flask routing works
    def wrapper(*args, **kwargs):
        ACTIVE_REQUESTS.inc()
        start_time = time.time()
        
        try:
            result = f(*args, **kwargs)
            status = 'success'
            return result
        except Exception:
            status = 'error'
            raise
        finally:
            duration = time.time() - start_time
            REQUEST_COUNT.labels(
                method=request.method,
                endpoint=request.endpoint,
                status=status
            ).inc()
            REQUEST_LATENCY.labels(
                method=request.method,
                endpoint=request.endpoint
            ).observe(duration)
            ACTIVE_REQUESTS.dec()
    
    return wrapper
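
A route can then combine the decorator with the RAG-specific histograms. The handler below is a hypothetical usage example: retrieve() and generate() stand in for your own retrieval and generation functions.

python
# Example usage (illustrative): instrument a query endpoint
@app.route('/query', methods=['POST'])
@track_metrics
def handle_query():
    QUERY_COUNT.inc()
    query = request.json['query']
    with RETRIEVAL_LATENCY.time():      # time the retrieval phase
        docs = retrieve(query)          # hypothetical retriever
    with GENERATION_LATENCY.time():     # time the generation phase
        answer = generate(query, docs)  # hypothetical generator
    return {'answer': answer}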

Grafana Dashboard

json
{
  "dashboard": {
    "title": "RAG System Dashboard",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rag_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Response Time",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(rag_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P95"
          },
          {
            "expr": "histogram_quantile(0.99, rate(rag_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P99"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "singlestat",
        "targets": [
          {
            "expr": "rate(rag_requests_total{status='error'}[5m]) / rate(rag_requests_total[5m])"
          }
        ]
      },
      {
        "title": "Active Requests",
        "type": "singlestat",
        "targets": [
          {
            "expr": "rag_active_requests"
          }
        ]
      }
    ]
  }
}
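
Dashboards can also be provisioned through Grafana's HTTP API rather than clicked together by hand. A minimal sketch, assuming Grafana is reachable at http://grafana:3000 and that you have created a service-account API token:

python
# push_dashboard.py - sketch; the Grafana URL and token are assumptions
import json
import requests

GRAFANA_URL = "http://grafana:3000"
API_TOKEN = "YOUR_GRAFANA_API_TOKEN"

with open("rag_dashboard.json") as f:
    payload = json.load(f)

# POST /api/dashboards/db creates or updates a dashboard
resp = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    json={"dashboard": payload["dashboard"], "overwrite": True},
    timeout=10,
)
resp.raise_for_status()
print(resp.json())  # includes the dashboard uid and url on success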

4. Log Management

Logging Configuration

python
# logging_config.py
import json
import logging
import logging.handlers
import os
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """JSON格式日志"""
    def format(self, record):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        
        if hasattr(record, 'request_id'):
            log_data['request_id'] = record.request_id
        
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)
        
        return json.dumps(log_data, ensure_ascii=False)

def setup_logging():
    """Configure the application logger"""
    logger = logging.getLogger('rag_system')
    logger.setLevel(logging.INFO)
    
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(JSONFormatter())
    logger.addHandler(console_handler)
    
    # Rotating file handler; create the log directory if it does not exist
    os.makedirs('logs', exist_ok=True)
    file_handler = logging.handlers.RotatingFileHandler(
        'logs/rag.log',
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setFormatter(JSONFormatter())
    logger.addHandler(file_handler)
    
    return logger

logger = setup_logging()
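
Because JSONFormatter looks for a request_id attribute on the record, callers can attach one through the standard `extra` mechanism:

python
# Example: attach a request_id so JSONFormatter includes it in the output
logger.info("query received", extra={"request_id": "req-123"})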

Structured Logging

python
# structured_logging.py
import structlog
import logging

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Usage example
def process_query(query, user_id):
    logger.info(
        "processing_query",
        query=query,
        user_id=user_id,
        query_length=len(query)
    )
    # processing logic...

ELK Integration

yaml
# filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/rag/*.log
  json.keys_under_root: true
  json.add_error_key: true

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "rag-logs-%{+yyyy.MM.dd}"

processors:
- add_host_metadata:
    when.not.contains.tags: forwarded
- add_cloud_metadata: ~

5. Alerting Configuration

Alertmanager Configuration

yaml
# alertmanager.yml
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alerts@yourcompany.com'
  smtp_auth_username: 'alerts@yourcompany.com'
  smtp_auth_password: 'your-password'

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'email-notifications'

receivers:
- name: 'email-notifications'
  email_configs:
  - to: 'ops-team@yourcompany.com'
    subject: 'RAG System Alert: {{ .GroupLabels.alertname }}'
    body: |
      {{ range .Alerts }}
      Alert: {{ .Annotations.summary }}
      Description: {{ .Annotations.description }}
      {{ end }}

- name: 'slack-notifications'
  slack_configs:
  - api_url: 'YOUR_SLACK_WEBHOOK_URL'
    channel: '#alerts'
    title: 'RAG System Alert'
    text: '{{ .CommonAnnotations.summary }}'

Alerting Rules

yaml
# alerting_rules.yml
groups:
- name: rag_system_alerts
  rules:
  - alert: HighErrorRate
    expr: rate(rag_requests_total{status="error"}[5m]) > 0.1
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "High error rate detected"
      description: "Error rate is above 10% for more than 2 minutes"

  - alert: HighLatency
    expr: histogram_quantile(0.95, rate(rag_request_duration_seconds_bucket[5m])) > 2
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High response latency"
      description: "P95 latency is above 2 seconds"

  - alert: LowQuerySuccessRate
    expr: rate(rag_queries_success_total[5m]) / rate(rag_queries_total[5m]) < 0.95
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Low query success rate"
      description: "Query success rate is below 95%"

  - alert: HighMemoryUsage
    expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.85
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High memory usage"
      description: "Memory usage is above 85%"

6. Operations Automation

Auto-Scaling

python
# auto_scaling.py
from datetime import datetime, timedelta

import boto3
from kubernetes import client, config

class AutoScaler:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        config.load_kube_config()
        self.k8s_apps = client.AppsV1Api()
    
    def get_metrics(self):
        """Fetch recent request counts from CloudWatch"""
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/ApplicationELB',
            MetricName='RequestCount',
            Dimensions=[
                {
                    'Name': 'LoadBalancer',
                    'Value': 'app/rag-lb/1234567890'
                }
            ],
            StartTime=datetime.utcnow() - timedelta(minutes=5),
            EndTime=datetime.utcnow(),
            Period=60,
            Statistics=['Sum']
        )
        return response['Datapoints']
    
    def scale_deployment(self, deployment_name, namespace, replicas):
        """Scale a Deployment to the given replica count"""
        self.k8s_apps.patch_namespaced_deployment_scale(
            name=deployment_name,
            namespace=namespace,
            body={'spec': {'replicas': replicas}}
        )
    
    def auto_scale(self):
        """Simple threshold-based scaling logic"""
        metrics = self.get_metrics()
        current_requests = sum([m['Sum'] for m in metrics])
        
        # Choose the replica count based on request volume
        if current_requests > 1000:
            self.scale_deployment('rag-app', 'default', 10)
        elif current_requests > 500:
            self.scale_deployment('rag-app', 'default', 5)
        else:
            self.scale_deployment('rag-app', 'default', 3)
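
In many clusters a Kubernetes HorizontalPodAutoscaler would replace hand-rolled logic like this. If you do run the class directly, a minimal driver could look like the sketch below (the 60-second interval is an assumption):

python
# Example driver for AutoScaler (illustrative)
import time

if __name__ == "__main__":
    scaler = AutoScaler()
    while True:
        scaler.auto_scale()
        time.sleep(60)  # re-evaluate once per minute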

Auto-Recovery

python
# auto_recovery.py
import time
import requests
from kubernetes import client, config

class AutoRecovery:
    def __init__(self):
        config.load_kube_config()
        self.k8s_core = client.CoreV1Api()
        self.k8s_apps = client.AppsV1Api()
    
    def check_pod_health(self, namespace='default'):
        """Return pods that are not in a healthy phase"""
        pods = self.k8s_core.list_namespaced_pod(namespace)
        
        unhealthy_pods = []
        for pod in pods.items:
            if pod.status.phase not in ('Running', 'Succeeded'):  # don't restart completed pods
                unhealthy_pods.append(pod.metadata.name)
        
        return unhealthy_pods
    
    def restart_pod(self, pod_name, namespace='default'):
        """Restart a pod by deleting it; its controller recreates it"""
        self.k8s_core.delete_namespaced_pod(
            name=pod_name,
            namespace=namespace
        )
        print(f"重启Pod: {pod_name}")
    
    def check_service_health(self, url):
        """Probe the service's /health endpoint"""
        try:
            response = requests.get(f"{url}/health", timeout=5)
            return response.status_code == 200
        except requests.RequestException:
            return False
    
    def run_recovery_checks(self):
        """Run recovery checks in a loop"""
        while True:
            # Check pod health
            unhealthy = self.check_pod_health()
            for pod in unhealthy:
                self.restart_pod(pod)
            
            # Check service health
            if not self.check_service_health("http://rag-service"):
                print("服务不健康,触发告警")
                # 发送告警通知
            
            time.sleep(30)  # check every 30 seconds

7. Backup and Recovery

python
# backup.py
import tarfile
import time
from datetime import datetime

import boto3

class BackupManager:
    def __init__(self, backup_dir="/backups"):
        self.backup_dir = backup_dir
        self.s3 = boto3.client('s3')
    
    def backup_vector_db(self, db_path):
        """Back up the vector database and upload it to S3"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"vector_db_{timestamp}.tar.gz"
        backup_path = f"{self.backup_dir}/{backup_name}"
        
        # Create a compressed archive
        with tarfile.open(backup_path, "w:gz") as tar:
            tar.add(db_path, arcname="vector_db")
        
        # Upload to S3 (the bucket name is illustrative)
        self.s3.upload_file(
            backup_path,
            'rag-backups',
            backup_name
        )
        
        print(f"备份完成: {backup_name}")
        return backup_name
    
    def restore_vector_db(self, backup_name, restore_path):
        """Restore the vector database from S3"""
        backup_path = f"{self.backup_dir}/{backup_name}"
        
        # Download from S3
        self.s3.download_file(
            'rag-backups',
            backup_name,
            backup_path
        )
        
        # Extract the archive
        with tarfile.open(backup_path, "r:gz") as tar:
            tar.extractall(restore_path)
        
        print(f"恢复完成: {backup_name}")
    
    def schedule_backups(self):
        """Run scheduled backups"""
        import schedule
        
        # Back up daily at 2:00 AM
        schedule.every().day.at("02:00").do(
            self.backup_vector_db,
            db_path="./vector_db"
        )
        
        while True:
            schedule.run_pending()
            time.sleep(60)
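
A quick usage sketch (the paths and the rag-backups bucket follow the class above and are placeholders):

python
# Example usage of BackupManager (illustrative paths)
manager = BackupManager(backup_dir="/backups")
name = manager.backup_vector_db("./vector_db")    # archive and upload
manager.restore_vector_db(name, "./restored_db")  # download and extract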