Appearance
企业知识库
企业知识库是RAG技术的重要应用场景之一。通过构建基于RAG的企业知识库,可以实现企业内部知识的智能检索和问答,提高员工的工作效率和知识共享。本章节将详细介绍如何构建企业知识库系统。
1. 企业知识库概述
核心功能
- 智能问答:回答员工关于企业政策、流程、产品等问题
- 知识检索:快速检索企业内部文档和资料
- 知识管理:管理和维护企业知识资产
- 知识更新:及时更新企业知识库内容
应用场景
- 新员工培训:帮助新员工快速了解企业政策和流程
- 日常工作支持:为员工提供工作所需的知识和信息
- 决策支持:为管理层提供决策所需的数据分析和市场信息
- 客户支持:为客户服务团队提供产品和服务信息
2. 系统架构
整体架构
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  用户界面   │────>│   应用层    │────>│   RAG系统   │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ▼
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  管理界面   │<────│  管理系统   │<────│   知识库    │
└─────────────┘     └─────────────┘     └─────────────┘

技术栈选择
- 前端:React、Vue、Angular等
- 后端:Python、Node.js等
- RAG框架:LangChain、LlamaIndex等
- 向量数据库:Pinecone、Weaviate、Chroma等
- LLM:OpenAI GPT、Claude、本地LLM等
3. 数据处理
数据来源
- 文档:PDF、Word、Excel等格式的文档
- 知识库:结构化的知识库系统
- 邮件:企业内部邮件
- 聊天记录:企业即时通讯记录
- 数据库:企业数据库中的结构化数据
文档处理流程
python
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
class EnterpriseDocumentProcessor:
    """Load enterprise documents from a directory and split them into chunks.

    Every chunk is tagged with ``department``, ``doc_type`` and
    ``upload_date`` metadata derived from the path of its source file.
    """

    # File extensions picked up by load_documents().
    SUPPORTED_EXTENSIONS = ("pdf", "docx", "txt", "md")

    def __init__(self, docs_path):
        """Store the root directory to read documents from.

        Args:
            docs_path: Root directory of the document tree. May be an empty
                string when the instance is only used to post-process
                documents that were loaded elsewhere.
        """
        self.docs_path = docs_path

    def load_documents(self):
        """Load all supported documents below ``docs_path``.

        Returns:
            A list with everything ``DirectoryLoader.load()`` returned for
            each extension in ``SUPPORTED_EXTENSIONS``.
        """
        documents = []
        # Python glob patterns have no brace expansion, so a
        # "{pdf,docx,txt,md}" group would be matched literally; run one
        # loader per extension instead.
        for extension in self.SUPPORTED_EXTENSIONS:
            loader = DirectoryLoader(
                self.docs_path,
                glob=f"**/*.{extension}",
                show_progress=True,
            )
            documents.extend(loader.load())
        return documents

    def process_documents(self, documents):
        """Split documents into overlapping chunks and attach metadata.

        Args:
            documents: Documents to split, as accepted by
                ``RecursiveCharacterTextSplitter.split_documents``.

        Returns:
            The list of chunks. Each chunk's ``metadata`` gains the keys
            ``department``, ``doc_type`` and ``upload_date`` (ISO-8601 text).
        """
        # Local import: the surrounding snippet does not import datetime.
        from datetime import datetime

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", "。", ";", " "],
        )
        chunks = text_splitter.split_documents(documents)

        # One timestamp for the whole batch keeps chunks of one run consistent.
        uploaded_at = datetime.now().isoformat()
        for chunk in chunks:
            # A chunk without a recorded source falls back to the defaults
            # ("general" / "unknown") instead of raising KeyError.
            source = chunk.metadata.get('source', '')
            chunk.metadata.update({
                'department': self.extract_department(source),
                'doc_type': self.extract_doc_type(source),
                'upload_date': uploaded_at,
            })
        return chunks

    def extract_department(self, filepath):
        """Derive the department name from a document path.

        When ``filepath`` lies below ``docs_path`` the department is the first
        directory under that root. Otherwise the second path component is
        used, which is the layout the original sample assumed.

        Args:
            filepath: Path of the source file ("/" or "\\" separated).

        Returns:
            The department directory name, or ``"general"`` when the path has
            no department directory.
        """
        normalized = filepath.replace("\\", "/")
        root = (self.docs_path or "").replace("\\", "/").rstrip("/")

        if root and normalized.startswith(root + "/"):
            below_root = normalized[len(root) + 1:].split("/")
            # A single component is the file itself, not a department.
            return below_root[0] if len(below_root) > 1 else "general"

        parts = normalized.split("/")
        # Require a third component so that parts[1] is a directory and a
        # file name is never reported as a department.
        return parts[1] if len(parts) > 2 else "general"

    def extract_doc_type(self, filepath):
        """Map the file extension of ``filepath`` to a document type label.

        Returns:
            ``"document"``, ``"text"`` or ``"markdown"`` for known extensions,
            otherwise ``"unknown"``.
        """
        ext = filepath.split('.')[-1].lower()
        type_map = {
            'pdf': 'document',
            'docx': 'document',
            'txt': 'text',
            'md': 'markdown',
        }
        return type_map.get(ext, 'unknown')


