Zookeeper分布式锁实现(zk怎么实现分布式锁)

如题,使用zookeeper实现分布式锁时隔多日又来水文章了,距离上一篇好像过去很久了,现在回头看看之前写的那些东西,只能称之为“垃圾”。今天分享一个基于zookeeper实现的分布式锁简单案例,此案例仅实现了分布式锁的功能,代码优化等一概不扯。下面先来聊聊其实现的核心思想:首先用到zookeeper中的两个重要知识点:1、zookeeper中的节点类型:临时节点、临时有序节点、持久节点、持久有序节点。临时节点跟session关联。2、zookeeper的watch。以上两点就是实现分布式锁的核

大家好,又见面了,我是你们的朋友全栈君。

如题,使用zookeeper实现分布式锁

时隔多日又来水文章了,距离上一篇好像过去很久了,现在回头看看之前写的那些东西,只能称之为“垃圾”。今天分享一个基于zookeeper实现的分布式锁简单案例,此案例仅实现了分布式锁的功能,代码优化等一概不扯。

下面先来聊聊其实现的核心思想:

首先用到zookeeper中的两个重要知识点:1、zookeeper中的节点类型:临时节点、临时有序节点、持久节点、持久有序节点。临时节点跟session关联。2、zookeeper的watch。以上两点就是实现分布式锁的核心点。

1、创建一个节点lock作为锁的根节点,当有线程需要抢锁的时候在该节点下创建一个临时有序节点

2、节点创建成功后,获取当前根节点下的所有孩子节点列表,并将自己阻塞住

3、因为获取到的子节点列表是无序的,所以需要先对子节点进行排序,然后判断自己是不是当前的第一个子节点,如果自己是第一个子节点说明抢到锁可以执行业务代码

4、如果自己不是第一个子节点,获取到自己当前在列表中索引,去监听自己的前一个节点,也就是自己的索引  index -1   (这里的监听前一个节点为核心,如果我们去监听根节点,那么一个节点的删除就需要回调所有的子节点代价太大,所以是监听前一个节点)

5、当获得锁的节点执行释放锁,也就是删除自己的节点时,后边监听的节点收到回调事件后再去获取所有的子节点,再去判断自己是不是第一个,执行抢锁操作

以上几步,便是实现分布式锁的核心思想。下面将实现的代码贴出来。

代码部分

1、pom.xml

 <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.7.0</version>
        </dependency>

2、ZKUtils.java  获取zookeeper实例的工具类

package com.bx.wx.system.zk;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/16
 */
public class ZKUtils {

    private static ZooKeeper zooKeeper;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("ip:2181,ip:2182,ip:2183/testConfig/lock", 3000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                Event.KeeperState state = event.getState();
                switch (state) {
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        countDownLatch.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                    case Closed:
                        break;
                }

            }
        });
        countDownLatch.await();
        return zooKeeper;
    }
}

3、ZKLockUtils.java  实现了分布式锁的工具类

package com.bx.wx.system.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/21
 */
public class ZKLockUtils implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback, AsyncCallback.Children2Callback, AsyncCallback.StringCallback {

    /**
     * 这里可通过set方法或者构造方法,传入zooKeeper,
     */
    private ZooKeeper zooKeeper;

    /**
     * 当前节点的path
     */
    private String pathName;

    /**
     * 当前线程的名字,便于查看
     */
    private String threadName;

