当前位置:首页 > 公众号精选 > 架构师社区
[导读]Redis可以看成NoSQL类型的数据库系统,Redis也提供了事务,但和传统的关系型数据库的事务既有相似性,也存在区别,因为Redis的架构基于操作系统的多路复用的IO接口,主处理流程是一个单线程,因此对于一个完整的命令,其处理都是原子性的,但如果需要将多个命令作为一个不可分割的处理序列,就需要使用事务。


作者:azurelaker

原文:blog.csdn.net/azurelaker/article/details/85045220


Redis可以看成NoSQL类型的数据库系统, Redis也提供了事务, 但是和传统的关系型数据库的事务既有相似性, 也存在区别.因为Redis的架构基于操作系统的多路复用的IO接口,主处理流程是一个单线程,因此对于一个完整的命令, 其处理都是原子性的, 但是如果需要将多个命令作为一个不可分割的处理序列, 就需要使用事务.


Redis事务有如下一些特点:
  • 事务中的命令序列执行的时候是原子性的,也就是说,其不会被其他客户端的命令中断. 这和传统的数据库的事务的属性是类似的.
  • 尽管Redis事务中的命令序列是原子执行的, 但是事务中的命令序列执行可以部分成功,这种情况下,Redis事务不会执行回滚操作. 这和传统关系型数据库的事务是有区别的.
  • 尽管Redis有RDB和AOF两种数据持久化机制, 但是其设计目标是高效率的cache系统. Redis事务只保证将其命令序列中的操作结果提交到内存中,不保证持久化到磁盘文件. 更进一步的, Redis事务和RDB持久化机制没有任何关系, 因为RDB机制是对内存数据结构的全量的快照.由于AOF机制是一种增量持久化,所以事务中的命令序列会提交到AOF的缓存中.但是AOF机制将其缓存写入磁盘文件是由其配置的实现策略决定的,和Redis事务没有关系.


Redis事务API
从宏观上来讲, Redis事务开始后, 会缓存后续的操作命令及其操作数据,当事务提交时,原子性的执行缓存的命令序列.从版本 2.2开始,Redis提供了一种乐观的锁机制, 配合这种机制,Redis事务提交时, 变成了事务的条件执行. 具体的说,如果乐观锁失败了,事务提交时, 丢弃事务中的命令序列,如果乐观锁成功了, 事务提交时,才会执行其命令序列.当然,也可以不使用乐观锁机制, 在事务提交时, 无条件执行事务的命令序列.
Redis事务涉及到 MULTI, EXEC, DISCARD, WATCH UNWATCH这五个命令:
  • 事务开始的命令是MULTI, 该命令返回OK提示信息. Redis不支持事务嵌套,执行多次MULTI命令和执行一次是相同的效果.嵌套执行MULTI命令时,Redis只是返回错误提示信息.
  • EXEC是事务的提交命令,事务中的命令序列将被执行(或者不被执行,比如乐观锁失败等).该命令将返回响应数组,其内容对应事务中的命令执行结果.
  • WATCH命令是开始执行乐观锁,该命令的参数是key(可以有多个), Redis将执行WATCH命令的客户端对象和key进行关联,如果其他客户端修改了这些key,则执行WATCH命令的客户端将被设置乐观锁失败的标志.该命令必须在事务开始前执行,即在执行MULTI命令前执行WATCH命令,否则执行无效,并返回错误提示信息.
  • UNWATCH命令将取消当前客户端对象的乐观锁key,该客户端对象的事务提交将变成无条件执行.
  • DISCARD命令将结束事务,并且会丢弃全部的命令序列.
需要注意的是,EXEC命令和DISCARD命令结束事务时,会调用UNWATCH命令,取消该客户端对象上所有的乐观锁key.


无条件提交
如果不使用乐观锁, 则事务为无条件提交.下面是一个事务执行的例子:

multi+OKincr key1+QUEUEDset key2 val2+QUEUEDexec*2:1+OK
当客户端开始事务后, 后续发送的命令将被Redis缓存起来,Redis向客户端返回响应提示字符串QUEUED.当执行EXEC提交事务时,缓存的命令依次被执行,返回命令序列的执行结果.
事务的错误处理
事务提交命令EXEC有可能会失败, 有三种类型的失败场景:
  • 在事务提交之前,客户端执行的命令缓存失败.比如命令的语法错误(命令参数个数错误, 不支持的命令等等).如果发生这种类型的错误,Redis将向客户端返回包含错误提示信息的响应.
  • 事务提交时,之前缓存的命令有可能执行失败.
  • 由于乐观锁失败,事务提交时,将丢弃之前缓存的所有命令序列.

