前文简单梳理了Node.js使用child_process 模块创建子进程的4种方法:exec、execFile、fork 和spawn。接下来我们看看cluster 模块如何创建子进程;后续内容还会介绍:通过cluster.fork 启动的多个子进程同时启动Net Server 并监听同一个端口时,为何不会报端口被占用的错误。
cluster
fork : 衍生出一个新的工作进程,这只能通过主进程调用。
下面翻翻源码,看看它们是怎么实现的。本文分析基于以下版本:
// Component versions that the source excerpts in this article refer to.

// libuv
#define UV_VERSION_MAJOR 1
#define UV_VERSION_MINOR 33
#define UV_VERSION_PATCH 1

// V8
#define V8_MAJOR_VERSION 7
#define V8_MINOR_VERSION 8
#define V8_BUILD_NUMBER 279
#define V8_PATCH_LEVEL 17

// Node.js
#define NODE_MAJOR_VERSION 14
#define NODE_MINOR_VERSION 0
#define NODE_PATCH_VERSION 0
cluster 源码位置
lib
├── internal
│   └── cluster
│       ├── child.js
│       ├── master.js
│       ├── round_robin_handle.js
│       ├── shared_handle.js
│       ├── utils.js
│       └── worker.js
└── cluster.js
官方示例
// Official example: the master forks one worker per CPU and logs worker
// exits; every worker starts an HTTP server listening on port 8000.
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`主进程 ${process.pid} 正在运行`);

  // One worker per CPU core.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`工作进程 ${worker.process.pid} 已退出`);
  });
} else {
  // Every worker listens on the same port 8000.
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('你好世界\n');
  }).listen(8000);

  console.log(`工作进程 ${process.pid} 已启动`);
}
Q:cluster 模块是如何区分主进程和子进程的(cluster.isMaster 的值从何而来)?
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
module.exports = require(`internal/cluster/${childOrMaster}`);
lib/cluster.js
是整个cluster的入口,根据环境变量中是否有NODE_UNIQUE_ID
来区分主或子进程。
主进程通过cluster.fork
创建子进程的时候,会将NODE_UNIQUE_ID
传入子进程的环境变量中,最后通过child_process.fork 去创建新的子进程。
// lib/internal/cluster/master.js
cluster.isMaster = true;

// Abridged createWorkerProcess() from master.js: the worker's environment is
// this process's environment plus `env`, with NODE_UNIQUE_ID set to the
// worker id; `fork` is child_process.fork.
function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  return fork(cluster.settings.exec, cluster.settings.args, {
    env: workerEnv,
    // ...remaining options omitted
  });
}

// lib/internal/cluster/child.js
cluster.isMaster = false;
Q:cluster.fork创建子进程过程中做了些什么? cluster.setupMaster
是对整个环境参数的配置;
通过createWorkerProcess
里的child_process.fork
去创建子进程;
之后将其和一个Worker
对象做关联,worker、其子进程和当前cluster(master)都会收到几乎相同的message
、exit
和disconnect
事件,Worker 这边不多扩展,可以查阅lib/internal/cluster/worker.js
;
子进程会监听internalMessage
事件,什么是internalMessage事件呢?看看下面官方的介绍。
当发送 {cmd: ‘NODE_foo’} 消息时有一种特殊情况。 cmd 属性中包含 NODE_ 前缀的消息是预留给 Node.js 内核内部使用的,将不会触发子进程的 ‘message’事件。 相反,这种消息可使用 ‘internalMessage’ 事件触发,且会被 Node.js 内部消费。 应用程序应避免使用此类消息或监听 ‘internalMessage’ 事件,因为它可能会被更改且不会通知。
此处internalMessage 事件的回调是internal(worker, onmessage) 的返回值。internal 定义在lib/internal/cluster/utils.js 中(master.js 从中引入),主要作用是判断收到的消息是否对应一个之前通过sendHelper 登记过、正在等待回复的回调:如果有就执行该回调,如果没有就执行传入的回调,这里指的是onmessage ;
onmessage 里面有很多if-else语句,主要是根据cluster.child传送进来的消息类型(act
)做出不同的处理,这里列出了一个queryServer
(因为后面会介绍Net Server里多个子进程如何监听同一个端口的)。
// lib/internal/cluster/master.js (abridged)

/**
 * Forks a new worker process and wires its events to both the Worker object
 * and the cluster (master) object.
 * @param {object} [env] Extra environment variables for the worker.
 * @returns {Worker} The new worker, also stored in cluster.workers.
 */
