Chapter 19: Monitoring and Logging¶
Build a complete observability stack and keep real-time visibility into how your RAG system is running
📚 Chapter Overview¶
This chapter covers how to build a complete monitoring and logging stack for a RAG system, giving you end-to-end observability.
Learning Objectives¶
After completing this chapter, you will be able to:
- ✅ Understand the three pillars of observability
- ✅ Configure Prometheus metric collection
- ✅ Create Grafana dashboards
- ✅ Set up ELK log aggregation
- ✅ Implement distributed tracing
- ✅ Configure intelligent alerting
Estimated Time¶
- Theory: 60 minutes
- Hands-on: 90-120 minutes
- Total: roughly 2.5-3 hours
1. Observability Fundamentals¶
1.1 The Three Pillars¶
Observability = Metrics + Logs + Traces
Metrics:
→ WHAT happened
→ Numeric, aggregatable data
→ Examples: QPS, latency, error rate
Logs:
→ WHY it happened
→ Discrete event records
→ Examples: error messages, debug information
Traces:
→ WHERE it happened
→ The complete path of a request
→ Example: a microservice call chain
1.2 Monitoring Layers¶
1.3 Types of Monitoring Metrics¶
RED Method:
- **R**ate: requests per second
- **E**rrors: error rate
- **D**uration: response time
USE Method:
- **U**tilization: percentage of the resource in use
- **S**aturation: how much pressure (queued work) the resource is under
- **E**rrors: error count
2. Prometheus Monitoring¶
2.1 Prometheus Architecture¶
┌──────────────┐
│   Services   │  (RAG API, PostgreSQL, etc.)
└──────┬───────┘
       │ /metrics
       ↓
┌──────────────┐
│  Prometheus  │  (scraping and storage)
└──┬─────────┬─┘
   │         │ alerts
   ↓         ↓
┌─────────┐ ┌──────────────┐
│ Grafana │ │ Alertmanager │
└─────────┘ └──────────────┘
(dashboards)  (notifications)
2.2 Integrating Prometheus into the RAG API¶
Install dependencies:
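The code below imports prometheus_fastapi_instrumentator and prometheus_client, so install both packages:
pip install prometheus-fastapi-instrumentator prometheus-client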
FastAPI integration:
from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator

app = FastAPI()

# Enable Prometheus metrics and expose them at /metrics
Instrumentator().instrument(app).expose(app, endpoint="/metrics")

@app.get("/query")
async def query_endpoint(text: str):
    # Your RAG logic goes here
    return {"result": "answer"}
Custom metrics:
from prometheus_client import Counter, Histogram, Gauge

# Define the metrics
query_counter = Counter(
    'rag_queries_total',
    'Total number of RAG queries',
    ['method', 'status']
)
query_duration = Histogram(
    'rag_query_duration_seconds',
    'RAG query duration',
    ['method']
)
active_queries = Gauge(
    'rag_active_queries',
    'Number of active queries'
)

# Use them in the application
@app.post("/query")
async def query(text: str):
    active_queries.inc()
    with query_duration.labels(method='post').time():
        try:
            result = await rag_query(text)
            query_counter.labels(method='post', status='success').inc()
            return result
        except Exception:
            query_counter.labels(method='post', status='error').inc()
            raise
        finally:
            active_queries.dec()
2.3 K8s ServiceMonitor Configuration¶
# servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: rag-api-monitor
  namespace: rag-system
  labels:
    app: rag-api
spec:
  selector:
    matchLabels:
      app: rag-api
  endpoints:
  - port: http
    path: /metrics
    interval: 15s
    scrapeTimeout: 10s
Service configuration:
apiVersion: v1
kind: Service
metadata:
  name: rag-api
  namespace: rag-system
  labels:
    app: rag-api
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "8000"
    prometheus.io/path: "/metrics"
spec:
  selector:
    app: rag-api
  ports:
  - name: http
    port: 8000
    targetPort: 8000
3. Grafana Visualization¶
3.1 Data Source Configuration¶
{
  "name": "Prometheus",
  "type": "prometheus",
  "url": "http://prometheus-service:9090",
  "access": "proxy",
  "isDefault": true
}
3.2 Core Dashboards¶
RAG system overview dashboard:
{
  "dashboard": {
    "title": "RAG System Overview",
    "panels": [
      {
        "title": "Query Rate (QPS)",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rag_queries_total[5m])"
          }
        ]
      },
      {
        "title": "Query Duration (p95)",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rag_queries_total{status=\"error\"}[5m]) / rate(rag_queries_total[5m])"
          }
        ]
      },
      {
        "title": "Active Queries",
        "type": "graph",
        "targets": [
          {
            "expr": "rag_active_queries"
          }
        ]
      }
    ]
  }
}
3.3 Key PromQL Queries¶
System metrics:
# CPU usage
rate(process_cpu_seconds_total[5m])
# Memory usage
process_resident_memory_bytes
# Request rate
rate(http_requests_total[5m])
# Latency (p95)
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))
# 5xx error rate (requests per second; divide by the total request rate for a ratio)
rate(http_requests_total{status=~"5.."}[5m])
RAG business metrics:
# Total number of queries
rag_queries_total
# Query success rate
rate(rag_queries_total{status="success"}[5m]) / rate(rag_queries_total[5m])
# Average number of documents retrieved per query
rate(rag_retrieved_documents_total[5m]) / rate(rag_queries_total[5m])
# LLM token consumption rate
rate(rag_llm_tokens_total[5m])
# Vector search latency (p95)
histogram_quantile(0.95, rate(rag_vector_search_duration_seconds_bucket[5m]))
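The last three queries reference metrics (rag_retrieved_documents_total, rag_llm_tokens_total, rag_vector_search_duration_seconds) that section 2.2 does not define. A minimal sketch of how they could be registered with prometheus_client — the metric names match the queries above, but where you increment them depends on your retrieval pipeline:

from prometheus_client import Counter, Histogram

# Business metrics assumed by the PromQL queries above
retrieved_documents = Counter(
    'rag_retrieved_documents_total',
    'Total number of documents returned by retrieval'
)
llm_tokens = Counter(
    'rag_llm_tokens_total',
    'Total number of LLM tokens consumed'
)
vector_search_duration = Histogram(
    'rag_vector_search_duration_seconds',
    'Vector search latency in seconds'
)

async def retrieve(query_text: str):
    # Time the vector search and record how many documents came back;
    # vector_search() is the same placeholder helper used in section 5.1.
    with vector_search_duration.time():
        documents = await vector_search(query_text)
    retrieved_documents.inc(len(documents))
    # Call llm_tokens.inc(n) wherever you count prompt/completion tokens.
    return documents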
4. Logging System (ELK)¶
4.1 ELK Architecture¶
┌───────────────┐
│   Filebeat    │  (log collection)
└───────┬───────┘
        │
        ↓
┌───────────────┐
│   Logstash    │  (log processing)
└───────┬───────┘
        │
        ↓
┌───────────────┐
│ Elasticsearch │  (log storage)
└───────┬───────┘
        │
        ↓
┌───────────────┐
│    Kibana     │  (log visualization)
└───────────────┘
4.2 Log Formatting¶
Structured logging (JSON):
import json
import logging
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        # Add custom fields if they are present on the record
        if hasattr(record, 'query_id'):
            log_data['query_id'] = record.query_id
        if hasattr(record, 'user_id'):
            log_data['user_id'] = record.user_id
        # Add exception details
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)
        return json.dumps(log_data)

# Configure the logger
logger = logging.getLogger("rag_api")
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
Usage example:
@app.post("/query")
async def query(text: str, user_id: str = None):
query_id = str(uuid.uuid4())
# 添加上下文
logger.info("Query received", extra={
'query_id': query_id,
'user_id': user_id,
'query_length': len(text)
})
try:
result = await process_query(text)
logger.info("Query completed", extra={
'query_id': query_id,
'documents_retrieved': len(result['documents']),
'response_length': len(result['answer'])
})
return result
except Exception as e:
logger.error("Query failed", extra={
'query_id': query_id,
'error': str(e)
}, exc_info=True)
raise
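If you would rather not repeat query_id in every call, the standard library's logging.LoggerAdapter can bind the context once per request; a small sketch that works with the JSONFormatter above, since the adapter's extra dict becomes attributes on each record:

import logging
import uuid

def get_request_logger(user_id=None):
    # Bind per-request context once; every call through the adapter carries it
    context = {'query_id': str(uuid.uuid4()), 'user_id': user_id}
    return logging.LoggerAdapter(logging.getLogger("rag_api"), context)

log = get_request_logger(user_id="demo-user")   # "demo-user" is a placeholder
log.info("Query received")     # emitted with query_id and user_id fields
log.info("Query completed")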
4.3 Filebeat Configuration¶
# filebeat.yml
filebeat.inputs:
- type: container
  paths:
    - /var/log/containers/rag-api-*.log
  processors:
    - add_kubernetes_metadata:
        host: ${NODE_NAME}
        matchers:
          - logs_path:
              logs_path: "/var/log/containers/"
    - drop_event:
        when:
          equals:
            kubernetes.container.name: "filebeat"

output.elasticsearch:
  hosts: ["elasticsearch-service:9200"]
  indices:
    - index: "rag-api-%{+yyyy.MM.dd}"
      when.equals:
        kubernetes.labels.app: "rag-api"

setup.kibana:
  host: "kibana-service:5601"

logging.level: info
5. Distributed Tracing¶
5.1 OpenTelemetry Integration¶
Installation:
pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-fastapi
pip install opentelemetry-exporter-otlp
Configuration:
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

# Configure tracing: batch spans and export them to Jaeger over OTLP gRPC
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
otlp_exporter = OTLPSpanExporter(endpoint="http://jaeger-service:4317")
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

app = FastAPI()
FastAPIInstrumentor.instrument_app(app)

# Custom spans
@app.post("/query")
async def query(text: str):
    with tracer.start_as_current_span("rag_query") as span:
        span.set_attribute("query.length", len(text))
        with tracer.start_as_current_span("vector_search"):
            documents = await vector_search(text)
        with tracer.start_as_current_span("llm_generation"):
            answer = await generate_answer(text, documents)
        return {"answer": answer, "documents": documents}
5.2 Jaeger Deployment¶
# jaeger-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jaeger
  namespace: rag-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jaeger
  template:
    metadata:
      labels:
        app: jaeger
    spec:
      containers:
      - name: jaeger
        image: jaegertracing/all-in-one:latest
        ports:
        - containerPort: 5775   # accept zipkin.thrift over compact thrift protocol
        - containerPort: 6831   # accept jaeger.thrift over compact thrift protocol
        - containerPort: 6832   # accept jaeger.thrift over binary thrift protocol
        - containerPort: 5778   # serve configs
        - containerPort: 16686  # serve frontend
        - containerPort: 14268  # accept jaeger.thrift directly from clients
        - containerPort: 14250  # accept model.proto
        - containerPort: 9411   # Zipkin compatible endpoint
        - containerPort: 4317   # OTLP gRPC receiver
        - containerPort: 4318   # OTLP HTTP receiver
        env:
        - name: COLLECTOR_OTLP_ENABLED
          value: "true"
---
apiVersion: v1
kind: Service
metadata:
  name: jaeger-service
  namespace: rag-system
spec:
  selector:
    app: jaeger
  ports:
  - name: ui
    port: 16686
    targetPort: 16686
  - name: otlp-grpc
    port: 4317
    targetPort: 4317
  type: LoadBalancer
6. Alerting Configuration¶
6.1 Alerting Rules¶
# alerting_rules.yaml
groups:
- name: rag_api_alerts
  interval: 30s
  rules:
  # High error rate
  - alert: HighErrorRate
    expr: |
      rate(rag_queries_total{status="error"}[5m])
        / rate(rag_queries_total[5m]) > 0.05
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High error rate detected"
      description: "Error rate is {{ $value | humanizePercentage }} for the last 5 minutes"

  # High latency
  - alert: HighLatency
    expr: |
      histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m])) > 5
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "High query latency"
      description: "P95 latency is {{ $value }}s"

  # Pod down
  - alert: PodDown
    expr: up{job="rag-api"} == 0
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "RAG API pod is down"
      description: "{{ $labels.instance }} has been down for more than 2 minutes"

  # Resource usage
  - alert: HighCPUUsage
    expr: rate(process_cpu_seconds_total[5m]) > 0.8
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "High CPU usage"
      description: "CPU usage is {{ $value | humanizePercentage }}"

  - alert: HighMemoryUsage
    expr: |
      (process_resident_memory_bytes / node_memory_MemTotal_bytes) > 0.9
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "High memory usage"
      description: "Memory usage is {{ $value | humanizePercentage }}"
6.2 Alert Routing¶
# alertmanager.yml
global:
  resolve_timeout: 5m

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'default'
  routes:
  - match:
      severity: critical
    receiver: 'critical'
    continue: true
  - match:
      severity: warning
    receiver: 'warnings'

receivers:
- name: 'default'
  webhook_configs:
  - url: 'http://slack-service/hooks/prometheus'
- name: 'critical'
  webhook_configs:
  - url: 'http://slack-service/hooks/critical'
  email_configs:
  - to: 'oncall@example.com'
    from: 'alertmanager@example.com'
    smarthost: 'smtp.example.com:587'
- name: 'warnings'
  webhook_configs:
  - url: 'http://slack-service/hooks/warnings'
7. Hands-on Exercises¶
Exercise 1: Set Up the Monitoring Stack¶
Tasks:
1. Deploy Prometheus + Grafana
2. Configure a ServiceMonitor
3. Create a basic dashboard
Verify:
# Access Grafana
kubectl port-forward svc/grafana 3000:80 -n monitoring
# Access Prometheus
kubectl port-forward svc/prometheus 9090:9090 -n monitoring
Exercise 2: Configure Log Aggregation¶
Tasks:
1. Deploy the ELK Stack
2. Configure Filebeat
3. Create an index pattern in Kibana
Verify:
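A minimal check, assuming Kibana is exposed through the kibana-service Service referenced in the Filebeat configuration (adjust the service name and namespace to match your ELK deployment):
# Access Kibana
kubectl port-forward svc/kibana-service 5601:5601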
Exercise 3: Implement Distributed Tracing¶
Tasks:
1. Integrate OpenTelemetry
2. Deploy Jaeger
3. Trace a complete request path
Verify:
# Access the Jaeger UI
kubectl port-forward svc/jaeger-service 16686:16686 -n rag-system
# Check that traces are visible
curl http://localhost:16686
8. Best Practices¶
8.1 Metric Design¶
- Use the standard metric types: Counter, Gauge, Histogram
- Add meaningful labels: method, status, endpoint
- Avoid high-cardinality labels such as user_id or request_id (see the sketch after this list)
- Pre-aggregate data: apply rate() to counters
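A small sketch of the cardinality point, using the same prometheus_client API as section 2.2 (the metric names here are illustrative only):

from prometheus_client import Counter

# Good: bounded label values, so only a handful of time series
queries_by_status = Counter(
    'demo_rag_queries_total',
    'RAG queries by method and status',
    ['method', 'status']       # e.g. post x {success, error}
)

# Bad: one time series per user/request, which bloats Prometheus memory.
# Keep identifiers like user_id and request_id in logs or traces instead.
queries_by_user = Counter(
    'demo_rag_queries_by_user_total',
    'RAG queries per user and request',
    ['user_id', 'request_id']
)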
8.2 Logging Best Practices¶
- Structured logging: use JSON format
- Appropriate log levels: DEBUG, INFO, WARNING, ERROR
- Include context: request_id, user_id
- Avoid sensitive information: passwords, tokens
8.3 Alerting Strategy¶
- Set sensible thresholds: avoid alert fatigue
- Group related alerts: merge correlated notifications
- Configure silences: for maintenance windows
- Establish an on-call rotation: respond promptly
9. Summary¶
Key Takeaways¶
- The three pillars of observability
  - Metrics: numeric monitoring indicators
  - Logs: recorded event data
  - Traces: end-to-end request paths
- The monitoring tool stack
  - Prometheus: metric collection
  - Grafana: visualization
  - ELK: log aggregation
  - Jaeger: distributed tracing
- Alert management
  - Well-designed alert rules
  - Effective routing strategy
  - Timely response procedures
Next Steps¶
- Learn the CI/CD pipeline (Chapter 20)
- Performance optimization (Chapter 21)
Congratulations on completing Chapter 19! 🎉
You now have the skills to build a complete monitoring system!
Next: Chapter 20 - CI/CD Pipeline