基于HBase为海量WAF日志构建高基数可观测性数据管道


我们负责维护的WAF集群每天要处理近千亿次的请求,随之产生的安全日志体量是惊人的。最初,我们使用一套成熟的ELK Stack来收集和分析这些日志,在业务初期这套方案运行良好。但随着流量增长,尤其是攻击的多样化导致日志字段的基数(Cardinality)爆炸式增长后,Elasticsearch集群开始不堪重重负。单个IP、User-Agent、攻击载荷的唯一值动辄上亿,导致索引体积膨胀、查询性能断崖式下跌,集群维护成了团队的噩梦。痛点非常明确:我们需要一个能从容应对超高写入吞吐和高基数数据的存储与分析方案。

初步构想是彻底放弃在索引层面解决高基数问题的思路,回归到一个更原始但更强大的模型:利用一个对海量写入优化到极致的存储引擎,通过精巧的数据模型设计来满足我们的核心查询需求。技术选型很快聚焦到了HBase。它基于LSM-Tree的架构天然适合写密集型负载,并且其水平扩展能力几乎是无限的。我们决定围绕HBase构建一个全新的WAF日志可观测性管道,彻底取代摇摇欲坠的ELK。

最终的架构设计如下:

graph TD
    subgraph WAF集群
        WAF1
        WAF2
        WAFN[...]
    end

    subgraph 数据管道
        WAF1 -- JSON Log --> KAFKA[Apache Kafka]
        WAF2 -- JSON Log --> KAFKA
        WAFN -- JSON Log --> KAFKA

        KAFKA -- Topic: waf_logs --> INGESTION[Go Ingestion Service]
        INGESTION -- Batch Put --> HBASE[HBase Cluster]
    end

    subgraph 可观测性平台
        QUERY[Query Service API] -- Scan/Get --> HBASE
        DASHBOARD[Grafana / Custom UI] -- HTTP API --> QUERY
        ALERT[Alerting System] -- HTTP API --> QUERY
    end

    subgraph 系统自身监控
        INGESTION -- Metrics --> PROMETHEUS[Prometheus]
        QUERY -- Metrics --> PROMETHEUS
        PROMETHEUS --> ALERTM[AlertManager]
    end

这个方案的核心在于:放弃通用的搜索引擎,转而构建一个针对WAF日志场景高度优化的专用数据服务。这需要我们在HBase数据模型设计、数据写入服务和查询接口层投入大量的开发精力,但收益是获得一个可预测、可扩展且成本可控的系统。

第一步:HBase表模型设计 - 成败的关键

在HBase的世界里,RowKey的设计决定了系统的生死。一个糟糕的RowKey设计会立即导致写入热点(Write Hotspot),让整个集群的性能瓶颈集中在少数几个RegionServer上。WAF日志的核心维度是时间,如果直接使用时间戳作为RowKey前缀,所有新的写入都会集中在表的末端,这是典型的热点问题。

我们的查询场景主要有两种:

  1. 根据时间范围和特定过滤条件(如IP、URI、规则ID)查询日志详情。
  2. 快速检索单条日志(通过唯一的Event ID)。

为了解决写入热点并兼顾查询效率,我们设计的RowKey结构如下:
[Salt(1 Byte)] + [Reversed Timestamp(8 Bytes)] + [Murmur3 Hash of Event ID(4 Bytes)]

  • Salt (盐值): 1字节的随机前缀,范围是0-255。写入时,我们对Event ID或某个高基数字段取模,得到一个Salt值。这样可以将写入请求均匀地散列到256个不同的RowKey前缀范围,从物理上打散数据,彻底避免写入热点。
  • Reversed Timestamp (反转时间戳): Long.MAX_VALUE - timestamp。直接使用时间戳会让新数据追加在表尾,而HBase的Scan操作对于指定起始和结束RowKey效率最高。通过反转时间戳,最新的数据会排在表的最前面。这样,查询最近N小时的日志就变成了一个从表头开始的、非常高效的Prefix Scan操作。
  • Murmur3 Hash of Event ID: 使用事件ID的哈希值作为后缀,确保RowKey的唯一性,同时保持较短的长度。

