Node.js 创建子进程的方法常用的有如下几种:
child_process
exec : 衍生一个 shell 然后在该 shell 中执行 command
,并缓冲任何产生的输出,最大缓存 1024*1024 个字节。
execFile : 函数类似exec
,但默认情况下不会衍生 shell。 相反,指定的可执行文件file
会作为新进程直接地衍生,使其比exec
稍微更高效。和exec
一样,它也有最大 1024*1204 个字节的显示缓存。
fork : 是 spawn
的一个特例,专门用于衍生新的 Node.js 进程。 与spawn
一样返回ChildProcess
对象。 返回的ChildProcess
将会内置一个额外的通信通道,允许消息在父进程和子进程之间来回传递。
spawn : 上诉的几个方法其实都是通过 spawn 实现的。
cluster
fork : 衍生出一个新的工作进程,这只能通过主进程调用。
翻翻源码看看他们怎么实现的 源码版本和之前的libuv & Node.js EventLoop (一) 一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 // libuv #define UV_VERSION_ MAJOR 1 #define UV_VERSION_ MINOR 33 #define UV_VERSION_ PATCH 1 // V8 #define V8_MAJOR_ VERSION 7 #define V8_MINOR_ VERSION 8 #define V8_BUILD_ NUMBER 279 #define V8_PATCH_ LEVEL 17 // Node.js #define NODE_MAJOR_ VERSION 14 #define NODE_MINOR_ VERSION 0 #define NODE_PATCH_ VERSION 0
child_process 源码位置
1 2 3 4 5 Node - lib - internal - child_process.js - child_process.js ;
在lib/child_process.js
文件中,定义了exec
、execFile
、fork
和spawn
等方法,它们最后都会调用在lib/internal/child_process.js
文件中的spawn
方法。
exec
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 function exec (command, options, callback ) { const opts = normalizeExecArgs (command, options, callback); return module .exports .execFile (opts.file , opts.options , opts.callback ); } function normalizeExecArgs (command, options, callback ) { if (typeof options === "function" ) { callback = options; options = undefined ; } options = { ...options }; options.shell = typeof options.shell === "string" ? options.shell : true ; return { file : command, options : options, callback : callback, }; }
从上面发现 exec 其实就是封装了参数,主要是开启shell
参数,然后调用 execFile 方法。
execFile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 function execFile (file ) { let args = []; let callback; let options; options = { encoding : "utf8" , timeout : 0 , maxBuffer : MAX_BUFFER , killSignal : "SIGTERM" , cwd : null , env : null , shell : false , ...options, }; const child = spawn (file, args, { cwd : options.cwd , env : options.env , gid : options.gid , uid : options.uid , shell : options.shell , windowsHide : !!options.windowsHide , windowsVerbatimArguments : !!options.windowsVerbatimArguments , }); var encoding; const _stdout = []; const _stderr = []; var stdoutLen = 0 ; var stderrLen = 0 ; var killed = false ; var exited = false ; var timeoutId; var ex = null ; var cmd = file; function exithandler (code, signal ) { if (exited) return ; exited = true ; if (timeoutId) { clearTimeout (timeoutId); timeoutId = null ; } if (!callback) return ; var stdout; var stderr; if (encoding || (child.stdout && child.stdout .readableEncoding )) { stdout = _stdout.join ("" ); } else { stdout = Buffer .concat (_stdout); } if (encoding || (child.stderr && child.stderr .readableEncoding )) { stderr = _stderr.join ("" ); } else { stderr = Buffer .concat (_stderr); } if (!ex && code === 0 && signal === null ) { callback (null , stdout, stderr); return ; } if (args.length !== 0 ) cmd += ` ${args.join(" " )} ` ; if (!ex) { ex = new Error ("Command failed: " + cmd + "\n" + stderr); ex.killed = child.killed || killed; ex.code = code < 0 ? getSystemErrorName (code) : code; ex.signal = signal; } ex.cmd = cmd; callback (ex, stdout, stderr); } function errorhandler (e ) { ex = e; if (child.stdout ) child.stdout .destroy (); if (child.stderr ) child.stderr .destroy (); exithandler (); } function kill ( ) { if (child.stdout ) child.stdout .destroy (); if (child.stderr ) child.stderr .destroy (); killed = true ; try { child.kill (options.killSignal ); } catch (e) { ex = e; exithandler (); } } if (options.timeout > 0 ) { timeoutId = setTimeout (function delayedKill ( ) { kill (); timeoutId = null ; }, options.timeout ); } if (child.stdout ) { if (encoding) child.stdout .setEncoding (encoding); child.stdout .on ("data" , function onChildStdout (chunk ) { const encoding = child.stdout .readableEncoding ; const length = encoding ? Buffer .byteLength (chunk, encoding) : chunk.length ; stdoutLen += length; if (stdoutLen > options.maxBuffer ) { const truncatedLen = options.maxBuffer - (stdoutLen - length); _stdout.push (chunk.slice (0 , truncatedLen)); ex = new ERR_CHILD_PROCESS_STDIO_MAXBUFFER ("stdout" ); kill (); } else { _stdout.push (chunk); } }); } if (child.stderr ) { if (encoding) child.stderr .setEncoding (encoding); child.stderr .on ("data" , function onChildStderr (chunk ) { const encoding = child.stderr .readableEncoding ; const length = encoding ? Buffer .byteLength (chunk, encoding) : chunk.length ; stderrLen += length; if (stderrLen > options.maxBuffer ) { const truncatedLen = options.maxBuffer - (stderrLen - length); _stderr.push (chunk.slice (0 , truncatedLen)); ex = new ERR_CHILD_PROCESS_STDIO_MAXBUFFER ("stderr" ); kill (); } else { _stderr.push (chunk); } }); } child.addListener ("close" , exithandler); child.addListener ("error" , errorhandler); return child; }
exec
和execFile
最大的一个区别就是参数shell
默认是否开启,其它基本都是相同的。另外,它们对输出的内容有大小限制是在child.stderr.on('data')
和child.stdout.on('data')
获取数据时候被限制。
如果不做类似的规定,_stderr
和_stdout
无限被输出,那么内存会不断的膨胀导致性能问题,甚至程序奔溃。(这是我猜的原因)
另外说到底,最后还是对spawn
方法的封装了调用。
fork
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 function fork (modulePath ) { var execArgv; var options = {}; var args = []; if (typeof options.stdio === "string" ) { options.stdio = stdioStringToArray (options.stdio , "ipc" ); } else if (!Array .isArray (options.stdio )) { options.stdio = stdioStringToArray ( options.silent ? "pipe" : "inherit" , "ipc" ); } else if (!options.stdio .includes ("ipc" )) { throw new ERR_CHILD_PROCESS_IPC_REQUIRED ("options.stdio" ); } options.execPath = options.execPath || process.execPath ; options.shell = false ; return spawn (options.execPath , args, options); }
从上面代码有个非常引人注意就是 12~22 行,fork 方法的 stdio 参数,必须带有一个 ipc 参数,这个ipc
的作用将在后续深入挖掘后介绍。最后也是调用spawn
创建子进程。
spawn
1 2 3 4 5 6 7 8 9 10 function spawn (file, args, options ) { const child = new ChildProcess (); options = normalizeSpawnArguments (file, args, options); debug ("spawn" , options); child.spawn (options); return child; }
lib/child_process.js
里的 spawn 方法就简单的将传入的参数做整理,然后直接调用 ChildProcess 实例对象的 spawn。
ChildProcess
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 function ChildProcess ( ) { EventEmitter .call (this ); this ._handle = new Process (); } ChildProcess .prototype .spawn = function (options ) { let i = 0 ; let stdio = options.stdio || "pipe" ; stdio = getValidStdio (stdio, false ); const ipc = stdio.ipc ; const ipcFd = stdio.ipcFd ; stdio = options.stdio = stdio.stdio ; const err = this ._handle .spawn (options); this .pid = this ._handle .pid ; for (i = 0 ; i < stdio.length ; i++) { const stream = stdio[i]; if (stream.handle ) { stream.socket = createSocket ( this .pid !== 0 ? stream.handle : null , i > 0 ); if (i > 0 && this .pid !== 0 ) { this ._closesNeeded ++; stream.socket .on ("close" , () => { maybeClose (this ); }); } } } this .stdin = stdio.length >= 1 && stdio[0 ].socket !== undefined ? stdio[0 ].socket : null ; this .stdout = stdio.length >= 2 && stdio[1 ].socket !== undefined ? stdio[1 ].socket : null ; this .stderr = stdio.length >= 3 && stdio[2 ].socket !== undefined ? stdio[2 ].socket : null ; this .stdio = []; for (i = 0 ; i < stdio.length ; i++) this .stdio .push (stdio[i].socket === undefined ? null : stdio[i].socket ); if (ipc !== undefined ) setupChannel (this , ipc); return err; }; function stdioStringToArray (stdio, channel ) { } function getValidStdio (stdio, sync ) { var ipc; var ipcFd; stdio = stdio.reduce ((acc, stdio, i ) => { function cleanup ( ) { for (var i = 0 ; i < acc.length ; i++) { if ((acc[i].type === "pipe" || acc[i].type === "ipc" ) && acc[i].handle ) acc[i].handle .close (); } } if (stdio === "ignore" ) { acc.push ({ type : "ignore" }); } else if (stdio === "pipe" || (typeof stdio === "number" && stdio < 0 )) { var a = { type : "pipe" , readable : i === 0 , writable : i !== 0 , }; if (!sync) a.handle = new Pipe (PipeConstants .SOCKET ); acc.push (a); } else if (stdio === "ipc" ) { ipc = new Pipe (PipeConstants .IPC ); ipcFd = i; acc.push ({ type : "pipe" , handle : ipc, ipc : true , }); } else if (stdio === "inherit" ) { acc.push ({ type : "inherit" , fd : i, }); } return acc; }, []); return { stdio, ipc, ipcFd }; }
ChildProcess
并不能直接创建新的进程,需要底层 V8 的帮助,在构造函数里面直接 new ProcessWrap 赋给了 this._handle。
ChildProcess.prototype.spawn
开始主要处理主要的stdio
参数,明确父子进程通过哪些方式来获取数据信息,官方文档给出了一些示例,如果不清楚可以多做点实验。如果是pipe
或者是ipc
都会实例化一个 Pipe 对象,只是参数类型不同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void PipeWrap::New (const FunctionCallbackInfo<Value>& args) { switch (type) { case SOCKET: provider = PROVIDER_PIPEWRAP; ipc = false ; break ; case IPC: provider = PROVIDER_PIPEWRAP; ipc = true ; break ; } new PipeWrap(env, args.This(), provider, ipc); } PipeWrap::PipeWrap(Environment* env, Local<Object> object, ProviderType provider, bool ipc) : ConnectionWrap(env, object, provider) { int r = uv_pipe_init(env->event_loop(), &handle_, ipc); } int uv_pipe_init (uv_loop_t * loop, uv_pipe_t * handle, int ipc) { uv__stream_init(loop, (uv_stream_t *)handle, UV_NAMED_PIPE); handle->shutdown_req = NULL ; handle->connect_req = NULL ; handle->pipe_fname = NULL ; handle->ipc = ipc; return 0 ; }
它们唯一的区别就是uv_pipe_t
的 ipc 参数是 true 还是 false,所有的事件都被老老实实的绑定在 libuv 上 。
回到ChildProcess.prototype.spawn
中,已经重置了 stdio 参数后,到了真正创建子进程的地方了,this._handle.spawn(options);
,通过process_wrap.cc
里的 ProcessWrap.Spawn 去创建,整个创建方法也极长,主要是对传入的参数进行处理,然后再调用uv_spawn
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 int uv_spawn (uv_loop_t * loop, uv_process_t * process, const uv_process_options_t * options) {#if defined(__APPLE__) && (TARGET_OS_TV || TARGET_OS_WATCH) return UV_ENOSYS; #else uv_signal_start(&loop->child_watcher, uv__chld, SIGCHLD); uv_rwlock_wrlock(&loop->cloexec_lock); pid = fork(); if (pid == -1 ) { err = UV__ERR(errno); uv_rwlock_wrunlock(&loop->cloexec_lock); uv__close(signal_pipe[0 ]); uv__close(signal_pipe[1 ]); goto error; } if (pid == 0 ) { uv__process_child_init(options, stdio_count, pipes, signal_pipe[1 ]); abort (); } uv_rwlock_wrunlock(&loop->cloexec_lock); uv__close(signal_pipe[1 ]); }
通过系统层面的fork
函数创建子进程,由于 fork 的特殊性,一次调用返回二次,当返回 0 的时候回执行子进程的逻辑,回去通过uv__process_child_init
初始化整个子进程的上下文信息。
再回到ChildProcess.prototype.spawn
中,遍历stdio
,如果成员有 handle 字段,就通过createSocket
为其创建一个 socket 对象
1 2 3 function createSocket (pipe, readable ) { return net.Socket ({ handle : pipe, readable, writable : !readable }); }
无论是参数stdio
是 pipe 还是 ipc 都会创建 socket,在父子进程通信的时候,父进程通过子进程暴露出来的 stdin、stdout 和 stderr 来展示子进程执行的信息,缺乏之间的数据互通性,这也是导致exec
和execFile
可使用的场景有限,而fork
会带一个 ipc 参数给 stdio 参数(可以回过去翻翻 fork 源码),所以可以执行父子进程的通信操作,比如 send 方法等,具体的实现可以看setupChannel
。
综上, Node.js 在child_process
里创建进程的流程大致梳理了下,在 JavaScript 层面并没有什么复杂的,在 libuv 层面注册了很多相关的事件,有空可以研究研究。之后会写一篇关于 cluster.fork 的介绍,其实就是对 child_process.fork 更多的封装。
Comments