sensai2.0

319
15

之前曾经简单的过了一遍《深入浅出Node.js》这本书,因为自感Node.js与多进程没啥太多关联,并且有pm2这种神器来完成多核部署,所以对进程一章没有深入研究。

最近Team搞读书会,每人选一章节,正好random到了进程一章,于是详细的读了一番。

然后发现一些看上去难以理解的代码,如下:

slave进程:

const http = require('http')                         
const server = http.createServer((req, res) => {     
  res.writeHead(200, {'Content-Type': 'text/plain'}) 
  res.end(`handled by child, pid is ${process.pid}`) 
})                                                   
                                                     
process.on('message', (message, handle) => {         
  handle.on('connection', socket => {                
    server.emit('connection', socket)                
  })                                                 
})                                                   
master进程:

const net = require('net')                         
const cp = require('child_process')                
const os = require('os')                           
                                                   
const children = []                                
const cpus = os.cpus().length                      
for (let i = 0; i < cpus; i++) {                   
  children.push(cp.fork('./worker.js'))            
}                                                  
                                                   
const server = net.createServer()                  
server.listen('9999', () => {                      
  children.forEach(child => child.send('', server))
  server.close()                                   
})                                                 
关于进程创建和进程通讯相关的代码没什么可说的,node已经封装到了极致。

写代码的人不需要关心当前是什么平台,只需要cp.fork('./worker.js'),在windows上会自动帮你CreateProcess,类unix上会自动fork;也只需要child.send(...),那么node会把...序列化后在windows上通过Named Pipe,类unix上通过unix domain socket,自动把数据带给子进程。

上述代码中最匪夷所思的就是master中调用了一个net.CreateServer,然后通过send第二个参数带给slave后关闭,然后slave中竟然自己又调用了http.CreateServer,然后在第二个参数handler触发connection事件的时候再把socket带给http server...

运行一下master然后写一个大循环curl一下...结果发现请求竟然随机的被各个worker接管了,真shi高级。

-> % for ((i=0;i<10;i++)) do   
  curl localhost:9999
done;
handled by child, pid is 73200
handled by child, pid is 73201
handled by child, pid is 73201
handled by child, pid is 73201
handled by child, pid is 73201
handled by child, pid is 73200
handled by child, pid is 73199
handled by child, pid is 73200
handled by child, pid is 73201
handled by child, pid is 73200

书上关于这段代码的解释是这样的:因为Node对每个socket设置了SO_REUSEADDR的选项,所以支持不同的进程监听同一个端口号。但是搜索了一下,发现SO_REUSEADDR这个选项可以支持不同ip到相同端口的绑定,但是肯定不能支持同ip同端口的绑定。

于是我尝试写了段代码验证一下,如下所示:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>

const int PROCESSES = 2;

int subpids[2];
int addresses[2];
int server_sock_fd, sock_fd;
struct sockaddr_in serv_addr;
struct sockaddr client_addr;
int client_len = sizeof(client_addr);

void on_child_sigint() 
{
    printf("subprocess %d exit!\n", getpid());
    exit(0);
}

void on_sigint() 
{
    printf("receving SIGINT...\n");
    for (int i = 0; i < PROCESSES; i++) {
        if (subpids[i] != 0) {
            kill(subpids[i], SIGINT);
        }
    }
    printf("master process exit!\n");
    exit(0);
}

void on_sigchild() 
{
	pid_t pid; 
	int   stat; 

    while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
        printf("child %d terminated\n", pid); 
    }
	return; 
}

void echo_back(int sock_fd) 
{
	int n;
	char buffer[256];

	n = read(sock_fd, buffer, 256);
	while (n) {
		write(sock_fd, buffer, n);
		bzero(buffer, 256);
		n = read(sock_fd, buffer, 256);
	}
	close(sock_fd);
}

int create_server(int i) 
{
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = htons(9998);
	serv_addr.sin_addr.s_addr = addresses[i];
	bzero(&(serv_addr.sin_zero), 8);
	
	server_sock_fd = socket(AF_INET, SOCK_STREAM, 0);
        if (server_sock_fd == -1) {
		perror("open error");
		return -1;
	}
    int optvalue = 1;
    if (-1 == setsockopt(server_sock_fd, SOL_SOCKET, SO_REUSEADDR, 
            &optvalue, sizeof(optvalue))) {
        perror("setsocketopt error");
        return -1;
    }

    if (-1 == bind(server_sock_fd, (struct sockaddr *)&serv_addr, 
            sizeof(serv_addr))) {
        perror("bind error");
        return -2;
    }
    if (-1 == listen(server_sock_fd, 1)) {
        perror("listen error");
        return -3;
    }
    while (1) {
        sock_fd = accept(server_sock_fd, (struct sockaddr *)&client_addr, 
                (socklen_t *)&client_len);
        struct sockaddr_in *p_clientaddr = (struct sockaddr_in *)&client_addr;
        printf("ip: %s connected by %d\n", inet_ntoa(p_clientaddr->sin_addr), i);
        echo_back(sock_fd);
        printf("ip: %s disconnected!\n", inet_ntoa(p_clientaddr->sin_addr));
        close(sock_fd);
    }
}

int main(void)
{
    addresses[0] = inet_addr("127.0.0.1");
    addresses[1] = inet_addr("0.0.0.0");

    int pid = 0;
    for (int i = 0; i < PROCESSES; i++) {
        pid = fork();
        if (pid == 0) {
            printf("sub process %d created!\n", i);
            signal(SIGINT, on_child_sigint);
            return create_server(i);
        } else if (pid > 0) {
            signal(SIGCHLD, on_sigchild);
            signal(SIGINT, on_sigint);
            subpids[i] = pid;
        }
    }

    printf("set up finished!\n");
    while (1) {
        sleep(1000);
    }
	return 0;
}

程序创建了两个进程,分别创建两个socket监听127.0.0.1:9998和0.0.0.0:9998两个地址,并且两个socket都通过setsockopt来设置SO_REUSEADDR,果然可以绑定成功。

但是,如果把两个进程下socket监听的地址都改成127.0.0.1:9998的话,就失败了!

所以书上说的有一些问题,并不是因为设置了SO_REUSEADDR这个选项所以多个进程可以共享一个ip:port,而且根据Node文档所述,实际上Node中所有的socket都是开启了SO_REUSEADDR的选项的,如下:


所以,我认为开头那段代码实际上只有master开启了一个socket而已,然后通过send的第二个参数handler将socket对应的文件描述符传给slaves,那么所有的slave进程共同监听一个socket文件描述符,这种case下无论有没有SO_REUSEADDR都无所谓了。

那么上面验证的代码简单的修改一下,在主进程中执行完socket->bind->listen后fork子进程,这时socket文件描述符会带给所有的子进程,然后子进程中进行accept及后续操作。

变动的部分代码如下:

int create_socket_fd() 
{
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = htons(9998);
	serv_addr.sin_addr.s_addr = INADDR_ANY;
	bzero(&(serv_addr.sin_zero), 8);
	
	server_sock_fd = socket(AF_INET, SOCK_STREAM, 0);
        if (server_sock_fd == -1) {
		perror("open error");
		return -1;
	}

    int optvalue = 1;
    if (-1 == setsockopt(server_sock_fd, SOL_SOCKET, SO_REUSEADDR, 
            &optvalue, sizeof(optvalue))) {
        perror("setsocketopt error");
        return -1;
    }

    if (-1 == bind(server_sock_fd, (struct sockaddr *)&serv_addr, 
            sizeof(serv_addr))) {
        perror("bind error");
        return -2;
    }
    if (-1 == listen(server_sock_fd, 1)) {
        perror("listen error");
        return -3;
    }
    return 0;
}

