redis/nginx/memcached等网络编程模型

网络编程四点

说到网络编程,就要把下面四个方面处理好。

第一是网络连接,来自客户端的连接,监听accept有收到EPOLLIN事件,或者当前服务器连接上游服务器,进行connect时返回-1,errno为EINPROGRESS,此时再收到EPOLLOUT事件就代表连接上了,因为三次握手最后是需要回复ack给上游服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
bool AsynConnect(int sock, struct sockaddr* addr,int timeout)
{
if (timeout <= 0)
{
timeout = 100;
}
int oldOpt = fcntl(sock, F_GETFL);
// 设置为异步
int newOpt = (oldOpt | O_NONBLOCK);
bool bRet = false;
if (connect(sock, addr, sizeof(struct sockaddr_in)) != 0)
{
if(EINPROGRESS == errno)
{
struct pollfd pfds[1];
pfds[0].fd = sock;
pfds[0].events = POLLOUT;
int error = -1;
socklen_t len = sizeof(error);
int ret = poll(pfds, 1, timeout);
if (ret > 0)
{
getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&error, &len);
if (0 == error)
{
bRet = true; //连接成功
}
else
{
printf("getsockopt error = %d\n", error);
}
}
else
{
printf("poll ret = %d, error = %d\n", iPollRet, errno);
}
}
else
{
bRet = true; //连接成功
}
}
return bRet;
}

第二个方面是网络断开,当客户端断开时,服务端read返回0,或者收到EPOLLRDHUP事件,如果服务端要支持半关闭状态,就关闭读端shutdown(SHUT_RD),如果不需要支持,直接close即可,大部分都是直接close,一般close前也会进行类似释放资源的操作,如果这步操作比较耗时,可以异步处理,否则导致服务端出现大量的close_wait状态。如果是服务端主动断开连接时,通过shutdown(SHUT_WR)发送FIN包给客户端,此时再调用write会返回-1,errno为EPIPE,代表写通道已经关闭。这里要注意close和shutdown的区别,close时如果fd的引用不为0,是不会真正的释放资源的,比如fd1=dup(fd2),close(fd2)不会对fd1造成影响,而shutdown则跳过了前面的引用计数检查,直接对网络进行操作,多线程多进程下用close比较好。断开连接时,如果发现接收缓冲区还有数据,直接丢弃,并回复RST包,如果是发送缓冲区还有数据,则会取消nagle进行发送,末尾加上FIN包,如果开启了SO_LINGER,则会在linger_time内拖延一段时间close,这样保证发送缓冲区的数据被对端接收到。

第三个是消息到达,如果read大于0,接收数据正常,处理对应的业务逻辑即可,如果read等于0,说明对端发送了FIN包,如果read小于0,此时要根据errno进行下一步的判断处理,如果是EWOULDBLOCK或者EAGAIN,说明接收缓冲区还没有数据,直接重试即可,如果是EINTR,说明被信号中断了,因为信号中断的优先级比系统调用高,此时也是重试read即可,如果是ETIMEDOUT,说明探活超时了,每个socket都有一个tcp_keepalive_timer,当超过tcp_keepalive_time没有进行数据交换时,开始发送探活包,如果探测失败,间隔tcp_keepalive_intvl时间发送下一次探活包,最多连续发送tcp_keepalive_probes次,如果都失败了,则关闭连接,返回ETIMEDOUT错误。这些探活都是在传输层进行的,如果应用层的进程有死锁或者阻塞,它是检测不到的,这种情况需要在应用层自行加入心跳包机制来进行检测。一般客户端与数据库之间,反向代理与服务器直接直接用探活机制就行,但数据库之间主从复制以及客户端与服务器之间需要加入心跳机制,以防进程有阻塞。

第四个是消息发送,write大于0,消息放入了发送缓冲区,write小于0,同样要分errno的情况处理,如果错误码为EWOULDBLOCK,说明发送缓冲区还装不下你要发送的数据,需要重试,如果是EINTR,说明write系统调用被信号中断了,同样进行重试处理,如果是EPIPE,说明写通道已经关闭了。

ET模式称为边缘触发模式,顾名思义,不到边缘情况,是死都不会触发的。

