Redis 英文拼写:REmote DIctionary Server(Redis)
Redis 是一个由Salvatore Sanfilippo 写的高性能key-value 存储系统。
远程,指的是有网络api接口,也就是提供6379端口tcp server给客户端使用。
字典服务,指的是数据类型为key:value类型,与mysql的关系型数据库有所区别。
redis原理:在内存层面用类型、key_len、ken、value_len、value存储数据,在文件中一行一条数据(kv关系)。 设置默认参数 创建对象、持久化定时器回调 加载文件参数 加载文件数据写入字典 创建client connect事件handler(accept后,写入读写事件select(),最新的版本用epoll) 调用事件多路复用循环 退出
int main(int argc, char * *argv) {
initServerConfig(); // 设置默认参数 initServer(); // 设置系统信号、字典、对象链表、公共对象、事件(文件,定时器)、6379服务器、持久化回调定时器 if (argc == 2) {
// 使用配置文件替代部分默认参数 ResetServerSaveParams(); loadServerConfig(argv[1]); redisLog(REDIS_NOTICE, "Configuration loaded"); } else if (argc > 2) {
fprintf(stderr, "Usage: ./redis-server [/path/to/redis.conf]\n"); exit(1); } redisLog(REDIS_NOTICE, "Server started"); if (loadDb("dump.rdb") == REDIS_OK) // 加载数据到字典 redisLog(REDIS_NOTICE, "DB loaded from disk"); if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event"); // 注册6379客户端连接事件处理 redisLog(REDIS_NOTICE, "The server is now ready to accept connections"); aeMain(server.el); // 定时/网络读写事件多路复用监听 aeDeleteEventLoop(server.el); // 销毁事件循环,退出redis进程 return 0; }
主要组件:
- 事件模型(select()多路复用)
- 链表
- 字典
事件模型:
// ae.h /* Types and data structures */ typedef void aeFileProc(struct aeEventLoop * eventLoop, int fd, void * clientData, int mask); typedef int aeTimeProc(struct aeEventLoop * eventLoop, long long id, void * clientData); typedef void aeEventFinalizerProc(struct aeEventLoop * eventLoop, void * clientData); /* File event structure */ typedef struct aeFileEvent {
int fd; // 目标文件文件描述符,也就是redis client的fd int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */ aeFileProc * fileProc; /* 客户端网络事件发生后,回调函数 */ aeEventFinalizerProc * finalizerProc; /* 清理函数 */ void * clientData; /* 回调参数 */ struct aeFileEvent * next; // 单链环 } aeFileEvent; /* Time event structure */ typedef struct aeTimeEvent {
long long id; /* 本定时器对象的id */ long when_sec; /* 定时秒 */ long when_ms; /* 定时毫秒 */ aeTimeProc * timeProc; // 定时回掉函数 aeEventFinalizerProc * finalizerProc; // 定时器销毁时,调用的清理函数 void * clientData; // 定时器回调函数参数 struct aeTimeEvent * next; // 单链表环 } aeTimeEvent; /* State of an event based program */ typedef struct aeEventLoop {
long long timeEventNextId; // 定时器事件id,自增,用于给新创建定时器编号 aeFileEvent * fileEventHead; // 客户都fd读写异常事件的链表头 aeTimeEvent * timeEventHead; // 定时器事件的链表头 int stop; // 事件循环while退出条件 } aeEventLoop; // 事件类型:读、写、异常。 #define AE_READABLE 1 #define AE_WRITABLE 2 #define AE_EXCEPTION 4 // 监控的事件源的类型:网络、定时器、全部、非阻塞标识(在select()时,设置timeout为0) #define AE_FILE_EVENTS 1 #define AE_TIME_EVENTS 2 #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) #define AE_DONT_WAIT 4
事件dispatch()循环
/* 调用后,遍历eventLoop下面挂载的网络事件链表、定时器链表。使用定时器链表中最近的超市时间,作为select()的最后一个参数,去监听全部的客户端fd, 处理客户端事件后,查询定时器事件并处理。 继续while (!eventLoop->stop)循环,直到stop==true退出循环 */ void aeMain(aeEventLoop * eventLoop) {
eventLoop->stop = 0; while (!eventLoop->stop) aeProcessEvents(eventLoop, AE_ALL_EVENTS); } int aeProcessEvents(aeEventLoop * eventLoop, int flags) {
int maxfd = 0, numfd = 0, processed = 0; fd_set rfds, wfds, efds; aeFileEvent * fe = eventLoop->fileEventHead; // fe获得客户端事件链表头 aeTimeEvent * te; // 指向定时器事件 long long maxId; AE_NOTUSED(flags); /* Nothing to do? return ASAP */ if (! (flags & AE_TIME_EVENTS) && ! (flags & AE_FILE_EVENTS)) return 0; // 清零载体。准备注册事件 FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); /* Check file events */ /* 注册客户端fd */ if (flags & AE_FILE_EVENTS) {
while (fe != NULL) {
if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds); if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds); if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds); if (maxfd < fe->fd) maxfd = fe->fd; numfd++; fe = fe->next; } } /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (numfd || ((flags & AE_TIME_EVENTS) && ! (flags & AE_DONT_WAIT))) {
int retval; aeTimeEvent * shortest = NULL; struct timeval tv, *tvp; // 获取最近一次的定时超时时间点,在早期redis版本中,用的还是比较笨的遍历整条链表的方法,在新的redis中,是用哈希表,或最小根来排序,不需要遍历 if (flags & AE_TIME_EVENTS && ! (flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); // 计算select()的超时时间参数 if (shortest) {
long now_sec, now_ms; /* Calculate the time missing for the nearest * timer to fire. */ aeGetTime(&now_sec, &now_ms); tvp = &tv; tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms + 1000) -now_ms) * 1000; tvp->tv_sec--; } else {
tvp->tv_usec = (shortest->when_ms - now_ms) * 1000; } } else {
/* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to se the timeout * to zero */ // timeout为0,如果无时间则直接返回 if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } // timeout为NULL,一直等待直到有事件发生 else {
/* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // 监控客户端fd事件,超时事件使用上面计算出来的参数 retval = select(maxfd + 1, &rfds, &wfds, &efds, tvp); // 如果检测到事件发生,则调用客户端fd处理回调函数 if (retval > 0) {
fe = eventLoop->fileEventHead; while (fe != NULL) {
int fd = (int) fe->fd; if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) || (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) || (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))) {
int mask = 0; if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) mask |= AE_WRITABLE; if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)) mask |= AE_EXCEPTION; fe->fileProc(eventLoop, fe->fd, fe->clientData, mask); processed++; /* After an event is processed our file event list * may no longer be the same, so what we do * is to clear the bit for this file descriptor and * restart again from the head. */ fe = eventLoop->fileEventHead; FD_CLR(fd, &rfds); FD_CLR(fd, &wfds); FD_CLR(fd, &efds); } else {
fe = fe->next; } } } } /* Check time events */ /* 遍历定时器链表,发现超时时间比current实际小的,则调用定时器回调函数 */ if (flags & AE_TIME_EVENTS) {
te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId - 1; while (te) {
long now_sec, now_ms; long long id; if (te->id > maxId) {
te = te->next; continue; } aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) {
int retval; id = te->id; retval = te->timeProc(eventLoop, id, te->clientData); // 定时器回调 /* After an event is processed our time event list may * no longer be the same, so we restart from head. * Still we make sure to don't process events registered * by event handlers itself in order to don't loop forever. * To do so we saved the max ID we want to handle. */ if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms); } else {
aeDeleteTimeEvent(eventLoop, id); } te = eventLoop->timeEventHead; } else {
te = te->next; } } } return processed; /* return the number of processed file/time events */ }
链表:
//链表节点 typedef struct listNode {
struct listNode * prev; // 链表双环 struct listNode * next; void * value; // 注意,但凡是void *的数据类型,必然是指向结构体地址 } listNode; typedef struct list {
listNode * head; // 指向链表头 listNode * tail; // 指向链表尾 void * (*dup) (void * ptr); // 注意,dup复制字符串。返回值是void *。那么自然的就要联想到返回的是void * value,因为类型是一致的 void (*free) (void * ptr); // 释放链表节点的void * value int (*match) (void * ptr, void * key); // 比较value,用于逻辑排序。程序中,比较字符串有两个目的:一个是排序(数学关系的大小),另外一个是逻辑排序(强行虚拟一个顺序出来,使对象在创建和销毁期间,具备一定的顺序性,方便调用) int len; // 链表的节点数量 } list; // 链表迭代器(统一定义封装类型)。 typedef struct listIter {
listNode * next; listNode * prev; int direction; } listIter;
字典:
// 词汇 typedef struct dictEntry {
void * key; void * val; struct dictEntry * next; } dictEntry; // 字典目录 typedef struct dictType {
unsigned int(*hashFunction) (const void * key); void * (*keyDup) (void * privdata, const void * key); void * (*valDup) (void * privdata, const void * obj); int(*keyCompare) (void * privdata, const void * key1, const void * key2); void(*keyDestructor) (void * privdata, void * key); void(*valDestructor) (void * privdata, void * obj); } dictType; // 字典 typedef struct dict {
dictEntry * * table; // 字典指针数组,每个数组是一条链表(一页词汇)。 dictType * type; // 字典目录,用于维护字典数据 unsigned int size; // 字典中的词汇量 unsigned int sizemask; unsigned int used; void * privdata; // void *类型的私有数据,一般指向的是结构体或函数指针,此处应该是结构体 } dict; // 字典迭代器 typedef struct dictIterator {
dict * ht; int index; dictEntry * entry, *nextEntry; } dictIterator;
static void initServerConfig() {
server.dbnum = REDIS_DEFAULT_DBNUM; // 默认16本字典,argv[x]参数可以指定其它值 server.port = REDIS_SERVERPORT; // 6379服务端口,用于对外(client)提供api接口 server.verbosity = REDIS_DEBUG; // 是否打印调试信息 server.maxidletime = REDIS_MAXIDLETIME; // 默认客户端timeout,此处是300s server.saveparams = NULL; server.logfile = NULL; /* NULL = log on standard output */ ResetServerSaveParams(); // 持久化策略 appendServerSaveParams(60 * 60, 1); /* 1条数据变化,超过1小时必须写入磁盘 */ appendServerSaveParams(300, 100); /* 100条数据变化,超过5分钟必须写入磁盘*/ appendServerSaveParams(60, 10000); /* 1万条数据变化,超过1分钟必须写入磁盘 */ }
// redis server在6379端口监听客户端连接,此函数就是socket的accept(),成功后加入事件链表,事件链表被多路复用监听,收到cli命令后,解析并处理(kv操作),返回状态到终端。 static void acceptHandler(aeEventLoop * el, int fd, void * privdata, int mask) {
int cport, cfd; char cip[128]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); cfd = anetAccept(server.neterr, fd, cip, &cport); if (cfd == AE_ERR) {
redisLog(REDIS_DEBUG, "Accepting client connection: %s", server.neterr); return; } redisLog(REDIS_DEBUG, "Accepted %s:%d", cip, cport); // 客户端fd加入事件监听链表 if (createClient(cfd) == REDIS_ERR) {
redisLog(REDIS_WARNING, "Error allocating resoures for the client"); close(cfd); /* May be already closed, just ingore errors */ return; } }
#define REDIS_STRING 0 #define REDIS_LIST 1 #define REDIS_SET 2
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/219446.html原文链接:https://javaforall.net
