go

client-go Reflector源码解读

简介

在k8s中,组件之间通过http协议进行通信,通过使用cient-go的Informer保证了消息的实时性、可靠性、顺序性等,其他组件也是通过Informer与apiserver进行通信的。它的出现就是为了减少当多个控制器对apiserver进行大量访问时对apiserver造成的压力。

架构

file

上半部分是client-go的相关组件,下半部分是自定义控制器的组件。

Informer组件

  • Reflector:使用List-Watch来保证本地缓存数据的准确性、顺序性和一致性。List对应资源的全量列表数据,是短连接。Watch监听指定的资源,是长链接。监听资源变更事件(例如Added、Updated和Deleted事件),并将资源对象存放到本地缓存DeltaFIFO中。
  • Indexer:Indexer是一个缓存层,用于在本地存储当前集群中所有资源对象的最新状态,允许通过多种索引(如名称、命名空间等)来查询资源对象。Reflector从DeltaFIFO中将消费出来的资源对象存储到Indexer,Indexer数据与etcd保持一致。这样client-go就可以从本地读取,不用连etcd,减少了apiserver和etcd的压力。
  • DeltaFIFO:DeltaFIFO是一个先进先出的队列,用于存储由Reflector监听到的资源对象变化事件。每个事件都被封装成一个Delta对象,包含了事件类型(如Added、Updated、Deleted)和资源对象。
  • SharedInformer:SharedInformer是Informer的核心,它协调Reflector、DeltaFIFO和Indexer的工作。SharedInformer从DeltaFIFO中消费事件,更新Indexer中的缓存,并将变化通知给所有注册的事件处理器(EventHandler)。

整个流程如下:

  1. controller manager在启动的时候会启动一个sharedInformerFactory这是一个Informer的集合(informers map [reflect.Type] cache.SharedIndexInformer)。
  2. controller在run的时候会调用reflector的run,reflector在run的时候会listen and watch,当有event的时候插入本地缓存DeltaFIFO中并更新ResouVersion。
  3. Informer从队列取出事件;
  4. Informer更新Indexer的缓存;
  5. Indexer存储object和key到ThreadSafeStore本地存储;
  6. Informer根据事件类型调用相应的EventHandler处理逻辑,即发送对象给自定义控制器。
  7. 最后Indexer调用ThreadSafeStore底层存储。

自定义控制器组件

  • Informer reference:是Informer的一个实例,主要用于处理与CRD对象相关的。当开发自定义控制器(custom controller)时,需要这个控制器创建相匹配的Informer。
  • Resource Event Handler:这是一个回调函数,当一个Informer/SharedInformer要分发一个对象到控制器时,会调用此函数。例如:将对象的Key放在WorkQueue中并等待后续的处理。
  • WorkQueue:工作者队列。前面我们提到过Informer,它除了更新本地缓存之外,还要将数据同步给相应控制器,WorkQueue就是为了数据同步的问题而产生的。当有资源被添加、修改或删除,Informer/SharedInformer就会将相应的事件加入到WorkQueue中。其它所有的控制器需要排队对这个queue进行读取,如果某个控制器发现这个事件与自己相关,就执行相应的操作。如果操作失败,就会把刚才取出的事件再放回到WorkQueue中,等再轮到自己执行时会再去重试这次失败的操作。如果操作成功,就将该事件从队列中删除。
  • Process Item:用户自定义的处理WorkQueue中的相应Item的函数。比如可以在这里面使用Indexer或Listing wrapper来根据相应的Key检索对象。
  • Indexer reference:是Indexer的一个实例,主要用于处理与CRD对象相关的。当开发自定义控制器时,需要创建Indexer的实例,这个实例主要作用是实现存储+索引。

控制器的源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/controller.go

这篇主要介绍架构的上半部分的Reflector,解释一下代码实现。

Reflector

源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/reflector.go

Reflector结构体

type Reflector struct {
    // name:字符串,标识Reflector,通常设置为文件:行格式,以便于识别。
    name string
    // typeDescription:字符串,描述Reflector期望处理的对象类型。仅用于显示目的。
    typeDescription string
    // expectedType:reflect.Type,表示Reflector期望放置在存储中的对象类型。这确保了类型安全。
    expectedType reflect.Type
    // expectedGVK:指向schema.GroupVersionKind结构体的指针,指定Reflector期望的对象的组、版本和种类,用于处理非结构化数据。
    expectedGVK *schema.GroupVersionKind
    // store:接口(Store),Reflector在其中同步观察到的对象。存储本质上是从Kubernetes API服务器获取的对象的本地缓存。
    store Store
    // listerWatcher:接口(ListerWatcher),Reflector用来从apiserver列出和监视资源。
    listerWatcher ListerWatcher
    // backoffManager:wait.BackoffManager,用于管理ListAndWatch操作的回退时间,帮助避免在重试期间打爆apiserver。
    backoffManager wait.BackoffManager
    // resyncPeriod:time.Duration,指示Reflector应多久与listerWatcher重新同步一次,即使没有检测到更改。
    resyncPeriod   time.Duration
    // clock:接口(clock.Clock),用于时间相关操作,允许通过操纵时间来更容易地进行测试。
    clock clock.Clock
    // paginatedResult:布尔值,指示是否应该为列表调用强制分页,基于初始list调用结果。
    paginatedResult bool
    // lastSyncResourceVersion:字符串,存储在同步期间最后观察到的资源版本,用于优化ListAndWatch操作。
    lastSyncResourceVersion string
    // isLastSyncResourceVersionUnavailable:布尔值,如果使用lastSyncResourceVersion的前一个list请求失败并出现“过期”或“资源版本太大”错误,则isLastSyncResourceVersionUnavailable为true。
    isLastSyncResourceVersionUnavailable bool
    // lastSyncResourceVersionMutex:sync.RWMutex,提供对lastSyncResourceVersion的线程安全读/写访问。
    lastSyncResourceVersionMutex sync.RWMutex
    // WatchErrorHandler函数类型,每当ListAndWatch操作因错误而断开连接时调用。
    watchErrorHandler WatchErrorHandler
    // WatchListPageSize:int64,定义初始和重新同步watch列表的块大小,影响如何从API服务器获取数据。
    WatchListPageSize int64
    // ShouldResync:一个函数,决定是否应该重新同步存储,定期调用。
    ShouldResync func() bool
    // MaxInternalErrorRetryDuration:time.Duration,定义Reflector在遇到watch操作的内部错误后应重试多长时间。
    MaxInternalErrorRetryDuration time.Duration
    // UseWatchList:可选的*bool,如果启用,指示Reflector使用流式方法从API服务器获取数据,流的主要优点是使用较少的服务器资源来获取数据。旧的行为建立一个以块为单位获取数据的LIST请求。分页列表的效率较低,并且取决于对象的实际大小,可能会导致apisserver的内存消耗增加。
    UseWatchList *bool
}

