新网创想网站建设,新征程启航

为企业提供网站建设、域名注册、服务器等服务

Linux多线程Web服务器(C++实现)-创新互联

本文实现的是基于Reactor模式+epoll(边缘触发)+非阻塞socket+非阻塞IO+线程池的Web服务器,可以处理GET、POST两种请求,完成展示主页、获取图片、获取视频、登录及注册共五种功能。

创新互联公司作为成都网站建设公司,专注网站建设、网站设计,有关企业网站设计方案、改版、费用等问题,行业涉及阳光房等多个领域,已为上千家企业服务,得到了客户的尊重与认可。

原理图:

上图为本文实现的服务器的原理图,采用了单Reactor多线程的模式,在主线程中用epoll监听一个listenFd与多个connFd。

  • 若发生建立连接的事件,则交给accept单元处理,再把生成的connFd传给epoll管理;

  • 若发生可读\可写事件,则添加到线程池的任务队列中,由池中空闲的子线程拿取任务并处理;

  • 此外,若是客户端请求涉及数据库文件,则还需要从数据库连接池中拿出一个空闲的数据库连接,通过这个连接进行数据库文件的增删查改操作。


本文实现的服务器还采用了epoll的边缘触发模式,相比于水平触发减少了更多的epoll系统调用次数,在高并发的情况下效率更高。下面就ET(边缘触发)和LT(水平)触发进行介绍,并给出基于此实现的读写函数。

关于上图的详解:彻底学会使用epoll(一)——ET模式实现分析

因为在ET模式下,如果read一次没有读尽buffer中的数据,那么下次将得不到读就绪的通知,造成buffer中已有的数据无机会读出,除非有新的数据再次到达,write也同理。所以为了解决这一问题,就需要在读的时候尽力去读,写的时候尽力去写,即使用while循环去读写,直至出错或读写完毕。

使用while循环读写见: HTTP连接(读取请求+解析请求+生成响应+回送响应)
循环读:ssize_t HttpConn::read(int* errno_)
循环写:ssize_t HttpConn::write(int* errno_)

使用while循环去读写时,如果是阻塞IO就会因为 无数据可读/没空间可写 而一直阻塞在那里,所以采用非阻塞IO,一旦 无数据可读/没空间可写 就立刻返回-1,errno=EAGAIN(设置socket为非阻塞socke)。

bool WebServer::setFdNonblock(int fd)
{
    int flags;
    if((flags=fcntl(fd,F_GETFL,0))<0)    
        return false;
    flags |= O_NONBLOCK;
    if(fcntl(fd,F_SETFL,flags)<0)
        return false;
    return true;
}

因为ET模式下被唤醒(返回就绪)的条件为:

对于读取操作:

(1) 当buffer由不可读状态变为可读的时候,即由空变为不空的时候。

(2) 当有新数据到达时,即buffer中的待读内容变多的时候。

(3) 当buffer中有数据可读(即buffer不空)且用户对相应fd进行epoll_mod IN事件时。

对于写操作:

(1) 当buffer由不可写变为可写的时候,即由满状态变为不满状态的时候。

(2) 当有旧数据被发送走时,即buffer中待写的内容变少得时候。

(3) 当buffer中有可写空间(即buffer不满)且用户对相应fd进行epoll_mod OUT事件时

所以下面实现的逻辑为:

readFrom调用client->read(&readErrno)循环读取,直至读取完成/读取失败,读取完成也就是read返回0,代表服务器端read到了客户端发来的FIN请求,所以关闭连接;而读取失败又分为“暂时无数据可读”或”其他原因“,如果是其他原因则直接关闭连接,如果是"暂时无数据可读"(即没有新的HTTP请求发来)则进入process函数,判断读缓冲区内是否存在收集到的请求,有则解析请求+完成响应报文,然后更换监听的事件为EPOLLOUT,没有则继续监听EPOLLIN事件,继续读取客户端请求。

writeTo调用client->write(&writeErrno)循环写入,直至全部写完/写入失败,写入失败又分为”暂时无空间可写“或”其他原因“,如果是其他原因则直接关闭连接,如果是”暂时无空间可写“,则继续监听EPOLLOUT事件,继续写入响应。如果全部写完了,需要判断是否建立了HTTP长连接,如果建立了,那么就要尽量在一个TCP连接内完成多条HTTP报文的传送,所以进入process函数,判断读缓冲区内是否存在收集到的请求,有则继续监听EPOLLOUT,让本条请求的响应在下一轮epoll_wait中就绪;无则改为监听EPOLLIN,继续读取客户端请求;如果不建立HTTP长连接,则发送完毕HTTP响应后就应直接关闭连接。

