构建异构 MLOps 服务栈中 Go 语言 gRPC 适配层的单元测试与集成策略


我们面临一个典型的现代企业困境。基于 Kubeflow 和 FastAPI 的新 MLOps 平台已经能稳定地产出模型并提供在线预测服务,但模型效果依赖于一份关键的实时用户画像数据。这份数据的唯一权威来源(Source of Truth)是一个运行了近十年的 Java 单体应用,其数据访问层深度绑定了 MyBatis。直接连接它的数据库是被严令禁止的,而它暴露的旧有 RESTful API 性能孱弱、响应时间不稳定,且缺乏明确的 API 契约,导致我们的 Python 服务在集成时痛苦不堪。

初步的方案是直接在 FastAPI 服务中通过 requests 库调用这个旧 API,但这很快就暴露了问题:

  1. 性能瓶颈: Java 服务的 P95 响应时间在高峰期能达到 200ms,加上网络开销和 FastAPI 自身的处理,整个预测请求的延迟无法接受。
  2. 可靠性差: 每次 Java 服务部署,API 字段的细微变更都可能导致我们的 Python 服务解析失败,这种脆弱的集成方式在生产环境中就是定时炸弹。
  3. 测试黑洞: 对 FastAPI 的单元测试几乎无法进行。我们不得不依赖端到端的集成测试,但这既缓慢又不稳定,严重拖慢了迭代速度。

我们需要一个中间层。一个高性能、强类型、且易于测试的适配器,它能作为新世界(Python/MLOps)和旧世界(Java/MyBatis)之间的桥梁。技术选型很快锁定在 Go 和 gRPC 上。Go 的并发模型和性能非常适合网络 I/O 密集型应用,而 gRPC 的 Protobuf 则提供了我们梦寐以求的强类型契约。

这个适配器的核心职责是:接收来自 FastAPI 服务的 gRPC 请求,将其转换为对旧 Java 服务的 REST 调用,处理响应,然后通过 gRPC 返回给调用方。整个过程必须是高可用的,并且,最重要的是,它的每一个环节都必须具备隔离进行单元测试的能力。

定义不可动摇的契约:Protobuf

一切工作的起点是定义服务间的契约。这份 .proto 文件是整个异构系统的“宪法”,它规定了 FastAPI 服务和 Go 适配器之间通信的数据结构和方法。

// proto/feature/v1/feature_service.proto

syntax = "proto3";

package feature.v1;

option go_package = "github.com/your-org/feature-adapter/gen/go/feature/v1;featurev1";

// FeatureService 定义了获取用户画像的服务
service FeatureService {
  // GetUserFeatures 获取单个用户的实时画像特征
  rpc GetUserFeatures(GetUserFeaturesRequest) returns (GetUserFeaturesResponse);
}

message GetUserFeaturesRequest {
  string user_id = 1;
}

message GetUserFeaturesResponse {
  string user_id = 1;
  map<string, FeatureValue> features = 2;
}

message FeatureValue {
  oneof value {
    string string_value = 1;
    int64 int_value = 2;
    double double_value = 3;
    bool bool_value = 4;
  }
}

这份契约非常明确:GetUserFeatures 方法接收一个包含 user_id 的请求,返回一个包含 user_id 和一个特征映射 map<string, FeatureValue> 的响应。FeatureValue 使用 oneof 来支持多种数据类型。这份契约一旦确立,我们就可以为 Go 和 Python 生成相应的服务端和客户端代码,这是后续所有开发和测试的基础。

Go 适配器的实现与可测试性设计

Go 适配器是整个架构的核心。它的设计必须把可测试性放在首位。这意味着要彻底避免在业务逻辑中直接实例化具体的依赖,比如 HTTP 客户端。所有外部依赖都必须通过接口(Interface)注入。

项目结构

一个合理的项目结构是保证可维护性的第一步。

feature-adapter/
├── cmd/
│   └── server/
│       └── main.go         // 程序入口
├── internal/
│   ├── adapter/
│   │   └── legacy_client.go // 调用遗留 Java 服务的 HTTP 客户端
│   ├── config/
│   │   └── config.go       // 配置加载
│   ├── server/
│   │   └── grpc.go         // gRPC 服务实现
│   └── service/
│       └── feature.go      // 业务逻辑核心
├── gen/
│   └── go/                 // protoc 生成的 Go 代码
└── proto/
    └── ...

与遗留系统的交互层

这里的关键是定义一个接口来抽象对 Java 服务的 HTTP 调用。