ResourceVersionUpdater接口

ResourceVersionUpdater接口定义了一个UpdateResourceVersion方法,这个方法的作用是更新反射器当前的资源版本。实现ResourceVersionUpdater接口允许开发者自定义如何处理资源版本的更新。例如,开发者可以实现一个更新内部状态或者触发特定逻辑的UpdateResourceVersion方法。

type ResourceVersionUpdater interface {
    UpdateResourceVersion(resourceVersion string)
}

WatchErrorHandler类型

WatchErrorHandler是一个函数类型,定义了当ListAndWatch操作因为错误而断开连接时应该调用的错误处理程序。这个错误处理程序的目的是允许开发者自定义如何响应和处理监视过程中遇到的错误。

type WatchErrorHandler func(r *Reflector, err error)

DefaultWatchErrorHandler函数

默认实现会根据错误类型将错误消息记录在适当的日志级别。然而,开发者可以提供自己的实现来以不同的方式展示错误消息。例如,可以实现一个WatchErrorHandler,将错误消息发送到监控系统或者执行一些恢复操作。

func DefaultWatchErrorHandler(r *Reflector, err error) {
    switch {
    case isExpiredError(err):
        // 如果错误是由于 watch 连接过期导致的(isExpiredError(err) 返回 true),则不会设置 LastSyncResourceVersionUnavailable。这是因为使用资源版本(ResourceVersion)进行 LIST 调用的前提是已经有一个明确的资源版本了。因此,首先尝试使用最后观察到的对象的资源版本进行 LIST 调用。
        klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
    case err == io.EOF:
        // 如果错误是 io.EOF,这意味着 watch 连接正常关闭,没有特别的错误处理。
    case err == io.ErrUnexpectedEOF:
        // 如果错误是 io.ErrUnexpectedEOF,这表示 watch 连接以意外的方式结束了。这种情况下,会记录一条日志,日志级别为 V(1),表示这是一个值得注意的异常情况。
        klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err)
    default:
        // 对于所有其他类型的错误,将使用 utilruntime.HandleError 函数记录错误。这意味着错误被认为是严重的,需要被记录并可能需要进一步的处理。
        utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err))
    }
}

var (
    // 定义了一个变量 minWatchTimeout,设置 watch 请求的超时时间,可以降低 apiserver 的负载。超时时间是随机的,在 [minWatchTimeout, 2*minWatchTimeout] 范围内。这里,minWatchTimeout 被设置为 5 分钟,意味着实际的超时时间将在 5 到 10 分钟之间随机选择。
    minWatchTimeout = 5 * time.Minute
)

NewNamespaceKeyedIndexerAndReflector函数

函数接受 ListerWatcher、expectedType接口、resyncPeriod参数,并返回一个索引器(Indexer)和反射器(Reflector)。

func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
    indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
    reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
    return indexer, reflector
}
  • 首先使用 NewIndexer 函数创建一个Indexer,Indexer使用 MetaNamespaceKeyFunc 作为键,并使用 MetaNamespaceIndexFunc 作为索引生成函数。这意味着Indexer将根据资源的命名空间和名称来索引资源。
  • 再使用 NewReflector 函数创建一个Reflector,Reflector使用前面创建的Indexer作为存储,lwexpectedType 作为参数,并设置重新同步周期为给定的 resyncPeriod。
  • 最后,函数返回创建的Indexer和Reflector。

NewReflector函数

NewReflector 函数根据给定的 ListerWatcher、expectedType、store、resyncPeriod创建一个新的Reflector。

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
}

函数内部调用了 NewReflectorWithOptions 函数,并传递了相同的参数,同时使用了 ReflectorOptions 结构体来设置反射器的选项。其中,ResyncPeriod 被设置为给定的 resyncPeriod。最后,函数返回创建的Reflector。

NewNamedReflector函数

NewNamedReflector 函数与 NewReflector 函数类似,但它可以指定Reflector的名称。

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
}

函数内部调用了 NewReflectorWithOptions 函数,并传递了相同的参数,同时使用了 ReflectorOptions 结构体来设置反射器的选项。其中,Name 被设置为给定的 nameResyncPeriod 被设置为给定的 resyncPeriod。最后,函数返回创建的Reflector。

