常规的epoll处理
epoll是io多路复用的一种实现方式,最开始我们使用epoll是对多个fd进行管理,当epoll_wait从内核的rdllist就绪链表中取出一定数量的poll_event时,我们可以根据fd进行相应的处理,比如是listenfd的话,就进行accept操作,再将返回的clientfd通过epoll_ctl加入到内核的红黑树中进行EPOLLIN的监听,如果不是listenfd,说明是clientfd需要被处理,再判断是EPOLLIN还是EPOLLOUT,分别做recv和send的处理,可以看出在这个过程中,需要对fd做出处理,判断是否是listenfd,因为是listenfd的话说明是有新的连接进来,需要做连接接入处理,而是其他fd的话,就要判断是哪种事件了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | int epoll_fd = epoll_create(EPOLL_CLIENT); while (true) {     int nready = epoll_wait(epoll_fd, epoll_events);     for (int i = 0; i < nready; i++) {         if (epoll_events[i].fd == listenerfd) {              struct sockaddr clientaddr;             int clientfd = accept(epoll_events[i].fd, (struct sockaddr *)&clientaddr, sizeof(clientaddr));             epoll_event ev;             ev.events = EPOLLIN;             ev.data.fd = clientfd;             epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);         } else {             char buf[1024];             int n = read(epoll_events[i].fd, buf, sizeof(buf));             printf("receive: %s\n", buf);         }     } }
   | 
 
当然也可以这样实现,epoll_wait返回后,首先对事件进行判断,看是EPOLLIN还是EPOLLOUT,如果是EPOLLIN的话,还要判断是不是listenfd,因为listenfd也是EPOLLIN事件,不是的话就说明是有传输数据进来,需要recv处理,而对EPOLLOUT事件,说明发送缓冲区可以发送数据了,进行send即可。这种实现方式还是需要对fd进行判断,有没有更优雅的使用方法呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
   | int epoll_fd = epoll_create(EPOLL_CLIENT); while (true) {     int nready = epoll_wait(epoll_fd, epoll_events);     for (int i = 0; i < nready; i++) {         if (epoll_events[i].events & EPOLLIN) {               if (epoll_events[i].fd == listenerfd) {                 struct sockaddr clientaddr;                 int clientfd = accept(epoll_events[i].fd, (struct sockaddr *)&clientaddr, sizeof(clientaddr));                 epoll_event ev;                 ev.events = EPOLLIN;                 ev.data.fd = clientfd;                 epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);             } else {                 char buf[1024];                 int n = read(epoll_events[i].fd, buf, sizeof(buf));                 printf("receive: %s\n", buf);             }         }     } }
   | 
 
epoll事件模型的底层原理
这就是这篇文章要说到的reactor反应堆模型了,它是一种对事件进行管理的模式,先来看看是怎么实现的,epoll_event是我们提供epoll_ctl需要传递给内核的结构体数据,它有一个uint32_t 的events和epoll_data类型的data,epoll_data是一个union联合体,它有一个fd整形和void *ptr指针(内核中是一个64位的变量)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
   | 
  typedef union epoll_data {   void *ptr;   int fd;   uint32_t u32;   uint64_t u64; } epoll_data_t;
  struct epoll_event {   uint32_t events;       epoll_data_t data;   } __EPOLL_PACKED;
  struct epoll_event { 	__poll_t events; 	__u64 data; } EPOLL_PACKED;
 
  | 
 
重点关注这两个,也就是说,我们可以通过epoll_event传递给内核fd或者是ptr,这些传递进去的fd或者ptr只是为了和events绑定在一起,后面通过epoll_wait取的数据也是epoll_event数据,也就可以知道是哪个fd触发了事件,或者哪个ptr触发了事件.
1 2 3 4 5 6 7 8
   | if (__put_user(revents, &events->events) ||     __put_user(epi->event.data, &events->data)) {      list_add(&epi->rdllink, &txlist);     ep_pm_stay_awake(epi);     if (!res)         res = -EFAULT;     break; }
   | 
 
上文提到的就是通过判断是哪个fd触发了事件,那我们这里就是要通过判断是哪个ptr,这个ptr我们可以设置成一个sockitem结构体的地址,这个结构体里面包括了fd和回调函数,在epoll_ctl的时候就将ptr绑定好传递给内核,epoll_wait返回后,直接取出ptr对应的sockitem里面的回调函数执行就行了,比如listenfd的回调函数是accept_cb,clientfd的回调函数有send_cb和recv_cb,epoll_wait后我们可以通过事件来进行区分,EPOLLIN就调用epoll_event对应的ptr里面的回调函数就行,EPOLLOUT也是一样,这些ptr都是epoll_ctl时就绑定好了的,这样我们就将epoll的处理逻辑和业务代码单独处理了,因为回调函数都是处理一些业务的逻辑,也无需判断是不是listenfd,这种自动触发对应的事件处理函数的方法就是反应堆模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
   | void (callback *)(int fd); struct SockItem {     int fd;     callback cb; };
  int epoll_fd = epoll_create(EPOLL_CLIENT); struct SockItem *ed = new SockItem(); ed->fd = listenerfd; ed->cb = accept_cb; epoll_event ev; ev.events = EPOLLIN; ev.data.ptr = ed; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenerfd, &ev);
  while (true) {     int nready = epoll_wait(epoll_fd, epoll_events);     for (int i = 0; i < nready; i++) {         struct SockItem * ed = (struct SockItem *)epoll_events[i].data.ptr;          if (ed->cb) {             ed->cb(ed->fd);          }     } }
  void accept_cb(int fd) {     struct sockaddr clientaddr;     int clientfd = accept(fd, (struct sockaddr *)&clientaddr, sizeof(clientaddr));     struct SockItem *ed = new SockItem();     ed->fd = clientfd;     ed->cb = recv_cb;     epoll_event ev;     ev.events = EPOLLIN;     ev.data.ptr = ed;     epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev); }
  void recv_cb(int fd) {     char buf[1024];     int n = read(fd, buf, sizeof(buf));     printf("receive: %s\n", buf); }
   | 
 
