20200509 Curator入门

20200509 Curator入门简介 Curator 是 Netflix 公司开源的一套 zookeeper 客户端框架 解决了很多 Zookeeper 客户端非常底层的细节开发工作 包括连接重连 反复注册 Watcher 和 NodeExistsEx 异常等等 PatrixckHunt Zookeeper 以一句 GuavaistoJav 给 Curator 予高度评价 Curator 的 maven 依赖 一般直接使用 curator recipes 就行了 如果需要自己封装一些底层些的功

简介

 
   
   
   
     org.apache.curator 
    
   
     curator-framework 
    
   
     2.12.0 
    
   
   
   
   
     org.apache.curator 
    
   
     curator-recipes 
    
   
     2.12.0 
    
   

基本API

创建会话

使用静态工程方法创建

 

 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.128.129:2181", 5000, 5000, retryPolicy); 

其中RetryPolicy为重试策略,第一个参数为baseSleepTimeMs初始的sleep时间,用于计算之后的每次重试的sleep时间。第二个参数为maxRetries,最大重试次数。

使用Fluent风格api创建

 

 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.128.129:2181") .sessionTimeoutMs(5000) // 会话超时时间 .connectionTimeoutMs(5000) // 连接超时时间 .retryPolicy(retryPolicy) .namespace("base") // 包含隔离名称 .build(); client.start(); 

创建数据节点

 

client.create().creatingParentContainersIfNeeded() // 递归创建所需父节点 .withMode(CreateMode.PERSISTENT) // 创建类型为持久节点 .forPath("/nodeA", "init".getBytes()); // 目录及内容 

删除数据节点

 

 client.delete() .guaranteed() // 强制保证删除 .deletingChildrenIfNeeded() // 递归删除子节点 .withVersion(10086) // 指定删除的版本号 .forPath("/nodeA"); 

读取数据节点

读数据,返回值为byte[]

 

byte[] bytes = client.getData().forPath("/nodeA"); System.out.println(new String(bytes)); 

读stat

 

Stat stat = new Stat(); client.getData() .storingStatIn(stat) .forPath("/nodeA"); 

修改数据节点

 

client.setData() .withVersion(10086) // 指定版本修改 .forPath("/nodeA", "data".getBytes()); 

事务

 

client.inTransaction().check().forPath("/nodeA") .and() .create().withMode(CreateMode.EPHEMERAL).forPath("/nodeB", "init".getBytes()) .and() .create().withMode(CreateMode.EPHEMERAL).forPath("/nodeC", "init".getBytes()) .and() .commit(); 

其他

 

 client.checkExists() // 检查是否存在 .forPath("/nodeA"); client.getChildren().forPath("/nodeA"); // 获取子节点的路径 

异步回调

异步创建节点

 

Executor executor = Executors.newFixedThreadPool(2); client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode())); },executor) .forPath("path");

 

监听器

Curator提供了三种Watcher(Cache)来监听结点的变化:

  • Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
  • Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.retry.RetryNTimes; / * Curator framework watch test. */ public class CuratorWatcherTest { / Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Register watcher PathChildrenCache watcher = new PathChildrenCache( client, ZK_PATH, true // if cache data ); watcher.getListenable().addListener((client1, event) -> { ChildData data = event.getData(); if (data == null) { System.out.println("No data in event[" + event + "]"); } else { System.out.println("Receive event: " + "type=[" + event.getType() + "]" + ", path=[" + data.getPath() + "]" + ", data=[" + new String(data.getData()) + "]" + ", stat=[" + data.getStat() + "]"); } }); watcher.start(StartMode.BUILD_INITIAL_CACHE); System.out.println("Register zk watcher successfully!"); Thread.sleep(Integer.MAX_VALUE); } }

Curator“菜谱”

既然Maven包叫做curator-recipes,那说明Curator有它独特的“菜谱”:

  • :包括共享锁、共享可重入锁、读写锁等。
  • 选举:Leader选举算法。
  • Barrier:阻止分布式计算直至某个条件被满足的“栅栏”,可以看做JDK Concurrent包中Barrier的分布式实现。
  • 缓存:前面提到过的三种Cache及监听机制。
  • 持久化结点:连接或Session终止后仍然在Zookeeper中存在的结点。
  • 队列:分布式队列、分布式优先级队列等。

分布式锁

分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。

下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。

package com.cdai.codebase.bigdata.hadoop.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.TimeUnit; / * Curator framework's distributed lock test. */ public class CuratorDistrLockTest { / Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }

 

Leader选举

当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。

package com.cdai.codebase.bigdata.hadoop.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; / * Curator framework's leader election test. * Output: * LeaderSelector-2 take leadership! * LeaderSelector-2 relinquish leadership! * LeaderSelector-1 take leadership! * LeaderSelector-1 relinquish leadership! * LeaderSelector-0 take leadership! * LeaderSelector-0 relinquish leadership! * ... */ public class CuratorLeaderTest { / Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!"); // takeLeadership() method should only return when leadership is being relinquished. Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); } @Override public void stateChanged(CuratorFramework client, ConnectionState state) { } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); } }
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/214902.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午3:10
下一篇 2026年3月18日 下午3:11


相关推荐

  • 华为太极magisk安装教程_教程:如何升级太极内部的应用

    华为太极magisk安装教程_教程:如何升级太极内部的应用使用过太极的小伙伴都知道,要把应用添加到太极(magisk版除外)中是个麻烦事儿:首先得花费大量的时间来创建应用,然后还必须卸载原来的应用,安装完毕之后又要花很长的时间来优化。如果待创建的应用安装包比较大并且你手机的CPU不在工作状态,那这个流程就不是一般的长了。这时候就有童鞋会问了,如果我在太极里面创建的应用要升级了怎么办?是不是每次升级都需要卸载原应用?实际上,在太极内部升级应用…

    2022年6月3日
    165
  • 数据库副本的自动种子设定(自增长)

    数据库副本的自动种子设定(自增长)

    2021年11月26日
    41
  • java核心技术总结

    java核心技术总结*****************java基础*****************一、java中的局部变量、实例变量和类变量二、java中的基本类型和引用类型三、java中的位操作符四、关于java的方法1、方法的重载和重写(方法签名:方法名和参数列表)2、方法的可变参(int…args)(1)基本特点底层就是用数组来实现的;在方法内部可变参可以直接当成数组进…

    2022年7月7日
    21
  • Linux服务.NO6——http协议

    Linux服务.NO6——http协议9.http9.1.http概念http协议即超文本传输协议,用于从万维网服务器传输超文本到本地浏览器的传送协议。http是基于TCP/IP通信协议来传递数据的一个属于应用层的面向对象的协议。http协议工作于c/s架构,浏览器作为客户端通过url向http服务端(即web服务器)发送所有请求,web服务器根据受到的请求后,向客户端发送响应。9.2.http特点1.简单快速:客户向服务…

    2025年7月24日
    5
  • Ubuntu安装和配置ssh

    Ubuntu安装和配置ssh因为配置pypbc环境,需要windows系统下PycharmSSH连接虚拟机python环境1.安装ssh服务器sudoaptinstallopenssh-server2.安装ssh客

    2022年7月1日
    29
  • WorkBuddyClaw实战

    WorkBuddyClaw实战

    2026年3月18日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号