internal/adapter/legacy_client.go:

package adapter

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// LegacyFeature represents the structure of the data from the old Java service.
// 注意,这里的字段名可能与我们的 gRPC 契约不同,这就是适配器存在的意义。
type LegacyFeature struct {
	UID          string      `json:"uid"`
	ProfileValue interface{} `json:"profileValue"`
	ProfileType  string      `json:"profileType"`
	FeatureName  string      `json:"featureName"`
}

// LegacyServiceClient defines the interface for interacting with the legacy system.
// 这是可测试性设计的核心:定义一个接口。
type LegacyServiceClient interface {
	FetchUserFeatures(ctx context.Context, userID string) ([]LegacyFeature, error)
}

// httpLegacyClient implements the LegacyServiceClient with a real HTTP client.
type httpLegacyClient struct {
	client  *http.Client
	baseURL string
}

// NewHTTPLegacyClient creates a new client for the legacy service.
func NewHTTPLegacyClient(baseURL string, timeout time.Duration) LegacyServiceClient {
	return &httpLegacyClient{
		client: &http.Client{
			Timeout: timeout,
		},
		baseURL: baseURL,
	}
}

// FetchUserFeatures performs the actual HTTP call to the Java/MyBatis service.
func (c *httpLegacyClient) FetchUserFeatures(ctx context.Context, userID string) ([]LegacyFeature, error) {
	// 在真实项目中,URL 和请求构建会更复杂。
	url := fmt.Sprintf("%s/api/v1/user-features?userId=%s", c.baseURL, userID)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		// 这里的日志记录至关重要
		return nil, fmt.Errorf("failed to create request for user %s: %w", userID, err)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("http call to legacy service failed for user %s: %w", userID, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// 对非 200 状态码的健壮处理
		bodyBytes, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("legacy service returned status %d for user %s: %s", resp.StatusCode, userID, string(bodyBytes))
	}

	var features []LegacyFeature
	if err := json.NewDecoder(resp.Body).Decode(&features); err != nil {
		return nil, fmt.Errorf("failed to decode legacy service response for user %s: %w", userID, err)
	}

	return features, nil
}

通过定义 LegacyServiceClient 接口,我们在单元测试中就可以用一个 mock 对象来替换掉 httpLegacyClient,从而完全隔离对外部 HTTP 的依赖。

核心业务逻辑

业务逻辑层 (FeatureService) 接收 LegacyServiceClient 作为依赖,负责编排调用、数据转换和错误处理。

internal/service/feature.go:

package service

import (
	"context"
	"log/slog"

	"github.com/your-org/feature-adapter/internal/adapter"
	featurev1 "github.com/your-org/feature-adapter/gen/go/feature/v1"
)

// FeatureService implements the gRPC service logic.
// 它依赖于我们之前定义的接口,而不是具体实现。
type FeatureService struct {
	legacyClient adapter.LegacyServiceClient
	logger       *slog.Logger
}

func NewFeatureService(legacyClient adapter.LegacyServiceClient, logger *slog.Logger) *FeatureService {
	return &FeatureService{
		legacyClient: legacyClient,
		logger:       logger,
	}
}

// GetUserFeatures handles the gRPC request, calls the legacy service via the adapter,
// and transforms the data into the protobuf format.
func (s *FeatureService) GetUserFeatures(ctx context.Context, req *featurev1.GetUserFeaturesRequest) (*featurev1.GetUserFeaturesResponse, error) {
	userID := req.GetUserId()
	if userID == "" {
		// 参数校验是服务的第一道防线
		return nil, fmt.Errorf("user_id cannot be empty") // 在 gRPC 中会转换为 INVALID_ARGUMENT 状态码
	}

	s.logger.Info("Fetching features for user", "user_id", userID)

	legacyFeatures, err := s.legacyClient.FetchUserFeatures(ctx, userID)
	if err != nil {
		s.logger.Error("Failed to fetch features from legacy service", "error", err, "user_id", userID)
		// 错误需要被包装和向上传递
		return nil, fmt.Errorf("internal error fetching features: %w", err) // 转换为 UNKNOWN 状态码
	}

	// 数据转换逻辑:从遗留系统的模型转换为 gRPC 契约模型
	respFeatures := make(map[string]*featurev1.FeatureValue)
	for _, f := range legacyFeatures {
		fv := &featurev1.FeatureValue{}
		switch v := f.ProfileValue.(type) {
		case string:
			fv.Value = &featurev1.FeatureValue_StringValue{StringValue: v}
		case float64:
			// JSON number unmarshals to float64 by default
			// 这里需要根据 ProfileType 做更精细的判断
			if f.ProfileType == "INTEGER" {
				fv.Value = &featurev1.FeatureValue_IntValue{IntValue: int64(v)}
			} else {
				fv.Value = &featurev1.FeatureValue_DoubleValue{DoubleValue: v}
			}
		case bool:
			fv.Value = &featurev1.FeatureValue_BoolValue{BoolValue: v}
		default:
			s.logger.Warn("Unsupported feature type from legacy system", "feature_name", f.FeatureName, "type", f.ProfileType)
			continue // 跳过无法处理的类型
		}
		respFeatures[f.FeatureName] = fv
	}
	
	return &featurev1.GetUserFeaturesResponse{
		UserId:   userID,
		Features: respFeatures,
	}, nil
}

