构建基于 NATS JetStream 和 AWS S3 的混合式实时特征管道架构


定义挑战:实时与批处理的二元困境

在构建现代机器学习平台时,一个核心矛盾始终存在:在线推理服务要求毫秒级的特征访问延迟,而模型训练则需要对海量历史数据进行高效的批处理。这就催生了对一个混合系统的需求,它既能处理高吞吐量的实时事件流,又能将这些数据可靠、经济地归档到数据湖中,以供后续分析和训练。

传统的解决方案往往是技术栈的拼凑。例如,使用 Kafka 或 Kinesis 作为事件总线,通过 Flink 或 Spark Streaming 进行实时处理,再利用 Kinesis Firehose 或自定义的连接器将数据转储到 AWS S3。这个方案虽然成熟,但在真实项目中,其运维复杂性、成本和端到端延迟往往成为瓶颈。Kafka 集群的维护成本不菲,而 Flink/Spark 的资源消耗和调优难度对许多团队来说也是一个沉重的负担。

我们需要一个更轻量、运维更友好,且在性能上不妥协的架构。

方案对比与选型决策

  • 优势:
    • 生态系统极其成熟,社区支持广泛。
    • 能够处理超大规模的数据,久经考验。
    • Flink 提供了强大的状态管理和精确一次(Exactly-Once)语义支持。
  • 劣势:
    • 运维复杂度: 需要分别维护 Kafka、Zookeeper(或 Kraft 模式)、Flink/Spark 集群,每个组件都有自己的一套监控和调优体系。
    • 资源开销: 整个体系是资源密集型的,尤其是在中等规模负载下,资源利用率可能不高,导致成本浪费。
    • 延迟链路: 数据从生产者到 Kafka,再到 Flink,最后到 S3,整个链路相对较长,对于需要近实时归档的场景,延迟控制更具挑战。

方案 B:以 NATS JetStream 为核心的简化架构

  • 优势:
    • 极简运维: NATS 本身是一个单一的 Go 二进制文件,部署和集群配置极其简单。JetStream 内置了持久化、KV 存储和对象存储,无需引入外部依赖。
    • 高性能: NATS 在设计上追求极致的低延迟和高吞吐量。其客户端-服务器模型相比 Kafka 的拉取模型,在某些场景下能提供更低的端到端延迟。
    • 统一平台: JetStream 的 Stream、KV Store 和 Object Store 功能可以在一个统一的平台上同时满足消息队列、实时状态存储和数据块存储的需求,极大地简化了技术栈。
  • 劣势:
    • 生态系统相对年轻: 虽然 NATS 历史悠久,但 JetStream 的生态和周边工具(如连接器)相较于 Kafka 还不算丰富。
    • 流处理能力: NATS 本身不提供像 Flink 那样复杂的流计算框架,复杂的窗口操作或状态计算需要自行在消费端实现。

最终决策

对于我们的目标——构建一个平衡了性能、成本和运维复杂度的特征管道——方案 B 展现出更大的吸引力。我们需要的不是一个重量级的流计算平台,而是一个高效的“数据路由器”:将实时数据快速分发给在线服务,同时可靠地批量归档到 S3 数据湖。NATS JetStream 的轻量化和高性能特性恰好契合这一需求。我们可以构建一个简单的消费者服务,实现微批处理逻辑,将数据高效写入 S3,从而避免引入 Flink/Spark 的复杂性。

核心架构实现概览

我们将构建一个系统,包含数据采集、实时处理与归档、在线特征服务、以及一个管理仪表盘。

