
Evaluation Methods

Evaluating the performance of a RAG system is a key step in assuring its quality. Well-designed evaluation identifies the system's strengths and weaknesses and guides optimization and improvement. This chapter covers the evaluation metrics, methods, and tools for RAG systems in detail.

1. Evaluation Metrics

Accuracy Metrics

  • Precision: the proportion of retrieved documents that are relevant

    Precision = relevant documents retrieved / total documents retrieved
  • Recall: the proportion of relevant documents that are retrieved

    Recall = relevant documents retrieved / total relevant documents
  • F1 score: the harmonic mean of precision and recall

    F1 = 2 × (Precision × Recall) / (Precision + Recall)
  • Mean Average Precision (MAP): the mean, over all queries, of each query's Average Precision (AP)

    MAP = (1/n) × Σ AP(q)

    where n is the number of queries and AP(q) averages Precision@k over the ranks k at which relevant documents appear for query q. A code sketch of these metrics follows this list.
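
A minimal sketch of these formulas in code; retrieved and relevant are hypothetical lists of document IDs, and average_precision expects the retrieved list in ranked order:

python
def precision_recall_f1(retrieved, relevant):
    """Precision, recall, and F1 for one query, given lists of document IDs."""
    retrieved_set, relevant_set = set(retrieved), set(relevant)
    hits = len(retrieved_set & relevant_set)
    precision = hits / len(retrieved_set) if retrieved_set else 0.0
    recall = hits / len(relevant_set) if relevant_set else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return precision, recall, f1

def average_precision(ranked_retrieved, relevant):
    """Average Precision: mean of Precision@k at every rank k that hits a relevant doc."""
    relevant_set = set(relevant)
    hits, precision_sum = 0, 0.0
    for k, doc_id in enumerate(ranked_retrieved, start=1):
        if doc_id in relevant_set:
            hits += 1
            precision_sum += hits / k
    return precision_sum / len(relevant_set) if relevant_set else 0.0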

Relevance Metrics

  • Relevance rating: human or automated judgment of how relevant the retrieved results are to the query
  • Semantic similarity: similarity between the query and the retrieved results, computed with an embedding model (see the sketch below)
  • Context match: how well the retrieved results support the generated answer
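
A minimal sketch of the semantic-similarity metric, assuming the sentence-transformers package and the all-MiniLM-L6-v2 model (any embedding model can be substituted):

python
from sentence_transformers import SentenceTransformer
import numpy as np

# Assumed model name; swap in whichever embedding model your system uses
model = SentenceTransformer("all-MiniLM-L6-v2")

def semantic_similarity(query, passages):
    """Cosine similarity between the query embedding and each passage embedding."""
    vectors = model.encode([query] + passages)
    q, p = vectors[0], vectors[1:]
    return (p @ q) / (np.linalg.norm(p, axis=1) * np.linalg.norm(q))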

Performance Metrics

  • Retrieval latency: time from receiving the query to returning the retrieval results
  • Generation latency: time from the retrieval results to the generated answer
  • Total latency: end-to-end time from query to answer
  • Throughput: queries the system can process per second (a measurement sketch follows this list)
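
A minimal latency and throughput measurement sketch; rag_system.query() is the same hypothetical interface assumed elsewhere in this chapter:

python
import time

def measure_performance(rag_system, queries):
    """Per-query latency and overall throughput over a batch of queries."""
    latencies = []
    start = time.perf_counter()
    for q in queries:
        t0 = time.perf_counter()
        rag_system.query(q)
        latencies.append(time.perf_counter() - t0)
    elapsed = time.perf_counter() - start
    return {
        "avg_latency": sum(latencies) / len(latencies),
        "throughput_qps": len(queries) / elapsed,
    }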

Quality Metrics

  • Answer quality: accuracy, completeness, and relevance of the answer
  • Hallucination rate: the proportion of answers that contain incorrect or unsupported information
  • Citation accuracy: whether the sources cited in the answer are correct

2. Evaluation Methods

Offline Evaluation

Evaluate against a labeled test dataset:

python
class OfflineEvaluator:
    def __init__(self, rag_system):
        self.rag = rag_system
    
    def evaluate_retrieval(self, test_data):
        """
        Evaluate retrieval performance.
        test_data: [{"query": "...", "relevant_docs": ["..."]}]
        """
        total_precision = 0
        total_recall = 0
        
        for item in test_data:
            query = item["query"]
            relevant_docs = set(item["relevant_docs"])
            
            # Run retrieval
            results = self.rag.retrieve(query, k=5)
            retrieved_docs = set([r["document"]["metadata"]["source"] for r in results])
            
            # Compute precision and recall for this query
            if retrieved_docs:
                precision = len(retrieved_docs & relevant_docs) / len(retrieved_docs)
                recall = len(retrieved_docs & relevant_docs) / len(relevant_docs)
                
                total_precision += precision
                total_recall += recall
        
        n = len(test_data)
        avg_precision = total_precision / n
        avg_recall = total_recall / n
        f1 = 2 * (avg_precision * avg_recall) / (avg_precision + avg_recall) if (avg_precision + avg_recall) > 0 else 0
        
        return {
            "precision": avg_precision,
            "recall": avg_recall,
            "f1": f1
        }
    
    def evaluate_generation(self, test_data):
        """
        Evaluate generation quality.
        test_data: [{"query": "...", "ground_truth": "..."}]
        """
        from rouge import Rouge
        
        rouge = Rouge()
        rouge_scores = []
        
        for item in test_data:
            query = item["query"]
            ground_truth = item["ground_truth"]
            
            # Generate an answer
            result = self.rag.query(query)
            generated = result["answer"]
            
            # Compute ROUGE scores against the reference
            scores = rouge.get_scores(generated, ground_truth)[0]
            rouge_scores.append(scores)
        
        # Average the scores
        avg_scores = {
            "rouge-1": sum([s["rouge-1"]["f"] for s in rouge_scores]) / len(rouge_scores),
            "rouge-2": sum([s["rouge-2"]["f"] for s in rouge_scores]) / len(rouge_scores),
            "rouge-l": sum([s["rouge-l"]["f"] for s in rouge_scores]) / len(rouge_scores)
        }
        
        return avg_scores
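
A usage sketch, assuming a rag_system object that exposes the retrieve() and query() methods used above, together with a small hand-labeled test set:

python
# Hypothetical test set; relevant_docs holds the source identifiers expected in the metadata
test_data = [
    {
        "query": "What is RAG?",
        "relevant_docs": ["rag_intro.md"],
        "ground_truth": "RAG combines retrieval with generation ...",
    },
]

evaluator = OfflineEvaluator(rag_system)          # rag_system is assumed to exist
print(evaluator.evaluate_retrieval(test_data))    # {"precision": ..., "recall": ..., "f1": ...}
print(evaluator.evaluate_generation(test_data))   # {"rouge-1": ..., "rouge-2": ..., "rouge-l": ...}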

Online Evaluation

Evaluate by collecting user feedback:

python
import time

class OnlineEvaluator:
    def __init__(self):
        self.feedback_data = []
    
    def collect_feedback(self, query, answer, rating, feedback_type="explicit"):
        """
        Record a piece of user feedback.
        rating: score from 1 to 5
        feedback_type: "explicit" or "implicit"
        """
        self.feedback_data.append({
            "query": query,
            "answer": answer,
            "rating": rating,
            "type": feedback_type,
            "timestamp": time.time()
        })
    
    def calculate_satisfaction(self):
        """计算用户满意度"""
        if not self.feedback_data:
            return 0
        
        ratings = [f["rating"] for f in self.feedback_data]
        return sum(ratings) / len(ratings)
    
    def get_metrics(self):
        """获取在线评估指标"""
        total_queries = len(self.feedback_data)
        
        if total_queries == 0:
            return {}
        
        # Rating distribution
        rating_distribution = {}
        for f in self.feedback_data:
            rating = f["rating"]
            rating_distribution[rating] = rating_distribution.get(rating, 0) + 1
        
        return {
            "total_queries": total_queries,
            "satisfaction": self.calculate_satisfaction(),
            "rating_distribution": rating_distribution
        }
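
A usage sketch showing how feedback could flow in, assuming a 1-5 rating collected from the application UI:

python
online = OnlineEvaluator()
online.collect_feedback("What is RAG?", "RAG combines retrieval with generation ...", rating=4)
online.collect_feedback("How should I chunk documents?", "Split by headings or fixed-size windows ...", rating=5)

print(online.calculate_satisfaction())  # 4.5
print(online.get_metrics())             # {"total_queries": 2, "satisfaction": 4.5, "rating_distribution": {4: 1, 5: 1}}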

3. Automated Evaluation

LLM as Evaluator

python
import json
import openai

class LLMEvaluator:
    def __init__(self, model="gpt-4"):
        self.model = model
    
    def evaluate_answer(self, query, answer, context):
        """使用LLM评估回答质量"""
        prompt = f"""请评估以下回答的质量。

问题:{query}

上下文:{context}

回答:{answer}

请从以下维度评估(1-5分):
1. 准确性:回答是否准确
2. 完整性:回答是否完整
3. 相关性:回答是否与问题相关
4. 流畅性:回答是否流畅自然

请以JSON格式返回评分:
{{
    "accuracy": 分数,
    "completeness": 分数,
    "relevance": 分数,
    "fluency": 分数,
    "explanation": "简要说明"
}}"""
        
        response = openai.ChatCompletion.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        # Parse the JSON response
        try:
            result = json.loads(response.choices[0].message.content)
            return result
        except json.JSONDecodeError:
            return {"error": "Could not parse the evaluation result"}
    
    def compare_answers(self, query, answer_a, answer_b):
        """Pairwise comparison of two answers."""
        prompt = f"""Please compare the following two answers and decide which one is better.

Question: {query}

Answer A: {answer_a}

Answer B: {answer_b}

Choose A or B and briefly explain your reasoning."""
        
        response = openai.ChatCompletion.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        return response.choices[0].message.content

4. A/B Testing

python
import random
import time

class ABTest:
    def __init__(self, system_a, system_b):
        self.system_a = system_a
        self.system_b = system_b
        self.results = {"a": [], "b": []}
    
    def run_test(self, queries, split_ratio=0.5):
        """运行A/B测试"""
        import random
        
        for query in queries:
            # Randomly route the query to system A or B
            if random.random() < split_ratio:
                system = self.system_a
                label = "a"
            else:
                system = self.system_b
                label = "b"
            
            # Record the answer and latency
            start_time = time.time()
            result = system.query(query)
            latency = time.time() - start_time
            
            self.results[label].append({
                "query": query,
                "answer": result["answer"],
                "latency": latency
            })
    
    def analyze_results(self):
        """分析A/B测试结果"""
        analysis = {}
        
        for label, data in self.results.items():
            if not data:
                continue
            
            latencies = [r["latency"] for r in data]
            analysis[label] = {
                "count": len(data),
                "avg_latency": sum(latencies) / len(latencies),
                "p95_latency": sorted(latencies)[int(len(latencies) * 0.95)]
            }
        
        return analysis
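
Average latency alone does not tell you whether the difference between the two arms is meaningful. A minimal sketch of a significance check using Welch's t-test from SciPy (assumed to be installed), applied here to the recorded latencies; the same test can be applied to user ratings:

python
from scipy import stats

def latency_significance(results_a, results_b, key="latency"):
    """Welch's t-test on a numeric field of the two result lists."""
    a = [r[key] for r in results_a]
    b = [r[key] for r in results_b]
    t_stat, p_value = stats.ttest_ind(a, b, equal_var=False)
    return {"t_stat": t_stat, "p_value": p_value, "significant": p_value < 0.05}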

5. Evaluation Reports

python
def generate_evaluation_report(evaluator, test_data):
    """Generate a Markdown evaluation report from offline metrics."""
    
    # Run the retrieval and generation evaluations
    retrieval_metrics = evaluator.evaluate_retrieval(test_data)
    generation_metrics = evaluator.evaluate_generation(test_data)
    
    report = f"""
# RAG系统评估报告

## 检索性能
- 精确率: {retrieval_metrics['precision']:.4f}
- 召回率: {retrieval_metrics['recall']:.4f}
- F1分数: {retrieval_metrics['f1']:.4f}

## 生成质量
- ROUGE-1: {generation_metrics['rouge-1']:.4f}
- ROUGE-2: {generation_metrics['rouge-2']:.4f}
- ROUGE-L: {generation_metrics['rouge-l']:.4f}

## 建议
"""
    
    # Add recommendations based on the metrics
    if retrieval_metrics['recall'] < 0.7:
        report += "- Recall is low; consider improving the retrieval algorithm or retrieving more candidates\n"

    if generation_metrics['rouge-l'] < 0.5:
        report += "- Generation quality needs improvement; consider refining the prompt template or switching the generation model\n"
    
    return report

6. Continuous Monitoring

python
import time
from collections import defaultdict
from datetime import datetime

class ContinuousMonitor:
    def __init__(self, rag_system):
        self.rag = rag_system
        self.metrics_history = []
    
    def monitor_query(self, query, result):
        """监控单个查询"""
        metrics = {
            "timestamp": time.time(),
            "query_length": len(query),
            "answer_length": len(result["answer"]),
            "num_sources": len(result.get("sources", []))
        }
        self.metrics_history.append(metrics)
    
    def get_trends(self):
        """获取趋势分析"""
        if not self.metrics_history:
            return {}
        
        # Group the metric history by day
        daily_metrics = defaultdict(list)
        
        for m in self.metrics_history:
            day = datetime.fromtimestamp(m["timestamp"]).strftime("%Y-%m-%d")
            daily_metrics[day].append(m)
        
        # Average the metrics for each day
        trends = {}
        for day, metrics in daily_metrics.items():
            trends[day] = {
                "avg_answer_length": sum(m["answer_length"] for m in metrics) / len(metrics),
                "avg_sources": sum(m["num_sources"] for m in metrics) / len(metrics),
                "query_count": len(metrics)
            }
        
        return trends
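
A usage sketch, again assuming the rag_system.query() interface used throughout this chapter; in production the monitor_query call would sit directly in the request-handling path:

python
monitor = ContinuousMonitor(rag_system)   # rag_system is assumed to exist

query = "What is RAG?"
result = rag_system.query(query)          # expected to return {"answer": ..., "sources": [...]}
monitor.monitor_query(query, result)

print(monitor.get_trends())
# e.g. {"2024-05-01": {"avg_answer_length": 212.0, "avg_sources": 3.0, "query_count": 1}}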