大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发!
1.Node.js CPU 密集型困境
执行 IO 绑定操作是 Nodejs 应用程序的亮点,例如:响应 Http 请求、数据库访问、服务器间对话。 这是因为 Nodejs 的单线程特性使得能够以较低的系统资源消耗快速处理许多请求。
但是,执行 CPU 密集型操作(例如:计算斐波那契数、素数、机器学习等)将使 Nodejs 应用程序陷入困境,因为无论机器有多少核(Core),Nodejs 都只使用 CPU 的单个核。
因此,如果在 Web 应用程序的上下文中运行繁重的 CPU 密集型操作,Nodejs 的单线程将被阻塞,因此 Web 服务器将无法响应任何请求,因为其忙于计算本身。
下面示例都将在 Web 应用程序的上下文中进行,但相同的逻辑适用于任何类型的 Nodejs 应用程序。
const express = require('express');
const app = express();
app.get('/getfibonacci', (req, res) => {
const startTime = new Date();
const result = fibonacci(parseInt(req.query.number));
// 数据转换
const endTime = new Date();
res.json({
number: parseInt(req.query.number),
fibonacci: result,
time: endTime.getTime() - startTime.getTime() + 'ms',
});
});
app.get('/simple', (req, res) => {
res.json({ data: 'simple' });
});
const fibonacci = (n) => {
if (n <= 1) {
return 1;
}
return fibonacci(n - 1) + fibonacci(n - 2);
};
app.listen(3000, () => console.log('listening on port 3000'));
在上面的示例中,在浏览器中直接访问/simple 没有任何问题,服务器直接返回。
然而,如果在浏览器中首先访问/getfibonacci?number=50,然后再访问/simple 就会发现浏览器一种在 loading,直到前一次访问返回后第二次访问结果才能正常返回。
即当 Nodejs 运行繁重 CPU 密集型操作,单线程完全被阻塞! 如下图所示,单核 CPU 完成了所有任务。
2.Promise 能解决其他请求阻塞但自己除外
很多人可能会有这样的疑问:Promise 不是可以通过异步执行操作来解决此类问题吗?一起来看下面的示例:
// serverWithPromises.js
const express = require('express');
const app = express();
app.get('/isprime', async (req, res) => {
const startTime = new Date();
const result = await isPrime(parseInt(req.query.number));
const endTime = new Date();
res.json({
number: parseInt(req.query.number),
isprime: result,
time: endTime.getTime() - startTime.getTime() + 'ms',
});
});
app.get('/testrequest', (req, res) => {
res.send('I am unblocked now');
});
const isPrime = (number) => {
return new Promise((resolve) => {
let isPrime = true;
for (let i = 3; i < number; i++) {
if (number % i === 0) {
isPrime = false;
break;
}
}
resolve(isPrime);
});
};
app.listen(3000, () => console.log('listening on port 3000'));
很显然,结果依然是不能!。当访问地址/isprime?number=936868033 后,/testrequest 的访问依然会被阻塞。
这是因为,即使 Promise 是异步运行的,Promise 执行器函数(内部 for 循环等等)也会被同步调用,并且会阻塞应用程序。
Promise 之所以在 JavaScript 社区中被推崇为“异步非阻塞操作”的一种方式,是因为 Promise 擅长完成需要更多时间而不是更多 CPU 的工作。 这里所说的“需要更多时间的工作”通常包括:数据库通信、跨服务器通信等,这页是 Web 服务器所做的 99% 的工作。
JavaScript Promise 通过将任务推送到特殊队列并侦听事件(例如:数据库返回)发生并在该事件发生时执行一个回调函数,这就是著名的事件循环。
const express = require('express');
const app = express();
const fetch = require('node-fetch');
// node-fetch 用于在Nodejs中发送情况
app.get('/calltoslowserver', async (req, res) => {
const result = await fetch('http://localhost:5000/slowrequest');
// 返回一个Promise
const resJson = await result.json();
res.json(resJson);
});
app.get('/slowrequest', (req, res) => {
setTimeout(() => res.json({ message: 'sry i was late' }), 10000);
// setTimeout 模拟10s的时间花销,即使花销很长时间也不会阻塞其他访问
});
app.get('/testrequest', (req, res) => {
res.send('I am unblocked now');
});
app.listen(4000, () => console.log('listening on port 4000'));
在上面的示例中可以看到,即使对 /slowrequest 或者 /calltoslowserver 的调用花费了很长时间,所有其他请求,比如:/testrequest,都没有被阻止。 这是因为 node-fetch 的 fetch 函数返回一个 Promise,而 这种单线程、非阻塞、异步的处理方式是 Nodejs 中默认的。
3.Nodejs CPU 密集型三种解决方案
Node js 提供了三种解决方案来解决上面问题。
2.1 child_process
child_process 模块提供了生成拥有自己内存的新进程的能力, 这些进程之间的通信是通过 OS 提供的 IPC(Inter-process Communication)建立。
该模块内部主要有 3 个方法:
- child_process.spawn()
- child_process.fork()
- child_process.exec()
2.1.1 child_process.spawn()
该方法用于异步生成子进程,该子进程可以是允许终端运行的任何命令。spawn 采用以下语法:
spawn('comand to run', 'array of arguments', optionsObject);
下面的代码使用参数 -lash 和查询字符串中的目录名称生成一个 ls 进程,并将其输出发回。
const express = require('express');
const app = express();
const { spawn } = require('child_process');
// 导入child_process子进程
app.get('/ls', (req, res) => {
const ls = spawn('ls', ['-lash', req.query.directory]);
ls.stdout.on('data', (data) => {
// stdin,stdout,stderr 管道(connection)是通过父级parent建立的
// Node.js 进程和生成的子进程,可以在标准输出上监听数据事件
res.write(data.toString());
// 日期将以流(数据块)的形式出现
// 由于 res 是一个可写流,支持写入
});
ls.on('close', (code) => {
console.log(`child process exited with code ${code}`);
res.end();
// 最后,当子进程退出时,所有写入的流都会被发送回来
});
});
app.listen(7000, () => console.log('listening on port 7000'));
下面是输出结果:
开发者可以自由生成一个 Nodejs 进程并执行另一个任务,但 fork() 是一种更好的方法。
2.1.2 child_process.fork()
child_process.fork() 专门用于生成新的 Nodejs 进程。 与 spawn 一样,返回的 childProcess 对象将具有内置的 IPC 通信通道,允许消息在父进程和子进程之间来回传递。
fork 采用以下语法:
fork('path to module', 'array of arguments', 'optionsObject');
使用 fork()可以生成一个单独的 nodejs 进程并在该进程中执行函数,在完成时将答案返回给父进程来解决此类问题。 这样,父进程就不会被阻塞,从而可以继续响应请求。
比如下面的 childforkServer.js 代码示例:
const express = require('express');
const app = express();
const { fork } = require('child_process');
app.get('/isprime', (req, res) => {
const childProcess = fork('./forkedchild.js');
// fork() 的第一个参数是子进程要运行的 js 文件的名称
childProcess.send({ number: parseInt(req.query.number) });
// send方法用于通过IPC向子进程发送消息
const startTime = new Date();
childProcess.on('message', (message) => {
// on("message")方法用于监听子进程发送的消息
const endTime = new Date();
res.json({
...message,
time: endTime.getTime() - startTime.getTime() + 'ms',
});
});
});
app.get('/testrequest', (req, res) => {
res.send('I am unblocked now');
});
app.listen(3636, () => console.log('listening on port 3636'));
forkedchild.js 的代码如下:
process.on('message', (message) => {
//子进程正在监听父进程的消息
const result = isPrime(message.number);
process.send(result);
process.exit();
// 确保使用 exit() 来防止孤立进程
});
function isPrime(number) {
let isPrime = true;
for (let i = 3; i < number; i++) {
if (number % i === 0) {
isPrime = false;
break;
}
}
return {
number: number,
isPrime: isPrime,
};
}
此时当访问 /isprime?number=29355126551 时候虽然浏览器也会一直处于 loading 等待直到返回,但是此时不会阻塞后续的其他请求,比如:/testrequest。
需要注意的是:应用为每个 child process即子进程分配单独内存,这意味着存在时间和资源的额外开销。
2.2 cluster
cluster 主要用于垂直(为现有机器添加更多功能)扩展 Nodejs Web 服务器,构建在 child_process 模块之上。 在 Http 服务器中,cluster 模块使用 child_process.fork() 自动 fork 进程并建立主从架构,其中父进程以循环方式将传入请求分发给子进程。 理想情况下,fork 的进程数应等于计算机具有的 CPU 核数。
下面示例使用 cluster 模块构建一个 Express 服务器:
const cluster = require('cluster');
const http = require('http');
const cpuCount = require('os').cpus().length;
// 返回cpu的核数
if (cluster.isMaster) {
masterProcess();
} else {
childProcess();
}
// 父进程
function masterProcess() {
console.log(`Master process ${process.pid} is running`);
// fork()更多 workers
for (let i = 0; i < cpuCount; i++) {
console.log(`Forking process number ${i}...`);
cluster.fork();
//creates new node js processes
}
cluster.on('exit', (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
cluster.fork(); //forks a new process if any process dies
});
}
// 子进程
function childProcess() {
const express = require('express');
const app = express();
// workers 可以共享 TCP connection
app.get('/', (req, res) => {
res.send(`hello from server ${process.pid}`);
});
app.listen(5555, () =>
console.log(`server ${process.pid} listening on port 5555`)
);
}
当运行上面的代码时,cluster.isMaster 第一次为 true 并且 masterProcess() 函数被执行。masterProcess()函数 fork()了 4 个 NodeJS 进程(依赖于设备 CPU 的核数),每当 fork()另一个进程时,都会再次运行相同的文件,但 cluster.isMaster 将返回 false,因为该进程现在是一个子进程,因为是 fork()的,因此控制转到 else 条件。
最终 childProcess() 函数执行了 4 次,并创建了 4 个 Express 服务器实例,后续请求以循环方式分发到四台服务器,从而充分利用机器的 CPU。 Node js 文档还指出,有一些内置的智能功能可以避免工作进程过载。
Cluster 模块是垂直扩展简单 Nodejs 服务器的最简单、最快的方法。 但是,为了实现更高级和弹性的扩展,可以使用 docker 容器和 Kubernetes 等工具。
2.3 worker threads
本质上,工作线程和子进程之间的区别就像线程和进程之间的区别一样。理想情况下,创建的线程数应等于 cpu 核数。
接下来比较默认的单线程、多线程、工作线程的性能。下面是 singleThreadServer.js 的示例代码:
const express = require('express');
const app = express();
// 求和
function sumOfPrimes(n) {
var sum = 0;
for (var i = 2; i <= n; i++) {
for (var j = 2; j <= i / 2; j++) {
if (i % j == 0) {
break;
}
}
if (j > i / 2) {
sum += i;
}
}
return sum;
}
app.get('/sumofprimes', (req, res) => {
const startTime = new Date().getTime();
const sum = sumOfPrimes(req.query.number);
const endTime = new Date().getTime();
res.json({
number: req.query.number,
sum: sum,
timeTaken: (endTime - startTime) / 1000 + ' seconds',
});
});
app.listen(6767, () => console.log('listening on port 6767'));
输入数据如下,数据显示:计算 60 万素数之和大约需要 50 秒。
下面是 sumOfPrimesWorker.js 的示例代码:
const { workerData, parentPort } = require('worker_threads');
// workerData 将是 multiThreadServer.js 中 Worker 构造函数的第二个参数
const start = workerData.start;
const end = workerData.end;
var sum = 0;
for (var i = start; i <= end; i++) {
for (var j = 2; j <= i / 2; j++) {
if (i % j == 0) {
break;
}
}
if (j > i / 2) {
sum += i;
}
}
parentPort.postMessage({
// 将结果消息发送回父进程
start: start,
end: end,
result: sum,
});
下面是 multiThreadServer.js 的示例代码:
const express = require('express');
const app = express();
const { Worker } = require('worker_threads');
function runWorker(workerData) {
return new Promise((resolve, reject) => {
// 第一个参数是worker的文件名
const worker = new Worker('./sumOfPrimesWorker.js', {
workerData,
});
worker.on('message', resolve);
// 当数据从worker线程返回的时候改Promise为resolve
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}
function divideWorkAndGetSum() {
// 为了简单起见,对值 600000 进行硬编码分成4等份
const start1 = 2;
const end1 = 150000;
const start2 = 150001;
const end2 = 300000;
const start3 = 300001;
const end3 = 450000;
const start4 = 450001;
const end4 = 600000;
// 为每一个worker单独分配内容
const worker1 = runWorker({ start: start1, end: end1 });
const worker2 = runWorker({ start: start2, end: end2 });
const worker3 = runWorker({ start: start3, end: end3 });
const worker4 = runWorker({ start: start4, end: end4 });
// 要求所有都resolve
return Promise.all([worker1, worker2, worker3, worker4]);
}
app.get('/sumofprimeswiththreads', async (req, res) => {
const startTime = new Date().getTime();
const sum = await divideWorkAndGetSum()
.then(
(
values
//values is an array containing all the resolved values
) => values.reduce((accumulator, part) => accumulator + part.result, 0)
//reduce is used to sum all the results from the workers
)
.then((finalAnswer) => finalAnswer);
const endTime = new Date().getTime();
res.json({
number: 600000,
sum: sum,
timeTaken: (endTime - startTime) / 1000 + ' seconds',
});
});
app.listen(7777, () => console.log('listening on port 7777'));
上面将 worker 分为 4 个相等的部分,并将每个部分分配给一个 worker 并行执行任务。
此时消耗的时间数据如下:
总体来看,时间缩短为只需要单线程服务器一半。而下图展示了 4 个线程(工作线程)在 cpu 的所有 4 个核上运行。
4.本文总结
尽管 Node js 为多线程提供了强大的支持,但这并不一定意味着开发者应该始终使 Web 应用程序成为多线程。
Node js 的构建方式是默认的单线程行为优于 Web 服务器的多线程行为,因为 Web 服务器往往是 IO 绑定的,而 Nodejs 非常适合用最少的系统资源处理异步 IO 操作 。
同时,线程或进程的额外开销和复杂性使得程序员很难处理简单的 IO 任务。 但在某些情况下,Web 服务器执行 CPU 密集型操作,在这种情况下,启动工作线程或子进程确实是有必要的。 因此,设计架构实际上可以归结为应用程序的需求和要求,应该据此做出决策。
参考资料
https://alvinlal.netlify.app/blog/single-thread-vs-child-process-vs-worker-threads-vs-cluster-in-nodejs
https://medium.com/dkatalis/eventloop-in-nodejs-ways-to-block-it-and-ways-to-avoid-b60a65bab2be
https://app.daily.dev/posts/IDNwogRC4
https://dev.to/arealesramirez/is-node-js-single-threaded-or-multi-threaded-and-why-ab1