Redis——Lettuce连接redis集群

Redis——Lettuce连接redis集群lettuce 集群 redis

Lettuce连接redis集群使用的都是集群专用类,像RedisClusterClient、StatefulRedisClusterConnection、RedisAdvancedClusterCommands、StatefulRedisClusterPubSubConnection等等;

Lettuce对redis cluster的支持:

  • 支持所有Cluster命令;
  • 基于键哈希槽的路由节点;
  • 对集群命令高级抽象;
  • 在多个集群节点上执行命令;
  • 处理MOVED和ASK重定向;
  • 通过槽位和ip端口直接连接集群节点;
  • SSL和身份验证;
  • 定期和自适应集群拓扑更新;
  • 发布订阅;

启动时只需至少一个可以连接的集群节点就可以,能够自动拓扑出集群全部节点;也可以使用ReadFrom设置读取数据来源,跟主从模式一样;

虽然redis本身的多键命令要求key必须都在同一个槽位,但Lettuce对一部分命令多了优化,可以对多键命令进行跨槽位执行,通过将对不同槽位键的操作命令分解为多条命令,单个命令以fork/join方式并发运行,最后将结果合并返回;

可以跨槽位的命令有:

  • DEL:删除键,返回删除数量;
  • EXISTS:统计跨槽位的存在的键的数量;
  • MGET:获取所有给定键的值,顺序按照键的顺序返回;
  • MSET:批量保存键值对,总是返回OK;
  • TOUCH:改变给定键的最后访问时间,返回改变的键的数量;
  • UNLINK:删除键并在另一个不同的线程中回收内存,返回删除数量;

提供跨槽位命令的api:RedisAdvancedClusterCommands、RedisAdvancedClusterAsyncCommands、RedisAdvancedClusterReactiveCommands;

可以在多个集群节点上执行的命令有:

  • CLIENT SETNAME:在所有已知的集群节点上设置客户端的名称,总是返回OK;
  • KEYS:返回所有master上存储的key;
  • DBSIZE:返回所有master上存储的key的数量;
  • FLUSHALL:清空master上的所有数据,总是返回OK;
  • FLUSHDB:清空master上的所有数据,总是返回OK;
  • RANDOMKEY:从随机master上返回随机的key;
  • SCAN:根据ReadFrom设置扫描整个集群的键空间;
  • SCRIPT FLUSH:从所有的集群节点脚本缓存中删除所有脚本;
  • SCRIPT LOAD:在所有的集群节点上加载lua脚本;
  • SCRIPT KILL:在所有集群节点上杀死脚本;(即使脚本没有运行调用也不会失败)
  • SHUTDOWN:将数据集同步保存到磁盘,然后关闭集群所有节点;

关于发布订阅:

普通用户空间的发布订阅,redis集群会发送到每个节点,发布者和订阅者不需要在同一个节点,普通订阅发布消息可以在集群拓扑改变时重新连接。对于键空间事件,只会发到自己的节点,不会扩散到其他节点,要订阅键空间事件可以去适当的多个节点上订阅,或者使用RedisClusterClient消息传播和NodeSelection API获得一个托管连接集合;

注意:由于主从同步,键会被复制到多个从节点上,特别是键过期事件,会在主从节点上都产生过期事件,如果订阅从节点,可能会收到多条相同的过期事件;订阅是通过NodeSelection API或者单个节点调用subscribe(…)发出的,订阅对于新增的节点无效;

测试Demo:(redis版本7.0.2,Lettuce版本6.1.8)

集群节点:虚拟机 192.168.1.31,端口 9001-9006,集群节点已设置notify-keyspace-events AK;