    /**
     * 用于获取不到锁时候阻塞
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * v 1.0
     * 加锁方法
     * 基础版本,功能实现了,后续再进行优化吧
     */
    public  void lock(){
        /**
         * 思路.....
         * 1、在锁目录下创建自己的节点,临时有序节点
         * 2、获取所有的孩子节点、判断自己是不是第一个
         * 3、如果自己是第一个,则加锁成功,执行业务代码
         * 4、如果自己不是第一个,watch自己的前一个节点
         * 5、当第一个节点,也就是获取锁的执行完之后,删除自己的节点
         * 6、第二个就能监听到,从而继续执行获取所有孩子节点,判断自己是不是第一个的操作
         */
        try {
            zooKeeper.create("/lock", "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,threadName);
            //当前线程阻塞,进行抢锁
            countDownLatch.await();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 解锁方法
     * 执行完业务后,删除掉自己的节点即可 version为-1  忽略数据版本
     */
    public  void ulock(){
        //删除自己的节点
        try {
            zooKeeper.delete(pathName,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }


    /**
     * Children2Callback 接口
     * 获取节点下所有孩子
     * 实现分布式锁的核心点
     * @param rc
     * @param path
     * @param ctx
     * @param children
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        if(children != null && children.size()>0){
            //对节点进行排序
            Collections.sort(children);
            String currentPath = pathName.substring(1);
            //查询自己是第几个
            int index = children.indexOf(currentPath);
            //判断自己是不是第一个
            if(index<1){
                try {
                    //如果自己是第一个,则认为抢到了锁
                    System.out.println(threadName+"抢到锁了..");
                    zooKeeper.setData("/",threadName.getBytes(),-1);
                    countDownLatch.countDown();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }else{
                //只监听自己的前一个
                zooKeeper.exists("/"+children.get(index-1),this,this,"abc");
            }
        }
    }

    /**
     * 节点创建成功时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param name
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        pathName = name;
        System.out.println(threadName+"-节点创建成功:"+pathName);
        //处的watch为false,表示不需要对根节点下的所有节点进行watch,我们只需要监听自己的前一个即可
        zooKeeper.getChildren("/",false,this,"abc");
    }

    /**
     * DataCallback接口
     * 当getdata有数据时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param data
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        //TODO
    }

    /**
     * StatCallback接口
     * 判断节点是否存在时的回调
     * @param rc
     * @param path
     * @param ctx
     * @param stat
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        //TODO
        /*if(stat != null){
            zooKeeper.getData("/lock",this,this,"abc");
        }*/
    }

    /**
     * Watcher 接口
     * 节点的事件回调
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        Event.EventType eventType = event.getType();
        String path = event.getPath();
        switch (eventType) {
            case None:
                break;
            case NodeCreated:
                System.out.println("节点被创建...");
                break;
            case NodeDeleted:
                //当前一个节点被删除,判断自己是不是第一个
                System.out.println(path+"-节点被删除...");
                //执行获取所有孩子节点的操作
                zooKeeper.getChildren("/",false,this,"abc");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }


}

4、ZkLockTest.java 测试类

package com.bx.wx.system.zk;

import org.apache.zookeeper.ZooKeeper;

/**
 * @创建人 z.bx
 * @创建时间 2021/5/21
 */
public class ZkLockTest {

    public static void main(String[] args)throws Exception {
        ZooKeeper zooKeeper = ZKUtils.getZooKeeper();
        //模拟多线程请求
        for (int i = 0; i < 5; i++) {
            String threadName = "LockThread-"+i;
            new Thread(()->{
                ZKLockUtils lockUtils = new ZKLockUtils();
                lockUtils.setZooKeeper(zooKeeper);
                lockUtils.setThreadName(threadName);
                //加锁
                lockUtils.lock();
                System.out.println(Thread.currentThread().getName()+"正在执行任务");
                //模拟执行任务
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //解锁
                lockUtils.ulock();
            },threadName).start();
        }
    }
}

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/128665.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • PXE启动原理以及与普通Linux启动的对比

    PXE启动原理以及与普通Linux启动的对比关于PXE部署的详细配置的文章已经有不少了,这篇文章主要讲一下PXE启动的原理以及PXE启动和普通Linux启动的对比。

    2022年6月29日
    23
  • stringstream 的用法介绍[通俗易懂]

    stringstream 的用法介绍[通俗易懂]stringstream主要有两个作用:简化类型转换和一次性读入数据: 一、使用stringstream对象简化类型转换C++标准库中的提供了比ANSIC的更高级的一些功能,即单纯性、类型安全和可扩展性。在本文中,我将展示怎样使用这些库来实现安全和自动的类型转换。为什么要学习如果你已习惯了风格的转换,也许你首先会问:为什么要花额外的精力来学习基于的类型转换呢?也许对下面一

    2022年5月6日
    55
  • 清单程序员修身

    清单程序员修身

    2022年1月10日
    42
  • 微信公众平台开发入门教程[2020版]

    微信公众平台开发入门教程[2020版]在这篇微信公众平台开发教程中,我们假定你已经有了PHP语言程序、MySQL数据库、计算机网络通讯、及HTTP/XML/CSS/JS等基础。我们将使用微信公众账号方倍工作室作为讲解的例子,二维码见左侧。本系列教程将引导你完成如下任务:创建新浪云计算平台应用 启用微信公众平台开发模式 体验常用接收消息及发送消息类型 了解数据收发原理及消息格式第一章申请服务器资源创建新浪云计算应用申请账号我们使用SAE新浪云计算平台作为服务器资源,并且申请PHP环境+MySQL数据库作为程.

    2022年6月6日
    255
  • scrapyip池(ip route命令)

    目录一、中间件的使用1-1具体方法详解1-1-1process_request-正常请求调用1-1-2process_response-正常返回调用1-1-3process_exception-捕获错误调用二、Proxy相关官方中间件2-1HttpProxyMiddleware2-2RetryMiddleware2-2-1源码分析…

    2022年4月15日
    44
  • 什么是提权_怎么防止服务器被渗透提权

    什么是提权_怎么防止服务器被渗透提权当你的才华还撑不起你的野心时那你就应该静下心来学习目录Windows2008server提权之突破系统权限安装shift后门0x01前言0x02主要操作部分0x03主要命令组成部分 Windows2008server提权之突破系统权限安装shift后门0x01前言…

    2022年9月15日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号