GoLang定時器實現原理

語言: CN / TW / HK

簡介

工作中經常有定時執行某些代碼塊的需求,如果是PHP代碼,一般寫個腳本,然後用Cron實現。

Go裏提供了兩種定時器:Timer(到達指定時間觸發且只觸發一次)和 Ticker(間隔特定時間觸發)。

Timer和Ticker的實現幾乎一樣,Ticker相對複雜一些,這裏主要講述一下Ticker是如何實現的。

讓我們先來看一下如何使用Ticker

//創建Ticker,設置多長時間觸發一次
ticker := time.NewTicker(time.Second * 10)
go func() {
   for range ticker.C { //遍歷ticker.C,如果有值,則會執行do someting,否則阻塞
      //do someting
   }
}()
複製代碼

代碼很簡潔,給開發者提供了巨大的便利。那GoLang是如何實現這個功能的呢?

原理

NewTicker

time/tick.go的NewTicker函數:

調用NewTicker可以生成Ticker,關於這個函數有四點需要説明

  1. NewTicker主要作用之一是初始化
  2. NewTicker中的時間是以納秒為單位的,when返回的從當前時間+d的納秒值,d必須為正值
  3. Ticker結構體中包含channel,sendTime是個function,邏輯為用select等待c被賦值
  4. 神祕的startTimer函數,揭示channel、sendTime是如何關聯的
// NewTicker returns a new Ticker containing a channel that will send the
// time with a period specified by the duration argument.
// It adjusts the intervals or drops ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will panic.
// Stop the ticker to release associated resources.
func NewTicker(d Duration) *Ticker {
   if d <= 0 {
      panic(errors.New("non-positive interval for NewTicker"))
   }
   // Give the channel a 1-element time buffer.
   // If the client falls behind while reading, we drop ticks
   // on the floor until the client catches up.
   c := make(chan Time, 1)
   t := &Ticker{
      C: c,
      r: runtimeTimer{
         when:   when(d),
         period: int64(d),
         f:      sendTime,
         arg:    c,
      },
   }
   startTimer(&t.r)
   return t
}
複製代碼

time/tick.go的Ticker數據結構

// A Ticker holds a channel that delivers `ticks' of a clock
// at intervals.
type Ticker struct {
   C <-chan Time // The channel on which the ticks are delivered.
   r runtimeTimer
}
複製代碼

time/sleep.go的runtimeTimer

// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
   tb uintptr
   i  int

   when   int64
   period int64
   f      func(interface{}, uintptr) // NOTE: must not be closure
   arg    interface{}
   seq    uintptr
}
複製代碼

time/sleep.go的sendTime

func sendTime(c interface{}, seq uintptr) {
   // Non-blocking send of time on c.
   // Used in NewTimer, it cannot block anyway (buffer).
   // Used in NewTicker, dropping sends on the floor is
   // the desired behavior when the reader gets behind,
   // because the sends are periodic.
   select {
   case c.(chan Time) <- Now():
   default:
   }
}
複製代碼

time/sleep.go的startTimer

func startTimer(*runtimeTimer)
func stopTimer(*runtimeTimer) bool
複製代碼

startTimer

看完上面的代碼,大家內心是不是能夠猜出是怎麼實現的?

有一個機制保證時間到了時,sendTime被調用,此時channel會被賦值,調用ticker.C的位置解除阻塞,執行指定的邏輯。

讓我們看一下GoLang是不是這樣實現的。

追蹤代碼的時候我們發現在time包裏的startTimer,只是一個聲明,那真正的實現在哪裏?

runtime/time.go的startTimer

此處使用go的隱藏技能go:linkname引導編譯器將當前(私有)方法或者變量在編譯時鏈接到指定的位置的方法或者變量。另外timer和runtimeTimer的結構是一致的,所以程序運行正常。

//startTimer將new的timer對象加入timer的堆數據結構中
//startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
   if raceenabled {
      racerelease(unsafe.Pointer(t))
   }
   addtimer(t)
}
複製代碼

runtime/time.go的addtimer

func addtimer(t *timer) {
   tb := t.assignBucket()
   lock(&tb.lock)
   ok := tb.addtimerLocked(t)
   unlock(&tb.lock)
   if !ok {
      badTimer()
   }
}
複製代碼

runtime/time.go的addtimerLocked

// Add a timer to the heap and start or kick timerproc if the new timer is
// earlier than any of the others.
// Timers are locked.
// Returns whether all is well: false if the data structure is corrupt
// due to user-level races.
func (tb *timersBucket) addtimerLocked(t *timer) bool {
   // when must never be negative; otherwise timerproc will overflow
   // during its delta calculation and never expire other runtime timers.
   if t.when < 0 {
      t.when = 1<<63 - 1
   }
   t.i = len(tb.t)
   tb.t = append(tb.t, t)
   if !siftupTimer(tb.t, t.i) {
      return false
   }
   if t.i == 0 {
      // siftup moved to top: new earliest deadline.
      if tb.sleeping && tb.sleepUntil > t.when {
         tb.sleeping = false
         notewakeup(&tb.waitnote)
      }
      if tb.rescheduling {
         tb.rescheduling = false
         goready(tb.gp, 0)
      }
      if !tb.created {
         tb.created = true
         go timerproc(tb)
      }
   }
   return true
}
複製代碼

