简介
从 Reflector
中获取到的资源事件放入 DeltaFIFO
先进先出队列中。DeltaFIFO
使得客户端能够以高效和一致的方式处理资源状态的变化,是实现Kubernetes控制器模式的关键组件之一。
设计思想
Deltas
与普通的 FIFO
队列直接存储对象不同,DeltaFIFO
针对每个对象的键(Key)维护一个 Deltas
列表,即该对象的一系列变化(Delta)。当一个新的对象变化需要被加入到队列时,它会被追加到相应键的 Deltas
列表中,除非这个变化是一个删除操作,并且 Deltas
列表已经以一个删除操作结束。这种设计使得消费者在处理一个对象时,可以看到自上次处理以来该对象的所有变化。
替换(Replaced)和同步(Sync)
DeltaFIFO
引入了两种额外的对象应用方式:替换(Replaced)和同步(Sync)。如果 EmitDeltaTypeReplaced
选项没有被设置为 true,那么在替换事件中将使用 Sync
以保持向后兼容。Sync
也用于定期的重新同步事件。
生产者-消费者队列
DeltaFIFO
设计为一个生产者-消费者队列,其中 Reflector
作为生产者,负责监视Kubernetes资源并将变化事件放入队列。消费者通过调用 Pop()
方法来处理这些事件。
使用场景
DeltaFIFO
适用于以下场景:
- 你希望最多只处理每个对象变化(Delta)一次。
- 当你处理一个对象时,你希望看到自上次处理以来发生在该对象上的所有变化。
- 你希望能够处理某些对象的删除。
- 你可能希望定期重新处理对象。
方法返回值
DeltaFIFO
的 Pop()、Get() 和 GetByKey()
方法返回 interface{}
类型以满足 Store/Queue
接口的要求,但它们总是返回 Deltas
类型的对象。List()
方法返回 FIFO 中每个累加器的最新对象。
已知对象
DeltaFIFO
的 knownObjects KeyListerGetter
提供了列出存储键和通过存储键获取对象的能力。这些对象被称为“已知对象”,它们的集合会以不同的方式修改 Delete、Replace 和 Resync
方法的行为。
线程安全性
关于线程安全性的说明:如果你从多个线程并行调用 Pop()
,可能会导致多个线程处理同一个对象的略微不同版本。因此,在使用 DeltaFIFO
时需要注意同步和并发处理的策略。
源码
DeltaFIFOOptions结构体
是DeltaFIFO的配置选项。
type DeltaFIFOOptions struct {
KeyFunction KeyFunc
KnownObjects KeyListerGetter
EmitDeltaTypeReplaced bool
Transformer TransformFunc
}
- KeyFunction:
KeyFunc
类型;用于确定对象应该有什么键(Key)。这个函数在返回的DeltaFIFO
的KeyOf()
方法中被暴露出来,其中还包括了对已删除对象和队列状态的额外处理;可选。如果不设置,默认使用MetaNamespaceKeyFunc
。 - KnownObjects:
KeyListerGetter
类型;预期返回一个消费者“已知”的键(Key)列表。它用于在调用Replace()
时决定哪些项是缺失的;对于缺失的项,会产生‘Deleted’ deltas
。如果你可以容忍在Replace()
操作中遗漏删除操作,KnownObjects
可以为nil
;可选。 - EmitDeltaTypeReplaced:
bool
类型;Replaced DeltaType
的作用。在添加了Replaced
事件类型之前,对Replace()
的调用与Sync()
的处理方式相同。出于向后兼容的目的,默认情况下此选项为false
。当为true
时,对于传递给Replace()
调用的项,将发送Replaced
事件,当为false
时,将发送Sync
事件;可选,默认为false
。 - Transformer:
TransformFunc
类型;如果设置,将在将对象入队之前调用此函数;可选。
DeltaFIFO结构体
队列的定义。
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
items map[string]Deltas
queue []string
populated bool
initialPopulationCount int
keyFunc KeyFunc
knownObjects KeyListerGetter
closed bool
emitDeltaTypeReplaced bool
transformer TransformFunc
}
- lock 和 cond:这两个字段用于保护对
items
和queue
的访问。lock
是一个读写锁,用于在修改或访问这些字段时保证线程安全。cond
是一个条件变量,用于在队列为空时阻塞等待,直到有新的元素被添加进来。 - items:这是一个映射,将一个键映射到一组变化(Deltas)。每组变化至少包含一个变化(Delta)。这些变化代表了对应的 Kubernetes 资源对象的增加、更新或删除操作。
- queue:这是一个字符串数组,用于按照先进先出(FIFO)的顺序维护键的消费。队列中没有重复的键。一个键在
queue
中当且仅当它也在items
中。 - populated 和 initialPopulationCount:这两个字段用于跟踪队列的初始化状态。
populated
表示是否已经完成了第一批通过Replace()
方法插入的元素的填充。initialPopulationCount
是第一次调用Replace()
方法时插入的元素数量。 - keyFunc:同上。
- knownObjects:同上。
- closed:这是一个布尔值,用于指示队列是否已关闭,以便控制循环可以在队列为空时退出。目前,它不用于控制任何增删改查(CRUD)操作的访问。
- emitDeltaTypeReplaced:同上。
- transformer:同上。
TransformFunc类型
允许在对象被处理之前对其进行转换。这个功能的设计初衷是为了在不影响对象处理逻辑的前提下,对对象进行预处理,比如清理对象的某些部分以减少内存使用,或者调整对象的结构以适应特定的处理需求。
type TransformFunc func(interface{}) (interface{}, error)
TransformFunc 被调用时,是在将对象插入到通知队列中,因此这个函数的性能非常重要,请避免执行耗时的操作。这是为了确保系统的高效性能,避免因为转换操作而导致的性能瓶颈。
DeltaType类型
变化的类型。Replaced
:当遇到观察错误并且不得不重新列出时发出。此时,我们不知道替换的对象是否已经改变。只有当选项 EmitDeltaTypeReplaced
为真时,才会发出 Replaced
。
type DeltaType string
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
Replaced DeltaType = "Replaced"
Sync DeltaType = "Sync"
)
DeletedFinalStateUnknown类型
用于表示一个已删除对象的最终状态未知的情况。这种情况可能发生在与apiserver的连接断开时错过了删除事件。这个类型包含一个Key字段,表示对象的键,和一个Obj字段,可能包含对象的一个旧版本。
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}
}
Delta结构体
Delta
是 Deltas
(一个 Delta 对象的列表)的成员,Deltas
是 DeltaFIFO
存储的类型。它告诉你发生了什么变化,以及在这个变化之后对象的状态。除非这个变化是删除,那么你将得到对象在被删除之前的最终状态。
type Delta struct {
Type DeltaType
Object interface{}
}
type Deltas []Delta
KeyListerGetter接口
这是一个组合接口,它结合了KeyLister
和KeyGetter
两个接口的功能。任何实现了KeyListerGetter
接口的类型都必须能够列出键并根据给定的键获取值。
type KeyListerGetter interface {
KeyLister
KeyGetter
}
KeyLister接口
接口定义了一个方法ListKeys
,该方法返回一个字符串切片,包含所有的键。
type KeyLister interface {
ListKeys() []string
}
KeyGetter接口
接口定义了一个方法GetByKey
,该方法接受一个键作为参数,并返回与该键关联的值、一个布尔值(表示该键是否存在)以及一个错误。
type KeyGetter interface {
GetByKey(key string) (value interface{}, exists bool, err error)
}
Oldest函数
它返回切片中的第一个Delta
对象,即最旧的变化。如果切片为空,它返回 nil。
func (d Deltas) Oldest() *Delta {
if len(d) > 0 {
return &d[0]
}
return nil
}
Newest函数
返回切片中的最后一个Delta
对象,即最新的变化。如果切片为空,它也返回 nil。
func (d Deltas) Newest() *Delta {
if n := len(d); n > 0 {
return &d[n-1]
}
return nil
}
copyDeltas函数
函数接受一个Deltas
切片作为参数,并返回这个切片的一个浅拷贝。这意味着它拷贝了切片本身,但没有拷贝切片中的Delta
对象。
func copyDeltas(d Deltas) Deltas {
d2 := make(Deltas, len(d))
copy(d2, d)
return d2
}
NewDeltaFIFO函数
函数返回一个可以用来处理项目更改的队列。已被弃用,改用下面的 NewDeltaFIFOWithOptions
。
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}
NewDeltaFIFOWithOptions函数
函数返回一个可以用来处理项目更改的队列。
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
transformer: opts.Transformer,
}
f.cond.L = &f.lock
return f
}
var (
_ = Queue(&DeltaFIFO{})
)
var (
ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
)
- 函数接受一个参数:
opts
,它是DeltaFIFOOptions
类型。如果opts.KeyFunction
为空,则将其设置为MetaNamespaceKeyFunc
。 - 函数创建了一个新的
DeltaFIFO
对象f
,并设置了其各种属性,包括items,queue,keyFunc,knownObjects,emitDeltaTypeReplaced和transformer
。然后,它将f.cond.L
设置为f.lock
,以便进行同步,并返回f
。 - 代码还定义了两个变量。第一个变量是一个断言,确认
DeltaFIFO
是一个队列。第二个变量ErrZeroLengthDeltasObject
是一个错误,当遇到长度为0的Deltas
对象时返回。
Close方法
关闭队列。
它首先锁定f.lock
,然后在函数结束时解锁。然后,它将f.closed
设置为true,表示队列已经关闭。最后,它调用f.cond.Broadcast()
来唤醒所有等待在f.cond
上的goroutine。
func (f *DeltaFIFO) Close() {
f.lock.Lock()
defer f.lock.Unlock()
f.closed = true
f.cond.Broadcast()
}
KeyOf方法
用来获取对象的键值。
它首先检查传入的对象是否是Deltas
类型,并且如果长度为0,那么返回一个错误。如果不是0,那么将对象更新为Deltas
中最新的对象。然后,它检查对象是否是DeletedFinalStateUnknown
类型,如果是,那么直接返回其键值。如果都不是,那么调用f.keyFunc
方法来获取对象的键值。
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
if d, ok := obj.(Deltas); ok {
if len(d) == 0 {
return "", KeyError{obj, ErrZeroLengthDeltasObject}
}
obj = d.Newest().Object
}
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return f.keyFunc(obj)
}
HasSynced方法
HasSynced
用于判断是否已经完成同步。它会检查是否调用过Add、Update、Delete、AddIfNotPresent
操作,或者Replace()
插入的第一批项目是否已经被处理(从队列中弹出)。
hasSynced_locked
用于在已经获取锁的情况下判断是否已完成同步。返回一个布尔值,判断DeltaFIFO
对象的populated
字段是否为true,即是否已经被填充过数据。并且initialPopulationCount
字段是否为0,即初始填充的所有项目都已经被处理。
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.hasSynced_locked()
}
func (f *DeltaFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
}
生产者方法
Add方法
用于向队列中添加新的对象。
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}
获取锁,标记已填充,调用queueActionLocked
方法将对象添加到队列中。该方法在锁定的情况下进行执行,将操作类型设置为Added
,并将要添加的对象传递给它,解锁,返回结果。
Update方法
用于更新队列中的对象。
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
获取锁,标记已填充,调用queueActionLocked
方法将对象添加到队列中。该方法在锁定的情况下进行执行,将操作类型设置为Updated
,并将要更新的对象传递给它,解锁,返回结果。
Delete方法
用于删除队列中的对象。Delete
就像Add
一样,但是是一个Deleted Delta
。如果给定
对象不存在,它将被忽略。
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
return nil
}
} else {
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
return nil
}
}
return f.queueActionLocked(Deleted, obj)
}
- 使用
f.KeyOf
方法获取要删除对象的键 id,并检查是否有错误。如果有错误,则返回KeyError
类型的错误,其中包含相关的对象和错误信息。 - 获取锁,标记已填充。
- 如果
f.knownObjects
为nil,则检查items
中是否存在该对象的键。如果不存在,可能已删除,直接返回nil。 - 如果
f.knownObjects
不为 nil,则同时检查knownObjects
和items
。只有当对象在knownObjects
中不存在且items
中也没有对应项时,才认为该对象可能在之前已经被删除,直接返回nil。 - 如果对象存在于
items
和/或knownObjects
中,那么将执行删除操作,调用queueActionLocked
方法将对象添加到队列中。将操作类型设置为Deleted
,并将要删除的对象传递给它,解锁,返回结果。
AddIfNotPresent方法
用于在对象不存在时添加对象。这在单个生产者/消费者场景中很有用,这样消费者可以安全地重试项目,而不会与生产者竞争,也不会使过期的项目重新排队。
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
deltas, ok := obj.(Deltas)
if !ok {
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
}
id, err := f.KeyOf(deltas)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.addIfNotPresent(id, deltas)
return nil
}
- 首先方法将传入的
obj
参数断言为Deltas
类型。如果断言失败,则返回一个错误,对象必须是Deltas
类型。 - 调用
KeyOf
方法,从deltas
中获取对象的键。如果有错误则返回KeyError
错误。 - 获取锁,调用
addIfNotPresent
方法,将对象添加到队列中。 - 释放锁,返回nil,表示成功添加对象。
addIfNotPresent方法
addIfNotPresent
方法是私有方法,只能在DeltaFIFO
类内部调用。用于将一组与特定id关联的变更(deltas)插入到队列中,前提是这个id在队列中不存在,且调用者已经持有了DeltaFIFO
对象的锁。
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
f.populated = true
if _, exists := f.items[id]; exists {
return
}
f.queue = append(f.queue, id)
f.items[id] = deltas
f.cond.Broadcast()
}
- 标记已填充。检查id是否已经存在于队列的
items
映射中。如果存在,方法会立即返回,不做任何更改。这确保了每个id及其关联的deltas
只被添加一次,防止重复。 - 如果不存在,将id添加到队列
queue
中。queue
是一个切片,维护着添加到队列中的id及其关联的deltas
的顺序。 - 以id为索引,将
deltas
对象插入到items
中。 - 调用
cond.Broadcast()
。广播会唤醒所有等待此条件的goroutine,表示队列中有新数据可用。
dedupDeltas函数
用于合并最近两个相同的事件。
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
deltas[n-2] = *out
return deltas[:n-1]
}
return deltas
}
- 首先检查事件更新切片中的事件数量。如果少于2个,则无需合并,直接返回原切片。
- 获取切片中最后两个事件更新,并分别赋值给变量a和b。
- 调用
isDup
函数来判断这两个事件是否相同。如果相同,则需要合并,将第二个事件更新为合并后的事件,并返回去除了最后一个事件的切片。 - 如果不同,直接返回nil。
isDup函数
判断两个事件更新是否相同,并返回应该保留的事件更新。
func isDup(a, b *Delta) *Delta {
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
调用isDeletionDup
函数判断这两个事件是否为相同的删除事件。如果是,返回合并后的对象out,如果不是,返回nil。
isDeletionDup函数
在两个事件都是删除事件的情况下,保留信息更全的事件更新。
func isDeletionDup(a, b *Delta) *Delta {
if b.Type != Deleted || a.Type != Deleted {
return nil
}
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
- 首先检查两个事件是否都是删除事件。如果不是,则直接返回 nil。
- 如果第二个事件的对象是
DeletedFinalStateUnknown
类型,则说明它的信息可能不完整,因此保留第一个事件。否则,保留第二个事件。
queueActionLocked方法
用于在加锁的情况下处理对象的增量更新操作,并确保队列中不会有重复的更新。
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if f.transformer != nil {
var err error
obj, err = f.transformer(obj)
if err != nil {
return err
}
}
oldDeltas := f.items[id]
newDeltas := append(oldDeltas, Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
if oldDeltas == nil {
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
return nil
}
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
f.items[id] = newDeltas
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
}
return nil
}
- 首先,通过调用
f.KeyOf(obj)
方法获取对象的键(ID)。如果无法获取,返回一个KeyError
错误。 - 如果
DeltaFIFO
对象的transformer
字段不为 nil,则在对象上应用这个转换函数。这一步骤允许在对象被处理之前对其进行修改或标准化。如果转换过程中出现错误,返回该错误。 - 获取与对象键关联的当前操作列表
oldDeltas
,并将新的操作(由 actionType 和 obj 组成的 Delta)追加到列表中。然后,调用dedupDeltas
函数去重。 - 如果合并后的
newDeltas
不为空:- 并且对象的键不在队列中,则将键添加到队列中,即把对象obj入队。
- 无论键是否已存在,都将更新后的操作列表存储回
f.items
中。 - 调用
f.cond.Broadcast()
通知所有等待的协程,队列已经更新。
- 按照
dedupDeltas
函数的设计,给定非空列表时,它不应返回空列表。如果发生这种情况,记录错误信息,并根据oldDeltas
是否为 nil 采取不同的处理方式。
list方法
List和ListKeys,用于获取当前队列中对象的列表和键列表。listLocked获取对象最新的增量,添加到列表中。
func (f *DeltaFIFO) List() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
}
func (f *DeltaFIFO) listLocked() []interface{} {
list := make([]interface{}, 0, len(f.items))
for _, item := range f.items {
list = append(list, item.Newest().Object)
}
return list
}
func (f *DeltaFIFO) ListKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.queue))
for _, key := range f.queue {
list = append(list, key)
}
return list
}
Get方法
Get和GetByKey,用于根据对象或键获取存储在队列中的数据。
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
}
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
if exists {
d = copyDeltas(d)
}
return d, exists, nil
}
消费者方法
设计思想
- Pop方法会阻塞调用线程,直到队列中有可用项。
- 如果有多个items可用,它们将按照被添加或更新到队列中的顺序返回。确保了顺序性。
- 当item通过Pop函数返回时,它也会从队列和底层存储中移除。这防止了项目被多次处理。然而,如果出于某种原因处理失败,则需要使用
AddIfNotPresent()
重新添加item到队列中,以确保它稍后得到处理。 - 在有锁的情况下调用process函数。
- process函数可能会返回一个特殊错误(ErrRequeue)以指示当前项目应该被重新排队。这实际上与在锁下调用
AddIfNotPresent()
相同,确保了原子性。 - process函数应避免执行耗时的I/O操作,以防止阻塞其他队列操作(如 Add() 和 Get())过长时间。
- Pop函数返回一个
Deltas
对象,其中包含了对象在队列中时发生的所有更改(deltas)的完整列表。
Pop方法
Pop方法从队列中移除并返回队列最前端的元素。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
isInInitialList := !f.hasSynced_locked()
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item, isInInitialList)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
- 首先加锁,返回前解锁。
- 如果队列为空,则调用
f.cond.Wait()
进行等待直到队列非空或被关闭。如果队列被关闭,返回ErrFIFOClosed
错误。 - 调用
f.hasSynced_locked()
方法判断item是否处于初始列表中(isInInitialList)。 - 队列处理:
- 从队列`f.queue 中取出最前端的元素id,并从队列中移除该键。
- 更新队列的深度
depth
,即队列中剩余项目的数量。 - 如果仍然处于初始列表中,将初始列表计数
f.initialPopulationCount
减一。 - 检查元素是否在
f.items
中,如果不在(这不应该发生),则记录错误并继续处理下一个元素。 - 从
f.items
中删除该元素,以防止重复处理。
- 如果队列深度大于10,并且处理单个元素的时间超过100毫秒,则记录性能追踪信息。这有助于诊断处理缓慢的问题。
- 调用传入的process函数处理元素。如果处理过程中返回了
ErrRequeue
错误,表示该元素需要重新加入队列,调用f.addIfNotPresent
方法将元素添加到队列中。 - 返回处理的元素item和错误err。
process函数
1.process回调函数的定义是在SharedInformer
的Run
函数中,源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/shared_informer.go
2.可以看到process函数就是s.HandleDeltas
,即SharedInformer
的HandleDeltas
函数。
3.Run
函数会调用controller
的Run
函数,该函数会调用的controller
的processLoop
函数。processLoop
函数负责从DeltaFIFO队列中循环取出数据。源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/controller.go#L129
4.processLoop
函数会调用PopProcessFunc
函数。
PopProcessFunc
函数的定义在fifo.go
中:
https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/fifo.go
type PopProcessFunc func(obj interface{}, isInInitialList bool) error
5.PopProcessFunc
是一个函数类型,接收两个参数,obj interface{}
:这个参数代表从队列中弹出的项目。isInInitialList bool
:这个布尔参数表示pop
的项目是否来自队列的初始列表。
6.PopProcessFunc
函数调用了c.config.Process
方法对pop弹出的元素进行处理。
7.而c.config.Process
方法就是上面的HandleDeltas
函数。
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, deltas, isInInitialList)
}
return errors.New("object given as Process argument is not Deltas")
}
8.HandleDeltas
函数中如果obj类型断言成功,则调用了controller
的processDeltas
函数处理队列。源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/controller.go#L436
9.processDeltas
函数遍历变化列表,对于Sync、Replaced、Added 和 Updated
类型,函数会尝试从clientState
也就是HandleDeltas
函数传入的indexer
中获取旧对象信息,如果存在则进行更新操作,否则进行添加操作,并调用相应的事件处理方法。对于Deleted
类型,函数会直接删除对象,并调用相应的事件处理方法。也就是SharedInformer
的OnAdd,OnUpdate,OnDelete
方法。
10.这样数据就从DeltaFIFO
中更新到了本地存储indexer
中。
同步方法
Resync
方法会将Indexer
本地存储中的资源对象同步到DeltaFIFO
中,并将这些资源对象设置为Sync
的操作类型,Resync
函数在Reflector
中定时执行,执行周期由NewRelector
函数传入的resyncPeriod
参数设定。
Resync
方法是否执行是在reflector中定义的r.store.Resync()
,上篇文章中已经介绍过。源码地址: https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/reflector.go
Resync方法
用于重新同步已知对象的状态。
func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
if f.knownObjects == nil {
return nil
}
keys := f.knownObjects.ListKeys()
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if len(f.items[id]) > 0 {
return nil
}
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
- 加锁,返回前解锁。
f.knownObjects.GetByKey(key)
是Indexer
本地存储对象,通过该对象可以获取client-go目前存储的所有资源对象。- 如果
knownObjects
为nil
,则方法直接返回,因为没有对象需要同步。 - 使用
ListKeys
方法获取所有已知对象的键(key)列表。 - 遍历每个键,调用
syncKeyLocked
方法进行同步。 syncKeyLocked
方法首先使用键key从knownObjects
中获取对象。如果对象不存在或者获取时出现错误,则记录日志并跳过该对象。- 如果对象已经存在于处理队列中(即items中已有该对象的事件),则不再重复添加。
- 如果对象不在处理队列中,则使用操作类型
Sync
和对象调用queueActionLocked
方法将其添加到队列中。
Replace方法
用于将给定的对象列表以Sync
或Replace
类型的事件添加到队列中,或根据已有对象的键进行删除操作。
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
queuedDeletions := 0
for k, oldItem := range f.items {
if keys.Has(k) {
continue
}
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
deletedObj = d.Obj
}
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if f.knownObjects != nil {
knownKeys := f.knownObjects.ListKeys()
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
if len(f.items[k]) > 0 {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = keys.Len() + queuedDeletions
}
return nil
}
- 加锁,返回前解锁。
- 创建一个集合
keys
,用于存储列表中对象的键。 - 根据
f.emitDeltaTypeReplaced
的值决定是使用Sync
还是Replace
作为操作类型。 - 遍历对象列表,获取对象的键
key
,并将键插入集合keys
。然后调用f.queueActionLocked
方法,将其以Sync
或者Replace
类型的事件添加到队列中。如果添加操作失败,则返回错误。 - 遍历
f.items
中的对象,对于不在新列表中的对象,将其作为Deleted
类型的事件添加到队列中。如果对象是DeletedFinalStateUnknown
类型,则提取并使用实际对象。 - 如果
f.knownObjects
不为空,则对其进行遍历:- 获取
f.knownObjects
中所有的键列表knownKeys
。 - 遍历键列表,如果键在集合
keys
中,或者键对应的f.items
中的Delta
列表不为空,则跳过该键。 - 否则,尝试从
f.knownObjects
中根据键获取对象deletedObj
,如果出现错误,则将deletedObj
设置为 nil,并记录错误日志。如果对象不存在,则将deletedObj
设置为 nil,并记录信息日志。累加计数器queuedDeletions
。然后使用操作类型Deleted
和DeletedFinalStateUnknown
对象调用f.queueActionLocked
方法,将删除操作添加到队列中。
- 获取
- 如果DeltaFIFO之前未被标记为已填充(f.populated 为 false),则将其标记为已填充,并更新
f.initialPopulationCount
为新列表中的对象数加上队列中的删除事件数。