浅析egg-cluster多进程管理模块(一)
nodejs/cluster集群模块
cluster模块采用Master-Worker模式,以主进程操控子进程的方式启动多个http或https服务器。主要需解决两个问题:
-
多个工作进程的http或https服务监听同一个端口;
-
http或https服务器与cluster内部tcp服务间的通信。
大致实现思路是: – 在工作进程http或https服务的listen方法执行时,启动主进程内部的tcp服务,置空工作进程http或https服务的listen方法;
- 主进程tcp服务”connection”事件时,促使工作进程将主进程服务发出的net#Server交由工作进程的http或https服务处理。
多进程启动服务
按DavidCai1993在《
通过源码解析 Node.js 中 cluster 模块的主要功能实现
》这篇文章中的表述,当主进程调用cluster.fork方法执行子进程脚本时,node在net模块的实现中筛分了net#Server实例listen方法的执行场景,以策略模式对不同场景予以不同处理。
cluster.fork执行场景下,当主进程的tcp服务尚未创建时,则予以创建,用以实际处理监听端口的职能;同时worker进程下启动的http/https服务,其监听端口行为也变得无效。其余场景仍采用net#Server实例listen方法原有的执行逻辑。
node@8.3.0中的表现是,根据net#Server实例listen方法的参数筛分场景,后续执行逻辑相同。个中情形又不同于DavidCai1993的说法。
node@8.3.0版本中,通过环境变量NODE_UNIQUE_ID决定cluster模块实际调用哪份脚本文件。NODE_UNIQUE_ID初始值为0,require(“cluster”)模块将调用lib/internal/cluster/master.js脚本,cluster.fork将使NODE_UNIQUE_ID自增1,并创建工作进程;随后在其他脚本文件中require(“cluster”)模块,因NODE_UNIQUE_ID为真值,将调用lib/internal/cluster/child.js脚本。如使用者require(“cluster”)模块fork工作进程时,调用的是master脚本,node内置的net模块require(“cluster”)时,调用的是child脚本。
const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master'; module.exports = require(`internal/cluster/${childOrMaster}`);
同时,master脚本的isMaster属性为真值,cluster._getServer方法也不存在;child脚本的isMaster属性为否值,存在cluster._getServer方法。对于cluster.fork工作进程中创建的net#Server实例,在其执行listen方法时,将调用cluster._getServer方法。
// node@8.3.0 lib/net.js function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { exclusive = !!exclusive; if (!cluster) cluster = require('cluster'); if (cluster.isMaster || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd); return; } const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags: 0 }; // Get the master's server handle, and listen on it cluster._getServer(server, serverQuery, listenOnMasterHandle); function listenOnMasterHandle(err, handle) { // EADDRINUSE may not be reported until we call listen(). To complicate // matters, a failed bind() followed by listen() will implicitly bind to // a random port. Ergo, check that the socket is bound to the expected // port before calling listen(). // // FIXME(bnoordhuis) Doesn't work for pipe handles, they don't have a // getsockname() method. Non-issue for now, the cluster module doesn't // really support pipes anyway. if (err === 0 && port > 0 && handle.getsockname) { var out = {}; err = handle.getsockname(out); if (err === 0 && port !== out.port) err = uv.UV_EADDRINUSE; } if (err) { var ex = exceptionWithHostPort(err, 'bind', address, port); return server.emit('error', ex); } // Reuse master's server handle server._handle = handle; // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd); } }
lib/internal/cluster/child.js脚本中,cluster.
getServer方法将促使工作进程send消息,该消息通过lib/internal/utils.js模块转化为”internalMessage”消息。内部消息的判断凭据是子对象发送的message消息对象含有cmd属性,且该属性以”NODE
“前缀起始。
// node@8.3.0 lib/internal/cluster/child.js // cb回调为listenInCluster函数中的listenOnMasterHandle函数 cluster._getServer = function(obj, options, cb) { const indexesKey = [options.address, options.port, options.addressType, options.fd ].join(':'); // ... const message = util._extend({ act: 'queryServer', index: indexes[indexesKey], data: null }, options); // ... // 工作进程发送消息,次参作为回调添加到lib/internal/utils.js模块callbacks回调队列中 send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); // ... }; // ... function send(message, cb) { return sendHelper(process, message, null, cb); } // node@8.3.0 lib/internal/cluster/utils.js // 将工作进程发送的消息转化为内部消息 // 将cb添加到callbacks中 var seq = 0; function sendHelper(proc, message, handle, cb) { if (!proc.connected) return false; // Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js message = util._extend({ cmd: 'NODE_CLUSTER' }, message); if (typeof cb === 'function') callbacks[seq] = cb; message.seq = seq; seq += 1; return proc.send(message, handle); }
而在lib/internal/cluster/master.js脚本中,cluster.fork方法执行时,将促成新创建的工作进程订阅内部消息,借由通过onmessage函数处理该内部消息。又因lib/internal/cluster/child.js模块发送的内部消息,其act属性为”queryServer”,所以在onmessage函数处理过程中,最终将调用queryServer函数。
// node@8.3.0 lib/internal/cluster/master.js cluster.fork = function(env) { cluster.setupMaster(); const id = ++ids; const workerProcess = createWorkerProcess(id, env); const worker = new Worker({ id: id, process: workerProcess }); // ... worker.process.on('internalMessage', internal(worker, onmessage)); // ... }; function onmessage(message, handle) { const worker = this; // ... else if (message.act === 'queryServer') queryServer(worker, message); // ... } // node@8.3.0 lib/internal/cluster/utils.js // 以cb回调即onmessage函数处理消息和句柄 function internal(worker, cb) { return function onInternalMessage(message, handle) { if (message.cmd !== 'NODE_CLUSTER') return; var fn = cb; // ... fn.apply(worker, arguments); }; }
关于queryServer函数,它的工作内容即是在首次fork工作进程、且在该工作进程中调用net#Server实例的listen方法时,创建cluster模块内部的tcp服务。因工作进程发送的内部消息中,其启动服务的配置项address、port、addressType、fd均由用户配置,所以内部Tcp服务只会创建一个。该tcp服务在lib/internal/cluster/round_robin_handle.js模块中通过实例化RoundRobinHandle 改造函数时创建。随后再次fork工作进程时,queryServer函数将重用缓存的RoundRobinHandle 实例,而不是创建新的tcp服务。 又因内部tcp服务在master进程中创建,而master进程的’NODE_UNIQUE_ID’环境变量为否值,该net#Server实例执行listen方法时,将以创建Tcp实例的方式赋值net#Server实例的_handle属性,再通过该Tcp实例完成端口监听。
// node@8.3.0 lib/internal/cluster/master.js function queryServer(worker, message) { // Stop processing if worker already disconnecting if (worker.exitedAfterDisconnect) return; const args = [message.address, message.port, message.addressType, message.fd, message.index]; const key = args.join(':'); var handle = handles[key]; if (handle === undefined) { var constructor = RoundRobinHandle; // UDP is exempt from round-robin connection balancing for what should // be obvious reasons: it's connectionless. There is nothing to send to // the workers except raw datagrams and that's pointless. if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } handles[key] = handle = new constructor(key, message.address, message.port, message.addressType, message.fd, message.flags); } // ... } // node@8.3.0 lib/internal/cluster/round_robin_handle.js function RoundRobinHandle(key, address, port, addressType, fd) { this.key = key; this.all = {}; this.free = []; this.handles = []; this.handle = null; this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) this.server.listen(port, address); else this.server.listen(address); // UNIX socket path. // ... } // node@8.3.0 lib/net.js // 主进程启动内部tcp服务时,fd为undefined function listenInCluster(server, address, port, addressType, backlog, fd, exclusive) { exclusive = !!exclusive; if (!cluster) cluster = require('cluster'); // cluster内部tcp服务由master进程启动 if (cluster.isMaster || exclusive) { // Will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd); return; } // ... } Server.prototype._listen2 = setupListenHandle; // legacy alias // 启动tcp服务 function setupListenHandle(address, port, addressType, backlog, fd) { debug('setupListenHandle', address, port, addressType, backlog, fd); // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug('setupListenHandle: have a handle already'); } else { debug('setupListenHandle: create a handle'); // ... if (rval === null) rval = createServerHandle(address, port, addressType, fd); if (typeof rval === 'number') { var error = exceptionWithHostPort(rval, 'listen', address, port); process.nextTick(emitErrorNT, this, error); return; } this._handle = rval; } this[async_id_symbol] = getNewAsyncId(this._handle); this._handle.onconnection = onconnection; this._handle.owner = this; // Use a backlog of 512 entries. We pass 511 to the listen() call because // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1); // which will thus give us a backlog of 512 entries. var err = this._handle.listen(backlog || 511); // ... } // 创建Tcp实例 function createServerHandle(address, port, addressType, fd) { var err = 0; // assign handle in listen, and clean up if bind or listen fails var handle; var isTCP = false; if (typeof fd === 'number' && fd >= 0) { // ... } else if (port === -1 && addressType === -1) { // ... } else { handle = new TCP(); isTCP = true; } // ... return handle; }
每次cluster.fork工作进程,并调用net#Server实例的listen方法时,都会执行RoundRobinHandle.add方法,促使工作进程发送{ack:message.seq}内部消息。基于前述lib/internal/cluster/master.js同样一套侦听内部消息的逻辑,工作进程在侦听{ack}消息过程,将调用lib/internal/cluster/utils.js模块中存储的回调函数。其中,消息对象的ack属性,用于指定将调用哪个回调函数。该回调函数即cluster._getServer方法调用send方法时传入的次参,从而间接调用rr函数。
// node@8.3.0 lib/internal/cluster/master.js function queryServer(worker, message) { // ... // Set custom server data handle.add(worker, (errno, reply, handle) => { reply = util._extend({ errno: errno, key: key, ack: message.seq, data: handles[key].data }, reply); if (errno) delete handles[key]; // Gives other workers a chance to retry. send(worker, reply, handle); }); } // node@8.3.0 lib/internal/cluster/round_robin_handle.js RoundRobinHandle.prototype.add = function(worker, send) { assert(worker.id in this.all === false); this.all[worker.id] = worker; const done = () => { if (this.handle.getsockname) { const out = {}; this.handle.getsockname(out); // TODO(bnoordhuis) Check err. send(null, { sockname: out }, null); } else { send(null, null, null); // UNIX socket. } this.handoff(worker); // In case there are connections pending. }; if (this.server === null) return done(); // Still busy binding. this.server.once('listening', done); // ... }; // node@8.3.0 lib/internal/cluster/utils.js // 工作进程发送{ack}消息时,将触发执行utils模块中存储的回调函数 function internal(worker, cb) { return function onInternalMessage(message, handle) { // ... if (message.ack !== undefined && callbacks[message.ack] !== undefined) { fn = callbacks[message.ack]; delete callbacks[message.ack]; } fn.apply(worker, arguments); }; } // node@8.3.0 lib/internal/cluster/child.js cluster._getServer = function(obj, options, cb) { // ... send(message, (reply, handle) => { if (typeof obj._setServerData === 'function') obj._setServerData(reply.data); if (handle) shared(reply, handle, indexesKey, cb); // Shared listen socket. else rr(reply, indexesKey, cb); // Round-robin. }); // ... };
rr函数执行过程中,将构建handle={listen,close}对象,并作为次参传入lib/net.js模块中listenOnMasterHandle函数里,并调用listenOnMasterHandle函数。随着listenOnMasterHandle函数的执行,工作进程创建的net#Server实例将添加_handle属性,等到程序调用net#Server实例的_listen2方法时,因为存在_handle属性,该方法内不执行任何有意义的脚本,即不触发net#Server实例原listen方法端口监听动作。
// node@8.3.0 lib/internal/cluster/child.js // 参数cb为lib/net.js模块中的listenOnMasterHandle函数 function rr(message, indexesKey, cb) { if (message.errno) return cb(message.errno, null); var key = message.key; function listen(backlog) { // TODO(bnoordhuis) Send a message to the master that tells it to // update the backlog size. The actual backlog should probably be // the largest requested size by any worker. return 0; } function close() { // lib/net.js treats server._handle.close() as effectively synchronous. // That means there is a time window between the call to close() and // the ack by the master process in which we can still receive handles. // onconnection() below handles that by sending those handles back to // the master. if (key === undefined) return; send({ act: 'close', key }); delete handles[key]; delete indexes[indexesKey]; key = undefined; } function getsockname(out) { if (key) util._extend(out, message.sockname); return 0; } // Faux handle. Mimics a TCPWrap with just enough fidelity to get away // with it. Fools net.Server into thinking that it's backed by a real // handle. Use a noop function for ref() and unref() because the control // channel is going to keep the worker alive anyway. const handle = { close, listen, ref: noop, unref: noop }; if (message.sockname) { handle.getsockname = getsockname; // TCP handles only. } assert(handles[key] === undefined); handles[key] = handle; cb(0, handle); } // node@8.3.0 lib/net.js function listenOnMasterHandle(err, handle) { // ... // Reuse master's server handle server._handle = handle; // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addressType, backlog, fd); } Server.prototype._listen2 = setupListenHandle; // legacy alias function setupListenHandle(address, port, addressType, backlog, fd) { debug('setupListenHandle', address, port, addressType, backlog, fd); // If there is not yet a handle, we need to create one and bind. // In the case of a server sent via IPC, we don't need to do this. if (this._handle) { debug('setupListenHandle: have a handle already'); } // ... }
随后工作进程启动的http或https服务将侦测到”listening”事件,触发工作进程发送{act:”listening”}内部消息,基于前述lib/internal/cluster/master.js同样一套侦听内部消息的逻辑,通过onmessage函数将驱动执行listening函数,触发”linstening”事件,以使开发者订阅”listening”消息。
// node@8.3.0 lib/internal/cluster/child.js cluster._getServer = function(obj, options, cb) { // ... obj.once('listening', () => { cluster.worker.state = 'listening'; const address = obj.address(); message.act = 'listening'; message.port = address && address.port || options.port; // send消息通过lib/internal/cluster/utils.js模块转化为内部消息 send(message); }); }; // node@8.3.0 lib/internal/cluster/master.js function onmessage(message, handle) { const worker = this; // ... else if (message.act === 'listening') listening(worker, message); // ... } function listening(worker, message) { const info = { addressType: message.addressType, address: message.address, port: message.port, fd: message.fd }; worker.state = 'listening'; worker.emit('listening', info); cluster.emit('listening', worker, info); }
服务间的通信
nodejs/cluster模块实现中,主进程内部创建的tcp服务,当其有”connection”事件触发时(即客户端发起请求时),将执行RoundRobinHandle实例的distribute方法,取出空闲的工作进程,并促使该进程发送{act:’newconn’}内部消息。该消息在lib/internal/cluster/child.js模块中得到订阅。工作进程监听到该消息时,将通过onconnection函数执行该进程内的http或https服务的onconnection方法,传入的次参为主进程tcp服务所提供的net#Server实例(即主进程tcp.onconnenction方法的次参),为工作进程中的http或https服务建立新的tcp流;与此同时,工作进程将发送{ack:message.seq}内部消息,执行reply=>{}回调,以调用主进程tcp服务net#Server实例的close方法,促使该服务不再接受新的connection。
// node@8.3.0 lib/internal/cluster/round_robin_handle.js function RoundRobinHandle(key, address, port, addressType, fd) { // ... this.server.once('listening', () => { this.handle = this.server._handle;// lib/net.js模块中构建的Tcp实例 // 监听"connection"事件 this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; }); } RoundRobinHandle.prototype.distribute = function(err, handle) { this.handles.push(handle); const worker = this.free.shift(); if (worker) this.handoff(worker); }; RoundRobinHandle.prototype.handoff = function(worker) { if (worker.id in this.all === false) { return; // Worker is closing (or has closed) the server. } const handle = this.handles.shift(); if (handle === undefined) { this.free.push(worker); // Add to ready queue again. return; } const message = { act: 'newconn', key: this.key }; // 尾参作为工作进程发送{ack:message.seq}内部消息时待执行的函数 sendHelper(worker.process, message, handle, (reply) => { // 工作进程启动http或https服务时,关停tcp服务"connection"事件发出的net#Server if (reply.accepted) handle.close(); else this.distribute(0, handle); // Worker is shutting down. Send to another. // 嵌套调用handoff方法,若主进程tcp服务没有新的请求,将worker添加this.free中 this.handoff(worker); }); }; // node@8.3.0 lib/internal/cluster/child.js cluster._setupWorker = function() { // ... function onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle); else if (message.act === 'disconnect') _disconnect.call(worker, true); } }; // Round-robin connection. function onconnection(message, handle) { const key = message.key; const server = handles[key]; const accepted = server !== undefined; send({ ack: message.seq, accepted }); if (accepted) server.onconnection(0, handle); }
简易流程回顾
cluster.fork过程
-
cluster.fork工作进程执行脚本
-
工作进程执行httpServer.listen
-
转交listenInCluster函数处理,执行cluster._getServer
-
工作进程发送{act:”queryServer”},由订阅该消息触发执行queryServer函数,将rr函数添加到回调队列中
-
实例化RoundRobinHandle,创建内部tcp服务
-
调用RoundRobinHandle实例的add方法
-
工作进程发送{ack:message.seq}内部消息,由订阅该消息触发执行rr函数,构建handle={listen,close}对象
-
内部tcp服务监听listening事件,将connection交给工作进程转发给httpServer
-
-
执行listenInCluster函数中的子函数listenOnMasterHandle
- 为工作进程启动的httpServer添加_handle属性,由于_handle属性为真,置空httpServer._listen2
-
请求处理过程
-
接受新的请求,内部tcp服务调用tcpServer._handle.onconnection方法
-
调用RoundRobinHandle实例的distribute方法,取出空闲的进程
-
调用RoundRobinHandle实例的handoff方法,工作进程发送{act:’newconn’}内部消息,将handoff方法添加回调队列中
-
执行订阅{act:’newconn’}内部消息的绑定函数onconnection
-
工作进程的httpServer服务根据主进程的net#Server实例,发起新的tcp流,处理请求
-
工作进程发送{ack:message.seq}内部消息,由订阅该消息触发执行handoff方法,调用net#Server实例close方法,不在接受新的请求
-
嵌套调用handoff方法,将工作进程置为空闲
使用案列,来自《
解读Nodejs多核处理模块cluster
》
var cluster = require('cluster'); var http = require('http'); var numCPUs = require('os').cpus().length; if (cluster.isMaster) { console.log("master start..."); // Fork workers. for (var i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('listening',function(worker,address){ console.log('listening: worker ' + worker.process.pid +', Address: '+address.address+":"+address.port); }); cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } else { http.createServer(function(req, res) { console.log('worker'+cluster.worker.id); res.writeHead(200); res.end("hello world\n"); }).listen(8080,"127.0.0.1"); }
可借鉴处
复杂事件系统的设计,可由send发送的消息args影响订阅函数on的职能。
按nodejs/cluster模块,send函数的次参将作为钩子函数缓存起来,等到发送{ack}类消息时,再取出相应的回调函数执行;send函数发送的消息对象若含有cmd属性,且前缀为”NODE_”,将视为内部消息。