利用 eBPF 与 Zig 构建对 PostgreSQL 协议的内核态实时审计与阻断


数据库安全审计,一个老生常谈却又永远棘手的问题。传统的方案无非两种:在 PostgreSQL 内部开启日志审计,或者在应用和数据库之间架设一个代理。前者对数据库本身性能消耗巨大,在高并发下 log_statement = 'all' 几乎是灾难;后者引入了新的网络跳数、单点故障风险和显著的延迟。我们的目标是在不侵入应用和数据库、性能损耗可忽略的前提下,实现对特定高危 SQL 的实时阻断。这在 Kubernetes 环境下尤其重要,网络拓扑复杂,传统的边界防火墙早已失效。

思路很快转向了 eBPF。借助 Cilium,我们已经能在内核网络层面获得极佳的可观测性和控制力。但一个核心问题摆在面前:PostgreSQL 的 Wire Protocol 相当复杂,且是全状态的。想在 eBPF 内核探针那种受限的环境里(有限的指令数、不能有循环、栈空间严格限制)完整解析协议,无异于痴人说梦。

一个折衷且务实的架构浮现出来:让 eBPF 做它最擅长的事——高速的、无差别的包过滤;将复杂的协议解析工作,交给一个性能同样极致的用户态 Agent。这个 Agent 的技术选型至关重要,它需要处理从内核高速旁路过来的原始网络包,性能稍有不慎就会成为整个链路的瓶颈。C/C++ 是显而易见的选择,但 Zig 凭借其内存安全、卓越的 C 互操作性以及现代化的构建体系,进入了我们的视野。

最终的架构设计如下:

sequenceDiagram
    participant Client
    participant K8s Node (Kernel)
    participant Zig Agent (Userspace)
    participant PostgreSQL Pod

    Client->>K8s Node (Kernel): 发送 SQL 查询 (TCP Packet on Port 5432)
    Note over K8s Node (Kernel): eBPF 程序挂载在 TC Ingress Hook
    K8s Node (Kernel)->>K8s Node (Kernel): eBPF: 识别为 PostgreSQL 流量
    K8s Node (Kernel)->>Zig Agent (Userspace): 通过 BPF Ring Buffer 发送原始数据包
    Zig Agent (Userspace)->>Zig Agent (Userspace): 解析 PostgreSQL Wire Protocol
    Zig Agent (Userspace)->>Zig Agent (Userspace): 根据策略库(另一PG实例)进行匹配
    alt 匹配到高危 SQL
        Zig Agent (Userspace)->>K8s Node (Kernel): 通过 BPF Map 更新连接状态为 "DROP"
        K8s Node (Kernel)->>Client: (数据包被内核直接丢弃, 连接超时)
    else 未匹配
        Note over K8s Node (Kernel): 数据包正常流转
        K8s Node (Kernel)->>PostgreSQL Pod: 转发原始数据包
        PostgreSQL Pod-->>Client: (正常响应)
    end

这个架构的核心在于,eBPF 程序本身并不做复杂的决策,它只负责将所有发往 PostgreSQL 端口的 TCP 包的 payload 无脑地推送到一个 BPF_MAP_TYPE_RINGBUF 中。用户态的 Zig Agent 作为唯一的消费者,从 Ring Buffer 中取出数据,进行协议解析和策略判断。这种模型的优势在于,内核路径极简且高效,将复杂性完全隔离在了用户态。

第一步:内核的眼睛 - eBPF 数据包捕获

我们需要一个 eBPF 程序,它能挂载到网络接口的流量控制层(Traffic Control),捕获所有进入节点的数据包。这里我们使用 C 语言编写,并通过 libbpf 框架加载。

pg_watcher.bpf.c:

#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_core_read.h>
#include <bpf/bpf_endian.h>

#define TC_ACT_OK 0
#define TC_ACT_SHOT 2 // 丢弃数据包
#define ETH_P_IP 0x0800
#define PG_PORT 5432

