我们团队的一个核心 Elixir 服务最近在一次关键的分布式事务处理中途崩溃。那是一个典型的 Saga 模式实现:用户注册流程,涉及Accounts服务创建用户,然后调用Wallets服务初始化钱包。问题发生在第二步,Wallets服务调用失败后,Saga 协调器本应触发对Accounts服务的补偿操作(删除用户),但协调器实例自身因一次意外的节点重启而终止。恢复后,我们花了数小时在混杂的日志海洋中手动拼凑线索,试图确定哪些 Saga 实例需要手动补偿。日志分散、格式不一,缺乏关键的事务上下文,这让故障恢复变成了一场灾难。
这次事故暴露了我们部署和可观测性策略的根本性缺陷。应用部署依赖于 Ansible Playbook,环境差异导致行为不一致;日志记录随意,缺乏对分布式事务状态的追踪能力。我们需要一个根本性的解决方案:一个包含应用、所有依赖、前端静态资源以及精细配置的可观测性代理的、完全不可变的部署单元。这个单元从启动那一刻起,就必须能以结构化的、可审计的方式报告其内部关键流程(如 Saga)的状态。
我们的技术栈包含 Elixir (Phoenix)、Sass/SCSS 和一些后台服务。经过讨论,我们确定了新的构建和观测流水线核心组件:Packer 用于构建不可变的机器镜像(AMI),Vector 作为高性能的可观测性数据管道代理,直接集成在镜像内。
第一步:改造 Elixir Saga,实现状态驱动的结构化日志
在解决基础设施问题之前,必须先从源头规范数据。如果应用程序产生的日志本身就是一团乱麻,再强大的工具也无济于事。我们的目标是让 Saga 的每一步状态转换都产生一条带有完整上下文的、机器可读的 JSON 日志。
在 Elixir 中,一个 Saga 协调器通常是一个 GenServer,负责管理整个事务的生命周期。我们重构了原有的 Saga 协调器,强制在每个关键逻辑点(开始、步骤成功、步骤失败、开始补偿、补偿成功)都通过一个专用的日志模块来记录状态。
下面是这个 Saga.Coordinator 的核心实现。注意,它不处理复杂的业务逻辑,只关注状态流转和日志记录。
# lib/my_app/saga/coordinator.go
defmodule MyApp.Saga.Coordinator do
use GenServer
require Logger
alias MyApp.Accounts
alias MyApp.Wallets
alias MyApp.Saga.Logger, as: SagaLogger
# Public API
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: via_tuple(args.saga_id))
end
def init(args) do
# saga_id 是整个事务的唯一标识
# transaction_id 是本次执行的唯一标识,用于重试追踪
state = %{
saga_id: args.saga_id,
transaction_id: :erlang.unique_integer() |> Integer.to_string(),
payload: args.payload,
status: :initial,
steps: [
%{name: :create_user, status: :pending, compensation: &Accounts.delete_user/1},
%{name: :initialize_wallet, status: :pending, compensation: &Wallets.delete_wallet/1}
]
}
# 启动 Saga
SagaLogger.info(state, :saga_started)
{:ok, state, {:continue, :execute_next_step}}
end
# Main execution loop
def handle_continue(:execute_next_step, state) do
case find_next_pending_step(state.steps) do
nil ->
# 所有步骤完成
SagaLogger.info(%{state | status: :completed}, :saga_completed)
{:stop, :normal, state}
step ->
execute_step(step, state)
end
end
# ... 其他 GenServer 回调 ...
defp execute_step(step, state) do
SagaLogger.info(state, step.name, %{status: :started})
# 模拟执行业务逻辑
result =
case step.name do
:create_user -> Accounts.create_user(state.payload.user_params)
:initialize_wallet -> Wallets.initialize_wallet(state.payload.user_id)
end
case result do
{:ok, step_result} ->
SagaLogger.info(state, step.name, %{status: :completed, result: step_result})
new_state = update_step_status(state, step.name, :completed)
{:noreply, new_state, {:continue, :execute_next_step}}
{:error, reason} ->
SagaLogger.error(state, step.name, %{status: :failed, reason: reason})
# 失败,开始补偿流程
compensate_state = %{state | status: :compensating}
SagaLogger.warn(compensate_state, :compensation_started)
{:noreply, compensate_state, {:continue, :compensate_previous_steps}}
end
end
# 补偿逻辑
def handle_continue(:compensate_previous_steps, state) do
# ... 省略补偿逻辑实现,但同样需要记录详细日志 ...
# 例如:SagaLogger.warn(state, step.name, %{status: :compensating})
# SagaLogger.warn(state, step.name, %{status: :compensated})
{:stop, {:shutdown, :compensated}, state}
end
# Helper functions
defp via_tuple(saga_id), do: {:via, Registry, {MyApp.SagaRegistry, saga_id}}
defp find_next_pending_step(steps), do: Enum.find(steps, &(&1.status == :pending))
defp update_step_status(state, step_name, new_status) do
# ... 更新步骤状态 ...
state
end
end
关键在于 SagaLogger 模块。我们没有直接使用 Elixir 的 Logger,而是封装了一层,强制所有 Saga 日志都附加了事务上下文,并以 JSON 格式输出。
# lib/my_app/saga/logger.go
defmodule MyApp.Saga.Logger do
require Logger
def info(state, event, meta \\ %{}) do
log(:info, state, event, meta)
end
def warn(state, event, meta \\ %{}) do
log(:warn, state, event, meta)
end
def error(state, event, meta \\ %{}) do
log(:error, state, event, meta)
end
defp log(level, state, event, meta) do
# 核心:将所有上下文信息构建为 map,并传递给 Logger
log_data = %{
# 业务上下文
saga_id: state.saga_id,
transaction_id: state.transaction_id,
saga_status: state.status,
event: event,
# 合并额外元数据
meta: meta
}
Logger.log(level, fn -> Jason.encode!(log_data) end,
# 确保元数据能被 logger backend (如 :console) 正确处理
application: :my_app,
domain: [:saga_audit]
)
end
end
同时,我们需要在 config/config.exs 中配置 Logger 使用 JSON 编码器。
# config/config.exs
config :logger, :console,
format: "$message\n",
metadata: [:request_id]
# 为生产环境配置 JSON 输出
config :logger,
backends: [:console],
handle_sasl_reports: true
# 在 prod.exs 中可以覆盖为更适合生产的格式
# config :logger, :console, format: {Jason, :encode!}
现在,每当 Saga 事务运行时,我们的应用日志文件 (_build/prod/rel/my_app/log/elixir.log 或类似路径) 中会产生如下格式的日志,这正是 Vector 需要的“原材料”。
{"saga_id":"saga-123","transaction_id":"unique-456","saga_status":"initial","event":"create_user","meta":{"status":"started"}}
{"saga_id":"saga-123","transaction_id":"unique-456","saga_status":"initial","event":"create_user","meta":{"status":"completed","result":{"user_id":"user-abc"}}}
{"saga_id":"saga-123","transaction_id":"unique-456","saga_status":"initial","event":"initialize_wallet","meta":{"status":"started"}}
{"saga_id":"saga-123","transaction_id":"unique-456","saga_status":"initial","event":"initialize_wallet","meta":{"status":"failed","reason":"insufficient_funds"}}
{"saga_id":"saga-123","transaction_id":"unique-456","saga_status":"compensating","event":"compensation_started","meta":{}}
第二步:配置 Vector,解析并路由 Saga 日志
Vector 的角色是镜像内的“日志总管”。它必须能监视 Elixir 应用的日志文件,解析 JSON,丰富元数据,然后安全地发送到我们的中央日志存储(例如 Loki 或 S3)。它的配置文件 vector.toml 是定义这一切行为的核心。
一个常见的错误是把过多逻辑放在日志聚合器中。我们的原则是:应用负责产生高质量、结构化的数据,Vector 负责路由和 enriquecimiento (数据丰富化),而不是复杂的解析和修复。
# /etc/vector/vector.toml
# 1. 数据源 (Sources)
[sources.elixir_app_logs]
type = "file"
# 我们将在 Packer 构建脚本中确定这个路径
include = ["/opt/app/log/*.log"]
# 从文件开始位置读取,避免丢失启动时的日志
start_at_beginning = true
# 重要:处理 Elixir 日志可能的多行堆栈跟踪
multiline.mode = "halt_before"
multiline.marker = "^{"
# 2. 数据转换 (Transforms)
# 解析 JSON 字符串为结构化数据
[transforms.parse_elixir_logs]
type = "remap"
inputs = ["elixir_app_logs"]
# Vector Remap Language (VRL) 是其强大之处
source = '''
# .message 字段是原始日志行,首先尝试解析为 JSON
parsed, err = parse_json(.message)
if err != null {
# 如果解析失败,可能是堆栈跟踪或其他非结构化日志
# 将其标记为错误,并保留原始消息
.level = "unknown"
.unparsed_message = .message
} else {
# 解析成功,将解析后的 JSON 对象合并到事件的根级别
. = merge(., parsed)
# 移除原始的 message 字段,避免数据冗余
remove(.message)
# 根据事件内容设置日志级别
if .event == "saga_completed" {
.level = "info"
} else if .saga_status == "compensating" {
.level = "warn"
} else if .meta.status == "failed" {
.level = "error"
} else {
.level = "info"
}
}
'''
# 添加 EC2 实例元数据,这对于在聚合平台中区分日志来源至关重要
[transforms.add_instance_metadata]
type = "aws_ec2_metadata"
inputs = ["parse_elixir_logs"]
# 将获取到的元数据放入一个嵌套对象中
[transforms.add_instance_metadata.fields]
instance_id = "instance_id"
availability_zone = "availability_zone"
region = "region"
instance_type = "instance_type"
# 3. 数据目的地 (Sinks)
# 在 Packer 构建期间,我们只输出到控制台进行调试
[sinks.console_debug]
type = "console"
inputs = ["add_instance_metadata"]
encoding.codec = "json"
# 生产环境中会替换为真实的 sink,例如 Loki
# [sinks.loki_prod]
# type = "loki"
# inputs = ["add_instance_metadata"]
# endpoint = "http://loki.internal:3100"
# # 使用 saga_id 和 event 作为 Loki 的标签,便于查询
# labels.saga_id = "{{ saga_id }}"
# labels.event = "{{ event }}"
这个配置体现了 Vector 的几个优点:
- 高性能:基于 Rust,资源占用远低于 Logstash。
- 强大的 VRL:
remap转换允许我们用简洁的表达式语言处理复杂的逻辑,如条件解析、字段合并和动态级别设定。 - 生态集成:
aws_ec2_metadata这种内置的 transform 极大地简化了云环境下的元数据丰富化工作。
第三步:使用 Packer HCL 构建黄金镜像
Packer 是整个流程的粘合剂。它通过一个声明式的 HCL 文件定义了从一个基础镜像(如 Amazon Linux 2)开始,到安装所有依赖、配置应用和 Vector,最终生成一个新的、可启动的 AMI 的所有步骤。
这是我们的 build.pkr.hcl 文件。它比简单的 shell 命令堆砌要复杂,因为它需要处理依赖安装、代码编译、资源构建和系统服务配置。
// build.pkr.hcl
packer {
required_plugins {
amazon = {
version = ">= 1.0.0"
source = "github.com/hashicorp/amazon"
}
}
}
variable "aws_region" {
type = string
default = "us-east-1"
}
variable "app_version" {
type = string
default = "1.0.0"
}
source "amazon-ebs" "elixir-saga-app" {
region = var.aws_region
instance_type = "t3.micro"
source_ami_filter {
filters = {
name = "amzn2-ami-hvm-*-x86_64-gp2"
root-device-type = "ebs"
virtualization-type = "hvm"
}
most_recent = true
owners = ["amazon"]
}
ssh_username = "ec2-user"
ami_name = "elixir-saga-app-${var.app_version}-${formatdate("YYYYMMDDHHmmss", timestamp())}"
tags = {
Name = "ElixirSagaApp"
Version = var.app_version
Provisioner = "Packer"
}
}
build {
name = "elixir-saga-app-build"
sources = ["source.amazon-ebs.elixir-saga-app"]
# Provisioners: The core logic of the build
provisioner "shell" {
inline = [
"sudo yum update -y",
"sudo yum install -y gcc openssl-devel automake autoconf ncurses-devel erlang socat", // Erlang OTP
"sudo yum install -y git",
// 安装 Elixir
"git clone https://github.com/elixir-lang/elixir.git /tmp/elixir",
"cd /tmp/elixir && git checkout v1.14.3 && sudo make install clean",
// 安装 Node.js 和 npm 用于 Sass/SCSS 编译
"curl -sL https://rpm.nodesource.com/setup_18.x | sudo bash -",
"sudo yum install -y nodejs",
"sudo npm install -g sass"
]
}
provisioner "file" {
source = "../my_app/" // 上传整个 Elixir 项目
destination = "/tmp/app"
}
provisioner "shell" {
inline = [
"sudo mv /tmp/app /opt/app",
"cd /opt/app",
// 设置 Mix 环境为 prod
"export MIX_ENV=prod",
// 获取依赖并编译
"sudo mix local.hex --force",
"sudo mix local.rebar --force",
"sudo mix deps.get --only prod",
"sudo mix compile",
// 编译前端资源,Sass/SCSS 在这里被处理
"sudo mix assets.deploy",
// 生成 Elixir release,这是生产部署的最佳实践
"sudo mix release",
// 清理构建缓存和源码,减小镜像体积
"sudo rm -rf /opt/app/deps /opt/app/_build /opt/app/assets /opt/app/test"
]
}
# --- Vector Installation and Configuration ---
provisioner "shell" {
inline = [
"curl -1sLf https://packages.vector.dev/gpg.key | sudo gpg --dearmor -o /usr/share/keyrings/vector-archive-keyring.gpg",
"echo \"deb [arch=amd64 signed-by=/usr/share/keyrings/vector-archive-keyring.gpg] https://packages.vector.dev/debian/$(. /etc/os-release && echo $ID) $(lsb_release -cs) main\" | sudo tee /etc/apt/sources.list.d/vector.list",
"sudo yum install -y vector", // 假设使用 yum repo,或直接下载 RPM
"sudo mkdir -p /etc/vector"
]
}
provisioner "file" {
source = "./vector.toml"
destination = "/tmp/vector.toml"
}
provisioner "shell" {
inline = [
"sudo mv /tmp/vector.toml /etc/vector/vector.toml"
]
}
# --- Systemd Service Configuration ---
provisioner "shell" {
inline = [
// 创建应用服务
"sudo touch /etc/systemd/system/my_app.service",
// 注意这里的路径指向 release 生成的可执行文件
"echo '[Unit]\nDescription=My Elixir App\nAfter=network.target\n\n[Service]\nUser=ec2-user\nGroup=ec2-user\nWorkingDirectory=/opt/app\nExecStart=/opt/app/_build/prod/rel/my_app/bin/my_app start\nRestart=always\n\n[Install]\nWantedBy=multi-user.target' | sudo tee /etc/systemd/system/my_app.service",
// 创建 Vector 服务
"sudo touch /etc/systemd/system/vector.service",
"echo '[Unit]\nDescription=Vector\nAfter=network-online.target\n\n[Service]\nUser=root\nGroup=root\nExecStart=/usr/bin/vector --config /etc/vector/vector.toml\nRestart=always\n\n[Install]\nWantedBy=multi-user.target' | sudo tee /etc/systemd/system/vector.service",
// 启用服务
"sudo systemctl enable my_app",
"sudo systemctl enable vector"
]
}
}
执行 packer build . 命令后,Packer 会自动完成以下所有工作:
- 启动一个临时的 EC2 实例。
- 在实例上执行所有
provisioner脚本:安装 Erlang/Elixir/Node.js,编译应用代码(包括Sass),安装并配置 Vector。 - 创建两个
systemd服务,确保应用和 Vector 在实例启动时自动运行。 - 基于这个配置好的实例,创建一个新的 AMI。
- 销毁临时实例。
我们最终得到一个 AMI ID。每次部署新版本,我们只需要用这个 AMI 启动新实例,替换旧实例即可。这就是不可变基础设施的核心实践。
graph TD
subgraph Packer Build Process
A[Start from Base AMI] --> B{Provisioners};
B --> C[Install Elixir/Node.js];
B --> D[Upload App Code];
D --> E[mix compile & mix assets.deploy];
E --> F[mix release];
B --> G[Install & Configure Vector];
F & G --> H[Setup Systemd Services];
H --> I[Create New AMI];
end
subgraph Runtime on EC2
J[EC2 Instance Boot] --> K[Systemd Starts Services];
K --> L[Elixir App Running];
K --> M[Vector Agent Running];
L -- writes to --> N[app.log];
M -- tails --> N;
N -- structured log --> M;
M -- VRL Transform --> O{Enriched Log Event};
O -- sends to --> P[Central Logging Platform];
end
I -.-> J;
成果与反思
现在,当一个新实例从我们构建的 AMI 启动时,我们的 Elixir 应用和 Vector 服务会同时启动。Saga 事务产生的每一条结构化日志都会被 Vector 实时捕获、丰富元数据(如 instance_id),然后发送到中央日志平台。
当再次出现文章开头那种故障时,我们的恢复流程截然不同了。我们可以直接在日志平台中通过 saga_id 查询,瞬间就能看到该事务的所有步骤、最终状态以及失败原因。补偿流程不再是猜测,而是基于确凿的数据进行。由于镜像是不可变的,我们也消除了环境不一致导致的问题。
局限性与未来迭代方向
这个方案并非银弹。它有明确的适用边界和需要权衡的缺点。
首先,它将应用、运行时和观测代理紧密耦合在一个 AMI 中。任何一个组件的微小更新(例如,修复一个 Vector 的 bug)都需要重新构建和部署整个 AMI,这在快速迭代的场景下可能显得笨重。
其次,虽然我们实现了日志的可观测性,但 Saga 的状态本身(通常存储在内存或数据库中)的持久化和恢复是另一个复杂问题。我们的方案解决了“审计”问题,但没有直接解决“状态恢复”问题。一个因节点故障而中断的内存中 Saga 状态仍然会丢失,尽管我们现在能从日志中精确知道它中断在哪一步。
未来的迭代路径很清晰。可以将此模式容器化,使用 Dockerfile 替代 Packer provisioners 来定义构建过程,然后将容器镜像部署到 Kubernetes。在 Kubernetes 环境中,Vector 可以作为 DaemonSet 运行,与应用容器解耦,从而允许它们独立更新。然而,这又会引入新的复杂性,如服务发现、配置管理和跨 Pod 的日志收集。但无论如何,将应用及其观测能力视为一个整体来构建和部署的思路,是我们从这次故障中学到的最宝贵的教训。