cluster.fork = function (env) {
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess,
  });

  // Re-emit every worker message on the cluster object.
  worker.on("message", function (message, handle) {
    cluster.emit("message", this, message, handle);
  });

  worker.process.once("exit", (exitCode, signalCode) => {
    worker.state = "dead";
    worker.emit("exit", exitCode, signalCode);
    cluster.emit("exit", worker, exitCode, signalCode);
  });

  worker.process.once("disconnect", () => {
    worker.state = "disconnected";
    worker.emit("disconnect");
    cluster.emit("disconnect", worker);
  });

  // Cluster control messages (cmd 'NODE_CLUSTER') arrive as 'internalMessage'.
  worker.process.on("internalMessage", internal(worker, onmessage));
  process.nextTick(emitForkNT, worker);
  cluster.workers[worker.id] = worker;
  return worker;
};

/**
 * Starts the worker with child_process.fork, passing the cluster settings
 * and an environment that carries NODE_UNIQUE_ID = id.
 */
function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid,
  });
}

/**
 * Dispatches a worker's control message by its `act` field; only the
 * queryServer branch is shown. `this` is expected to be the worker
 * (presumably bound by internal(); confirm in utils.js).
 */
function onmessage(message, handle) {
  const worker = this;

  if (message.act === "queryServer")
    queryServer(worker, message);
}
// lib/internal/cluster/utils.js (abridged)

// Callbacks waiting for a reply, keyed by the seq of the message they were
// registered with.
const callbacks = new Map();
// Sequence number stamped on every outgoing cluster message.
let seq = 0;

/**
 * Sends a cluster control message over the IPC channel of `proc`.
 *
 * The message is tagged with cmd 'NODE_CLUSTER' (a NODE_-prefixed cmd is
 * delivered as 'internalMessage', not 'message') and with the current
 * sequence number. If `cb` is a function it is remembered under that number
 * so that a reply acknowledging it can be routed back.
 *
 * @param {object} proc Process object with `connected` and `send()`.
 * @param {object} message Payload; its fields are copied, the caller's
 *   object is not modified.
 * @param {object} [handle] Optional handle to send along with the message.
 * @param {Function} [cb] Callback to remember for the reply.
 * @returns {boolean} false if the IPC channel is not connected, otherwise
 *   the result of proc.send().
 */
function sendHelper(proc, message, handle, cb) {
  if (!proc.connected)
    return false;

  // A `cmd` field inside `message` overrides 'NODE_CLUSTER'; `seq` always wins.
  const internalMessage = { cmd: "NODE_CLUSTER", ...message, seq };

  if (typeof cb === "function")
    callbacks.set(seq, cb);

  seq += 1;
  return proc.send(internalMessage, handle);
}
以上就是cluster.fork的大致过程,引入一个Worker和internalMessage概念,之后会用得到。cluster的child和master之间传输信息,都是通过sendHelper
方法。
Q:cluster.fork创建的子进程如何共同监听TCP端口? 解答这个问题,主要是看net
模块如何创建Server的,还有就是cluster中child
和master
如何通信的。
官方示例虽然用的是http 模块创建的服务,但http.Server 底层继承自net 模块的Server。为了方便梳理,我们从net.createServer 入手一步步查看源码,主要逻辑从listen 开始。
Child部分: cluster.child里创建一个TCP服务,参数port
是8000,host
没有传参;
调用listenInCluster
方法,一看这名字就知道和cluster 有关;
listen 是在子进程里触发的,它会通过cluster._getServer 拼出一个act 为queryServer 的消息,发送给cluster.master;
Master部分: 前文提到,master.onmessage 方法会根据消息的act 不同而做出不同的处理,此处正是queryServer;
进入queryServer 方法,默认(除Windows 外的平台)使用RoundRobinHandle,以轮询(round-robin)方式把连接分配给各个子进程;
在RoundRobinHandle 构造函数中,会调用net.createServer 创建一个Server。由于是在cluster.master 里调用的,listenInCluster 会直接调用server._listen2,new 出TCP(src/tcp_wrap.cc)作为句柄,并将其赋给server._handle。至此cluster.master 已经拥有了处理TCP 请求的能力,但只有master 拥有还不够,还需要让child 也能处理这些连接;
RoundRobinHandle
中一旦server触发了listening
事件后,它会接管server._handle
,用distribute
重置其onconnection
方法;
distribute 的作用是把新到达的TCP 连接转发给cluster.child:先把连接放入待处理队列,再从free 队列中取出一个之前通过add 加入、当前空闲的worker(这个worker 与某个cluster.child 相关联),向其发送一个act 为newconn 的消息,让其处理这个TCP 连接;如果暂时没有空闲的worker,连接会留在队列中,等有worker 空闲时再分配;
回到Child部分: cluster.child 的onconnection 收到act 为newconn 的消息后,会根据key 找到之前创建的server 句柄,然后调用其onconnection 方法(child 里的该方法没有被重置,仍是net 模块原本的实现),封装出一个Socket 对象并触发connection 事件;同时child 会回复master 是否接受了这个连接,master 据此关闭自己持有的句柄,或者把连接转交给其他worker;
到此,后面的就是普通业务逻辑代码了。
下面贴上了部分相关代码,还是比较多的,细节部分我也加上了注释。
// lib/net.js (abridged)

