datax(10): 源码解读Communication(Datax通讯类)「建议收藏」

datax(10): 源码解读Communication(Datax通讯类)「建议收藏」前面看了datax的通讯机制,继续看源码—具体的通讯类Communication。根据datax的运行模式的区别,数据的收集会有些区别,这篇文章都是讲的在standalone模式下。一、communication概述DataX所有的统计信息都会保存到Communication类里面。Communication支持下列数据的统计计数器,比如读取的字节速度,写入成功的数据条数/***所有的数值key-value对**/privateMap<String.

大家好,又见面了,我是你们的朋友全栈君。

前面看了datax的通讯机制,继续看源码—具体的通讯类 Communication。根据datax的运行模式的区别, 数据的收集会有些区别,这篇文章都是讲的在standalone模式下。


一、communication概述

DataX所有的统计信息都会保存到Communication类里面。

Communication支持下列数据的统计

  1. 计数器,比如读取的字节速度,写入成功的数据条数
  2. 统计的时间点 字符串类型的消息
  3. 执行时的异常
  4. 执行的状态, 比如成功或失败

  /** * 所有的数值key-value对 * */
  private Map<String, Number> counter;

  /** * 运行状态 * */
  private State state;

  /** * 异常记录 * */
  private Throwable throwable;

  /** * 记录的timestamp * */
  private long timestamp;

  /** * task给job的信息 * */
  Map<String, List<String>> message;
  

communication继承关系
在这里插入图片描述

如果需要汇总多个Communication的数据,Communication提供了mergeFrom方法。根据不同的数据类型,对应着不同的操作计数器类型,相同的key的数值累加

  • 合并异常,当自身的异常为null,才合并别的异常

  • 合并状态,如果有任意一个的状态失败了,那么返回失败的状态。如果有任意一个的状态正在运行,那么返回正在运行的状态

  • 合并消息, 相同的key的消息添加到同一个列表


二、communication主要方法

在这里插入图片描述


三、Communication的管理类

对于每个task组都有一个单独的Communication,用来存储这个组的统计数据。对于这些Communication,在LocalTGCommunicationManager类实现了集中管理。接下来看看LocalTGCommunicationManager的原理。

LocalTGCommunicationManager有个重要的属性taskGroupCommunicationMap,它是一个Map,保存了每个task组的统计数据。


public final class LocalTGCommunicationManager { 
   

  private static Map<Integer, Communication> taskGroupCommunicationMap = new ConcurrentHashMap<>();

  /** * 根据tgId注册comm * 当task组在初始化的时候,都会向LocalTGCommunicationManager这里注册。// 这里只是简单保存到taskGroupCommunicationMap变量里 * @param taskGroupId * @param communication */
  public static void registerTaskGroupCommunication(int taskGroupId, Communication communication) { 
   
    taskGroupCommunicationMap.put(taskGroupId, communication);
  }

  /** * 获取(合并)tg里面所有的comm * * @return Communication */
  public static Communication getJobCommunication() { 
   
    Communication communication = new Communication();
    communication.setState(State.SUCCEEDED);

    for (Communication taskGroupCommunication : taskGroupCommunicationMap.values()) { 
   
      communication.mergeFrom(taskGroupCommunication);
    }
    return communication;
  }

  /** * 采用获取taskGroupId后再获取对应communication的方式, * 防止map遍历时修改,同时也防止对map key-value对的修改 * * @return */
  public static Set<Integer> getTaskGroupIdSet() { 
   
    return taskGroupCommunicationMap.keySet();
  }

  public static Communication getTaskGroupCommunication(int taskGroupId) { 
   
    Validate.isTrue(taskGroupId >= 0, "taskGroupId不能小于0");
    return taskGroupCommunicationMap.get(taskGroupId);
  }


  /** * 根据tgId 将taskGroupCommunicationMap中没有的comm 插入 * @param taskGroupId * @param comm */
  public static void updateTaskGroupCommunication(final int taskGroupId, final Communication comm) { 
   
    Validate.isTrue(taskGroupCommunicationMap.containsKey(
        taskGroupId), String.format("taskGroupCommunicationMap中没有注册taskGroupId[%d]的Communication," +
        "无法更新该taskGroup的信息", taskGroupId));
    taskGroupCommunicationMap.put(taskGroupId, comm);
  }

  public static void clear() { 
   
    taskGroupCommunicationMap.clear();
  }

  public static Map<Integer, Communication> getTaskGroupCommunicationMap() { 
   
    return taskGroupCommunicationMap;
  }
}

四、谁会注册Communication

AbstractScheduler会根据切分后的任务,为每个task组注册一个Communication。registerCommunication接收task配置列表,里面每个配置都包含了task group id。

进行注册communication的类

  • AbstractScheduler的schedule方法里 registerCommunication
  • TaskGroupContainer的start方法里 registerCommunication
  • AbstractTGContainerCommunicator的registerCommunication方法
  • AbstractContainerCommunicator的registerCommunication方法
  • StandAloneJobContainerCommunicator的registerCommunication方法