graph TD
    subgraph 数据源
        A[Web/App Clicks]
        B[IoT Devices]
    end

    subgraph NATS JetStream Cluster
        C(Stream: raw_events)
        D(KV Store: latest_features)
    end

    subgraph AWS
        F[S3 Data Lake Bucket]
        G[AWS Glue Catalog]
        H[Amazon Athena]
    end

    subgraph EKS Cluster
        I[Ingestion Service] -- gRPC/HTTP --> E(NATS Producer)
        J(Stream Processor) -- Jib Container --> J_container
        K(Feature API) -- Jib Container --> K_container
        L(Admin Dashboard) -- Chakra UI --> L_ui
    end
    
    A --> I
    B --> I
    E -- Publishes --> C
    
    J_container -- Subscribes --> C
    J_container -- Micro-batch write (Parquet) --> F
    J_container -- Updates --> D

    K_container -- Reads from --> D
    
    L_ui -- API Calls --> K
    L_ui -- Displays Metrics --> L_ui

    F -- Crawled by --> G
    H -- Queries --> G

    linkStyle 8 stroke:#ff7f0e,stroke-width:2px;
    linkStyle 9 stroke:#ff7f0e,stroke-width:2px;
  1. Ingestion Service: 接收外部数据,将其格式化并发布到 NATS JetStream 的 raw_events 流中。
  2. Stream Processor: 一个用 Java 编写的消费者服务。它订阅 raw_events 流,执行两个任务:
    • 将最新的特征值更新到 NATS KV Store (latest_features) 中。
    • 在内存中进行微批处理,定期将一批数据以 Parquet 格式写入 AWS S3 Data Lake。
  3. Feature API: 提供低延迟的 RESTful API,用于在线模型推理。它直接从 NATS KV Store 查询最新的特征值。
  4. Admin Dashboard: 一个基于 React 和 Chakra UI 的前端应用,用于监控流处理状态、数据湖文件生成情况,并提供一些简单的管理功能。
  5. Jib: 用于将 Java 服务(Stream Processor 和 Feature API)构建成轻量级、无 Docker 守护进程的容器镜像,以便部署到 EKS。

关键代码与实现解析

1. NATS JetStream Stream 配置

首先,我们需要定义 raw_events 流。在生产环境中,这通常通过 Terraform 或 Ansible 完成,但这里我们使用 NATS CLI 来展示其配置。

# nats stream add raw_events --subjects "events.>" --ack --max-msgs=-1 --max-bytes=-1 --storage file --retention limits --discard old --dupe-window 2m --replicas 3

Stream raw_events was created

Information for Stream raw_events created 2023-10-27T11:20:00Z

Configuration:

           Subjects: events.>
  Acknowledgements: true
         Retention: Limits
  Replicas: 3
   Storage: File

Options:

           Max Age: 0s
       Max Msgs: -1
     Max Bytes: -1
    Max Msg Size: -1
    Duplicates: 2m0s
  • --subjects "events.>" : 捕获所有以 events. 开头的消息。
  • --storage file: 将数据持久化到磁盘。
  • --retention limits: 默认策略,不限制消息数量和大小。
  • --replicas 3: 在一个3节点的 NATS 集群中,数据会保存3份,保证高可用。

2. Stream Processor: 消费、聚合与写入 S3

这是系统的核心。我们使用 Java 和官方 NATS 客户端,结合 AWS SDK V2。该服务必须是健壮的,能处理背压和写入失败。

pom.xml 依赖与 Jib 配置:

<dependencies>
    <!-- NATS JetStream Client -->
    <dependency>
        <groupId>io.nats</groupId>
        <artifactId>jnats</artifactId>
        <version>2.16.10</version>
    </dependency>
    <!-- AWS S3 SDK V2 -->
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <version>2.20.150</version>
    </dependency>
    <!-- Parquet Writer -->
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.13.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.6</version>
    </dependency>
    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.7</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>com.google.cloud.tools</groupId>
            <artifactId>jib-maven-plugin</artifactId>
            <version>3.3.2</version>
            <configuration>
                <from>
                    <image>amazoncorretto:17-alpine-jdk</image>
                </from>
                <to>
                    <!-- Replace with your ECR repository -->
                    <image>123456789012.dkr.ecr.us-east-1.amazonaws.com/feature-stream-processor</image>
                    <tags>
                        <tag>${project.version}</tag>
                        <tag>latest</tag>
                    </tags>
                </to>
                <container>
                    <mainClass>com.myapp.processor.StreamProcessor</mainClass>
                </container>
            </configuration>
        </plugin>
    </plugins>
