GoLang定時器實現原理

語言: CN / TW / HK

簡介

工作中經常有定時執行某些程式碼塊的需求,如果是PHP程式碼,一般寫個指令碼,然後用Cron實現。

Go裡提供了兩種定時器:Timer(到達指定時間觸發且只觸發一次)和 Ticker(間隔特定時間觸發)。

Timer和Ticker的實現幾乎一樣,Ticker相對複雜一些,這裡主要講述一下Ticker是如何實現的。

讓我們先來看一下如何使用Ticker

//建立Ticker,設定多長時間觸發一次
ticker := time.NewTicker(time.Second * 10)
go func() {
   for range ticker.C { //遍歷ticker.C,如果有值,則會執行do someting,否則阻塞
      //do someting
   }
}()
複製程式碼

程式碼很簡潔,給開發者提供了巨大的便利。那GoLang是如何實現這個功能的呢?

原理

NewTicker

time/tick.go的NewTicker函式:

呼叫NewTicker可以生成Ticker,關於這個函式有四點需要說明

  1. NewTicker主要作用之一是初始化
  2. NewTicker中的時間是以納秒為單位的,when返回的從當前時間+d的納秒值,d必須為正值
  3. Ticker結構體中包含channel,sendTime是個function,邏輯為用select等待c被賦值
  4. 神祕的startTimer函式,揭示channel、sendTime是如何關聯的
// NewTicker returns a new Ticker containing a channel that will send the
// time with a period specified by the duration argument.
// It adjusts the intervals or drops ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will panic.
// Stop the ticker to release associated resources.
func NewTicker(d Duration) *Ticker {
   if d <= 0 {
      panic(errors.New("non-positive interval for NewTicker"))
   }
   // Give the channel a 1-element time buffer.
   // If the client falls behind while reading, we drop ticks
   // on the floor until the client catches up.
   c := make(chan Time, 1)
   t := &Ticker{
      C: c,
      r: runtimeTimer{
         when:   when(d),
         period: int64(d),
         f:      sendTime,
         arg:    c,
      },
   }
   startTimer(&t.r)
   return t
}
複製程式碼

time/tick.go的Ticker資料結構

// A Ticker holds a channel that delivers `ticks' of a clock
// at intervals.
type Ticker struct {
   C <-chan Time // The channel on which the ticks are delivered.
   r runtimeTimer
}
複製程式碼

time/sleep.go的runtimeTimer

// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
   tb uintptr
   i  int

   when   int64
   period int64
   f      func(interface{}, uintptr) // NOTE: must not be closure
   arg    interface{}
   seq    uintptr
}
複製程式碼

time/sleep.go的sendTime

func sendTime(c interface{}, seq uintptr) {
   // Non-blocking send of time on c.
   // Used in NewTimer, it cannot block anyway (buffer).
   // Used in NewTicker, dropping sends on the floor is
   // the desired behavior when the reader gets behind,
   // because the sends are periodic.
   select {
   case c.(chan Time) <- Now():
   default:
   }
}
複製程式碼

time/sleep.go的startTimer

func startTimer(*runtimeTimer)
func stopTimer(*runtimeTimer) bool
複製程式碼

startTimer

看完上面的程式碼,大家內心是不是能夠猜出是怎麼實現的?

有一個機制保證時間到了時,sendTime被呼叫,此時channel會被賦值,呼叫ticker.C的位置解除阻塞,執行指定的邏輯。

讓我們看一下GoLang是不是這樣實現的。

追蹤程式碼的時候我們發現在time包裡的startTimer,只是一個宣告,那真正的實現在哪裡?

runtime/time.go的startTimer

此處使用go的隱藏技能go:linkname引導編譯器將當前(私有)方法或者變數在編譯時連結到指定的位置的方法或者變數。另外timer和runtimeTimer的結構是一致的,所以程式執行正常。

//startTimer將new的timer物件加入timer的堆資料結構中
//startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
   if raceenabled {
      racerelease(unsafe.Pointer(t))
   }
   addtimer(t)
}
複製程式碼

runtime/time.go的addtimer

func addtimer(t *timer) {
   tb := t.assignBucket()
   lock(&tb.lock)
   ok := tb.addtimerLocked(t)
   unlock(&tb.lock)
   if !ok {
      badTimer()
   }
}
複製程式碼

runtime/time.go的addtimerLocked

// Add a timer to the heap and start or kick timerproc if the new timer is
// earlier than any of the others.
// Timers are locked.
// Returns whether all is well: false if the data structure is corrupt
// due to user-level races.
func (tb *timersBucket) addtimerLocked(t *timer) bool {
   // when must never be negative; otherwise timerproc will overflow
   // during its delta calculation and never expire other runtime timers.
   if t.when < 0 {
      t.when = 1<<63 - 1
   }
   t.i = len(tb.t)
   tb.t = append(tb.t, t)
   if !siftupTimer(tb.t, t.i) {
      return false
   }
   if t.i == 0 {
      // siftup moved to top: new earliest deadline.
      if tb.sleeping && tb.sleepUntil > t.when {
         tb.sleeping = false
         notewakeup(&tb.waitnote)
      }
      if tb.rescheduling {
         tb.rescheduling = false
         goready(tb.gp, 0)
      }
      if !tb.created {
         tb.created = true
         go timerproc(tb)
      }
   }
   return true
}
複製程式碼

