datax(12):调度源码解读AbstractScheduler「建议收藏」

datax(12):调度源码解读AbstractScheduler「建议收藏」datax的jobContainer最终会通过调度周期性的执行,今天把它看完;一、基类AbstractScheduler概述类继承关系全部方法二、AbstractScheduler的主要属性和方法1、主要属性/***脏数据行数检查器,用于运行中随时检查脏数据是否超过限制(脏数据行数,或脏数据百分比)*/privateErrorRecordCheckererrorLimit;/***积累容器通讯器,来处理JobContainer、Tas.

大家好,又见面了,我是你们的朋友全栈君。

datax的jobContainer最终会通过调度周期性的执行,今天把它看完;


一、基类AbstractScheduler概述

类继承关系
在这里插入图片描述

全部方法
在这里插入图片描述


二、AbstractScheduler的主要属性和方法

1、主要属性

  /** * 脏数据行数检查器,用于运行中随时检查脏数据是否超过限制(脏数据行数,或脏数据百分比) */
  private ErrorRecordChecker errorLimit;

  /** * 积累容器通讯器,来处理JobContainer、TaskGroupContainer和Task的通讯 */
  private AbstractContainerCommunicator containerCommunicator;

2、主要方法

  /** * 默认调度执行方法 <br> * 1 传入多个调度配置,获取报告时间+休息时间+jobId(赋值给全局jobId),生成错误记录检查类 * 2 给全局jobId赋值,生成错误记录检查类,生成容器通讯类(反馈任务信息) * 3 根据入参计算task的数量,开始所有taskGroup * * @param cfg List<Configuration> */
  public void schedule(List<Configuration> cfg) { 
   
    xxx
}
  /** * 开始所有的taskGroup,只允许本包的类访问 * * @param configurations List<Configuration> */
  protected abstract void startAllTaskGroup(List<Configuration> configurations);
 

三、谁调用AbstractScheduler的schedule

从JobContainer.schedule调用AbstractScheduler.schedule


四、schedule和startAllTaskGroup方法解析

schedule方法主要在AbstractScheduler实现
运行时序图

在这里插入图片描述

  /** * 默认调度执行方法 <br> * 1 传入多个调度配置,获取报告时间+休息时间+jobId(赋值给全局jobId),生成错误记录检查类 * 2 给全局jobId赋值,生成错误记录检查类,生成容器通讯类(反馈任务信息) * 3 根据入参计算task的数量,开始所有taskGroup * * @param cfg List<Configuration> */
  public void schedule(List<Configuration> cfg) { 
   
    Validate.notNull(cfg, "scheduler配置不能为空");
    int reportMillSec = cfg.get(0).getInt(DATAX_CORE_CONTAINER_JOB_REPORTINTERVAL, 30000);
    int sleepMillSec = cfg.get(0).getInt(DATAX_CORE_CONTAINER_JOB_SLEEPINTERVAL, 10000);

    this.jobId = cfg.get(0).getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
    errorLimit = new ErrorRecordChecker(cfg.get(0));
    //给 taskGroupContainer 的 Communication 注册
    this.containerCommunicator.registerCommunication(cfg);
    int taskCnt = calculateTaskCount(cfg);
    startAllTaskGroup(cfg);
    Communication lastComm = new Communication();
    long lastReportTimeStamp = System.currentTimeMillis();
    try { 
   
      while (true) { 
   
        /** * step 1: collect job stat * step 2: getReport info, then report it * step 3: errorLimit do check * step 4: dealSucceedStat(); * step 5: dealKillingStat(); * step 6: dealFailedStat(); * step 7: refresh last job stat, and then sleep for next while * * above steps, some ones should report info to DS * */
        Communication nowComm = this.containerCommunicator.collect();
        nowComm.setTimestamp(System.currentTimeMillis());
        LOG.debug(nowComm.toString());

        //汇报周期
        long now = System.currentTimeMillis();
        if (now - lastReportTimeStamp > reportMillSec) { 
   
          Communication comm = CommunicationTool.getReportCommunication(nowComm, lastComm, taskCnt);

          this.containerCommunicator.report(comm);
          lastReportTimeStamp = now;
          lastComm = nowComm;
        }

        errorLimit.checkRecordLimit(nowComm);
        if (nowComm.getState() == State.SUCCEEDED) { 
   
          LOG.info("Scheduler accomplished all tasks.");
          break;
        }

        if (isJobKilling(this.getJobId())) { 
   
          dealKillingStat(this.containerCommunicator, taskCnt);
        } else if (nowComm.getState() == State.FAILED) { 
   
          dealFailedStat(this.containerCommunicator, nowComm.getThrowable());
        }
        Thread.sleep(sleepMillSec);
      }
    } catch (InterruptedException e) { 
   
      // 以 failed 状态退出
      LOG.error("捕获到InterruptedException异常!", e);
      throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
    }
  }