</build>

Jib 的配置非常简洁。它直接从 Maven 构建中分析依赖,分层构建镜像,并推送到指定的容器仓库,整个过程无需本地安装 Docker。这对于 CI/CD 流程来说是一个巨大的效率提升。

核心处理逻辑 StreamProcessor.java:

import io.nats.client.*;
import io.nats.client.api.KeyValue;
import io.nats.client.api.KeyValueConfiguration;
import io.nats.client.api.StorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StreamProcessor {
    private static final Logger logger = LoggerFactory.getLogger(StreamProcessor.class);

    private static final String NATS_URL = System.getenv().getOrDefault("NATS_URL", "nats://localhost:4222");
    private static final String S3_BUCKET = System.getenv("S3_BUCKET");
    private static final String STREAM_NAME = "raw_events";
    private static final String SUBJECT = "events.>";
    private static final String KV_BUCKET_NAME = "latest_features";
    private static final int BATCH_SIZE = 1000;
    private static final long BATCH_INTERVAL_MS = 5000;

    private final List<Message> messageBuffer = new ArrayList<>(BATCH_SIZE);
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final S3Client s3Client;
    private final ScheduledExecutorService scheduler;

    public StreamProcessor() {
        this.s3Client = S3Client.builder().build(); // Assumes default credential chain
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
    }

    public void run() throws Exception {
        Options options = new Options.Builder()
                .server(NATS_URL)
                .connectionListener((conn, type) -> logger.info("NATS connection event: " + type))
                .errorListener(new ErrorListener() {}) // Add proper error handling
                .build();

        try (Connection nc = Nats.connect(options)) {
            JetStreamManagement jsm = nc.jetStreamManagement();
            JetStream js = nc.jetStream();

            // Ensure KV store exists
            KeyValueConfiguration kvConfig = KeyValueConfiguration.builder()
                    .name(KV_BUCKET_NAME)
                    .storageType(StorageType.File)
                    .replicas(3)
                    .build();
            jsm.addKeyValue(kvConfig);
            KeyValue kv = js.keyValue(KV_BUCKET_NAME);

            // Setup timed flusher for the buffer
            scheduler.scheduleAtFixedRate(this::flushBufferToS3, BATCH_INTERVAL_MS, BATCH_INTERVAL_MS, TimeUnit.MILLISECONDS);

            PushSubscribeOptions so = PushSubscribeOptions.builder()
                    .durable("processor-s3-bridge")
                    .build();

            Dispatcher dispatcher = nc.createDispatcher(msg -> {});
            dispatcher.subscribe(SUBJECT, "processor-queue", msg -> {
                try {
                    // 1. Update KV store for real-time access
                    // In a real app, you'd parse the message and extract key/value
                    String key = extractFeatureKey(msg); // Placeholder
                    kv.put(key, msg.getData());
                    logger.debug("Updated KV for key: {}", key);

                    // 2. Add to S3 batch
                    synchronized (messageBuffer) {
                        messageBuffer.add(msg);
                        if (messageBuffer.size() >= BATCH_SIZE) {
                            flushBufferToS3();
                        }
                    }
                    
                    // Acknowledge message only after critical processing
                    msg.ack();

                } catch (Exception e) {
                    logger.error("Error processing message: " + new String(msg.getData()), e);
                    msg.nak(); // Tell NATS to redeliver
                }
            }, false, so);

            logger.info("Listening on subject: {}", SUBJECT);
            while (running.get()) {
                Thread.sleep(1000);
            }
        } finally {
            scheduler.shutdown();
        }
    }
    
    private synchronized void flushBufferToS3() {
        List<Message> batch;
        synchronized (messageBuffer) {
            if (messageBuffer.isEmpty()) {
                return;
            }
            batch = new ArrayList<>(messageBuffer);
            messageBuffer.clear();
        }

        logger.info("Flushing batch of {} messages to S3", batch.size());
        try {
            // In a real implementation, use a ParquetWriter here
            // For simplicity, we'll just concat the messages.
            StringBuilder sb = new StringBuilder();
            for (Message m : batch) {
                sb.append(new String(m.getData(), StandardCharsets.UTF_8)).append("\n");
            }
            byte[] data = sb.toString().getBytes(StandardCharsets.UTF_8);

            String key = String.format("raw_events/year=%d/month=%d/day=%d/%s.json",
                Instant.now().atZone(java.time.ZoneOffset.UTC).getYear(),
                Instant.now().atZone(java.time.ZoneOffset.UTC).getMonthValue(),
                Instant.now().atZone(java.time.ZoneOffset.UTC).getDayOfMonth(),
                UUID.randomUUID());
            
            PutObjectRequest request = PutObjectRequest.builder()
                .bucket(S3_BUCKET)
                .key(key)
                .build();
            
            s3Client.putObject(request, RequestBody.fromBytes(data));
            logger.info("Successfully wrote batch to S3 object: {}", key);

        } catch (Exception e) {
            logger.error("Failed to write batch to S3. Messages in this batch might be lost.", e);
            // A robust implementation would have a dead-letter-queue or retry mechanism here.
            // For now, these messages are lost upon service restart.
        }
    }

    private String extractFeatureKey(Message msg) {
        // Dummy implementation. Parse JSON and get a unique ID.
        // e.g., using Jackson: new ObjectMapper().readTree(msg.getData()).get("userId").asText();
        return "user_" + (Math.random() * 1000);
    }

    public static void main(String[] args) throws Exception {
        if (System.getenv("S3_BUCKET") == null) {
            logger.error("S3_BUCKET environment variable must be set.");
            System.exit(1);
        }
        new StreamProcessor().run();
    }
}
  • 错误处理: msg.ack() 表示成功处理,msg.nak() 表示处理失败,JetStream 会在稍后重新投递该消息。这是保证至少一次(at-least-once)语义的关键。
  • 微批处理: 通过一个 ScheduledExecutorService 和一个大小阈值 (BATCH_SIZE),我们实现了简单的微批处理。这极大地提高了 S3 的写入效率,避免了为每条消息都执行一次 PutObject 操作,从而降低了 API 调用成本和请求开销。
  • S3 Key 结构: 写入 S3 的路径 (year=.../month=...) 遵循了 Hive 分区格式。这对于后续使用 AWS Glue Crawler 和 Amazon Athena 进行高效查询至关重要。