ReflectorOptions结构体

用于设置Reflector的选项。这些选项可以用于自定义Reflector的行为和属性。

type ReflectorOptions struct {
    // Name 是反射器的名称。如果未设置或未指定,则名称默认为调用栈中位于该包之外的最接近的源文件的行号。
    Name string
    // TypeDescription 是反射器的类型描述。如果未设置或未指定,则类型描述按照以下规则默认值:如果传递给 NewReflectorWithOptions 的 expectedType 为 nil,则类型描述为 “”。如果 expectedType 是 *unstructured.Unstructured 的实例并且其 apiVersion 和 kind 字段已设置,则类型描述为其字符串编码。否则,类型描述设置为 expectedType 的 Go 类型。
    TypeDescription string

    // ResyncPeriod 是反射器的重新同步周期。如果未设置或未指定,则重新同步周期默认为 0(不重新同步)。
    ResyncPeriod time.Duration

    // Clock 允许测试控制时间。如果未设置,默认为 clock.RealClock{}。
    Clock clock.Clock
}

NewReflectorWithOptions函数

用于创建一个带有选项的Reflector。

func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
    reflectorClock := options.Clock
    if reflectorClock == nil {
        reflectorClock = clock.RealClock{}
    }
    r := &Reflector{
        name:            options.Name,
        resyncPeriod:    options.ResyncPeriod,
        typeDescription: options.TypeDescription,
        listerWatcher:   lw,
        store:           store,
        backoffManager:    wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
        clock:             reflectorClock,
        watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
        expectedType:      reflect.TypeOf(expectedType),
    }

    if r.name == "" {
        r.name = naming.GetNameFromCallsite(internalPackages...)
    }

    if r.typeDescription == "" {
        r.typeDescription = getTypeDescriptionFromObject(expectedType)
    }

    if r.expectedGVK == nil {
        r.expectedGVK = getExpectedGVKFromObject(expectedType)
    }

    if r.UseWatchList == nil {
        if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
            r.UseWatchList = ptr.To(true)
        }
    }

    return r
}
  • 首先根据 options.Clock 的值来确定使用的时钟对象,如果未设置,则使用默认的 clock.RealClock{}
  • 创建了反射器 r,并设置了name,resyncPeriod,typeDescription,listerWatcher,store,backoffManager,clock,watchErrorHandler,expectedType属性。
  • 如果反射器的名称为空,则使用 naming.GetNameFromCallsite 函数根据调用栈来获取名称。
  • 如果反射器的类型描述为空,则使用 getTypeDescriptionFromObject 函数根据 expectedType 来获取类型描述。
  • 如果反射器的期望的 GroupVersionKind(GVK)为空,则使用 getExpectedGVKFromObject 函数根据 expectedType 来获取 GVK。
  • 如果反射器的 UseWatchList 属性未设置,则根据环境变量 ENABLE_CLIENT_GO_WATCH_LIST_ALPHA 的值来设置。
  • 最后,函数返回创建的反射器。

getTypeDescriptionFromObject函数

用于根据给定的 expectedType 获取类型描述。如果 expectedType*unstructured.Unstructured 的实例并且具有非空的 GVK,则返回 GVK 的字符串作为 TypeDescription。否则,返回reflectDescription作为TypeDescription

func getTypeDescriptionFromObject(expectedType interface{}) string {
    if expectedType == nil {
        return defaultExpectedTypeName
    }

    reflectDescription := reflect.TypeOf(expectedType).String()

    obj, ok := expectedType.(*unstructured.Unstructured)
    if !ok {
        return reflectDescription
    }

    gvk := obj.GroupVersionKind()
    if gvk.Empty() {
        return reflectDescription
    }

    return gvk.String()
}

getExpectedGVKFromObject函数

根据给定的 expectedType 来获取期望的 GVK。如果 expectedType*unstructured.Unstructured 的实例并且具有非空的 GVK,则返回 GVK 的指针。否则,返回 nil。

func getExpectedGVKFromObject(expectedType interface{}) *schema.GroupVersionKind {
    obj, ok := expectedType.(*unstructured.Unstructured)
    if !ok {
        return nil
    }

    gvk := obj.GroupVersionKind()
    if gvk.Empty() {
        return nil
    }

    return &gvk
}

Run方法

Run 方法是 Reflector 结构体的一个成员方法,接受一个类型为 <-chan struct{}stopCh 通道参数。该方法用于启动一个反射器并运行一个循环来监听和处理资源的变化。

func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            r.watchErrorHandler(r, err)
        }
    }, r.backoffManager, true, stopCh)
    klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}

var (
    neverExitWatch <-chan time.Time = make(chan time.Time)
    errorStopRequested = errors.New("stop requested")
)
  • 首先使用 klog.V(3).Infof 输出一条日志,表示反射器的启动信息。
  • 然后使用 wait.BackoffUntil 方法。
    • 匿名函数,用于执行 r.ListAndWatch(stopCh) 操作。
    • 在每次执行之前,会先检查是否需要停止。如果 stopCh 接收到了信号,则会立即停止循环。
    • 在每次执行时,会调用 r.ListAndWatch(stopCh) 方法来监听和处理资源的变化。
    • 如果出现错误,则调用 watchErrorHandler 处理错误。
    • 在每次执行之后,会根据指定的 backoffManager 进行等待,等待时间会逐渐增加,以防止频繁的重试。
    • 直到收到 stopCh 信号或发生错误。
  • 最后,使用 klog.V(3).Infof 输出一条日志,表示反射器的停止信息。
  • 变量 neverExitWatch 是一个只读的时间通道,用于表示永远不会发送任何值。
  • 变量 errorStopRequested 是一个错误对象,用于表示停止请求的信号,通常由调用方通过 stopCh 传递给反射器。

