可恶!这怎么办!

还记得上一次,我们接触网络编程后出现的问题吗?我们当时写了一个简单的服务端,可以接收到客户端传输给我们的信息,但我们有一些问题没有解决,我们的服务端是一次性的,我们加上了while(1),但是,断开连接后又出现了问题,我们的服务器会一直进行接收动作,而且其他的客户端连接不上我们了,所以显然单纯的一个while是不可以的。可恶啊!这该怎么办?

一个线程一个客户端?

单纯的加while已经是彻底失败了,现在我们的一个线程就能处理一个客户端连接,和消息的接收发送……嘶,那我们何不开多个线程呢?一个线程负责一个客户端,这不就好了嘛!咱说干就干:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

void *routine(void *arg) {

int clientfd = *(int *)arg;

while (1) {

unsigned char buffer[BUFFER_LENGTH] = {0};
int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
if (ret == 0) {
close(clientfd);
break;

}
printf("buffer : %s, ret: %d\n", buffer, ret);

ret = send(clientfd, buffer, ret, 0);
}
}

int main() {

int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1) return -1;

struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9999);

if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) {
return -2;
}

int flag = fcntl(listenfd, F_GETFL, 0);
flag |= O_NONBLOCK;
fcntl(listenfd, F_SETFL, flag);

listen(listenfd, 10);

while (1) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
int clientfd = accept(listenfd, (struct sockaddr*)&client, &len);

pthread_t threadid;
pthread_create(&threadid, NULL, routine, &clientfd);
}
}

现在我们再来测试一下,这一会程序没有问题了,无论是连接发送消息还是断开连接,亦或是多个客户端连接都没问题,但这之中还是有一个问题,线程是需要占用内存的越多的线程就意味着内存也要被占用更多,如果我们现在有100w人连接,显然我们需要超级多的内存,显然这个方法在高并发中并不是很好的解题方法,一定还有更好的……

就决定是你啦!IO多路复用!

IO多路复用是一种同步IO模型,单个进程/线程就可以同时处理多个IO请求。一个进程/线程可以监视多个文件句柄;一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;没有文件句柄就绪时会阻塞应用程序,交出cpu控制权。多路是指多条网络连接,复用指的是同一个进程/线程。一个进程/线程虽然任一时刻只能处理一个请求,但是处理每个请求的事件时,耗时控制在 1 毫秒以内,这样 1 秒内就可以处理上千个请求,把时间拉长来看,多个请求复用了一个进程/线程,这就是多路复用。多路复用主要有三种实现分别是select、poll、epoll,我们接下来一个一个的认识一下。

select

在写代码之前,我们先来仔细的了解一下整体的流程和工作原理,接下来的poll、epoll我们也是如此,磨刀不误砍柴工,理解工作原理十分重要。

首先我们来看看select的故事叭!select最开始会找三个“数组”,这三个“数组”分别是读集合,写集合,异常集合(主要还是看你想要关注的事件是什么,如果你只关心读事件,那么select就只会找一个“数组”),如果我们想让select监听某一个fd是否可以读了,那么我们就把它放到读集合里面并标记一下,如果监听一个fd是否可写了那我们就把它放在写集合里面并标记一下,之后select就拿着小树枝一遍一遍的轮询,一个一个的指,如果有某个fd或者多个fd出现了事件,那么select就停止轮询,将没有事件发生的位置0,然后返回一个正整数(这个正整数实际上是三个集合里面有事件的fd的数量总和,但好像没啥用……),之后我们就开始遍历这些fd看看到底是那个fd在那个集合里面被标记了,然后根据所在集合和是否被标记,来判断一个fd有什么事件,然后我们去处理,select很听话,如果我们不告诉它等多久,它就会乖乖的一直等待,等到有事件了才会继续进行下面的程序,所以select是一个阻塞函数。

select的工作流程(一个集合版)

select的”数组”设计的十分巧妙:select选取了3个128个字节大小的空间当做这三个“数组”,它把这128个字节展开为1024比特,每一个比特位就代表一个文件描述符,如果一个文件描述符被加入了“数组“中就以fd为索引,将对应索引位的值置1,这就是标记,“数组”其实应该叫做位图。

这里插一句题外话,一般来讲文件描述符的分配规则是在 files_struct 数组当中,找到当前没有被使用的最小的一个下标,作为新的文件描述符。所以分配的fd一般都是按照顺序分配的,当你关闭了之前的fd那么新的fd就会使用这个被关闭了的fd。比如我们之前分配的fd是3,4,5,那么下一个fd应该是6,如果现在我关闭了3号fd,然后我再分配一个fd那么这个fd应该是3,而不是7。

