go, k8s

client-go DeltaFIFO源码解读

简介

Reflector 中获取到的资源事件放入 DeltaFIFO 先进先出队列中。DeltaFIFO 使得客户端能够以高效和一致的方式处理资源状态的变化,是实现Kubernetes控制器模式的关键组件之一。

源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/delta_fifo.go

设计思想

Deltas

与普通的 FIFO 队列直接存储对象不同,DeltaFIFO 针对每个对象的键(Key)维护一个 Deltas 列表,即该对象的一系列变化(Delta)。当一个新的对象变化需要被加入到队列时,它会被追加到相应键的 Deltas 列表中,除非这个变化是一个删除操作,并且 Deltas 列表已经以一个删除操作结束。这种设计使得消费者在处理一个对象时,可以看到自上次处理以来该对象的所有变化。

替换(Replaced)和同步(Sync)

DeltaFIFO 引入了两种额外的对象应用方式:替换(Replaced)和同步(Sync)。如果 EmitDeltaTypeReplaced 选项没有被设置为 true,那么在替换事件中将使用 Sync 以保持向后兼容。Sync 也用于定期的重新同步事件。

生产者-消费者队列

DeltaFIFO 设计为一个生产者-消费者队列,其中 Reflector 作为生产者,负责监视Kubernetes资源并将变化事件放入队列。消费者通过调用 Pop() 方法来处理这些事件。

file

使用场景

DeltaFIFO 适用于以下场景:

  • 你希望最多只处理每个对象变化(Delta)一次。
  • 当你处理一个对象时,你希望看到自上次处理以来发生在该对象上的所有变化。
  • 你希望能够处理某些对象的删除。
  • 你可能希望定期重新处理对象。

方法返回值

DeltaFIFOPop()、Get() 和 GetByKey() 方法返回 interface{} 类型以满足 Store/Queue 接口的要求,但它们总是返回 Deltas 类型的对象。List() 方法返回 FIFO 中每个累加器的最新对象。

已知对象

DeltaFIFOknownObjects KeyListerGetter 提供了列出存储键和通过存储键获取对象的能力。这些对象被称为“已知对象”,它们的集合会以不同的方式修改 Delete、Replace 和 Resync 方法的行为。

线程安全性

关于线程安全性的说明:如果你从多个线程并行调用 Pop(),可能会导致多个线程处理同一个对象的略微不同版本。因此,在使用 DeltaFIFO 时需要注意同步和并发处理的策略。

源码

DeltaFIFOOptions结构体

是DeltaFIFO的配置选项。

type DeltaFIFOOptions struct {
    KeyFunction KeyFunc
    KnownObjects KeyListerGetter
    EmitDeltaTypeReplaced bool
    Transformer TransformFunc
}
  • KeyFunction:KeyFunc类型;用于确定对象应该有什么键(Key)。这个函数在返回的 DeltaFIFOKeyOf() 方法中被暴露出来,其中还包括了对已删除对象和队列状态的额外处理;可选。如果不设置,默认使用 MetaNamespaceKeyFunc
  • KnownObjects:KeyListerGetter类型;预期返回一个消费者“已知”的键(Key)列表。它用于在调用 Replace() 时决定哪些项是缺失的;对于缺失的项,会产生 ‘Deleted’ deltas。如果你可以容忍在 Replace() 操作中遗漏删除操作,KnownObjects 可以为 nil;可选。
  • EmitDeltaTypeReplaced:bool 类型;Replaced DeltaType的作用。在添加了 Replaced 事件类型之前,对 Replace() 的调用与 Sync() 的处理方式相同。出于向后兼容的目的,默认情况下此选项为 false。当为 true 时,对于传递给 Replace() 调用的项,将发送 Replaced 事件,当为 false 时,将发送 Sync 事件;可选,默认为 false
  • Transformer:TransformFunc 类型;如果设置,将在将对象入队之前调用此函数;可选。

DeltaFIFO结构体

队列的定义。

