简介
从 Reflector 中获取到的资源事件放入 DeltaFIFO 先进先出队列中。DeltaFIFO 使得客户端能够以高效和一致的方式处理资源状态的变化,是实现Kubernetes控制器模式的关键组件之一。
设计思想
Deltas
与普通的 FIFO 队列直接存储对象不同,DeltaFIFO 针对每个对象的键(Key)维护一个 Deltas 列表,即该对象的一系列变化(Delta)。当一个新的对象变化需要被加入到队列时,它会被追加到相应键的 Deltas 列表中,除非这个变化是一个删除操作,并且 Deltas 列表已经以一个删除操作结束。这种设计使得消费者在处理一个对象时,可以看到自上次处理以来该对象的所有变化。
替换(Replaced)和同步(Sync)
DeltaFIFO 引入了两种额外的对象应用方式:替换(Replaced)和同步(Sync)。如果 EmitDeltaTypeReplaced 选项没有被设置为 true,那么在替换事件中将使用 Sync 以保持向后兼容。Sync 也用于定期的重新同步事件。
生产者-消费者队列
DeltaFIFO 设计为一个生产者-消费者队列,其中 Reflector 作为生产者,负责监视Kubernetes资源并将变化事件放入队列。消费者通过调用 Pop() 方法来处理这些事件。