BackoffUntil函数

func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{})

函数逻辑如下:

  1. 初始化重试次数为0。
  2. 进入一个无限循环,直到满足停止条件(stopCh通道关闭)或执行函数返回false。
  3. 在每次循环中,调用f函数执行重试操作。如果f返回true,表示应继续重试。
  4. 如果f返回false,表示不应继续重试,退出循环。
  5. 基于退避策略计算下一次重试的等待时间。
  6. 在等待时间结束后,继续下一次循环。

resyncChan方法

resyncChan 方法是 Reflector 结构体的一个成员方法,该方法返回两个值:一个类型为 <-chan time.Time 的通道和一个类型为 func() bool 的函数。

func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
    if r.resyncPeriod == 0 {
        return neverExitWatch, func() bool { return false }
    }
    t := r.clock.NewTimer(r.resyncPeriod)
    return t.C(), t.Stop
}
  • 首先检查 r.resyncPeriod 的值。如果为0,表示不需要进行重新同步操作,那么就返回一个永远不会发送任何值的通道 neverExitWatch,以及一个始终返回 false 的函数。
  • 如果 r.resyncPeriod 的值不为0,表示需要进行重新同步操作,那么就创建一个新的定时器 t,使用 r.clock.NewTimer(r.resyncPeriod) 方法,并设置定时器的周期为 r.resyncPeriod
  • 然后,返回定时器的通道 t.C(),以及定时器的 Stop 方法 t.Stopt.C() 返回一个 <-chan time.Time 类型的通道,当定时器到达设定的时间时,该通道将接收到一个值。t.Stop 是用于停止定时器的方法。

通过调用 resyncChan 方法,可以获取用于重新同步操作的定时器通道和停止定时器的方法。

ListAndWatch方法

ListAndWatch 方法是 Reflector 结构体的一个成员方法。该方法用于执行资源的ListAndWatch 操作,根据配置的 UseWatchList 标志选择使用 WatchList 或回退到使用 List 方法来获取资源的初始列表。然后启动一个goroutine来进行周期性的重新同步操作,并调用 watch 方法来监听资源的变化。

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
    var err error
    var w watch.Interface
    useWatchList := ptr.Deref(r.UseWatchList, false)
    fallbackToList := !useWatchList

    if useWatchList {
        w, err = r.watchList(stopCh)
        if w == nil && err == nil {
            return nil
        }
        if err != nil {
            klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)
            fallbackToList = true
            w = nil
        }
    }

    if fallbackToList {
        err = r.list(stopCh)
        if err != nil {
            return err
        }
    }

    klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go r.startResync(stopCh, cancelCh, resyncerrc)
    return r.watch(w, stopCh, resyncerrc)
}
  • 首先使用 klog.V(3).Infof 输出一条日志,表示正在执行 ListAndWatch 操作。
  • 然后,定义了两个变量 errw,用于存储可能的错误和监视器对象。
  • 通过 ptr.Deref 函数获取 r.UseWatchList 的值,并将其赋给 useWatchList 变量。ptr.Deref 函数用于获取指针类型变量的值,如果指针为空则返回提供的默认值。
  • 根据 r.UseWatchList 的值来决定是否使用 watchList 方法。如果 r.UseWatchList 为真,则调用 watchList 方法来获取监视器对象 w 和可能的错误 err。如果 wnil 并且 errnil,表示 stopCh 已关闭,直接返回 nil
  • 如果使用 watchList 方法出现错误,则会输出一个警告日志,并将 fallbackToList 设置为真,表示需要回退到使用 list 方法,将 w 设置为 nil,以便后续使用标准的 LIST/WATCH 机制。
  • 调用 list 方法来获取资源的初始列表,并将可能的错误 err 返回。如果 err 不为 nil,则直接返回错误。
  • 使用 klog.V(2).Infof 输出一条日志,表示缓存已填充的类型描述和名称。
  • 创建了一个缓冲大小为1的 resyncerrc 通道,一个用于取消操作的 cancelCh 通道,并在函数结束时关闭 cancelCh 通道。
  • 启动一个goroutine来执行 startResync 方法,进行周期性的重新同步操作。同时,调用 watch 方法来监听资源的变化,并将监视器对象 w、停止信号通道 stopCh 和重新同步错误通道 resyncerrc 作为参数传递给 watch 方法。可以实现在后台执行周期性的重新同步操作,而不会阻塞主线程。同时,通过关闭 cancelCh 通道,可以在需要时取消重新同步操作。
  • 最后,返回 watch 方法的结果。

startResync方法

该方法用于执行周期性的重新同步操作,当发生错误时,发送错误到通道中。

