如何搞定接口并发?想学我教你呀!

关于限流

背景

最近和龙哥在做一款证件照生成项目,大概流程是,前端给一张 base64 图片,后端调用百度的 ai 抠图以及获取图片中人体关键点信息 接口,然后将结果合并返回给前端。

百度这两个服务的每日调用次数是够用的, 5w/日。但他对并发做了限制,默认 QPS = 2。

什么是 QPS

简单来说,就是服务每秒能接收到的最大请求,比如 QPS 为 2,那服务每秒最多接受两个请求,多出来的请求会被忽略掉。

按 QPS(并发量)计费: 调用量免费,免费 QPS 默认为 2,如果您通过百度云的企业认证,接口的免费 QPS 将扩充至 5。同时您可以根据业务需求随时购买扩充 QPS,QPS 可包月购买,也可按天单独购买,灵活多样,适应多场景需求。 当前人体关键点识别、人体检测与属性识别、人流量统计、人像分割、手势识别 5 个接口支持在线购买 QPS。 注意:同一个账号下多个应用共享接口 QPS。

这个 QPS 是非常贵的:

购买 QPS 数量 按月购买 按天购买
0<QPS<=10 500 元/月/QPS 50 元/天/QPS
10<QPS<=50 400 元/月/QPS 40 元/天/QPS
50<QPS<=100 300 元/月/QPS 30 元/天/QPS
100<QPS 200 元/月/QPS 20 元/天/QPS

解决 QPS 问题

因为我负责后端服务,作为中间人给前端做请求转发,所以要控制对第三方接口的调用量,不能让每秒并发超过 2(这个数值可以提升,取决于钱包厚度,但总有个上限)。

为了方便描述,百度 api 在下文中都被称作 第三方接口

试错法

这个方法很容易想到,当前端请求到达时,我先将它挂起,然后直接去调用第三方,如果返回结果符合预期,则直接将结果返回给前端,否则,等待一段时间,再发起请求,直到结果符合预期。

但仔细想想,这个方法问题很多:

  • 等待多少时间?
  • 用第三方请求结果去验证,本身就是对资源的一种消耗,十分不优雅。
  • 如果有大量前端并发请求到达,有很多无用的请求将发出。

这个方法肯定是行不通的。

计数器

误区

既然每秒并发为 2,那是不是意味着,1/2 秒 发一个请求?那我在 0-1/2s 时设置一个变量 count,当超过一时,则不受理请求。
但是仔细想想,这样也是有问题的。
举个例子:如果两个请求同时到达,按照我们的流程,它们将串行调用。但按照并发为 2 的限制,他们完全可以同时调用。

img

普通计数器

我们可以启动一个以 1s 为间隔的定时器,在一个周期内,设置一个计数器 count = 2。每当请求来临时,count -= 1。当 count 为 0,请求则挂起。等下个周期再执行。

img

但还是存在问题,当两个周期中间有请求调用时,我们的 count 就失效了,这里同时收到了三个请求,但间隔小于 1s。
img

滑动窗口计数器

普通计数器的问题在于临界值问题,在临界值,突然增多的请求会绕过我们的计数器策略,那能不能缩短时间间隔,来保证策略准确率的提升?这就引出了另一经典的并发限流算法:滑动窗口计数器。
img

在我们的案例中,我们可以将一秒切割成两个分别为 500ms 的格子,每个格子都有自己的 count,当请求到达时,累加所有格子的 count,小于并发数字,则执行下一步,否则挂起。
我们可以用数组来储存格子,比如 slide = [0],当前只有一个格子,且 count = 0。
随着时间的推移,格子数也增加,比如 500ms 时,slide = [0, 0]。
当 1s 时,由于总格子设置为 2,超出这个数字,则 slide 需要删除第一个格子:slide.unshift()。
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class Slide {
constructor() {
// 格子数组
this.node = [0];
// 最多能分两个格子
this.maxNode = 2;
// 最大并发数
this.qps = 2;
// 时间间隔
this.inter = (1 / this.qps) * 1000;
// 未执行的任务列表
this.task = [];
this.interval();
}
get(res) {
// 保存接口时间戳
const allCount = this.node.reduce((a, b) => a + b);
if (allCount < this.qps) {
this.node[this.node.length - 1] = this.node[this.node.length - 1] + 1;
console.log('执行后node', this.node[this.node.length - 1], res);
} else {
// 请求挂起
this.task.push(res);
}
}
interval() {
const id = setInterval(() => {
if (this.node.length >= this.maxNode) {
this.node.shift();
}
// 还能增加格子
this.node.push(0);
}, this.inter);
}
}
const slide = new Slide();

