Redis高级客户端Lettuce详解

Redis高级客户端Lettuce详解前提 Lettuce 是一个 Redis 的 Java 驱动包 初识她的时候是使用 RedisTemplat 的时候遇到点问题 Debug 到底层的一些源码 发现 spring data redis 的驱动包在某个版本以后替换为 Lettuce Lettuce 翻译为生菜 没错 就是吃的那种生菜 因此它的 Logo 长这样 html 既然能被 Spring 生态所承认 Lettuce 想必有过人之处 因而笔者花时间阅读她的官方文档 整理测试示例 写下这篇文章 编写本文时所使用的版本为 Lettuce5 1 8 RE

前提

Lettuce是一个RedisJava驱动包,初识她的时候是使用RedisTemplate的时候遇到点问题Debug到底层的一些源码,发现spring-data-redis的驱动包在某个版本以后替换为LettuceLettuce翻译为生菜,没错,就是吃的那种生菜,因此它的Logo长这样:html

 

Redis高级客户端Lettuce详解

 

 

既然能被Spring生态所承认,Lettuce想必有过人之处,因而笔者花时间阅读她的官方文档,整理测试示例,写下这篇文章。编写本文时所使用的版本为Lettuce 5.1.8.RELEASESpringBoot 2.1.8.RELEASEJDK [8,11]超长警告:这篇文章断断续续花了两周完成,超过4万字…..java

Lettuce简介

Lettuce是一个高性能基于Java编写的Redis驱动框架,底层集成了Project Reactor提供自然的反应式编程,通讯框架集成了Netty使用了非阻塞IO5.x版本以后融合了JDK1.8的异步编程特性,在保证高性能的同时提供了十分丰富易用的API5.1版本的新特性以下:node

  • 支持Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX
  • 支持经过Brave模块跟踪Redis命令执行。
  • 支持Redis Streams
  • 支持异步的主从链接。
  • 支持异步链接池。
  • 新增命令最多执行一次模式(禁止自动重连)。
  • 全局命令超时设置(对异步和反应式命令也有效)。
  • ……等等

注意一点Redis的版本至少须要2.6,固然越高越好,API的兼容性比较强大。react

只须要引入单个依赖就能够开始愉快地使用Lettuceweb

  • Maven
 
    
    
      io.lettuce 
     
    
      lettuce-core 
     
    
      5.1.8.RELEASE 
     
    复制代码
  • Gradle
dependencies { compile 'io.lettuce:lettuce-core:5.1.8.RELEASE' } 复制代码

链接Redis

单机、哨兵、集群模式下链接Redis须要一个统一的标准去表示链接的细节信息,在Lettuce中这个统一的标准是RedisURI。能够经过三种方式构造一个RedisURI实例:redis

  • 定制的字符串URI语法:
RedisURI uri = RedisURI.create("redis://localhost/"); 复制代码
  • 使用建造器(RedisURI.Builder):
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build(); 复制代码
  • 直接经过构造函数实例化:
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS); 复制代码

定制的链接URI语法

  • 单机(前缀为redis://
格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]] 完整:redis://mypassword@127.0.0.1:6379/0?timeout=10s 简单:redis://localhost 复制代码
  • 单机而且使用SSL(前缀为rediss://) <== 注意后面多了个s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]] 完整:rediss://mypassword@127.0.0.1:6379/0?timeout=10s 简单:rediss://localhost 复制代码
  • 单机Unix Domain Sockets模式(前缀为redis-socket://
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]] 完整:redis-socket:///tmp/redis?timeout=10s&_database=0 复制代码
  • 哨兵(前缀为redis-sentinel://
格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId 完整:redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster 复制代码

超时时间单位:spring

  • d 天
  • h 小时
  • m 分钟
  • s 秒钟
  • ms 毫秒
  • us 微秒
  • ns 纳秒

我的建议使用RedisURI提供的建造器,毕竟定制的URI虽然简洁,可是比较容易出现人为错误。鉴于笔者没有SSLUnix Domain Socket的使用场景,下面不对这两种链接方式进行列举。shell

基本使用

Lettuce使用的时候依赖于四个主要组件:apache

  • RedisURI:链接信息。
  • RedisClientRedis客户端,特殊地,集群链接有一个定制的RedisClusterClient
  • ConnectionRedis链接,主要是StatefulConnection或者StatefulRedisConnection的子类,链接的类型主要由链接的具体方式(单机、哨兵、集群、订阅发布等等)选定,比较重要。
  • RedisCommandsRedis命令API接口,基本上覆盖了Redis发行版本的全部命令,提供了同步(sync)、异步(async)、反应式(reative)的调用方式,对于使用者而言,会常常跟RedisCommands系列接口打交道。

一个基本使用例子以下:编程

@Test public void testSetGet() throws Exception { RedisURI redisUri = RedisURI.builder() // <1> 建立单机链接的链接信息 .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); // <2> 建立客户端 StatefulRedisConnection 
   
     connection = redisClient.connect(); // <3> 建立线程安全的链接 RedisCommands 
    
      redisCommands = connection.sync(); // <4> 建立同步命令 SetArgs setArgs = SetArgs.Builder.nx().ex(5); String result = redisCommands.set("name", "throwable", setArgs); Assertions.assertThat(result).isEqualToIgnoringCase("OK"); result = redisCommands.get("name"); Assertions.assertThat(result).isEqualTo("throwable"); // ... 其余操做 connection.close(); // <5> 关闭链接 redisClient.shutdown(); // <6> 关闭客户端 } 复制代码 
     
   

注意:

  • <5>:关闭链接通常在应用程序中止以前操做,一个应用程序中的一个Redis驱动实例不须要太多的链接(通常状况下只须要一个链接实例就能够,若是有多个链接的须要能够考虑使用链接池,其实Redis目前处理命令的模块是单线程,在客户端多个链接多线程调用理论上没有效果)。
  • <6>:关闭客户端通常应用程序中止以前操做,若是条件容许的话,基于后开先闭原则,客户端关闭应该在链接关闭以后操做。

API

Lettuce主要提供三种API

  • 同步(sync):RedisCommands
  • 异步(async):RedisAsyncCommands
  • 反应式(reactive):RedisReactiveCommands

先准备好一个单机Redis链接备用:

private static StatefulRedisConnection 
   
     CONNECTION; private static RedisClient CLIENT; @BeforeClass public static void beforeClass() { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); CLIENT = RedisClient.create(redisUri); CONNECTION = CLIENT.connect(); } @AfterClass public static void afterClass() throws Exception { CONNECTION.close(); CLIENT.shutdown(); } 复制代码 
   

Redis命令API的具体实现能够直接从StatefulRedisConnection实例获取,见其接口定义:

public interface StatefulRedisConnection 
   
     extends StatefulConnection 
    
      { boolean isMulti(); RedisCommands 
     
       sync(); RedisAsyncCommands 
      
        async(); RedisReactiveCommands 
       
         reactive(); } 复制代码 
        
       
      
     
   

值得注意的是,在不指定编码解码器RedisCodec的前提下,RedisClient建立的StatefulRedisConnection实例通常是泛型实例StatefulRedisConnection
,也就是全部命令APIKEYVALUE都是String类型,这种使用方式能知足大部分的使用场景。固然,必要的时候能够定制编码解码器RedisCodec

同步API

先构建RedisCommands实例:

private static RedisCommands 
   
     COMMAND; @BeforeClass public static void beforeClass() { COMMAND = CONNECTION.sync(); } 复制代码 
   

基本使用:

@Test public void testSyncPing() throws Exception { String pong = COMMAND.ping(); Assertions.assertThat(pong).isEqualToIgnoringCase("PONG"); } @Test public void testSyncSetAndGet() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); COMMAND.set("name", "throwable", setArgs); String value = COMMAND.get("name"); log.info("Get value: {}", value); } // Get value: throwable 复制代码

