redis概述

redis作为一个高性能的kv数据库,基于c语言实现,数据主要基于内存存放,所以查询效率较高,并且支持较多的数据类型有字符串、字典、列表与集合等数据结构。另外redis叶支持持久化的日志,支持集群等特性,这些内容并不在本文的概述中,本文只是概述早期版本的redis的基本实现思路。

redis的架构概述

早期版本的redis,该版本的协议还是telnet协议,支持的数据类型只有string、dict与set,不过该版本支持db的定时load到文件中,定时保存数据时间间隔为1000毫秒,redis的选用的模型是单线程select事件驱动框架;

伪代码执行流程:server初始化过程
1.监听端口
2.事件框架的初始化
3.添加事件监听驱动while (1) {select() // 监听是否有读写请求1.如果有事件发生调用回调函数处理2.如果时间时间到了调用时间注册的回调处理函数
}

以上就是整个业务的处理流程,现在的事件驱动模型可以选用select,epoll等不同操作系统提供的事件驱动的系统调用。

redis启动流程分析

main函数启动

启动函数位于redis.c中的main函数

int main(int argc, char **argv) {initServerConfig();                                             // 读取配置信息initServer();                                                   // 初始化serverif (argc == 2) {ResetServerSaveParams();                                    // 读取传入的配置信息并更新默认值loadServerConfig(argv[1]);redisLog(REDIS_NOTICE,"Configuration loaded");} else if (argc > 2) {                                          // 如果超过两两个输入值则报错fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");exit(1);}redisLog(REDIS_NOTICE,"Server started");                        // 打印日志if (loadDb("dump.rdb") == REDIS_OK)                             // 检查是否有加载dump内容redisLog(REDIS_NOTICE,"DB loaded from disk");if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");    // 创建服务端监听事件的监听redisLog(REDIS_NOTICE,"The server is now ready to accept connections");aeMain(server.el);                                                       // 启动事件循环aeDeleteEventLoop(server.el);                                            // 移除监听事件return 0;
}

main函数的主要工作就是初始化并加载配置文件,检查是否需要加载保存的rdb,创建监听对象,进入事件循环,这些步骤就是标准的事件驱动框架的执行流程。

initServer函数初始化

该函数主要工作就是初始化相关的内容监听,并初始化事件监听。

static void initServer() {int j;signal(SIGHUP, SIG_IGN);                    // 防止出现僵死进程signal(SIGPIPE, SIG_IGN);server.clients = listCreate();              // 创建一个列表用于保存clients 就是一个链表server.objfreelist = listCreate();createSharedObjects();server.el = aeCreateEventLoop();            // 创建事件循环server.dict = malloc(sizeof(dict*)*server.dbnum);       // 生成一个字典保存不同数据库中存入的值if (!server.dict || !server.clients || !server.el || !server.objfreelist)oom("server initialization"); /* Fatal OOM */                   // 如果三个中任意一个未生成成功则内存报错server.fd = anetTcpServer(server.neterr, server.port, NULL);        // 建立服务端监听请求if (server.fd == -1) {                                              // 如果创建监听失败则报错redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);exit(1);}for (j = 0; j < server.dbnum; j++) {                                // 根据配置的数据库db生成对应的保存的数据结构server.dict[j] = dictCreate(&sdsDictType,NULL);if (!server.dict[j])oom("server initialization"); /* Fatal OOM */}server.cronloops = 0;server.bgsaveinprogress = 0;server.lastsave = time(NULL);server.dirty = 0;aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);         // 创建事件监听
}

aeCreateEventLoop函数主要就是创建一个事件驱动loop,用于管理事件驱动里面包括了驱动事件与时间事件。

/* File event structure */
typedef struct aeFileEvent {int fd;                                                  // 文件描述符int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */  // 事件标志aeFileProc *fileProc;                       // 事件回调函数aeEventFinalizerProc *finalizerProc;        // 事件处理完成后回调函数void *clientData;                           // 数据struct aeFileEvent *next;                   // 下一个事件
} aeFileEvent;/* Time event structure */
typedef struct aeTimeEvent {long long id; /* time event identifier. */  // 事件idlong when_sec; /* seconds */                // 时间秒long when_ms; /* milliseconds */            // 时间毫秒aeTimeProc *timeProc;                       // 时间回调函数aeEventFinalizerProc *finalizerProc;void *clientData;struct aeTimeEvent *next;                   // 下一个回调事件
} aeTimeEvent;/* State of an event based program */
typedef struct aeEventLoop {long long timeEventNextId;aeFileEvent *fileEventHead;         // 事件列表aeTimeEvent *timeEventHead;         // 时间列表int stop;
} aeEventLoop;aeEventLoop *aeCreateEventLoop(void) {aeEventLoop *eventLoop;eventLoop = malloc(sizeof(*eventLoop));         // 申请内存if (!eventLoop) return NULL;eventLoop->fileEventHead = NULL;                // 初始化链表eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;return eventLoop;
}