表结构定义:

  • 表名: waf:logs
  • 列族 (Column Family):
    • d: (data) 存放解析后的、结构化的日志字段,如src_ip, uri, rule_id, status_code等。这些字段将用于过滤查询。
    • r: (raw) 存放原始的JSON格式日志字符串。用于需要查看完整日志详情的场景,通过将不常访问的大字段分离到单独的列族,可以提升对d列族扫描的性能。

预分区(Pre-splitting)也是一个关键步骤。在建表时,我们根据Salt的数量预先创建256个Region,确保集群在启动之初就有足够的并发处理能力。

// HBase Admin API for table creation (conceptual Java code)
// 在真实项目中,这通常通过HBase Shell或自动化脚本完成

TableName tableName = TableName.valueOf("waf:logs");
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);

// Column Family 'd' for structured data
HColumnDescriptor dataFamily = new HColumnDescriptor("d");
dataFamily.setCompressionType(Compression.Algorithm.SNAPPY); // 使用Snappy压缩
dataFamily.setBlockCacheEnabled(true);
dataFamily.setBloomFilterType(BloomType.ROW); // 行级布隆过滤器,加速Get请求
tableDescriptor.addFamily(dataFamily);

// Column Family 'r' for raw log string
HColumnDescriptor rawFamily = new HColumnDescriptor("r");
rawFamily.setCompressionType(Compression.Algorithm.GZ); // GZ压缩率更高,适合冷数据
rawFamily.setBlockCacheEnabled(false); // 原始日志一般不需缓存
tableDescriptor.addFamily(rawFamily);

// Pre-splitting based on Salt
byte[][] splitKeys = new byte[255][];
for (int i = 1; i <= 255; i++) {
    splitKeys[i - 1] = new byte[]{(byte) i};
}

try (Admin admin = connection.getAdmin()) {
    admin.createTable(tableDescriptor, splitKeys);
}

这里的坑在于,列族的设计需要权衡。太多的列族会影响性能,因为每次Flush都会产生多个HFile。两个列族是比较理想的平衡,既实现了冷热数据的分离,又没有引入过多的管理开销。

第二步:构建高吞吐Go语言写入服务

这个服务是数据管道的中枢。它从Kafka消费WAF日志,将其转换为HBase的Put操作,并以高效的批量方式写入HBase。我们选择Go语言是因为其出色的并发性能和强大的生态系统。

我们将使用tsuna/gohbase这个客户端库,因为它提供了异步和批量写入的接口。

ingestion-service/main.go:

package main

import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"hash/murmur3"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	"github.com/tsuna/gohbase"
	"github.com/tsuna/gohbase/hrpc"
)

const (
	kafkaBrokers   = "kafka1:9092,kafka2:9092"
	kafkaTopic     = "waf_logs"
	kafkaGroup     = "hbase-ingestion-group"
	hbaseZookeepers = "zk1:2181,zk2:2181"
	hbaseTable     = "waf:logs"
	batchSize      = 2000 // 每次批量写入HBase的记录数
	batchTimeout   = 2 * time.Second // 或者2秒超时
)

// WAFLog represents the structure of our incoming log messages from Kafka.
type WAFLog struct {
	EventID   string `json:"event_id"`
	Timestamp int64  `json:"timestamp"` // Unix Milliseconds
	SrcIP     string `json:"src_ip"`
	URI       string `json:"uri"`
	RuleID    string `json:"rule_id"`
	RawLog    string `json:"raw_log"`
	// ... other fields
}

// LogConsumerGroupHandler implements Sarama's ConsumerGroupHandler interface.
type LogConsumerGroupHandler struct {
	hbaseClient gohbase.Client
	putChan     chan *hrpc.Mutate
	wg          *sync.WaitGroup
}