EPOLLOUT事件:
EPOLLOUT事件只有在连接时触发一次,表示可写,其他时候想要触发,那你要先准备好下面条件:

  1. 某次write,写满了发送缓冲区,返回错误码为EAGAIN。
  2. 对端读取了一些数据,又重新可写了,此时会触发EPOLLOUT。
    简单地说:EPOLLOUT事件只有在不可写到可写的转变时刻,才会触发一次,所以叫边缘触发,这叫法没错的!

其实,如果你真的想强制触发一次,也是有办法的,直接调用epoll_ctl重新设置一下event就可以了,event跟原来的设置一模一样都行(但必须包含EPOLLOUT),关键是重新设置,就会马上触发一次EPOLLOUT事件。

EPOLLIN事件:
EPOLLIN事件则只有当对端有数据写入时才会触发,所以触发一次后需要不断读取所有数据直到读完EAGAIN为止。否则剩下的数据只有在下次对端有写入时才能一起取出来了。
现在明白为什么说epoll必须要求异步socket了吧?如果同步socket,而且要求读完所有数据,那么最终就会在堵死在阻塞里。

一道腾讯后台开发的面试题
使用Linux epoll模型,水平触发模式;当socket可写时,会不停的触发 socket 可写的事件,如何处理?
第一种最普遍的方式:
需要向 socket 写数据的时候才把 socket 加入 epoll ,等待可写事件。
接受到可写事件后,调用 write 或者 send 发送数据。当所有数据都写完后,把 socket 移出 epoll。
这种方式的缺点是,即使发送很少的数据,也要把 socket 加入 epoll,写完后在移出 epoll,有一定操作代价。
一种改进的方式:
开始不把 socket 加入 epoll,需要向 socket 写数据的时候,直接调用 write 或者 send 发送数据。如果返回 EAGAIN,把 socket 加入 epoll,在 epoll 的驱动下写数据,全部数据发送完毕后,再出 epoll。
这种方式的优点是:数据不多的时候可以避免 epoll 的事件处理,提高效率。

udp读写

这里的读写数据部分和udp有个不同的地方,udp不能发送比MSS还大的数据,超过则发送不出去,最大MSS为MTU-20-8=1500-28=1472,20为ip包的头部,8为udp报文的头部,有的数据还会加入pppoe字段区分不同的运营商,所以MSS最大使用1400比较保险。另外udp接收时,需要接收比MSS大的数据量,否则没接完的数据会丢失。而tcp由于是流式协议,数据不一定是整体到达的,比如3000字节的数据,先到100字节,到了就可以进行recv,可以分多次recv,所以tcp有粘包问题。

常见网络io模型

网络io模型主要分这么几种,阻塞io,非阻塞io,信号驱动io,io多路复用,异步io,其中阻塞io和非阻塞io指的是内核数据准备阶段要不要阻塞等待,如果内核数据准备好了,将数据从内核拷贝至用户空间还是阻塞的,所以它们都为同步io,信号驱动io是内核数据准备好后,通过信号中断的方式来告诉用户空间,比如SIGIO,但也是需要用户主动调用系统调用函数将数据从内核空间拷贝至用户空间的,异步io就不需要用户态去调用系统调用函数将数据从内核态拷贝至用户态了,它是内核自己去把准备好的数据拷贝到用户态,然后通知用户态。目前使用最多的是io多路复用,有select/poll/epoll这些系统调用,可以监听多个io的状态,有事件时系统调用返回,再调用同步io去读取数据即可,当然我们也可以while循环调用同步io轮询看数据有没有准备好,但这样使得cpu一直空转,消耗高。

redis网络模型

目前大部分高性能网络中间件都是采用io多路复用加事件处理的机制,也就是reactor模型,redis是单reactor模型,只有一个epoll对象,主线程就是一个循环,不断的处理epoll事件,首先处理accept事件,将接入的连接绑定读事件处理函数后加入epoll中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,&server.ipfd) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
exit(1);
}
if (server.tls_port != 0 &&
listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
exit(1);
}

/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
serverPanic("Unrecoverable error creating TLS socket accept handler.");
}