startAllTaskGroup方法在ProcessInnerScheduler实现
运行时序图

在这里插入图片描述


  /** * 1、创建线程池 <br/> * 2、变量传入的cfgs,生成tgRunner,然后线程池执行 <br/> * 3、线程池关闭 <br/> * * @param cfgs List<Configuration> */
  @Override
  public void startAllTaskGroup(List<Configuration> cfgs) { 
   
    this.taskGroupContainerExecutorService = new ThreadPoolExecutor(cfgs.size(), cfgs.size(),
        0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    for (Configuration taskGroupCfg : cfgs) { 
   
      TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupCfg);
      this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
    }
    this.taskGroupContainerExecutorService.shutdown();
  }

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/145542.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Oracle数据库数据恢复方法

    Oracle数据库数据恢复方法前一段遇到:操作系统崩溃,数据库留有所有数据文件,控制文件,配置文件。下面记录一下数据恢复过程。1.下载安装Oracle数据库。安装的数据库定要与之前版本的数据库版本一致。安装的数据库位置要与之前安装的数据库位置一致。2.创建新的数据库。创建新的数据库实例,实例名称密码要与之前数据库实例一致。3.打开sqlPlus命令行(1)输入命令system/密码assysdba。(2)shutdownimmediate;关闭数据库。4.使用原本的数据文件替换到新的数据

    2022年7月17日
    23
  • 报错:8000401a 因为配置标识不正确,系统无法开始服务器进程。请检查用户名和密码

    报错:8000401a 因为配置标识不正确,系统无法开始服务器进程。请检查用户名和密码8000401a因为配置标识不正确,系统无法开始服务器进程。请检查用户名和密码服务器OS:windowsserver2012R2解决方案:运行dcomcnfg,依次打开计算机->我

    2022年7月2日
    23
  • 企业微信通讯录回调密文解析及微信支付回调密文解析异常处理

    企业微信通讯录回调密文解析及微信支付回调密文解析异常处理企业微信通讯录回调密文解析及微信支付回调密文解析异常处理产生异常表现:javax.crypto.IllegalBlockSizeException:Inputlengthmustbemultipleof16whendecryp原因:因为某些国家的进口管制限制,Java发布的运行环境包中的加解密有一定的限制解决方案:替换jdk和jre中的local_policy.jar,US_export_policy.jar具体目录:1、jre目录/lib/security/policy/

    2022年5月18日
    40
  • 如何理解cicd

    如何理解cicd项目开发可以分为这几个过程编码->构建->集成->测试->交付->部署首先引用一个例子譬如说,你开了一家公司,雇了很多码农在一起写代码。你说,要用Gitlab做代码管理。当一个码农在自己的开发机上写好代码之后,要合并到主分支里,他首先要发起一个MergeRequest(MR),这会在一个特定服务器上触发一次对他提交的代码的检查,包括代码格式检查、依赖关系检查以及单元测试等一系列检查,等通过了全部检查,他就可以将代码合并到主分支,否则他需

    2022年5月7日
    50
  • 网络编程——UDP编程

    网络编程——UDP编程一、网络编程基础1.常用协议:IP协议;TCP协议;UDP协议;2.什么是Socket?二、服务器端的代码实现三、客户端的代码实现1.区别2.易混淆知识点四.代码实现五.最后小结

    2022年9月8日
    1
  • 【教程】Spring+Mybatis环境配置多数据源

    一、简要概述在做项目的时候遇到需要从两个数据源获取数据,项目使用的Spring + Mybatis环境,看到网上有一些关于多数据源的配置,自己也整理学习一下,然后自动切换实现从不同的数据源获取数据功能。二、代码详解2.1 DataSourceConstants 数据源常量类/** * 数据源名称常量类 * 对应 application.xml 中 bean multipleDataSo…

    2022年2月27日
    29

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号