构建从OCI SQL到ClickHouse的CDC数据管道及其在ELK上的可观测性实践


我们的业务核心系统一直运行在OCI(Oracle Cloud Infrastructure)上的一套高可用PostgreSQL集群上。随着业务量增长,对实时数据的分析需求变得愈发尖锐,但直接在主库上跑复杂的分析SQL已经数次触发了数据库的性能告警,甚至导致过短暂的服务抖动。传统的夜间ETL批处理任务,其小时级的数据延迟,在今天这个决策周期被压缩到分钟级的商业环境下,已经显得力不从心。问题很明确:我们需要一个既能提供近乎实时的数据,又不能对核心交易数据库产生任何性能冲击的分析解决方案。

初步的构想是搭建一个数据复制管道,将生产库的数据变更实时同步到一个专门用于OLAP的数据库中。技术选型上,ClickHouse因其出色的查询性能,成为我们OLAP引擎的不二之C。而连接源和目的地的桥梁,我们选择了基于日志的变更数据捕获(Change Data Capture, CDC)方案,这能从根本上避免对源库的轮询,实现真正的低侵入性。

技术栈决策与权衡

在具体的工具链选择上,我们进行了一些内部讨论和权衡:

  1. CDC工具: Debezium。它作为一个开源的分布式CDC平台,社区成熟,支持多种数据库,并且能与Kafka Connect无缝集成。相比于自己基于数据库逻辑复制功能编写解析程序,Debezium提供了开箱即用的、生产级的解决方案,包括初始快照、模式演进处理等复杂功能。
  2. 消息队列: Kafka。这是CDC场景下的标准配置。它提供了一个高吞吐、可持久化的事件流总线,完美地解耦了数据生产者(Debezium)和消费者。虽然OCI提供了托管的Streaming服务,但考虑到我们需要对Kafka版本、分区策略和一些高级参数进行更精细的控制,我们决定在OCI的Compute实例上自建一套Kafka集群。
  3. 数据消费与写入: 自研Go消费服务。市面上虽然有现成的Kafka-ClickHouse连接器,但我们的场景中有一些特殊的转换逻辑和复杂的错误处理需求。例如,需要根据事件类型(c/u/d)对数据进行预处理,并且需要对写入ClickHouse的批次大小、频率进行动态调整。使用Go语言自研,可以获得极致的性能、内存控制以及逻辑实现的灵活性。
  4. 可观测性: 复用现有的ELK Stack。公司内部已经有了一套成熟的、用于收集所有应用日志的ELK(Elasticsearch, Logstash, Kibana)平台。为这套新的数据管道再引入一套Prometheus + Grafana体系,会增加运维的复杂性。我们的策略是,让管道中的各个组件(Debezium Connect, Go消费者)产生结构化的JSON日志,然后通过Filebeat采集到Logstash进行解析,最终在Kibana中构建出覆盖整个管道健康状况的监控仪表盘。

整个架构的数据流如下:

graph TD
    subgraph OCI
        A[PostgreSQL on OCI] -- Logical Replication --> B[Debezium Connector on Kafka Connect VM];
        B -- Produces JSON events --> C[Apache Kafka Cluster];
        D[Go Consumer Service VM] -- Consumes from --> C;
        D -- Batched Inserts --> E[ClickHouse Server];
    end

    subgraph Observability Plane
        B -- Logs via Filebeat --> F[Logstash];
        D -- Logs via Filebeal --> F;
        F -- Parsed Logs --> G[Elasticsearch];
        H[Kibana] -- Queries --> G;
    end

    style A fill:#cde4ff
    style E fill:#cde4ff
    style C fill:#ffe4b5
    style G fill:#d5f4e6

步骤化实现与陷阱规避

1. 配置源端PostgreSQL

这是所有工作的第一步,也是最基础的一步。Debezium的PostgreSQL连接器依赖于数据库的逻辑解码功能。

首先,必须修改PostgreSQL的配置文件postgresql.conf,确保WAL(Write-Ahead Log)级别足够支持逻辑复制。

# postgresql.conf
# 必须是 logical,replica 或 minimal 都不行
wal_level = logical

# 根据业务量调整,确保WAL日志在Debezium处理前不会被回收
max_wal_senders = 10
max_replication_slots = 10

修改后需要重启数据库实例。接着,创建一个专门用于数据复制的用户,并授予必要的权限。在真实项目中,永远不要使用超级用户权限。

-- 创建一个专用的复制角色
CREATE ROLE debezium_replicator WITH REPLICATION LOGIN PASSWORD 'a_very_secret_password';

-- 授予连接数据库的权限
GRANT CONNECT ON DATABASE my_database TO debezium_replicator;