同步API在全部命令调用以后会当即返回结果。若是熟悉Jedis的话,RedisCommands的用法其实和它相差不大。

异步API

先构建RedisAsyncCommands实例:

private static RedisAsyncCommands 
   
     ASYNC_COMMAND; @BeforeClass public static void beforeClass() { ASYNC_COMMAND = CONNECTION.async(); } 复制代码 
   

基本使用:

@Test public void testAsyncPing() throws Exception { RedisFuture 
   
     redisFuture = ASYNC_COMMAND.ping(); log.info("Ping result:{}", redisFuture.get()); } // Ping result:PONG 复制代码 
   

RedisAsyncCommands全部方法执行返回结果都是RedisFuture实例,而RedisFuture接口的定义以下:

public interface RedisFuture 
   
     extends CompletionStage 
    
      , Future 
     
       { String getError(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; } 复制代码 
      
     
   

也就是,RedisFuture能够无缝使用Future或者JDK1.8中引入的CompletableFuture提供的方法。举个例子:

@Test public void testAsyncSetAndGet1() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); RedisFuture 
   
     future = ASYNC_COMMAND.set("name", "throwable", setArgs); // CompletableFuture#thenAccept() future.thenAccept(value -> log.info("Set命令返回:{}", value)); // Future#get() future.get(); } // Set命令返回:OK @Test public void testAsyncSetAndGet2() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); CompletableFuture 
    
      result = (CompletableFuture 
     
       ) ASYNC_COMMAND.set("name", "throwable", setArgs) .thenAcceptBoth(ASYNC_COMMAND.get("name"), (s, g) -> { log.info("Set命令返回:{}", s); log.info("Get命令返回:{}", g); }); result.get(); } // Set命令返回:OK // Get命令返回:throwable 复制代码 
      
     
   

若是能熟练使用CompletableFuture和函数式编程技巧,能够组合多个RedisFuture完成一些列复杂的操做。

反应式API

Lettuce引入的反应式编程框架是Project Reactor,若是没有反应式编程经验能够先自行了解一下Project Reactor

构建RedisReactiveCommands实例:

private static RedisReactiveCommands 
   
     REACTIVE_COMMAND; @BeforeClass public static void beforeClass() { REACTIVE_COMMAND = CONNECTION.reactive(); } 复制代码 
   

根据Project ReactorRedisReactiveCommands的方法若是返回的结果只包含0或1个元素,那么返回值类型是Mono,若是返回的结果包含0到N(N大于0)个元素,那么返回值是Flux。举个例子:

@Test public void testReactivePing() throws Exception { Mono 
   
     ping = REACTIVE_COMMAND.ping(); ping.subscribe(v -> log.info("Ping result:{}", v)); Thread.sleep(1000); } // Ping result:PONG @Test public void testReactiveSetAndGet() throws Exception { SetArgs setArgs = SetArgs.Builder.nx().ex(5); REACTIVE_COMMAND.set("name", "throwable", setArgs).block(); REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value)); Thread.sleep(1000); } // Get命令返回:throwable @Test public void testReactiveSet() throws Exception { REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block(); Flux 
    
      flux = REACTIVE_COMMAND.smembers("food"); flux.subscribe(log::info); REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block(); Thread.sleep(1000); } // meat // bread // fish 复制代码 
     
   

举个更加复杂的例子,包含了事务、函数转换等:

@Test public void testReactiveFunctional() throws Exception { REACTIVE_COMMAND.multi().doOnSuccess(r -> { REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe(); REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe(); }).flatMap(s -> REACTIVE_COMMAND.exec()) .doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded())) .subscribe(); Thread.sleep(1000); } // OK // 2 // Discarded:false 复制代码

这个方法开启一个事务,先把counter设置为1,再将counter自增1。

发布和订阅

非集群模式下的发布订阅依赖于定制的链接StatefulRedisPubSubConnection,集群模式下的发布订阅依赖于定制的链接StatefulRedisClusterPubSubConnection,二者分别来源于RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub()

  • 非集群模式:
