源码分析ElasticJob分片机制(带分片机制流程图)

源码分析ElasticJob分片机制(带分片机制流程图)本文将重点分析ElasticJob的分片机制:ElasticJob分片工作机制:1、ElasticJob在启动时,首先会启动是否需要重新分片的监听器。代码见:ListenerManager#startAllListeners{…;shardingListenerManager.start();…}。2、任务执行之前需要获取分片信息,如果需要重新分片,主服务器执行分片算法,其他从…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

本文将重点分析 ElasticJob 的分片机制:

ElasticJob分片工作机制:

  1. ElasticJob在启动时,首先会启动是否需要重新分片的监听器。
    代码见:ListenerManager#startAllListeners {…; shardingListenerManager.start();…}。
  2. 任务执行之前需要获取分片信息,如果需要重新分片,主服务器执行分片算法,其他从服务器等待直到分片完成。
    代码见:AbstractElasticJobExecutor#execute {…; jobFacade.getShardingContexts();…;}

1、分片管理监听器详解

ElasticJob的事件监听管理器实现类为:AbstractListenerManager。

其类图为:
这里写图片描述

  • JobNodeStorage jobNodeStorage:Job node操作API。
    其核心方法:
    • public abstract void start():启动监听管理器,由子类具体实现。
    • protected void addDataListener(TreeCacheListener listener):增加事件监听器。

ElasticJob的选主监听管理器、分片监听器管理器、故障转移监听管理器等都是 AbstractListenerManager 的子类。 分片相关的监听管理器类图如图所示:
这里写图片描述

  • ShardingListenerManager:分片监听管理器。
  • ShardingTotalCountChangedJobListener:监听总分片数量事件管理器,是TreeCacheListener(curator的事件监听器)子类。
  • ListenServersChangedJobListener:任务job服务器数量(运行时实例)发生变化后的事件监听器。

1.1 源码分析ShardingTotalCountChangedJobListener监听器

class ShardingTotalCountChangedJobListener extends AbstractJobListener { 
   
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) { 
   
            if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) { 
   
                int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
                if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) { 
   
                    shardingService.setReshardingFlag();
                    JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
                }
            }
        }
    }

Jetbrains全家桶1年46,售后保障稳定

job配置的分片总节点数发生变化监听器(ElasticJob允许通过Web界面修改每个任务配置的分片总数量)。

job的配置信息存储在${namespace}/jobname/config节点上,存储内容为json格式的配置信息。

如果${namespace}/jobname/config节点的内容发生变化,zk会触发该节点的节点数据变化事件,如果zk中存储的分片节点数量与内存中的分片数量不相同的话,调用ShardingService设置需要重新分片标记(创建${namespace}/jobname/leader/sharding/necessary持久节点)并更新内存中的分片节点总数。

1.2 源码分析ListenServersChangedJobListener 监听器

class ListenServersChangedJobListener extends AbstractJobListener { 
   
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) { 
   
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) { 
   
                shardingService.setReshardingFlag();
            }
        }
        private boolean isInstanceChange(final Type eventType, final String path) { 
   
            return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
        }
        private boolean isServerChange(final String path) { 
   
            return serverNode.isServerPath(path);
        }
    }

分片节点(实例数)发生变化事件监听器,当新的分片节点加入或原的分片实例宕机后,需要进行重新分片。

当${namespace}/jobname/servers或${namespace}/jobname/instances路径下的节点数量是否发生变化,如果检测到发生变化,设置需要重新分片标识。

2、具体分片逻辑

上面详细分析了分片监听管理器,其职责就是监听特定的 ZK 目录,当发生变化后判断是否需要设置重新分片的标记,如果设置了需要重新分片标记后,在什么时候触发重新分片呢?

每个调度任务在执行之前,首先需要获取分片信息(分片上下文环境),然后根据分片信息从服务器拉取不同的数据,进行任务处理,其源码入口为:AbstractElasticJobExecutor#execute。

jobFacade.getShardingContexts()方法。
这里写图片描述
具体实现方法代码为:LiteJobFacade#getShardingContexts。

public ShardingContexts getShardingContexts() { 
   
        boolean isFailover = configService.load(true).isFailover();     // @1
        if (isFailover) { 
   
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) { 
   
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
        shardingService.shardingIfNecessary();   // @2
        List<Integer> shardingItems = shardingService.getLocalShardingItems(); // @3
        if (isFailover) { 
   
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));  // @4
        return executionContextService.getJobShardingContext(shardingItems);  // @5
    }

