datax(18):源码解读Transformer

datax(18):源码解读Transformer现在很多场景都把datax当做ETL工具,datax中的各种reader相当于E(Extract),各种writer相当于L(load),那么datax中是否有T(transform)。答案是肯定的~一、概述transformer作用:在生产上数据传输,一般情况下只需要rw就行,但是有时候需要在中间过程做些操作,比如加解密、切割、拼接等等,这个时候就需要transform了。族谱datax中的transform有2个顶级祖宗,简单类型的Transformer和复杂类型的ComplexTran.

大家好,又见面了,我是你们的朋友全栈君。

现在很多场景都把datax当做ETL工具,datax中的各种reader相当于E(Extract),各种writer相当于L(load),那么datax中是否有T(transform)。答案是肯定的~


一、概述transformer

作用:在生产上数据传输,一般情况下只需要rw就行,但是有时候需要在中间过程做些操作,比如加解密、切割、拼接等等,这个时候就需要transform了。

在这里插入图片描述

族谱

datax中的transform有2个顶级祖宗,简单类型的Transformer和复杂类型的ComplexTransformer(后续代码中其实可以将简单类型转为复杂类型);

二、简单类型Transformer

里面只有1个属性+1个方法(get set方法忽略不计)。属性transformerName 是每个transformer的唯一标识符(datax中多以dx_开头命名),方法evaluate是一个抽象方法,主要靠子类实现;

/** * transformerName的唯一性在datax中检查,或者提交到插件中心检查。 */
  private String transformerName;
 /** * 用于具体的处理逻辑的实现 <br> * * @param record Record 行记录,UDF进行record的处理后,更新相应的record * @param paras Object transformer函数参数 */
  abstract public Record evaluate(Record record, Object... paras);

1、族谱如下,从下图可以看出,目前datax的本地的transform主要有5种(替换replace,截取substr,Groovy,过滤filter,填充pad)

在这里插入图片描述


三、复杂类型ComplexTransformer

和简单类型的transformer相比,属性相同,抽象方法里多了了一个参数tContext,主要是做允许的配置项;

  /** * transformerName的唯一性在datax中检查,或者提交到插件中心检查。 */
  private String transformerName;


  public String getTransformerName() { 
   
    return transformerName;
  }

  public void setTransformerName(String transformerName) { 
   
    this.transformerName = transformerName;
  }

  /** * @param record 行记录,UDF进行record的处理后,更新相应的record * @param tContext transformer运行的配置项 * @param paras transformer函数参数 */
  abstract public Record evaluate(Record record, Map<String, Object> tContext, Object... paras);

1、族谱如下,从下图可以看出,主要只有一个子类ComplexTransformerProxy

在这里插入图片描述


2、子类ComplexTransformerProxy

这里有意思了,看代码,里面一个构造函数,一个重写函数。构造函数可以将简单的transform转为复杂的,同时重写函数里实现上调用的是简单类型的evaluate函数。此处不知是否是因为阿里只开源出部分代码。。。

/** * no comments. * Created by liqiang on 16/3/8. */
public class ComplexTransformerProxy extends ComplexTransformer { 
   

  private Transformer realTransformer;

  /** * 将简单类型的transform转为复杂类型 * * @param transformer ComplexTransformerProxy */
  public ComplexTransformerProxy(Transformer transformer) { 
   
    setTransformerName(transformer.getTransformerName());
    this.realTransformer = transformer;
  }

  /** * @param record 行记录,UDF进行record的处理后,更新相应的record * @param tContext transformer运行的配置项 * @param paras transformer函数参数 * @return */
  @Override
  public Record evaluate(Record record, Map<String, Object> tContext, Object... paras) { 
   
    return this.realTransformer.evaluate(record, paras);
  }

  public Transformer getRealTransformer() { 
   
    return realTransformer;
  }
}

三、围绕transform的辅助类

  1. TransformerInfo是transformer的单实例类,主要记录transformer一些信息
  /** * function基本信息 */
  private ComplexTransformer transformer;
  private ClassLoader classLoader;
  /** * 是否是本地transform */
  private boolean isNative;
  1. TransformerRegistry 将transform注册的类
    核心方法是loadTransformerFromLocalStorage,从本地加载transformer
public class TransformerRegistry { 
   