// 多是单机、普通主从、哨兵等非集群模式的客户端 RedisClient client = ... StatefulRedisPubSubConnection 
   
     connection = client.connectPubSub(); connection.addListener(new RedisPubSubListener 
    
      () { ... }); // 同步命令 RedisPubSubCommands 
     
       sync = connection.sync(); sync.subscribe("channel"); // 异步命令 RedisPubSubAsyncCommands 
      
        async = connection.async(); RedisFuture 
       
         future = async.subscribe("channel"); // 反应式命令 RedisPubSubReactiveCommands 
        
          reactive = connection.reactive(); reactive.subscribe("channel").subscribe(); reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe() 复制代码 
         
        
       
      
     
   
  • 集群模式:
// 使用方式其实和非集群模式基本一致 RedisClusterClient clusterClient = ... StatefulRedisClusterPubSubConnection 
   
     connection = clusterClient.connectPubSub(); connection.addListener(new RedisPubSubListener 
    
      () { ... }); RedisPubSubCommands 
     
       sync = connection.sync(); sync.subscribe("channel"); // ... 复制代码 
      
     
   

这里用单机同步命令的模式举一个Redis键空间通知(Redis Keyspace Notifications)的例子:

@Test public void testSyncKeyspaceNotification() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) // 注意这里只能是0号库 .withDatabase(0) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection 
   
     redisConnection = redisClient.connect(); RedisCommands 
    
      redisCommands = redisConnection.sync(); // 只接收键过时的事件 redisCommands.configSet("notify-keyspace-events", "Ex"); StatefulRedisPubSubConnection 
     
       connection = redisClient.connectPubSub(); connection.addListener(new RedisPubSubAdapter<>() { @Override public void psubscribed(String pattern, long count) { log.info("pattern:{},count:{}", pattern, count); } @Override public void message(String pattern, String channel, String message) { log.info("pattern:{},channel:{},message:{}", pattern, channel, message); } }); RedisPubSubCommands 
      
        commands = connection.sync(); commands.psubscribe("__keyevent@0__:expired"); redisCommands.setex("name", 2, "throwable"); Thread.sleep(10000); redisConnection.close(); connection.close(); redisClient.shutdown(); } // pattern:__keyevent@0__:expired,count:1 // pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name 复制代码 
       
      
     
   

实际上,在实现RedisPubSubListener的时候能够单独抽离,尽可能不要设计成匿名内部类的形式。

事务和批量命令执行

事务相关的命令就是WATCHUNWATCHEXECMULTIDISCARD,在RedisCommands系列接口中有对应的方法。举个例子:

// 同步模式 @Test public void testSyncMulti() throws Exception { COMMAND.multi(); COMMAND.setex("name-1", 2, "throwable"); COMMAND.setex("name-2", 2, "doge"); TransactionResult result = COMMAND.exec(); int index = 0; for (Object r : result) { log.info("Result-{}:{}", index, r); index++; } } // Result-0:OK // Result-1:OK 复制代码

RedisPipeline也就是管道机制能够理解为把多个命令打包在一次请求发送到Redis服务端,而后Redis服务端把全部的响应结果打包好一次性返回,从而节省没必要要的网络资源(最主要是减小网络请求次数)。Redis对于Pipeline机制如何实现并无明确的规定,也没有提供特殊的命令支持Pipeline机制。Jedis中底层采用BIO(阻塞IO)通信,因此它的作法是客户端缓存将要发送的命令,最后须要触发而后同步发送一个巨大的命令列表包,再接收和解析一个巨大的响应列表包。PipelineLettuce中对使用者是透明的,因为底层的通信框架是Netty,因此网络通信层面的优化Lettuce不须要过多干预,换言之能够这样理解:NettyLettuce从底层实现了RedisPipeline机制。可是,Lettuce的异步API也提供了手动Flush的方法:

@Test public void testAsyncManualFlush() { // 取消自动flush ASYNC_COMMAND.setAutoFlushCommands(false); List 
   
     > redisFutures = Lists.newArrayList(); int count = 5000; for (int i = 0; i < count; i++) { String key = "key-" + (i + 1); String value = "value-" + (i + 1); redisFutures.add(ASYNC_COMMAND.set(key, value)); redisFutures.add(ASYNC_COMMAND.expire(key, 2)); } long start = System.currentTimeMillis(); ASYNC_COMMAND.flushCommands(); boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0])); Assertions.assertThat(result).isTrue(); log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start); } // Lettuce cost:1302 ms 复制代码 
   

上面只是从文档看到的一些理论术语,可是现实是骨感的,对比了下JedisPipeline提供的方法,发现了JedisPipeline执行耗时比较低:

@Test public void testJedisPipeline() throws Exception { Jedis jedis = new Jedis(); Pipeline pipeline = jedis.pipelined(); int count = 5000; for (int i = 0; i < count; i++) { String key = "key-" + (i + 1); String value = "value-" + (i + 1); pipeline.set(key, value); pipeline.expire(key, 2); } long start = System.currentTimeMillis(); pipeline.syncAndReturnAll(); log.info("Jedis cost:{} ms", System.currentTimeMillis() - start); } // Jedis cost:9 ms 复制代码

我的猜想Lettuce可能底层并不是合并全部命令一次发送(甚至多是单条发送),具体可能须要抓包才能定位。依此来看,若是真的有大量执行Redis命令的场景,不妨可使用JedisPipeline

注意:由上面的测试推断RedisTemplateexecutePipelined()方法是假的Pipeline执行方法,使用RedisTemplate的时候请务必注意这一点。

Lua脚本执行

Lettuce中执行RedisLua命令的同步接口以下:

public interface RedisScriptingCommands 
   
     { 
    
      T eval(String var1, ScriptOutputType var2, K... var3); 
     
       T eval(String var1, ScriptOutputType var2, K[] var3, V... var4); 
      
        T evalsha(String var1, ScriptOutputType var2, K... var3); 
       
         T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4); List 
        
          scriptExists(String... var1); String scriptFlush(); String scriptKill(); String scriptLoad(V var1); String digest(V var1); } 复制代码 
         
        
       
      
     
   

异步和反应式的接口方法定义差很少,不一样的地方就是返回值类型,通常咱们经常使用的是eval()evalsha()scriptLoad()方法。举个简单的例子:

private static RedisCommands 
   
     COMMANDS; private static String RAW_LUA = "local key = KEYS[1]\n" + "local value = ARGV[1]\n" + "local timeout = ARGV[2]\n" + "redis.call('SETEX', key, tonumber(timeout), value)\n" + "local result = redis.call('GET', key)\n" + "return result;"; private static AtomicReference 
    
      LUA_SHA = new AtomicReference<>(); @Test public void testLua() throws Exception { LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA)); String[] keys = new String[]{"name"}; String[] args = new String[]{"throwable", "5000"}; String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args); log.info("Get value:{}", result); } // Get value:throwable 复制代码 
     
   

高可用和分片

为了Redis的高可用,通常会采用普通主从(Master/Replica,这里笔者称为普通主从模式,也就是仅仅作了主从复制,故障须要手动切换)、哨兵和集群。普通主从模式能够独立运行,也能够配合哨兵运行,只是哨兵提供自动故障转移和主节点提高功能。普通主从和哨兵均可以使用MasterSlave,经过入参包括RedisClient、编码解码器以及一个或者多个RedisURI获取对应的Connection实例。

这里注意一点MasterSlave中提供的方法若是只要求传入一个RedisURI实例,那么Lettuce会进行拓扑发现机制,自动获取Redis主从节点信息;若是要求传入一个RedisURI集合,那么对于普通主从模式来讲全部节点信息是静态的,不会进行发现和更新。

拓扑发现的规则以下:

  • 对于普通主从(Master/Replica)模式,不须要感知RedisURI指向从节点仍是主节点,只会进行一次性的拓扑查找全部节点信息,此后节点信息会保存在静态缓存中,不会更新。
  • 对于哨兵模式,会订阅全部哨兵实例并侦听订阅/发布消息以触发拓扑刷新机制,更新缓存的节点信息,也就是哨兵自然就是动态发现节点信息,不支持静态配置。

拓扑发现机制的提供APITopologyProvider,须要了解其原理的能够参考具体的实现。

对于集群(Cluster)模式,Lettuce提供了一套独立的API

另外,若是Lettuce链接面向的是非单个Redis节点,链接实例提供了数据读取节点偏好ReadFrom)设置,可选值有:

  • MASTER:只从Master节点中读取。
  • MASTER_PREFERRED:优先从Master节点中读取。
  • SLAVE_PREFERRED:优先从Slavor节点中读取。
  • SLAVE:只从Slavor节点中读取。
  • NEAREST:使用最近一次链接的Redis实例读取。

普通主从模式

假设如今有三个Redis服务造成树状主从关系以下:

  • 节点一:localhost:6379,角色为Master。
  • 节点二:localhost:6380,角色为Slavor,节点一的从节点。
  • 节点三:localhost:6381,角色为Slavor,节点二的从节点。

首次动态节点发现主从模式的节点信息须要以下构建链接:

@Test public void testDynamicReplica() throws Exception { // 这里只须要配置一个节点的链接信息,不必定须要是主节点的信息,从节点也能够 RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build(); RedisClient redisClient = RedisClient.create(uri); StatefulRedisMasterSlaveConnection 
   
     connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri); // 只从从节点读取数据 connection.setReadFrom(ReadFrom.SLAVE); // 执行其余Redis命令 connection.close(); redisClient.shutdown(); } 复制代码 
   

若是须要指定静态的Redis主从节点链接属性,那么能够这样构建链接:

@Test public void testStaticReplica() throws Exception { List 
   
     uris = new ArrayList<>(); RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build(); RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build(); RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build(); uris.add(uri1); uris.add(uri2); uris.add(uri3); RedisClient redisClient = RedisClient.create(); StatefulRedisMasterSlaveConnection 
    
      connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uris); // 只从主节点读取数据 connection.setReadFrom(ReadFrom.MASTER); // 执行其余Redis命令 connection.close(); redisClient.shutdown(); } 复制代码 
     
   

哨兵模式

因为Lettuce自身提供了哨兵的拓扑发现机制,因此只须要随便配置一个哨兵节点的RedisURI实例便可:

@Test public void testDynamicSentinel() throws Exception { RedisURI redisUri = RedisURI.builder() .withPassword("你的密码") .withSentinel("localhost", 26379) .withSentinelMasterId("哨兵Master的ID") .build(); RedisClient redisClient = RedisClient.create(); StatefulRedisMasterSlaveConnection 
   
     connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri); // 只容许从从节点读取数据 connection.setReadFrom(ReadFrom.SLAVE); RedisCommands 
    
      command = connection.sync(); SetArgs setArgs = SetArgs.Builder.nx().ex(5); command.set("name", "throwable", setArgs); String value = command.get("name"); log.info("Get value:{}", value); } // Get value:throwable 复制代码 
     
   

集群模式

鉴于笔者对Redis集群模式并不熟悉,Cluster模式下的API使用自己就有比较多的限制,因此这里只简单介绍一下怎么用。先说几个特性:

下面的API提供跨槽位(Slot)调用的功能

  • RedisAdvancedClusterCommands
  • RedisAdvancedClusterAsyncCommands
  • RedisAdvancedClusterReactiveCommands

静态节点选择功能:

  • masters:选择全部主节点执行命令。
  • slaves:选择全部从节点执行命令,其实就是只读模式。
  • all nodes:命令能够在全部节点执行。

集群拓扑视图动态更新功能:

  • 手动更新,主动调用RedisClusterClient#reloadPartitions()
  • 后台定时更新。
  • 自适应更新,基于链接断开和MOVED/ASK命令重定向自动更新。

Redis集群搭建详细过程能够参考官方文档,假设已经搭建好集群以下(192.168.56.200是笔者的虚拟机Host):

  • 192.168.56.200:7001 => 主节点,槽位0-5460。
  • 192.168.56.200:7002 => 主节点,槽位5461-10922。
  • 192.168.56.200:7003 => 主节点,槽位10923-16383。
  • 192.168.56.200:7004 => 7001的从节点。
  • 192.168.56.200:7005 => 7002的从节点。
  • 192.168.56.200:7006 => 7003的从节点。

简单的集群链接和使用方式以下:

@Test public void testSyncCluster(){ RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); StatefulRedisClusterConnection 
   
     connection = redisClusterClient.connect(); RedisAdvancedClusterCommands 
    
      commands = connection.sync(); commands.setex("name",10, "throwable"); String value = commands.get("name"); log.info("Get value:{}", value); } // Get value:throwable 复制代码 
     
   

节点选择:

@Test public void testSyncNodeSelection() { RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); StatefulRedisClusterConnection 
   
     connection = redisClusterClient.connect(); RedisAdvancedClusterCommands 
    
      commands = connection.sync(); // commands.all(); // 全部节点 // commands.masters(); // 主节点 // 从节点只读 NodeSelection 
     
       replicas = commands.slaves(); NodeSelectionCommands 
      
        nodeSelectionCommands = replicas.commands(); // 这里只是演示,通常应该禁用keys *命令 Executions 
       
         > keys = nodeSelectionCommands.keys("*"); keys.forEach(key -> log.info("key: {}", key)); connection.close(); redisClusterClient.shutdown(); } 复制代码 
        
       
      
     
   

定时更新集群拓扑视图(每隔十分钟更新一次,这个时间自行考量,不能太频繁):

@Test public void testPeriodicClusterTopology() throws Exception { RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions .builder() .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES)) .build(); redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build()); StatefulRedisClusterConnection 
   
     connection = redisClusterClient.connect(); RedisAdvancedClusterCommands 
    
      commands = connection.sync(); commands.setex("name", 10, "throwable"); String value = commands.get("name"); log.info("Get value:{}", value); Thread.sleep(Integer.MAX_VALUE); connection.close(); redisClusterClient.shutdown(); } 复制代码 
     
   

自适应更新集群拓扑视图:

@Test public void testAdaptiveClusterTopology() throws Exception { RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build(); RedisClusterClient redisClusterClient = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder() .enableAdaptiveRefreshTrigger( ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT, ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS ) .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS)) .build(); redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build()); StatefulRedisClusterConnection 
   
     connection = redisClusterClient.connect(); RedisAdvancedClusterCommands 
    
      commands = connection.sync(); commands.setex("name", 10, "throwable"); String value = commands.get("name"); log.info("Get value:{}", value); Thread.sleep(Integer.MAX_VALUE); connection.close(); redisClusterClient.shutdown(); } 复制代码 
     
   

动态命令和自定义命令

自定义命令是Redis命令有限集,不过能够更细粒度指定KEYARGV、命令类型、编码解码器和返回值类型,依赖于dispatch()方法:

// 自定义实现PING方法 @Test public void testCustomPing() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection 
   
     connect = redisClient.connect(); RedisCommands 
    
      sync = connect.sync(); RedisCodec 
     
       codec = StringCodec.UTF8; String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec)); log.info("PING:{}", result); connect.close(); redisClient.shutdown(); } // PING:PONG // 自定义实现Set方法 @Test public void testCustomSet() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection 
      
        connect = redisClient.connect(); RedisCommands 
       
         sync = connect.sync(); RedisCodec 
        
          codec = StringCodec.UTF8; sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec), new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable")); String result = sync.get("name"); log.info("Get value:{}", result); connect.close(); redisClient.shutdown(); } // Get value:throwable 复制代码 
         
        
       
      
     
   

动态命令是基于Redis命令有限集,而且经过注解和动态代理完成一些复杂命令组合的实现。主要注解在io.lettuce.core.dynamic.annotation包路径下。简单举个例子:

public interface CustomCommand extends Commands { // SET [key] [value] @Command("SET ?0 ?1") String setKey(String key, String value); // SET [key] [value] @Command("SET :key :value") String setKeyNamed(@Param("key") String key, @Param("value") String value); // MGET [key1] [key2] @Command("MGET ?0 ?1") List 
   
     mGet(String key1, String key2); / * 方法名做为命令 */ @CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME) String mSet(String key1, String value1, String key2, String value2); } @Test public void testCustomDynamicSet() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); StatefulRedisConnection 
    
      connect = redisClient.connect(); RedisCommandFactory commandFactory = new RedisCommandFactory(connect); CustomCommand commands = commandFactory.getCommands(CustomCommand.class); commands.setKey("name", "throwable"); commands.setKeyNamed("throwable", "doge"); log.info("MGET ===> " + commands.mGet("name", "throwable")); commands.mSet("key1", "value1","key2", "value2"); log.info("MGET ===> " + commands.mGet("key1", "key2")); connect.close(); redisClient.shutdown(); } // MGET ===> [throwable, doge] // MGET ===> [value1, value2] 复制代码 
     
   

高阶特性

Lettuce有不少高阶使用特性,这里只列举我的认为经常使用的两点:

  • 配置客户端资源。
  • 使用链接池。

更多其余特性能够自行参看官方文档。

配置客户端资源

客户端资源的设置与Lettuce的性能、并发和事件处理相关。线程池或者线程组相关配置占据客户端资源配置的大部分(EventLoopGroupsEventExecutorGroup),这些线程池或者线程组是链接程序的基础组件。通常状况下,客户端资源应该在多个Redis客户端之间共享,而且在再也不使用的时候须要自行关闭。笔者认为,客户端资源是面向Netty的。注意除非特别熟悉或者花长时间去测试调整下面提到的参数,不然在没有经验的前提下凭直觉修改默认值,有可能会踩坑

客户端资源接口是ClientResources,实现类是DefaultClientResources

构建DefaultClientResources实例:

// 默认 ClientResources resources = DefaultClientResources.create(); // 建造器 ClientResources resources = DefaultClientResources.builder() .ioThreadPoolSize(4) .computationThreadPoolSize(4) .build() 复制代码

使用:

ClientResources resources = DefaultClientResources.create(); // 非集群 RedisClient client = RedisClient.create(resources, uri); // 集群 RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris); // ...... client.shutdown(); clusterClient.shutdown(); // 关闭资源 resources.shutdown(); 复制代码

客户端资源基本配置:

属性 描述 默认值
ioThreadPoolSize I/O线程数 Runtime.getRuntime().availableProcessors()
computationThreadPoolSize 任务线程数 Runtime.getRuntime().availableProcessors()

客户端资源高级配置:

属性 描述 默认值
eventLoopGroupProvider EventLoopGroup提供商 -
eventExecutorGroupProvider EventExecutorGroup提供商 -
eventBus 事件总线 DefaultEventBus
commandLatencyCollectorOptions 命令延时收集器配置 DefaultCommandLatencyCollectorOptions
commandLatencyCollector 命令延时收集器 DefaultCommandLatencyCollector
commandLatencyPublisherOptions 命令延时发布器配置 DefaultEventPublisherOptions
dnsResolver DNS处理器 JDK或者Netty提供
reconnectDelay 重连延时配置 Delay.exponential()
nettyCustomizer Netty自定义配置器 -
tracing 轨迹记录器 -

非集群客户端RedisClient的属性配置:

Redis非集群客户端RedisClient自己提供了配置属性方法:

RedisClient client = RedisClient.create(uri); client.setOptions(ClientOptions.builder() .autoReconnect(false) .pingBeforeActivateConnection(true) .build()); 复制代码

非集群客户端的配置属性列表:

属性 描述 默认值
pingBeforeActivateConnection 链接激活以前是否执行PING命令 false
autoReconnect 是否自动重连 true
cancelCommandsOnReconnectFailure 重连失败是否拒绝命令执行 false
suspendReconnectOnProtocolFailure 底层协议失败是否挂起重连操做 false
requestQueueSize 请求队列容量 (Integer#MAX_VALUE)
disconnectedBehavior 失去链接时候的行为 DEFAULT
sslOptions SSL配置 -
socketOptions Socket配置 10 seconds Connection-Timeout, no keep-alive, no TCP noDelay
timeoutOptions 超时配置 -
publishOnScheduler 发布反应式信号数据的调度器 使用I/O线程

集群客户端属性配置:

Redis集群客户端RedisClusterClient自己提供了配置属性方法:

RedisClusterClient client = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES)) .enableAllAdaptiveRefreshTriggers() .build(); client.setOptions(ClusterClientOptions.builder() .topologyRefreshOptions(topologyRefreshOptions) .build()); 复制代码

集群客户端的配置属性列表:

属性 描述 默认值
enablePeriodicRefresh 是否容许周期性更新集群拓扑视图 false
refreshPeriod 更新集群拓扑视图周期 60秒
enableAdaptiveRefreshTrigger 设置自适应更新集群拓扑视图触发器RefreshTrigger -
adaptiveRefreshTriggersTimeout 自适应更新集群拓扑视图触发器超时设置 30秒
refreshTriggersReconnectAttempts 自适应更新集群拓扑视图触发重连次数 5
dynamicRefreshSources 是否容许动态刷新拓扑资源 true
closeStaleConnections 是否容许关闭陈旧的链接 true
maxRedirects 集群重定向次数上限 5
validateClusterNodeMembership 是否校验集群节点的成员关系 true

使用链接池

引入链接池依赖commons-pool2

 
    
    
      org.apache.commons 
     
    
      commons-pool2 
     
    
      2.7.0 
     
   

基本使用以下:

@Test public void testUseConnectionPool() throws Exception { RedisURI redisUri = RedisURI.builder() .withHost("localhost") .withPort(6379) .withTimeout(Duration.of(10, ChronoUnit.SECONDS)) .build(); RedisClient redisClient = RedisClient.create(redisUri); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); GenericObjectPool 
   
     > pool = ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig); try (StatefulRedisConnection 
    
      connection = pool.borrowObject()) { RedisCommands 
     
       command = connection.sync(); SetArgs setArgs = SetArgs.Builder.nx().ex(5); command.set("name", "throwable", setArgs); String n = command.get("name"); log.info("Get value:{}", n); } pool.close(); redisClient.shutdown(); } 复制代码 
      
     
   

其中,同步链接的池化支持须要用ConnectionPoolSupport,异步链接的池化支持须要用AsyncConnectionPoolSupportLettuce5.1以后才支持)。

几个常见的渐进式删除例子

渐进式删除Hash中的域-属性:

@Test public void testDelBigHashKey() throws Exception { // SCAN参数 ScanArgs scanArgs = ScanArgs.Builder.limit(2); // TEMP游标 ScanCursor cursor = ScanCursor.INITIAL; // 目标KEY String key = "BIG_HASH_KEY"; prepareHashTestData(key); log.info("开始渐进式删除Hash的元素..."); int counter = 0; do { MapScanCursor 
   
     result = COMMAND.hscan(key, cursor, scanArgs); // 重置TEMP游标 cursor = ScanCursor.of(result.getCursor()); cursor.setFinished(result.isFinished()); Collection 
    
      fields = result.getMap().values(); if (!fields.isEmpty()) { COMMAND.hdel(key, fields.toArray(new String[0])); } counter++; } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished())); log.info("渐进式删除Hash的元素完毕,迭代次数:{} ...", counter); } private void prepareHashTestData(String key) throws Exception { COMMAND.hset(key, "1", "1"); COMMAND.hset(key, "2", "2"); COMMAND.hset(key, "3", "3"); COMMAND.hset(key, "4", "4"); COMMAND.hset(key, "5", "5"); } 复制代码 
     
   

渐进式删除集合中的元素:

@Test public void testDelBigSetKey() throws Exception { String key = "BIG_SET_KEY"; prepareSetTestData(key); // SCAN参数 ScanArgs scanArgs = ScanArgs.Builder.limit(2); // TEMP游标 ScanCursor cursor = ScanCursor.INITIAL; log.info("开始渐进式删除Set的元素..."); int counter = 0; do { ValueScanCursor 
   
     result = COMMAND.sscan(key, cursor, scanArgs); // 重置TEMP游标 cursor = ScanCursor.of(result.getCursor()); cursor.setFinished(result.isFinished()); List 
    
      values = result.getValues(); if (!values.isEmpty()) { COMMAND.srem(key, values.toArray(new String[0])); } counter++; } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished())); log.info("渐进式删除Set的元素完毕,迭代次数:{} ...", counter); } private void prepareSetTestData(String key) throws Exception { COMMAND.sadd(key, "1", "2", "3", "4", "5"); } 复制代码 
     
   

