定义挑战:实时与批处理的二元困境
在构建现代机器学习平台时,一个核心矛盾始终存在:在线推理服务要求毫秒级的特征访问延迟,而模型训练则需要对海量历史数据进行高效的批处理。这就催生了对一个混合系统的需求,它既能处理高吞吐量的实时事件流,又能将这些数据可靠、经济地归档到数据湖中,以供后续分析和训练。
传统的解决方案往往是技术栈的拼凑。例如,使用 Kafka 或 Kinesis 作为事件总线,通过 Flink 或 Spark Streaming 进行实时处理,再利用 Kinesis Firehose 或自定义的连接器将数据转储到 AWS S3。这个方案虽然成熟,但在真实项目中,其运维复杂性、成本和端到端延迟往往成为瓶颈。Kafka 集群的维护成本不菲,而 Flink/Spark 的资源消耗和调优难度对许多团队来说也是一个沉重的负担。
我们需要一个更轻量、运维更友好,且在性能上不妥协的架构。
方案对比与选型决策
方案 A:经典的 Kafka + Flink/Spark + S3 组合
- 优势:
- 生态系统极其成熟,社区支持广泛。
- 能够处理超大规模的数据,久经考验。
- Flink 提供了强大的状态管理和精确一次(Exactly-Once)语义支持。
- 劣势:
- 运维复杂度: 需要分别维护 Kafka、Zookeeper(或 Kraft 模式)、Flink/Spark 集群,每个组件都有自己的一套监控和调优体系。
- 资源开销: 整个体系是资源密集型的,尤其是在中等规模负载下,资源利用率可能不高,导致成本浪费。
- 延迟链路: 数据从生产者到 Kafka,再到 Flink,最后到 S3,整个链路相对较长,对于需要近实时归档的场景,延迟控制更具挑战。
方案 B:以 NATS JetStream 为核心的简化架构
- 优势:
- 极简运维: NATS 本身是一个单一的 Go 二进制文件,部署和集群配置极其简单。JetStream 内置了持久化、KV 存储和对象存储,无需引入外部依赖。
- 高性能: NATS 在设计上追求极致的低延迟和高吞吐量。其客户端-服务器模型相比 Kafka 的拉取模型,在某些场景下能提供更低的端到端延迟。
- 统一平台: JetStream 的 Stream、KV Store 和 Object Store 功能可以在一个统一的平台上同时满足消息队列、实时状态存储和数据块存储的需求,极大地简化了技术栈。
- 劣势:
- 生态系统相对年轻: 虽然 NATS 历史悠久,但 JetStream 的生态和周边工具(如连接器)相较于 Kafka 还不算丰富。
- 流处理能力: NATS 本身不提供像 Flink 那样复杂的流计算框架,复杂的窗口操作或状态计算需要自行在消费端实现。
最终决策
对于我们的目标——构建一个平衡了性能、成本和运维复杂度的特征管道——方案 B 展现出更大的吸引力。我们需要的不是一个重量级的流计算平台,而是一个高效的“数据路由器”:将实时数据快速分发给在线服务,同时可靠地批量归档到 S3 数据湖。NATS JetStream 的轻量化和高性能特性恰好契合这一需求。我们可以构建一个简单的消费者服务,实现微批处理逻辑,将数据高效写入 S3,从而避免引入 Flink/Spark 的复杂性。
核心架构实现概览
我们将构建一个系统,包含数据采集、实时处理与归档、在线特征服务、以及一个管理仪表盘。
graph TD
subgraph 数据源
A[Web/App Clicks]
B[IoT Devices]
end
subgraph NATS JetStream Cluster
C(Stream: raw_events)
D(KV Store: latest_features)
end
subgraph AWS
F[S3 Data Lake Bucket]
G[AWS Glue Catalog]
H[Amazon Athena]
end
subgraph EKS Cluster
I[Ingestion Service] -- gRPC/HTTP --> E(NATS Producer)
J(Stream Processor) -- Jib Container --> J_container
K(Feature API) -- Jib Container --> K_container
L(Admin Dashboard) -- Chakra UI --> L_ui
end
A --> I
B --> I
E -- Publishes --> C
J_container -- Subscribes --> C
J_container -- Micro-batch write (Parquet) --> F
J_container -- Updates --> D
K_container -- Reads from --> D
L_ui -- API Calls --> K
L_ui -- Displays Metrics --> L_ui
F -- Crawled by --> G
H -- Queries --> G
linkStyle 8 stroke:#ff7f0e,stroke-width:2px;
linkStyle 9 stroke:#ff7f0e,stroke-width:2px;
- Ingestion Service: 接收外部数据,将其格式化并发布到 NATS JetStream 的
raw_events流中。 - Stream Processor: 一个用 Java 编写的消费者服务。它订阅
raw_events流,执行两个任务:- 将最新的特征值更新到 NATS KV Store (
latest_features) 中。 - 在内存中进行微批处理,定期将一批数据以 Parquet 格式写入 AWS S3 Data Lake。
- 将最新的特征值更新到 NATS KV Store (
- Feature API: 提供低延迟的 RESTful API,用于在线模型推理。它直接从 NATS KV Store 查询最新的特征值。
- Admin Dashboard: 一个基于 React 和 Chakra UI 的前端应用,用于监控流处理状态、数据湖文件生成情况,并提供一些简单的管理功能。
- Jib: 用于将 Java 服务(Stream Processor 和 Feature API)构建成轻量级、无 Docker 守护进程的容器镜像,以便部署到 EKS。
关键代码与实现解析
1. NATS JetStream Stream 配置
首先,我们需要定义 raw_events 流。在生产环境中,这通常通过 Terraform 或 Ansible 完成,但这里我们使用 NATS CLI 来展示其配置。
# nats stream add raw_events --subjects "events.>" --ack --max-msgs=-1 --max-bytes=-1 --storage file --retention limits --discard old --dupe-window 2m --replicas 3
Stream raw_events was created
Information for Stream raw_events created 2023-10-27T11:20:00Z
Configuration:
Subjects: events.>
Acknowledgements: true
Retention: Limits
Replicas: 3
Storage: File
Options:
Max Age: 0s
Max Msgs: -1
Max Bytes: -1
Max Msg Size: -1
Duplicates: 2m0s
-
--subjects "events.>": 捕获所有以events.开头的消息。 -
--storage file: 将数据持久化到磁盘。 -
--retention limits: 默认策略,不限制消息数量和大小。 -
--replicas 3: 在一个3节点的 NATS 集群中,数据会保存3份,保证高可用。
2. Stream Processor: 消费、聚合与写入 S3
这是系统的核心。我们使用 Java 和官方 NATS 客户端,结合 AWS SDK V2。该服务必须是健壮的,能处理背压和写入失败。
pom.xml 依赖与 Jib 配置:
<dependencies>
<!-- NATS JetStream Client -->
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.16.10</version>
</dependency>
<!-- AWS S3 SDK V2 -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.20.150</version>
</dependency>
<!-- Parquet Writer -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.6</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>3.3.2</version>
<configuration>
<from>
<image>amazoncorretto:17-alpine-jdk</image>
</from>
<to>
<!-- Replace with your ECR repository -->
<image>123456789012.dkr.ecr.us-east-1.amazonaws.com/feature-stream-processor</image>
<tags>
<tag>${project.version}</tag>
<tag>latest</tag>
</tags>
</to>
<container>
<mainClass>com.myapp.processor.StreamProcessor</mainClass>
</container>
</configuration>
</plugin>
</plugins>
</build>
Jib 的配置非常简洁。它直接从 Maven 构建中分析依赖,分层构建镜像,并推送到指定的容器仓库,整个过程无需本地安装 Docker。这对于 CI/CD 流程来说是一个巨大的效率提升。
核心处理逻辑 StreamProcessor.java:
import io.nats.client.*;
import io.nats.client.api.KeyValue;
import io.nats.client.api.KeyValueConfiguration;
import io.nats.client.api.StorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class StreamProcessor {
private static final Logger logger = LoggerFactory.getLogger(StreamProcessor.class);
private static final String NATS_URL = System.getenv().getOrDefault("NATS_URL", "nats://localhost:4222");
private static final String S3_BUCKET = System.getenv("S3_BUCKET");
private static final String STREAM_NAME = "raw_events";
private static final String SUBJECT = "events.>";
private static final String KV_BUCKET_NAME = "latest_features";
private static final int BATCH_SIZE = 1000;
private static final long BATCH_INTERVAL_MS = 5000;
private final List<Message> messageBuffer = new ArrayList<>(BATCH_SIZE);
private final AtomicBoolean running = new AtomicBoolean(true);
private final S3Client s3Client;
private final ScheduledExecutorService scheduler;
public StreamProcessor() {
this.s3Client = S3Client.builder().build(); // Assumes default credential chain
this.scheduler = Executors.newSingleThreadScheduledExecutor();
}
public void run() throws Exception {
Options options = new Options.Builder()
.server(NATS_URL)
.connectionListener((conn, type) -> logger.info("NATS connection event: " + type))
.errorListener(new ErrorListener() {}) // Add proper error handling
.build();
try (Connection nc = Nats.connect(options)) {
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();
// Ensure KV store exists
KeyValueConfiguration kvConfig = KeyValueConfiguration.builder()
.name(KV_BUCKET_NAME)
.storageType(StorageType.File)
.replicas(3)
.build();
jsm.addKeyValue(kvConfig);
KeyValue kv = js.keyValue(KV_BUCKET_NAME);
// Setup timed flusher for the buffer
scheduler.scheduleAtFixedRate(this::flushBufferToS3, BATCH_INTERVAL_MS, BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS);
PushSubscribeOptions so = PushSubscribeOptions.builder()
.durable("processor-s3-bridge")
.build();
Dispatcher dispatcher = nc.createDispatcher(msg -> {});
dispatcher.subscribe(SUBJECT, "processor-queue", msg -> {
try {
// 1. Update KV store for real-time access
// In a real app, you'd parse the message and extract key/value
String key = extractFeatureKey(msg); // Placeholder
kv.put(key, msg.getData());
logger.debug("Updated KV for key: {}", key);
// 2. Add to S3 batch
synchronized (messageBuffer) {
messageBuffer.add(msg);
if (messageBuffer.size() >= BATCH_SIZE) {
flushBufferToS3();
}
}
// Acknowledge message only after critical processing
msg.ack();
} catch (Exception e) {
logger.error("Error processing message: " + new String(msg.getData()), e);
msg.nak(); // Tell NATS to redeliver
}
}, false, so);
logger.info("Listening on subject: {}", SUBJECT);
while (running.get()) {
Thread.sleep(1000);
}
} finally {
scheduler.shutdown();
}
}
private synchronized void flushBufferToS3() {
List<Message> batch;
synchronized (messageBuffer) {
if (messageBuffer.isEmpty()) {
return;
}
batch = new ArrayList<>(messageBuffer);
messageBuffer.clear();
}
logger.info("Flushing batch of {} messages to S3", batch.size());
try {
// In a real implementation, use a ParquetWriter here
// For simplicity, we'll just concat the messages.
StringBuilder sb = new StringBuilder();
for (Message m : batch) {
sb.append(new String(m.getData(), StandardCharsets.UTF_8)).append("\n");
}
byte[] data = sb.toString().getBytes(StandardCharsets.UTF_8);
String key = String.format("raw_events/year=%d/month=%d/day=%d/%s.json",
Instant.now().atZone(java.time.ZoneOffset.UTC).getYear(),
Instant.now().atZone(java.time.ZoneOffset.UTC).getMonthValue(),
Instant.now().atZone(java.time.ZoneOffset.UTC).getDayOfMonth(),
UUID.randomUUID());
PutObjectRequest request = PutObjectRequest.builder()
.bucket(S3_BUCKET)
.key(key)
.build();
s3Client.putObject(request, RequestBody.fromBytes(data));
logger.info("Successfully wrote batch to S3 object: {}", key);
} catch (Exception e) {
logger.error("Failed to write batch to S3. Messages in this batch might be lost.", e);
// A robust implementation would have a dead-letter-queue or retry mechanism here.
// For now, these messages are lost upon service restart.
}
}
private String extractFeatureKey(Message msg) {
// Dummy implementation. Parse JSON and get a unique ID.
// e.g., using Jackson: new ObjectMapper().readTree(msg.getData()).get("userId").asText();
return "user_" + (Math.random() * 1000);
}
public static void main(String[] args) throws Exception {
if (System.getenv("S3_BUCKET") == null) {
logger.error("S3_BUCKET environment variable must be set.");
System.exit(1);
}
new StreamProcessor().run();
}
}
- 错误处理:
msg.ack()表示成功处理,msg.nak()表示处理失败,JetStream 会在稍后重新投递该消息。这是保证至少一次(at-least-once)语义的关键。 - 微批处理: 通过一个
ScheduledExecutorService和一个大小阈值 (BATCH_SIZE),我们实现了简单的微批处理。这极大地提高了 S3 的写入效率,避免了为每条消息都执行一次PutObject操作,从而降低了 API 调用成本和请求开销。 - S3 Key 结构: 写入 S3 的路径 (
year=.../month=...) 遵循了 Hive 分区格式。这对于后续使用 AWS Glue Crawler 和 Amazon Athena 进行高效查询至关重要。
3. Admin Dashboard: Chakra UI & React
前端仪表盘的核心是提供可观测性。我们将创建一个简单的组件来显示 NATS 流的状态。
StreamStats.js 组件:
import React, { useState, useEffect } from 'react';
import {
Box,
Heading,
Stat,
StatLabel,
StatNumber,
StatHelpText,
SimpleGrid,
Spinner,
Alert,
AlertIcon,
} from '@chakra-ui/react';
// In a real app, this would be fetched from a backend API
// which in turn queries the NATS monitoring endpoints or a Prometheus exporter.
const fetchStreamStats = async () => {
// Mock API call
return new Promise((resolve) => {
setTimeout(() => {
resolve({
stream_name: 'raw_events',
messages: 1054328,
bytes: 210865600, // approx 210 MB
first_seq: 1,
last_seq: 1054328,
consumer_count: 2,
state: {
'last_active': '2023-10-27T12:00:05.123Z',
}
});
}, 1000);
});
};
export const StreamStats = () => {
const [stats, setStats] = useState(null);
const [error, setError] = useState(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
const getStats = async () => {
try {
setLoading(true);
const data = await fetchStreamStats();
setStats(data);
setError(null);
} catch (err) {
setError('Failed to fetch stream statistics.');
console.error(err);
} finally {
setLoading(false);
}
};
getStats();
const interval = setInterval(getStats, 5000); // Poll every 5 seconds
return () => clearInterval(interval);
}, []);
if (loading && !stats) {
return (
<Box p={5} shadow="md" borderWidth="1px" borderRadius="md">
<Spinner />
</Box>
);
}
if (error) {
return (
<Alert status="error">
<AlertIcon />
{error}
</Alert>
);
}
return (
<Box p={5} shadow="md" borderWidth="1px" borderRadius="md">
<Heading size="md" mb={4}>
Stream: {stats.stream_name}
</Heading>
<SimpleGrid columns={{ base: 2, md: 4 }} spacing={5}>
<Stat>
<StatLabel>Total Messages</StatLabel>
<StatNumber>{stats.messages.toLocaleString()}</StatNumber>
<StatHelpText>Last Seq: {stats.last_seq}</StatHelpText>
</Stat>
<Stat>
<StatLabel>Stream Size</StatLabel>
<StatNumber>{(stats.bytes / 1024 / 1024).toFixed(2)} MB</StatNumber>
</Stat>
<Stat>
<StatLabel>Consumers</StatLabel>
<StatNumber>{stats.consumer_count}</StatNumber>
</Stat>
<Stat>
<StatLabel>Last Active</StatLabel>
<StatNumber>
{new Date(stats.state.last_active).toLocaleTimeString()}
</StatNumber>
</Stat>
</SimpleGrid>
</Box>
);
};
Chakra UI 的组合式组件 (Box, Stat, SimpleGrid) 使得构建这样一个信息密集但布局清晰的仪表盘变得非常高效。代码可读性强,并且天然支持响应式布局。
架构的扩展性与局限性
当前这套架构在处理每日百亿级别事件、TB 级数据归档的场景下表现出色,且运维成本远低于传统的 Kafka/Flink 方案。但它并非银弹。
局限性:
- 复杂流处理: 如前所述,该架构不适合需要复杂状态管理、窗口计算或事件时间处理的场景。如果业务需求演变为需要进行实时会话分析或滑动窗口聚合,引入 Flink 或其他专业流处理引擎仍然是必要的。届时,NATS 可以作为 Flink 的一个高效数据源。
- 数据回溯与重处理: 虽然 JetStream 支持按时间或序列号重放消息,但如果需要对 S3 数据湖中数月的数据进行大规模重处理,启动一个临时的 Spark 或 EMR 集群直接读取 S3 效率更高。我们的管道主要优化的是“写入”路径。
- KV 存储容量: NATS KV Store 性能极高,但它是为存储热数据或最新状态设计的,不适合作为大规模的全量特征存储。如果在线特征集非常庞大(例如,数亿用户 * 数百个特征),则应将 Feature API 的后端切换为更专业的存储,如 DynamoDB 或 Redis Enterprise。
未来迭代路径:
- 引入 Schema Registry: 为事件流引入 Avro 或 Protobuf,并结合 Schema Registry,可以保证数据质量和向后兼容性,这在多团队协作中至关重要。
- 自动化分区管理: 当前 S3 分区是基于处理时间的。对于需要基于事件时间进行分析的场景,Stream Processor 需要更复杂的逻辑来处理延迟数据,并可能需要动态创建分区。
- 服务网格集成: 在 EKS 集群中引入 Linkerd 或 Istio,可以为我们的 Java 服务提供开箱即用的 mTLS、重试、超时和更精细的可观测性,而无需在应用代码中实现。