基于Ruby构建关联Loki日志与InfluxDB指标的全栈前端性能观测系统


团队收到的反馈是“网站有时很慢”,这是一个典型的、几乎无法 actionable 的问题。我们的 APM 工具显示服务器端 P95 响应时间稳定在 150ms 以下,Core Web Vitals 聚合数据也显示 LCP 平均值在 2.1s,看起来很健康。但总有那么一小部分用户,他们的体验是断崖式的。问题在于,我们无法将某个用户的糟糕体验(例如 12s 的 LCP)与 उस session 期间发生的一系列具体事件关联起来——无论是客户端错误、特定的 API 调用延迟,还是用户与复杂组件的交互。聚合数据抹平了个体用户的痛苦。

我们需要的是一个能够下钻到单个用户会话维度的观测系统,将前端性能指标(Metrics)与该会话期间产生的所有日志(Logs)精确地关联起来。现有商业方案要么过于昂贵,要么不够灵活。因此,我们决定自建一个轻量级的解决方案。技术栈选型如下:

  • 前端: React,用于构建数据采集端和最终的分析仪表盘。
  • 后端: Ruby (Sinatra),作为一个轻量级的 API 网关,接收、处理并转发数据。
  • 指标存储: InfluxDB,其时间序列特性和高效的 tag 索引机制,是存储性能指标的不二之选。
  • 日志存储: Loki,它基于标签的索引模型与我们的核心需求——用一个唯一 ID 关联所有日志——完美契合,且成本效益极高。
  • 前端样式: Styled-Components,组件化的样式方案能更好地应对复杂数据可视化界面的构建。

整个系统的核心是一个贯穿始终的 correlation_id

架构设计与数据流

在动手之前,一个清晰的架构图是必要的。整个数据流被设计为单向的,从用户浏览器到我们的存储后端,再由分析仪表盘统一查询。

sequenceDiagram
    participant UserBrowser as React App
    participant Backend as Ruby (Sinatra) API
    participant Influx as InfluxDB
    participant Loki as Loki

    UserBrowser->>+Backend: GET / (Initial Page Load)
    Backend-->>-UserBrowser: HTML (with correlation_id)

    loop Session Lifetime
        UserBrowser->>UserBrowser: Captures Web Vitals (LCP, FID)
        UserBrowser->>+Backend: POST /api/v1/metrics (vitals + correlation_id)
        Backend->>+Influx: Write Point (tags: {correlation_id})
        Influx-->>-Backend: Ack

        UserBrowser->>UserBrowser: User action or error occurs
        UserBrowser->>+Backend: POST /api/v1/logs (log message + correlation_id)
        Backend->>+Loki: Push Log Stream (labels: {correlation_id})
        Loki-->>-Backend: Ack
    end

第一步:后端服务的奠基 - Correlation ID 的生成与传递

我们的 Ruby 服务不需要 Rails 那么重,Sinatra 足以胜任。它的主要职责有三:服务 React 应用的静态文件、生成 correlation_id、提供 metrics 和 logs 的接收端点。

首先是 correlation_id 的生成。我们通过一个 Rack 中间件来实现,它会在每个新会话的第一个请求中生成一个 UUID,并将其注入到一个全局 JavaScript 变量中,以便 React 应用可以访问。

config.ru

# frozen_string_literal: true

require 'sinatra'
require 'securerandom'
require_relative 'app/api_server'

# 中间件,用于生成和注入 Correlation ID
class CorrelationIdInjector
  def initialize(app)
    @app = app
  end

  def call(env)
    # 尝试从 session 或 header 中获取,如果不存在则认为是新会话
    # 在这个简化实现中,我们为每个请求都生成一个新的,实际项目应基于会话
    correlation_id = SecureRandom.uuid
    env['app.correlation_id'] = correlation_id
    
    status, headers, response = @app.call(env)

    # 如果是 HTML 响应,则注入 ID
    if headers['Content-Type']&.include?('text/html')
      body = response.body.join.sub(
        '</head>',
        "<script>window.CORRELATION_ID = '#{correlation_id}';</script></head>"
      )
      headers['Content-Length'] = body.bytesize.to_s
      return [status, headers, [body]]
    end

    [status, headers, response]
  end
end

use CorrelationIdInjector
run ApiServer

这个中间件非常直接,它拦截响应,如果是 HTML,就在 </head> 之前插入一段脚本,将 ID 挂载到 window 对象上。

第二步:指标管道 - 从 React 到 InfluxDB

前端需要捕获性能指标。Google 的 web-vitals 库是最佳选择。

frontend/src/VitalsReporter.js

import React, { useEffect } from 'react';
import { onLCP, onFID, onCLS } from 'web-vitals';

