datax(25):插件加载原理

datax(25):插件加载原理一、插件分类按照功能分reader,读插件,例如mysqlReader,从mysql读取数据writer,写插件。例如mysqlWriter,给mysql写入数据;transformer,中间结果转换,例如SubstrTransformer用于字符截取;handler,主要用于任务执行前的准备工作和完成的收尾工作。插件类型由PluginType枚举表示publicenumPluginType{ READER(“reader”),TRANSFORMER(“transfor.

大家好,又见面了,我是你们的朋友全栈君。

一、插件分类

按照功能分

  1. reader, 读插件,例如mysqlReader,从mysql读取数据
  2. writer, 写插件。例如mysqlWriter,给mysql写入数据;
  3. transformer, 中间结果转换,例如SubstrTransformer用于字符截取;
  4. handler, 主要用于任务执行前的准备工作和完成的收尾工作。

插件类型由PluginType枚举表示

public enum PluginType { 
   
	READER("reader"), TRANSFORMER("transformer"), WRITER("writer"), HANDLER("handler");
}

按照运行类型

  1. Job级别的插件
  2. Task级别的插件

二、插件目录结构

大目录:xxx\DataX\target\datax\datax\plugin下分2个reader和writer目录,下面以mysql为例
在这里插入图片描述
如上图,可以看到

  • libs文件夹主要放该插件运行所依赖的jars
  • xxx-xxx.jar即本插件最后打成的包
  • plugin.json里面是本插件的元数据信息(名称,主类,描述信息,开发者),具体如下
{ 
   
    "name": "mysqlreader",
    "class": "com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader",
    "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.",
    "developer": "alibaba"
}
  • plugin_job_template.json是本插件的一个模板JSON,加载时候会根据该模板里的参数校验用户的入参json

三、插件加载原理

1. 配置job信息,获取所有插件名称

给ConfigParser.parse(final String jobPath)传入job路径,该方法组装解析,最后返回一个conf对象,conf里解析出了reader,writer,handler等插件名称;

public static Configuration parse(final String jobPath) { 
   
    Configuration configuration = parseJobConfig(jobPath);
    configuration.merge(parseCoreConfig(CoreConstant.DATAX_CONF_PATH), false);

    // todo config优化,只捕获需要的plugin
    String readerPluginName = configuration.getString(CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
    String writerPluginName = configuration.getString(CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);

    String preHandlerName = configuration.getString(CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
    String postHandlerName = configuration.getString(CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);

    Set<String> pluginList = new HashSet<>();
    pluginList.add(readerPluginName);
    pluginList.add(writerPluginName);

    if (StringUtils.isNotEmpty(preHandlerName)) { 
   
      pluginList.add(preHandlerName);
    }
    if (StringUtils.isNotEmpty(postHandlerName)) { 
   
      pluginList.add(postHandlerName);
    }
    try { 
   
      configuration.merge(parsePluginConfig(new ArrayList<>(pluginList)), false);
    } catch (Exception e) { 
   
      //吞掉异常,保持log干净。这里message足够。
      LOG.warn(String
          .format("插件[%s,%s]加载失败,1s后重试... Exception:%s ", readerPluginName, writerPluginName,
              e.getMessage()));
      try { 
   
        Thread.sleep(1000);
      } catch (InterruptedException e1) { 
   
        //
      }
      configuration.merge(parsePluginConfig(new ArrayList<>(pluginList)), false);
    }
    return configuration;
  }

2. 根据插件名称获取插件配置(plugin.json)

上一步获取了很多插件信息,本步骤根据插件名称和datax的规范,从目录中获取每个插件的详细信息;

public static Configuration parsePluginConfig(List<String> wantPluginNames) { 
   
    Configuration configuration = Configuration.newDefault();

    Set<String> replicaCheckPluginSet = new HashSet<>();
    int complete = 0;
    for (final String each : ConfigParser
        .getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) { 
   
      Configuration eachReaderConfig = ConfigParser
          .parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);
      if (eachReaderConfig != null) { 
   
        configuration.merge(eachReaderConfig, true);
        complete += 1;
      }
    }

    for (final String each : ConfigParser
        .getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) { 
   
      Configuration eachWriterConfig = ConfigParser
          .parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames);
      if (eachWriterConfig != null) { 
   
        configuration.merge(eachWriterConfig, true);
        complete += 1;
      }
    }
    if (wantPluginNames != null && wantPluginNames.size() > 0
        && wantPluginNames.size() != complete) { 
   
      throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INIT_ERROR,
          "插件加载失败,未完成指定插件加载:" + wantPluginNames);
    }
    return configuration;
  }

3.动态加载插件

插件的加载都是使用ClassLoader动态加载。 为了避免类的冲突,datax通过自定义类加载器JarLoader,提供Jar隔离的加载机制

  • 加载器由JarLoader实现
  • 插件的加载接口由LoadUtil类负责
  • 当要加载一个插件时,需要实例化一个JarLoader,然后切换thread class loader之后,才加载插件。这个主要由ClassLoaderSwapper实现

在这里插入图片描述

3.1 JarLoader 类

JarLoader继承URLClassLoader,扩充了可以加载目录的功能。可以从指定的目录下,把传入的路径、及其子路径、以及路径中的jar文件加入到class path。

/** * 提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。 */
public class JarLoader extends URLClassLoader { 
   

  public JarLoader(String[] paths) { 
   
    this(paths, JarLoader.class.getClassLoader());
  }

  public JarLoader(String[] paths, ClassLoader parent) { 
   
    super(getURLs(paths), parent);
  }

  private static URL[] getURLs(String[] paths) { 
   
    Validate.isTrue(null != paths && 0 != paths.length, "jar包路径不能为空.");

    List<String> dirs = new ArrayList<String>();
    for (String path : paths) { 
   
      dirs.add(path);
      JarLoader.collectDirs(path, dirs);
    }

    List<URL> urls = new ArrayList<URL>();
    for (String path : dirs) { 
   
      urls.addAll(doGetURLs(path));
    }

    return urls.toArray(new URL[0]);
  }

  private static void collectDirs(String path, List<String> collector) { 
   
    if (null == path || StringUtils.isBlank(path)) { 
   
      return;
    }

    File current = new File(path);
    if (!current.exists() || !current.isDirectory()) { 
   
      return;
    }

    for (File child : current.listFiles()) { 
   
      if (!child.isDirectory()) { 
   
        continue;
      }

      collector.add(child.getAbsolutePath());
      collectDirs(child.getAbsolutePath(), collector);
    }
  }

  private static List<URL> doGetURLs(final String path) { 
   
    Validate.isTrue(!StringUtils.isBlank(path), "jar包路径不能为空.");
    File jarPath = new File(path);
    Validate.isTrue(jarPath.exists() && jarPath.isDirectory(), "jar包路径必须存在且为目录.");
    /* set filter */
    FileFilter jarFilter = pathname -> pathname.getName().endsWith(".jar");

    /* iterate all jar */
    File[] allJars = new File(path).listFiles(jarFilter);
    List<URL> jarURLs = new ArrayList<>(allJars.length);

    for (int i = 0; i < allJars.length; i++) { 
   
      try { 
   
        jarURLs.add(allJars[i].toURI().toURL());
      } catch (Exception e) { 
   
        throw DataXException.asDataXException(
            FrameworkErrorCode.PLUGIN_INIT_ERROR, "系统加载jar包出错", e);
      }
    }
    return jarURLs;
  }
}
3.2 LoadUtil 类

LoadUtil管理着插件的加载器,调用getJarLoader返回插件对应的加载器。

/** * Created by jingxing on 14-8-24. * <p/> * 插件加载器,大体上分reader、transformer(还未实现)和writer三种插件类型, reader和writer在执行时又可能出现Job和Task两种运行时(加载的类不同) */
public class LoadUtil { 
   

  private static final String pluginTypeNameFormat = "plugin.%s.%s";

  private LoadUtil() { 
   
  }

  private enum ContainerType { 
   
    Job("Job"),
    Task("Task");

    private String type;

    ContainerType(String type) { 
   
      this.type = type;
    }

    public String value() { 
   
      return type;
    }
  }

  /** * 所有插件配置放置在pluginRegisterCenter中,为区别reader、transformer和writer,还能区别 * 具体pluginName,故使用pluginType.pluginName作为key放置在该map中 */
  private static Configuration pluginRegisterCenter;

  /** * jarLoader的缓冲 */
  private static Map<String, JarLoader> jarLoaderCenter = new HashMap<>();

  /** * 设置pluginConfigs,方便后面插件来获取 初始化PluginLoader,可以获取各种插件配置 * * @param pluginConfigs */
  public static void bind(Configuration pluginConfigs) { 
   
    pluginRegisterCenter = pluginConfigs;
  }

  /** * 根据插件类型+插件名称,生成一个 字符串。插件中心根据该字符串找到对应插件 * * @param pluginType PluginType * @param pluginName String * @return String */
  private static String generatePluginKey(PluginType pluginType, String pluginName) { 
   
    return String.format(pluginTypeNameFormat, pluginType.toString(), pluginName);
  }

  /** * 根据插件类型和插件名称,获取配置; <br/> 1 根据 插件类型+插件名称,返回string ; <br/> 2 从 pluginRegisterCenter 中根据 * string获取配置 * * @param pluginType * @param pluginName * @return */
  private static Configuration getPluginConf(PluginType pluginType, String pluginName) { 
   
    Configuration pluginConf = pluginRegisterCenter
        .getConfiguration(generatePluginKey(pluginType, pluginName));

    if (null == pluginConf) { 
   
      throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
          String.format("DataX不能找到插件[%s]的配置.", pluginName));
    }

    return pluginConf;
  }

  /** * 根据反射使用插件类型+插件名称 返回 插件。加载JobPlugin,reader、writer都可能要加载 * * @param type PluginType * @param name String * @return AbstractJobPlugin */
  public static AbstractJobPlugin loadJobPlugin(PluginType type, String name) { 
   
    Class<? extends AbstractPlugin> clazz = loadPluginClass(type, name, ContainerType.Job);

    try { 
   
      AbstractJobPlugin jobPlugin = (AbstractJobPlugin) clazz.newInstance();
      jobPlugin.setPluginConf(getPluginConf(type, name));
      return jobPlugin;
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
          String.format("DataX找到plugin[%s]的Job配置.", name), e);
    }
  }

  /** * 原理类同上面loadJobPlugin 方法。加载taskPlugin,reader、writer都可能加载 * * @param type PluginType * @param name String * @return AbstractTaskPlugin */
  public static AbstractTaskPlugin loadTaskPlugin(PluginType type, String name) { 
   
    Class<? extends AbstractPlugin> clz = LoadUtil.loadPluginClass(type, name, ContainerType.Task);

    try { 
   
      AbstractTaskPlugin taskPlugin = (AbstractTaskPlugin) clz.newInstance();
      taskPlugin.setPluginConf(getPluginConf(type, name));
      return taskPlugin;
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
          String.format("DataX不能找plugin[%s]的Task配置.", name), e);
    }
  }

  /** * 根据插件类型、名字和执行时taskGroupId加载对应运行器 * * @param pluginType * @param pluginName * @return */
  public static AbstractRunner loadPluginRunner(PluginType pluginType, String pluginName) { 
   
    AbstractTaskPlugin taskPlugin = LoadUtil.loadTaskPlugin(pluginType, pluginName);

    switch (pluginType) { 
   
      case READER:
        return new ReaderRunner(taskPlugin);
      case WRITER:
        return new WriterRunner(taskPlugin);
      default:
        throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
            String.format("插件[%s]的类型必须是[reader]或[writer]!", pluginName));
    }
  }

  /** * 反射出具体plugin实例 * * @param pluginType * @param pluginName * @param pluginRunType * @return */
  @SuppressWarnings("unchecked")
  private static synchronized Class<? extends AbstractPlugin> loadPluginClass(PluginType pluginType,
      String pluginName, ContainerType pluginRunType) { 
   
    Configuration pluginConf = getPluginConf(pluginType, pluginName);
    JarLoader jarLoader = LoadUtil.getJarLoader(pluginType, pluginName);
    try { 
   
      return (Class<? extends AbstractPlugin>) jarLoader
          .loadClass(pluginConf.getString("class") + "$" + pluginRunType.value());
    } catch (Exception e) { 
   
      throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
    }
  }

  public static synchronized JarLoader getJarLoader(PluginType pluginType, String pluginName) { 
   
    Configuration pluginConf = getPluginConf(pluginType, pluginName);
    JarLoader jarLoader = jarLoaderCenter.get(generatePluginKey(pluginType, pluginName));
    if (null == jarLoader) { 
   
      String pluginPath = pluginConf.getString("path");
      if (StringUtils.isBlank(pluginPath)) { 
   
        throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
            String.format("%s插件[%s]路径非法!", pluginType, pluginName));
      }
      jarLoader = new JarLoader(new String[]{ 
   pluginPath});
      jarLoaderCenter.put(generatePluginKey(pluginType, pluginName), jarLoader);
    }
    return jarLoader;
  }
}
3.3 ClassLoaderSwapper

