前提
Lettuce是一个Redis的Java驱动包,初识她的时候是使用RedisTemplate的时候遇到点问题Debug到底层的一些源码,发现spring-data-redis的驱动包在某个版本以后替换为Lettuce。Lettuce翻译为生菜,没错,就是吃的那种生菜,因此它的Logo长这样:html

既然能被Spring生态所承认,Lettuce想必有过人之处,因而笔者花时间阅读她的官方文档,整理测试示例,写下这篇文章。编写本文时所使用的版本为Lettuce 5.1.8.RELEASE,SpringBoot 2.1.8.RELEASE,JDK [8,11]。超长警告:这篇文章断断续续花了两周完成,超过4万字…..java
Lettuce简介
Lettuce是一个高性能基于Java编写的Redis驱动框架,底层集成了Project Reactor提供自然的反应式编程,通讯框架集成了Netty使用了非阻塞IO,5.x版本以后融合了JDK1.8的异步编程特性,在保证高性能的同时提供了十分丰富易用的API,5.1版本的新特性以下:node
- 支持
Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX。 - 支持经过
Brave模块跟踪Redis命令执行。 - 支持
Redis Streams。 - 支持异步的主从链接。
- 支持异步链接池。
- 新增命令最多执行一次模式(禁止自动重连)。
- 全局命令超时设置(对异步和反应式命令也有效)。
- ……等等
注意一点:Redis的版本至少须要2.6,固然越高越好,API的兼容性比较强大。react
只须要引入单个依赖就能够开始愉快地使用Lettuce:web
- Maven
io.lettuce
lettuce-core
5.1.8.RELEASE
复制代码
- Gradle
dependencies { compile 'io.lettuce:lettuce-core:5.1.8.RELEASE' } 复制代码
链接Redis
单机、哨兵、集群模式下链接Redis须要一个统一的标准去表示链接的细节信息,在Lettuce中这个统一的标准是RedisURI。能够经过三种方式构造一个RedisURI实例:redis
- 定制的字符串
URI语法:
RedisURI uri = RedisURI.create("redis://localhost/"); 复制代码
- 使用建造器(
RedisURI.Builder):
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build(); 复制代码
- 直接经过构造函数实例化:
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS); 复制代码
定制的链接URI语法
- 单机(前缀为
redis://)
格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]] 完整:redis://mypassword@127.0.0.1:6379/0?timeout=10s 简单:redis://localhost 复制代码
- 单机而且使用
SSL(前缀为rediss://) <== 注意后面多了个s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]] 完整:rediss://mypassword@127.0.0.1:6379/0?timeout=10s 简单:rediss://localhost 复制代码
- 单机
Unix Domain Sockets模式(前缀为redis-socket://)
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]] 完整:redis-socket:///tmp/redis?timeout=10s&_database=0 复制代码
- 哨兵(前缀为
redis-sentinel://)
格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId 完整:redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster 复制代码
超时时间单位:spring
- d 天
- h 小时
- m 分钟
- s 秒钟
- ms 毫秒
- us 微秒
- ns 纳秒
我的建议使用RedisURI提供的建造器,毕竟定制的URI虽然简洁,可是比较容易出现人为错误。鉴于笔者没有SSL和Unix Domain Socket的使用场景,下面不对这两种链接方式进行列举。shell
基本使用
Lettuce使用的时候依赖于四个主要组件:apache
RedisURI:链接信息。RedisClient:Redis客户端,特殊地,集群链接有一个定制的RedisClusterClient。Connection:Redis链接,主要是StatefulConnection或者StatefulRedisConnection的子类,链接的类型主要由链接的具体方式(单机、哨兵、集群、订阅发布等等)选定,比较重要。RedisCommands:Redis命令API接口,基本上覆盖了Redis发行版本的全部命令,提供了同步(sync)、异步(async)、反应式(reative)的调用方式,对于使用者而言,会常常跟RedisCommands系列接口打交道。
一个基本使用例子以下:编程
@Test public void testSetGet() throws Exception { RedisURI redisUri = RedisURI.builder() // <1> 建立单机链接的链接信息 .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); // <2> 建立客户端 StatefulRedisConnection
connection = redisClient.connect(); // <3> 建立线程安全的链接 RedisCommands
redisCommands = connection.sync(); // <4> 建立同步命令 SetArgs setArgs = SetArgs.Builder.nx().ex(5); String result = redisCommands.set("name", "throwable", setArgs); Assertions.assertThat(result).isEqualToIgnoringCase("OK"); result = redisCommands.get("name"); Assertions.assertThat(result).isEqualTo("throwable"); // ... 其余操做 connection.close(); // <5> 关闭链接 redisClient.shutdown(); // <6> 关闭客户端 } 复制代码
注意:
- <5>:关闭链接通常在应用程序中止以前操做,一个应用程序中的一个
Redis驱动实例不须要太多的链接(通常状况下只须要一个链接实例就能够,若是有多个链接的须要能够考虑使用链接池,其实Redis目前处理命令的模块是单线程,在客户端多个链接多线程调用理论上没有效果)。 - <6>:关闭客户端通常应用程序中止以前操做,若是条件容许的话,基于后开先闭原则,客户端关闭应该在链接关闭以后操做。
API
Lettuce主要提供三种API:
- 同步(
sync):RedisCommands。 - 异步(
async):RedisAsyncCommands。 - 反应式(
reactive):RedisReactiveCommands。
先准备好一个单机Redis链接备用:
private static StatefulRedisConnection
CONNECTION; private static RedisClient CLIENT; @BeforeClass public static void beforeClass() { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); CLIENT = RedisClient.create(redisUri); CONNECTION = CLIENT.connect(); } @AfterClass public static void afterClass() throws Exception { CONNECTION.close(); CLIENT.shutdown(); } 复制代码
Redis命令API的具体实现能够直接从StatefulRedisConnection实例获取,见其接口定义:
public interface StatefulRedisConnection
extends StatefulConnection
{ boolean isMulti(); RedisCommands
sync(); RedisAsyncCommands
async(); RedisReactiveCommands
reactive(); } 复制代码
值得注意的是,在不指定编码解码器RedisCodec的前提下,RedisClient建立的StatefulRedisConnection实例通常是泛型实例StatefulRedisConnection,也就是全部命令
API的KEY和VALUE都是String类型,这种使用方式能知足大部分的使用场景。固然,必要的时候能够定制编码解码器RedisCodec。
同步API
先构建RedisCommands实例:
private static RedisCommands
COMMAND; @BeforeClass public static void beforeClass() { COMMAND = CONNECTION.sync(); } 复制代码
基本使用:
@Test public void testSyncPing() throws Exception { String pong = COMMAND.ping(); Assertions.assertThat(pong).isEqualToIgnoringCase("PONG"); } @Test public void testSyncSetAndGet() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); COMMAND.set("name", "throwable", setArgs); String value = COMMAND.get("name"); log.info("Get value: {}", value); } // Get value: throwable 复制代码
同步API在全部命令调用以后会当即返回结果。若是熟悉Jedis的话,RedisCommands的用法其实和它相差不大。
异步API
先构建RedisAsyncCommands实例:
private static RedisAsyncCommands
ASYNC_COMMAND; @BeforeClass public static void beforeClass() { ASYNC_COMMAND = CONNECTION.async(); } 复制代码
基本使用:
@Test public void testAsyncPing() throws Exception { RedisFuture
redisFuture = ASYNC_COMMAND.ping(); log.info("Ping result:{}", redisFuture.get()); } // Ping result:PONG 复制代码
RedisAsyncCommands全部方法执行返回结果都是RedisFuture实例,而RedisFuture接口的定义以下:
public interface RedisFuture
extends CompletionStage
, Future
{ String getError(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; } 复制代码
也就是,RedisFuture能够无缝使用Future或者JDK1.8中引入的CompletableFuture提供的方法。举个例子:
@Test public void testAsyncSetAndGet1() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); RedisFuture
future = ASYNC_COMMAND.set("name", "throwable", setArgs); // CompletableFuture#thenAccept() future.thenAccept(value -> log.info("Set命令返回:{}", value)); // Future#get() future.get(); } // Set命令返回:OK @Test public void testAsyncSetAndGet2() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); CompletableFuture
result = (CompletableFuture
) ASYNC_COMMAND.set("name", "throwable", setArgs) .thenAcceptBoth(ASYNC_COMMAND.get("name"), (s, g) -> { log.info("Set命令返回:{}", s); log.info("Get命令返回:{}", g); }); result.get(); } // Set命令返回:OK // Get命令返回:throwable 复制代码
若是能熟练使用CompletableFuture和函数式编程技巧,能够组合多个RedisFuture完成一些列复杂的操做。
反应式API
Lettuce引入的反应式编程框架是Project Reactor,若是没有反应式编程经验能够先自行了解一下Project Reactor。
构建RedisReactiveCommands实例:
private static RedisReactiveCommands
REACTIVE_COMMAND; @BeforeClass public static void beforeClass() { REACTIVE_COMMAND = CONNECTION.reactive(); } 复制代码
根据Project Reactor,RedisReactiveCommands的方法若是返回的结果只包含0或1个元素,那么返回值类型是Mono,若是返回的结果包含0到N(N大于0)个元素,那么返回值是Flux。举个例子:
@Test public void testReactivePing() throws Exception { Mono
ping = REACTIVE_COMMAND.ping(); ping.subscribe(v -> log.info("Ping result:{}", v)); Thread.sleep(1000); } // Ping result:PONG @Test public void testReactiveSetAndGet() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); REACTIVE_COMMAND.set("name", "throwable", setArgs).block(); REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value)); Thread.sleep(1000); } // Get命令返回:throwable @Test public void testReactiveSet() throws Exception { REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block(); Flux
flux = REACTIVE_COMMAND.smembers("food"); flux.subscribe(log::info); REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block(); Thread.sleep(1000); } // meat // bread // fish 复制代码
举个更加复杂的例子,包含了事务、函数转换等:
@Test public void testReactiveFunctional() throws Exception { REACTIVE_COMMAND.multi().doOnSuccess(r -> { REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe(); REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe(); }).flatMap(s -> REACTIVE_COMMAND.exec()) .doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded())) .subscribe(); Thread.sleep(1000); } // OK // 2 // Discarded:false 复制代码
这个方法开启一个事务,先把counter设置为1,再将counter自增1。
发布和订阅
非集群模式下的发布订阅依赖于定制的链接StatefulRedisPubSubConnection,集群模式下的发布订阅依赖于定制的链接StatefulRedisClusterPubSubConnection,二者分别来源于RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub():
- 非集群模式:
// 多是单机、普通主从、哨兵等非集群模式的客户端 RedisClient client = ... StatefulRedisPubSubConnection
connection = client.connectPubSub(); connection.addListener(new RedisPubSubListener
() { ... }); // 同步命令 RedisPubSubCommands
sync = connection.sync(); sync.subscribe("channel"); // 异步命令 RedisPubSubAsyncCommands
async = connection.async(); RedisFuture
future = async.subscribe("channel"); // 反应式命令 RedisPubSubReactiveCommands
reactive = connection.reactive(); reactive.subscribe("channel").subscribe(); reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe() 复制代码
- 集群模式:
// 使用方式其实和非集群模式基本一致 RedisClusterClient clusterClient = ... StatefulRedisClusterPubSubConnection
connection = clusterClient.connectPubSub(); connection.addListener(new RedisPubSubListener
() { ... }); RedisPubSubCommands
sync = connection.sync(); sync.subscribe("channel"); // ... 复制代码
这里用单机同步命令的模式举一个Redis键空间通知(Redis Keyspace Notifications)的例子:
@Test public void testSyncKeyspaceNotification() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) // 注意这里只能是0号库 .withDatabase(0) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection
redisConnection = redisClient.connect(); RedisCommands
redisCommands = redisConnection.sync(); // 只接收键过时的事件 redisCommands.configSet("notify-keyspace-events", "Ex"); StatefulRedisPubSubConnection
connection = redisClient.connectPubSub(); connection.addListener(new RedisPubSubAdapter<>() { @Override public void psubscribed(String pattern, long count) { log.info("pattern:{},count:{}", pattern, count); } @Override public void message(String pattern, String channel, String message) { log.info("pattern:{},channel:{},message:{}", pattern, channel, message); } }); RedisPubSubCommands
commands = connection.sync(); commands.psubscribe("__keyevent@0__:expired"); redisCommands.setex("name", 2, "throwable"); Thread.sleep(10000); redisConnection.close(); connection.close(); redisClient.shutdown(); } // pattern:__keyevent@0__:expired,count:1 // pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name 复制代码
实际上,在实现RedisPubSubListener的时候能够单独抽离,尽可能不要设计成匿名内部类的形式。
事务和批量命令执行
事务相关的命令就是WATCH、UNWATCH、EXEC、MULTI和DISCARD,在RedisCommands系列接口中有对应的方法。举个例子:
// 同步模式 @Test public void testSyncMulti() throws Exception { COMMAND.multi(); COMMAND.setex("name-1", 2, "throwable"); COMMAND.setex("name-2", 2, "doge"); TransactionResult result = COMMAND.exec(); int index = 0; for (Object r : result) { log.info("Result-{}:{}", index, r); index++; } } // Result-0:OK // Result-1:OK 复制代码
Redis的Pipeline也就是管道机制能够理解为把多个命令打包在一次请求发送到Redis服务端,而后Redis服务端把全部的响应结果打包好一次性返回,从而节省没必要要的网络资源(最主要是减小网络请求次数)。Redis对于Pipeline机制如何实现并无明确的规定,也没有提供特殊的命令支持Pipeline机制。Jedis中底层采用BIO(阻塞IO)通信,因此它的作法是客户端缓存将要发送的命令,最后须要触发而后同步发送一个巨大的命令列表包,再接收和解析一个巨大的响应列表包。Pipeline在Lettuce中对使用者是透明的,因为底层的通信框架是Netty,因此网络通信层面的优化Lettuce不须要过多干预,换言之能够这样理解:Netty帮Lettuce从底层实现了Redis的Pipeline机制。可是,Lettuce的异步API也提供了手动Flush的方法:
@Test public void testAsyncManualFlush() { // 取消自动flush ASYNC_COMMAND.setAutoFlushCommands(false); List
> redisFutures = Lists.newArrayList(); int count = 5000; for (int i = 0; i < count; i++) { String key = "key-" + (i + 1); String value = "value-" + (i + 1); redisFutures.add(ASYNC_COMMAND.set(key, value)); redisFutures.add(ASYNC_COMMAND.expire(key, 2)); } long start = System.currentTimeMillis(); ASYNC_COMMAND.flushCommands(); boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0])); Assertions.assertThat(result).isTrue(); log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start); } // Lettuce cost:1302 ms 复制代码
上面只是从文档看到的一些理论术语,可是现实是骨感的,对比了下Jedis的Pipeline提供的方法,发现了Jedis的Pipeline执行耗时比较低:
@Test public void testJedisPipeline() throws Exception { Jedis jedis = new Jedis(); Pipeline pipeline = jedis.pipelined(); int count = 5000; for (int i = 0; i < count; i++) { String key = "key-" + (i + 1); String value = "value-" + (i + 1); pipeline.set(key, value); pipeline.expire(key, 2); } long start = System.currentTimeMillis(); pipeline.syncAndReturnAll(); log.info("Jedis cost:{} ms", System.currentTimeMillis() - start); } // Jedis cost:9 ms 复制代码
我的猜想Lettuce可能底层并不是合并全部命令一次发送(甚至多是单条发送),具体可能须要抓包才能定位。依此来看,若是真的有大量执行Redis命令的场景,不妨可使用Jedis的Pipeline。
注意:由上面的测试推断RedisTemplate的executePipelined()方法是假的Pipeline执行方法,使用RedisTemplate的时候请务必注意这一点。
Lua脚本执行
Lettuce中执行Redis的Lua命令的同步接口以下:
public interface RedisScriptingCommands
{
T eval(String var1, ScriptOutputType var2, K... var3);
T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);
T evalsha(String var1, ScriptOutputType var2, K... var3);
T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4); List
scriptExists(String... var1); String scriptFlush(); String scriptKill(); String scriptLoad(V var1); String digest(V var1); } 复制代码
异步和反应式的接口方法定义差很少,不一样的地方就是返回值类型,通常咱们经常使用的是eval()、evalsha()和scriptLoad()方法。举个简单的例子:
private static RedisCommands
COMMANDS; private static String RAW_LUA = "local key = KEYS[1]\n" + "local value = ARGV[1]\n" + "local timeout = ARGV[2]\n" + "redis.call('SETEX', key, tonumber(timeout), value)\n" + "local result = redis.call('GET', key)\n" + "return result;"; private static AtomicReference
LUA_SHA = new AtomicReference<>(); @Test public void testLua() throws Exception { LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA)); String[] keys = new String[]{"name"}; String[] args = new String[]{"throwable", "5000"}; String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args); log.info("Get value:{}", result); } // Get value:throwable 复制代码
高可用和分片
为了Redis的高可用,通常会采用普通主从(Master/Replica,这里笔者称为普通主从模式,也就是仅仅作了主从复制,故障须要手动切换)、哨兵和集群。普通主从模式能够独立运行,也能够配合哨兵运行,只是哨兵提供自动故障转移和主节点提高功能。普通主从和哨兵均可以使用MasterSlave,经过入参包括RedisClient、编码解码器以及一个或者多个RedisURI获取对应的Connection实例。
这里注意一点,MasterSlave中提供的方法若是只要求传入一个RedisURI实例,那么Lettuce会进行拓扑发现机制,自动获取Redis主从节点信息;若是要求传入一个RedisURI集合,那么对于普通主从模式来讲全部节点信息是静态的,不会进行发现和更新。
拓扑发现的规则以下:
- 对于普通主从(
Master/Replica)模式,不须要感知RedisURI指向从节点仍是主节点,只会进行一次性的拓扑查找全部节点信息,此后节点信息会保存在静态缓存中,不会更新。 - 对于哨兵模式,会订阅全部哨兵实例并侦听订阅/发布消息以触发拓扑刷新机制,更新缓存的节点信息,也就是哨兵自然就是动态发现节点信息,不支持静态配置。
拓扑发现机制的提供API为TopologyProvider,须要了解其原理的能够参考具体的实现。
对于集群(Cluster)模式,Lettuce提供了一套独立的API。
另外,若是Lettuce链接面向的是非单个Redis节点,链接实例提供了数据读取节点偏好(ReadFrom)设置,可选值有:
MASTER:只从Master节点中读取。MASTER_PREFERRED:优先从Master节点中读取。SLAVE_PREFERRED:优先从Slavor节点中读取。SLAVE:只从Slavor节点中读取。NEAREST:使用最近一次链接的Redis实例读取。
普通主从模式
假设如今有三个Redis服务造成树状主从关系以下:
- 节点一:localhost:6379,角色为Master。
- 节点二:localhost:6380,角色为Slavor,节点一的从节点。
- 节点三:localhost:6381,角色为Slavor,节点二的从节点。
首次动态节点发现主从模式的节点信息须要以下构建链接:
@Test public void testDynamicReplica() throws Exception { // 这里只须要配置一个节点的链接信息,不必定须要是主节点的信息,从节点也能够 RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build(); RedisClient redisClient = RedisClient.create(uri); StatefulRedisMasterSlaveConnection
connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri); // 只从从节点读取数据 connection.setReadFrom(ReadFrom.SLAVE); // 执行其余Redis命令 connection.close(); redisClient.shutdown(); } 复制代码
若是须要指定静态的Redis主从节点链接属性,那么能够这样构建链接:
@Test public void testStaticReplica() throws Exception { List
uris = new ArrayList<>(); RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build(); RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build(); RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build(); uris.add(uri1); uris.add(uri2); uris.add(uri3); RedisClient redisClient = RedisClient.create(); StatefulRedisMasterSlaveConnection
connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uris); // 只从主节点读取数据 connection.setReadFrom(ReadFrom.MASTER); // 执行其余Redis命令 connection.close(); redisClient.shutdown(); } 复制代码
哨兵模式
因为Lettuce自身提供了哨兵的拓扑发现机制,因此只须要随便配置一个哨兵节点的RedisURI实例便可:
@Test public void testDynamicSentinel() throws Exception { RedisURI redisUri = RedisURI.builder() .withPassword("你的密码") .withSentinel("localhost", 26379) .withSentinelMasterId("哨兵Master的ID") .build(); RedisClient redisClient = RedisClient.create(); StatefulRedisMasterSlaveConnection
connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri); // 只容许从从节点读取数据 connection.setReadFrom(ReadFrom.SLAVE); RedisCommands
command = connection.sync(); SetArgs setArgs = SetArgs.Builder.nx().ex(5); command.set("name", "throwable", setArgs); String value = command.get("name"); log.info("Get value:{}", value); } // Get value:throwable 复制代码
集群模式
鉴于笔者对Redis集群模式并不熟悉,Cluster模式下的API使用自己就有比较多的限制,因此这里只简单介绍一下怎么用。先说几个特性:
下面的API提供跨槽位(Slot)调用的功能:
RedisAdvancedClusterCommands。RedisAdvancedClusterAsyncCommands。RedisAdvancedClusterReactiveCommands。
静态节点选择功能:
masters:选择全部主节点执行命令。slaves:选择全部从节点执行命令,其实就是只读模式。all nodes:命令能够在全部节点执行。
集群拓扑视图动态更新功能:
- 手动更新,主动调用
RedisClusterClient#reloadPartitions()。 - 后台定时更新。
- 自适应更新,基于链接断开和
MOVED/ASK命令重定向自动更新。
Redis集群搭建详细过程能够参考官方文档,假设已经搭建好集群以下(192.168.56.200是笔者的虚拟机Host):
- 192.168.56.200:7001 => 主节点,槽位0-5460。
- 192.168.56.200:7002 => 主节点,槽位5461-10922。
- 192.168.56.200:7003 => 主节点,槽位10923-16383。
- 192.168.56.200:7004 => 7001的从节点。
- 192.168.56.200:7005 => 7002的从节点。
- 192.168.56.200:7006 => 7003的从节点。
简单的集群链接和使用方式以下:
@Test public void testSyncCluster(){ RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); StatefulRedisClusterConnection
connection = redisClusterClient.connect(); RedisAdvancedClusterCommands
commands = connection.sync(); commands.setex("name",10, "throwable"); String value = commands.get("name"); log.info("Get value:{}", value); } // Get value:throwable 复制代码
节点选择:
@Test public void testSyncNodeSelection() { RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); StatefulRedisClusterConnection
connection = redisClusterClient.connect(); RedisAdvancedClusterCommands
commands = connection.sync(); // commands.all(); // 全部节点 // commands.masters(); // 主节点 // 从节点只读 NodeSelection
replicas = commands.slaves(); NodeSelectionCommands
nodeSelectionCommands = replicas.commands(); // 这里只是演示,通常应该禁用keys *命令 Executions
> keys = nodeSelectionCommands.keys("*"); keys.forEach(key -> log.info("key: {}", key)); connection.close(); redisClusterClient.shutdown(); } 复制代码
定时更新集群拓扑视图(每隔十分钟更新一次,这个时间自行考量,不能太频繁):
@Test public void testPeriodicClusterTopology() throws Exception { RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions .builder() .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES)) .build(); redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build()); StatefulRedisClusterConnection
connection = redisClusterClient.connect(); RedisAdvancedClusterCommands
commands = connection.sync(); commands.setex("name", 10, "throwable"); String value = commands.get("name"); log.info("Get value:{}", value); Thread.sleep(Integer.MAX_VALUE); connection.close(); redisClusterClient.shutdown(); } 复制代码
自适应更新集群拓扑视图:
@Test public void testAdaptiveClusterTopology() throws Exception { RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder() .enableAdaptiveRefreshTrigger( ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT, ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS ) .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS)) .build(); redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build()); StatefulRedisClusterConnection
connection = redisClusterClient.connect(); RedisAdvancedClusterCommands
commands = connection.sync(); commands.setex("name", 10, "throwable"); String value = commands.get("name"); log.info("Get value:{}", value); Thread.sleep(Integer.MAX_VALUE); connection.close(); redisClusterClient.shutdown(); } 复制代码
动态命令和自定义命令
自定义命令是Redis命令有限集,不过能够更细粒度指定KEY、ARGV、命令类型、编码解码器和返回值类型,依赖于dispatch()方法:
// 自定义实现PING方法 @Test public void testCustomPing() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection
connect = redisClient.connect(); RedisCommands
sync = connect.sync(); RedisCodec
codec = StringCodec.UTF8; String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec)); log.info("PING:{}", result); connect.close(); redisClient.shutdown(); } // PING:PONG // 自定义实现Set方法 @Test public void testCustomSet() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection
connect = redisClient.connect(); RedisCommands
sync = connect.sync(); RedisCodec
codec = StringCodec.UTF8; sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec), new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable")); String result = sync.get("name"); log.info("Get value:{}", result); connect.close(); redisClient.shutdown(); } // Get value:throwable 复制代码
动态命令是基于Redis命令有限集,而且经过注解和动态代理完成一些复杂命令组合的实现。主要注解在io.lettuce.core.dynamic.annotation包路径下。简单举个例子:
public interface CustomCommand extends Commands { // SET [key] [value] @Command("SET ?0 ?1") String setKey(String key, String value); // SET [key] [value] @Command("SET :key :value") String setKeyNamed(@Param("key") String key, @Param("value") String value); // MGET [key1] [key2] @Command("MGET ?0 ?1") List
mGet(String key1, String key2); / * 方法名做为命令 */ @CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME) String mSet(String key1, String value1, String key2, String value2); } @Test public void testCustomDynamicSet() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection
connect = redisClient.connect(); RedisCommandFactory commandFactory = new RedisCommandFactory(connect); CustomCommand commands = commandFactory.getCommands(CustomCommand.class); commands.setKey("name", "throwable"); commands.setKeyNamed("throwable", "doge"); log.info("MGET ===> " + commands.mGet("name", "throwable")); commands.mSet("key1", "value1","key2", "value2"); log.info("MGET ===> " + commands.mGet("key1", "key2")); connect.close(); redisClient.shutdown(); } // MGET ===> [throwable, doge] // MGET ===> [value1, value2] 复制代码
高阶特性
Lettuce有不少高阶使用特性,这里只列举我的认为经常使用的两点:
- 配置客户端资源。
- 使用链接池。
更多其余特性能够自行参看官方文档。
配置客户端资源
客户端资源的设置与Lettuce的性能、并发和事件处理相关。线程池或者线程组相关配置占据客户端资源配置的大部分(EventLoopGroups和EventExecutorGroup),这些线程池或者线程组是链接程序的基础组件。通常状况下,客户端资源应该在多个Redis客户端之间共享,而且在再也不使用的时候须要自行关闭。笔者认为,客户端资源是面向Netty的。注意:除非特别熟悉或者花长时间去测试调整下面提到的参数,不然在没有经验的前提下凭直觉修改默认值,有可能会踩坑。
客户端资源接口是ClientResources,实现类是DefaultClientResources。
构建DefaultClientResources实例:
// 默认 ClientResources resources = DefaultClientResources.create(); // 建造器 ClientResources resources = DefaultClientResources.builder() .ioThreadPoolSize(4) .computationThreadPoolSize(4) .build() 复制代码
使用:
ClientResources resources = DefaultClientResources.create(); // 非集群 RedisClient client = RedisClient.create(resources, uri); // 集群 RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris); // ...... client.shutdown(); clusterClient.shutdown(); // 关闭资源 resources.shutdown(); 复制代码
客户端资源基本配置:
| 属性 | 描述 | 默认值 |
|---|---|---|
ioThreadPoolSize |
I/O线程数 |
Runtime.getRuntime().availableProcessors() |
computationThreadPoolSize |
任务线程数 | Runtime.getRuntime().availableProcessors() |
客户端资源高级配置:
| 属性 | 描述 | 默认值 |
|---|---|---|
eventLoopGroupProvider |
EventLoopGroup提供商 |
- |
eventExecutorGroupProvider |
EventExecutorGroup提供商 |
- |
eventBus |
事件总线 | DefaultEventBus |
commandLatencyCollectorOptions |
命令延时收集器配置 | DefaultCommandLatencyCollectorOptions |
commandLatencyCollector |
命令延时收集器 | DefaultCommandLatencyCollector |
commandLatencyPublisherOptions |
命令延时发布器配置 | DefaultEventPublisherOptions |
dnsResolver |
DNS处理器 |
JDK或者Netty提供 |
reconnectDelay |
重连延时配置 | Delay.exponential() |
nettyCustomizer |
Netty自定义配置器 |
- |
tracing |
轨迹记录器 | - |
非集群客户端RedisClient的属性配置:
Redis非集群客户端RedisClient自己提供了配置属性方法:
RedisClient client = RedisClient.create(uri); client.setOptions(ClientOptions.builder() .autoReconnect(false) .pingBeforeActivateConnection(true) .build()); 复制代码
非集群客户端的配置属性列表:
| 属性 | 描述 | 默认值 |
|---|---|---|
pingBeforeActivateConnection |
链接激活以前是否执行PING命令 |
false |
autoReconnect |
是否自动重连 | true |
cancelCommandsOnReconnectFailure |
重连失败是否拒绝命令执行 | false |
suspendReconnectOnProtocolFailure |
底层协议失败是否挂起重连操做 | false |
requestQueueSize |
请求队列容量 | (Integer#MAX_VALUE) |
disconnectedBehavior |
失去链接时候的行为 | DEFAULT |
sslOptions |
SSL配置 |
- |
socketOptions |
Socket配置 |
10 seconds Connection-Timeout, no keep-alive, no TCP noDelay |
timeoutOptions |
超时配置 | - |
publishOnScheduler |
发布反应式信号数据的调度器 | 使用I/O线程 |
集群客户端属性配置:
Redis集群客户端RedisClusterClient自己提供了配置属性方法:
RedisClusterClient client = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES)) .enableAllAdaptiveRefreshTriggers() .build(); client.setOptions(ClusterClientOptions.builder() .topologyRefreshOptions(topologyRefreshOptions) .build()); 复制代码
集群客户端的配置属性列表:
| 属性 | 描述 | 默认值 |
|---|---|---|
enablePeriodicRefresh |
是否容许周期性更新集群拓扑视图 | false |
refreshPeriod |
更新集群拓扑视图周期 | 60秒 |
enableAdaptiveRefreshTrigger |
设置自适应更新集群拓扑视图触发器RefreshTrigger |
- |
adaptiveRefreshTriggersTimeout |
自适应更新集群拓扑视图触发器超时设置 | 30秒 |
refreshTriggersReconnectAttempts |
自适应更新集群拓扑视图触发重连次数 | 5 |
dynamicRefreshSources |
是否容许动态刷新拓扑资源 | true |
closeStaleConnections |
是否容许关闭陈旧的链接 | true |
maxRedirects |
集群重定向次数上限 | 5 |
validateClusterNodeMembership |
是否校验集群节点的成员关系 | true |
使用链接池
引入链接池依赖commons-pool2:
org.apache.commons
commons-pool2
2.7.0
基本使用以下:
@Test public void testUseConnectionPool() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); GenericObjectPool
> pool = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig); try (StatefulRedisConnection
connection = pool.borrowObject()) { RedisCommands
command = connection.sync(); SetArgs setArgs = SetArgs.Builder.nx().ex(5); command.set("name", "throwable", setArgs); String n = command.get("name"); log.info("Get value:{}", n); } pool.close(); redisClient.shutdown(); } 复制代码
其中,同步链接的池化支持须要用ConnectionPoolSupport,异步链接的池化支持须要用AsyncConnectionPoolSupport(Lettuce5.1以后才支持)。
几个常见的渐进式删除例子
渐进式删除Hash中的域-属性:
@Test public void testDelBigHashKey() throws Exception { // SCAN参数 ScanArgs scanArgs = ScanArgs.Builder.limit(2); // TEMP游标 ScanCursor cursor = ScanCursor.INITIAL; // 目标KEY String key = "BIG_HASH_KEY"; prepareHashTestData(key); log.info("开始渐进式删除Hash的元素..."); int counter = 0; do { MapScanCursor
result = COMMAND.hscan(key, cursor, scanArgs); // 重置TEMP游标 cursor = ScanCursor.of(result.getCursor()); cursor.setFinished(result.isFinished()); Collection
fields = result.getMap().values(); if (!fields.isEmpty()) { COMMAND.hdel(key, fields.toArray(new String[0])); } counter++; } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished())); log.info("渐进式删除Hash的元素完毕,迭代次数:{} ...", counter); } private void prepareHashTestData(String key) throws Exception { COMMAND.hset(key, "1", "1"); COMMAND.hset(key, "2", "2"); COMMAND.hset(key, "3", "3"); COMMAND.hset(key, "4", "4"); COMMAND.hset(key, "5", "5"); } 复制代码
渐进式删除集合中的元素:
@Test public void testDelBigSetKey() throws Exception { String key = "BIG_SET_KEY"; prepareSetTestData(key); // SCAN参数 ScanArgs scanArgs = ScanArgs.Builder.limit(2); // TEMP游标 ScanCursor cursor = ScanCursor.INITIAL; log.info("开始渐进式删除Set的元素..."); int counter = 0; do { ValueScanCursor
result = COMMAND.sscan(key, cursor, scanArgs); // 重置TEMP游标 cursor = ScanCursor.of(result.getCursor()); cursor.setFinished(result.isFinished()); List
values = result.getValues(); if (!values.isEmpty()) { COMMAND.srem(key, values.toArray(new String[0])); } counter++; } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished())); log.info("渐进式删除Set的元素完毕,迭代次数:{} ...", counter); } private void prepareSetTestData(String key) throws Exception { COMMAND.sadd(key, "1", "2", "3", "4", "5"); } 复制代码
渐进式删除有序集合中的元素:
@Test public void testDelBigZSetKey() throws Exception { // SCAN参数 ScanArgs scanArgs = ScanArgs.Builder.limit(2); // TEMP游标 ScanCursor cursor = ScanCursor.INITIAL; // 目标KEY String key = "BIG_ZSET_KEY"; prepareZSetTestData(key); log.info("开始渐进式删除ZSet的元素..."); int counter = 0; do { ScoredValueScanCursor
result = COMMAND.zscan(key, cursor, scanArgs); // 重置TEMP游标 cursor = ScanCursor.of(result.getCursor()); cursor.setFinished(result.isFinished()); List
> scoredValues = result.getValues(); if (!scoredValues.isEmpty()) { COMMAND.zrem(key, scoredValues.stream().map(ScoredValue
::getValue).toArray(String[]::new)); } counter++; } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished())); log.info("渐进式删除ZSet的元素完毕,迭代次数:{} ...", counter); } private void prepareZSetTestData(String key) throws Exception { COMMAND.zadd(key, 0, "1"); COMMAND.zadd(key, 0, "2"); COMMAND.zadd(key, 0, "3"); COMMAND.zadd(key, 0, "4"); COMMAND.zadd(key, 0, "5"); } 复制代码
在SpringBoot中使用Lettuce
我的认为,spring-data-redis中的API封装并非很优秀,用起来比较重,不够灵活,这里结合前面的例子和代码,在SpringBoot脚手架项目中配置和整合Lettuce。先引入依赖:
org.springframework.boot
spring-boot-dependencies
2.1.8.RELEASE
pom
import
org.springframework.boot
spring-boot-starter-web
io.lettuce
lettuce-core
5.1.8.RELEASE
org.projectlombok
lombok
1.18.10
provided
复制代码
通常状况下,每一个应用应该使用单个Redis客户端实例和单个链接实例,这里设计一个脚手架,适配单机、普通主从、哨兵和集群四种使用场景。对于客户端资源,采用默认的实现便可。对于Redis的链接属性,比较主要的有Host、Port和Password,其余能够暂时忽略。基于约定大于配置的原则,先定制一系列属性配置类(其实有些配置是能够彻底共用,可是考虑到要清晰描述类之间的关系,这里拆分多个配置属性类和多个配置方法):
@Data @ConfigurationProperties(prefix = "lettuce") public class LettuceProperties { private LettuceSingleProperties single; private LettuceReplicaProperties replica; private LettuceSentinelProperties sentinel; private LettuceClusterProperties cluster; } @Data public class LettuceSingleProperties { private String host; private Integer port; private String password; } @EqualsAndHashCode(callSuper = true) @Data public class LettuceReplicaProperties extends LettuceSingleProperties { } @EqualsAndHashCode(callSuper = true) @Data public class LettuceSentinelProperties extends LettuceSingleProperties { private String masterId; } @EqualsAndHashCode(callSuper = true) @Data public class LettuceClusterProperties extends LettuceSingleProperties { } 复制代码
配置类以下,主要使用@ConditionalOnProperty作隔离,通常状况下,不多有人会在一个应用使用一种以上的Redis链接场景:
@RequiredArgsConstructor @Configuration @ConditionalOnClass(name = "io.lettuce.core.RedisURI") @EnableConfigurationProperties(value = LettuceProperties.class) public class LettuceAutoConfiguration { private final LettuceProperties lettuceProperties; @Bean(destroyMethod = "shutdown") public ClientResources clientResources() { return DefaultClientResources.create(); } @Bean @ConditionalOnProperty(name = "lettuce.single.host") public RedisURI singleRedisUri() { LettuceSingleProperties singleProperties = lettuceProperties.getSingle(); return RedisURI.builder() .withHost(singleProperties.getHost()) .withPort(singleProperties.getPort()) .withPassword(singleProperties.getPassword()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.single.host") public RedisClient singleRedisClient(ClientResources clientResources, @Qualifier("singleRedisUri") RedisURI redisUri) { return RedisClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.single.host") public StatefulRedisConnection
singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) { return singleRedisClient.connect(); } @Bean @ConditionalOnProperty(name = "lettuce.replica.host") public RedisURI replicaRedisUri() { LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica(); return RedisURI.builder() .withHost(replicaProperties.getHost()) .withPort(replicaProperties.getPort()) .withPassword(replicaProperties.getPassword()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.replica.host") public RedisClient replicaRedisClient(ClientResources clientResources, @Qualifier("replicaRedisUri") RedisURI redisUri) { return RedisClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.replica.host") public StatefulRedisMasterSlaveConnection
replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient, @Qualifier("replicaRedisUri") RedisURI redisUri) { return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri); } @Bean @ConditionalOnProperty(name = "lettuce.sentinel.host") public RedisURI sentinelRedisUri() { LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel(); return RedisURI.builder() .withPassword(sentinelProperties.getPassword()) .withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort()) .withSentinelMasterId(sentinelProperties.getMasterId()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.sentinel.host") public RedisClient sentinelRedisClient(ClientResources clientResources, @Qualifier("sentinelRedisUri") RedisURI redisUri) { return RedisClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.sentinel.host") public StatefulRedisMasterSlaveConnection
sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient, @Qualifier("sentinelRedisUri") RedisURI redisUri) { return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri); } @Bean @ConditionalOnProperty(name = "lettuce.cluster.host") public RedisURI clusterRedisUri() { LettuceClusterProperties clusterProperties = lettuceProperties.getCluster(); return RedisURI.builder() .withHost(clusterProperties.getHost()) .withPort(clusterProperties.getPort()) .withPassword(clusterProperties.getPassword()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.cluster.host") public RedisClusterClient redisClusterClient(ClientResources clientResources, @Qualifier("clusterRedisUri") RedisURI redisUri) { return RedisClusterClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.cluster") public StatefulRedisClusterConnection
clusterConnection(RedisClusterClient clusterClient) { return clusterClient.connect(); } } 复制代码
最后为了让IDE识别咱们的配置,能够添加IDE亲缘性,/META-INF文件夹下新增一个文件spring-configuration-metadata.json,内容以下:
{ "properties": [ { "name": "lettuce.single", "type": "club.throwable.spring.lettuce.LettuceSingleProperties", "description": "单机配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" }, { "name": "lettuce.replica", "type": "club.throwable.spring.lettuce.LettuceReplicaProperties", "description": "主从配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" }, { "name": "lettuce.sentinel", "type": "club.throwable.spring.lettuce.LettuceSentinelProperties", "description": "哨兵配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" }, { "name": "lettuce.single", "type": "club.throwable.spring.lettuce.LettuceClusterProperties", "description": "集群配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" } ] } 复制代码
若是想IDE亲缘性作得更好,能够添加/META-INF/additional-spring-configuration-metadata.json进行更多细节定义。简单使用以下:
@Slf4j @Component public class RedisCommandLineRunner implements CommandLineRunner { @Autowired @Qualifier("singleRedisConnection") private StatefulRedisConnection
connection; @Override public void run(String... args) throws Exception { RedisCommands
redisCommands = connection.sync(); redisCommands.setex("name", 5, "throwable"); log.info("Get value:{}", redisCommands.get("name")); } } // Get value:throwable 复制代码
小结
本文算是基于Lettuce的官方文档,对它的使用进行全方位的分析,包括主要功能、配置都作了一些示例,限于篇幅部分特性和配置细节没有分析。Lettuce已经被spring-data-redis接纳做为官方的Redis客户端驱动,因此值得信赖,它的一些API设计确实比较合理,扩展性高的同时灵活性也高。我的建议,基于Lettuce包自行添加配置到SpringBoot应用用起来会驾轻就熟,毕竟RedisTemplate实在太笨重,并且还屏蔽了Lettuce一些高级特性和灵活的API。
参考资料:
- Lettuce Reference Guide
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/211160.html原文链接:https://javaforall.net
