Skip to content

企业知识库

企业知识库是RAG技术的重要应用场景之一。通过构建基于RAG的企业知识库,可以实现企业内部知识的智能检索和问答,提高员工的工作效率和知识共享。本章节将详细介绍如何构建企业知识库系统。

1. 企业知识库概述

核心功能

  • 智能问答:回答员工关于企业政策、流程、产品等问题
  • 知识检索:快速检索企业内部文档和资料
  • 知识管理:管理和维护企业知识资产
  • 知识更新:及时更新企业知识库内容

应用场景

  • 新员工培训:帮助新员工快速了解企业政策和流程
  • 日常工作支持:为员工提供工作所需的知识和信息
  • 决策支持:为管理层提供决策所需的数据分析和市场信息
  • 客户支持:为客户服务团队提供产品和服务信息

2. 系统架构

整体架构

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  用户界面   │────>│  应用层     │────>│  RAG系统    │
└─────────────┘     └─────────────┘     └─────────────┘


┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  管理界面   │<────│  管理系统   │<────│  知识库     │
└─────────────┘     └─────────────┘     └─────────────┘

技术栈选择

  • 前端:React、Vue、Angular等
  • 后端:Python、Node.js等
  • RAG框架:LangChain、LlamaIndex等
  • 向量数据库:Pinecone、Weaviate、Chroma等
  • LLM:OpenAI GPT、Claude、本地LLM等

3. 数据处理

数据来源

  • 文档:PDF、Word、Excel等格式的文档
  • 现有知识库:企业已有的结构化知识库系统(如Wiki、内部文档平台)
  • 邮件:企业内部邮件
  • 聊天记录:企业即时通讯记录
  • 数据库:企业数据库中的结构化数据

文档处理流程

python
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

class EnterpriseDocumentProcessor:
    """Load enterprise documents from a directory tree and split them into chunks.

    Every chunk produced by ``process_documents`` is tagged with
    ``department``, ``doc_type`` and ``upload_date`` metadata derived from the
    chunk's ``source`` path.
    """

    # File extensions picked up by load_documents().
    SUPPORTED_EXTENSIONS = ("pdf", "docx", "txt", "md")

    # Maps a lower-cased file extension to the doc_type stored in metadata.
    DOC_TYPES = {
        'pdf': 'document',
        'docx': 'document',
        'txt': 'text',
        'md': 'markdown'
    }

    def __init__(self, docs_path):
        """Remember the root directory that ``load_documents`` scans."""
        self.docs_path = docs_path

    def load_documents(self):
        """Load every supported document found under ``self.docs_path``.

        Returns:
            A list holding the documents returned by one ``DirectoryLoader``
            per extension in ``SUPPORTED_EXTENSIONS``.
        """
        # Python glob matching has no shell-style brace expansion, so a single
        # "**/*.{pdf,docx,txt,md}" pattern would be taken literally. Run one
        # pattern per extension instead.
        documents = []
        for ext in self.SUPPORTED_EXTENSIONS:
            loader = DirectoryLoader(
                self.docs_path,
                glob=f"**/*.{ext}",
                show_progress=True
            )
            documents.extend(loader.load())
        return documents

    def process_documents(self, documents):
        """Split documents into chunks and attach enterprise metadata.

        Args:
            documents: Loaded documents to split.

        Returns:
            The list of chunks; each chunk's metadata gains ``department``,
            ``doc_type`` and ``upload_date`` (ISO-8601 string).
        """
        # Local import: this snippet has no file-level datetime import.
        from datetime import datetime

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            # Both semicolon forms are listed because Chinese text normally
            # uses the full-width one. The trailing "" lets the splitter fall
            # back to character-level splitting for text without spaces.
            separators=["\n\n", "\n", "。", ";", ";", " ", ""]
        )

        chunks = text_splitter.split_documents(documents)

        # One timestamp for the whole batch so all chunks of one run agree.
        upload_date = datetime.now().isoformat()
        for chunk in chunks:
            # A chunk without a source path falls back to the defaults
            # ("general" / "unknown") instead of raising KeyError.
            source = chunk.metadata.get('source') or ''
            chunk.metadata.update({
                'department': self.extract_department(source),
                'doc_type': self.extract_doc_type(source),
                'upload_date': upload_date
            })

        return chunks

    def extract_department(self, filepath):
        """Return the department encoded in ``filepath``.

        The second directory level is taken as the department (for example
        ``docs/hr/policy.pdf`` -> ``hr``). Paths with fewer than two directory
        levels yield ``"general"``.
        """
        # Accept Windows separators as well, and drop the last component: it
        # is the file name and must never be reported as a department.
        directories = filepath.replace('\\', '/').split('/')[:-1]
        if len(directories) > 1 and directories[1]:
            return directories[1]
        return "general"

    def extract_doc_type(self, filepath):
        """Return the document type for ``filepath`` based on its extension.

        Unrecognised or missing extensions yield ``"unknown"``.
        """
        ext = filepath.rsplit('.', 1)[-1].lower()
        return self.DOC_TYPES.get(ext, 'unknown')