继续分析anetTcpServer函数;

int anetTcpServer(char *err, int port, char *bindaddr)
{int s, on = 1;struct sockaddr_in sa;if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {              // 生成socket实例sanetSetError(err, "socket: %s\n", strerror(errno));return ANET_ERR;}if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {       // 设置端口可复用anetSetError(err, "setsockopt SO_REUSEADDR: %s\n", strerror(errno));close(s);return ANET_ERR;}sa.sin_family = AF_INET;                                        // 设置协议sa.sin_port = htons(port);                                      // 设置端口sa.sin_addr.s_addr = htonl(INADDR_ANY);                         if (bindaddr) inet_aton(bindaddr, &sa.sin_addr);if (bind(s, (struct sockaddr*)&sa, sizeof(sa)) == -1) {         // 监听该套接字anetSetError(err, "bind: %s\n", strerror(errno));close(s);return ANET_ERR;}if (listen(s, 5) == -1) {                                       // 指定监听队列anetSetError(err, "listen: %s\n", strerror(errno));close(s);return ANET_ERR;}return s;
}

主要就是根据端口ip监听队列并返回该套接字。创建完成之后就创建了定时回写到文件的内容备份时间事件;

aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);  long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,aeTimeProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{long long id = eventLoop->timeEventNextId++;aeTimeEvent *te;te = malloc(sizeof(*te));if (te == NULL) return AE_ERR;te->id = id;aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);te->timeProc = proc;te->finalizerProc = finalizerProc;te->clientData = clientData;te->next = eventLoop->timeEventHead;eventLoop->timeEventHead = te;return id;
}

