Skip to content

智能客服

智能客服是RAG技术的另一个重要应用场景。通过构建基于RAG的智能客服系统,可以实现自动化的客户服务,提高客户满意度和服务效率。本章节将详细介绍如何构建智能客服系统。

1. 智能客服概述

核心功能

  • 自动问答:自动回答客户的常见问题
  • 多轮对话:支持多轮对话,理解上下文
  • 知识检索:快速检索产品和服务信息
  • 个性化服务:根据客户历史提供个性化服务
  • 情绪识别:识别客户情绪,提供相应的服务

应用场景

  • 在线客服:网站和应用的在线客服
  • 电话客服:电话自动语音服务
  • 邮件客服:自动回复客户邮件
  • 社交媒体客服:社交媒体平台的自动回复

2. 系统架构

整体架构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  用户界面   │────>│  对话管理   │────>│  RAG系统    │
└─────────────┘     └─────────────┘     └─────────────┘


┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  监控系统   │<────│  分析系统   │<────│  知识库     │
└─────────────┘     └─────────────┘     └─────────────┘

技术栈选择

  • 前端:React、Vue、Angular等
  • 后端:Python、Node.js等
  • 对话管理:Rasa、Dialogflow等
  • RAG框架:LangChain、LlamaIndex等
  • 向量数据库:Pinecone、Weaviate、Chroma等
  • LLM:OpenAI GPT、Claude、本地LLM等

3. 数据处理

数据来源

  • 产品文档:产品说明书、使用手册等
  • FAQ:常见问题解答
  • 历史对话:客服历史对话记录
  • 知识库:企业知识库
  • 工单数据:历史工单和解决方案

知识准备

python
from langchain.document_loaders import JSONLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

class CustomerServiceDataProcessor:
    def __init__(self):
        self.faq_data = []
        self.product_docs = []
    
    def load_faq(self, faq_file):
        """加载FAQ数据"""
        import json
        with open(faq_file, 'r', encoding='utf-8') as f:
            self.faq_data = json.load(f)
        
        # 转换为文档格式
        documents = []
        for item in self.faq_data:
            doc = Document(
                page_content=f"Q: {item['question']}\nA: {item['answer']}",
                metadata={
                    'type': 'faq',
                    'category': item.get('category', 'general'),
                    'tags': item.get('tags', [])
                }
            )
            documents.append(doc)
        
        return documents
    
    def load_product_docs(self, docs_path):
        """加载产品文档"""
        loader = DirectoryLoader(
            docs_path,
            glob="**/*.{pdf,docx,txt}",
            show_progress=True
        )
        documents = loader.load()
        
        # 添加元数据
        for doc in documents:
            doc.metadata.update({
                'type': 'product_doc',
                'product': self.extract_product_name(doc.metadata['source'])
            })
        
        return documents
    
    def extract_product_name(self, filepath):
        """从产品文档路径提取产品名称"""
        # 根据目录结构解析
        parts = filepath.split('/')
        if len(parts) > 2:
            return parts[2]
        return "unknown"
    
    def process_conversation_history(self, history_file):
        """处理历史对话数据"""
        # 从历史对话中提取问答对
        qa_pairs = []
        
        # 解析对话数据
        # ...
        
        return qa_pairs

4. 对话管理

对话状态跟踪

python
class ConversationManager:
    def __init__(self):
        self.sessions = {}
    
    def create_session(self, user_id):
        """创建新会话"""
        session = {
            'user_id': user_id,
            'history': [],
            'context': {},
            'start_time': datetime.now(),
            'last_activity': datetime.now()
        }
        self.sessions[user_id] = session
        return session
    
    def get_session(self, user_id):
        """获取会话"""
        if user_id not in self.sessions:
            return self.create_session(user_id)
        
        session = self.sessions[user_id]
        session['last_activity'] = datetime.now()
        return session
    
    def add_message(self, user_id, role, content):
        """添加消息到会话历史"""
        session = self.get_session(user_id)
        session['history'].append({
            'role': role,
            'content': content,
            'timestamp': datetime.now()
        })
    
    def get_context(self, user_id, max_history=5):
        """获取对话上下文"""
        session = self.get_session(user_id)
        return session['history'][-max_history:]
    
    def update_context(self, user_id, key, value):
        """更新上下文信息"""
        session = self.get_session(user_id)
        session['context'][key] = value
    
    def clear_session(self, user_id):
        """清除会话"""
        if user_id in self.sessions:
            del self.sessions[user_id]

