跟着实例学习ZooKeeper的用法: 计数器[通俗易懂]

跟着实例学习ZooKeeper的用法: 计数器

大家好,又见面了,我是全栈君。

这一篇文章我们将学习使用Curator来实现计数器。 顾名思义,计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数,一个用long来计数。

SharedCount

这个类使用int类型来计数。 主要涉及三个类。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount代表计数器, 可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。

例子代码:

package com.colobu.zkrecipe.counter;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

import com.google.common.collect.Lists;

public class SharedCounterExample implements SharedCountListener{
    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterExample example = new SharedCounterExample();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        count.start();
                        Thread.sleep(rand.nextInt(10000));
                        System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                        return null;
                    }
                };
                service.submit(task);
            }



            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }


    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);        
    }

}

在这个例子中,我们使用baseCount来监听计数值(addListener方法)。 任意的SharedCount, 只要使用相同的path,都可以得到这个计数值。 然后我们使用5个线程为计数值增加一个10以内的随机数。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

这里我们使用trySetCount去设置计数器。 第一个参数提供当前的VersionedValue,如果期间其它client更新了此计数值, 你的更新可能不成功, 但是这时你的client更新了最新的值,所以失败了你可以尝试再更新一次。 而setCount是强制更新计数器的值。

注意计数器必须start,使用完之后必须调用close关闭它。

在这里再重复一遍前面讲到的, 强烈推荐你监控ConnectionStateListener, 尽管我们的有些例子没有监控它。 在本例中SharedCountListener扩展了ConnectionStateListener。 这一条针对所有的Curator recipes都适用,后面的文章中就不专门提示了。

DistributedAtomicLong

再看一个Long类型的计数器。 除了计数的范围比SharedCount大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。 还记得InterProcessMutex是什么吗? 它是我们前面跟着实例学习ZooKeeper的用法: 分布式锁 讲的分布式可重入锁。 这和上面的计数器的实现有显著的不同。

可以从它的内部实现DistributedAtomicValue.trySet中看出端倪。

    AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

此计数器有一系列的操作:

  • get(): 获取当前值
  • increment(): 加一
  • decrement(): 减一
  • add(): 增加特定的值
  • subtract(): 减去特定的值
  • trySet(): 尝试设置计数值
  • forceSet(): 强制设置计数值

必须检查返回结果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。

我们下面的例子中使用5个线程对计数器进行加一操作,如果成功,将操作前后的值打印出来。

package com.colobu.zkrecipe.counter;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;

import com.google.common.collect.Lists;

public class DistributedAtomicLongExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            List<DistributedAtomicLong> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        try {
                            //Thread.sleep(rand.nextInt(1000));
                            AtomicValue<Long> value = count.increment();
                            //AtomicValue<Long> value = count.decrement();
                            //AtomicValue<Long> value = count.add((long)rand.nextInt(20));
                            System.out.println("succeed: " + value.succeeded());
                            if (value.succeeded())
                                System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

                        return null;
                    }
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }

    }

}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/108481.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • paip.提高工作效率–数据绑定到table原则和过程Angular js jquery实现

    paip.提高工作效率–数据绑定到table原则和过程Angular js jquery实现

    2022年1月3日
    53
  • ExecutorService 的理解与使用「建议收藏」

    ExecutorService 的理解与使用「建议收藏」接口java.util.concurrent.ExecutorService表述了异步执行的机制,并且可以让任务在后台执行。壹個ExecutorService实例因此特别像壹個线程池。事实上,在java.util.concurrent包中的ExecutorService的实现就是壹個线程池的实现。ExecutorService样例这里有壹個简单的使用Java实现的

    2022年9月10日
    3
  • Cpu指令重排_cpu的指令集

    Cpu指令重排_cpu的指令集Cpu为了提高效率会对指令进行重排序,以适合cpu的顺序运行。但是指令重排会遵守As-if-serial的规则,就是所有的动作(Action)都可以为了优化而被重排序,但是必须保证它们重排序后的结果和程序代码本身的应有结果是一致的。所以这种情况在单线程中不会出现什么问题。而对于多线程,这个规则就失效了,所以可能会导致结果出现问题。解决办法就是内存屏障,也叫内存栅栏。是一种屏…

    2022年10月17日
    2
  • matlab手写数字识别实验报告_如何用matlab将图像转为矩阵

    matlab手写数字识别实验报告_如何用matlab将图像转为矩阵本文主要是根据《matlab手写神经网络实现识别手写数字》博客中的代码进行试验。由于没有数据集,所以采用了MNIST数据集进行代码的运行。数据集不同所以需要对代码进行微小改动。简介数据处理:4000张作为训练样本,1000张作为测试样本;图像大小:图片的灰度值矩阵(28,28);图像名称:由标签和顺序号组成。标签_顺序号.bmp训练样本:每个数字的图像名称的顺序号是从0-399,各400…

    2022年9月14日
    3
  • 服务器pfx文件如何导入,linux 导入pfx 证书

    服务器pfx文件如何导入,linux 导入pfx 证书linux导入pfx证书内容精选换一换单击“开始”,运行框输入“MMC”,回车。在MMC控制台菜单栏中单击“文件”,选择“添加/删除管理单元”。在“添加或删除管理单元”对话框,选择“可用管理单元”区域的“证书”。单击“添加”添加证书。在“证书管理”对话框,选择“计算机账户”,单击“下一步”。在“选择计算机”对话框,单击“完成”。在“添加或删除管理单元”对话框,单击“确定”。在单击“开始”,运…

    2022年5月1日
    149
  • 图书销售管理系统概要设计,系统数据结构设计分工

    图书销售管理系统概要设计,系统数据结构设计分工完成人:千城墨白(20160401095)无法无天(20160401115)系统数据结构设计1、逻辑结构设计要点2、物理结构设计要点(1)系统所用到所有数据均存在服务器端,存于SQLServer服务器中;(2)系统界面的显示属性,如字体属性,样式等使用CSS统一界面;(3)系统界面中使用的相关图片需要保存在服务…

    2022年5月13日
    40

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号