// acceptCommonHandler
/* Create connection and client */
if ((c = createClient(conn)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}
// createClient
connSetReadHandler(conn, readQueryFromClient); // 读事件回调readQueryFromClient

等epoll检测到连接有读事件到来时,触发读事件处理函数,但这个函数并没有真正的去读数据,而是将该有读事件到来的连接放入clients_pending_read任务队列中。

1
2
3
4
// readQueryFromClient
// postponeClientRead
c->flags |= CLIENT_PENDING_READ; // 置标志,下次不会进postponeClientRead函数了
listAddNodeHead(server.clients_pending_read,c);

主线程循环到下一次epoll_wait前,再将这些任务队列中的连接分配给各个io线程本地的任务队列io_threads_list处理,在io线程里,不断对io_threads_pending[i]原子变量进行判断,看有没有值,有就代表主线程给派了任务,然后根据io_threads_op任务类型对任务进行读或写处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// beforeSleep
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();

// handleClientsWithPendingReadsUsingThreads
listRewind(server.clients_pending_read,&li); // 取出来cli放到io_threads_list
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}

// 设置标志通知io线程
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}

// 主线程也不能闲着,也要收数据
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);

// 等待io显示设置标志位完成io任务
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}

while(listLength(server.clients_pending_read)) // 主线程发现clients_pending_read还有值,也会进行解析执行

// main 主线程
// aeMain
// aeProcessEvents
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop); // 这里取出来cli放到io_threads_list,并通知io线程

/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);

// =================分界线====================

// IOThreadMain io线程
/* Give the main thread a chance to stop this thread. */
if (getIOPendingCount(id) == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}

client *c = listNodeValue(ln); // 从io_threads_list取出
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
}

// 完成后重置标志位,主线程还在等待该标志
listEmpty(io_threads_list[id]);
setIOPendingCount(id, 0);

先说读处理部分,主要读取客户端数据并进行解析命令处理,将命令读到该连接对应的querybuf中,主线程忙轮询,等待所有io线程解析命令执行命令完成,执行完命令时,将相应结果写入连接对应的buf数组中,如果放不下就放入reply链表中。然后再将各个连接的响应客户端的任务放入clients_pending_write任务队列中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// IOThreadMain io线程
nread = connRead(c->conn, c->querybuf+qblen, readlen);

processInputBuffer(c); // 解析协议并执行

// processInputBuffer
/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return;
}

// processCommandAndResetClient
// processCommand
// addReply
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyProtoToList(c,buf,len); // 放不下的放到reply里面

// _addReplyProtoToList
listAddNodeTail(c->reply, tail);
// prepareClientToWrite
// clientInstallWriteHandler
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c); // 将命令结果放入clients_pending_write中

主线程再分配给各个io线程进行写处理,将数据响应给客户端,主线程此时也是忙轮询,等待所有io线程完成,如果最后发现还有数据没发送完,就注册epollout写事件sendReplyToClient,等客户端可写时再把数据发送完。可以看出网络io相关的操作、命令执行都使用了多线程处理,全程只有一个eventLoop即相当于epollfd,这种就是单reactor模型,每个io线程都有自己的任务队列io_threads_list,所以也没有多线程竞争的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// handleClientsWithPendingWritesUsingThreads
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li); // 主线程也会发送,不能闲着
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}

// 重新积压的设置回调函数,EPOLLOUT再发
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}

nginx网络模型

nginx采用的是单reactor多进程模型,因为每个连接都是处理的无状态数据,故可以通过多进程实现,多进程之间共享epollfd,在内核2.6以前,accept还存在惊群问题,即如果有连接到来,多个进程的epoll_wait都能监测到,这样多个进程都处理了该相同的连接,这是有问题的,所以nginx采用了文件锁的方式,在ngx_process_events_and_timers函数中可以看出,哪个进程先获得了这把锁ngx_accept_mutex,就开始监听EPOLLIN事件,并进行epoll_wait接收对应的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ngx_process_events_and_timers
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}

// ngx_trylock_accept_mutex
if (ngx_shmtx_trylock(&ngx_accept_mutex)) // ngx_accept_mutex文件锁

// #define ngx_process_events ngx_event_actions.process_events
// ngx_epoll_process_events, /* process the events */
(void) ngx_process_events(cycle, timer, flags); // 等价于ngx_epoll_process_events
events = epoll_wait(ep, event_list, (int) nevents, timer); // 锁住了整个epoll_fd
if (flags & NGX_POST_EVENTS) {
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events; // accept单独放一个队列

ngx_post_event(rev, queue); // 将事件放入队列中
} else {
rev->handler(rev);
}