该时间的回调函数是serverCron,时间是1000毫秒;

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {int j, size, used, loops = server.cronloops++;REDIS_NOTUSED(eventLoop);REDIS_NOTUSED(id);REDIS_NOTUSED(clientData);/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL* we resize the hash table to save memory */for (j = 0; j < server.dbnum; j++) {                                                // 重新计算字典的空间太小好调整内容以达到节省内存size = dictGetHashTableSize(server.dict[j]);used = dictGetHashTableUsed(server.dict[j]);if (!(loops % 5) && used > 0) {redisLog(REDIS_DEBUG,"DB %d: %d keys in %d slots HT.",j,used,size);// dictPrintStats(server.dict);}if (size && used && size > REDIS_HT_MINSLOTS &&(used*100/size < REDIS_HT_MINFILL)) {redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);       // 检查是否达到重新缩小字典大小的阈值达到就重新设置大小dictResize(server.dict[j]);redisLog(REDIS_NOTICE,"Hash table %d resized.",j);}}/* Show information about connected clients */if (!(loops % 5)) redisLog(REDIS_DEBUG,"%d clients connected",listLength(server.clients));  // 展示链接信息/* Close connections of timedout clients */if (!(loops % 10))closeTimedoutClients();                                                             // 关闭超时链接的客户端/* Check if a background saving in progress terminated */if (server.bgsaveinprogress) {                                                          // 检查是否在存储int statloc;if (wait4(-1,&statloc,WNOHANG,NULL)) {int exitcode = WEXITSTATUS(statloc);if (exitcode == 0) {redisLog(REDIS_NOTICE,"Background saving terminated with success");server.dirty = 0;server.lastsave = time(NULL);} else {redisLog(REDIS_WARNING,"Background saving error");}server.bgsaveinprogress = 0;}} else {/* If there is not a background saving in progress check if* we have to save now */time_t now = time(NULL);for (j = 0; j < server.saveparamslen; j++) {struct saveparam *sp = server.saveparams+j;         if (server.dirty >= sp->changes &&now-server.lastsave > sp->seconds) {redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",sp->changes, sp->seconds);                                  // 如果需要重新dumpsaveDbBackground("dump.rdb");                                   // 将内存中保存的数据dump到dump.rdb中break;}}}return 1000;                                                                // 返回时间,在时间驱动中会继续创建该时间事件
}

继续查看一下saveDbBackground的函数操作;

static int saveDbBackground(char *filename) {pid_t childpid;if (server.bgsaveinprogress) return REDIS_ERR;if ((childpid = fork()) == 0) {                     // 生成一个子进程/* Child */close(server.fd); if (saveDb(filename) == REDIS_OK) {             // 在子进程中保存数据保存完成之后就退出exit(0);} else {exit(1);}} else {/* Parent */redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);server.bgsaveinprogress = 1;                    // 父进程修改值后退出return REDIS_OK;}return REDIS_OK; /* unreached */
}

由此可知,早期redis版本是通过子进程来进行数据备份的。

创建服务端监听事件

通过server初始化之后,现在就需要把server监听的sock注册到事件驱动中去;

aeCreateFileEvent(server.el, server.fd, AE_READABLE,acceptHandler, NULL, NULL)int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData,aeEventFinalizerProc *finalizerProc)
{aeFileEvent *fe;fe = malloc(sizeof(*fe));                       // 申请aeFileEvent内存if (fe == NULL) return AE_ERR;fe->fd = fd;                                    // 设置文件描述符fe->mask = mask;                                // 设置监听描述符fe->fileProc = proc;                            // 设置回调函数 fe->finalizerProc = finalizerProc;fe->clientData = clientData;fe->next = eventLoop->fileEventHead;            // 添加到eventloop的列表中去eventLoop->fileEventHead = fe;return AE_OK;
}

从创建流程中可知,当客户端新连接进来之后调用了acceptHandler函数进行处理

static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd;char cip[128];REDIS_NOTUSED(el);REDIS_NOTUSED(mask);REDIS_NOTUSED(privdata);cfd = anetAccept(server.neterr, fd, cip, &cport);       // 接受进来的请求if (cfd == AE_ERR) {redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);return;}redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);if (createClient(cfd) == REDIS_ERR) {                   // 创建一个client来处理请求redisLog(REDIS_WARNING,"Error allocating resoures for the client");close(cfd); /* May be already closed, just ingore errors */return;}
}int anetAccept(char *err, int serversock, char *ip, int *port)
{int fd;struct sockaddr_in sa;unsigned int saLen;while(1) {saLen = sizeof(sa);fd = accept(serversock, (struct sockaddr*)&sa, &saLen);         // 接受新进来的连接if (fd == -1) {if (errno == EINTR)continue;else {anetSetError(err, "accept: %s\n", strerror(errno));     // 如果出错则报错return ANET_ERR;}}break;}if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));                          // 将数据传入ip与port中if (port) *port = ntohs(sa.sin_port);return fd;
}static int createClient(int fd) {redisClient *c = malloc(sizeof(*c));anetNonBlock(NULL,fd);              // 设置连接为非阻塞anetTcpNoDelay(NULL,fd);if (!c) return REDIS_ERR;selectDb(c,0);                      // 选择数据库号c->fd = fd;c->querybuf = sdsempty();c->argc = 0;c->bulklen = -1;c->sentlen = 0;c->lastinteraction = time(NULL);if ((c->reply = listCreate()) == NULL) oom("listCreate");listSetFreeMethod(c->reply,decrRefCount);if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,readQueryFromClient, c, NULL) == AE_ERR) {          // 创建读事件请求freeClient(c);                                      // 如果出错则释放clientreturn REDIS_ERR;}if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");return REDIS_OK;
}

主要在createClient中对新建立的连接设置非阻塞之后,创建读事件并注册进去。此时注册的读事件的回调函数为readQueryFromClient函数,

readQueryFromClient函数解析
static int processCommand(redisClient *c) {struct redisCommand *cmd;sdstolower(c->argv[0]);/* The QUIT command is handled as a special case. Normal command* procs are unable to close the client connection safely */if (!strcmp(c->argv[0],"quit")) {                                   // 如果是quit退出freeClient(c);return 0;}cmd = lookupCommand(c->argv[0]);                                    // 寻找匹配的命令if (!cmd) {addReplySds(c,sdsnew("-ERR unknown command\r\n"));              // 如果没有找到则返回内容并重置clientresetClient(c);return 1;} else if (cmd->arity != c->argc) {                                 // 检查cmd是否与客户端传入的参数相等如果不想等则错误的输入参数addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));resetClient(c);return 1;} else if (cmd->type == REDIS_CMD_BULK && c->bulklen == -1) {int bulklen = atoi(c->argv[c->argc-1]);sdsfree(c->argv[c->argc-1]);if (bulklen < 0 || bulklen > 1024*1024*1024) {c->argc--;c->argv[c->argc] = NULL;addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));resetClient(c);return 1;}c->argv[c->argc-1] = NULL;c->argc--;c->bulklen = bulklen+2; /* add two bytes for CR+LF *//* It is possible that the bulk read is already in the* buffer. Check this condition and handle it accordingly */if ((signed)sdslen(c->querybuf) >= c->bulklen) {c->argv[c->argc] = sdsnewlen(c->querybuf,c->bulklen-2);c->argc++;c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);} else {return 1;}}/* Exec the command */cmd->proc(c);                                                       // 执行命令resetClient(c);                                                     // 重置客户端return 1;
}static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *c = (redisClient*) privdata;char buf[REDIS_QUERYBUF_LEN];int nread;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);nread = read(fd, buf, REDIS_QUERYBUF_LEN);                                      // 从fd中读数据if (nread == -1) {if (errno == EAGAIN) {                                                      // 如果出错就进行处理nread = 0;} else {redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));freeClient(c);return;}} else if (nread == 0) {                                                        // 如果为0则客户端关闭redisLog(REDIS_DEBUG, "Client closed connection");freeClient(c);return;}if (nread) {c->querybuf = sdscatlen(c->querybuf, buf, nread);                           // 保存读入的数据c->lastinteraction = time(NULL);} else {return;}again:if (c->bulklen == -1) {                                                         // 是否是批量读模式/* Read the first line of the query */char *p = strchr(c->querybuf,'\n');                                         // 解析第一行命令size_t querylen;if (p) {sds query, *argv;int argc, j;query = c->querybuf;c->querybuf = sdsempty();querylen = 1+(p-(query));if (sdslen(query) > querylen) {/* leave data after the first line of the query in the buffer */c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);}*p = '\0'; /* remove "\n" */if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */sdsupdatelen(query);/* Now we can split the query in arguments */if (sdslen(query) == 0) {/* Ignore empty query */sdsfree(query);return;}argv = sdssplitlen(query,sdslen(query)," ",1,&argc);                    // 解析命令sdsfree(query);if (argv == NULL) oom("Splitting query in token");for (j = 0; j < argc && j < REDIS_MAX_ARGS; j++) {if (sdslen(argv[j])) {c->argv[c->argc] = argv[j];c->argc++;} else {sdsfree(argv[j]);}}free(argv);/* Execute the command. If the client is still valid* after processCommand() return and there is something* on the query buffer try to process the next command. */if (processCommand(c) && sdslen(c->querybuf)) goto again;               // 执行命令如果还有内容则继续解析命令return;} else if (sdslen(c->querybuf) >= 1024) {redisLog(REDIS_DEBUG, "Client protocol error");freeClient(c);return;}} else {/* Bulk read handling. Note that if we are at this pointthe client already sent a command terminated with a newline,we are reading the bulk data that is actually the lastargument of the command. */int qbl = sdslen(c->querybuf);if (c->bulklen <= qbl) {/* Copy everything but the final CRLF as final argument */c->argv[c->argc] = sdsnewlen(c->querybuf,c->bulklen-2);c->argc++;c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);processCommand(c);                                                      // 执行命令return;}}
}