epoll处理连续数据
对于EPOLLOUT里面的send是需要发送大文件的情况,这种情况一次send是发送不完的,因为发送缓冲区不够大,需要分多次发送,但我们怎么知道上一次是发送到哪个位置了,比如30MB的文件,上次发送了2MB,这次要接着上次2MB的位置继续发送,所以这个2MB的位置信息send_pos需要在上次发送完成后保存下来,我们也把它保存着sockitem这个结构体里面,因为sockitem对应的地址也就是上面说的ptr我们是传给内核了的,有事件到来时,内核还会把这个ptr给到应用层,我们就可以从ptr对应的sockitem里取出上次的send_pos位置继续发送了,另外我们将需要发送的数据放在sockitem里面的send_buffer里面,一般是在recv_cb里面赋值,比如echo服务器直接将收到的数据recv_buff拷到send_buffer里,send_cb只要这个send_buffer有值就进行send即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
   | void (callback *)(int fd); struct SockItem {     int fd;          callback read_callback;     callback write_callback;     long send_pos;      long last_pos;      char send_buffer[MAX_BUFFER_SIZE];     int status;  };
  int epoll_fd = epoll_create(EPOLL_CLIENT); struct SockItem *ed = new SockItem(); ed->fd = listenerfd; ed->read_callback = accept_cb; epoll_event ev; ev.events = EPOLLIN; ev.data.ptr = ed; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenerfd, &ev);
  while (true) {     int nready = epoll_wait(epoll_fd, epoll_events);     for (int i = 0; i < nready; i++) {         struct SockItem * ed = (struct SockItem *)epoll_events[i].data.ptr;          if (epoll_events[i].events && EPOLLIN)          {             if (ed->read_callback) {                 ed->read_callback(ed);             }         } else if (epoll_events[i].events && EPOLLOUT) {             if (ed->write_callback) {                 ed->write_callback(ed);             }         }     } }
  void accept_cb(struct SockItem *si) {     struct sockaddr clientaddr;     int clientfd = accept(si->fd, (struct sockaddr *)&clientaddr, sizeof(clientaddr));     struct SockItem *new_si = new SockItem();     new_si->fd = clientfd;     new_si->read_callback = recv_cb;     new_si->write_callback = send_cb;     epoll_event ev;     ev.events = EPOLLIN || EPOLLOUT;     ev.data.ptr = ed;     epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, new_si); }
  void recv_cb(struct SockItem *si) {     char buf[1024];     int n = read(si->fd, buf, sizeof(buf));          printf("receive: %s\n", buf);     memcpy(si->send_buffer + si->last_pos, buf, n);      si->last_pos += n; }
  void send_cb(struct SockItem *si) {     if (si->last_pos - si->send_pos > 0) {         int n = 0;         while ((n = send(si->fd, si->send_buffer + si->send_pos, si->last_pos - si->send_pos) > 0)) {             si->send_pos += n;         }         if (si->send_pos == si->last) {             si->last_pos = si->send_pos = 0;         }     } }
   | 
 
状态机
对于http服务或者websocket服务,需要在recv_cb里面对接收到的数据进行相对应的协议解析,比如websocket通信前先要进行握手,也就是进行一些鉴权验证,验证通过后才能进行相应的数据传输,针对这个流程可以使用状态机来实现,所以sockitem里面还需要加入一个status字段,recv收到数据根据状态进行处理。
多线程accept
一些对网络接入速度有要求的服务器,我们可以将accept单独交个一个poll进行处理,因为fd就一个,用poll就行,也可以使用多线程管理多个poll,而clientfd则交给epoll进行处理,其具体的事件处理交给线程池去完成,这样就将网络接入和业务处理分离开了,能够快速的响应接入,特别是对于那种重启后突然有大量用户接入的情况有很好的效果。
百万并发
一般情况下,使用epoll做服务器,基本不需要怎么处理都可以达到百万的并发量,这里的并发指的是服务器同一时间能够承载的服务量,一般与内存,数据库,日志和网络带宽等因素有关,并发压测服务器的时候,可以使用多台机器做客户端进行请求,服务器开多个端口进行监听,这样客户端可以启用更多的连接,比较五元组的组合就变得更多一些,还可能需要设置一些参数,如ulimit -n设置最大可打开的文件数,或者修改/etc/security/limits.conf里面的nofile,这个可以永久修改。file-max表示文件描述符的最大值,一般设置的比open files要大。另外tcp的sendbuf和recvbuf可以设置的小一点,这样并发量上来后占用的内存要小一些。当客户端的连接到一定数量的时候,可能会出现连接超时的现象,这可能是内核的一个参数需要修改,nf_conntrack_max,发出去的包被服务器的netfilter给拒绝接收了。