渐进式删除有序集合中的元素:

@Test public void testDelBigZSetKey() throws Exception { // SCAN参数 ScanArgs scanArgs = ScanArgs.Builder.limit(2); // TEMP游标 ScanCursor cursor = ScanCursor.INITIAL; // 目标KEY String key = "BIG_ZSET_KEY"; prepareZSetTestData(key); log.info("开始渐进式删除ZSet的元素..."); int counter = 0; do { ScoredValueScanCursor 
   
     result = COMMAND.zscan(key, cursor, scanArgs); // 重置TEMP游标 cursor = ScanCursor.of(result.getCursor()); cursor.setFinished(result.isFinished()); List 
    
      > scoredValues = result.getValues(); if (!scoredValues.isEmpty()) { COMMAND.zrem(key, scoredValues.stream().map(ScoredValue 
     
       ::getValue).toArray(String[]::new)); } counter++; } while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished())); log.info("渐进式删除ZSet的元素完毕,迭代次数:{} ...", counter); } private void prepareZSetTestData(String key) throws Exception { COMMAND.zadd(key, 0, "1"); COMMAND.zadd(key, 0, "2"); COMMAND.zadd(key, 0, "3"); COMMAND.zadd(key, 0, "4"); COMMAND.zadd(key, 0, "5"); } 复制代码 
      
     
   

在SpringBoot中使用Lettuce

我的认为,spring-data-redis中的API封装并非很优秀,用起来比较重,不够灵活,这里结合前面的例子和代码,在SpringBoot脚手架项目中配置和整合Lettuce。先引入依赖:

 
    
     
      
      
        org.springframework.boot 
       
      
        spring-boot-dependencies 
       
      
        2.1.8.RELEASE 
       
      
        pom 
       
      
        import 
       
      
     
    
    
     
     
       org.springframework.boot 
      
     
       spring-boot-starter-web 
      
     
     
     
       io.lettuce 
      
     
       lettuce-core 
      
     
       5.1.8.RELEASE 
      
     
     
     
       org.projectlombok 
      
     
       lombok 
      
     
       1.18.10 
      
     
       provided 
      
     
    复制代码

通常状况下,每一个应用应该使用单个Redis客户端实例和单个链接实例,这里设计一个脚手架,适配单机、普通主从、哨兵和集群四种使用场景。对于客户端资源,采用默认的实现便可。对于Redis的链接属性,比较主要的有HostPortPassword,其余能够暂时忽略。基于约定大于配置的原则,先定制一系列属性配置类(其实有些配置是能够彻底共用,可是考虑到要清晰描述类之间的关系,这里拆分多个配置属性类和多个配置方法):

@Data @ConfigurationProperties(prefix = "lettuce") public class LettuceProperties { private LettuceSingleProperties single; private LettuceReplicaProperties replica; private LettuceSentinelProperties sentinel; private LettuceClusterProperties cluster; } @Data public class LettuceSingleProperties { private String host; private Integer port; private String password; } @EqualsAndHashCode(callSuper = true) @Data public class LettuceReplicaProperties extends LettuceSingleProperties { } @EqualsAndHashCode(callSuper = true) @Data public class LettuceSentinelProperties extends LettuceSingleProperties { private String masterId; } @EqualsAndHashCode(callSuper = true) @Data public class LettuceClusterProperties extends LettuceSingleProperties { } 复制代码

配置类以下,主要使用@ConditionalOnProperty作隔离,通常状况下,不多有人会在一个应用使用一种以上的Redis链接场景:

@RequiredArgsConstructor @Configuration @ConditionalOnClass(name = "io.lettuce.core.RedisURI") @EnableConfigurationProperties(value = LettuceProperties.class) public class LettuceAutoConfiguration { private final LettuceProperties lettuceProperties; @Bean(destroyMethod = "shutdown") public ClientResources clientResources() { return DefaultClientResources.create(); } @Bean @ConditionalOnProperty(name = "lettuce.single.host") public RedisURI singleRedisUri() { LettuceSingleProperties singleProperties = lettuceProperties.getSingle(); return RedisURI.builder() .withHost(singleProperties.getHost()) .withPort(singleProperties.getPort()) .withPassword(singleProperties.getPassword()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.single.host") public RedisClient singleRedisClient(ClientResources clientResources, @Qualifier("singleRedisUri") RedisURI redisUri) { return RedisClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.single.host") public StatefulRedisConnection 
   
     singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) { return singleRedisClient.connect(); } @Bean @ConditionalOnProperty(name = "lettuce.replica.host") public RedisURI replicaRedisUri() { LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica(); return RedisURI.builder() .withHost(replicaProperties.getHost()) .withPort(replicaProperties.getPort()) .withPassword(replicaProperties.getPassword()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.replica.host") public RedisClient replicaRedisClient(ClientResources clientResources, @Qualifier("replicaRedisUri") RedisURI redisUri) { return RedisClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.replica.host") public StatefulRedisMasterSlaveConnection 
    
      replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient, @Qualifier("replicaRedisUri") RedisURI redisUri) { return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri); } @Bean @ConditionalOnProperty(name = "lettuce.sentinel.host") public RedisURI sentinelRedisUri() { LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel(); return RedisURI.builder() .withPassword(sentinelProperties.getPassword()) .withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort()) .withSentinelMasterId(sentinelProperties.getMasterId()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.sentinel.host") public RedisClient sentinelRedisClient(ClientResources clientResources, @Qualifier("sentinelRedisUri") RedisURI redisUri) { return RedisClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.sentinel.host") public StatefulRedisMasterSlaveConnection 
     
       sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient, @Qualifier("sentinelRedisUri") RedisURI redisUri) { return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri); } @Bean @ConditionalOnProperty(name = "lettuce.cluster.host") public RedisURI clusterRedisUri() { LettuceClusterProperties clusterProperties = lettuceProperties.getCluster(); return RedisURI.builder() .withHost(clusterProperties.getHost()) .withPort(clusterProperties.getPort()) .withPassword(clusterProperties.getPassword()) .build(); } @Bean(destroyMethod = "shutdown") @ConditionalOnProperty(name = "lettuce.cluster.host") public RedisClusterClient redisClusterClient(ClientResources clientResources, @Qualifier("clusterRedisUri") RedisURI redisUri) { return RedisClusterClient.create(clientResources, redisUri); } @Bean(destroyMethod = "close") @ConditionalOnProperty(name = "lettuce.cluster") public StatefulRedisClusterConnection 
      
        clusterConnection(RedisClusterClient clusterClient) { return clusterClient.connect(); } } 复制代码 
       
      
     
   