主要就是从客户端读取数据并解析出对应的命令,然后根据解析的命令查找不同的command去执行。对应的command查找过程如下;

static struct redisCommand cmdTable[] = {{"get",getCommand,2,REDIS_CMD_INLINE},{"set",setCommand,3,REDIS_CMD_BULK},{"setnx",setnxCommand,3,REDIS_CMD_BULK},{"del",delCommand,2,REDIS_CMD_INLINE},{"exists",existsCommand,2,REDIS_CMD_INLINE},{"incr",incrCommand,2,REDIS_CMD_INLINE},{"decr",decrCommand,2,REDIS_CMD_INLINE},{"rpush",rpushCommand,3,REDIS_CMD_BULK},{"lpush",lpushCommand,3,REDIS_CMD_BULK},{"rpop",rpopCommand,2,REDIS_CMD_INLINE},{"lpop",lpopCommand,2,REDIS_CMD_INLINE},{"llen",llenCommand,2,REDIS_CMD_INLINE},{"lindex",lindexCommand,3,REDIS_CMD_INLINE},{"lrange",lrangeCommand,4,REDIS_CMD_INLINE},{"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},{"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},{"select",selectCommand,2,REDIS_CMD_INLINE},{"move",moveCommand,3,REDIS_CMD_INLINE},{"rename",renameCommand,3,REDIS_CMD_INLINE},{"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},{"keys",keysCommand,2,REDIS_CMD_INLINE},{"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},{"ping",pingCommand,1,REDIS_CMD_INLINE},{"echo",echoCommand,2,REDIS_CMD_BULK},{"save",saveCommand,1,REDIS_CMD_INLINE},{"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},{"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},{"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},/* lpop, rpop, lindex, llen *//* dirty, lastsave, info */{"",NULL,0,0}
};static struct redisCommand *lookupCommand(char *name) {int j = 0; while(cmdTable[j].name != NULL) {                                   // 遍历列表直到找到或者达到表尾if (!strcmp(name,cmdTable[j].name)) return &cmdTable[j];        // 通过字符串对比name是否相同j++;}return NULL;
}
command执行过程

举例分析,假如是setCommand函数执行,

static void setGenericCommand(redisClient *c, int nx) {int retval;robj *o;o = createObject(REDIS_STRING,c->argv[2]);c->argv[2] = NULL;retval = dictAdd(c->dict,c->argv[1],o);             // 向数据库添加数据if (retval == DICT_ERR) {if (!nx)dictReplace(c->dict,c->argv[1],o);          // 检查是否是替换elsedecrRefCount(o);} else {/* Now the key is in the hash entry, don't free it */c->argv[1] = NULL;}server.dirty++; addReply(c,shared.ok);                             // 将ok返回客户端
}static void setCommand(redisClient *c) {return setGenericCommand(c,0);
}

具体的字典操作大家可以详细查看,就是哈希表的操作,接着就继续分析addReply。

addRepl返回数据到客户端
static void addReply(redisClient *c, robj *obj) {if (listLength(c->reply) == 0 &&aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c, NULL) == AE_ERR) return;              // 创建写事件到驱动中if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");incrRefCount(obj);
}

通过addReply创建写事件到事件驱动中,触发的回调函数是sendReplyToClient;

static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *c = privdata;int nwritten = 0, totwritten = 0, objlen;robj *o;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);while(listLength(c->reply)) {o = listNodeValue(listFirst(c->reply));objlen = sdslen(o->ptr);if (objlen == 0) {listDelNode(c->reply,listFirst(c->reply));continue;}nwritten = write(fd, o->ptr+c->sentlen, objlen - c->sentlen);       // 将数据写入fd中if (nwritten <= 0) break;c->sentlen += nwritten;                                             // 添加已经发送的数据长度totwritten += nwritten;/* If we fully sent the object on head go to the next one */if (c->sentlen == objlen) {listDelNode(c->reply,listFirst(c->reply));c->sentlen = 0;}}if (nwritten == -1) {                                                   // 如果写入报错if (errno == EAGAIN) {nwritten = 0;} else {redisLog(REDIS_DEBUG,"Error writing to client: %s", strerror(errno));freeClient(c);                                                  // 释放并报错return;}}if (totwritten > 0) c->lastinteraction = time(NULL);if (listLength(c->reply) == 0) {                                        // 如果数据写入完成c->sentlen = 0;aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);                     // 删除该写事件}
}