Go 适配器的单元测试

现在,我们可以为 FeatureService 编写一个完全隔离的单元测试。我们将使用 gomock 来生成 LegacyServiceClient 接口的 mock 实现。

首先,安装 mockgen 并生成 mock 文件:
go install go.uber.org/mock/mockgen@latest
mockgen -source=internal/adapter/legacy_client.go -destination=internal/adapter/mock_legacy_client_test.go -package=adapter

internal/service/feature_test.go:

package service

import (
	"context"
	"errors"
	"log/slog"
	"os"
	"testing"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/mock/gomock"

	"github.com/your-org/feature-adapter/internal/adapter"
	featurev1 "github.com/your-org/feature-adapter/gen/go/feature/v1"
)

func TestFeatureService_GetUserFeatures(t *testing.T) {
	// 创建一个 gomock 控制器
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// 创建 mock client
	mockLegacyClient := adapter.NewMockLegacyServiceClient(ctrl)
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	
	// 实例化被测试的服务,注入 mock client
	featureSvc := NewFeatureService(mockLegacyClient, logger)

	ctx := context.Background()
	testUserID := "user-123"

	t.Run("Success case", func(t *testing.T) {
		// 1. 准备 mock 数据
		mockResponse := []adapter.LegacyFeature{
			{UID: testUserID, FeatureName: "age", ProfileValue: float64(30), ProfileType: "INTEGER"},
			{UID: testUserID, FeatureName: "city", ProfileValue: "New York", ProfileType: "STRING"},
			{UID: testUserID, FeatureName: "is_premium", ProfileValue: true, ProfileType: "BOOLEAN"},
		}

		// 2. 设定 mock 预期:当 FetchUserFeatures 以指定参数被调用时,返回我们准备好的数据
		mockLegacyClient.EXPECT().
			FetchUserFeatures(gomock.Any(), testUserID).
			Return(mockResponse, nil).
			Times(1) // 确保它只被调用一次

		// 3. 执行被测试的方法
		req := &featurev1.GetUserFeaturesRequest{UserId: testUserID}
		resp, err := featureSvc.GetUserFeatures(ctx, req)

		// 4. 断言结果
		require.NoError(t, err)
		require.NotNil(t, resp)
		assert.Equal(t, testUserID, resp.UserId)
		assert.Len(t, resp.Features, 3)
		assert.Equal(t, int64(30), resp.Features["age"].GetIntValue())
		assert.Equal(t, "New York", resp.Features["city"].GetStringValue())
		assert.Equal(t, true, resp.Features["is_premium"].GetBoolValue())
	})

	t.Run("Legacy service returns error", func(t *testing.T) {
		// 设定 mock 预期:当方法被调用时,返回一个错误
		expectedErr := errors.New("network timeout")
		mockLegacyClient.EXPECT().
			FetchUserFeatures(gomock.Any(), testUserID).
			Return(nil, expectedErr).
			Times(1)

		req := &featurev1.GetUserFeaturesRequest{UserId: testUserID}
		resp, err := featureSvc.GetUserFeatures(ctx, req)

		// 断言我们的服务正确地处理并向上传播了错误
		assert.Nil(t, resp)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "internal error fetching features")
		assert.ErrorIs(t, err, expectedErr)
	})

	t.Run("Request with empty user ID", func(t *testing.T) {
		req := &featurev1.GetUserFeaturesRequest{UserId: ""}
		resp, err := featureSvc.GetUserFeatures(ctx, req)

		assert.Nil(t, resp)
		require.Error(t, err)
		assert.Equal(t, "user_id cannot be empty", err.Error())
	})
}