// The cluster module; listenInCluster() loads it lazily on first use.
let cluster;

function createServer(options, connectionListener) {
  return new Server(options, connectionListener);
}

function Server(options, connectionListener) {
  // ...constructor body omitted
}

/**
 * Abridged Server#listen: only the "port given, no host" path is shown,
 * e.g. server.listen(8000) from the official example. Callback handling
 * and the handle / fd / path forms are omitted.
 */
Server.prototype.listen = function (...args) {
  // The overloaded listen() arguments are normalized first.
  const normalized = normalizeArgs(args);
  const options = normalized[0];
  const backlogFromArgs =
    // (handle, backlog) or (path, backlog) or (port, backlog)
    toNumber(args.length > 1 && args[1]) ||
    toNumber(args.length > 2 && args[2]); // (port, host, backlog)

  let backlog;
  if (typeof options.port === "number" || typeof options.port === "string") {
    backlog = options.backlog || backlogFromArgs;
    if (options.host) {
      // ...host lookup path omitted
    } else {
      // No host: addressType 4 becomes part of the key used to look up the
      // master's handle.
      listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive);
    }
    return this;
  }
  // ...other listen() forms omitted
};

/**
 * Starts listening, either directly or through the cluster master.
 *
 * In the master, or when `exclusive` is set, the server binds its own handle
 * via server._listen2(). In a worker it asks the master for a handle through
 * cluster._getServer() and listens on that handle instead; a failure reported
 * by the master is emitted as an 'error' event on the server.
 */
function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive, flags) {
  exclusive = !!exclusive;

  if (cluster === undefined) cluster = require("cluster");

  if (cluster.isMaster || exclusive) {
    // Create and bind a new handle in this process.
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // Ask the master for its server handle, then listen on it.
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      // Do not listen on a missing handle; surface the failure as a bind error.
      const ex = exceptionWithHostPort(err, "bind", address, port);
      return server.emit("error", ex);
    }

    // Reuse the handle provided by the master.
    server._handle = handle;
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}
// lib/internal/cluster/child.js (abridged)

/**
 * Called from net's listenInCluster() in a worker: sends a 'queryServer'
 * request to the master and, depending on the reply, builds either a
 * shared handle or a round-robin faux handle, passed on as cb(err, handle).
 */
cluster._getServer = function (obj, options, cb) {
  const address = options.address;
  const indexesKey = [
    address,
    options.port,
    options.addressType,
    options.fd,
  ].join(":");

  // Per-key counter: it is sent as `index` and becomes part of the master's
  // handle key, so repeated listens on the same address stay distinct.
  let index = indexes.get(indexesKey);
  if (index === undefined) {
    index = 0;
  } else {
    index++;
  }
  indexes.set(indexesKey, index);

  const message = {
    act: "queryServer",
    index,
    data: null,
    ...options,
  };

  // `send` is this module's IPC helper to the master (defined elsewhere).
  send(message, (reply, handle) => {
    if (handle) {
      shared(reply, handle, indexesKey, cb); // master sent a real handle
    } else {
      rr(reply, indexesKey, cb); // round-robin: build a faux handle
    }
  });
};

/**
 * Round-robin mode: registers a plain-object stand-in for a listening handle
 * under the master's key instead of opening a socket. Connections arrive
 * later through onconnection().
 */
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  let key = message.key;

  // Touches no socket; always reports success.
  function listen(backlog) {
    return 0;
  }

  // Tells the master to close, forgets the key; later calls are no-ops.
  function close() {
    if (key === undefined)
      return;

    send({ act: "close", key });
    handles.delete(key);
    indexes.delete(indexesKey);
    key = undefined;
  }

  // Reports the address the master bound, while not yet closed.
  function getsockname(out) {
    if (key)
      Object.assign(out, message.sockname);

    return 0;
  }

  // ref/unref are no-ops on the stand-in handle.
  const handle = { close, listen, ref: noop, unref: noop };

  if (message.sockname) {
    handle.getsockname = getsockname; // only when the master reported one
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle);
}

/**
 * Handles a 'newconn' message from the master: passes the connection to the
 * handle registered under message.key and acks whether it was accepted (a
 * rejected connection is redistributed by the master). The registered
 * handle is expected to have received net's onconnection once listening.
 */