func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
    resyncCh, cleanup := r.resyncChan()
    defer func() {
        cleanup() // Call the last one written into cleanup
    }()
    for {
        select {
        case <-resyncCh:
        case <-stopCh:
            return
        case <-cancelCh:
            return
        }
        if r.ShouldResync == nil || r.ShouldResync() {
            klog.V(4).Infof("%s: forcing resync", r.name)
            if err := r.store.Resync(); err != nil {
                resyncerrc <- err
                return
            }
        }
        cleanup()
        resyncCh, cleanup = r.resyncChan()
    }
}
  • 首先调用 r.resyncChan() 方法获取一个用于触发重新同步操作的通道 resyncCh 和一个清理函数 cleanup。使用defer延迟执行 cleanup 函数,以确保在方法结束时清理资源。
  • 进入无限循环,通过select语句监听三个通道:resyncCh、stopCh 和 cancelCh
    • 如果从 resyncCh 接收到值,表示定时器已触发,需要进行重新同步操作。
    • 如果从 stopCh 接收到值,表示停止信号已经触发,需要退出循环。
    • 如果从 cancelCh 接收到值,表示取消重新同步操作的信号已经触发,需要退出循环。
  • 如果 r.ShouldResync 为 nil 或者调用 r.ShouldResync() 返回 true,表示需要进行重新同步操作。函数打印一条日志,并调用 r.store.Resync() 方法执行实际的重新同步操作。如果执行过程中出现错误,会将错误发送到 resyncerrc 通道,并返回。
  • 在每次重新同步操作完成后,会调用 cleanup() 函数停止和清理定时器。
  • 调用 r.resyncChan() 获取一个新的定时器通道 resyncCh 和停止函数 cleanup,以便下一次循环使用。

watch方法

该方法用于监听资源的变化,并在每次变化发生后调用相应的处理函数。

func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
    var err error
    retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)

    for {
        select {
        case <-stopCh:
            if w != nil {
                w.Stop()
            }
            return nil
        default:
        }

        start := r.clock.Now()

        if w == nil {
            timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
            options := metav1.ListOptions{
                ResourceVersion:      r.LastSyncResourceVersion(),
                TimeoutSeconds:       &timeoutSeconds,
                AllowWatchBookmarks:  true,
            }

            w, err = r.listerWatcher.Watch(options)
            if err != nil {
                if canRetry := isWatchErrorRetriable(err); canRetry {
                    klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
                    select {
                    case <-stopCh:
                        return nil
                    case <-r.backoffManager.Backoff().C():
                        continue
                    }
                }
                return err
            }
        }

        err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
        w.Stop()
        w = nil
        retry.After(err)
        if err != nil {
            if err != errorStopRequested {
                switch {
                case isExpiredError(err):
                    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
                case apierrors.IsTooManyRequests(err):
                    klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
                    select {
                    case <-stopCh:
                        return nil
                    case <-r.backoffManager.Backoff().C():
                        continue
                    }
                case apierrors.IsInternalError(err) && retry.ShouldRetry():
                    klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
                    continue
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err)
                }
            }
            return nil
        }
    }
}
  • 首先,函数定义了一个变量 err 和一个重试策略 retry
  • 进入无限循环,通过 select 语句监听 stopCh 通道的事件。如果从 stopCh 接收到值,需要调用 w.Stop() 停止监听器,然后函数返回退出循环。如果没有接收到值,则继续执行后续操作。
  • 记录当前时间 start,用于计算请求的耗时。
  • 如果监视器 wnil,表示需要创建一个新的监视器。函数根据一些选项参数调用 r.listerWatcher.Watch(options) 方法来创建监视器 w,并将错误赋值给 err。如果创建监听器的过程中出现了可以重试的错误,函数打印一条日志,并在一定的退避时间后继续下一轮循环。如果在退避等待期间接收到了 stopCh 通道的值,函数立即返回。如果创建监听器的过程中出现了无法重试的错误,函数直接返回该错误。
  • 调用 watchHandler 函数进行实际的监听操作。该函数处理监听事件,将事件存储到 r.store 中,并在需要时执行重新同步操作。函数还负责处理错误情况,并根据错误类型执行相应的操作。
  • 在处理完事件后,调用 w.Stop() 方法来停止监视器对象的监听,并将 w 设置为 nil,以确保在下一次循环中重新创建新的监视器对象。
  • 根据返回的错误 err 进行判断和处理。如果 err 不为 nil,并且该错误不等于特定的 errorStopRequested 错误,则根据不同的错误类型进行相应的处理:
    • 如果是 isExpiredError 错误,输出一条日志。
    • 如果是 429 Too Many Requests 错误,并在一定的退避时间后继续下一轮循环。如果在退避等待期间接收到了 stopCh 通道的值,函数立即返回。
    • 如果是 apierrors.IsInternalError 错误,并且重试策略允许重试(retry.ShouldRetry() 返回 true),输出一条日志并继续下一次循环。
    • 其他情况下,输出一条警告日志。
  • 如果是 errorStopRequested 错误,表示停止信号已经触发,直接返回 nil。
  • 最后,回到循环的开始,继续下一次循环。

list方法

该方法用于从列表中获取资源,并处理重试和错误情况。它更新 Reflector 的状态,并将获取到的资源进行同步操作。