最后为了让IDE识别咱们的配置,能够添加IDE亲缘性,/META-INF文件夹下新增一个文件spring-configuration-metadata.json,内容以下:

{ "properties": [ { "name": "lettuce.single", "type": "club.throwable.spring.lettuce.LettuceSingleProperties", "description": "单机配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" }, { "name": "lettuce.replica", "type": "club.throwable.spring.lettuce.LettuceReplicaProperties", "description": "主从配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" }, { "name": "lettuce.sentinel", "type": "club.throwable.spring.lettuce.LettuceSentinelProperties", "description": "哨兵配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" }, { "name": "lettuce.single", "type": "club.throwable.spring.lettuce.LettuceClusterProperties", "description": "集群配置", "sourceType": "club.throwable.spring.lettuce.LettuceProperties" } ] } 复制代码

若是想IDE亲缘性作得更好,能够添加/META-INF/additional-spring-configuration-metadata.json进行更多细节定义。简单使用以下:

@Slf4j @Component public class RedisCommandLineRunner implements CommandLineRunner { @Autowired @Qualifier("singleRedisConnection") private StatefulRedisConnection 
   
     connection; @Override public void run(String... args) throws Exception { RedisCommands 
    
      redisCommands = connection.sync(); redisCommands.setex("name", 5, "throwable"); log.info("Get value:{}", redisCommands.get("name")); } } // Get value:throwable 复制代码 
     
   

小结

本文算是基于Lettuce的官方文档,对它的使用进行全方位的分析,包括主要功能、配置都作了一些示例,限于篇幅部分特性和配置细节没有分析。Lettuce已经被spring-data-redis接纳做为官方的Redis客户端驱动,因此值得信赖,它的一些API设计确实比较合理,扩展性高的同时灵活性也高。我的建议,基于Lettuce包自行添加配置到SpringBoot应用用起来会驾轻就熟,毕竟RedisTemplate实在太笨重,并且还屏蔽了Lettuce一些高级特性和灵活的API

参考资料:

  • Lettuce Reference Guide
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/211160.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午11:04
下一篇 2026年3月18日 下午11:05


相关推荐

  • RISC架构服务器

    RISC架构服务器RISC 的英文全称为 ReducedInstr 中文即 精简指令集 它的指令系统相对简单 它只要求硬件执行很有限且最常用的那部分指令 大部分复杂的操作则使用成熟的编译技术 由简单指令合成 目前在中高档服务器中普遍采用这一指令系统的 CPU 特别是高档服务器全都采用 RISC 指令系统的 CPU 在中高档服务器中采用 RISC 指令的 CPU 主要有 Compaq 康柏

    2026年3月17日
    1
  • grid布局浏览器兼容_Grid布局指南

    grid布局浏览器兼容_Grid布局指南CSS 网格布局 又称 网格 是一种二维网格布局系统 CSS 在处理网页布局方面一直做的不是很好 一开始我们用的是 table 表格 布局 然后用 float 浮动 position 定位 和 inline block 行内块 布局 但是这些方法本质上是 hack 遗漏了很多功能 例如垂直居中 后来出了 flexbox 盒子布局 解决了很多布局问题 但是它仅仅是一维布局 而不是复杂的二维布局 实际上它们

    2026年3月18日
    2
  • python生成器详解_自动生成python代码

    python生成器详解_自动生成python代码生成器利用迭代器,我们可以在每次迭代获取数据(通过next()方法)时按照特定的规律进行生成。但是我们在实现一个迭代器时,关于当前迭代到的状态需要我们自己记录,进而才能根据当前状态生成下一个数据。

    2022年7月31日
    57
  • 算法讲解:二分图匹配【图论】

    算法讲解:二分图匹配【图论】二分图匹配 自然要先从定义入手 那么二分图是什么呢 二分图 二分图又称作二部图 是图论中的一种特殊模型 设 G V E 是一个无向图 如果顶点 V 可分割为两个互不相交的子集 A B 并且图中的每条边 i j 所关联的两个顶点 i 和 j 分别属于这两个不同的顶点集 iinA jinB 则称图 G 为一个二分图 简单的说 一个图被分成了两部分 相同的部分没有边 那这个图就是二分图 二分图

    2026年3月20日
    2
  • Windows 下搭建LDAP服务器

    Windows 下搭建LDAP服务器TheLightweightDirectoryAccessProtocol,orLDAP,isanapplicationprotocolforqueryingandmodifyingdirectoryservicesrunningoverTCP/IP.(viawikipedia)。LDAP全称是一个轻量级的目录访问协议,它是建立在TCP/IP

    2022年5月14日
    36
  • SRS软件需求规格说明书_SOR是什么文件

    SRS软件需求规格说明书_SOR是什么文件【摘要】随着信息时代科技的飞速发展,经济全球化已广为人知,英语作为全球最主要的语言之一,受到越来越多的人的喜爱,不仅为了增长知识,也为了能适应社会发展的需求。但是,学英语最重要的事首先是积累词汇,没有

    2022年8月2日
    6

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号