In the previous post, "Redis 1.0 Source Code Reading Notes, Part 1: Overall Flow", we walked through how the Redis-1.0 server initializes at startup, handles network events, and handles timer events, along with the data structures involved in those flows.
This post covers the operations and data structures Redis-1.0 uses while handling client requests.
The entry point for client communication in Redis-1.0 is the aeProcessEvents function. It calls select to learn how many file descriptors are ready; if any sockets are ready, it walks the fileEventHead linked list stored in the eventLoop.
retval = select(maxfd+1, &rfds, &wfds, &efds, tvp);
if (retval > 0) {
    fe = eventLoop->fileEventHead;
    while(fe != NULL) {
        int fd = (int) fe->fd;

        if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) ||
            (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) ||
            (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)))
        {
            int mask = 0;

            if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds))
                mask |= AE_WRITABLE;
            if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))
                mask |= AE_EXCEPTION;
            /* Call the fileProc callback stored in this node */
            fe->fileProc(eventLoop, fe->fd, fe->clientData, mask);
            processed++;
            /* After the event is processed the event list may have changed,
             * so restart the scan from the head of the list */
            fe = eventLoop->fileEventHead;
            FD_CLR(fd, &rfds); /* FD_CLR removes a descriptor from an fd_set */
            FD_CLR(fd, &wfds);
            FD_CLR(fd, &efds);
        } else {
            fe = fe->next;
        }
    }
}
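As a reminder from the previous post, the fileEventHead list referenced above lives in the aeEventLoop structure (ae.h). Roughly, it looks like this:

typedef struct aeEventLoop {
    long long timeEventNextId;
    aeFileEvent *fileEventHead;   /* singly linked list of registered file events */
    aeTimeEvent *timeEventHead;   /* timer events, covered in the previous post */
    int stop;
} aeEventLoop;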
Each node fe in that list is an aeFileEvent structure:
/* File event structure */
typedef struct aeFileEvent {
    int fd;
    int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */
    aeFileProc *fileProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeFileEvent *next;
} aeFileEvent;
Nodes are created by aeCreateFileEvent. Note that a newly created node is inserted at the head of the list, which makes insertion O(1).
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    aeFileEvent *fe;

    fe = zmalloc(sizeof(*fe));
    if (fe == NULL) return AE_ERR;
    fe->fd = fd;
    fe->mask = mask;
    fe->fileProc = proc;
    fe->finalizerProc = finalizerProc;
    fe->clientData = clientData;
    fe->next = eventLoop->fileEventHead;
    eventLoop->fileEventHead = fe;
    return AE_OK;
}
The handler for each aeFileEvent is stored in its fileProc field. A global search shows that aeCreateFileEvent is called in the following five places, each registering a different handler (accepting a client connection, reading a client request, sending a reply to a client, sending data to a slave, and syncing data to a slave):
- On the server's listening socket, to accept connections. The fileProc is acceptHandler.
int main(int argc, char **argv) {
    /* ... */
    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
        acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
    return 0;
}
- When creating a redisClient object. The fileProc is readQueryFromClient.
static redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(*c));
    /* ... */
    if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
        readQueryFromClient, c, NULL) == AE_ERR) {
        freeClient(c);
        return NULL;
    }
    return c;
}
- When queueing a reply to a client. The fileProc is sendReplyToClient.
static void addReply(redisClient *c, robj *obj) {
    if (listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c, NULL) == AE_ERR) return;
    if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
    incrRefCount(obj);
}
- Inside sendBulkToSlave, once the bulk payload has been fully sent to a slave, to flush the remaining replies. The fileProc is sendReplyToClient.
static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *slave = privdata;
    /* ... */
    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
        sendReplyToClient, slave, NULL) == AE_ERR) {
        freeClient(slave);
        return;
    }
}
- When a background save (BGSAVE) completes and slaves are waiting for it, in updateSlavesWaitingBgsave. The fileProc is sendBulkToSlave.
static void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;

    listRewind(server.slaves);
    while((ln = listYield(server.slaves))) {
        redisClient *slave = ln->value;

        aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL);
    }
}
This post focuses on how the Redis server accepts client connections and how it responds to client requests, so it analyzes the first three fileProcs above (acceptHandler, readQueryFromClient, sendReplyToClient). The master-slave machinery involved in the last two will be covered in a later post.
acceptHandler
acceptHandler runs when a client initiates a connection: it completes the connection by calling accept inside anetAccept (a condensed sketch of anetAccept follows the listing below), then builds a redisClient structure in createClient.
static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    char cip[128];
    redisClient *c;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    cfd = anetAccept(server.neterr, fd, cip, &cport);
    if (cfd == AE_ERR) {
        redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
        return;
    }
    redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
    if ((c = createClient(cfd)) == NULL) {
        redisLog(REDIS_WARNING,"Error allocating resoures for the client");
        close(cfd); /* May be already closed, just ingore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in nonblocking
     * mode and we can send an error for free using the Kernel I/O */
    if (server.maxclients && listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        (void) write(c->fd,err,strlen(err));
        freeClient(c);
        return;
    }
    server.stat_numconnections++;
}
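For reference, here is a condensed sketch of anetAccept (anet.c): it loops around accept so it can retry when the call is interrupted by a signal, and reports the peer's IP and port back to the caller. Error-message details are simplified here.

int anetAccept(char *err, int serversock, char *ip, int *port)
{
    int fd;
    struct sockaddr_in sa;
    unsigned int saLen;

    while(1) {
        saLen = sizeof(sa);
        fd = accept(serversock, (struct sockaddr*)&sa, &saLen);
        if (fd == -1) {
            if (errno == EINTR)
                continue;                           /* interrupted by a signal, retry */
            anetSetError(err, "accept: %s\n", strerror(errno));
            return ANET_ERR;
        }
        break;
    }
    if (ip) strcpy(ip, inet_ntoa(sa.sin_addr));     /* report the peer address */
    if (port) *port = ntohs(sa.sin_port);           /* and the peer port */
    return fd;
}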
createClient is implemented as follows. Its main work is to allocate the redisClient structure, select the connection's database (database 0 by default, via selectDb; a condensed sketch of selectDb follows the listing below), initialize the structure's fields, add a new node to the eventLoop's fileEventHead list, and finally append the new client to the clients doubly linked list in the global server variable.
static redisClient *createClient(int fd) {
    redisClient *c = zmalloc(sizeof(*c));

    anetNonBlock(NULL,fd);
    anetTcpNoDelay(NULL,fd);
    if (!c) return NULL;
    selectDb(c,0);          /* a fresh connection uses database 0 by default */
    c->fd = fd;
    c->querybuf = sdsempty();
    c->argc = 0;
    c->argv = NULL;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->lastinteraction = time(NULL);
    c->authenticated = 0;
    c->replstate = REDIS_REPL_NONE;
    if ((c->reply = listCreate()) == NULL) oom("listCreate");
    listSetFreeMethod(c->reply,decrRefCount);
    listSetDupMethod(c->reply,dupClientReplyValue);
    /* Create an aeFileEvent and add it to the eventLoop's fileEventHead list */
    if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
        readQueryFromClient, c, NULL) == AE_ERR) {
        freeClient(c);
        return NULL;
    }
    if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
    return c;
}
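For completeness, selectDb is tiny. A condensed sketch from memory (the exact field names are in redis.c of 1.0, where the client simply holds a pointer to the selected database's dict):

static int selectDb(redisClient *c, int id) {
    if (id < 0 || id >= server.dbnum)
        return REDIS_ERR;            /* reject out-of-range database ids */
    c->dict = server.dict[id];       /* point the client at that database's dict */
    c->dictid = id;
    return REDIS_OK;
}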
In listAddNodeTail we can see that the doubly linked list is built out of listNode structures; the redisClient struct is merely the value stored in each node. This doubly linked list is how Redis keeps track of its clients.
typedef struct listNode {
    struct listNode *prev;
    struct listNode *next;
    void *value;
} listNode;

list *listAddNodeTail(list *list, void *value)
{
    listNode *node;

    if ((node = zmalloc(sizeof(*node))) == NULL)
        return NULL;
    node->value = value;
    if (list->len == 0) {
        list->head = list->tail = node;
        node->prev = node->next = NULL;
    } else {
        node->prev = list->tail;
        node->next = NULL;
        list->tail->next = node;
        list->tail = node;
    }
    list->len++;
    return list;
}
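As an aside, walking such a list elsewhere in the code (for example when updateSlavesWaitingBgsave scans server.slaves, or closeTimedoutClients scans server.clients) uses the listRewind/listYield pair: listRewind resets the list's built-in iterator and listYield returns one node per call. Below is a condensed usage sketch with a hypothetical helper, not a verbatim excerpt from the source:

/* Hypothetical helper: count connected clients by walking server.clients */
static int countClients(void) {
    listNode *ln;
    redisClient *c;
    int num = 0;

    listRewind(server.clients);                     /* reset the list's iterator */
    while ((ln = listYield(server.clients)) != NULL) {
        c = listNodeValue(ln);                      /* node value is a redisClient* */
        if (c->fd != -1) num++;                     /* ... inspect per-client state ... */
    }
    return num;
}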
readQueryFromClient
Requests sent by a client, whether an ordinary Redis client or a slave node, are handled by readQueryFromClient. It first calls read to copy the data from kernel space to user space, then calls sdscatlen to grow the redisClient's querybuf and append the request bytes to it. Next, argv = sdssplitlen(query,sdslen(query)," ",1,&argc); splits the request on spaces. For each resulting argument it calls createObject to wrap it in a redisObject (a sketch of createObject follows the listing below). Finally, it calls processCommand to handle the command.
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    char buf[REDIS_IOBUF_LEN];
    int nread;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    nread = read(fd, buf, REDIS_IOBUF_LEN); /* copy data from kernel space to user space */
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        } else {
            redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        redisLog(REDIS_DEBUG, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        c->querybuf = sdscatlen(c->querybuf, buf, nread);
        c->lastinteraction = time(NULL);
    } else {
        return;
    }

again:
    if (c->bulklen == -1) {
        /* Read the first line of the query */
        char *p = strchr(c->querybuf,'\n');
        size_t querylen;

        if (p) {
            sds query, *argv;
            int argc, j;

            query = c->querybuf;
            c->querybuf = sdsempty();
            querylen = 1+(p-(query));
            if (sdslen(query) > querylen) {
                /* leave data after the first line of the query in the buffer */
                c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
            }
            *p = '\0'; /* remove "\n" */
            if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
            sdsupdatelen(query);

            /* Now we can split the query in arguments */
            if (sdslen(query) == 0) {
                /* Ignore empty query */
                sdsfree(query);
                return;
            }
            argv = sdssplitlen(query,sdslen(query)," ",1,&argc); /* split on spaces */
            if (argv == NULL) oom("sdssplitlen");
            sdsfree(query);
            if (c->argv) zfree(c->argv);
            c->argv = zmalloc(sizeof(robj*)*argc);
            if (c->argv == NULL) oom("allocating arguments list for client");
            for (j = 0; j < argc; j++) {
                if (sdslen(argv[j])) {
                    c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
                    c->argc++;
                } else {
                    sdsfree(argv[j]);
                }
            }
            zfree(argv);
            /* Execute the command */
            if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
            return;
        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
            redisLog(REDIS_DEBUG, "Client protocol error");
            freeClient(c);
            return;
        }
    }
}
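For reference, each parsed argument is wrapped in a redisObject (robj). Roughly, and from memory (the 1.0 source also recycles objects from server.objfreelist, which is omitted in this sketch):

typedef struct redisObject {
    void *ptr;      /* for REDIS_STRING this points to an sds string */
    int type;       /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    int refcount;
} robj;

static robj *createObject(int type, void *ptr) {
    robj *o = zmalloc(sizeof(*o));

    if (!o) oom("createObject");
    o->type = type;
    o->ptr = ptr;
    o->refcount = 1;    /* a new object starts with a single reference */
    return o;
}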
processCommand is the entry point that actually handles each client request. First, if the Redis server has a memory limit configured, it tries to free memory. It then calls lookupCommand to find the handler for the command; every command Redis supports is listed in the global cmdTable. The matching command function, such as getCommand or setCommand, is then invoked. If the request needs a reply, the command function calls addReply, which registers a writable fileEvent (a condensed sketch of the lookup path follows the addReply listing below).
static void addReply(redisClient *c, robj *obj) {
    if (listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c, NULL) == AE_ERR) return;
    if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
    incrRefCount(obj);
}
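To make the dispatch concrete, here is a condensed sketch of the table-driven lookup that processCommand relies on. The shape matches the 1.0 source, but the entries and flag values shown are only examples:

struct redisCommand {
    char *name;
    redisCommandProc *proc;   /* e.g. getCommand, setCommand */
    int arity;                /* expected argument count, command name included */
    int flags;                /* e.g. REDIS_CMD_INLINE or REDIS_CMD_BULK */
};

static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK},
    /* ... the full table lists every supported command ... */
    {NULL,NULL,0,0}
};

/* Linear scan of the table, case-insensitive on the command name */
static struct redisCommand *lookupCommand(char *name) {
    int j = 0;

    while(cmdTable[j].name != NULL) {
        if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
        j++;
    }
    return NULL;
}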
sendReplyToClient
sendReplyToClient is fairly simple: it calls write to copy the reply data from user space to kernel space, and once the reply list is drained it calls aeDeleteFileEvent to remove the node from the eventLoop's fileEventHead list (a condensed sketch of sendReplyToClient is shown below, followed by aeDeleteFileEvent). Because aeFileEvent forms a singly linked list, deleting a node requires scanning from head to tail. Why isn't aeFileEvent a doubly linked list? Perhaps speed was traded away to save memory?
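A condensed sketch of sendReplyToClient follows; the partial-write bookkeeping via sentlen is kept, but the special handling when the peer is a master and some error-logging paths are trimmed:

static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    while(listLength(c->reply)) {
        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* copy as much as possible from user space into the socket buffer */
        nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
        if (nwritten <= 0) break;
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* this reply object is fully sent, move on to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
    }
    if (nwritten == -1 && errno != EAGAIN) {
        freeClient(c);
        return;
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        /* nothing left to send: drop the writable event from the list */
        aeDeleteFileEvent(server.el, c->fd, AE_WRITABLE);
    }
}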
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    aeFileEvent *fe, *prev = NULL;

    fe = eventLoop->fileEventHead;
    while(fe) {
        if (fe->fd == fd && fe->mask == mask) {
            if (prev == NULL)
                eventLoop->fileEventHead = fe->next;
            else
                prev->next = fe->next;
            if (fe->finalizerProc)
                fe->finalizerProc(eventLoop, fe->clientData);
            zfree(fe);
            return;
        }
        prev = fe;
        fe = fe->next;
    }
}
A brief summary:
- Redis tracks its clients with a doubly linked list. Each node's value is a redisClient structure, which holds the per-client state: the fd, the selected database, and so on.
- Network I/O between Redis and its clients goes through the select system call, and pending readable/writable I/O events are tracked in the singly linked aeFileEvent list.