func (r *Reflector) list(stopCh <-chan struct{}) error {
    var resourceVersion string
    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

    initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
    defer initTrace.LogIfLong(10 * time.Second)
    var list runtime.Object
    var paginatedResult bool
    var err error
    listCh := make(chan struct{}, 1)
    panicCh := make(chan interface{}, 1)
    go func() {
        defer func() {
            if r := recover(); r != nil {
                panicCh <- r
            }
        }()
        pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
            return r.listerWatcher.List(opts)
        }))
        switch {
        case r.WatchListPageSize != 0:
            pager.PageSize = r.WatchListPageSize
        case r.paginatedResult:
        case options.ResourceVersion != "" && options.ResourceVersion != "0":
            pager.PageSize = 0
        }

        list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
        if isExpiredError(err) || isTooLargeResourceVersionError(err) {
            r.setIsLastSyncResourceVersionUnavailable(true)
            list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
        }
        close(listCh)
    }()
    select {
    case <-stopCh:
        return nil
    case r := <-panicCh:
        panic(r)
    case <-listCh:
    }
    initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
    if err != nil {
        klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
        return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
    }

    if options.ResourceVersion == "0" && paginatedResult {
        r.paginatedResult = true
    }

    r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
    listMetaInterface, err := meta.ListAccessor(list)
    if err != nil {
        return fmt.Errorf("unable to understand list result %#v: %v", list, err)
    }
    resourceVersion = listMetaInterface.GetResourceVersion()
    initTrace.Step("Resource version extracted")
    items, err := meta.ExtractListWithAlloc(list)
    if err != nil {
        return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
    }
    initTrace.Step("Objects extracted")
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("unable to sync list result: %v", err)
    }
    initTrace.Step("SyncWith done")
    r.setLastSyncResourceVersion(resourceVersion)
    initTrace.Step("Resource version updated")
    return nil
}
  • 首先定义了一些变量设置跟踪信息,创建通道,用于存储获取到的资源,处理错误信息。
  • 启动一个 goroutine 来执行获取资源的操作。在 goroutine 内部,尝试按块收集列表数据,使用一个分页器(pager)进行操作。分页器的配置基于 ReflectorWatchListPageSize 字段和提供的 ListOptions。如果不支持分页或未请求分页,则返回完整的列表响应。
  • 使用 r.listerWatcher.List(opts) 方法来获取资源列表。如果列表操作因为 isExpiredError 错误或 isTooLargeResourceVersionError 错误而失败,函数将使用空的资源版本进行重试,以便恢复并继续进行。
  • goroutine 通过关闭 listCh 通道来标识列表操作完成。主函数通过等待 listCh 通道来等待列表操作的完成。(defer)
  • 如果在 stopCh 通道上接收到停止信号,函数会提前返回。
  • 在主 goroutine 中,通过 select 语句监听多个通道的事件:
    • 如果从 stopCh 接收到值,表示停止信号已经触发,需要退出方法。
    • 如果从 panicCh 接收到值,表示在获取资源的 goroutine 中发生了 panic,需要重新抛出 panic。
    • 如果从 listCh 接收到值,表示资源列表已经获取完成,可以继续执行后续操作。
  • 如果list操作成功完成,函数提取对象列表及其资源版本。如果出现错误,会输出一条警告日志,并返回相应的错误信息。
  • 函数更新 Reflector 的状态,并调用 r.syncWith(items, resourceVersion) 方法来进行同步操作。
  • 函数更新最后同步的资源版本,并返回 nil 表示方法执行成功。

watchList方法

该方法通过watch操作获取资源对象的变化,将变化同步到存储中,并返回一个监视器。

func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
    var w watch.Interface
    var err error
    var temporaryStore Store
    var resourceVersion string
    isErrorRetriableWithSideEffectsFn := func(err error) bool {
        if canRetry := isWatchErrorRetriable(err); canRetry {
            klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
            <-r.backoffManager.Backoff().C()
            return true
        }
        if isExpiredError(err) || isTooLargeResourceVersionError(err) {
            r.setIsLastSyncResourceVersionUnavailable(true)
            return true
        }
        return false
    }

    initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
    defer initTrace.LogIfLong(10 * time.Second)
    for {
        select {
        case <-stopCh:
            return nil, nil
        default:
        }

        resourceVersion = ""
        lastKnownRV := r.rewatchResourceVersion()
        temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options := metav1.ListOptions{
            ResourceVersion:      lastKnownRV,
            AllowWatchBookmarks:  true,
            SendInitialEvents:    pointer.Bool(true),
            ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
            TimeoutSeconds:       &timeoutSeconds,
        }
        start := r.clock.Now()

        w, err = r.listerWatcher.Watch(options)
        if err != nil {
            if isErrorRetriableWithSideEffectsFn(err) {
                continue
            }
            return nil, err
        }
        bookmarkReceived := pointer.Bool(false)
        err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
            func(rv string) { resourceVersion = rv },
            bookmarkReceived,
            r.clock, make(chan error), stopCh)
        if err != nil {
            w.Stop() // stop and retry with clean state
            if err == errorStopRequested {
                return nil, nil
            }
            if isErrorRetriableWithSideEffectsFn(err) {
                continue
            }
            return nil, err
        }
        if *bookmarkReceived {
            break
        }
    }
    initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
    r.setIsLastSyncResourceVersionUnavailable(false)

    checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore)

    if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
        return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
    }
    initTrace.Step("SyncWith done")
    r.setLastSyncResourceVersion(resourceVersion)

    return w, nil
}
  • 函数初始化一些变量,包括监听器(watch.Interface)、错误、临时存储(temporaryStore)和资源版本(resourceVersion)。
  • 定义了一个函数 isErrorRetriableWithSideEffectsFn,该函数会根据错误类型进行判断,并在需要重试时输出一条日志并进行相应的处理。
  • 启动一个无限循环,在每次循环开始时,通过 select 语句监听 stopCh 通道的事件。如果从 stopCh 接收到值,表示停止信号已经触发,需要退出循环。如果没有接收到值,则继续执行后续操作。
  • 重置资源版本,并从 rewatchResourceVersion 函数获取最后已知的资源版本。
  • 创建一个临时存储(temporaryStore),用于暂存从监听流中获取的对象。
  • 设置列表选项,包括资源版本、允许使用书签、发送初始事件、资源版本匹配条件以及超时时间。
  • 记录开始时间,并通过 listerWatcherWatch 方法创建监视器。如果创建过程中出现错误,并且满足重试条件,则继续下一次循环。如果不满足重试条件,则返回错误。
  • 如果watch操作成功,调用 watchHandler 函数处理监听流,并将获取的对象存储到临时存储中。
  • 如果处理监听流的过程中出现错误,停止监听器并根据错误类型和重试函数的判断,决定是否进行重试或返回错误。
  • 如果接收到书签(bookmark),则退出循环。否则,继续下一次循环。
  • 如果成功从监听流中获取了初始状态,并确认了 k8s.io/initial-events-end 书签,记录日志并设置 isLastSyncResourceVersionUnavailable 为 false。
  • 检查监听列表的一致性。
  • 将临时存储中的对象替换当前存储(store)中的对象,并设置最后同步的资源版本为当前资源版本。
  • 记录日志并返回监听器。

