简介
在k8s中,组件之间通过http协议进行通信,通过使用cient-go的Informer保证了消息的实时性、可靠性、顺序性等,其他组件也是通过Informer与apiserver进行通信的。它的出现就是为了减少当多个控制器对apiserver进行大量访问时对apiserver造成的压力。
架构
上半部分是client-go的相关组件,下半部分是自定义控制器的组件。
Informer组件
- Reflector:使用List-Watch来保证本地缓存数据的准确性、顺序性和一致性。List对应资源的全量列表数据,是短连接。Watch监听指定的资源,是长链接。监听资源变更事件(例如Added、Updated和Deleted事件),并将资源对象存放到本地缓存DeltaFIFO中。
- DeltaFIFO:DeltaFIFO是一个先进先出的队列,用于存储由Reflector监听到的资源对象变化事件。每个事件都被封装成一个Delta对象,包含了事件类型(如Added、Updated、Deleted)和资源对象。
- SharedInformer:SharedInformer是Informer的核心,它协调Reflector、DeltaFIFO和Indexer的工作。SharedInformer从DeltaFIFO中消费事件,更新Indexer中的缓存,并将变化通知给所有注册的事件处理器(EventHandler)。
- Indexer:Indexer是一个缓存层,用于在本地存储当前集群中所有资源对象的最新状态,允许通过多种索引(如名称、命名空间等)来查询资源对象。SharedInformer从DeltaFIFO中将消费出来的资源对象存储到Indexer,Indexer数据与etcd保持一致。这样client-go就可以从本地读取,不用连etcd,减少了apiserver和etcd的压力。
整个流程如下:
- controller manager在启动的时候会启动一个sharedInformerFactory这是一个Informer的集合(informers map [reflect.Type] cache.SharedIndexInformer)。
- controller在run的时候会调用reflector的run,reflector在run的时候会listen and watch,当有event的时候插入本地缓存DeltaFIFO中并更新ResouVersion。
- Informer从队列取出事件;
- Informer更新Indexer的缓存;
- Indexer存储object和key到ThreadSafeStore本地存储;
- Informer根据事件类型调用相应的EventHandler处理逻辑,即发送对象给自定义控制器。
- 最后Indexer调用ThreadSafeStore底层存储。
自定义控制器组件
- Informer reference:是Informer的一个实例,主要用于处理与CRD对象相关的。当开发自定义控制器(custom controller)时,需要这个控制器创建相匹配的Informer。
- Resource Event Handler:这是一个回调函数,当一个Informer/SharedInformer要分发一个对象到控制器时,会调用此函数。例如:将对象的Key放在WorkQueue中并等待后续的处理。
- WorkQueue:工作者队列。前面我们提到过Informer,它除了更新本地缓存之外,还要将数据同步给相应控制器,WorkQueue就是为了数据同步的问题而产生的。当有资源被添加、修改或删除,Informer/SharedInformer就会将相应的事件加入到WorkQueue中。其它所有的控制器需要排队对这个queue进行读取,如果某个控制器发现这个事件与自己相关,就执行相应的操作。如果操作失败,就会把刚才取出的事件再放回到WorkQueue中,等再轮到自己执行时会再去重试这次失败的操作。如果操作成功,就将该事件从队列中删除。
- Process Item:用户自定义的处理WorkQueue中的相应Item的函数。比如可以在这里面使用Indexer或Listing wrapper来根据相应的Key检索对象。
- Indexer reference:是Indexer的一个实例,主要用于处理与CRD对象相关的。当开发自定义控制器时,需要创建Indexer的实例,这个实例主要作用是实现存储+索引。
这篇主要介绍架构的上半部分的Reflector,解释一下代码实现。
Reflector
Reflector结构体
type Reflector struct {
// name:字符串,标识Reflector,通常设置为文件:行格式,以便于识别。
name string
// typeDescription:字符串,描述Reflector期望处理的对象类型。仅用于显示目的。
typeDescription string
// expectedType:reflect.Type,表示Reflector期望放置在存储中的对象类型。这确保了类型安全。
expectedType reflect.Type
// expectedGVK:指向schema.GroupVersionKind结构体的指针,指定Reflector期望的对象的组、版本和种类,用于处理非结构化数据。
expectedGVK *schema.GroupVersionKind
// store:接口(Store),Reflector在其中同步观察到的对象。存储本质上是从Kubernetes API服务器获取的对象的本地缓存。
store Store
// listerWatcher:接口(ListerWatcher),Reflector用来从apiserver列出和监视资源。
listerWatcher ListerWatcher
// backoffManager:wait.BackoffManager,用于管理ListAndWatch操作的回退时间,帮助避免在重试期间打爆apiserver。
backoffManager wait.BackoffManager
// resyncPeriod:time.Duration,指示Reflector应多久与listerWatcher重新同步一次,即使没有检测到更改。
resyncPeriod time.Duration
// clock:接口(clock.Clock),用于时间相关操作,允许通过操纵时间来更容易地进行测试。
clock clock.Clock
// paginatedResult:布尔值,指示是否应该为列表调用强制分页,基于初始list调用结果。
paginatedResult bool
// lastSyncResourceVersion:字符串,存储在同步期间最后观察到的资源版本,用于优化ListAndWatch操作。
lastSyncResourceVersion string
// isLastSyncResourceVersionUnavailable:布尔值,如果使用lastSyncResourceVersion的前一个list请求失败并出现“过期”或“资源版本太大”错误,则isLastSyncResourceVersionUnavailable为true。
isLastSyncResourceVersionUnavailable bool
// lastSyncResourceVersionMutex:sync.RWMutex,提供对lastSyncResourceVersion的线程安全读/写访问。
lastSyncResourceVersionMutex sync.RWMutex
// WatchErrorHandler函数类型,每当ListAndWatch操作因错误而断开连接时调用。
watchErrorHandler WatchErrorHandler
// WatchListPageSize:int64,定义初始和重新同步watch列表的块大小,影响如何从API服务器获取数据。
WatchListPageSize int64
// ShouldResync:一个函数,决定是否应该重新同步存储,定期调用。
ShouldResync func() bool
// MaxInternalErrorRetryDuration:time.Duration,定义Reflector在遇到watch操作的内部错误后应重试多长时间。
MaxInternalErrorRetryDuration time.Duration
// UseWatchList:可选的*bool,如果启用,指示Reflector使用流式方法从API服务器获取数据,流的主要优点是使用较少的服务器资源来获取数据。旧的行为建立一个以块为单位获取数据的LIST请求。分页列表的效率较低,并且取决于对象的实际大小,可能会导致apisserver的内存消耗增加。
UseWatchList *bool
}
ResourceVersionUpdater接口
ResourceVersionUpdater
接口定义了一个UpdateResourceVersion
方法,这个方法的作用是更新反射器当前的资源版本。实现ResourceVersionUpdater
接口允许开发者自定义如何处理资源版本的更新。例如,开发者可以实现一个更新内部状态或者触发特定逻辑的UpdateResourceVersion
方法。
type ResourceVersionUpdater interface {
UpdateResourceVersion(resourceVersion string)
}
WatchErrorHandler类型
WatchErrorHandler
是一个函数类型,定义了当ListAndWatch
操作因为错误而断开连接时应该调用的错误处理程序。这个错误处理程序的目的是允许开发者自定义如何响应和处理监视过程中遇到的错误。
type WatchErrorHandler func(r *Reflector, err error)
DefaultWatchErrorHandler函数
默认实现会根据错误类型将错误消息记录在适当的日志级别。然而,开发者可以提供自己的实现来以不同的方式展示错误消息。例如,可以实现一个WatchErrorHandler
,将错误消息发送到监控系统或者执行一些恢复操作。
func DefaultWatchErrorHandler(r *Reflector, err error) {
switch {
case isExpiredError(err):
// 如果错误是由于 watch 连接过期导致的(isExpiredError(err) 返回 true),则不会设置 LastSyncResourceVersionUnavailable。这是因为使用资源版本(ResourceVersion)进行 LIST 调用的前提是已经有一个明确的资源版本了。因此,首先尝试使用最后观察到的对象的资源版本进行 LIST 调用。
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
case err == io.EOF:
// 如果错误是 io.EOF,这意味着 watch 连接正常关闭,没有特别的错误处理。
case err == io.ErrUnexpectedEOF:
// 如果错误是 io.ErrUnexpectedEOF,这表示 watch 连接以意外的方式结束了。这种情况下,会记录一条日志,日志级别为 V(1),表示这是一个值得注意的异常情况。
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err)
default:
// 对于所有其他类型的错误,将使用 utilruntime.HandleError 函数记录错误。这意味着错误被认为是严重的,需要被记录并可能需要进一步的处理。
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err))
}
}
var (
// 定义了一个变量 minWatchTimeout,设置 watch 请求的超时时间,可以降低 apiserver 的负载。超时时间是随机的,在 [minWatchTimeout, 2*minWatchTimeout] 范围内。这里,minWatchTimeout 被设置为 5 分钟,意味着实际的超时时间将在 5 到 10 分钟之间随机选择。
minWatchTimeout = 5 * time.Minute
)
NewNamespaceKeyedIndexerAndReflector函数
函数接受 ListerWatcher、expectedType接口、resyncPeriod
参数,并返回一个索引器(Indexer)和反射器(Reflector)。
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
return indexer, reflector
}
- 首先使用
NewIndexer
函数创建一个Indexer,Indexer使用MetaNamespaceKeyFunc
作为键,并使用MetaNamespaceIndexFunc
作为索引生成函数。这意味着Indexer将根据资源的命名空间和名称来索引资源。 - 再使用
NewReflector
函数创建一个Reflector,Reflector使用前面创建的Indexer作为存储,lw
和expectedType
作为参数,并设置重新同步周期为给定的 resyncPeriod。 - 最后,函数返回创建的Indexer和Reflector。
NewReflector函数
NewReflector
函数根据给定的 ListerWatcher、expectedType、store、resyncPeriod
创建一个新的Reflector。
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
}
函数内部调用了 NewReflectorWithOptions
函数,并传递了相同的参数,同时使用了 ReflectorOptions
结构体来设置反射器的选项。其中,ResyncPeriod 被设置为给定的 resyncPeriod。最后,函数返回创建的Reflector。
NewNamedReflector函数
NewNamedReflector
函数与 NewReflector
函数类似,但它可以指定Reflector的名称。
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
}
函数内部调用了 NewReflectorWithOptions
函数,并传递了相同的参数,同时使用了 ReflectorOptions
结构体来设置反射器的选项。其中,Name
被设置为给定的 name
,ResyncPeriod
被设置为给定的 resyncPeriod
。最后,函数返回创建的Reflector。
ReflectorOptions结构体
用于设置Reflector的选项。这些选项可以用于自定义Reflector的行为和属性。
type ReflectorOptions struct {
// Name 是反射器的名称。如果未设置或未指定,则名称默认为调用栈中位于该包之外的最接近的源文件的行号。
Name string
// TypeDescription 是反射器的类型描述。如果未设置或未指定,则类型描述按照以下规则默认值:如果传递给 NewReflectorWithOptions 的 expectedType 为 nil,则类型描述为 “”。如果 expectedType 是 *unstructured.Unstructured 的实例并且其 apiVersion 和 kind 字段已设置,则类型描述为其字符串编码。否则,类型描述设置为 expectedType 的 Go 类型。
TypeDescription string
// ResyncPeriod 是反射器的重新同步周期。如果未设置或未指定,则重新同步周期默认为 0(不重新同步)。
ResyncPeriod time.Duration
// Clock 允许测试控制时间。如果未设置,默认为 clock.RealClock{}。
Clock clock.Clock
}
NewReflectorWithOptions函数
用于创建一个带有选项的Reflector。
func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
reflectorClock := options.Clock
if reflectorClock == nil {
reflectorClock = clock.RealClock{}
}
r := &Reflector{
name: options.Name,
resyncPeriod: options.ResyncPeriod,
typeDescription: options.TypeDescription,
listerWatcher: lw,
store: store,
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
clock: reflectorClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
expectedType: reflect.TypeOf(expectedType),
}
if r.name == "" {
r.name = naming.GetNameFromCallsite(internalPackages...)
}
if r.typeDescription == "" {
r.typeDescription = getTypeDescriptionFromObject(expectedType)
}
if r.expectedGVK == nil {
r.expectedGVK = getExpectedGVKFromObject(expectedType)
}
if r.UseWatchList == nil {
if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
r.UseWatchList = ptr.To(true)
}
}
return r
}
- 首先根据
options.Clock
的值来确定使用的时钟对象,如果未设置,则使用默认的clock.RealClock{}
。 - 创建了反射器
r
,并设置了name,resyncPeriod,typeDescription,listerWatcher,store,backoffManager,clock,watchErrorHandler,expectedType属性。 - 如果反射器的名称为空,则使用
naming.GetNameFromCallsite
函数根据调用栈来获取名称。 - 如果反射器的类型描述为空,则使用
getTypeDescriptionFromObject
函数根据expectedType
来获取类型描述。 - 如果反射器的期望的
GroupVersionKind(GVK)
为空,则使用getExpectedGVKFromObject
函数根据expectedType
来获取 GVK。 - 如果反射器的
UseWatchList
属性未设置,则根据环境变量ENABLE_CLIENT_GO_WATCH_LIST_ALPHA
的值来设置。 - 最后,函数返回创建的反射器。
getTypeDescriptionFromObject函数
用于根据给定的 expectedType
获取类型描述。如果 expectedType
是 *unstructured.Unstructured
的实例并且具有非空的 GVK
,则返回 GVK
的字符串作为 TypeDescription
。否则,返回reflectDescription
作为TypeDescription
。
func getTypeDescriptionFromObject(expectedType interface{}) string {
if expectedType == nil {
return defaultExpectedTypeName
}
reflectDescription := reflect.TypeOf(expectedType).String()
obj, ok := expectedType.(*unstructured.Unstructured)
if !ok {
return reflectDescription
}
gvk := obj.GroupVersionKind()
if gvk.Empty() {
return reflectDescription
}
return gvk.String()
}
getExpectedGVKFromObject函数
根据给定的 expectedType
来获取期望的 GVK
。如果 expectedType
是 *unstructured.Unstructured
的实例并且具有非空的 GVK
,则返回 GVK
的指针。否则,返回 nil。
func getExpectedGVKFromObject(expectedType interface{}) *schema.GroupVersionKind {
obj, ok := expectedType.(*unstructured.Unstructured)
if !ok {
return nil
}
gvk := obj.GroupVersionKind()
if gvk.Empty() {
return nil
}
return &gvk
}
Run方法
Run
方法是 Reflector
结构体的一个成员方法,接受一个类型为 <-chan struct{}
的 stopCh
通道参数。该方法用于启动一个反射器并运行一个循环来监听和处理资源的变化。
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
}
var (
neverExitWatch <-chan time.Time = make(chan time.Time)
errorStopRequested = errors.New("stop requested")
)
- 首先使用
klog.V(3).Infof
输出一条日志,表示反射器的启动信息。 - 然后使用
wait.BackoffUntil
方法。- 匿名函数,用于执行
r.ListAndWatch(stopCh)
操作。 - 在每次执行之前,会先检查是否需要停止。如果
stopCh
接收到了信号,则会立即停止循环。 - 在每次执行时,会调用
r.ListAndWatch(stopCh)
方法来监听和处理资源的变化。 - 如果出现错误,则调用
watchErrorHandler
处理错误。 - 在每次执行之后,会根据指定的
backoffManager
进行等待,等待时间会逐渐增加,以防止频繁的重试。 - 直到收到
stopCh
信号或发生错误。
- 匿名函数,用于执行
- 最后,使用
klog.V(3).Infof
输出一条日志,表示反射器的停止信息。 - 变量
neverExitWatch
是一个只读的时间通道,用于表示永远不会发送任何值。 - 变量
errorStopRequested
是一个错误对象,用于表示停止请求的信号,通常由调用方通过stopCh
传递给反射器。
BackoffUntil函数
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{})
函数逻辑如下:
- 初始化重试次数为0。
- 进入一个无限循环,直到满足停止条件(stopCh通道关闭)或执行函数返回false。
- 在每次循环中,调用f函数执行重试操作。如果f返回true,表示应继续重试。
- 如果f返回false,表示不应继续重试,退出循环。
- 基于退避策略计算下一次重试的等待时间。
- 在等待时间结束后,继续下一次循环。
resyncChan方法
resyncChan
方法是 Reflector
结构体的一个成员方法,该方法返回两个值:一个类型为 <-chan time.Time
的通道和一个类型为 func() bool
的函数。
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
if r.resyncPeriod == 0 {
return neverExitWatch, func() bool { return false }
}
t := r.clock.NewTimer(r.resyncPeriod)
return t.C(), t.Stop
}
- 首先检查
r.resyncPeriod
的值。如果为0,表示不需要进行重新同步操作,那么就返回一个永远不会发送任何值的通道neverExitWatch
,以及一个始终返回false
的函数。 - 如果
r.resyncPeriod
的值不为0,表示需要进行重新同步操作,那么就创建一个新的定时器t
,使用r.clock.NewTimer(r.resyncPeriod)
方法,并设置定时器的周期为r.resyncPeriod
。 - 然后,返回定时器的通道
t.C()
,以及定时器的Stop
方法t.Stop
。t.C()
返回一个<-chan time.Time
类型的通道,当定时器到达设定的时间时,该通道将接收到一个值。t.Stop
是用于停止定时器的方法。
通过调用 resyncChan
方法,可以获取用于重新同步操作的定时器通道和停止定时器的方法。
ListAndWatch方法
ListAndWatch
方法是 Reflector
结构体的一个成员方法。该方法用于执行资源的ListAndWatch
操作,根据配置的 UseWatchList
标志选择使用 WatchList
或回退到使用 List
方法来获取资源的初始列表。然后启动一个goroutine来进行周期性的重新同步操作,并调用 watch 方法来监听资源的变化。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
var err error
var w watch.Interface
useWatchList := ptr.Deref(r.UseWatchList, false)
fallbackToList := !useWatchList
if useWatchList {
w, err = r.watchList(stopCh)
if w == nil && err == nil {
return nil
}
if err != nil {
klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)
fallbackToList = true
w = nil
}
}
if fallbackToList {
err = r.list(stopCh)
if err != nil {
return err
}
}
klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go r.startResync(stopCh, cancelCh, resyncerrc)
return r.watch(w, stopCh, resyncerrc)
}
- 首先使用
klog.V(3).Infof
输出一条日志,表示正在执行ListAndWatch
操作。 - 然后,定义了两个变量
err
和w
,用于存储可能的错误和监视器对象。 - 通过
ptr.Deref
函数获取r.UseWatchList
的值,并将其赋给useWatchList
变量。ptr.Deref
函数用于获取指针类型变量的值,如果指针为空则返回提供的默认值。 - 根据
r.UseWatchList
的值来决定是否使用watchList
方法。如果r.UseWatchList
为真,则调用watchList
方法来获取监视器对象w
和可能的错误err
。如果w
为nil
并且err
为nil
,表示stopCh
已关闭,直接返回nil
。 - 如果使用
watchList
方法出现错误,则会输出一个警告日志,并将fallbackToList
设置为真,表示需要回退到使用list
方法,将w
设置为nil
,以便后续使用标准的LIST/WATCH
机制。 - 调用
list
方法来获取资源的初始列表,并将可能的错误err
返回。如果err
不为nil
,则直接返回错误。 - 使用
klog.V(2).Infof
输出一条日志,表示缓存已填充的类型描述和名称。 - 创建了一个缓冲大小为1的
resyncerrc
通道,一个用于取消操作的cancelCh
通道,并在函数结束时关闭cancelCh
通道。 - 启动一个goroutine来执行
startResync
方法,进行周期性的重新同步操作。同时,调用watch
方法来监听资源的变化,并将监视器对象w
、停止信号通道stopCh
和重新同步错误通道resyncerrc
作为参数传递给watch
方法。可以实现在后台执行周期性的重新同步操作,而不会阻塞主线程。同时,通过关闭cancelCh
通道,可以在需要时取消重新同步操作。 - 最后,返回
watch
方法的结果。
startResync方法
该方法用于执行周期性的重新同步操作,当发生错误时,发送错误到通道中。
func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}
- 首先调用
r.resyncChan()
方法获取一个用于触发重新同步操作的通道resyncCh
和一个清理函数cleanup
。使用defer延迟执行cleanup
函数,以确保在方法结束时清理资源。 - 进入无限循环,通过select语句监听三个通道:
resyncCh、stopCh 和 cancelCh
。- 如果从
resyncCh
接收到值,表示定时器已触发,需要进行重新同步操作。 - 如果从
stopCh
接收到值,表示停止信号已经触发,需要退出循环。 - 如果从
cancelCh
接收到值,表示取消重新同步操作的信号已经触发,需要退出循环。
- 如果从
- 如果
r.ShouldResync
为 nil 或者调用r.ShouldResync()
返回 true,表示需要进行重新同步操作。函数打印一条日志,并调用r.store.Resync()
方法执行实际的重新同步操作。如果执行过程中出现错误,会将错误发送到resyncerrc
通道,并返回。 - 在每次重新同步操作完成后,会调用
cleanup()
函数停止和清理定时器。 - 调用
r.resyncChan()
获取一个新的定时器通道resyncCh
和停止函数cleanup
,以便下一次循环使用。
watch方法
该方法用于监听资源的变化,并在每次变化发生后调用相应的处理函数。
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
var err error
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
for {
select {
case <-stopCh:
if w != nil {
w.Stop()
}
return nil
default:
}
start := r.clock.Now()
if w == nil {
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
TimeoutSeconds: &timeoutSeconds,
AllowWatchBookmarks: true,
}
w, err = r.listerWatcher.Watch(options)
if err != nil {
if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
select {
case <-stopCh:
return nil
case <-r.backoffManager.Backoff().C():
continue
}
}
return err
}
}
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
w.Stop()
w = nil
retry.After(err)
if err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
select {
case <-stopCh:
return nil
case <-r.backoffManager.Backoff().C():
continue
}
case apierrors.IsInternalError(err) && retry.ShouldRetry():
klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
continue
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err)
}
}
return nil
}
}
}
- 首先,函数定义了一个变量
err
和一个重试策略retry
。 - 进入无限循环,通过
select
语句监听stopCh
通道的事件。如果从stopCh
接收到值,需要调用w.Stop()
停止监听器,然后函数返回退出循环。如果没有接收到值,则继续执行后续操作。 - 记录当前时间
start
,用于计算请求的耗时。 - 如果监视器
w
为nil
,表示需要创建一个新的监视器。函数根据一些选项参数调用r.listerWatcher.Watch(options)
方法来创建监视器 w,并将错误赋值给 err。如果创建监听器的过程中出现了可以重试的错误,函数打印一条日志,并在一定的退避时间后继续下一轮循环。如果在退避等待期间接收到了stopCh
通道的值,函数立即返回。如果创建监听器的过程中出现了无法重试的错误,函数直接返回该错误。 - 调用
watchHandler
函数进行实际的监听操作。该函数处理监听事件,将事件存储到r.store
中,并在需要时执行重新同步操作。函数还负责处理错误情况,并根据错误类型执行相应的操作。 - 在处理完事件后,调用
w.Stop()
方法来停止监视器对象的监听,并将w
设置为nil
,以确保在下一次循环中重新创建新的监视器对象。 - 根据返回的错误
err
进行判断和处理。如果err
不为nil
,并且该错误不等于特定的errorStopRequested
错误,则根据不同的错误类型进行相应的处理:- 如果是
isExpiredError
错误,输出一条日志。 - 如果是
429 Too Many Requests
错误,并在一定的退避时间后继续下一轮循环。如果在退避等待期间接收到了stopCh
通道的值,函数立即返回。 - 如果是
apierrors.IsInternalError
错误,并且重试策略允许重试(retry.ShouldRetry() 返回 true),输出一条日志并继续下一次循环。 - 其他情况下,输出一条警告日志。
- 如果是
- 如果是 errorStopRequested 错误,表示停止信号已经触发,直接返回 nil。
- 最后,回到循环的开始,继续下一次循环。
list方法
该方法用于从列表中获取资源,并处理重试和错误情况。它更新 Reflector
的状态,并将获取到的资源进行同步操作。
func (r *Reflector) list(stopCh <-chan struct{}) error {
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
case options.ResourceVersion != "" && options.ResourceVersion != "0":
pager.PageSize = 0
}
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
if err != nil {
klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
}
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractListWithAlloc(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
initTrace.Step("Objects extracted")
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}
- 首先定义了一些变量设置跟踪信息,创建通道,用于存储获取到的资源,处理错误信息。
- 启动一个 goroutine 来执行获取资源的操作。在 goroutine 内部,尝试按块收集列表数据,使用一个分页器(pager)进行操作。分页器的配置基于
Reflector
的WatchListPageSize
字段和提供的ListOptions
。如果不支持分页或未请求分页,则返回完整的列表响应。 - 使用
r.listerWatcher.List(opts)
方法来获取资源列表。如果列表操作因为isExpiredError
错误或isTooLargeResourceVersionError
错误而失败,函数将使用空的资源版本进行重试,以便恢复并继续进行。 - goroutine 通过关闭
listCh
通道来标识列表操作完成。主函数通过等待listCh
通道来等待列表操作的完成。(defer) - 如果在
stopCh
通道上接收到停止信号,函数会提前返回。 - 在主 goroutine 中,通过 select 语句监听多个通道的事件:
- 如果从
stopCh
接收到值,表示停止信号已经触发,需要退出方法。 - 如果从
panicCh
接收到值,表示在获取资源的 goroutine 中发生了 panic,需要重新抛出 panic。 - 如果从
listCh
接收到值,表示资源列表已经获取完成,可以继续执行后续操作。
- 如果从
- 如果list操作成功完成,函数提取对象列表及其资源版本。如果出现错误,会输出一条警告日志,并返回相应的错误信息。
- 函数更新 Reflector 的状态,并调用
r.syncWith(items, resourceVersion)
方法来进行同步操作。 - 函数更新最后同步的资源版本,并返回 nil 表示方法执行成功。
watchList方法
该方法通过watch操作获取资源对象的变化,将变化同步到存储中,并返回一个监视器。
func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
var w watch.Interface
var err error
var temporaryStore Store
var resourceVersion string
isErrorRetriableWithSideEffectsFn := func(err error) bool {
if canRetry := isWatchErrorRetriable(err); canRetry {
klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
<-r.backoffManager.Backoff().C()
return true
}
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
return true
}
return false
}
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
defer initTrace.LogIfLong(10 * time.Second)
for {
select {
case <-stopCh:
return nil, nil
default:
}
resourceVersion = ""
lastKnownRV := r.rewatchResourceVersion()
temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: lastKnownRV,
AllowWatchBookmarks: true,
SendInitialEvents: pointer.Bool(true),
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
TimeoutSeconds: &timeoutSeconds,
}
start := r.clock.Now()
w, err = r.listerWatcher.Watch(options)
if err != nil {
if isErrorRetriableWithSideEffectsFn(err) {
continue
}
return nil, err
}
bookmarkReceived := pointer.Bool(false)
err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
func(rv string) { resourceVersion = rv },
bookmarkReceived,
r.clock, make(chan error), stopCh)
if err != nil {
w.Stop() // stop and retry with clean state
if err == errorStopRequested {
return nil, nil
}
if isErrorRetriableWithSideEffectsFn(err) {
continue
}
return nil, err
}
if *bookmarkReceived {
break
}
}
initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
r.setIsLastSyncResourceVersionUnavailable(false)
checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore)
if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
return w, nil
}
- 函数初始化一些变量,包括监听器(watch.Interface)、错误、临时存储(temporaryStore)和资源版本(resourceVersion)。
- 定义了一个函数
isErrorRetriableWithSideEffectsFn
,该函数会根据错误类型进行判断,并在需要重试时输出一条日志并进行相应的处理。 - 启动一个无限循环,在每次循环开始时,通过
select
语句监听stopCh
通道的事件。如果从stopCh
接收到值,表示停止信号已经触发,需要退出循环。如果没有接收到值,则继续执行后续操作。 - 重置资源版本,并从
rewatchResourceVersion
函数获取最后已知的资源版本。 - 创建一个临时存储(temporaryStore),用于暂存从监听流中获取的对象。
- 设置列表选项,包括资源版本、允许使用书签、发送初始事件、资源版本匹配条件以及超时时间。
- 记录开始时间,并通过
listerWatcher
的Watch
方法创建监视器。如果创建过程中出现错误,并且满足重试条件,则继续下一次循环。如果不满足重试条件,则返回错误。 - 如果watch操作成功,调用
watchHandler
函数处理监听流,并将获取的对象存储到临时存储中。 - 如果处理监听流的过程中出现错误,停止监听器并根据错误类型和重试函数的判断,决定是否进行重试或返回错误。
- 如果接收到书签(bookmark),则退出循环。否则,继续下一次循环。
- 如果成功从监听流中获取了初始状态,并确认了
k8s.io/initial-events-end
书签,记录日志并设置isLastSyncResourceVersionUnavailable
为 false。 - 检查监听列表的一致性。
- 将临时存储中的对象替换当前存储(store)中的对象,并设置最后同步的资源版本为当前资源版本。
- 记录日志并返回监听器。
syncWith方法
用于将给定的对象列表与存储中的对象进行同步。
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
初始化了一个名为 found
的 []interface{}
类型的切片。容量为 items
切片的长度。
遍历对象列表 items
,将每个对象添加到 found
切片中。
调用存储(store)的 Replace
方法,将 found
切片中的对象替换掉存储中的对象,并使用给定的资源版本进行更新。确保了本地缓存的状态与远端(例如 Kubernetes 集群)保持一致。
watchHandler函数
函数监听来自 Kubernetes API 的 watch 事件,并根据事件类型(如添加、修改、删除)更新本地存储(store)。
func watchHandler(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
exitOnInitialEventsEndBookmark *bool,
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = false
}
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
continue
}
}
if expectedGVK != nil {
if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
continue
}
resourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Deleted:
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
if exitOnInitialEventsEndBookmark != nil {
*exitOnInitialEventsEndBookmark = true
}
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(resourceVersion)
}
eventCount++
if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
watchDuration := clock.Since(start)
klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
return nil
}
}
}
watchDuration := clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
return nil
}
- 参数:
- start: 记录 watch 开始的时间。
- w: 实现了 watch.Interface 接口的对象,用于接收 watch 事件。
- store: 用于存储对象的本地存储,必须实现 Add、Update 和 Delete 方法。
- expectedType: 预期接收对象的类型。
- expectedGVK: 预期接收对象的 GroupVersionKind。
- name: 用于日志记录的标识符。
- expectedTypeName: 预期类型的名称,用于日志记录。
- setLastSyncResourceVersion: 函数,用于设置最后同步的资源版本号。
- exitOnInitialEventsEndBookmark: 指示是否在接收到初始事件结束的书签后退出。
- clock: 用于获取当前时间,便于测试。
- errc: 错误通道,用于接收错误。
- stopCh: 停止通道,用于接收停止信号。
- 通过for循环和 select 语句监听
stopCh
、errc
和w.ResultChan()
三个通道的事件。 - 如果从
stopCh
接收到信号,函数返回errorStopRequested
错误,表示请求停止。 - 如果从
errc
接收到错误,函数直接返回该错误。 - 处理watch事件:
- 如果watch事件的类型为错误,将其转换为对应的
apierrors
并返回。 - 验证事件对象类型和
GroupVersionKind
是否符合预期。如果不匹配,则记录错误并继续下一个事件。 - 获取监听事件对象的元数据,并获取其资源版本。
- 根据事件类型执行对应操作:
watch.Added
:调用存储的Add
方法将事件对象添加到存储中。watch.Modified
:调用存储的Update
方法更新存储中的事件对象。watch.Deleted
:调用存储的Delete
方法从存储中删除事件对象。watch.Bookmark
:如果事件的注释中包含k8s.io/initial-events-end
键且值为true
,将退出初始事件结束书签的标志设置为true
。- 其他类型的事件:记录错误并继续下一个事件。
- 如果watch事件的类型为错误,将其转换为对应的
- 更新最后同步的资源版本。
- 如果存储实现了
ResourceVersionUpdater
接口,调用其UpdateResourceVersion
方法更新存储的资源版本。 - 增加事件计数。
- 如果设置了
exitOnInitialEventsEndBookmark
并且为true
,则记录日志并返回。 - 如果watch持续时间很短且没有接收到任何事件,函数返回错误,提示watch意外关闭。否则,记录日志并正常退出。
LastSyncResourceVersion方法
用于获取 Reflector
实例中存储的最后同步的资源版本。
func (r *Reflector) LastSyncResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
return r.lastSyncResourceVersion
}
- 函数首先通过调用
r.lastSyncResourceVersionMutex.RLock()
获取读锁,这是为了确保在多线程或并发环境中访问lastSyncResourceVersion
字段时的线程安全。使用读锁(而不是写锁)是因为这个操作只涉及读取数据,不会修改数据。 - 使用
defer
确保在函数返回之前释放读锁。 - 函数返回
r.lastSyncResourceVersion
字段的值。
setLastSyncResourceVersion方法
用于设置 Reflector
结构体中的 lastSyncResourceVersion
字段的值。
func (r *Reflector) setLastSyncResourceVersion(v string) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v
}
- 获取
lastSyncResourceVersionMutex
的写锁。 - 更新
r.lastSyncResourceVersion
的值为传入的参数v
。 - 释放写锁。
relistResourceVersion方法
用于确定在重新列出(list)操作时应使用的资源版本。
func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionUnavailable {
return ""
}
if r.lastSyncResourceVersion == "" {
return "0"
}
return r.lastSyncResourceVersion
}
- 获取
r.lastSyncResourceVersionMutex
的读锁。 - defer释放读锁。
- 如果
isLastSyncResourceVersionUnavailable
标志为真,表示最后同步的资源版本不可用,函数返回空字符串""。这意味着在下一次list操作时,将不指定资源版本,以便从etcd中进行一致性读取,获取最新可用的资源版本。 - 如果
lastSyncResourceVersion
为空字符串,这通常表示是初始的list操作。为了性能考虑,使用"0"作为资源版本,这允许请求(如果启用了watch缓存)从watch缓存中读取。 - 如果都不满足上述条件,即存在一个有效的
lastSyncResourceVersion
,则直接返回这个值。
rewatchResourceVersion方法
用于确定在重新watch操作时应从哪个资源版本开始。
func (r *Reflector) rewatchResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionUnavailable {
return ""
}
return r.lastSyncResourceVersion
}
- 获取
r.lastSyncResourceVersionMutex
的读锁。 - defer释放读锁。
- 如果
isLastSyncResourceVersionUnavailable
标志为真,表示最后同步的资源版本不可用,函数返回空字符串""。这意味着在下一次watch操作时,将从最新的资源版本开始,确保返回的数据是一致的,就像直接从etcd中读取一样。 - 否则直接返回
lastSyncResourceVersion
,即从上一次同步的资源版本开始watch。
setIsLastSyncResourceVersionUnavailable方法
用于设置 isLastSyncResourceVersionUnavailable
字段的值。
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.isLastSyncResourceVersionUnavailable = isUnavailable
}
- 获取
r.lastSyncResourceVersionMutex
的写锁。 - 释放写锁。
- 设置
isLastSyncResourceVersionUnavailable
的值为传入的isUnavailable
。
isExpiredError函数
用于检查一个错误是否表示资源已过期。
func isExpiredError(err error) bool {
return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
}
- 使用
apierrors.IsResourceExpired(err)
检查是不是表示资源已过期错误。这通常对应于HTTP状态码410(Gone),并且错误原因是StatusReasonExpired
。 - 使用
apierrors.IsGone(err)
检查是不是因为资源不存在错误。虽然这通常也对应于HTTP状态码410,但在Kubernetes 1.17及更早版本中,API服务器可能会因为资源版本过期而使用StatusReasonGone
。 - 如果任一检查返回true,则函数返回true,表示错误确实是因为资源过期。
isTooLargeResourceVersionError函数
用于判断是不是资源版本过大错误。
func isTooLargeResourceVersionError(err error) bool {
if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
return true
}
if !apierrors.IsTimeout(err) {
return false
}
apierr, ok := err.(apierrors.APIStatus)
if !ok || apierr == nil || apierr.Status().Details == nil {
return false
}
for _, cause := range apierr.Status().Details.Causes {
if cause.Message == "Too large resource version" {
return true
}
}
if strings.Contains(apierr.Status().Message, "Too large resource version") {
return true
}
return false
}
- 首先,它检查错误是否具有
metav1.CauseTypeResourceVersionTooLarge
的状态原因,这是一个直接的指示,表明请求的资源版本过大。 - 对于Kubernetes版本1.17.0到1.18.5,apiserver在这个特定的错误条件下不会将错误状态原因设置为
metav1.CauseTypeResourceVersionTooLarge
。相反,该函数会查找超时错误,然后检查错误消息中是否含有“资源版本过大”。这样做是为了保持与这些服务器版本的向后兼容性。 - 最后对于1.17.0版本之前,它检查一般错误消息(而不是特定于原因的消息)中是否包含“资源版本过大”。
isWatchErrorRetriable函数
用于判断观察watch错误是否可以重试。
func isWatchErrorRetriable(err error) bool {
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
return true
}
return false
}
- 如果是“连接拒绝”错误,这意味着apiserver可能没有响应。在这种情况下,重新列出所有对象没有意义,因此,将该错误视为可重试,并返回 true。如果出现这种情况,开始指数级退避并重新发送观察请求。
- 对于“429”错误(即请求过多),采取相同的处理策略。
- 否则将其视为不可重试,并返回 false。