void WebServer::readFrom(HttpConn* client)
{
    int readErrno=0;
    int ret=client->read(&readErrno);//client->raed()用while循环去尽力读
    if(ret<=0&&readErrno!=EAGAIN)//读出错,关闭连接
    {
        closeConn(client);
        return ;
    }
    process(client);
}

void WebServer::writeTo(HttpConn* client)
{
    int writeErrno=0;
    int ret=client->write(&writeErrno);//client->write()用while循环去尽力写
    if(client->isWriteOver())//数据已全部写完 
    {
        if(client->isKeepAlive()) //建立的是Http长连接
        {
            process(client);
            return;
        }
    }
    if(ret<0&&writeErrno==EAGAIN) //数据未读完,加入rdlist中下次继续读
    {
        epoller_p->modFd(client->getFd(), connEvent | EPOLLOUT);
        return ;
    }
    closeConn(client);
}

void WebServer::process(HttpConn* client)
{
    if(client->process()) //处理客户端请求成功
    {
        epoller_p->modFd(client->getFd(), connEvent | EPOLLOUT);//监听事件改为EPOLLOUT
    } 
    else 
    {
        epoller_p->modFd(client->getFd(), connEvent | EPOLLIN);//监听事件仍为EPOLLIN,继续读
    }
}

本文实现的Web服务器还设置了优雅关闭的选项:

struct linger optLinger;
    optLinger.l_onoff=1;
    optLinger.l_linger=1;
    if(setsockopt(listenFd,SOL_SOCKET,SO_LINGER,&optLinger,sizeof(optLinger))<0)
    {
        LOG_ERROR("%s","Set SO_LINGER Option Error!");
        return false;
    }

开启此套接字选项后,close()不会在调用后立刻返回,而是在延滞optLinger.l_linger 时间后才返回,在这一段时间内,close()函数并未关闭读写端,所以可以获取到客户端对于发往它的数据和FIN的ACK确认,故称优雅关闭。

当然,如果optLinger.l_linger设置不合理,在延滞时间内并未收到对端的确认,那么close返回-1,errno=EWOULDBLOCK,服务器端关闭,此后如果客户端的确认姗姗来迟,面对已经关闭的服务器端,只会收到RST,这是我们不想看到的。

所以,最好的办法其实是服务器端采用shutdown半关闭(关闭写端),这样读端read函数就会一直阻塞,可以读取到客户端发来的对于数据和FIN的确认,也可以在之后读取到客户端在处理完发来数据后调用close发出的FIN,服务器的读端read读到FIN后,直接返回,读取结束。

所以read的成功返回表明了:服务器端既获得了客户端对于发往它的数据和FIN的确认(TCP),又获得了客户端正确读取发来数据的确认(客户端的用户进程)。相比于close()+SO_LINGER,得到的消息更多。


此外,本文的Web服务器还设置了地址复用的选项:

int optReuseaddr=1;
    if(setsockopt(listenFd,SOL_SOCKET,SO_REUSEADDR,&optReuseaddr,sizeof(optReuseaddr))<0)
    {
        LOG_ERROR("%s","Set SO_REUSEADDR Option Error!");
        return false;
    }

它的功能是,允许启动一个监听服务器并捆绑其众所周知的端口,即使以前建立的将该端口用作它们的本地端口的连接仍然存在。

这种情况常见于:

先启动一个监听服务器,连接请求到达,派生一个子进程来处理该客户,之后监听服务器终止,但是子进程仍然继续为该连接上的客户提供服务,重启监听服务器时就会失败。

这里重启失败是失败在bind函数绑定端口时,它试图绑定一个已有连接上的端口,所以失败。

当然,本文实现的基于多线程的Web服务器,一旦主线程的监听服务器终止,派生的子线程也会随之终止,大家都关闭了,也就不存在上述的重启失败问题。


以上就是本文实现的Web服务器的重点内容,下面是全部代码:

WebServer类结构 webserver.h

#ifndef WEBSERVER_H
#define WEBSERVER_H
 
#include#include#include 
#include#include#include#include 
 
#include "heaptimer.h"
#include "epoller.h"
#include "../pool/threadpool.h"
#include "../pool/sqlconnpool.h"
#include "../http/httpconn.h"
#include "../log/log.h"
 
class  WebServer
{
public:
    WebServer();
    ~WebServer();
    bool loadConfigFile();
    void start();
 
private:
    void dealListen();
    void dealRead(HttpConn* client);
    void dealWrite(HttpConn* client);
    void closeConn(HttpConn* client);
 