syncWith方法

用于将给定的对象列表与存储中的对象进行同步。

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}

初始化了一个名为 found[]interface{} 类型的切片。容量为 items 切片的长度。
遍历对象列表 items,将每个对象添加到 found 切片中。
调用存储(store)的 Replace 方法,将 found 切片中的对象替换掉存储中的对象,并使用给定的资源版本进行更新。确保了本地缓存的状态与远端(例如 Kubernetes 集群)保持一致。

watchHandler函数

函数监听来自 Kubernetes API 的 watch 事件,并根据事件类型(如添加、修改、删除)更新本地存储(store)。

func watchHandler(start time.Time,
    w watch.Interface,
    store Store,
    expectedType reflect.Type,
    expectedGVK *schema.GroupVersionKind,
    name string,
    expectedTypeName string,
    setLastSyncResourceVersion func(string),
    exitOnInitialEventsEndBookmark *bool,
    clock clock.Clock,
    errc chan error,
    stopCh <-chan struct{},
) error {
    eventCount := 0
    if exitOnInitialEventsEndBookmark != nil {
        *exitOnInitialEventsEndBookmark = false
    }

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrors.FromObject(event.Object)
            }
            if expectedType != nil {
                if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
                    continue
                }
            }
            if expectedGVK != nil {
                if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
                    continue
                }
            }
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
                continue
            }
            resourceVersion := meta.GetResourceVersion()
            switch event.Type {
            case watch.Added:
                err := store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
                }
            case watch.Modified:
                err := store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
                }
            case watch.Deleted:
                err := store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
                }
            case watch.Bookmark:
                // A `Bookmark` means watch has synced here, just update the resourceVersion
                if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
                    if exitOnInitialEventsEndBookmark != nil {
                        *exitOnInitialEventsEndBookmark = true
                    }
                }
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
            }
            setLastSyncResourceVersion(resourceVersion)
            if rvu, ok := store.(ResourceVersionUpdater); ok {
                rvu.UpdateResourceVersion(resourceVersion)
            }
            eventCount++
            if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
                watchDuration := clock.Since(start)
                klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
                return nil
            }
        }
    }

    watchDuration := clock.Since(start)
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
    return nil
}
  • 参数:
    • start: 记录 watch 开始的时间。
    • w: 实现了 watch.Interface 接口的对象,用于接收 watch 事件。
    • store: 用于存储对象的本地存储,必须实现 Add、Update 和 Delete 方法。
    • expectedType: 预期接收对象的类型。
    • expectedGVK: 预期接收对象的 GroupVersionKind。
    • name: 用于日志记录的标识符。
    • expectedTypeName: 预期类型的名称,用于日志记录。
    • setLastSyncResourceVersion: 函数,用于设置最后同步的资源版本号。
    • exitOnInitialEventsEndBookmark: 指示是否在接收到初始事件结束的书签后退出。
    • clock: 用于获取当前时间,便于测试。
    • errc: 错误通道,用于接收错误。
    • stopCh: 停止通道,用于接收停止信号。
  • 通过for循环和 select 语句监听 stopCherrcw.ResultChan() 三个通道的事件。
  • 如果从 stopCh 接收到信号,函数返回 errorStopRequested 错误,表示请求停止。
  • 如果从 errc 接收到错误,函数直接返回该错误。
  • 处理watch事件:
    • 如果watch事件的类型为错误,将其转换为对应的 apierrors 并返回。
    • 验证事件对象类型和 GroupVersionKind 是否符合预期。如果不匹配,则记录错误并继续下一个事件。
    • 获取监听事件对象的元数据,并获取其资源版本。
    • 根据事件类型执行对应操作:
      • watch.Added:调用存储的 Add 方法将事件对象添加到存储中。
      • watch.Modified:调用存储的 Update 方法更新存储中的事件对象。
      • watch.Deleted:调用存储的 Delete 方法从存储中删除事件对象。
      • watch.Bookmark:如果事件的注释中包含 k8s.io/initial-events-end 键且值为 true,将退出初始事件结束书签的标志设置为 true
      • 其他类型的事件:记录错误并继续下一个事件。
  • 更新最后同步的资源版本。
  • 如果存储实现了 ResourceVersionUpdater 接口,调用其 UpdateResourceVersion 方法更新存储的资源版本。
  • 增加事件计数。
  • 如果设置了 exitOnInitialEventsEndBookmark 并且为 true,则记录日志并返回。
  • 如果watch持续时间很短且没有接收到任何事件,函数返回错误,提示watch意外关闭。否则,记录日志并正常退出。

LastSyncResourceVersion方法

用于获取 Reflector 实例中存储的最后同步的资源版本。