通过该函数调用就可以将数据返回给客户端。

aeMain事件驱动启动函数

该函数就是启动整个事件循环,来推送上节分析的所有流程。

void aeMain(aeEventLoop *eventLoop)
{eventLoop->stop = 0;while (!eventLoop->stop)                                // 只要没有停止aeProcessEvents(eventLoop, AE_ALL_EVENTS);          // 处理该函数
}

所有的处理逻辑都在aeProcessEvents函数中处理;

aeProcessEvents事件处理过程
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int maxfd = 0, numfd = 0, processed = 0;fd_set rfds, wfds, efds;aeFileEvent *fe = eventLoop->fileEventHead;                         // 获取事件头部aeTimeEvent *te;long long maxId;AE_NOTUSED(flags);/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;FD_ZERO(&rfds);                                                     // 读事件置空FD_ZERO(&wfds);                                                     // 写事件置空FD_ZERO(&efds);/* Check file events */if (flags & AE_FILE_EVENTS) {                                       // 检查事件列表while (fe != NULL) {                                            // 遍历该链表if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds);          // 添加读事件监听fdif (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds);          // 添加写事件监听fdif (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds);if (maxfd < fe->fd) maxfd = fe->fd;numfd++;fe = fe->next;                                              // 获取下一个事件}}/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int retval;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest) {long now_sec, now_ms;/* Calculate the time missing for the nearest* timer to fire. */aeGetTime(&now_sec, &now_ms);tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) {tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;tvp->tv_sec --;} else {tvp->tv_usec = (shortest->when_ms - now_ms)*1000;}} else {/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to se the timeout* to zero */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}retval = select(maxfd+1, &rfds, &wfds, &efds, tvp);                     // 调用select检查是否有监听的事件发生if (retval > 0) {fe = eventLoop->fileEventHead;                                      // 获取事件头部while(fe != NULL) {                                                 // 依次遍历该事件列表int fd = (int) fe->fd;if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) ||(fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) ||(fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))){int mask = 0;if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds))          // 计算当前事件的mask值mask |= AE_READABLE;if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds))mask |= AE_WRITABLE;if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))mask |= AE_EXCEPTION;fe->fileProc(eventLoop, fe->fd, fe->clientData, mask);      // 调用事件驱动的回调函数processed++;/* After an event is processed our file event list* may no longer be the same, so what we do* is to clear the bit for this file descriptor and* restart again from the head. */fe = eventLoop->fileEventHead;                              // 获取下一个FD_CLR(fd, &rfds);                                          // 将该fd清理掉FD_CLR(fd, &wfds); FD_CLR(fd, &efds);} else {fe = fe->next;                                              // 如果没有事件发生则下一个}}}}/* Check time events */if (flags & AE_TIME_EVENTS) {                                               // 处理时间事件te = eventLoop->timeEventHead;maxId = eventLoop->timeEventNextId-1;while(te) {long now_sec, now_ms;long long id;if (te->id > maxId) {te = te->next;continue;}aeGetTime(&now_sec, &now_ms);                                       // 获取当前时间if (now_sec > te->when_sec ||(now_sec == te->when_sec && now_ms >= te->when_ms))             // 比较当前时间 如果获取的时间事件的时间已经过期{int retval;id = te->id;retval = te->timeProc(eventLoop, id, te->clientData);           // 调用时间事件的回调函数/* After an event is processed our time event list may* no longer be the same, so we restart from head.* Still we make sure to don't process events registered* by event handlers itself in order to don't loop forever.* To do so we saved the max ID we want to handle. */if (retval != AE_NOMORE) {aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);  // 如果时间事件返回的是时间, 则修改该时间事件到返回的时间以继续在指定时间到了之后再次调用} else {aeDeleteTimeEvent(eventLoop, id);                           // 删除该时间事件}te = eventLoop->timeEventHead;                                  // 获取下一个事件} else {te = te->next;                                                  // 获取下一个事件}}}return processed; /* return the number of processed file/time events */
}