  private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);

  private static Map<String, TransformerInfo> registedTransformer = new HashMap<>();


  static { 
   
    /** * add native transformer * local storage and from server will be delay load. * 官方默认注册了 5 个方法,分别是截取字符串、填补、替换、过滤、groovy 代码段(后面会详细介绍) */

    registTransformer(new SubstrTransformer());
    registTransformer(new PadTransformer());
    registTransformer(new ReplaceTransformer());
    registTransformer(new FilterTransformer());
    registTransformer(new GroovyTransformer());
  }

  public static void loadTransformerFromLocalStorage() { 
   
    //加载本地存储的 transformer
    loadTransformerFromLocalStorage(null);
  }


  /** * 从本地加载transform(主要是根据transform加载transformer.json) * * @param transformers List<String> transformer文件名列表 */
  public static void loadTransformerFromLocalStorage(List<String> transformers) { 
   
    String[] files = new File(DATAX_STORAGE_TRANSFORMER_HOME).list();
    if (null == files) { 
   
      return;
    }
    for (final String transformerFile : files) { 
   
      try { 
   
        if (transformers == null || transformers.contains(transformerFile)) { 
   
          loadTransformer(transformerFile);
        }
      } catch (Exception e) { 
   
        LOG.error(format("skip transformer(%s) loadTransformer has Exception(%s)",
            transformerFile, e.getMessage()), e);
      }
    }
  }

  /** * 根据文件名加载transformer <br> * 1 先根据 tf名字找到tf.json <br> * 2 将json加载成cfg <br> * 3 将tf 的jar加载 <br> * 4 将tf注册到map中 <br> * * @param tfFile String transformer的文件名 */
  public static void loadTransformer(String tfFile) { 
   
    String tfPath = DATAX_STORAGE_TRANSFORMER_HOME + File.separator + tfFile;
    Configuration tfCfg;
    try { 
   
      tfCfg = loadTransFormerConfig(tfPath);
    } catch (Exception e) { 
   
      String errMsg = format("skip transformer(%s),load transformer.json error,path = %s, ", tfFile,
          tfPath);
      LOG.error(errMsg, e);
      return;
    }

    String className = tfCfg.getString("class");
    if (StringUtils.isEmpty(className)) { 
   
      LOG.error(
          format("skip transformer(%s),class not config, path = %s, config = %s", tfFile, tfPath,
              tfCfg.beautify()));
      return;
    }

    String funName = tfCfg.getString("name");
    if (!tfFile.equals(funName)) { 
   
      LOG.warn(format(
          "transformer(%s) name not match transformer.json config name[%s], will ignore json's name, path = %s, config = %s",
          tfFile, funName, tfPath, tfCfg.beautify()));
    }
    JarLoader jarLoader = new JarLoader(new String[]{ 
   tfPath});

    try { 
   
      Class<?> transformerClass = jarLoader.loadClass(className);
      Object transformer = transformerClass.newInstance();
      // 判断tf 是复杂型还是简单型
      if (ComplexTransformer.class.isAssignableFrom(transformer.getClass())) { 
   
        ((ComplexTransformer) transformer).setTransformerName(tfFile);
        registComplexTransformer((ComplexTransformer) transformer, jarLoader, false);
      } else if (Transformer.class.isAssignableFrom(transformer.getClass())) { 
   
        ((Transformer) transformer).setTransformerName(tfFile);
        registTransformer((Transformer) transformer, jarLoader, false);
      } else { 
   
        LOG.error(format("load Transformer class(%s) error, path = %s", className, tfPath));
      }
    } catch (Exception e) { 
   
      //错误 function 跳过
      LOG.error(format("skip transformer(%s),load Transformer class error, path = %s ", tfFile,
          tfPath), e);
    }
  }

  /** * 根据 transform路径加载transformer.json * * @param transformerPath String * @return Configuration */
  private static Configuration loadTransFormerConfig(String transformerPath) { 
   
    return Configuration.from(new File(transformerPath + File.separator + "transformer.json"));
  }

  public static TransformerInfo getTransformer(String transformerName) { 
   

    TransformerInfo result = registedTransformer.get(transformerName);

    //if (result == null) { 
   
    //todo 再尝试从disk读取
    //}

    return result;
  }

  public static synchronized void registTransformer(Transformer transformer) { 
   
    registTransformer(transformer, null, true);
  }

  public static synchronized void registTransformer(Transformer transformer,
      ClassLoader classLoader, boolean isNative) { 
   

    checkName(transformer.getTransformerName(), isNative);

    if (registedTransformer.containsKey(transformer.getTransformerName())) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR,
          " name=" + transformer.getTransformerName());
    }

    ComplexTransformerProxy complexTransformer = new ComplexTransformerProxy(transformer);
    TransformerInfo info = buildTransformerInfo(complexTransformer, isNative, classLoader);
    registedTransformer.put(transformer.getTransformerName(), info);

  }

  public static synchronized void registComplexTransformer(ComplexTransformer complexTransformer,
      ClassLoader classLoader, boolean isNative) { 
   

    checkName(complexTransformer.getTransformerName(), isNative);
    if (registedTransformer.containsKey(complexTransformer.getTransformerName())) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_DUPLICATE_ERROR,
          " name=" + complexTransformer.getTransformerName());
    }
    TransformerInfo info = buildTransformerInfo(complexTransformer, isNative, classLoader);
    registedTransformer.put(complexTransformer.getTransformerName(), info);
  }

  /** * 该方法存在一定问题, <br> * 1 返回值为空,检查结果没处用 <br> * 2 校验是否本地方法不太严谨 <br> * @param functionName * @param isNative */
  private static void checkName(String functionName, boolean isNative) { 
   
    boolean checkResult = true;
    // 只有是datax本地的transform,name名称才dx_开头
    if (isNative) { 
   
      if (!functionName.startsWith("dx_")) { 
   
        checkResult = false;
      }
    } else { 
   
      if (functionName.startsWith("dx_")) { 
   
        checkResult = false;
      }
    }

    if (!checkResult) { 
   
      throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_NAME_ERROR,
          " name=" + functionName + ": isNative=" + isNative);
    }
  }

  private static TransformerInfo buildTransformerInfo(ComplexTransformer complexTransformer,
      boolean isNative, ClassLoader classLoader) { 
   
    TransformerInfo transformerInfo = new TransformerInfo();
    transformerInfo.setClassLoader(classLoader);
    transformerInfo.setIsNative(isNative);
    transformerInfo.setTransformer(complexTransformer);
    return transformerInfo;
  }

  public static List<String> getAllSuportTransformer() { 
   
    return new ArrayList<String>(registedTransformer.keySet());
  }
}
  1. TransformerExecutionParas transform执行类的参数对象,主要包含几个属性及对应get set方法
  /** * 以下是function参数 */

    private Integer columnIndex;
    private String[] paras;
    private Map<String, Object> tContext;
    private String code;
    private List<String> extraPackage;
  1. TransformerExecution transform的执行类,主要方法只有一个genFinalParas,通过一系列内部计算最后得到一个TransformerExecutionParas
  2. TransformerExchanger转换器,主要方法doTransformer,可以在RecordExchanger中直接将record进行转换;
public Record doTransformer(Record record) { 
   
    if (transformerExecs == null || transformerExecs.size() == 0) { 
   
      return record;
    }

    Record result = record;

    long diffExaustedTime = 0;
    String errorMsg = null;
    boolean failed = false;
    for (TransformerExecution transformerInfoExec : transformerExecs) { 
   
      long startTs = System.nanoTime();

      if (transformerInfoExec.getClassLoader() != null) { 
   
        classLoaderSwapper.setCurrentThreadClassLoader(transformerInfoExec.getClassLoader());
      }

      /** * 延迟检查transformer参数的有效性,直接抛出异常,不作为脏数据 * 不需要在插件中检查参数的有效性。但参数的个数等和插件相关的参数,在插件内部检查 */
      if (!transformerInfoExec.isChecked()) { 
   

        if (transformerInfoExec.getColumnIndex() != null
            && transformerInfoExec.getColumnIndex() >= record.getColumnNumber()) { 
   
          throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
              String.format("columnIndex[%s] out of bound[%s]. name=%s",
                  transformerInfoExec.getColumnIndex(), record.getColumnNumber(),
                  transformerInfoExec.getTransformerName()));
        }
        transformerInfoExec.setIsChecked(true);
      }

      try { 
   
        result = transformerInfoExec.getTransformer()
            .evaluate(result, transformerInfoExec.gettContext(),
                transformerInfoExec.getFinalParas());
      } catch (Exception e) { 
   
        errorMsg = String
            .format("transformer(%s) has Exception(%s)", transformerInfoExec.getTransformerName(),
                e.getMessage());
        failed = true;
        //LOG.error(errorMsg, e);
        // transformerInfoExec.addFailedRecords(1);
        //脏数据不再进行后续transformer处理,按脏数据处理,并过滤该record。
        break;

      } finally { 
   
        if (transformerInfoExec.getClassLoader() != null) { 
   
          classLoaderSwapper.restoreCurrentThreadClassLoader();
        }
      }

      if (result == null) { 
   
        /** * 这个null不能传到writer,必须消化掉 */
        totalFilterRecords++;
        //transformerInfoExec.addFilterRecords(1);
        break;
      }

      long diff = System.nanoTime() - startTs;
      //transformerInfoExec.addExaustedTime(diff);
      diffExaustedTime += diff;
      //transformerInfoExec.addSuccessRecords(1);
    }
    totalExaustedTime += diffExaustedTime;
    if (failed) { 
   
      totalFailedRecords++;
      this.pluginCollector.collectDirtyRecord(record, errorMsg);
      return null;
    } else { 
   
      totalSuccessRecords++;
      return result;
    }
  }
  1. TransformerUtil 工具类,主要方法buildTransformerInfo将conf转为可执行的TransformerExecution列表