好了,我们已经知道select工作的整体流程了,接下来我们就来看看代码怎么写。根据故事我知道select需要三个位图,这个位图系统已经为我们封装好了叫做fd_set。之后我们当然还需要select函数喽,没有select谁帮我们看着呀,然后我们还需要四个工具,一个给我们用来看看哪一个被标记了,剩下三个给select,让它帮我们标记用。有了这些我们就可以开始写代码啦,我们先来看看这些东西是什么样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include<sys/select.h>

typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;

extern int select (int __nfds, fd_set *__restrict __readfds,
fd_set *__restrict __writefds,
fd_set *__restrict __exceptfds,
struct timeval *__restrict __timeout);

#define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp)
#define FD_CLR(fd, fdsetp) __FD_CLR (fd, fdsetp)
#define FD_ISSET(fd, fdsetp) __FD_ISSET (fd, fdsetp)
#define FD_ZERO(fdsetp) __FD_ZERO (fdsetp)

#include<time.h>
struct timeval {
__kernel_time_t tv_sec; /* seconds */
__kernel_suseconds_t tv_usec; /* microseconds */
};

fd_set虽然看起来有点……可怕,但没关系我们先不用注重里面是怎么回事,我们先来看下面的select函数:

  1. select函数有五个参数,第一个参数是最大fd值+1,这里为啥要指定是“最大fd值+1”nie?因为如果只是最大fd值的话,那么最大这个fd就不会被检测了,就好像是数组一样,如果你说最大的索引值要是10的话,那声明的时候应该是11,那如果比10还小,那最大索引就更不可能是10了,同样select第一个参数也是如此,如果我们要检测fd0~4,那么我们这里的参数应该填5。
  2. 第二个,第三个,第四个参数就是故事里所说的三个位图啦,分别是读位图,写位图,异常位图,如果你需要检测就把对应的参数填上,如果你说你一个都不想监听,那就填NULL就好了,这样select就变成了一个高精度的计时器。
  3. 最后一个参数是等待时间,它有三种情况,如果是NULL,那么select就会坐在那里乖乖的等着,如果我们指定了时间,也就是第五个参数struct timeval这个结构体被指定了,那么select就会严格的按照规定好的时间等待,期间有时间就停止等待然后告诉我们有事件啦,如果没有就会一直等待,直到超时了就不再等待了。timeval结构体有两个属性,一个是tv_sec代表秒,一个是tv_usec代表微秒,所以如果你什么都不想监听,那可以设置select的时间,让它成为一个微秒级的计时器。

之后是四个工具:

  1. FD_ZERO函数的参数只有一个,那就是一个fd_set,这个函数会将传入的fd_set全部位设为0,我们每一次声明一个fd_set后都要记得将它置0哦~
  2. FD_SET函数的参数有两个,一个是fd,一个是fd_set,这个函数会将传入的fd值当做索引,然后去传入的位图中,找到对应索引位,并将该位置1
  3. FD_CLR函数也是两个参数,它的作用和FD_SET正好相反,它会将对应索引位置0
  4. FD_ISSET函数就是留给我们自己的啦,虽然select函数很乖,但有点笨笨~,它不会告诉我们到底是哪一位被标记了,那我们也没办法知道到底是哪一个fd有事件发生,所以我们就要拿着所有fd号去我们想要监听的集合里面问,这个fd有没有在这里被标记呀?如果你社恐的话,FD_ISSET可以帮你问,如果你不社恐的话……也请用FD_ISSET问。

好,我们知道了这些都是做什么用的了,我们来写代码叭!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/select.h>
#include <unistd.h>
#include <string.h>

#define BUFFER_LENGTH 128

int main() {

int listenfd = socket(AF_INET, SOCK_STREAM, 0); //
if (listenfd == -1) return -1;

struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9999);

if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) {
return -2;
}

listen(listenfd, 10);

fd_set rfds, wfds, rset, wset;

FD_ZERO(&rfds);
FD_SET(listenfd, &rfds);
FD_ZERO(&wfds);

int maxfd = listenfd;

unsigned char buffer[BUFFER_LENGTH] = {0};
int ret = 0;

