简单梳理Node.js创建子进程的方法(下)—— cluster

前文简单梳理了Node.js使用child_process模块创建子进程的4种方法,execexecFileforkspawn。接下来我们看看cluster模块如何创建子进程,后续更多内容会介绍cluster.fork启动Net Server时候为何不会因为共同监听同一个端口而不报错。

cluster

  • fork: 衍生出一个新的工作进程,这只能通过主进程调用。


翻翻源码看看他们怎么实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// libuv
#define UV_VERSION_MAJOR 1
#define UV_VERSION_MINOR 33
#define UV_VERSION_PATCH 1

// V8
#define V8_MAJOR_VERSION 7
#define V8_MINOR_VERSION 8
#define V8_BUILD_NUMBER 279
#define V8_PATCH_LEVEL 17

// Node.js
#define NODE_MAJOR_VERSION 14
#define NODE_MINOR_VERSION 0
#define NODE_PATCH_VERSION 0

cluster源码位置

1
2
3
4
5
6
7
8
9
10
lib
- internal
- cluster
- child.js
- master.js
- round_robin_handle.js
- shared_handle.js
- utils.js
- worker.js
- cluster.js


官方示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
console.log(`主进程 ${process.pid} 正在运行`);

// 衍生工作进程。
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}

cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 已退出`);
});
} else {
// 工作进程可以共享任何 TCP 连接。
// 在本例子中,共享的是 HTTP 服务器。
http.createServer((req, res) => {
res.writeHead(200);
res.end('你好世界\n');
}).listen(8000);

console.log(`工作进程 ${process.pid} 已启动`);
}


Q:cluster.isMaster模块如何区分是主进程还是子进程的?

1
2
3
// lib/cluster.js
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);

lib/cluster.js是整个cluster的入口,根据环境变量中是否有NODE_UNIQUE_ID来区分主或子进程。

主进程通过cluster.fork创建子进程的时候,会将NODE_UNIQUE_ID传入子进程的环境变量中,最后通过child_process.fork去创建新的子进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// lib/internal/cluster/master.js
// ...
cluster.isMaster = true;
// ...
// fork的代码下文还会提到
const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
return fork(cluster.settings.exec, cluster.settings.args, {
// ...
env: workerEnv,
});

// lib/internal/cluster/child.js
// ...
cluster.isMaster = false;

Q:cluster.fork创建子进程过程中做了些什么?

cluster.setupMaster是对整个环境参数的配置;

通过createWorkerProcess里的child_process.fork去创建子进程;

之后将其和一个Worker对象做关联,worker、其子进程和当前cluster(master)都会收到几乎相同的messageexitdisconnect事件,Worker这边不多扩展,可以查阅lib/internal/cluster/worker.js

子进程会监听internalMessage事件,什么是internalMessage事件呢?看看下面官方的介绍。

当发送 {cmd: ‘NODE_foo’} 消息时有一种特殊情况。 cmd 属性中包含 NODE_ 前缀的消息是预留给 Node.js 内核内部使用的,将不会触发子进程的 ‘message’事件。 相反,这种消息可使用 ‘internalMessage’ 事件触发,且会被 Node.js 内部消费。 应用程序应避免使用此类消息或监听‘internalMessage’ 事件,因为它可能会被更改且不会通知。

此处internalMessage事件的回调方法是internal(worker, onmessage)internallib/internal/cluster/master.js里的方法,主要作用是判断监听的消息里面是否存在需要执行的回调,如果没有就会执行入参回调,这里指的是onmessage

onmessage里面有很多if-else语句,主要是根据cluster.child传送进来的消息类型(act)做出不同的处理,这里列出了一个queryServer(因为后面会介绍Net Server里多个子进程如何监听同一个端口的)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// lib/internal/cluster/master.js
cluster.fork = function (env) {
cluster.setupMaster();
const id = ++ids;
// 创建子进程
const workerProcess = createWorkerProcess(id, env);
//
const worker = new Worker({
id: id,
process: workerProcess, // 新建的子进程
});

worker.on("message", function (message, handle) {
cluster.emit("message", this, message, handle);
});
worker.process.once("exit", (exitCode, signalCode) => {
// ...
worker.state = "dead";
worker.emit("exit", exitCode, signalCode);
cluster.emit("exit", worker, exitCode, signalCode);
});
worker.process.once("disconnect", () => {
// ...
worker.state = "disconnected";
worker.emit("disconnect");
cluster.emit("disconnect", worker);
});

worker.process.on("internalMessage", internal(worker, onmessage));
// 触发 fork 事件
process.nextTick(emitForkNT, worker);
cluster.workers[worker.id] = worker;
return worker;
};

function createWorkerProcess(id, env) {
const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
// ... 对一些参数的组合和设定
// 调用child_process的fork方法创建子进程
return fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,
stdio: cluster.settings.stdio,
gid: cluster.settings.gid,
uid: cluster.settings.uid,
});
}
function onmessage(message, handle) {
const worker = this;
// ... if message.act 类型很多 这里主要讲 queryServer
if (message.act === "queryServer") queryServer(worker, message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// lib/internal/cluster/utils.js
// 在cluster的chilid和master里的send都会调用sendHelper
let seq = 0;
function sendHelper(proc, message, handle, cb) {
if (!proc.connected) return false;
// NODE_* 开头的命令触发 internalMessage
message = { cmd: "NODE_CLUSTER", ...message, seq };

if (typeof cb === "function") callbacks.set(seq, cb); // 缓存回调

seq += 1;
// cluster/child.js handle => null
// cluster/master.js handle => null
return proc.send(message, handle);
}

以上就是cluster.fork的大致过程,引入一个Worker和internalMessage概念,之后会用得到。cluster的child和master之间传输信息,都是通过sendHelper方法。

Q:cluster.fork创建的子进程如何共同监听TCP端口?

解答这个问题,主要是看net模块如何创建Server的,还有就是cluster中childmaster如何通信的。

官方示例虽然用的是http创建的服务,但它底层是继承的net模块,为了方便梳理,我们从net.createServer入手一步步查看源码,主要的逻辑从listen开始。

Child部分:

cluster.child里创建一个TCP服务,参数port是8000,host没有传参;

调用listenInCluster方法,一看这名字就知道和cluster有关;

listen是在子进程里触发的,它会通过cluster._getServer拼出一个act为serverQuery的消息发送给cluster.master;

Master部分:

前文提到,master.onmessage方法会根据消息的act不同而做出不同的处理,此处正是serverQuery

进入queryServer方法,默认使用RoundRobinHandle循环分配任务;

RoundRobinHandle构造函数中,会调用net.createServer创建一个Server,由于是在cluster.master里调用的,所以会在listenInCluster里调用server._listen2,会new 出 TCP(src/tcp_wrap.cc)作为句柄,并将其赋给server._handle,至此cluster.master已经拥有了处理TCP请求的能力,不过master有该能力是不行的,还需要让child拥有才行;

RoundRobinHandle中一旦server触发了listening事件后,它会接管server._handle,用distribute重置其onconnection方法;

distribute的作用就是转发新的请求TCP给cluster.child,从free列队中取出一个之前add进来的worker(这个worker和cluster.child有关联关系),发送一个actnewconn的消息让其处理这个TCP;

回到Child部分:

cluster.child的onconnection收到actnewconn的请求后,会找到之前的创建的server,然后调用其onconnection(child里的该方法没有被重置)方法,然后封装出一个Socket对象,触发onnection事件;

到此,后面的就是普通业务逻辑代码了。

下面贴上了部分相关代码,还是比较多的,细节部分我也加上了注释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// lib/net.js
function createServer(options, connectionListener) {
return new Server(options, connectionListener);
}
function Server(options, connectionListener) {
// ... 大量内置属性和参数的初始化
}
Server.prototype.listen = function (...args) {
// ...
// 传了port参数(8000),没有host
var backlog;
if (typeof options.port === "number" || typeof options.port === "string") {
backlog = options.backlog || backlogFromArgs;
if (options.host) {
// ...
} else {
//
listenInCluster(
this,
null,
options.port | 0,
4,
backlog,
undefined,
options.exclusive
);
}
return this;
}
// ...
};


function listenInCluster(
server,
address,
port,
addressType,
backlog,
fd,
exclusive,
flags
) {
exclusive = !!exclusive;

if (cluster === undefined) cluster = require("cluster");

if (cluster.isMaster || exclusive) {
server._listen2(address, port, addressType, backlog, fd, flags);
return;
}

const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
cluster._getServer(server, serverQuery, listenOnMasterHandle);

function listenOnMasterHandle(err, handle) {
// ...
server._handle = handle;
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// lib/internal/cluster/child.js
cluster._getServer = function (obj, options, cb) {
let address = options.address;
// ...
// 当前创建的server信息是否之前已经在cluster.child里查询过
// 有的话就累加计数index值
const indexesKey = [
address,
options.port,
options.addressType,
options.fd,
].join(":");

let index = indexes.get(indexesKey);

if (index === undefined) index = 0;
else index++;

indexes.set(indexesKey, index);

const message = {
act: "queryServer",
index,
data: null,
...options,
};

// ...

send(message, (reply, handle) => {
// ...
if (handle) shared(reply, handle, indexesKey, cb);
// Shared listen socket.
else rr(reply, indexesKey, cb); // Round-robin 返回的 handle 是null
});
// ...
};
function rr(message, indexesKey, cb) {
if (message.errno) return cb(message.errno, null);

var key = message.key;

function listen(backlog) {
return 0;
}

function close() {
if (key === undefined) return;
send({ act: "close", key });
handles.delete(key);
indexes.delete(indexesKey);
key = undefined;
}

function getsockname(out) {
if (key) Object.assign(out, message.sockname);
return 0;
}

const handle = { close, listen, ref: noop, unref: noop };

if (message.sockname) {
handle.getsockname = getsockname; // TCP handles only.
}

assert(handles.has(key) === false);
handles.set(key, handle);
cb(0, handle); // 将封装好的handle,作为listenInCluster的回调handle,赋给server._handle
}
// Round-robin connection.
function onconnection(message, handle) {
const key = message.key; // maseter里的key
const server = handles.get(key); // 是client创建的server?
const accepted = server !== undefined;

send({ ack: message.seq, accepted }); // 答复 master

// 虽然cluster.child rr里没有为server绑定,onconnection
// 但是在cb回到net.js里,后面的逻辑绑定了onconnection方法
if (accepted) server.onconnection(0, handle);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// lib/internal/cluster/master.js
function queryServer(worker, message) {
// worker是cluster.child worker
const key =
`${message.address}:${message.port}:${message.addressType}:` +
`${message.fd}:${message.index}`;
var handle = handles.get(key);

if (handle === undefined) {

// 默认是RoundRobin,Shared模式暂不讨论有兴趣可以看源码
var constructor = RoundRobinHandle;
// ...
handle = new constructor(
key,
address,
message.port,
message.addressType,
message.fd,
message.flags
);
handles.set(key, handle);
}
// ...
// 将cluster.child的worker添加到handle中
handle.add(worker, (errno, reply, handle) => {
const { data } = handles.get(key);
send(
worker,
{
errno,
key,
ack: message.seq,
data,
...reply,
},
handle // round_robin_handle 里返回的是一个null
);
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
this.key = key;
this.all = new Map();
this.free = [];
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);

if (fd >= 0) this.server.listen({ fd });
else if (port >= 0) {
this.server.listen({
port,
host: address,
// Currently, net module only supports `ipv6Only` option in `flags`.
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
} else this.server.listen(address); // UNIX socket path.

this.server.once("listening", () => {
this.handle = this.server._handle;
// 重置onconnection方法
// distribute 做任务派发
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
}

RoundRobinHandle.prototype.add = function (worker, send) {
assert(this.all.has(worker.id) === false);
this.all.set(worker.id, worker);

const done = () => {
if (this.handle.getsockname) {
// tcp\udp 会有getsockname
const out = {};
this.handle.getsockname(out);
// TODO(bnoordhuis) Check err.
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}

this.handoff(worker); // In case there are connections pending.
};

if (this.server === null) return done();

// Still busy binding.
this.server.once("listening", done);
this.server.once("error", (err) => {
send(err.errno, null);
});
};

RoundRobinHandle.prototype.distribute = function (err, handle) {
this.handles.push(handle);
const worker = this.free.shift();

if (worker) this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function (worker) {
// worker如果不存在那就跳出
if (this.all.has(worker.id) === false) {
return; // Worker is closing (or has closed) the server.
}

const handle = this.handles.shift(); // 取出第一个待处理任务

if (handle === undefined) {
this.free.push(worker); // 没有的话就会将worker归还到free里
return;
}

const message = { act: "newconn", key: this.key };

sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted) handle.close();
else this.distribute(0, handle); // Worker is shutting down. Send to another.

this.handoff(worker);
});
};

总结:

cluster利用child_process的fork方法创建子进程,并传入新的环境变量NODE_UNIQUE_ID用于区分主子进程从而在require(‘cluster’)时候可以加载到对应的master.jschild.js文件。另外在默认的RoundRobinHandle模式下,cluster子进程之所以可以共同监听同个TCP端口,是在net模块里面做了master和child区分,child并没有真正的监听端口,而是child会去master查询该Server是否已经存在,如果没有会在RoundRobinHandle中创建中创建server,一旦有新的TCP连接进入,会转发给free里的worker(cluster.child)处理。

Istio简单概念 简单梳理Node.js创建子进程的方法(上)

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×