runtime/time.go的timerproc

func timerproc(tb *timersBucket) {
    tb.gp = getg()
    for {
        lock(&tb.lock)
        tb.sleeping = false
        now := nanotime()
        delta := int64(-1)
        for {
            if len(tb.t) == 0 { //無timer的情況
                delta = -1
                break
            }
            t := tb.t[0] //拿到堆頂的timer
            delta = t.when - now
            if delta > 0 { // 所有timer的時間都沒有到期
                break
            }
            if t.period > 0 { // t[0] 是ticker型別,調整其到期時間並調整timer堆結構
                // leave in heap but adjust next time to fire
                t.when += t.period * (1 + -delta/t.period)
                siftdownTimer(tb.t, 0)
            } else {
                //Timer型別的定時器是單次的,所以這裡需要將其從堆裡面刪除
                // remove from heap
                last := len(tb.t) - 1
                if last > 0 {
                    tb.t[0] = tb.t[last]
                    tb.t[0].i = 0
                }
                tb.t[last] = nil
                tb.t = tb.t[:last]
                if last > 0 {
                    siftdownTimer(tb.t, 0)
                }
                t.i = -1 // mark as removed
            }
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&tb.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            f(arg, seq) //sendTimer被呼叫的位置 ---------------------------------------
            lock(&tb.lock)
        }
        if delta < 0 || faketime > 0 {
            // No timers left - put goroutine to sleep.
            tb.rescheduling = true
            goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
            continue
        }
        // At least one timer pending. Sleep until then.
        tb.sleeping = true
        tb.sleepUntil = now + delta
        noteclear(&tb.waitnote)
        unlock(&tb.lock)
        notetsleepg(&tb.waitnote, delta)
    }
}
複製程式碼

追蹤了一圈,最終追蹤到timerproc,發現了sendTimer被呼叫位置f(arg, seq) ,而且可以看到將channel c傳到了sendTimer中。

上面的這堆程式碼邏輯是什麼意思呢?

  1. 所有timer統一使用一個最小堆結構去維護,按照timer的when(到期時間)比較大小;
  2. for迴圈過程中,如果delta = t.when - now的時間大於0,則break,直到有到時間的timer才進行操作;
  3. timer處理執行緒從堆頂開始處理每個timer,對於到期的timer,如果其period>0,則表明該timer 屬於Ticker型別,調整其下次到期時間並調整其在堆中的位置,否則從堆中移除該timer;
  4. 呼叫該timer的處理函式以及其他相關工作;

總結

讀完這篇文章,有沒有奇怪的知識又增加了一些的感覺。寫這些原始碼的大神們,對Go的理解很深刻,編碼的功力也很深厚。

本質上GoLang用channel和堆實現了定時器功能,讓我們來mock一下,虛擬碼如下:

func cronMock() {
   for {
      //從堆中獲取時間最近的定時器
      t := getNearestTime()
      //如果時間還沒到,則continue
      t.delta > 0 {
         continue
      }else{
         //時間到了,將當前的定時器再加一個鐘
         t.when += t.duration
         //將堆重新排序
         siftdownTimer()
         //執行當前定時器指定的函式,即sendTimer
         t.sendTimer()
      }
   }
}
複製程式碼

最後

大家如果喜歡我的文章,可以關注我的公眾號(程式設計師麻辣燙)

我的個人部落格為:http://shidawuhen.github.io/

往期文章回顧:

技術

  1. HTTPS連線過程
  2. 限流實現2
  3. 秒殺系統
  4. 分散式系統與一致性協議
  5. 微服務之服務框架和註冊中心
  6. Beego框架使用
  7. 淺談微服務
  8. TCP效能優化
  9. 限流實現1
  10. Redis實現分散式鎖
  11. Golang原始碼BUG追查
  12. 事務原子性、一致性、永續性的實現原理
  13. CDN請求過程詳解
  14. 常用快取技巧
  15. 如何高效對接第三方支付
  16. Gin框架簡潔版
  17. InnoDB鎖與事務簡析
  18. 演算法總結

讀書筆記

  1. 敏捷革命
  2. 如何鍛鍊自己的記憶力
  3. 簡單的邏輯學-讀後感
  4. 熱風-讀後感
  5. 論語-讀後感
  6. 孫子兵法-讀後感

思考

  1. 專案流程管理
  2. 對專案管理的一些看法
  3. 對產品經理的一些思考
  4. 關於程式設計師職業發展的思考
  5. 關於程式碼review的思考
  6. Markdown編輯器推薦-typora