介绍
Indexer
是一种用于存储和索引Kubernetes资源对象的本地存储,Reflector
从DeltaFIFO
中将消费出来的资源对象存储至Indexer
。
Indexer
中的数据与etcd中的数据是完全一致的,这样客户端无须每次都从apiserver获取,从而减少了请求过多对apiserver造成的压力。
Indexer
在需要快速查找或筛选Kubernetes资源对象时非常有用。例如,如果你的应用需要频繁地根据标签、名称空间或其他属性来查询Pod列表,使用Indexer
可以大大提高查询效率,因为它避免了每次查询时都去遍历整个对象列表。
index源码
Indexer接口
扩展了多个索引的Store
,并限制每个累加器只保存当前对象(删除后为空)。
type Indexer interface {
Store
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(indexName string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
}
- Store:是一个通用的对象存储和处理接口。一个Store维护了一个从字符串键(key)到累加器(accumulator)的映射,并提供了将给定对象添加、更新和删除到与给定键关联的累加器的操作。Store还知道如何从给定对象中提取键,因此许多操作只需要对象作为参数。源码地址:https://github.com/kubernetes/client-go/blob/master/tools/cache/store.go#L40
- Index: 此方法返回与给定对象的索引值集相交的所有存储对象。针对指定的索引名称。
- IndexKeys: 返回具有指定索引名称和索引值的所有存储对象的键。
- ListIndexFuncValues: 返回给定索引名称的所有索引值。
- ByIndex:返回索引值集合包含给定索引值的存储对象。
- GetIndexers: 返回索引器(Indexers)。
- AddIndexers: 向存储中添加更多的索引器。这个方法支持在存储中已经有项目之后添加索引。
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
Index类型
type IndexFunc func(obj interface{}) ([]string, error)
type Index map[string]sets.String
type Indexers map[string]IndexFunc
type Indices map[string]Index
- IndexFunc: 根据一系列的key计算对象的索引值集合。
- Index: 以Key/Value的形式将数据存储到Store中。
- Indexers: 存储的是索引器的名字与索引器的实现函数IndexFunc。
- Indices: 存储缓存数据,形式Key/Value,其中Key为缓存器的名字,Value为缓存数据。
这几者的关系如下:
- 每个
IndexFunc
根据对象的属性计算出索引值。 Indexers
管理多个IndexFunc
,并将每个IndexFunc
的名称映射到该函数本身。- 从
Indexers
节点,有多个箭头指向Indices
。每个箭头代表一个特定的索引名称和对应的IndexFunc
。Indices
存储实际的索引数据。 - 每个 Index
是
Indices`的一个子集,存储了索引值到对象键的映射,以便快速查询。
NamespaceIndex常量
是一个常用的索引名称,用于根据对象的命名空间字段进行索引。
const (
NamespaceIndex string = "namespace"
)
IndexFuncToKeyFuncAdapter函数
这是一个适配器函数,它将IndexFunc
转换为KeyFunc
。这在你的索引函数为每个对象返回唯一值时非常有用。但如果找到多个键,这种转换可能会产生错误。
func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
return func(obj interface{}) (string, error) {
indexKeys, err := indexFunc(obj)
if err != nil {
return "", err
}
if len(indexKeys) > 1 {
return "", fmt.Errorf("too many keys: %v", indexKeys)
}
if len(indexKeys) == 0 {
return "", fmt.Errorf("unexpected empty indexKeys")
}
return indexKeys[0], nil
}
}
MetaNamespaceIndexFunc函数
默认的基于对象命名空间进行索引的函数。
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
ThreadSafeStore源码
ThreadSafeStore
是一个接口,允许并发安全地对存储后端进行索引访问。它类似于Indexer
,但不(必须)知道如何从给定的对象中获取存储键,这使得ThreadSafeStore
可以与不同的存储后端进行交互。Indexer
在ThreadSafeMap
的基础上进行封装,它继承了与ThreadSafeMap
相关的操作方法并实现了IndexFunc等功能。
https://github.com/kubernetes/client-go/blob/306b201a2d292fd78483fdf6147131926ed25a78/tools/cache/thread_safe_store.go
ThreadSafeStore接口
与Indexer
类似,定义了一组用于并发安全访问存储的方法。
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error
}
cache结构体
用于实现缓存功能。它通过cacheStorage
字段来存储缓存的数据,并使用keyFunc
字段生成对应的键来进行数据的存储和检索。这是根据ThreadSafeStore
和关联的KeyFunc
实现的Indexer
。所有的对象都缓存在内存中,这不是一个外部可以调用的对象,在包的外部不能直接调用cache。
type cache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}
threadSafeMap结构体
实现了ThreadSafeStore
接口,是一个内存中的存储,其中的数据并不会写入本地磁盘中,通过使用读写锁sync.RWMutex
来保证多个goroutine可以安全地访问items字段,具备CRUD等操作。items字段是实际的Map存储数据的地方,key通过keyFunc
函数计算得到,计算默认使用MetaNamespaceKeyFunc
函数。
index是一个指向storeIndex
对象的指针,用于对Map中的数据建立索引,以便更快地访问数据。
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
index *storeIndex
}
// 用来为对象生成NamespaceKey
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
objName, err := ObjectToName(obj)
if err != nil {
return "", err
}
return objName.String(), nil
}
indexer,cache,ThreadSafeStore,store,threadSafeMap 几者的关系
- store是最基础的键值存储
- ThreadSafeStore在store基础上增加同步能力
- threadSafeMap是ThreadSafeStore的核心实现
- cache可以使用ThreadSafeStore等作为底层存储提供缓存策略
- indexer通常会使用cache或ThreadSafeStore作为存储层,提供索引查询能力
ByIndex函数
Indexer
索引的实现是通过index.ByIndex
来完成的。ByIndex
函数根据给定的索引名和索引值,从threadSafeMap
中获取所有匹配的项。
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
set, err := c.index.getKeysByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
- 首先获得读锁c.lock.RLock(),以便并发安全地读取数据。
- 调用内部index的getKeysByIndex方法,根据索引名称indexName和索引值indexedValue获取该索引对应的所有键key。
- 通过这些键key,遍历底层map获取对应的值,构建返回列表list。
- 最后释放读锁c.lock.RUnlock(),并返回构建的列表和错误。
indexer的执行流程如下: