A Brief Analysis of the client-go Informer in Kubernetes
This article analyzes the client-go informer in Kubernetes.
Controller
In the client-go informer architecture there is a controller. This is not the Controller component of Kubernetes, but a concept inside tools/cache: the controller sits below the informer and above the Reflector.
Config
Strictly speaking, the controller is used as the core of a sharedInformer: it is driven by a Config it accepts, and the Reflector serves as the controller's slot. The Config contains all of this controller's settings.
type Config struct {
    Queue                             // the DeltaFIFO
    ListerWatcher                     // used for list/watch
    Process          ProcessFunc      // how to handle an object popped from the DeltaFIFO
    ObjectType       runtime.Object   // the object type this Controller handles, i.e. a Kubernetes resource
    FullResyncPeriod time.Duration    // the period of a full resync
    ShouldResync     ShouldResyncFunc // the Reflector consults this flag to decide whether it should resync
    RetryOnError     bool
}
controller
The controller, in turn, forms the layer above the Reflector.
type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}
type Controller interface {
    // A controller does two things:
    // 1. build and run a Reflector, pumping objects/notifications from the
    //    ListerWatcher into the Queue (DeltaFIFO)
    // 2. pop items off the Queue with Pop(); the concrete handling is Process
    // Both goroutines exit once stopCh stops blocking (is closed).
    Run(stopCh <-chan struct{})
    HasSynced() bool // delegated to the Queue (a Store): reports whether the initial batch of items has been popped
    LastSyncResourceVersion() string
}
The controller exposes only two functions, Run() and New(); this means the controller is merely an abstraction that ties the Reflector and the DeltaFIFO together into one workflow. What actually builds on a controller is the SharedInformer.
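To make this wiring concrete, here is a minimal sketch of driving a bare controller by hand, roughly what newInformer does further below. This is my own illustration, not from the original post: it assumes a reachable cluster via the default kubeconfig, watches Pods, elides error handling, and the ProcessFunc signature matches the client-go version this article is based on.
package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Assumes a local kubeconfig; error handling elided for brevity.
    config, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    clientset, _ := kubernetes.NewForConfig(config)

    lw := cache.NewListWatchFromClient(
        clientset.CoreV1().RESTClient(), "pods", v1.NamespaceAll, fields.Everything())

    store := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
    fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
        KnownObjects:          store,
        EmitDeltaTypeReplaced: true,
    })

    cfg := &cache.Config{
        Queue:            fifo,
        ListerWatcher:    lw,
        ObjectType:       &v1.Pod{},
        FullResyncPeriod: 30 * time.Second,
        RetryOnError:     false,
        // Process receives the Deltas popped from the DeltaFIFO.
        Process: func(obj interface{}) error {
            for _, d := range obj.(cache.Deltas) {
                fmt.Printf("delta: %s\n", d.Type)
            }
            return nil
        },
    }

    stopCh := make(chan struct{})
    defer close(stopCh)
    cache.New(cfg).Run(stopCh) // starts the Reflector and the pop loop
}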
Queue
The queue here can be thought of as an Indexer with an added Pop() capability, and that Pop() behavior is part of what the controller drives. More precisely, Queue is an extended Store; a plain Store has no pop capability.
type Queue interface {
    Store
    // Pop blocks and waits until there is something to pop, then removes the
    // corresponding value and updates the bookkeeping counters.
    Pop(PopProcessFunc) (interface{}, error)
    // AddIfNotPresent puts the given accumulator into the Queue (in
    // association with the accumulator's key) if and only if that key
    // is not already associated with a non-empty accumulator.
    AddIfNotPresent(interface{}) error
    // HasSynced returns true if the first batch of keys have all been
    // popped. The first batch of keys are those of the first Replace
    // operation if that happened before any Add, Update, or Delete;
    // otherwise the first batch is empty.
    HasSynced() bool
    // Close closes the queue.
    Close()
}
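To see the Queue contract in isolation, here is a small runnable sketch of my own (not from the original post) that uses a DeltaFIFO directly: one Add, one blocking Pop.
package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
        KeyFunction: cache.MetaNamespaceKeyFunc,
    })

    pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"}}
    _ = fifo.Add(pod) // enqueues an Added delta under key "default/demo"

    // Pop blocks until an item is available, then hands the accumulated
    // Deltas for that key to the process function.
    _, _ = fifo.Pop(func(obj interface{}) error {
        for _, d := range obj.(cache.Deltas) {
            fmt.Println(d.Type) // Added
        }
        return nil
    })
}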
The pop operation is driven by processLoop() in the controller and ultimately lands in the DeltaFIFO. The loop busy-waits reading items to pop, and each item is handed to the PopProcessFunc for processing as part of the pop.
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
        if err != nil {
            if err == ErrFIFOClosed {
                return
            }
            if c.config.RetryOnError {
                // This is the safe way to re-enqueue.
                c.config.Queue.AddIfNotPresent(obj)
            }
        }
    }
}
DeltaFIFO.Pop()
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
            // When Close() is called, the f.closed is set and the condition is broadcasted.
            // Which causes this loop to continue and return from the Pop().
            if f.IsClosed() {
                return nil, ErrFIFOClosed
            }
            f.cond.Wait()
        }
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        item, ok := f.items[id]
        if !ok {
            // Item may have been deleted subsequently.
            continue
        }
        delete(f.items, id)
        err := process(item) // process the item
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item) // on failure, put it back on the queue
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
}
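Note the ErrRequeue branch above: a process callback can ask the FIFO to put the item back by returning a wrapped error. A minimal sketch of such a callback, assuming the k8s.io/client-go/tools/cache import; handle() here is a hypothetical business function, not part of client-go.
// Sketch: a PopProcessFunc that requeues on transient failure.
// handle() is a hypothetical business function, not part of client-go.
func processWithRetry(handle func(interface{}) error) cache.PopProcessFunc {
    return func(obj interface{}) error {
        if err := handle(obj); err != nil {
            // Returning ErrRequeue makes DeltaFIFO.Pop re-add the item
            // (via addIfNotPresent) before surfacing the inner error.
            return cache.ErrRequeue{Err: err}
        }
        return nil
    }
}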
Informer
Surveying the concepts so far — Reflector, Store, Queue, ListerWatcher, ProcessFunc and so on — the functionality wrapped up by the controller still cannot, on its own, watch actions on the API and use those actions to maintain a local cache. That is the gap the informer — strictly speaking, the sharedInformer — was created to fill.
func newInformer(
    lw ListerWatcher,
    objType runtime.Object,
    resyncPeriod time.Duration,
    h ResourceEventHandler,
    clientState Store,
) Controller {
    // This will hold incoming changes. Note how we pass clientState in as a
    // KeyLister, that way resync operations will result in the correct set
    // of update/delete deltas.
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          clientState,
        EmitDeltaTypeReplaced: true,
    })
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    lw,
        ObjectType:       objType,
        FullResyncPeriod: resyncPeriod,
        RetryOnError:     false,
        Process: func(obj interface{}) error {
            // from oldest to newest
            for _, d := range obj.(Deltas) {
                switch d.Type {
                case Sync, Replaced, Added, Updated:
                    if old, exists, err := clientState.Get(d.Object); err == nil && exists {
                        if err := clientState.Update(d.Object); err != nil {
                            return err
                        }
                        h.OnUpdate(old, d.Object)
                    } else {
                        if err := clientState.Add(d.Object); err != nil {
                            return err
                        }
                        h.OnAdd(d.Object)
                    }
                case Deleted:
                    if err := clientState.Delete(d.Object); err != nil {
                        return err
                    }
                    h.OnDelete(d.Object)
                }
            }
            return nil
        },
    }
    return New(cfg)
}
newInformer lives in tools/cache/controller.go. Notice there is no separate informer type here; as the comments indicate, newInformer is in effect an informer that provides a store and event notifications. The queue it wires up is the DeltaFIFO, and it pulls together controller concepts such as ProcessFunc and Store. The public entry point is NewInformer().
func NewInformer(
    lw ListerWatcher,
    objType runtime.Object,
    resyncPeriod time.Duration,
    h ResourceEventHandler,
) (Store, Controller) {
    // This will hold the client state, as we know it.
    clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
    return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}
type ResourceEventHandler interface {
    OnAdd(obj interface{})
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}
SharedIndexInformer
A SharedInformer is an informer that gives clients a local cache of objects kept consistent with the apiserver and supports multiple event handlers; SharedIndexInformer extends SharedInformer with indexing.
type SharedInformer interface {
    // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
    // period. Events to a single handler are delivered sequentially, but there is no coordination
    // between different handlers.
    AddEventHandler(handler ResourceEventHandler)
    // AddEventHandlerWithResyncPeriod adds an event handler to the
    // shared informer with the requested resync period; zero means
    // this handler does not care about resyncs. The resync operation
    // consists of delivering to the handler an update notification
    // for every object in the informer's local cache; it does not add
    // any interactions with the authoritative storage. Some
    // informers do no resyncs at all, not even for handlers added
    // with a non-zero resyncPeriod. For an informer that does
    // resyncs, and for each handler that requests resyncs, that
    // informer develops a nominal resync period that is no shorter
    // than the requested period but may be longer. The actual time
    // between any two resyncs may be longer than the nominal period
    // because the implementation takes time to do work and there may
    // be competing load and scheduling noise.
    AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
    // GetStore returns the informer's local cache as a Store.
    GetStore() Store
    // GetController is deprecated, it does nothing useful
    GetController() Controller
    // Run starts and runs the shared informer, returning after it stops.
    // The informer will be stopped when stopCh is closed.
    Run(stopCh <-chan struct{})
    // HasSynced returns true if the shared informer's store has been
    // informed by at least one full LIST of the authoritative state
    // of the informer's object collection. This is unrelated to "resync".
    HasSynced() bool
    // LastSyncResourceVersion is the resource version observed when last synced with the underlying
    // store. The value returned is not synchronized with access to the underlying store and is not
    // thread-safe.
    LastSyncResourceVersion() string
}
sharedIndexInformer is the implementation of SharedInformer. From its struct you can see that it roughly provides:
· an indexed local cache
· a controller, which pulls from the API via list/watch and pushes into the DeltaFIFO
· event handling
type sharedIndexInformer struct {
    indexer    Indexer          // the indexed local cache
    controller Controller       // the controller
    processor  *sharedProcessor // the set of event handlers
    cacheMutationDetector MutationDetector
    listerWatcher ListerWatcher
    objectType    runtime.Object
    resyncCheckPeriod time.Duration
    defaultEventHandlerResyncPeriod time.Duration
    clock clock.Clock
    started, stopped bool
    startedLock      sync.Mutex
    blockDeltas sync.Mutex
}
The run sequence of sharedIndexInformer can be seen in tools/cache/shared_informer.go.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
        KnownObjects:          s.indexer,
        EmitDeltaTypeReplaced: true,
    })
    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,
        Process:          s.HandleDeltas, // the processing applied to popped items
    }
    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()
    // Separate stop channel because Processor should be stopped strictly after controller
    processorStopCh := make(chan struct{})
    var wg wait.Group
    defer wg.Wait()              // Wait for Processor to stop
    defer close(processorStopCh) // Tell Processor to stop
    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    wg.StartWithChannel(processorStopCh, s.processor.run) // start the event handlers
    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh) // start the controller, which runs the Reflector and the FIFO's Pop() loop
}
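In practice you rarely construct or run a sharedIndexInformer by hand; the SharedInformerFactory in k8s.io/client-go/informers builds and starts them for you. A minimal sketch of my own, with the kubeconfig wiring assumed as before:
package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    config, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    clientset, _ := kubernetes.NewForConfig(config)

    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    podInformer := factory.Core().V1().Pods().Informer() // a SharedIndexInformer

    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) { fmt.Println("add:", obj.(*v1.Pod).Name) },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh) // calls Run on every informer created so far

    // Block until the first full LIST has been delivered to the indexer.
    if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {
        return
    }
    <-stopCh
}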
Working through the DeltaFIFO, you can see that when a delta is actually handled, the action is dispatched to the matching event handlers — the functions registered for events when the informer was initialized.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Replaced, Added, Updated:
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                isSync := false
                switch {
                case d.Type == Sync:
                    isSync = true
                case d.Type == Replaced:
                    if accessor, err := meta.Accessor(d.Object); err == nil {
                        if oldAccessor, err := meta.Accessor(old); err == nil {
                            isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                        }
                    }
                }
                // distribute the event
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                // distribute the event
                s.processor.distribute(addNotification{newObj: d.Object}, false)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}
The event handlers: processor
Starting the informer also starts the event handlers that were registered with it; the processor is that machinery. run() starts two goroutines per listener: listener.run, which invokes the registered business handler functions, and listener.pop, which feeds events to it.
wg.StartWithChannel(processorStopCh, s.processor.run)
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
    func() {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            p.wg.Start(listener.run)
            p.wg.Start(listener.pop)
        }
        p.listenersStarted = true
    }()
    <-stopCh
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
        close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
    }
    p.wg.Wait() // Wait for all .pop() and .run() to stop
}
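Since the listeners slice is guarded by listenersLock and the listenersStarted flag, handlers can also be added while the informer is running, each with its own resync period. A short sketch of my own, continuing the factory example above (podInformer as defined there):
// Continuing the factory sketch: a second, independent handler.
// Events are delivered to each listener sequentially, but there is
// no ordering guarantee between the two handlers.
podInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
    UpdateFunc: func(oldObj, newObj interface{}) {
        // Fires for real updates and, every minute, for resync notifications.
        fmt.Println("audit:", newObj.(*v1.Pod).Name)
    },
}, 1*time.Minute)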
As you can see, listener.run simply takes each delivered event and handles it with the functions registered with the informer.
func (p *processorListener) run() {
    stopCh := make(chan struct{})
    wait.Until(func() {
        for next := range p.nextCh { // consume events
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
        }
        // the only way to get here is if the p.nextCh is empty and closed
        close(stopCh)
    }, 1*time.Second, stopCh)
}
The design of events in the informer
Having seen how the informer handles events, the next step is to study how the informer's event system, the processorListener, is designed.
Adding events
During HandleDeltas, concrete events are distributed:
// distribute the event
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
At that point the event pump, pop(), processes each event it receives.
// run() also starts an event pump
p.wg.Start(listener.pop)

func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh)
    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification: // in effect a blocking wait
            // while nextCh is the nil channel, this case is never taken
            var ok bool
            // distribute() in HandleDeltas feeds events into addCh;
            // after one event is consumed, take the next from the ring
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // nothing left to pop
                nextCh = nil // disable this select case
            }
        // handle events distributed into addCh
        case notificationToAdd, ok := <-p.addCh: // events sent by distribute()
            if !ok {
                return
            }
            // first event, or the previous one has been fully consumed
            if notification == nil {
                notification = notificationToAdd
                nextCh = p.nextCh
            } else {
                // the previous notification is still in flight;
                // park the new one in the pending ring buffer
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
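pendingNotifications above is a RingGrowing from k8s.io/utils/buffer: an unbounded ring that grows instead of blocking, which is why pop() never stalls distribute(). A quick sketch of my own showing its two operations:
package main

import (
    "fmt"

    "k8s.io/utils/buffer"
)

func main() {
    ring := buffer.NewRingGrowing(2) // initial capacity 2, grows as needed

    ring.WriteOne("n1")
    ring.WriteOne("n2")
    ring.WriteOne("n3") // exceeds the initial size; the ring grows

    for {
        v, ok := ring.ReadOne()
        if !ok { // empty
            break
        }
        fmt.Println(v) // n1, n2, n3 in FIFO order
    }
}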
The original post illustrates this event flow with a diagram. Let's study client-go's notification mechanism through a simple standalone example:
package main

import (
    "fmt"
    "time"

    "k8s.io/utils/buffer"
)

var nextCh1 = make(chan interface{})
var addCh = make(chan interface{})
var stopper = make(chan struct{})
var pending = buffer.NewRingGrowing(2)

func main() {
    // pop: the event pump
    go func() {
        var nextCh chan<- interface{}
        var notification interface{}
        for {
            fmt.Println("busy wait")
            fmt.Println("entry select", notification)
            select {
            // Initially nextCh is an uninitialized (nil) channel, so this
            // case blocks (with only this case it would be a deadlock).
            case nextCh <- notification:
                fmt.Println("entry nextCh", notification)
                var ok bool
                // If nothing can be read, everything has been handled:
                // release the "lock" by disabling this case.
                notification, ok = pending.ReadOne()
                if !ok {
                    fmt.Println("deactivate nextCh")
                    nextCh = nil
                }
            // Receiving distributed events; initially this also blocks.
            case notificationToAdd, ok := <-addCh:
                fmt.Println(notificationToAdd, notification)
                if !ok {
                    return
                }
                // Thread safety: when notification is nil, nothing is
                // in flight, so dispatch this event directly.
                if notification == nil {
                    fmt.Println("first notification, nil")
                    notification = notificationToAdd
                    nextCh = nextCh1 // initializes the local nextCh, enabling the send case above
                } else {
                    // From the third rapid event onward we land here:
                    // the event is parked in the ring buffer.
                    fmt.Println("into ring", notificationToAdd)
                    pending.WriteOne(notificationToAdd)
                }
            }
        }
    }()

    // producer
    go func() {
        i := 0
        for {
            i++
            if i%5 == 0 {
                addCh <- fmt.Sprintf("thread 2 inner -- %d", i)
                time.Sleep(time.Millisecond * 9000)
            } else {
                addCh <- fmt.Sprintf("thread 2 outer -- %d", i)
                time.Sleep(time.Millisecond * 500)
            }
        }
    }()

    // subscriber
    go func() {
        for next := range nextCh1 {
            time.Sleep(time.Millisecond * 300)
            fmt.Println("consumer", next)
        }
    }()

    <-stopper
}
Summary:
The mechanism here resembles a thread-safety, critical-section algorithm: the critical section is nextCh, and notification guarantees that at least one party can make progress (either dispatching an event or producing one). nextCh and nextCh1 are a local and a global channel respectively, and an uninitialized (nil) channel means blocking (a deadlock if it were the only case). When there is a message to process, the global nextCh1 is assigned to the local nextCh, which effectively unblocks the dispatch step (assigning the channel enables the send case). The ring buffer in effect provides locking around notification: while no message is being processed, notification must stay nil and writes to nextCh stay disabled. All of this is really an exercise in the semantics of Go channels.
Original post: http://www.cnblogs.com/Cylon/p/16311233.html