setTimeout(() => {
slide.get(300);
slide.get(300);
slide.get(300);
slide.get(300);
slide.get(300);
}, 100);
setTimeout(() => {
slide.get(260);
slide.get(260);
}, 260);
setTimeout(() => {
slide.get(500);
}, 500);
setTimeout(() => {
slide.get(800);
}, 800);
setTimeout(() => {
slide.get(1000);
}, 1000);
setTimeout(() => {
slide.get(1500);
}, 1500);
setTimeout(() => {
slide.get(1900);
slide.get(2000);
}, 1900);

这个算法完美了吗?不,还不够。至少它存在以下几个问题:

  • 时间间隔被分成了多个格子,很显然,定时器执行的越频繁,咱们系统的性能消耗的越多。
  • 我们需要额外的内存空间来存储格子,格子分的越多,需要的空间越大
  • 这个算法在一开始就可以接受大量请求,这是好事儿吗?不,你的摩托车在冬天启动前还要预热一下呢。

冷启动

所以这里也要普及一个知识点

Warm Up,即冷启动/预热的方式。当系统长期处于低水位的情况下,流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过”冷启动”,让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。

比如你预计正常系统并发是 1000,你写的滑动窗口算法很争气,系统刚启动,用户们就发来了 1000 个请求,因为没超过阈值,请求都应该被同时处理,但你的数据库也刚刚启动,各种缓存策略还没准备好,然后你的系统就和你没预热的摩托车一样半路熄火了。

令牌桶算法

这是我最终采用的算法,也是后端处理并发限流场景中用的比较多的算法。
img
令牌桶是一个桶,装令牌的(废话!)。它的容积有限,只能装 N 个令牌。刚开始是空的,我们的代码每隔一段时间向他加一个令牌,直到桶里的令牌数变为 N。

当请求到来时,先去令牌桶取令牌,如果有令牌,则请求可以被处理,处理完成,删除令牌。否则请求不能被处理或者暂时挂起之后有令牌了再处理(取决于你的业务场景!)

在我的业务场景中,不希望返回前端一个接口繁忙的错误,这样会增加前端的心智负担。当请求被挂起后,如果有新令牌被添加,则直接执行挂起的请求,对前端来说,只是等的稍微久了一点。

所以我对以上流程做了修改:

img

接下来讲讲代码流程。我是用的 eggjs 框架。
在请求方法中:

1
2
3
4
const bucketId = await this.app.tokenBucket.getBucket();
const bodyAndRmove = [this.getPassportPhoto(imgStr), this.removeBg(imgStr)];
const [bodyInfo, removeImg] = await Promise.all(bodyAndRmove);
this.app.tokenBucket.delBucket(bucketId);

getBucket 是获取令牌的函数,注意它是异步函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
'use strict';

module.exports = class TokenBucket {
constructor() {
// 2次/s
this.maxLimt = 2;
this.timer = 1000 * (1 / 2);
this.maxBucket = 2;
this.bucketList = [];
this.taskList = [];
this.interval();
}
getBucket() {
return new Promise((resolve) => {
const hasBucket = this.runTask({ resolve });
if (hasBucket) {
return;
}
console.log('no bucket, push in tasklist');
// 没令牌先把请求挂起,等有令牌时执行
this.taskList.push(resolve);
});
}
runTask({ resolve }) {
const item = this.bucketList.filter((item) => !item.used);
// 无可用令牌
if (!item.length) {
console.error('入栈');
return false;
}
console.log('执行', item.length, item);
const firstItem = item[0];
resolve(firstItem.id);
firstItem.used = true;
return true;
}
/** 请求完成,需删除对应令牌 */
delBucket(id) {
const index = this.bucketList.findIndex((item) => item.id === id);
if (index === -1) {
return;
}
console.log('删除令牌');
this.bucketList.splice(index, 1);
}
// 每隔 timer 时间,往bucket 里添加令牌,如果队列里有任务,添加完令牌立即执行任务
interval() {
console.log('令牌桶状态', this.bucketList);
if (this.bucketList.length < this.maxBucket) {
const random = new Date().getTime() + Math.floor(Math.random() * 1000);
this.bucketList.push({
id: random,
used: false,
});
}
const item = this.bucketList.filter((item) => !item.used);
// 有令牌,队列有任务,优先执行
if (item.length && this.taskList.length) {
console.log('运行队列任务');
const resolve = this.taskList.shift();
this.runTask({
resolve,
});
}
const id = setTimeout(() => {
this.interval();
clearTimeout(id);
}, this.timer);
}
};