# Monitoring and Operations
Monitoring and operations work is essential to keeping a RAG system running reliably. A well-built monitoring stack surfaces problems early, so they can be fixed before they hurt reliability or availability. This chapter covers monitoring strategies, tooling, and operational best practices for RAG systems.
## 1. Monitoring Metrics

### System Metrics
- CPU usage: overall CPU utilization of the host
- Memory usage: host memory consumption
- Disk usage: disk space consumption
- Network traffic: inbound and outbound traffic
- Process status: whether key system processes are running
### Application Metrics
- Response time: end-to-end API latency
- Throughput: requests handled per second
- Error rate: proportion of failed requests
- Concurrency: number of requests being handled simultaneously
- Queue length: number of requests waiting to be processed
### Business Metrics
- Query volume: number of queries the system processes
- Answer quality: quality scores for generated answers (see the metric sketch after this list)
- User satisfaction: how satisfied users are with the system
- Knowledge coverage: how much of the target domain the knowledge base covers
- Update frequency: how often the knowledge base is refreshed
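Unlike system and application metrics, business metrics usually need explicit instrumentation in application code. Below is a minimal sketch using prometheus_client; `rag_queries_success_total` matches the counter that the alerting rules in Section 5 query, while the evaluator feeding `record_query` a quality score is an assumption, not part of the original system.

```python
# business_metrics.py -- a minimal sketch; the quality evaluator is assumed
from typing import Optional

from prometheus_client import Counter, Histogram

# Query volume, plus the success counter the Section 5 alerting rules expect.
# rag_queries_total is also defined in metrics.py below; in a real app,
# define each metric exactly once per process.
QUERY_TOTAL = Counter('rag_queries_total', 'Total queries processed')
QUERY_SUCCESS = Counter('rag_queries_success_total', 'Successfully answered queries')

# Answer-quality scores in [0, 1]; bucket boundaries are an arbitrary choice
ANSWER_QUALITY = Histogram(
    'rag_answer_quality_score',
    'Answer quality score',
    buckets=[0.1, 0.25, 0.5, 0.75, 0.9, 1.0]
)

def record_query(success: bool, quality_score: Optional[float] = None):
    """Record one query outcome; quality_score comes from an upstream evaluator."""
    QUERY_TOTAL.inc()
    if success:
        QUERY_SUCCESS.inc()
    if quality_score is not None:
        ANSWER_QUALITY.observe(quality_score)
```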
## 2. Monitoring Tools

### System Monitoring
- Prometheus: open-source monitoring system and time-series database
- Grafana: open-source visualization and dashboarding platform
- Nagios: open-source network monitoring and alerting system
- Zabbix: open-source enterprise monitoring solution

### Application Monitoring
- Datadog: cloud-native monitoring platform
- New Relic: application performance monitoring platform
- AppDynamics: application performance management solution
- ELK Stack: log analysis platform (Elasticsearch, Logstash, Kibana)
### Custom Monitoring
- Custom metrics: define monitoring metrics around your business needs
- Health checks: probe system health on a regular schedule (a minimal endpoint sketch follows)
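A health check is typically a lightweight HTTP endpoint that load balancers and orchestrators can probe. The Flask sketch below is illustrative; the vector store and LLM probes are placeholders for whatever your deployment actually depends on.

```python
# health.py -- a minimal sketch; the dependency checks are placeholders
from flask import Flask, jsonify

app = Flask(__name__)

def check_vector_store() -> bool:
    # Replace with a real probe, e.g. a trivial similarity query
    return True

def check_llm_client() -> bool:
    # Replace with a real probe, e.g. a cheap model-list or ping call
    return True

@app.route('/health')
def health():
    checks = {
        'vector_store': check_vector_store(),
        'llm': check_llm_client(),
    }
    healthy = all(checks.values())
    status_code = 200 if healthy else 503
    return jsonify(status='ok' if healthy else 'degraded', checks=checks), status_code
```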
## 3. Prometheus + Grafana Monitoring
### Prometheus Configuration
```yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'rag-app'
    static_configs:
      - targets: ['rag-app:5000']
    metrics_path: /metrics
    scrape_interval: 5s

  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']
```

### Exposing Application Metrics
```python
# metrics.py
import functools
import time

from flask import Flask, Response, request
from prometheus_client import Counter, Histogram, Gauge, generate_latest

app = Flask(__name__)

# Metric definitions
REQUEST_COUNT = Counter(
    'rag_requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'rag_request_duration_seconds',
    'Request latency',
    ['method', 'endpoint']
)

ACTIVE_REQUESTS = Gauge(
    'rag_active_requests',
    'Number of active requests'
)

QUERY_COUNT = Counter(
    'rag_queries_total',
    'Total queries processed'
)

RETRIEVAL_LATENCY = Histogram(
    'rag_retrieval_duration_seconds',
    'Retrieval latency'
)

GENERATION_LATENCY = Histogram(
    'rag_generation_duration_seconds',
    'Generation latency'
)

@app.route('/metrics')
def metrics():
    """Prometheus metrics endpoint."""
    return Response(generate_latest(), mimetype='text/plain')

# Decorator that records request metrics automatically
def track_metrics(f):
    @functools.wraps(f)  # preserve the view name so Flask routing works
    def wrapper(*args, **kwargs):
        ACTIVE_REQUESTS.inc()
        start_time = time.time()
        try:
            result = f(*args, **kwargs)
            status = 'success'
            return result
        except Exception:
            status = 'error'
            raise
        finally:
            duration = time.time() - start_time
            REQUEST_COUNT.labels(
                method=request.method,
                endpoint=request.endpoint,
                status=status
            ).inc()
            REQUEST_LATENCY.labels(
                method=request.method,
                endpoint=request.endpoint
            ).observe(duration)
            ACTIVE_REQUESTS.dec()
    return wrapper
```
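Applying the decorator to a route then takes one line per endpoint. The `/query` handler below is a hypothetical example, not part of the original code:

```python
@app.route('/query', methods=['POST'])
@track_metrics
def query():
    QUERY_COUNT.inc()
    # ... retrieval and generation would happen here ...
    return {'answer': 'ok'}
```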
### Grafana Dashboard

```json
{
  "dashboard": {
    "title": "RAG System Dashboard",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rag_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Response Time",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(rag_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P95"
          },
          {
            "expr": "histogram_quantile(0.99, rate(rag_request_duration_seconds_bucket[5m]))",
            "legendFormat": "P99"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "singlestat",
        "targets": [
          {
            "expr": "rate(rag_requests_total{status='error'}[5m]) / rate(rag_requests_total[5m])"
          }
        ]
      },
      {
        "title": "Active Requests",
        "type": "singlestat",
        "targets": [
          {
            "expr": "rag_active_requests"
          }
        ]
      }
    ]
  }
}
```

## 4. Log Management
### Log Configuration
```python
# logging_config.py
import json
import logging
import logging.handlers
import os
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """Format log records as JSON lines."""
    def format(self, record):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        if hasattr(record, 'request_id'):
            log_data['request_id'] = record.request_id
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)
        return json.dumps(log_data, ensure_ascii=False)

def setup_logging():
    """Configure the application logger."""
    logger = logging.getLogger('rag_system')
    logger.setLevel(logging.INFO)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(JSONFormatter())
    logger.addHandler(console_handler)

    # Rotating file handler; the log directory must exist before the handler opens it
    os.makedirs('logs', exist_ok=True)
    file_handler = logging.handlers.RotatingFileHandler(
        'logs/rag.log',
        maxBytes=10 * 1024 * 1024,  # 10 MB per file
        backupCount=5
    )
    file_handler.setFormatter(JSONFormatter())
    logger.addHandler(file_handler)

    return logger

logger = setup_logging()
```
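The `request_id` field only appears when it is attached to the log record; with the standard library this is done through the `extra` argument. A brief usage sketch (the ID value is illustrative):

```python
logger.info("query received", extra={'request_id': 'req-12345'})
# emits: {"timestamp": "...", "level": "INFO", "message": "query received",
#         ..., "request_id": "req-12345"}
```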
### Structured Logging

```python
# structured_logging.py
import logging

import structlog

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Usage example
def process_query(query, user_id):
    logger.info(
        "processing_query",
        query=query,
        user_id=user_id,
        query_length=len(query)
    )
    # processing logic ...
```
### ELK Integration

```yaml
# filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/rag/*.log
    json.keys_under_root: true
    json.add_error_key: true

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "rag-logs-%{+yyyy.MM.dd}"

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
```

## 5. Alert Configuration
### Alertmanager Configuration
```yaml
# alertmanager.yml
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alerts@yourcompany.com'
  smtp_auth_username: 'alerts@yourcompany.com'
  smtp_auth_password: 'your-password'

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'email-notifications'

receivers:
  - name: 'email-notifications'
    email_configs:
      - to: 'ops-team@yourcompany.com'
        subject: 'RAG System Alert: {{ .GroupLabels.alertname }}'
        body: |
          {{ range .Alerts }}
          Alert: {{ .Annotations.summary }}
          Description: {{ .Annotations.description }}
          {{ end }}

  - name: 'slack-notifications'
    slack_configs:
      - api_url: 'YOUR_SLACK_WEBHOOK_URL'
        channel: '#alerts'
        title: 'RAG System Alert'
        text: '{{ .CommonAnnotations.summary }}'
```

### Alerting Rules
```yaml
# alerting_rules.yml
groups:
  - name: rag_system_alerts
    rules:
      # Expressed as a ratio of error requests to all requests,
      # matching the "above 10%" description and the Grafana error-rate panel
      - alert: HighErrorRate
        expr: rate(rag_requests_total{status="error"}[5m]) / rate(rag_requests_total[5m]) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is above 10% for more than 2 minutes"

      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(rag_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High response latency"
          description: "P95 latency is above 2 seconds"

      - alert: LowQuerySuccessRate
        expr: rate(rag_queries_success_total[5m]) / rate(rag_queries_total[5m]) < 0.95
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Low query success rate"
          description: "Query success rate is below 95%"

      - alert: HighMemoryUsage
        expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is above 85%"
```
## 6. Operations Automation

### Auto-Scaling
```python
# auto_scaling.py
from datetime import datetime, timedelta

import boto3
from kubernetes import client, config

class AutoScaler:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        config.load_kube_config()
        self.k8s_apps = client.AppsV1Api()

    def get_metrics(self):
        """Fetch recent request counts from CloudWatch."""
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/ApplicationELB',
            MetricName='RequestCount',
            Dimensions=[
                {
                    'Name': 'LoadBalancer',
                    'Value': 'app/rag-lb/1234567890'
                }
            ],
            StartTime=datetime.utcnow() - timedelta(minutes=5),
            EndTime=datetime.utcnow(),
            Period=60,
            Statistics=['Sum']
        )
        return response['Datapoints']

    def scale_deployment(self, deployment_name, namespace, replicas):
        """Scale a Deployment to the given replica count."""
        self.k8s_apps.patch_namespaced_deployment_scale(
            name=deployment_name,
            namespace=namespace,
            body={'spec': {'replicas': replicas}}
        )

    def auto_scale(self):
        """Pick a replica count based on recent request volume."""
        metrics = self.get_metrics()
        current_requests = sum(m['Sum'] for m in metrics)

        # Choose replicas by request volume
        if current_requests > 1000:
            self.scale_deployment('rag-app', 'default', 10)
        elif current_requests > 500:
            self.scale_deployment('rag-app', 'default', 5)
        else:
            self.scale_deployment('rag-app', 'default', 3)
```
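A simple way to run the scaler is a periodic loop, sketched below with an arbitrary 60-second interval. Note that for purely in-cluster, load-based scaling, Kubernetes' built-in HorizontalPodAutoscaler covers the common case without custom code; a custom scaler like this one is mainly useful when the scaling signal lives outside the cluster, as with the CloudWatch metrics here.

```python
# run_autoscaler.py -- illustrative runner for the AutoScaler above
import time

from auto_scaling import AutoScaler

scaler = AutoScaler()
while True:
    scaler.auto_scale()
    time.sleep(60)  # re-evaluate once a minute (arbitrary interval)
```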
### Auto-Recovery

```python
# auto_recovery.py
import time

import requests
from kubernetes import client, config

class AutoRecovery:
    def __init__(self):
        config.load_kube_config()
        self.k8s_core = client.CoreV1Api()
        self.k8s_apps = client.AppsV1Api()

    def check_pod_health(self, namespace='default'):
        """Return the names of pods that are not in the Running phase."""
        pods = self.k8s_core.list_namespaced_pod(namespace)
        unhealthy_pods = []
        for pod in pods.items:
            if pod.status.phase != 'Running':
                unhealthy_pods.append(pod.metadata.name)
        return unhealthy_pods

    def restart_pod(self, pod_name, namespace='default'):
        """Delete a pod so its Deployment recreates it."""
        self.k8s_core.delete_namespaced_pod(
            name=pod_name,
            namespace=namespace
        )
        print(f"Restarted pod: {pod_name}")

    def check_service_health(self, url):
        """Probe the service's /health endpoint."""
        try:
            response = requests.get(f"{url}/health", timeout=5)
            return response.status_code == 200
        except requests.RequestException:
            return False

    def run_recovery_checks(self):
        """Run the recovery loop."""
        while True:
            # Check pod health
            unhealthy = self.check_pod_health()
            for pod in unhealthy:
                self.restart_pod(pod)

            # Check service health
            if not self.check_service_health("http://rag-service"):
                print("Service unhealthy, triggering alert")
                # send an alert notification here

            time.sleep(30)  # check every 30 seconds
```

## 7. Backup and Recovery
```python
# backup.py
import tarfile
import time
from datetime import datetime

import boto3

class BackupManager:
    def __init__(self, backup_dir="/backups"):
        self.backup_dir = backup_dir
        self.s3 = boto3.client('s3')

    def backup_vector_db(self, db_path):
        """Back up the vector database to S3."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"vector_db_{timestamp}.tar.gz"
        backup_path = f"{self.backup_dir}/{backup_name}"

        # Create the compressed archive
        with tarfile.open(backup_path, "w:gz") as tar:
            tar.add(db_path, arcname="vector_db")

        # Upload to S3
        self.s3.upload_file(
            backup_path,
            'rag-backups',
            backup_name
        )
        print(f"Backup complete: {backup_name}")
        return backup_name

    def restore_vector_db(self, backup_name, restore_path):
        """Restore the vector database from S3."""
        backup_path = f"{self.backup_dir}/{backup_name}"

        # Download from S3
        self.s3.download_file(
            'rag-backups',
            backup_name,
            backup_path
        )

        # Extract the archive
        with tarfile.open(backup_path, "r:gz") as tar:
            tar.extractall(restore_path)
        print(f"Restore complete: {backup_name}")

    def schedule_backups(self):
        """Run backups on a daily schedule."""
        import schedule

        # Back up daily at 02:00
        schedule.every().day.at("02:00").do(
            self.backup_vector_db,
            db_path="./vector_db"
        )

        while True:
            schedule.run_pending()
            time.sleep(60)
```
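A quick usage sketch: the `rag-backups` S3 bucket referenced by the class must already exist, and the paths here are illustrative.

```python
from backup import BackupManager

manager = BackupManager(backup_dir="/backups")

# One-off backup, then restore into a fresh directory
name = manager.backup_vector_db("./vector_db")
manager.restore_vector_db(name, "./restored")

# Or run the daily 02:00 schedule in the foreground
manager.schedule_backups()
```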