我们的业务核心系统一直运行在OCI(Oracle Cloud Infrastructure)上的一套高可用PostgreSQL集群上。随着业务量增长,对实时数据的分析需求变得愈发尖锐,但直接在主库上跑复杂的分析SQL已经数次触发了数据库的性能告警,甚至导致过短暂的服务抖动。传统的夜间ETL批处理任务,其小时级的数据延迟,在今天这个决策周期被压缩到分钟级的商业环境下,已经显得力不从心。问题很明确:我们需要一个既能提供近乎实时的数据,又不能对核心交易数据库产生任何性能冲击的分析解决方案。
初步的构想是搭建一个数据复制管道,将生产库的数据变更实时同步到一个专门用于OLAP的数据库中。技术选型上,ClickHouse因其出色的查询性能,成为我们OLAP引擎的不二之C。而连接源和目的地的桥梁,我们选择了基于日志的变更数据捕获(Change Data Capture, CDC)方案,这能从根本上避免对源库的轮询,实现真正的低侵入性。
技术栈决策与权衡
在具体的工具链选择上,我们进行了一些内部讨论和权衡:
- CDC工具: Debezium。它作为一个开源的分布式CDC平台,社区成熟,支持多种数据库,并且能与Kafka Connect无缝集成。相比于自己基于数据库逻辑复制功能编写解析程序,Debezium提供了开箱即用的、生产级的解决方案,包括初始快照、模式演进处理等复杂功能。
- 消息队列: Kafka。这是CDC场景下的标准配置。它提供了一个高吞吐、可持久化的事件流总线,完美地解耦了数据生产者(Debezium)和消费者。虽然OCI提供了托管的Streaming服务,但考虑到我们需要对Kafka版本、分区策略和一些高级参数进行更精细的控制,我们决定在OCI的Compute实例上自建一套Kafka集群。
- 数据消费与写入: 自研Go消费服务。市面上虽然有现成的Kafka-ClickHouse连接器,但我们的场景中有一些特殊的转换逻辑和复杂的错误处理需求。例如,需要根据事件类型(c/u/d)对数据进行预处理,并且需要对写入ClickHouse的批次大小、频率进行动态调整。使用Go语言自研,可以获得极致的性能、内存控制以及逻辑实现的灵活性。
- 可观测性: 复用现有的ELK Stack。公司内部已经有了一套成熟的、用于收集所有应用日志的ELK(Elasticsearch, Logstash, Kibana)平台。为这套新的数据管道再引入一套Prometheus + Grafana体系,会增加运维的复杂性。我们的策略是,让管道中的各个组件(Debezium Connect, Go消费者)产生结构化的JSON日志,然后通过Filebeat采集到Logstash进行解析,最终在Kibana中构建出覆盖整个管道健康状况的监控仪表盘。
整个架构的数据流如下:
graph TD
subgraph OCI
A[PostgreSQL on OCI] -- Logical Replication --> B[Debezium Connector on Kafka Connect VM];
B -- Produces JSON events --> C[Apache Kafka Cluster];
D[Go Consumer Service VM] -- Consumes from --> C;
D -- Batched Inserts --> E[ClickHouse Server];
end
subgraph Observability Plane
B -- Logs via Filebeat --> F[Logstash];
D -- Logs via Filebeal --> F;
F -- Parsed Logs --> G[Elasticsearch];
H[Kibana] -- Queries --> G;
end
style A fill:#cde4ff
style E fill:#cde4ff
style C fill:#ffe4b5
style G fill:#d5f4e6
步骤化实现与陷阱规避
1. 配置源端PostgreSQL
这是所有工作的第一步,也是最基础的一步。Debezium的PostgreSQL连接器依赖于数据库的逻辑解码功能。
首先,必须修改PostgreSQL的配置文件postgresql.conf,确保WAL(Write-Ahead Log)级别足够支持逻辑复制。
# postgresql.conf
# 必须是 logical,replica 或 minimal 都不行
wal_level = logical
# 根据业务量调整,确保WAL日志在Debezium处理前不会被回收
max_wal_senders = 10
max_replication_slots = 10
修改后需要重启数据库实例。接着,创建一个专门用于数据复制的用户,并授予必要的权限。在真实项目中,永远不要使用超级用户权限。
-- 创建一个专用的复制角色
CREATE ROLE debezium_replicator WITH REPLICATION LOGIN PASSWORD 'a_very_secret_password';
-- 授予连接数据库的权限
GRANT CONNECT ON DATABASE my_database TO debezium_replicator;
-- 授予对需要捕获的表的 SELECT 权限
-- Debezium在启动快照时需要读取全表数据
GRANT SELECT ON my_table_1, my_table_2 TO debezium_replicator;
-- 如果使用 pgoutput 插件,还需要授予 usage 权限
GRANT USAGE ON SCHEMA public TO debezium_replicator;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_replicator;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_replicator;
最后,在OCI控制台上,需要检查网络安全组(NSG)或安全列表(Security List)的规则,确保部署Debezium的虚拟机能够访问PostgreSQL实例的5432端口。这是个常见但容易忽略的配置点。
2. 部署Kafka Connect与Debezium
我们在一个OCI Compute实例上使用Docker Compose来管理Kafka、Zookeeper和Kafka Connect,这对于开发和非极端负载的生产环境来说足够方便。
docker-compose.yml:
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.3.0
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
connect:
image: confluentinc/cp-kafka-connect:7.3.0
container_name: connect
depends_on:
- kafka
ports:
- "8083:8083"
volumes:
# 这里映射一个本地目录,用于存放Debezium的插件JAR包
- ./plugins:/etc/kafka-connect/plugins
environment:
CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: 'connect'
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: 'debezium-pg-cluster'
CONNECT_CONFIG_STORAGE_TOPIC: '__connect_configs'
CONNECT_OFFSET_STORAGE_TOPIC: '__connect_offsets'
CONNECT_STATUS_STORAGE_TOPIC: '__connect_status'
CONNECT_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
# 这里的配置很关键,避免了不必要的schema信息,简化下游消费
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/plugins'
下载Debezium的PostgreSQL连接器插件并解压到./plugins目录后,启动这套环境。然后,通过REST API向Kafka Connect提交我们的连接器配置。
register-postgres-connector.json:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "your_oci_pg_host",
"database.port": "5432",
"database.user": "debezium_replicator",
"database.password": "a_very_secret_password",
"database.dbname": "my_database",
"database.server.name": "pg-prod-server",
"table.include.list": "public.orders,public.customers",
"plugin.name": "pgoutput",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect",
"tombstones.on.delete": "false",
"message.key.columns": "public.orders:id;public.customers:id",
"heartbeat.interval.ms": "5000"
}
}
遇到的第一个坑:快照性能与锁问题。当我们在一个拥有数亿条记录的大表上启动这个连接器时,snapshot.mode: initial导致Debezium尝试进行全表读取。默认的repeatable_read隔离级别会对表加一个共享锁,虽然不阻塞读,但会阻塞某些DDL操作。更严重的是,快照过程持续了数小时,期间产生的增量WAL日志堆积,可能导致磁盘压力。
解决方案:我们最终采用的策略是“增量快照”(incremental模式,Debezium 1.9+支持)。但对于初次上线,更稳妥的方式是:先通过pg_dump等工具进行数据的手动批量迁移,然后将Debezium的snapshot.mode设置为never,让它只从当前时间点开始消费增量日志。这需要业务方能接受短暂的数据不一致窗口。
3. Go消费者服务的实现细节
这是整个管道的核心逻辑所在。我们的Go服务主要职责包括:
- 使用
sarama库连接Kafka集群并消费消息。 - 解析Debezium输出的JSON消息体。
- 按批次、按表聚合数据。
- 使用
clickhouse-go库将批次数据高效写入ClickHouse。 - 实现优雅停机、错误处理和结构化日志记录。
以下是消费者核心循环的简化代码,展示了设计思路:
package main
import (
"context"
"encoding/json"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/IBM/sarama"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/sirupsen/logrus"
)
// Debezium的JSON消息结构(简化版)
type DebeziumPayload struct {
Op string `json:"op"` // c = create, u = update, d = delete
Before json.RawMessage `json:"before"`
After json.RawMessage `json:"after"`
Source struct {
Table string `json:"table"`
} `json:"source"`
}
// 对应ClickHouse中的orders表结构
type OrderRecord struct {
ID int64 `ch:"id"`
Amount float64 `ch:"amount"`
OrderTime time.Time `ch:"order_time"`
IsDeleted uint8 `ch:"is_deleted"` // 用于标记删除
Version uint64 `ch:"version"` // 用于ReplacingMergeTree
}
const (
batchSize = 1000
flushInterval = 5 * time.Second
)
// ConsumerHandler实现了sarama.ConsumerGroupHandler接口
type ConsumerHandler struct {
chConn clickhouse.Conn
batchData map[string][]interface{}
mutex sync.Mutex
ready chan bool
}
func (h *ConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
h.batchData = make(map[string][]interface{})
close(h.ready)
return nil
}
func (h *ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (h *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ticker := time.NewTicker(flushInterval)
defer ticker.Stop()
for {
select {
case message, ok := <-claim.Messages():
if !ok {
logrus.Info("Message channel closed")
return nil
}
h.processMessage(message)
session.MarkMessage(message, "")
// 达到批次大小立即刷写
if len(h.batchData[message.Topic]) >= batchSize {
h.flush(context.Background())
}
case <-ticker.C:
// 定时刷写
h.flush(context.Background())
case <-session.Context().Done():
// session结束,最后刷写一次
h.flush(context.Background())
return nil
}
}
}
func (h *ConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {
h.mutex.Lock()
defer h.mutex.Unlock()
var payload DebeziumPayload
if err := json.Unmarshal(msg.Value, &payload); err != nil {
logrus.WithFields(logrus.Fields{
"topic": msg.Topic,
"offset": msg.Offset,
"error": err,
}).Error("Failed to unmarshal Debezium payload")
return // 简单跳过,生产环境应发送到死信队列
}
tableName := payload.Source.Table
if tableName != "orders" { // 仅处理orders表
return
}
// 这里是核心的转换逻辑
var record OrderRecord
dataToParse := payload.After
if payload.Op == "d" {
dataToParse = payload.Before
record.IsDeleted = 1
} else {
record.IsDeleted = 0
}
if err := json.Unmarshal(dataToParse, &record); err != nil {
logrus.WithFields(logrus.Fields{
"table": tableName,
"op": payload.Op,
"error": err,
}).Error("Failed to unmarshal record data")
return
}
// 从Debezium的source字段获取LSN或时间戳作为版本号
// record.Version = ... (此处省略解析逻辑)
h.batchData[tableName] = append(h.batchData[tableName], record)
}
func (h *ConsumerHandler) flush(ctx context.Context) {
h.mutex.Lock()
defer h.mutex.Unlock()
for table, records := range h.batchData {
if len(records) == 0 {
continue
}
// 使用类型断言转换回具体类型
orderRecords := make([]OrderRecord, len(records))
for i, r := range records {
orderRecords[i] = r.(OrderRecord)
}
start := time.Now()
err := h.chConn.AsyncInsert(ctx, "INSERT INTO my_database.orders VALUES", orderRecords, false)
logEntry := logrus.WithFields(logrus.Fields{
"table": table,
"batch_size": len(records),
"duration_ms": time.Since(start).Milliseconds(),
})
if err != nil {
logEntry.WithError(err).Error("Failed to insert batch into ClickHouse")
// 生产环境需要有重试和告警机制
} else {
logEntry.Info("Successfully inserted batch into ClickHouse")
}
}
// 清空批次
h.batchData = make(map[string][]interface{})
}
func main() {
// ... (logrus, sarama, clickhouse-go 初始化代码)
// 优雅停机处理
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, _ := sarama.NewConsumerGroup(...)
handler := &ConsumerHandler{chConn: chConn, ready: make(chan bool)}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := client.Consume(ctx, []string{"pg-prod-server.public.orders"}, handler); err != nil {
// ...
}
if ctx.Err() != nil {
return
}
handler.ready = make(chan bool)
}
}()
<-handler.ready // 等待 consumer group 建立
logrus.Info("Sarama consumer up and running!...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
logrus.Info("terminating: context cancelled")
case <-sigterm:
logrus.Info("terminating: via signal")
}
cancel()
wg.Wait()
}
遇到的第二个坑:数据类型映射。PostgreSQL的jsonb,timestamp with time zone,numeric等类型在转为JSON后,再写入ClickHouse时需要小心处理。例如,numeric类型变成了JSON字符串,必须在Go代码中解析为float64或高精度类型。我们为此编写了一个专门的类型转换层,对每个字段进行显式转换和校验,而不是依赖于简单的json.Unmarshal。
4. ClickHouse端的表引擎选择
如何处理上游的UPDATE和DELETE操作,是CDC到OLAP系统中的经典问题。ClickHouse的MergeTree系列引擎是追加写优化的,不支持原地更新。
我们的方案是使用ReplacingMergeTree。这个引擎可以在后台合并数据时,根据排序键保留具有最高版本号的一行数据。
CREATE TABLE my_database.orders (
`id` Int64,
`amount` Float64,
`order_time` DateTime64(6, 'UTC'),
`is_deleted` UInt8,
`version` UInt64 -- 版本字段,可以是事件时间戳或LSN
) ENGINE = ReplacingMergeTree(version)
PARTITION BY toYYYYMM(order_time)
ORDER BY (id, order_time)
SETTINGS index_granularity = 8192;
-
ENGINE = ReplacingMergeTree(version): 指定引擎,并告知ClickHouse用version字段来判断哪条记录是最新的。 -
ORDER BY (id, order_time): 这是去重的关键。只有ORDER BY键相同的记录才会被去重。 -
is_deleted字段: 对于源库的DELETE操作,我们不是在ClickHouse中删除数据,而是插入一条is_deleted = 1的记录。查询时,通过WHERE is_deleted = 0过滤掉已删除的数据。这保留了完整的变更历史,对于某些审计场景很有价值。
**遇到的第三个坑:Too many parts**。在高写入吞吐量下,ClickHouse因为频繁的小批量写入,产生了大量的数据分区(parts),后台合并不过来,导致写入性能下降并报错。
解决方案:调整Go消费者的batchSize和flushInterval。我们将批次大小增加到5000,刷写间隔延长到10秒。这显著减少了写入ClickHouse的频率,使得每次写入的数据量更大,从而降低了parts的数量和合并压力。同时,观察ClickHouse的系统表system.merges和system.parts,可以帮助我们微调这些参数。
5. 在ELK上构建可观测性仪表盘
最后一步,是为这个黑盒般的管道建立一套透明的监控体系。我们在Go消费者中使用了logrus库,并配置其输出为JSON格式。
func init() {
logrus.SetFormatter(&logrus.JSONFormatter{})
logrus.SetOutput(os.Stdout)
logrus.SetLevel(logrus.InfoLevel)
}
这样,每一条日志,无论是常规的信息(如批次写入成功)还是错误,都带有丰富的上下文信息,如table, batch_size, duration_ms。
在部署Go消费者和Kafka Connect的OCI虚拟机上,我们都安装了Filebeat,配置它来收集这些JSON日志并发送到Logstash。
Logstash的配置文件中,我们定义了一个简单的pipeline来解析这些日志:
input {
beats {
port => 5044
}
}
filter {
json {
source => "message"
}
# 可以进一步解析字段,比如把duration_ms转为数字类型
mutate {
convert => { "duration_ms" => "integer" }
convert => { "batch_size" => "integer" }
}
}
output {
elasticsearch {
hosts => ["http://your_es_host:9200"]
index => "data-pipeline-%{+YYYY.MM.dd}"
}
}
在Kibana中,我们创建了一个仪表盘,包含以下关键图表:
- 消息吞吐率: 按表统计,每分钟处理的消息数量。这是一个
Count聚合,按时间直方图展示。 - ClickHouse写入延迟:
Average聚合duration_ms字段,展示P95、P99延迟,帮助发现写入性能瓶颈。 - 批次大小分布:
Average聚合batch_size字段,观察批次是否稳定在我们的目标值附近。 - 错误率:
Filter出level: "error"的日志,按error字段内容进行Terms聚合,可以快速定位主要错误类型。 - Kafka消费者延迟: 这是个难点。由于
sarama库本身不直接暴露lag,我们在Go消费者中启动了一个后台goroutine,定期调用Kafka Admin API获取当前消费组的lag信息,并将其作为一条特殊的metric日志打印出来。这样,ELK也能监控到消费延迟。
最终,这个仪表盘成为了我们排查管道问题的首选工具。任何性能下降、错误增多或数据延迟,都能在这里得到直观的体现。
局限性与未来展望
这套架构稳定运行至今,成功支撑了业务的实时分析需求,同时保护了核心数据库的稳定。但它并非完美,仍然存在一些可以改进的地方。
首要的挑战是模式演进(Schema Evolution)。当前,如果上游PostgreSQL表发生DDL变更(如增减字段),整个管道需要人工干预:暂停Debezium,手动修改ClickHouse表结构,更新Go消费者的代码和struct,然后重启整个流程。这个过程繁琐且容易出错。未来的一个重要迭代方向是引入Schema Registry(如Confluent Schema Registry),让Debezium将表结构变化注册为Avro schema,下游消费者可以动态地解析schema并自动适应,从而实现DDL变更的自动化处理。
其次,对于“毒丸消息”(Poison Pill Messages)的处理机制还比较初级。目前遇到无法解析或处理的消息,我们只是记录错误日志并跳过。一个更健壮的设计是引入死信队列(Dead-Letter Queue, DLQ)。当消息处理失败达到一定重试次数后,自动将其投递到一个专门的DLQ Kafka topic中,以便后续进行人工分析和处理,而不是简单地丢弃。
最后,虽然自研消费者提供了最大的灵活性,但其维护成本也不容忽视。随着ClickHouse社区生态的成熟,未来可以重新评估clickhouse-kafka-connect这类官方或社区维护的Sink Connector。如果其功能和性能能够满足我们的定制化需求,迁移过去可以减少一部分自研代码的维护负担。