这个测试文件覆盖了成功、依赖服务失败和输入参数校验失败等核心场景。它运行速度极快,因为它不涉及任何网络或文件 I/O,可以在 CI/CD 流水线中瞬时完成,为我们提供了强大的信心。

FastAPI 服务的集成与单元测试

在 Python 端,我们首先需要根据 .proto 文件生成 gRPC 客户端代码。

python -m grpc_tools.protoc -I./proto --python_out=./generated --grpc_python_out=./generated ./proto/feature/v1/feature_service.proto

然后,在 FastAPI 应用中,我们需要一个服务层来封装对 Go 适配器的 gRPC 调用。同样,为了可测试性,这个 gRPC client 不应该被直接使用。

FastAPI 端的服务层封装

# ml_service/feature_client.py
import grpc
from generated.feature.v1 import feature_service_pb2
from generated.feature.v1 import feature_service_pb2_grpc
from typing import Dict, Any

class FeatureClient:
    """
    一个封装了 gRPC 调用的客户端,使其更易于在 FastAPI 中使用和 mock。
    """
    def __init__(self, grpc_channel):
        self.stub = feature_service_pb2_grpc.FeatureServiceStub(grpc_channel)

    def get_features(self, user_id: str) -> Dict[str, Any]:
        try:
            request = feature_service_pb2.GetUserFeaturesRequest(user_id=user_id)
            response = self.stub.GetUserFeatures(request, timeout=0.5) # 设置超时
            
            # 将 protobuf 的 map 转换为 Python dict
            features = {}
            for key, feature_value in response.features.items():
                if feature_value.HasField("string_value"):
                    features[key] = feature_value.string_value
                elif feature_value.HasField("int_value"):
                    features[key] = feature_value.int_value
                # ... 其他类型
            return features

        except grpc.RpcError as e:
            # 这里的日志和错误处理是生产级代码的关键
            print(f"gRPC call failed for user {user_id}: {e.details()}")
            # 可以根据 e.code() 决定是重试还是直接失败
            raise  # 重新抛出,让上层处理

# 在应用启动时创建单例
# feature_client = FeatureClient(grpc.insecure_channel("go-adapter:50051"))

FastAPI Endpoint 实现

在 FastAPI 端点中,我们通过依赖注入来获取 FeatureClient 的实例。

# ml_service/main.py
from fastapi import FastAPI, Depends, HTTPException
from .feature_client import FeatureClient
# 假设有一个模型服务
# from .model import predict

app = FastAPI()

# 依赖注入函数,这使得替换 mock client 变得简单
def get_feature_client():
    # 在真实环境中,这里会返回连接到 Go 服务的真实 client
    # 为了演示,我们暂时不实现它
    # return feature_client
    pass

@app.post("/predict")
async def predict_endpoint(user_id: str, client: FeatureClient = Depends(get_feature_client)):
    try:
        # 1. 获取特征
        features = client.get_features(user_id)
        
        # 2. 如果没有特征,可能代表用户不存在
        if not features:
            raise HTTPException(status_code=404, detail="User features not found")

        # 3. 调用模型进行预测
        # prediction_result = predict(features)
        prediction_result = {"score": 0.95, "features_used": list(features.keys())} # 模拟结果

        return {"user_id": user_id, "prediction": prediction_result}
    except grpc.RpcError as e:
        # 将 gRPC 错误转换为合适的 HTTP 错误
        if e.code() == grpc.StatusCode.NOT_FOUND:
            raise HTTPException(status_code=404, detail=f"User {user_id} not found in downstream service.")
        else:
            raise HTTPException(status_code=503, detail=f"Feature service unavailable: {e.details()}")

FastAPI 的单元测试

现在,最关键的部分来了:如何测试 /predict 端点,而不实际调用 Go gRPC 服务?我们可以利用 FastAPI 的依赖注入覆盖(Dependency Injection Overrides)和 unittest.mock

# tests/test_main.py
from fastapi.testclient import TestClient
from unittest.mock import MagicMock
import pytest
import grpc

from ml_service.main import app, get_feature_client
from ml_service.feature_client import FeatureClient

# 创建 TestClient 实例
client = TestClient(app)

# 创建一个 mock FeatureClient
mock_feature_client = MagicMock(spec=FeatureClient)

# 定义一个覆盖 get_feature_client 依赖的函数
def override_get_feature_client():
    return mock_feature_client

# 在测试期间应用这个覆盖
app.dependency_overrides[get_feature_client] = override_get_feature_client