使用场景
DeltaFIFO 适用于以下场景:
- 你希望最多只处理每个对象变化(Delta)一次。
- 当你处理一个对象时,你希望看到自上次处理以来发生在该对象上的所有变化。
- 你希望能够处理某些对象的删除。
- 你可能希望定期重新处理对象。
方法返回值
DeltaFIFO 的 Pop()、Get() 和 GetByKey() 方法返回 interface{} 类型以满足 Store/Queue 接口的要求,但它们总是返回 Deltas 类型的对象。List() 方法返回 FIFO 中每个累加器的最新对象。
已知对象
DeltaFIFO 的 knownObjects KeyListerGetter 提供了列出存储键和通过存储键获取对象的能力。这些对象被称为“已知对象”,它们的集合会以不同的方式修改 Delete、Replace 和 Resync 方法的行为。
线程安全性
关于线程安全性的说明:如果你从多个线程并行调用 Pop(),可能会导致多个线程处理同一个对象的略微不同版本。因此,在使用 DeltaFIFO 时需要注意同步和并发处理的策略。
源码
DeltaFIFOOptions结构体
是DeltaFIFO的配置选项。
type DeltaFIFOOptions struct {
KeyFunction KeyFunc
KnownObjects KeyListerGetter
EmitDeltaTypeReplaced bool
Transformer TransformFunc
}
- KeyFunction:
KeyFunc类型;用于确定对象应该有什么键(Key)。这个函数在返回的DeltaFIFO的KeyOf()方法中被暴露出来,其中还包括了对已删除对象和队列状态的额外处理;可选。如果不设置,默认使用MetaNamespaceKeyFunc。 - KnownObjects:
KeyListerGetter类型;预期返回一个消费者“已知”的键(Key)列表。它用于在调用Replace()时决定哪些项是缺失的;对于缺失的项,会产生‘Deleted’ deltas。如果你可以容忍在Replace()操作中遗漏删除操作,KnownObjects可以为nil;可选。 - EmitDeltaTypeReplaced:
bool类型;Replaced DeltaType的作用。在添加了Replaced事件类型之前,对Replace()的调用与Sync()的处理方式相同。出于向后兼容的目的,默认情况下此选项为false。当为true时,对于传递给Replace()调用的项,将发送Replaced事件,当为false时,将发送Sync事件;可选,默认为false。 - Transformer:
TransformFunc类型;如果设置,将在将对象入队之前调用此函数;可选。
DeltaFIFO结构体
队列的定义。
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
items map[string]Deltas
queue []string
populated bool
initialPopulationCount int
keyFunc KeyFunc
knownObjects KeyListerGetter
closed bool
emitDeltaTypeReplaced bool
transformer TransformFunc
}
- lock 和 cond:这两个字段用于保护对
items和queue的访问。lock是一个读写锁,用于在修改或访问这些字段时保证线程安全。cond是一个条件变量,用于在队列为空时阻塞等待,直到有新的元素被添加进来。 - items:这是一个映射,将一个键映射到一组变化(Deltas)。每组变化至少包含一个变化(Delta)。这些变化代表了对应的 Kubernetes 资源对象的增加、更新或删除操作。
- queue:这是一个字符串数组,用于按照先进先出(FIFO)的顺序维护键的消费。队列中没有重复的键。一个键在
queue中当且仅当它也在items中。 - populated 和 initialPopulationCount:这两个字段用于跟踪队列的初始化状态。
populated表示是否已经完成了第一批通过Replace()方法插入的元素的填充。initialPopulationCount是第一次调用Replace()方法时插入的元素数量。 - keyFunc:同上。
- knownObjects:同上。
- closed:这是一个布尔值,用于指示队列是否已关闭,以便控制循环可以在队列为空时退出。目前,它不用于控制任何增删改查(CRUD)操作的访问。
- emitDeltaTypeReplaced:同上。
- transformer:同上。
TransformFunc类型
允许在对象被处理之前对其进行转换。这个功能的设计初衷是为了在不影响对象处理逻辑的前提下,对对象进行预处理,比如清理对象的某些部分以减少内存使用,或者调整对象的结构以适应特定的处理需求。
type TransformFunc func(interface{}) (interface{}, error)
TransformFunc 被调用时,是在将对象插入到通知队列中,因此这个函数的性能非常重要,请避免执行耗时的操作。这是为了确保系统的高效性能,避免因为转换操作而导致的性能瓶颈。
DeltaType类型
变化的类型。Replaced:当遇到观察错误并且不得不重新列出时发出。此时,我们不知道替换的对象是否已经改变。只有当选项 EmitDeltaTypeReplaced 为真时,才会发出 Replaced。
type DeltaType string
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced"
Sync DeltaType = "Sync"
)
DeletedFinalStateUnknown类型
用于表示一个已删除对象的最终状态未知的情况。这种情况可能发生在与apiserver的连接断开时错过了删除事件。这个类型包含一个Key字段,表示对象的键,和一个Obj字段,可能包含对象的一个旧版本。
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}
}
Delta结构体
Delta 是 Deltas(一个 Delta 对象的列表)的成员,Deltas 是 DeltaFIFO 存储的类型。它告诉你发生了什么变化,以及在这个变化之后对象的状态。除非这个变化是删除,那么你将得到对象在被删除之前的最终状态。
type Delta struct {
Type DeltaType
Object interface{}
}
type Deltas []Delta
KeyListerGetter接口
这是一个组合接口,它结合了KeyLister和KeyGetter两个接口的功能。任何实现了KeyListerGetter接口的类型都必须能够列出键并根据给定的键获取值。
type KeyListerGetter interface {
KeyLister
KeyGetter
}
KeyLister接口
接口定义了一个方法ListKeys,该方法返回一个字符串切片,包含所有的键。
type KeyLister interface {
ListKeys() []string
}
KeyGetter接口
接口定义了一个方法GetByKey,该方法接受一个键作为参数,并返回与该键关联的值、一个布尔值(表示该键是否存在)以及一个错误。
type KeyGetter interface {
GetByKey(key string) (value interface{}, exists bool, err error)
}
Oldest函数
它返回切片中的第一个Delta对象,即最旧的变化。如果切片为空,它返回 nil。
func (d Deltas) Oldest() *Delta {
if len(d) > 0 {
return &d[0]
}
return nil
}
Newest函数
返回切片中的最后一个Delta对象,即最新的变化。如果切片为空,它也返回 nil。
func (d Deltas) Newest() *Delta {
if n := len(d); n > 0 {
return &d[n-1]
}
return nil
}
copyDeltas函数
函数接受一个Deltas切片作为参数,并返回这个切片的一个浅拷贝。这意味着它拷贝了切片本身,但没有拷贝切片中的Delta对象。
func copyDeltas(d Deltas) Deltas {
d2 := make(Deltas, len(d))
copy(d2, d)
return d2
}
NewDeltaFIFO函数
函数返回一个可以用来处理项目更改的队列。已被弃用,改用下面的 NewDeltaFIFOWithOptions。
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}
NewDeltaFIFOWithOptions函数
函数返回一个可以用来处理项目更改的队列。
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
transformer: opts.Transformer,
}
f.cond.L = &f.lock
return f
}
var (
_ = Queue(&DeltaFIFO{})
)
var (
ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
)
- 函数接受一个参数:
opts,它是DeltaFIFOOptions类型。如果opts.KeyFunction为空,则将其设置为MetaNamespaceKeyFunc。 - 函数创建了一个新的
DeltaFIFO对象f,并设置了其各种属性,包括items,queue,keyFunc,knownObjects,emitDeltaTypeReplaced和transformer。然后,它将f.cond.L设置为f.lock,以便进行同步,并返回f。 - 代码还定义了两个变量。第一个变量是一个断言,确认
DeltaFIFO是一个队列。第二个变量ErrZeroLengthDeltasObject是一个错误,当遇到长度为0的Deltas对象时返回。
Close方法
关闭队列。
它首先锁定f.lock,然后在函数结束时解锁。然后,它将f.closed设置为true,表示队列已经关闭。最后,它调用f.cond.Broadcast()来唤醒所有等待在f.cond上的goroutine。
func (f *DeltaFIFO) Close() {
f.lock.Lock()
defer f.lock.Unlock()
f.closed = true
f.cond.Broadcast()
}
KeyOf方法
用来获取对象的键值。
它首先检查传入的对象是否是Deltas类型,并且如果长度为0,那么返回一个错误。如果不是0,那么将对象更新为Deltas中最新的对象。然后,它检查对象是否是DeletedFinalStateUnknown类型,如果是,那么直接返回其键值。如果都不是,那么调用f.keyFunc方法来获取对象的键值。
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
if d, ok := obj.(Deltas); ok {
if len(d) == 0 {
return "", KeyError{obj, ErrZeroLengthDeltasObject}
}
obj = d.Newest().Object
}
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return f.keyFunc(obj)
}
HasSynced方法
HasSynced用于判断是否已经完成同步。它会检查是否调用过Add、Update、Delete、AddIfNotPresent操作,或者Replace()插入的第一批项目是否已经被处理(从队列中弹出)。
hasSynced_locked用于在已经获取锁的情况下判断是否已完成同步。返回一个布尔值,判断DeltaFIFO对象的populated字段是否为true,即是否已经被填充过数据。并且initialPopulationCount字段是否为0,即初始填充的所有项目都已经被处理。
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked()
}
func (f *DeltaFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
生产者方法
Add方法
用于向队列中添加新的对象。
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}
获取锁,标记已填充,调用queueActionLocked方法将对象添加到队列中。该方法在锁定的情况下进行执行,将操作类型设置为Added,并将要添加的对象传递给它,解锁,返回结果。
Update方法
用于更新队列中的对象。
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
获取锁,标记已填充,调用queueActionLocked方法将对象添加到队列中。该方法在锁定的情况下进行执行,将操作类型设置为Updated,并将要更新的对象传递给它,解锁,返回结果。
Delete方法
用于删除队列中的对象。Delete就像Add一样,但是是一个Deleted Delta。如果给定
对象不存在,它将被忽略。
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
return nil
}
} else {
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
return nil
}
}
return f.queueActionLocked(Deleted, obj)
}
- 使用
f.KeyOf方法获取要删除对象的键 id,并检查是否有错误。如果有错误,则返回KeyError类型的错误,其中包含相关的对象和错误信息。 - 获取锁,标记已填充。
- 如果
f.knownObjects为nil,则检查items中是否存在该对象的键。如果不存在,可能已删除,直接返回nil。 - 如果
f.knownObjects不为 nil,则同时检查knownObjects和items。只有当对象在knownObjects中不存在且items中也没有对应项时,才认为该对象可能在之前已经被删除,直接返回nil。 - 如果对象存在于
items和/或knownObjects中,那么将执行删除操作,调用queueActionLocked方法将对象添加到队列中。将操作类型设置为Deleted,并将要删除的对象传递给它,解锁,返回结果。
AddIfNotPresent方法
用于在对象不存在时添加对象。这在单个生产者/消费者场景中很有用,这样消费者可以安全地重试项目,而不会与生产者竞争,也不会使过期的项目重新排队。
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
deltas, ok := obj.(Deltas)
if !ok {
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
}
id, err := f.KeyOf(deltas)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.addIfNotPresent(id, deltas)
return nil
}
- 首先方法将传入的
obj参数断言为Deltas类型。如果断言失败,则返回一个错误,对象必须是Deltas类型。 - 调用
KeyOf方法,从deltas中获取对象的键。如果有错误则返回KeyError错误。 - 获取锁,调用
addIfNotPresent方法,将对象添加到队列中。 - 释放锁,返回nil,表示成功添加对象。
addIfNotPresent方法
addIfNotPresent方法是私有方法,只能在DeltaFIFO类内部调用。用于将一组与特定id关联的变更(deltas)插入到队列中,前提是这个id在队列中不存在,且调用者已经持有了DeltaFIFO对象的锁。
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
f.populated = true
if _, exists := f.items[id]; exists {
return
}
f.queue = append(f.queue, id)
f.items[id] = deltas
f.cond.Broadcast()
}
- 标记已填充。检查id是否已经存在于队列的
items映射中。如果存在,方法会立即返回,不做任何更改。这确保了每个id及其关联的deltas只被添加一次,防止重复。 - 如果不存在,将id添加到队列
queue中。queue是一个切片,维护着添加到队列中的id及其关联的deltas的顺序。 - 以id为索引,将
deltas对象插入到items中。 - 调用
cond.Broadcast()。广播会唤醒所有等待此条件的goroutine,表示队列中有新数据可用。
dedupDeltas函数
用于合并最近两个相同的事件。
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
deltas[n-2] = *out
return deltas[:n-1]
}
return deltas
}
- 首先检查事件更新切片中的事件数量。如果少于2个,则无需合并,直接返回原切片。
- 获取切片中最后两个事件更新,并分别赋值给变量a和b。
- 调用
isDup函数来判断这两个事件是否相同。如果相同,则需要合并,将第二个事件更新为合并后的事件,并返回去除了最后一个事件的切片。 - 如果不同,直接返回nil。
isDup函数
判断两个事件更新是否相同,并返回应该保留的事件更新。
func isDup(a, b *Delta) *Delta {
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
调用isDeletionDup函数判断这两个事件是否为相同的删除事件。如果是,返回合并后的对象out,如果不是,返回nil。
isDeletionDup函数
在两个事件都是删除事件的情况下,保留信息更全的事件更新。
func isDeletionDup(a, b *Delta) *Delta {
if b.Type != Deleted || a.Type != Deleted {
return nil
}
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
- 首先检查两个事件是否都是删除事件。如果不是,则直接返回 nil。
- 如果第二个事件的对象是
DeletedFinalStateUnknown类型,则说明它的信息可能不完整,因此保留第一个事件。否则,保留第二个事件。
queueActionLocked方法
用于在加锁的情况下处理对象的增量更新操作,并确保队列中不会有重复的更新。
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}
- 首先,通过调用
f.KeyOf(obj)方法获取对象的键(ID)。如果无法获取,返回一个KeyError错误。 - 如果
DeltaFIFO对象的transformer字段不为 nil,则在对象上应用这个转换函数。这一步骤允许在对象被处理之前对其进行修改或标准化。如果转换过程中出现错误,返回该错误。 - 获取与对象键关联的当前操作列表
oldDeltas,并将新的操作(由 actionType 和 obj 组成的 Delta)追加到列表中。然后,调用dedupDeltas函数去重。 - 如果合并后的
newDeltas不为空:- 并且对象的键不在队列中,则将键添加到队列中,即把对象obj入队。
- 无论键是否已存在,都将更新后的操作列表存储回
f.items中。 - 调用
f.cond.Broadcast()通知所有等待的协程,队列已经更新。
- 按照
dedupDeltas函数的设计,给定非空列表时,它不应返回空列表。如果发生这种情况,记录错误信息,并根据oldDeltas是否为 nil 采取不同的处理方式。
list方法
List和ListKeys,用于获取当前队列中对象的列表和键列表。listLocked获取对象最新的增量,添加到列表中。
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}
func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
list = append(list, item.Newest().Object)
}
return list
}
func (f *DeltaFIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.queue))
for _, key := range f.queue {
list = append(list, key)
}
return list
}
Get方法
Get和GetByKey,用于根据对象或键获取存储在队列中的数据。
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
if exists {
d = copyDeltas(d)
}
return d, exists, nil
}
消费者方法
设计思想
- Pop方法会阻塞调用线程,直到队列中有可用项。
- 如果有多个items可用,它们将按照被添加或更新到队列中的顺序返回。确保了顺序性。
- 当item通过Pop函数返回时,它也会从队列和底层存储中移除。这防止了项目被多次处理。然而,如果出于某种原因处理失败,则需要使用
AddIfNotPresent()重新添加item到队列中,以确保它稍后得到处理。 - 在有锁的情况下调用process函数。
- process函数可能会返回一个特殊错误(ErrRequeue)以指示当前项目应该被重新排队。这实际上与在锁下调用
AddIfNotPresent()相同,确保了原子性。 - process函数应避免执行耗时的I/O操作,以防止阻塞其他队列操作(如 Add() 和 Get())过长时间。
- Pop函数返回一个
Deltas对象,其中包含了对象在队列中时发生的所有更改(deltas)的完整列表。
Pop方法
Pop方法从队列中移除并返回队列最前端的元素。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
- 首先加锁,返回前解锁。
- 如果队列为空,则调用
f.cond.Wait()进行等待直到队列非空或被关闭。如果队列被关闭,返回ErrFIFOClosed错误。 - 调用
f.hasSynced_locked()方法判断item是否处于初始列表中(isInInitialList)。 - 队列处理:
- 从队列`f.queue 中取出最前端的元素id,并从队列中移除该键。
- 更新队列的深度
depth,即队列中剩余项目的数量。 - 如果仍然处于初始列表中,将初始列表计数
f.initialPopulationCount减一。 - 检查元素是否在
f.items中,如果不在(这不应该发生),则记录错误并继续处理下一个元素。 - 从
f.items中删除该元素,以防止重复处理。
- 如果队列深度大于10,并且处理单个元素的时间超过100毫秒,则记录性能追踪信息。这有助于诊断处理缓慢的问题。
- 调用传入的process函数处理元素。如果处理过程中返回了
ErrRequeue错误,表示该元素需要重新加入队列,调用f.addIfNotPresent方法将元素添加到队列中。 - 返回处理的元素item和错误err。
process函数
1.process回调函数的定义是在SharedInformer的Run函数中,源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/shared_informer.go

2.可以看到process函数就是s.HandleDeltas,即SharedInformer的HandleDeltas函数。
3.Run函数会调用controller的Run函数,该函数会调用的controller的processLoop函数。processLoop函数负责从DeltaFIFO队列中循环取出数据。源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/controller.go#L129

4.processLoop函数会调用PopProcessFunc函数。

PopProcessFunc函数的定义在fifo.go中:
https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/fifo.go
type PopProcessFunc func(obj interface{}, isInInitialList bool) error
5.PopProcessFunc是一个函数类型,接收两个参数,obj interface{}:这个参数代表从队列中弹出的项目。isInInitialList bool:这个布尔参数表示pop的项目是否来自队列的初始列表。
6.PopProcessFunc函数调用了c.config.Process方法对pop弹出的元素进行处理。
7.而c.config.Process方法就是上面的HandleDeltas函数。
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}
8.HandleDeltas函数中如果obj类型断言成功,则调用了controller的processDeltas函数处理队列。源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/controller.go#L436

9.processDeltas函数遍历变化列表,对于Sync、Replaced、Added 和 Updated类型,函数会尝试从clientState也就是HandleDeltas函数传入的indexer中获取旧对象信息,如果存在则进行更新操作,否则进行添加操作,并调用相应的事件处理方法。对于Deleted类型,函数会直接删除对象,并调用相应的事件处理方法。也就是SharedInformer的OnAdd,OnUpdate,OnDelete方法。

10.这样数据就从DeltaFIFO中更新到了本地存储indexer中。
同步方法
Resync方法会将Indexer本地存储中的资源对象同步到DeltaFIFO中,并将这些资源对象设置为Sync的操作类型,Resync函数在Reflector中定时执行,执行周期由NewRelector函数传入的resyncPeriod参数设定。
Resync方法是否执行是在reflector中定义的r.store.Resync(),上篇文章中已经介绍过。源码地址: https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/reflector.go

Resync方法
用于重新同步已知对象的状态。
func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
if f.knownObjects == nil {
return nil
}
keys := f.knownObjects.ListKeys()
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if len(f.items[id]) > 0 {
return nil
}
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
- 加锁,返回前解锁。
f.knownObjects.GetByKey(key)是Indexer本地存储对象,通过该对象可以获取client-go目前存储的所有资源对象。- 如果
knownObjects为nil,则方法直接返回,因为没有对象需要同步。 - 使用
ListKeys方法获取所有已知对象的键(key)列表。 - 遍历每个键,调用
syncKeyLocked方法进行同步。 syncKeyLocked方法首先使用键key从knownObjects中获取对象。如果对象不存在或者获取时出现错误,则记录日志并跳过该对象。- 如果对象已经存在于处理队列中(即items中已有该对象的事件),则不再重复添加。
- 如果对象不在处理队列中,则使用操作类型
Sync和对象调用queueActionLocked方法将其添加到队列中。
Replace方法
用于将给定的对象列表以Sync或Replace类型的事件添加到队列中,或根据已有对象的键进行删除操作。
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
queuedDeletions := 0
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
deletedObj = d.Obj
}
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if f.knownObjects != nil {
knownKeys := f.knownObjects.ListKeys()
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
if len(f.items[k]) > 0 {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = keys.Len() + queuedDeletions
}
return nil
}
- 加锁,返回前解锁。
- 创建一个集合
keys,用于存储列表中对象的键。 - 根据
f.emitDeltaTypeReplaced的值决定是使用Sync还是Replace作为操作类型。 - 遍历对象列表,获取对象的键
key,并将键插入集合keys。然后调用f.queueActionLocked方法,将其以Sync或者Replace类型的事件添加到队列中。如果添加操作失败,则返回错误。 - 遍历
f.items中的对象,对于不在新列表中的对象,将其作为Deleted类型的事件添加到队列中。如果对象是DeletedFinalStateUnknown类型,则提取并使用实际对象。 - 如果
f.knownObjects不为空,则对其进行遍历:- 获取
f.knownObjects中所有的键列表knownKeys。 - 遍历键列表,如果键在集合
keys中,或者键对应的f.items中的Delta列表不为空,则跳过该键。 - 否则,尝试从
f.knownObjects中根据键获取对象deletedObj,如果出现错误,则将deletedObj设置为 nil,并记录错误日志。如果对象不存在,则将deletedObj设置为 nil,并记录信息日志。累加计数器queuedDeletions。然后使用操作类型Deleted和DeletedFinalStateUnknown对象调用f.queueActionLocked方法,将删除操作添加到队列中。
- 获取
- 如果DeltaFIFO之前未被标记为已填充(f.populated 为 false),则将其标记为已填充,并更新
f.initialPopulationCount为新列表中的对象数加上队列中的删除事件数。