type DeltaFIFO struct {
    lock sync.RWMutex
    cond sync.Cond
    items map[string]Deltas
    queue []string
    populated bool
    initialPopulationCount int
    keyFunc KeyFunc
    knownObjects KeyListerGetter
    closed bool
    emitDeltaTypeReplaced bool
    transformer TransformFunc
}
  • lock 和 cond:这两个字段用于保护对 itemsqueue 的访问。lock 是一个读写锁,用于在修改或访问这些字段时保证线程安全。cond 是一个条件变量,用于在队列为空时阻塞等待,直到有新的元素被添加进来。
  • items:这是一个映射,将一个键映射到一组变化(Deltas)。每组变化至少包含一个变化(Delta)。这些变化代表了对应的 Kubernetes 资源对象的增加、更新或删除操作。
  • queue:这是一个字符串数组,用于按照先进先出(FIFO)的顺序维护键的消费。队列中没有重复的键。一个键在 queue 中当且仅当它也在 items 中。
  • populated 和 initialPopulationCount:这两个字段用于跟踪队列的初始化状态。populated 表示是否已经完成了第一批通过 Replace() 方法插入的元素的填充。initialPopulationCount 是第一次调用 Replace() 方法时插入的元素数量。
  • keyFunc:同上。
  • knownObjects:同上。
  • closed:这是一个布尔值,用于指示队列是否已关闭,以便控制循环可以在队列为空时退出。目前,它不用于控制任何增删改查(CRUD)操作的访问。
  • emitDeltaTypeReplaced:同上。
  • transformer:同上。

TransformFunc类型

允许在对象被处理之前对其进行转换。这个功能的设计初衷是为了在不影响对象处理逻辑的前提下,对对象进行预处理,比如清理对象的某些部分以减少内存使用,或者调整对象的结构以适应特定的处理需求。

type TransformFunc func(interface{}) (interface{}, error)

TransformFunc 被调用时,是在将对象插入到通知队列中,因此这个函数的性能非常重要,请避免执行耗时的操作。这是为了确保系统的高效性能,避免因为转换操作而导致的性能瓶颈。

DeltaType类型

变化的类型。Replaced:当遇到观察错误并且不得不重新列出时发出。此时,我们不知道替换的对象是否已经改变。只有当选项 EmitDeltaTypeReplaced 为真时,才会发出 Replaced

type DeltaType string

const (
    Added   DeltaType = "Added"
    Updated DeltaType = "Updated"
    Deleted DeltaType = "Deleted"
    Replaced DeltaType = "Replaced"
    Sync DeltaType = "Sync"
)

DeletedFinalStateUnknown类型

用于表示一个已删除对象的最终状态未知的情况。这种情况可能发生在与apiserver的连接断开时错过了删除事件。这个类型包含一个Key字段,表示对象的键,和一个Obj字段,可能包含对象的一个旧版本。

type DeletedFinalStateUnknown struct {
    Key string
    Obj interface{}
}

Delta结构体

DeltaDeltas(一个 Delta 对象的列表)的成员,DeltasDeltaFIFO 存储的类型。它告诉你发生了什么变化,以及在这个变化之后对象的状态。除非这个变化是删除,那么你将得到对象在被删除之前的最终状态。

type Delta struct {
    Type   DeltaType
    Object interface{}
}

type Deltas []Delta

KeyListerGetter接口

这是一个组合接口,它结合了KeyListerKeyGetter两个接口的功能。任何实现了KeyListerGetter接口的类型都必须能够列出键并根据给定的键获取值。

type KeyListerGetter interface {
    KeyLister
    KeyGetter
}

KeyLister接口

接口定义了一个方法ListKeys,该方法返回一个字符串切片,包含所有的键。

type KeyLister interface {
    ListKeys() []string
}

KeyGetter接口

接口定义了一个方法GetByKey,该方法接受一个键作为参数,并返回与该键关联的值、一个布尔值(表示该键是否存在)以及一个错误。

type KeyGetter interface {
    GetByKey(key string) (value interface{}, exists bool, err error)
}

Oldest函数

它返回切片中的第一个Delta对象,即最旧的变化。如果切片为空,它返回 nil。

func (d Deltas) Oldest() *Delta {
    if len(d) > 0 {
        return &d[0]
    }
    return nil
}

Newest函数

返回切片中的最后一个Delta对象,即最新的变化。如果切片为空,它也返回 nil。

func (d Deltas) Newest() *Delta {
    if n := len(d); n > 0 {
        return &d[n-1]
    }
    return nil
}