当发生第一种失败的情况下,客户端在执行事务提交命令EXEC时,将丢弃事务中所有的命令序列.下面是一个例子:

		
multi+OKincr num1 num2-ERR wrong number of arguments for 'incr' commandset key1 val1+QUEUEDexec-EXECABORT Transaction discarded because of previous errors. 

命令incr num1 num2并没有缓存成功, 因为incr命令只允许有一个参数,是个语法错误的命令.Redis无法成功缓存该命令,向客户端发送错误提示响应.接下来的set key1 val1命令缓存成功.最后执行事务提交的时候,因为发生过命令缓存失败,所以事务中的所有命令序列被丢弃.

如果事务中的所有命令序列都缓存成功,在提交事务的时候,缓存的命令中仍可能执行失败.但Redis不会对事务做任何回滚补救操作.下面是一个这样的例子:

multi+OKset key1 val1+QUEUEDlpop key1+QUEUEDincr num1+QUEUEDexec*3+OK-WRONGTYPE Operation against a key holding the wrong kind of value:1
		


所有的命令序列都缓存成功,但是在提交事务的时候,命令set key1 val1和incr num1执行成功了,Redis保存了其执行结果,但是命令lpop key1执行失败了.

乐观锁机制
Redis事务和乐观锁一起使用时,事务将成为有条件提交.

关于乐观锁,需要注意的是:

  • WATCH命令必须在MULTI命令之前执行. WATCH命令可以执行多次.

  • WATCH命令可以指定乐观锁的多个key,如果在事务过程中,任何一个key被其他客户端改变,则当前客户端的乐观锁失败,事务提交时,将丢弃所有命令序列.

  • 多个客户端的WATCH命令可以指定相同的key.

  • WATCH命令指定乐观锁后,可以接着执行MULTI命令进入事务上下文,也可以在WATCH命令和MULTI命令之间执行其他命令. 具体使用方式取决于场景需求,不在事务中的命令将立即被执行.

  • 如果WATCH命令指定的乐观锁的key,被当前客户端改变,在事务提交时,乐观锁不会失败.

  • 如果WATCH命令指定的乐观锁的key具有超时属性,并且该key在WATCH命令执行后, 在事务提交命令EXEC执行前超时, 则乐观锁不会失败.如果该key被其他客户端对象修改,则乐观锁失败.


一个执行乐观锁机制的事务例子:

rpush list v1 v2 v3:3watch list+OKmulti+OKlpop list+QUEUEDexec*1$2v1
		


下面是另一个例子,乐观锁被当前客户端改变, 事务提交成功:

watch num+OKmulti+OKincr num+QUEUEDexec*1:2


Redis事务和乐观锁配合使用时, 可以构造实现单个Redis命令不能完成的更复杂的逻辑.

Redis事务的源码实现机制

首先,事务开始的MULTI命令执行的函数为multiCommand, 其实现为(multi.c):

void multiCommand(redisClient *c) { if (c->flags & REDIS_MULTI) { addReplyError(c,"MULTI calls can not be nested"); return; } c->flags |= REDIS_MULTI; addReply(c,shared.ok);}
		


该命令只是在当前客户端对象上加上REDIS_MULTI标志, 表示该客户端进入了事务上下文.

