编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

Redis1.0源码阅读笔记二、服务端与客户端的通信过程

wxchong 2024-07-21 07:05:47 开源技术 33 ℃ 0 评论

在《Redis1.0源码阅读笔记一、总体流程》一文中,介绍了Redis-1.0服务器在启动初始化阶段、处理网络事件、处理定时事件的基本流程,并介绍了在这些流程中涉及到的数据结构。

本文将介绍Redis-1.0在处理客户端请求过程中涉及的操作及数据结构等。

Redis-1.0处理客户端通信的入口在aeProcessEvents函数中,通过select获得可以操作的文件描述符数量,如果有可操作的socket,则遍历eventLoop中的fileEventHead链表。

retval = select(maxfd+1, &rfds, &wfds, &efds, tvp);
if (retval > 0) {
    fe = eventLoop->fileEventHead;
    while(fe != NULL) {
        int fd = (int) fe->fd;
       if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) ||
            (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) ||
            (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)))
        {
            /* Rebuild the fired mask: only the conditions the event
             * registered for AND that select() reported are kept. */
            int mask = 0;
            if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds))
                mask |= AE_WRITABLE;
            if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))
                mask |= AE_EXCEPTION;
            /* Dispatch to the handler registered on this list node. */
            fe->fileProc(eventLoop, fe->fd, fe->clientData, mask);

            processed++;
            /* The handler may have added or removed events, so the list
             * may have changed: restart the scan from the head. */
            fe = eventLoop->fileEventHead;
            FD_CLR(fd, &rfds);  /* Clear fd from each set so the restarted
                                 * scan does not fire it a second time. */
            FD_CLR(fd, &wfds);
            FD_CLR(fd, &efds);
        } else {
            fe = fe->next;
        }
    }
}

该链表的每个节点fe,是结构体aeFileEvent:

/* File event structure: one node of the singly linked list hanging off
 * eventLoop->fileEventHead. New nodes are pushed at the head. */
typedef struct aeFileEvent {
    int fd;                     /* file descriptor being monitored */
    int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */
    aeFileProc *fileProc;       /* handler invoked when the event fires */
    aeEventFinalizerProc *finalizerProc; /* run on deletion; may be NULL */
    void *clientData;           /* opaque pointer passed back to fileProc */
    struct aeFileEvent *next;   /* next node in the singly linked list */
} aeFileEvent;

该节点的创建,由函数aeCreateFileEvent完成。可以看到,新创建的节点,是放入链表的头部。从而实现了O(1)的插入速度。

/* Register a handler for (fd, mask) by pushing a new aeFileEvent onto
 * the head of the event loop's list — O(1) insertion.
 * Returns AE_OK on success, AE_ERR if the node cannot be allocated. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    aeFileEvent *event = zmalloc(sizeof(*event));

    if (event == NULL) return AE_ERR;
    event->fd = fd;
    event->mask = mask;
    event->fileProc = proc;
    event->finalizerProc = finalizerProc;
    event->clientData = clientData;
    /* Head insertion into the singly linked list. */
    event->next = eventLoop->fileEventHead;
    eventLoop->fileEventHead = event;
    return AE_OK;
}

每个aeFileEvent的处理函数,是由fileProc保存的。通过全局搜索,调用该函数包含了以下五个地方(接收客户端连接、读取客户端请求、发送应答给客户端、发送消息给slave节点、同步数据给slave节点):

  • server监听socket,accept时。对应的fileProc为acceptHandler。
/* Abridged excerpt of main(): at startup the server's listening socket
 * is registered for AE_READABLE with acceptHandler as its fileProc. */
int main(int argc, char **argv) {
    /* ...server initialization elided in this excerpt... */
    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
        acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
    return 0;
}
  • 创建redisClient对象时。对应的fileProc为readQueryFromClient。
/* Abridged excerpt of createClient(): each new client's socket is
 * registered for AE_READABLE with readQueryFromClient as the handler. */
static redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(*c));
    /* ...field initialization elided in this excerpt... */
    if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
        readQueryFromClient, c, NULL) == AE_ERR) {
        freeClient(c);
        return NULL;
    }
    return c;
}
  • 添加对客户端的响应时。对应的fileProc为sendReplyToClient。
/* Queue 'obj' on the client's reply list. When the list was empty (and
 * the client is in a state that allows replies), an AE_WRITABLE event
 * with sendReplyToClient is installed first; if that registration fails
 * the reply is dropped. */
static void addReply(redisClient *c, robj *obj) {
    if (listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c, NULL) == AE_ERR) return;
    if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
    incrRefCount(obj); /* the reply list now holds a reference to obj */
}
  • 发送消息给slave。对应的fileProc为sendReplyToClient。
/* Abridged excerpt of sendBulkToSlave(): once the bulk payload has been
 * sent, the slave's socket is re-registered for AE_WRITABLE with
 * sendReplyToClient; on failure the slave connection is dropped. */