copyDeltas函数

函数接受一个Deltas切片作为参数,并返回这个切片的一个浅拷贝。这意味着它拷贝了切片本身,但没有拷贝切片中的Delta对象。

func copyDeltas(d Deltas) Deltas {
    d2 := make(Deltas, len(d))
    copy(d2, d)
    return d2
}

NewDeltaFIFO函数

函数返回一个可以用来处理项目更改的队列。已被弃用,改用下面的 NewDeltaFIFOWithOptions

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
    return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KeyFunction:  keyFunc,
        KnownObjects: knownObjects,
    })
}

NewDeltaFIFOWithOptions函数

函数返回一个可以用来处理项目更改的队列。

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
    if opts.KeyFunction == nil {
        opts.KeyFunction = MetaNamespaceKeyFunc
    }

    f := &DeltaFIFO{
        items:        map[string]Deltas{},
        queue:        []string{},
        keyFunc:      opts.KeyFunction,
        knownObjects: opts.KnownObjects,

        emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
        transformer:           opts.Transformer,
    }
    f.cond.L = &f.lock
    return f
}

var (
    _ = Queue(&DeltaFIFO{})
)

var (
    ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
)
  • 函数接受一个参数:opts,它是DeltaFIFOOptions类型。如果opts.KeyFunction为空,则将其设置为MetaNamespaceKeyFunc
  • 函数创建了一个新的DeltaFIFO对象f,并设置了其各种属性,包括items,queue,keyFunc,knownObjects,emitDeltaTypeReplaced和transformer。然后,它将f.cond.L设置为f.lock,以便进行同步,并返回f
  • 代码还定义了两个变量。第一个变量是一个断言,确认DeltaFIFO是一个队列。第二个变量ErrZeroLengthDeltasObject是一个错误,当遇到长度为0的Deltas对象时返回。

Close方法

关闭队列。

它首先锁定f.lock,然后在函数结束时解锁。然后,它将f.closed设置为true,表示队列已经关闭。最后,它调用f.cond.Broadcast()来唤醒所有等待在f.cond上的goroutine。

func (f *DeltaFIFO) Close() {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.closed = true
    f.cond.Broadcast()
}

KeyOf方法

用来获取对象的键值。

它首先检查传入的对象是否是Deltas类型,并且如果长度为0,那么返回一个错误。如果不是0,那么将对象更新为Deltas中最新的对象。然后,它检查对象是否是DeletedFinalStateUnknown类型,如果是,那么直接返回其键值。如果都不是,那么调用f.keyFunc方法来获取对象的键值。

func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
    if d, ok := obj.(Deltas); ok {
        if len(d) == 0 {
            return "", KeyError{obj, ErrZeroLengthDeltasObject}
        }
        obj = d.Newest().Object
    }
    if d, ok := obj.(DeletedFinalStateUnknown); ok {
        return d.Key, nil
    }
    return f.keyFunc(obj)
}

HasSynced方法

HasSynced用于判断是否已经完成同步。它会检查是否调用过Add、Update、Delete、AddIfNotPresent操作,或者Replace()插入的第一批项目是否已经被处理(从队列中弹出)。

hasSynced_locked用于在已经获取锁的情况下判断是否已完成同步。返回一个布尔值,判断DeltaFIFO对象的populated字段是否为true,即是否已经被填充过数据。并且initialPopulationCount字段是否为0,即初始填充的所有项目都已经被处理。

func (f *DeltaFIFO) HasSynced() bool {
    f.lock.Lock()
    defer f.lock.Unlock()
    return f.hasSynced_locked()
}

func (f *DeltaFIFO) hasSynced_locked() bool {
    return f.populated && f.initialPopulationCount == 0
}

生产者方法

Add方法

用于向队列中添加新的对象。

func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Added, obj)
}

获取锁,标记已填充,调用queueActionLocked方法将对象添加到队列中。该方法在锁定的情况下进行执行,将操作类型设置为Added,并将要添加的对象传递给它,解锁,返回结果。

Update方法

用于更新队列中的对象。

func (f *DeltaFIFO) Update(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    return f.queueActionLocked(Updated, obj)
}

获取锁,标记已填充,调用queueActionLocked方法将对象添加到队列中。该方法在锁定的情况下进行执行,将操作类型设置为Updated,并将要更新的对象传递给它,解锁,返回结果。