客户端进入事务上下文后,后续执行的命令将被缓存. 函数processCommand是Redis处理客户端命令的入口函数, 其实现为(redis.c):



		
int processCommand(redisClient *c) { /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in * a regular command proc. */ if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= REDIS_CLOSE_AFTER_REPLY; return REDIS_ERR; }  /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { flagTransaction(c); addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr); return REDIS_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { flagTransaction(c); addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return REDIS_OK; }  /* Check if the user is authenticated */ if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) { flagTransaction(c); addReply(c,shared.noautherr); return REDIS_OK; }  /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile * keys in the dataset). If there are not the only thing we can do * is returning an error. */ if (server.maxmemory) { int retval = freeMemoryIfNeeded(); /* freeMemoryIfNeeded may flush slave output buffers. This may result * into a slave, that may be the active client, to be freed. */ if (server.current_client == NULL) return REDIS_ERR;  /* It was impossible to free enough memory, and the command the client * is trying to execute is denied during OOM conditions? Error. */ if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) { flagTransaction(c); addReply(c, shared.oomerr); return REDIS_OK; } }  /* Don't accept write commands if there are problems persisting on disk * and if this is a master instance. */ if (((server.stop_writes_on_bgsave_err && server.saveparamslen > 0 && server.lastbgsave_status == REDIS_ERR) || server.aof_last_write_status == REDIS_ERR) && server.masterhost == NULL && (c->cmd->flags & REDIS_CMD_WRITE || c->cmd->proc == pingCommand)) { flagTransaction(c); if (server.aof_last_write_status == REDIS_OK) addReply(c, shared.bgsaveerr); else addReplySds(c, sdscatprintf(sdsempty(), "-MISCONF Errors writing to the AOF file: %s\r\n", strerror(server.aof_last_write_errno))); return REDIS_OK; }  /* Don't accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ if (server.masterhost == NULL && server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && c->cmd->flags & REDIS_CMD_WRITE && server.repl_good_slaves_count < server.repl_min_slaves_to_write) { flagTransaction(c); addReply(c, shared.noreplicaserr); return REDIS_OK; }  /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ if (server.masterhost && server.repl_slave_ro && !(c->flags & REDIS_MASTER) && c->cmd->flags & REDIS_CMD_WRITE) { addReply(c, shared.roslaveerr); return REDIS_OK; }  /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */ if (c->flags & REDIS_PUBSUB && c->cmd->proc != pingCommand && c->cmd->proc != subscribeCommand && c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context"); return REDIS_OK; }  /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and * we are a slave with a broken link with master. */ if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED && server.repl_serve_stale_data == 0 && !(c->cmd->flags & REDIS_CMD_STALE)) { flagTransaction(c); addReply(c, shared.masterdownerr); return REDIS_OK; }  /* Loading DB? Return an error if the command has not the * REDIS_CMD_LOADING flag. */ if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) { addReply(c, shared.loadingerr); return REDIS_OK; }  /* Lua script too slow? Only allow a limited number of commands. */ if (server.lua_timedout && c->cmd->proc != authCommand && c->cmd->proc != replconfCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'n') && !(c->cmd->proc == scriptCommand && c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) { flagTransaction(c); addReply(c, shared.slowscripterr); return REDIS_OK; }  /* Exec the command */ if (c->flags & REDIS_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { queueMultiCommand(c); addReply(c,shared.queued); } else { call(c,REDIS_CALL_FULL); if (listLength(server.ready_keys)) handleClientsBlockedOnLists(); } return REDIS_OK;}
		


Line145:151当客户端处于事务上下文时, 如果接收的是非事务命令(MULTI, EXEC, WATCH, DISCARD), 则调用queueMultiCommand将命令缓存起来,然后向客户端发送成功响应.

在函数processCommand中, 在缓存命令之前, 如果检查到客户端发送的命令不存在,或者命令参数个数不正确等情况, 会调用函数flagTransaction标命令缓存失败.也就是说,函数processCommand中, 所有调用函数flagTransaction的条件分支,都是返回失败响应.

缓存命令的函数queueMultiCommand的实现为(multi.c):

/* Add a new command into the MULTI commands queue */void queueMultiCommand(redisClient *c) { multiCmd *mc; int j;  c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.count+1)); mc = c->mstate.commands+c->mstate.count; mc->cmd = c->cmd; mc->argc = c->argc; mc->argv = zmalloc(sizeof(robj*)*c->argc); memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); for (j = 0; j < c->argc; j++) incrRefCount(mc->argv[j]); c->mstate.count++;}
		


在事务上下文中, 使用multiCmd结构来缓存命令, 该结构定义为(redis.h):


/* Client MULTI/EXEC state */typedef struct multiCmd { robj **argv; int argc; struct redisCommand *cmd;} multiCmd;
		


其中argv字段指向命令的参数内存地址,argc为命令参数个数,cmd为命令描述结构, 包括名字和函数指针等.

命令参数的内存空间已经使用动态分配记录于客户端对象的argv字段了,multiCmd结构的argv字段指向客户端对象redisClient的argv即可.

无法缓存命令时, 调用函数flagTransaction,该函数的实现为(multi.c):


/* Flag the transacation as DIRTY_EXEC so that EXEC will fail. * Should be called every time there is an error while queueing a command. */void flagTransaction(redisClient *c) { if (c->flags & REDIS_MULTI) c->flags |= REDIS_DIRTY_EXEC;}
		


该函数在客户端对象中设置REDIS_DIRTY_EXEC标志, 如果设置了这个标志, 事务提交时, 命令序列将被丢弃.