int main(void)
{
    if (0 != create_socket_fd()) {
        perror("init socket fail!");
        return 0;
    }

    int pid = 0;
    for (int i = 0; i < PROCESSES; i++) {
        pid = fork();
        if (pid == 0) {
            printf("sub process %d created!\n", i);
            signal(SIGINT, on_child_sigint);
            while (1) {
                sock_fd = accept(server_sock_fd, (struct sockaddr *)&client_addr, 
                        (socklen_t *)&client_len);
                struct sockaddr_in *p_clientaddr = (struct sockaddr_in *)&client_addr;
                printf("ip: %s connected by %d\n", inet_ntoa(p_clientaddr->sin_addr), i);
                echo_back(sock_fd);
                printf("ip: %s disconnected!\n", inet_ntoa(p_clientaddr->sin_addr));
                close(sock_fd);
            }
        } else if (pid > 0) {
            signal(SIGCHLD, on_sigchild);
            signal(SIGINT, on_sigint);
            subpids[i] = pid;
        }
    }

    printf("set up finished!\n");
    while (1) {
        sleep(1000);
    }
	return 0;
}

同样写一个循环测试一下:

-> % for ((i=0;i<10;i++)) do            
for>    echo 'test' | nc 127.0.0.1 9998 
for> done;                              
与开头那段代码不同的是,accept到连接的进程是一个一个轮着来的,而不像上面代码是随机accept的,这是一个疑点...

ip: 127.0.0.1 connected by 0
ip: 127.0.0.1 disconnected!
ip: 127.0.0.1 connected by 1
ip: 127.0.0.1 disconnected!
ip: 127.0.0.1 connected by 0
ip: 127.0.0.1 disconnected!
ip: 127.0.0.1 connected by 1
ip: 127.0.0.1 disconnected!
ip: 127.0.0.1 connected by 0
ip: 127.0.0.1 disconnected!
ip: 127.0.0.1 connected by 1
ip: 127.0.0.1 disconnected!
ip: 127.0.0.1 connected by 0
ip: 127.0.0.1 disconnected!
ip: 127.0.0.1 connected by 1
ip: 127.0.0.1 disconnected!
ip: 127.0.0.1 connected by 0
ip: 127.0.0.1 disconnected!
ip: 127.0.0.1 connected by 1
ip: 127.0.0.1 disconnected!

最终的真相就是只有一个socket file descriptor,而这个socket fd给多个进程共同使用,至于哪个子进程accept到连接的锁机制,大概就是os层来实现的了。

那么说明了另外一个问题,就是http.createServer和net.createServer这两个方法并没有真正的打开一个socket连接,这和之前的理解还是有很大的偏差的。

实地debug一下,原来http.createServer只是new了一个http.Server对象,http.Server对象又继承于net.Server,而net.Server这个对象的构造方法是没有任何native的实现,完全由js写的。

http.Server构造方法:


net.Server构造方法:


实际上,net.Server初始化的时候只是绑定了若干个事件,并且给this.handle赋值为null,这个this.handle对应的应该就是native层的socket文件描述符。而http.Server在构造的时候也仅仅只是在net.Server的基础上绑了request handler方法(就是日常写的中间件方法)

所以只有在listen方法执行后,socket才被真正的init:


可以看到listen方法考虑的还是很周全的,不仅可以listen随机端口,ip:port,还支持传入管道名称等等。

而且内部的listen方法里面考虑到了cluster模块​的判断(cluster模块也在进程这一章节提到,可以看成是开头一段代码的封装,cluster内部执行http.createServer时会判断如果当前ip:port已有进程监听,那么会自动取到监听的socket fd,否则才新建socket)。


listen方法会根据当前监听的类型来new原生的TCP类或Pipe类。


最终TCP类中调用libuv来实现异步的bind,listen,accept:


listen完成之后,那么server对象里面就已经有了socket文件描述符了

这时再通过send的第二个参数将socket传给slaves,然后slave将这个socket fd赋给自己的http.Server。

child_process的send方法也是非常有趣的,第一个参数message功能太有限,

可能是写node的人为了支持主从模式,特意加了第二个参数handler​用以支持native层文件描述符的传递。

这个handler可以支持5种类型,经过特殊序列化后通过管道发送给slaves:


至此,个人对这段代码的实现原理算是理解的差不多了。

不经感慨....node对底层的封装真是太彻底了......


回复

对话列表

×