while (1)
{
rset = rfds;
wset = wfds;

select(maxfd+1, &rset, &wset, NULL, NULL);
if (FD_ISSET(listenfd, &rset))
{
printf("listenfd --> \n");

struct sockaddr_in client;
socklen_t len = sizeof(client);
int clientfd = accept(listenfd, (struct sockaddr*)&client, &len);

FD_SET(clientfd, &rfds);
if (clientfd > maxfd) maxfd = clientfd;

}

int i = 0;
for (i = listenfd+1; i <= maxfd;i ++)
{
if (FD_ISSET(i, &rset)) {
ret = recv(i, buffer, BUFFER_LENGTH, 0);
if (ret == 0) {
close(i);
FD_CLR(i, &rfds);
} else if (ret > 0) {
printf("buffer : %s, ret: %d\n", buffer, ret);
FD_SET(i, &wfds);
}
}else if (FD_ISSET(i, &wset)) {
ret = send(i, buffer, BUFFER_LENGTH, 0); //
FD_CLR(i, &wfds); //
FD_SET(i, &rfds);
}
}
}
}

代码看起来好像有点长,不过没关系,我们一点点的看,最开始还是老三件socket,bind,listen。

之后我们声明了,四个位图,两个是读位图一个是原先的版本rfds,一个是副本rset,同样写位图也有两个,也是一个原版本,一个副本,至于为什么每样要声明两个,我们继续往下看,声明好了之后我们把rfds,wfds清零,并把listenfd放到rfds中,这样等一下select就可以监听listenfd是否有可读事件发生了。之后我们还要声明一个最大的fd并把listenfd的值赋给它,我们之前也说正常来讲fd的分配都是按照一定顺序来的,我们现在只往里面放了listenfd所以最大的fd号就是listenfd的值(这个值应该是3,因为在linux中0是标准输入,1是标准输出,2是标准异常)。然后我们还声明了一个字符串用来接收和发送消息,一个ret用来接收recv的返回值。

之后我们写了一个while循环,因为我们希望我们的服务器是可以重复使用的而不是一次性的,之后我们就用到了我们的副本,我们将原版本中的值赋给副本,然后使用select去监听副本rset和wset,因为select会清除没有事件发生的fd,这个不是我们希望的,我们只希望我们客户端断开连接后才清除,所以采用了监听副本的方式。当监听到了有事件了,就开始继续执行下去,我们首先要看一下这个事件是不是listenfd发出的读事件,别的fd发出的fd并不需要连接,所以我们要对listenfd的读事件进行单独的处理,我们也知道listenfd我们关心的也只是读事件,所以我们直接使用FD_ISSET看看,listenfd在rset中有没有被标记,如果有那么我们就accept建立连接,并且将新分配的客户端fd加入到rfds中,那为什么不一起加入到wfds中呢?因为我们这个业务是客户给我们发啥,我们就给客户返回啥,客户都还没发我们怎么给人返回呢?

如果没有那么这个事件应该是某一个客户端发出的,所以我们就开始拿着所有的fd去一个一个集合问。之后我们使用了一个for循环,循环从listenfd+1开始直到maxfd,因为我们刚才已经判断过listenfd了所以不需要再一次判断了,判断到maxfd也没有必要继续下去了,因为剩下的我们都没有分配怎么可能有事件发生。之后我们开始拿着fd问问读集合,我这个fd这里还有没有标记呀,有就是有事件没有就是没事件,然后我再去问问写集合,如果也没有那么这个fd就是没有事件发生,我们就换下一个fd在问,直到我们找到了发生事件的fd,然后我们处理事件。

这样就完成了IO多路复用,多个连接多个事件通过select在一个线程上得到处理。

select IO多路复用

select虽然很好,但这个笨笨的小家伙还是有点缺点:

  1. 每次调用都需要重新设置fd_set
  2. 频繁的从用户态切换内核态进行拷贝,效率不高效
  3. 底层采用轮询机制,大量连接下效率很低。
  4. select 支持监听的fd有限。
  5. 我们没有办法知道到底是那个fd发生事件了,只能再一次轮询

因为这些缺点的存在所以人们又发明了其他的实现IO多路复用的方法,poll。

poll

poll实际上和select的机制相同,也是通过轮询和判断状态进行的,根据描述符的状态进行处理,但是 poll() 没有最大文件描述符数量的限制(但是数量过大后性能也是会下降)。poll() 和 select() 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

我们也简单的来看一下poll的函数原型:

