`

Redis代码阅读3--Redis网络监听(2)

阅读更多

这篇文章接上一篇,主要介绍Redis网络监听流程的各个步骤。

  1. aeCreateEventLoop :创建用于循环监听的 eventLoop Redis 支持主流的三种事件触发机制: select ,epoll, kqueue, 可以通过在 config.h 里面配置 HAVE_EPOLL/ HAVE_KQUEUE 来根据不同的操作系统选择合适的机制:调用 ae_epoll.c/ae_select.c/ae_kqueue.c中的 aeApiCreate;创建 eventLoop 的时候没有指定 beforesleep ,在开始循环监听前将函数 beforeSleep 绑定到 eventLoop 上,该函数也放在后面介绍
    /* test for polling API */
    #ifdef __linux__
    #define HAVE_EPOLL 1
    #endif
    
    #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
    #define HAVE_KQUEUE 1
    #endif
    
  2. aeCreateTimeEvent/:创建定时事件,注册了定时时间函数 serverCron,作用放到后面介绍;
  3. aeCreateFileEvent:注册了一个读 I/O事件,绑定了函数 acceptTcpHandler(同样也放到后面介绍 ),如果多路复用采用epoll机制的话,这采用LT模式进行触发;

  4. 创建好server.ae后,通过aeMain这个方法开始网络监听;此处的代码是:

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }
     在每次处理事件之前,都要执行一遍server.ae中设定的beforesleep方法,下面就介绍下beforesleep;

  5. Beforesleep:顾名思义,这个方法在Redis每次进入sleep/wait去等待监听的端口发生I/O事件之前被调用(这话太拗口了。。。。),还是来看代码:

    void beforeSleep(struct aeEventLoop *eventLoop) {
        REDIS_NOTUSED(eventLoop);
        listNode *ln;
        redisClient *c;
    
        /* Awake clients that got all the swapped keys they requested */
        if (server.vm_enabled && listLength(server.io_ready_clients)) {
            listIter li;
    
            listRewind(server.io_ready_clients,&li);
            while((ln = listNext(&li))) {
                c = ln->value;
                struct redisCommand *cmd;
    
                /* Resume the client. */
                listDelNode(server.io_ready_clients,ln);
                c->flags &= (~REDIS_IO_WAIT);
                server.vm_blocked_clients--;
                aeCreateFileEvent(server.el, c->fd, AE_READABLE,
                    readQueryFromClient, c);
                cmd = lookupCommand(c->argv[0]->ptr);
                redisAssert(cmd != NULL);
                call(c,cmd);
                resetClient(c);
                /* There may be more data to process in the input buffer. */
                if (c->querybuf && sdslen(c->querybuf) > 0)
                    processInputBuffer(c);
            }
        }
    
        /* Try to process pending commands for clients that were just unblocked. */
        while (listLength(server.unblocked_clients)) {
            ln = listFirst(server.unblocked_clients);
            redisAssert(ln != NULL);
            c = ln->value;
            listDelNode(server.unblocked_clients,ln);
            c->flags &= ~REDIS_UNBLOCKED;
    
            /* Process remaining data in the input buffer. */
            if (c->querybuf && sdslen(c->querybuf) > 0)
                processInputBuffer(c);
        }
    
        /* Write the AOF buffer on disk */
        flushAppendOnlyFile();
    }
     这个方法做了三件事情:
    I.    如果Redi开启了Virtual memory,那么某些clients请求的keys可能因为被swap了,因此这些client会被block住,当这些clients请求的keys又被swap到内存中时,则这些被block住的clients应该unblock,然后被处理;io_ready_clients就是用来维护这些clients的,为了尽快响应client的请求,因此在每次sleep前都先处理这些请求
    II.    某些Redis操作是blocking的,如BLPOP,那么执行这些操作的clients可能会被block住,unblocked_clients这个list就是用来维护那些刚被unblock的clients,如果这个list不为空,则也要尽快响应这些clients
    III.    flushAppendOnlyFile;因为clients的Socket的write只能在eventLoop里面进行,而flushAppendOnlyFile又是在每次sleep之前被调用,所以在eventLoop里面的所有AOF writes都是先写到内存里的一块buffer里面,flushAppendOnlyFile则负责把这个buffer内容flush到disk;
  6. 执行完beforesleep后aeprocessEvents,该方法主要是处理各种监听到的文件读写事件和到期响应的定时事件,因为这个方法的代码比较长,而且逻辑简单,就不贴过来了,简单介绍下过程:                                      a)    首先通过遍历eventLoop中注册的timeEvent找出离当前最近timeEvent(即shortest)。
    b)    调用epoll_wait()方法,等待I/O事件的发生, 为了尽快响应时间事件,epoll_wait()方法的等待时间为shortest与当前时间的差值,如果该差值小于零,则epoll_wait()轮询至有I/O事件发生;
    c)    响应eventLoop中fired的aeFileEvent,这里调用的就是之前设置的文件处理函数acceptTcpHandler。
    d)    响应完I/O事件后,则通过timeEventHead遍历timeEvent,逐一响应timeProc--serverCron。在响应定时事件的时候 需要注意几点点:
    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te;
        long long maxId;
    
        te = eventLoop->timeEventHead;
        maxId = eventLoop->timeEventNextId-1;
    
    
    
    
        while(te) {
            long now_sec, now_ms;
            long long id;
    
            if (te->id > maxId) 
    
    
    
    {
                te = te->next;
                continue;
            }
            aeGetTime(&now_sec, &now_ms);
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms))
            {
                int retval;
    
                id = te->id;
                retval = te->timeProc(eventLoop, id, te->clientData);
                processed++;
                /* After an event is processed our time event list may
                 * no longer be the same, so we restart from head.
                 * Still we make sure to don't process events registered
                 * by event handlers itself in order to don't loop forever.
                 * To do so we saved the max ID we want to handle.
                 *
                 * FUTURE OPTIMIZATIONS:
                 * Note that this is NOT great algorithmically. Redis uses
                 * a single time event so it's not a problem but the right
                 * way to do this is to add the new elements on head, and
                 * to flag deleted elements in a special way for later
                 * deletion (putting references to the nodes to delete into
                 * another linked list). */
                if (retval != AE_NOMORE) {
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                } else {
                    aeDeleteTimeEvent(eventLoop, id);
                }
    
    
    
    
                
    
    
    
    te = eventLoop->timeEventHead;
    
    
    
    
    
    
    
    
            } else {
                te = te->next;
            }
        }
        return processed;
    }
     
    •  因为响应一个定时事件后,eventLoop里面的定时事件链表可能会改变了,所以又要从头结点开始遍历定时事件链表;
    • 因为每次都要从头结点开始遍历定时事件链表,因此要考虑如何避免响应循环调用,即在响应定时事件a时,如果a的处理函数timeProc中又register了新的定时事件b,如果响应完事件a后,又响应b的话,那么就会造成循环响应。为了解决这个情况,redis在eventLoop里维护了一个timeEventNextId,即下一个定时事件的id,比如当前eventLoop的只有一个timeEvent  a,那么timeEventNextId=2,a->id = 1当a的timeProc方法又注册了timeEvent  b,那么timeEventNextId = 3,b->id = 2.那么在redis在遍历定时事件开始的时候将遍历前的eventLoop里面的maxId= timeEventNextId-1保存起来,在遍历定时事件的时候,如果某个timeEvent->id >maxId,则跳过这个事件。
    • 作者也意识到了每次都从头结点开始遍历定时事件不是一个好的算法,但是由于目前Redis里面只有一个定时事件,所以目前对redis来说不是个问题,但是作者也提到在未来的版本会对此进行改进   
  7. acceptTcpHandler:这个方法主要是监听网络端口:                                                                             i.    通过调用anetTcpAccept方法获得监听端口上的client connection;
    ii.    然后调用acceptCommonHandler创建redisClient对象,如果当前连接的client的数量大于配置的最大client数量,则拒绝当前连接,并返回” max number of clients reached”提示信息;
    iii.    调用createClient方法创建redisClient,同时注册新的fileEvent(AE_READABLE),并绑定处理函数为readQueryFromClient;
    redisClient *createClient(int fd) {
        redisClient *c = zmalloc(sizeof(redisClient));
        c->bufpos = 0;
    
        anetNonBlock(NULL,fd);
        anetTcpNoDelay(NULL,fd);
        if (!c) return NULL;
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
    
    
    
    
        {
            close(fd);
            zfree(c);
            return NULL;
        }
        .........
    }
      iv.    readQueryFromClient:从指定的Socket中读取client发送过来的数据,并按照Redis的协议(后面将单独介绍)进行解析组装成Redis的各个command,然后通过查找commandTable,执行command
  8.  _installWriteEvent:上面介绍的文件事件都是AE_READABLE事件,但Redis在执行完client请求后的命令后,向Client端return数据,就是往Socket写入数据,这使一个AE_ AE_WRITABLE事件。Redis执行完command后,调用addReply方法,然后在这个方法里面调用installWriteEvent来注册一个AE_WRITABLE事件,并绑定事件处理函数sendReplyToClient,用来把数据发送到client。
  9. serverCron: 介绍完fileEvent的处理函数后,最后我们来介绍timeEvent的处理函数。顾名思义,serverCron就是Redis Server的定时计划任务。这个方法比较复杂,处理的事情也很多,主要集中在记录Redis的运行情况(memory,clients等),AOF write, VM Swap和BGSAVE等和Redis正常运行息息相关的事项。这个方法的代码很多,因此单独介绍
1
0
分享到:
评论

相关推荐

    Redis集群下过期key监听的实现代码

    1. 前言 在使用redis集群时,发现...关于Redis集群配置代码此处不贴,直接贴配置监听类代码! redis.host1: 10.113.56.68 redis.port1: 7030 redis.host2: 10.113.56.68 redis.port2: 7031 redis.host3: 10.113.56.6

    逆向工程极速搭建SSM-Redis-activeMQ-Quartz整合项目

    一键生成webXml、initXml、contextXml等系统配置文件 配置控制生成输出文件(listenter、filter、quartz、activeMQ、redis、webXml、initXml、contextXml) 项目可统计在线人数,无操作超时退出,监听网络请求,...

    redis过期监听.docx

    php+redis 键值过期自动监听 可以用作定时任务 当redis键值过期的时候就会触发回调方法,然后执行自己的程序,比如30分钟订单自动取消

    docker-redis-cluster:一个 Redis 集群 Docker 镜像

    它将启动 6 个 Redis 服务器,监听7000~7005端口和一个主管以确保所有服务器都启动。 在所有服务器启动后,redis-trib 将创建一个 Redis 集群。 docker run -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -...

    redis主从模式与哨兵模式例子

    redis主从模式与哨兵模式例子 # 当前Sentinel服务运行的端口 port 26378 # 哨兵监听的主服务器 sentinel myid 09a71e001825114de399e0b6214c4b6f5449ab3a # 3秒内mymaster没响应,则认为mymaster宕机 sentinel deny...

    监听redis的key的变化

    监听redis的key的变化

    flycache:一致性哈希的redis分片存储 (Redis sharding by consistent-hashing)

    功能弹性伸缩,提供控制面板,管理员可以增加和删除Redis节点Redis运行状态监控,报警Redis故障或者网络故障的灾难应对原理consistent-hashing一致性hashzookeeper保持一致性和监听文章测试如果您愿意捐助一下项目,...

    redis-pubsub-websocket-server:基于websocket的redis发布订阅协议服务器

    需要先安装redis redis默认监听6379扩展2.安装依赖yarn install使用说明1.编译yarn build启动服务器yarn start服务路径/ websocket服务协议基于websocket 发布订阅的协议客户端发送发布“频道”:频道“消息”:消息...

    解决Spring session(redis存储方式)监听导致创建大量redisMessageListenerContailner-X线程问题

    主要介绍了解决Spring session(redis存储方式)监听导致创建大量redisMessageListenerContailner-X线程问题,需要的朋友可以参考下

    springboot2.7.9+Redis6.2.5实现订单超时处理+数据超时+订单超时监听

    核心技术:Redis开启过期监听 springboot整合redis,springboot配置redis监听。 案例中通过访问addOrder方法实现向mysql数据库添加一条数据并在redis中设置该条数据的有效期为10秒,10秒过后触发redis监听,在监听中...

    mac下redis安装、设置、启动停止方法详解

    需要下载release版本,下载地址: http://download.redis.io/releases/ 我这里下载的是: ...解压到/usr/local/redis目录中,然后...如上图,redis-server启动成功,并监听6379端口。 常用命令说明 redis-server redis

    监听redis过期key,做对应业务处理

    1.springboot集成redis,并监听redis过期key做相应的业务处理

    redis-rs:Redis库防锈

    修订版 Redis-rs是Rust的高级Redis库。 它通过非常灵活但底层的API,提供对所有Redis功能的便捷访问。 它使用可自定义的类型转换特征,以便任何操作都可以返回所需类型的结果。 这带来了非常愉快的开发经验。 该板条...

    Redis 配置文件1

    1. Redis默认不是以守护进程的方式运行,可以通过该配置项修改,使 3. 指定Redis监听端口,默认端口为6379,作者在自己的一篇博文中解 4. 绑定的

    spring boot+redis 监听过期Key的操作方法

    主要介绍了spring boot+redis 监听过期Key,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

    laravel-redis-socket:使用laravel,redis和socket.io在没有数据库的情况下向用户发送实时通知

    laravel-redis-socket 使用laravel,redis和socket.io在没有数据库的情况下向用户发送实时通知 首先,克隆后,您必须创建一个虚拟主机。 安装套件 npm install express ioredis socket.io --save composer require...

    Redis实现系统监控

    为了收集数据进行监控和分析,请构建一个能持续创建并维护计数器的工具,该工具可以任意添加和...2. 对上述数据进行统计分析,记录最小值、最大值、平均值、标准差、样本数量及利用其它统计分析方法得到的统计数据。

    redis架构分析.docx

    一、前言 因为近期项目中开始使用Redis,为了更好的理解Redis并应用在适合的业务场景,需要对Redis设计与实现深入的理解。...5.网络监听服务启动前的准备工作 6.开启事件监听,开始接受客户端的请求

    SpringBoot监听reids的key值过期

    实现redis的key值过期监听事件,且集成jedis实现redis的简单调用,内有功能备注

Global site tag (gtag.js) - Google Analytics