def test_predict_success():
    # 1. 准备 mock:当 get_features 被调用时,返回预设的数据
    user_id = "test-user"
    expected_features = {"age": 30, "city": "New York"}
    mock_feature_client.get_features.return_value = expected_features

    # 2. 发起请求
    response = client.post(f"/predict?user_id={user_id}")

    # 3. 断言
    assert response.status_code == 200
    data = response.json()
    assert data["user_id"] == user_id
    assert "prediction" in data
    # 验证 mock client 是否被正确调用
    mock_feature_client.get_features.assert_called_once_with(user_id)
    # 重置 mock,以便用于下一个测试
    mock_feature_client.reset_mock()

def test_predict_feature_service_unavailable():
    user_id = "error-user"
    # 模拟 gRPC 调用失败
    # 创建一个模拟的 RpcError
    mock_grpc_error = grpc.RpcError()
    mock_grpc_error.code = lambda: grpc.StatusCode.UNAVAILABLE
    mock_grpc_error.details = lambda: "connection refused"
    
    mock_feature_client.get_features.side_effect = mock_grpc_error

    response = client.post(f"/predict?user_id={user_id}")
    
    # 我们期望 FastAPI 能捕获异常并返回 503
    assert response.status_code == 503
    assert "Feature service unavailable" in response.json()["detail"]
    mock_feature_client.get_features.assert_called_once_with(user_id)
    mock_feature_client.reset_mock()

通过这种方式,我们为 FastAPI 端点编写了快速、可靠的单元测试。测试覆盖了业务成功路径和对下游服务故障的容错处理,完全不需要一个运行中的 Go 服务或网络连接。

架构的全景图

通过 Mermaid.js,我们可以清晰地看到整个请求流程和测试边界。

请求时序图:

sequenceDiagram
    participant Client
    participant FastAPI as FastAPI Service
    participant GoAdapter as Go gRPC Adapter
    participant JavaLegacy as Java/MyBatis Service

    Client->>+FastAPI: POST /predict?user_id=...
    FastAPI->>+GoAdapter: gRPC GetUserFeatures(user_id)
    GoAdapter->>+JavaLegacy: HTTP GET /api/v1/user-features?userId=...
    JavaLegacy-->>-GoAdapter: HTTP 200 OK (JSON features)
    GoAdapter-->>-FastAPI: gRPC Response (Protobuf features)
    Note right of FastAPI: Run Model Inference
    FastAPI-->>-Client: HTTP 200 OK (Prediction)

组件与测试边界:

graph TD
    subgraph Python_ML_Service
        A[FastAPI Endpoint] --> B{FeatureClient Interface}
    end
    
    subgraph Go_Adapter
        C[gRPC Server Impl] --> D{LegacyServiceClient Interface}
        D --> E[HTTP Client to Legacy]
    end
    
    subgraph Legacy_Java_Service
        F[REST Controller] --> G[MyBatis DAL]
    end

    A -- gRPC Call --> C
    E -- HTTP Call --> F

    subgraph Unit_Test_Boundaries
        T1(Pytest) -- Mocks --> B
        T2(Go Test) -- Mocks --> D
    end

    style T1 fill:#cde,stroke:#333,stroke-width:2px
    style T2 fill:#cde,stroke:#333,stroke-width:2px

当前方案的局限与未来演进

这个基于 Go 和 gRPC 的适配器架构并非银弹。它成功地解决了眼前的性能、可靠性和可测试性问题,但引入了一个新的独立部署单元,增加了运维的复杂度。

当前最大的瓶颈仍然是 Go 适配器对 Java 服务的同步 HTTP 调用。如果 Java 服务出现长时间的延迟,Go 适配器虽然能通过超时机制保护自身,但最终还是会将失败传递给 FastAPI 服务。

一个更彻底的优化路径是走向事件驱动。我们可以改造 Java 服务,让它在用户画像数据发生变更时,通过 MyBatis 的拦截器或业务层逻辑,将变更事件推送到 Kafka 这样的消息队列中。Go 适配器则从一个同步请求/响应的服务,转变为一个事件消费者。它会订阅 Kafka topic,将数据近实时地写入一个高性能的缓存(如 Redis)中。这样,当 FastAPI 服务请求特征时,Go 服务可以直接从 Redis 读取,响应时间可以降至毫秒级,并且与 Java 服务的运行时状态完全解耦。这无疑是一个更复杂的架构,但它为未来的可扩展性和韧性奠定了更坚实的基础。


  目录