static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    /* ...bulk transfer logic elided in this excerpt... */
    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
        sendReplyToClient, slave, NULL) == AE_ERR) {
        freeClient(slave);
        return;
    }
}
  • 在调用bgsave时。对应的fileProc为sendBulkToSlave。
/* Abridged excerpt of updateSlavesWaitingBgsave(): for each slave an
 * AE_WRITABLE event is registered with sendBulkToSlave as the handler,
 * which will stream the saved dataset to that slave. */
static void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0; /* unused in this excerpt; used by elided code */

    listRewind(server.slaves);
    while((ln = listYield(server.slaves))) {
        redisClient *slave = ln->value;
        aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL);
        
    }
}

本文主要介绍redis server如何接收客户端连接、如何响应客户端的请求,故对上述前三个fileProc(acceptHandler、readQueryFromClient、sendReplyToClient)做分析。涉及到的master-slave结构,在后期的文章做介绍。

acceptHandler

acceptHandler就是在客户端发起连接时,在函数anetAccept中调用accept完成连接;在函数createClient中,创建redisClient结构。

/* AE_READABLE handler for the listening socket: accept one pending
 * connection and wrap it in a redisClient via createClient(). */
static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char clientip[128];
    int clientport;
    int clientfd;
    redisClient *c;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    clientfd = anetAccept(server.neterr, fd, clientip, &clientport);
    if (clientfd == AE_ERR) {
        redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_DEBUG,"Accepted %s:%d", clientip, clientport);
    c = createClient(clientfd);
    if (c == NULL) {
        redisLog(REDIS_WARNING,"Error allocating resoures for the client");
        close(clientfd); /* may already be closed; ignore errors */
        return;
    }
    /* The maxclients limit is enforced AFTER creating the client on
     * purpose: the socket is already non-blocking at this point, so the
     * error line can be pushed out cheaply via the kernel I/O. */
    if (server.maxclients && listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* Best-effort error message; write errors are not checked. */
        (void) write(c->fd,err,strlen(err));
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
}

函数createClient的具体实现如下。做的工作主要是创建redisClient结构体,初始化连接的数据库(默认0号数据库),初始化结构体的各个字段, 向eventLoop的链表fileEventHead,加入新的节点。将新的client,加入全局变量server的clients双向链表

/* Allocate and initialize a redisClient for the accepted socket 'fd':
 * put the socket in non-blocking/no-delay mode, select database 0,
 * reset all protocol-parsing state, create the reply list, register the
 * AE_READABLE event (readQueryFromClient) and append the client to the
 * global server.clients list.
 * Returns the client, or NULL on failure (the caller closes fd). */
static redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(*c));

    /* Fix: check the allocation immediately — the original configured
     * the socket before testing c, doing pointless syscalls on OOM. */
    if (!c) return NULL;
    anetNonBlock(NULL,fd);
    anetTcpNoDelay(NULL,fd);
    selectDb(c,0); /* new connections start on database 0 */
    c->fd = fd;
    c->querybuf = sdsempty();
    c->argc = 0;
    c->argv = NULL;
    c->bulklen = -1;       /* -1 means "not inside a bulk read" */
    c->sentlen = 0;
    c->flags = 0;
    c->lastinteraction = time(NULL);
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    if ((c->reply = listCreate()) == NULL) oom("listCreate");
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);
    /* Create the aeFileEvent and link it into eventLoop->fileEventHead. */
    if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
        readQueryFromClient, c, NULL) == AE_ERR) {
        freeClient(c);
        return NULL;
    }
    if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
    return c;
}

在函数listAddNodeTail中,我们可以看到,构成双向链表的工作,是通过listNode结构体完成, 结构体redisClient只是作为双向链表的,每个节点的Value。Redis管理客户端,是通过双向链表构成的。

/* One node of the generic doubly linked list; the payload lives behind
 * 'value' (e.g. a redisClient in the server.clients list). */
typedef struct listNode {
    struct listNode *prev;  /* previous node, NULL at the head */
    struct listNode *next;  /* next node, NULL at the tail */
    void *value;            /* caller-owned payload */
} listNode;

/* Append 'value' as a new node at the tail of 'list' in O(1).
 * Returns the list on success, NULL if the node cannot be allocated. */
list *listAddNodeTail(list *list, void *value)
{
    listNode *node = zmalloc(sizeof(*node));

    if (node == NULL)
        return NULL;
    node->value = value;
    node->next = NULL; /* the new node is always the last one */
    if (list->len == 0) {
        /* Empty list: the node becomes both head and tail. */
        node->prev = NULL;
        list->head = list->tail = node;
    } else {
        node->prev = list->tail;
        list->tail->next = node;
        list->tail = node;
    }
    list->len++;
    return list;
}

readQueryFromClient

对于客户端发送过来的请求,不管是普通的redis 客户端,还是slave节点,均由readQueryFromClient函数进行处理。首先调用read把数据从内核态拷贝至用户态。调用sdscatlen函数,为redisClient的querybuf字段申请内存空间、把请求内容拷贝至querybuf。调用argv = sdssplitlen(query,sdslen(query)," ",1,&argc);,把请求的数据,按空格进行切割。得到每个参数后,调用createObject函数, 为每个请求的参数,构建结构为redisObject的变量。最后,是调用processCommand,处理每个命令请求。