最后,在事务提交时, 函数processCommand中将调用call(c,REDIS_CALL_FULL);, 其实现为(redis.c):


/* Call() is the core of Redis execution of a command */void call(redisClient *c, int flags) { long long dirty, start, duration; int client_old_flags = c->flags;  /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ if (listLength(server.monitors) && !server.loading && !(c->cmd->flags & (REDIS_CMD_SKIP_MONITOR|REDIS_CMD_ADMIN))) { replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); }  /* Call the command. */ c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); redisOpArrayInit(&server.also_propagate); dirty = server.dirty; start = ustime(); c->cmd->proc(c); duration = ustime()-start; dirty = server.dirty-dirty; if (dirty < 0) dirty = 0;  /* When EVAL is called loading the AOF we don't want commands called * from Lua to go into the slowlog or to populate statistics. */ if (server.loading && c->flags & REDIS_LUA_CLIENT) flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);  /* If the caller is Lua, we want to force the EVAL caller to propagate * the script if the command flag or client flag are forcing the * propagation. */ if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) { if (c->flags & REDIS_FORCE_REPL) server.lua_caller->flags |= REDIS_FORCE_REPL; if (c->flags & REDIS_FORCE_AOF) server.lua_caller->flags |= REDIS_FORCE_AOF; }  /* Log the command into the Slow log if needed, and populate the * per-command statistics that we show in INFO commandstats. */ if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand) { char *latency_event = (c->cmd->flags & REDIS_CMD_FAST) ? "fast-command" : "command"; latencyAddSampleIfNeeded(latency_event,duration/1000); slowlogPushEntryIfNeeded(c->argv,c->argc,duration); } if (flags & REDIS_CALL_STATS) { c->cmd->microseconds += duration; c->cmd->calls++; }  /* Propagate the command into the AOF and replication link */ if (flags & REDIS_CALL_PROPAGATE) { int flags = REDIS_PROPAGATE_NONE;  if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL; if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF; if (dirty) flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF); if (flags != REDIS_PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,flags); }  /* Restore the old FORCE_AOF/REPL flags, since call can be executed * recursively. */ c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL); c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);  /* Handle the alsoPropagate() API to handle commands that want to propagate * multiple separated commands. */ if (server.also_propagate.numops) { int j; redisOp *rop;  for (j = 0; j < server.also_propagate.numops; j++) { rop = &server.also_propagate.ops[j]; propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target); } redisOpArrayFree(&server.also_propagate); } server.stat_numcommands++;}

在函数call中通过执行c->cmd->proc(c);调用具体的命令函数.事务提交命令EXEC对应的执行函数为execCommand, 其实现为(multi.c):


void execCommand(redisClient *c) { int j; robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */  if (!(c->flags & REDIS_MULTI)) { addReplyError(c,"EXEC without MULTI"); return; }  /* Check if we need to abort the EXEC because: * 1) Some WATCHed key was touched. * 2) There was a previous error while queueing commands. * A failed EXEC in the first case returns a multi bulk nil object * (technically it is not an error but a special behavior), while * in the second an EXECABORT error is returned. */ if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) { addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr : shared.nullmultibulk); discardTransaction(c); goto handle_monitor; }  /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; orig_cmd = c->cmd; addReplyMultiBulkLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; c->argv = c->mstate.commands[j].argv; c->cmd = c->mstate.commands[j].cmd;  /* Propagate a MULTI request once we encounter the first write op. * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) { execCommandPropagateMulti(c); must_propagate = 1; }  call(c,REDIS_CALL_FULL);  /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].cmd = c->cmd; } c->argv = orig_argv; c->argc = orig_argc; c->cmd = orig_cmd; discardTransaction(c); /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ if (must_propagate) server.dirty++; handle_monitor: /* Send EXEC to clients waiting data from MONITOR. We do it here * since the natural order of commands execution is actually: * MUTLI, EXEC, ... commands inside transaction ... * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command * table, and we do it here with correct ordering. */ if (listLength(server.monitors) && !server.loading) replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);}
		



LINE8:11检查EXEC命令和MULTI命令是否配对使用, 单独执行EXEC命令是没有意义的.

LINE19:24检查客户端对象是否具有REDIS_DIRTY_CAS或者REDIS_DIRTY_EXEC标志, 如果存在,则调用函数discardTransaction丢弃命令序列, 向客户端返回失败响应.

如果没有检查到任何错误,则先执行unwatchAllKeys(c);取消该客户端上所有的乐观锁key.