意图识别

python
class IntentClassifier:
    def __init__(self):
        self.intents = {
            'product_inquiry': ['产品', '功能', '特性', '多少钱'],
            'technical_support': ['故障', '错误', '无法', '问题'],
            'order_status': ['订单', '物流', '发货', '快递'],
            'refund_request': ['退款', '退货', '取消'],
            'complaint': ['投诉', '不满', '差评', '服务差']
        }
    
    def classify(self, text):
        """识别用户意图"""
        scores = {}
        for intent, keywords in self.intents.items():
            score = sum(1 for kw in keywords if kw in text)
            scores[intent] = score
        
        # 返回得分最高的意图
        if max(scores.values()) > 0:
            return max(scores, key=scores.get)
        return 'general'
    
    def get_response_strategy(self, intent):
        """根据意图获取响应策略"""
        strategies = {
            'product_inquiry': 'detailed_info',
            'technical_support': 'step_by_step_guide',
            'order_status': 'quick_lookup',
            'refund_request': 'escalate_to_human',
            'complaint': 'empathetic_response'
        }
        return strategies.get(intent, 'general')

5. RAG客服系统实现

python
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains import ConversationalRetrievalChain
from langchain.llms import OpenAI

class CustomerServiceRAG:
    def __init__(self):
        # 初始化嵌入模型
        self.embeddings = OpenAIEmbeddings()
        
        # 加载向量存储
        self.vectorstore = Chroma(
            persist_directory="./cs_kb",
            embedding_function=self.embeddings
        )
        
        # 创建对话检索链
        self.qa_chain = ConversationalRetrievalChain.from_llm(
            llm=OpenAI(temperature=0),
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3}),
            return_source_documents=True
        )
        
        # 对话管理
        self.conversation_manager = ConversationManager()
        self.intent_classifier = IntentClassifier()
    
    def handle_message(self, user_id, message):
        """处理用户消息"""
        # 获取会话
        session = self.conversation_manager.get_session(user_id)
        
        # 识别意图
        intent = self.intent_classifier.classify(message)
        
        # 根据意图选择处理策略
        if intent == 'complaint':
            return self.handle_complaint(user_id, message)
        elif intent == 'refund_request':
            return self.handle_refund_request(user_id, message)
        
        # 常规RAG处理
        response = self.generate_response(user_id, message)
        
        # 记录对话
        self.conversation_manager.add_message(user_id, 'user', message)
        self.conversation_manager.add_message(user_id, 'assistant', response['answer'])
        
        return response
    
    def generate_response(self, user_id, message):
        """生成回复"""
        # 获取对话历史
        chat_history = self.conversation_manager.get_context(user_id)
        
        # 格式化历史
        formatted_history = [
            (msg['content'] if msg['role'] == 'user' else msg['content'])
            for msg in chat_history[:-1]  # 排除当前消息
        ]
        
        # 执行RAG
        result = self.qa_chain({
            "question": message,
            "chat_history": formatted_history
        })
        
        return {
            "answer": result["answer"],
            "sources": [doc.metadata for doc in result.get("source_documents", [])]
        }
    
    def handle_complaint(self, user_id, message):
        """处理投诉"""
        # 表达同理心
        empathetic_response = "我理解您的不满,非常抱歉给您带来了不好的体验。"
        
        # 尝试解决问题
        solution = self.generate_response(user_id, message)
        
        # 提供转人工选项
        full_response = f"""{empathetic_response}

{solution['answer']}

如果问题仍未解决,我可以为您转接人工客服,请问您需要吗?"""
        
        return {
            "answer": full_response,
            "escalate": True,
            "sources": solution.get('sources', [])
        }
    
    def handle_refund_request(self, user_id, message):
        """处理退款请求"""
        # 检查订单信息
        order_info = self.check_order_info(user_id, message)
        
        if order_info:
            # 根据订单状态提供退款指引
            response = self.generate_refund_guidance(order_info)
        else:
            response = "请提供您的订单号,以便我帮您查询退款相关信息。"
        
        return {
            "answer": response,
            "requires_order_info": not order_info
        }
    
    def check_order_info(self, user_id, message):
        """检查订单信息"""
        # 从消息中提取订单号
        import re
        order_pattern = r'订单[号]?[::]?\s*(\d+)'
        match = re.search(order_pattern, message)
        
        if match:
            order_id = match.group(1)
            # 查询订单系统
            # return order_system.lookup(order_id)
            return {"order_id": order_id, "status": "shipped"}
        
        return None

