基于CDC构建从OLTP到Meilisearch与ClickHouse的双流数据同步架构


一个业务系统发展到一定阶段,几乎总会面临数据读取模式的冲突。用户前端需要亚秒级的、支持模糊匹配与拼写纠错的全文检索;而运营和数据分析团队则需要对数月甚至数年的数据进行复杂的聚合、分组和统计查询,以生成报表。这两种需求,在底层技术实现上是根本对立的。前者要求数据结构为查询优化,后者则为大规模扫描和聚合优化。

将所有压力都施加在单一的关系型数据库(例如MySQL)上,是多数系统初期的选择,但这很快会成为瓶颈。使用LIKE '%keyword%'进行搜索会扼杀数据库性能,而复杂的GROUP BYJOIN查询则会长时间锁定资源,影响在线业务。

方案权衡:从批处理ETL到实时CDC

在架构决策的十字路口,我们通常会评估几种方案。

方案A:传统的夜间ETL批处理

这是最经典的分离方案。我们维护一个主业务库(MySQL/PostgreSQL),然后每天凌晨通过定时任务,将数据抽取(Extract)、转换(Transform)、加载(Load)到专门的搜索引擎(如Elasticsearch)和数据仓库(如Hive或早期的ClickHouse实现)。

  • 优势:
    • 架构清晰,技术成熟,易于理解和实现。
    • 对主业务库的影响集中在低峰期,日间业务稳定。
  • 劣势:
    • 数据延迟是致命的。 T+1的延迟意味着用户当天发布的内容无法被搜到,最新的业务数据无法反映在分析报表中。在当今的商业环境中,这种延迟往往是不可接受的。
    • ETL任务的脆弱性。 批处理窗口有限,一旦数据量增长导致ETL任务超时,或者任务中途失败,数据恢复和重跑的流程会非常痛苦。
    • 资源浪费。 无论数据是否有变更,ETL任务都可能需要全量或大范围扫描,造成了计算和IO资源的峰值浪费。

方案B:应用层双写

为了解决数据延迟问题,一个直接的想法是在应用代码中进行“双写”。即当PHP代码完成对主数据库的写操作后,再同步或异步地调用Meilisearch和ClickHouse的API写入数据。

  • 优势:
    • 实现了数据的实时同步,延迟极低。
  • 劣劣势:
    • 强耦合与业务侵入。 数据同步逻辑与业务逻辑紧密耦合在PHP代码中。每次新增或修改一个需要同步的数据模型,都需要修改业务代码,违反了单一职责原则。
    • 一致性难题。 在分布式系统中,保证两个或多个独立系统的写入原子性是极其困难的。如果主数据库写入成功,但写入Meilisearch失败了怎么办?引入分布式事务(如TCC或Saga模式)会极大地增加系统复杂度和维护成本,对于一个数据同步场景来说,得不偿失。
    • 可靠性差。 如果消息队列(用于异步写入)出现故障,或者下游系统(Meilisearch/ClickHouse)暂时不可用,数据就会丢失,需要复杂的对账和补偿机制。

最终选择:基于CDC的实时数据流架构

Change Data Capture (CDC) 是一种更为优雅的解决方案。它通过捕获源数据库的底层日志(如MySQL的Binlog),将数据变更(INSERT, UPDATE, DELETE)实时地转化为事件流。这个过程对上层业务应用完全透明,实现了底层数据与上层业务的彻底解耦。

我们选择的架构如下:

flowchart TD
    subgraph "Docker Environment"
        subgraph "Write Model (OLTP)"
            A[PHP Application] -->|Writes| B(MySQL Database)
        end

        subgraph "CDC Pipeline"
            B -- Binlog --> C{Debezium Connector for MySQL}
            C -- JSON Events --> D[Kafka Topic: mysql.products]
        end

        subgraph "Read Models (CQRS Queries)"
            subgraph "Search Service"
                E[PHP Consumer] -- Consumes --> D
                E -->|Index Documents| F(Meilisearch)
            end
            subgraph "Analytics Service"
                 E -->|Insert Rows| G(ClickHouse)
            end
            H[User Search] --> F
            I[BI/Analytics] --> G
        end
    end

这个架构的核心优势在于:

  1. 解耦: PHP应用只关心核心业务逻辑和主数据库的事务完整性。它完全不知道下游还有Meilisearch和ClickHouse的存在。
  2. 实时性: 数据变更在毫秒级别内被捕获并广播出去,下游系统可以在亚秒级内完成同步。
  3. 可靠性: 依赖于数据库成熟的日志系统和Kafka这类高可用的消息队列,数据变更事件被持久化,即使下游消费端宕机,恢复后也能从上次的位置继续消费,保证了“至少一次”的交付。
  4. 扩展性: 未来如果需要增加新的数据消费者(例如一个实时缓存系统),只需增加一个新的消费者组来订阅相关的Kafka Topic即可,无需对现有系统做任何改动。