    void flushTime(HttpConn* client);
    void addClient(int fd,const sockaddr_in& addr);
    void readFrom(HttpConn* client);
    void writeTo(HttpConn* client);
    void process(HttpConn* client);
 
    bool setFdNonblock(int fd);
    bool initSocket();
    
    int port;
    int timeOutMs;
    bool isClose;
    int listenFd;
    char* srcDir;
 
    uint32_t listenEvent;
    uint32_t connEvent;
    std::unique_ptrtimer_p;
    std::unique_ptrthreadPool_p;
    std::unique_ptrepoller_p;
    std::unordered_mapusers;
    const int maxUserNum = 65536;
};
 
#endif // !WEBSERVER_H

WebServer类实现 webserver.cpp

#include "webserver.h"
 
bool WebServer::loadConfigFile()//加载配置文件
{
    FILE* fp = fopen("./webserver.ini", "r");
    if(fp==nullptr)
    {
        return false;
    }    
    while(!feof(fp)) 
    {
        char line[1024] = {0};
        fgets(line, 1024, fp);
        string str = line;
        int idx = str.find('=', 0);
        if (idx == -1)
            continue;
        int endidx = str.find('\n', idx);
        string key = str.substr(0, idx);
        string value = str.substr(idx+1,endidx-idx-1);
        if (key == "port")    
            port = stoi(value);
        else if (key == "timeOutMs")
            timeOutMs = stoi(value);
        else if (key == "sqlConnMaxNum")
            SqlConnPool::getInstance(stoi(value));
        else if (key == "threadNum") 
            threadPool_p=std::unique_ptr(new ThreadPool(stoi(value)));
        else if (key == "logQueSize")
            Log::getInstance()->init(stoi(value));
    }
    fclose(fp);
    return true;
}
 
WebServer::WebServer():isClose(false),timer_p(new HeapTimer()),epoller_p(new Epoller())
{
    if (!loadConfigFile())//配置失败
    {
        isClose=true;
        LOG_ERROR("%s","Load Config File Fail!");
        return ;
    }
    srcDir=getcwd(nullptr,256);
    strncat(srcDir,"/resources/",15);
    HttpConn::userCount=0;
    HttpConn::srcDir=srcDir;
    listenEvent=EPOLLRDHUP|EPOLLIN|EPOLLET;
    connEvent=EPOLLONESHOT|EPOLLRDHUP|EPOLLET;
    if(!initSocket())
        isClose=true;
    if(isClose)
        LOG_ERROR("%s","========== Server init error!==========");
    else
        LOG_INFO("%s", "========== Server init success!========");
}
 
WebServer::~WebServer()
{
    close(listenFd);
    isClose=true;
    free(srcDir);
}
 
void WebServer::start() 
{
    int timeMs=-1;
    while(!isClose)
    {
        if(timeOutMs>0)
        {
            timeMs=timer_p->getNextTick();
        }
        int eventCnt=epoller_p->wait(timeMs);
        for(int i=0;igetEventFd(i);
            uint32_t events=epoller_p->getEvents(i);
            if(fd==listenFd)
            {
                dealListen();
            }
            else if(events&EPOLLIN)
            {
                dealRead(&users[fd]);
            }
            else if(events&EPOLLOUT)
            {
                dealWrite(&users[fd]);
            }
            else if(events&(EPOLLRDHUP|EPOLLHUP|EPOLLERR))
            {
                closeConn(&users[fd]);
            }
            else
            {
                LOG_ERROR("%s","Unexpected Event Happen!");
            }
        }
    }
    
}
 
void WebServer::dealListen()
{
    struct sockaddr_in addr;
    socklen_t len=sizeof(addr);
    while(true)
    {
        int connfd=accept(listenFd,(struct sockaddr*)&addr,&len);
        if(HttpConn::userCount>=maxUserNum)
        {
            close(connfd);
            LOG_ERROR("%s","Server Users Full!");
            return ;
        }
        if(connfd<=0)
        {
            return ;
        }
        else
        {
            addClient(connfd,addr);
        }
    }
}
 
 void WebServer::dealRead(HttpConn* client)
 {
    flushTime(client);
    threadPool_p->addTask(std::bind(&WebServer::readFrom,this,client));
 }
 
void WebServer::dealWrite(HttpConn* client)
{
    flushTime(client);
    threadPool_p->addTask(std::bind(&WebServer::writeTo,this,client));
}
 