所有的驱动事件与时间驱动都在该函数中通过select的系统调用来完成,至此redis的主要的事件驱动逻辑执行完成。

总结

通过早期版本的redis的代码查看,可知redis是一个单线程的事件驱动框架,只有在讲数据保存到本地文件中时,才使用进程来保存数据。本文只是简单的分析了执行的主要过程,大家如果对redis的其他特性感兴趣,可自行查看代码。由于本人才疏学浅,如有错误请批评指正。

redis源码分析(beta版本)-redis实现的概述逻辑相关推荐

  1. redis源码分析 ppt_【Redis】redis各类型数据结构和底层实现源码分析

    一.简介和应用 Redis是一个由ANSI C语言编写,性能优秀.支持网络.可持久化的K-K内存数据库,并提供多种语言的API.它常用的类型主要是 String.List.Hash.Set.ZSet ...

  2. Redis源码分析:基础概念介绍与启动概述

    Redis源码分析 基于Redis-5.0.4版本,进行基础的源码分析,主要就是分析一些平常使用过程中的内容.仅作为相关内容的学习记录,有关Redis源码学习阅读比较广泛的便是<Redis设计与 ...

  3. Redis源码分析(一)--Redis结构解析

    从今天起,本人将会展开对Redis源码的学习,Redis的代码规模比较小,非常适合学习,是一份非常不错的学习资料,数了一下大概100个文件左右的样子,用的是C语言写的.希望最终能把他啃完吧,C语言好久 ...

  4. redis源码分析 -- cs结构之服务器

    服务器与客户端是如何交互的 redis客户端向服务器发送命令请求,服务器接收到客户端发送的命令请求之后,读取解析命令,并执行命令,同时将命令执行结果返回给客户端. 客户端与服务器交互的代码流程如下图所 ...

  5. Redis源码分析(一)redis.c //redis-server.c

    Redis源码分析(一)redis.c //redis-server.c 入口函数 int main() 4450 int main(int argc, char **argv) {4451 init ...

  6. 10年大厂程序员是如何高效学习使用redis的丨redis源码分析丨redis存储原理

    10年大厂程序员是怎么学习使用redis的 1. redis存储原理分析 2. redis源码学习分享 3. redis跳表和B+树详细对比分析 视频讲解如下,点击观看: 10年大厂程序员是如何高效学 ...

  7. c++ 多线程 类成员函数_为什么我说C/C++程序员都要阅读Redis源码之:通过Redis学习事件驱动设计

    0. 为什么我说C/C++程序员都要阅读Redis源码 主要原因就是『简洁』.如果你用源码编译过Redis,你会发现十分轻快,一步到位.其他语言的开发者可能不会了解这种痛,作为C/C++程序员,如果你 ...

  8. Spring 源码分析(四) ——MVC(二)概述

    随时随地技术实战干货,获取项目源码.学习资料,请关注源代码社区公众号(ydmsq666) from:Spring 源码分析(四) --MVC(二)概述 - 水门-kay的个人页面 - OSCHINA ...

  9. Redis 源码分析之故障转移

    在 Redis cluster 中故障转移是个很重要的功能,下面就从故障发现到故障转移整个流程做一下详细分析. 故障检测 PFAIL 标记 集群中每个节点都会定期向其他节点发送 PING 消息,以此来 ...

