datax(6):启动步骤解析

datax(6):启动步骤解析通过前面datax(2):通过idea搭建源码阅读+调试环境已经知道了idea下阅读源码的步骤,现在看下DataX启动步骤解析一,启动java类(主入口)/***Engine是DataX入口类,该类负责初始化Job或者Task的运行容器,并运行插件的Job或者Task逻辑*/com.alibaba.datax.core.Engine二,启动的步骤1、解析配置,包括job.json、core.json、plugin.json三个配置2、设置jobId到config.

大家好,又见面了,我是你们的朋友全栈君。

通过前面 datax(2): 通过idea搭建源码阅读+调试环境 已经知道了idea下阅读源码的步骤,现在看下 DataX启动步骤解析


一、启动java类(主入口)

/**
 * Engine是DataX入口类,该类负责初始化Job或者Task的运行容器,并运行插件的Job或者Task逻辑
 */
com.alibaba.datax.core.Engine

二、启动的步骤


1、解析用户输入的参数: job(datax的json),jobId(默认-1,),running_mode

2、配置额外的参数,打印vm的信息,打印过滤的配置信息(过滤敏感字符),校验配置

3、配置conf传入Engine.start(),启动程序

4、绑定字段信息,初始化插件加载器

5、判断任务类型(taskGroup还是job),生成不同的container(JobContainer或TaskGroupContainer)

6、打开各种追踪器,报告器,用于任务的运行状况收集

7、container.start() 对应的容器开始运行任务

Datax的执行过程
在这里插入图片描述

过程详细说明如下:

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

简单总结过程如下:

一个DataX Job会切分成多个Task,每个Task会按TaskGroup进行分组,一个Task内部会有一组Reader->Channel->Writer。Channel是连接Reader和Writer的数据交换通道,所有的数据都会经由Channel进行传输


三、启动时序图

在这里插入图片描述


四、主要方法

在这里插入图片描述


五、代码详细

package com.alibaba.datax.core;

**
 * Engine是DataX入口类,该类负责初始化Job或者Task的运行容器,并运行插件的Job或者Task逻辑
 */
public class Engine { 
   

  private static final Logger LOG = LoggerFactory.getLogger(Engine.class);

  private static String RUNTIME_MODE;

  /**
   * 真正开始执行任务的地方 check job model (job/task) first
   *
   * @param allConf Configuration
   */
  public void start(Configuration allConf) { 
   

    // 绑定column转换信息
    ColumnCast.bind(allConf);

    //初始化PluginLoader,可以获取各种插件配置
    LoadUtil.bind(allConf);

    boolean isJob = !("taskGroup"
        .equalsIgnoreCase(allConf.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
    //JobContainer会在schedule后再行进行设置和调整值
    int channelNumber = 0;
    AbstractContainer container;
    long instanceId;
    int taskGroupId = -1;
    if (isJob) { 
   
      allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
      container = new JobContainer(allConf);
      instanceId = allConf.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
    } else { 
   
      container = new TaskGroupContainer(allConf);
      instanceId = allConf.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
      taskGroupId = allConf.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
      channelNumber = allConf.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
    }

    //缺省打开perfTrace
    boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
    boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);

    //standalone 模式的 datax shell任务不进行汇报
    if (instanceId == -1) { 
   
      perfReportEnable = false;
    }

    int priority = 0;
    try { 
   
      priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
    } catch (NumberFormatException e) { 
   
      LOG.warn("priority set to 0, because NumberFormatException, the value is: {}",
          System.getProperty("PROIORY"));
    }

    Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
    //初始化PerfTrace
    PerfTrace perfTrace = PerfTrace
        .getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
    perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);
    container.start();
  }


  /**
   * 过滤job配置信息
   *
   * @param configuration Configuration
   * @return String
   */
  public static String filterJobConfiguration(final Configuration configuration) { 
   
    Configuration jobConfWithSetting = configuration.getConfiguration("job").clone();
    Configuration jobContent = jobConfWithSetting.getConfiguration("content");
    jobConfWithSetting.set("content", filterSensitiveConfiguration(jobContent));
    return jobConfWithSetting.beautify();
  }

  /**
   * 屏蔽敏感信息
   *
   * @param conf Configuration
   * @return Configuration
   */
  public static Configuration filterSensitiveConfiguration(Configuration conf) { 
   
    Set<String> keys = conf.getKeys();
    for (final String key : keys) { 
   
      boolean isSensitive =
          endsWithIgnoreCase(key, "password") || endsWithIgnoreCase(key, "accessKey");
      if (isSensitive && conf.get(key) instanceof String) { 
   
        conf.set(key, conf.getString(key).replaceAll(".", "*"));
      }
    }
    return conf;
  }