核心实现:从环境搭建到消费逻辑

在真实项目中,这套体系的搭建需要精确的配置和健壮的代码。我们将使用Docker Compose来编排整个环境,这对于开发和部署都极为方便。

1. Docker Compose 环境编排 (docker-compose.yml)

这是整个系统的骨架。它定义了我们需要的所有服务、它们之间的网络关系以及数据持久化配置。

# docker-compose.yml
version: '3.8'

services:
  mysql:
    image: mysql:8.0
    container_name: mysql_master
    restart: always
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: my_app
    volumes:
      - mysql_data:/var/lib/mysql
      - ./mysql/my.cnf:/etc/mysql/conf.d/my.cnf # 关键:挂载配置文件
    networks:
      - data_pipeline_net

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    restart: always
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - data_pipeline_net

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    restart: always
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    networks:
      - data_pipeline_net

  debezium-connect:
    image: debezium/connect:2.1
    container_name: debezium_connect
    restart: always
    depends_on:
      - kafka
      - mysql
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      # Debezium需要这些Topic存在,且副本因子为1
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1
    networks:
      - data_pipeline_net

  meilisearch:
    image: getmeili/meilisearch:v1.3.0
    container_name: meilisearch
    restart: always
    ports:
      - "7700:7700"
    environment:
      MEILI_MASTER_KEY: 'aMasterKey'
      MEILI_NO_ANALYTICS: 'true'
    volumes:
      - meili_data:/meili_data
    networks:
      - data_pipeline_net

  clickhouse:
    image: clickhouse/clickhouse-server:23.8
    container_name: clickhouse
    restart: always
    ports:
      - "8123:8123"
      - "9000:9000"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    volumes:
      - clickhouse_data:/var/lib/clickhouse
    networks:
      - data_pipeline_net

  php-consumer:
    build:
      context: ./php-consumer
    container_name: php_consumer
    restart: always
    depends_on:
      - kafka
      - meilisearch
      - clickhouse
    volumes:
      - ./php-consumer/src:/app/src
    networks:
      - data_pipeline_net

networks:
  data_pipeline_net:
    driver: bridge

volumes:
  mysql_data:
  meili_data:
  clickhouse_data:

关键配置解释:

  • mysql/my.cnf: 必须在这里启用Binlog,并设置为ROW格式,这是Debezium工作的基础。
    [mysqld]
    server-id=1
    log-bin=mysql-bin
    binlog_format=ROW
    binlog_row_image=full
  • debezium-connect: Debezium作为一个Kafka Connect插件运行。我们通过其REST API (8083端口) 来部署和管理我们的MySQL CDC连接器。

2. 配置并启动Debezium MySQL连接器

环境启动后 (docker-compose up -d),我们需要向Debezium的API发送一个POST请求来创建连接器。

debezium-mysql-connector.json:

{
  "name": "mysql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "rootpassword",
    "database.server.id": "100",
    "database.server.name": "dbserver1",
    "database.include.list": "my_app",
    "table.include.list": "my_app.products",
    "database.history.kafka.bootstrap.servers": "kafka:29092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "include.schema.changes": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}

通过curl部署它:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d @debezium-mysql-connector.json

此刻,任何对my_app.products表的操作都会被捕获,并以JSON格式发送到Kafka的dbserver1.my_app.products Topic中。

3. ClickHouse与Meilisearch的准备

在ClickHouse中,我们需要创建目标表。这里使用ReplacingMergeTree引擎是一个不错的选择,它可以在后台根据主键自动去重合并,处理UPDATE操作非常方便。

CREATE TABLE my_app.products (
    id UInt64,
    name String,
    description String,
    price Decimal(10, 2),
    stock_quantity Int32,
    created_at DateTime,
    updated_at DateTime,
    _version UInt64,
    _is_deleted UInt8
) ENGINE = ReplacingMergeTree(_version)
PARTITION BY toYYYYMM(created_at)
ORDER BY id;
  • _version: 用于ReplacingMergeTree判断哪条记录是“最新”的。我们将使用Debezium事件中的ts_ms(事件时间戳)作为版本。
  • _is_deleted: 用于处理逻辑删除。CDC的DELETE事件会转化为一条_is_deleted=1的记录。

对于Meilisearch,它不需要预定义schema,我们可以在第一次写入数据时自动创建索引。

4. PHP消费者核心逻辑 (php-consumer/src/consume.php)

这是将所有组件粘合在一起的地方。我们将使用一个健壮的PHP Kafka客户端库(例如 php-rdkafka)和一个循环逻辑来持续消费数据。

<?php
// consume.php
require __DIR__ . '/../vendor/autoload.php';

// --- 配置 ---
$kafkaBroker = 'kafka:29092';
$kafkaTopic = 'dbserver1.my_app.products';
$kafkaGroup = 'php-dual-sync-consumer-group';
$meiliHost = 'http://meilisearch:7700';
$meiliApiKey = 'aMasterKey';
$clickhouseConfig = [
    'host' => 'clickhouse',
    'port' => '8123',
    'username' => 'default',
    'password' => '',
    'database' => 'my_app',
];

// --- 初始化客户端 ---
$logger = new Monolog\Logger('consumer');
$logger->pushHandler(new Monolog\Handler\StreamHandler('php://stdout'));

try {
    $meiliClient = new MeiliSearch\Client($meiliHost, $meiliApiKey);
    $clickhouseClient = new ClickHouseDB\Client($clickhouseConfig);
    $clickhouseClient->database('my_app');
} catch (\Exception $e) {
    $logger->error("Failed to initialize clients: " . $e->getMessage());
    exit(1);
}

$conf = new RdKafka\Conf();
$conf->set('group.id', $kafkaGroup);
$conf->set('metadata.broker.list', $kafkaBroker);
$conf->set('auto.offset.reset', 'earliest');
$conf->set('enable.auto.commit', 'false'); // 手动控制offset提交,保证处理成功

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe([$kafkaTopic]);

$logger->info("Consumer started. Waiting for messages...");

while (true) {
    $message = $consumer->consume(120 * 1000); // 120s timeout
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            processMessage($message, $logger, $meiliClient, $clickhouseClient);
            // 成功处理后,手动提交offset
            $consumer->commit($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            $logger->info('No more messages; will wait for more...');
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            $logger->info('Timed out.');
            break;
        default:
            $logger->error("Kafka error: " . $message->errstr());
            // 对于可重试的错误,可以加入重试逻辑
            break;
    }
}

function processMessage(RdKafka\Message $message, $logger, $meiliClient, $clickhouseClient)
{
    $payload = json_decode($message->payload, true);
    if (!$payload || !isset($payload['payload'])) {
        $logger->warning("Invalid message format received.", ['payload' => $message->payload]);
        return;
    }

    $event = $payload['payload'];
    $op = $event['op']; // 'c' for create, 'u' for update, 'r' for read (snapshot), 'd' for delete

    // 对于Create/Update/Read, 数据在 'after' 字段
    if (in_array($op, ['c', 'u', 'r'])) {
        $data = $event['after'];
        handleUpsert($data, $event['ts_ms'], $logger, $meiliClient, $clickhouseClient);
    } elseif ($op === 'd') {
        $data = $event['before'];
        handleDelete($data, $event['ts_ms'], $logger, $meiliClient, $clickhouseClient);
    } else {
        $logger->info("Skipping unhandled operation: " . $op);
    }
}

function handleUpsert(array $data, int $timestamp, $logger, $meiliClient, $clickhouseClient)
{
    $productId = $data['id'];
    $logger->info("Processing UPSERT for product ID: {$productId}");

    // 1. 同步到 Meilisearch (Add or Replace Document)
    try {
        // 生产级代码需要对数据进行清洗和转换
        $meiliDocument = [
            'id' => $data['id'],
            'name' => $data['name'],
            'description' => $data['description'],
            'price' => (float)$data['price'],
            'created_at' => strtotime($data['created_at'])
        ];
        $meiliClient->index('products')->addDocuments([$meiliDocument], 'id');
    } catch (\Exception $e) {
        $logger->error("Failed to sync to Meilisearch for ID {$productId}: " . $e->getMessage());
        // 抛出异常以触发重试机制或告警
        throw $e;
    }

    // 2. 同步到 ClickHouse (INSERT a new version)
    try {
        $chData = [
            'id' => $data['id'],
            'name' => $data['name'],
            'description' => $data['description'],
            'price' => (float)$data['price'],
            'stock_quantity' => (int)$data['stock_quantity'],
            'created_at' => date('Y-m-d H:i:s', (int)($data['created_at'] / 1000)),
            'updated_at' => date('Y-m-d H:i:s', (int)($data['updated_at'] / 1000)),
            '_version' => $timestamp,
            '_is_deleted' => 0
        ];
        $clickhouseClient->insert('products', [$chData], array_keys($chData));
    } catch (\Exception $e) {
        $logger->error("Failed to sync to ClickHouse for ID {$productId}: " . $e->getMessage());
        throw $e;
    }
    
    $logger->info("Successfully processed UPSERT for product ID: {$productId}");
}

function handleDelete(array $data, int $timestamp, $logger, $meiliClient, $clickhouseClient)
{
    $productId = $data['id'];
    $logger->info("Processing DELETE for product ID: {$productId}");
    
    // 1. 从 Meilisearch 删除文档
    try {
        $meiliClient->index('products')->deleteDocument($productId);
    } catch (\Exception $e) {
        // 如果文档不存在,Meilisearch可能会报错,需要优雅处理
        if ($e->getCode() !== 404) {
             $logger->error("Failed to delete from Meilisearch for ID {$productId}: " . $e->getMessage());
             throw $e;
        }
    }

    // 2. 在 ClickHouse 中插入一条删除标记记录
    try {
        $chData = [
            'id' => $productId,
            'name' => '', // 填充默认值
            'description' => '',
            'price' => 0.0,
            'stock_quantity' => 0,
            'created_at' => '1970-01-01 00:00:00',
            'updated_at' => '1970-01-01 00:00:00',
            '_version' => $timestamp,
            '_is_deleted' => 1
        ];
        $clickhouseClient->insert('products', [$chData], array_keys($chData));
    } catch (\Exception $e) {
        $logger->error("Failed to sync delete marker to ClickHouse for ID {$productId}: " . $e->getMessage());
        throw $e;
    }

    $logger->info("Successfully processed DELETE for product ID: {$productId}");
}

代码中的生产级考量:

  • 手动Offset提交: enable.auto.commit 设置为 false,我们在processMessage成功执行后才调用$consumer->commit()。这确保了即使消费者进程崩溃,未被成功处理的消息也会在重启后被重新消费,实现了“至少一次处理”的语义。
  • 幂等性处理: Meilisearch的addDocuments指定了主键id,所以重复执行是安全的(覆盖)。ClickHouse的ReplacingMergeTree引擎基于_version字段保证了幂等性,重复插入相同版本的数据不会产生副作用。
  • 错误处理: 对客户端的调用都包裹在try-catch块中。在真实项目中,这里应该集成更复杂的重试逻辑(例如使用指数退避算法)和告警系统。
  • 数据转换: 代码中包含了数据类型转换和格式化,例如将MySQL的时间戳转换为Meilisearch的Unix timestamp和ClickHouse的DateTime字符串。这是同步管道中的常见工作。

架构的扩展性与局限性

这套架构并非银弹,它有自己的适用边界和运维挑战。

扩展性:

  • 新增数据消费者: 如前所述,添加新的下游系统非常容易。比如,我们可以再增加一个消费者,将商品价格变更事件推送到Redis,用于缓存更新。
  • 服务拆分: 随着业务发展,php-consumer可以被拆分为多个更专注的服务。例如,一个专门处理Meilisearch同步,另一个处理ClickHouse同步,它们消费同一个Kafka Topic,互不影响。

局限性与挑战:

  • 运维复杂性: 相比单体数据库,我们引入了Zookeeper, Kafka, Debezium Connect等多个中间件。这要求团队具备相应的监控、排障和运维能力。
  • 最终一致性: 这是一个最终一致的系统。从数据写入MySQL到最终在Meilisearch和ClickHouse中可查询,存在一个(通常很短的)延迟窗口。业务设计必须能够容忍这种短暂的不一致。例如,一个刚刚创建的订单可能在几百毫秒内无法在后台分析系统中查到。
  • Schema变更: 数据库表的Schema变更(如增删字段)需要谨慎处理。Debezium可以捕获DDL变更并将其发布到database.history.kafka.topic。消费端需要有相应的逻辑来处理这些变更,例如动态调整ClickHouse的表结构,否则可能导致消费中断。这是一个高级话题,需要周密的计划和自动化脚本来配合。
  • 快照(Snapshotting)开销: 当Debezium连接器首次启动或因故重启时,它可能会对监控的表进行一次全量快照,以保证基线数据的一致性。对于非常大的表,这个初始快照过程可能会消耗大量时间和IO资源。需要合理规划连接器的启动时机。

这套基于CDC的架构,通过将不同职责的读写任务分离到最适合的专用系统中,解决了单一数据库的性能瓶颈,同时保证了数据的低延迟同步,为上层应用的性能和功能扩展提供了坚实的基础。


  目录