LINE32:52依次执行缓存的命令序列,这里有两点需要注意的是:


  • 事务可能需要同步到AOF缓存或者replica备份节点中.如果事务中的命令序列都是读操作, 则没有必要向AOF和replica进行同步.如果事务的命令序列中包含写命令,则MULTI,EXEC和相关的写命令会向AOF和replica进行同步.根据LINE41:44的条件判断,执行execCommandPropagateMulti(c);保证MULTI命令同步, LINE59检查EXEC命令是否需要同步, 即MULTI命令和EXEC命令必须保证配对同步.EXEC命令的同步执行在函数的call中LINE62propagate(c->cmd,c->db->id,c->argv,c->argc,flags);, 具体的写入命令由各自的执行函数负责同步.

  • 这里执行命令序列时, 通过执行call(c,REDIS_CALL_FULL);所以call函数是递归调用.


所以,综上所述, Redis事务其本质就是,以不可中断的方式依次执行缓存的命令序列,将结果保存到内存cache中.

事务提交时, 丢弃命令序列会调用函数discardTransaction, 其实现为(multi.c):


void discardTransaction(redisClient *c) { freeClientMultiState(c); initClientMultiState(c); c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC); unwatchAllKeys(c);}


该函数调用freeClientMultiState释放multiCmd对象内存.调用initClientMultiState复位客户端对象的缓存命令管理结构.调用unwatchAllKeys取消该客户端的乐观锁.


WATCH命令执行乐观锁, 其对应的执行函数为watchCommand, 其实现为(multi.c):

 void watchCommand(redisClient *c) { int j;  if (c->flags & REDIS_MULTI) { addReplyError(c,"WATCH inside MULTI is not allowed"); return; } for (j = 1; j < c->argc; j++) watchForKey(c,c->argv[j]); addReply(c,shared.ok);}


进而调用函数watchForKey, 其实现为(multi.c):

 /* Watch for the specified key */void watchForKey(redisClient *c, robj *key) { list *clients = NULL; listIter li; listNode *ln; watchedKey *wk;  /* Check if we are already watching for this key */ listRewind(c->watched_keys,&li); while((ln = listNext(&li))) { wk = listNodeValue(ln); if (wk->db == c->db && equalStringObjects(key,wk->key)) return; /* Key already watched */ } /* This key is not already watched in this DB. Let's add it */ clients = dictFetchValue(c->db->watched_keys,key); if (!clients) { clients = listCreate(); dictAdd(c->db->watched_keys,key,clients); incrRefCount(key); } listAddNodeTail(clients,c); /* Add the new key to the list of keys watched by this client */ wk = zmalloc(sizeof(*wk)); wk->key = key; wk->db = c->db; incrRefCount(key); listAddNodeTail(c->watched_keys,wk);}




关于乐观锁的key, 既保存于其客户端对象的watched_keys链表中, 也保存于全局数据库对象的watched_keys哈希表中.

LINE10:14检查客户端对象的链表中是否已经存在该key, 如果已经存在, 则直接返回.LINE16在全局数据库中返回该key对应的客户端对象链表, 如果链表不存在, 说明其他客户端没有使用该key作为乐观锁, 如果链表存在, 说明其他客户端已经使用该key作为乐观锁. LINE22将当前客户端对象记录于该key对应的链表中. LINE28将该key记录于当前客户端的key链表中.


当前客户端执行乐观锁以后, 其他客户端的写入命令可能修改该key值.所有具有写操作属性的命令都会执行函数signalModifiedKey, 其实现为(db.c):


void signalModifiedKey(redisDb *db, robj *key) { touchWatchedKey(db,key);}
		



函数touchWatchedKey的实现为(multi.c):

 /* "Touch" a key, so that if this key is being WATCHed by some client the * next EXEC will fail. */void touchWatchedKey(redisDb *db, robj *key) { list *clients; listIter li; listNode *ln;  if (dictSize(db->watched_keys) == 0) return; clients = dictFetchValue(db->watched_keys, key); if (!clients) return;  /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ /* Check if we are already watching for this key */ listRewind(clients,&li); while((ln = listNext(&li))) { redisClient *c = listNodeValue(ln);  c->flags |= REDIS_DIRTY_CAS; }}


语句 if (dictSize(db->watched_keys) == 0) return;检查全局数据库中的哈希表 watched_keys是否为空, 如果为空,说明没有任何客户端执行 WATCH命令, 直接返回.如果该哈希表不为空, 取回该key对应的客户端链表结构,并把该链表中的每个客户端对象设置 REDIS_DIRTY_CAS标志. 前面在 EXEC的执行命令中,进行过条件判断, 如果客户端对象具有这个标志, 则丢弃事务中的命令序列.

