Go-Fiber 与 Emotion 构建实时服务拓扑的可视化及其 Vitest 端到端测试实践


监控系统里的服务依赖拓扑图通常是静态的,基于某个时间点的快照生成,或者以较低的频率更新。在真实生产环境中,特别是在进行灰度发布、混沌工程实验或应对突发故障时,我们需要的是一个能实时反映服务间调用关系、健康状态和流量变化的动态视图。静态拓扑图在这种场景下几乎无用,因为它无法捕捉到毫秒级的状态变迁。

这个问题的核心在于构建一个从后端数据源到前端UI的低延迟、高吞吐的实时数据管道。我们的目标是创建一个轻量级的可视化工具,后端能高效处理和广播状态更新,前端能流畅地渲染这些变化,并且整个前端应用的复杂逻辑必须有可靠的测试覆盖。

技术选型直接决定了方案的成败。后端需要一个高性能、低资源占用的Web框架,能轻松处理大量并发WebSocket连接。Go语言及其生态中的Fiber框架因其基于Fasthttp的性能优势成为首选。前端渲染动态拓扑,节点和连接线的样式需根据实时数据(如健康状态、延迟)动态变化,这是一个典型的CSS-in-JS适用场景,Emotion的动态样式能力和性能表现都很出色。最后,这种高度动态化、依赖外部数据源(WebSocket)的前端应用,其测试是个挑战。Vitest以其现代化的设计、与Vite的无缝集成以及强大的Mock能力,能很好地应对这一挑战。

后端架构:基于 Go-Fiber 的 WebSocket 广播中心

我们的后端不处理复杂的业务逻辑,其唯一职责是:维护一个WebSocket连接池,并定期向所有连接的客户端广播最新的服务拓扑数据。为此,需要设计一个并发安全的广播中心(Hub)。

// main.go
package main

import (
	"encoding/json"
	"log"
	"sync"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/contrib/websocket"
	"math/rand"
)

// Wire types for the topology snapshot pushed to clients. The JSON tags
// define the payload schema consumed by the frontend.
type (
	// Service is a single microservice node in the topology.
	Service struct {
		ID     string `json:"id"`
		Name   string `json:"name"`
		Status string `json:"status"` // one of "healthy", "degraded", "down"
	}

	// Link is an edge between two services, referenced by their IDs.
	Link struct {
		Source string `json:"source"`
		Target string `json:"target"`
		Status string `json:"status"` // one of "active", "inactive"
	}

	// Topology is the whole service graph sent in each broadcast.
	Topology struct {
		Services []Service `json:"services"`
		Links    []Link    `json:"links"`
	}
)

// Hub fans serialized topology snapshots out to every connected WebSocket
// client. Hub.run is the only receiver on its channels: register and
// unregister carry client lifecycle events, broadcast carries payloads.
type Hub struct {
	// Client lifecycle events, sent by the per-connection handlers.
	register, unregister chan *websocket.Conn

	// Encoded topology snapshots to deliver to every client.
	broadcast chan []byte

	// Set of connected clients; values are always true.
	clients map[*websocket.Conn]bool

	// Lock available for guarding clients.
	mutex sync.Mutex
}

// newHub returns a Hub with an empty client set and unbuffered channels, so
// every send on them rendezvouses with Hub.run.
func newHub() *Hub {
	h := new(Hub)
	h.clients = make(map[*websocket.Conn]bool)
	h.register = make(chan *websocket.Conn)
	h.unregister = make(chan *websocket.Conn)
	h.broadcast = make(chan []byte)
	return h
}

// The core logic for the hub. It listens on its channels and handles client registration,
// unregistration, and message broadcasting in a single goroutine to prevent race conditions.
func (h *Hub) run() {
	for {
		select {
		case conn := <-h.register:
			h.mutex.Lock()
			h.clients[conn] = true
			h.mutex.Unlock()
			log.Println("Client registered. Total clients:", len(h.clients))
		case conn := <-h.unregister:
			h.mutex.Lock()
			if _, ok := h.clients[conn]; ok {
				delete(h.clients, conn)
				conn.Close()
				log.Println("Client unregistered. Total clients:", len(h.clients))
			}
			h.mutex.Unlock()
		case message := <-h.broadcast:
			h.mutex.Lock()
			for conn := range h.clients {
				// WriteMessage is not guaranteed to be thread-safe by all implementations,
				// so even though we are in a single goroutine, an external lock on the map is good practice.
				if err := conn.WriteMessage(websocket.TextMessage, message); err != nil {
					log.Println("write error:", err)
					// In a real-world scenario, you might want to unregister the client here.
					go func(c *websocket.Conn) {
						h.unregister <- c
					}(conn)
				}
			}
			h.mutex.Unlock()
		}
	}
}

// A mock data generator that simulates topology changes every 2 seconds.
// This is where you would integrate with a real service discovery or observability backend.
func generateMockTopology() Topology {
	services := []Service{
		{ID: "svc-api-gateway", Name: "API Gateway", Status: "healthy"},
		{ID: "svc-auth", Name: "Auth Service", Status: "healthy"},
		{ID: "svc-users", Name: "Users Service", Status: "healthy"},
		{ID: "svc-orders", Name: "Orders Service", Status: "degraded"},
		{ID: "svc-payments", Name: "Payments Service", Status: "down"},
		{ID: "svc-db-master", Name: "Database", Status: "healthy"},
	}

	// Randomly change service statuses to simulate a dynamic environment.
	statuses := []string{"healthy", "degraded", "down"}
	for i := range services {
		if rand.Float32() < 0.2 { // 20% chance to change status
			services[i].Status = statuses[rand.Intn(len(statuses))]
		}
	}

	links := []Link{
		{Source: "svc-api-gateway", Target: "svc-auth", Status: "active"},
		{Source: "svc-api-gateway", Target: "svc-users", Status: "active"},
		{Source: "svc-api-gateway", Target: "svc-orders", Status: "active"},
		{Source: "svc-orders", Target: "svc-payments", Status: "inactive"}, // Payments service is down
		{Source: "svc-orders", Target: "svc-db-master", Status: "active"},
		{Source: "svc-users", Target: "svc-db-master", Status: "active"},
	}
    
    // If target is down, link should be inactive
    for i := range links {
        var targetStatus string
        for _, srv := range services {
            if srv.ID == links[i].Target {
                targetStatus = srv.Status
                break
            }
        }
        if targetStatus == "down" {
            links[i].Status = "inactive"
        } else {
            links[i].Status = "active"
        }
    }

	return Topology{Services: services, Links: links}
}


// main wires the pieces together. It starts the hub and a producer goroutine
// that broadcasts a fresh mock topology every two seconds. It then exposes
// the WebSocket endpoint /ws/topology and serves on :3001.
func main() {
	hub := newHub()
	go hub.run()

	// Producer: on every tick, encode a new snapshot and hand it to the hub.
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			payload, err := json.Marshal(generateMockTopology())
			if err != nil {
				log.Println("json marshal error:", err)
				continue
			}
			hub.broadcast <- payload
		}
	}()

	app := fiber.New()

	// Only WebSocket upgrade requests may proceed to routes under /ws.
	app.Use("/ws", func(c *fiber.Ctx) error {
		if !websocket.IsWebSocketUpgrade(c) {
			return fiber.ErrUpgradeRequired
		}
		c.Locals("allowed", true)
		return c.Next()
	})

	// One handler invocation per client: register with the hub, and always
	// unregister on exit so the hub stops tracking this client.
	app.Get("/ws/topology", websocket.New(func(c *websocket.Conn) {
		hub.register <- c
		defer func() { hub.unregister <- c }()

		// Inbound messages are ignored; reading is what detects a closed or
		// broken connection. The first read error ends the handler.
		for {
			_, _, err := c.ReadMessage()
			if err != nil {
				log.Println("read error:", err)
				return
			}
		}
	}))

	log.Fatal(app.Listen(":3001"))
}

这里的核心是Hub结构体。它通过channel把注册、注销和广播三类事件都交给同一个goroutine hub.run()处理,从而把对clients map的所有读写串行化。由于map只在这个goroutine内被访问,代码中额外的sync.Mutex其实并非必需。这种“由单一goroutine独占状态、其他goroutine通过channel与之通信”的模式在Go中很常见;当然,对于简单场景,直接用sync.Mutex保护map同样是合理的选择。数据生成器generateMockTopology在一个单独的定时器goroutine中被调用,结果通过hub.broadcast channel交给Hub,实现了生产者与消费者的解耦。

前端实现:React、Emotion 与自定义 WebSocket Hook

前端的任务是连接WebSocket,接收数据,并使用Emotion将拓扑数据动态渲染出来。一个常见的错误是在组件的渲染函数体中直接创建WebSocket连接:组件每次重渲染都会新建一个连接,而旧连接得不到关闭。正确的做法是把连接的创建与关闭放进以url为依赖的useEffect中,并将其封装为一个自定义Hook,便于复用和单独测试。

// src/hooks/useTopologySocket.ts
import { useEffect, useState, useRef } from 'react';

// Wire format of the topology snapshot. Field names and literal values mirror
// the JSON produced by the Go backend structs.
export type Service = {
  id: string;
  name: string;
  status: 'healthy' | 'degraded' | 'down';
};

export type Link = {
  /** ID of the source service. */
  source: string;
  /** ID of the target service. */
  target: string;
  status: 'active' | 'inactive';
};

export type Topology = {
  services: Service[];
  links: Link[];
};

// Connection lifecycle states tracked by useTopologySocket.
type WebSocketStatus = 'connecting' | 'open' | 'closing' | 'closed';

/**
 * Subscribes to a topology WebSocket and exposes the latest snapshot.
 *
 * @param url WebSocket endpoint. A change of `url` closes the current socket
 *   and opens a new one.
 * @returns `topology` — the most recently received snapshot (empty until the
 *   first message); `status` — the connection state of the current socket.
 */
export const useTopologySocket = (url: string) => {
  const [topology, setTopology] = useState<Topology>({ services: [], links: [] });
  const [status, setStatus] = useState<WebSocketStatus>('connecting');
  const ws = useRef<WebSocket | null>(null);

  useEffect(() => {
    // Each effect run owns exactly one socket. `disposed` is set in the
    // cleanup so that events this socket delivers afterwards cannot overwrite
    // state that now belongs to the next run's socket. close() completes
    // asynchronously, so a late onclose/onmessage is expected after a URL
    // change or a StrictMode double-mount in development.
    let disposed = false;
    const socket = new WebSocket(url);
    ws.current = socket;
    setStatus('connecting');

    socket.onopen = () => {
      if (disposed) return;
      console.log('WebSocket connection established.');
      setStatus('open');
    };

    socket.onmessage = (event) => {
      if (disposed) return;
      try {
        // NOTE: the payload is trusted to match `Topology`; there is no
        // runtime schema validation.
        const data: Topology = JSON.parse(event.data);
        setTopology(data);
      } catch (error) {
        console.error('Failed to parse topology data:', error);
      }
    };

    socket.onerror = (event) => {
      if (disposed) return;
      console.error('WebSocket error:', event);
    };

    socket.onclose = () => {
      if (disposed) return;
      console.log('WebSocket connection closed.');
      setStatus('closed');
    };

    return () => {
      disposed = true;
      // readyState < 2 means CONNECTING (0) or OPEN (1). A numeric literal is
      // used rather than WebSocket.CLOSING so test doubles lacking the static
      // constants still work.
      if (socket.readyState < 2) {
        socket.close();
      }
      if (ws.current === socket) {
        ws.current = null;
      }
    };
  }, [url]); // Re-run the effect if the URL changes.

  return { topology, status };
};

接下来,我们使用Emotion创建可根据状态改变样式的组件,并在主视图中消费上述Hook。

// src/components/TopologyVisualizer.tsx
import styled from '@emotion/styled';
import { useTopologySocket, Service, Link } from '../hooks/useTopologySocket';

// Colour palette keyed by status. The keys cover every Service['status'] and
// Link['status'] value, so lookups with either union are always defined.
const STATUS_COLORS: Record<Service['status'] | Link['status'], string> = {
  // Service health
  healthy: '#28a745',
  degraded: '#ffc107',
  down: '#dc3545',
  // Link activity
  active: '#007bff',
  inactive: '#6c757d',
};

// Positioning context for the absolutely placed service nodes and the SVG
// link overlay.
const Canvas = styled.div`
  position: relative;
  width: 100%;
  height: 80vh;
  background-color: #f8f9fa;
  border: 1px solid #dee2e6;
  overflow: hidden; /* Clip anything extending past the canvas edge */
`;

// A service card centred on ($x%, $y%) of the canvas, bordered in its status
// colour. Note: the `$` prefix has no special meaning in Emotion (unlike
// styled-components' transient props). These props stay off the DOM because,
// for string tags, Emotion forwards only valid HTML attributes by default.
const ServiceNode = styled.div<{ $status: Service['status']; $x: number; $y: number }>`
  position: absolute;
  left: ${({ $x }) => $x}%;
  top: ${({ $y }) => $y}%;
  transform: translate(-50%, -50%);
  width: 120px;
  padding: 10px;
  background-color: #fff;
  border: 2px solid ${({ $status }) => STATUS_COLORS[$status]};
  border-radius: 8px;
  text-align: center;
  font-family: monospace;
  font-size: 14px;
  cursor: pointer;
  transition: all 0.3s ease-in-out;
  box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);

  &:hover {
    transform: translate(-50%, -50%) scale(1.05);
    z-index: 10;
  }
`;

// Full-size overlay on which links are drawn as straight lines. Deliberately
// simple; a real app would use a dedicated graph rendering library.
const SvgCanvas = styled.svg`
  position: absolute;
  top: 0;
  left: 0;
  width: 100%;
  height: 100%;
  pointer-events: none; /* The link overlay never intercepts pointer events */
`;

// One link, stroked in its status colour; inactive links are dashed. The
// `$status` prop is filtered from the DOM as described for ServiceNode.
const ConnectionLine = styled.line<{ $status: Link['status'] }>`
  stroke: ${({ $status }) => STATUS_COLORS[$status]};
  stroke-width: 2;
  stroke-dasharray: ${({ $status }) => ($status === 'inactive' ? '5, 5' : 'none')};
  transition: stroke 0.3s ease-in-out;
`;

/**
 * Places services evenly on a circle, starting at angle 0 (to the right of
 * the centre) and proceeding by increasing angle.
 *
 * All values are percentages of the canvas (0–100 on each axis). With the
 * defaults the circle spans 15%–85% of each axis; the radius is a percentage
 * of the full width/height, not of the half-width.
 *
 * @param services Nodes to place; keyed in the result by `id`.
 * @param radius   Circle radius in canvas percent (default 35).
 * @param centerX  Horizontal centre in canvas percent (default 50).
 * @param centerY  Vertical centre in canvas percent (default 50).
 * @returns Map from service id to its `{ x, y }` position. Empty for no services.
 */
const calculateLayout = (
  services: Service[],
  radius = 35,
  centerX = 50,
  centerY = 50,
) => {
  const positions: Record<string, { x: number; y: number }> = {};
  const count = services.length;

  services.forEach((service, index) => {
    const angle = (index / count) * 2 * Math.PI;
    positions[service.id] = {
      x: centerX + radius * Math.cos(angle),
      y: centerY + radius * Math.sin(angle),
    };
  });
  return positions;
};


/** Default endpoint of the Go-Fiber topology stream in local development. */
const DEFAULT_TOPOLOGY_URL = 'ws://localhost:3001/ws/topology';

/**
 * Live view of the service topology streamed over a WebSocket.
 *
 * Shows a status placeholder until the socket is open. It then draws the
 * links as SVG lines, followed by the service nodes laid out on a circle.
 *
 * @param url WebSocket endpoint to subscribe to. Defaults to
 *   DEFAULT_TOPOLOGY_URL, so existing `<TopologyVisualizer />` usages are
 *   unchanged.
 */
export const TopologyVisualizer = ({ url = DEFAULT_TOPOLOGY_URL }: { url?: string } = {}) => {
  const { topology, status } = useTopologySocket(url);

  // The hook always returns an initialised topology, so only the connection
  // state gates rendering.
  if (status !== 'open') {
    return <div>Status: {status}...</div>;
  }

  const positions = calculateLayout(topology.services);

  return (
    <Canvas>
      <SvgCanvas>
        {topology.links.map((link) => {
          const sourcePos = positions[link.source];
          const targetPos = positions[link.target];
          // Skip links that reference services absent from this snapshot.
          if (!sourcePos || !targetPos) return null;
          return (
            <ConnectionLine
              key={`${link.source}-${link.target}`}
              x1={`${sourcePos.x}%`}
              y1={`${sourcePos.y}%`}
              x2={`${targetPos.x}%`}
              y2={`${targetPos.y}%`}
              $status={link.status}
            />
          );
        })}
      </SvgCanvas>
      {topology.services.map((service) => {
        const pos = positions[service.id];
        if (!pos) return null;
        return (
          <ServiceNode key={service.id} $status={service.status} $x={pos.x} $y={pos.y}>
            <strong>{service.name}</strong>
            <div style={{ color: STATUS_COLORS[service.status] }}>{service.status}</div>
          </ServiceNode>
        );
      })}
    </Canvas>
  );
};

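这里的关键是Emotion的使用。ServiceNode 和 ConnectionLine 的样式完全由传入的props驱动,例如节点的边框颜色直接由 $status prop 映射而来。这种方式使得UI状态和数据状态紧密耦合,代码内聚性更高,也更容易推理。需要注意,$ 前缀在Emotion中并没有styled-components那样的“transient props”语义。这些prop之所以不会出现在DOM上,是因为Emotion对字符串标签默认只转发合法的HTML属性。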

测试挑战:使用 Vitest Mock WebSocket

如何测试useTopologySocket这个Hook?它依赖一个全局的、基于网络的WebSocket对象。在测试环境中直接使用它,要么不可用,要么会发起真实的网络连接,结果不可控。借助Vitest的 vi.stubGlobal(以及用于断言调用的 vi.spyOn),可以优雅地解决这个问题:我们只需创建一个WebSocket的Mock实现,并在测试期间替换全局对象。

// src/hooks/useTopologySocket.test.ts
import { renderHook, act } from '@testing-library/react';
import { vi, describe, it, expect, beforeEach, afterEach } from 'vitest';
import { useTopologySocket, Topology } from './useTopologySocket';

// A mock WebSocket class to simulate server behavior.
// This is the core of the test setup. It models the readyState lifecycle of
// the real WebSocket closely enough for the hook under test:
// CONNECTING -> OPEN after 10 ms, and close() -> CLOSING -> CLOSED 10 ms later.
class MockWebSocket {
  // Mirror the real WebSocket readyState constants so code under test can
  // use WebSocket.OPEN etc. against the stub.
  static readonly CONNECTING = 0;
  static readonly OPEN = 1;
  static readonly CLOSING = 2;
  static readonly CLOSED = 3;

  static instances: MockWebSocket[] = [];

  url: string;
  onopen: () => void = () => {};
  onmessage: (event: { data: string }) => void = () => {};
  onclose: () => void = () => {};
  onerror: (event: any) => void = () => {};
  readyState: number = MockWebSocket.CONNECTING;

  constructor(url: string) {
    this.url = url;
    MockWebSocket.instances.push(this);
    // Simulate async connection. A socket closed before this fires must not
    // come back to OPEN (a real WebSocket would never open after close()).
    setTimeout(() => {
      if (this.readyState !== MockWebSocket.CONNECTING) return;
      this.readyState = MockWebSocket.OPEN;
      this.onopen();
    }, 10);
  }

  // Method for tests to simulate a server message
  mockReceiveMessage(data: Topology) {
    this.onmessage({ data: JSON.stringify(data) });
  }

  // Method for tests to simulate a server close. It is idempotent, so onclose
  // fires at most once.
  mockClose() {
    if (this.readyState === MockWebSocket.CLOSED) return;
    this.readyState = MockWebSocket.CLOSED;
    this.onclose();
  }

  close() {
    // As in the WebSocket spec, close() on a CLOSING or CLOSED socket does nothing.
    if (this.readyState >= MockWebSocket.CLOSING) return;
    this.readyState = MockWebSocket.CLOSING;
    setTimeout(() => {
      this.mockClose();
    }, 10);
  }
}

describe('useTopologySocket', () => {
  beforeEach(() => {
    // Fake timers make the mock's 10 ms connect/close latency deterministic:
    // tests advance the clock instead of sleeping on real time.
    vi.useFakeTimers();
    // Replace the global WebSocket with our mock before each test.
    vi.stubGlobal('WebSocket', MockWebSocket);
    MockWebSocket.instances = [];
  });

  afterEach(() => {
    // Discard timers still pending from this test (e.g. a delayed close) so
    // they cannot fire into the next one, then restore globals and spies.
    vi.useRealTimers();
    vi.unstubAllGlobals();
    vi.restoreAllMocks();
  });

  // Runs all pending mock open/close callbacks inside act() so React applies
  // the resulting state updates before the assertions.
  const flushSocketTimers = () => {
    act(() => {
      vi.advanceTimersByTime(20);
    });
  };

  it('should establish a connection and update status to open', () => {
    const { result } = renderHook(() => useTopologySocket('ws://test-server'));

    expect(result.current.status).toBe('connecting');

    flushSocketTimers();

    expect(result.current.status).toBe('open');
    expect(MockWebSocket.instances).toHaveLength(1);
    expect(MockWebSocket.instances[0].url).toBe('ws://test-server');
  });

  it('should receive and parse topology data', () => {
    const { result } = renderHook(() => useTopologySocket('ws://test-server'));

    const mockTopology: Topology = {
      services: [{ id: 'svc1', name: 'Service 1', status: 'healthy' }],
      links: [],
    };

    // Open the connection, then simulate a message from the server.
    flushSocketTimers();
    act(() => {
      MockWebSocket.instances[0].mockReceiveMessage(mockTopology);
    });

    expect(result.current.topology).toEqual(mockTopology);
  });

  it('should ignore malformed messages and keep the previous topology', () => {
    const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
    const { result } = renderHook(() => useTopologySocket('ws://test-server'));

    flushSocketTimers();
    act(() => {
      MockWebSocket.instances[0].onmessage({ data: 'not json' });
    });

    expect(result.current.topology).toEqual({ services: [], links: [] });
    expect(errorSpy).toHaveBeenCalled();
  });

  it('should report closed when the server closes the connection', () => {
    const { result } = renderHook(() => useTopologySocket('ws://test-server'));

    flushSocketTimers();
    act(() => {
      MockWebSocket.instances[0].mockClose();
    });

    expect(result.current.status).toBe('closed');
  });

  it('should correctly handle connection closure on unmount', () => {
    const { unmount } = renderHook(() => useTopologySocket('ws://test-server'));

    flushSocketTimers();

    const instance = MockWebSocket.instances[0];
    const closeSpy = vi.spyOn(instance, 'close');

    // unmount will trigger the cleanup function in useEffect
    unmount();

    expect(closeSpy).toHaveBeenCalled();
  });

  it('should replace the connection when the URL changes', () => {
    const { result, rerender } = renderHook(
      ({ url }: { url: string }) => useTopologySocket(url),
      { initialProps: { url: 'ws://test-server' } },
    );

    flushSocketTimers();
    const first = MockWebSocket.instances[0];
    const closeSpy = vi.spyOn(first, 'close');

    rerender({ url: 'ws://other-server' });

    expect(closeSpy).toHaveBeenCalled();
    expect(MockWebSocket.instances).toHaveLength(2);
    expect(MockWebSocket.instances[1].url).toBe('ws://other-server');

    flushSocketTimers();

    expect(result.current.status).toBe('open');
  });
});

这个测试文件的精髓在于MockWebSocket类。它模拟了真实WebSocket的构造函数和核心事件处理器(onopen, onmessage等)。vi.stubGlobal在测试开始前用我们的Mock替换了全局WebSocket对象,并在结束后恢复它,确保测试的隔离性。测试用例通过调用mockReceiveMessage来模拟服务器推送数据,然后断言Hook的状态是否如预期那样更新。这种方式让我们可以在完全脱离网络的环境下,对复杂的异步和外部依赖逻辑进行确定性的测试。

graph TD
    A[Frontend App / React] -- "Renders" --> B(TopologyVisualizer Component);
    B -- "Calls hook" --> C{useTopologySocket};
    C -- "new WebSocket()" --> D(WebSocket Connection);
    D -- "ws://localhost:3001/ws/topology" --> E[Go-Fiber Server];
    E -- "Manages connections" --> F(Hub);
    G[Mock Data Ticker] -- "Generates topology every 2s" --> F;
    F -- "Broadcasts data" --> E;
    E -- "Pushes message" --> D;
    D -- "onmessage" --> C;
    C -- "setTopology()" --> B;
    B -- "Re-renders with new data" --> A;

方案的局限与未来路径

这个实现虽然验证了核心链路,但在生产环境中还有几个方面需要深化。
首先,前端的可视化部分非常基础。一个真正的拓扑图需要一个健壮的布局算法来避免节点和连线重叠,以及更丰富的交互,如拖拽、缩放、点击节点显示详情。这通常需要引入如 D3.js 或 react-flow 这样的专用库。

其次,后端的数据源是模拟的。在实际应用中,它需要接入一个真正的服务发现系统(如Consul, Etcd)或一个可观测性后端(如Prometheus, OpenTelemetry Collector),从那里获取实时的服务元数据和健康检查结果。

最后,WebSocket广播中心的实现是单实例的。如果客户端数量巨大,或者需要高可用性,这个单点Hub会成为瓶颈和故障点。一个可扩展的方案需要将广播逻辑与应用实例解耦,通常会引入一个外部消息中间件,如Redis Pub/Sub,由Go应用将消息发布到Redis频道,所有实例再从频道订阅消息并推送给各自连接的客户端。这样就实现了水平扩展。


  目录