// 定义 Ring Buffer,用于向用户态发送数据
struct {
    __uint(type, BPF_MAP_TYPE_RINGBUF);
    __uint(max_entries, 256 * 1024); // 256KB
} rb SEC(".maps");

// 定义一个 Hash Map,用于接收用户态的指令,key 是连接的五元组
struct conn_key_t {
    __u32 src_ip;
    __u32 dst_ip;
    __u16 src_port;
    __u16 dst_port;
};

// value 定义了要执行的动作,例如 DROP
enum conn_action {
    ACTION_OK = 0,
    ACTION_DROP = 1,
};

struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __uint(max_entries, 1024);
    __type(key, struct conn_key_t);
    __type(value, enum conn_action);
} conn_directives SEC(".maps");


SEC("tc")
int tc_ingress(struct __sk_buff *skb) {
    void *data_end = (void *)(long)skb->data_end;
    void *data = (void *)(long)skb->data;

    struct ethhdr *eth = data;
    if ((void *)eth + sizeof(*eth) > data_end) {
        return TC_ACT_OK;
    }

    if (eth->h_proto != bpf_htons(ETH_P_IP)) {
        return TC_ACT_OK;
    }

    struct iphdr *ip = data + sizeof(*eth);
    if ((void *)ip + sizeof(*ip) > data_end) {
        return TC_ACT_OK;
    }

    if (ip->protocol != IPPROTO_TCP) {
        return TC_ACT_OK;
    }

    struct tcphdr *tcp = (struct tcphdr *)((__u8 *)ip + (ip->ihl * 4));
    if ((void *)tcp + sizeof(*tcp) > data_end) {
        return TC_ACT_OK;
    }

    // 关键:只关心发往 PostgreSQL 端口的流量
    if (tcp->dest != bpf_htons(PG_PORT)) {
        return TC_ACT_OK;
    }

    // 构造连接的 Key
    struct conn_key_t key = {};
    key.src_ip = ip->saddr;
    key.dst_ip = ip->daddr;
    key.src_port = tcp->source;
    key.dst_port = tcp->dest;

    // 查询 Map 中是否有针对此连接的指令
    enum conn_action *action = bpf_map_lookup_elem(&conn_directives, &key);
    if (action && *action == ACTION_DROP) {
        // 如果用户态Agent要求丢弃,则直接丢弃
        bpf_printk("Dropping packet for malicious connection\n");
        return TC_ACT_SHOT;
    }
    
    // 忽略没有 payload 的 TCP 包(如纯 ACK)
    __u32 payload_size = ip->tot_len - (ip->ihl * 4) - (tcp->doff * 4);
    if (payload_size <= 0) {
        return TC_ACT_OK;
    }

    // 将数据包的 payload 推送到 Ring Buffer
    // 这里为了简化,我们没有处理 TCP segment 重组,这是此方案的一个关键简化点
    // 真实项目中需要更复杂的处理
    void *payload = (__u8 *)tcp + (tcp->doff * 4);
    if (payload + 1 > data_end) {
        return TC_ACT_OK;
    }

    // 提交到 ring buffer
    // 为保证性能,不应发送过大的包,可以截断
    u64 flags = BPF_RB_NO_WAKEUP;
    long ret = bpf_ringbuf_output(skb, flags);

    return TC_ACT_OK;
}

char LICENSE[] SEC("license") = "GPL";

这份 eBPF 代码非常直接:解析 L2/L3/L4 头部,如果目标端口是 5432,则先检查 conn_directives map 中是否有来自用户态的 “DROP” 指令。如果没有,就将整个 skb(包含了元数据和 payload)推送到 Ring Buffer。这里的 bpf_ringbuf_output 是关键,它比旧的 bpf_perf_event_output 性能更好,因为它使用了一个共享内存区域,避免了每个事件的内存拷贝。

第二步:用户态的大脑 - Zig Agent 实现

