Zookeeper-Driven Dynamic OIDC Configuration for Caddy, with End-to-End Observability


We were facing a thorny operational problem: a multi-tenant PaaS platform hosting hundreds of independent customer applications, where every tenant needs single sign-on via OpenID Connect (OIDC) against their own identity provider (IdP). The original approach was to hand-write a set of OIDC configuration per tenant at the reverse-proxy layer (Nginx, in the early days). That was tolerable with a handful of tenants, but as the business grew the problems became obvious:

  1. Configuration sprawl: every tenant onboarding or configuration change meant manually editing config files.
  2. Reload risk: Nginx config changes normally require a reload, which is not a zero-risk operation under heavy traffic.
  3. Slow turnaround: between signing a customer and getting the tenant live, OIDC configuration became a clear bottleneck in the process; it was manual and easy to get wrong.

We needed a gateway that reacts to tenant changes automatically and updates its authentication logic without a service restart. The initial idea was a dynamic reverse proxy driven by a configuration store: whenever tenant information in that store changes, the gateway notices in real time and applies the new authentication rules.

The technology choices were straightforward. For the reverse proxy, Caddy was the obvious pick: it natively supports replacing its entire configuration through an API, which does away with the drawbacks of the traditional reload model. For the configuration store we chose Apache Zookeeper. etcd and Consul were candidates too, but the team has far more operational experience running Zookeeper, and its stability and watcher mechanism match what we need: reliable, event-driven push notifications of configuration changes rather than inefficient polling. OpenTelemetry is woven into the whole system as the observability standard, so that in something this dynamic and automated we keep clear visibility into every step.

The final architecture looks like this:

graph TD
    subgraph "Management Plane"
        AdminUI[Admin or automation scripts] -- "Write/update tenant OIDC config" --> ZK[Zookeeper];
    end

    subgraph "Data Plane"
        User[User] -- "1. Visit a.example.com" --> Caddy[Caddy Gateway];
        Caddy -- "2. OIDC authentication flow" --> IdP_A[Tenant A IdP];
        User -- "3. Visit b.example.com" --> Caddy;
        Caddy -- "4. OIDC authentication flow" --> IdP_B[Tenant B IdP];
    end

    subgraph "Control Plane"
        Controller[Config sync controller] -- "1. Watch for node changes" --> ZK;
        Controller -- "2. Generate Caddy JSON config" --> Controller;
        Controller -- "3. POST /load" --> CaddyAPI[Caddy Admin API];
    end

    subgraph "Observability Plane"
        Controller -- "Send traces/metrics" --> OTel[OpenTelemetry Collector];
        Caddy -- "Send traces/metrics" --> OTel;
        OTel -- "Export" --> Backend[Jaeger/Prometheus];
    end

    CaddyAPI -- "4. Hot-reload config" --> Caddy;

The Data Model in Zookeeper

For the controller to understand and translate configuration, we first define the data model in Zookeeper. All tenant configuration lives under a single root path, /gateway/tenants, with each tenant's unique tenant_id as the ZNode name.

Each tenant ZNode (/gateway/tenants/{tenant_id}) stores a JSON string containing everything needed to build that tenant's OIDC route in Caddy.

A typical tenant configuration ZNode looks like this:

/gateway/tenants/tenant-alpha

{
  "domain": "tenant-alpha.myapplication.com",
  "upstream": "http://tenant-alpha-service:8080",
  "oidc": {
    "issuer": "https://auth.tenant-alpha.com/realms/master",
    "client_id": "caddy-gateway",
    "client_secret": "a_very_secret_string_for_alpha",
    "scopes": ["openid", "profile", "email"]
  }
}

/gateway/tenants/tenant-beta

{
  "domain": "tenant-beta.myapplication.com",
  "upstream": "http://tenant-beta-service:8080",
  "oidc": {
    "issuer": "https://login.microsoftonline.com/tenant-beta-guid/v2.0",
    "client_id": "some_guid_client_id",
    "client_secret": "another_secret_string_for_beta",
    "scopes": ["openid", "profile", "email", "offline_access"]
  }
}
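
On the management-plane side, writing these ZNodes is a simple create-or-update against the tenant path. Here is a minimal sketch using the same go-zookeeper client as the controller below; registerTenant is an illustrative helper (it reuses the controller's TenantConfig type and zkRootPath constant) rather than part of the original system:

// registerTenant creates or updates a tenant configuration ZNode.
// In production this would live in a provisioning script or admin service.
func registerTenant(conn *zk.Conn, tenantID string, cfg TenantConfig) error {
	data, err := json.Marshal(cfg)
	if err != nil {
		return err
	}
	path := zkRootPath + "/" + tenantID
	exists, stat, err := conn.Exists(path)
	if err != nil {
		return err
	}
	if !exists {
		// Persistent node (flags = 0) so the config survives client sessions.
		_, err = conn.Create(path, data, 0, zk.WorldACL(zk.PermAll))
		return err
	}
	// Pass the current version for optimistic concurrency control.
	_, err = conn.Set(path, data, stat.Version)
	return err
}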

This structure is clear and easy to manage. The controller can watch the children of /gateway/tenants to detect tenants being added or removed, and watch each individual tenant ZNode's data to detect in-place configuration updates.
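
Note that the controller implementation in the next section only re-arms the child watch, so it reacts to tenants being added or removed; detecting an edit of an existing tenant ZNode additionally requires a data watch per tenant. Below is a minimal sketch of that missing piece, meant to sit alongside the controller's main.go and share its imports (watchTenantData and the resync callback are illustrative names, not part of the original code):

// watchTenantData re-arms a data watch on one tenant ZNode and invokes
// resync whenever its content changes. Run one goroutine per tenant.
func watchTenantData(conn *zk.Conn, path string, resync func()) {
	for {
		// GetW returns the current data plus a one-shot watch channel.
		_, _, ch, err := conn.GetW(path)
		if err != nil {
			if err == zk.ErrNoNode {
				return // tenant deleted; the child watch handles removal
			}
			log.Printf("data watch on %s failed: %v", path, err)
			time.Sleep(5 * time.Second)
			continue
		}
		if ev := <-ch; ev.Type == zk.EventNodeDataChanged {
			log.Printf("config for %s changed, triggering resync", path)
			resync()
		}
		// Loop to set the next watch: Zookeeper watches fire only once.
	}
}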

The Core: the Configuration Sync Controller (Go)

The controller is the central nervous system of the whole dynamic setup. It connects to Zookeeper, listens for configuration changes, translates them into JSON that Caddy understands, and pushes the result through Caddy's Admin API. We implemented it in Go, which is a good fit for the concurrency and network programming involved.

Below is the core of the controller, covering the Zookeeper connection, the watcher mechanism, the OpenTelemetry instrumentation, and the Caddy config generation logic.

main.go:

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/go-zookeeper/zk"
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

const (
	zkRootPath      = "/gateway/tenants"
	caddyAdminAPI   = "http://localhost:2019/load"
	serviceName     = "caddy-zk-controller"
	serviceVersion  = "1.0.0"
)

var tracer trace.Tracer

// TenantConfig mirrors the JSON structure in Zookeeper
type TenantConfig struct {
	Domain   string `json:"domain"`
	Upstream string `json:"upstream"`
	OIDC     struct {
		Issuer       string   `json:"issuer"`
		ClientID     string   `json:"client_id"`
		ClientSecret string   `json:"client_secret"`
		Scopes       []string `json:"scopes"`
	} `json:"oidc"`
}

// initTracer initializes the OpenTelemetry tracer
func initTracer() (*sdktrace.TracerProvider, error) {
	ctx := context.Background()

	// OTLP exporter setup (assumes OTel Collector is running on localhost:4317)
	otelAgentAddr, ok := os.LookupEnv("OTEL_EXPORTER_OTLP_ENDPOINT")
	if !ok {
		otelAgentAddr = "localhost:4317"
	}

	conn, err := grpc.DialContext(ctx, otelAgentAddr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
	}

	traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
	if err != nil {
		return nil, fmt.Errorf("failed to create trace exporter: %w", err)
	}

	res, err := resource.New(ctx,
		resource.WithAttributes(
			semconv.ServiceName(serviceName),
			semconv.ServiceVersion(serviceVersion),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create resource: %w", err)
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(traceExporter),
		sdktrace.WithResource(res),
	)
	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
	tracer = otel.Tracer(serviceName)
	return tp, nil
}

func main() {
	log.Println("Starting Caddy ZK Controller...")

	tp, err := initTracer()
	if err != nil {
		log.Fatalf("Failed to initialize OpenTelemetry tracer: %v", err)
	}
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	zkServers := []string{"127.0.0.1:2181"}
	conn, _, err := zk.Connect(zkServers, 5*time.Second)
	if err != nil {
		log.Fatalf("Failed to connect to Zookeeper: %v", err)
	}
	defer conn.Close()

	// Initial sync and setup watcher
	if err := syncAndWatch(context.Background(), conn); err != nil {
		log.Fatalf("Initial sync failed: %v", err)
	}

	// Keep the process alive
	select {}
}

// syncAndWatch performs a full sync and sets up ZK watchers.
// This function is the core logic loop.
func syncAndWatch(ctx context.Context, conn *zk.Conn) error {
	ctx, span := tracer.Start(ctx, "syncAndWatch")
	defer span.End()

	// Ensure the root path exists, or the watch will fail.
	// In a real project, this might be created by an external provisioning script.
	exists, _, err := conn.Exists(zkRootPath)
	if err != nil {
		span.RecordError(err)
		return fmt.Errorf("failed to check root path existence: %w", err)
	}
	if !exists {
		_, err = conn.Create(zkRootPath, []byte{}, 0, zk.WorldACL(zk.PermAll))
		if err != nil && err != zk.ErrNodeExists {
			span.RecordError(err)
			return fmt.Errorf("failed to create root path: %w", err)
		}
	}

	// Watch for changes in the list of tenants (children of the root path)
	tenants, _, watchChan, err := conn.ChildrenW(zkRootPath)
	if err != nil {
		span.RecordError(err)
		return fmt.Errorf("failed to get children and set watch: %w", err)
	}

	log.Printf("Found %d initial tenants. Performing initial sync.", len(tenants))
	span.SetAttributes(attribute.Int("tenants.initial_count", len(tenants)))

	// Perform the initial full configuration sync
	if err := generateAndPushCaddyConfig(ctx, conn, tenants); err != nil {
		span.RecordError(err)
		return fmt.Errorf("initial config push failed: %w", err)
	}

	// Start the watcher loop in a separate goroutine
	go func() {
		for {
			event := <-watchChan
			// A production system needs more robust error handling and backoff here.
			if event.Err != nil {
				log.Printf("Watcher error: %v. Attempting to re-establish watch.", event.Err)
				time.Sleep(5 * time.Second)
				// Re-call syncAndWatch to reset watches
				if err := syncAndWatch(context.Background(), conn); err != nil {
					log.Printf("Failed to re-establish watch: %v", err)
				}
				return // Exit this goroutine, a new one will be created.
			}

			// Trace this event processing
			ctx, eventSpan := tracer.Start(context.Background(), "Zookeeper Event Received")
			eventSpan.SetAttributes(
				attribute.String("zookeeper.event.type", event.Type.String()),
				attribute.String("zookeeper.event.path", event.Path),
			)
			log.Printf("Zookeeper event received: %+v. Triggering resync.", event)
			
			// Any change to children triggers a full resync.
			// More advanced logic could inspect the event type for optimization.
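			// Note: this child watch only fires when tenants are added or removed.
			// Catching in-place edits of an existing tenant ZNode would additionally
			// require per-tenant data watches (conn.GetW), as sketched earlier.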
			newTenants, _, newWatchChan, err := conn.ChildrenW(zkRootPath)
			if err != nil {
				log.Printf("Error getting new children list: %v", err)
				eventSpan.RecordError(err)
				eventSpan.End()
				continue // Try again on the next event
			}
			watchChan = newWatchChan // Update the channel for the next event

			log.Printf("Tenant list changed. Found %d tenants.", len(newTenants))
			eventSpan.SetAttributes(attribute.Int("tenants.new_count", len(newTenants)))

			if err := generateAndPushCaddyConfig(ctx, conn, newTenants); err != nil {
				log.Printf("Failed to push updated Caddy config: %v", err)
				eventSpan.RecordError(err)
			}
			eventSpan.End()
		}
	}()

	return nil
}

// generateAndPushCaddyConfig fetches all tenant configs and pushes a complete configuration to Caddy.
func generateAndPushCaddyConfig(ctx context.Context, conn *zk.Conn, tenants []string) error {
	ctx, span := tracer.Start(ctx, "generateAndPushCaddyConfig", trace.WithAttributes(attribute.Int("tenants.count", len(tenants))))
	defer span.End()

	var tenantConfigs []TenantConfig
	var wg sync.WaitGroup
	var mu sync.Mutex
	
	fetchErrors := make(chan error, len(tenants))

	for _, tenantID := range tenants {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			path := zkRootPath + "/" + id
			
			// Start a span for fetching individual tenant config
			_, fetchSpan := tracer.Start(ctx, "fetchTenantConfig", trace.WithAttributes(attribute.String("tenant.id", id)))
			defer fetchSpan.End()
			
			data, _, err := conn.Get(path)
			if err != nil {
				log.Printf("Error fetching config for tenant %s: %v", id, err)
				fetchSpan.RecordError(err)
				fetchErrors <- fmt.Errorf("tenant %s: %w", id, err)
				return
			}

			var config TenantConfig
			if err := json.Unmarshal(data, &config); err != nil {
				log.Printf("Error unmarshalling config for tenant %s: %v", id, err)
				fetchSpan.RecordError(err)
				fetchErrors <- fmt.Errorf("tenant %s json: %w", id, err)
				return
			}
			
			// A common mistake is not validating the fetched config.
			// Here we should check for empty fields, etc.
			if config.Domain == "" || config.Upstream == "" || config.OIDC.Issuer == "" {
				err := fmt.Errorf("invalid config for tenant %s: missing required fields", id)
				log.Printf("%v", err)
				fetchSpan.RecordError(err)
				fetchErrors <- err
				return
			}

			mu.Lock()
			tenantConfigs = append(tenantConfigs, config)
			mu.Unlock()
		}(tenantID)
	}

	wg.Wait()
	close(fetchErrors)

	// Check if any goroutine reported an error
	for err := range fetchErrors {
		// Log all errors, but we might decide to proceed with a partial config
		log.Printf("An error occurred during tenant config fetch: %v", err)
		// For production, you might want to fail the entire update if any tenant config is invalid.
		// span.RecordError(err)
		// return err
	}
	
	span.AddEvent("All tenant configurations fetched")
	
	// Generate Caddy JSON from all valid tenant configs
	caddyJSON, err := buildCaddyJSON(tenantConfigs)
	if err != nil {
		span.RecordError(err)
		return fmt.Errorf("failed to build caddy json: %w", err)
	}
	
	span.AddEvent("Caddy JSON configuration generated")

	// Push to Caddy Admin API
	return pushToCaddy(ctx, caddyJSON)
}

// pushToCaddy sends the generated configuration to the Caddy Admin API.
func pushToCaddy(ctx context.Context, config []byte) error {
	ctx, span := tracer.Start(ctx, "pushToCaddy")
	defer span.End()

	client := http.Client{
		Transport: otelhttp.NewTransport(http.DefaultTransport),
		Timeout:   10 * time.Second,
	}

	req, err := http.NewRequestWithContext(ctx, "POST", caddyAdminAPI, bytes.NewBuffer(config))
	if err != nil {
		span.RecordError(err)
		return fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		span.RecordError(err)
		return fmt.Errorf("failed to push config to caddy: %w", err)
	}
	defer resp.Body.Close()

	span.SetAttributes(attribute.Int("caddy.response.status_code", resp.StatusCode))
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		err := fmt.Errorf("caddy returned non-200 status: %d - %s", resp.StatusCode, string(body))
		span.RecordError(err)
		return err
	}

	log.Println("Successfully pushed new configuration to Caddy.")
	span.AddEvent("Caddy configuration applied successfully")
	return nil
}

// buildCaddyJSON is a helper function that constructs the final Caddy JSON config.
// This is where the translation logic resides.
func buildCaddyJSON(configs []TenantConfig) ([]byte, error) {
	// Base Caddy structure
	caddyConfig := map[string]interface{}{
		"admin": map[string]interface{}{
			"listen": "0.0.0.0:2019",
		},
		"logging": map[string]interface{}{
			"logs": map[string]interface{}{
				"default": map[string]interface{}{
					"level": "INFO",
				},
			},
		},
		"apps": map[string]interface{}{
			"http": map[string]interface{}{
				"servers": map[string]interface{}{
					"main": map[string]interface{}{
						"listen": []string{":443"},
						"routes": []map[string]interface{}{},
						// Automatic HTTPS stays on for every host; only HTTP->HTTPS redirects are disabled
						"automatic_https": map[string]interface{}{
							"disable_redirects": true,
						},
					},
				},
			},
		},
	}

	routes := []map[string]interface{}{}
	for _, tenant := range configs {
		route := map[string]interface{}{
			"match": []map[string]interface{}{
				{
					"host": []string{tenant.Domain},
				},
			},
			"handle": []map[string]interface{}{
				// OIDC Authentication handler using the caddy-security plugin
				{
					"handler": "authentication",
					"providers": map[string]interface{}{
						"oauth2": []map[string]interface{}{
							{
								"provider":        "openid_connect",
								"name":            fmt.Sprintf("oidc_%s", tenant.Domain), // Unique name
								"client_id":       tenant.OIDC.ClientID,
								"client_secret":   tenant.OIDC.ClientSecret,
								"issuer":          tenant.OIDC.Issuer,
								"scopes":          tenant.OIDC.Scopes,
								"token_key":       "access_token",
								"auth_url_params": map[string]string{"prompt": "login"},
							},
						},
					},
				},
				// Reverse proxy to the upstream service
				{
					"handler": "reverse_proxy",
					"upstreams": []map[string]interface{}{
						{
							"dial": tenant.Upstream,
						},
					},
				},
			},
			"terminal": true,
		}
		routes = append(routes, route)
	}

	// It's critical to inject the routes into the correct part of the structure.
	caddyConfig["apps"].(map[string]interface{})["http"].(map[string]interface{})["servers"].(map[string]interface{})["main"].(map[string]interface{})["routes"] = routes

	return json.MarshalIndent(caddyConfig, "", "  ")
}

This controller embodies a few key design decisions:

  1. Full replacement: Caddy's /load endpoint accepts a complete configuration, so on every change the controller fetches all tenants, generates a brand-new, complete Caddy config, and pushes it. That keeps the logic simple and avoids the complexity of incremental updates.
  2. Concurrent fetches: generateAndPushCaddyConfig retrieves all tenant configs from Zookeeper concurrently, which speeds up synchronization when there are many tenants.
  3. Robustness: the code handles the basics of Zookeeper connection errors, config parsing errors, and failed Caddy API calls. A real project also needs retry logic with exponential backoff; a sketch of what that could look like follows this list.
  4. Observability: OpenTelemetry spans wrap the key operations (Zookeeper event handling, config generation, API push), so the distributed tracing backend shows the full lifecycle of every configuration change: how long it took, how many tenants were involved, and whether it succeeded. When an update fails, the trace quickly tells us whether the Zookeeper read failed, the config was malformed, or Caddy rejected it.
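
Here is a minimal sketch of the retry wrapper mentioned in point 3, assuming it wraps the pushToCaddy function from the controller above; pushWithRetry, maxAttempts, and the delay schedule are illustrative choices, and math/rand would need to be added to the imports:

// pushWithRetry retries pushToCaddy with exponential backoff plus jitter.
// Transient failures (Caddy restarting, brief network blips) are absorbed;
// persistent failures surface to the caller after maxAttempts.
func pushWithRetry(ctx context.Context, config []byte, maxAttempts int) error {
	var lastErr error
	delay := 500 * time.Millisecond
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err := pushToCaddy(ctx, config); err != nil {
			lastErr = err
			log.Printf("push attempt %d/%d failed: %v (retrying in ~%s)", attempt, maxAttempts, err, delay)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(delay + time.Duration(rand.Int63n(int64(delay/2)))):
			}
			delay *= 2
			continue
		}
		return nil
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}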

Configuring and Starting Caddy

Caddy itself needs very little initial configuration. We only have to make sure the Admin API is enabled and that the OIDC-capable caddy-security plugin is compiled in.

A minimal initial Caddyfile that just starts Caddy with the admin endpoint enabled; the controller will overwrite this configuration through the API.

Caddyfile.initial:

{
	admin 0.0.0.0:2019
	# Caddy's own OpenTelemetry tracing is provided by its `tracing` HTTP handler,
	# enabled per route; its exporter reads the OTEL_* environment variables set below.
}

Alternatively, start from a nearly empty JSON config:
caddy.json.initial:

{
	"admin": {
		"listen": "0.0.0.0:2019"
	},
	"logging": {
		"logs": {
			"default": {
				"level": "INFO"
			}
		}
	}
}

Start Caddy (assuming you have built a custom binary that bundles the caddy-security plugin and OpenTelemetry support, for example with xcaddy):

# Export OTEL env vars so Caddy's tracer can connect to the collector
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
export OTEL_SERVICE_NAME=caddy-gateway

./caddy run --config caddy.json.initial

End-to-End Verification

  1. Start Zookeeper, the OpenTelemetry Collector, Jaeger, and Caddy.
  2. Start the Go controller with go run main.go. The controller logs show it connecting to ZK and performing the initial sync. At this point the Caddy config contains no routes yet.
  3. In another terminal, create a tenant with zkCli.sh:
    create /gateway/tenants/tenant-alpha '{"domain":"tenant-alpha.myapplication.com","upstream":"http://httpbin.org","oidc":{"issuer":"https://accounts.google.com","client_id":"YOUR_GOOGLE_CLIENT_ID","client_secret":"YOUR_GOOGLE_CLIENT_SECRET","scopes":["openid","profile","email"]}}'
  4. Watch the controller logs. It should immediately detect the child-node change, trigger a resync, fetch tenant-alpha's data, generate a new Caddy configuration, and POST it to Caddy's Admin API (the applied config can also be read back from that API; see the snippet after this list).
  5. Visit https://tenant-alpha.myapplication.com (this requires DNS or a local hosts entry). You are redirected to Google's login page, which proves the OIDC route has taken effect dynamically.
  6. In the Jaeger UI, search for the caddy-zk-controller service. You will see a trace named Zookeeper Event Received with child spans for generateAndPushCaddyConfig and pushToCaddy. Search for the caddy-gateway service as well and you can see the traces for requests to tenant-alpha.myapplication.com. End-to-end tracing ties the gateway's behavior to the control-plane activity behind it.
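
As a further sanity check, Caddy's Admin API also serves the live configuration under /config/, so the routes the controller pushed can be read straight back. A small throwaway sketch; the server name "main" matches what buildCaddyJSON generates:

// readback.go: print the routes Caddy is currently serving, as a quick
// check that the controller's configuration push actually landed.
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:2019/config/apps/http/servers/main/routes")
	if err != nil {
		log.Fatalf("could not reach the Caddy admin API: %v", err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // expect one route per tenant, matched by host
}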

Limitations and Future Iterations

This setup solves the core pain points, but a few things are still worth improving before calling it production-grade:

  1. Controller availability: the controller is currently a single point of failure; if it crashes, configuration updates stop. Production deployments should run multiple controller instances and use Zookeeper ephemeral nodes for leader election so that exactly one instance is active at a time.
  2. Config validation: before pushing to Caddy, the controller only performs very basic non-empty checks. A common failure mode is an invalid OIDC issuer URL or wrong credentials: Caddy loads the config happily and the authentication flow then breaks at runtime. A "pre-flight" step in the controller that talks to the IdP's .well-known/openid-configuration endpoint can validate the config first (see the sketch after this list).
  3. The Zookeeper dependency: for some teams, operating a Zookeeper ensemble is too heavy. The pattern ports easily to other configuration stores such as etcd or Consul; only the client library and the watch logic in the Go controller need to change.
  4. Performance at large tenant counts: with thousands or tens of thousands of tenants, regenerating and pushing the full configuration on every change puts pressure on both the controller and the Caddy Admin API. Caddy handles JSON of that size quickly, but a more refined approach would compute a diff of the configuration and use the Admin API's fine-grained PATCH endpoints to update only what changed. That significantly increases controller complexity, a classic performance-versus-complexity trade-off.
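
Here is a minimal sketch of the pre-flight check from point 2, limited to verifying that the issuer publishes a discovery document whose issuer field matches the configured value; validateIssuer is an illustrative helper meant to sit alongside the controller's code (it additionally needs the strings package in the imports):

// validateIssuer fetches the OIDC discovery document and checks the issuer field.
// Call it before accepting a tenant config into the generated Caddy configuration.
func validateIssuer(ctx context.Context, issuer string) error {
	wellKnown := strings.TrimSuffix(issuer, "/") + "/.well-known/openid-configuration"
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, wellKnown, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("discovery endpoint unreachable: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("discovery endpoint returned status %d", resp.StatusCode)
	}
	var doc struct {
		Issuer string `json:"issuer"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil {
		return fmt.Errorf("invalid discovery document: %w", err)
	}
	if doc.Issuer != issuer {
		return fmt.Errorf("issuer mismatch: configured %q, discovered %q", issuer, doc.Issuer)
	}
	return nil
}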