1
2
3
4
5
6
7
8
9
#include<sys/poll.h>
struct pollfd
{
int fd; /* File descriptor to poll. */
short int events; /* Types of events poller cares about. */
short int revents; /* Types of events that actually occurred. */
};

extern int poll (struct pollfd *__fds, nfds_t __nfds, int __timeout);

我们先来看pollfd这个结构体,这个结构体里面有三个属性,分别是一个文件描述符fd,一个等待的时间,一个实际发生的事件。fd很好理解,就是指定poll要监听的fd,事件events和实际发生事件revents也很好理解,事件events就是我们去指定我们监听的fd可能会发生什么事件,实际发生事件就是我们监听后实际发生过得事件,这个实际发生事件并不需要我们指定而是内核给我们返回的。事件和实际发生事件有这些取值:

poll事件取值

这里面events无法取值异常事件的三个值,其他都可以取到,revent是都可以取到的。

然后我们看poll函数,poll函数三个参数,第一个就是我们刚才讲的我们可以声明一个pollfd的结构体数组,然后传给poll这样就可以监听多个fd了,第二个nfds是我们最多可以监听文件描述符的数量,第三个就是时间检测,如果我们没有指定那他就会一直等。之后我们就可以循环的去看数组中每一个pollfd的revents到底是什么,对不同的事件进行不同的处理。

虽然poll解决了select监听数有限,每一次都需要从新设定fd_set,这两个问题,但是其他问题几乎都没有解决,所以人们也没有很器重这个做法而是找到了更好的也可以说是现在最好的解决办法:epoll。

epoll

epoll是现在实现IO多路复用最好的技术,它解决了select和poll的问题,成为了高并发服务器的大杀器。

我们来看一看epoll的故事,epoll是一位优秀的快递员,不同于select和poll一家一户的上门询问,epoll采用了更好的办法,它在业主楼下设立了一个快递站,楼内居住的业主谁有快递了就把快递丢到快递点就好,epoll只要去从快递点取快递就好了,不需要一家一户的问有没有快递要收,而且epoll还可以根据快递上面的信息知道每一个快递的主人是谁。如果有新的业主入住或者以前的业主搬走了又或者是其他什么事情都需要告诉epoll一声,毕竟为大家带来了这么多便利,和大家的关系还是不错的。

epoll的故事很短,但是epoll也很强大,接下来我们就先来看看如何使用epoll吧!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <sys/epoll.h>
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;

extern int epoll_create (int __size) __THROW;

extern int epoll_ctl (int __epfd, int __op, int __fd,
struct epoll_event *__event) __THROW;

extern int epoll_wait (int __epfd, struct epoll_event *__events,
int __maxevents, int __timeout);

epoll有三个重要的函数分别是epoll_create、epoll_ctl、epoll_wait。我们分别来看看:

  1. epoll_create:这个函数是用来创建一个epoll实例的,它返回了一个int类型数据,我们知道的在linux下皆为文件,所以返回的int数据其实就是一个新的文件描述符,也就是epoll实例的fd,这个函数的参数很有意思,只要是一个非零数即可无论是1还是1000效果都是一样的,有兴趣的小伙伴可以看看这个是因为什么,欢迎留言告诉博主哦。
  2. epoll_ctl:这个函数是业主用来告诉epoll事情的,它有四个参数,第一个就是平日里和自己打交道的epollfd,第二个是操作(业主要做的事情),操作有三个取值分别是EPOLL_CTL_ADD(你好呀!我要搬进来住啦!:) )、EPOLL_CTL_DEL(再见了我要搬走了 :( )、EPOLL_CTL_MOD(我们从8楼搬到2楼啦,爬楼好累 :| )。第三个参数是业主,到底是那个业主需要通知epoll,这就是第三个参数,第四个参数是一个epoll_event结构体,那我们就来看看epoll结构体,epoll结构体有两个属性,一个是联合体epoll_data,一个是events,events就是我们关心的事情,我们快递只负责收,只负责发,或者有人可以发有人可以收,都体现在这个events上了,events可以是下面几个参数的集合:events取值data就是故事里所说的快递上的信息,这里面我们需要关注的就是联合体内的fd,这个需要我们初始化,最后一个参数的作用就是将关注的事件和对应的业主联系起来。
  3. epoll_wait:这个函数是epoll定时去快递站看看有没有快递,它有四个参数,第一个是快递员epoll,第二个是事件集合,epoll虽然很厉害很勤奋,但他还是要把快递取回来让我们处理,这个事件集合就是我们需要处理的所有事件,第三个参数就是事件集合的大小,第四个代表epoll是按照什么样的频率去快递站,单位是毫秒,如果是0就代表立马返回,如果是-1就阻塞等待,如果是5000,那就是5秒一返回。

知道了这些接下来我们就开始写代码吧!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include<sys/select.h>
#include <unistd.h>
#include<sys/poll.h>
#include <pthread.h>
#include <sys/epoll.h>
#include<stdlib.h>
#include <string.h>


#define BUFFER_LENGTH 10
#define EVENTS_LENGTH 128


char rbuffer[BUFFER_LENGTH] = {0};
char wbuffer[BUFFER_LENGTH] = {0};

int main() {

int listenfd = socket(AF_INET, SOCK_STREAM, 0); //
if (listenfd == -1) return -1;

struct sockaddr_in servaddr;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(9999);

if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) {
return -2;
}
int flag = fcntl(listenfd, F_GETFL, 0);
flag |= O_NONBLOCK;
fcntl(listenfd, F_SETFL, flag);

listen(listenfd, 10);

int epfd = epoll_create(1);

struct epoll_event ev, events[EVENTS_LENGTH];
ev.events = EPOLLIN ;
ev.data.fd = listenfd; //

epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); //

