epoll服务器反应堆模型

常规的epoll处理

epoll是io多路复用的一种实现方式,最开始我们使用epoll是对多个fd进行管理,当epoll_wait从内核的rdllist就绪链表中取出一定数量的poll_event时,我们可以根据fd进行相应的处理,比如是listenfd的话,就进行accept操作,再将返回的clientfd通过epoll_ctl加入到内核的红黑树中进行EPOLLIN的监听,如果不是listenfd,说明是clientfd需要被处理,再判断是EPOLLIN还是EPOLLOUT,分别做recv和send的处理,可以看出在这个过程中,需要对fd做出处理,判断是否是listenfd,因为是listenfd的话说明是有新的连接进来,需要做连接接入处理,而是其他fd的话,就要判断是哪种事件了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int epoll_fd = epoll_create(EPOLL_CLIENT);
while (true) {
int nready = epoll_wait(epoll_fd, epoll_events);
for (int i = 0; i < nready; i++) {
if (epoll_events[i].fd == listenerfd) { // 先判断文件描述符
struct sockaddr clientaddr;
int clientfd = accept(epoll_events[i].fd, (struct sockaddr *)&clientaddr, sizeof(clientaddr));
epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);
} else {
char buf[1024];
int n = read(epoll_events[i].fd, buf, sizeof(buf));
printf("receive: %s\n", buf);
}
}
}

当然也可以这样实现,epoll_wait返回后,首先对事件进行判断,看是EPOLLIN还是EPOLLOUT,如果是EPOLLIN的话,还要判断是不是listenfd,因为listenfd也是EPOLLIN事件,不是的话就说明是有传输数据进来,需要recv处理,而对EPOLLOUT事件,说明发送缓冲区可以发送数据了,进行send即可。这种实现方式还是需要对fd进行判断,有没有更优雅的使用方法呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int epoll_fd = epoll_create(EPOLL_CLIENT);
while (true) {
int nready = epoll_wait(epoll_fd, epoll_events);
for (int i = 0; i < nready; i++) {
if (epoll_events[i].events & EPOLLIN) { // 先判断事件
if (epoll_events[i].fd == listenerfd) {
struct sockaddr clientaddr;
int clientfd = accept(epoll_events[i].fd, (struct sockaddr *)&clientaddr, sizeof(clientaddr));
epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);
} else {
char buf[1024];
int n = read(epoll_events[i].fd, buf, sizeof(buf));
printf("receive: %s\n", buf);
}
}
}
}

epoll事件模型的底层原理

这就是这篇文章要说到的reactor反应堆模型了,它是一种对事件进行管理的模式,先来看看是怎么实现的,epoll_event是我们提供epoll_ctl需要传递给内核的结构体数据,它有一个uint32_t 的events和epoll_data类型的data,epoll_data是一个union联合体,它有一个fd整形和void *ptr指针(内核中是一个64位的变量)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// glibc
/* sys/epoll.h */
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
// 内核
struct epoll_event {
__poll_t events;
__u64 data;
} EPOLL_PACKED;

重点关注这两个,也就是说,我们可以通过epoll_event传递给内核fd或者是ptr,这些传递进去的fd或者ptr只是为了和events绑定在一起,后面通过epoll_wait取的数据也是epoll_event数据,也就可以知道是哪个fd触发了事件,或者哪个ptr触发了事件.

1
2
3
4
5
6
7
8
if (__put_user(revents, &events->events) ||
__put_user(epi->event.data, &events->data)) { // 内核取出就绪数据到用户空间
list_add(&epi->rdllink, &txlist);
ep_pm_stay_awake(epi);
if (!res)
res = -EFAULT;
break;
}

上文提到的就是通过判断是哪个fd触发了事件,那我们这里就是要通过判断是哪个ptr,这个ptr我们可以设置成一个sockitem结构体的地址,这个结构体里面包括了fd和回调函数,在epoll_ctl的时候就将ptr绑定好传递给内核,epoll_wait返回后,直接取出ptr对应的sockitem里面的回调函数执行就行了,比如listenfd的回调函数是accept_cb,clientfd的回调函数有send_cb和recv_cb,epoll_wait后我们可以通过事件来进行区分,EPOLLIN就调用epoll_event对应的ptr里面的回调函数就行,EPOLLOUT也是一样,这些ptr都是epoll_ctl时就绑定好了的,这样我们就将epoll的处理逻辑和业务代码单独处理了,因为回调函数都是处理一些业务的逻辑,也无需判断是不是listenfd,这种自动触发对应的事件处理函数的方法就是反应堆模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void (callback *)(int fd);
struct SockItem {
int fd;
callback cb;
};

int epoll_fd = epoll_create(EPOLL_CLIENT);
struct SockItem *ed = new SockItem();
ed->fd = listenerfd;
ed->cb = accept_cb;
epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = ed;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenerfd, &ev);

while (true) {
int nready = epoll_wait(epoll_fd, epoll_events);
for (int i = 0; i < nready; i++) {
struct SockItem * ed = (struct SockItem *)epoll_events[i].data.ptr; // 取出来的还是原来传进去的结构体地址
if (ed->cb) {
ed->cb(ed->fd); // 统一处理
}
}
}

