Go 基础设施:Kubernetes client-go 控制器模式深度

从 Reflector/Informer 机制、DeltaFIFO 队列、工作队列设计出发,解析自定义控制器的 reconciliation 循环与乐观并发控制。

资深架构师 · 基础架构部

Kubernetes controller-runtime 是构建自定义控制器的核心框架。理解 client-go 底层的数据流 —— 从 API Server 事件到 DeltaFIFO、再到工作队列 —— 是编写生产级控制器的必备基础。

1. Reflector/Informer 机制

Reflector 是 Kubernetes Informer 模式的核心组件,负责从 API Server 监听资源变化并填充到本地 Store(DeltaFIFO)。Informer 则是对 Reflector、Store、处理器的高层抽象。

1.1 整体架构

API Server
REST API + etcd
Watch/List 接口
Reflector
长期 Watch 连接
增量事件同步
DeltaFIFO
增量队列
事件去重
Indexer
本地索引存储
Thread-safe
Controller
事件处理循环
SharedIndexInformer
WorkQueue
限速队列
并发控制

Informer 架构图:Reflector 负责与 API Server 保持长连接,DeltaFIFO 缓存变化事件,Indexer 提供本地查询能力。

1.2 Reflector 的 List/Watch 机制

// Reflector 核心逻辑 (client-go/tools/cache/reflector.go)

type Reflector struct {
    name          string
    expectedType  interface{}
    store         Store           // DeltaFIFO
    listerWatcher ListerWatcher   // List/Watch 接口
    watchErrorHandler WatchErrorHandler

    // 进度跟踪
    lastSyncResourceVersion string
}

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    // 1. 首次全量 List
    list, err := r.listerWatcher.List(options)
    if err != nil {
        return err
    }
    listMetaInterface, items, err := r.extractList(list)

    // 2. 同步到 Store(带 ResourceVersion)
    for _, item := range items {
        if err := r.store.Add(item); err != nil {
            return err
        }
    }
    r.setLastSyncResourceVersion(getResourceVersion(listMetaInterface))

    // 3. 增量 Watch 循环
    for {
        select {
        case <-stopCh:
            return nil
        default:
            options = ListOptions{
                ResourceVersion: r.lastSyncResourceVersion,
                Watch: true,
            }
            w, err := r.listerWatcher.Watch(options)
            if err != nil {
                // 断线重连策略
                time.Sleep(exponentialBackoff)
            }
            r.watchHandler(w)
        }
    }
}
💡 关键洞察

Reflector 使用 ResourceVersion 实现渐进式同步。每次 List/Watch 都携带上一次成功的 RV,API Server 只会返回比该 RV 新的事件。这避免了全量同步的开销。

1.3 SharedInformerFactory

生产环境中,多个控制器通常共享同一个 Informer 以减少 API Server 连接数。SharedInformerFactory 实现了这一共享机制。

// 创建共享 Informer 工厂
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

// 获取共享 Informer(相同资源的多个 Watch 合并为一次)
deploymentInformer := factory.Apps().V1().Deployments()
serviceInformer := factory.Core().V1().Services()

// 启动所有 Informer
factory.Start(stopCh)

// 等待缓存同步完成
factory.WaitForCacheSync(stopCh)
◆ ◆ ◆

2. DeltaFIFO 队列原理

DeltaFIFO 是 Kubernetes 控制器中最关键的数据结构之一,它不仅是一个 FIFO 队列,还负责维护资源的增量变化(Delta)历史。

2.1 Delta 类型定义

// Delta 类型 (client-go/tools/cache/delta_type.go)

type DeltaType string

const (
    Added    DeltaType = "Added"
    Modified DeltaType = "Modified"
    Deleted  DeltaType = "Deleted"
    Replaced DeltaType = "Replaced"   // 来自 List 事件
    Sync     DeltaType = "Sync"         // 周期性同步
)

// DeltaFIFO 的存储结构
type DeltaFIFO struct {
    items     map[string][]Delta  // key -> Deltas 列表
    queue     []string             // 顺序队列
    lock      sync.RWMutex
    cond      *sync.Cond
}

type Delta struct {
    Type   DeltaType
    Object interface{}  // 可能是.runtime.Object 或 DeletedFinalStateUnknown
}

2.2 事件合并逻辑

DeltaFIFO 会对同一对象的连续事件进行合并,只保留最新状态。这避免了积压大量中间状态事件。

// 关键方法:Add/Update/Delete 都会调用 queueActionLocked

func (f *DeltaFIFO) queueActionLocked(action DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return err
    }

    // 追加 Delta
    oldDeltas := f.items[id]
    newDeltas := append(oldDeltas, Delta{Type: action, Object: obj})
    f.items[id] = newDeltas

    // 加入处理队列
    if len(oldDeltas) == 0 {
        f.queue = append(f.queue, id)
    }

    return nil
}

