datax(11):源码解读 ContainerCommunicator

datax(11):源码解读 ContainerCommunicator前面看了datax的通讯类communication,现在看看在他之上包装的一个容器通信类ContainerCommunicator一、抽象基类AbstractContainerCommunicatordataX中提供了一个基类 AbstractContainerCommunicator来处理JobContainer、TaskGroupContainer和Task的通讯。AbstractContainerCommunicator提供了注册、收集信息等接口,信息的单位是Communication.

大家好,又见面了,我是你们的朋友全栈君。

前面看了datax的 通讯类communication,现在看看在他之上包装的一个容器通信类ContainerCommunicator


一、抽象基类AbstractContainerCommunicator

dataX中提供了一个基类 AbstractContainerCommunicator来处理JobContainer、TaskGroupContainer和Task的通讯。AbstractContainerCommunicator提供了注册、收集信息等接口,信息的单位是Communication。

类继承关系
在这里插入图片描述

类的主要方法

在这里插入图片描述


二、AbstractContainerCommunicator的主要两个属性

AbstractContainerCommunicator主要将其功能委托给2个属性:
private AbstractCollector collector;
private AbstractReporter reporter;

1、AbstractCollector collector

Collector负责管理下级注册到上级,搜集并合并下级所有的信息。 dataX提供一个基类AbstractCollector和一个实现类ProcessInnerCollector

在这里插入图片描述