# 4. 权限控制
基于角色的访问控制
python
from functools import wraps
class AccessControl:
    """Static role and department access rules for the knowledge base."""

    def __init__(self):
        """Create the role -> actions and department -> departments tables."""
        # Actions each role may perform.
        self.role_permissions = dict(
            admin=['read', 'write', 'delete', 'manage'],
            manager=['read', 'write', 'manage'],
            employee=['read'],
            guest=['read_limited'],
        )
        # Document departments each user department may read.
        self.department_access = dict(
            hr=['hr', 'general'],
            finance=['finance', 'general'],
            tech=['tech', 'general'],
            sales=['sales', 'general'],
        )

    def check_permission(self, user_role, action):
        """Return True when ``user_role`` is allowed to perform ``action``.

        Unknown roles have no permissions.
        """
        return action in self.role_permissions.get(user_role, [])

    def check_department_access(self, user_dept, doc_dept):
        """Return True when a user of ``user_dept`` may read ``doc_dept`` documents.

        Unknown user departments may only read ``'general'`` documents.
        """
        fallback = ['general']
        return doc_dept in self.department_access.get(user_dept, fallback)

    def filter_by_permission(self, user, documents):
        """Return the documents whose department the user may read.

        Args:
            user: Mapping with a ``'department'`` entry.
            documents: Objects exposing a ``metadata`` mapping; a missing
                ``'department'`` entry counts as ``'general'``.

        Returns:
            A new list with the permitted documents, in their original order.
        """
        return [
            document
            for document in documents
            if self.check_department_access(
                user['department'],
                document.metadata.get('department', 'general'),
            )
        ]