-- 授予对需要捕获的表的 SELECT 权限
-- Debezium在启动快照时需要读取全表数据
GRANT SELECT ON my_table_1, my_table_2 TO debezium_replicator;

-- 如果使用 pgoutput 插件,还需要授予 usage 权限
GRANT USAGE ON SCHEMA public TO debezium_replicator;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_replicator;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_replicator;

最后,在OCI控制台上,需要检查网络安全组(NSG)或安全列表(Security List)的规则,确保部署Debezium的虚拟机能够访问PostgreSQL实例的5432端口。这是个常见但容易忽略的配置点。

2. 部署Kafka Connect与Debezium

我们在一个OCI Compute实例上使用Docker Compose来管理Kafka、Zookeeper和Kafka Connect,这对于开发和非极端负载的生产环境来说足够方便。

docker-compose.yml:

version: '3.7'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  connect:
    image: confluentinc/cp-kafka-connect:7.3.0
    container_name: connect
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    volumes:
      # 这里映射一个本地目录,用于存放Debezium的插件JAR包
      - ./plugins:/etc/kafka-connect/plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'connect'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: 'debezium-pg-cluster'
      CONNECT_CONFIG_STORAGE_TOPIC: '__connect_configs'
      CONNECT_OFFSET_STORAGE_TOPIC: '__connect_offsets'
      CONNECT_STATUS_STORAGE_TOPIC: '__connect_status'
      CONNECT_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      # 这里的配置很关键,避免了不必要的schema信息,简化下游消费
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/plugins'

下载Debezium的PostgreSQL连接器插件并解压到./plugins目录后,启动这套环境。然后,通过REST API向Kafka Connect提交我们的连接器配置。

register-postgres-connector.json:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "your_oci_pg_host",
    "database.port": "5432",
    "database.user": "debezium_replicator",
    "database.password": "a_very_secret_password",
    "database.dbname": "my_database",
    "database.server.name": "pg-prod-server",
    "table.include.list": "public.orders,public.customers",
    "plugin.name": "pgoutput",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string",
    "time.precision.mode": "connect",
    "tombstones.on.delete": "false",
    "message.key.columns": "public.orders:id;public.customers:id",
    "heartbeat.interval.ms": "5000"
  }
}

遇到的第一个坑:快照性能与锁问题。当我们在一个拥有数亿条记录的大表上启动这个连接器时,snapshot.mode: initial导致Debezium尝试进行全表读取。默认的repeatable_read隔离级别会对表加一个共享锁,虽然不阻塞读,但会阻塞某些DDL操作。更严重的是,快照过程持续了数小时,期间产生的增量WAL日志堆积,可能导致磁盘压力。

解决方案:我们最终采用的策略是“增量快照”(incremental模式,Debezium 1.9+支持)。但对于初次上线,更稳妥的方式是:先通过pg_dump等工具进行数据的手动批量迁移,然后将Debezium的snapshot.mode设置为never,让它只从当前时间点开始消费增量日志。这需要业务方能接受短暂的数据不一致窗口。

3. Go消费者服务的实现细节

这是整个管道的核心逻辑所在。我们的Go服务主要职责包括:

  1. 使用sarama库连接Kafka集群并消费消息。
  2. 解析Debezium输出的JSON消息体。
  3. 按批次、按表聚合数据。
  4. 使用clickhouse-go库将批次数据高效写入ClickHouse。
  5. 实现优雅停机、错误处理和结构化日志记录。

以下是消费者核心循环的简化代码,展示了设计思路:

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/IBM/sarama"
	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/sirupsen/logrus"
)

// Debezium的JSON消息结构(简化版)
type DebeziumPayload struct {
	Op     string          `json:"op"` // c = create, u = update, d = delete
	Before json.RawMessage `json:"before"`
	After  json.RawMessage `json:"after"`
	Source struct {
		Table string `json:"table"`
	} `json:"source"`
}

// 对应ClickHouse中的orders表结构
type OrderRecord struct {
	ID        int64     `ch:"id"`
	Amount    float64   `ch:"amount"`
	OrderTime time.Time `ch:"order_time"`
	IsDeleted uint8     `ch:"is_deleted"` // 用于标记删除
	Version   uint64    `ch:"version"`    // 用于ReplacingMergeTree
}

const (
	batchSize       = 1000
	flushInterval   = 5 * time.Second
)

// ConsumerHandler实现了sarama.ConsumerGroupHandler接口
type ConsumerHandler struct {
	chConn    clickhouse.Conn
	batchData map[string][]interface{}
	mutex     sync.Mutex
	ready     chan bool
}

func (h *ConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
	h.batchData = make(map[string][]interface{})
	close(h.ready)
	return nil
}