最新文章

  1. 溢价 5 倍欲将 SiFive 收入麾下,英特尔的绝地反击战
  2. 当莎士比亚遇见Google Flax:教你用​字符级语言模型和归递神经网络写“莎士比亚”式句子...
  3. .Net下的HashTable
  4. js数组与字符串的相互转换方法
  5. STM32 电机教程 28 - ST MCLIB实战之 位置闭环控制
  6. 深究AngularJS——$sce的使用
  7. 模拟noj——打扑克
  8. 哈希表处理冲突的方法?
  9. 80 after generation to marry or not to marry that is a question
  10. NumPy之:理解广播
  11. 原创:微信小程序源码解说:石头剪刀布(附源码下载)
  12. 免oracle客户端下载,Oracle免安装客户端
  13. java设计模式 订阅模式_Java中的外观设计模式
  14. C#中的变量类型(值类型、引用类型)
  15. python程序开机自启动_Linux下Python脚本自启动和定时启动的详细步骤
  16. 取消开机就弹出 msn中文网 操作步骤
  17. cad画多段线时不显示轨迹_cad画多段线时不显示轨迹_CAD画多段线的时候看不到预览效果的解决方法...
  18. Spring定时任务表达式示例
  19. 啮齿类动物大尺度功能网络
  20. AngularJS知识概括

热门文章

  1. 「摸鱼」神器来了,Python 实现人脸监测制作神器
  2. 谷歌排名第一的编程语言,死磕它这两点,小白也能学的会!不信你看!
  3. 学会这21条,你离Vim大神就不远了
  4. 复旦邱锡鹏教授公布《神经网络与深度学习》,中文免费下载 | 极客头条
  5. 掌握哪些机器学习工具更受企业青睐?
  6. 加速点击控制应用中的边缘分析和机器学习部署 | 免费直播
  7. 集合70多种推荐算法,东北大学老师用Java写了一个开源库,在GitHub上收获近1500个Star...
  8. AlphaGo Zero,一次成功的炒作而已?
  9. 在微服务架构下基于 Prometheus 构建一体化监控平台的最佳实践
  10. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?