/** * 根据task的配置构建transformer * * @param taskCfg Configuration * @return List<TransformerExecution> */
  public static List<TransformerExecution> buildTransformerInfo(Configuration taskCfg) { 
   
    List<Configuration> tfConfigs = taskCfg.getListConfiguration(CoreConstant.JOB_TRANSFORMER);
    if (tfConfigs == null || tfConfigs.size() == 0) { 
   
      return null;
    }
    List<TransformerExecution> result = new ArrayList<>();
    List<String> funNames = new ArrayList<>();

    for (Configuration cfg : tfConfigs) { 
   
      String functionName = cfg.getString("name");
      if (StringUtils.isEmpty(functionName)) { 
   
        throw DataXException
            .asDataXException(TRANSFORMER_CONFIGURATION_ERROR, "config=" + cfg.toJSON());
      }

      if (functionName.equals("dx_groovy") && funNames.contains("dx_groovy")) { 
   
        throw DataXException.asDataXException(TRANSFORMER_CONFIGURATION_ERROR,
            "dx_groovy can be invoke once only.");
      }
      funNames.add(functionName);
    }

    //延迟load 第三方插件的function,并按需load
    LOG.info(String.format(" user config transformers [%s], loading...", funNames));
    TransformerRegistry.loadTransformerFromLocalStorage(funNames);

    int i = 0;
    for (Configuration cfg : tfConfigs) { 
   
      String funName = cfg.getString("name");
      TransformerInfo transformerInfo = TransformerRegistry.getTransformer(funName);
      if (transformerInfo == null) { 
   
        throw DataXException.asDataXException(TRANSFORMER_NOTFOUND_ERROR, "name=" + funName);
      }

      //具体的UDF对应一个paras
      TransformerExecutionParas transformerExecutionParas = new TransformerExecutionParas();
      // groovy function仅仅只有code
      if (!funName.equals("dx_groovy") && !funName.equals("dx_fackGroovy")) { 
   
        Integer colIndex = cfg.getInt(CoreConstant.TRANSFORMER_PARAMETER_COLUMNINDEX);

        if (colIndex == null) { 
   
          throw DataXException.asDataXException(TRANSFORMER_ILLEGAL_PARAMETER,
              "columnIndex must be set by UDF:name=" + funName);
        }

        transformerExecutionParas.setColumnIndex(colIndex);
        List<String> paras = cfg.getList(CoreConstant.TRANSFORMER_PARAMETER_PARAS, String.class);
        if (paras != null && paras.size() > 0) { 
   
          transformerExecutionParas.setParas(paras.toArray(new String[0]));
        }
      } else { 
   
        String code = cfg.getString(CoreConstant.TRANSFORMER_PARAMETER_CODE);
        if (StringUtils.isEmpty(code)) { 
   
          throw DataXException.asDataXException(TRANSFORMER_ILLEGAL_PARAMETER,
              "groovy code must be set by UDF:name=" + funName);
        }
        transformerExecutionParas.setCode(code);

        List<String> extraPackage = cfg.getList(TRANSFORMER_PARAMETER_EXTRAPACKAGE, String.class);
        if (extraPackage != null && extraPackage.size() > 0) { 
   
          transformerExecutionParas.setExtraPackage(extraPackage);
        }
      }
      transformerExecutionParas.settContext(cfg.getMap(TRANSFORMER_PARAMETER_CONTEXT));

      TransformerExecution transformerExecution = new TransformerExecution(transformerInfo,
          transformerExecutionParas);

      transformerExecution.genFinalParas();
      result.add(transformerExecution);
      i++;
      LOG.info(String.format(" %s of transformer init success. name=%s, isNative=%s parameter = %s"
          , i, transformerInfo.getTransformer().getTransformerName()
          , transformerInfo.isNative(), cfg.getConfiguration("parameter")));
    }
    return result;
  }
  1. TransformerErrorCode transform定义的一些错误代码,没什么好说的;