3. Admin Dashboard: Chakra UI & React

前端仪表盘的核心是提供可观测性。我们将创建一个简单的组件来显示 NATS 流的状态。

StreamStats.js 组件:

import React, { useState, useEffect } from 'react';
import {
  Box,
  Heading,
  Stat,
  StatLabel,
  StatNumber,
  StatHelpText,
  SimpleGrid,
  Spinner,
  Alert,
  AlertIcon,
} from '@chakra-ui/react';

// In a real app, this would be fetched from a backend API
// which in turn queries the NATS monitoring endpoints or a Prometheus exporter.
const fetchStreamStats = async () => {
  // Mock API call
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve({
        stream_name: 'raw_events',
        messages: 1054328,
        bytes: 210865600, // approx 210 MB
        first_seq: 1,
        last_seq: 1054328,
        consumer_count: 2,
        state: {
          'last_active': '2023-10-27T12:00:05.123Z',
        }
      });
    }, 1000);
  });
};

export const StreamStats = () => {
  const [stats, setStats] = useState(null);
  const [error, setError] = useState(null);
  const [loading, setLoading] = useState(true);

  useEffect(() => {
    const getStats = async () => {
      try {
        setLoading(true);
        const data = await fetchStreamStats();
        setStats(data);
        setError(null);
      } catch (err) {
        setError('Failed to fetch stream statistics.');
        console.error(err);
      } finally {
        setLoading(false);
      }
    };

    getStats();
    const interval = setInterval(getStats, 5000); // Poll every 5 seconds
    return () => clearInterval(interval);
  }, []);

  if (loading && !stats) {
    return (
      <Box p={5} shadow="md" borderWidth="1px" borderRadius="md">
        <Spinner />
      </Box>
    );
  }

  if (error) {
    return (
      <Alert status="error">
        <AlertIcon />
        {error}
      </Alert>
    );
  }

  return (
    <Box p={5} shadow="md" borderWidth="1px" borderRadius="md">
      <Heading size="md" mb={4}>
        Stream: {stats.stream_name}
      </Heading>
      <SimpleGrid columns={{ base: 2, md: 4 }} spacing={5}>
        <Stat>
          <StatLabel>Total Messages</StatLabel>
          <StatNumber>{stats.messages.toLocaleString()}</StatNumber>
          <StatHelpText>Last Seq: {stats.last_seq}</StatHelpText>
        </Stat>
        <Stat>
          <StatLabel>Stream Size</StatLabel>
          <StatNumber>{(stats.bytes / 1024 / 1024).toFixed(2)} MB</StatNumber>
        </Stat>
        <Stat>
          <StatLabel>Consumers</StatLabel>
          <StatNumber>{stats.consumer_count}</StatNumber>
        </Stat>
        <Stat>
          <StatLabel>Last Active</StatLabel>
          <StatNumber>
            {new Date(stats.state.last_active).toLocaleTimeString()}
          </StatNumber>
        </Stat>
      </SimpleGrid>
    </Box>
  );
};

