Redis Internals: Memory Management
1. Redis manages memory by wrapping the C memory-management functions in zmalloc.h and zmalloc.c. The wrappers map onto the C library as follows:

- zmalloc wraps malloc: void *malloc(size_t size); allocates a block of the requested size and returns a pointer to its start, or NULL on failure.
- zcalloc wraps calloc: void *calloc(size_t nmemb, size_t size); allocates a contiguous, zero-initialized region for nmemb elements of size bytes each and returns a pointer to its start, or NULL on failure.
- zrealloc wraps realloc: void *realloc(void *ptr, size_t newsize); if the existing block has enough contiguous room it is grown in place and ptr is returned; otherwise a block of newsize bytes is allocated, the old contents are copied into it, the old block is released automatically (do not free it yourself), and the address of the newly allocated block is returned.
- zfree wraps free: void free(void *ptr); releases the block ptr points to. Freed space returns to the allocator's pool and can be handed out again by later calls to malloc, realloc or calloc.

Wrapping serves two purposes: it hides the differences between the underlying platforms, and it makes it easy for Redis to implement its own usage-statistics functions. The platform differences concern mainly tcmalloc (Google), jemalloc (Facebook), and the Apple platform. Concretely:
- if Google's TC_MALLOC library is present on the system, the tc_malloc family of functions replaces the plain malloc family;
- if Facebook's JE_MALLOC library is present, the je_malloc family replaces the plain malloc family;
- on Mac systems (and similar), the allocation functions from <malloc/malloc.h> are used.

/* Double expansion needed for stringification of macro values. */
#define __xstr(s) __str(s)
#define __str(s) #s

#if defined(USE_TCMALLOC)
#define ZMALLOC_LIB ("tcmalloc-" __xstr(TC_VERSION_MAJOR) "." __xstr(TC_VERSION_MINOR))
#include <google/tcmalloc.h>
#if (TC_VERSION_MAJOR == 1 && TC_VERSION_MINOR >= 6) || (TC_VERSION_MAJOR > 1)
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) tc_malloc_size(p)
#else
#error "Newer version of tcmalloc required"
#endif
#elif defined(USE_JEMALLOC)
#define ZMALLOC_LIB ("jemalloc-" __xstr(JEMALLOC_VERSION_MAJOR) "." __xstr(JEMALLOC_VERSION_MINOR) "." __xstr(JEMALLOC_VERSION_BUGFIX))
#include <jemalloc/jemalloc.h>
#if (JEMALLOC_VERSION_MAJOR == 2 && JEMALLOC_VERSION_MINOR >= 1) || (JEMALLOC_VERSION_MAJOR > 2)
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) je_malloc_usable_size(p)
#else
#error "Newer version of jemalloc required"
#endif
#elif defined(__APPLE__)
#include <malloc/malloc.h>
#define HAVE_MALLOC_SIZE 1
#define zmalloc_size(p) malloc_size(p)
#endif

The overrides themselves:

/* Explicitly override malloc/free etc when using tcmalloc. */
#if defined(USE_TCMALLOC)
#define malloc(size) tc_malloc(size)
#define calloc(count,size) tc_calloc(count,size)
#define realloc(ptr,size) tc_realloc(ptr,size)
#define free(ptr) tc_free(ptr)
#elif defined(USE_JEMALLOC)
#define malloc(size) je_malloc(size)
#define calloc(count,size) je_calloc(count,size)
#define realloc(ptr,size) je_realloc(ptr,size)
#define free(ptr) je_free(ptr)
#endif

#ifdef HAVE_ATOMIC
#define update_zmalloc_stat_add(__n) __sync_add_and_fetch(&used_memory, (__n))
#define update_zmalloc_stat_sub(__n) __sync_sub_and_fetch(&used_memory, (__n))
#else
#define update_zmalloc_stat_add(__n) do { \
    pthread_mutex_lock(&used_memory_mutex); \
    used_memory += (__n); \
    pthread_mutex_unlock(&used_memory_mutex); \
} while(0)
#define update_zmalloc_stat_sub(__n) do { \
    pthread_mutex_lock(&used_memory_mutex); \
    used_memory -= (__n); \
    pthread_mutex_unlock(&used_memory_mutex); \
} while(0)
#endif

A note on the two allocators: both libraries try to reduce contention on memory allocation by having threads pick memory from different caches, but they follow different strategies:
- jemalloc (used by Facebook) maintains a cache per thread;
- tcmalloc (from Google) maintains a pool of caches, and threads develop a "natural" affinity for a cache, but may change.
This leads, in turn, to an important difference in thread management:
- jemalloc is faster if threads are static, for example when using pools;
- tcmalloc is faster when threads are created and destroyed frequently.

1.1 The zmalloc implementation

static void (*zmalloc_oom_handler)(size_t) = zmalloc_default_oom; /* handler invoked on allocation failure */

void *zmalloc(size_t size) {
    void *ptr = malloc(size+PREFIX_SIZE);

    if (!ptr) zmalloc_oom_handler(size); /* on out-of-memory, call the registered handler */
#ifdef HAVE_MALLOC_SIZE /* HAVE_MALLOC_SIZE tells whether the system has malloc_size(); see the definitions above */
    update_zmalloc_stat_alloc(zmalloc_size(ptr)); /* update the allocation statistics, thread-safe or not */
    return ptr;
#else
    *((size_t*)ptr) = size; /* record the requested size in the prefix */
    update_zmalloc_stat_alloc(size+PREFIX_SIZE);
    return (char*)ptr+PREFIX_SIZE;
#endif
}

#define update_zmalloc_stat_alloc(__n) do { \
    size_t _n = (__n); \
    if (_n&(sizeof(long)-1)) _n += sizeof(long)-(_n&(sizeof(long)-1)); /* round up to a multiple of sizeof(long) */ \
    if (zmalloc_thread_safe) { \
        update_zmalloc_stat_add(_n); \
    } else { \
        used_memory += _n; \
    } \
} while(0)

update_zmalloc_stat_add and update_zmalloc_stat_sub are the macros shown above: an atomic __sync_add_and_fetch / __sync_sub_and_fetch on &used_memory when HAVE_ATOMIC is defined, otherwise the same update guarded by used_memory_mutex.

Notes:
int pthread_mutex_lock(pthread_mutex_t *mutex);
When pthread_mutex_lock() returns, the mutex is locked. The calling thread locks the mutex; if it is already locked and owned by another thread, the caller blocks until the mutex becomes available.
int pthread_mutex_unlock(pthread_mutex_t *mutex);
Its counterpart, which unlocks the mutex. The other wrapper functions are implemented along the same lines.
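To make the size-prefix bookkeeping concrete, here is a minimal, self-contained sketch of the same idea outside of Redis: over-allocate by a header, record the requested size there, and keep a running total. The names my_malloc/my_free and this used_memory counter are illustrative only; the real zmalloc adds thread safety and, where available, the allocator-provided zmalloc_size() shown above.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static size_t used_memory = 0;          /* running total of bytes handed out */
#define PREFIX_SIZE sizeof(size_t)      /* header storing the block size */

static void *my_malloc(size_t size) {
    void *ptr = malloc(size + PREFIX_SIZE);
    if (!ptr) abort();                  /* stand-in for zmalloc_oom_handler */
    *(size_t *)ptr = size;              /* remember the requested size */
    used_memory += size + PREFIX_SIZE;
    return (char *)ptr + PREFIX_SIZE;   /* caller sees only the usable region */
}

static void my_free(void *ptr) {
    if (ptr == NULL) return;
    char *real = (char *)ptr - PREFIX_SIZE;       /* step back to the header */
    used_memory -= *(size_t *)real + PREFIX_SIZE;
    free(real);
}

int main(void) {
    char *s = my_malloc(16);
    strcpy(s, "hello");
    printf("used_memory=%zu (%s)\n", used_memory, s);
    my_free(s);
    printf("used_memory=%zu\n", used_memory);
    return 0;
}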
Keys vs Values: what is swapped out?
http://redis.io/topics/internals-vm

Virtual Memory technical specification

This document details the internals of the Redis Virtual Memory subsystem. The intended audience is not the final user but programmers willing to understand or modify the Virtual Memory implementation.

Keys vs Values: what is swapped out?

The goal of the VM subsystem is to free memory transferring Redis Objects from memory to disk. This is a very generic command, but specifically, Redis transfers only objects associated with values. In order to understand better this concept we'll show, using the DEBUG command, how a key holding a value looks from the point of view of the Redis internals:

redis> set foo bar
OK
redis> debug object foo
Key at:0x100101d00 refcount:1, value at:0x100101ce0 refcount:1 encoding:raw serializedlength:4

As you can see from the above output, the Redis top level hash table maps Redis Objects (keys) to other Redis Objects (values). The Virtual Memory is only able to swap values on disk, the objects associated to keys are always taken in memory: this trade off guarantees very good lookup performances, as one of the main design goals of the Redis VM is to have performances similar to Redis with VM disabled when the part of the dataset frequently used fits in RAM.

How does a swapped value look internally?

When an object is swapped out, this is what happens in the hash table entry:
- The key continues to hold a Redis Object representing the key.
- The value is set to NULL.

So you may wonder where we store the information that a given value (associated to a given key) was swapped out. Just in the key object!

This is how the Redis Object structure robj looks:

/* The actual Redis Object */
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char storage;  /* If this object is a key, where is the value?
                             * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char vtype;    /* If this object is a key, and value is swapped out,
                             * this is the type of the swapped out object. */
    int refcount;
    /* VM fields, this are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObject) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. */
    struct redisObjectVM vm;
} robj;

As you can see there are a few fields about VM. The most important one is storage, which can be one of these values:
- REDIS_VM_MEMORY: the associated value is in memory.
- REDIS_VM_SWAPPED: the associated value is swapped, and the value entry of the hash table is just set to NULL.
- REDIS_VM_LOADING: the value is swapped on disk, the entry is NULL, but there is a job to load the object from the swap to the memory (this field is only used when threaded VM is active).
- REDIS_VM_SWAPPING: the value is in memory, the entry is a pointer to the actual Redis Object, but there is an I/O job in order to transfer this value to the swap file.

If an object is swapped on disk (REDIS_VM_SWAPPED or REDIS_VM_LOADING), how do we know where it is stored, what type it is, and so forth? That's simple: the vtype field is set to the original type of the Redis object swapped, while the vm field (that is a redisObjectVM structure) holds information about the location of the object.
This is the definition of this additional structure:

/* The VM object structure */
struct redisObjectVM {
    off_t page;         /* the page at which the object is stored on disk */
    off_t usedpages;    /* number of pages used on disk */
    time_t atime;       /* Last access time */
} vm;

As you can see, the structure contains the page at which the object is located in the swap file, the number of pages used, and the last access time of the object (this is very useful for the algorithm that selects which objects are good candidates for swapping, as we want to transfer to disk objects that are rarely accessed).

As you can see, while all the other fields are using unused bytes in the old Redis Object structure (we had some free bits due to natural memory alignment concerns), the vm field is new, and indeed uses additional memory. Should we pay such a memory cost even when VM is disabled? No! This is the code to create a new Redis Object:

... some code ...
if (server.vm_enabled) {
    pthread_mutex_unlock(&server.obj_freelist_mutex);
    o = zmalloc(sizeof(*o));
} else {
    o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
}
... some code ...

As you can see, if the VM system is not enabled we allocate just sizeof(*o)-sizeof(struct redisObjectVM) of memory. Given that the vm field is the last one in the object structure, and that these fields are never accessed if VM is disabled, we are safe and Redis without VM does not pay the memory overhead.

The Swap File

The next step in order to understand how the VM subsystem works is understanding how objects are stored inside the swap file. The good news is that it is not some kind of special format: we just use the same format used to store the objects in .rdb files, the usual dump files produced by Redis using the SAVE command. The swap file is composed of a given number of pages, where every page size is a given number of bytes.
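To make the page bookkeeping concrete, a small hedged sketch (not Redis code): an object whose serialized form occupies serializedlength bytes needs a round-up division's worth of fixed-size pages, which is what usedpages records. Both parameter values below are illustrative.

#include <stdio.h>

/* Round-up division: how many fixed-size pages a serialized object needs. */
static long pages_needed(long serializedlength, long page_size) {
    return (serializedlength + page_size - 1) / page_size;
}

int main(void) {
    /* e.g. a 100-byte serialized value with 32-byte pages occupies 4 pages */
    printf("%ld\n", pages_needed(100, 32));
    return 0;
}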
Redis Internals: Event Handling
1. Definition of the redis event loop

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;                   /* highest file descriptor currently registered */
    int setsize;                 /* max number of file descriptors tracked */
    long long timeEventNextId;   /* id of the next timer event */
    time_t lastTime;             /* Used to detect system clock skew */
    aeFileEvent *events;         /* Registered file events */
    aeFiredEvent *fired;         /* Fired events, ready to be processed */
    aeTimeEvent *timeEventHead;  /* head of the timer-event linked list */
    int stop;
    void *apidata;               /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

1.1 Event definitions

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

2. Wrapping the event-processing implementations

/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

3. The main event-processing function

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

3.1 The event-processing procedure

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            /* Calculate the time missing for the nearest
             * timer to fire. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens. */
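The ae API above is small enough to drive directly. Here is a hedged usage sketch, not taken from the Redis source, that registers a readable-event callback and runs the loop using the entry points shown above (aeCreateEventLoop, aeCreateFileEvent, aeMain); the onReadable callback and the choice of stdin as the watched descriptor are illustrative.

#include "ae.h"        /* assumes Redis's ae.h/ae.c are available */
#include <unistd.h>
#include <stdio.h>

/* Illustrative callback: fires when the descriptor becomes readable. */
static void onReadable(aeEventLoop *el, int fd, void *clientData, int mask) {
    char buf[256];
    ssize_t n = read(fd, buf, sizeof(buf));
    if (n <= 0) {
        aeDeleteFileEvent(el, fd, AE_READABLE);  /* stop watching on EOF/error */
        aeStop(el);
        return;
    }
    printf("read %zd bytes\n", n);
}

int main(void) {
    aeEventLoop *el = aeCreateEventLoop(1024);   /* setsize: max tracked fds */
    int fd = 0;                                  /* illustrative: watch stdin */
    if (aeCreateFileEvent(el, fd, AE_READABLE, onReadable, NULL) == AE_ERR) {
        fprintf(stderr, "can't register file event\n");
        return 1;
    }
    aeMain(el);               /* loops, calling aeProcessEvents() each pass */
    aeDeleteEventLoop(el);
    return 0;
}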
Redis Internals: Network Programming
Redis wraps the low-level socket calls in anet.h and anet.c:

1. anetTcpServer creates the listening server socket, wrapping socket(), bind() and listen(), and returns the socket fd.

int anetTcpServer(char *err, int port, char *bindaddr)
{
    int s;
    struct sockaddr_in sa;  /* see the structure in 1.1 */

    if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR) /* AF_INET selects IPv4 */
        return ANET_ERR;

    memset(&sa,0,sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) {
        anetSetError(err, "invalid bind address");
        close(s);
        return ANET_ERR;
    }
    if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)
        return ANET_ERR;
    return s;
}

1.1 The sockaddr_in structure

struct sockaddr_in {
    short int sin_family;          // Address family
    unsigned short int sin_port;   // Port number
    struct in_addr sin_addr;       // Internet address
    unsigned char sin_zero[8];     // Same size as struct sockaddr
};

1.2 Creating the socket, wrapping socket()

static int anetCreateSocket(char *err, int domain) {
    int s, on = 1;
    if ((s = socket(domain, SOCK_STREAM, 0)) == -1) { /* create the socket */
        anetSetError(err, "creating socket: %s", strerror(errno));
        return ANET_ERR;
    }
    /* Make sure connection-intensive things like the redis benchmark
     * will be able to close/open sockets a zillion of times */
    if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { /* set the option */
        anetSetError(err, "setsockopt SO_REUSEADDR: %s", strerror(errno));
        return ANET_ERR;
    }
    return s;
}

1.3 memset, from <string.h>:
void *memset(void *s, int ch, size_t n);
Fills the first n bytes of s with ch and returns s. memset fills a memory block with a given value; it is the fastest way to zero a large structure or array.

1.4 Byte-order and address conversion, header #include <netinet/in.h>:
unsigned short int htons(unsigned short int hostshort); converts the 16-bit hostshort to network byte order and returns the result.
unsigned long int htonl(unsigned long int hostlong); converts the 32-bit hostlong to network byte order and returns the result.
int inet_aton(const char *string, struct in_addr *addr); the input parameter string contains the IP address in ASCII form; the output parameter addr is the structure to be updated with the parsed address. The function returns nonzero on success and zero if the input address is invalid; it stores no error code in errno, so errno's value should be ignored.

1.5 Listening, wrapping bind() and listen()

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) {
    if (bind(s,sa,len) == -1) { /* bind */
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    /* Use a backlog of 512 entries.
     * We pass 511 to the listen() call because
     * the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
     * which will thus give us a backlog of 512 entries */
    if (listen(s, 511) == -1) { /* listen */
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

2. Establishing TCP connections, with blocking and non-blocking variants.

int anetTcpConnect(char *err, char *addr, int port)
{
    return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONE);
}

int anetTcpNonBlockConnect(char *err, char *addr, int port)
{
    return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONBLOCK);
}

/* the shared implementation */
#define ANET_CONNECT_NONE 0
#define ANET_CONNECT_NONBLOCK 1
static int anetTcpGenericConnect(char *err, char *addr, int port, int flags)
{
    int s;
    struct sockaddr_in sa;

    if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)
        return ANET_ERR;

    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    if (inet_aton(addr, &sa.sin_addr) == 0) {
        struct hostent *he;

        he = gethostbyname(addr);
        if (he == NULL) {
            anetSetError(err, "can't resolve: %s", addr);
            close(s);
            return ANET_ERR;
        }
        memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
    }
    if (flags & ANET_CONNECT_NONBLOCK) {
        if (anetNonBlock(err,s) != ANET_OK)
            return ANET_ERR;
    }
    if (connect(s, (struct sockaddr*)&sa, sizeof(sa)) == -1) {
        if (errno == EINPROGRESS &&
            flags & ANET_CONNECT_NONBLOCK)
            return s;
        anetSetError(err, "connect: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return s;
}

2.1 The hostent structure

struct hostent {
    char *h_name;
    char **h_aliases;
    int h_addrtype;
    int h_length;
    char **h_addr_list;
};

Here h_name is the official name of the host; h_aliases is a NULL-terminated list of alternative names; h_addrtype is the address type, usually AF_INET; h_length is the length of the addresses in bytes; h_addr_list is a NULL-terminated list of pointers to the host's network addresses, in network byte order; h_addr is the first address in h_addr_list. gethostbyname() returns a pointer to a struct hostent on success, or a NULL pointer otherwise.

2.2 Setting a socket non-blocking

int anetNonBlock(char *err, int fd)
{
    int flags;

    /* Set the socket non-blocking.
     * Note that fcntl(2) for F_GETFL and F_SETFL can't be
     * interrupted by a signal. */
    if ((flags = fcntl(fd, F_GETFL)) == -1) {
        anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
        return ANET_ERR;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
        return ANET_ERR;
    }
    return ANET_OK;
}

2.3 File control: fcntl

int fcntl(int fd, int cmd);
int fcntl(int fd, int cmd, long arg);
int fcntl(int fd, int cmd, struct flock *lock);

fcntl() performs control operations on a (file) descriptor. Example, setting a descriptor non-blocking:

int flags;

/* fetch the current flags */
if ((flags = fcntl(sock_descriptor, F_GETFL, 0)) < 0) {
    /* Handle error */
}
/* set non-blocking */
if (fcntl(sock_descriptor, F_SETFL, flags | O_NONBLOCK) < 0) {
    /* Handle error */
}

3. TCP accept: taking a new connection on the listening socket

int anetTcpAccept(char *err, int s, char *ip, int *port) {
    int fd;
    struct sockaddr_in sa;
    socklen_t salen = sizeof(sa);

    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
        return ANET_ERR;
    if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
    if (port) *port = ntohs(sa.sin_port);
    return fd;
}
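The anet wrappers compose into a server in a few lines. Below is a hedged sketch (not Redis code) that listens with anetTcpServer(), accepts one client with anetTcpAccept(), and greets it; the port 7777 and the greeting are illustrative, and ANET_ERR_LEN is the error-buffer size defined in anet.h.

#include "anet.h"      /* assumes Redis's anet.h/anet.c are available */
#include <stdio.h>
#include <unistd.h>

int main(void) {
    char err[ANET_ERR_LEN];
    char ip[64];
    int port;

    /* Listen on all interfaces, port 7777 (illustrative values) */
    int s = anetTcpServer(err, 7777, NULL);
    if (s == ANET_ERR) {
        fprintf(stderr, "listen failed: %s\n", err);
        return 1;
    }
    /* Accept one client, greet it, then exit */
    int fd = anetTcpAccept(err, s, ip, &port);
    if (fd == ANET_ERR) {
        fprintf(stderr, "accept failed: %s\n", err);
        return 1;
    }
    printf("client %s:%d connected\n", ip, port);
    write(fd, "hello\r\n", 7);
    close(fd);
    close(s);
    return 0;
}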
Redis Internals: Server Initialization
The server-initialization code:

void initServer() {
    int j;

    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();

    if (server.syslog_enabled) {
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
            server.syslog_facility);
    }

    server.current_client = NULL;
    server.clients = listCreate();              /* create the client list */
    server.clients_to_close = listCreate();     /* clients scheduled for closing */
    server.slaves = listCreate();               /* the slave list */
    server.monitors = listCreate();             /* the monitor list */
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
    server.unblocked_clients = listCreate();    /* unblocked clients */
    server.ready_keys = listCreate();           /* keys that became ready */
    createSharedObjects();                      /* create the shared objects */
    adjustOpenFilesLimit();                     /* raise the maximum number of open files */
    server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR); /* create the event loop */
    server.db = zmalloc(sizeof(redisDb)*server.dbnum); /* allocate the databases */

    /* Open the TCP listening socket for the user commands. */
    if (listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) /* listen on the port */
        exit(1);

    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm);
        if (server.sofd == ANET_ERR) {
            redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr);
            exit(1);
        }
    }

    /* Abort if there are no listening sockets at all. */
    if (server.ipfd_count == 0 && server.sofd < 0) {
        redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }

    /* Create the Redis databases, and initialize other internal state. */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
    }
    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
    server.pubsub_patterns = listCreate();
    listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
    listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
    server.cronloops = 0;
    server.rdb_child_pid = -1;
    server.aof_child_pid = -1;
    aofRewriteBufferReset();
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL); /* At startup we consider the DB saved. */
    server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.dirty = 0;
    server.stat_numcommands = 0;
    server.stat_numconnections = 0;
    server.stat_expiredkeys = 0;
    server.stat_evictedkeys = 0;
    server.stat_starttime = time(NULL);
    server.stat_keyspace_misses = 0;
    server.stat_keyspace_hits = 0;
    server.stat_peak_memory = 0;
    server.stat_fork_time = 0;
    server.stat_rejected_conn = 0;
    server.stat_sync_full = 0;
    server.stat_sync_partial_ok = 0;
    server.stat_sync_partial_err = 0;
    memset(server.ops_sec_samples,0,sizeof(server.ops_sec_samples));
    server.ops_sec_idx = 0;
    server.ops_sec_last_sample_time = mstime();
    server.ops_sec_last_sample_ops = 0;
    server.unixtime = time(NULL);
    server.mstime = mstime();
    server.lastbgsave_status = REDIS_OK;
    server.repl_good_slaves_count = 0;

    /* Create the serverCron() time event, that's our main way to process
     * background operations. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        redisPanic("Can't create the serverCron time event.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
        {
            redisPanic("Unrecoverable error creating server.ipfd file event.");
        }
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR)
        redisPanic("Unrecoverable error creating server.sofd file event.");

    /* Open the AOF file if needed. */
    if (server.aof_state == REDIS_AOF_ON) {
        server.aof_fd = open(server.aof_filename,
                             O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    /* 32 bit instances are limited to 4GB of address space, so if there is
     * no explicit limit in the user provided configuration we set a limit
     * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
     * useless crashes of the Redis instance for out of memory. */
    if (server.arch_bits == 32 && server.maxmemory == 0) {
        redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
        server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
        server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
    }

    replicationScriptCacheInit();
    scriptingInit();
    slowlogInit();
    bioInit();
}

1.1 Signal handling

signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();

The signal() prototype, from <signal.h>:

void (*signal(int sig, void (*func)(int)))(int);
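For comparison, a self-contained sketch of the same signal setup: ignore SIGHUP and SIGPIPE, then install a termination handler with sigaction(), which is how setupSignalHandlers() registers its SIGTERM handler in redis.c. The handler body here is illustrative, not Redis's.

#include <signal.h>
#include <unistd.h>

static void sigtermHandler(int sig) {
    (void)sig;
    /* illustrative: a real server would set a flag and shut down cleanly;
     * only async-signal-safe calls are allowed here */
    write(STDOUT_FILENO, "SIGTERM received\n", 17);
    _exit(0);
}

int main(void) {
    struct sigaction act;

    signal(SIGHUP, SIG_IGN);   /* a daemon ignores terminal hangups */
    signal(SIGPIPE, SIG_IGN);  /* writes to closed sockets fail with EPIPE
                                * instead of killing the process */

    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = sigtermHandler;
    sigaction(SIGTERM, &act, NULL);

    pause();                   /* wait until a signal arrives */
    return 0;
}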
Redis AOF Persistence Internals, Part One (reposted)
http://blog.csdn.net/acceptedxukai/article/details/18136903
http://blog.csdn.net/acceptedxukai/article/details/18181563

All source code quoted in this article comes from Redis 2.8.2. The AOF persistence implementation lives in redis.c, redis.h, aof.c, bio.c, rio.c and config.c. Before reading this article, please read the companion piece on parsing the AOF configuration parameters: http://blog.csdn.net/acceptedxukai/article/details/18135219. When reposting, please credit the source: http://blog.csdn.net/acceptedxukai/article/details/18136903

The AOF persistence mechanism is implemented as follows.

Loading AOF data at server startup

The call path at startup is main() -> initServerConfig() -> loadServerConfig() -> initServer() -> loadDataFromDisk(). initServerConfig() installs the default AOF configuration; loadServerConfig() then loads the AOF settings from redis.conf, overriding the server's defaults: with appendonly on configured, AOF persistence is activated and server.aof_state is set to REDIS_AOF_ON. loadDataFromDisk() tests server.aof_state == REDIS_AOF_ON and, when true, calls loadAppendOnlyFile() to load the data in the AOF file. Since the AOF file stores data in exactly the same format clients use for requests, fully conforming to the Redis wire protocol, the server creates a fake client, fakeClient, and replays each parsed AOF command through the normal command dispatch, cmd->proc(fakeClient), reproducing the AOF contents in the server's databases.

/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == REDIS_AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
        } else if (errno != ENOENT) {
            redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}

The server checks the AOF file first because the data in the AOF file is newer than the data in the RDB file.

int loadAppendOnlyFile(char *filename) {
    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int old_aof_state = server.aof_state;
    long loops = 0;

    /* redis_fstat is the fstat64 function: fileno(fp) yields the file
     * descriptor, and the file status is stored in sb (see stat());
     * st_size is the file size in bytes */
    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    if (fp == NULL) { /* opening the file failed */
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
    server.aof_state = REDIS_AOF_OFF;

    fakeClient = createFakeClient(); /* create the fake client */
    startLoading(fp); /* defined in rdb.c: update the server's loading state */

    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        /* Serve the clients from time to time: every 1000 iterations,
         * record the current read position (ftello(fp) returns the file
         * offset as a long) and process pending events */
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
        }

        /* read the AOF data line by line */
        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp)) /* reached end of file */
                break;
            else
                goto readerr;
        }

        /* parse the command from the AOF file following the Redis protocol */
        if (buf[0] != '*') goto fmterr;
        argc = atoi(buf+1); /* number of arguments */
        if (argc < 1) goto fmterr;

        argv = zmalloc(sizeof(robj*)*argc); /* the argument values */
        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10); /* length of this bulk */
            argsds = sdsnewlen(NULL,len); /* allocate an empty sds of that length */
            /* read exactly the bulk's length */
            if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard the trailing CRLF */
        }

        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
            exit(1);
        }

        /* Run the command in the context of a fake client */
        fakeClient->argc = argc;
        fakeClient->argv = argv;
        cmd->proc(fakeClient); /* execute the command */

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        for (j = 0; j < fakeClient->argc; j++)
            decrRefCount(fakeClient->argv[j]);
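The replay loop works because every command in the AOF file is stored in the same RESP multi-bulk format clients send on the wire. As a counterpart to the reader above, here is a minimal hedged sketch of the writer side, independent of the Redis source: it appends one command in the *<argc> / $<len> framing the parser expects. The file name and the aof_append helper are illustrative.

#include <stdio.h>
#include <string.h>

/* Append one command to an AOF-style file in RESP multi-bulk format:
 * *<argc>\r\n then, per argument, $<len>\r\n<payload>\r\n */
static void aof_append(FILE *fp, int argc, const char **argv) {
    fprintf(fp, "*%d\r\n", argc);
    for (int j = 0; j < argc; j++) {
        size_t len = strlen(argv[j]);
        fprintf(fp, "$%zu\r\n", len);
        fwrite(argv[j], len, 1, fp);
        fwrite("\r\n", 2, 1, fp);
    }
}

int main(void) {
    const char *cmd[] = { "SET", "mykey", "myvalue" };
    FILE *fp = fopen("appendonly.aof", "a");
    if (!fp) return 1;
    aof_append(fp, 3, cmd);   /* writes: *3 $3 SET $5 mykey $7 myvalue */
    fclose(fp);
    return 0;
}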
Redis Active/Standby Hot-Backup Solution (reposted)
http://luyx30.blog.51cto.com/1029851/1350832

References:
http://patrick-tang.blogspot.com/2012/06/redis-keepalived-failover-system.html
http://deidara.blog.51cto.com/400447/302402
http://my.oschina.net/guol/blog/182491
http://shiguanghui.iteye.com/blog/2001499

Background

The official Redis cluster solution is still in development and testing and has not been merged into a stable release, and the Redis Cluster currently under development is still functionally incomplete (see the official site or http://www.redisdoc.com/en/latest/topic/cluster-spec.html), so it is not recommended for production. Research shows that, to expose the service through a single IP, most deployments use keepalived-based active/standby Redis as an interim solution.

Environment

Master: 192.168.1.218   redis, keepalived
Slave:  192.168.1.219   redis, keepalived
Virtual IP Address (VIP): 192.168.1.220

Design:
Two redis servers replicate master/slave to keep the redis service highly available; two keepalived instances run master/backup to keep the VIP service highly available.
1) Each redis server carries both a master and a slave configuration file (redis_master.conf, redis_slave.conf) and is started through a script. The startup script probes the role of the other server in this redis cluster: if a master service already exists, it starts itself as a slave; otherwise it starts itself as the master.
2) The keepalived monitor script checks (once per second) whether this server currently holds the cluster VIP. If it does, the local redis server is set to master and the remote redis servers are set to slaves, guaranteeing that the redis server holding the cluster VIP is the master and all others are slaves.
3) The keepalived monitor script also checks whether the local redis service is healthy. After two consecutive failed checks it stops the local keepalived service, releasing the cluster VIP so that it floats to another server that can still provide redis.
4) While both Master and Slave are healthy, the Master serves requests and the Slave replicates the data. When the Master dies and the Slave is healthy, the Slave takes over the service and turns off master/slave replication. When the Master recovers, it first syncs the data back from the Slave, then turns replication off and resumes the Master role; meanwhile the Slave waits for the Master's sync to finish and then resumes the Slave role. The cycle then repeats.

Steps:

---- create a dedicated user
useradd -g develop redisadmin
echo Hisun@1125 | passwd --stdin redisadmin
Note: all deployment steps below are performed as root (or an account with sudo rights).

---- install and configure redis
1. Download the redis source
cd
wget http://download.redis.io/releases/redis-2.8.3.tar.gz
2. Install redis
tar -zxvf redis-2.8.3.tar.gz
cd redis-2.8.3
# redis can be built without running configure
make
# test
make test
#### On slow machines make test may report the following error; it has no impact
# *** [err]: Test replication partial resync: no backlog in tests/integration/replication-psync.tcl
3. Configure redis
# create the redis home directory
mkdir -p /usr/local/redis-2.8.3/{bin,conf,logs}
cp -a -R -p src/redis-server /usr/local/redis-2.8.3/bin/
cp -a -R -p src/redis-cli /usr/local/redis-2.8.3/bin/
cp -a -R -p src/redis-benchmark /usr/local/redis-2.8.3/bin/
cp -a -R -p src/redis-sentinel /usr/local/redis-2.8.3/bin/
cp -a -R -p src/redis-check-dump /usr/local/redis-2.8.3/bin/
cp -a -R -p src/redis-check-aof /usr/local/redis-2.8.3/bin/

# create the redis start script
vi /usr/local/redis-2.8.3/redis-start.sh
#### the configuration below is for the master; on the slave only LOCALIP and REMOTEIP change.
#!/bin/bash
REDISPATH=/usr/local/redis-2.8.3
REDISCLI=$REDISPATH/bin/redis-cli
LOGFILE=$REDISPATH/logs/redis-state.log
LOCALIP=192.168.1.218
REMOTEIP=192.168.1.219
REMOTEREDISROLE=`$REDISCLI -h $REMOTEIP info | grep "role"`

if grep "role:master" <<< $REMOTEREDISROLE ; then
    # start as slave
    $REDISPATH/bin/redis-server $REDISPATH/conf/redis_slave.conf
    if [ "$?" == "0" ]; then
        echo "[INFO]`date +%F/%H:%M:%S` :$LOCALIP start as slave successful." >> $LOGFILE
    else
        echo "[ERROR]`date +%F/%H:%M:%S` :$LOCALIP start as slave error." >> $LOGFILE
    fi
else
    # start as master
    $REDISPATH/bin/redis-server $REDISPATH/conf/redis_master.conf
    if [ "$?" == "0" ]; then
        echo "[INFO]`date +%F/%H:%M:%S` :$LOCALIP start as master successful." >> $LOGFILE
    else
        echo "[ERROR]`date +%F/%H:%M:%S` :$LOCALIP start as master error." >> $LOGFILE
    fi
fi

# create the redis stop script
vi /usr/local/redis-2.8.3/redis-stop.sh
#### identical on master and slave.
#!/bin/bash
REDISPATH=/usr/local/redis-2.8.3
LOGFILE=$REDISPATH/logs/redis-state.log

kill -9 `ps -ef | grep '/bin/redis-server' | grep -v grep | awk '{print $2}'`
if [ "$?" == "0" ]; then
    echo "[INFO]`date +%F/%H:%M:%S` :redis shutdown completed!" >> $LOGFILE
else
    echo "[ERROR]`date +%F/%H:%M:%S` :redis is not started." >> $LOGFILE
fi

# create the redis config files
cp -a -R -p redis.conf /usr/local/redis-2.8.3/conf/redis_master.conf
cp -a -R -p redis.conf /usr/local/redis-2.8.3/conf/redis_slave.conf

# edit redis_master.conf:
#### redis_master.conf on the 192.168.1.218 master #######
# daemonize no
daemonize yes
# bind 127.0.0.1
bind 192.168.1.218
logfile "/usr/local/redis-2.8.3/logs/redis.log"
# adjust the remaining settings for your production environment
##########################################################
#### redis_master.conf on the 192.168.1.219 slave #######
# daemonize no
daemonize yes
# bind 127.0.0.1
bind 192.168.1.219
logfile "/usr/local/redis-2.8.3/logs/redis.log"
# adjust the remaining settings for your production environment
##########################################################

# edit redis_slave.conf:
#### redis_slave.conf on the 192.168.1.218 master ########
# daemonize no
daemonize yes
# bind 127.0.0.1
bind 192.168.1.218
logfile "/usr/local/redis-2.8.3/logs/redis.log"
# slaveof <masterip> <masterport>
slaveof 192.168.1.219 6379
# adjust the remaining settings for your production environment
##########################################################
#### redis_slave.conf on the 192.168.1.219 slave #########
# daemonize no
daemonize yes
# bind 127.0.0.1
bind 192.168.1.219
logfile "/usr/local/redis-2.8.3/logs/redis.log"
# slaveof <masterip> <masterport>
slaveof 192.168.1.218 6379
# adjust the remaining settings for your production environment
##########################################################

# fix redis ownership and permissions
chmod -R 750 /usr/local/redis-2.8.3/
chown -R redisadmin:develop /usr/local/redis-2.8.3/

---- install and configure keepalived
1. Download the keepalived source, release 1.2.9. Note: the latest release, 1.2.10, produced errors during testing.
wget http://www.keepalived.org/software/keepalived-1.2.9.tar.gz
2. Install keepalived
HAProxy advanced Redis health check---ref
http://blog.exceliance.fr/2014/01/02/haproxy-advanced-redis-health-check/

HAProxy advanced Redis health check
Posted on January 2, 2014 by Baptiste Assmann

Introduction

Redis is an opensource nosql database working on a key/value model. One interesting feature in Redis is that it is able to write data to disk, and a master can synchronize many slaves.

HAProxy can load-balance Redis servers with no issues at all. There is even a built-in health check for redis in HAProxy. Unfortunately, there was no easy way for HAProxy to detect the status of a redis server, master or slave node, hence people usually hacked this part of the architecture.

As written in the title of this post, we'll learn today how to make a simple Redis infrastructure thanks to the newest HAProxy advanced send/expect health checks. This feature is available in HAProxy 1.5-dev20 and above.

The purpose is to make the redis infrastructure as simple as possible and ease failover for the web servers. HAProxy has to detect which node is MASTER and route all the connections to it.

Redis high availability diagram with HAProxy

Below, an ascii-art diagram of HAProxy load-balancing Redis servers:

+----+ +----+ +----+ +----+
| W1 | | W2 | | W3 | | W4 |    Web application servers
+----+ +----+ +----+ +----+
      \   |    |   /
       \  |    |  /
        \ |    | /
       +---------+
       | HAProxy |
       +---------+
         /     \
    +----+     +----+
    | R1 |     | R2 |          Redis servers
    +----+     +----+

The scenario is simple:
* 4 web application servers need to store and retrieve data to/from a Redis database
* one (better: two) HAProxy servers which load-balance redis connections
* 2 (at least) redis servers in an active/standby mode with replication

Configuration

Below is the HAProxy configuration:

defaults REDIS
 mode tcp
 timeout connect 4s
 timeout server 30s
 timeout client 30s

frontend ft_redis
 bind 10.0.0.1:6379 name redis
 default_backend bk_redis

backend bk_redis
 option tcp-check
 tcp-check send PING\r\n
 tcp-check expect string +PONG
 tcp-check send info\ replication\r\n
 tcp-check expect string role:master
 tcp-check send QUIT\r\n
 tcp-check expect string +OK
 server R1 10.0.0.11:6379 check inter 1s
 server R2 10.0.0.12:6379 check inter 1s

The HAProxy health-check sequence above makes the Redis master server the only server considered UP in the farm and redirects connections to it. When the Redis master server fails, the remaining nodes elect a new one; HAProxy detects the change thanks to the same health-check sequence. It does not require third-party tools and makes failover transparent.
Redis Internals: The Startup Process, Part One
As a server, redis starts from the main function, in redis.c.

1. Renaming the process

#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc, argv);
#endif

Defined in config.h:

/* Check if we can use setproctitle().
 * BSD systems have support for it, we provide an implementation for
 * Linux and osx. */
#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__) /* BSD (unix variant) macros */
#define USE_SETPROCTITLE
#endif

#if (defined __linux || defined __APPLE__) /* Linux and Apple macros */
#define USE_SETPROCTITLE
#define INIT_SETPROCTITLE_REPLACEMENT
void spt_init(int argc, char *argv[]);
void setproctitle(const char *fmt, ...);
#endif

http://www.baike.com/wiki/BSD
The open-source descendants of BSD target different uses and users and run on many hardware architectures; BSD also shows up regularly in government installations. The capabilities below are not necessarily exclusive to each system, but every BSD has gradually earned a solid reputation in its own field, some for performance, some for security:
- DragonflyBSD, the youngest BSD, aims to provide better symmetric multiprocessing than FreeBSD, with kernel-level SSI clustering support for better computing results. The project has only been on this path for a few years and focuses mainly on i386.
- FreeBSD is known in the BSD family for ease of use and high performance. Targeted mainly at mainstream processor architectures such as i386 and AMD's 64-bit i386 extension, it pays close attention to multiprocessing and runs very well on i386 and amd64 servers, though it can run on other hardware architectures as well.
- NetBSD is exceptionally portable, running on as many as 54 platforms, from embedded handhelds to server farms; NetBSD has even served aboard the International Space Station.
- OpenBSD is outstanding in cryptography and security, and is also very portable, if slightly behind NetBSD. Security features such as OpenSSH were pioneered by OpenBSD, and it is well regarded for running security-demanding machines.
Note that the list above reflects general impressions and each project's development focus rather than strict comparison rules; in practice each specific BSD can fill many roles.

2. Setting the locale

setlocale(LC_COLLATE,"");

http://manpages.ubuntu.com/manpages/lucid/en/man3/setlocale.3.html

NAME
setlocale - set the current locale

SYNOPSIS
#include <locale.h>
char *setlocale(int category, const char *locale);

DESCRIPTION
The setlocale() function is used to set or query the program's current locale.
If locale is not NULL, the program's current locale is modified according to the arguments. The argument category determines which parts of the program's current locale should be modified.
LC_ALL for all of the locale.
LC_COLLATE for regular expression matching (it determines the meaning of range expressions and equivalence classes) and string collation.
LC_CTYPE for regular expression matching, character classification, conversion, case-sensitive comparison, and wide character functions.
LC_MESSAGES for localizable natural-language messages.
LC_MONETARY for monetary formatting.
LC_NUMERIC for number formatting (such as the decimal point and the thousands separator).
LC_TIME for time and date formatting.
The argument locale is a pointer to a character string containing the required setting of category. Such a string is either a well-known constant like "C" or "da_DK" (see below), or an opaque string that was returned by another call of setlocale().
If locale is "", each part of the locale that should be modified is set according to the environment variables. The details are implementation-dependent. For glibc, first (regardless of category), the environment variable LC_ALL is inspected, next the environment variable with the same name as the category (LC_COLLATE, LC_CTYPE, LC_MESSAGES, LC_MONETARY, LC_NUMERIC, LC_TIME) and finally the environment variable LANG. The first existing environment variable is used. If its value is not a valid locale specification, the locale is unchanged, and setlocale() returns NULL.
The locale "C" or "POSIX" is a portable locale; its LC_CTYPE part corresponds to the 7-bit ASCII character set.
A locale name is typically of the form language[_territory][.codeset][@modifier], where language is an ISO 639 language code, territory is an ISO 3166 country code, and codeset is a character set or encoding identifier like ISO-8859-1 or UTF-8. For a list of all supported locales, try "locale -a", cf.
locale(1).
If locale is NULL, the current locale is only queried, not modified.
On startup of the main program, the portable "C" locale is selected as default. A program may be made portable to all locales by calling:
setlocale(LC_ALL, "");
after program initialization, by using the values returned from a localeconv(3) call for locale-dependent information, by using the multi-byte and wide character functions for text processing if MB_CUR_MAX > 1, and by using strcoll(3), wcscoll(3) or strxfrm(3), wcsxfrm(3) to compare strings.

RETURN VALUE
A successful call to setlocale() returns an opaque string that corresponds to the locale set. This string may be allocated in static storage. The string returned is such that a subsequent call with that string and its associated category will restore that part of the process's locale. The return value is NULL if the request cannot be honored.

CONFORMING TO
C89, C99, POSIX.1-2001.

NOTES
Linux (that is, glibc) supports the portable locales "C" and "POSIX". In the good old days there used to be support for the European Latin-1 "ISO-8859-1" locale (e.g., in libc-4.5.21 and libc-4.6.27), and the Russian "KOI-8" (more precisely, "koi-8r") locale (e.g., in libc-4.6.27), so that having an environment variable LC_CTYPE=ISO-8859-1 sufficed to make isprint(3) return the right answer.
Redis Internals: The Startup Process, Part Two
Continuing the code analysis from the previous article.

1. Enable thread-safe mode

zmalloc_enable_thread_safeness();

/* set the thread-safety flag to 1 */
void zmalloc_enable_thread_safeness(void) {
    zmalloc_thread_safe = 1;
}

2. Out-of-memory handling

zmalloc_set_oom_handler(redisOutOfMemoryHandler);

/* register the function called on out-of-memory */
void zmalloc_set_oom_handler(void (*oom_handler)(size_t)) {
    zmalloc_oom_handler = oom_handler;
}

/* the variable it assigns, one level down */
static void (*zmalloc_oom_handler)(size_t) = zmalloc_default_oom;

/* the default implementation ultimately called */
static void zmalloc_default_oom(size_t size) {
    fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",
        size);
    fflush(stderr);
    abort();
}

3. Generating the hash seed

srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());

3.1 time(): header #include <time.h>; time_t time(time_t *timer); returns the number of seconds elapsed since 1970-01-01 00:00:00. If timer is a non-NULL pointer, the function also stores the return value in the memory timer points to. On failure it returns the value (time_t)-1, with the cause stored in errno.
3.2 getpid() (get the process identifier): header #include <unistd.h>; pid_t getpid(void); returns the identifier of the current process.
3.3 srand(): void srand(unsigned seed); initializes the random-number generator.
3.4 gettimeofday(): header #include <sys/time.h>; int gettimeofday(struct timeval *tv, struct timezone *tz); returns the current time in the timeval structure tv and puts the local timezone information in the structure tz points to.
3.5 Setting the hash seed:

static uint32_t dict_hash_function_seed = 5381;

void dictSetHashFunctionSeed(uint32_t seed) {
    dict_hash_function_seed = seed;
}

4. Check whether we are in sentinel mode (the interim clustering solution)

server.sentinel_mode = checkForSentinelMode(argc,argv); /* decided from the startup arguments */

/* Returns 1 if there is --sentinel among the arguments or if
 * argv[0] is exactly "redis-sentinel". */
int checkForSentinelMode(int argc, char **argv) {
    int j;

    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
    for (j = 1; j < argc; j++)
        if (!strcmp(argv[j],"--sentinel")) return 1;
    return 0;
}
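Back to the hash seed set in step 3: here is a sketch of a seeded djb2-style string hash in the spirit of dict.c (Redis's case-insensitive dictGenCaseHashFunction follows this pattern, while the main dictGenHashFunction is a seeded MurmurHash2). Randomizing the seed per process makes bucket placement unpredictable to clients, which blunts hash-flooding attacks; the concrete seed below is illustrative.

#include <stdint.h>
#include <stdio.h>
#include <ctype.h>

static uint32_t dict_hash_function_seed = 5381;  /* overwritten at startup */

/* Seeded djb2-style hash: start from the seed instead of the constant 5381. */
static unsigned int case_hash(const unsigned char *buf, int len) {
    unsigned int hash = (unsigned int)dict_hash_function_seed;
    while (len--)
        hash = ((hash << 5) + hash) + tolower(*buf++);   /* hash * 33 + c */
    return hash;
}

int main(void) {
    dict_hash_function_seed = 123456789u;  /* illustrative; Redis mixes time and pid */
    printf("%u\n", case_hash((const unsigned char *)"GET", 3));
    return 0;
}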
Redis Cluster 101
Original source: https://raw.githubusercontent.com/antirez/redis/3.0/00-RELEASENOTES

Redis 3.0 release notes
--[ Redis 3.0.1 ] Release date: 5 May 2015
--[ Redis 3.0.0 ] Release date: 1 Apr 2015
--[ Redis 3.0.0 RC6 (version 2.9.106) ] Release date: 24 mar 2015
--[ Redis 3.0.0 RC5 (version 2.9.105) ] Release date: 20 mar 2015
--[ Redis 3.0.0 RC4 (version 2.9.104) ] Release date: 13 feb 2015
--[ Redis 3.0.0 RC3 (version 2.9.103) ] Release date: 30 jan 2015
--[ Redis 3.0.0 RC2 (version 2.9.102) ] Release date: 13 jan 2015
--[ Redis 3.0.0 RC1 (version 2.9.101) ] Release date: 9 oct 2014

Official page: http://redis.io/topics/cluster-tutorial

Redis cluster tutorial

This document is a gentle introduction to Redis Cluster that avoids complex-to-understand distributed-systems concepts. It provides instructions about how to set up a cluster, test it, and operate it, without going into the details that are covered in the Redis Cluster specification, just describing how the system behaves from the point of view of the user.

Note that if you plan to run a serious Redis Cluster deployment, the more formal specification is a highly suggested reading.

Redis cluster is currently alpha quality code; please get in touch in the Redis mailing list or open an issue in the Redis Github repository if you find any issue.

Redis Cluster 101

Redis Cluster provides a way to run a Redis installation where data is automatically sharded across multiple Redis nodes.

Commands dealing with multiple keys are not supported by the cluster, because this would require moving data between Redis nodes, making Redis Cluster unable to provide Redis-alike performance and predictable behavior under load.

Redis Cluster also provides some degree of availability during partitions, that is, in practical terms, the ability to continue operations when some nodes fail or are not able to communicate.

So, in practical terms, what do you get with Redis Cluster?
- The ability to automatically split your dataset among multiple nodes.
- The ability to continue operations when a subset of the nodes are experiencing failures or are unable to communicate with the rest of the cluster.

Redis Cluster data sharding

Redis Cluster does not use consistent hashing, but a different form of sharding where every key is conceptually part of what we call a hash slot.

There are 16384 hash slots in Redis Cluster, and to compute the hash slot of a given key, we simply take the CRC16 of the key modulo 16384.

Every node in a Redis Cluster is responsible for a subset of the hash slots, so for example you may have a cluster with 3 nodes, where:
- Node A contains hash slots from 0 to 5500.
- Node B contains hash slots from 5501 to 11000.
- Node C contains hash slots from 11001 to 16383.

This allows adding and removing nodes in the cluster easily. For example, if I want to add a new node D, I need to move some hash slots from nodes A, B, C to D. Similarly, if I want to remove node A from the cluster, I can just move the hash slots served by A to B and C.
When node A is empty I can remove it from the cluster completely.

Because moving hash slots from a node to another does not require stopping operations, adding and removing nodes, or changing the percentage of hash slots held by nodes, does not require any downtime.

Redis Cluster master-slave model

In order to remain available when a subset of nodes are failing or are not able to communicate with the majority of nodes, Redis Cluster uses a master-slave model where every node has from 1 (the master itself) to N replicas (N-1 additional slaves).

In our example cluster with nodes A, B, C, if node B fails the cluster is not able to continue, since we no longer have a way to serve hash slots in the range 5501-11000.

However, if when the cluster is created (or at a later time) we add a slave node to every master, so that the final cluster is composed of A, B, C as masters and A1, B1, C1 as slaves, the system is able to continue if node B fails.

Node B1 replicates B, so the cluster will elect node B1 as the new master and will continue to operate correctly.

However, note that if nodes B and B1 fail at the same time, Redis Cluster is not able to continue to operate.

Redis Cluster consistency guarantees

Redis Cluster is not able to guarantee strong consistency. In practical terms this means that under certain conditions it is possible that Redis Cluster will forget a write that was acknowledged by the system.

The first reason why Redis Cluster can lose writes is that it uses asynchronous replication. This means that during writes the following happens:
- Your client writes to the master B.
- The master B replies OK to your client.
- The master B propagates the write to its slaves B1, B2 and B3.

As you can see, B does not wait for an acknowledgement from B1, B2, B3 before replying to the client, since this would be a prohibitive latency penalty for Redis. So if your client writes something, B acknowledges the write but crashes before being able to send the write to its slaves, one of the slaves can be promoted to master, losing the write forever.

This is very similar to what happens with most databases that are configured to flush data to disk every second.
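Returning to the slot computation from the data-sharding section: HASH_SLOT(key) = CRC16(key) mod 16384 is easy to reproduce. The sketch below uses a bitwise CRC16-CCITT/XModem routine, the variant Redis's crc16.c implements (its documented check value for "123456789" is 0x31C3). Note that the real cluster also applies hash-tag rules ({...} substrings) before hashing, which this sketch omits.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* CRC16-CCITT (XModem): poly 0x1021, init 0x0000; crc16("123456789") == 0x31C3 */
static uint16_t crc16(const char *buf, int len) {
    uint16_t crc = 0;
    for (int i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i]) << 8;
        for (int j = 0; j < 8; j++)
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021)
                                 : (uint16_t)(crc << 1);
    }
    return crc;
}

static unsigned hash_slot(const char *key) {
    return crc16(key, (int)strlen(key)) % 16384;   /* 16384 hash slots */
}

int main(void) {
    printf("crc16(\"123456789\") = 0x%04X\n", crc16("123456789", 9));
    printf("slot(\"foo\") = %u\n", hash_slot("foo"));
    return 0;
}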
keepalived + haproxy: Highly Available Load Balancing for the Web in a Dual-Master Model (reposted)
Original work; reproduction is permitted provided the original source, author information and this statement are credited with a hyperlink. Otherwise legal liability will be pursued.
http://xz159065974.blog.51cto.com/8618592/1405812

1. The architecture of this article

What each server does:
1. haproxy provides the load balancing
2. keepalived provides high availability for haproxy
3. apache static serves the static pages
4. apache dynamic serves the dynamic pages; the diagram has two of them, load-balanced

Configuring each functional module:

I. Configure haproxy and keepalived

Verification:
1. When one keepalived node goes down, does the VIP move to the other server?
2. When one haproxy service fails, does the VIP move to the other server?

Note: if keepalived dies while the haproxy service keeps running, should we let the other server seize the VIP? In theory, preferably not; but our keepalived script only watches the haproxy process, and once keepalived itself is down it has no way to learn haproxy's health or to lower its own priority. So, in theory, preferably not, but with keepalived alone this cannot be achieved in practice.

Configuration:

1. Install keepalived on both servers

[root@station139 ~]# yum -y install keepalived

2. Configure keepalived

[root@node2 ~]# vim /etc/keepalived/keepalived.conf
! Configuration File for keepalived
global_defs {
   notification_email {
        root@localhost             # where to mail service-state changes
   }
   notification_email_from kaadmin@localhost
   smtp_server 127.0.0.1           # which smtp server to send mail through
   smtp_connect_timeout 30         # time out if the smtp server is unreachable for 30 seconds
   router_id LVS_DEVEL
}

vrrp_script chk_haproxy {           # this script checks the health of the local haproxy service
    script "killall -0 haproxy"
    interval 1
    weight -2
}

vrrp_instance VI_1 {
    state MASTER                    # this server is the keepalived master
    interface eth0                  # advertise through eth0
    virtual_router_id 200           # change the virtual router id when several keepalived clusters share one LAN
    priority 100                    # priority
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 11112222
    }
    track_script {
        chk_haproxy
    }
    virtual_ipaddress {
        192.168.1.200               # this node's virtual IP
    }
    notify_master "/etc/keepalived/notify.sh master"   # scripts run in the different states
    notify_backup "/etc/keepalived/notify.sh backup"
    notify_fault "/etc/keepalived/notify.sh fault"
}

vrrp_instance VI_2 {                # backup for the other node's master instance
    state BACKUP
    interface eth0
    virtual_router_id 57
    priority 99                     # must be lower than the other master's priority
    advert_int 1
    authentication {
        auth_type PASS
        auth_pass 1111
    }
    track_script {
        chk_mantaince_down
    }
    virtual_ipaddress {
        192.168.1.201
    }
}

3. The script keepalived runs in its different states

#!/bin/bash
# Author: MageEdu <[email protected]>
# description: An example of notify script
#
vip=192.168.1.200
contact='root@localhost'

notify() {
    mailsubject="`hostname` to be $1: $vip floating"
    mailbody="`date '+%F %H:%M:%S'`: vrrp transition, `hostname` changed to be $1"
    echo $mailbody | mail -s "$mailsubject" $contact
}

case "$1" in
    master)
        notify master
        /etc/rc.d/init.d/haproxy start
        exit 0
        ;;
    backup)
        notify backup
        /etc/rc.d/init.d/haproxy stop
        exit 0
        ;;
    fault)
        notify fault
        /etc/rc.d/init.d/haproxy stop
        exit 0
        ;;
    *)
        echo 'Usage: `basename $0` {master|backup|fault}'
        exit 1
        ;;
esac

Make the script executable: chmod +x /etc/keepalived/notify.sh

4. Configure haproxy

Because we want to separate dynamic and static content, the configuration must route dynamic and static resources to different backends.

[root@node2 ~]# yum -y install haproxy    # install haproxy
[root@node2 ~]# vim /etc/haproxy/haproxy.cfg
#
    log         127.0.0.1 local2
    chroot      /var/lib/haproxy
    pidfile     /var/run/haproxy.pid
    maxconn     4000
    user        haproxy
    group       haproxy
    daemon

    # turn on stats unix socket
    stats socket /var/lib/haproxy/stats

#---------------------------------------------------------------------
# common defaults that all the 'listen' and 'backend' sections will
# use if not designated in their block
#---------------------------------------------------------------------
defaults
    mode                    http    # haproxy works in http mode
    log                     global
    option                  httplog
    option                  dontlognull
    option http-server-close        # allow the server side to close the connection when the client times out
    option forwardfor       except 127.0.0.0/8   # insert an X-Forwarded-For header into forwarded requests
    option                  redispatch
High performance
XMemcached Introduction

XMemcached is a new java memcached client. If you don't know "memcached" yet, you can check here: it is a free & open source, high-performance, distributed memory object caching system, generic in nature, but intended for use in speeding up dynamic web applications by alleviating database load. Many users use it as a memory database, too. Memcached talks to clients over its own protocol, and XMemcached is a java client for it.

Two java clients existed before: the official client based on traditional blocking IO, maintained by Greg Whalin, and Spymemcached, based on java NIO, maintained by Dustin Sallings, plus several improved versions of each. So what is the advantage of XMemcached, and why do we need a new java client for Memcached?

XMemcached Features

High performance
XMemcached is also a client based on java NIO. Java NIO is efficient (especially under high concurrency) and uses fewer resources than traditional blocking IO. Traditional blocking IO needs several connections to build a connection pool for efficiency, whereas NIO needs only one connection (though NIO also supports pool management), reducing the cost of thread creation and switching, an advantage that grows under high concurrency. Both XMemcached and Spymemcached perform very well, and on some measurements XMemcached does better than Spymemcached; check the Java Memcached Clients Benchmark for details.

Binary and Text protocol
XMemcached supports all memcached protocols, including the binary protocol introduced in Memcached 1.4.0.

Client distribution
Memcached supports distribution on the client side; XMemcached implements this and supplies an implementation of the consistent hash algorithm.

Weighted server
XMemcached can adjust the weight of a node to balance the load on the memcached servers: the higher the weight, the more data that memcached server stores and the more load it receives.

Dynamically add/remove server
XMemcached can add or remove servers dynamically, via JMX or programmatically, making it easy to scale out or replace servers.

JMX Monitor/Control
You can monitor or control the XMemcached client through JMX: set parameters, view the STAT data, add or remove servers, and so on.

Integration with Spring and Hibernate-memcached
Like many projects, XMemcached supports integration with the Spring framework. Hibernate-memcached is an open-source project that uses memcached as a second-level cache for Hibernate; the default client is Spymemcached, but you can use XMemcached too.

NIO Connection Pool
As mentioned before, java NIO usually uses one connection per memcached server, but XMemcached also supports connection pooling: you can create several connections to the same memcached server to build a pool, improving performance in highly concurrent environments, transparently to the user. You must ensure data independence or synchronize the data yourself; there is no synchronization between connections to a server, so either keep the data fully independent or use CAS for atomic operations.

Extensible
XMemcached is implemented on top of the java NIO framework yanf4j, with a clear architecture (yanf4j was merged into XMemcached after XMemcached 1.2.5).
You can find the UML class diagram of XMemcached here.

User Guide

We will show some examples from simple to complex so you can pick up XMemcached easily.

Dependencies
XMemcached depends on slf4j. Download the dependency, or the full XMemcached package with dependencies, before trying the code below.

If you use maven
If you build your project with maven, you only need to add the dependency (1.2.5+):

 <dependency>
       <groupId>com.googlecode.xmemcached</groupId>
       <artifactId>xmemcached</artifactId>
       <version>${version}</version>
 </dependency>

Simple example
As a typical user, most of the time you just need to add/get data. Given a memcached server whose host name is "host" and whose service port is 11211, a minimal session looks like this:

    MemcachedClientBuilder builder = new XMemcachedClientBuilder(
                                        AddrUtil.getAddresses("localhost:11211"));
    MemcachedClient memcachedClient = builder.build();
    try {
        memcachedClient.set("hello", 0, "Hello,xmemcached");
        String value = memcachedClient.get("hello");
        System.out.println("hello=" + value);
        memcachedClient.delete("hello");
        value = memcachedClient.get("hello");
        System.out.println("hello=" + value);
    } catch (MemcachedException e) {
        System.err.println("MemcachedClient operation fail");
        e.printStackTrace();
    } catch (TimeoutException e) {
        System.err.println("MemcachedClient operation timeout");
        e.printStackTrace();
    } catch (InterruptedException e) {
        // ignore
    }
    try {
        // close memcached client
        memcachedClient.shutdown();
    } catch (IOException e) {
        e.printStackTrace();
    }
redis provides a rate-limit demo, shown below:

INCR key
Available since 1.0.0. Time complexity: O(1)

Increments the number stored at key by one. If the key does not exist, it is set to 0 before performing the operation. An error is returned if the key contains a value of the wrong type or contains a string that can not be represented as integer. This operation is limited to 64 bit signed integers.

Note: this is a string operation because Redis does not have a dedicated integer type. The string stored at the key is interpreted as a base-10 64 bit signed integer to execute the operation. Redis stores integers in their integer representation, so for string values that actually hold an integer there is no overhead for storing the string representation of the integer.

Return value
Integer reply: the value of key after the increment

Examples
redis> SET mykey "10"
OK
redis> INCR mykey
(integer) 11
redis> GET mykey
"11"

Pattern: Counter
The counter pattern is the most obvious thing you can do with Redis atomic increment operations: simply send an INCR command to Redis every time an operation occurs. For instance, in a web application we may want to know how many page views a user did on each day of the year. The application simply increments a key on every page view, building the key name from the user ID and a string representing the current date.

This simple pattern can be extended in many ways:
- It is possible to use INCR and EXPIRE together at every page view to have a counter counting only the latest N page views separated by less than the specified number of seconds.
- A client may use GETSET in order to atomically get the current counter value and reset it to zero.
- Using other atomic increment/decrement commands like DECR or INCRBY, it is possible to handle values that grow or shrink depending on the operations performed by the user; imagine, for instance, the scores of different users in an online game.

Pattern: Rate limiter
The rate limiter pattern is a special counter that is used to limit the rate at which an operation can be performed. The classical materialization of this pattern involves limiting the number of requests that can be performed against a public API. We provide two implementations of this pattern using INCR, where we assume that the problem to solve is limiting the number of API calls to a maximum of ten requests per second per IP address.

Pattern: Rate limiter 1
The simpler, more direct implementation of this pattern is the following:

FUNCTION LIMIT_API_CALL(ip)
ts = CURRENT_UNIX_TIME()
keyname = ip+":"+ts
current = GET(keyname)
IF current != NULL AND current > 10 THEN
    ERROR "too many requests per second"
ELSE
    MULTI
        INCR(keyname,1)
        EXPIRE(keyname,10)
    EXEC
    PERFORM_API_CALL()
END

Basically we have a counter for every IP, for every different second. But these counters are always incremented with a 10-second expire set, so Redis removes them automatically once the current second has passed. Note the use of MULTI and EXEC in order to make sure that we both increment and set the expire at every API call.

Pattern: Rate limiter 2
An alternative implementation uses a single counter, but is a bit more complex to get right without race conditions.
We'll examine different variants.

FUNCTION LIMIT_API_CALL(ip):
current = GET(ip)
IF current != NULL AND current > 10 THEN
    ERROR "too many requests per second"
ELSE
    value = INCR(ip)
    IF value == 1 THEN
        EXPIRE(ip,1)
    END
    PERFORM_API_CALL()
END

The counter is created so that it only survives one second, starting from the first request performed in the current second. If there are more than 10 requests in the same second the counter reaches a value greater than 10; otherwise it expires and starts again from 0.

The above code has a race condition: if for some reason the client performs the INCR command but does not perform the EXPIRE, the key is leaked until we see the same IP address again. This can be fixed easily by turning the INCR with optional EXPIRE into a Lua script that is sent using the EVAL command (only available since Redis version 2.6):

local current
current = redis.call("incr",KEYS[1])
if tonumber(current) == 1 then
    redis.call("expire",KEYS[1],1)
end

There is a different way to fix this issue without using scripting, using Redis lists instead of counters. The implementation is more complex and uses more advanced features, but has the advantage of remembering the IP addresses of the clients currently performing an API call, which may be useful or not depending on the application.

FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > 10 THEN
    ERROR "too many requests per second"
ELSE
    IF EXISTS(ip) == FALSE
        MULTI
            RPUSH(ip,ip)
            EXPIRE(ip,1)
        EXEC
    ELSE
        RPUSHX(ip,ip)
    END
    PERFORM_API_CALL()
END

The RPUSHX command only pushes the element if the key already exists. Note that we have a race here, but it is not a problem: EXISTS may return false but the key may be created by another client before we create it inside the MULTI / EXEC block; this race only rarely misses an API call, so the rate limiting still works correctly.
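To tie the demo together, here is a minimal sketch in C using the hiredis client, wrapping the Lua variant above so that INCR and EXPIRE happen atomically. The server address, the choice of the bare IP as the key, the 10-requests threshold, and the helper name limit_api_call are all assumptions of this example, not part of the official demo.

#include <stdio.h>
#include <hiredis/hiredis.h>

/* Sketch: run the Lua fix above via EVAL so INCR + EXPIRE are atomic. */
static int limit_api_call(redisContext *c, const char *ip) {
    const char *script =
        "local current = redis.call('incr', KEYS[1]) "
        "if tonumber(current) == 1 then redis.call('expire', KEYS[1], 1) end "
        "return current";
    redisReply *r = redisCommand(c, "EVAL %s 1 %s", script, ip);
    int allowed = r && r->type == REDIS_REPLY_INTEGER && r->integer <= 10;
    if (r) freeReplyObject(r);
    return allowed;               /* perform the API call only if non-zero */
}

int main(void) {
    redisContext *c = redisConnect("127.0.0.1", 6379);  /* assumed address */
    if (!c || c->err) return 1;
    printf("allowed: %d\n", limit_api_call(c, "1.2.3.4"));
    redisFree(c);
    return 0;
}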
https://github.com/sohutv/cachecloud

1. What is CacheCloud?
CacheCloud is a Redis cloud-management platform: it automates deployment of multiple topologies (Redis Standalone, Redis Sentinel, Redis Cluster), fixes Redis instance fragmentation, provides full statistics, monitoring, and operations features, reduces operational cost and human error, improves machine utilization, offers flexible scaling, and provides convenient client access.

2. What does CacheCloud provide?
- Monitoring and statistics: dashboards at the machine, application, and instance level.
- One-click launch: Redis Standalone, Redis Sentinel, and Redis Cluster applications, with no manual configuration or initialization.
- Failover: high availability for sentinel and cluster modes.
- Scaling: full vertical and horizontal online scaling.
- Complete operations: automated and simplified operations, avoiding errors from purely manual work.
- Convenient clients: quick and easy client access.
- Metadata management: machines, applications, instances, and user information.
- Workflow: complete processes for application, operations, scaling, and modification.
- One-click import: import existing Redis deployments.

3. What problems does CacheCloud solve?
1. Deployment cost: multi-node Redis (Redis-Sentinel, Redis-Cluster) deployment and configuration is relatively complex and error-prone. For example, installing a redis-cluster of 100 data nodes purely by hand is both time-consuming and risky.
2. Instance fragmentation: a Redis administrator (think of a Redis DBA) may have to manage hundreds of Redis-Cluster deployments for developers, spread across hundreds of machines; the manual maintenance cost is high and automated operations tooling is needed.
3. Incomplete monitoring, statistics, and management: open source tools such as RedisLive (Python), Redis Commander (Node.js), and Redmon (Ruby) struggle to meet the requirements in feature completeness (e.g. configuration management, Redis-Cluster support) and extensibility.
4. Operations cost: Redis users have to maintain their own instances, but they are usually better at building features with Redis than at operating it. Just as developers use MySQL without operating the MySQL servers themselves, Redis should be maintained by people with more Redis operations experience (guaranteeing high availability and scalability), letting developers focus on using Redis itself.
5. Scalability: the product supports the latest Redis-Sentinel and Redis-Cluster architectures, satisfying both high availability and extensibility, with strong capacity and performance scaling.
6. Economic cost: machine utilization is low when each project deploys Redis on its own scattered servers, leaving lots of idle resources unused.
7. Inconsistent versions: different projects run different Redis versions, complicating management and interoperability.

4. The value CacheCloud provides
- Automated operations at scale: lower operational cost and fewer human errors.
- Free scaling: flexible scaling lowers the cost of growing/shrinking applications and lets machine resources be reused.
- Team growth and open source contribution: experience in designing and building a cloud product, as both its developers and its users.

5. CacheCloud's scale at Sohu
- 50+ billion command invocations per day
- 2TB+ of memory
- 1600+ Redis instances
- 200+ machines

6. CacheCloud environment requirements
- JDK 7+
- Maven 3
- MySQL 5.5
- Redis 3
Paging through hundreds of millions of MongoDB records: a Java implementation
1. Environment
1.1 Download MongoDB.
1.2 Start MongoDB: C:\mongodb\bin\mongod --dbpath D:\mongodb\data
1.3 Download Robo 3T, a MongoDB GUI client.

2. Prepare the data

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.6.1</version>
</dependency>

Run this Java program to insert the test records:

public static void main(String[] args) {
    try {
        /**** Connect to MongoDB ****/
        // Since 2.10.0, uses MongoClient
        MongoClient mongo = new MongoClient("localhost", 27017);
        /**** Get database ****/
        // if the database doesn't exist, MongoDB will create it for you
        DB db = mongo.getDB("www");
        /**** Get collection / table ****/
        // if the collection doesn't exist, MongoDB will create it for you
        DBCollection table = db.getCollection("person");
        /**** Insert ****/
        // create a document to store key and value
        BasicDBObject document = null;
        for (int i = 0; i < 100000000; i++) {
            document = new BasicDBObject();
            document.put("name", "mkyong" + i);
            document.put("age", 30);
            document.put("sex", "f");
            table.insert(document);
        }
        /**** Done ****/
        System.out.println("Done");
    } catch (UnknownHostException e) {
        e.printStackTrace();
    } catch (MongoException e) {
        e.printStackTrace();
    }
}

3. Paged queries
The traditional limit/skip approach becomes slow once the data volume is large, so it doesn't really fit here. Instead, borrow the idea from logstash-input-mongodb: page on _id.

public
def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
  collection = mongodb.collection(mongo_collection_name)
  # Need to make this sort by date in object id then get the first of the series
  # db.events_20150320.find().limit(1).sort({ts:1})
  return collection.find({:_id => {:$gt => last_id_object}}).limit(batch_size)
end

collection_name = collection[:name]
@logger.debug("collection_data is: #{@collection_data}")
last_id = @collection_data[index][:last_id]
#@logger.debug("last_id is #{last_id}", :index => index, :collection => collection_name)
# get batch of events starting at the last_place if it is set
last_id_object = last_id
if since_type == 'id'
  last_id_object = BSON::ObjectId(last_id)
elsif since_type == 'time'
  if last_id != ''
    last_id_object = Time.at(last_id)
  end
end
cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)

The same idea implemented in Java:

import java.net.UnknownHostException;
import java.util.List;
import org.bson.types.ObjectId;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoException;

public class Test {
    public static void main(String[] args) {
        int pageSize = 50000;
        try {
            MongoClient mongo = new MongoClient("localhost", 27017);
            DB db = mongo.getDB("www");
            DBCollection table = db.getCollection("person");
            DBCursor dbObjects;
            Long cnt = table.count();
            //System.out.println(table.getStats());
            Long page = getPageSize(cnt, pageSize);
            ObjectId lastIdObject = new ObjectId("5bda8f66ef2ed979bab041aa");
            for (Long i = 0L; i < page; i++) {
                Long start = System.currentTimeMillis();
                dbObjects = getCursorForCollection(table, lastIdObject, pageSize);
                System.out.println("query " + (i + 1) + " took " + (System.currentTimeMillis() - start) / 1000 + "s");
                List<DBObject> objs = dbObjects.toArray();
                lastIdObject = (ObjectId) objs.get(objs.size() - 1).get("_id");
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (MongoException e) {
            e.printStackTrace();
        }
    }

    public static DBCursor getCursorForCollection(DBCollection collection, ObjectId lastIdObject, int pageSize) {
        DBCursor dbObjects = null;
        if (lastIdObject == null) {
            lastIdObject = (ObjectId) collection.findOne().get("_id"); // TODO sort and take the first _id, otherwise records may be skipped
        }
        BasicDBObject query = new BasicDBObject();
        query.append("_id", new BasicDBObject("$gt", lastIdObject));
        BasicDBObject sort = new BasicDBObject();
        sort.append("_id", 1);
        dbObjects = collection.find(query).limit(pageSize).sort(sort);
        return dbObjects;
    }

    public static Long getPageSize(Long cnt, int pageSize) {
        return cnt % pageSize == 0 ? cnt / pageSize : cnt / pageSize + 1;
    }
}

4. Lessons learned
1. A carelessly omitted $ sign made the query return no data, and finding the cause wasted some time:
query.append("_id", new BasicDBObject("$gt", lastIdObject));
2. Creating indexes.
Create an ordinary single-field index: db.collection.ensureIndex({field:1/-1});  1 is ascending, -1 is descending
Example: db.articles.ensureIndex({title:1})  // note: do not wrap the field in double quotes, or creation fails
Inspect current indexes: db.collection.getIndexes();
Example: db.articles.getIndexes();
Drop a single index: db.collection.dropIndex({field:1/-1});
3. Explain plans: db.student.find({"name":"dd1"}).explain()

References:
[1] https://github.com/phutchins/logstash-input-mongodb/blob/master/lib/logstash/inputs/mongodb.rb
[2] https://www.cnblogs.com/yxlblogs/p/4930308.html
[3] https://docs.mongodb.com/manual/reference/method/db.collection.ensureIndex/
15 Days with Redis — Part 1: Getting Started
Double Eleven is finally over. The MongoDB I'm responsible for survived thanks to its replica set; the Redis my colleague runs went down around 8 a.m., and everyone rushed to the office to fix it. Since I've only ever used Redis in an ad-hoc, look-it-up-when-needed way, without a systematic understanding, I'm using this series to organize what I know.

1: What is Redis?
How to sum it up? No explanation beats the official site's; go look for yourself. As it says, Redis is an in-memory data structure server. "Data structures" immediately bring to mind C#'s Dictionary, HashSet, List, SortedDictionary, and so on, along with their shortcomings:
1. A Dictionary cannot share memory across machines, unless you wrap it as a standalone WCF service.
2. It cannot be serialized to disk without writing a lot of serialization code yourself, while still guaranteeing no data loss.
I can tell you plainly that none of this is a problem for Redis, because it was born to solve exactly these issues. Lists, hash sets, dictionaries — Redis has them all, and as noted above it can be used as a database, cache, and message broker.

2: How to download
Enough grand talk; how you actually play with it is what matters. On the download page at http://redis.io/download you'll be surprised to find that the Windows platform is not supported. Microsoft's open source group maintains an extended Windows port, but since it isn't official Redis I'd rather not use it, so it's VMware + CentOS for me. I won't go into how to get those two.
1. Commands: the official site already walks through download, compile, and run step by step; just follow along. After those preliminary steps you can clearly see the Redis package and the extracted directory; then run redis-server to start the server, and Redis is up and running. Easy.
2. How to interact: the official site says to use redis-cli, so that's what we'll try next.
Everything went smoothly, so that's it for part one: by now you should know how to download, install, and do basic operations. Next time we start playing with the commands.
15 Days with Redis — Part 2: The Basic String Type
We all know Redis is written in C, where strings are represented as char[] arrays. You might think: easy, a command like SET just stuffs the value straight into a char[]. If you really think that, a couple of problems come knocking. (First, grab a Redis manual: http://doc.redisfans.com/.)

First: if you run APPEND repeatedly, does Redis have to re-grow the char[] every time, making each append an expensive operation?
Second: if you run the string command STRLEN, does Redis have to walk the whole char array each time to compute the length?

I: How Redis actually stores strings

Given those concerns, the Redis author was not so naive; the sensible design is to wrap another layer around the char[] array, right?

1. The SDS struct
Redis wraps char[] in an SDS (simple dynamic string), the smallest unit of Redis storage. Where can you see it? The tarball you wget'ed contains the Redis source — reportedly only ~30k lines, so when in doubt, read it yourself. For easier searching I pulled the source onto Windows and opened it in Visual Studio. SDS is defined in the sds.h source file, with three fields (definitions below):
<1> len: the used length of char[], a bit like the Length of a C# List.
<2> free: the number of unused slots in char[] — how many empty positions remain.
<3> buf[]: the slots holding the characters; not necessarily equal to the element count. For "cnblogs" it could be [c][n][b][l][o][g][s][\0][][][].

II: The Redis object (redisObject)

SDS is only a wrapper around char[]; it cannot identify which of Redis's five types a value is. So, as you'd expect, Redis wraps SDS once more in the redisObject, defined in redis.h. For now note its type and ptr fields; the details come later.
<1> type marks which type this redisObject is; naturally there is a corresponding type enum in the source.
<2> *ptr is a pointer, and the memory it points at is the underlying storage, e.g. the SDS just described.

Now you can put together the opening example:

127.0.0.1:6379> set name cnblogs
OK
127.0.0.1:6379> get name
"cnblogs"

For this SET, Redis actually creates two redisObject objects — one for the key and one for the value — both with type=REDIS_STRING, i.e. string objects, whose SDS values store the characters of "name" and "cnblogs" respectively.

III: A few interesting commands

1. incr, incrby, decr, decrby
These four are a bit like C#'s Interlocked class with its atomic increment/decrement methods. What's the benefit? I think they are great for generating order numbers. I remember that at Ctrip, order IDs came from a dedicated function in a separate OrderID DB, so the IDs didn't depend on any business database and we could shard databases and tables freely; doing the same thing with Redis works just as well. The other commands are straightforward; consult the manual as needed.
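The structs themselves appeared as screenshots in the original post; for reference, the Redis 3.0-era definitions from sds.h and redis.h look like this (field widths vary slightly across versions; comments added):

/* From sds.h: the header that wraps the raw char buffer. */
struct sdshdr {
    unsigned int len;   /* number of bytes actually used in buf[] */
    unsigned int free;  /* number of unused bytes still available in buf[] */
    char buf[];         /* the characters, e.g. "cnblogs\0" plus spare room */
};

/* From redis.h: the object wrapper placed around every key and value. */
typedef struct redisObject {
    unsigned type:4;        /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    unsigned encoding:4;    /* internal representation of the value */
    unsigned lru:REDIS_LRU_BITS;
    int refcount;
    void *ptr;              /* points at the underlying SDS/list/dict/... */
} robj;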
15 Days with Redis — Part 3: The Almighty List Type
They say 60% of Redis users come for the list type. What is it for? Queues, of course: with it you may not need a dedicated MQ product such as RabbitMQ or ActiveMQ at all.

I: Hands-on

First, look at the methods the list type provides. There are quite a few, but for queue work the common ones are just four: LPOP, LPUSH, RPOP, RPUSH. From the names alone you can tell this behaves like the "deque" from data structures class: since you can push or pop at either end, a list can serve as either a queue or a stack. For example: push 10, 20, 30, 40, 50 in at the left end, then pop them out of the other end in order 10, 20, 30, 40, 50.

Two scenarios where our current project uses lists:
1. We use a lot of WCF, which led to excessive configuration that was tedious to maintain and update. We moved every WCF request that could be handled asynchronously into a Redis list, and afterwards the web site's config became delightfully clean.
2. Our Taobao order flows — payment nudges, payment reminders, delivery-confirmation and receipt reminders, and so on — all poll a list, which greatly reduced code complexity.

So that's what lists are good for. Given how useful they are, you'll naturally wonder how they're implemented. The list source code is in adlist.c, and if you've studied linked lists you'll understand it at a glance (definitions below):
<1> listNode is clearly a node: it has a prev pointer and a next pointer to its predecessor and successor, plus a void* value holding the SDS-backed object type described in the previous part.
<2> list itself is interesting because it keeps head and tail pointers. tail holds the list's last node, so deleting the tail is O(1); likewise, with head, deleting the first node is O(1). That is exactly what makes LPUSH, LPOP, RPUSH, RPOP cheap. The list also carries a len field recording the current element count, so counting the list is O(1) as well.

Remember the redisObject from the last part, with its ptr pointer? For a list value, ptr points at exactly this list.
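The screenshots referred to above correspond to these definitions from adlist.h (Redis 3.0-era; comments added):

/* A plain doubly linked list: this is why head/tail push and pop are O(1). */
typedef struct listNode {
    struct listNode *prev;
    struct listNode *next;
    void *value;                        /* points at the stored redisObject */
} listNode;

typedef struct list {
    listNode *head;
    listNode *tail;
    void *(*dup)(void *ptr);            /* optional value copy hook */
    void (*free)(void *ptr);            /* optional value destructor */
    int (*match)(void *ptr, void *key); /* optional value comparator */
    unsigned long len;                  /* element count, so LLEN is O(1) */
} list;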
15 Days with Redis — Part 4: The Hash Type
The Redis hash is another high-frequency data structure; its construction is much the same as the HashTable or Dictionary of your programming language. If some future logic would normally live in a Dictionary, consider putting it in Redis instead. Open the Redis manual and see which methods we get to use.

I: Common methods

For any data structure the foundation is always CRUD. In Redis, insert and update are both handled by SET-style commands — for hashes, HSET. As with the earlier parts there isn't much to explain beyond the parameters, so let's just run it in CentOS:

[administrator@localhost redis-3.0.5]$ src/redis-cli
127.0.0.1:6379> hset person name jack
(integer) 1
127.0.0.1:6379> hset person age 20
(integer) 1
127.0.0.1:6379> hset person sex famale
(integer) 1
127.0.0.1:6379> hgetall person
1) "name"
2) "jack"
3) "age"
4) "20"
5) "sex"
6) "famale"
127.0.0.1:6379> hkeys person
1) "name"
2) "age"
3) "sex"
127.0.0.1:6379> hvals person
1) "jack"
2) "20"
3) "famale"

You may wonder about the leading arguments — person, then name, then the value. As discussed in part 1, at the top level Redis always has exactly one key and one value: the key is always a string object (an SDS), while the value comes in many kinds — string objects, list objects, this part's hash objects, the upcoming sorted set objects, and so on. In C# terms:

var person = new Dictionary<string,string>();
person.Add("name","jack");
...

Calling the methods really is that simple; checking the manual now and then is enough. What matters most is understanding the underlying design in the Redis source.

II: Exploring the internals

The hash source lives in dict.h; the definitions are:

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

typedef struct dictType {
    unsigned int (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    int iterators; /* number of iterators currently running */
} dict;

/* If safe is set to 1 this is a safe iterator, that means, you can call
 * dictAdd, dictFind, and other functions against the dictionary even while
 * iterating. Otherwise it is a non safe iterator, and only dictNext()
 * should be called while iterating. */
typedef struct dictIterator {
    dict *d;
    long index;
    int table, safe;
    dictEntry *entry, *nextEntry;
    /* unsafe iterator fingerprint for misuse detection. */
    long long fingerprint;
} dictIterator;

These are the structures behind the hash; let's walk through the relationships.

1. The dict struct
This is the hash's real underlying structure, with five fields.

<1> dictType *type
Its type is dictType, defined above: a bag of function pointers, the most important being the first, hashFunction, which computes the hash value — just like hash computation in C#'s Dictionary.

<2> dictht ht[2]
Why an array of two tables? In fact only ht[0] is in regular use; ht[1] is a staging table used while growing the hash. Odd, but clever: you can grow either all at once or incrementally, and incremental rehashing means Redis keeps serving CRUD from the front while slowly moving entries from ht[0] to ht[1], with rehashidx recording the progress. When the move completes, ht[1] takes over as ht[0]. Simple as that.
2. The dictht struct

typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

<1> dictEntry **table
The key field in this struct is table, an array whose element type is dictEntry. Given that it's an array, the other three fields follow easily: size is the array's size, sizemask is used to mask the hash into an index (array modulo), and used records how many slots hold entries. Attention now turns to the dictEntry element type itself.

3. The dictEntry struct

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

Three fields matter:
- *key: the hash table key.
- the union's *val: the hash value.
- *next: the chaining pointer used to resolve hash collisions, on the same principle as C#'s Dictionary.

Summing all that up, you could draw the usual picture: a dict pointing at two dictht tables, each bucket chaining dictEntry nodes.
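To see why the two tables can coexist safely, here is a lightly simplified sketch of the lookup in dict.c: while an incremental rehash is running, a key may sit in either table, so both are probed. (A sketch, not the verbatim source: the real dictFind also performs a rehash step and short-circuits on an empty table.)

/* Simplified from dict.c's dictFind(): probe ht[0], and if an incremental
 * rehash is in progress (rehashidx != -1), probe ht[1] as well. */
dictEntry *dict_find_sketch(dict *d, const void *key) {
    unsigned int h = dictHashKey(d, key);          /* type->hashFunction */
    for (int table = 0; table <= 1; table++) {
        unsigned int idx = h & d->ht[table].sizemask;
        dictEntry *he = d->ht[table].table[idx];
        while (he) {                               /* walk the collision chain */
            if (dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;            /* no rehash: ht[1] is empty */
    }
    return NULL;
}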
15 Days with Redis — Part 5: The Set Type
This part looks at the fourth of Redis's five types: the set. Sets are interesting for two reasons: first, a set is essentially a key-only Dictionary — a simplified version that saves a lot of memory; second, it is the counterpart of C#'s HashSet. Without further ado, the manual lists all the set methods, and as usual the common ones boil down to CRUD.

I: Common methods

1. SADD
This, unsurprisingly, adds members to a set; here I add my favourite fruits:

127.0.0.1:6379> sadd fruits apple
(integer) 1
127.0.0.1:6379> sadd fruits banana
(integer) 1
127.0.0.1:6379> smembers fruits
1) "banana"
2) "apple"

Two elements successfully added. If a simple add doesn't satisfy you and you want to know what the set uses underneath, OBJECT ENCODING will tell you:

127.0.0.1:6379> object encoding fruits
"hashtable"

A hashtable — you can picture it with your eyes closed: a dictionary that only uses its keys. If you want proof from the source, setTypeAdd calls dictAdd with NULL as its third argument, which corresponds exactly to the *val formal parameter in dictAdd's definition.

2. SPOP, SMEMBERS
What goes in must come out. SPOP removes and returns a random element of the set. Oddly, C#'s HashSet has no neat built-in answer to "random"; you'd have to cobble one together yourself. Let's SADD some more and SPOP:

127.0.0.1:6379> sadd fruits pear
(integer) 1
127.0.0.1:6379> sadd fruits grape
(integer) 1
127.0.0.1:6379> sadd fruits chestnut
(integer) 1
127.0.0.1:6379> smembers fruits
1) "grape"
2) "pear"
3) "banana"
4) "apple"
5) "chestnut"
127.0.0.1:6379> spop fruits
"apple"
127.0.0.1:6379> spop fruits
"chestnut"
127.0.0.1:6379> smembers fruits
1) "grape"
2) "pear"
3) "banana"

A genuinely nice method: it is atomic, and implementing it myself would take a good ten lines of code.

3. SREM
CRUD needs its D. SREM removes one or more member elements from the set, ignoring members that do not exist. For example, deleting pear from fruits:

127.0.0.1:6379> smembers fruits
1) "grape"
2) "pear"
3) "banana"
127.0.0.1:6379> srem fruits pear
(integer) 1
127.0.0.1:6379> smembers fruits
1) "grape"
2) "banana"

The common operations are foolproof — which is fine; methods are easy, the point is to know how they're implemented underneath so you can reason with confidence. The set commands all live in t_set.c (the BSD license header is omitted below):
#include "redis.h"

/*-----------------------------------------------------------------------------
 * Set Commands
 *----------------------------------------------------------------------------*/

void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op);

/* Factory method to return a set that *can* hold "value". When the object has
 * an integer-encodable value, an intset will be returned. Otherwise a regular
 * hash table. */
robj *setTypeCreate(robj *value) {
    if (isObjectRepresentableAsLongLong(value,NULL) == REDIS_OK)
        return createIntsetObject();
    return createSetObject();
}

int setTypeAdd(robj *subject, robj *value) {
    long long llval;
    if (subject->encoding == REDIS_ENCODING_HT) {
        if (dictAdd(subject->ptr,value,NULL) == DICT_OK) {
            incrRefCount(value);
            return 1;
        }
    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
        if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
            uint8_t success = 0;
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                /* Convert to regular set when the intset contains
                 * too many entries. */
                if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                    setTypeConvert(subject,REDIS_ENCODING_HT);
                return 1;
            }
        }
        /* ... remaining branches omitted ... */
    }
    return 0;
}
15 Days with Redis — Part 6: The Sorted Set Type
Today we cover the last of Redis's data types, the "sorted set". Looking back over the structures learned so far, you may feel a surge of gratitude toward the open source world: whatever we can imagine usually already exists. Once upon a time, to share in-memory data structures we had to wrap that memory as a separately deployed WCF service and agonize over how and when to serialize it — endless worries. Then came Redis, packaging both the simple and the advanced data structures inside one shared-memory server. The advanced structure of this part is the sorted set, the counterpart of C#'s SortedDictionary.

I: The sorted set (SortedSet)

Those new to SortedSet may ask what its use cases are. Let me state it plainly: "range queries" are the sorted set's natural prey. At any data volume, retrieving a range costs O(log(N)+M), where M is the number of elements returned.

Going from easy to hard, let's pick the commonly used commands from the manual. Of its 17 commands, the everyday ones are ZADD, ZREM, ZRANGEBYSCORE, ZRANGE.

1. ZADD
ZADD key score member [[score member] [score member] ...]
Adds one or more member elements with their score values to the sorted set key. That is the official description; assignment looks like a hashtable's, except the keys are kept ordered. For example: a fruits set recording each fruit's price, which we can then query by price in various ways. After feeding them all into the SortedSet, two things stand out:
<1> Floating point approximation: grape was added as 2.8, yet Redis displays the approximation 2.79999...; nothing to worry about, that's just how it is.
<2> By default the SortedSet stores entries in ascending score order.

2. ZRANGE, ZREVRANGE
ZRANGE key start stop [WITHSCORES]
Returns the members in the given range, ordered by increasing score. That's the template; ZADD's examples already used it. The open question was that ZRANGE defaults to ascending order — so what if you want it descending? Use its mirror command, ZREVRANGE.

3. ZRANGEBYSCORE
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
Returns all members whose score lies between min and max inclusive, in increasing score order. This is arguably the sorted set's most important method: as said at the start, sorted sets excel at range lookups, and a lookup needs a condition. Two examples:

<1> Find the fruits costing 1-4 yuan; naturally that's grape and apple:

127.0.0.1:6379> zrangebyscore fruits 1 4 withscores
1) "grape"
2) "2.7999999999999998"
3) "apple"
4) "3.5"

<2> Find, within the 1-4 range, the fruit closest to 4 yuan — i.e. apple. How? Reverse the range and take the first row:

127.0.0.1:6379> zrevrangebyscore fruits 4 1 withscores
1) "apple"
2) "3.5"
3) "grape"
4) "2.7999999999999998"
127.0.0.1:6379> zrevrangebyscore fruits 4 1 withscores limit 0 1
1) "apple"
2) "3.5"

4. ZREM
ZREM key member [member ...]
Removes one or more members from the sorted set; missing members are ignored, and an error is returned when key exists but is not a sorted set. Like the other delete commands, ZREM removes the given members — here the apple record with score 3.5:

127.0.0.1:6379> zrem fruits apple
(integer) 1
127.0.0.1:6379> zrange fruits 0 -1 withscores
1) "grape"
2) "2.7999999999999998"
3) "pear"
4) "4.0999999999999996"
5) "banana"
6) "5"
7) "nut"
8) "9.1999999999999993"

No more apple records: deleted.

II: Exploring the internals

With the simple operations demonstrated, what data structure backs the sorted set? As you may have heard, its amortized CRUD cost is O(log N), on par with a balanced binary tree. It is the skip list (SkipList), an efficient structure that only appeared in 1987. Its trick is stepping outside the tree mindset: it builds O(log N) behavior out of multiple layers of linked lists, deciding randomly whether a node gains a level — the same idea a Treap uses to keep its "tree" or "list" balanced. I won't go deeper here or this would become another article; for more, see the Baidu Baike entry: http://baike.baidu.com/link?url=I8F7TW933ZjIeBea_-dW9KeNsfKXMni0IdwNB10N1qnVfrOh_ubzcUpgwNVgRPFw3iCkhewGaYjM_o51xchS8a

The diagram there shows three chains stacked in levels, and in a SkipList the data within each level's chain must be kept sorted. Now:
1. To find node 6 within level1 alone, you traverse one by one: 6 lookups.
2. To find node 6 via level2, you need 4 steps.
3. To find node 6 via level3, 3 steps suffice.

At the macro level the feeling should be clear: the higher the level, the fewer hops a lookup needs. That is the skip list's whole idea — otherwise how would it "skip"? Next, here is how Redis defines it, in redis.h:

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    robj *obj;
    double score;
    struct zskiplistNode *backward;
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

From the source you can see:
<1> zskiplistNode is the skip list's node; if you're paying attention, its level[] array is exactly what holds the level1, level2, level3 chains from the picture above.
<2> level[] entries are zskiplistLevel structs, whose *forward pointer points to the next node within the same level.
<3> The node also carries a robj-typed *obj pointer — the RedisObject holding our value — and a score field, which is the key: the skiplist sorts by it.
<4> The second struct, zskiplist, is a plain wrapper: length records the node count, level the skiplist's current number of levels, and *header/*tail the first and last nodes. That's all it is.
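To make the descent concrete, here is a small sketch distilled from the zsl* search routines in t_zset.c (a simplification, not the verbatim source): start at the top level, run forward while the next node's score is still below the target, then drop one level.

/* Sketch of a skiplist descent: O(log N) expected, because each level
 * roughly halves the number of candidate nodes. */
zskiplistNode *zsl_search_sketch(zskiplist *zsl, double score) {
    zskiplistNode *x = zsl->header;
    for (int i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward && x->level[i].forward->score < score)
            x = x->level[i].forward;     /* run forward within this level */
    }
    return x->level[0].forward;          /* first node with score >= target */
}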
15 Days with Redis — Part 7: How a Colleague's Cache Operation Taught Me About Slow Queries
Last week a colleague building a business module needed to store 80MB of data in the Redis cache. A fine idea, until the actual HSET started timing out. We use the C# StackExchange.Redis driver:

<redisCacheClient allowAdmin="true" ssl="false" connectTimeout="5000" abortConnect="false" database="0">
  <hosts>
    <add host="192.168.202.129" cachePort="6379"/>
  </hosts>
</redisCacheClient>

Since the wrapper code had been downloaded from the internet, the first reflex was to raise "connectTimeout" above to 5000 * 60 = 3 min — which achieved nothing at all. What now? Nothing for it but to read the StackExchange source, and after an hour of carpet-search I found two crime scenes: the values of timeoutMilliseconds and this.multiplexer.RawConfig.ResponseTimeout decide whether an exception is thrown. Following the trail to where the two are assigned, I found syncTimeout.GetValueOrDefault(1000) and breathed again; too lazy to do it properly, I changed the 1000 to 1000*60*5, committed, and had my colleague rerun it. The clouds parted and the data came out — except each read/write now took 3s. The problem looked solved on the surface, but a 3s delay is no good thing: Redis is single-threaded, which means other Redis clients are blocked for those 3 seconds. That was my hunch anyway, though I still harbored a little hope it wasn't so, and decided to run an experiment.

I: Demonstrating the blocking

Plan: one thread inserts 140MB from a txt file into a Redis hash, while another thread reads a string value from Redis once per second, recording the time of each read; if the read interval stretches, blocking occurred. Said and done:

System.Threading.Tasks.Task.Factory.StartNew(() =>
{
    try
    {
        var content = File.ReadAllText(@"D:\20151120-13\20151120-13.log", Encoding.Default);

        Console.WriteLine("main thread: txt loaded, inserting into redis {0}", DateTime.Now);
        var result = CacheUtil.CacheClient.HashSet("myredis", "mykey", content);

        Console.WriteLine("main thread: redis insert done: {0} , {1}", result, DateTime.Now);

        var r = CacheUtil.CacheClient.HashGet<string>("myredis", "mykey");

        Console.WriteLine("main thread: redis read done, total length {0}, {1}", r.Length, DateTime.Now);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
});

System.Threading.Tasks.Task.Factory.StartNew(() =>
{
    try
    {
        var result = CacheUtil.CacheClient.Add<string>("myfruits", "asdfasdfasdfasdfasd");

        for (int i = 0; i < 10; i++)
        {
            var content = CacheUtil.CacheClient.Get<string>("myfruits");

            Console.WriteLine("read #{0}: {1} ,{2}", i, content, DateTime.Now);

            Thread.Sleep(1000);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
});

The string-reading thread was blocked by the HSET for over 6 seconds — frightening. This is a textbook slow query, and its slowness really did block other clients. I took the findings to my colleague. Asked why he stored such a blob, he explained it was needed to avoid massive recomputation, so the only option was optimizing around the assumed 80MB. My second idea was to split: 80MB becomes 8 chunks of 10MB, which both shortens each HSET and does its best not to block other clients. But he didn't want to change code and asked whether there was another way, so I offered the plan of last resort: isolate the cache workload. Since he only stores big values, give him a dedicated Redis for that cache; a few seconds there is probably tolerable for his business, and it cannot take down my main Redis.

II: The slow query log

Ever since that incident I've wanted to record the "slow operations" against the main Redis and track down the responsible business owners — otherwise the blocking game never ends. The manual has exactly the command:

SLOWLOG subcommand [argument]

What is SLOWLOG?
The slow log is Redis's system for recording query execution time, where execution time means only the time spent actually executing the command — IO such as talking to the client or sending the reply is excluded. The slow log is kept in memory, so reads and writes are very fast; you can use it with confidence, without fearing that enabling it will hurt Redis's speed.

Configuring SLOWLOG
The slow log's behavior is set by two configuration parameters, which can be changed either by editing redis.conf or dynamically with the CONFIG GET and CONFIG SET commands.
The first option is slowlog-log-slower-than: queries taking longer than this many microseconds (1 second = 1,000,000 microseconds) are recorded. For example, to record every query taking at least 100 microseconds:
CONFIG SET slowlog-log-slower-than 100
and to record queries taking more than 1000 microseconds:
CONFIG SET slowlog-log-slower-than 1000
The other option is slowlog-max-len: how many entries the slow log keeps at most. The slow log is a FIFO queue; when it exceeds slowlog-max-len, the oldest entry is removed as the newest is appended. To keep at most 1000 entries:
CONFIG SET slowlog-max-len 1000

For testing convenience I skipped CONFIG SET and edited redis.conf directly:

# The following time is expressed in microseconds, so 1000000 is equivalent
# to one second. Note that a negative number disables the slow log, while
# a value of zero forces the logging of every command.
slowlog-log-slower-than 0

# There is no limit to this length.
# Just be aware that it will consume memory.
# You can reclaim memory used by the slow log with SLOWLOG RESET.
slowlog-max-len 10

A quick test shows every command now lands in the slowlog, each with its execution time recorded. With this in place I can find every slow command on the production line, and the finger-pointing stops. Finally, a glance at the source shows the slowlog is implemented with a List — and as we know, Redis lists are doubly linked lists.
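For reference, the push-and-trim logic is short; this is the function from slowlog.c, lightly reformatted:

/* Called after every command executes, with its measured duration:
 * push an entry if slow enough, then trim the FIFO list to the max. */
void slowlogPushEntryIfNeeded(robj **argv, int argc, long long duration) {
    if (server.slowlog_log_slower_than < 0) return; /* Slowlog disabled */
    if (duration >= server.slowlog_log_slower_than)
        listAddNodeHead(server.slowlog,
                        slowlogCreateEntry(argv, argc, duration));

    /* Remove old entries if needed. */
    while (listLength(server.slowlog) > server.slowlog_max_len)
        listDelNode(server.slowlog, listLast(server.slowlog));
}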
15 Days with Redis — Part 8: Transactions You Can't Avoid
We know Redis pursues simplicity, speed, and efficiency — to the point of refusing to support the Windows platform. From learning SQL Server we know that transactions are fairly complex machinery; transplanting all of that into Redis would obviously ruin its simplicity and purity. Yet transactions are a scenario no program can escape, so the Redis author compromised and built a simplified transaction mechanism. Let's poke at it.

I: Transactions hands-on

What a transaction is and what it must guarantee needs no introduction; never mind the theory, let's consult the manual and see the magic.

1. MULTI, EXEC
Remember how SQL Server does it? Three steps: begin a transaction, issue commands, execute the transaction. Redis mirrors that: MULTI starts the transaction, then you enter Redis commands, and finally EXEC runs them. After each queued SET the reply is QUEUED; only when I run EXEC do the commands truly execute. That simple, with no fuss. Some will ask: transactions usually have a ROLLBACK, but I don't see one in Redis? Correct, and regrettably, Redis has no ROLLBACK. For example, if I deliberately run LPUSH against a string inside a transaction, it naturally fails — and the result is two OKs and one Error. "That violates transaction atomicity!", you say. The retort: you wrote a broken command — Redis is just a data structure server, and a plainly malformed command (say, mistyping lpush as lpush1) is rejected immediately at queue time anyway.

2. WATCH
After watching those three SETs after MULTI, you may feel uneasy. As long as the commands are correct, Redis promises to execute them together, come what may — but who guarantees that, while I'm in the middle of this, another client hasn't modified those values? If they were modified, what is my EXEC worth? No worries: for such a common need Redis won't stand idly by. WATCH lends a hand:

WATCH key [key ...]
Watches one (or more) keys; if any watched key is modified by another command before the transaction executes, the transaction is aborted.

So the usage is simple: before MULTI, WATCH the keys you intend to modify. If a key is modified by another client between MULTI and EXEC, the EXEC fails and returns (nil). That's all there is to it.

II: Exploring the internals

Most of the transaction source is in multi.c; let's dissect it piece by piece.

1. multi
In the Redis source it goes roughly like this:

void multiCommand(redisClient *c) {
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    c->flags |= REDIS_MULTI;
    addReply(c,shared.ok);
}

You can see MULTI simply switches on the redisClient's REDIS_MULTI flag, telling this Redis client it has entered transaction mode.

2. Queuing the commands
redisClient contains a multiState field:

typedef struct redisClient {
    ...
    multiState mstate;      /* MULTI/EXEC state */
    ...
} redisClient;

The comment makes the MULTI/EXEC connection clear; curious, we look at multiState's definition:

typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

The *commands field, as its comment says, points at an array — one you can picture with your eyes closed: the queued commands. count is their actual total.

3. watch
To smooth the way to EXEC, here is roughly how WATCH is implemented in multi.c:

typedef struct watchedKey {
    robj *key;
    redisDb *db;
} watchedKey;

void watchCommand(redisClient *c) {
    int j;

    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}

/* Watch for the specified key */
void watchForKey(redisClient *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    /* Add the new key to the list of keys watched by this client */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

The core of this code is:

clients = dictFetchValue(c->db->watched_keys,key);

The dictFetchValue dictionary method looks the given key up in watched_keys and gets back its value — a linked list of clients, i.e. all the clients watching this key. Finally the key is also appended to the client's own watched_keys list:

wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);

Drawn as a picture: watched_keys is a dictionary whose keys are key1, key2, ... and whose values are linked lists of clients, so it is always perfectly clear which clients are watching which key.

4. exec
This command does roughly two things:
<1> It checks whether the client's REDIS_DIRTY_EXEC flag is set; if so, it cancels the transaction with discardTransaction(c) — meaning a watched key was already modified by another client.
<2> If nothing was modified, it loops over the commands[] array and executes each command in turn.
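Pieced together from multi.c, the EXEC path looks roughly like this (a simplified sketch, not the verbatim source):

/* Sketch of execCommand(): abort if WATCH saw a modification
 * (REDIS_DIRTY_CAS) or a command failed to queue (REDIS_DIRTY_EXEC);
 * otherwise replay every queued command in order. */
if (c->flags & (REDIS_DIRTY_CAS | REDIS_DIRTY_EXEC)) {
    addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr
                                            : shared.nullmultibulk);
    discardTransaction(c);
} else {
    unwatchAllKeys(c);        /* the transaction runs; watches are done */
    for (int j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd  = c->mstate.commands[j].cmd;
        call(c, REDIS_CALL_FULL);
    }
}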
15 Days with Redis — Part 9: Publish/Subscribe
The series is past its halfway point. This part looks at Redis's fun publish/subscribe mode, a pattern found in many MQ products. The usual example is email subscription: 100 people subscribe to your blog; when the blogger publishes a post, all 100 simultaneously receive a notification email. Are there other scenarios? Of course: if you build an in-memory read/write-splitting program, then to keep the data complete, every write must also be distributed to each of the reading processes. Plenty of scenarios, waiting to be dug up. On to the basic commands.

I: The commands

From the Redis manual you can see that "publish/subscribe" is a mere 6 commands; let me explain them one by one.

1. SUBSCRIBE
SUBSCRIBE channel [channel ...]
Subscribes to the given channel(s). Per the official description it works a bit like listening to the radio: to hear a program you must tune to the right frequency — only on the correct channel do you get the good shows. So SUBSCRIBE first subscribes to a channel; for instance, open two clients and have both subscribe to the channel msg.

2. PUBLISH
Both subscribers are now watching the msg channel, so anything published on msg will certainly reach them. First the manual:
PUBLISH channel message
Sends message to the specified channel.
After PUBLISH sends a message on msg, both SUBSCRIBE clients pick it up and print it. That's the most basic publish/subscribe flow — really that simple. But sometimes there's another requirement: can I fuzzy-match channel names, say subscribe to every channel prefixed with china? If that worked it would be impressive — and of course the mighty Redis delivers, with PSUBSCRIBE.

3. PSUBSCRIBE
PSUBSCRIBE pattern [pattern ...]
Subscribes to one or more channels matching the given patterns. Each pattern uses * as a wildcard: it* matches every channel starting with it (it.news, it.blog, it.tweets, and so on), news.* matches every channel starting with news. (news.it, news.global.today, etc.).
Seeing that, you realize it's just pattern matching — the "P" prefix stands for Pattern. So I subscribe to all channels prefixed with china, done.

Those are the three most-used commands; next, a simple look at the code.

II: A quick look at the source

Redis's publish/subscribe state lives in two fields of RedisServer — the pubsub_channels dictionary and the pubsub_patterns list — and all the operation code is in the pubsub.c file.

1. pubsub_channels
As its comment in the source explains, it is a dictionary whose key is a channel and whose value is a list.

2. pubsub_patterns
Likewise per the comment, it is simply the list of clients that subscribed by pattern, kept in a list.

3. subscribeCommand
By now the outline should be vivid: pubsub_channels really is stored as key=channel, value=list, where the list is the clients subscribed to that channel. So you know exactly which clients hang off which key, and a PUBLISH only needs to walk that list.

4. publishCommand
As just said, the principle is simple: find the channel key in the dictionary, get the clients, and iterate over them to deliver the message (see the sketch below).

pubsub_patterns works much the same way; a quick read of the pubsub.c source file will make nearly all of it clear. Hope this part helps.
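As a reference for point 4, here is a lightly simplified sketch of the channel half of pubsubPublishMessage() from pubsub.c (the pattern list is walked the same way afterwards):

/* Sketch: find the channel's subscriber list in server.pubsub_channels
 * and push the message to every client on it. */
int publish_sketch(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de = dictFind(server.pubsub_channels, channel);
    if (de) {
        list *clients = dictGetVal(de);
        listIter li;
        listNode *ln;
        listRewind(clients, &li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;
            addReply(c, shared.mbulkhdr[3]);   /* "message" + channel + payload */
            addReply(c, shared.messagebulk);
            addReplyBulk(c, channel);
            addReplyBulk(c, message);
            receivers++;
        }
    }
    return receivers;
}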
15 Days with Redis — Part 10: A Deep Look at Snapshotting
We know Redis has persistence built in — but persisted to where, and in what form? Let's find the answer together.

I: Snapshot mode

You may have heard, back when you first used Redis, that it has two persistence modes: SNAPSHOTTING and AOF, with SNAPSHOTTING the one used most in real deployments — no argument there. You may also know that snapshot mode requires configuration in redis.conf, for example:

# Save the DB on disk:
#
#   save <seconds> <changes>
#
#   Will save the DB if both the given number of seconds and the given
#   number of write operations against the DB occurred.
#
#   In the example below the behaviour will be to save:
#   after 900 sec (15 min) if at least 1 key changed
#   after 300 sec (5 min) if at least 10 keys changed
#   after 60 sec if at least 10000 keys changed
#
#   Note: you can disable saving completely by commenting out all "save" lines.
#
#   It is also possible to remove all the previously configured save
#   points by adding a save directive with a single empty string argument
#   like in the following example:
#
#   save ""

save 900 1
save 300 10
save 60 10000

These three lines are easy to read: 900 is a number of seconds, 1 a number of changes. If there is at least 1 change within 900s, a save runs; likewise for 10 changes within 300s, or 10000 changes within 60s. Having said that much, two or three questions are worth clearing up:

1. That is Redis's own periodic saving; can a save be triggered manually?
Certainly. Redis provides two commands, SAVE and BGSAVE, both of which force the data to be flushed to disk.

2. BGSAVE appears to run in the background; does it?
It does: BGSAVE flushes the data without blocking the server. The code is in the rdb.c source file, and the thing to notice is the fork call — the fork those gurus keep talking about, finally seen with your own eyes. (Strictly, fork creates a child process rather than a thread.) So Redis is not a purely single-threaded service: fork shows that in some scenarios it does spin off background work, and the forked child performs the bgsave synchronously. That simple.

3. Can you sketch the saveparams logic in the Redis source?
Yes. Redis has a periodic function called serverCron which runs regularly and does roughly seven kinds of work, as the Redis comment says:

/* This is our timer interrupt, called server.hz times per second.
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... }
 */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

The "Triggering BGSAVE" item is the save work we care about. Tracing the logic, you can find:

<1> The saveparams field hangs off the server object, which is of type redisServer. From the comment on *saveparams you can see it is an array of saveparam structs, and each saveparam holds two fields: seconds, the save interval in seconds, and changes, the change count — exactly matching configuration lines like "save 900 1". Remember those?
<2> An if-check over these parameters ultimately calls rdbSaveBackground to persist our rdb file (see the sketch below). Simple.
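Pulling those pieces together, the save-point check inside serverCron looks roughly like this (a simplified sketch; the real code checks a few more conditions, such as the status of the last background save):

/* The structure behind each "save <seconds> <changes>" line. */
struct saveparam {
    time_t seconds;
    int changes;
};

/* Sketch of the check in serverCron(): walk the save points parsed from
 * redis.conf and trigger a background save when one of them matches. */
for (int j = 0; j < server.saveparamslen; j++) {
    struct saveparam *sp = server.saveparams + j;
    if (server.dirty >= sp->changes &&
        server.unixtime - server.lastsave > sp->seconds) {
        rdbSaveBackground(server.rdb_filename);
        break;
    }
}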
15 Days with Redis — Part 11: Understanding the RDB Storage Format
Continuing from the previous part, let's analyze the structure of the RDB file. We know the RDB file is only produced in Redis's "snapshot" mode, so understanding its layout should give us real confidence about what snapshotting actually writes.

I: Dissecting the RDB structure

First, a conceptual picture: at the top level an RDB file consists of REDIS | db_version | databases | EOF | checksum. To check this against reality, I saved an empty database and opened dump.rdb in winHex to look at the hex. Comparing field by field:

1. The REDIS magic: the first 5 bytes of the hex dump are the five capital letters "REDIS". The purpose is plain — deciding in constant time whether the current file is an RDB file at all.
2. db_version: after the REDIS characters come 4 bytes, "0006" — the db_version from the structure diagram. Equally simple: it identifies the RDB version.
3. databases: since this is an empty database, there is no corresponding databases section yet; we'll insert a record shortly and compare again.
4. EOF: it occupies one byte, shown in winHex as a "y" with two dots — mojibake, because the byte has no unicode rendering. In hex it is of course the famous FF, and its role is to mark the end of the databases section.
5. checksum: as the name says, a checksum, 8 bytes; the principle is detecting whether the file is corrupted or has been tampered with, a bit like today's OAuth-style verification. Here it is the final bytes: DC B3 43 F0 5A DC F2 56.

II: An RDB file with data in it

Having walked every field except databases, let's SET the simplest possible string and check whether the databases section matches the diagram. Opening dump.rdb in winHex again, I marked where the databases section begins: the hex byte FE.

1. databases[selectDB]: selectDB is a one-byte marker with no unicode rendering, FE in hex. When Redis meets this byte it knows what to do — get ready to execute a SELECT.
2. databases[db_number]: after FE comes hex "03" — switch to database number 3. Remember? When setting the data I ran select 3 first, putting the data into database 3; if you've forgotten, open a Redis client and check.
3. databases[pairs][type]: once it knows which database to select, the next job is parsing the key/value data. The first byte of a pair is type — the encoding type of your value. winHex shows 0 here, i.e. the string type constant from the source.
4. databases[pairs][key][len]: after type comes the key, stored in the [len][value] shape, where len is the key's length. winHex shows "04", because the key "name" has length 4.
5. databases[pairs][key][value]: the value follows the same [len][value] pattern — the value's length first, then its concrete bytes.

That's about it; hope it helps.
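Assembled from the bytes called out above, the annotated layout of the non-empty dump reads roughly as follows (the value bytes and the checksum are left as placeholders, since they differ per file):

52 45 44 49 53               "REDIS"          magic, 5 bytes
30 30 30 36                  "0006"           db_version, 4 bytes
FE                                            selectDB opcode
03                                            db_number: select 3
00                                            pair type: string
04 6E 61 6D 65               len=4, "name"    key as [len][bytes]
.. ..                        len, bytes       value in the same [len][bytes] form
FF                                            EOF opcode
.. .. .. .. .. .. .. ..                       8-byte checksum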
Monitor your Redis with the powerful visualization tool RedisLive, before it dies an ugly death
As a Windows-dwelling programmer, installing things on CentOS can drive you to despair. As you know, on CentOS you mostly download source code and compile it yourself, while yum — a package feed a bit like nuget — is full of ancient versions. Sometimes upgrades are required too: when installing Redmon, another Redis monitoring UI, it turned out to be written in Ruby, requiring Ruby 1.9.3+ installed with the rvm toolkit, and with China's network environment the downloads get walled off unless you use the taobao mirror. But I digress; this post installs RedisLive.

I: Installation

Start at the official page: http://www.nkrode.com/article/real-time-dashboard-for-redis. RedisLive is written in Python, and happily CentOS — here CentOS 7 — ships with a Python environment by default.

1. Install pip
If you've studied Python you know pip is the tool for installing and managing Python packages. Visit https://pypi.python.org/pypi/pip, wget the link, extract it by hand, cd into the pip-8.1.2 root directory, and run (with root privileges):

python setup.py install

II: The tornado, redis.py, and python-dateutil dependencies

The RedisLive site lists these three as the project's dependencies; they must be installed before the project can start.
1. tornado — install it the way the site suggests: pip install tornado
2. redis.py — the same package manager again: pip install redis
3. python-dateutil — from the name, some util-style helper package: pip install python-dateutil
(If python-dateutil is already present in Python, pip only needs to update it; fine, update it is.)

At this point, happily, every RedisLive dependency is installed.

III: Download the RedisLive source

The official page offers git or a direct download; here I chose wget, then extracted it by hand. Inside, three things are particularly important:

1. redis-live.conf
This is RedisLive's configuration file, with roughly four important nodes:
<1> RedisServers: the Redis endpoints to monitor; the [] shows you can feed in a whole batch.
<2> DataStoreType: RedisLive is a web site, and as a site it needs to keep its own data and statistics, so you get two choices: redis or sqlite. Here I chose redis.
<3> RedisStatsServer: when DataStoreType is "redis", the Redis server address where the stats are stored.
<4> SqliteStatsStore: equally simple — when DataStoreType is "sqlite", the sqlite database location.

The finished conf file (for convenience the monitored server and the stats server here are the same instance; strongly recommended not to do this):

{
    "RedisServers": [
        {
            "server": "127.0.0.1",
            "port": 6379
        }
    ],
    "DataStoreType": "redis",
    "RedisStatsServer": {
        "server": "127.0.0.1",
        "port": 6379
    },
    "SqliteStatsStore": {
        "path": "/root/RedisLive/src/db/redislive.sqlite"
    }
}

2. redis-monitor.py
This program calls Redis's MONITOR command to collect the commands for statistics, so it must keep running continuously rather than the 120s shown on the site; here I set --duration=12000000.

3. redis-live.py
Finally, the last step: start the web site. (Opening this thing really is a slog.) With that, the site on port 8888 opens and a rather beautiful dashboard appears before you. Enjoy.