func (h *ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				logrus.Info("Message channel closed")
				return nil
			}
			h.processMessage(message)
			session.MarkMessage(message, "")

			// 达到批次大小立即刷写
			if len(h.batchData[message.Topic]) >= batchSize {
				h.flush(context.Background())
			}

		case <-ticker.C:
			// 定时刷写
			h.flush(context.Background())
		
		case <-session.Context().Done():
			// session结束,最后刷写一次
			h.flush(context.Background())
			return nil
		}
	}
}

func (h *ConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {
    h.mutex.Lock()
    defer h.mutex.Unlock()

	var payload DebeziumPayload
	if err := json.Unmarshal(msg.Value, &payload); err != nil {
		logrus.WithFields(logrus.Fields{
			"topic": msg.Topic,
			"offset": msg.Offset,
			"error": err,
		}).Error("Failed to unmarshal Debezium payload")
		return // 简单跳过,生产环境应发送到死信队列
	}
	
	tableName := payload.Source.Table
	if tableName != "orders" { // 仅处理orders表
		return
	}
	
	// 这里是核心的转换逻辑
	var record OrderRecord
	dataToParse := payload.After
	if payload.Op == "d" {
		dataToParse = payload.Before
		record.IsDeleted = 1
	} else {
		record.IsDeleted = 0
	}
	
	if err := json.Unmarshal(dataToParse, &record); err != nil {
		logrus.WithFields(logrus.Fields{
			"table": tableName,
			"op": payload.Op,
			"error": err,
		}).Error("Failed to unmarshal record data")
		return
	}

    // 从Debezium的source字段获取LSN或时间戳作为版本号
    // record.Version = ... (此处省略解析逻辑)

	h.batchData[tableName] = append(h.batchData[tableName], record)
}


func (h *ConsumerHandler) flush(ctx context.Context) {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	for table, records := range h.batchData {
		if len(records) == 0 {
			continue
		}

		// 使用类型断言转换回具体类型
		orderRecords := make([]OrderRecord, len(records))
		for i, r := range records {
			orderRecords[i] = r.(OrderRecord)
		}

		start := time.Now()
		err := h.chConn.AsyncInsert(ctx, "INSERT INTO my_database.orders VALUES", orderRecords, false)

		logEntry := logrus.WithFields(logrus.Fields{
			"table":    table,
			"batch_size": len(records),
			"duration_ms": time.Since(start).Milliseconds(),
		})

		if err != nil {
			logEntry.WithError(err).Error("Failed to insert batch into ClickHouse")
			// 生产环境需要有重试和告警机制
		} else {
			logEntry.Info("Successfully inserted batch into ClickHouse")
		}
	}
	// 清空批次
	h.batchData = make(map[string][]interface{})
}


func main() {
	// ... (logrus, sarama, clickhouse-go 初始化代码)
	
	// 优雅停机处理
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client, _ := sarama.NewConsumerGroup(...)
	handler := &ConsumerHandler{chConn: chConn, ready: make(chan bool)}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := client.Consume(ctx, []string{"pg-prod-server.public.orders"}, handler); err != nil {
				// ...
			}
			if ctx.Err() != nil {
				return
			}
			handler.ready = make(chan bool)
		}
	}()
	
	<-handler.ready // 等待 consumer group 建立
	logrus.Info("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		logrus.Info("terminating: context cancelled")
	case <-sigterm:
		logrus.Info("terminating: via signal")
	}
	cancel()
	wg.Wait()
}

遇到的第二个坑:数据类型映射。PostgreSQL的jsonbtimestamp with time zonenumeric等类型在转为JSON后,再写入ClickHouse时需要小心处理。例如,numeric类型变成了JSON字符串,必须在Go代码中解析为float64或高精度类型。我们为此编写了一个专门的类型转换层,对每个字段进行显式转换和校验,而不是依赖于简单的json.Unmarshal

4. ClickHouse端的表引擎选择

如何处理上游的UPDATEDELETE操作,是CDC到OLAP系统中的经典问题。ClickHouse的MergeTree系列引擎是追加写优化的,不支持原地更新。

我们的方案是使用ReplacingMergeTree。这个引擎可以在后台合并数据时,根据排序键保留具有最高版本号的一行数据。

CREATE TABLE my_database.orders (
    `id` Int64,
    `amount` Float64,
    `order_time` DateTime64(6, 'UTC'),
    `is_deleted` UInt8,
    `version` UInt64 -- 版本字段,可以是事件时间戳或LSN
) ENGINE = ReplacingMergeTree(version)
PARTITION BY toYYYYMM(order_time)
ORDER BY (id, order_time)
SETTINGS index_granularity = 8192;
  • ENGINE = ReplacingMergeTree(version): 指定引擎,并告知ClickHouse用version字段来判断哪条记录是最新的。
  • ORDER BY (id, order_time): 这是去重的关键。只有ORDER BY键相同的记录才会被去重。
  • is_deleted字段: 对于源库的DELETE操作,我们不是在ClickHouse中删除数据,而是插入一条is_deleted = 1的记录。查询时,通过WHERE is_deleted = 0过滤掉已删除的数据。这保留了完整的变更历史,对于某些审计场景很有价值。