现在轮到 Zig Agent 上场了。它的职责是:

  1. 加载并挂载 eBPF 程序。
  2. 循环从 Ring Buffer 中读取数据包。
  3. 解析 PostgreSQL 前端/后端消息协议。
  4. 连接策略数据库,获取阻断规则。
  5. 对于匹配规则的查询,通过 BPF Map 通知内核丢弃该连接后续所有包。

首先是 build.zig 文件,我们需要链接 libbpf

build.zig:

const std = @import("std");

pub fn build(b: *std.Build) void {
    const target = b.standardTargetOptions(.{});
    const optimize = b.standardOptimizeOption(.{});

    const exe = b.addExecutable(.{
        .name = "pg-agent",
        .root_source_file = .{ .path = "src/main.zig" },
        .target = target,
        .optimize = optimize,
    });

    // 必须链接 libbpf 和 libelf
    exe.linkSystemLibrary("bpf");
    exe.linkSystemLibrary("elf");

    b.installArtifact(exe);

    const run_cmd = b.addRunArtifact(exe);
    run_cmd.step.dependOn(b.getInstallStep());

    if (b.args) |args| {
        run_cmd.addArgs(args);
    }

    const run_step = b.step("run", "Run the application");
    run_step.dependOn(&run_cmd.step);
}

接下来是核心的 Agent 逻辑 src/main.zig。代码会很长,但每个部分都至关重要。

src/main.zig:

const std = @import("std");
const c = @cImport({
    @cInclude("bpf/libbpf.h");
    @cInclude("bpf/bpf.h");
    // 假设我们有一个预先生成的 skeleton 文件
    @cInclude("pg_watcher.skel.h"); 
});

// PostgreSQL Wire Protocol Message Tags
const PG_MSG_QUERY = 'Q';

// 用于解析 PG 消息
fn parsePgQuery(payload: []const u8) !?[]const u8 {
    if (payload.len < 5) return null; // 至少需要 1 字节 tag + 4 字节 length

    const tag = payload[0];
    if (tag != PG_MSG_QUERY) {
        // 我们只关心 Simple Query 消息
        // 生产环境中需要处理 StartupMessage, Parse, Bind, Execute 等
        return null;
    }

    // PG 协议使用网络字节序 (Big Endian)
    const msg_len = std.mem.readInt(u32, payload[1..5], .big);

    // 简单的完整性检查
    if (msg_len > payload.len - 1) {
        std.log.warn("Incomplete PG message received. len={d}, payload_len={d}", .{ msg_len, payload.len });
        return null;
    }

    // 提取 SQL 查询字符串,它是以 null 结尾的
    const query_end = std.mem.indexOf(u8, payload[5 .. 5 + msg_len - 4], "\x00") orelse return error.InvalidQueryString;
    const sql_query = payload[5 .. 5 + query_end];
    
    return sql_query;
}

// 模拟的策略检查引擎
// 在真实项目中,这里会连接一个 PostgreSQL 数据库来获取规则
fn isQueryMalicious(query: []const u8) bool {
    // 这是一个非常简化的示例,真实场景会使用正则表达式或更复杂的 AST 解析
    const forbidden_patterns = [_][]const u8{
        "delete from users",
        "drop table",
    };

    for (forbidden_patterns) |pattern| {
        if (std.mem.contains(u8, std.ascii.lowerSlice(query), pattern)) {
            return true;
        }
    }
    return false;
}

