Nodejs cluster 模块深入探究

浏览:
字体:
发布时间:2017-10-26 18:11:42
来源:

本文作者: 东方联盟 - 欲休 。未经作者许可,禁止转载!
欢迎加入伯乐在线 专栏作者。### 由表及里 HTTP服务器用于响应来自客户端的请求,当客户端请求数逐渐增大时服务端的处理机制有多种,如tomcat的多线程、nginx的事件循环等。而对于node而言,由于其也采用事件循环和异步I/O机制,因此在高I/O并发的场景下性能非常好,但是由于单个node程序仅仅利用单核cpu,因此为了更好利用系统资源就需要fork多个node进程执行HTTP服务器逻辑,所以node内建模块提供了child_process和cluster模块。 利用childprocess模块,我们可以执行shell命令,可以fork子进程执行代码,也可以直接执行二进制文件;利用cluster模块,使用node封装好的API、IPC通道和调度机可以非常简单的创建包括一个master进程下HTTP代理服务器 + 多个worker进程多个HTTP应用服务器的架构,并提供两种调度子进程算法。本文主要针对cluster模块讲述node是如何实现简介高效的服务集群创建和调度的。那么就从代码进入本文的主题: code1

1234567891011121314151617181920212223242526272829303132333435 const cluster = require('cluster');const http = require('http'); if (cluster.isMaster) {   let numReqs = 0;  setInterval(() => {    console.log(<code>numReqs = ${numReqs}</code>);  }, 1000);   function messageHandler(msg) {    if (msg.cmd && msg.cmd === 'notifyRequest') {      numReqs += 1;    }  }   const numCPUs = require('os').cpus().length;  for (let i = 0; i < numCPUs; i++) {    cluster.fork();  }   for (const id in cluster.workers) {    cluster.workers[id].on('message', messageHandler);  } } else {   // Worker processes have a http server.  http.Server((req, res) => {    res.writeHead(200);    res.end('hello world/n');     process.send({ cmd: 'notifyRequest' });  }).listen(8000);}


主进程创建多个子进程,同时接受子进程传来的消息,循环输出处理请求的数量; 子进程创建http服务器,侦听8000端口并返回响应。 泛泛的大道理谁都了解,可是这套代码如何运行在主进程和子进程中呢?父进程如何向子进程传递客户端的请求?多个子进程共同侦听8000端口,会不会造成端口reuse error?每个服务器进程最大可有效支持多少并发量?主进程下的代理服务器如何调度请求? 这些问题,如果不深入进去便永远只停留在写应用代码的层面,而且不了解cluster集群创建的多进程与使用child_process创建的进程集群的区别,也写不出符合业务的最优代码,因此,深入cluster还是有必要的。 ## cluster与net cluster模块与net模块息息相关,而net模块又和底层socket有联系,至于socket则涉及到了系统内核,这样便由表及里的了解了node对底层的一些优化配置,这是我们的思路。介绍前,笔者仔细研读了node的js层模块实现,在基于自身理解的基础上诠释上节代码的实现流程,力图做到清晰、易懂,如果有某些纰漏也欢迎读者指出,只有在互相交流中才能收获更多。 ### 一套代码,多次执行 很多人对code1代码如何在主进程和子进程执行感到疑惑,怎样通过_cluster.isMaster判断语句内的代码是在主进程执行,而其他代码在子进程执行呢? 其实只要你深入到了node源码层面,这个问题很容易作答。cluster模块的代码只有一句:

123 module.exports = ('NODE<em>UNIQUE_ID' in process.env) ?                  require('internal/cluster/child') :                  require('internal/cluster/master');</em>


只需要判断当前进程有没有环境变量“NODE_UNIQUE_ID”就可知道当前进程是否是主进程;而变量“NODE_UNIQUE_ID”则是在主进程fork子进程时传递进去的参数,因此采用cluster.fork创建的子进程是一定包含“NODE_UNIQUE_ID”的。 这里需要指出的是,必须通过cluster.fork创建的子进程才有NODE_UNIQUE_ID变量,如果通过child_process.fork的子进程,在不传递环境变量的情况下是没有NODE_UNIQUE_ID的。因此,当你在child_process.fork的子进程中执行cluster.isMaster判断时,返回 true。 ### 主进程与服务器 code1中,并没有在cluster.isMaster的条件语句中创建服务器,也没有提供服务器相关的路径、端口和fd,那么主进程中是否存在TCP服务器,有的话到底是什么时候怎么创建的? 相信大家在学习nodejs时阅读的各种书籍都介绍过在集群模式下,主进程的服务器会接受到请求然后发送给子进程,那么问题就来到主进程的服务器到底是如何创建呢?主进程服务器的创建离不开与子进程的交互,毕竟与创建服务器相关的信息全在子进程的代码中。 当子进程执行

123456 http.Server((req, res) => {    res.writeHead(200);    res.end('hello world/n');     process.send({ cmd: 'notifyRequest' });  }).listen(8000);


时,http模块会调用net模块(确切的说,http.Server继承net.Server),创建net.Server对象,同时侦听端口。创建net.Server实例,调用构造函数返回。创建的net.Server实例调用listen(8000),等待accpet连接。那么,子进程如何传递服务器相关信息给主进程呢?答案就在listen函数中。我保证,net.Server.prototype.listen函数绝没有表面上看起来的那么简单,它涉及到了许多IPC通信和兼容性处理,可以说HTTP服务器创建的所有逻辑都在listen函数中。 > 延伸下,在学习linux下的socket编程时,服务端的逻辑依次是执行socket(),bind(),listen()和accept(),在接收到客户端连接时执行read(),write()调用完成TCP层的通信。那么,对应到node的net模块好像只有listen()阶段,这是不是很难对应socket的四个阶段呢?其实不然,node的net模块把“bind,listen”操作全部写入了net.Server.prototype.listen中,清晰的对应底层socket和TCP三次握手,而向上层使用者只暴露简单的listen接口。 code2

1234567891011121314151617181920212223242526272829303132333435363738 Server.prototype.listen = function() {   ...   // 根据参数创建 handle句柄  options = options._handle || options.handle || options;  // (handle[, backlog][, cb]) where handle is an object with a handle  if (options instanceof TCP) {    this._handle = options;    this[async_id_symbol] = this._handle.getAsyncId();    listenInCluster(this, null, -1, -1, backlogFromArgs);    return this;  }   ...   var backlog;  if (typeof options.port === 'number' || typeof options.port === 'string') {    if (!isLegalPort(options.port)) {      throw new RangeError('"port" argument must be >= 0 and < 65536');    }    backlog = options.backlog || backlogFromArgs;    // start TCP server listening on host:port    if (options.host) {      lookupAndListen(this, options.port | 0, options.host, backlog,                      options.exclusive);    } else { // Undefined host, listens on unspecified address      // Default addressType 4 will be used to search for master server      listenInCluster(this, null, options.port | 0, 4,                      backlog, undefined, options.exclusive);    }    return this;  }   ...   throw new Error('Invalid listen argument: ' + util.inspect(options));};


由于本文只探究cluster模式下HTTP服务器的相关内容,因此我们只关注有关TCP服务器部分,其他的Pipe(domain socket)服务不考虑。 listen函数可以侦听端口、路径和指定的fd,因此在listen函数的实现中判断各种参数的情况,我们最为关心的就是侦听端口的情况,在成功进入条件语句后发现所有的情况最后都执行了listenInCluster函数而返回,因此有必要继续探究。 code3

1234567891011121314151617181920212223 function listenInCluster(server, address, port, addressType,                         backlog, fd, exclusive) {   ...   if (cluster.isMaster || exclusive) {    server._listen2(address, port, addressType, backlog, fd);    return;  }   // 后续代码为worker执行逻辑  const serverQuery = {    address: address,    port: port,    addressType: addressType,    fd: fd,    flags: 0  };   ...    cluster._getServer(server, serverQuery, listenOnMasterHandle);}


listenInCluster函数传入了各种参数,如server实例、ip、port、ip类型(IPv6和IPv4)、backlog(底层服务端socket处理请求的最大队列)、fd等,它们不是必须传入,比如创建一个TCP服务器,就仅仅需要一个port即可。 简化后的listenInCluster函数很简单,cluster模块判断当前进程为主进程时,执行_listen2函数;否则,在子进程中执行cluster._getServer函数,同时像函数传递serverQuery对象,即创建服务器需要的相关信息。 因此,我们可以大胆假设,子进程在cluster._getServer函数中向主进程发送了创建服务器所需要的数据,即serverQuery。实际上也确实如此: code4

12345678910111213141516171819 cluster._getServer = function(obj, options, cb) {   const message = util._extend({    act: 'queryServer',    index: indexes[indexesKey],    data: null  }, options);   send(message, function modifyHandle(reply, handle) => {    if (typeof obj._setServerData === 'function')      obj._setServerData(reply.data);     if (handle)      shared(reply, handle, indexesKey, cb);  // Shared listen socket.    else      rr(reply, indexesKey, cb);              // Round-robin.  }); };


子进程在该函数中向已建立的IPC通道发送内部消息message,该消息包含之前提到的serverQuery信息,同时包含act: ‘queryServer’字段,等待服务端响应后继续执行回调函数modifyHandle。 主进程接收到子进程发送的内部消息,会根据act: ‘queryServer’执行对应queryServer方法,完成服务器的创建,同时发送回复消息给子进程,子进程执行回调函数modifyHandle,继续接下来的操作。 至此,针对主进程在cluster模式下如何创建服务器的流程已完全走通,主要的逻辑是在子进程服务器的listen过程中实现。 ### net模块与socket 上节提到了node中创建服务器无法与socket创建对应的问题,本节就该问题做进一步解释。在net.Server.prototype.listen函数中调用了listenInCluster函数,listenInCluster会在主进程或者子进程的回调函数中调用_listen2函数,对应底层服务端socket建立阶段的正是在这里。

123456789101112131415161718192021 function setupListenHandle(address, port, addressType, backlog, fd) {   // worker进程中,_handle为fake对象,无需创建  if (this._handle) {    debug('setupListenHandle: have a handle already');  } else {    debug('setupListenHandle: create a handle');     if (rval === null)      rval = createServerHandle(address, port, addressType, fd);     this._handle = rval;  }   this[async_id_symbol] = getNewAsyncId(this._handle);   this._handle.onconnection = onconnection;   var err = this._handle.listen(backlog || 511); }


通过createServerHandle函数创建句柄(句柄可理解为用户空间的socket),同时给属性onconnection赋值,最后侦听端口,设定backlog。 那么,socket处理请求过程“socket(),bind()”步骤就是在createServerHandle完成。

1234567891011121314151617181920 function createServerHandle(address, port, addressType, fd) {  var handle;   // 针对网络连接,绑定地址  if (address || port || isTCP) {    if (!address) {      err = handle.bind6('::', port);      if (err) {        handle.close();        return createServerHandle('0.0.0.0', port);      }    } else if (addressType === 6) {      err = handle.bind6(address, port);    } else {      err = handle.bind(address, port);    }  }   return handle;}


在createServerHandle中,我们看到了如何创建socket(createServerHandle在底层利用node自己封装的类库创建TCP handle),也看到了bind绑定ip和地址,那么node的net模块如何接收客户端请求呢? 必须深入c++模块才能了解node是如何实现在c++层面调用js层设置的onconnection回调属性,v8引擎提供了c++和js层的类型转换和接口透出,在c++的tcp_wrap中:

1234567891011 void TCPWrap::Listen(const FunctionCallbackInfo& args) {  TCPWrap* wrap;  ASSIGN_OR_RETURN_UNWRAP(&wrap,                          args.Holder(),                          args.GetReturnValue().Set(UV_EBADF));  int backloxxg = args[0]->Int32Value();  int err = uv_listen(reinterpret_cast(&wrap->handle),                      backlog,                      OnConnection);  args.GetReturnValue().Set(err);}


我们关注uvlisten函数,它是libuv封装后的函数,传入了**handle,backlog和OnConnection回调函数,其中handle_为node调用libuv接口创建的socket封装,OnConnection函数为socket接收客户端连接时执行的操作。我们可能会猜测在js层设置的onconnction函数最终会在OnConnection中调用,于是进一步深入探查node的connection_wrap c++模块:

12345678910111213 template void ConnectionWrap::OnConnection(uv_stream_t* handle,                                                    int status) {   if (status == 0) {    if (uv_accept(handle, client_handle))      return;     // Successful accept. Call the onconnection callback in JavaScript land.    argv[1] = client_obj;  }  wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);}


过滤掉多余信息便于分析。当新的客户端连接到来时,libuv调用OnConnection,在该函数内执行uv_accept接收连接,最后将js层的回调函数onconnection[通过env->onconnection_string()获取js的回调]和接收到的客户端socket封装传入MakeCallback中。其中,argv数组的第一项为错误信息,第二项为已连接的clientSocket封装,最后在MakeCallback中执行js层的onconnection函数,该函数的参数正是argv数组传入的数据,“错误代码和clientSocket封装”。 js层的onconnection回调

1234567891011121314151617 function onconnection(err, clientHandle) {  var handle = this;   if (err) {    self.emit('error', errnoException(err, 'accept'));    return;  }   var socket = new Socket({    handle: clientHandle,    allowHalfOpen: self.allowHalfOpen,    pauseOnCreate: self.pauseOnConnect  });  socket.readable = socket.writable = true;   self.emit('connection', socket);}


这样,node在C++层调用js层的onconnection函数,构建node层的socket对象,并触发connection事件,完成底层socket与node net模块的连接与请求打通。 至此,我们打通了socket连接建立过程与net模块(js层)的流程的交互,这种封装让开发者在不需要查阅底层接口和数据结构的情况下,仅使用node提供的http模块就可以快速开发一个应用服务器,将目光聚集在业务逻辑中。 > backlog是已连接但未进行accept处理的socket队列大小。在linux 2.2以前,backlog大小包括了半连接状态和全连接状态两种队列大小。linux 2.2以后,分离为两个backlog来分别限制半连接SYN_RCVD状态的未完成连接队列大小跟全连接ESTABLISHED状态的已完成连接队列大小。这里的半连接状态,即在三次握手中,服务端接收到客户端SYN报文后并发送SYN+ACK报文后的状态,此时服务端等待客户端的ACK,全连接状态即服务端和客户端完成三次握手后的状态。backlog并非越大越好,当等待accept队列过长,服务端无法及时处理排队的socket,会造成客户端或者前端服务器如nignx的连接超时错误,出现“error: Broken Pipe”**。因此,node默认在socket层设置backlog默认值为511,这是因为nginx和redis默认设置的backlog值也为此,尽量避免上述错误。 ###

 

>更多相关文章
24小时热门资讯
24小时回复排行
资讯 | QQ | 安全 | 编程 | 数据库 | 系统 | 网络 | 考试 | 站长 | 关于东联 | 安全雇佣 | 搞笑视频大全 | 微信学院 | 视频课程 |
关于我们 | 联系我们 | 广告服务 | 免责申明 | 作品发布 | 网站地图 | 官方微博 | 技术培训
Copyright © 2007 - 2024 Vm888.Com. All Rights Reserved
粤公网安备 44060402001498号 粤ICP备19097316号 请遵循相关法律法规
');})();