切换类加载器

/** * Created by jingxing on 14-8-29. * * 为避免jar冲突,比如hbase可能有多个版本的读写依赖jar包,JobContainer和TaskGroupContainer * 就需要脱离当前classLoader去加载这些jar包,执行完成后,又退回到原来classLoader上继续执行接下来的代码 */
public final class ClassLoaderSwapper { 
   
    private ClassLoader storeClassLoader = null;

    private ClassLoaderSwapper() { 
   
    }

    public static ClassLoaderSwapper newCurrentThreadClassLoaderSwapper() { 
   
        return new ClassLoaderSwapper();
    }

    /** * 保存当前classLoader,并将当前线程的classLoader设置为 传入的classLoader(实现classLoader互换) * * @param * @return */
    public ClassLoader setCurrentThreadClassLoader(ClassLoader classLoader) { 
   
        // 1 将当前线程的 classLoader 先保持到 本类的 storeClassLoader 中
        this.storeClassLoader = Thread.currentThread().getContextClassLoader();
        // 2 将当前线程的 classLoader 设置为 传入的 classLoader
        Thread.currentThread().setContextClassLoader(classLoader);
        return this.storeClassLoader;
    }

    /** * 将当前线程的类加载器设置为保存的类加载 * @return */
    public ClassLoader restoreCurrentThreadClassLoader() { 
   
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.storeClassLoader);
        return classLoader;
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/145242.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • vim选中复制粘贴_vim 复制一行并且粘贴

