基于ZooKeeper与Saga模式构建高可用分布式事务协调器


在一个复杂的业务流程中,例如创建一个包含多重资源配置的大型营销活动,往往涉及对多个微服务的调用:活动服务、优惠券服务、用户通知服务、风控服务。这些操作必须作为一个原子单元,要么全部成功,要么全部回滚。传统的两阶段提交(2PC)因其同步阻塞和对数据库的强依赖性,在高性能、松耦合的微服务架构中几乎不被采用。这迫使我们必须寻找替代方案来保障数据最终一致性。

定义问题:耦合的业务与脆弱的一致性

我们的核心挑战是实现一个创建营销活动的操作。该操作包含三个步骤:

  1. 活动服务 (Campaign Service): 创建活动主体信息。
  2. 优惠券服务 (Coupon Service): 预生成一批与活动关联的优惠券码。
  3. 通知服务 (Notification Service): 向目标用户群体发送活动预热通知。

如果优惠券生成失败,已经创建的活动主体必须被回滚。如果通知发送失败(例如,触及了发送频率限制),活动和优惠券也应一并撤销。这种跨服务的事务需求是典型的分布式事务场景。

方案A:业务服务内嵌Saga编排逻辑

最直接的想法是在调用方(通常是活动服务)内部嵌入Saga模式的编排逻辑。通过一个状态机来依次调用其他服务,并记录每个步骤的状态。如果某个步骤失败,则反向调用之前所有成功步骤的补偿操作。

// 伪代码:在CampaignService中直接编排
public class CampaignService {

    @Autowired
    private CouponServiceClient couponClient;
    @Autowired
    private NotificationServiceClient notificationClient;
    @Autowired
    private CampaignRepository campaignRepo;

    @Transactional
    public void createCampaignWithEmbeddedSaga(Campaign campaign) {
        // 步骤1: 创建活动 (本地事务)
        Campaign savedCampaign = campaignRepo.save(campaign);
        long campaignId = savedCampaign.getId();
        
        try {
            // 步骤2: 调用优惠券服务
            CouponResponse couponResponse = couponClient.generateCoupons(campaignId);
            if (!couponResponse.isSuccess()) {
                throw new SagaStepFailedException("Coupon generation failed.");
            }
            
            // 步骤3: 调用通知服务
            NotificationResponse notificationResponse = notificationClient.sendNotifications(campaignId);
            if (!notificationResponse.isSuccess()) {
                throw new SagaStepFailedException("Notification sending failed.");
            }
            
        } catch (Exception e) {
            // 补偿逻辑
            compensate(campaignId);
            throw new CampaignCreationException("Failed to create campaign, rolling back.", e);
        }
    }

    private void compensate(long campaignId) {
        // 反向补偿:这里的顺序和错误处理很关键
        // 如果补偿操作也失败了呢?需要重试机制和持久化状态
        couponClient.cancelCoupons(campaignId);
        campaignRepo.deleteById(campaignId); // 补偿本地事务
    }
}

方案A的优劣分析:

  • 优点:

    • 实现简单直观,无需引入额外的中间件或服务。
    • 对于只有两三个步骤的简单流程,开发速度快。
  • 缺点:

    • 业务逻辑与事务逻辑强耦合: CampaignService 不仅要负责活动管理的核心业务,还要承担复杂的分布式事务协调职责,违反了单一职责原则。
    • 可靠性问题: 如果 CampaignService 在执行编排逻辑时自身发生宕机,整个事务的状态就会丢失,可能导致数据不一致(例如,优惠券已生成但活动未创建,且没有触发补偿)。
    • 可观测性差: 事务的执行状态分散在业务服务的日志中,难以集中监控和管理。当出现问题时,排查链路非常痛苦。
    • 缺乏水平扩展能力: 编排逻辑是有状态的。对 CampaignService 进行简单的水平扩展,无法解决单点故障问题。

在真实项目中,这种脆弱性是不可接受的。一次发布或一次节点故障,就可能造成大量悬挂事务和脏数据。

方案B:构建独立的、基于ZooKeeper的高可用Saga协调器

为了解决上述问题,我们将Saga的编排逻辑抽离出来,形成一个独立的、无业务逻辑的协调器服务(Saga Coordinator)。这个协调器的核心职责是根据预定义的Saga流程图(Saga Definition),驱动并追踪每个Saga实例(Saga Instance)的执行。