ngx_event_process_posted(cycle, &ngx_posted_accept_events); // 先在锁内处理accept
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex); // 解锁,让其他进程处理epoll
}
ngx_event_process_posted(cycle, &ngx_posted_events); // 已连接的可以放在锁外处理

接收到的事件先不处理,先放入一个队列中,如果是accept事件,就放入accept对应的ngx_posted_accept_events队列中,其它事件放入另外一个ngx_posted_events队列,然后再处理accept队列的事件回调函数,到这里才能释放文件锁。
再去处理非accept队列里面的事件回调函数handler。可以看出nginx其实也是只有一个epollfd,只是被多个进程共享了,这样多个进程可以并行处理事件。

memcached网络模型

memcached是基于libevent来实现网络模型的,它是多线程多reactor模型,相比前面两种,它是有多个reactot模型的,也就是说有多个epoll进行事件循环,它也是将网络接入和网络读写io单独分离的方式,主线程主要处理网络的接入,主要看server_sockets函数,有accept事件后会调用event_handler回调函数,该事件是通过调用conn_new函数注册在主线程的event_base类型变量main_base上,所以主线程陷入事件循环后会监听accept事件。

1
2
3
4
5
6
7
8
9
10
11
12
// server_sockets
// server_socket
if (!(listen_conn_add = conn_new(sfd, conn_listening, // 初始状态为conn_listening
EV_READ | EV_PERSIST, 1,
transport, main_base, NULL))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}

// conn_new
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);

该event_handler回调函数里面做的事情就是调用drive_machine,看这个名字就知道是个状态机处理,所以accept事件来时就处理conn_listening下的事情,主要是进行accept得到客户端的sfd,然后通过round_robin算法选择一个工作线程,将该fd信息打包成CQ_ITEM放在工作线程的连接事件队列ev_queue中,最后通过pipe通知那个工作队列有连接事件过来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// event_handler
drive_machine(c);

// drive_machine
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, // 已连接初始化状态conn_new_cmd
READ_BUFFER_CACHED, c->transport, ssl_v);
// dispatch_conn_new
if (!settings.num_napi_ids)
thread = select_thread_round_robin(); // 选择一个线程
else
thread = select_thread_by_napi_id(sfd);

item = cqi_new(thread->ev_queue);
notify_worker(thread, item); // 通知线程

// notify_worker
char buf[1] = "c";
if (write(t->notify_send_fd, buf, 1) != 1) { // pipe通知,notify_receive_fd接收
perror("Failed writing to notify pipe");
/* TODO: This is a fatal problem. Can it ever happen temporarily? */
}

// notify_receive_fd接收,回调为thread_libevent_process
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);

其实就是往pipe中发一个”c”字符,工作线程此时处理事件回调thread_libevent_process,该回调主要是从连接事件队列ev_queue中取出主线程传过来的那个item,再调用conn_new处理该item,conn_new之前主线程也是调用的这个,主要是用来绑定fd的读写事件到event_base上去,工作线程则绑定到该线程对应的那个event_base上,这个event_base对应一个epoll,所以memcached是有多个epoll,因为每个工作线程都有自己的event_base,绑定完后,后续的读写事件回调也是event_handler函数,里面再调用drive_machine函数,只是连接的状态从conn_new_cmd变成了conn_read和conn_write,这就是状态机的好处,代码逻辑很清晰。

1
2
3
4
5
6
7
8
9
10
// thread_libevent_process
ev_count = read(fd, buf, MAX_PIPE_EVENTS);
item = cq_pop(me->ev_queue);
if (item == NULL) {
return;
}
case queue_new_conn: // accept后的新连接
c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport,
me->base, item->ssl);

总的来说,主线程处理accept,工作线程处理后续的通信read和write,思路很清晰。

以上是对网络模型原理的一些介绍,并结合实际开源代码进行剖析,建议自己多看源码,结合别人的思路看我,很快就能梳理清楚,也对网络模型有了更加深刻的理解。

nephen wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!