首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

控制请求并发数量:p-limit 源码解读

编程知识
2024年09月22日 22:02

p-limit 是一个控制请求并发数量的库,他的整体代码不多,思路挺好的,很有学习价值;

举例

当我们同时发起多个请求时,一般是这样做的

Promise.all([
    requestFn1,
    requestFn2,
    requestFn3
]).then(res =>{})

或者

requestFn1()
requestFn2()
requestFn3()

而使用 p-limit 限制并发请求数量是这样做的:

var limit = pLimit(8); // 设置最大并发数量为 8

var input = [ // Limit函数包装各个请求
    limit(() => fetchSomething('1')),
    limit(() => fetchSomething('2')),
    limit(() => fetchSomething('3')),
    limit(() => fetchSomething('4')),
    limit(() => fetchSomething('5')),
    limit(() => fetchSomething('6')),
    limit(() => fetchSomething('7')),
    limit(() => fetchSomething('8')),
];

// 执行请求
Promise.all(input).then(res =>{
    console.log(res)
})

上面 input 数组包含了 8limit 函数,每个 limit 函数包含了要发起的请求

当设置最大并发数量为 8 时,上面 8 个请求会同时执行

来看下效果,假设每个请求执行时间为1s

var fetchSomething = (str) => {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            console.log(str)
            resolve(str)
        }, 1000)
    })
}

当设置并发请求数量为 2

image

当设置并发请求数量为 3

image

p-limit 限制并发请求数量本质上是,在内部维护了一个请求队列;

当请求发起时,先将请求推入队列,判断当前执行的请求数量是否小于配置的请求并发数量,如果是则执行当前请求,否则等待正在发起的请求中谁请求完了,再从队列首部取出一个执行;

源码(v2.3.0)

pLimit 源码如下(这个源码是 v2.3.0 版本的,因为项目中引入的版本比较早。后面会分析从 2.3.0 到最新版本的源码,看看增加或者改进了什么):

'use strict';
const pTry = require('p-try');

const pLimit = concurrency => {
    // 限制为正整数
    if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
        return Promise.reject(new TypeError('Expected `concurrency` to be a number from 1 and up'));
    }

    const queue = []; // 请求队列
    let activeCount = 0; // 当前并发的数量

    const next = () => { // 一个请求完成时执行的回调
        activeCount--;

        if (queue.length > 0) {
            queue.shift()();
        }
    };

    const run = (fn, resolve, ...args) => { // 请求开始执行
        activeCount++;

        const result = pTry(fn, ...args);

        resolve(result); // 将结果传递给 generator

        result.then(next, next); // 请求执行完调用回调
    };

    // 将请求加入队列
    const enqueue = (fn, resolve, ...args) => {
        if (activeCount < concurrency) {
            run(fn, resolve, ...args);
        } else {
            queue.push(run.bind(null, fn, resolve, ...args));
        }
    };

    const generator = (fn, ...args) => new Promise(resolve => enqueue(fn, resolve, ...args));
    
    // 暴露内部属性给外界
    Object.defineProperties(generator, {
        activeCount: {
            get: () => activeCount
        },
        pendingCount: {
            get: () => queue.length
        },
        clearQueue: {
            value: () => {
                queue.length = 0;
            }
        }
    });

    return generator;
};

module.exports = pLimit;
module.exports.default = pLimit;

下面一一剖析下

1、pLimit 函数整体是一个闭包函数,返回了一个名叫 generator 的函数,由 generator 处理并发逻辑,
generator 返回值必须是 promise,这样才能被 Promise.all 捕获到

const generator = (fn,...args) => new Promise((resolve,reject)=7enqueue(fn,resolve,...args))

2、在 enqueue 函数里面

// 将请求加入队列
const enqueue = (fn, resolve, ...args) => {
    if (activeCount < concurrency) {
        run(fn, resolve, ...args);
    } else {
        queue.push(run.bind(null, fn, resolve, ...args));
    }
};

activeCount 表示正在执行的请求数量,当 activeCount 小于配置的并发数量(concurrency)时,则可以执行当前的 fn(执行 run 函数),否则推入请求队列等待。

3、run 函数接收了三个形参

const run = (fn, resolve, ...args) => { // 请求开始执行
    activeCount++;
    const result = pTry(fn, ...args);
    resolve(result);
    result.then(next, next);
};
  • fn 表示执行的请求,

  • resolvegenerator 定义并往下传,一直跟踪到请求执行完毕后,调用 resolve(result); 代表 generator 函数 fulfilled

  • ···args 表示其余的参数,最终会作为 fn 的参数。

4、执行 run 函数时

const run = (fn, resolve, ...args) => { // 请求开始执行
    activeCount++; // 请求开始执行,当前请求数量 +1

    const result = pTry(fn, ...args);

    resolve(result);

    result.then(next, next);
};

这里执行 fn 使用的是 const result = pTry(fn,...args)pTry 的作用就是创建一个 promise 包裹的结果,不论 fn 是同步函数还是异步函数

// pTry 源码
const pTry = (fn,...args) => new Promise((resolve,reject) => resolve(fn(...args)));

现在 fn 执行(fn(...args))完毕并兑现(resolve(fn(...args)))之后,result 就会兑现。

result 兑现后,generatorpromise 也就兑现了( resolve(result) ),那么当前请求 fn 的流程就执行完了。

5、当前请求执行完后,对应的当前正在请求的数量也要减一,activeCount--

const next = () => { // 一个请求完成时执行的回调
    activeCount--;

    if (queue.length > 0) {
        queue.shift()();
    }
};

然后继续从队列头部取出请求来执行

6、最后暴露内部属性给外界

Object.defineProperties(generator, {
    activeCount: { // 当前正在请求的数量
        get: () => activeCount
    },
    pendingCount: { // 等待执行的数量
        get: () => queue.length
    },
    clearQueue: {
        value: () => {
            queue.length = 0;
        }
    }
});

源码(v2.3.0)=> 源码(v6.1.0)

v2.3.0 到最新的 v6.1.0 版本中间加了一些改进

1、v3.0.0:始终异步执行传进 limit 的函数

image

3.0.0 中,作者将请求入队放在前面,将 if 判断语句和请求执行置于微任务中运行;正如源码注释中解释的:因为当 run 函数执行时,activeCount 是异步更新的,那么这里的 if 判断语句也应该异步执行才能实时获取到 activeCount 的值。

这样一开始批量执行 limit(fn) 时,将会先把这些请求全部放入队列中,然后再根据条件判断是否执行请求;

2、v3.0.2:修复传入的无效并发数引起的错误;

image

return Promise.reject 改为了直接 throw 一个错误

3、v3.1.0:移除 pTry 的依赖;改善性能;

image

移除了 pTry 依赖,改为了 async 包裹,上面有提到,pTry 是一个 promise 包装函数,返回结果是一个 promise;两者本质都是一样;

增加了 yocto-queue 依赖,yocto-queue是一个队列数据结构,用队列代替数组,性能更好;队列的入队和出队操作时间复杂度是 O(1),而数组的 shift()O(n);

4、v5.0.0:修复上下文传播问题

image

引入了 AsyncResource

export const AsyncResource = {
    bind(fn, _type, thisArg) {
        return fn.bind(thisArg);
    }
}

这里用 AsyncResource.bind() 包裹 run.bind(undefined, fn, resolve, args) ,其实不是太明白为啥加这一层。。。这里用的到三个参数(fn,resolve,args)都是通过函数传参过来的,和 this 没关系吧,各位知道的可以告知下么。

相关 issue这里

5、6.0.0:性能优化,主要优化的地方在下面

image

移除了 AsyncResource.bind(),改为使用一个立即执行的 promise,并将 promiseresolve 方法插入队列,一旦 resolve 完成兑现,调用相应请求;相关 issue这里

6、v6.1.0:允许实时修改并发限制数

image

改变并发数后立马再检测是否可以执行请求;


最后

在上面第4点的,第5点中的优化没太看明白,因为执行请求用的到三个参数(fn,resolve,args)都是通过函数传参过来的,看起来 this 没关系,为啥要进行多层 bind 绑定呢?各位知道的可以不吝赐教下么。

