簡介
上一篇文章 限流實現1 已經介紹三種限流方案
- 隨機拒流
- 計數器方式
- 基於滑動時間窗口限流
剩下的幾種本來打算能立即寫完,沒想到一下三個月過去了,很是尷尬。本次主要實現如下兩種算法
- 令牌桶算法
- 漏斗算法
算法的具體實現可以在github上查看 github.com/shidawuhen/…
下面先來介紹一下這兩種算法
令牌桶算法
算法説明
令牌桶算法(Token Bucket):是網絡流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一種算法。令牌桶算法示意圖如下所示:
一般的流程為:
a. 按特定的速率向令牌桶投放令牌
b. 根據預設的匹配規則先對報文進行分類,不符合匹配規則的報文不需要經過令牌桶的處理,直接發送;
c. 符合匹配規則的報文,則需要令牌桶進行處理。當桶中有足夠的令牌則報文可以被繼續發送下去,同時令牌桶中的令牌量按報文的長度做相應的減少;
d. 當令牌桶中的令牌不足時,報文將不能被髮送,只有等到桶中生成了新的令牌,報文才可以發送。這就可以限制報文的流量只能是小於等於令牌生成的速度,達到限制流量的目的。
大小固定的令牌桶可自行以恆定的速率源源不斷地產生令牌。如果令牌不被消耗,或者被消耗的速度小於產生的速度,令牌就會不斷地增多,直到把桶填滿。後面再產生的令牌就會從桶中溢出。最後桶中可以保存的最大令牌數永遠不會超過桶的大小。
令牌桶能允許突發,是因為令牌桶一般有兩個值,一個是桶容量,一個是單位時間投放令牌量。如果桶容量大於令牌單位時間投放量,而且單位時間消耗量比投放量少,則令牌數量最終會達到桶容量最大值。此時如果大量請求到達,會將所有令牌消耗,實現了允許突發的效果。
算法實現
該算法有幾個核心點
- 因為要更新令牌數量,所以需要加鎖
- 定時向桶中放入令牌,有兩種方式,一種是起goroutine,定時投放,另一種是在判斷是否還有足夠令牌的時候,根據投放情況進行投放。這次實現使用的是第二種方法,整個架構會更簡單一些。
package limit
import (
"github.com/gin-gonic/gin"
"net/http"
"sync"
"time"
)
// @Tags limit
// @Summary 令牌桶拒流
// @Produce json
// @Success 200 {string} string "成功會返回ok"
// @Failure 502 "失敗返回reject"
// @Router /limit/tokenreject [get]
type TokenBucket struct {
rate int64 //固定的token放入速率, r/s
capacity int64 //桶的容量
tokens int64 //桶中當前token數量
lastTokenSec int64 //桶上次放token的時間戳 s
lock sync.Mutex
}
func (l *TokenBucket) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()
now := time.Now().Unix()
l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate // 先添加令牌
if l.tokens > l.capacity {
l.tokens = l.capacity
}
l.lastTokenSec = now
if l.tokens > 0 {
// 還有令牌,領取令牌
l.tokens--
return true
} else {
// 沒有令牌,則拒絕
return false
}
}
func (l *TokenBucket) Set(r, c int64) {
l.rate = r
l.capacity = c
l.tokens = 0
l.lastTokenSec = time.Now().Unix()
}
func CreateTokenBucket()*TokenBucket{
t := &TokenBucket{}
t.Set(1,5)
return t
}
var tokenBucket *TokenBucket = CreateTokenBucket()
func TokenReject(c *gin.Context) {
//fmt.Println(tokenBucket.tokens)
if !tokenBucket.Allow() {
c.String(http.StatusBadGateway, "reject")
return
}
c.String(http.StatusOK, "ok")
}
複製代碼
漏桶算法
算法説明
漏桶作為計量工具(The Leaky Bucket Algorithm as a Meter)時,可以用於流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:
- 一個固定容量的漏桶,按照常量固定速率流出水滴;
- 如果桶是空的,則不需流出水滴;
- 可以以任意速率流入水滴到漏桶;
- 如果流入水滴超出了桶的容量,則流入的水滴溢出了(被丟棄),而漏桶容量是不變的。
漏桶法限流很好理解,假設我們有一個水桶按固定的速率向下方滴落一滴水,無論有多少請求,請求的速率有多大,都按照固定的速率流出,對應到系統中就是按照固定的速率處理請求。
示意圖如下:
算法實現
查閲了一下相關資料,主要的算法實現有三種。學習完這幾種實現後,我懷疑我是不是理解錯了,感覺還沒有計數拒流方便好用。如果我理解的有誤,大家也可以吿訴我。
這三種方式為:
令牌桶算法變種
桶的大小就是單位時間內能流出的最大量,這種就不寫了。
計數拒流變種
該方法在指定時間將可用空間置為初始值。
package limit
import (
"fmt"
"github.com/gin-gonic/gin"
"net/http"
"sync"
"time"
)
type LeakyBucket struct {
// 容量
capacity int64
// 剩餘大小
remaining int64
// 下一次的重置容量時間
reset time.Time
// 重置容量時間間隔
rate time.Duration
mutex sync.Mutex
}
func (b *LeakyBucket) Allow() bool {
b.mutex.Lock()
defer b.mutex.Unlock()
if time.Now().After(b.reset) { // 需要重置
b.reset = time.Now().Add(b.rate) // 更新時間
b.remaining = b.capacity // 重置剩餘容量
}
fmt.Println(b.remaining)
if b.remaining > 0 { // 判斷是否能過
b.remaining--
return true
}
return false
}
func (b *LeakyBucket) Set(r time.Duration, c int64) {
b.rate = r
b.capacity = c
b.remaining = c
b.reset = time.Now().Add(b.rate)
}
func CreateLeakyBucket(r time.Duration,c int64) *LeakyBucket {
t := &LeakyBucket{}
t.Set(r, c)
return t
}
var leakyBucket *LeakyBucket = CreateLeakyBucket(time.Second*2,10)
func LeakyReject(c *gin.Context) {
if !leakyBucket.Allow() {
c.String(http.StatusBadGateway, "reject")
return
}
c.String(http.StatusOK, "ok")
}
複製代碼
真固定速率
這個算法的的實現,根據uber團隊開源的 github.com/uber-go/rat… 而來。
該實現保證如果有大量請求,每一個請求會按照規定的時間間隔執行。如設置1s處理100個請求,則每10ms會處理一個。
如果長時間沒有請求,仍會產生請求在短時間內被處理完畢的情況。當然對於這種情況可以很容易修復,大家可以思考一下如何修改。
package limit
import (
"fmt"
"github.com/andres-erbsen/clock"
"github.com/gin-gonic/gin"
"net/http"
"sync"
"time"
)
//真固定速率
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
type limiter struct {
sync.Mutex // 鎖
last time.Time // 上一次的時刻
sleepFor time.Duration // 需要等待的時間
perRequest time.Duration // 每次的時間間隔
maxSlack time.Duration // 最大的富餘量
clock Clock // 時鐘
}
// Take 會阻塞確保兩次請求之間的時間走完
// Take 調用平均數為 time.Second/rate.
func (t *limiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
// 如果是第一次請求就直接放行
if t.last.IsZero() {
t.last = now
return t.last
}
// sleepFor 根據 perRequest 和上一次請求的時刻計算應該sleep的時間
// 由於每次請求間隔的時間可能會超過perRequest, 所以這個數字可能為負數,並在多個請求之間累加
t.sleepFor += t.perRequest - now.Sub(t.last)
fmt.Println(t.sleepFor)
// 我們不應該讓sleepFor負的太多,因為這意味着一個服務在短時間內慢了很多隨後會得到更高的RPS。
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
// 如果 sleepFor 是正值那麼就 sleep
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
return t.last
}
func NewLimiter(rate int) *limiter {
l := &limiter{
perRequest: time.Second / time.Duration(rate),
maxSlack: -10 * time.Second / time.Duration(rate),
}
if l.clock == nil {
l.clock = clock.New()
}
return l
}
var rl = NewLimiter(100) // per second,每秒100個請求
func LeakyRejectFixedRate(c *gin.Context) {
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}
c.String(http.StatusOK, "ok")
}
複製代碼
演示結果如下:
總結
學習了令牌桶和漏桶算法,結合這幾年工作中的具體場景,感覺令牌桶算法的實用價值更大一些。
下面是令牌桶和漏桶對比:
- 令牌桶是按照固定速率往桶中添加令牌,請求是否被處理需要看桶中令牌是否足夠,當令牌數減為零時則拒絕新的請求;
- 漏桶則是按照常量固定速率流出請求,流入請求速率任意,當流入的請求數累積到漏桶容量時,則新流入的請求被拒絕;
- 令牌桶限制的是平均流入速率(允許突發請求,只要有令牌就可以處理,支持一次拿3個令牌,4個令牌),並允許一定程度突發流量;
- 漏桶限制的是常量流出速率(即流出速率是一個固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),從而平滑突發流入速率;
- 令牌桶允許一定程度的突發,而漏桶主要目的是平滑流入速率;
- 兩個算法實現可以一樣,但是方向是相反的,對於相同的參數得到的限流效果是一樣的。
最後,給大家展示一下各種限流算法的比較
資料
最後
大家如果喜歡我的文章,可以關注我的公眾號(程序員麻辣燙)
我的個人博客為:http://shidawuhen.github.io/
往期文章回顧:
技術
- 秒殺系統
- 分佈式系統與一致性協議
- 微服務之服務框架和註冊中心
- Beego框架使用
- 淺談微服務
- TCP性能優化
- 限流實現1
- Redis實現分佈式鎖
- Golang源碼BUG追查
- 事務原子性、一致性、持久性的實現原理
- CDN請求過程詳解
- 常用緩存技巧
- 如何高效對接第三方支付
- Gin框架簡潔版
- InnoDB鎖與事務簡析
- 算法總結
讀書筆記
思考