仿muduo库one thread one loop式高并发服务器-Buffer模块-日志宏-Socket模块-Channel模块

张开发
2026/7/1 8:18:47 15 分钟阅读
仿muduo库one thread one loop式高并发服务器-Buffer模块-日志宏-Socket模块-Channel模块
1.Buffer模块buffer是可以作为接收缓冲区用来缓存从socket中读取的数据也可以作为发送缓冲区来缓存上层处理好的数据直到socket可写后再写入socket。1.实现思想实现缓冲区得有一块空间我们不需要malloc申请一块空间而是使用stl给我们提供的容器这样就不需要我们来维护空间的大小了。我们采用vectorchar来实现缓冲区不能使用string因为string更多表示的是字符串操作遇到\0就会终止但我们再网络通信中什么数据都有可能会有其中就包括\0。记录buffer当前要从哪开始读取数据以及buffer当前要从哪开始写入数据还要buffer的大小假设read记录的是读取位置write记录的是写入位置。刚开始write和read都指向其实位置假如往buffer写入了100个字节的数据则write100。从buffer中读取50个字节的数据read50。每次写入数据之前都要判断buffer的空间是否足够判断是否要把数据往前覆盖因为当读取数据后read会往前移前面会有空闲空间或者是扩容。每次读取数据之前都要判断读取的数据大小是否合理可读数据大小是write-read如果想要读取的数据大小大于这个差值则说明不合理。2.实现buffer整体框架通过注释理解#define BUFFER_DEFAULT_SIZE 1024 class Buffer { private: std::vectorchar _buffer;//用vector来实现内存空间的管理 uint64_t _read_idx;//读偏移 uint64_t _wirte_idx;//写偏移 public: Buffer():_read_idx(0),_wirte_idx(0),_buffer(BUFFER_DEFAULT_SIZE){} //获取当前写入位置的地址 void* WritePosition(); //获取当前读取位置的地址 void* ReadPosition(); //获取末尾空间大小--写偏移之后的空闲空间 uint64_t TailIdleSize(); //获取头部空间大小--读偏移之前的空闲空间 uint64_t HeadIdleSize(); //获取可读数据大小 uint64_t ReadAbleSize(); //将读偏移向后移动 void MoveReadOffset(uint64_t len); //将写偏移向后移动 void MoveWriteOffset(uint64_t len); //确保可写空间足够整体空闲空间够了就移动数据否则就扩容 void EnsureWriteSpace(uint64_t len); //写入数据 void Write(void* data,uint64_t len); //读取数据 void Read(void* buf,uint64_t len); //清空缓冲区把读偏移和写偏移置为0 void Clear(); };完整代码#define BUFFER_DEFAULT_SIZE 1024 class Buffer { private: std::vectorchar _buffer;//用vector来实现内存空间的管理 uint64_t _read_idx;//读偏移 uint64_t _write_idx;//写偏移 public: Buffer():_read_idx(0),_wirte_idx(0),_buffer(BUFFER_DEFAULT_SIZE){} char* begin(){ return (*_buffer.begin()); } //获取当前写入位置的地址 char* WritePosition(){ return begin()_write_idx; } //获取当前读取位置的地址 char* ReadPosition(){ return begin()_read_idx; } //获取末尾空间大小--写偏移之后的空闲空间 uint64_t TailIdleSize(){ return _buffer.size()-_write_idx; } //获取头部空间大小--读偏移之前的空闲空间 uint64_t HeadIdleSize(){ return _read_idx; } //获取可读数据大小 uint64_t ReadAbleSize(){ return _write_idx-_read_idx; } //将读偏移向后移动 void MoveReadOffset(uint64_t len) { //向后移动的大小必须小于可读数据大小 要不然不合理 assert(lenReadAbleSize()); _read_idxlen; } //将写偏移向后移动 void MoveWriteOffset(uint64_t len) { assert(lenTailIdleSize()); _write_idxlen; } //确保可写空间足够整体空闲空间够了就移动数据否则就扩容 void EnsureWriteSpace(uint64_t len) { //末尾空间大小大于等于len 说明后面的空间足够这次的写入 if(TailIdleSize()len) return; //末尾空间大小不够则需要判读整体空闲空间大小是否足够如果足够则需要挪动数据 if(TailIdleSize()HeadIdleSize()len) { //将数据挪动到偏移量为0处 uint64_t szReadAbleSize(); //拷贝到起始位置 std::copy(ReadPosition(),ReadPosition()sz,begin()); _read_idx0; _write_idxsz; } else { //总体空间不够直接扩容 _buffer.resize(_write_idxlen); } } //往buffer中写入数据 void Write(const void* data,uint64_t len) { //1.确保有足够的空间 EnsureWriteSpace(len); //2.写入数据 const char* d(const char*)data; std::copy(d,dlen,WritePosition()); } void WriteAndPush(const void* data,uint64_t len) { Write(data,len); MoveWriteOffset(len); } void WriteString(const std::string data) { return Write(data.c_str(),data.size()); } void WriteStringAndPush(const std::string data) { WriteString(data); MoveWriteOffset(data.size()); } void WriteBuffer(Buffer data) { return Write(data.ReadPosition(),data.ReadAbleSize()); } void WriteBufferAndPush(Buffer data) { WriteBuffer(data); MoveWriteOffset(data.ReadAbleSize()); } //读取buffer中的数据 void Read(void* buf,uint64_t len) { assert(lenReadAbleSize()); std::copy(ReadPosition(),ReadPosition()len,(char*)buf); } void ReadAndPop(void* buf,uint64_t len) { Read(buf,len); MoveReadOffset(len); } std::string ReadAsString(uint64_t len) { assert(lenReadAbleSize()); std::string str; str.resize(len); Read(str[0],len); return str; } std::string ReadAsStringAndPop(void* buf,uint64_t len) { std::string strReadAsString(len); MoveReadOffset(len); return str; } char* FindCRLF() { //找换行字符 char* ret(char*)memchr(ReadPosition(),\n,ReadAbleSize()); return ret; } std::string GetLine() { //先找\n char* posFindCRLF(); if(posnullptr) return ; else return ReadAsString(pos-ReadPosition()1);//1是为了把换行字符也取出来 } std::string GetLineAndPop() { std::string strGetLine(); MoveReadOffset(str.size()); return str; } //清空缓冲区把读偏移和写偏移置为0 void Clear(){ _write_idx_read_idx0; } };测试1#includeserver.hpp int main() { Buffer buf; //写入buffer std::string strhello!!; buf.WriteStringAndPush(str); Buffer buf1; buf1.WriteBufferAndPush(buf); //把buffer中的数据读出 std::string tmpbuf.ReadAsStringAndPop(buf.ReadAbleSize()); std::coutbuf1.ReadAbleSize()std::endl; std::couttmpstd::endl; std::coutbuf.ReadAbleSize()std::endl; return 0; }测试2测试buffer的扩容#includeserver.hpp int main() { Buffer buf; for(int i0;i300;i) { std::string strhello!!std::to_string(i)\n; buf.WriteStringAndPush(str); } std::string tmpbuf.ReadAsStringAndPop(buf.ReadAbleSize()); std::couttmpstd::endl; return 0; }测试3测试获取一行数据#includeserver.hpp int main() { Buffer buf; for(int i0;i300;i) { std::string strhello!!std::to_string(i)\n; buf.WriteStringAndPush(str); } while(buf.ReadAbleSize()0) { std::string tmpbuf.GetLineAndPop(); std::couttmpstd::endl; } return 0; }三个测试都符合预期2.日志宏日志在任何一个项目中都是比较重要的当程序运行过程中出错了日志可以帮我显示是哪个文件的哪一行出错了可以大大的提升我们的调试效率。日志分为多个等级比如当我们调试时就把日志设置为调试等级程序就只会打印那些调试信息当我们不需要调试了就可以提高日志等级不需要再手动删除调试信息。#define INF 1 #define DBG 2 #define ERR 3 //日志等级 #define LOG_LEVEL DBG #define LOG(level,format, ...) do{\ if(levelLOG_LEVEL) break;\ time_t ttime(NULL);\ struct tm* ltmlocaltime(t);\ char tmp[32]{0};\ strftime(tmp,31,%H:%M:%S,ltm);\ fprintf(stdout,[%s: %s :%d] format \n,tmp,__FILE__,__LINE__, ##__VA_ARGS__);\ }while(0) #define INF_LOG(format,...) LOG(INF,format,##__VA_ARGS__) #define DBG_LOG(format,...) LOG(DBG,format,##__VA_ARGS__) #define ERR_LOG(format,...) LOG(ERR,format,##__VA_ARGS__)##__VA_ARGS__是对不定参数的使用日志效果3.Socket模块Socket模块就是对套接字的操作进行封装便于进行套接字的各种操作。Socket模块比较简单了解一下整体框架就可以都是一些固定套路的代码。#define MAX_LISTEN 1024 class Socket { private: int _sockfd; public: Socket(); Socket(int fd); ~Socket(); //创建套接字 bool Create(); //绑定地址 bool Bind(const std::string ip,uint16_t port); //开始监听 bool Listen(int backlogMAX_LISTEN); //向服务器发起连接 bool Connect(const std::string ip,uint16_t port); //获取新连接 int Accept(); //接收数据 ssize_t Recv(void* buf,size_t len,bool flag); //发送数据 ssize_t Send(void* buf,size_t len,bool flag); //关闭套接字 void Close(); //创建服务端连接 bool CreateServer(uint16_t port,const std::string ip0.0.0.0); //创建客户端连接 bool CreateClient(uint16_t port,const std::string ip); //开启地址重用 void ReuseAddress(); //设置套接字为非阻塞 void NonBlock(); };要注意的就是要把套接字的属性设置为非阻塞否则套接字没有数据recv就会一直阻塞。listen的backlog参数的含义listen函数的backlog参数用于指定服务器端已完成三次握手但尚未被accept取走的连接队列的最大长度。半连接的个数4.Channel模块channel里要保存该描述符监控的事件以及已经触发了什么事件还保存了多个回调函数由别的模块设置进来的。class Channel { private: int _fd; uint32_t _events;//监控了哪些事件 uint32_t _revents;//触发了哪些事件 using EventCallbackstd::functionvoid(); EventCallback _write_callback;//可写事件回调 EventCallback _read_callback;//可读事件回调 EventCallback _close_callback;//连接断开事件回调 EventCallback _error_callback;//错误事件回调 EventCallback _event_callback;//任意事件回调 public: Channel(int fd):_fd(fd),_events(0),_revents(0){} int Fd(){ return _fd; } //由connction模块设置的回调 void SetReadCallback(const EventCallback cb){ _read_callbackcb; } void SetCloseCallback(const EventCallback cb){ _close_callbackcb; } void SetWriteCallback(const EventCallback cb){ _read_callbackcb; } void SetErrorCallback(const EventCallback cb){ _error_callbackcb; } void SetEventCallback(const EventCallback cb){ _event_callbackcb; } void SetRevents(uint32_t events){ _reventsevents; } bool ReadAble(){ return (_eventsEPOLLIN); } //是否监控了可读 bool WriteAble(){ return (_eventsEPOLLOUT); } //是否监控了可写 //启动可读事件监控 void EnableRead() { //启动读事件监控 (_events|EPOLLIN); //挂到epoll上 //EventLoop模块还没实现 没法写 } //启动可写事件监控 void EnableWrite() { //启动读事件监控 (_events|EPOLLOUT); //挂到epoll上 //EventLoop模块还没实现 没法写 } //关闭读事件监控 void DisableRead() { //移除读事件监控 (_events~EPOLLIN); //从epoll上移除 } //关闭写事件监控 void DisableWrite() { (_events~EPOLLOUT); //从epoll上移除 } //关闭所有事件监控 void DisableAll() { _events0; } //移除监控从epoll的红黑树上移除 void Remove() { //调用EventLoop的接口 } //根据revents里触发了什么事件来调用对应的回调函数 void HandleEvent() { if((_reventsEPOLLIN)||(_reventsEPOLLRDHUP)||(_reventsEPOLLPRI)) { if(_read_callback) _read_callback(); if(_event_callback) _event_callback(); } //可能会导致连接关闭的回调函数一次只处理一个 if((_reventsEPOLLOUT)) { if(_write_callback) _write_callback(); if(_event_callback) _event_callback(); } else if((_reventsEPOLLERR)) { if(_event_callback) _event_callback(); if(_error_callback) _error_callback(); } else if((_reventsEPOLLHUP)) { if(_event_callback) _event_callback(); if(_close_callback) _close_callback(); } } };

更多文章