4. 权限控制

基于角色的访问控制

python
from functools import wraps

class AccessControl:
    """Role-based and department-based access rules for knowledge-base documents."""

    def __init__(self):
        """Build the role -> actions and department -> readable departments tables."""
        self.role_permissions = dict(
            admin=['read', 'write', 'delete', 'manage'],
            manager=['read', 'write', 'manage'],
            employee=['read'],
            guest=['read_limited'],
        )

        # Every department can read its own documents plus the shared ones.
        self.department_access = {
            dept: [dept, 'general']
            for dept in ('hr', 'finance', 'tech', 'sales')
        }

    def check_permission(self, user_role, action):
        """Return True when ``user_role`` is allowed to perform ``action``.

        Unknown roles have no permissions.
        """
        return action in self.role_permissions.get(user_role, [])

    def check_department_access(self, user_dept, doc_dept):
        """Return True when a user of ``user_dept`` may read ``doc_dept`` documents.

        Unknown user departments may only read ``general`` documents.
        """
        readable = self.department_access.get(user_dept, ['general'])
        return doc_dept in readable

    def filter_by_permission(self, user, documents):
        """Return the documents that ``user`` may read, in their original order.

        A document without ``department`` metadata is treated as ``general``.
        """
        def _visible(doc):
            doc_dept = doc.metadata.get('department', 'general')
            return self.check_department_access(user['department'], doc_dept)

        return [doc for doc in documents if _visible(doc)]

def require_permission(action):
    """Build a decorator that only runs the wrapped function when allowed.

    The wrapped function must be called with a ``user`` keyword argument whose
    ``role`` entry is checked against ``action``. A ``PermissionError`` is
    raised when the user is missing or the role lacks the permission.
    """
    def decorator(func):
        @wraps(func)
        def guarded(*args, **kwargs):
            current_user = kwargs.get('user')
            if not current_user:
                raise PermissionError("未提供用户信息")

            allowed = AccessControl().check_permission(current_user['role'], action)
            if allowed:
                return func(*args, **kwargs)
            raise PermissionError(f"用户没有{action}权限")
        return guarded
    return decorator

5. 知识库实现

python
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Pinecone
import pinecone

class EnterpriseKnowledgeBase:
    """Pinecone-backed knowledge base with role and department access checks."""

    def __init__(self, api_key, environment):
        """Connect to Pinecone and open (or create) the ``enterprise-kb`` index.

        Args:
            api_key: Pinecone API key.
            environment: Pinecone environment name.
        """
        # Initialise the Pinecone client.
        pinecone.init(api_key=api_key, environment=environment)

        # Embedding model used for both indexing and querying.
        self.embeddings = OpenAIEmbeddings()

        # Create the index on first use, otherwise reuse the existing one.
        self.index_name = "enterprise-kb"
        if self.index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=self.index_name,
                dimension=1536,
                metric="cosine"
            )

        self.vectorstore = Pinecone(
            index=pinecone.Index(self.index_name),
            embedding=self.embeddings,
            text_key="text"
        )

        self.access_control = AccessControl()

    def add_documents(self, documents, user):
        """Chunk ``documents`` and add them to the vector store.

        Args:
            documents: Loaded documents to index.
            user: Mapping with at least a ``role`` entry.

        Returns:
            The number of chunks added.

        Raises:
            PermissionError: If the user's role lacks the ``write`` permission.
        """
        if not self.access_control.check_permission(user['role'], 'write'):
            raise PermissionError("没有写入权限")

        # The processor is only used for splitting here, so no root path is needed.
        processor = EnterpriseDocumentProcessor("")
        chunks = processor.process_documents(documents)

        self.vectorstore.add_documents(chunks)

        return len(chunks)

    def search(self, query, user, k=5):
        """Return up to ``k`` documents similar to ``query`` that ``user`` may read.

        Args:
            query: Search text.
            user: Mapping with at least a ``department`` entry.
            k: Maximum number of results to return.
        """
        # Over-fetch, because some hits may be dropped by the permission
        # filter below. Fewer than k results can still come back.
        results = self.vectorstore.similarity_search(query, k=k*2)

        filtered_results = self.access_control.filter_by_permission(user, results)

        return filtered_results[:k]

    def query(self, question, user):
        """Answer ``question`` using only documents ``user`` is allowed to read.

        Args:
            question: Natural-language question.
            user: Mapping with at least a ``department`` entry.

        Returns:
            The chain output dict, containing ``result`` (the answer) and
            ``source_documents`` (the documents the answer was based on).
        """
        from langchain.chains import RetrievalQA
        from langchain.llms import OpenAI

        # Restrict retrieval to the departments this user may read. Without
        # this filter the chain could answer from any department's documents.
        # NOTE(review): chunks lacking `department` metadata are presumably
        # excluded by the `$in` filter; confirm that is acceptable.
        accessible_depts = self.access_control.department_access.get(
            user['department'], ['general']
        )
        retriever = self.vectorstore.as_retriever(
            search_kwargs={
                "k": 5,
                "filter": {"department": {"$in": accessible_depts}},
            }
        )

        # return_source_documents makes the chain include the retrieved
        # documents in its output, so callers can list the sources.
        qa_chain = RetrievalQA.from_chain_type(
            llm=OpenAI(temperature=0),
            chain_type="stuff",
            retriever=retriever,
            return_source_documents=True
        )

        result = qa_chain({"query": question})

        return result