为了保证协调器自身的高可用,我们不能部署单个实例。部署多个实例会引入新的问题:哪个实例负责驱动某个特定的Saga实例?如果多个实例同时处理同一个Saga实例,会导致重复执行或状态冲突。

这就是ZooKeeper的用武之地。我们可以利用ZooKeeper的临时有序节点和Watch机制实现一个可靠的分布式锁,或者更优雅地实现领导者选举(Leader Election)。在我们的架构中,所有协调器实例启动后都会尝试在ZooKeeper上获取一个领导者“锁”。只有成功成为Leader的实例才负责接收新的Saga启动请求和驱动Saga状态机向前执行。其他Follower实例则处于热备状态,随时准备在Leader宕机后接管工作。

graph TD
    subgraph "高可用协调器集群 (Saga Coordinator Cluster)"
        C1(Coordinator 1 - Leader)
        C2(Coordinator 2 - Follower)
        C3(Coordinator 3 - Follower)
    end

    subgraph "ZooKeeper 集群"
        ZK1(ZK Node 1)
        ZK2(ZK Node 2)
        ZK3(ZK Node 3)
    end

    subgraph "业务服务 (Participant Services)"
        SvcA[活动服务
Action: createCampaign
Comp: deleteCampaign] SvcB[优惠券服务
Action: generateCoupons
Comp: cancelCoupons] SvcC[通知服务
Action: sendNotifications
Comp: recallNotifications] end API[API Gateway] --> BusinessSvc(发起方服务) BusinessSvc -->|1. Start Saga| C1 C1 -->|2. Lock /leader ZNode| ZK1 C2 -->|Watch /leader| ZK2 C3 -->|Watch /leader| ZK3 C1 -->|3. Execute Step 1| SvcA SvcA -->|4. Callback| C1 C1 -->|5. Execute Step 2| SvcB SvcB -->|6. Callback| C1 C1 -->|7. Execute Step 3| SvcC SvcC -->|8. Callback| C1 C1 -->|9. Saga Completed| BusinessSvc

最终选择:方案B

尽管方案B引入了新的组件(协调器服务、ZooKeeper),但它从架构层面解决了方案A的根本问题。它实现了业务逻辑和事务协调逻辑的彻底解耦,并通过集群化和ZooKeeper保证了协调层的高可用性。这在软件工程与架构实践中,是用适度的复杂性换取系统的长期稳定性和可维护性,是明智的权衡。

核心实现概览

1. ZooKeeper领导者选举

我们使用Apache Curator框架来简化与ZooKeeper的交互。LeaderLatch类提供了领导者选举的经典实现。

// SagaCoordinator.java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.UUID;

@Component
public class SagaLeaderElector {

    private static final Logger logger = LoggerFactory.getLogger(SagaLeaderElector.class);
    private static final String LEADER_PATH = "/saga/coordinator/leader";
    
    private final CuratorFramework zkClient;
    private final SagaProcessor sagaProcessor; // 核心Saga处理逻辑
    private LeaderLatch leaderLatch;
    private final String participantId = "coordinator-" + UUID.randomUUID().toString();

    @Autowired
    public SagaLeaderElector(CuratorFramework zkClient, SagaProcessor sagaProcessor) {
        this.zkClient = zkClient;
        this.sagaProcessor = sagaProcessor;
    }

    @PostConstruct
    public void start() throws Exception {
        logger.info("Starting leader election with participant ID: {}", participantId);
        leaderLatch = new LeaderLatch(zkClient, LEADER_PATH, participantId);

        leaderLatch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                logger.warn("This instance ({}) is now the leader. Starting saga processing.", participantId);
                // 成为Leader后,开始执行核心任务,例如扫描数据库中待处理的Saga实例
                sagaProcessor.startProcessing();
            }

            @Override
            public void notLeader() {
                logger.info("This instance ({}) is not the leader. Entering standby mode.", participantId);
                // 失去Leader身份,停止处理
                sagaProcessor.stopProcessing();
            }
        });

        leaderLatch.start();
    }

    public boolean isLeader() {
        return leaderLatch != null && leaderLatch.hasLeadership();
    }

    @PreDestroy
    public void stop() {
        if (leaderLatch != null) {
            try {
                logger.info("Closing leader latch for participant ID: {}", participantId);
                leaderLatch.close();
            } catch (IOException e) {
                logger.error("Error closing leader latch.", e);
            }
        }
    }
}

2. 协调器的数据持久化与索引优化