// 生产环境中,这个 URL 应该是可配置的
const METRICS_ENDPOINT = '/api/v1/metrics';

const sendMetric = (metric) => {
  const body = {
    name: metric.name,
    value: metric.value,
    correlation_id: window.CORRELATION_ID, // 从 window 对象获取 ID
  };

  // 使用 Beacon API 确保在页面卸载时也能发送数据
  if (navigator.sendBeacon) {
    navigator.sendBeacon(METRICS_ENDPOINT, JSON.stringify(body));
  } else {
    // 降级处理
    fetch(METRICS_ENDPOINT, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
      keepalive: true,
    }).catch(error => {
      console.error('Failed to send metric:', error);
    });
  }
};

const VitalsReporter = () => {
  useEffect(() => {
    // 注册 web-vitals 的回调
    onLCP(sendMetric);
    onFID(sendMetric);
    onCLS(sendMetric);
  }, []);

  return null; // 这个组件没有 UI
};

export default VitalsReporter;

在 React 应用的根组件中加载 <VitalsReporter />,它就会在后台自动捕获并发送指标。注意这里使用了 navigator.sendBeacon,这是一个关键优化,它能保证在页面关闭或跳转时,数据依然能够可靠地发送出去,而不会阻塞主线程。

后端接收到数据后,需要将其写入 InfluxDB。我们使用官方的 influxdb-client-ruby gem。

app/api_server.rb

# frozen_string_literal: true

require 'sinatra/base'
require 'sinatra/json'
require 'influxdb-client'
require 'json'
require 'logger'

class ApiServer < Sinatra::Base
  # 配置日志
  configure do
    set :logger, Logger.new($stdout)
    set :public_folder, 'frontend/build'
  end
  
  # 配置 InfluxDB 客户端
  before do
    @influx_client ||= InfluxDB2::Client.new(
      ENV.fetch('INFLUXDB_URL', 'http://localhost:8086'),
      ENV.fetch('INFLUXDB_TOKEN'),
      org: ENV.fetch('INFLUXDB_ORG'),
      bucket: ENV.fetch('INFLUXDB_BUCKET'),
      use_ssl: false,
      precision: InfluxDB2::WritePrecision::MS # 毫秒级精度
    )
    @influx_write_api = @influx_client.create_write_api
  end
  
  after do
    @influx_client&.close!
  end
  
  # Metrics Endpoint
  post '/api/v1/metrics' do
    begin
      payload = JSON.parse(request.body.read)
      logger.info "Received metric: #{payload}"

      correlation_id = payload['correlation_id']
      metric_name = payload['name']
      metric_value = payload['value'].to_f

      # 数据点(Point)是 InfluxDB 的核心概念
      # 我们将 correlation_id 作为 tag,这对于后续查询至关重要
      point = InfluxDB2::Point.new(name: 'web_vitals')
                              .add_tag('correlation_id', correlation_id)
                              .add_tag('metric_name', metric_name)
                              .add_field('value', metric_value)
                              .time(Time.now.to_i * 1000) # 使用当前服务器时间戳

      @influx_write_api.write(data: point)

      status 202 # Accepted
      json success: true
    rescue JSON::ParserError => e
      status 400
      json error: 'Invalid JSON payload'
    rescue StandardError => e
      logger.error "Failed to write to InfluxDB: #{e.message}"
      status 500
      json error: 'Internal Server Error'
    end
  end

  # ... 其他路由 ...
  get '*' do
    send_file File.join(settings.public_folder, 'index.html')
  end
end

这里的关键在于,correlation_id 被用作了 InfluxDB 的 tag。在 InfluxDB 中,tags 是被索引的,而 fields 不是。这意味着我们可以极快地通过 correlation_id 筛选出属于某个会话的所有性能指标。这是一个核心的设计决策。

第三步:日志管道 - 从 React 到 Loki

与指标类似,我们需要一个前端日志收集器。我们可以创建一个简单的 React Hook。

frontend/src/hooks/useLogger.js

import { useCallback } from 'react';

const LOGS_ENDPOINT = '/api/v1/logs';

export const useLogger = () => {
  const log = useCallback((level, message, context = {}) => {
    const payload = {
      level,
      message,
      context,
      timestamp: new Date().toISOString(),
      correlation_id: window.CORRELATION_ID,
    };

    const body = JSON.stringify(payload);
    
    // 日志发送失败不应影响用户体验,同样使用 sendBeacon
    if (navigator.sendBeacon) {
      navigator.sendBeacon(LOGS_ENDPOINT, body);
    } else {
      fetch(LOGS_ENDPOINT, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body,
        keepalive: true,
      }).catch(error => {
        // 在开发模式下可以打印错误,生产环境应静默失败
        if (process.env.NODE_ENV === 'development') {
          console.error('Failed to send log:', error);
        }
      });
    }
  }, []);

  return {
    info: (message, context) => log('info', message, context),
    warn: (message, context) => log('warn', message, context),
    error: (message, context) => log('error', message, context),
  };
};