while (1) {

int nready = epoll_wait(epfd, events, EVENTS_LENGTH, -1); // -1, ms
printf("--------------%d\n",nready);
int i = 0;
for (i = 0;i < nready;i ++) {

if (listenfd == events[i].data.fd) {
struct sockaddr_in client;
socklen_t len = sizeof(client);
int clientfd = accept(listenfd, (struct sockaddr*)&client, &len);
printf("%d\n",clientfd);
if (clientfd == -1) break;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = clientfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);

} else if (events[i].events & EPOLLIN ) {

fcntl(events[i].data.fd, F_SETFL, flag);
int n = recv(events[i].data.fd, rbuffer, BUFFER_LENGTH, 0);
if (n > 0) {
rbuffer[n] = '\0';
printf("recv: %s, n: %d\n", rbuffer, n);

memcpy(wbuffer, rbuffer, BUFFER_LENGTH);

ev.events = EPOLLOUT | EPOLLET;
ev.data.fd = events[i].data.fd;

epoll_ctl(epfd, EPOLL_CTL_MOD, events[i].data.fd, &ev);

} else {
epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, &ev);
close(events[i].data.fd);
}
}
else if (events[i].events & EPOLLOUT) {

fcntl(events[i].data.fd, F_SETFL, flag);
int sent = send(events[i].data.fd, wbuffer, strlen(wbuffer), 0); //
printf("sent: %d\n", sent);

ev.events = EPOLLIN | EPOLLET;
ev.data.fd = events[i].data.fd;

epoll_ctl(epfd, EPOLL_CTL_MOD, events[i].data.fd, &ev);
}
}
}
}

我们来一点点的看代码:

首先不用多说肯定是我们的socket、bind、listen。之后我们声明了ev和events[EVENTS_LENGTH],分别用来等一下绑定事件与fd、充当事件集合。然后我们让listenfd入住。之后我们进入一个for循环,因为我们只需要处理事件集合中的全部事件即可所以我们的for循环判断终止条件是事件的个数即可。之后我们就开始判断了如果事件的fd是listenfd那么证明有人需要入住啦,所以我们就开始办理入住手续,如果不是listenfd,那么可能就是有人希望收快递,那我们就开始处理收快递的事件,如果收到的快递里面是空盒,那么说明有人要搬走了,我们就关闭当前事件的fd,然后调用epoll_ctl告诉epoll一声,要搬走了以后不用关心我了。同样如果是有人要发快递,那我们就处理发快递的事件。这样就解决了轮询效率低下的问题。

水平触发和边缘触发

刚刚我们介绍epoll的时候说到了一个边缘触发,而且是相对于水平触发说的,那什么是水平触发呢?什么又是边缘触发呢?这里我们简单的给出两条概念:

边缘触发(ET),无论是对于读事件还是写事件,ET只会因为这三种情况被触发:

  1. buffer由不可读\不可写转变为可读\可写
  2. buffer中内容变多\变少
  3. 当buffer不为空\满,且用户对对应的fd进行了epoll_mod_IN\epoll_mod_OUT事件