在执行 EXEC, DISCARD, UNWATCH命令以及在客户端结束连接的时候,都会取消乐观锁, 最终都会执行函数 unwatchAllKeys, 其实现为(multi.c):
 /* Unwatch all the keys watched by this client. To clean the EXEC dirty * flag is up to the caller. */void unwatchAllKeys(redisClient *c) { listIter li; listNode *ln;  if (listLength(c->watched_keys) == 0) return; listRewind(c->watched_keys,&li); while((ln = listNext(&li))) { list *clients; watchedKey *wk;  /* Lookup the watched key -> clients list and remove the client * from the list */ wk = listNodeValue(ln); clients = dictFetchValue(wk->db->watched_keys, wk->key); redisAssertWithInfo(c,NULL,clients != NULL); listDelNode(clients,listSearchKey(clients,c)); /* Kill the entry at all if this was the only client */ if (listLength(clients) == 0) dictDelete(wk->db->watched_keys, wk->key); /* Remove this watched key from the client->watched list */ listDelNode(c->watched_keys,ln); decrRefCount(wk->key); zfree(wk); }}


语句if (listLength(c->watched_keys) == 0) return;判断如果当前客户端对象的watched_keys链表为空,说明当前客户端没有执行WATCH命令,直接返回.如果该链表非空, 则依次遍历该链表中的key, 并从该链表中删除key, 同时,获得全局数据库中的哈希表watched_keys中该key对应的客户端链表, 删除当前客户端对象.


免责声明:本文内容由21ic获得授权后发布,版权归原作者所有,本平台仅提供信息存储服务。文章仅代表作者个人观点,不代表本平台立场,如有问题,请联系我们,谢谢!

本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除。
换一批
延伸阅读

9月2日消息,不造车的华为或将催生出更大的独角兽公司,随着阿维塔和赛力斯的入局,华为引望愈发显得引人瞩目。

关键字: 阿维塔 塞力斯 华为

加利福尼亚州圣克拉拉县2024年8月30日 /美通社/ -- 数字化转型技术解决方案公司Trianz今天宣布,该公司与Amazon Web Services (AWS)签订了...

关键字: AWS AN BSP 数字化

伦敦2024年8月29日 /美通社/ -- 英国汽车技术公司SODA.Auto推出其旗舰产品SODA V,这是全球首款涵盖汽车工程师从创意到认证的所有需求的工具,可用于创建软件定义汽车。 SODA V工具的开发耗时1.5...

关键字: 汽车 人工智能 智能驱动 BSP

北京2024年8月28日 /美通社/ -- 越来越多用户希望企业业务能7×24不间断运行,同时企业却面临越来越多业务中断的风险,如企业系统复杂性的增加,频繁的功能更新和发布等。如何确保业务连续性,提升韧性,成...

关键字: 亚马逊 解密 控制平面 BSP

8月30日消息,据媒体报道,腾讯和网易近期正在缩减他们对日本游戏市场的投资。

关键字: 腾讯 编码器 CPU

8月28日消息,今天上午,2024中国国际大数据产业博览会开幕式在贵阳举行,华为董事、质量流程IT总裁陶景文发表了演讲。

关键字: 华为 12nm EDA 半导体

8月28日消息,在2024中国国际大数据产业博览会上,华为常务董事、华为云CEO张平安发表演讲称,数字世界的话语权最终是由生态的繁荣决定的。

关键字: 华为 12nm 手机 卫星通信

要点: 有效应对环境变化,经营业绩稳中有升 落实提质增效举措,毛利润率延续升势 战略布局成效显著,战新业务引领增长 以科技创新为引领,提升企业核心竞争力 坚持高质量发展策略,塑强核心竞争优势...

关键字: 通信 BSP 电信运营商 数字经济

北京2024年8月27日 /美通社/ -- 8月21日,由中央广播电视总台与中国电影电视技术学会联合牵头组建的NVI技术创新联盟在BIRTV2024超高清全产业链发展研讨会上宣布正式成立。 活动现场 NVI技术创新联...

关键字: VI 传输协议 音频 BSP

北京2024年8月27日 /美通社/ -- 在8月23日举办的2024年长三角生态绿色一体化发展示范区联合招商会上,软通动力信息技术(集团)股份有限公司(以下简称"软通动力")与长三角投资(上海)有限...

关键字: BSP 信息技术
关闭
关闭