Go 基础设施:Kubernetes client-go 控制器模式深度
从 Reflector/Informer 机制、DeltaFIFO 队列、工作队列设计出发,解析自定义控制器的 reconciliation 循环与乐观并发控制。
Kubernetes controller-runtime 是构建自定义控制器的核心框架。理解 client-go 底层的数据流 —— 从 API Server 事件到 DeltaFIFO、再到工作队列 —— 是编写生产级控制器的必备基础。
1. Reflector/Informer 机制
Reflector 是 Kubernetes Informer 模式的核心组件,负责从 API Server 监听资源变化并填充到本地 Store(DeltaFIFO)。Informer 则是对 Reflector、Store、处理器的高层抽象。
1.1 整体架构
REST API + etcd
Watch/List 接口
长期 Watch 连接
增量事件同步
增量队列
事件去重
本地索引存储
Thread-safe
事件处理循环
SharedIndexInformer
限速队列
并发控制
Informer 架构图:Reflector 负责与 API Server 保持长连接,DeltaFIFO 缓存变化事件,Indexer 提供本地查询能力。
1.2 Reflector 的 List/Watch 机制
// Reflector 核心逻辑 (client-go/tools/cache/reflector.go)
type Reflector struct {
name string
expectedType interface{}
store Store // DeltaFIFO
listerWatcher ListerWatcher // List/Watch 接口
watchErrorHandler WatchErrorHandler
// 进度跟踪
lastSyncResourceVersion string
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// 1. 首次全量 List
list, err := r.listerWatcher.List(options)
if err != nil {
return err
}
listMetaInterface, items, err := r.extractList(list)
// 2. 同步到 Store(带 ResourceVersion)
for _, item := range items {
if err := r.store.Add(item); err != nil {
return err
}
}
r.setLastSyncResourceVersion(getResourceVersion(listMetaInterface))
// 3. 增量 Watch 循环
for {
select {
case <-stopCh:
return nil
default:
options = ListOptions{
ResourceVersion: r.lastSyncResourceVersion,
Watch: true,
}
w, err := r.listerWatcher.Watch(options)
if err != nil {
// 断线重连策略
time.Sleep(exponentialBackoff)
}
r.watchHandler(w)
}
}
}
Reflector 使用 ResourceVersion 实现渐进式同步。每次 List/Watch 都携带上一次成功的 RV,API Server 只会返回比该 RV 新的事件。这避免了全量同步的开销。
1.3 SharedInformerFactory
生产环境中,多个控制器通常共享同一个 Informer 以减少 API Server 连接数。SharedInformerFactory 实现了这一共享机制。
// 创建共享 Informer 工厂
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
// 获取共享 Informer(相同资源的多个 Watch 合并为一次)
deploymentInformer := factory.Apps().V1().Deployments()
serviceInformer := factory.Core().V1().Services()
// 启动所有 Informer
factory.Start(stopCh)
// 等待缓存同步完成
factory.WaitForCacheSync(stopCh)
2. DeltaFIFO 队列原理
DeltaFIFO 是 Kubernetes 控制器中最关键的数据结构之一,它不仅是一个 FIFO 队列,还负责维护资源的增量变化(Delta)历史。
2.1 Delta 类型定义
// Delta 类型 (client-go/tools/cache/delta_type.go)
type DeltaType string
const (
Added DeltaType = "Added"
Modified DeltaType = "Modified"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced" // 来自 List 事件
Sync DeltaType = "Sync" // 周期性同步
)
// DeltaFIFO 的存储结构
type DeltaFIFO struct {
items map[string][]Delta // key -> Deltas 列表
queue []string // 顺序队列
lock sync.RWMutex
cond *sync.Cond
}
type Delta struct {
Type DeltaType
Object interface{} // 可能是.runtime.Object 或 DeletedFinalStateUnknown
}
2.2 事件合并逻辑
DeltaFIFO 会对同一对象的连续事件进行合并,只保留最新状态。这避免了积压大量中间状态事件。
// 关键方法:Add/Update/Delete 都会调用 queueActionLocked
func (f *DeltaFIFO) queueActionLocked(action DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return err
}
// 追加 Delta
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{Type: action, Object: obj})
f.items[id] = newDeltas
// 加入处理队列
if len(oldDeltas) == 0 {
f.queue = append(f.queue, id)
}
return nil
}
// Replace 方法:来自 List 的全量同步
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
// 1. 找出需要删除的 keys(本地有但 list 中没有)
keys := make(map[string]struct{})
for _, item := range list {
keys[f.KeyOf(item)] = struct{}{}
}
for key, deltas := range f.items {
if _, exists := keys[key]; !exists {
// 发送 Deleted 事件
f.queueActionLocked(Deleted, deltas[0].Object)
delete(f.items, key)
}
}
// 2. 批量添加/更新(标记为 Replaced)
for _, item := range list {
f.queueActionLocked(Replaced, item)
}
return nil
}
当收到 Deleted 事件时,如果本地缓存中找不到该对象(可能被其他控制器删除了),DeltaFIFO 会发送 DeletedFinalStateUnknown 而不是普通 Deleted。这要求控制器在处理时能够优雅地处理"对象已不存在"的情况。
2.3 Pop 消费模式
// Process 函数由控制器实现,负责实际处理
type ProcessFunc func(obj interface{}, isInInitialList bool) error
func (f *DeltaFIFO) Pop(process ProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for len(f.queue) == 0 {
f.cond.Wait()
}
// 取出最旧的 key
key := f.queue[0]
f.queue = f.queue[1:]
// 获取所有 Deltas
item, ok := f.items[key]
if !ok {
return nil, KeyError{Key: key}
}
delete(f.items, key)
// 调用处理函数
isInInitialList := false
if len(item) > 0 && item[0].Type == Replaced {
isInInitialList = true
}
err := process(item, isInInitialList)
return item, err
}
3. 工作队列设计
Kubernetes client-go 提供了三种工作队列实现,从简单到复杂分别适用于不同场景。
3.1 三种队列对比
| 类型 | 特性 | 适用场景 |
|---|---|---|
| Interface | 基础队列,无去重/限速 | 简单控制器 |
| RateLimitingQueue | 限速 + 延迟重试 | 大多数控制器 |
| DelayingQueue | 延迟入队 | 指数退避 |
3.2 RateLimitingQueue 实现
// 工作队列接口
type RateLimitingInterface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
// 创建带限速的工作队列
queue := workqueue.NewRateLimitingQueueWithConfig(
workqueue.DefaultControllerRateLimiter(),
workqueue.Options{},
)
// 控制器主循环
for {
item, shutdown := queue.Get()
if shutdown {
return
}
err := syncHandler(item)
if err != nil {
// 限速重试:指数退避
queue.AddRateLimited(item)
} else {
// 处理完成,标记 Done
queue.Forget(item)
}
queue.Done(item)
}
3.3 限速器算法
// DefaultControllerRateLimiter 实现
type DefaultControllerRateLimiter struct {
perItem RateLimiter
itemCount int
}
func NewDefaultControllerRateLimiter() RateLimiter {
return &itemCountRetryWithDelay{
delays: []time.Duration{
5 * time.Millisecond,
10 * time.Millisecond,
20 * time.Millisecond,
40 * time.Millisecond,
80 * time.Millisecond,
160 * time.Millisecond,
320 * time.Millisecond,
640 * time.Millisecond,
1 * time.Second,
2 * time.Second,
4 * time.Second,
8 * time.Second,
},
}
}
// 指数退避:延迟 = min(base * 2^attempts, maxDelay)
func (r *itemCountRetryWithDelay) When(item interface{}) time.Duration {
attempts := r.attempts[item]
r.attempts[item] = attempts + 1
delay := r.delays[attempts]
if delay == 0 {
delay = r.delays[len(r.delays)-1]
}
return delay
}
使用 controller-runtime 的 controller.New() 配合 reconcile.Func 可以大幅简化控制器的编写,它内置了工作队列管理和重试逻辑。
4. Reconciliation 循环
Reconciliation 是 Kubernetes 控制器的核心模式。控制器持续观察期望状态与实际状态的差异,并采取措施使两者趋于一致。
4.1 Reconciler 接口
// controller-runtime v0.14+ Reconciler 接口
type Reconciler interface {
Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
}
// 典型的 Reconciler 实现
type DeploymentReconciler struct {
client.Client
Scheme *runtime.Scheme
}
func (r *DeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
// 1. 获取 Deployment
dep := &appsv1.Deployment{}
if err := r.Get(ctx, req.NamespacedName, dep); err != nil {
if errors.IsNotFound(err) {
// 对象已删除,清理相关资源
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// 2. 计算期望的 ReplicaSet
desired := r.generateReplicaSet(dep)
// 3. 创建/更新 ReplicaSet
if err := r.createOrUpdate(ctx, desired); err != nil {
return ctrl.Result{}, err
}
// 4. 更新 Status
if err := r.updateStatus(ctx, dep); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
4.2 createOrUpdate 模式
func (r *DeploymentReconciler) createOrUpdate(ctx context.Context, dep *appsv1.Deployment) error {
rs := &appsv1.ReplicaSet{}
err := r.Get(ctx, types.NamespacedName{
Name: dep.Name + "-rs",
Namespace: dep.Namespace,
}, rs)
if errors.IsNotFound(err) {
// 创建
return r.Create(ctx, dep)
}
if err != nil {
return err
}
// 更新:对比 Spec,修改需要的字段
if !reflect.DeepEqual(rs.Spec, dep.Spec) {
rs.Spec = dep.Spec
return r.Update(ctx, rs)
}
return nil
}
4.3 乐观并发控制
Kubernetes 使用 ResourceVersion 实现乐观并发控制。每次更新对象时必须携带当前持有的 ResourceVersion,如果与 API Server 不匹配则更新失败。
// 乐观锁更新模式
func (r *DeploymentReconciler) updateWithRetry(ctx context.Context, dep *appsv1.Deployment) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
// 每次重试前重新获取最新对象
latest := &appsv1.Deployment{}
if err := r.Get(ctx, req.NamespacedName, latest); err != nil {
return err
}
// 修改最新对象的 Spec
latest.Spec.Replicas = dep.Spec.Replicas
// 更新时携带 latest.ResourceVersion
// 如果 RV 已改变,返回 Conflict 错误
return r.Update(ctx, latest)
})
}
收到 Conflict 错误时,不应立即重试。短暂的冲突表明其他控制器也在操作同一对象。指数退避的限速器配合 RetryOnConflict 是标准做法。
5. 乐观并发控制
乐观并发控制(Optimistic Concurrency Control)是 Kubernetes 分布式协调的核心机制,理解它对于编写正确的控制器至关重要。
5.1 ResourceVersion 机制
// Kubernetes 对象元数据中的版本字段
type ObjectMeta struct {
ResourceVersion string `json:"resourceVersion,omitempty"`
UID types.UID `json:"uid,omitempty"`
}
// API Server 的 CAS (Compare-and-Swap) 语义
// UPDATE 请求携带当前 RV,API Server 比较:
// - 如果服务端 RV == 请求 RV → 更新成功,RV++
// - 如果服务端 RV != 请求 RV → 返回 409 Conflict
// 客户端处理冲突
retryCount := 0
for {
// 获取最新对象和 RV
current, err := client.Get(ctx, name)
if err != nil {
return err
}
// 修改
modified := current.DeepCopy()
modified.Spec.Replicas = desiredReplicas
// 尝试更新
_, err = client.Update(ctx, modified)
if err == nil {
return nil // 成功
}
if errors.IsConflict(err) && retryCount < maxRetries {
retryCount++
time.Sleep(time.Duration(retryCount) * 100 * time.Millisecond)
continue // 重试
}
return err
}
5.2 Generation 机制
Generation 是 spec 中"代际"计数器,用于区分 spec 变更和 status 变更。当 spec 变化时 generation++,status 变化不影响 generation。
// Generation 用于判断 spec 是否变化
type Deployment struct {
Spec DeploymentSpec `json:"spec,omitempty"`
Status DeploymentStatus `json:"status,omitempty"`
}
type ObjectMeta struct {
Generation int64 `json:"generation,omitempty"`
}
// 控制器可通过检查 generation 判断 spec 是否变化
if deployment.Generation > deployment.Status.ObservedGeneration {
// Spec 已变化,需要重新 rollout
}
5.3 Finalizer 模式
Finalizer 确保在对象被删除前执行清理逻辑,防止资源泄漏。
// 添加 Finalizer
func (r *DeploymentReconciler) addFinalizer(ctx context.Context, dep *appsv1.Deployment) error {
dep.Finalizers = append(dep.Finalizers, "example.com/cleanup")
return r.Update(ctx, dep)
}
// Reconciler 中的删除处理
func (r *DeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
dep := &appsv1.Deployment{}
if err := r.Get(ctx, req.NamespacedName, dep); err != nil {
if errors.IsNotFound(err) {
// 确认清理已完成,移除 Finalizer
r.removeFinalizerFromCache(req.NamespacedName)
return ctrl.Result{}, nil
}
}
if !dep.DeletionTimestamp.IsZero() {
// 执行清理逻辑
if err := r.cleanupResources(ctx, dep); err != nil {
return ctrl.Result{}, err
}
// 移除 Finalizer,允许对象删除
dep.Finalizers = removeString(dep.Finalizers, "example.com/cleanup")
return ctrl.Result{}, r.Update(ctx, dep)
}
// 正常处理...
}
生产级控制器应使用 controller-runtime 提供的 client 接口和 reconcile 框架,配合 controller-runtime 的多集群支持(fleet)来构建弹性的 operator。