**遇到的第三个坑:Too many parts**。在高写入吞吐量下,ClickHouse因为频繁的小批量写入,产生了大量的数据分区(parts),后台合并不过来,导致写入性能下降并报错。

解决方案:调整Go消费者的batchSizeflushInterval。我们将批次大小增加到5000,刷写间隔延长到10秒。这显著减少了写入ClickHouse的频率,使得每次写入的数据量更大,从而降低了parts的数量和合并压力。同时,观察ClickHouse的系统表system.mergessystem.parts,可以帮助我们微调这些参数。

5. 在ELK上构建可观测性仪表盘

最后一步,是为这个黑盒般的管道建立一套透明的监控体系。我们在Go消费者中使用了logrus库,并配置其输出为JSON格式。

func init() {
	logrus.SetFormatter(&logrus.JSONFormatter{})
	logrus.SetOutput(os.Stdout)
	logrus.SetLevel(logrus.InfoLevel)
}

这样,每一条日志,无论是常规的信息(如批次写入成功)还是错误,都带有丰富的上下文信息,如table, batch_size, duration_ms

在部署Go消费者和Kafka Connect的OCI虚拟机上,我们都安装了Filebeat,配置它来收集这些JSON日志并发送到Logstash。

Logstash的配置文件中,我们定义了一个简单的pipeline来解析这些日志:

input {
  beats {
    port => 5044
  }
}

filter {
  json {
    source => "message"
  }
  # 可以进一步解析字段,比如把duration_ms转为数字类型
  mutate {
    convert => { "duration_ms" => "integer" }
    convert => { "batch_size" => "integer" }
  }
}

output {
  elasticsearch {
    hosts => ["http://your_es_host:9200"]
    index => "data-pipeline-%{+YYYY.MM.dd}"
  }
}

在Kibana中,我们创建了一个仪表盘,包含以下关键图表:

  1. 消息吞吐率: 按表统计,每分钟处理的消息数量。这是一个Count聚合,按时间直方图展示。
  2. ClickHouse写入延迟: Average聚合duration_ms字段,展示P95、P99延迟,帮助发现写入性能瓶颈。
  3. 批次大小分布: Average聚合batch_size字段,观察批次是否稳定在我们的目标值附近。
  4. 错误率: Filterlevel: "error"的日志,按error字段内容进行Terms聚合,可以快速定位主要错误类型。
  5. Kafka消费者延迟: 这是个难点。由于sarama库本身不直接暴露lag,我们在Go消费者中启动了一个后台goroutine,定期调用Kafka Admin API获取当前消费组的lag信息,并将其作为一条特殊的metric日志打印出来。这样,ELK也能监控到消费延迟。

最终,这个仪表盘成为了我们排查管道问题的首选工具。任何性能下降、错误增多或数据延迟,都能在这里得到直观的体现。

局限性与未来展望

这套架构稳定运行至今,成功支撑了业务的实时分析需求,同时保护了核心数据库的稳定。但它并非完美,仍然存在一些可以改进的地方。

首要的挑战是模式演进(Schema Evolution)。当前,如果上游PostgreSQL表发生DDL变更(如增减字段),整个管道需要人工干预:暂停Debezium,手动修改ClickHouse表结构,更新Go消费者的代码和struct,然后重启整个流程。这个过程繁琐且容易出错。未来的一个重要迭代方向是引入Schema Registry(如Confluent Schema Registry),让Debezium将表结构变化注册为Avro schema,下游消费者可以动态地解析schema并自动适应,从而实现DDL变更的自动化处理。

其次,对于“毒丸消息”(Poison Pill Messages)的处理机制还比较初级。目前遇到无法解析或处理的消息,我们只是记录错误日志并跳过。一个更健壮的设计是引入死信队列(Dead-Letter Queue, DLQ)。当消息处理失败达到一定重试次数后,自动将其投递到一个专门的DLQ Kafka topic中,以便后续进行人工分析和处理,而不是简单地丢弃。

最后,虽然自研消费者提供了最大的灵活性,但其维护成本也不容忽视。随着ClickHouse社区生态的成熟,未来可以重新评估clickhouse-kafka-connect这类官方或社区维护的Sink Connector。如果其功能和性能能够满足我们的定制化需求,迁移过去可以减少一部分自研代码的维护负担。


  目录