使用epoll实现一个高并发的服务器
上节从一个基础的socket服务说起我们实现了一个基本的socket服务器,并留了个思考题
先启动server,然后启动一个client,不输入数据,这个时候在另外一个终端上再启动一个client,并在第二个client终端中输入数据,会发生什么呢?
实际操作后,我们会发现,在第二个client输入后,服务端并没有响应,直到第一个client也输入数据完成交互后,第二个client才会有数据返回。
这是由于服务端accept获取到第一个client的套接字后,由于第一个client未输入数据,所以服务端进程会阻塞在等待客户端数据那一行。
... int read_num = read(accept_fd, read_msg, 100); ...
所以,第二个client完成三次握手后,连接一直在服务端的全连接队列中,等待accept获取处理。
后续的client无法得到处理是由于服务端只有一个线程,获取client套接字还有连接通信全在一个线程中。
那我们直接开多个线程就好了,主线程只负责accept获取客户端套接字。每来一个连接,我们就新起一个线程去处理客户端和服务端的通信。这样多个连接之间就不会互相影响了。服务端程序如下:
// per_conn_per_thread_server.cpp #include <sys/socket.h> #include <netinet/in.h> #include <unistd.h> #include <thread> #include <arpa/inet.h> #include <string.h> #include <cstdio> #include <errno.h> void handleConn(int accept_fd) { char read_msg[100]; int read_num = read(accept_fd, read_msg, 100); printf("get msg from client: %sn", read_msg); int write_num = write(accept_fd, read_msg, read_num); close(accept_fd); } int main() { int listen_fd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in server_addr; bzero(&server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); server_addr.sin_port = htons(8888); if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { printf("bind err: %sn", strerror(errno)); close(listen_fd); return -1; } if (listen(listen_fd, 2048) < 0) { printf("listen err: %sn", strerror(errno)); close(listen_fd); return -1; } struct sockaddr_in client_addr; bzero(&client_addr, sizeof(struct sockaddr_in)); socklen_t client_addr_len = sizeof(client_addr); int accept_fd = 0; while((accept_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len)) > 0) { printf("get accept_fd: %d from: %s:%dn", accept_fd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); std::thread handleThread(handleConn, accept_fd); // 将线程设置为后台线程,避免阻塞主线程 handleThread.detach(); } }
使用thread库时,如果使用g++进行编译需要添加
-lpthread
,完整编译命令:g++ -std=c++11 xxx.cpp -lpthread
看似解决阻塞问题了,但其实这种方案有大缺陷,只要我们稍微加大下客户端的并发度,就会发现服务端会处理不过来。每来一个连接都创建一个新线程,处理完后再销毁线程,这种处理方式成本太大。
我们仔细分析下,「per connection per thread」出现性能瓶颈有以下几个原因:
既然是由于并发量高时线程太多导致的性能问题,那如果有一种技术,能让一个线程负责N个连接就能完美解决了。伪代码如下:
class HandleThread { std::vector<int> handle_fds; void addFd(int fd) {handle_fds.push_back(fd)}; void work(); } HandleThread::work() { for(;;) { int readyFd = getReadyIOFd(); ... // 对readyFd读写处理 ... } } auto pool = createThreadPool(4); int accept_fd = accept(...); HandleThread thread = pool.getThread(); thread.addFd(accept_fd);
上面代码大家应该很容易看懂,先创建一个指定线程数量的线程池,主线程获取到新连接后,丢到线程池的一个线程去处理。每个线程初始化后会执行work函数,work函数是一个while死循环,里面的getReadyIOFd会阻塞线程,直到有可读可写的套接字时,才会唤醒线程,去进行连接的读写。
扫盲点:一般我们讲的由于系统调用(比如read/write等)导致阻塞,这个时候阻塞的线程状态会被置为挂起,不会占用CPU。所以上面虽然有个while死循环,但在getReadyIOFd被阻塞了,getReadyIOFd底层也是个系统调用(具体实现我们后面会讲到),在没有可读写的套接字时线程并不会占用CPU。
上面的流程,其实就是大名鼎鼎的IO多路复用和Reactor多线程模型了。
这一节我们具体聊聊一个handleThread是如何管理多个套接字的。
IO多路复用的实现模型大家多少听过一些,我们先比较下常见的select和epoll
select简单理解就是拿一个数组保存连接套接字,调用select时,会将整个数组拷贝到内核空间中,如果当前数组中没有可读写的套接字,线程被阻塞。
等到数组中有可读写的套接字,或者超时(select可以设置阻塞的超时时间),select调用会返回,然后线程遍历全部数组,找到可读写的套接字,进行读写处理。
select存在以下几个缺点:
/usr/include/bits/typesizes.h
中有定义。epoll是linux2.6的时候提出的,epoll在內核中维护了一个eventpoll对象,eventpoll包含一个红黑树结构的等待队列wq和一个链表结构的就绪队列rdlist。
新获取到一个套接字后,将该套接字添加到wq中,等到套接字可读写时,操作系统会将该套接字从wq转到rdlist,然后线程直接处理rdlist中的套接字即可,不需要再遍历全部监听的套接字了。
与select相比,可以发现有以下几个优点:
epoll基本使用
因为我们的项目选用epoll,所以下面我们具体讲讲epoll的使用方法
epoll_create
创建一个epoll实例int epoll_create(int size); int epoll_create1(int flags);
epoll_create
功能相同。可以设置为EPOLL_CLOEXEC
, 表示当持有epoll句柄的进程fork出一个子进程时,子进程不会包含该epoll_fd。epoll_ctl
管理监听的描述符,并注册要监听的事件int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
epoll_create
创建的epoll_fdtypedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ } // epoll_event.event表示具体的事件类型,常见有以下几种: // EPOLLIN:文件描述符可读 // EPOLLOUT:文件描述符可写 // EPOLLRDHUP:套接字对端断开 // EPOLLET:边缘触发(后面细讲)
epoll_wait
等待事件发生,没有事件时,调用者进程会被挂起,等到事件发生/超时后返回int epoll_wait(int epfd, struct epoll_event* evlist, int maxevents, int timeout);
epoll_create
创建的epoll_fdepoll_wait
最多可以返回的事件数量epoll_wait
阻塞的超时值,如果设置为-1,表示不超时,如果设置为0,即使没有IO事件也会立即返回epoll有EPOLLLT(水平触发)和EPOLLET(边缘触发)两种工作模式:
epoll_wait
返回后读了部分数据,在下一次的epoll_wait
调用还是会返回之前那个没读完数据的socket。epoll_wait
返回。如果我们第一次epoll_wait
返回中读了部分数据,如果该套接字没再收到新数据,那即使该套接字缓存区中还有一些数据没读,下一次的epoll_wait
也不会返回该套接字了。所以我们需要在第一次读时通过循环read的方式把套接字中的数据全读出来。边缘触发处理起来会比水平触发比较麻烦,但性能会比水平触发高,因为减少 epoll 相关系统调用次数
讲完epoll的使用方法,我们把前面的伪代码套上epoll的边缘触发模式,完整代码如下:
#include <sys/socket.h> #include <netinet/in.h> #include <unistd.h> #include <thread> #include <arpa/inet.h> #include <string.h> #include <cstdio> #include <errno.h> #include <vector> #include <assert.h> #include <sys/epoll.h> #include <fcntl.h> int setfdNonBlock(int fd) { int flag = fcntl(fd, F_GETFL, 0); if (flag == -1) return -1; flag |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flag) == -1) return -1; return 0; }; void handleConn(int accept_fd) { char read_msg[100]; char *buf_ptr = read_msg; int total_read_num = 0; int read_num = 0; // 使用的是epollet边缘触发模式,需要把套接字缓存区中的数据全读完 do { read_num = read(accept_fd, buf_ptr, 100); buf_ptr += read_num; total_read_num += read_num; } while(read_num > 0); printf("get msg from client: %sn", read_msg); int write_num = write(accept_fd, read_msg, total_read_num); close(accept_fd); } int listenServer(char *host, int port) { int listen_fd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in server_addr; bzero(&server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); server_addr.sin_port = htons(8888); if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { printf("bind err: %sn", strerror(errno)); close(listen_fd); return -1; } if (listen(listen_fd, 2048) < 0) { printf("listen err: %sn", strerror(errno)); close(listen_fd); return -1; } return listen_fd; } const int EPOLLWAIT_TIME = 10000; const int EVENTSMAXNUM = 4096; class HandleThread { public: HandleThread() : epoll_fd_(epoll_create1(EPOLL_CLOEXEC)), epoll_events_(EVENTSMAXNUM), thread_(std::bind(&HandleThread::work, this)) { assert(epoll_fd_ > 0); thread_.detach(); } ~HandleThread() { close(epoll_fd_); } // 线程实际运行函数 void work(); // 添加监听套接字 void addFd(int fd); // 不再监听指定套接字 void rmFd(int fd); private: int epoll_fd_; std::vector<epoll_event>epoll_events_; std::thread thread_; }; void HandleThread::work() { for(;;) { int event_count = epoll_wait(epoll_fd_, &*epoll_events_.begin(), epoll_events_.size(), EPOLLWAIT_TIME); if (event_count < 0) { perror("epoll wait error"); continue; } for (int i = 0; i < event_count; i++) { epoll_event cur_event = epoll_events_[i]; int fd = cur_event.data.fd; // 不再监听fd,从epoll中去掉 rmFd(fd); // 处理连接读写 handleConn(fd); } } } void HandleThread::addFd(int fd) { epoll_event event; event.data.fd = fd; // 只监听读事件 event.events = EPOLLIN | EPOLLET; if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event) < 0) { perror("epoll_add error"); } } void HandleThread::rmFd(int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event) < 0) { perror("epoll_del error"); } } typedef std::shared_ptr<HandleThread> SP_HandleThread; class HandleThreadPool { public: HandleThreadPool(int thread_nums) : thread_nums_(thread_nums), next_thread_idx_(0) { for (int i = 0; i < thread_nums; i++) { SP_HandleThread t (new HandleThread()); thread_pool_.push_back(t); } } SP_HandleThread getThread(); private: int thread_nums_; int next_thread_idx_; std::vector<SP_HandleThread> thread_pool_; }; // 从线程池中获取一个线程 SP_HandleThread HandleThreadPool::getThread() { SP_HandleThread t = thread_pool_[next_thread_idx_]; next_thread_idx_ = (next_thread_idx_ + 1) % thread_nums_; return t; } int main() { int listen_fd = listenServer("127.0.0.1", 8888); // 创建线程池 HandleThreadPool pool(4); // 等待1秒 sleep(1); struct sockaddr_in client_addr; bzero(&client_addr, sizeof(struct sockaddr_in)); socklen_t client_addr_len = sizeof(client_addr); int accept_fd = 0; while((accept_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len)) > 0) { printf("get accept_fd: %d from: %s:%dn", accept_fd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port)); // 将fd设置为非阻塞 ? setfdNonBlock(accept_fd); // 从pool中获取一个线程处理连接 SP_HandleThread t = pool.getThread(); t->addFd(accept_fd); } }
代码比较长,但不难,大家可以fork下来慢慢看。
使用了智能指针,避免忘记回收堆上的资源。
大家可能会发现代码有两次注释添加了"?",第一处是在创建线程池后,sleep了1秒,这个当成本节的思考题,大家可以先思考,并想想有没有什么更好的解决办法?
第二处是在获取到accept_fd后,将fd设置为非阻塞了。下面我们展开具体讲讲。
阻塞IO调用:进程在调用IO操作时,如果没有数据可读或缓冲区没有空闲空间可写,导致IO操作未完成,进程被阻塞挂起,后续操作将无法执行。比如下面代码,如果客户端建立连接后,一直不发送数据,那服务端执行就会阻塞在read
调用,后面的printf
无法被执行到。
int accept_fd = accept(...); char read_msg[100]; int read_num = read(accept_fd, read_msg, 100); printf("i am a logn");
小提示:上面的代码即使客户端只发了1个字节的数据,服务端
read
调用也会返回,并不是要等到读满100个字节才会返回。
非阻塞IO调用: 进程在调用IO操作时,即使IO操作未完成,该IO调用也会立刻返回,之后进程可以进行后续操作。比如下面代码,将accept_fd设置为非阻塞后,再调用read
,这时即使客户端没有发数据,服务端也不会一直卡在read
调用上,后面的printf
能顺利打印出来。
int accept_fd = accept(...); // 将fd设置为非阻塞 setfdNonBlock(accept_fd); char read_msg[100]; int read_num = read(accept_fd, read_msg, 100); printf("i am a logn");
在前面,我们使用epoll实现了一个线程管理多个套接字,当某个套接字有读写事件时,epoll_wait
调用返回,告诉我们哪些套接字能读,但并不会告诉我们某个套接字上有多少数据可读。
epoll_wait
返回了。这对于水平触发还可行,但对于边缘触发就不行了,因为我们不知道这个套接字还会不会有新数据写入,如果对端不再写入新数据,那缓冲区中剩下的数据就再也读不到了。完整源码已上传到https://github.com/lzs123/CProxy-tutorial,欢迎fork and star!
如果本文对你有用,点个赞再走吧!或者关注我,我会带来更多优质的内容。