func main() {
	// --- HBase Client Initialization ---
	// 在生产环境中,配置应来自配置文件或环境变量
	hbaseClient := gohbase.NewClient(hbaseZookeepers)
	defer hbaseClient.Close()

	// --- Kafka Consumer Group Setup ---
	config := sarama.NewConfig()
	config.Version = sarama.V2_8_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

	consumerGroup, err := sarama.NewConsumerGroup(strings.Split(kafkaBrokers, ","), kafkaGroup, config)
	if err != nil {
		log.Fatalf("Error creating consumer group client: %v", err)
	}
	defer consumerGroup.Close()

	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	
	// Channel to buffer puts for batching
	putChan := make(chan *hrpc.Mutate, batchSize*2)

	handler := &LogConsumerGroupHandler{
		hbaseClient: hbaseClient,
		putChan:     putChan,
		wg:          wg,
	}

	// Start the HBase batch writer goroutine
	wg.Add(1)
	go batchWriter(ctx, hbaseClient, putChan, wg)

	// Start consuming messages
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := consumerGroup.Consume(ctx, []string{kafkaTopic}, handler); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()

	log.Println("Ingestion service started. Waiting for signals...")
	
	// Graceful shutdown handling
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
}


// Setup is run at the beginning of a new session, before ConsumeClaim.
func (h *LogConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { return nil }

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (h *LogConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (h *LogConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		var logEntry WAFLog
		if err := json.Unmarshal(message.Value, &logEntry); err != nil {
			log.Printf("Failed to unmarshal log: %v. Skipping.", err)
			// 在生产中,这里应该发送到死信队列 (Dead Letter Queue)
			session.MarkMessage(message, "")
			continue
		}

		putRequest, err := createPutRequest(&logEntry)
		if err != nil {
			log.Printf("Failed to create Put request for EventID %s: %v", logEntry.EventID, err)
			continue
		}

		h.putChan <- putRequest
		session.MarkMessage(message, "")
	}
	return nil
}

// createPutRequest encapsulates the logic for building the HBase Put request.
func createPutRequest(logEntry *WAFLog) (*hrpc.Mutate, error) {
    // 1. Construct RowKey: [Salt(1)] + [Reversed Timestamp(8)] + [Murmur3(4)]
	
	// Salt: use murmur3 hash of EventID mod 256 for distribution
	salt := byte(murmur3.Sum32([]byte(logEntry.EventID)) % 256)

	// Reversed Timestamp
	reversedTs := ^uint64(logEntry.Timestamp) // Bitwise NOT is a simple way to reverse order
	tsBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(tsBytes, reversedTs)

	// Event ID Hash
	hashBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(hashBytes, murmur3.Sum32([]byte(logEntry.EventID)))

	rowKey := append([]byte{salt}, tsBytes...)
	rowKey = append(rowKey, hashBytes...)

	// 2. Prepare column values
	values := map[string]map[string][]byte{
		"d": { // data column family
			"src_ip":  []byte(logEntry.SrcIP),
			"uri":     []byte(logEntry.URI),
			"rule_id": []byte(logEntry.RuleID),
		},
		"r": { // raw column family
			"log": []byte(logEntry.RawLog),
		},
	}

	put, err := hrpc.NewPut(context.Background(), []byte(hbaseTable), rowKey, values)
	if err != nil {
		return nil, err
	}
	return put, nil
}

// batchWriter collects Puts from the channel and writes them to HBase in batches.
func batchWriter(ctx context.Context, client gohbase.Client, putChan <-chan *hrpc.Mutate, wg *sync.WaitGroup) {
	defer wg.Done()
	
	ticker := time.NewTicker(batchTimeout)
	defer ticker.Stop()

	batch := make([]*hrpc.Mutate, 0, batchSize)

	for {
		select {
		case <-ctx.Done():
			// Final flush before exiting
			if len(batch) > 0 {
				log.Printf("Context cancelled. Flushing final batch of %d records...", len(batch))
				flush(client, batch)
			}
			return
		case put := <-putChan:
			batch = append(batch, put)
			if len(batch) >= batchSize {
				log.Printf("Batch size reached. Flushing %d records...", len(batch))
				flush(client, batch)
				batch = make([]*hrpc.Mutate, 0, batchSize) // Reset batch
				ticker.Reset(batchTimeout) // Reset timer
			}
		case <-ticker.C:
			if len(batch) > 0 {
				log.Printf("Batch timeout. Flushing %d records...", len(batch))
				flush(client, batch)
				batch = make([]*hrpc.Mutate, 0, batchSize) // Reset batch
			}
		}
	}
}

func flush(client gohbase.Client, batch []*hrpc.Mutate) {
	if len(batch) == 0 {
		return
	}

    // gohbase's client is thread-safe, we can send requests concurrently.
    // Here we use a WaitGroup to send all puts in the batch in parallel.
    var wg sync.WaitGroup
	for _, p := range batch {
        wg.Add(1)
        go func(put *hrpc.Mutate) {
            defer wg.Done()
			// 在生产环境中,需要更精细的重试逻辑
            _, err := client.Put(put)
            if err != nil {
                log.Printf("Failed to put record (RowKey: %x): %v", put.Row(), err)
                // TODO: Add to a retry queue or dead-letter queue
            }
        }(p)
    }
    wg.Wait()
}

这个写入服务有几个关键设计点:

  1. 并发消费与批量写入解耦: Kafka消费者将解析后的Put请求放入一个channel,一个独立的batchWriter协程负责从channel中取出并执行批量写入。这避免了HBase写入延迟影响Kafka的消费速度。
  2. 双重触发批量提交: 批量提交由两个条件触发:达到batchSizebatchTimeout超时。这确保了即使在流量较低时,日志也能被及时写入,不会无限期地驻留在内存中。
  3. 优雅停机: 通过监听SIGINTSIGTERM信号,服务在退出前会确保将内存中最后一个批次的数据刷入HBase,避免数据丢失。
  4. 错误处理: 生产级的服务必须有完善的错误处理。对于反序列化失败的日志或HBase写入失败的请求,应记录并推送到一个专门的死信队列(Dead Letter Queue),以便后续排查和重放。

第三步:实现查询服务API

有了数据,还需要提供一个便捷的方式来查询。直接暴露HBase的接口给上游应用是不现实的。我们需要一个中间层的Query Service,它封装HBase的查询逻辑,提供简单的RESTful API。

例如,提供一个API来查询特定IP在某个时间段内的日志:
GET /api/v1/logs?src_ip=1.2.3.4&start_time=...&end_time=...

query-service/main.go:

package main

// ... (imports for http, hbase client, etc.)

type QueryService struct {
	hbaseClient gohbase.Client
}

func (qs *QueryService) handleQueryLogs(w http.ResponseWriter, r *http.Request) {
	// 1. Parse query parameters (src_ip, start_time, end_time, etc.)
	queryParams := r.URL.Query()
	srcIP := queryParams.Get("src_ip")
	startTime, _ := strconv.ParseInt(queryParams.Get("start_time"), 10, 64) // Unix Millis
	endTime, _ := strconv.ParseInt(queryParams.Get("end_time"), 10, 64)     // Unix Millis

	if srcIP == "" || startTime == 0 || endTime == 0 {
		http.Error(w, "Missing required query parameters", http.StatusBadRequest)
		return
	}

	// 2. Construct HBase Scan requests
	// Because of our Salt, we must scan across all 256 possible prefixes.
	// This is a trade-off: we sacrifice some read performance for excellent write distribution.
	
	results := make(chan map[string]string)
	var wg sync.WaitGroup

	for i := 0; i < 256; i++ {
		wg.Add(1)
		go func(salt byte) {
			defer wg.Done()
			
			// StartRow: [salt] + [Reversed end_time]
			reversedEndTs := ^uint64(endTime)
			endTsBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(endTsBytes, reversedEndTs)
			startRow := append([]byte{salt}, endTsBytes...)
			
			// StopRow: [salt] + [Reversed start_time]
			reversedStartTs := ^uint64(startTime)
			startTsBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(startTsBytes, reversedStartTs)
			stopRow := append([]byte{salt}, startTsBytes...)
			
			// Filter: only return rows where column 'd:src_ip' matches the given IP.
			filter := hrpc.NewSingleColumnValueFilter(
				[]byte("d"),
				[]byte("src_ip"),
				hrpc.EQUAL,
				[]byte(srcIP),
				true, // drop row if filter doesn't match
				false,
			)

			scanRequest, _ := hrpc.NewScanRange(r.Context(), []byte(hbaseTable), startRow, stopRow, hrpc.Filters(filter))
			scanner := qs.hbaseClient.Scan(scanRequest)
			
			for {
				res, err := scanner.Next()
				if err == io.EOF {
					break // No more results
				}
				if err != nil {
					log.Printf("Error scanning salt %d: %v", salt, err)
					break
				}
				
				// Convert HBase Result to a map and send to channel
				resultMap := make(map[string]string)
				for _, cell := range res.Cells {
					// Format: "family:qualifier" -> "value"
					key := fmt.Sprintf("%s:%s", string(cell.Family), string(cell.Qualifier))
					resultMap[key] = string(cell.Value)
				}
				results <- resultMap
			}
		}(byte(i))
	}
	
	// Wait for all scanners to finish and close the results channel
	go func() {
		wg.Wait()
		close(results)
	}()
	
	// 3. Stream results back as JSON array
	w.Header().Set("Content-Type", "application/json")
	w.Write([]byte("["))
	first := true
	for res := range results {
		if !first {
			w.Write([]byte(","))
		}
		json.NewEncoder(w).Encode(res)
		first = false
	}
	w.Write([]byte("]"))
}

// ... (main function to start the http server)

这个查询服务的实现揭示了我们为写入性能所做的权衡:

  • 读操作放大 (Read Amplification): 由于我们为了打散热点引入了Salt,一次时间范围的查询必须并发地发起256次Scan操作,每个Scan对应一个Salt前缀。这无疑增加了读取的复杂度。
  • 过滤器下推 (Filter Pushdown): 幸运的是,HBase支持在服务端进行过滤。我们使用SingleColumnValueFiltersrc_ip的匹配逻辑下推到RegionServer执行,避免了将大量无关数据传输到客户端。这是优化查询性能的关键。

系统自身的“可观测性”

我们构建了一个可观测性数据管道,但这个管道本身也必须是可观测的。我们为Ingestion ServiceQuery Service都集成了Prometheus Metrics。
关键监控指标包括:

  • ingestion_kafka_lag: Kafka消费延迟,衡量数据管道的健康度。
  • ingestion_batch_flush_duration_seconds: HBase批量写入的耗时分布,反映HBase集群的写入性能。
  • ingestion_records_processed_total: 处理的日志总数,用于容量规划。
  • query_api_request_latency_seconds: 查询API的P99/P95延迟,是面向用户的核心SLI。
  • query_hbase_scan_count_total: HBase Scan操作次数,反映查询的放大效应。

通过监控这些指标,我们可以建立精细的告警,并在系统出现瓶颈时快速定位问题。

当前方案的局限性与未来展望

这套基于HBase的方案成功解决了我们面临的海量WAF日志写入和高基数查询的难题,但也并非银弹。

  1. 复杂聚合查询的缺失: 当前系统非常适合根据特定条件进行过滤和检索(Key-Value Lookups and Scans),但对于复杂的聚合分析(例如,统计TOP 100的攻击IP)则力不从心。这需要全表扫描,效率极低。未来的一个优化方向是引入二级索引方案(如通过Phoenix)或将聚合任务卸载到Spark等计算引擎。
  2. 运维复杂度: 相比于商业化的日志服务或成熟的ELK Stack,自建HBase管道对团队的运维能力要求更高。Hadoop和HBase生态的组件繁多,需要专门的SRE团队来保障其稳定性。
  3. 冷数据管理: 目前所有数据都存储在HBase中,成本较高。一个长期的优化是实现数据的生命周期管理(TTL),并自动将超过90天的冷数据归档到成本更低的HDFS或对象存储中。

尽管存在这些局限,但通过深入理解业务场景并对基础组件进行深度定制,我们构建了一个在特定问题域上性能和成本效益远超通用方案的系统。这正是架构设计的核心所在:没有最好的架构,只有最合适的取舍。


  目录