void WebServer::closeConn(HttpConn* client)
{
    epoller_p->delFd(client->getFd());
    client->closeConn();
}
 
void WebServer::flushTime(HttpConn* client)
{
    if(timeOutMs>0)
    {
        timer_p->adjust(client->getFd(),timeOutMs);
    }
}
 
void WebServer::addClient(int connfd,const sockaddr_in& addr)
{
    users[connfd].init(connfd,addr);   
    if(timeOutMs>0)
    {
        timer_p->add(connfd,timeOutMs,std::bind(&WebServer::closeConn,this,&users[connfd]));
    }
    epoller_p->addFd(connfd,connEvent|EPOLLIN);
    setFdNonblock(connfd);
}
 
void WebServer::readFrom(HttpConn* client)
{
    int readErrno=0;
    int ret=client->read(&readErrno);
    if(ret<=0&&readErrno!=EAGAIN)
    {
        closeConn(client);
        return ;
    }
    process(client);
}
 
void WebServer::writeTo(HttpConn* client)
{
    int writeErrno=0;
    int ret=client->write(&writeErrno);
    if(client->isWriteOver()) 
    {
        if(client->isKeepAlive()) 
        {
            process(client);
            return;
        }
    }
    if(ret<0&&writeErrno==EAGAIN) 
    {
        epoller_p->modFd(client->getFd(), connEvent | EPOLLOUT);
        return ;
    }
    closeConn(client);
}
 
void WebServer::process(HttpConn* client)
{
    if(client->process()) 
    {
        epoller_p->modFd(client->getFd(), connEvent | EPOLLOUT);
    } 
    else 
    {
        epoller_p->modFd(client->getFd(), connEvent | EPOLLIN);
    }
}
 
bool WebServer::setFdNonblock(int fd)
{
    int flags;
    if((flags=fcntl(fd,F_GETFL,0))<0)    
        return false;
    flags |= O_NONBLOCK;
    if(fcntl(fd,F_SETFL,flags)<0)
        return false;
    return true;
}
 
bool WebServer::initSocket()
{
    if(port>65535||port<1024)
    {
        LOG_ERROR("Select Port:%d Error!",port);
        return false;
    }
    listenFd=socket(AF_INET,SOCK_STREAM,0);
    if(listenFd<0)
    {
        LOG_ERROR("%s","Create Socket Error!");
        return false;
    }
    struct linger optLinger;
    optLinger.l_onoff=1;
    optLinger.l_linger=1;
    if(setsockopt(listenFd,SOL_SOCKET,SO_LINGER,&optLinger,sizeof(optLinger))<0)
    {
        LOG_ERROR("%s","Set SO_LINGER Option Error!");
        return false;
    }
    int optReuseaddr=1;
    if(setsockopt(listenFd,SOL_SOCKET,SO_REUSEADDR,&optReuseaddr,sizeof(optReuseaddr))<0)
    {
        LOG_ERROR("%s","Set SO_REUSEADDR Option Error!");
        return false;
    }
    struct sockaddr_in addr;
    addr.sin_family=AF_INET;
    addr.sin_addr.s_addr=htonl(INADDR_ANY);
    addr.sin_port=htons(port);
    if(bind(listenFd,(struct sockaddr*)&addr,sizeof(addr))<0)
    {
        LOG_ERROR("%s","Bind Port:%d Error!",port);
        return false;
    }
    if(listen(listenFd,5)<0)
    {
        LOG_ERROR("%s","Listen Port:%d Error!",port);
        return false;
    }
    if(!epoller_p->addFd(listenFd,listenEvent))
    {
        LOG_ERROR("%s","Add ListenFd:%d in Epoll Error!",listenFd);
        return false;
    }
    if(!setFdNonblock(listenFd))
    {
        LOG_ERROR("%s","Set ListenFd:%d Nonblock Error!",listenFd);
        return false;
    }
    return true;
}

webserver.ini

# WEB服务器配置文件
#端口号
port=8888
#超时时间
timeOutMs=500
#大sql连接数
sqlConnMaxNum=1000
#线程数 
threadNum=8
#日志队列大容量
logQueSize=1024

本项目已在github开源,全部代码见:

1410138/MyWebServer: C++ Linux Web服务器 (github.com)

其余部分的介绍及部分代码见本专栏:

webServer_{(sunburst)}的博客-博客

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


新闻标题:Linux多线程Web服务器(C++实现)-创新互联
网址分享:http://wjwzjz.com/article/egish.html
在线咨询
服务热线
服务热线:028-86922220
TOP