  /**
   * @param args String[]
   * @throws Throwable
   */
  public static void entry(final String[] args) throws Throwable { 
   
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");

    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);
    String jobPath = cl.getOptionValue("job");
    // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");
    Configuration conf = ConfigParser.parse(jobPath);
    long jobId;
    String defaultJobId = "-1";
    if (!defaultJobId.equals(jobIdString)) { 
   
      //  如果jobId相同,会怎样?
      jobId = Long.parseLong(jobIdString);
    } else { 
   
      // 如果用户没有指定jobId,或jobId==1,执行后面逻辑
      // only for dsc & ds & datax 3 update
      String dscJobUrlPatternStr = "/instance/(\\d{1,})/config.xml";
      String dsJobUrlPatternStr = "/inner/job/(\\d{1,})/config";
      String dsTaskGroupUrlPatternStr = "/inner/job/(\\d{1,})/taskGroup/";
      List<String> patterns = Arrays
          .asList(dscJobUrlPatternStr, dsJobUrlPatternStr, dsTaskGroupUrlPatternStr);
      jobId = parseJobIdFromUrl(patterns, jobPath);
    }

    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    if (!isStandAloneMode && jobId == -1) { 
   
      // 如果不是 standalone 模式,那么 jobId 一定不能为-1
      throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
          "非 standalone 模式必须在 URL 中提供有效的 jobId.");
    }
    conf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
    //打印vmInfo
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) { 
   
      LOG.info(vmInfo.toString());
    }

    LOG.info("\n" + filterJobConfiguration(conf) + "\n");
    LOG.debug(conf.toJSON());
    ConfigurationValidate.doValidate(conf);
    Engine engine = new Engine();
    engine.start(conf);
  }


  /**
   * -1 表示未能解析到 jobId
   * <p>
   * only for dsc & ds & datax 3 update
   */
  private static long parseJobIdFromUrl(List<String> patternStringList, String url) { 
   
    long result = -1;
    for (String patternString : patternStringList) { 
   
      result = doParseJobIdFromUrl(patternString, url);
      if (result != -1) { 
   
        return result;
      }
    }
    return result;
  }

  private static long doParseJobIdFromUrl(String patternString, String url) { 
   
    Pattern pattern = Pattern.compile(patternString);
    Matcher matcher = pattern.matcher(url);
    if (matcher.find()) { 
   
      return Long.parseLong(matcher.group(1));
    }
    return -1;
  }

  public static void main(String[] args) { 
   
    int exitCode = 0;
    try { 
   
      Engine.entry(args);
    } catch (Throwable e) { 
   
      exitCode = 1;
      LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n {}", ExceptionTracker.trace(e));
      if (e instanceof DataXException) { 
   
        DataXException tempException = (DataXException) e;
        ErrorCode errorCode = tempException.getErrorCode();
        if (errorCode instanceof FrameworkErrorCode) { 
   
          FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
          exitCode = tempErrorCode.toExitValue();
        }
      }
      System.exit(exitCode);
    }
    System.exit(exitCode);
  }

}

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/145463.html原文链接:https://javaforall.net

(0)
上一篇 2022年5月17日 下午12:00
下一篇 2022年5月17日 下午12:00


相关推荐

  • 小强(为什么叫打不死的小强)

    作为科研人员,经常需要下载文献。sci-hub大家应该都比较熟悉,我就不过多介绍了!自从11月20号,小伙伴们陆续反馈sci-hub无法访问了11月21日,sci-hub官方发布,通过修改dns为80.82.77.83和80.82.77.84可以访问sci-hub.cc(ac)不多说

    2022年4月18日
    66
  • 什么是MVC三层架构?「建议收藏」

    什么是MVC三层架构?「建议收藏」1.1、什么是MVC MVC是模型(Model)、视图(View)、控制器(Controller)的简写,是一种软件设计规范。 是将业务逻辑、数据、显示分离的方法来组织代码。 MVC主要作用是降低了视图与业务逻辑间的双向偶合。 MVC不是一种设计模式,MVC是一种架构模式。当然不同的MVC存在差异。 Model(模型):数据模型,提供要展示的数据,因此包含数据和行为,可以认为是领域模型或JavaBean组件(包含数据和行为),不过现在一般都分离开来:ValueObj

    2022年6月25日
    27
  • HTML如何实现页面跳转(html跳转到指定页面)

    1、html中使用meta中跳转,通过meta可以设置跳转时间和页面&lt;head&gt;&lt;!–只是刷新不跳转到其他页面–&gt;&lt;metahttp-equiv="refresh"content="5"&gt;&lt;!–定时转到其他页面–&gt;&lt;metahttp-equiv="refresh&quot

    2022年4月11日
    2.0K
  • java 添加盲水印_图片加数字盲水印

    java 添加盲水印_图片加数字盲水印本文通过一个的实验 简要介绍频域手段添加数字盲水印的方法 并进一步验证其抗攻击性 在上述实验的基础上 总结躲避数字盲水印的方法 多图预警 本文分为五个部分 第一部分综述 第二部分频域数字盲水印制作原理介绍 第三部分盲水印攻击性实验 第四部分总结 第五部分附录 源代码 一 综述本文提供的一种实现 阿里通过肉眼无法识别的标识码追踪员工 的技术手段 通过看其他答主的分析 阿里可能还没用到频域加水印的

    2026年3月20日
    3
  • hashmap线程安全吗 什么解决方案_hashtable为什么是线程安全

    hashmap线程安全吗 什么解决方案_hashtable为什么是线程安全前言该试题从互联网获得,真实性没有考究,加上本人学识浅薄,所以面试题参考为主,解析分享为主。若对解析有不同看法,还请评论指正。谢谢。HashMap为什么不是线程安全?以JDK1.8的HashMap为例,引用作者:一字马胡所写文章中的一张图:上图为…

    2026年4月13日
    4
  • pycharm中pip install如何使用_pycharm使用pip安装第三方库

    pycharm中pip install如何使用_pycharm使用pip安装第三方库pyCharm如何使用通过CMDpipinstall方式安装的依赖包前言将pyCharm的解释器设置成Python解释器的方法前言在window下通过cmd(win+r打开运行,然后输入cmd,按下回车即可打开)方式来安装Python依赖包是一种方便快捷的方式。但是往往很多时候由于pyCharm与Python安装在不同的路径,即使你已经用cmdpipinstall的方式在你的电…

    2022年8月26日
    18

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号