Delete方法

用于删除队列中的对象。Delete就像Add一样,但是是一个Deleted Delta。如果给定
对象不存在,它将被忽略。

func (f *DeltaFIFO) Delete(obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    if f.knownObjects == nil {
        if _, exists := f.items[id]; !exists {
            return nil
        }
    } else {
        _, exists, err := f.knownObjects.GetByKey(id)
        _, itemsExist := f.items[id]
        if err == nil && !exists && !itemsExist {
            return nil
        }
    }
    return f.queueActionLocked(Deleted, obj)
}
  • 使用f.KeyOf方法获取要删除对象的键 id,并检查是否有错误。如果有错误,则返回KeyError类型的错误,其中包含相关的对象和错误信息。
  • 获取锁,标记已填充。
  • 如果f.knownObjects为nil,则检查items中是否存在该对象的键。如果不存在,可能已删除,直接返回nil。
  • 如果f.knownObjects不为 nil,则同时检查knownObjectsitems。只有当对象在knownObjects中不存在且items中也没有对应项时,才认为该对象可能在之前已经被删除,直接返回nil。
  • 如果对象存在于items和/或knownObjects中,那么将执行删除操作,调用queueActionLocked方法将对象添加到队列中。将操作类型设置为Deleted,并将要删除的对象传递给它,解锁,返回结果。

AddIfNotPresent方法

用于在对象不存在时添加对象。这在单个生产者/消费者场景中很有用,这样消费者可以安全地重试项目,而不会与生产者竞争,也不会使过期的项目重新排队。

func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
    deltas, ok := obj.(Deltas)
    if !ok {
        return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
    }
    id, err := f.KeyOf(deltas)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.addIfNotPresent(id, deltas)
    return nil
}
  • 首先方法将传入的obj参数断言为Deltas类型。如果断言失败,则返回一个错误,对象必须是Deltas类型。
  • 调用KeyOf方法,从deltas中获取对象的键。如果有错误则返回KeyError错误。
  • 获取锁,调用addIfNotPresent方法,将对象添加到队列中。
  • 释放锁,返回nil,表示成功添加对象。

addIfNotPresent方法

addIfNotPresent方法是私有方法,只能在DeltaFIFO类内部调用。用于将一组与特定id关联的变更(deltas)插入到队列中,前提是这个id在队列中不存在,且调用者已经持有了DeltaFIFO对象的锁。

func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
    f.populated = true
    if _, exists := f.items[id]; exists {
        return
    }

    f.queue = append(f.queue, id)
    f.items[id] = deltas
    f.cond.Broadcast()
}
  • 标记已填充。检查id是否已经存在于队列的items映射中。如果存在,方法会立即返回,不做任何更改。这确保了每个id及其关联的deltas只被添加一次,防止重复。
  • 如果不存在,将id添加到队列queue中。queue是一个切片,维护着添加到队列中的id及其关联的deltas的顺序。
  • 以id为索引,将deltas对象插入到items中。
  • 调用cond.Broadcast()。广播会唤醒所有等待此条件的goroutine,表示队列中有新数据可用。

dedupDeltas函数

用于合并最近两个相同的事件。

func dedupDeltas(deltas Deltas) Deltas {
    n := len(deltas)
    if n < 2 {
        return deltas
    }
    a := &deltas[n-1]
    b := &deltas[n-2]
    if out := isDup(a, b); out != nil {
        deltas[n-2] = *out
        return deltas[:n-1]
    }
    return deltas
}
  • 首先检查事件更新切片中的事件数量。如果少于2个,则无需合并,直接返回原切片。
  • 获取切片中最后两个事件更新,并分别赋值给变量a和b。
  • 调用isDup函数来判断这两个事件是否相同。如果相同,则需要合并,将第二个事件更新为合并后的事件,并返回去除了最后一个事件的切片。
  • 如果不同,直接返回nil。

isDup函数

判断两个事件更新是否相同,并返回应该保留的事件更新。

func isDup(a, b *Delta) *Delta {
    if out := isDeletionDup(a, b); out != nil {
        return out
    }
    return nil
}

调用isDeletionDup函数判断这两个事件是否为相同的删除事件。如果是,返回合并后的对象out,如果不是,返回nil。