6. 情绪识别

python
class EmotionDetector:
    def __init__(self):
        # 可以使用预训练的情绪识别模型
        self.emotion_keywords = {
            'angry': ['生气', '愤怒', '糟糕', '垃圾', '投诉'],
            'frustrated': ['郁闷', '无奈', '失望', '不满'],
            'anxious': ['着急', '担心', '焦虑', '快点'],
            'satisfied': ['谢谢', '感谢', '满意', '不错']
        }
    
    def detect(self, text):
        """检测情绪"""
        emotions = {}
        for emotion, keywords in self.emotion_keywords.items():
            score = sum(1 for kw in keywords if kw in text)
            emotions[emotion] = score
        
        # 返回主要情绪
        if max(emotions.values()) > 0:
            return max(emotions, key=emotions.get)
        return 'neutral'
    
    def adjust_response(self, emotion, base_response):
        """根据情绪调整回复"""
        adjustments = {
            'angry': '非常抱歉给您带来不好的体验。',
            'frustrated': '我理解您的困扰,让我来帮您解决。',
            'anxious': '请放心,我会尽快帮您处理。',
            'satisfied': '很高兴能帮到您!'
        }
        
        prefix = adjustments.get(emotion, '')
        if prefix:
            return f"{prefix}\n\n{base_response}"
        return base_response

7. 质量监控

python
class ServiceQualityMonitor:
    def __init__(self):
        self.conversations = []
        self.metrics = {
            'total_conversations': 0,
            'avg_response_time': 0,
            'satisfaction_rate': 0,
            'escalation_rate': 0
        }
    
    def log_conversation(self, user_id, messages, response_time):
        """记录对话"""
        self.conversations.append({
            'user_id': user_id,
            'messages': messages,
            'response_time': response_time,
            'timestamp': datetime.now()
        })
        self.metrics['total_conversations'] += 1
    
    def calculate_metrics(self):
        """计算指标"""
        if not self.conversations:
            return self.metrics
        
        response_times = [c['response_time'] for c in self.conversations]
        self.metrics['avg_response_time'] = sum(response_times) / len(response_times)
        
        return self.metrics
    
    def detect_issues(self):
        """检测问题"""
        issues = []
        
        for conv in self.conversations:
            # 检测长时间对话(可能需要人工介入)
            if len(conv['messages']) > 10:
                issues.append({
                    'type': 'long_conversation',
                    'user_id': conv['user_id'],
                    'message_count': len(conv['messages'])
                })
            
            # 检测多次重复问题
            user_messages = [m['content'] for m in conv['messages'] if m['role'] == 'user']
            if len(set(user_messages)) < len(user_messages) * 0.5:
                issues.append({
                    'type': 'repetitive_questions',
                    'user_id': conv['user_id']
                })
        
        return issues

8. 人工接管

python
class HumanHandoff:
    def __init__(self):
        self.handoff_queue = []
        self.agents = {}
    
    def request_handoff(self, user_id, reason, context):
        """请求人工接管"""
        handoff_request = {
            'user_id': user_id,
            'reason': reason,
            'context': context,
            'timestamp': datetime.now(),
            'status': 'pending'
        }
        
        self.handoff_queue.append(handoff_request)
        
        # 通知客服
        self.notify_agents(handoff_request)
        
        return handoff_request
    
    def assign_agent(self, request_id, agent_id):
        """分配客服"""
        for request in self.handoff_queue:
            if request.get('id') == request_id:
                request['status'] = 'assigned'
                request['agent_id'] = agent_id
                self.agents[agent_id] = request['user_id']
                return True
        return False
    
    def should_handoff(self, message, conversation_history):
        """判断是否需要人工接管"""
        # 检查关键词
        escalation_keywords = ['人工', '客服', '经理', '投诉']
        if any(kw in message for kw in escalation_keywords):
            return True, 'user_request'
        
        # 检查对话轮数
        if len(conversation_history) > 10:
            return True, 'too_many_rounds'
        
        # 检查情绪
        emotion_detector = EmotionDetector()
        emotion = emotion_detector.detect(message)
        if emotion == 'angry':
            return True, 'negative_emotion'
        
        return False, None