• 首页
  • 产品中心
  • 企业荣誉
  • 业界新闻
  • 汽车服务
  • 企业荣誉

    你的位置:【开元棋盘地址官方客服】 > 企业荣誉 > Go 漫衍式令牌桶限流 + 兜底保障

    Go 漫衍式令牌桶限流 + 兜底保障

    发布日期:2022-08-07 08:58    点击次数:61

     

    本文转载自微信群众号「微服求实践」,作者欧阳安。转载本文请联络微服求实践群众号。

    上篇文章提到安稳时光窗口限流没法处理惩罚倏忽要求洪峰情形,本文奉告的令牌桶线路算法例可以或许相比好的处理惩罚此场景。

    事变道理

    单位时光根据必定速率匀速的临蓐 token 放入桶内,直抵达到桶容量上限。

    处理惩罚要求,每次查验测验取得一个或多个令牌,假定拿到则处理惩罚要求,失利则推卸要求。

    优弱点

    所长

    可以或许有用场理惩罚刹那的突发流量,桶内存量 token 即可作为流量缓冲区滑腻处理惩罚突发流量。

    弱点

    完成较为宏壮。

    代码完成
    core/limit/tokenlimit.go 

    漫衍式情形下推敲运用 redis 作为桶和令牌的存储容器,给与 lua 脚本完成全副算法流程。

    redis lua 脚本

    -- 每秒生成token数量即token生成速率 local rate = tonumber(ARGV[1]) -- 桶容量 local capacity = tonumber(ARGV[2]) -- 今后时光戳 local now = tonumber(ARGV[3]) -- 今后要求token数量 local requested = tonumber(ARGV[4]) -- 需求几多秒材干填满桶 local fill_time = capacity/rate -- 向下取整,ttl为填满时光的2倍 local ttl = math.floor(fill_time*2) -- 今后时光桶容量 local last_tokens = tonumber(redis.call("get", KEYS[1])) -- 假定今后桶容量为0,分化是第一次进入,则默认容量为桶的最大容量 if last_tokens == nil then last_tokens = capacity end -- 上一次刷新的时光 local last_refreshed = tonumber(redis.call("get", KEYS[2])) -- 第一次进入则配置刷新时光为0 if last_refreshed == nil then last_refreshed = 0 end -- 距离前主要求的时光跨度 local delta = math.max(0, now-last_refreshed) -- 距离前主要求的时光跨度,总共能临蓐token的数量,假定超多最大容量则扬弃多余的token local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) -- 本主要求token数量是否足够 local allowed = filled_tokens >= requested -- 桶残剩数量 local new_tokens = filled_tokens -- 准许本次token请求,计算残剩数量 if allowed then new_tokens = filled_tokens - requested end -- 配置残剩token数量 redis.call("setex", KEYS[1], ttl, new_tokens) -- 配置刷新时光 redis.call("setex", KEYS[2], ttl, now)  return allowed 

    令牌桶限流器定义

    type TokenLimiter struct {     // 每秒临蓐速率     rate int     // 桶容量     burst int     // 存储容器     store *redis.Redis     // redis key     tokenKey       string     // 桶刷新时光key     timestampKey   string     // lock     rescueLock     sync.Mutex     // redis健康标识     redisAlive     uint32     // redis体系毛病时给与过程内 令牌桶限流器     rescueLimiter  *xrate.Limiter     // redis监控探测使命标识     monitorStarted bool }  func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {     tokenKey := fmt.Sprintf(tokenFormat, key)     timestampKey := fmt.Sprintf(timestampFormat, key)      return &TokenLimiter{         rate:          rate,         burst:         burst,         store:         store,         tokenKey:      tokenKey,企业荣誉         timestampKey:  timestampKey,         redisAlive:    1,         rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),     } } 

    取得令牌

    func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {     // 鉴定redis是否健康     // redis体系毛病时给与过程内限流器     // 兜底保障     if atomic.LoadUint32(&lim.redisAlive) == 0 {         return lim.rescueLimiter.AllowN(now, n)     }     // 执行脚本取得令牌     resp, err := lim.store.Eval(         script,         []string{             lim.tokenKey,             lim.timestampKey,         },         []string{             strconv.Itoa(lim.rate),             strconv.Itoa(lim.burst),             strconv.FormatInt(now.Unix(), 10),             strconv.Itoa(n),         })     // redis allowed == false     // Lua boolean false -> r Nil bulk reply     // 不凡处理惩罚key不存在的情形     if err == redis.Nil {         return false     } else if err != nil {         logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)         // 执行很是,开启redis健康探测使命         // 同时给与过程内限流器作为兜底         lim.startMonitor()         return lim.rescueLimiter.AllowN(now, n)     }      code, ok := resp.(int64)     if !ok {         logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)         lim.startMonitor()         return lim.rescueLimiter.AllowN(now, n)     }      // redis allowed == true     // Lua boolean true -> r integer reply with value of 1     return code == 1 } 

    redis 体系毛病时兜底计策

    兜底计策的策画推敲得极度细节,当 redis 不成用的时光,启动单机版的 ratelimit 做备用限流,确保根抵的限流可用,服务不会被冲垮。

    // 开启redis健康探测 func (lim *TokenLimiter) startMonitor() {     lim.rescueLock.Lock()     defer lim.rescueLock.Unlock()     // 预防重复开启     if lim.monitorStarted {         return     }      // 配置使命和健康标识     lim.monitorStarted = true     atomic.StoreUint32(&lim.redisAlive, 0)     // 健康探测     go lim.waitForRedis() }  // redis健康探测守时使命 func (lim *TokenLimiter) waitForRedis() {     ticker := time.NewTicker(pingInterval)     // 健康探测告成时回调此函数     defer func() {         ticker.Stop()         lim.rescueLock.Lock()         lim.monitorStarted = false         lim.rescueLock.Unlock()     }()      for range ticker.C {         // ping属于redis内置健康探测敕令         if lim.store.Ping() {             // 健康探测告成,配置健康标识             atomic.StoreUint32(&lim.redisAlive, 1)             return         }     } } 
    名目地点

    https://github.com/zeromicro/go-zero

    迎策运用 go-zero 并 star 支持我们!