isDeletionDup函数

在两个事件都是删除事件的情况下,保留信息更全的事件更新。

func isDeletionDup(a, b *Delta) *Delta {
    if b.Type != Deleted || a.Type != Deleted {
        return nil
    }
    if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
        return a
    }
    return b
}
  • 首先检查两个事件是否都是删除事件。如果不是,则直接返回 nil。
  • 如果第二个事件的对象是DeletedFinalStateUnknown类型,则说明它的信息可能不完整,因此保留第一个事件。否则,保留第二个事件。

queueActionLocked方法

用于在加锁的情况下处理对象的增量更新操作,并确保队列中不会有重复的更新。

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    if f.transformer != nil {
        var err error
        obj, err = f.transformer(obj)
        if err != nil {
            return err
        }
    }

    oldDeltas := f.items[id]
    newDeltas := append(oldDeltas, Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else {
        if oldDeltas == nil {
            klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
            return nil
        }
        klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
        f.items[id] = newDeltas
        return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
    }
    return nil
}
  • 首先,通过调用f.KeyOf(obj)方法获取对象的键(ID)。如果无法获取,返回一个KeyError错误。
  • 如果DeltaFIFO对象的transformer字段不为 nil,则在对象上应用这个转换函数。这一步骤允许在对象被处理之前对其进行修改或标准化。如果转换过程中出现错误,返回该错误。
  • 获取与对象键关联的当前操作列表oldDeltas,并将新的操作(由 actionType 和 obj 组成的 Delta)追加到列表中。然后,调用dedupDeltas函数去重。
  • 如果合并后的newDeltas不为空:
    • 并且对象的键不在队列中,则将键添加到队列中,即把对象obj入队。
    • 无论键是否已存在,都将更新后的操作列表存储回f.items中。
    • 调用f.cond.Broadcast()通知所有等待的协程,队列已经更新。
  • 按照dedupDeltas函数的设计,给定非空列表时,它不应返回空列表。如果发生这种情况,记录错误信息,并根据oldDeltas是否为 nil 采取不同的处理方式。

list方法

List和ListKeys,用于获取当前队列中对象的列表和键列表。listLocked获取对象最新的增量,添加到列表中。

func (f *DeltaFIFO) List() []interface{} {
    f.lock.RLock()
    defer f.lock.RUnlock()
    return f.listLocked()
}

func (f *DeltaFIFO) listLocked() []interface{} {
    list := make([]interface{}, 0, len(f.items))
    for _, item := range f.items {
        list = append(list, item.Newest().Object)
    }
    return list
}

func (f *DeltaFIFO) ListKeys() []string {
    f.lock.RLock()
    defer f.lock.RUnlock()
    list := make([]string, 0, len(f.queue))
    for _, key := range f.queue {
        list = append(list, key)
    }
    return list
}

Get方法

Get和GetByKey,用于根据对象或键获取存储在队列中的数据。

func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
    key, err := f.KeyOf(obj)
    if err != nil {
        return nil, false, KeyError{obj, err}
    }
    return f.GetByKey(key)
}

func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
    f.lock.RLock()
    defer f.lock.RUnlock()
    d, exists := f.items[key]
    if exists {
        d = copyDeltas(d)
    }
    return d, exists, nil
}

消费者方法

设计思想

  • Pop方法会阻塞调用线程,直到队列中有可用项。
  • 如果有多个items可用,它们将按照被添加或更新到队列中的顺序返回。确保了顺序性。
  • 当item通过Pop函数返回时,它也会从队列和底层存储中移除。这防止了项目被多次处理。然而,如果出于某种原因处理失败,则需要使用AddIfNotPresent()重新添加item到队列中,以确保它稍后得到处理。
  • 在有锁的情况下调用process函数。
  • process函数可能会返回一个特殊错误(ErrRequeue)以指示当前项目应该被重新排队。这实际上与在锁下调用AddIfNotPresent()相同,确保了原子性。
  • process函数应避免执行耗时的I/O操作,以防止阻塞其他队列操作(如 Add() 和 Get())过长时间。
  • Pop函数返回一个Deltas对象,其中包含了对象在队列中时发生的所有更改(deltas)的完整列表。

Pop方法

