go, k8s

client-go Indexer源码解读

介绍

Indexer是一种用于存储和索引Kubernetes资源对象的本地存储,ReflectorDeltaFIFO中将消费出来的资源对象存储至Indexer

Indexer中的数据与etcd中的数据是完全一致的,这样客户端无须每次都从apiserver获取,从而减少了请求过多对apiserver造成的压力。

Indexer在需要快速查找或筛选Kubernetes资源对象时非常有用。例如,如果你的应用需要频繁地根据标签、名称空间或其他属性来查询Pod列表,使用Indexer可以大大提高查询效率,因为它避免了每次查询时都去遍历整个对象列表。

index源码

源码地址:https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/index.go

Indexer接口

扩展了多个索引的Store,并限制每个累加器只保存当前对象(删除后为空)。

type Indexer interface {
    Store
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexedValue string) ([]string, error)
    ListIndexFuncValues(indexName string) []string
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
}
  • Store:是一个通用的对象存储和处理接口。一个Store维护了一个从字符串键(key)到累加器(accumulator)的映射,并提供了将给定对象添加、更新和删除到与给定键关联的累加器的操作。Store还知道如何从给定对象中提取键,因此许多操作只需要对象作为参数。源码地址:https://github.com/kubernetes/client-go/blob/master/tools/cache/store.go#L40
  • Index: 此方法返回与给定对象的索引值集相交的所有存储对象。针对指定的索引名称。
  • IndexKeys: 返回具有指定索引名称和索引值的所有存储对象的键。
  • ListIndexFuncValues: 返回给定索引名称的所有索引值。
  • ByIndex:返回索引值集合包含给定索引值的存储对象。
  • GetIndexers: 返回索引器(Indexers)。
  • AddIndexers: 向存储中添加更多的索引器。这个方法支持在存储中已经有项目之后添加索引。
type Store interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    List() []interface{}
    ListKeys() []string
    Get(obj interface{}) (item interface{}, exists bool, err error)
    GetByKey(key string) (item interface{}, exists bool, err error)
    Replace([]interface{}, string) error
    Resync() error
}

Index类型

type IndexFunc func(obj interface{}) ([]string, error)

type Index map[string]sets.String

type Indexers map[string]IndexFunc

type Indices map[string]Index
  • IndexFunc: 根据一系列的key计算对象的索引值集合。
  • Index: 以Key/Value的形式将数据存储到Store中。
  • Indexers: 存储的是索引器的名字与索引器的实现函数IndexFunc。
  • Indices: 存储缓存数据,形式Key/Value,其中Key为缓存器的名字,Value为缓存数据。

这几者的关系如下:

file

  • 每个IndexFunc根据对象的属性计算出索引值。
  • Indexers管理多个IndexFunc,并将每个IndexFunc的名称映射到该函数本身。
  • Indexers节点,有多个箭头指向Indices。每个箭头代表一个特定的索引名称和对应的 IndexFuncIndices存储实际的索引数据。
  • 每个 IndexIndices`的一个子集,存储了索引值到对象键的映射,以便快速查询。

NamespaceIndex常量

是一个常用的索引名称,用于根据对象的命名空间字段进行索引。

const (
    NamespaceIndex string = "namespace"
)

IndexFuncToKeyFuncAdapter函数

这是一个适配器函数,它将IndexFunc转换为KeyFunc。这在你的索引函数为每个对象返回唯一值时非常有用。但如果找到多个键,这种转换可能会产生错误。

func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
    return func(obj interface{}) (string, error) {
        indexKeys, err := indexFunc(obj)
        if err != nil {
            return "", err
        }
        if len(indexKeys) > 1 {
            return "", fmt.Errorf("too many keys: %v", indexKeys)
        }
        if len(indexKeys) == 0 {
            return "", fmt.Errorf("unexpected empty indexKeys")
        }
        return indexKeys[0], nil
    }
}

MetaNamespaceIndexFunc函数

默认的基于对象命名空间进行索引的函数。

func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
    meta, err := meta.Accessor(obj)
    if err != nil {
        return []string{""}, fmt.Errorf("object has no meta: %v", err)
    }
    return []string{meta.GetNamespace()}, nil
}

ThreadSafeStore源码

ThreadSafeStore是一个接口,允许并发安全地对存储后端进行索引访问。它类似于Indexer,但不(必须)知道如何从给定的对象中获取存储键,这使得ThreadSafeStore可以与不同的存储后端进行交互。IndexerThreadSafeMap的基础上进行封装,它继承了与ThreadSafeMap相关的操作方法并实现了IndexFunc等功能。
https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/thread_safe_store.go

ThreadSafeStore接口

Indexer类似,定义了一组用于并发安全访问存储的方法。

type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexedValue string) ([]string, error)
    ListIndexFuncValues(name string) []string
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
    Resync() error
}

cache结构体

用于实现缓存功能。它通过cacheStorage字段来存储缓存的数据,并使用keyFunc字段生成对应的键来进行数据的存储和检索。这是根据ThreadSafeStore和关联的KeyFunc实现的Indexer。所有的对象都缓存在内存中,这不是一个外部可以调用的对象,在包的外部不能直接调用cache。

type cache struct {
    cacheStorage ThreadSafeStore
    keyFunc KeyFunc
}

threadSafeMap结构体

实现了ThreadSafeStore接口,是一个内存中的存储,其中的数据并不会写入本地磁盘中,通过使用读写锁sync.RWMutex来保证多个goroutine可以安全地访问items字段,具备CRUD等操作。items字段是实际的Map存储数据的地方,key通过keyFunc函数计算得到,计算默认使用MetaNamespaceKeyFunc函数。
index是一个指向storeIndex对象的指针,用于对Map中的数据建立索引,以便更快地访问数据。

type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}
    index *storeIndex
}

// 用来为对象生成NamespaceKey
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
    if key, ok := obj.(ExplicitKey); ok {
        return string(key), nil
    }
    objName, err := ObjectToName(obj)
    if err != nil {
        return "", err
    }
    return objName.String(), nil
}

indexer,cache,ThreadSafeStore,store,threadSafeMap 几者的关系

  • store是最基础的键值存储
  • ThreadSafeStore在store基础上增加同步能力
  • threadSafeMap是ThreadSafeStore的核心实现
  • cache可以使用ThreadSafeStore等作为底层存储提供缓存策略
  • indexer通常会使用cache或ThreadSafeStore作为存储层,提供索引查询能力

ByIndex函数

Indexer索引的实现是通过index.ByIndex来完成的。ByIndex函数根据给定的索引名和索引值,从threadSafeMap中获取所有匹配的项。

func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()

    set, err := c.index.getKeysByIndex(indexName, indexedValue)
    if err != nil {
        return nil, err
    }
    list := make([]interface{}, 0, set.Len())
    for key := range set {
        list = append(list, c.items[key])
    }

    return list, nil
}
  • 首先获得读锁c.lock.RLock(),以便并发安全地读取数据。
  • 调用内部index的getKeysByIndex方法,根据索引名称indexName和索引值indexedValue获取该索引对应的所有键key。
  • 通过这些键key,遍历底层map获取对应的值,构建返回列表list。
  • 最后释放读锁c.lock.RUnlock(),并返回构建的列表和错误。

indexer的执行流程如下:

file

0 0 投票数
文章评分
订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部
0
希望看到您的想法,请您发表评论x