水平触发(LT),它的触发条件就更为宽松了,只要缓冲区有东西就会触发。

接下来我们看几个小例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <unistd.h>
#include <iostream>
#include <sys/epoll.h>
using namespace std;
int main(void)
{
int epfd,nfds;
struct epoll_event ev,events[5];//ev用于注册事件,数组用于返回要处理的事件
epfd=epoll_create(1);//只需要监听一个描述符——标准输入
ev.data.fd=STDIN_FILENO;
ev.events=EPOLLIN|EPOLLET;//监听读状态同时设置ET模式
epoll_ctl(epfd,EPOLL_CTL_ADD,STDIN_FILENO,&ev);//注册epoll事件
for(;;)
{
nfds=epoll_wait(epfd,events,5,-1);
for(int i=0;i<nfds;i++)
{
if(events[i].data.fd==STDIN_FILENO)
cout<<"hello world!"<<endl;
}
}
}

结果:

边缘触发

当我们输入一组字符后,这组字符被送到了缓冲区里面,因为这一次输入让缓冲区从空变成了非空,所以epoll_wait认为监听的STDIN_FILENO(标准输入)有事件发生了,所以就触发了一次,之后因为我们没有把字符从缓冲区读出来,所以缓冲区里面字符长度没变,还是非空的,所以epoll_wait就认为没有事件发生,所以会阻塞在epoll_wait。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <unistd.h>
#include <iostream>
#include <sys/epoll.h>
using namespace std;
int main(void)
{
int epfd,nfds;
struct epoll_event ev,events[5];
epfd=epoll_create(1);
ev.data.fd=STDIN_FILENO;
ev.events=EPOLLIN;
epoll_ctl(epfd,EPOLL_CTL_ADD,STDIN_FILENO,&ev);
for(;;)
{
nfds=epoll_wait(epfd,events,5,-1);
for(int i=0;i<nfds;i++)
{
if(events[i].data.fd==STDIN_FILENO)
cout<<"hello world!"<<endl;
}
}
}

结果:

水平触发

这一回,程序疯狂的打印hello world这也符合我们的预期,因为数据一直停留在缓冲区,而且epoll_wait监听的事件触发方式是水平触发,所以epoll_wait认为STDIN_FILENO一直有事,所以就一直打印hello world。

我们再来小小修改一下水平触发的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <unistd.h>
#include <iostream>
#include <sys/epoll.h>
using namespace std;
int main(void)
{
int epfd,nfds;
char buf[2];
struct epoll_event ev,events[5];
epfd=epoll_create(1);
ev.data.fd=STDIN_FILENO;
ev.events=EPOLLIN;
epoll_ctl(epfd,EPOLL_CTL_ADD,STDIN_FILENO,&ev);
for(;;)
{
nfds=epoll_wait(epfd,events,5,-1);
for(int i=0;i<nfds;i++)
{
if(events[i].data.fd==STDIN_FILENO)
{
read(STDIN_FILENO,buf,sizeof(buf));
cout<<"hello world!"<<endl;
}
}
}
}

结果:

水平触发

这回的结果很有意思,但不难理解,因为我们输入了三个字符每一次只能读两个,所以我们读了两次打印了两次把所有字符读完了之后epoll_wait认为所有事情都处理完了,所以就阻塞等待了。

最后我们再来看一个小例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <unistd.h>
#include <iostream>
#include <sys/epoll.h>
using namespace std;
int main(void)
{
int epfd,nfds;
struct epoll_event ev,events[5];
epfd=epoll_create(1);
ev.data.fd=STDIN_FILENO;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_ADD,STDIN_FILENO,&ev);
for(;;)
{
nfds=epoll_wait(epfd,events,5,-1);
for(int i=0;i<nfds;i++)
{
if(events[i].data.fd==STDIN_FILENO)
{
cout<<"hello world!"<<endl;
ev.data.fd=STDIN_FILENO;
ev.events=EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,STDIN_FILENO,&ev);
}
}
}
}

结果:

使用mod触发

epoll真的已经是强无敌了吗?

我们之前说epoll是IO多路服用最好的实现方式了,可是难道真的已经是强无敌了嘛?我们可以看到在程序中还是存在着一些漏洞和瑕疵,比如我们所有的客户端都共用一个字符空间,这可能就会成为我们未来程序崩溃的原因之一,当然还有其他问题,或许我们需要借助某种设计来改善它,欲知后事如何,请听下回分解。