void accept_cb(int fd)
{
struct sockaddr clientaddr;
int clientfd = accept(fd, (struct sockaddr *)&clientaddr, sizeof(clientaddr));
struct SockItem *ed = new SockItem();
ed->fd = clientfd;
ed->cb = recv_cb;
epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = ed;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);
}

void recv_cb(int fd)
{
char buf[1024];
int n = read(fd, buf, sizeof(buf));
printf("receive: %s\n", buf);
}

epoll处理连续数据

对于EPOLLOUT里面的send是需要发送大文件的情况,这种情况一次send是发送不完的,因为发送缓冲区不够大,需要分多次发送,但我们怎么知道上一次是发送到哪个位置了,比如30MB的文件,上次发送了2MB,这次要接着上次2MB的位置继续发送,所以这个2MB的位置信息send_pos需要在上次发送完成后保存下来,我们也把它保存着sockitem这个结构体里面,因为sockitem对应的地址也就是上面说的ptr我们是传给内核了的,有事件到来时,内核还会把这个ptr给到应用层,我们就可以从ptr对应的sockitem里取出上次的send_pos位置继续发送了,另外我们将需要发送的数据放在sockitem里面的send_buffer里面,一般是在recv_cb里面赋值,比如echo服务器直接将收到的数据recv_buff拷到send_buffer里,send_cb只要这个send_buffer有值就进行send即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
void (callback *)(int fd);
struct SockItem {
int fd;
// 有读写两个事件
callback read_callback;
callback write_callback;
long send_pos; // 记录发送的位置
long last_pos; // 记录读取到的位置
char send_buffer[MAX_BUFFER_SIZE];
int status; // 可以用来实现状态机
};

int epoll_fd = epoll_create(EPOLL_CLIENT);
struct SockItem *ed = new SockItem();
ed->fd = listenerfd;
ed->read_callback = accept_cb;
epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = ed;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenerfd, &ev);

while (true) {
int nready = epoll_wait(epoll_fd, epoll_events);
for (int i = 0; i < nready; i++) {
struct SockItem * ed = (struct SockItem *)epoll_events[i].data.ptr; // 取出来的还是原来传进去的结构体地址
if (epoll_events[i].events && EPOLLIN) // 分事件统一处理,不再区分fd
{
if (ed->read_callback) {
ed->read_callback(ed);
}
} else if (epoll_events[i].events && EPOLLOUT) {
if (ed->write_callback) {
ed->write_callback(ed);
}
}
}
}

void accept_cb(struct SockItem *si)
{
struct sockaddr clientaddr;
int clientfd = accept(si->fd, (struct sockaddr *)&clientaddr, sizeof(clientaddr));
struct SockItem *new_si = new SockItem();
new_si->fd = clientfd;
new_si->read_callback = recv_cb;
new_si->write_callback = send_cb;
epoll_event ev;
ev.events = EPOLLIN || EPOLLOUT;
ev.data.ptr = ed;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, new_si);
}

void recv_cb(struct SockItem *si)
{
char buf[1024];
int n = read(si->fd, buf, sizeof(buf));
// 可以对数据进行协议解析
printf("receive: %s\n", buf);
memcpy(si->send_buffer + si->last_pos, buf, n); // 假设不会超过send_buffer的大小
si->last_pos += n;
}

void send_cb(struct SockItem *si)
{
if (si->last_pos - si->send_pos > 0) {
int n = 0;
while ((n = send(si->fd, si->send_buffer + si->send_pos, si->last_pos - si->send_pos) > 0)) {
si->send_pos += n;
}
if (si->send_pos == si->last) {
si->last_pos = si->send_pos = 0;
}
}
}

状态机

对于http服务或者websocket服务,需要在recv_cb里面对接收到的数据进行相对应的协议解析,比如websocket通信前先要进行握手,也就是进行一些鉴权验证,验证通过后才能进行相应的数据传输,针对这个流程可以使用状态机来实现,所以sockitem里面还需要加入一个status字段,recv收到数据根据状态进行处理。

多线程accept

一些对网络接入速度有要求的服务器,我们可以将accept单独交个一个poll进行处理,因为fd就一个,用poll就行,也可以使用多线程管理多个poll,而clientfd则交给epoll进行处理,其具体的事件处理交给线程池去完成,这样就将网络接入和业务处理分离开了,能够快速的响应接入,特别是对于那种重启后突然有大量用户接入的情况有很好的效果。

百万并发

一般情况下,使用epoll做服务器,基本不需要怎么处理都可以达到百万的并发量,这里的并发指的是服务器同一时间能够承载的服务量,一般与内存,数据库,日志和网络带宽等因素有关,并发压测服务器的时候,可以使用多台机器做客户端进行请求,服务器开多个端口进行监听,这样客户端可以启用更多的连接,比较五元组的组合就变得更多一些,还可能需要设置一些参数,如ulimit -n设置最大可打开的文件数,或者修改/etc/security/limits.conf里面的nofile,这个可以永久修改。file-max表示文件描述符的最大值,一般设置的比open files要大。另外tcp的sendbuf和recvbuf可以设置的小一点,这样并发量上来后占用的内存要小一些。当客户端的连接到一定数量的时候,可能会出现连接超时的现象,这可能是内核的一个参数需要修改,nf_conntrack_max,发出去的包被服务器的netfilter给拒绝接收了。

nephen wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!