// Replace 方法:来自 List 的全量同步
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()

    // 1. 找出需要删除的 keys(本地有但 list 中没有)
    keys := make(map[string]struct{})
    for _, item := range list {
        keys[f.KeyOf(item)] = struct{}{}
    }

    for key, deltas := range f.items {
        if _, exists := keys[key]; !exists {
            // 发送 Deleted 事件
            f.queueActionLocked(Deleted, deltas[0].Object)
            delete(f.items, key)
        }
    }

    // 2. 批量添加/更新(标记为 Replaced)
    for _, item := range list {
        f.queueActionLocked(Replaced, item)
    }
    return nil
}
⚠️ DeletedFinalStateUnknown

当收到 Deleted 事件时,如果本地缓存中找不到该对象(可能被其他控制器删除了),DeltaFIFO 会发送 DeletedFinalStateUnknown 而不是普通 Deleted。这要求控制器在处理时能够优雅地处理"对象已不存在"的情况。

2.3 Pop 消费模式

// Process 函数由控制器实现,负责实际处理
type ProcessFunc func(obj interface{}, isInInitialList bool) error

func (f *DeltaFIFO) Pop(process ProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for len(f.queue) == 0 {
        f.cond.Wait()
    }

    // 取出最旧的 key
    key := f.queue[0]
    f.queue = f.queue[1:]

    // 获取所有 Deltas
    item, ok := f.items[key]
    if !ok {
        return nil, KeyError{Key: key}
    }
    delete(f.items, key)

    // 调用处理函数
    isInInitialList := false
    if len(item) > 0 && item[0].Type == Replaced {
        isInInitialList = true
    }

    err := process(item, isInInitialList)
    return item, err
}
◆ ◆ ◆

3. 工作队列设计

Kubernetes client-go 提供了三种工作队列实现,从简单到复杂分别适用于不同场景。

3.1 三种队列对比

类型特性适用场景
Interface基础队列,无去重/限速简单控制器
RateLimitingQueue限速 + 延迟重试大多数控制器
DelayingQueue延迟入队指数退避

3.2 RateLimitingQueue 实现

// 工作队列接口
type RateLimitingInterface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShuttingDown() bool
}

// 创建带限速的工作队列
queue := workqueue.NewRateLimitingQueueWithConfig(
    workqueue.DefaultControllerRateLimiter(),
    workqueue.Options{},
)

// 控制器主循环
for {
    item, shutdown := queue.Get()
    if shutdown {
        return
    }

    err := syncHandler(item)
    if err != nil {
        // 限速重试:指数退避
        queue.AddRateLimited(item)
    } else {
        // 处理完成,标记 Done
        queue.Forget(item)
    }
    queue.Done(item)
}

3.3 限速器算法

// DefaultControllerRateLimiter 实现
type DefaultControllerRateLimiter struct {
    perItem   RateLimiter
    itemCount int
}

func NewDefaultControllerRateLimiter() RateLimiter {
    return &itemCountRetryWithDelay{
        delays: []time.Duration{
            5 * time.Millisecond,
            10 * time.Millisecond,
            20 * time.Millisecond,
            40 * time.Millisecond,
            80 * time.Millisecond,
            160 * time.Millisecond,
            320 * time.Millisecond,
            640 * time.Millisecond,
            1 * time.Second,
            2 * time.Second,
            4 * time.Second,
            8 * time.Second,
        },
    }
}

// 指数退避:延迟 = min(base * 2^attempts, maxDelay)
func (r *itemCountRetryWithDelay) When(item interface{}) time.Duration {
    attempts := r.attempts[item]
    r.attempts[item] = attempts + 1
    delay := r.delays[attempts]
    if delay == 0 {
        delay = r.delays[len(r.delays)-1]
    }
    return delay
}
✓ 推荐实践

使用 controller-runtimecontroller.New() 配合 reconcile.Func 可以大幅简化控制器的编写,它内置了工作队列管理和重试逻辑。

◆ ◆ ◆

4. Reconciliation 循环

Reconciliation 是 Kubernetes 控制器的核心模式。控制器持续观察期望状态与实际状态的差异,并采取措施使两者趋于一致。

4.1 Reconciler 接口

// controller-runtime v0.14+ Reconciler 接口
type Reconciler interface {
    Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
}

// 典型的 Reconciler 实现
type DeploymentReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