runtime/time.go的timerproc

func timerproc(tb *timersBucket) {
    tb.gp = getg()
    for {
        lock(&tb.lock)
        tb.sleeping = false
        now := nanotime()
        delta := int64(-1)
        for {
            if len(tb.t) == 0 { //無timer的情況
                delta = -1
                break
            }
            t := tb.t[0] //拿到堆頂的timer
            delta = t.when - now
            if delta > 0 { // 所有timer的時間都沒有到期
                break
            }
            if t.period > 0 { // t[0] 是ticker類型,調整其到期時間並調整timer堆結構
                // leave in heap but adjust next time to fire
                t.when += t.period * (1 + -delta/t.period)
                siftdownTimer(tb.t, 0)
            } else {
                //Timer類型的定時器是單次的,所以這裏需要將其從堆裏面刪除
                // remove from heap
                last := len(tb.t) - 1
                if last > 0 {
                    tb.t[0] = tb.t[last]
                    tb.t[0].i = 0
                }
                tb.t[last] = nil
                tb.t = tb.t[:last]
                if last > 0 {
                    siftdownTimer(tb.t, 0)
                }
                t.i = -1 // mark as removed
            }
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&tb.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            f(arg, seq) //sendTimer被調用的位置 ---------------------------------------
            lock(&tb.lock)
        }
        if delta < 0 || faketime > 0 {
            // No timers left - put goroutine to sleep.
            tb.rescheduling = true
            goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
            continue
        }
        // At least one timer pending. Sleep until then.
        tb.sleeping = true
        tb.sleepUntil = now + delta
        noteclear(&tb.waitnote)
        unlock(&tb.lock)
        notetsleepg(&tb.waitnote, delta)
    }
}
複製代碼

追蹤了一圈,最終追蹤到timerproc,發現了sendTimer被調用位置f(arg, seq) ,而且可以看到將channel c傳到了sendTimer中。

上面的這堆代碼邏輯是什麼意思呢?

  1. 所有timer統一使用一個最小堆結構去維護,按照timer的when(到期時間)比較大小;
  2. for循環過程中,如果delta = t.when - now的時間大於0,則break,直到有到時間的timer才進行操作;
  3. timer處理線程從堆頂開始處理每個timer,對於到期的timer,如果其period>0,則表明該timer 屬於Ticker類型,調整其下次到期時間並調整其在堆中的位置,否則從堆中移除該timer;
  4. 調用該timer的處理函數以及其他相關工作;

總結

讀完這篇文章,有沒有奇怪的知識又增加了一些的感覺。寫這些源碼的大神們,對Go的理解很深刻,編碼的功力也很深厚。

本質上GoLang用channel和堆實現了定時器功能,讓我們來mock一下,偽代碼如下:

func cronMock() {
   for {
      //從堆中獲取時間最近的定時器
      t := getNearestTime()
      //如果時間還沒到,則continue
      t.delta > 0 {
         continue
      }else{
         //時間到了,將當前的定時器再加一個鐘
         t.when += t.duration
         //將堆重新排序
         siftdownTimer()
         //執行當前定時器指定的函數,即sendTimer
         t.sendTimer()
      }
   }
}
複製代碼

最後

大家如果喜歡我的文章,可以關注我的公眾號(程序員麻辣燙)

我的個人博客為:http://shidawuhen.github.io/

往期文章回顧:

技術

  1. HTTPS連接過程
  2. 限流實現2
  3. 秒殺系統
  4. 分佈式系統與一致性協議
  5. 微服務之服務框架和註冊中心
  6. Beego框架使用
  7. 淺談微服務
  8. TCP性能優化
  9. 限流實現1
  10. Redis實現分佈式鎖
  11. Golang源碼BUG追查
  12. 事務原子性、一致性、持久性的實現原理
  13. CDN請求過程詳解
  14. 常用緩存技巧
  15. 如何高效對接第三方支付
  16. Gin框架簡潔版
  17. InnoDB鎖與事務簡析
  18. 算法總結

讀書筆記

  1. 敏捷革命
  2. 如何鍛鍊自己的記憶力
  3. 簡單的邏輯學-讀後感
  4. 熱風-讀後感
  5. 論語-讀後感
  6. 孫子兵法-讀後感

思考

  1. 項目流程管理
  2. 對項目管理的一些看法
  3. 對產品經理的一些思考
  4. 關於程序員職業發展的思考
  5. 關於代碼review的思考
  6. Markdown編輯器推薦-typora