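The service dependency topology in a monitoring system is usually static: it is generated from a snapshot taken at some point in time, or refreshed at a low frequency. In real production environments, especially during canary releases, chaos engineering experiments, or incident response, what we need is a dynamic view that reflects call relationships between services, health status, and traffic changes in real time. A static topology map is close to useless in these scenarios because it cannot capture state transitions that happen within milliseconds.
The core of the problem is building a low-latency, high-throughput real-time data pipeline from the backend data source to the frontend UI. Our goal is a lightweight visualization tool: the backend processes and broadcasts state updates efficiently, the frontend renders those changes smoothly, and the complex logic of the frontend application has reliable test coverage.
Technology choices directly determine whether the approach succeeds. The backend needs a high-performance, low-footprint web framework that can comfortably handle a large number of concurrent WebSocket connections. Go and the Fiber framework from its ecosystem are the first choice because of the performance Fiber gets from being built on Fasthttp. On the frontend, the styles of nodes and connection lines must change with real-time data (such as health status and latency), which is a typical use case for CSS-in-JS; Emotion's dynamic styling capability and performance are both strong. Finally, testing a highly dynamic frontend that depends on an external data source (WebSocket) is a challenge. Vitest, with its modern design, seamless Vite integration, and strong mocking capabilities, handles this challenge well.
Backend Architecture: A WebSocket Broadcast Hub Built on Go-Fiber
Our backend handles no complex business logic. Its sole responsibility is to maintain a pool of WebSocket connections and periodically broadcast the latest service topology data to every connected client. To do that, we need a concurrency-safe broadcast center (Hub).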
// main.go
package main
import (
"encoding/json"
"log"
"sync"
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/contrib/websocket"
"math/rand"
)
// Service represents a single microservice node in the topology.
type Service struct {
ID string `json:"id"`
Name string `json:"name"`
Status string `json:"status"` // "healthy", "degraded", "down"
}
// Link represents a connection between two services.
type Link struct {
Source string `json:"source"`
Target string `json:"target"`
Status string `json:"status"` // "active", "inactive"
}
// Topology represents the entire service graph.
type Topology struct {
Services []Service `json:"services"`
Links []Link `json:"links"`
}
// Hub maintains the set of active clients and broadcasts messages to them.
type Hub struct {
clients map[*websocket.Conn]bool
broadcast chan []byte
register chan *websocket.Conn
unregister chan *websocket.Conn
mutex sync.Mutex
}
func newHub() *Hub {
return &Hub{
broadcast: make(chan []byte),
register: make(chan *websocket.Conn),
unregister: make(chan *websocket.Conn),
clients: make(map[*websocket.Conn]bool),
}
}
// The core logic for the hub. It listens on its channels and handles client registration,
// unregistration, and message broadcasting in a single goroutine to prevent race conditions.
func (h *Hub) run() {
for {
select {
case conn := <-h.register:
h.mutex.Lock()
h.clients[conn] = true
h.mutex.Unlock()
log.Println("Client registered. Total clients:", len(h.clients))
case conn := <-h.unregister:
h.mutex.Lock()
if _, ok := h.clients[conn]; ok {
delete(h.clients, conn)
conn.Close()
log.Println("Client unregistered. Total clients:", len(h.clients))
}
h.mutex.Unlock()
case message := <-h.broadcast:
h.mutex.Lock()
for conn := range h.clients {
// A connection supports only one concurrent writer. All writes happen on this
// goroutine, so that rule holds; the mutex only guards the map against any
// future access from outside run().
if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
log.Println("write error:", err)
// Unregister asynchronously: run() is the only receiver on h.unregister,
// so sending directly from inside this loop would block forever.
go func(c *websocket.Conn) {
h.unregister <- c
}(conn)
}
}
h.mutex.Unlock()
}
}
}
// A mock data generator. main() calls it every 2 seconds to simulate topology changes.
// This is where you would integrate with a real service discovery or observability backend.
func generateMockTopology() Topology {
services := []Service{
{ID: "svc-api-gateway", Name: "API Gateway", Status: "healthy"},
{ID: "svc-auth", Name: "Auth Service", Status: "healthy"},
{ID: "svc-users", Name: "Users Service", Status: "healthy"},
{ID: "svc-orders", Name: "Orders Service", Status: "degraded"},
{ID: "svc-payments", Name: "Payments Service", Status: "down"},
{ID: "svc-db-master", Name: "Database", Status: "healthy"},
}
// Randomly change service statuses to simulate a dynamic environment.
statuses := []string{"healthy", "degraded", "down"}
for i := range services {
if rand.Float32() < 0.2 { // 20% chance to change status
services[i].Status = statuses[rand.Intn(len(statuses))]
}
}
links := []Link{
{Source: "svc-api-gateway", Target: "svc-auth", Status: "active"},
{Source: "svc-api-gateway", Target: "svc-users", Status: "active"},
{Source: "svc-api-gateway", Target: "svc-orders", Status: "active"},
{Source: "svc-orders", Target: "svc-payments", Status: "inactive"}, // Payments service is down
{Source: "svc-orders", Target: "svc-db-master", Status: "active"},
{Source: "svc-users", Target: "svc-db-master", Status: "active"},
}
// If target is down, link should be inactive
for i := range links {
var targetStatus string
for _, srv := range services {
if srv.ID == links[i].Target {
targetStatus = srv.Status
break
}
}
if targetStatus == "down" {
links[i].Status = "inactive"
} else {
links[i].Status = "active"
}
}
return Topology{Services: services, Links: links}
}
func main() {
app := fiber.New()
hub := newHub()
go hub.run()
// Ticker to periodically generate and broadcast new topology data.
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
<-ticker.C
topology := generateMockTopology()
message, err := json.Marshal(topology)
if err != nil {
log.Println("json marshal error:", err)
continue
}
hub.broadcast <- message
}
}()
app.Use("/ws", func(c *fiber.Ctx) error {
if websocket.IsWebSocketUpgrade(c) {
c.Locals("allowed", true)
return c.Next()
}
return fiber.ErrUpgradeRequired
})
app.Get("/ws/topology", websocket.New(func(c *websocket.Conn) {
// When a new client connects, register it with the hub.
hub.register <- c
// The defer call ensures unregistration when the client disconnects for any reason.
// This is critical for preventing memory leaks in the hub.
defer func() {
hub.unregister <- c
}()
// Keep the connection alive by reading messages. We don't do anything with them.
// This loop will break if the client closes the connection, triggering the defer.
for {
if _, _, err := c.ReadMessage(); err != nil {
log.Println("read error:", err)
break
}
}
}))
log.Fatal(app.Listen(":3001"))
}
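The core here is the Hub struct. Registration, unregistration, and broadcast all arrive over channels, so access to the clients map is serialized through a single goroutine. This is a more idiomatic Go concurrency pattern than wrapping every access in a sync.Mutex. Note that the listing still takes the mutex inside run(); as long as run() is the only code touching the map, that lock is a defensive extra rather than a requirement. The dedicated goroutine hub.run() acts as the "heart" of the Hub and handles all concurrent events. The data generator generateMockTopology is called from a separate goroutine and sends data to the Hub through the hub.broadcast channel, which decouples the producer from the consumer.
Frontend Implementation: React, Emotion, and a Custom WebSocket Hook
The frontend's job is to connect to the WebSocket, receive data, and use Emotion to render the topology dynamically. A common mistake is to manage the WebSocket connection directly inside a component without tying it to the component lifecycle, which causes the connection to be created or torn down repeatedly on re-render. The right approach is to abstract it into a custom Hook.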
// src/hooks/useTopologySocket.ts
import { useEffect, useState, useRef } from 'react';
// Corresponds to the Go backend structs
export interface Service {
id: string;
name: string;
status: 'healthy' | 'degraded' | 'down';
}
export interface Link {
source: string;
target: string;
status: 'active' | 'inactive';
}
export interface Topology {
services: Service[];
links: Link[];
}
type WebSocketStatus = 'connecting' | 'open' | 'closing' | 'closed';
export const useTopologySocket = (url: string) => {
const [topology, setTopology] = useState<Topology>({ services: [], links: [] });
const [status, setStatus] = useState<WebSocketStatus>('connecting');
const ws = useRef<WebSocket | null>(null);
useEffect(() => {
// This effect handles the entire lifecycle of the WebSocket connection.
ws.current = new WebSocket(url);
setStatus('connecting');
ws.current.onopen = () => {
console.log('WebSocket connection established.');
setStatus('open');
};
ws.current.onmessage = (event) => {
try {
const data: Topology = JSON.parse(event.data);
// In a high-frequency update scenario, it's crucial that this state update
// is efficient. React's reconciliation will handle the rest.
setTopology(data);
} catch (error) {
console.error('Failed to parse topology data:', error);
}
};
ws.current.onerror = (event) => {
console.error('WebSocket error:', event);
};
ws.current.onclose = () => {
console.log('WebSocket connection closed.');
setStatus('closed');
};
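// The cleanup function is critical. It ensures that when the component
// unmounts (or the URL changes), the old WebSocket connection is properly closed.
return () => {
const socket = ws.current;
if (socket && socket.readyState < 2) { // CONNECTING or OPEN
setStatus('closing');
// Swap in a no-op handler so a late close event from this stale socket
// cannot overwrite the status of a newer connection (URL change, StrictMode re-mount).
socket.onclose = () => {};
socket.close();
}
};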
}, [url]); // Re-run the effect if the URL changes.
return { topology, status };
};
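The hook above casts the result of JSON.parse straight to Topology, so a malformed payload would reach the renderer unchecked. The following is a minimal sketch of runtime validation, assuming the same interface shapes as the hook; isTopology and parseTopology are hypothetical helper names, not part of the original code.

```typescript
// Hypothetical runtime validation for incoming WebSocket payloads.
// The interfaces mirror the ones declared in useTopologySocket.ts.
type ServiceStatus = 'healthy' | 'degraded' | 'down';
type LinkStatus = 'active' | 'inactive';

interface Service { id: string; name: string; status: ServiceStatus; }
interface Link { source: string; target: string; status: LinkStatus; }
interface Topology { services: Service[]; links: Link[]; }

const SERVICE_STATUSES: readonly string[] = ['healthy', 'degraded', 'down'];
const LINK_STATUSES: readonly string[] = ['active', 'inactive'];

const isRecord = (value: unknown): value is Record<string, unknown> =>
  typeof value === 'object' && value !== null;

const isService = (value: unknown): value is Service =>
  isRecord(value) &&
  typeof value.id === 'string' &&
  typeof value.name === 'string' &&
  typeof value.status === 'string' &&
  SERVICE_STATUSES.indexOf(value.status) !== -1;

const isLink = (value: unknown): value is Link =>
  isRecord(value) &&
  typeof value.source === 'string' &&
  typeof value.target === 'string' &&
  typeof value.status === 'string' &&
  LINK_STATUSES.indexOf(value.status) !== -1;

const isTopology = (value: unknown): value is Topology =>
  isRecord(value) &&
  Array.isArray(value.services) && value.services.every(isService) &&
  Array.isArray(value.links) && value.links.every(isLink);

// Returns the parsed topology, or null when the payload is not valid
// JSON or does not match the expected shape.
const parseTopology = (raw: string): Topology | null => {
  try {
    const data: unknown = JSON.parse(raw);
    return isTopology(data) ? data : null;
  } catch (_err) {
    return null;
  }
};
```

Inside onmessage, the hook could then call parseTopology(event.data) and update state only when the result is not null.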
Next, we use Emotion to create components whose styles change with status, and consume the Hook above in the main view.
// src/components/TopologyVisualizer.tsx
import styled from '@emotion/styled';
import { useTopologySocket, Service, Link } from '../hooks/useTopologySocket';
const STATUS_COLORS = {
healthy: '#28a745',
degraded: '#ffc107',
down: '#dc3545',
active: '#007bff',
inactive: '#6c757d',
};
const Canvas = styled.div`
position: relative;
width: 100%;
height: 80vh;
background-color: #f8f9fa;
border: 1px solid #dee2e6;
overflow: hidden; /* Prevent nodes from being dragged out */
`;
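// The `$` prefix is only a naming convention here (transient props are a styled-components feature).
// Emotion keeps these props off the DOM because, for HTML/SVG tags, it forwards only valid attributes.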
const ServiceNode = styled.div<{ $status: Service['status']; $x: number; $y: number }>`
position: absolute;
left: ${({ $x }) => $x}%;
top: ${({ $y }) => $y}%;
transform: translate(-50%, -50%);
width: 120px;
padding: 10px;
background-color: #fff;
border: 2px solid ${({ $status }) => STATUS_COLORS[$status]};
border-radius: 8px;
text-align: center;
font-family: monospace;
font-size: 14px;
cursor: pointer;
transition: all 0.3s ease-in-out;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
&:hover {
transform: translate(-50%, -50%) scale(1.05);
z-index: 10;
}
`;
// Simple SVG line for links. In a real app, you'd use a more robust graphing library.
const SvgCanvas = styled.svg`
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
pointer-events: none; /* Allows clicking on nodes underneath */
`;
const ConnectionLine = styled.line<{ $status: Link['status'] }>`
stroke: ${({ $status }) => STATUS_COLORS[$status]};
stroke-width: 2;
stroke-dasharray: ${({ $status }) => ($status === 'inactive' ? '5, 5' : 'none')};
transition: stroke 0.3s ease-in-out;
`;
// A simple layout engine for positioning nodes.
const calculateLayout = (services: Service[]) => {
const positions: Record<string, { x: number; y: number }> = {};
const count = services.length;
const radius = 35; // 35% of canvas half-width/height
const centerX = 50;
const centerY = 50;
services.forEach((service, index) => {
const angle = (index / count) * 2 * Math.PI;
positions[service.id] = {
x: centerX + radius * Math.cos(angle),
y: centerY + radius * Math.sin(angle),
};
});
return positions;
};
export const TopologyVisualizer = () => {
const { topology, status } = useTopologySocket('ws://localhost:3001/ws/topology');
if (status !== 'open' || !topology) {
return <div>Status: {status}...</div>;
}
const positions = calculateLayout(topology.services);
return (
<Canvas>
<SvgCanvas>
{topology.links.map((link) => {
const sourcePos = positions[link.source];
const targetPos = positions[link.target];
if (!sourcePos || !targetPos) return null;
return (
<ConnectionLine
key={`${link.source}-${link.target}`}
x1={`${sourcePos.x}%`}
y1={`${sourcePos.y}%`}
x2={`${targetPos.x}%`}
y2={`${targetPos.y}%`}
$status={link.status}
/>
);
})}
</SvgCanvas>
{topology.services.map((service) => {
const pos = positions[service.id];
if (!pos) return null;
return (
<ServiceNode key={service.id} $status={service.status} $x={pos.x} $y={pos.y}>
<strong>{service.name}</strong>
<div style={{ color: STATUS_COLORS[service.status] }}>{service.status}</div>
</ServiceNode>
);
})}
</Canvas>
);
};
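The key here is how Emotion is used. The styles of ServiceNode and ConnectionLine are driven entirely by the props passed in. For example, the border color maps directly to the $status prop. This keeps the visual state in lockstep with the data state, makes the code more cohesive, and makes it easier to reason about.
The Testing Challenge: Mocking WebSocket with Vitest
How do we test the useTopologySocket Hook? It depends on the global, network-backed WebSocket object, so running it unmodified in a test environment would either fail or require a live server. Vitest's vi.stubGlobal and vi.spyOn solve this cleanly: we write a mock implementation of WebSocket and swap it in for the global.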
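Because the styles are plain functions of props, the status-to-style mapping can be pulled out into pure functions and checked without rendering anything. The sketch below assumes the same color table as the component; nodeBorder and lineDashArray are hypothetical names introduced for illustration.

```typescript
// Hypothetical pure helpers that mirror the style rules used by
// ServiceNode and ConnectionLine in the component listing.
type ServiceStatus = 'healthy' | 'degraded' | 'down';
type LinkStatus = 'active' | 'inactive';

const STATUS_COLORS: Record<ServiceStatus | LinkStatus, string> = {
  healthy: '#28a745',
  degraded: '#ffc107',
  down: '#dc3545',
  active: '#007bff',
  inactive: '#6c757d',
};

// Border declaration for a service node in the given health state.
const nodeBorder = (status: ServiceStatus): string =>
  `2px solid ${STATUS_COLORS[status]}`;

// Dash pattern for a link: dashed when inactive, solid otherwise.
const lineDashArray = (status: LinkStatus): string =>
  status === 'inactive' ? '5, 5' : 'none';
```

In a styled template these would be used as, for example, border: ${({ $status }) => nodeBorder($status)}; which leaves the component declarative while the mapping itself stays unit-testable.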
// src/hooks/useTopologySocket.test.ts
import { renderHook, act } from '@testing-library/react';
import { vi, describe, it, expect, beforeEach, afterEach } from 'vitest';
import { useTopologySocket, Topology } from './useTopologySocket';
// A mock WebSocket class to simulate server behavior.
// This is the core of the test setup.
class MockWebSocket {
static instances: MockWebSocket[] = [];
url: string;
onopen: () => void = () => {};
onmessage: (event: { data: string }) => void = () => {};
onclose: () => void = () => {};
onerror: (event: any) => void = () => {};
readyState: number = 0; // 0: CONNECTING
constructor(url: string) {
this.url = url;
MockWebSocket.instances.push(this);
this.readyState = 0;
// Simulate async connection
setTimeout(() => {
this.readyState = 1; // 1: OPEN
this.onopen();
}, 10);
}
// Method for tests to simulate a server message
mockReceiveMessage(data: Topology) {
this.onmessage({ data: JSON.stringify(data) });
}
// Method for tests to simulate a server close
mockClose() {
this.readyState = 3; // 3: CLOSED
this.onclose();
}
close() {
this.readyState = 2; // 2: CLOSING
setTimeout(() => {
this.mockClose();
}, 10);
}
}
describe('useTopologySocket', () => {
beforeEach(() => {
// Replace the global WebSocket with our mock before each test.
vi.stubGlobal('WebSocket', MockWebSocket);
MockWebSocket.instances = [];
});
afterEach(() => {
// Restore the original WebSocket implementation after each test.
vi.unstubAllGlobals();
});
it('should establish a connection and update status to open', async () => {
const { result } = renderHook(() => useTopologySocket('ws://test-server'));
expect(result.current.status).toBe('connecting');
// Wait for the mock connection to "open"
await act(async () => {
await new Promise(resolve => setTimeout(resolve, 20));
});
expect(result.current.status).toBe('open');
expect(MockWebSocket.instances).toHaveLength(1);
expect(MockWebSocket.instances[0].url).toBe('ws://test-server');
});
it('should receive and parse topology data', async () => {
const { result } = renderHook(() => useTopologySocket('ws://test-server'));
const mockTopology: Topology = {
services: [{ id: 'svc1', name: 'Service 1', status: 'healthy' }],
links: [],
};
// Wait for connection to open, then simulate a message from the server.
await act(async () => {
await new Promise(resolve => setTimeout(resolve, 20));
MockWebSocket.instances[0].mockReceiveMessage(mockTopology);
});
expect(result.current.topology).toEqual(mockTopology);
});
it('should correctly handle connection closure on unmount', async () => {
const { unmount } = renderHook(() => useTopologySocket('ws://test-server'));
await act(async () => {
await new Promise(resolve => setTimeout(resolve, 20));
});
const instance = MockWebSocket.instances[0];
const closeSpy = vi.spyOn(instance, 'close');
// unmount will trigger the cleanup function in useEffect
unmount();
expect(closeSpy).toHaveBeenCalled();
});
});
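The essence of this test file is the MockWebSocket class. It simulates the real WebSocket constructor and its core event handlers (onopen, onmessage, and so on). vi.stubGlobal replaces the global WebSocket object with our mock before each test, and vi.unstubAllGlobals restores it afterwards, which keeps the tests isolated. The test cases call mockReceiveMessage to simulate a server push and then assert that the Hook's state was updated as expected. This lets us test complex asynchronous logic with external dependencies deterministically, with no network involved.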
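The mock above opens and closes through setTimeout, so the tests wait on real timers. One possible variant, sketched here under that assumption, lets the test trigger each transition explicitly. ControllableSocket and runScenario are hypothetical names, and the scenario function is a framework-free stand-in for the hook's handler wiring.

```typescript
// Hypothetical timer-free fake: the test decides exactly when the
// socket opens, receives data, or closes.
class ControllableSocket {
  static instances: ControllableSocket[] = [];
  url: string;
  readyState: number = 0; // 0: CONNECTING
  onopen: () => void = () => {};
  onmessage: (event: { data: string }) => void = () => {};
  onclose: () => void = () => {};

  constructor(url: string) {
    this.url = url;
    ControllableSocket.instances.push(this);
  }

  // Simulate the server accepting the connection.
  simulateOpen(): void {
    this.readyState = 1; // 1: OPEN
    this.onopen();
  }

  // Simulate a server push with a JSON-serializable payload.
  simulateMessage(payload: unknown): void {
    this.onmessage({ data: JSON.stringify(payload) });
  }

  // Close synchronously so assertions can follow immediately.
  close(): void {
    this.readyState = 3; // 3: CLOSED
    this.onclose();
  }
}

// Wires handlers the way the hook does and records the event order.
const runScenario = (): string[] => {
  const events: string[] = [];
  const socket = new ControllableSocket('ws://test-server');
  socket.onopen = () => events.push('open');
  socket.onmessage = (e) => events.push(`message:${e.data}`);
  socket.onclose = () => events.push('closed');

  socket.simulateOpen();
  socket.simulateMessage({ services: [], links: [] });
  socket.close();
  return events;
};
```

In a Vitest suite, the class could be installed with vi.stubGlobal('WebSocket', ControllableSocket), with calls such as simulateOpen() wrapped in act(...) in place of the timed waits.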
graph TD
A[Frontend App / React] -- "Renders" --> B(TopologyVisualizer Component);
B -- "Calls hook" --> C{useTopologySocket};
C -- "new WebSocket()" --> D(WebSocket Connection);
D -- "ws://..." --> E[Go-Fiber Server];
E -- "Manages connections" --> F(Hub);
G[Mock Data Ticker] -- "Generates topology every 2s" --> F;
F -- "Broadcasts data" --> E;
E -- "Pushes message" --> D;
D -- "onmessage" --> C;
C -- "setTopology()" --> B;
B -- "Re-renders with new data" --> A;
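Limitations and Future Directions
This implementation validates the core pipeline, but several areas need more work before it is production-ready.
First, the frontend visualization is very basic. A real topology map needs a robust layout algorithm to keep nodes and edges from overlapping, plus richer interactions such as dragging, zooming, and clicking a node to show details. This usually means bringing in a dedicated library such as D3.js or react-flow.
Second, the backend data source is simulated. In a real deployment it needs to connect to an actual service discovery system (such as Consul or Etcd) or an observability backend (such as Prometheus or the OpenTelemetry Collector) and pull live service metadata and health check results from there.
Finally, the WebSocket broadcast hub is single-instance. With a very large number of clients, or when high availability is required, this single Hub becomes both a bottleneck and a single point of failure. A scalable design decouples the broadcast logic from the application instance, typically by introducing an external message broker such as Redis Pub/Sub: the Go application publishes messages to a Redis channel, and every instance subscribes to that channel and pushes messages to its own connected clients. That is what makes horizontal scaling possible.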