func (r *DeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := log.FromContext(ctx)

    // 1. 获取 Deployment
    dep := &appsv1.Deployment{}
    if err := r.Get(ctx, req.NamespacedName, dep); err != nil {
        if errors.IsNotFound(err) {
            // 对象已删除,清理相关资源
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // 2. 计算期望的 ReplicaSet
    desired := r.generateReplicaSet(dep)

    // 3. 创建/更新 ReplicaSet
    if err := r.createOrUpdate(ctx, desired); err != nil {
        return ctrl.Result{}, err
    }

    // 4. 更新 Status
    if err := r.updateStatus(ctx, dep); err != nil {
        return ctrl.Result{}, err
    }

    return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

4.2 createOrUpdate 模式

func (r *DeploymentReconciler) createOrUpdate(ctx context.Context, dep *appsv1.Deployment) error {
    rs := &appsv1.ReplicaSet{}

    err := r.Get(ctx, types.NamespacedName{
        Name:      dep.Name + "-rs",
        Namespace: dep.Namespace,
    }, rs)

    if errors.IsNotFound(err) {
        // 创建
        return r.Create(ctx, dep)
    }
    if err != nil {
        return err
    }

    // 更新:对比 Spec,修改需要的字段
    if !reflect.DeepEqual(rs.Spec, dep.Spec) {
        rs.Spec = dep.Spec
        return r.Update(ctx, rs)
    }

    return nil
}

4.3 乐观并发控制

Kubernetes 使用 ResourceVersion 实现乐观并发控制。每次更新对象时必须携带当前持有的 ResourceVersion,如果与 API Server 不匹配则更新失败。

// 乐观锁更新模式
func (r *DeploymentReconciler) updateWithRetry(ctx context.Context, dep *appsv1.Deployment) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        // 每次重试前重新获取最新对象
        latest := &appsv1.Deployment{}
        if err := r.Get(ctx, req.NamespacedName, latest); err != nil {
            return err
        }

        // 修改最新对象的 Spec
        latest.Spec.Replicas = dep.Spec.Replicas

        // 更新时携带 latest.ResourceVersion
        // 如果 RV 已改变,返回 Conflict 错误
        return r.Update(ctx, latest)
    })
}
⚠️ 冲突处理的艺术

收到 Conflict 错误时,不应立即重试。短暂的冲突表明其他控制器也在操作同一对象。指数退避的限速器配合 RetryOnConflict 是标准做法。

◆ ◆ ◆

5. 乐观并发控制

乐观并发控制(Optimistic Concurrency Control)是 Kubernetes 分布式协调的核心机制,理解它对于编写正确的控制器至关重要。

5.1 ResourceVersion 机制

// Kubernetes 对象元数据中的版本字段
type ObjectMeta struct {
    ResourceVersion string `json:"resourceVersion,omitempty"`
    UID              types.UID `json:"uid,omitempty"`
}

// API Server 的 CAS (Compare-and-Swap) 语义
// UPDATE 请求携带当前 RV,API Server 比较:
// - 如果服务端 RV == 请求 RV → 更新成功,RV++
// - 如果服务端 RV != 请求 RV → 返回 409 Conflict

// 客户端处理冲突
retryCount := 0
for {
    // 获取最新对象和 RV
    current, err := client.Get(ctx, name)
    if err != nil {
        return err
    }

    // 修改
    modified := current.DeepCopy()
    modified.Spec.Replicas = desiredReplicas

    // 尝试更新
    _, err = client.Update(ctx, modified)
    if err == nil {
        return nil  // 成功
    }

    if errors.IsConflict(err) && retryCount < maxRetries {
        retryCount++
        time.Sleep(time.Duration(retryCount) * 100 * time.Millisecond)
        continue  // 重试
    }
    return err
}

5.2 Generation 机制

Generation 是 spec 中"代际"计数器,用于区分 spec 变更和 status 变更。当 spec 变化时 generation++,status 变化不影响 generation。

// Generation 用于判断 spec 是否变化
type Deployment struct {
    Spec   DeploymentSpec   `json:"spec,omitempty"`
    Status DeploymentStatus  `json:"status,omitempty"`
}

type ObjectMeta struct {
    Generation int64 `json:"generation,omitempty"`
}

// 控制器可通过检查 generation 判断 spec 是否变化
if deployment.Generation > deployment.Status.ObservedGeneration {
    // Spec 已变化,需要重新 rollout
}

5.3 Finalizer 模式

Finalizer 确保在对象被删除前执行清理逻辑,防止资源泄漏。

// 添加 Finalizer
func (r *DeploymentReconciler) addFinalizer(ctx context.Context, dep *appsv1.Deployment) error {
    dep.Finalizers = append(dep.Finalizers, "example.com/cleanup")
    return r.Update(ctx, dep)
}

// Reconciler 中的删除处理
func (r *DeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    dep := &appsv1.Deployment{}
    if err := r.Get(ctx, req.NamespacedName, dep); err != nil {
        if errors.IsNotFound(err) {
            // 确认清理已完成,移除 Finalizer
            r.removeFinalizerFromCache(req.NamespacedName)
            return ctrl.Result{}, nil
        }
    }

    if !dep.DeletionTimestamp.IsZero() {
        // 执行清理逻辑
        if err := r.cleanupResources(ctx, dep); err != nil {
            return ctrl.Result{}, err
        }

        // 移除 Finalizer,允许对象删除
        dep.Finalizers = removeString(dep.Finalizers, "example.com/cleanup")
        return ctrl.Result{}, r.Update(ctx, dep)
    }

    // 正常处理...
}
✓ 生产建议

生产级控制器应使用 controller-runtime 提供的 client 接口和 reconcile 框架,配合 controller-runtime 的多集群支持(fleet)来构建弹性的 operator。