From:https://www.cnblogs.com/zsxblog/p/18426066
本文地址: http://www.shuzixingkong.net/article/2211
0评论
提交 加载更多评论
其他文章 Web刷题之polarctf靶场(2)
1.蜜雪冰城吉警店 点开靶场, 发现题目说点到隐藏奶茶(也就是第九杯)就给flag, 但是明显就只有八杯, 猜测大概率考的是前端代码修改 把id=1修改为id=9, 然后回到页面点击原味奶茶即可弹出flag #flag{7d43cc8863ad0ee649048e562fde53ec} 2.召唤神龙
Web刷题之polarctf靶场(2) Web刷题之polarctf靶场(2) Web刷题之polarctf靶场(2)
小美的数组合并(美团20240427年暑期实习笔试真题)
题目:小美的数组合并 小美拿到了一个数组,她每次操作可以将两个相邻元素ai合并为一个元素,合并后的元素为原来两个元素之和。小美希望最终数组的最小值不小于k。她想知道有多少种不同的合并结果? 输入描述 第一行输入两个正整数n,k,代表数组大小和数组的最大值。 第二行输入个正整数ai,代表小美拿到的数组
ConcurrentLinkedQueue详解(图文并茂)
前言 ConcurrentLinkedQueue是基于链接节点的无界线程安全队列。此队列按照FIFO(先进先出)原则对元素进行排序。队列的头部是队列中存在时间最长的元素,而队列的尾部则是最近添加的元素。新的元素总是被插入到队列的尾部,而队列的获取操作(例如poll或peek)则是从队列头部开始。 与
ConcurrentLinkedQueue详解(图文并茂) ConcurrentLinkedQueue详解(图文并茂) ConcurrentLinkedQueue详解(图文并茂)
数据结构与算法之间有何关系?
数据结构与算法是计算机科学中的两个重要概念,程序=算法+数据结构。数据结构管理数据,算法解决问题,两者相辅相成。数据类型是连接两者的桥梁,数据结构与算法既紧密相连又各有关注。
数据结构与算法之间有何关系? 数据结构与算法之间有何关系? 数据结构与算法之间有何关系?
手搓大模型Task01:LLama3模型讲解
前言 主要进行Qwen模型架构进行讲解。 1.Qwen整体介绍 Qwen的整体架构与Llama2类似,如下图所示: tokenizer将文本转为词表里面的数值。 数值经过embedding得到一一对应的向量。 attention_mask是用来看见左边、右边,双向等等来设定。 各类下游任务,Casu
手搓大模型Task01:LLama3模型讲解
基础数据结构之数组
数组 1) 概述 定义 在计算机科学中,数组是由一组元素(值或变量)组成的数据结构,每个元素有至少一个索引或键来标识 In computer science, an array is a data structure consisting of a collection of elements (v
基础数据结构之数组 基础数据结构之数组 基础数据结构之数组
C#/.NET/.NET Core技术前沿周刊 | 第 6 期(2024年9.16-9.22)
前言 C#/.NET/.NET Core技术前沿周刊,你的每周技术指南针!记录、追踪C#/.NET/.NET Core领域、生态的每周最新、最实用、最有价值的技术文章、社区动态、优质项目和学习资源等。让你时刻站在技术前沿,助力技术成长与视野拓宽。 欢迎投稿,推荐或自荐优质文章/项目/学习资源等。每周
C#/.NET/.NET Core技术前沿周刊 | 第 6 期(2024年9.16-9.22) C#/.NET/.NET Core技术前沿周刊 | 第 6 期(2024年9.16-9.22) C#/.NET/.NET Core技术前沿周刊 | 第 6 期(2024年9.16-9.22)
【解决方案】Java 互联网项目中常见的 Redis 缓存应用场景
本文梳理总结了一些 Java 互联网项目中常见的 Redis 缓存应用场景,例如常见的 String 类型 Key-Value、对时效性要求高的场景、Hash 结构的场景以及对实时性要求高的场景等,全面涵盖了 Redis 中所有的 5 种基本类型。