Redis源码探究系列—Redis Reactor 模型源码实现解析(下)

张开发
2026/6/8 2:43:33 15 分钟阅读
Redis源码探究系列—Redis Reactor 模型源码实现解析(下)
欢迎各位同学关注我哦~在这个 AI 喧嚣的时代不忘初心戒骄戒躁认真沉淀在《Redis Reactor 模型源码实现解析上》中 我们分析了Redis Reactor模型的核心数据结构、事件循环主流程和事件注册机制。本文将继续深入分析Redis中的实际应用、性能优化技巧、AE_BARRIER深度解析、与典型Reactor对比等内容。七、Redis中的实际应用7.1 连接接受acceptTcpHandler// src/networking.c:726voidacceptTcpHandler(aeEventLoop*el,intfd,void*privdata,intmask){intcport,cfd,maxMAX_ACCEPTS_PER_CALL;charcip[NET_IP_STR_LEN];while(max--){cfdanetTcpAccept(server.neterr,fd,cip,sizeof(cip),cport);if(cfdANET_ERR){if(errno!EWOULDBLOCK)serverLog(LL_WARNING,Accepting client connection: %s,server.neterr);return;}serverLog(LL_VERBOSE,Accepted %s:%d,cip,cport);acceptCommonHandler(cfd,0,cip);}}一次循环最多接受MAX_ACCEPTS_PER_CALL个连接提高吞吐。7.2 请求读取readQueryFromClient// src/networking.c:1500voidreadQueryFromClient(aeEventLoop*el,intfd,void*privdata,intmask){client*c(client*)privdata;intnread,readlen;size_tqblen;readlenPROTO_IOBUF_LEN;// 大参数优化读取长度if(c-reqtypePROTO_REQ_MULTIBULKc-multibulklenc-bulklen!-1c-bulklenPROTO_MBULK_BIG_ARG){ssize_tremaining(size_t)(c-bulklen2)-sdslen(c-querybuf);if(remaining0remainingreadlen)readlenremaining;}qblensdslen(c-querybuf);c-querybufsdsMakeRoomFor(c-querybuf,readlen);nreadread(fd,c-querybufqblen,readlen);if(nread-1){if(errnoEAGAIN)return;freeClient(c);return;}elseif(nread0){freeClient(c);return;}sdsIncrLen(c-querybuf,nread);c-lastinteractionserver.unixtime;// 处理输入缓冲区processInputBufferAndReplicate(c);}7.3 响应发送sendReplyToClient// src/networking.c:1054voidsendReplyToClient(aeEventLoop*el,intfd,void*privdata,intmask){writeToClient(fd,privdata,1);}7.4 定时任务serverCron// src/server.c:1089intserverCron(structaeEventLoop*eventLoop,longlongid,void*clientData){// 更新时间缓存updateCachedTime();// 动态调整执行频率server.hzserver.config_hz;if(server.dynamic_hz){while(listLength(server.clients)/server.hzMAX_CLIENTS_PER_CLOCK_TICK){server.hz*2;if(server.hzCONFIG_MAX_HZ){server.hzCONFIG_MAX_HZ;break;}}}// 统计信息更新run_with_period(100){trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);trackInstantaneousMetric(STATS_METRIC_NET_INPUT,server.stat_net_input_bytes);trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,server.stat_net_output_bytes);}// LRU 时钟更新unsignedlonglruclockgetLRUClock();atomicSet(server.lruclock,lruclock);// 峰值内存记录if(zmalloc_used_memory()server.stat_peak_memory)server.stat_peak_memoryzmalloc_used_memory();// ... 更多定时任务}serverCron是 Redis 唯一的时间事件默认每 100ms 执行一次那么它到底做了哪些事情呢总体捋一下更新统计信息清理过期key数据库rehashAOF/RDB持久化触发主从同步检查7.5 beforeSleep钩子// src/server.c:1358voidbeforeSleep(structaeEventLoop*eventLoop){// Cluster状态检查if(server.cluster_enabled)clusterBeforeSleep();// 快速过期检查if(server.active_expire_enabledserver.masterhostNULL)activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);// 处理阻塞客户端if(listLength(server.clients_waiting_acks))processClientsWaitingReplicas();moduleHandleBlockedClients();if(listLength(server.unblocked_clients))processUnblockedClients();// AOF刷盘flushAppendOnlyFile(0);// 发送待写响应handleClientsWithPendingWrites();// 释放GIL给模块线程if(moduleCount())moduleReleaseGIL();}这个钩子方法会在每轮事件循环前执行主要做了以下三件事情快速过期key清理AOF缓冲区刷盘发送客户端响应八、为什么时间事件用链表呢aeSearchNearestTimer是O(n)的// src/ae.c:254staticaeTimeEvent*aeSearchNearestTimer(aeEventLoop*eventLoop){aeTimeEvent*teeventLoop-timeEventHead;aeTimeEvent*nearestNULL;while(te){if(!nearest||te-when_secnearest-when_sec||(te-when_secnearest-when_secte-when_msnearest-when_ms))nearestte;tete-next;}returnnearest;}作者在注释里说可以用跳表优化到 O(1)但由于我们当前分析的这个Redis的版本Redis 5.0只有serverCron一个时间事件链表完全够用。这又是一个够用就好设计。九、性能优化技巧9.1 写事件延迟注册Redis不立即注册写事件而是在handleClientsWithPendingWrites中尝试同步写// src/networking.c:1064inthandleClientsWithPendingWrites(void){listIter li;listNode*ln;listRewind(server.clients_pending_write,li);while((lnlistNext(li))){client*clistNodeValue(ln);// 先尝试同步写if(writeToClient(c-fd,c,0)C_ERR)continue;// 写不完才注册写事件if(clientHasPendingReplies(c)){intae_flagsAE_WRITABLE;if(server.aof_stateAOF_ONserver.aof_fsyncAOF_FSYNC_ALWAYS){ae_flags|AE_BARRIER;}aeCreateFileEvent(server.el,c-fd,ae_flags,sendReplyToClient,c);}}}如果采用传统的做法来有数据要写-注册写事件-写事件触发-发送响应那么每次有数据要写都要调用一次aeCreateFileEvent而且写事件触发后还要调用一次aeDeleteFileEvent来取消注册。对于高并发场景这些系统调用的开销是非常大的。通过先尝试同步写如果能一次性写完就不注册写事件只有写不完才注册写事件这样就大大减少了系统调用的次数提高了性能。9.2 批量接受连接acceptTcpHandler一次最多接受MAX_ACCEPTS_PER_CALL个连接而不是来一个接受一个。9.3 动态调整cron频率if(server.dynamic_hz){while(listLength(server.clients)/server.hzMAX_CLIENTS_PER_CLOCK_TICK){server.hz*2;if(server.hzCONFIG_MAX_HZ){server.hzCONFIG_MAX_HZ;break;}}}客户端越多cron执行越频繁保证每个客户端在每个时钟周期都能被处理到。十、完整流程图Redis 启动 │ ▼ ┌─────────────────────┐ │ aeCreateEventLoop │ 创建事件循环 └─────────────────────┘ │ ▼ ┌─────────────────────┐ │ 注册监听socket │ aeCreateFileEvent(ipfd, AE_READABLE, acceptTcpHandler) └─────────────────────┘ │ ▼ ┌─────────────────────┐ │ 注册serverCron │ aeCreateTimeEvent(1ms, serverCron) └─────────────────────┘ │ ▼ ┌─────────────────────┐ │ aeMain() │ 进入主循环 └─────────────────────┘ │ ┌─────────────┴─────────────┐ │ │ ▼ │ ┌─────────────┐ │ │ beforeSleep │ ◄──────────────────┤ 每轮循环开始 └─────────────┘ │ │ │ ▼ │ ┌─────────────────────────────────────────────────────────┐ │ aeProcessEvents │ │ ┌─────────────────────────────────────────────────┐ │ │ │ 1. 计算阻塞超时找最近时间事件 │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ 2. aeApiPoll() ← epoll_wait / kqueue / select │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ 3. 处理文件事件 │ │ │ │ ├─ acceptTcpHandler新连接 │ │ │ │ ├─ readQueryFromClient读请求 │ │ │ │ └─ sendReplyToClient写响应 │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ 4. processTimeEvents() │ │ │ │ └─ serverCron定时任务 │ │ │ └─────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────┘ │ ▼ 停止? ──否──► 继续循环 │ 是 │ ▼ Redis 退出十一、与典型Reactor模型的对比11.1 典型Reactor模型┌───────────────────────────────────────────────────────┐ │ Reactor │ │ ┌─────────────────────────────────────────────────┐ │ │ │ Event Demultiplexer │ │ │ │ (epoll / kqueue / select) │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────┼───────────────┐ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ Handler A │ │ Handler B │ │ Handler C │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ └───────────────────────────────────────────────────────┘典型Reactor的特点事件分离器Demultiplexer负责监听事件事件处理器Handler负责业务逻辑支持多线程扩展11.2 Redis Reactor简化模型┌───────────────────────────────────────────────────────┐ │ aeEventLoop │ │ ┌─────────────────────────────────────────────────┐ │ │ │ aeApiPoll (epoll) │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ │ ┌───────────────┼───────────────┐ │ │ ▼ ▼ ▼ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │acceptHandler│ │ readHandler │ │writeHandler │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ │ │ │ └───────────────┴───────────────┘ │ │ │ │ │ ▼ │ │ commandProc命令处理 │ └───────────────────────────────────────────────────────┘Redis的简化单线程执行所有回调在同一个线程执行无连接池每个连接对应一个client结构无独立Handler回调直接调用命令处理函数十二、为什么不用libevent / libev有兴趣的朋友可以看一下这篇文章的最后附有作者对于这个问题的解释Redis事件循环模型全景解析ae.c十三、AE_BARRIER的深度解析13.1 为什么需要AE_BARRIER正常情况下事件处理顺序是先读后写客户端发送请求 → 触发读事件 → 执行命令 → 生成响应 ↓ 触发写事件 → 发送响应这个顺序对大多数场景是最优的收到请求立即处理并回复。13.2 特殊场景AOF fsyncalways当配置appendfsync always时收到请求 → 执行命令 → 写入 AOF 缓冲区 ↓ 需要先 fsync ↓ 才能发送响应问题来了 fsync 在beforeSleep中执行如果先处理读事件可能读到新请求新请求处理完要发响应但上个请求还没fsync导致响应发送时数据还没落盘。这个该怎么办呢13.3 解决方案// src/networking.c:1064if(server.aof_stateAOF_ONserver.aof_fsyncAOF_FSYNC_ALWAYS){ae_flags|AE_BARRIER;// 标记需要反转}设置AE_BARRIER后处理顺序变成先写后读beforeSleep: ├─ flushAppendOnlyFile() // 先fsync └─ handleClientsWithPendingWrites() // 发送响应 aeProcessEvents: ├─ 先处理写事件发送响应 └─ 再处理读事件读新请求这样保证响应发送时数据已经落盘。十四、事件循环的一些边界情况14.1 fd耗尽// src/ae.c:136if(fdeventLoop-setsize){errnoERANGE;returnAE_ERR;}setsize在启动时设置// src/server.cserver.elaeCreateEventLoop(server.maxclientsCONFIG_FDSET_INCR);如果连接数超过maxclientsaccept会失败。14.2 时间事件溢出// src/ae.c:208longlongideventLoop-timeEventNextId;id是long long实际不会溢出。但如果系统时间跳变// src/ae.c:284if(noweventLoop-lastTime){teeventLoop-timeEventHead;while(te){te-when_sec0;// 立即触发所有时间事件tete-next;}}14.3 epoll返回大量事件// src/ae_epoll.c:112retvalepoll_wait(state-epfd,state-events,eventLoop-setsize,timeout);返回的事件数量受setsize限制。如果触发事件太多events数组大小固定只处理前setsize个事件剩余事件下轮处理十五、最后总结通过两篇文章的分析我们可以领会到Redis Reactor模型的设计精髓如下设计点实现效果文件事件存储数组下标即fdO(1) 访问时间事件存储双向链表简单够用多路复用统一接口抽象跨平台兼容阻塞超时最近时间事件计算精确控制事件处理顺序先读后写支持反转灵活应对不同场景时钟跳变强制立即触发避免任务延迟回调注册函数指针 clientData灵活扩展整理源码文件索引文件功能src/ae.h事件循环数据结构定义src/ae.c事件循环核心实现src/ae_epoll.cLinux epoll封装src/ae_kqueue.cBSD/macOS kqueue封装src/ae_select.c通用 select封装src/ae_evport.cSolaris event port封装src/server.cbeforeSleep、serverCronsrc/networking.cacceptTcpHandler、readQueryFromClient、sendReplyToClient欢迎各位同学关注我哦~在这个 AI 喧嚣的时代不忘初心戒骄戒躁认真沉淀

更多文章