1.1 AbstractCollector同时包含将Task注册到TaskGroupContainer(registerTaskCommunication方法)和将TaskGroupContainer注册到JobContainer(registerTGCommunication方法)的功能。具体如下:
  • taskCommunicationMap属性用于保存Task注册到TaskGroupContainer,当Task注册到TaskGroupContainer的时候将TaskId和新建的Communication对象保存进taskCommunicationMap即可(在registerTaskCommunication方法中)。

  /** * Task注册到TaskGroupContainer * * @param taskConfigurationList List<Configuration> */
  public void registerTaskCommunication(List<Configuration> taskConfigurationList) { 
   
    for (Configuration taskConfig : taskConfigurationList) { 
   
      int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
      this.taskCommunicationMap.put(taskId, new Communication());
    }
  }
  • TaskGroupContainer注册到JobContainer注册信息则是保存在全局变量LocalTGCommunicationManager中,便于全局访问(在registerTGCommunication方法中)。
  /** * 将TaskGroupContainer注册到JobContainer * * @param taskGroupConfigurationList List<Configuration> */
  public void registerTGCommunication(List<Configuration> taskGroupConfigurationList) { 
   
    for (Configuration config : taskGroupConfigurationList) { 
   
      int taskGroupId = config.getInt(
          CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
      LocalTGCommunicationManager.registerTaskGroupCommunication(taskGroupId, new Communication());
    }
  }

此外AbstractCollector#collectFromTask提供搜集所有任务信息的功能;


    /** * 搜集所有任务信息的功能 * @return Communication */
  public Communication collectFromTask() { 
   
    Communication communication = new Communication();
    communication.setState(State.SUCCEEDED);

    for (Communication taskCommunication :
        this.taskCommunicationMap.values()) { 
   
      communication.mergeFrom(taskCommunication);
    }
    return communication;
  }

1.2 实现类ProcessInnerCollector只实现 了一个方法collectFromTaskGroup,collectFromTaskGroup提供搜集所有TaskGroupContainer的信息。
    @Override
    public Communication collectFromTaskGroup() { 
   
        return LocalTGCommunicationManager.getJobCommunication();
    }


2、AbstractReporter reporter

Reporter的主要功能是将收集到的信息上报给上级。dataX提供一个基类AbstractReporter和一个实现类ProcessInnerCollector.

类继承关系
在这里插入图片描述

主要方法
在这里插入图片描述

2.1 ProcessInnerCollector#reportJobCommunication将job信息汇报给上级,job在dataX中是最上级,所以该方法没有操作。
  • ProcessInnerCollector#reportTGCommunication将TaskGroupContianer的信息汇报给上级,操作也很简单直接更新注册时分配给该TaskGroup的Communication(Map中的值)
public class ProcessInnerReporter extends AbstractReporter { 
   

  @Override
  public void reportJobCommunication(Long jobId, Communication communication) { 
   
    // do nothing
  }

  /** * 将TaskGroupContianer的信息汇报给上级,操作也很简单直接更新注册时分配给该TaskGroup的Communication(Map中的值) * * @param taskGroupId Integer * @param communication Communication */
  @Override
  public void reportTGCommunication(Integer taskGroupId, Communication communication) { 
   
    LocalTGCommunicationManager.updateTaskGroupCommunication(taskGroupId, communication);
  }
}

三、 AbstractContainerCommunicator的实现类

1、StandAloneJobContainerCommunicator

StandAloneJobContainerCommunicator是AbstractContainerCommunicator一个实现类,主要处理JobContainer和TaskGroupContainer之间的信息传递。

  1. 每个TaskGroupContainer通过StandAloneJobContainerCommunicator#registerCommunication注册
  2. 注册之后TaskGroupContainer每隔一段时间通过StandAloneJobContainerCommunicator#Reporter#report向JobContainer发送自己的状态。
  3. JobContainer每隔一段时间通过StandAloneJobContainerCommunicator#collect获取TaskGroup的信息。最后调用StandAloneJobContainerCommunicator#report向上级汇报,这里JobContainer已经是最上级了,向日志中输出先关信息即可。
/** * 主要处理JobContainer和TaskGroupContainer之间的信息传递 */
public class StandAloneJobContainerCommunicator extends AbstractContainerCommunicator { 
   

  private static final Logger LOG = LoggerFactory
      .getLogger(StandAloneJobContainerCommunicator.class);

  public StandAloneJobContainerCommunicator(Configuration cfg) { 
   
    super(cfg);
    super.setCollector(new ProcessInnerCollector(cfg.getLong(DATAX_CORE_CONTAINER_JOB_ID)));
    super.setReporter(new ProcessInnerReporter());
  }

  @Override
  public void registerCommunication(List<Configuration> configurationList) { 
   
    super.getCollector().registerTGCommunication(configurationList);
  }

  /** * JobContainer每隔一段时间 主动 获取TaskGroup的信息。最后调用本类的#report向上级汇报, * 这里JobContainer已经是最上级了,向日志中输出先关信息即可 * @return */
  @Override
  public Communication collect() { 
   
    return super.getCollector().collectFromTaskGroup();
  }

  @Override
  public State collectState() { 
   
    return this.collect().getState();
  }

  /** * 和 DistributeJobContainerCollector 的 report 实现一样 * 每隔一段时间向JobContainer 主动 发送自己的状态 */
  @Override
  public void report(Communication communication) { 
   
    super.getReporter().reportJobCommunication(super.getJobId(), communication);
    LOG.info(CommunicationTool.Stringify.getSnapshot(communication));
    reportVmInfo();
  }

  @Override
  public Communication getCommunication(Integer taskGroupId) { 
   
    return super.getCollector().getTGCommunication(taskGroupId);
  }

  @Override
  public Map<Integer, Communication> getCommunicationMap() { 
   
    return super.getCollector().getTGCommunicationMap();
  }
}

2、AbstractTGContainerCommunicator

AbstractTGContainerCommunicator是AbstractContainerCommunicator的另一个抽象实现类,

/** * 该类是用于处理 taskGroupContainer 的 communication 的收集汇报的父类 * 主要是 taskCommunicationMap 记录了 taskExecutor 的 communication 属性 * 主要处理TaskGroupContainer和Task之间的信息 */
public abstract class AbstractTGContainerCommunicator extends AbstractContainerCommunicator { 
   

    protected long jobId;

    /** * 由于taskGroupContainer是进程内部调度 * 其registerCommunication(),getCommunication(), * getCommunications(),collect()等方法是一致的 * 所有TG的Collector都是ProcessInnerCollector */
    protected int taskGroupId;

    public AbstractTGContainerCommunicator(Configuration configuration) { 
   
        super(configuration);
        this.jobId = configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
        super.setCollector(new ProcessInnerCollector(this.jobId));
        this.taskGroupId = configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
    }

    @Override
    public void registerCommunication(List<Configuration> configurationList) { 
   
        super.getCollector().registerTaskCommunication(configurationList);
    }

    @Override
    public final Communication collect() { 
   
        return this.getCollector().collectFromTask();
    }

    @Override
    public final State collectState() { 
   
        Communication communication = new Communication();
        communication.setState(State.SUCCEEDED);

        for (Communication taskCommunication :
                super.getCollector().getTaskCommunicationMap().values()) { 
   
            communication.mergeStateFrom(taskCommunication);
        }

        return communication.getState();
    }

    @Override
    public final Communication getCommunication(Integer taskId) { 
   
        Validate.isTrue(taskId >= 0, "注册的taskId不能小于0");

        return super.getCollector().getTaskCommunication(taskId);
    }

    @Override
    public final Map<Integer, Communication> getCommunicationMap() { 
   
        return super.getCollector().getTaskCommunicationMap();
    }

}

从类的继承实现看最终实现类是StandaloneTGContainerCommunicator, 该类主要处理TaskGroupContainer和Task之间的信息,处理逻辑和StandAloneJobContainerCommunicator差不多

/** * 独立模式的taskGroup 的通讯类 主要处理TaskGroupContainer和Task之间的信息,处理逻辑和StandAloneJobContainerCommunicator差不多 */
public class StandaloneTGContainerCommunicator extends AbstractTGContainerCommunicator { 
   

  /** * 单机版的容器沟通者(独立模式的taskGroup 的通讯类) * * @param configuration */
  public StandaloneTGContainerCommunicator(Configuration configuration) { 
   
    super(configuration);
    super.setReporter(new ProcessInnerReporter());
  }

  @Override
  public void report(Communication communication) { 
   
    super.getReporter().reportTGCommunication(super.taskGroupId, communication);
  }

}

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/145375.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • c#程序调试(如何使用debug调试)

    c#的Debug.WriteLine()和Trace.WriteLine()有没有例子?1.区别://输出跟踪信息Trace.WriteLine()将有关跟踪的信息写入Listeners集合中的跟踪侦听器在调试和release模式都输出!默认是写到输出窗口的,也可以指定写到指定文件,或系统日志中!//输出调试信息Debug.WriteLine()将有关调试的信息写入Listener

    2022年4月17日
    92
  • pycharm的安装选项_tomcat环境配置

    pycharm的安装选项_tomcat环境配置pycharm安装以及其环境的配置说明此次我们使用win10系统安装pycharm的64位社区版,并且对Anaconda3中自带的Python3进行环境的配置,如果您没有Anaconda3甚至是没有Python3环境,可以参考Anaconda3安装教程及说明,如果您的pip源未更改,这里推荐您改为使用国内的pip源,这样可以更快的下载组件,方法见修改pip源至国内镜像网站。教程从开始菜单中找到你的AnacondaPrompt并打开…

    2022年8月29日
    0
  • linux重启nginx服务_centos7重启服务器命令

    linux重启nginx服务_centos7重启服务器命令连接服务器输入以下命令,如果要重启其他服务把nginx替换即可。servicenginxrestart出现以下页面即为成功

    2022年8月13日
    3
  • 阿里云轻量应用型服务器防火墙端口开放了还是无法访问问题@林[通俗易懂]

    阿里云轻量应用型服务器防火墙端口开放了还是无法访问问题@林[通俗易懂]阿里云轻量应用型服务器防火墙端口开放了还是无法访问问题登录阿里云找到防火墙远程连接服务器,开启防火墙对应端口这里我用的是FinalShell,当然这个远程连接的软件可以不同,只要能远程连接上服务器就行;(1)查看防火墙状态(dead状态,防火墙未开启;active状态,即防火墙开启)systemctlstatusfirewalld(2)防火墙未开启,开启防火墙(防火墙开启后记得再查看防火墙状态)systemctlstartfirewalld(3)查看开放端口列表f

    2022年9月27日
    0
  • 一个可以恶搞朋友的关机程序!!!

    一个可以恶搞朋友的关机程序!!!文章目录前言一、关机程序1.思路分析二、运行结果如下1.点击后总结前言这是一个可以恶搞朋友的关机程序,实现原理是通过system直接调用dos命令窗口启动关机程序,既然是恶搞那就一定要给你的恶搞对象一定的“机会”,下面我将带大家一起来学习一个这个简单的恶搞程序。(源码可以私信找我要,简单改一下输出语句便可以直接自己使用)大家编译后可以直接把.exe文件发给你所需要恶搞的对象一、关机程序1.思路分析1.使用system调用dos命令窗口直接输入关机命令并弹出提示,此时windows响应跳出关机

    2022年7月22日
    4
  • Office 2007或Office 2010套件初始安装中断后无法重新启动安装程序,错误“Microsoft Office xxx在安装过程中出错”…[通俗易懂]

    大家好,不知道大家在安装Office2007或Office2010时有没有遇到过这样的现象,在您第一次安装Office2007或Office2010套件时,因为某种原因安装中断,当您试图重新安装相同的Office套件时,安装失败并且收到如下的错误提示MicrosoftOffice<具体Office套件名>在安装过程中出错.具体祥见下图:其实出现这个错误的原因可能是因为在您上一次安…

    2022年4月3日
    148

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号