在任何组件中,我们都可以这样使用:
const logger = useLogger();
logger.error('Failed to fetch user data', { component: 'UserProfile' });

后端接收到日志后,需要将其推送到 Loki。Loki 的 HTTP API 允许我们推送日志流。

app/api_server.rb (续)

# ...
require 'net/http'
require 'uri'

# ... in ApiServer class
  # Loki Endpoint
  post '/api/v1/logs' do
    begin
      payload = JSON.parse(request.body.read)
      logger.info "Received log: #{payload}"
      
      correlation_id = payload['correlation_id']
      
      # Loki 要求的时间戳是纳秒级的字符串
      timestamp = (Time.parse(payload['timestamp']).to_f * 1_000_000_000).to_i.to_s
      log_line = payload.to_json

      # 构建 Loki Push API 的请求体
      loki_payload = {
        'streams' => [
          {
            'stream' => {
              'job' => 'frontend_app',
              'level' => payload['level'],
              'correlation_id' => correlation_id # correlation_id 作为 Loki Label
            },
            'values' => [
              [timestamp, log_line]
            ]
          }
        ]
      }.to_json

      # 直接使用 Net::HTTP 发送请求
      loki_uri = URI.parse(ENV.fetch('LOKI_PUSH_URL', 'http://localhost:3100/loki/api/v1/push'))
      http = Net::HTTP.new(loki_uri.host, loki_uri.port)
      request = Net::HTTP::Post.new(loki_uri.request_uri, 'Content-Type' => 'application/json')
      request.body = loki_payload
      
      response = http.request(request)

      if response.code == '204'
        status 202
        json success: true
      else
        logger.error "Failed to push to Loki: #{response.code} #{response.body}"
        status 502 # Bad Gateway
        json error: 'Failed to forward log to Loki'
      end
    rescue JSON::ParserError => e
      status 400
      json error: 'Invalid JSON payload'
    rescue StandardError => e
      logger.error "Log processing error: #{e.message}"
      status 500
      json error: 'Internal Server Error'
    end
  end
# ...

同样,correlation_id 在这里被用作了 Loki 的 label。这使得我们能够通过 LogQL 查询 {correlation_id="some-uuid"} 来精确地拉取一个会话的所有日志。这是 Loki 设计哲学的体现:只索引元数据(labels),不索引日志内容,从而实现极高的写入吞吐和存储效率。

第四步:统一仪表盘 - 查询与可视化

现在数据已经分门别类地存储好了。最后一步是构建一个 React 仪表盘,将它们关联并展示出来。

我们需要一个新的后端端点,它接收一个 correlation_id,然后并行查询 InfluxDB 和 Loki。

app/api_server.rb (续)

# ...
# in ApiServer class
  get '/api/v1/session/:id' do |id|
    content_type :json

    # 使用线程并行查询
    influx_thread = Thread.new do
      begin
        query = <<~FLUX
          from(bucket: "#{ENV.fetch('INFLUXDB_BUCKET')}")
            |> range(start: -24h) 
            |> filter(fn: (r) => r["_measurement"] == "web_vitals")
            |> filter(fn: (r) => r["correlation_id"] == "#{id}")
            |> pivot(rowKey:["_time"], columnKey: ["metric_name"], valueColumn: "_value")
            |> yield(name: "results")
        FLUX
        @influx_client.create_query_api.query(query: query).to_json
      rescue StandardError => e
        logger.error "InfluxDB query failed: #{e.message}"
        { error: 'InfluxDB query failed' }.to_json
      end
    end

    loki_thread = Thread.new do
      begin
        loki_query = URI.encode_www_form(
          'query' => "{correlation_id=\"#{id}\"}",
          'limit' => 1000, # 设置一个合理的上限
          'start' => (Time.now - 24 * 3600).to_i * 1_000_000_000,
          'direction' => 'forward'
        )
        loki_uri = URI.parse("#{ENV.fetch('LOKI_QUERY_URL', 'http://localhost:3100')}/loki/api/v1/query_range?#{loki_query}")
        response = Net::HTTP.get_response(loki_uri)
        response.body
      rescue StandardError => e
        logger.error "Loki query failed: #{e.message}"
        { error: 'Loki query failed' }.to_json
      end
    end

    influx_results = JSON.parse(influx_thread.value)
    loki_results = JSON.parse(loki_thread.value)

    {
      metrics: influx_results,
      logs: loki_results
    }.to_json
  end
# ...