func (r *Reflector) LastSyncResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()
    return r.lastSyncResourceVersion
}
  • 函数首先通过调用 r.lastSyncResourceVersionMutex.RLock() 获取读锁,这是为了确保在多线程或并发环境中访问 lastSyncResourceVersion 字段时的线程安全。使用读锁(而不是写锁)是因为这个操作只涉及读取数据,不会修改数据。
  • 使用 defer 确保在函数返回之前释放读锁。
  • 函数返回 r.lastSyncResourceVersion 字段的值。

setLastSyncResourceVersion方法

用于设置 Reflector 结构体中的 lastSyncResourceVersion 字段的值。

func (r *Reflector) setLastSyncResourceVersion(v string) {
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.lastSyncResourceVersion = v
}
  • 获取 lastSyncResourceVersionMutex 的写锁。
  • 更新 r.lastSyncResourceVersion 的值为传入的参数 v
  • 释放写锁。

relistResourceVersion方法

用于确定在重新列出(list)操作时应使用的资源版本。

func (r *Reflector) relistResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()

    if r.isLastSyncResourceVersionUnavailable {
        return ""
    }
    if r.lastSyncResourceVersion == "" {
        return "0"
    }
    return r.lastSyncResourceVersion
}
  • 获取 r.lastSyncResourceVersionMutex的读锁。
  • defer释放读锁。
  • 如果 isLastSyncResourceVersionUnavailable 标志为真,表示最后同步的资源版本不可用,函数返回空字符串""。这意味着在下一次list操作时,将不指定资源版本,以便从etcd中进行一致性读取,获取最新可用的资源版本。
  • 如果 lastSyncResourceVersion 为空字符串,这通常表示是初始的list操作。为了性能考虑,使用"0"作为资源版本,这允许请求(如果启用了watch缓存)从watch缓存中读取。
  • 如果都不满足上述条件,即存在一个有效的 lastSyncResourceVersion,则直接返回这个值。

rewatchResourceVersion方法

用于确定在重新watch操作时应从哪个资源版本开始。

func (r *Reflector) rewatchResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()
    if r.isLastSyncResourceVersionUnavailable {
        return ""
    }
    return r.lastSyncResourceVersion
}
  • 获取 r.lastSyncResourceVersionMutex的读锁。
  • defer释放读锁。
  • 如果 isLastSyncResourceVersionUnavailable 标志为真,表示最后同步的资源版本不可用,函数返回空字符串""。这意味着在下一次watch操作时,将从最新的资源版本开始,确保返回的数据是一致的,就像直接从etcd中读取一样。
  • 否则直接返回 lastSyncResourceVersion,即从上一次同步的资源版本开始watch。

setIsLastSyncResourceVersionUnavailable方法

用于设置 isLastSyncResourceVersionUnavailable 字段的值。

func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.isLastSyncResourceVersionUnavailable = isUnavailable
}
  • 获取 r.lastSyncResourceVersionMutex 的写锁。
  • 释放写锁。
  • 设置 isLastSyncResourceVersionUnavailable 的值为传入的 isUnavailable

isExpiredError函数

用于检查一个错误是否表示资源已过期。

func isExpiredError(err error) bool {
    return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
}
  • 使用 apierrors.IsResourceExpired(err) 检查是不是表示资源已过期错误。这通常对应于HTTP状态码410(Gone),并且错误原因是 StatusReasonExpired
  • 使用 apierrors.IsGone(err) 检查是不是因为资源不存在错误。虽然这通常也对应于HTTP状态码410,但在Kubernetes 1.17及更早版本中,API服务器可能会因为资源版本过期而使用 StatusReasonGone
  • 如果任一检查返回true,则函数返回true,表示错误确实是因为资源过期。

isTooLargeResourceVersionError函数

用于判断是不是资源版本过大错误。

func isTooLargeResourceVersionError(err error) bool {
    if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
        return true
    }
    if !apierrors.IsTimeout(err) {
        return false
    }
    apierr, ok := err.(apierrors.APIStatus)
    if !ok || apierr == nil || apierr.Status().Details == nil {
        return false
    }
    for _, cause := range apierr.Status().Details.Causes {
        if cause.Message == "Too large resource version" {
            return true
        }
    }
    if strings.Contains(apierr.Status().Message, "Too large resource version") {
        return true
    }

    return false
}
  • 首先,它检查错误是否具有 metav1.CauseTypeResourceVersionTooLarge 的状态原因,这是一个直接的指示,表明请求的资源版本过大。
  • 对于Kubernetes版本1.17.0到1.18.5,apiserver在这个特定的错误条件下不会将错误状态原因设置为 metav1.CauseTypeResourceVersionTooLarge。相反,该函数会查找超时错误,然后检查错误消息中是否含有“资源版本过大”。这样做是为了保持与这些服务器版本的向后兼容性。
  • 最后对于1.17.0版本之前,它检查一般错误消息(而不是特定于原因的消息)中是否包含“资源版本过大”。

isWatchErrorRetriable函数

用于判断观察watch错误是否可以重试。

func isWatchErrorRetriable(err error) bool {
    if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
        return true
    }
    return false
}
  • 如果是“连接拒绝”错误,这意味着apiserver可能没有响应。在这种情况下,重新列出所有对象没有意义,因此,将该错误视为可重试,并返回 true。如果出现这种情况,开始指数级退避并重新发送观察请求。
  • 对于“429”错误(即请求过多),采取相同的处理策略。
  • 否则将其视为不可重试,并返回 false。
分类: go
0 0 投票数
文章评分
订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部
0
希望看到您的想法,请您发表评论x