在这里插入图片描述


五、更新communication统计数据

主要更新communication的类
在这里插入图片描述

每个任务执行都会对应着Channel,Channel当每处理一条数据时,都会更新对应Communication的统计信息。
例如下面的pull方法是Writer从Channel拉取数据,每次pull的时候,都会调用statPull函数,会更新写入数据条数和字节数的信息。


public abstract class Channel{ 
   

    private Communication currentCommunication;

    public Record pull() { 
   
        Record record = this.doPull();
        this.statPull(1L, record.getByteSize());
        return record;
    }
    
    /** * statPull方法,并没有限速。因为数据的整个流程是Reader -》 Channle -》 Writer, Reader的push速度限制了, * Writer的pull速度也就没必要限速 * * @param recordSize * @param byteSize */
    private void statPull(long recordSize, long byteSize) { 
   
        currentCommunication.increaseCounter(CommunicationTool.WRITE_RECEIVED_RECORDS, recordSize);
        currentCommunication.increaseCounter(CommunicationTool.WRITE_RECEIVED_BYTES, byteSize);
    }
    

六、收集communication统计数据

  1. AbstractScheduler想统计汇总后的数据,需要调用AbstractContainerCommunicator的collect方法

  2. StandAloneJobContainerCommunicator继承AbstractContainerCommunicator,实现了collect方法,它会调用AbstractCollector的collectFromTaskGroup方法获取数据

  3. ProcessInnerCollector实现了AbstractCollector的collectFromTaskGroup方法,它会调用LocalTGCommunicationManager的getJobCommunication方法, getJobCommunication方法会统计所有task的数据,然后返回。

在这里插入图片描述


注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/145489.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • java中short、int、long、float、double取值范围「建议收藏」

    java中short、int、long、float、double取值范围「建议收藏」对于java的数据类型,既熟悉又陌生,于是整理了这篇文档。最近的面试让我开始注意细节,细节真的很重要。一、分析基本数据类型的特点,最大值和最小值。1、基本类型:int二进制位数:32包装类:java.lang.Integer最小值:Integer.MIN_VALUE=-2147483648(-2的31次方)最大值:Integer.MAX_VALUE=2147

    2022年5月22日
    50
  • 分享Nexus桌面插件的安装及使用,超级好看、好用的插件。附下载链接。「建议收藏」

    分享Nexus桌面插件的安装及使用,超级好看、好用的插件。附下载链接。「建议收藏」效果图,如图所示!害怕win11的一系列bug,又想拥有像win11一样的桌面图标插件!Nexus可以满足你的需求!!!软件下载链接:https://pan.baidu.com/s/17lUOdON-0VwQvP98VU-jRQ提取码:10c9注:该链接为网络资源,如有侵权请告知!下载完压缩包之后,可以点击NexusSetup.exe进行安装,一路默认就可以。安装好之后,点击运行Nexus。这时候,你就会发现桌面中上方会有一行奇丑无比的插件!!!!别慌!别慌!别慌!好心人已经写了一些格式

    2022年9月12日
    3
  • ResourceBundle用法[通俗易懂]

    ResourceBundle用法[通俗易懂]ResourceBundle用于解释资源文件。 1.新建一个.properties文件这里为:AccessMessages.properties例error=错误warn=警告放入工程下的en_US,目录结构如图  2.建立绑定关系[ResourceBundle("AccessMessages")]privatestaticvarrb:Resource…

    2022年7月12日
    30
  • Mybatis中JdbcType的使用[通俗易懂]

    Mybatis中JdbcType的使用[通俗易懂]MyBatis插入空值时,需要指定JdbcTypemybatisinsert空值报空值异常,但是在PL/SQL不会提示错误,主要原因是mybatis无法进行转换jdbcType的使用场合,只有当在insert,update和delete中有空字段时,需要使用jdbcType。MyBatis包含的jdbcType类型BIT、FLOAT、CHAR、TIMESTAMP、OTHE…

    2022年10月20日
    2
  • mysql数据库转postgres数据库语法不通错误方言

    mysql数据库转postgres数据库语法不通错误方言之前一直用的是mysql数据库,现在公司要求使用postgres,但是做分页查询的时候,postgres数据库会报错如下:mysql使用的是limitx,y。而postgres使用方式是limitxoffsety,语法不一样就会报这个错误。这时候我们如果继续想要使用之前的方式操作数据库,我们就需要配置一个叫方言的东西。以下是postgres方言的配置。jpa:database:postgresqlproperties:hibernate:

    2022年7月27日
    6
  • 大整数相乘java_大整数乘法—java实现

    大整数相乘java_大整数乘法—java实现大整数相乘参考博客:https://blog.csdn.net/oh_maxy/article/details/10903929https://blog.csdn.net/u010867294/article/details/77482306大整数相乘,对于计算机来说,由于整数的范围存在限制,如果数值太大,则两个较大整数及其结果在表示时就将可能产生溢出。因此,对于两个大整数的乘法我们就需要将其转化…

    2022年6月2日
    32

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号