Pop方法从队列中移除并返回队列最前端的元素。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            if f.closed {
                return nil, ErrFIFOClosed
            }

            f.cond.Wait()
        }
        isInInitialList := !f.hasSynced_locked()
        id := f.queue[0]
        f.queue = f.queue[1:]
        depth := len(f.queue)
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
            continue
        }
        delete(f.items, id)
        if depth > 10 {
            trace := utiltrace.New("DeltaFIFO Pop Process",
                utiltrace.Field{Key: "ID", Value: id},
                utiltrace.Field{Key: "Depth", Value: depth},
                utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
            defer trace.LogIfLong(100 * time.Millisecond)
        }
        err := process(item, isInInitialList)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        return item, err
    }
}
  • 首先加锁,返回前解锁。
  • 如果队列为空,则调用f.cond.Wait()进行等待直到队列非空或被关闭。如果队列被关闭,返回ErrFIFOClosed错误。
  • 调用f.hasSynced_locked()方法判断item是否处于初始列表中(isInInitialList)。
  • 队列处理:
    • 从队列`f.queue 中取出最前端的元素id,并从队列中移除该键。
    • 更新队列的深度depth,即队列中剩余项目的数量。
    • 如果仍然处于初始列表中,将初始列表计数 f.initialPopulationCount 减一。
    • 检查元素是否在f.items中,如果不在(这不应该发生),则记录错误并继续处理下一个元素。
    • f.items中删除该元素,以防止重复处理。
  • 如果队列深度大于10,并且处理单个元素的时间超过100毫秒,则记录性能追踪信息。这有助于诊断处理缓慢的问题。
  • 调用传入的process函数处理元素。如果处理过程中返回了ErrRequeue错误,表示该元素需要重新加入队列,调用f.addIfNotPresent方法将元素添加到队列中。
  • 返回处理的元素item和错误err。

process函数

1.process回调函数的定义是在SharedInformerRun函数中,源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/shared_informer.go

file

2.可以看到process函数就是s.HandleDeltas,即SharedInformerHandleDeltas函数。

3.Run函数会调用controllerRun函数,该函数会调用的controllerprocessLoop函数。processLoop函数负责从DeltaFIFO队列中循环取出数据。源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/controller.go#L129

file

4.processLoop函数会调用PopProcessFunc函数。

file

PopProcessFunc函数的定义在fifo.go中:
https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/fifo.go

type PopProcessFunc func(obj interface{}, isInInitialList bool) error

5.PopProcessFunc是一个函数类型,接收两个参数,obj interface{}:这个参数代表从队列中弹出的项目。isInInitialList bool:这个布尔参数表示pop的项目是否来自队列的初始列表。

6.PopProcessFunc函数调用了c.config.Process方法对pop弹出的元素进行处理。

7.c.config.Process方法就是上面的HandleDeltas函数。

func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()

    if deltas, ok := obj.(Deltas); ok {
        return processDeltas(s, s.indexer, deltas, isInInitialList)
    }
    return errors.New("object given as Process argument is not Deltas")
}

8.HandleDeltas函数中如果obj类型断言成功,则调用了controllerprocessDeltas函数处理队列。源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/controller.go#L436

file

9.processDeltas函数遍历变化列表,对于Sync、Replaced、Added 和 Updated类型,函数会尝试从clientState也就是HandleDeltas函数传入的indexer中获取旧对象信息,如果存在则进行更新操作,否则进行添加操作,并调用相应的事件处理方法。对于Deleted类型,函数会直接删除对象,并调用相应的事件处理方法。也就是SharedInformerOnAdd,OnUpdate,OnDelete方法。

file

10.这样数据就从DeltaFIFO中更新到了本地存储indexer中。

同步方法

Resync方法会将Indexer本地存储中的资源对象同步到DeltaFIFO中,并将这些资源对象设置为Sync的操作类型,Resync函数在Reflector中定时执行,执行周期由NewRelector函数传入的resyncPeriod参数设定。

Resync方法是否执行是在reflector中定义的r.store.Resync(),上篇文章中已经介绍过。源码地址: https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/reflector.go

file

Resync方法

用于重新同步已知对象的状态。

func (f *DeltaFIFO) Resync() error {
    f.lock.Lock()
    defer f.lock.Unlock()

    if f.knownObjects == nil {
        return nil
    }

    keys := f.knownObjects.ListKeys()
    for _, k := range keys {
        if err := f.syncKeyLocked(k); err != nil {
            return err
        }
    }
    return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
    obj, exists, err := f.knownObjects.GetByKey(key)
    if err != nil {
        klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
        return nil
    } else if !exists {
        klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
        return nil
    }

    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    if len(f.items[id]) > 0 {
        return nil
    }

    if err := f.queueActionLocked(Sync, obj); err != nil {
        return fmt.Errorf("couldn't queue object: %v", err)
    }
    return nil
}
  • 加锁,返回前解锁。
  • f.knownObjects.GetByKey(key)Indexer本地存储对象,通过该对象可以获取client-go目前存储的所有资源对象。
  • 如果knownObjectsnil,则方法直接返回,因为没有对象需要同步。
  • 使用ListKeys方法获取所有已知对象的键(key)列表。
  • 遍历每个键,调用syncKeyLocked方法进行同步。
  • syncKeyLocked方法首先使用键key从knownObjects中获取对象。如果对象不存在或者获取时出现错误,则记录日志并跳过该对象。
  • 如果对象已经存在于处理队列中(即items中已有该对象的事件),则不再重复添加。
  • 如果对象不在处理队列中,则使用操作类型Sync和对象调用queueActionLocked方法将其添加到队列中。

Replace方法

用于将给定的对象列表以SyncReplace类型的事件添加到队列中,或根据已有对象的键进行删除操作。

func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))

    action := Sync
    if f.emitDeltaTypeReplaced {
        action = Replaced
    }

    for _, item := range list {
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        keys.Insert(key)
        if err := f.queueActionLocked(action, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }

    queuedDeletions := 0
    for k, oldItem := range f.items {
        if keys.Has(k) {
            continue
        }
        var deletedObj interface{}
        if n := oldItem.Newest(); n != nil {
            deletedObj = n.Object

            if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
                deletedObj = d.Obj
            }
        }
        queuedDeletions++
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }
    }

    if f.knownObjects != nil {
        knownKeys := f.knownObjects.ListKeys()
        for _, k := range knownKeys {
            if keys.Has(k) {
                continue
            }
            if len(f.items[k]) > 0 {
                continue
            }

            deletedObj, exists, err := f.knownObjects.GetByKey(k)
            if err != nil {
                deletedObj = nil
                klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
            } else if !exists {
                deletedObj = nil
                klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
            }
            queuedDeletions++
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }
    }

    if !f.populated {
        f.populated = true
        f.initialPopulationCount = keys.Len() + queuedDeletions
    }

    return nil
}
  • 加锁,返回前解锁。
  • 创建一个集合keys,用于存储列表中对象的键。
  • 根据f.emitDeltaTypeReplaced的值决定是使用Sync还是Replace作为操作类型。
  • 遍历对象列表,获取对象的键key,并将键插入集合keys。然后调用f.queueActionLocked方法,将其以Sync或者Replace类型的事件添加到队列中。如果添加操作失败,则返回错误。
  • 遍历f.items中的对象,对于不在新列表中的对象,将其作为Deleted类型的事件添加到队列中。如果对象是DeletedFinalStateUnknown类型,则提取并使用实际对象。
  • 如果f.knownObjects不为空,则对其进行遍历:
    • 获取f.knownObjects中所有的键列表knownKeys
    • 遍历键列表,如果键在集合keys中,或者键对应的f.items中的Delta列表不为空,则跳过该键。
    • 否则,尝试从f.knownObjects中根据键获取对象deletedObj,如果出现错误,则将deletedObj设置为 nil,并记录错误日志。如果对象不存在,则将deletedObj设置为 nil,并记录信息日志。累加计数器queuedDeletions。然后使用操作类型DeletedDeletedFinalStateUnknown对象调用f.queueActionLocked方法,将删除操作添加到队列中。
  • 如果DeltaFIFO之前未被标记为已填充(f.populated 为 false),则将其标记为已填充,并更新f.initialPopulationCount为新列表中的对象数加上队列中的删除事件数。
0 0 投票数
文章评分
订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部
0
希望看到您的想法,请您发表评论x