def require_permission(action):
    """Build a decorator that runs a function only if the caller may do ``action``.

    The decorated function must be called with a ``user`` keyword argument:
    a mapping whose ``'role'`` entry is looked up through
    ``AccessControl.check_permission``. Positional arguments are not
    inspected.

    Args:
        action: Name of the action to check, e.g. ``'write'``.

    Returns:
        A decorator. The wrapper it produces raises ``PermissionError`` when
        no ``user`` keyword is given, when the user has no role, or when the
        role does not include ``action``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            user = kwargs.get('user')
            if not user:
                raise PermissionError("未提供用户信息")
            # A user record without a role is a permission failure; it must
            # not surface as a KeyError to callers that catch PermissionError.
            role = user.get('role')
            if role is None or not AccessControl().check_permission(role, action):
                raise PermissionError(f"用户没有{action}权限")
            return func(*args, **kwargs)
        return wrapper
    return decorator


# 5. 知识库实现
python
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Pinecone
import pinecone
class EnterpriseKnowledgeBase:
    """Pinecone-backed knowledge base with department-scoped retrieval."""

    def __init__(self, api_key, environment):
        """Connect to Pinecone and open (or create) the ``enterprise-kb`` index.

        Args:
            api_key: Pinecone API key passed to ``pinecone.init``.
            environment: Pinecone environment passed to ``pinecone.init``.
        """
        pinecone.init(api_key=api_key, environment=environment)

        self.embeddings = OpenAIEmbeddings()

        # Create the index on first use; afterwards it is only opened.
        self.index_name = "enterprise-kb"
        if self.index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=self.index_name,
                dimension=1536,
                metric="cosine",
            )
        self.vectorstore = Pinecone(
            index=pinecone.Index(self.index_name),
            embedding=self.embeddings,
            text_key="text",
        )
        self.access_control = AccessControl()

    def _department_filter(self, user):
        """Return the metadata filter limiting results to the user's departments.

        Raises:
            KeyError: if ``user`` has no ``'department'`` entry.
        """
        accessible = self.access_control.department_access.get(
            user['department'], ['general']
        )
        return {"department": {"$in": list(accessible)}}

    def add_documents(self, documents, user):
        """Chunk ``documents`` and store them in the vector index.

        Args:
            documents: Loaded documents to add.
            user: Mapping with a ``'role'`` entry.

        Returns:
            The number of chunks written.

        Raises:
            PermissionError: if the user's role lacks the ``'write'`` action.
        """
        if not self.access_control.check_permission(user['role'], 'write'):
            raise PermissionError("没有写入权限")

        processor = EnterpriseDocumentProcessor("")
        chunks = processor.process_documents(documents)

        self.vectorstore.add_documents(chunks)
        return len(chunks)

    def search(self, query, user, k=5):
        """Return up to ``k`` documents similar to ``query`` that the user may read.

        The department restriction is passed to the vector store as a
        metadata filter, so the ``k`` nearest *permitted* chunks are returned
        rather than whatever is left of an unfiltered result list.

        Args:
            query: Search text.
            user: Mapping with a ``'department'`` entry.
            k: Maximum number of results.
        """
        results = self.vectorstore.similarity_search(
            query, k=k, filter=self._department_filter(user)
        )
        # Second check in application code in case the store ignores the filter.
        permitted = self.access_control.filter_by_permission(user, results)
        return permitted[:k]

    def query(self, question, user):
        """Answer ``question`` using only documents the user may read.

        Args:
            question: Natural-language question.
            user: Mapping with a ``'department'`` entry.

        Returns:
            The mapping produced by the ``RetrievalQA`` chain; it is built
            with ``return_source_documents=True`` so callers can read the
            ``'source_documents'`` entry next to ``'result'``.
        """
        from langchain.chains import RetrievalQA
        from langchain.llms import OpenAI

        # Without this filter the chain would answer from every department's
        # documents regardless of who is asking.
        retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": 5, "filter": self._department_filter(user)}
        )
        qa_chain = RetrievalQA.from_chain_type(
            llm=OpenAI(temperature=0),
            chain_type="stuff",
            retriever=retriever,
            return_source_documents=True,
        )
        return qa_chain({"query": question})


# 6. 用户界面
Web界面示例
python
from flask import Flask, request, jsonify, session
from functools import wraps
import os

# Flask application that serves the knowledge-base HTTP API.
app = Flask(__name__)
# The routes below read flask.session, which Flask only allows once a secret
# key is configured. It is taken from the environment so it is not committed.
app.secret_key = os.environ.get("FLASK_SECRET_KEY")
# Shared knowledge-base instance; replace the "..." placeholders with real
# Pinecone credentials.
kb = EnterpriseKnowledgeBase(api_key="...", environment="...")
def login_required(f):
    """Decorate a view so it only runs when the session holds a ``user_id``.

    Without a ``user_id`` in the session the wrapper answers with a JSON
    error body and HTTP status 401 instead of calling the view.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'user_id' in session:
            return f(*args, **kwargs)
        return jsonify({"error": "请先登录"}), 401
    return guarded_view
