限流
最近做个benchmark工具往服务端压测数据,由于服务端的承载能力优先,需要限流进行平滑压测。打算使用令牌桶进行限流。 令牌桶算法以一个设定的速率产生令牌并放入令牌桶,每次用户请求都得申请令牌,如果令牌不足,则拒绝请求。
令牌桶
令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶,如图所示。
令牌桶限流大致的规则如下:
- 进水口按照某个速度,向桶中放入令牌。
- 令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。
- 如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。
总之,令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对。
算法
比如压测QPS是10000
- 那我们设置令牌总数为10000
- 每次发送时获取令牌,获取令牌失败,不可发送;获取成功,则令牌总数减一
- 每秒重置令牌总数为10000
这样实现的是个最简单的令牌桶,但是这个令牌桶有个缺陷,如果发送请求不怎么耗时,那么客户端可能在1s的头1ms就将令牌耗尽,后面999ms没有令牌可用,仍然会造成后端流量的不平滑
算法改进1
1s的时间间隔太久,我们可以将1s按1ms的窗口进行切割
比如压测QPS是10000
- 将1s按1ms进行切割,1s分成1000个窗口,每个窗口的令牌数10个
- 每次发送时,获取令牌,获取令牌失败,不可发送;获取成功,则令牌总数减一
- 每过1ms,令牌总数加10
算法改进2
上面的算法有两个问题:
- 如果设置的qps < 1000, 1000个窗口,每个窗口不足1个令牌
- 如果设置的qps 是1999呢,除以1000个窗口等于1,余数却有999,每个窗口的令牌是1还是2呢?
改进:
- 如果qps小于等于1000,则将窗口数量设置为qps的大小,每个窗口一个令牌
- 如果qps > 1000, 这里的策略是多发,比如1999除以1000个窗口,余数不为0,则先定每个窗口发送2个,同时也要记录已使用的令牌总数,保证1s的令牌总数不超过1999,这样即保证了平滑发送,也不会超过总数
单线程实现
实现结果:
qps设置为1999:
1000个窗口,前999个每个窗口2个令牌,最后一个是1个令牌
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| #define DEFAULT_WIN_SIZE 1000 class Controller { public: Controller() {} ~Controller() {}
bool CanSend(int64_t cur_us) { if (cur_us >= m_win_begin_us_ + 1000000LL) { Reset(cur_us); }
int64_t idx = (cur_us - m_win_begin_us_) * (int64_t)m_win_size_ / 1000000LL; if (idx >= (int64_t)m_win_size_ || idx < 0) { return false; }
if (idx != m_cur_win_idx_) { m_cur_win_idx_ = idx; int32_t left_token = m_speed_ - m_used_token_; int32_t left_win_size = m_win_size_ - m_cur_win_idx_; m_cur_token_ = left_token / left_win_size; if (left_token % left_win_size != 0) { m_cur_token_++; } }
if (m_cur_token_ <= 0 || m_used_token_ >= m_speed_) { return false; } m_cur_token_--; m_used_token_++; return true; }
void SetSpeed(int32_t speed, int64_t cur_us) { m_speed_ = speed; if (m_speed_ <= DEFAULT_WIN_SIZE) { m_win_size_ = m_speed_; } else { m_win_size_ = DEFAULT_WIN_SIZE; } Reset(cur_us); }
private: void Reset(int64_t cur_us) { m_win_begin_us_ = cur_us; m_cur_win_idx_ = 0; m_used_token_ = 0; m_cur_token_ = m_speed_ / m_win_size_; if (m_speed_ % m_win_size_ != 0) { m_cur_token_++; } }
private: int32_t m_speed_; int32_t m_cur_token_; int32_t m_used_token_;
int64_t m_win_begin_us_ = 0; int32_t m_cur_win_idx_; int32_t m_win_size_; };
|