6. 用户界面

Web界面示例

python
from flask import Flask, request, jsonify, session
from functools import wraps

# Flask application object that the route decorators below register on.
# NOTE(review): the handlers below use `session`, which presumably needs
# app.secret_key to be configured; none is set in this snippet. Confirm.
app = Flask(__name__)
# Knowledge-base instance shared by all request handlers. The "..." values are
# placeholders that must be replaced with real credentials.
kb = EnterpriseKnowledgeBase(api_key="...", environment="...")

def login_required(f):
    """Decorator that rejects requests whose session has no ``user_id``.

    Unauthenticated calls get a JSON error body with HTTP status 401;
    otherwise the wrapped view runs unchanged.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'user_id' in session:
            return f(*args, **kwargs)
        return jsonify({"error": "请先登录"}), 401
    return guarded_view

@app.route('/api/search', methods=['POST'])
@login_required
def search():
    """Search endpoint: return snippets of documents matching ``query``.

    Expects a JSON object body with a ``query`` entry. Responds with 400 when
    the body or query is missing, 401 when the session holds no user record,
    and 403 when the knowledge base raises ``PermissionError``.
    """
    # silent=True yields None instead of raising on a missing or invalid JSON
    # body, so malformed requests become a 400 rather than an unhandled error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data.get('query'):
        return jsonify({"error": "查询内容不能为空"}), 400
    query = data['query']

    # login_required only checks 'user_id'. The user record itself can still
    # be absent, and kb.search needs it for permission filtering.
    user = session.get('user')
    if not user:
        return jsonify({"error": "请先登录"}), 401

    try:
        results = kb.search(query, user)
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403

    return jsonify({
        "results": [
            {
                # Only a short preview of each hit is returned.
                "content": r.page_content[:200],
                "source": r.metadata.get('source'),
                "department": r.metadata.get('department')
            }
            for r in results
        ]
    })

@app.route('/api/ask', methods=['POST'])
@login_required
def ask():
    """Question-answering endpoint: return an answer and its source paths.

    Expects a JSON object body with a ``question`` entry. Responds with 400
    when the body or question is missing, 401 when the session holds no user
    record, 403 on ``PermissionError`` and 500 on any other failure.
    """
    # silent=True yields None instead of raising on a missing or invalid JSON
    # body, so malformed requests become a 400 rather than a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data.get('question'):
        return jsonify({"error": "问题不能为空"}), 400
    question = data['question']

    # login_required only checks 'user_id'; the user record can still be absent.
    user = session.get('user')
    if not user:
        return jsonify({"error": "请先登录"}), 401

    try:
        result = kb.query(question, user)
        return jsonify({
            "answer": result['result'],
            "sources": [
                doc.metadata.get('source')
                for doc in result.get('source_documents', [])
            ]
        })
    except PermissionError as e:
        # Report permission problems as 403, matching the search endpoint.
        return jsonify({"error": str(e)}), 403
    except Exception as e:
        # Top-level boundary: log the traceback before answering with a 500.
        app.logger.exception("Question answering failed")
        return jsonify({"error": str(e)}), 500

@app.route('/api/documents', methods=['POST'])
@login_required
def upload_document():
    """Document upload endpoint (upload handling itself is not implemented yet).

    Responds with 403 unless the session user's role has the ``write``
    permission, and with 501 until the upload handling is filled in.
    """
    # The permission check is done here rather than with
    # @require_permission('write'). That decorator reads the user from a
    # `user` keyword argument, which Flask never passes to a view, so it
    # would reject every request.
    user = session.get('user')
    if not user or not kb.access_control.check_permission(user.get('role'), 'write'):
        return jsonify({"error": "没有写入权限"}), 403

    # TODO: handle the uploaded file and pass it to kb.add_documents(...).
    # A view must return a response; report the stub explicitly for now.
    return jsonify({"error": "文档上传功能尚未实现"}), 501

7. 知识更新与维护

自动更新机制

python
import schedule
import time
from datetime import datetime, timedelta

class KnowledgeBaseMaintenance:
    """Periodic maintenance for a knowledge base: find and index new files."""

    def __init__(self, kb):
        """Store the knowledge base and start with empty bookkeeping.

        Args:
            kb: Knowledge base that new documents should be added to.
        """
        self.kb = kb
        self.update_log = []
        # Paths already indexed. Kept in memory only, so it starts empty on
        # every process start. TODO: persist it if restarts must not re-index.
        self.indexed_paths = set()

    def scan_new_documents(self, docs_path):
        """Return the paths of all files under ``docs_path`` not yet indexed."""
        # Local import: this snippet has no file-level `import os`.
        import os

        new_docs = []
        for root, _dirs, files in os.walk(docs_path):
            for file in files:
                filepath = os.path.join(root, file)
                if not self.is_indexed(filepath):
                    new_docs.append(filepath)
        return new_docs

    def is_indexed(self, filepath):
        """Return True when ``filepath`` has been recorded in ``indexed_paths``."""
        return filepath in self.indexed_paths

    def update_index(self, docs_path="/path/to/docs"):
        """Scan ``docs_path`` for new files and record the run in ``update_log``.

        Args:
            docs_path: Directory tree to scan; defaults to the placeholder
                path the original implementation used.
        """
        print(f"[{datetime.now()}] 开始更新索引...")

        new_docs = self.scan_new_documents(docs_path)

        if new_docs:
            # TODO: load each path in new_docs, pass the loaded documents to
            # self.kb.add_documents(documents, admin_user), then add the path
            # to self.indexed_paths. Until that is done nothing is indexed.

            # NOTE: 'added_documents' currently counts the new files found.
            self.update_log.append({
                'timestamp': datetime.now(),
                'added_documents': len(new_docs)
            })

        print(f"[{datetime.now()}] 索引更新完成")

    def schedule_updates(self):
        """Run ``update_index`` every day at 02:00; this call never returns."""
        schedule.every().day.at("02:00").do(self.update_index)

        # Check for due jobs once a minute.
        while True:
            schedule.run_pending()
            time.sleep(60)

8. 监控与分析

python
class KnowledgeBaseAnalytics:
    """In-memory query log for a knowledge base, with simple usage reports."""

    def __init__(self, kb):
        """Keep a reference to the knowledge base and start an empty log."""
        self.kb = kb
        self.query_log = []

    def log_query(self, user, query, results, response_time):
        """Append one entry describing a query to ``query_log``.

        Args:
            user: Mapping with ``id`` and ``department`` entries.
            query: The query text.
            results: The results returned; only their count is stored.
            response_time: Response time to record, stored as given.
        """
        entry = {
            'timestamp': datetime.now(),
            'user': user['id'],
            'department': user['department'],
            'query': query,
            'results_count': len(results),
            'response_time': response_time
        }
        self.query_log.append(entry)

    def get_popular_queries(self, days=7):
        """Return the 10 most frequent queries of the last ``days`` days.

        Returns:
            A list of ``(query, count)`` pairs, most frequent first.
        """
        from collections import Counter

        threshold = datetime.now() - timedelta(days=days)
        counts = Counter(
            entry['query']
            for entry in self.query_log
            if entry['timestamp'] > threshold
        )
        return counts.most_common(10)

    def get_department_usage(self, days=30):
        """Return a dict of department -> query count for the last ``days`` days."""
        from collections import Counter

        threshold = datetime.now() - timedelta(days=days)
        per_department = Counter(
            entry['department']
            for entry in self.query_log
            if entry['timestamp'] > threshold
        )
        return dict(per_department)

    def identify_knowledge_gaps(self):
        """Return the text of every logged query that produced no results.

        Clustering these into common topics is left as future work.
        """
        unanswered = []
        for entry in self.query_log:
            if entry['results_count'] == 0:
                unanswered.append(entry['query'])
        return unanswered