这个端点是仪表盘的核心数据源。前端拿到这些数据后,剩下的就是可视化的工作了。这里 styled-components 的作用就体现出来了。我们可以为不同的日志级别定义不同的样式,为指标图表创建可复用的主题。

frontend/src/components/SessionDashboard.js

import React, { useState, useEffect } from 'react';
import { useParams } from 'react-router-dom';
import styled from 'styled-components';

const DashboardLayout = styled.div`
  display: grid;
  grid-template-columns: 1fr;
  grid-template-rows: auto 1fr;
  gap: 20px;
  padding: 20px;
  height: calc(100vh - 40px);
  font-family: monospace;
`;

const MetricsPanel = styled.div`
  background-color: #282c34;
  color: white;
  padding: 15px;
  border-radius: 8px;
`;

const LogsPanel = styled.div`
  background-color: #f0f0f0;
  padding: 15px;
  border-radius: 8px;
  overflow-y: auto;
`;

const LogEntry = styled.div`
  padding: 5px;
  border-bottom: 1px solid #ddd;
  color: ${props => {
    switch (props.level) {
      case 'error': return '#d9534f';
      case 'warn': return '#f0ad4e';
      default: return '#333';
    }
  }};
`;

const SessionDashboard = () => {
  const { sessionId } = useParams();
  const [data, setData] = useState({ metrics: [], logs: null });
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState(null);

  useEffect(() => {
    const fetchSessionData = async () => {
      try {
        setLoading(true);
        const response = await fetch(`/api/v1/session/${sessionId}`);
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
        const result = await response.json();
        
        // 解析 Loki 返回的数据
        const parsedLogs = result.logs?.data?.result[0]?.values.map(v => ({
            timestamp: new Date(parseInt(v[0].substring(0, 13))).toISOString(),
            ...JSON.parse(v[1])
        })) || [];
        
        setData({ metrics: result.metrics, logs: parsedLogs });
      } catch (e) {
        setError(e.message);
      } finally {
        setLoading(false);
      }
    };

    fetchSessionData();
  }, [sessionId]);

  if (loading) return <div>Loading session {sessionId}...</div>;
  if (error) return <div>Error: {error}</div>;

  return (
    <DashboardLayout>
      <MetricsPanel>
        <h3>Metrics for Session: {sessionId}</h3>
        {/* 在这里可以使用图表库如 Recharts 来渲染指标 */}
        <pre>{JSON.stringify(data.metrics, null, 2)}</pre>
      </MetricsPanel>
      <LogsPanel>
        <h3>Logs</h3>
        {data.logs && data.logs.map((log, index) => (
          <LogEntry key={index} level={log.level}>
            <strong>{log.timestamp} [{log.level.toUpperCase()}]</strong>: {log.message}
            {Object.keys(log.context).length > 0 && <pre>{JSON.stringify(log.context, null, 2)}</pre>}
          </LogEntry>
        ))}
      </LogsPanel>
    </DashboardLayout>
  );
};

export default SessionDashboard;

这个仪表盘虽然简单,但它验证了整个链路的可行性。我们可以清晰地看到某个会话的所有性能数据和日志,它们被 correlation_id 牢牢地绑定在一起。当看到一个 LCP 高达 8 秒的指标时,我们可以立刻滚动到对应时间点的日志,可能会发现一个关键的 API 请求失败,或者一个渲染密集型组件被重复挂载。问题定位的效率得到了质的提升。

局限性与未来展望

这个系统并非完美。在真实生产环境中,有几个问题需要考虑:

  1. 后端瓶颈: Ruby API 是一个单点。在高流量下,它可能成为瓶颈。一个更健壮的架构应该让前端直接将数据发送到可扩展的收集器(如 OpenTelemetry Collector),由收集器负责将数据分发到 InfluxDB 和 Loki。
  2. Loki 的高基数问题: 将 UUID 作为 label 会导致 Loki 的索引基数(cardinality)爆炸,这在超大规模下会严重影响 Loki 的性能。一个改进方案是将 correlation_id 作为日志内容的一部分,而不是 label。查询时使用 Loki 的 filter 表达式(如 |= "correlation_id=some-uuid")进行过滤。这样查询速度会变慢,但保证了 Loki 索引的健康。这是一个典型的权衡。
  3. 数据采样: 收集所有用户的每一个会话数据成本高昂。可以实施采样策略,比如只收集 10% 的会话,或者只收集那些性能指标超过某个阈值的会话。
  4. 数据安全: 当前实现没有考虑认证和授权,在公网环境中是完全不可接受的。需要增加 API key 或其他认证机制。

下一步的迭代方向是引入分布式追踪(Tracing),将 correlation_id 升级为 trace_id,并使其在前端、Ruby 后端以及所有下游微服务之间传递,从而构建一个完整的、贯穿前后台的统一观测平台。


  目录