团队收到的反馈是“网站有时很慢”,这是一个典型的、几乎无法 actionable 的问题。我们的 APM 工具显示服务器端 P95 响应时间稳定在 150ms 以下,Core Web Vitals 聚合数据也显示 LCP 平均值在 2.1s,看起来很健康。但总有那么一小部分用户,他们的体验是断崖式的。问题在于,我们无法将某个用户的糟糕体验(例如 12s 的 LCP)与 उस session 期间发生的一系列具体事件关联起来——无论是客户端错误、特定的 API 调用延迟,还是用户与复杂组件的交互。聚合数据抹平了个体用户的痛苦。
我们需要的是一个能够下钻到单个用户会话维度的观测系统,将前端性能指标(Metrics)与该会话期间产生的所有日志(Logs)精确地关联起来。现有商业方案要么过于昂贵,要么不够灵活。因此,我们决定自建一个轻量级的解决方案。技术栈选型如下:
- 前端: React,用于构建数据采集端和最终的分析仪表盘。
- 后端: Ruby (Sinatra),作为一个轻量级的 API 网关,接收、处理并转发数据。
- 指标存储: InfluxDB,其时间序列特性和高效的 tag 索引机制,是存储性能指标的不二之选。
- 日志存储: Loki,它基于标签的索引模型与我们的核心需求——用一个唯一 ID 关联所有日志——完美契合,且成本效益极高。
- 前端样式: Styled-Components,组件化的样式方案能更好地应对复杂数据可视化界面的构建。
整个系统的核心是一个贯穿始终的 correlation_id。
架构设计与数据流
在动手之前,一个清晰的架构图是必要的。整个数据流被设计为单向的,从用户浏览器到我们的存储后端,再由分析仪表盘统一查询。
sequenceDiagram
participant UserBrowser as React App
participant Backend as Ruby (Sinatra) API
participant Influx as InfluxDB
participant Loki as Loki
UserBrowser->>+Backend: GET / (Initial Page Load)
Backend-->>-UserBrowser: HTML (with correlation_id)
loop Session Lifetime
UserBrowser->>UserBrowser: Captures Web Vitals (LCP, FID)
UserBrowser->>+Backend: POST /api/v1/metrics (vitals + correlation_id)
Backend->>+Influx: Write Point (tags: {correlation_id})
Influx-->>-Backend: Ack
UserBrowser->>UserBrowser: User action or error occurs
UserBrowser->>+Backend: POST /api/v1/logs (log message + correlation_id)
Backend->>+Loki: Push Log Stream (labels: {correlation_id})
Loki-->>-Backend: Ack
end
第一步:后端服务的奠基 - Correlation ID 的生成与传递
我们的 Ruby 服务不需要 Rails 那么重,Sinatra 足以胜任。它的主要职责有三:服务 React 应用的静态文件、生成 correlation_id、提供 metrics 和 logs 的接收端点。
首先是 correlation_id 的生成。我们通过一个 Rack 中间件来实现,它会在每个新会话的第一个请求中生成一个 UUID,并将其注入到一个全局 JavaScript 变量中,以便 React 应用可以访问。
config.ru
# frozen_string_literal: true
require 'sinatra'
require 'securerandom'
require_relative 'app/api_server'
# 中间件,用于生成和注入 Correlation ID
class CorrelationIdInjector
def initialize(app)
@app = app
end
def call(env)
# 尝试从 session 或 header 中获取,如果不存在则认为是新会话
# 在这个简化实现中,我们为每个请求都生成一个新的,实际项目应基于会话
correlation_id = SecureRandom.uuid
env['app.correlation_id'] = correlation_id
status, headers, response = @app.call(env)
# 如果是 HTML 响应,则注入 ID
if headers['Content-Type']&.include?('text/html')
body = response.body.join.sub(
'</head>',
"<script>window.CORRELATION_ID = '#{correlation_id}';</script></head>"
)
headers['Content-Length'] = body.bytesize.to_s
return [status, headers, [body]]
end
[status, headers, response]
end
end
use CorrelationIdInjector
run ApiServer
这个中间件非常直接,它拦截响应,如果是 HTML,就在 </head> 之前插入一段脚本,将 ID 挂载到 window 对象上。
第二步:指标管道 - 从 React 到 InfluxDB
前端需要捕获性能指标。Google 的 web-vitals 库是最佳选择。
frontend/src/VitalsReporter.js
import React, { useEffect } from 'react';
import { onLCP, onFID, onCLS } from 'web-vitals';
// 生产环境中,这个 URL 应该是可配置的
const METRICS_ENDPOINT = '/api/v1/metrics';
const sendMetric = (metric) => {
const body = {
name: metric.name,
value: metric.value,
correlation_id: window.CORRELATION_ID, // 从 window 对象获取 ID
};
// 使用 Beacon API 确保在页面卸载时也能发送数据
if (navigator.sendBeacon) {
navigator.sendBeacon(METRICS_ENDPOINT, JSON.stringify(body));
} else {
// 降级处理
fetch(METRICS_ENDPOINT, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
keepalive: true,
}).catch(error => {
console.error('Failed to send metric:', error);
});
}
};
const VitalsReporter = () => {
useEffect(() => {
// 注册 web-vitals 的回调
onLCP(sendMetric);
onFID(sendMetric);
onCLS(sendMetric);
}, []);
return null; // 这个组件没有 UI
};
export default VitalsReporter;
在 React 应用的根组件中加载 <VitalsReporter />,它就会在后台自动捕获并发送指标。注意这里使用了 navigator.sendBeacon,这是一个关键优化,它能保证在页面关闭或跳转时,数据依然能够可靠地发送出去,而不会阻塞主线程。
后端接收到数据后,需要将其写入 InfluxDB。我们使用官方的 influxdb-client-ruby gem。
app/api_server.rb
# frozen_string_literal: true
require 'sinatra/base'
require 'sinatra/json'
require 'influxdb-client'
require 'json'
require 'logger'
class ApiServer < Sinatra::Base
# 配置日志
configure do
set :logger, Logger.new($stdout)
set :public_folder, 'frontend/build'
end
# 配置 InfluxDB 客户端
before do
@influx_client ||= InfluxDB2::Client.new(
ENV.fetch('INFLUXDB_URL', 'http://localhost:8086'),
ENV.fetch('INFLUXDB_TOKEN'),
org: ENV.fetch('INFLUXDB_ORG'),
bucket: ENV.fetch('INFLUXDB_BUCKET'),
use_ssl: false,
precision: InfluxDB2::WritePrecision::MS # 毫秒级精度
)
@influx_write_api = @influx_client.create_write_api
end
after do
@influx_client&.close!
end
# Metrics Endpoint
post '/api/v1/metrics' do
begin
payload = JSON.parse(request.body.read)
logger.info "Received metric: #{payload}"
correlation_id = payload['correlation_id']
metric_name = payload['name']
metric_value = payload['value'].to_f
# 数据点(Point)是 InfluxDB 的核心概念
# 我们将 correlation_id 作为 tag,这对于后续查询至关重要
point = InfluxDB2::Point.new(name: 'web_vitals')
.add_tag('correlation_id', correlation_id)
.add_tag('metric_name', metric_name)
.add_field('value', metric_value)
.time(Time.now.to_i * 1000) # 使用当前服务器时间戳
@influx_write_api.write(data: point)
status 202 # Accepted
json success: true
rescue JSON::ParserError => e
status 400
json error: 'Invalid JSON payload'
rescue StandardError => e
logger.error "Failed to write to InfluxDB: #{e.message}"
status 500
json error: 'Internal Server Error'
end
end
# ... 其他路由 ...
get '*' do
send_file File.join(settings.public_folder, 'index.html')
end
end
这里的关键在于,correlation_id 被用作了 InfluxDB 的 tag。在 InfluxDB 中,tags 是被索引的,而 fields 不是。这意味着我们可以极快地通过 correlation_id 筛选出属于某个会话的所有性能指标。这是一个核心的设计决策。
第三步:日志管道 - 从 React 到 Loki
与指标类似,我们需要一个前端日志收集器。我们可以创建一个简单的 React Hook。
frontend/src/hooks/useLogger.js
import { useCallback } from 'react';
const LOGS_ENDPOINT = '/api/v1/logs';
export const useLogger = () => {
const log = useCallback((level, message, context = {}) => {
const payload = {
level,
message,
context,
timestamp: new Date().toISOString(),
correlation_id: window.CORRELATION_ID,
};
const body = JSON.stringify(payload);
// 日志发送失败不应影响用户体验,同样使用 sendBeacon
if (navigator.sendBeacon) {
navigator.sendBeacon(LOGS_ENDPOINT, body);
} else {
fetch(LOGS_ENDPOINT, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
keepalive: true,
}).catch(error => {
// 在开发模式下可以打印错误,生产环境应静默失败
if (process.env.NODE_ENV === 'development') {
console.error('Failed to send log:', error);
}
});
}
}, []);
return {
info: (message, context) => log('info', message, context),
warn: (message, context) => log('warn', message, context),
error: (message, context) => log('error', message, context),
};
};
在任何组件中,我们都可以这样使用:const logger = useLogger();logger.error('Failed to fetch user data', { component: 'UserProfile' });
后端接收到日志后,需要将其推送到 Loki。Loki 的 HTTP API 允许我们推送日志流。
app/api_server.rb (续)
# ...
require 'net/http'
require 'uri'
# ... in ApiServer class
# Loki Endpoint
post '/api/v1/logs' do
begin
payload = JSON.parse(request.body.read)
logger.info "Received log: #{payload}"
correlation_id = payload['correlation_id']
# Loki 要求的时间戳是纳秒级的字符串
timestamp = (Time.parse(payload['timestamp']).to_f * 1_000_000_000).to_i.to_s
log_line = payload.to_json
# 构建 Loki Push API 的请求体
loki_payload = {
'streams' => [
{
'stream' => {
'job' => 'frontend_app',
'level' => payload['level'],
'correlation_id' => correlation_id # correlation_id 作为 Loki Label
},
'values' => [
[timestamp, log_line]
]
}
]
}.to_json
# 直接使用 Net::HTTP 发送请求
loki_uri = URI.parse(ENV.fetch('LOKI_PUSH_URL', 'http://localhost:3100/loki/api/v1/push'))
http = Net::HTTP.new(loki_uri.host, loki_uri.port)
request = Net::HTTP::Post.new(loki_uri.request_uri, 'Content-Type' => 'application/json')
request.body = loki_payload
response = http.request(request)
if response.code == '204'
status 202
json success: true
else
logger.error "Failed to push to Loki: #{response.code} #{response.body}"
status 502 # Bad Gateway
json error: 'Failed to forward log to Loki'
end
rescue JSON::ParserError => e
status 400
json error: 'Invalid JSON payload'
rescue StandardError => e
logger.error "Log processing error: #{e.message}"
status 500
json error: 'Internal Server Error'
end
end
# ...
同样,correlation_id 在这里被用作了 Loki 的 label。这使得我们能够通过 LogQL 查询 {correlation_id="some-uuid"} 来精确地拉取一个会话的所有日志。这是 Loki 设计哲学的体现:只索引元数据(labels),不索引日志内容,从而实现极高的写入吞吐和存储效率。
第四步:统一仪表盘 - 查询与可视化
现在数据已经分门别类地存储好了。最后一步是构建一个 React 仪表盘,将它们关联并展示出来。
我们需要一个新的后端端点,它接收一个 correlation_id,然后并行查询 InfluxDB 和 Loki。
app/api_server.rb (续)
# ...
# in ApiServer class
get '/api/v1/session/:id' do |id|
content_type :json
# 使用线程并行查询
influx_thread = Thread.new do
begin
query = <<~FLUX
from(bucket: "#{ENV.fetch('INFLUXDB_BUCKET')}")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "web_vitals")
|> filter(fn: (r) => r["correlation_id"] == "#{id}")
|> pivot(rowKey:["_time"], columnKey: ["metric_name"], valueColumn: "_value")
|> yield(name: "results")
FLUX
@influx_client.create_query_api.query(query: query).to_json
rescue StandardError => e
logger.error "InfluxDB query failed: #{e.message}"
{ error: 'InfluxDB query failed' }.to_json
end
end
loki_thread = Thread.new do
begin
loki_query = URI.encode_www_form(
'query' => "{correlation_id=\"#{id}\"}",
'limit' => 1000, # 设置一个合理的上限
'start' => (Time.now - 24 * 3600).to_i * 1_000_000_000,
'direction' => 'forward'
)
loki_uri = URI.parse("#{ENV.fetch('LOKI_QUERY_URL', 'http://localhost:3100')}/loki/api/v1/query_range?#{loki_query}")
response = Net::HTTP.get_response(loki_uri)
response.body
rescue StandardError => e
logger.error "Loki query failed: #{e.message}"
{ error: 'Loki query failed' }.to_json
end
end
influx_results = JSON.parse(influx_thread.value)
loki_results = JSON.parse(loki_thread.value)
{
metrics: influx_results,
logs: loki_results
}.to_json
end
# ...
这个端点是仪表盘的核心数据源。前端拿到这些数据后,剩下的就是可视化的工作了。这里 styled-components 的作用就体现出来了。我们可以为不同的日志级别定义不同的样式,为指标图表创建可复用的主题。
frontend/src/components/SessionDashboard.js
import React, { useState, useEffect } from 'react';
import { useParams } from 'react-router-dom';
import styled from 'styled-components';
const DashboardLayout = styled.div`
display: grid;
grid-template-columns: 1fr;
grid-template-rows: auto 1fr;
gap: 20px;
padding: 20px;
height: calc(100vh - 40px);
font-family: monospace;
`;
const MetricsPanel = styled.div`
background-color: #282c34;
color: white;
padding: 15px;
border-radius: 8px;
`;
const LogsPanel = styled.div`
background-color: #f0f0f0;
padding: 15px;
border-radius: 8px;
overflow-y: auto;
`;
const LogEntry = styled.div`
padding: 5px;
border-bottom: 1px solid #ddd;
color: ${props => {
switch (props.level) {
case 'error': return '#d9534f';
case 'warn': return '#f0ad4e';
default: return '#333';
}
}};
`;
const SessionDashboard = () => {
const { sessionId } = useParams();
const [data, setData] = useState({ metrics: [], logs: null });
const [loading, setLoading] = useState(true);
const [error, setError] = useState(null);
useEffect(() => {
const fetchSessionData = async () => {
try {
setLoading(true);
const response = await fetch(`/api/v1/session/${sessionId}`);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const result = await response.json();
// 解析 Loki 返回的数据
const parsedLogs = result.logs?.data?.result[0]?.values.map(v => ({
timestamp: new Date(parseInt(v[0].substring(0, 13))).toISOString(),
...JSON.parse(v[1])
})) || [];
setData({ metrics: result.metrics, logs: parsedLogs });
} catch (e) {
setError(e.message);
} finally {
setLoading(false);
}
};
fetchSessionData();
}, [sessionId]);
if (loading) return <div>Loading session {sessionId}...</div>;
if (error) return <div>Error: {error}</div>;
return (
<DashboardLayout>
<MetricsPanel>
<h3>Metrics for Session: {sessionId}</h3>
{/* 在这里可以使用图表库如 Recharts 来渲染指标 */}
<pre>{JSON.stringify(data.metrics, null, 2)}</pre>
</MetricsPanel>
<LogsPanel>
<h3>Logs</h3>
{data.logs && data.logs.map((log, index) => (
<LogEntry key={index} level={log.level}>
<strong>{log.timestamp} [{log.level.toUpperCase()}]</strong>: {log.message}
{Object.keys(log.context).length > 0 && <pre>{JSON.stringify(log.context, null, 2)}</pre>}
</LogEntry>
))}
</LogsPanel>
</DashboardLayout>
);
};
export default SessionDashboard;
这个仪表盘虽然简单,但它验证了整个链路的可行性。我们可以清晰地看到某个会话的所有性能数据和日志,它们被 correlation_id 牢牢地绑定在一起。当看到一个 LCP 高达 8 秒的指标时,我们可以立刻滚动到对应时间点的日志,可能会发现一个关键的 API 请求失败,或者一个渲染密集型组件被重复挂载。问题定位的效率得到了质的提升。
局限性与未来展望
这个系统并非完美。在真实生产环境中,有几个问题需要考虑:
- 后端瓶颈: Ruby API 是一个单点。在高流量下,它可能成为瓶颈。一个更健壮的架构应该让前端直接将数据发送到可扩展的收集器(如 OpenTelemetry Collector),由收集器负责将数据分发到 InfluxDB 和 Loki。
- Loki 的高基数问题: 将 UUID 作为 label 会导致 Loki 的索引基数(cardinality)爆炸,这在超大规模下会严重影响 Loki 的性能。一个改进方案是将
correlation_id作为日志内容的一部分,而不是 label。查询时使用 Loki 的 filter 表达式(如|= "correlation_id=some-uuid")进行过滤。这样查询速度会变慢,但保证了 Loki 索引的健康。这是一个典型的权衡。 - 数据采样: 收集所有用户的每一个会话数据成本高昂。可以实施采样策略,比如只收集 10% 的会话,或者只收集那些性能指标超过某个阈值的会话。
- 数据安全: 当前实现没有考虑认证和授权,在公网环境中是完全不可接受的。需要增加 API key 或其他认证机制。
下一步的迭代方向是引入分布式追踪(Tracing),将 correlation_id 升级为 trace_id,并使其在前端、Ruby 后端以及所有下游微服务之间传递,从而构建一个完整的、贯穿前后台的统一观测平台。