注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/145320.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • linux同时启动两个Tomcat[通俗易懂]

    linux同时启动两个Tomcat[通俗易懂]编辑环境变量:vim/etc/profile 在文件末尾复制粘贴即可##########firsttomcat###########CATALINA_BASE=/opt/tomcatCATALINA_HOME=/opt/tomcatTOMCAT_HOME=/opt/tomcatexportCATALINA_BASECATALINA_HOMETOMCAT_HO

    2022年6月16日
    57
  • weblogic环境,应用上传图片报Could not initialize class sun.awt.X11.XToolkit

    weblogic环境,应用上传图片报Could not initialize class sun.awt.X11.XToolkit

    2022年1月21日
    38
  • IE主页被https://hao.360.cn/?a1004劫持,如何解决

    IE主页被https://hao.360.cn/?a1004劫持,如何解决最近IE主页被https://hao.360.cn/?a1004劫持了,不管如何杀毒,更换主页地址,都是不行,包括306、火绒等工具,它就是那么的顽强,不让你更改。但是发现一个现象,那就是如果你在C:\ProgramFiles(x86)\InternetExplorer里找到iexplore.exe,直接打开,会跳转到自己设置的主页,如果你把ie“固定到开始屏幕”,然后在开始屏幕里打开,还是被劫持,那如何解决呢?解决方案:1、IE的主页里先设置自己需要的地址;2、卸载360安全卫士;

    2022年7月26日
    4
  • idea2022.01.4激活码【2022最新】

    (idea2022.01.4激活码)JetBrains旗下有多款编译器工具(如:IntelliJ、WebStorm、PyCharm等)在各编程领域几乎都占据了垄断地位。建立在开源IntelliJ平台之上,过去15年以来,JetBrains一直在不断发展和完善这个平台。这个平台可以针对您的开发工作流进行微调并且能够提供…

    2022年4月1日
    89
  • matlab已知经纬度坐标,如何求两者之间的距离_matlab坐标系转换

    matlab已知经纬度坐标,如何求两者之间的距离_matlab坐标系转换文章来源于我的B站专栏:用经纬度算距离​www.bilibili.com突然对于经纬度与距离感兴趣了(公选课讲到东风系列弹道导弹射程),就想了解一下如何通过经纬度来计算距离。百度了一下,觉得不满意就自己尝试做一下,都是些基本的数学知识,感兴趣玩玩。(如果有错漏之处,欢迎指正!)首先说思路:经纬度转换为空间直角坐标系,由此得到两个向量,求出向量夹角,由向量夹角和地球半径求出弧长,即距离。首先说一下经…

    2022年9月23日
    0
  • 对LARS(Least Angle Regression)的简单理解

    对LARS(Least Angle Regression)的简单理解前言我在本科的时候接触过用LASSO筛选变量的方法,但了解不多。这几天在公司实习,学习到特征选择,发现还有个LARS是经常和LASSO一起被提起的,于是我临时抱佛脚,大概了解了一下LARS的原理。在看文章的时候发现很多人提到SolutionPath这样一个概念,感觉很费解,后来参阅了Efron等人的&quot;LeastAngleRegression&quot;论文,总算是明白了一些。不过本人由于懒,后面数学…

    2022年6月20日
    31

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号