Chakra UI 的组合式组件 (Box, Stat, SimpleGrid) 使得构建这样一个信息密集但布局清晰的仪表盘变得非常高效。代码可读性强,并且天然支持响应式布局。

架构的扩展性与局限性

当前这套架构在处理每日百亿级别事件、TB 级数据归档的场景下表现出色,且运维成本远低于传统的 Kafka/Flink 方案。但它并非银弹。

局限性:

  1. 复杂流处理: 如前所述,该架构不适合需要复杂状态管理、窗口计算或事件时间处理的场景。如果业务需求演变为需要进行实时会话分析或滑动窗口聚合,引入 Flink 或其他专业流处理引擎仍然是必要的。届时,NATS 可以作为 Flink 的一个高效数据源。
  2. 数据回溯与重处理: 虽然 JetStream 支持按时间或序列号重放消息,但如果需要对 S3 数据湖中数月的数据进行大规模重处理,启动一个临时的 Spark 或 EMR 集群直接读取 S3 效率更高。我们的管道主要优化的是“写入”路径。
  3. KV 存储容量: NATS KV Store 性能极高,但它是为存储热数据或最新状态设计的,不适合作为大规模的全量特征存储。如果在线特征集非常庞大(例如,数亿用户 * 数百个特征),则应将 Feature API 的后端切换为更专业的存储,如 DynamoDB 或 Redis Enterprise。

未来迭代路径:

  • 引入 Schema Registry: 为事件流引入 Avro 或 Protobuf,并结合 Schema Registry,可以保证数据质量和向后兼容性,这在多团队协作中至关重要。
  • 自动化分区管理: 当前 S3 分区是基于处理时间的。对于需要基于事件时间进行分析的场景,Stream Processor 需要更复杂的逻辑来处理延迟数据,并可能需要动态创建分区。
  • 服务网格集成: 在 EKS 集群中引入 Linkerd 或 Istio,可以为我们的 Java 服务提供开箱即用的 mTLS、重试、超时和更精细的可观测性,而无需在应用代码中实现。

  目录