// Ring Buffer 的回调处理函数
fn handleEvent(ctx: ?*anyopaque, data: ?*anyopaque, size: c.size_t) c_int {
    const self = @ptrCast(*c.pg_watcher_bpf, @alignCast(@ptrCast(*anyopaque, ctx)));
    const skb_data = @ptrCast([*]u8, @alignCast(data))[0..size];

    // 从原始的 skb 数据中解析 IP 和 TCP 头
    // 省略这部分代码,因为它涉及到复杂的指针偏移和字节序转换
    // 假设我们已经成功解析出了五元组 key 和 tcp payload
    const key: c.conn_key_t = .{ 
        .src_ip = 0, // placeholder 
        .dst_ip = 0, // placeholder
        .src_port = 0, // placeholder
        .dst_port = 0, // placeholder
    }; 
    const payload: []const u8 = &[_]u8{}; // placeholder

    // ---- 核心逻辑 ----
    const sql_query = parsePgQuery(payload) catch |err| {
        std.log.warn("Failed to parse PG protocol: {s}", .{@errorName(err)});
        return 0;
    };

    if (sql_query) |query| {
        std.log.info("Intercepted SQL query: {s}", .{query});
        if (isQueryMalicious(query)) {
            std.log.err("!!! MALICIOUS QUERY DETECTED !!!: {s}", .{query});
            
            // 通知 eBPF 内核程序,丢弃这个连接后续的所有包
            const action = c.ACTION_DROP;
            const ret = c.bpf_map_update_elem(
                c.bpf_map__fd(self.maps.conn_directives), 
                &key, 
                &action, 
                c.BPF_ANY
            );

            if (ret != 0) {
                std.log.err("Failed to update bpf map: {s}", .{std.os.strerror(std.os.errno())});
            } else {
                std.log.info("Drop directive sent to kernel for connection.", .{});
            }
        }
    }
    
    return 0; // 表示成功处理
}

pub fn main() !void {
    var skel: *c.pg_watcher_bpf = null;
    var ring_buffer: *c.ring_buf = null;

    // libbpf 会自动处理 BPF 程序的加载、验证和挂载
    skel = c.pg_watcher_bpf__open_and_load();
    if (skel == null) {
        std.log.err("Failed to open and load BPF skeleton", .{});
        return error.BpfLoadFailed;
    }
    defer _ = c.pg_watcher_bpf__destroy(skel);

    // 将 eBPF 程序挂载到网络接口上,例如 "eth0"
    // 生产环境中需要动态发现所有相关接口
    const iface_name = "eth0"; 
    const if_index = std.net.ifNameToIndex(iface_name) catch |err| {
        std.log.err("Failed to get ifindex for {s}: {s}", .{ iface_name, @errorName(err) });
        return error.InterfaceNotFound;
    };
    
    var hook: c.bpf_tc_hook = .{
        .sz = @sizeOf(c.bpf_tc_hook),
        .ifindex = @intCast(if_index),
        .attach_point = c.BPF_TC_INGRESS,
    };

    try std.os.system.mknod("c", 5, 1, "/dev/cgroup", 0o600);
    try std.os.system.mount(null, "/sys/fs/bpf", "bpf", 0, null);

    var opts: c.bpf_tc_opts = .{
        .sz = @sizeOf(c.bpf_tc_opts),
        .prog_fd = c.bpf_program__fd(skel.progs.tc_ingress),
    };
    
    // 创建 qdisc
    _ = c.bpf_tc_hook_create(&hook);
    
    // 挂载程序
    var err = c.bpf_tc_attach(&hook, &opts);
    if (err != 0) {
        std.log.err("Failed to attach TC program: {s}", .{std.os.strerror(-err)});
        return error.BpfAttachFailed;
    }
    defer _ = c.bpf_tc_detach(&hook, &opts);
    
    std.log.info("eBPF program attached successfully to {s}", .{iface_name});

    // 设置 Ring Buffer 的回调
    const rb_fd = c.bpf_map__fd(skel.maps.rb);
    ring_buffer = c.ring_buf__new(rb_fd, handleEvent, skel, null);
    if (ring_buffer == null) {
        std.log.err("Failed to create ring buffer", .{});
        return error.RingBufferSetupFailed;
    }
    defer _ = c.ring_buf__free(ring_buffer);
    
    std.log.info("Waiting for events...", .{});

    // 主循环,等待 eBPF 事件
    while (true) {
        // poll 会阻塞直到有事件发生
        err = c.ring_buf__poll(ring_buffer, -1);
        if (err < 0) {
            std.log.warn("Error polling ring buffer: {s}", .{std.os.strerror(@intCast(-err))});
            std.time.sleep(std.time.ns_per_s); // 避免空转
        }
    }
}