@app.route('/api/search', methods=['POST'])
@login_required
def search():
    """Search endpoint: return permitted document snippets for a query.

    Expects a JSON object with a non-empty ``query`` entry.

    Returns:
        200 with ``{"results": [...]}``, where each item holds the first 200
        characters of the chunk, its source and its department;
        400 when the body or ``query`` is missing;
        401 when the session carries no ``user`` record;
        403 when the knowledge base raises ``PermissionError``.
    """
    data = request.get_json(silent=True)
    query = data.get('query') if isinstance(data, dict) else None
    if not query:
        return jsonify({"error": "缺少查询内容"}), 400

    user = session.get('user')
    if not user:
        # login_required only checks 'user_id'; the user record used for
        # permission filtering can still be absent.
        return jsonify({"error": "请先登录"}), 401

    try:
        results = kb.search(query, user)
    except PermissionError as e:
        return jsonify({"error": str(e)}), 403

    return jsonify({
        "results": [
            {
                "content": r.page_content[:200],
                "source": r.metadata.get('source'),
                "department": r.metadata.get('department'),
            }
            for r in results
        ]
    })
@app.route('/api/ask', methods=['POST'])
@login_required
def ask():
    """Question-answering endpoint.

    Expects a JSON object with a non-empty ``question`` entry.

    Returns:
        200 with ``{"answer": ..., "sources": [...]}``;
        400 when the body or ``question`` is missing;
        401 when the session carries no ``user`` record;
        403 when the knowledge base raises ``PermissionError``;
        500 with a generic message for any other failure.
    """
    data = request.get_json(silent=True)
    question = data.get('question') if isinstance(data, dict) else None
    if not question:
        return jsonify({"error": "缺少问题内容"}), 400

    user = session.get('user')
    if not user:
        # login_required only checks 'user_id'; the user record can be absent.
        return jsonify({"error": "请先登录"}), 401

    try:
        result = kb.query(question, user)
    except PermissionError as e:
        # Same status as /api/search uses for permission failures.
        return jsonify({"error": str(e)}), 403
    except Exception:
        # Log the details server-side; do not send internal error text
        # to the client.
        app.logger.exception("knowledge base query failed")
        return jsonify({"error": "服务器内部错误"}), 500

    return jsonify({
        "answer": result['result'],
        "sources": [
            doc.metadata.get('source')
            for doc in result.get('source_documents', [])
        ],
    })
@app.route('/api/documents', methods=['POST'])
@login_required
def upload_document():
    """Document upload endpoint; the file handling itself is not implemented yet.

    Returns:
        401 when the session carries no ``user`` record;
        403 when the user's role lacks the ``'write'`` action;
        501 otherwise, until the upload handling is written.
    """
    user = session.get('user')
    if not user:
        return jsonify({"error": "请先登录"}), 401

    # Flask passes only URL parameters as keyword arguments, so a decorator
    # that looks for kwargs['user'] never sees the session user. The write
    # permission is therefore checked here.
    if not kb.access_control.check_permission(user.get('role'), 'write'):
        return jsonify({"error": "用户没有write权限"}), 403

    # TODO: read the uploaded file(s) and pass them to kb.add_documents().
    # A view must return a response, so report "not implemented" until then.
    return jsonify({"error": "文档上传功能尚未实现"}), 501


