我们面临一个典型的现代企业困境。基于 Kubeflow 和 FastAPI 的新 MLOps 平台已经能稳定地产出模型并提供在线预测服务,但模型效果依赖于一份关键的实时用户画像数据。这份数据的唯一权威来源(Source of Truth)是一个运行了近十年的 Java 单体应用,其数据访问层深度绑定了 MyBatis。直接连接它的数据库是被严令禁止的,而它暴露的旧有 RESTful API 性能孱弱、响应时间不稳定,且缺乏明确的 API 契约,导致我们的 Python 服务在集成时痛苦不堪。
初步的方案是直接在 FastAPI 服务中通过 requests 库调用这个旧 API,但这很快就暴露了问题:
- 性能瓶颈: Java 服务的 P95 响应时间在高峰期能达到 200ms,加上网络开销和 FastAPI 自身的处理,整个预测请求的延迟无法接受。
- 可靠性差: 每次 Java 服务部署,API 字段的细微变更都可能导致我们的 Python 服务解析失败,这种脆弱的集成方式在生产环境中就是定时炸弹。
- 测试黑洞: 对 FastAPI 的单元测试几乎无法进行。我们不得不依赖端到端的集成测试,但这既缓慢又不稳定,严重拖慢了迭代速度。
我们需要一个中间层。一个高性能、强类型、且易于测试的适配器,它能作为新世界(Python/MLOps)和旧世界(Java/MyBatis)之间的桥梁。技术选型很快锁定在 Go 和 gRPC 上。Go 的并发模型和性能非常适合网络 I/O 密集型应用,而 gRPC 的 Protobuf 则提供了我们梦寐以求的强类型契约。
这个适配器的核心职责是:接收来自 FastAPI 服务的 gRPC 请求,将其转换为对旧 Java 服务的 REST 调用,处理响应,然后通过 gRPC 返回给调用方。整个过程必须是高可用的,并且,最重要的是,它的每一个环节都必须具备隔离进行单元测试的能力。
定义不可动摇的契约:Protobuf
一切工作的起点是定义服务间的契约。这份 .proto 文件是整个异构系统的“宪法”,它规定了 FastAPI 服务和 Go 适配器之间通信的数据结构和方法。
// proto/feature/v1/feature_service.proto
syntax = "proto3";
package feature.v1;
option go_package = "github.com/your-org/feature-adapter/gen/go/feature/v1;featurev1";
// FeatureService 定义了获取用户画像的服务
service FeatureService {
// GetUserFeatures 获取单个用户的实时画像特征
rpc GetUserFeatures(GetUserFeaturesRequest) returns (GetUserFeaturesResponse);
}
message GetUserFeaturesRequest {
string user_id = 1;
}
message GetUserFeaturesResponse {
string user_id = 1;
map<string, FeatureValue> features = 2;
}
message FeatureValue {
oneof value {
string string_value = 1;
int64 int_value = 2;
double double_value = 3;
bool bool_value = 4;
}
}
这份契约非常明确:GetUserFeatures 方法接收一个包含 user_id 的请求,返回一个包含 user_id 和一个特征映射 map<string, FeatureValue> 的响应。FeatureValue 使用 oneof 来支持多种数据类型。这份契约一旦确立,我们就可以为 Go 和 Python 生成相应的服务端和客户端代码,这是后续所有开发和测试的基础。
Go 适配器的实现与可测试性设计
Go 适配器是整个架构的核心。它的设计必须把可测试性放在首位。这意味着要彻底避免在业务逻辑中直接实例化具体的依赖,比如 HTTP 客户端。所有外部依赖都必须通过接口(Interface)注入。
项目结构
一个合理的项目结构是保证可维护性的第一步。
feature-adapter/
├── cmd/
│ └── server/
│ └── main.go // 程序入口
├── internal/
│ ├── adapter/
│ │ └── legacy_client.go // 调用遗留 Java 服务的 HTTP 客户端
│ ├── config/
│ │ └── config.go // 配置加载
│ ├── server/
│ │ └── grpc.go // gRPC 服务实现
│ └── service/
│ └── feature.go // 业务逻辑核心
├── gen/
│ └── go/ // protoc 生成的 Go 代码
└── proto/
└── ...
与遗留系统的交互层
这里的关键是定义一个接口来抽象对 Java 服务的 HTTP 调用。
internal/adapter/legacy_client.go:
package adapter
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// LegacyFeature represents the structure of the data from the old Java service.
// 注意,这里的字段名可能与我们的 gRPC 契约不同,这就是适配器存在的意义。
type LegacyFeature struct {
UID string `json:"uid"`
ProfileValue interface{} `json:"profileValue"`
ProfileType string `json:"profileType"`
FeatureName string `json:"featureName"`
}
// LegacyServiceClient defines the interface for interacting with the legacy system.
// 这是可测试性设计的核心:定义一个接口。
type LegacyServiceClient interface {
FetchUserFeatures(ctx context.Context, userID string) ([]LegacyFeature, error)
}
// httpLegacyClient implements the LegacyServiceClient with a real HTTP client.
type httpLegacyClient struct {
client *http.Client
baseURL string
}
// NewHTTPLegacyClient creates a new client for the legacy service.
func NewHTTPLegacyClient(baseURL string, timeout time.Duration) LegacyServiceClient {
return &httpLegacyClient{
client: &http.Client{
Timeout: timeout,
},
baseURL: baseURL,
}
}
// FetchUserFeatures performs the actual HTTP call to the Java/MyBatis service.
func (c *httpLegacyClient) FetchUserFeatures(ctx context.Context, userID string) ([]LegacyFeature, error) {
// 在真实项目中,URL 和请求构建会更复杂。
url := fmt.Sprintf("%s/api/v1/user-features?userId=%s", c.baseURL, userID)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
// 这里的日志记录至关重要
return nil, fmt.Errorf("failed to create request for user %s: %w", userID, err)
}
resp, err := c.client.Do(req)
if err != nil {
return nil, fmt.Errorf("http call to legacy service failed for user %s: %w", userID, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
// 对非 200 状态码的健壮处理
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("legacy service returned status %d for user %s: %s", resp.StatusCode, userID, string(bodyBytes))
}
var features []LegacyFeature
if err := json.NewDecoder(resp.Body).Decode(&features); err != nil {
return nil, fmt.Errorf("failed to decode legacy service response for user %s: %w", userID, err)
}
return features, nil
}
通过定义 LegacyServiceClient 接口,我们在单元测试中就可以用一个 mock 对象来替换掉 httpLegacyClient,从而完全隔离对外部 HTTP 的依赖。
核心业务逻辑
业务逻辑层 (FeatureService) 接收 LegacyServiceClient 作为依赖,负责编排调用、数据转换和错误处理。
internal/service/feature.go:
package service
import (
"context"
"log/slog"
"github.com/your-org/feature-adapter/internal/adapter"
featurev1 "github.com/your-org/feature-adapter/gen/go/feature/v1"
)
// FeatureService implements the gRPC service logic.
// 它依赖于我们之前定义的接口,而不是具体实现。
type FeatureService struct {
legacyClient adapter.LegacyServiceClient
logger *slog.Logger
}
func NewFeatureService(legacyClient adapter.LegacyServiceClient, logger *slog.Logger) *FeatureService {
return &FeatureService{
legacyClient: legacyClient,
logger: logger,
}
}
// GetUserFeatures handles the gRPC request, calls the legacy service via the adapter,
// and transforms the data into the protobuf format.
func (s *FeatureService) GetUserFeatures(ctx context.Context, req *featurev1.GetUserFeaturesRequest) (*featurev1.GetUserFeaturesResponse, error) {
userID := req.GetUserId()
if userID == "" {
// 参数校验是服务的第一道防线
return nil, fmt.Errorf("user_id cannot be empty") // 在 gRPC 中会转换为 INVALID_ARGUMENT 状态码
}
s.logger.Info("Fetching features for user", "user_id", userID)
legacyFeatures, err := s.legacyClient.FetchUserFeatures(ctx, userID)
if err != nil {
s.logger.Error("Failed to fetch features from legacy service", "error", err, "user_id", userID)
// 错误需要被包装和向上传递
return nil, fmt.Errorf("internal error fetching features: %w", err) // 转换为 UNKNOWN 状态码
}
// 数据转换逻辑:从遗留系统的模型转换为 gRPC 契约模型
respFeatures := make(map[string]*featurev1.FeatureValue)
for _, f := range legacyFeatures {
fv := &featurev1.FeatureValue{}
switch v := f.ProfileValue.(type) {
case string:
fv.Value = &featurev1.FeatureValue_StringValue{StringValue: v}
case float64:
// JSON number unmarshals to float64 by default
// 这里需要根据 ProfileType 做更精细的判断
if f.ProfileType == "INTEGER" {
fv.Value = &featurev1.FeatureValue_IntValue{IntValue: int64(v)}
} else {
fv.Value = &featurev1.FeatureValue_DoubleValue{DoubleValue: v}
}
case bool:
fv.Value = &featurev1.FeatureValue_BoolValue{BoolValue: v}
default:
s.logger.Warn("Unsupported feature type from legacy system", "feature_name", f.FeatureName, "type", f.ProfileType)
continue // 跳过无法处理的类型
}
respFeatures[f.FeatureName] = fv
}
return &featurev1.GetUserFeaturesResponse{
UserId: userID,
Features: respFeatures,
}, nil
}
Go 适配器的单元测试
现在,我们可以为 FeatureService 编写一个完全隔离的单元测试。我们将使用 gomock 来生成 LegacyServiceClient 接口的 mock 实现。
首先,安装 mockgen 并生成 mock 文件:go install go.uber.org/mock/mockgen@latestmockgen -source=internal/adapter/legacy_client.go -destination=internal/adapter/mock_legacy_client_test.go -package=adapter
internal/service/feature_test.go:
package service
import (
"context"
"errors"
"log/slog"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/your-org/feature-adapter/internal/adapter"
featurev1 "github.com/your-org/feature-adapter/gen/go/feature/v1"
)
func TestFeatureService_GetUserFeatures(t *testing.T) {
// 创建一个 gomock 控制器
ctrl := gomock.NewController(t)
defer ctrl.Finish()
// 创建 mock client
mockLegacyClient := adapter.NewMockLegacyServiceClient(ctrl)
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
// 实例化被测试的服务,注入 mock client
featureSvc := NewFeatureService(mockLegacyClient, logger)
ctx := context.Background()
testUserID := "user-123"
t.Run("Success case", func(t *testing.T) {
// 1. 准备 mock 数据
mockResponse := []adapter.LegacyFeature{
{UID: testUserID, FeatureName: "age", ProfileValue: float64(30), ProfileType: "INTEGER"},
{UID: testUserID, FeatureName: "city", ProfileValue: "New York", ProfileType: "STRING"},
{UID: testUserID, FeatureName: "is_premium", ProfileValue: true, ProfileType: "BOOLEAN"},
}
// 2. 设定 mock 预期:当 FetchUserFeatures 以指定参数被调用时,返回我们准备好的数据
mockLegacyClient.EXPECT().
FetchUserFeatures(gomock.Any(), testUserID).
Return(mockResponse, nil).
Times(1) // 确保它只被调用一次
// 3. 执行被测试的方法
req := &featurev1.GetUserFeaturesRequest{UserId: testUserID}
resp, err := featureSvc.GetUserFeatures(ctx, req)
// 4. 断言结果
require.NoError(t, err)
require.NotNil(t, resp)
assert.Equal(t, testUserID, resp.UserId)
assert.Len(t, resp.Features, 3)
assert.Equal(t, int64(30), resp.Features["age"].GetIntValue())
assert.Equal(t, "New York", resp.Features["city"].GetStringValue())
assert.Equal(t, true, resp.Features["is_premium"].GetBoolValue())
})
t.Run("Legacy service returns error", func(t *testing.T) {
// 设定 mock 预期:当方法被调用时,返回一个错误
expectedErr := errors.New("network timeout")
mockLegacyClient.EXPECT().
FetchUserFeatures(gomock.Any(), testUserID).
Return(nil, expectedErr).
Times(1)
req := &featurev1.GetUserFeaturesRequest{UserId: testUserID}
resp, err := featureSvc.GetUserFeatures(ctx, req)
// 断言我们的服务正确地处理并向上传播了错误
assert.Nil(t, resp)
require.Error(t, err)
assert.Contains(t, err.Error(), "internal error fetching features")
assert.ErrorIs(t, err, expectedErr)
})
t.Run("Request with empty user ID", func(t *testing.T) {
req := &featurev1.GetUserFeaturesRequest{UserId: ""}
resp, err := featureSvc.GetUserFeatures(ctx, req)
assert.Nil(t, resp)
require.Error(t, err)
assert.Equal(t, "user_id cannot be empty", err.Error())
})
}
这个测试文件覆盖了成功、依赖服务失败和输入参数校验失败等核心场景。它运行速度极快,因为它不涉及任何网络或文件 I/O,可以在 CI/CD 流水线中瞬时完成,为我们提供了强大的信心。
FastAPI 服务的集成与单元测试
在 Python 端,我们首先需要根据 .proto 文件生成 gRPC 客户端代码。
python -m grpc_tools.protoc -I./proto --python_out=./generated --grpc_python_out=./generated ./proto/feature/v1/feature_service.proto
然后,在 FastAPI 应用中,我们需要一个服务层来封装对 Go 适配器的 gRPC 调用。同样,为了可测试性,这个 gRPC client 不应该被直接使用。
FastAPI 端的服务层封装
# ml_service/feature_client.py
import grpc
from generated.feature.v1 import feature_service_pb2
from generated.feature.v1 import feature_service_pb2_grpc
from typing import Dict, Any
class FeatureClient:
"""
一个封装了 gRPC 调用的客户端,使其更易于在 FastAPI 中使用和 mock。
"""
def __init__(self, grpc_channel):
self.stub = feature_service_pb2_grpc.FeatureServiceStub(grpc_channel)
def get_features(self, user_id: str) -> Dict[str, Any]:
try:
request = feature_service_pb2.GetUserFeaturesRequest(user_id=user_id)
response = self.stub.GetUserFeatures(request, timeout=0.5) # 设置超时
# 将 protobuf 的 map 转换为 Python dict
features = {}
for key, feature_value in response.features.items():
if feature_value.HasField("string_value"):
features[key] = feature_value.string_value
elif feature_value.HasField("int_value"):
features[key] = feature_value.int_value
# ... 其他类型
return features
except grpc.RpcError as e:
# 这里的日志和错误处理是生产级代码的关键
print(f"gRPC call failed for user {user_id}: {e.details()}")
# 可以根据 e.code() 决定是重试还是直接失败
raise # 重新抛出,让上层处理
# 在应用启动时创建单例
# feature_client = FeatureClient(grpc.insecure_channel("go-adapter:50051"))
FastAPI Endpoint 实现
在 FastAPI 端点中,我们通过依赖注入来获取 FeatureClient 的实例。
# ml_service/main.py
from fastapi import FastAPI, Depends, HTTPException
from .feature_client import FeatureClient
# 假设有一个模型服务
# from .model import predict
app = FastAPI()
# 依赖注入函数,这使得替换 mock client 变得简单
def get_feature_client():
# 在真实环境中,这里会返回连接到 Go 服务的真实 client
# 为了演示,我们暂时不实现它
# return feature_client
pass
@app.post("/predict")
async def predict_endpoint(user_id: str, client: FeatureClient = Depends(get_feature_client)):
try:
# 1. 获取特征
features = client.get_features(user_id)
# 2. 如果没有特征,可能代表用户不存在
if not features:
raise HTTPException(status_code=404, detail="User features not found")
# 3. 调用模型进行预测
# prediction_result = predict(features)
prediction_result = {"score": 0.95, "features_used": list(features.keys())} # 模拟结果
return {"user_id": user_id, "prediction": prediction_result}
except grpc.RpcError as e:
# 将 gRPC 错误转换为合适的 HTTP 错误
if e.code() == grpc.StatusCode.NOT_FOUND:
raise HTTPException(status_code=404, detail=f"User {user_id} not found in downstream service.")
else:
raise HTTPException(status_code=503, detail=f"Feature service unavailable: {e.details()}")
FastAPI 的单元测试
现在,最关键的部分来了:如何测试 /predict 端点,而不实际调用 Go gRPC 服务?我们可以利用 FastAPI 的依赖注入覆盖(Dependency Injection Overrides)和 unittest.mock。
# tests/test_main.py
from fastapi.testclient import TestClient
from unittest.mock import MagicMock
import pytest
import grpc
from ml_service.main import app, get_feature_client
from ml_service.feature_client import FeatureClient
# 创建 TestClient 实例
client = TestClient(app)
# 创建一个 mock FeatureClient
mock_feature_client = MagicMock(spec=FeatureClient)
# 定义一个覆盖 get_feature_client 依赖的函数
def override_get_feature_client():
return mock_feature_client
# 在测试期间应用这个覆盖
app.dependency_overrides[get_feature_client] = override_get_feature_client
def test_predict_success():
# 1. 准备 mock:当 get_features 被调用时,返回预设的数据
user_id = "test-user"
expected_features = {"age": 30, "city": "New York"}
mock_feature_client.get_features.return_value = expected_features
# 2. 发起请求
response = client.post(f"/predict?user_id={user_id}")
# 3. 断言
assert response.status_code == 200
data = response.json()
assert data["user_id"] == user_id
assert "prediction" in data
# 验证 mock client 是否被正确调用
mock_feature_client.get_features.assert_called_once_with(user_id)
# 重置 mock,以便用于下一个测试
mock_feature_client.reset_mock()
def test_predict_feature_service_unavailable():
user_id = "error-user"
# 模拟 gRPC 调用失败
# 创建一个模拟的 RpcError
mock_grpc_error = grpc.RpcError()
mock_grpc_error.code = lambda: grpc.StatusCode.UNAVAILABLE
mock_grpc_error.details = lambda: "connection refused"
mock_feature_client.get_features.side_effect = mock_grpc_error
response = client.post(f"/predict?user_id={user_id}")
# 我们期望 FastAPI 能捕获异常并返回 503
assert response.status_code == 503
assert "Feature service unavailable" in response.json()["detail"]
mock_feature_client.get_features.assert_called_once_with(user_id)
mock_feature_client.reset_mock()
通过这种方式,我们为 FastAPI 端点编写了快速、可靠的单元测试。测试覆盖了业务成功路径和对下游服务故障的容错处理,完全不需要一个运行中的 Go 服务或网络连接。
架构的全景图
通过 Mermaid.js,我们可以清晰地看到整个请求流程和测试边界。
请求时序图:
sequenceDiagram
participant Client
participant FastAPI as FastAPI Service
participant GoAdapter as Go gRPC Adapter
participant JavaLegacy as Java/MyBatis Service
Client->>+FastAPI: POST /predict?user_id=...
FastAPI->>+GoAdapter: gRPC GetUserFeatures(user_id)
GoAdapter->>+JavaLegacy: HTTP GET /api/v1/user-features?userId=...
JavaLegacy-->>-GoAdapter: HTTP 200 OK (JSON features)
GoAdapter-->>-FastAPI: gRPC Response (Protobuf features)
Note right of FastAPI: Run Model Inference
FastAPI-->>-Client: HTTP 200 OK (Prediction)
组件与测试边界:
graph TD
subgraph Python_ML_Service
A[FastAPI Endpoint] --> B{FeatureClient Interface}
end
subgraph Go_Adapter
C[gRPC Server Impl] --> D{LegacyServiceClient Interface}
D --> E[HTTP Client to Legacy]
end
subgraph Legacy_Java_Service
F[REST Controller] --> G[MyBatis DAL]
end
A -- gRPC Call --> C
E -- HTTP Call --> F
subgraph Unit_Test_Boundaries
T1(Pytest) -- Mocks --> B
T2(Go Test) -- Mocks --> D
end
style T1 fill:#cde,stroke:#333,stroke-width:2px
style T2 fill:#cde,stroke:#333,stroke-width:2px
当前方案的局限与未来演进
这个基于 Go 和 gRPC 的适配器架构并非银弹。它成功地解决了眼前的性能、可靠性和可测试性问题,但引入了一个新的独立部署单元,增加了运维的复杂度。
当前最大的瓶颈仍然是 Go 适配器对 Java 服务的同步 HTTP 调用。如果 Java 服务出现长时间的延迟,Go 适配器虽然能通过超时机制保护自身,但最终还是会将失败传递给 FastAPI 服务。
一个更彻底的优化路径是走向事件驱动。我们可以改造 Java 服务,让它在用户画像数据发生变更时,通过 MyBatis 的拦截器或业务层逻辑,将变更事件推送到 Kafka 这样的消息队列中。Go 适配器则从一个同步请求/响应的服务,转变为一个事件消费者。它会订阅 Kafka topic,将数据近实时地写入一个高性能的缓存(如 Redis)中。这样,当 FastAPI 服务请求特征时,Go 服务可以直接从 Redis 读取,响应时间可以降至毫秒级,并且与 Java 服务的运行时状态完全解耦。这无疑是一个更复杂的架构,但它为未来的可扩展性和韧性奠定了更坚实的基础。