协调器的状态必须持久化,以便在发生故障重启后能从断点处恢复。这是Saga模式可靠性的基石。我们设计两张核心表:

  • saga_instance: 存储每个Saga事务实例的全局信息。
  • saga_step_instance: 存储Saga实例中每个步骤的执行状态。

SQL Schema 定义:

CREATE TABLE `saga_instance` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `saga_instance_id` VARCHAR(128) NOT NULL COMMENT '全局唯一的Saga实例ID',
  `saga_definition_id` VARCHAR(128) NOT NULL COMMENT 'Saga定义ID',
  `status` VARCHAR(20) NOT NULL COMMENT '状态: STARTED, EXECUTING, COMPENSATING, SUCCEEDED, FAILED',
  `payload` JSON COMMENT '启动Saga的业务载荷',
  `created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_saga_instance_id` (`saga_instance_id`),
  -- 关键索引:用于后台任务轮询处理中的、未完成的实例
  KEY `idx_status_updated_at` (`status`, `updated_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `saga_step_instance` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `saga_instance_id` VARCHAR(128) NOT NULL COMMENT '所属Saga实例ID',
  `step_name` VARCHAR(128) NOT NULL COMMENT '步骤名称',
  `status` VARCHAR(20) NOT NULL COMMENT '状态: PENDING, EXECUTING, SUCCEEDED, FAILED',
  `attempt_count` INT NOT NULL DEFAULT 0 COMMENT '重试次数',
  `action_request` JSON COMMENT '正向操作请求体',
  `compensate_request` JSON COMMENT '补偿操作请求体',
  `last_error` TEXT COMMENT '最近一次错误信息',
  `created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
  `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
  PRIMARY KEY (`id`),
  -- 关键索引:用于快速查找某个Saga实例的所有步骤
  KEY `idx_saga_instance_id` (`saga_instance_id`),
  -- 关键索引:用于后台任务轮询需要重试的失败步骤
  KEY `idx_status_attempt_updated_at` (`status`, `attempt_count`, `updated_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

索引优化的重要性:

这里的坑在于,如果不对状态和时间戳创建联合索引,协调器的后台恢复和重试任务将是灾难性的。Leader节点需要定期扫描 saga_instance 表中状态为 STARTEDEXECUTING 的超时实例,以及 saga_step_instance 表中状态为 FAILED 且重试次数未达上限的步骤。如果没有 idx_status_updated_atidx_status_attempt_updated_at 这两个索引,这些扫描操作将导致全表扫描,随着Saga实例数量的增多,数据库CPU会迅速被打满,协调器性能急剧下降。在真实项目中,忽略索引优化是导致Saga协调器不可靠的常见错误。

3. 前端异步交互与状态追踪 (Vue.js)

当用户在前端点击“创建活动”按钮时,后端会立即启动一个Saga流程。由于这个流程可能耗时数秒甚至更长,我们不能让用户界面一直处于阻塞等待状态。

交互流程:

  1. 发起请求: Vue.js前端调用后端API发起创建请求。
  2. 立即响应: 后端API接收到请求后,不是同步执行整个流程,而是立即向Saga协调器提交一个“启动Saga”的请求,然后将生成的 saga_instance_id 同步返回给前端。
  3. 前端轮询或WebSocket: 前端拿到 saga_instance_id 后,启动一个定时器,定期向后端的一个状态查询API发送请求,或者建立一个WebSocket连接来接收实时的状态推送。
  4. UI状态更新: 根据后端返回的Saga状态(EXECUTING, SUCCEEDED, FAILED),动态更新UI,向用户展示友好的进度提示。

Vue 3 (Composition API) 与 Pinia 示例:

// services/campaignApi.ts
import axios from 'axios';

export const campaignApi = {
  async startCampaignCreation(payload: any): Promise<{ sagaInstanceId: string }> {
    // 1. 发起请求
    const response = await axios.post('/api/campaigns/create-async', payload);
    // 2. 立即返回sagaInstanceId
    return response.data;
  },
  
  async getSagaStatus(sagaInstanceId: string): Promise<{ status: string; details: any[] }> {
    const response = await axios.get(`/api/saga-status/${sagaInstanceId}`);
    return response.data;
  }
};

// stores/campaignStore.ts
import { defineStore } from 'pinia';
import { ref } from 'vue';
import { campaignApi } from '@/services/campaignApi';

export const useCampaignStore = defineStore('campaign', () => {
  const currentSagaId = ref<string | null>(null);
  const sagaStatus = ref<string>('IDLE'); // IDLE, PENDING, SUCCEEDED, FAILED
  const pollingInterval = ref<number | null>(null);

  const createCampaign = async (payload: any) => {
    sagaStatus.value = 'PENDING';
    try {
      const { sagaInstanceId } = await campaignApi.startCampaignCreation(payload);
      currentSagaId.value = sagaInstanceId;
      startPollingStatus();
    } catch (error) {
      sagaStatus.value = 'FAILED';
      console.error("Failed to start campaign creation saga", error);
    }
  };

  const startPollingStatus = () => {
    if (!currentSagaId.value) return;

    // 清除旧的轮询
    if (pollingInterval.value) {
      clearInterval(pollingInterval.value);
    }

    pollingInterval.value = setInterval(async () => {
      if (!currentSagaId.value) {
        stopPolling();
        return;
      }
      try {
        const { status } = await campaignApi.getSagaStatus(currentSagaId.value);
        
        // 核心状态转换逻辑
        if (status === 'SUCCEEDED' || status === 'FAILED') {
          sagaStatus.value = status;
          stopPolling();
        } else {
          sagaStatus.value = 'PENDING'; // 保持 PENDING 或更新为 EXECUTING 等更细粒度的状态
        }
      } catch (error) {
        console.error("Error polling saga status", error);
        sagaStatus.value = 'FAILED';
        stopPolling();
      }
    }, 2000); // 每2秒轮询一次
  };
  
  const stopPolling = () => {
    if (pollingInterval.value) {
      clearInterval(pollingInterval.value);
      pollingInterval.value = null;
    }
  };

  return { createCampaign, sagaStatus };
});

Vue Component:

<!-- CampaignCreator.vue -->
<template>
  <div>
    <button @click="handleCreate" :disabled="isLoading">
      {{ buttonText }}
    </button>
    <div v-if="sagaStatus !== 'IDLE'" class="status-indicator">
      Current Status: {{ sagaStatus }}
    </div>
  </div>
</template>

<script setup lang="ts">
import { computed } from 'vue';
import { useCampaignStore } from '@/stores/campaignStore';
import { storeToRefs } from 'pinia';

const campaignStore = useCampaignStore();
const { sagaStatus } = storeToRefs(campaignStore);

const isLoading = computed(() => sagaStatus.value === 'PENDING');

const buttonText = computed(() => {
  switch (sagaStatus.value) {
    case 'PENDING':
      return 'Creating...';
    case 'SUCCEEDED':
      return 'Creation Successful!';
    case 'FAILED':
      return 'Creation Failed, Try Again';
    default:
      return 'Create Campaign';
  }
});

const handleCreate = () => {
  const campaignPayload = { name: 'My New Campaign', ... };
  campaignStore.createCampaign(campaignPayload);
};
</script>

这种前后端分离的异步交互模式,极大地提升了用户体验,并与后端Saga模式的最终一致性本质完美契合。

架构的扩展性与局限性

此架构的扩展性体现在:

  1. 协调器水平扩展: 可以根据负载情况,随时增加协调器实例数量以增强可用性,ZooKeeper会确保同一时间只有一个Leader。
  2. Saga流程定义: 新的分布式事务流程可以通过配置(例如JSON或YAML文件)动态加载到协调器中,无需修改协调器核心代码。
  3. 参与者服务解耦: 任何符合“Action + Compensation”接口的服务都可以作为参与者接入Saga流程。

然而,该方案并非银弹。它的局限性客观存在:

  1. 运维复杂性增加: 引入了ZooKeeper和协调器服务,对监控、部署和运维提出了更高的要求。ZooKeeper集群自身的稳定性至关重要。
  2. 最终一致性的业务约束: Saga模式本质上是最终一致性。在事务执行过程中,系统会存在一个短暂的中间状态。业务设计必须能够容忍这种不一致性窗口。例如,在活动创建成功但优惠券尚未生成时,不能让用户看到这个活动。
  3. 性能瓶颈转移: 虽然解决了业务服务的耦合,但所有Saga的状态推进都依赖于中心的协调器和其后端数据库。数据库的写入性能和查询性能(再次强调索引优化的重要性)可能成为新的瓶颈。
  4. 调试难度: 分布式系统的调试本身就很复杂。虽然集中式协调器改善了可观测性,但要完整追踪一个跨越多个服务的Saga实例,仍然需要强大的分布式链路追踪系统(如OpenTelemetry)的支持。

  目录