# 7. 知识更新与维护
自动更新机制
python
import schedule
import time
from datetime import datetime, timedelta
class KnowledgeBaseMaintenance:
    """Find files that are not indexed yet and keep a log of update runs."""

    def __init__(self, kb, docs_path="/path/to/docs"):
        """Set up an empty update log and indexed-file registry.

        Args:
            kb: Knowledge base the maintenance job belongs to.
            docs_path: Directory scanned by ``update_index``. Defaults to the
                placeholder path the sample used before.
        """
        self.kb = kb
        self.docs_path = docs_path
        self.update_log = []
        # Paths already handled by update_index(); consulted by is_indexed().
        self.indexed_files = set()

    def scan_new_documents(self, docs_path):
        """Return the paths of all files below ``docs_path`` that are not indexed.

        Args:
            docs_path: Directory to walk recursively.

        Returns:
            A list of file paths for which ``is_indexed`` is false.
        """
        # Local import: the surrounding snippet does not import os.
        import os

        new_docs = []
        for root, _dirs, files in os.walk(docs_path):
            for filename in files:
                filepath = os.path.join(root, filename)
                if not self.is_indexed(filepath):
                    new_docs.append(filepath)
        return new_docs

    def is_indexed(self, filepath):
        """Return True when ``filepath`` was recorded by an earlier update run."""
        return filepath in self.indexed_files

    def update_index(self):
        """Scan ``docs_path`` and record newly found files.

        Appends one entry to ``update_log`` per run that found new files, and
        remembers those files so later runs do not report them again.
        """
        print(f"[{datetime.now()}] 开始更新索引...")

        new_docs = self.scan_new_documents(self.docs_path)
        if new_docs:
            # TODO: load each path and hand the documents to
            # self.kb.add_documents(documents, admin_user); the ingestion
            # step is still a placeholder in this sample.
            self.indexed_files.update(new_docs)
            self.update_log.append({
                'timestamp': datetime.now(),
                'added_documents': len(new_docs),
            })

        print(f"[{datetime.now()}] 索引更新完成")

    def schedule_updates(self):
        """Run ``update_index`` every day at 02:00.

        Blocks the calling thread forever, polling the scheduler once per
        minute.
        """
        schedule.every().day.at("02:00").do(self.update_index)
        while True:
            schedule.run_pending()
            time.sleep(60)


# 8. 监控与分析
python
class KnowledgeBaseAnalytics:
    """In-memory query log with simple usage statistics."""

    def __init__(self, kb):
        """Keep a reference to the knowledge base and start with an empty log."""
        self.kb = kb
        self.query_log = []

    def log_query(self, user, query, results, response_time):
        """Append one entry describing a query to ``query_log``.

        Args:
            user: Mapping with ``'id'`` and ``'department'`` entries.
            query: The query text.
            results: The returned results; only their count is stored.
            response_time: Response time as measured by the caller.
        """
        entry = {
            'timestamp': datetime.now(),
            'user': user['id'],
            'department': user['department'],
            'query': query,
            'results_count': len(results),
            'response_time': response_time,
        }
        self.query_log.append(entry)

    def get_popular_queries(self, days=7):
        """Return the ten most frequent queries of the last ``days`` days.

        Returns:
            A list of ``(query, count)`` pairs, most frequent first.
        """
        from collections import Counter

        oldest_allowed = datetime.now() - timedelta(days=days)
        frequencies = Counter(
            entry['query']
            for entry in self.query_log
            if entry['timestamp'] > oldest_allowed
        )
        return frequencies.most_common(10)

    def get_department_usage(self, days=30):
        """Return the number of logged queries per department for the last ``days`` days.

        Returns:
            A plain dict mapping department name to query count.
        """
        from collections import Counter

        oldest_allowed = datetime.now() - timedelta(days=days)
        per_department = Counter(
            entry['department']
            for entry in self.query_log
            if entry['timestamp'] > oldest_allowed
        )
        return dict(per_department)

    def identify_knowledge_gaps(self):
        """Return every logged query that produced no results, in log order.

        Grouping these queries into topics is left open in this sample.
        """
        unanswered = []
        for entry in self.query_log:
            if entry['results_count'] == 0:
                unanswered.append(entry['query'])
        return unanswered