代码@1:是否启动故障转移,本篇重点关注ElasticJob的分片机制,故障转移在下篇文章中详细介绍,本文假定不开启故障转移功能。
代码@2:如果有必要,则执行分片,如果不存在分片信息(第一次分片)或需要重新分片,则执行分片算法,接下来详细分析分片的实现逻辑。
代码@3:获取本地的分片信息。遍历所有分片信息${namespace}/jobname/sharding/{分片item}下所有instance节点,判断其值jobinstanceId是否与当前的jobInstanceId相等,相等则认为是本节点的分片信息。
代码@4:移除本地禁用分片,本地禁用分片的存储目录为${namespace}/jobname
/sharding/{分片item}/disable。
代码@5:返回当前节点的分片上下文环境,这个主要是根据配置信息(分片参数)与当前的分片实例,构建ShardingContexts对象。

2.1 shardingService.shardingIfNecessary 详解【分片逻辑】

/** * 如果需要分片且当前节点为主节点, 则作业分片. * * <p> * 如果当前无可用节点则不分片. * </p> */
    public void shardingIfNecessary() { 
   
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances(); // @1
        if (!isNeedSharding() || availableJobInstances.isEmpty()) { 
     // @2
            return;
        }
        if (!leaderService.isLeaderUntilBlock()) { 
     // @3
            blockUntilShardingCompleted();           //@4
            return;
        }
        waitingOtherJobCompleted();                  // @5
        LiteJobConfiguration liteJobConfig = configService.load(false);
        int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();  // @5
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");     // @6
        resetShardingInfo(shardingTotalCount);  // @7
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());  // @8
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));   // @9
        log.debug("Job '{}' sharding complete.", jobName);
    }

代码@1:获取当前可用实例,首先获取\ ${namespace}/jobname/instances目录下的所有子节点,并且判断该实例节点的IP所在服务器是否可用,${namespace}/jobname/servers/ip节点存储的值如果不是DISABLE,则认为该节点可用。
代码@2:如果不需要重新分片(${namespace}/jobname/leader/sharding
/necessary节点不存在)或当前不存在可用实例,则返回。
代码@3,判断是否是主节点,如果当前正在进行主节点选举,则阻塞直到选主完成,阻塞这里使用的代码如下:

while (!hasLeader() && serverService.hasAvailableServers()) { 
      // 如果不存在主节点摈弃有可用的实例,则Thread.sleep()一下,触发一次选主。
            log.info("Leader is electing, waiting for {} ms", 100);
            BlockUtils.waitingShortTime();
            if (!JobRegistry.getInstance().isShutdown(jobName) &&    
                     serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) { 
   
                electLeader();
            }
}
return isLeader();

代码@4:如果当前节点不是主节点,则等待分片结束。分片是否结束的判断依据是${namespace}/jobname/leader/sharding/necessary节点存在或${namespace}/jobname/leader/sharding/processing节点存在(表示正在执行分片操作),如果分片未结束,使用Thread.sleep方法阻塞100毫米后再试。

代码@5:能进入到这里,说明该节点是主节点。主节点在执行分片之前,首先等待该批任务全部执行完毕,判断是否有其他任务在运行的方法是判断是否存在${namespace}/jobname/sharding/{分片item}/running,如果存在,则使用Thread.sleep(100),然后再判断。

代码@6:创建临时节点${namespace}/jobname/leader/sharding/processing节点,表示分片正在执行。

代码@7:重置分片信息。先删除${namespace}/jobname/sharding/{分片item}/instance节点,然后创建${namespace}/jobname/sharding/{分片item}节点(如有必要)。然后根据当前配置的分片总数量,如果当前${namespace}/jobname/sharding子节点数大于配置的分片节点数,则删除多余的节点(从大到小删除)。

代码@8:获取配置的分片算法类,常用的分片算法为平均分片算法(AverageAllocationJobShardingStrategy)。

代码@9:在一个事务内创建 相应的分片实例信息${namespace}/jobname/{分片item}/instance,节点存放的内容为JobInstance实例的ID。

在ZK中执行事务操作:JobNodeStorage#executeInTransaction