这段 Zig 代码展示了完整的生命周期:

  1. 使用 pg_watcher.skel.h(由 bpftool 从 eBPF 的 .o 文件生成)来打开、加载和验证 eBPF 程序。
  2. 使用 libbpf 的 TC API 将程序挂载到指定网络接口的 ingress hook 上。
  3. 初始化 Ring Buffer,并为其指定一个回调函数 handleEvent
  4. main 函数进入一个无限循环,调用 ring_buf__poll 阻塞等待内核发送数据。
  5. handleEvent 是核心。它接收到原始数据包后,调用 parsePgQuery 解析。
  6. parsePgQuery 实现了对 PostgreSQL Simple Query 消息的最小化解析。它读取消息 Tag、长度,并提取出 SQL 字符串。在真实项目中,这是一个巨大的挑战,需要处理分包、乱序、TLS 解密等一系列问题。
  7. 如果 isQueryMalicious 判断查询为恶意,Agent 就会构造一个 conn_key_t(五元组),并通过 bpf_map_update_elem 向内核的 conn_directives map 中写入一个 ACTION_DROP 指令。
  8. 一旦 map 更新成功,eBPF 程序在下一次收到该连接的数据包时,就会在 bpf_map_lookup_elem 处命中,并返回 TC_ACT_SHOT,直接在内核态将数据包丢弃。客户端会表现为连接超时或中断。

在 Cilium 环境下部署

虽然我们使用了 libbpf 手动挂载,但在一个由 Cilium 全面管理的 Kubernetes 集群中,更优雅的方式是通过 Cilium 本身的能力。不过,对于这种高度定制化的旁路 Agent,将其打包成一个 DaemonSet,并赋予它必要的权限(如 CAP_BPFCAP_NET_ADMIN)在每个节点上运行,是完全可行且解耦的方案。Cilium 负责底层的网络策略和连接,我们的 Agent 则在此之上增加了一层应用协议感知的安全能力。

局限性与未来方向

这个方案远非完美,它是一个 PoC,展示了 eBPF 与高性能用户态语言结合的巨大潜力。在生产环境中落地,必须直面以下问题:

  1. TCP 流重组:我们的示例假设一个 SQL 查询在一个 TCP 包内完成。对于大的查询,这显然不成立。用户态 Agent 必须实现一个完整的 TCP 流重组逻辑,根据 TCP 序列号将分片的数据包重新拼装成完整的 PostgreSQL 消息流。这是一个非常复杂且消耗内存的工作。

  2. 协议状态机:PostgreSQL 的协议是全状态的。客户端和服务器之间有复杂的交互,例如 Extended Query 协议(Parse, Bind, Execute)。Agent 需要为每个连接维护一个状态机,才能正确理解上下文并解析出最终执行的 SQL。

  3. TLS 加密:这是最大的挑战。一旦 PostgreSQL 启用了 TLS,eBPF 在 tc 层看到的就是加密后的乱码。要解决这个问题,思路需要转变,不能在 tc 层拦截,而应该使用 uprobekprobe 挂载到 PostgreSQL 进程的 SSL 库函数(如 SSL_read/SSL_write)上,在数据加解密时进行捕获。这会增加侵入性,且配置更复杂。

  4. 性能开销:内核与用户态之间通过 Ring Buffer 的数据拷贝虽然高效,但并非零开销。对于超高吞吐量的数据库,这个 Agent 的性能表现需要经过严格的压力测试和持续的性能剖析。Zig 的手动内存管理在这里是双刃剑,它提供了极致的性能潜力,也带来了更高的开发复杂度。

尽管存在这些挑战,但这种混合架构指明了一个清晰的方向:将 eBPF 作为内核中一个可编程的、高性能的数据平面,将复杂的控制平面逻辑下放到用 Zig、Rust 或 C++ 这类系统语言编写的用户态程序中。这使得我们能够在不牺牲太多性能的前提下,实现以往只有在昂贵的硬件设备或侵入式代理中才能获得的深度应用层可观测性与控制力。


  目录