/ * 2022年6月23日上午9:41:47 */ package testlettuce; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import io.lettuce.core.ClientOptions.DisconnectedBehavior; import io.lettuce.core.KeyScanCursor; import io.lettuce.core.KeyValue; import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisURI; import io.lettuce.core.ScanCursor; import io.lettuce.core.SocketOptions; import io.lettuce.core.SslOptions; import io.lettuce.core.TimeoutOptions; import io.lettuce.core.cluster.ClusterClientOptions; import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.cluster.api.sync.Executions; import io.lettuce.core.cluster.api.sync.NodeSelection; import io.lettuce.core.cluster.api.sync.NodeSelectionCommands; import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection; import io.lettuce.core.cluster.pubsub.api.async.NodeSelectionPubSubAsyncCommands; import io.lettuce.core.cluster.pubsub.api.async.PubSubAsyncNodeSelection; import io.lettuce.core.cluster.pubsub.api.reactive.RedisClusterPubSubReactiveCommands; import io.lettuce.core.protocol.DecodeBufferPolicies; import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.pubsub.RedisPubSubListener; import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; / * @author XWF * */ public class TestLettuceCluster { / * @param args */ public static void main(String[] args) { List 
  
    nodeList = new ArrayList<>(); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9001).withAuthentication("default", "").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9002).withAuthentication("default", "").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9003).withAuthentication("default", "").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9004).withAuthentication("default", "").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9005).withAuthentication("default", "").build()); nodeList.add(RedisURI.builder().withHost("192.168.1.31").withPort(9006).withAuthentication("default", "").build()); RedisClusterClient clusterClient = RedisClusterClient.create(nodeList); ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder() .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(5L))//设置自适应拓扑刷新超时,每次超时刷新一次,默认30s; .closeStaleConnections(false)//刷新拓扑时是否关闭失效连接,默认true,isPeriodicRefreshEnabled()为true时生效; .dynamicRefreshSources(true)//从拓扑中发现新节点,并将新节点也作为拓扑的源节点,动态刷新可以发现全部节点并计算每个客户端的数量,设置false则只有初始节点为源和计算客户端数量; .enableAllAdaptiveRefreshTriggers()//启用全部触发器自适应刷新拓扑,默认关闭; .enablePeriodicRefresh(Duration.ofSeconds(5L))//开启定时拓扑刷新并设置周期; .refreshTriggersReconnectAttempts(3)//长连接重新连接尝试n次才拓扑刷新 .build(); ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder() .autoReconnect(true)//在连接丢失时开启或关闭自动重连,默认true; .cancelCommandsOnReconnectFailure(true)//允许在重连失败取消排队命令,默认false; .decodeBufferPolicy(DecodeBufferPolicies.always())//设置丢弃解码缓冲区的策略,以回收内存;always:解码后丢弃,最大内存效率;alwaysSome:解码后丢弃一部分;ratio(n)基于比率丢弃,n/(1+n),通常用1-10对应50%-90%; .disconnectedBehavior(DisconnectedBehavior.DEFAULT)//设置连接断开时命令的调用行为,默认启用重连;DEFAULT:启用时重连中接收命令,禁用时重连中拒绝命令;ACCEPT_COMMANDS:重连中接收命令;REJECT_COMMANDS:重连中拒绝命令; // .maxRedirects(5)//当键从一个节点迁移到另一个节点,集群重定向次数,默认5; // .nodeFilter(nodeFilter)//设置节点过滤器 // .pingBeforeActivateConnection(true)//激活连接前设置PING,默认true; // .protocolVersion(ProtocolVersion.RESP3)//设置协议版本,默认RESP3; // .publishOnScheduler(false)//使用专用的调度器发出响应信号,默认false,启用时数据信号将使用服务的多线程发出; // .requestQueueSize(requestQueueSize)//设置每个连接请求队列大小; // .scriptCharset(scriptCharset)//设置Lua脚本编码为byte[]的字符集,默认StandardCharsets.UTF_8; // .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).keepAlive(true).tcpNoDelay(true).build())//设置低级套接字的属性 // .sslOptions(SslOptions.builder().build())//设置ssl属性 // .suspendReconnectOnProtocolFailure(false)//当重新连接遇到协议失败时暂停重新连接(SSL验证,连接失败前PING),默认值为false; // .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(10)))//设置超时来取消和终止命令; .topologyRefreshOptions(clusterTopologyRefreshOptions)//设置拓扑更新设置 .validateClusterNodeMembership(true)//在允许连接到集群节点之前,验证集群节点成员关系,默认值为true; .build(); clusterClient.setDefaultTimeout(Duration.ofSeconds(5L)); clusterClient.setOptions(clusterClientOptions); StatefulRedisClusterConnection 
   
     clusterConn = clusterClient.connect(); clusterConn.setReadFrom(ReadFrom.ANY);//设置从哪些节点读取数据; RedisAdvancedClusterCommands 
    
      clusterCmd = clusterConn.sync(); clusterCmd.set("a", "A"); clusterCmd.set("b", "B"); clusterCmd.set("c", "C"); clusterCmd.set("d", "D"); System.out.println("get a=" + clusterCmd.get("a")); System.out.println("get b=" + clusterCmd.get("b")); System.out.println("get c=" + clusterCmd.get("c")); System.out.println("get d=" + clusterCmd.get("d")); //跨槽位命令 Map 
     
       kvmap = new HashMap<>(); kvmap.put("a", "AA"); kvmap.put("b", "BB"); kvmap.put("c", "CC"); kvmap.put("d", "DD"); clusterCmd.mset(kvmap);//Lettuce做了优化,支持一些命令的跨槽位命令; System.out.println("Lettuce mget:" + clusterCmd.mget("a", "b", "c", "d")); //选定部分节点操作 NodeSelection 
      
        replicas = clusterCmd.replicas(); NodeSelectionCommands 
       
         replicaseCmd = replicas.commands(); Executions 
        
          > executions = replicaseCmd.scan(ScanCursor.INITIAL); executions.forEach(s -> {System.out.println(s.getKeys());}); //订阅发布消息 StatefulRedisClusterPubSubConnection 
         
           pubSubConn = clusterClient.connectPubSub(); pubSubConn.addListener(new RedisPubSubListener 
          
            () { @Override public void message(String channel, String message) { System.out.println("[message]ch:" + channel + ",msg:" + message); } @Override public void message(String pattern, String channel, String message) { } @Override public void subscribed(String channel, long count) { System.out.println("[subscribed]ch:" + channel); } @Override public void psubscribed(String pattern, long count) { } @Override public void unsubscribed(String channel, long count) { } @Override public void punsubscribed(String pattern, long count) { } }); pubSubConn.sync().subscribe("TEST_Ch");//(回调内部使用阻塞调用或者lettuce同步api调用,需使用异步订阅) clusterCmd.publish("TEST_Ch", "MSGMSGMSG"); //响应式订阅,可以监听ChannelMessage和PatternMessage,使用链式过滤处理计算等操作 RedisClusterPubSubReactiveCommands 
           
             pubsubReactive = pubSubConn.reactive(); pubsubReactive.subscribe("TEST_Ch2").subscribe(); pubsubReactive.observeChannels() .filter(chmsg -> {return chmsg.getMessage().contains("tom");}) .doOnNext(chmsg -> {System.out.println(" 
            
              " + chmsg.getChannel() + ">>" + chmsg.getMessage());}) .subscribe(); clusterCmd.publish("TEST_Ch2", "send to jerry"); clusterCmd.publish("TEST_Ch", "tom MSG"); clusterCmd.publish("TEST_Ch2", "this is tom"); //keySpaceEvent事件 StatefulRedisClusterPubSubConnection 
             
               clusterPubSubConn = clusterClient.connectPubSub(); clusterPubSubConn.setNodeMessagePropagation(true);//启用禁用节点消息传播到该listener,例如只能在本节点通知的键事件通知; RedisPubSubListener 
              
                listener = new RedisPubSubListener 
               
                 () { @Override public void unsubscribed(String channel, long count) { System.out.println("unsubscribed_ch:" + channel); } @Override public void subscribed(String channel, long count) { System.out.println("subscribed_ch:" + channel); } @Override public void punsubscribed(String pattern, long count) { System.out.println("punsubscribed_pattern:" + pattern); } @Override public void psubscribed(String pattern, long count) { System.out.println("psubscribed_pattern:" + pattern); } @Override public void message(String pattern, String channel, String message) { System.out.println("message_pattern:" + pattern + " ch:" + channel + " msg:" + message); } @Override public void message(String channel, String message) { System.out.println("message_ch:" + channel + " msg:" + message); } }; clusterPubSubConn.addListener(listener); PubSubAsyncNodeSelection 
                
                  allPubSubAsyncNodeSelection = clusterPubSubConn.async().all(); NodeSelectionPubSubAsyncCommands 
                 
                   pubsubAsyncCmd = allPubSubAsyncNodeSelection.commands(); clusterCmd.setex("a", 1, "A"); pubsubAsyncCmd.psubscribe("__keyspace@0__:*"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end"); } } 
                  
                 
                
               
              
             
            
           
          
         
        
       
      
     
    
  

运行结果:

Redis——Lettuce连接redis集群

另外,还有一个cluster专用的Listener:RedisClusterPubSubListener,可以从listener里获得发布消息的节点信息:

RedisClusterPubSubListener 
  
    clusterListener = new RedisClusterPubSubListener 
   
     () { @Override public void message(RedisClusterNode node, String channel, String message) { } @Override public void message(RedisClusterNode node, String pattern, String channel, String message) { } @Override public void subscribed(RedisClusterNode node, String channel, long count) { } @Override public void psubscribed(RedisClusterNode node, String pattern, long count) { } @Override public void unsubscribed(RedisClusterNode node, String channel, long count) { } @Override public void punsubscribed(RedisClusterNode node, String pattern, long count) { } }; 
    
  

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/228897.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月16日 下午5:56
下一篇 2026年3月16日 下午5:57


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号