/* AE_READABLE handler for client sockets (regular clients and slaves).
 * Appends whatever read() returns to c->querybuf, then parses complete
 * newline-terminated inline requests: the first line is split on spaces
 * into c->argv string objects and handed to processCommand(). */
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    char buf[REDIS_IOBUF_LEN];
    int nread;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    nread = read(fd, buf, REDIS_IOBUF_LEN); /* copy data from kernel to user space */
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0; /* no data yet on the non-blocking socket: not an error */
        } else {
            redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        /* read() returning 0 means the peer closed the connection. */
        redisLog(REDIS_DEBUG, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        c->querybuf = sdscatlen(c->querybuf, buf, nread);
        c->lastinteraction = time(NULL);
    } else {
        return;
    }

again:
    if (c->bulklen == -1) {
        /* Read the first line of the query */
        char *p = strchr(c->querybuf,'\n');
        size_t querylen;

        if (p) {
            sds query, *argv;
            int argc, j;
            
            /* Take ownership of the buffer; keep any bytes after the
             * first line in a fresh querybuf for the next iteration. */
            query = c->querybuf;
            c->querybuf = sdsempty();
            querylen = 1+(p-(query));
            if (sdslen(query) > querylen) {
                /* leave data after the first line of the query in the buffer */
                c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
            }
            *p = '\0'; /* remove "\n" */
            if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
            sdsupdatelen(query);

            /* Now we can split the query in arguments */
            if (sdslen(query) == 0) {
                /* Ignore empty query */
                sdsfree(query);
                return;
            }
            argv = sdssplitlen(query,sdslen(query)," ",1,&argc); /* split arguments on spaces */
            if (argv == NULL) oom("sdssplitlen");
            sdsfree(query);

            if (c->argv) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*)*argc);
            if (c->argv == NULL) oom("allocating arguments list for client");

            for (j = 0; j < argc; j++) {
                if (sdslen(argv[j])) {
                    /* Non-empty token: wrap it in a REDIS_STRING object
                     * (the object takes ownership of the sds). */
                    c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
                    c->argc++;
                } else {
                    sdsfree(argv[j]);
                }
            }
            zfree(argv);
            /* Run the command; if pipelined data remains in querybuf,
             * loop back and parse the next request. */
            if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again; 
            return;
        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
            /* No newline yet and the buffer exceeded the limit. */
            redisLog(REDIS_DEBUG, "Client protocol error");
            freeClient(c);
            return;
        }
    } 
}

processCommand是具体处理每个客户端请求的入口。第一步,如果RedisServer限制了可使用内存大小,则会尝试释放内存。然后调用lookupCommand查看具体的处理指令。Redis支持的所有客户端指令,在全局变量cmdTable中。接着调用相关的函数,如getCommand、setCommand。如果客户端的请求,需要返回信息,则在相应的命令函数中,调用addReply,创建一个可写的fileEvent。

/* Append 'obj' to the client's pending reply list; on the first queued
 * reply (for a client whose replication state permits it) an
 * AE_WRITABLE event is created so sendReplyToClient will flush it. */
static void addReply(redisClient *c, robj *obj) {
    if (listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c, NULL) == AE_ERR) return;
    if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
    incrRefCount(obj); /* the reply list now holds a reference to obj */
}

sendReplyToClient

sendReplyToClient的功能比较简单,调用write把数据从用户态拷贝到内核态,然后调用aeDeleteFileEvent,把该节点,从eventLoop的fileEventHead链表中删除。因为aeFileEvent是单向链表,删除链表节点只能从头到尾遍历。为啥aeFileEvent不采用双向链表呢?是出于节省内存的目的,放弃了速度?

/* Remove the first registered event matching (fd, mask) exactly.
 * The list is singly linked, so deletion is a linear scan from the
 * head. Runs the node's finalizer (if any) before freeing it. */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    aeFileEvent *cur = eventLoop->fileEventHead;
    aeFileEvent *prev = NULL;

    for (; cur != NULL; prev = cur, cur = cur->next) {
        if (cur->fd != fd || cur->mask != mask) continue;
        /* Unlink: splice around the node, handling the head specially. */
        if (prev == NULL)
            eventLoop->fileEventHead = cur->next;
        else
            prev->next = cur->next;
        if (cur->finalizerProc)
            cur->finalizerProc(eventLoop, cur->clientData);
        zfree(cur);
        return;
    }
}

做一个简单的小结:

  1. Redis管理客户端,是通过双向链表管理的。链表的每个节点,value值对应一个redisClient结构体,该结构体保存了每个客户端信息,包括fd、连接的数据库等信息
  2. Redis与客户端的网络IO操作,通过select系统调用。用单向链表aeFileEvent管理可读、可写等IO操作。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表