/** * 在事务中执行操作. * * @param callback 执行操作的回调 */
    public void executeInTransaction(final TransactionExecutionCallback callback) { 
   
        try { 
   
            CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();  // @1
            callback.execute(curatorTransactionFinal);   // @2
            curatorTransactionFinal.commit();                 //@3
        //CHECKSTYLE:OFF
        } catch (final Exception ex) { 
   
        //CHECKSTYLE:ON
            RegExceptionHandler.handleException(ex);
        }
    }

代码@1,使用CuratorFrameworkFactory的inTransaction()方法,级联调用check(),最后通过and()方法返回CuratorTransactionFinal实例,由该实例执行事务中的所有更新节点命令。然后执行commit()命令统一提交(该方法可以保证要么全部成功,要么全部失败)。

代码@2,通过回调PersistShardingInfoTransactionExecutionCallback方法执行具体的逻辑。

代码@3,提交事务。

代码见ShardingService$PersistShardingInfoTransactionExecutionCallback

class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback { 
   
       private final Map<JobInstance, List<Integer>> shardingResults;
        @Override
        public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception { 
   
            for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) { 
   
                for (int shardingItem : entry.getValue()) { 
   
                    curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), 
                         entry.getKey().getJobInstanceId().getBytes()).and();   // @1
                }
            }
            curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();   // @2
            curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();  // @3
        }
    }

代码@1:所谓的分片,主要是创建${namespace}/jobname/sharding/{分片item}/instance,节点内容为JobInstance ID。

代码@2:删除${namespace}/jobname/leader/sharding/necessary节点。

代码@3:删除${namespace}/jobname/leader/sharding/processing节点,表示分片结束。

下面以一张分片流程图来结束本节的讲述:
这里写图片描述


见文如面,我是威哥,热衷于成体系剖析JAVA主流中间件,关注公众号『中间件兴趣圈』,回复专栏可获取成体系专栏导航,回复资料可以获取笔者的学习思维导图。
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/222896.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • C语言 字符串分割

    C语言 字符串分割C语言字符串分割一、简述记–字符串分割,strtok()函数的使用例子、自己实现split()函数。二、例子代码#include<stdio.h>#include<string.h>/**函数:split*描述:按指定分隔符分割字符串*参数:* str:要分割的字符串* strLen:要分割…

    2022年6月9日
    28
  • com.mysql.jdbc.Driver 和 com.mysql.cj.jdbc.Driver的区别 serverTimezone设定[通俗易懂]

    com.mysql.jdbc.Driver 和 com.mysql.cj.jdbc.Driver的区别 serverTimezone设定[通俗易懂]com.mysql.jdbc.Driver是mysql-connector-java5中的,com.mysql.cj.jdbc.Driver是mysql-connector-java6中的1,JDBC连接Mysql5com.mysql.jdbc.Driver:driverClassName=com.mysql.jdbc.Driverurl=jdbc:mysql://localho

    2022年6月23日
    25
  • pycharm2022.01.13专业版注册激活-激活码分享

    (pycharm2022.01.13专业版注册激活)最近有小伙伴私信我,问我这边有没有免费的intellijIdea的激活码,然后我将全栈君台教程分享给他了。激活成功之后他一直表示感谢,哈哈~https://javaforall.net/100143.htmlIntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,上面是详细链接哦~747E…

    2022年3月31日
    59
  • kubeadm 常用命令

    kubeadm 常用命令kubeadm 概述 Kubeadm 是一个工具 它提供了 kubeadminit 以及 kubeadmjoin 这两个命令作为快速创建 kubernetes 集群的最佳实践 安装官方参考 kuadmin 安装任务 kubeadminit 启动引导一个 Kubernetes 主节点 kubeadmjoin 启动引导一个 Kubernetes 工作节点并且将其加入到集群 kubeadmupgra 更新 Kubernetes 集群到新版本 kubeadmconfi 如果你使用 kube

    2025年9月17日
    3
  • 机器学习中最常见的四种分类模型

    机器学习中最常见的四种分类模型点击蓝字关注我,有干货领取!作者:JasonBrownlee翻译:候博学前言机器学习是一个从训练集中学习出算法的研究领域。分类是一项需要使用机器学习算法的任务,该算法学习如何为数据集…

    2022年10月5日
    2
  • windows7添�windows2008R2域配置

    windows7添�windows2008R2域配置

    2021年11月15日
    45

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号