Chapter 19: Monitoring and Logging

Build a complete observability stack and keep a real-time view of how your RAG system is running.


📚 Chapter Overview

This chapter covers how to build a complete monitoring and logging stack for a RAG system, achieving end-to-end observability.

Learning Objectives

After completing this chapter, you will be able to:
  • ✅ Understand the three pillars of observability
  • ✅ Configure Prometheus metrics collection
  • ✅ Build Grafana dashboards and visualizations
  • ✅ Set up an ELK log aggregation stack
  • ✅ Implement distributed tracing
  • ✅ Configure intelligent alerting

Estimated Time

  • Theory: 60 minutes
  • Hands-on practice: 90-120 minutes
  • Total: about 3-4 hours

1. Observability Fundamentals

1.1 The Three Pillars

Observability = Metrics + Logs + Traces

Metrics:
  → WHAT happened
  → Numeric, aggregatable data
  → Examples: QPS, latency, error rate

Logs:
  → WHY it happened
  → Discrete event records
  → Examples: error messages, debug information

Traces:
  → WHERE it happened
  → The full path of a request
  → Examples: microservice call chains
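
To make the three pillars concrete, here is a minimal Python sketch in which a single request emits all three signals: a metric that counts it, a log line that explains it, and a span that locates it in the call path. The function and metric names (handle_query, rag_requests_total) are illustrative, not part of the later instrumentation.

import logging

from prometheus_client import Counter
from opentelemetry import trace

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rag_api")

# Metric: WHAT happened (an aggregatable count)
requests_total = Counter("rag_requests_total", "Total RAG requests")

# Trace: WHERE it happened (a no-op tracer until a provider is configured; see section 5)
tracer = trace.get_tracer(__name__)

def handle_query(text: str) -> str:
    with tracer.start_as_current_span("handle_query"):
        requests_total.inc()
        # Log: WHY / the context of this particular event
        logger.info("query received, %d chars", len(text))
        return "answer"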

1.2 Monitoring Layers

Infrastructure layer
Platform layer
Application layer
Business layer

1.3 Monitoring Metric Types

RED Method (for request-driven services):
  • **R**ate: requests per second
  • **E**rrors: error rate
  • **D**uration: response time

USE Method (for resources):
  • **U**tilization: percentage of the resource in use
  • **S**aturation: how much pressure the resource is under
  • **E**rrors: error count
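
As a worked example of the RED method, the sketch below pulls the three numbers from the Prometheus HTTP API. The service URL is an assumption for this cluster, and the metric names (rag_queries_total, rag_query_duration_seconds) assume the instrumentation added later in section 2.2.

import requests

PROMETHEUS = "http://prometheus-service:9090"  # assumed in-cluster service name

RED_QUERIES = {
    "rate":     'sum(rate(rag_queries_total[5m]))',
    "errors":   'sum(rate(rag_queries_total{status="error"}[5m]))'
                ' / sum(rate(rag_queries_total[5m]))',
    "duration": 'histogram_quantile(0.95,'
                ' sum(rate(rag_query_duration_seconds_bucket[5m])) by (le))',
}

def red_snapshot() -> dict:
    """Return the current Rate / Errors / Duration values for the RAG API."""
    snapshot = {}
    for name, expr in RED_QUERIES.items():
        resp = requests.get(f"{PROMETHEUS}/api/v1/query",
                            params={"query": expr}, timeout=5)
        result = resp.json()["data"]["result"]
        snapshot[name] = float(result[0]["value"][1]) if result else 0.0
    return snapshot

if __name__ == "__main__":
    print(red_snapshot())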


2. Prometheus Monitoring

2.1 Prometheus Architecture

┌──────────────┐
│   Services   │  (RAG API, PostgreSQL, etc.)
└──────┬───────┘
       │ /metrics
       ▼
┌──────────────┐
│  Prometheus  │  (scrape and store)
└──────┬───────┘
       ├──→ ┌──────────────┐
       │    │   Grafana    │  (visualization)
       │    └──────────────┘
       └──→ ┌──────────────┐
            │ Alertmanager │  (alerting)
            └──────────────┘

2.2 Integrating Prometheus into the RAG API

Install dependencies

pip install prometheus-fastapi-instrumentator

FastAPI Integration

from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator

app = FastAPI()

# Enable Prometheus metrics and expose them at /metrics
Instrumentator().instrument(app).expose(app, endpoint="/metrics")

@app.get("/query")
async def query_endpoint(text: str):
    # Your RAG logic goes here
    return {"result": "answer"}
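
A quick way to check that the instrumentator is wired up, sketched with FastAPI's TestClient and the app defined above: after one request, the /metrics endpoint should return the default HTTP metrics in Prometheus text format.

from fastapi.testclient import TestClient

with TestClient(app) as client:
    client.get("/query", params={"text": "hello"})   # generate one request
    metrics = client.get("/metrics")
    assert metrics.status_code == 200
    # The instrumentator's default HTTP metrics should now appear in the output
    print("\n".join(metrics.text.splitlines()[:10]))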

Custom Metrics

from prometheus_client import Counter, Histogram, Gauge

# Define custom metrics
query_counter = Counter(
    'rag_queries_total',
    'Total number of RAG queries',
    ['method', 'status']
)

query_duration = Histogram(
    'rag_query_duration_seconds',
    'RAG query duration',
    ['method']
)

active_queries = Gauge(
    'rag_active_queries',
    'Number of active queries'
)

# Use the metrics in the application
@app.post("/query")
async def query(text: str):
    active_queries.inc()
    with query_duration.labels(method='post').time():
        try:
            result = await rag_query(text)
            query_counter.labels(method='post', status='success').inc()
            return result
        except Exception as e:
            query_counter.labels(method='post', status='error').inc()
            raise
        finally:
            active_queries.dec()

2.3 Kubernetes ServiceMonitor Configuration

# servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: rag-api-monitor
  namespace: rag-system
  labels:
    app: rag-api
spec:
  selector:
    matchLabels:
      app: rag-api
  endpoints:
  - port: http
    path: /metrics
    interval: 15s
    scrapeTimeout: 10s

Service Configuration

apiVersion: v1
kind: Service
metadata:
  name: rag-api
  namespace: rag-system
  labels:
    app: rag-api
  annotations:
    prometheus.io/scrape: "true"
    prometheus.io/port: "8000"
    prometheus.io/path: "/metrics"
spec:
  selector:
    app: rag-api
  ports:
  - name: http
    port: 8000
    targetPort: 8000


3. Grafana Visualization

3.1 Data Source Configuration

{
  "name": "Prometheus",
  "type": "prometheus",
  "url": "http://prometheus-service:9090",
  "access": "proxy",
  "isDefault": true
}

3.2 Core Dashboards

RAG System Overview Dashboard

{
  "dashboard": {
    "title": "RAG System Overview",
    "panels": [
      {
        "title": "Query Rate (QPS)",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rag_queries_total[5m])"
          }
        ]
      },
      {
        "title": "Query Duration (p95)",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rag_queries_total{status=\"error\"}[5m]) / rate(rag_queries_total[5m])"
          }
        ]
      },
      {
        "title": "Active Queries",
        "type": "graph",
        "targets": [
          {
            "expr": "rag_active_queries"
          }
        ]
      }
    ]
  }
}

3.3 Key PromQL Queries

System metrics

# CPU usage
rate(process_cpu_seconds_total[5m])

# Memory usage (resident)
process_resident_memory_bytes

# Request rate
rate(http_requests_total[5m])

# Latency (p95)
histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))

# Error rate (fraction of requests returning 5xx)
rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m])

RAG business metrics

# Total queries
rag_queries_total

# Query success rate
rate(rag_queries_total{status="success"}[5m]) / rate(rag_queries_total[5m])

# Average documents retrieved per query
rate(rag_retrieved_documents_total[5m]) / rate(rag_queries_total[5m])

# LLM token consumption rate
rate(rag_llm_tokens_total[5m])

# Vector search latency (p95)
histogram_quantile(0.95, rate(rag_vector_search_duration_seconds_bucket[5m]))
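
The business queries above reference a few custom metrics that are not part of section 2.2. A minimal sketch of how they could be registered with prometheus_client — the metric names match the PromQL above, everything else is illustrative:

from prometheus_client import Counter, Histogram

retrieved_documents = Counter(
    "rag_retrieved_documents_total",
    "Total number of documents returned by retrieval")

llm_tokens = Counter(
    "rag_llm_tokens_total",
    "Total LLM tokens consumed",
    ["type"])  # e.g. prompt / completion

vector_search_duration = Histogram(
    "rag_vector_search_duration_seconds",
    "Vector search latency in seconds")

# Example usage inside the query path:
#   with vector_search_duration.time():
#       documents = await vector_search(text)
#   retrieved_documents.inc(len(documents))
#   llm_tokens.labels(type="completion").inc(usage.completion_tokens)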


4. Logging (ELK Stack)

4.1 ELK Architecture

┌───────────────┐
│   Filebeat    │  (log collection)
└───────┬───────┘
        ▼
┌───────────────┐
│   Logstash    │  (log processing)
└───────┬───────┘
        ▼
┌───────────────┐
│ Elasticsearch │  (log storage)
└───────┬───────┘
        ▼
┌───────────────┐
│    Kibana     │  (log visualization)
└───────────────┘

4.2 Log Formatting

Structured logging (JSON)

import json
import logging
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # Add custom context fields if present
        if hasattr(record, 'query_id'):
            log_data['query_id'] = record.query_id
        if hasattr(record, 'user_id'):
            log_data['user_id'] = record.user_id

        # Attach exception details
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_data)

# Configure the logger
logger = logging.getLogger("rag_api")
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

Usage Example

import uuid

@app.post("/query")
async def query(text: str, user_id: str = None):
    query_id = str(uuid.uuid4())

    # Log the incoming request with context
    logger.info("Query received", extra={
        'query_id': query_id,
        'user_id': user_id,
        'query_length': len(text)
    })

    try:
        result = await process_query(text)
        logger.info("Query completed", extra={
            'query_id': query_id,
            'documents_retrieved': len(result['documents']),
            'response_length': len(result['answer'])
        })
        return result
    except Exception as e:
        logger.error("Query failed", extra={
            'query_id': query_id,
            'error': str(e)
        }, exc_info=True)
        raise
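
Passing extra={...} on every call gets repetitive. One optional pattern, sketched below, is to bind the per-request context once with a logging.LoggerAdapter so that every line from that request carries query_id and user_id automatically; the adapter reuses the logger configured in the previous block.

import logging
import uuid
from typing import Optional

def request_logger(user_id: Optional[str] = None) -> logging.LoggerAdapter:
    """Bind query_id/user_id once; the JSONFormatter picks them up on every record."""
    context = {"query_id": str(uuid.uuid4()), "user_id": user_id}
    return logging.LoggerAdapter(logger, context)

req_log = request_logger(user_id="u-123")
req_log.info("Query received")     # both lines carry the same query_id
req_log.info("Query completed")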

4.3 Filebeat Configuration

# filebeat.yml
filebeat.inputs:
- type: container
  paths:
    - /var/log/containers/rag-api-*.log
  processors:
  - add_kubernetes_metadata:
      host: ${NODE_NAME}
      matchers:
      - logs_path:
          logs_path: "/var/log/containers/"

  - drop_event:
      when:
        equals:
          kubernetes.container.name: "filebeat"

output.elasticsearch:
  hosts: ["elasticsearch-service:9200"]
  indices:
    - index: "rag-api-%{+yyyy.MM.dd}"
      when.equals:
        kubernetes.labels.app: "rag-api"

setup.kibana:
  host: "kibana-service:5601"

logging.level: info

5. Distributed Tracing

5.1 OpenTelemetry Integration

Installation

pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-fastapi
pip install opentelemetry-exporter-otlp

Configuration

from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

# Configure the tracer provider and OTLP exporter
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

otlp_exporter = OTLPSpanExporter(endpoint="http://jaeger-service:4317")
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

app = FastAPI()
FastAPIInstrumentor.instrument_app(app)

# Custom spans for the RAG pipeline stages
@app.post("/query")
async def query(text: str):
    with tracer.start_as_current_span("rag_query") as span:
        span.set_attribute("query.length", len(text))

        with tracer.start_as_current_span("vector_search"):
            documents = await vector_search(text)

        with tracer.start_as_current_span("llm_generation"):
            answer = await generate_answer(text, documents)

        return {"answer": answer, "documents": documents}
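
To tie traces back to the logs from section 4, one optional pattern (a sketch, not part of the OpenTelemetry setup above) is to stamp the active trace id onto every JSON log line by extending the JSONFormatter from section 4.2:

import json
from opentelemetry import trace

class TraceJSONFormatter(JSONFormatter):
    """Adds the trace_id/span_id of the active span to each JSON log line."""

    def format(self, record):
        data = json.loads(super().format(record))
        ctx = trace.get_current_span().get_span_context()
        if ctx.is_valid:
            data["trace_id"] = format(ctx.trace_id, "032x")
            data["span_id"] = format(ctx.span_id, "016x")
        return json.dumps(data)

# Swap the formatter on the existing handler from section 4.2:
# handler.setFormatter(TraceJSONFormatter())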

5.2 Jaeger Deployment

# jaeger-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jaeger
  namespace: rag-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jaeger
  template:
    metadata:
      labels:
        app: jaeger
    spec:
      containers:
      - name: jaeger
        image: jaegertracing/all-in-one:latest
        ports:
        - containerPort: 5775  # accept zipkin.thrift over compact thrift protocol
        - containerPort: 6831  # accept jaeger.thrift over compact thrift protocol
        - containerPort: 6832  # accept jaeger.thrift over binary thrift protocol
        - containerPort: 5778  # serve configs
        - containerPort: 16686 # serve frontend
        - containerPort: 14268 # accept jaeger.thrift directly from clients
        - containerPort: 14250 # accept model.proto
        - containerPort: 9411  # Zipkin compatible endpoint
        - containerPort: 4317  # OTLP gRPC receiver
        - containerPort: 4318  # OTLP HTTP receiver
        env:
        - name: COLLECTOR_OTLP_ENABLED
          value: "true"
---
apiVersion: v1
kind: Service
metadata:
  name: jaeger-service
  namespace: rag-system
spec:
  selector:
    app: jaeger
  ports:
  - name: ui
    port: 16686
    targetPort: 16686
  - name: otlp-grpc
    port: 4317
    targetPort: 4317
  type: LoadBalancer

6. Alerting

6.1 Prometheus Alerting Rules

# alerting_rules.yaml
groups:
- name: rag_api_alerts
  interval: 30s
  rules:
  # High error rate alert
  - alert: HighErrorRate
    expr: |
      rate(rag_queries_total{status="error"}[5m])
      / rate(rag_queries_total[5m]) > 0.05
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High error rate detected"
      description: "Error rate is {{ $value | humanizePercentage }} for the last 5 minutes"

  # High latency alert
  - alert: HighLatency
    expr: |
      histogram_quantile(0.95, rate(rag_query_duration_seconds_bucket[5m])) > 5
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "High query latency"
      description: "P95 latency is {{ $value }}s"

  # Pod down alert
  - alert: PodDown
    expr: up{job="rag-api"} == 0
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "RAG API pod is down"
      description: "{{ $labels.instance }} has been down for more than 2 minutes"

  # Resource usage alerts
  - alert: HighCPUUsage
    expr: rate(process_cpu_seconds_total[5m]) > 0.8
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "High CPU usage"
      description: "CPU usage is {{ $value | humanizePercentage }}"

  - alert: HighMemoryUsage
    expr: |
      (process_resident_memory_bytes / node_memory_MemTotal_bytes) > 0.9
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "High memory usage"
      description: "Memory usage is {{ $value | humanizePercentage }}"

6.2 Alert Routing

# alertmanager.yml
global:
  resolve_timeout: 5m

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'default'
  routes:
  - match:
      severity: critical
    receiver: 'critical'
    continue: true

  - match:
      severity: warning
    receiver: 'warnings'

receivers:
- name: 'default'
  webhook_configs:
  - url: 'http://slack-service/hooks/prometheus'

- name: 'critical'
  webhook_configs:
  - url: 'http://slack-service/hooks/critical'
  email_configs:
  - to: 'oncall@example.com'
    from: 'alertmanager@example.com'
    smarthost: 'smtp.example.com:587'

- name: 'warnings'
  webhook_configs:
  - url: 'http://slack-service/hooks/warnings'

7. Hands-On Exercises

Exercise 1: Set Up the Monitoring Stack

Tasks:
  1. Deploy Prometheus + Grafana
  2. Configure a ServiceMonitor
  3. Create a basic dashboard

Verification

# Access Grafana
kubectl port-forward svc/grafana 3000:80 -n monitoring

# Access Prometheus
kubectl port-forward svc/prometheus 9090:9090 -n monitoring


Exercise 2: Configure Log Aggregation

Tasks:
  1. Deploy the ELK stack
  2. Configure Filebeat
  3. Create an index pattern in Kibana

Verification

# Access Kibana
kubectl port-forward svc/kibana 5601:5601 -n logging

# Check logs
curl http://localhost:5601


Exercise 3: Implement Distributed Tracing

Tasks:
  1. Integrate OpenTelemetry
  2. Deploy Jaeger
  3. Trace a complete request path

Verification

# Access the Jaeger UI
kubectl port-forward svc/jaeger-service 16686:16686 -n rag-system

# View traces
curl http://localhost:16686


8. Best Practices

8.1 Metric Design

  • Use the standard metric types: Counter, Gauge, Histogram
  • Add meaningful labels: method, status, endpoint
  • Avoid high-cardinality labels such as user_id or request_id (see the sketch after this list)
  • Pre-aggregate data: use rate() on counters
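
The cardinality point deserves a concrete illustration. A minimal sketch, assuming the prometheus_client library from section 2.2: each distinct label combination becomes its own time series, so unbounded values like user_id belong in logs or traces, not in metric labels.

from prometheus_client import Counter

# Good: a small, bounded set of label values (same shape as the counter in section 2.2)
queries = Counter("rag_queries_total", "Total RAG queries", ["method", "status"])
queries.labels(method="post", status="success").inc()

# Bad: one time series per user and per request -> cardinality explosion
# queries_by_user = Counter("rag_queries_by_user_total",
#                           "Queries per user", ["user_id", "request_id"])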

8.2 Logging Best Practices

  • Structured logs: use JSON format
  • Appropriate log levels: DEBUG, INFO, WARNING, ERROR
  • Include context: request_id, user_id
  • Avoid sensitive information such as passwords and tokens (see the redaction sketch below)
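
For the last point, a small sketch of one way to enforce it with only the standard library: a logging filter that masks obvious secrets before any handler sees them. The regex is illustrative, not exhaustive.

import logging
import re

SECRET_PATTERN = re.compile(r"(api[_-]?key|token|password)\s*[=:]\s*\S+", re.IGNORECASE)

class RedactSecretsFilter(logging.Filter):
    """Masks values that look like credentials in the log message."""

    def filter(self, record):
        record.msg = SECRET_PATTERN.sub(r"\1=***", str(record.msg))
        return True  # never drop the record, only rewrite it

logging.getLogger("rag_api").addFilter(RedactSecretsFilter())
# logger.info("calling LLM with api_key=sk-abc123")  ->  "... api_key=***"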

8.3 Alerting Strategy

  • Set reasonable thresholds to avoid alert fatigue
  • Group related alerts together
  • Configure silences for maintenance windows
  • Establish an on-call rotation for timely response

9. Summary

Key Takeaways

  1. The three pillars of observability
     • Metrics: numeric measurements to monitor
     • Logs: discrete event records
     • Traces: end-to-end request paths

  2. The monitoring tool stack
     • Prometheus: metrics collection
     • Grafana: visualization
     • ELK: log aggregation
     • Jaeger: distributed tracing

  3. Alert management
     • Sensible alerting rules
     • Effective routing strategy
     • Timely response process

Next Steps

  • Learn the CI/CD pipeline (Chapter 20)
  • Performance optimization (Chapter 21)

Congratulations on completing Chapter 19! 🎉

You now have the skills to build a complete monitoring system!

Next: Chapter 20 - CI/CD Pipeline