    vim选中复制粘贴_vim 复制一行并且粘贴vim有12个粘贴板        ”代表全局粘贴板:reg查看粘贴板    “Np 粘贴其中一个:n,mcon 从第几行到第几行复制到第几行后:n,mmn 从第几行到第几行移动第几行后:n,md   从第几行到第几行删除:n,mwfilename 从第几行到第几行保存文件:savfilena

    2022年9月22日
    3
  • java调用python脚本返回的参数_javaweb调用python

    java调用python脚本返回的参数_javaweb调用python实际工程项目中可能会用到Java和python两种语言结合进行,这样就会涉及到一个问题,就是怎么用Java程序来调用已经写好的python脚本呢,一共有四种方法可以实现:1、在java类中直接执行python语句此方法需要引用org.python包,需要下载Jpython。Jython是一种完整的语言,而不是一个Java翻译器或仅仅是一个Python编译器,它是一个Python语言在Java中的完全实现。Jython是在JVM上实现的Python,由J

    2025年8月25日
    4
  • ubuntu wine安装的软件怎么运行_Ubuntu安装wine

    ubuntu wine安装的软件怎么运行_Ubuntu安装wineCSDNGitHubUbuntu安装wine和WineQQ(2012国际版)AderXCoding/system/tools/wine本作品采用知识共享署名-非商业性使用-相同方式共享4.0国际许可协议进行许可参照Wine–UbuntuWikiQQ–UbuntuWikiWine–WikiWinehq1安装wine1.1