function onconnection(message, handle) {
  const key = message.key;
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  if (accepted)
    server.onconnection(0, handle);
}
// lib/internal/cluster/master.js (abridged)

/**
 * Handles a worker's 'queryServer' request: finds or creates the master-side
 * handle for the requested address, registers the worker with it, and replies
 * with { errno, key, ack, data, ...reply } plus an optional handle.
 */
function queryServer(worker, message) {
  // Skip workers flagged exitedAfterDisconnect (being shut down on purpose).
  if (worker.exitedAfterDisconnect)
    return;

  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);

  if (handle === undefined) {
    // Abridged: only the RoundRobinHandle path is shown here.
    const address = message.address;
    handle = new RoundRobinHandle(
      key,
      address,
      message.port,
      message.addressType,
      message.fd,
      message.flags
    );
    handles.set(key, handle);
  }

  // Keep the data supplied by the first worker that asked for this key.
  if (!handle.data)
    handle.data = message.data;

  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    // Drop a failed handle so a later request can retry instead of reusing it.
    if (errno)
      handles.delete(key);

    send(
      worker,
      {
        errno,
        key,
        ack: message.seq,
        data,
        ...reply,
      },
      handle
    );
  });
}
// lib/internal/cluster/round_robin_handle.js (abridged)

/**
 * Master-side handle for round-robin scheduling: the master itself listens
 * on the address, queues each accepted connection and hands it to one idle
 * worker at a time.
 *
 * @param {string} key Handle key shared with the workers.
 * @param {string|null} address Host, or the value passed to listen() as-is.
 * @param {number} port Port to listen on when >= 0.
 * @param {*} addressType Part of the key; not used by this constructor.
 * @param {number} [fd] File descriptor to listen on when >= 0.
 * @param {number} flags Listen flags; UV_TCP_IPV6ONLY maps to ipv6Only.
 */
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  this.key = key;
  this.all = new Map(); // every worker added to this handle, by id
  this.free = []; // idle workers waiting for a connection
  this.handles = []; // accepted connections waiting for a worker
  this.handle = null; // the listening handle once bound
  // Connections are taken over via onconnection below, so the regular
  // connection listener must never run.
  this.server = net.createServer(assert.fail);

  if (fd >= 0) {
    this.server.listen({ fd });
  } else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else {
    this.server.listen(address); // presumably a UNIX socket path
  }

  // Once bound, take the handle away from the net.Server and route new
  // connections to distribute() instead.
  this.server.once("listening", () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

/**
 * Registers `worker` and replies through send(errno, reply, handle) once the
 * master is listening, then offers the worker any pending connection.
 */
RoundRobinHandle.prototype.add = function (worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);
    }

    // Connections may have queued up before this worker was registered.
    this.handoff(worker);
  };

  // Already listening: reply right away.
  if (this.server === null)
    return done();

  // Still binding: wait for the outcome.
  this.server.once("listening", done);
  this.server.once("error", (err) => {
    send(err.errno, null);
  });
};

/** Queues a new connection and wakes one idle worker, if there is one. */
RoundRobinHandle.prototype.distribute = function (err, handle) {
  this.handles.push(handle);
  const worker = this.free.shift();

  if (worker)
    this.handoff(worker);
};

/**
 * Sends the oldest pending connection to `worker`, or parks the worker in
 * the free queue when nothing is pending. On the worker's ack the master
 * closes its own handle (accepted) or redistributes the connection
 * (rejected), then offers the worker the next one.
 */
RoundRobinHandle.prototype.handoff = function (worker) {
  if (this.all.has(worker.id) === false) {
    return; // worker is no longer registered with this handle
  }

  const handle = this.handles.shift();

  if (handle === undefined) {
    this.free.push(worker); // idle until the next connection
    return;
  }

  const message = { act: "newconn", key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted) {
      handle.close();
    } else {
      this.distribute(0, handle); // let another worker take it
    }

    this.handoff(worker);
  });
};
总结: cluster 利用child_process 的fork 方法创建子进程,并传入新的环境变量NODE_UNIQUE_ID 来区分主、子进程,从而在require('cluster') 时加载对应的master.js 或child.js 文件。另外,在默认的RoundRobinHandle 模式下,cluster 子进程之所以可以共同监听同一个TCP 端口,是因为net 模块区分了master 和child:child 并没有真正监听端口,而是向master 查询该Server 是否已经存在,如果不存在,master 会在RoundRobinHandle 中创建server;一旦有新的TCP 连接进入,master 会将其转发给free 队列里的worker(cluster.child)处理。
Comments