    2025年9月21日
    7
  • eclipse解决“java was started but returned exit code=13”问题

    eclipse解决“java was started but returned exit code=13”问题

    2021年10月2日
    46
  • Java中super()的使用[通俗易懂]

    Java中super()的使用[通俗易懂]目录1.super()的使用实例一一一子类重写父类的方法2.super()的使用实例一一一子类重写父类的变量3.super()的使用实例一一一在子类的构造方法中4.关于构造方法中super()第一种情况:编译不通过第二种情况:编译不通过第三种情况:成功编译通过1.super()的使用实例一一一子类重写父类的方法publicclassA{…

    2022年6月11日
    31
  • win10下CUDA和CUDNN的安装(超详细)!亲测有效![通俗易懂]

    win10下CUDA和CUDNN的安装(超详细)!亲测有效![通俗易懂]CUDA10安装配置CUDA10的安装包可直接从NVIDIA官网下载。根据相应的系统选项,我选择的是cuda_10.1.168_425.25_win10.exe(大小为2.3G),安装的时候建议选择自定义而不是“精简”(从下面的英文解释可以看出,其实这里的精简写成完整应该更贴切,他会安装所有组件并覆盖现有驱动,然而我并不想安装全家桶,何况我的官方显卡驱动比他的新)。下载路径:https…

    2022年6月11日
    30

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号