Hadoop作业提交分析(五)

Hadoop作业提交分析(五)经过上一篇的分析 我们知道了 Hadoop 的作业提交目标是 Cluster 还是 Local 与 conf 文件夹内的配置文件参数有着密切关系 不仅如此 其它的很多类都跟 conf 有关 所以提交作业时切记把 conf 放到你的 classpath 中 因为 Configuratio 是利用当前线程上下文的类加载器来加载资源和文件的 所以这里我们采用动态载入的方式 先添加好对应的依赖库和资源 然后再构建一个 UR

 经过上一篇的分析,我们知道了Hadoop的作业提交目标是Cluster还是Local,与conf文件夹内的配置文件参数有着密切关系,不仅如此,其它的很多类都跟conf有关,所以提交作业时切记把conf放到你的classpath中。

  因为Configuration是利用当前线程上下文的类加载器来加载资源和文件的,所以这里我们采用动态载入的方式,先添加好对应的依赖库和资源,然后再构建一个URLClassLoader作为当前线程上下文的类加载器。

复制代码
 
  
public static ClassLoader getClassLoader() {
ClassLoader parent

= Thread.currentThread().getContextClassLoader();


if (parent == null ) {
parent

= EJob. class .getClassLoader();
}





if (parent == null ) {
parent

= ClassLoader.getSystemClassLoader();
}





return new URLClassLoader(classPath.toArray( new URL[ 0 ]), parent);
}

复制代码

  代码很简单,废话就不多说了。调用例子如下:

 
  
EJob.addClasspath( " /usr/lib/hadoop-0.20/conf " );
ClassLoader classLoader

= EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);

  设置好了类加载器,下面还有一步就是要打包Jar文件,就是让Project自打包自己的class为一个Jar包,我这里以标准Eclipse工程文件夹布局为例,打包的就是bin文件夹里的class。

复制代码
 
   
public static File createTempJar(String root) throws IOException {


if ( ! new File(root).exists()) {


return null ;
}
Manifest manifest




= new Manifest();
manifest.getMainAttributes().putValue(

" Manifest-Version " , " 1.0 " );


final File jarFile = File.createTempFile( " EJob- " , " .jar " , new File(System
.getProperty(

" java.io.tmpdir " )));

Runtime.getRuntime().addShutdownHook(




new Thread() {


public void run() {
jarFile.delete();
}
});

JarOutputStream out













= new JarOutputStream( new FileOutputStream(jarFile),
manifest);
createTempJarInner(out,




new File(root), "" );
out.flush();
out.close();








return jarFile;
}









private static void createTempJarInner(JarOutputStream out, File f,
String base)

throws IOException {


if (f.isDirectory()) {
File[] fl

= f.listFiles();


if (base.length() > 0 ) {
base

= base + " / " ;
}





for ( int i = 0 ; i < fl.length; i ++ ) {
createTempJarInner(out, fl[i], base

+ fl[i].getName());
}
}




else {
out.putNextEntry(

new JarEntry(base));
FileInputStream in

= new FileInputStream(f);


byte [] buffer = new byte [ 1024 ];


int n = in.read(buffer);


while (n != - 1 ) {
out.write(buffer,

0 , n);
n

= in.read(buffer);
}
in.close();
}
}










复制代码

  这里的对外接口是createTempJar,接收参数为需要打包的文件夹根路径,支持子文件夹打包。使用递归处理法,依次把文件夹里的结构和文件打包到Jar里。很简单,就是基本的文件流操作,陌生一点的就是Manifest和JarOutputStream,查查API就明了。

  好,万事具备,只欠东风了,我们来实践一下试试。还是拿WordCount来举例:

复制代码
 
   
// Add these statements. XXX
File jarFile = EJob.createTempJar("bin");
EJob.addClasspath("/usr/lib/hadoop-0.20/conf");
ClassLoader classLoader =




EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);



Configuration conf




= new Configuration();
String[] otherArgs

= new GenericOptionsParser(conf, args)
.getRemainingArgs();





if (otherArgs.length != 2 ) {
System.err.println(

" Usage: wordcount " );
System.exit(

2 );
}

Job job







= new Job(conf, " word count " );
job.setJarByClass(WordCountTest.

class );
job.setMapperClass(TokenizerMapper.

class );
job.setCombinerClass(IntSumReducer.

class );
job.setReducerClass(IntSumReducer.

class );
job.setOutputKeyClass(Text.

class );
job.setOutputValueClass(IntWritable.

class );
FileInputFormat.addInputPath(job,

new Path(otherArgs[ 0 ]));
FileOutputFormat.setOutputPath(job,

new Path(otherArgs[ 1 ]));
System.exit(job.waitForCompletion(

true ) ? 0 : 1 );

复制代码

  Run as Java Application。。。!!!No job jar file set…异常,看来job.setJarByClass(WordCountTest.class)这个语句设置作业Jar包没有成功。这是为什么呢?

因为这个方法使用了WordCount.class的类加载器来寻找包含该类的Jar包,然后设置该Jar包为作业所用的Jar包。但是我们的作业 Jar包是在程序运行时才打包的,而WordCount.class的类加载器是AppClassLoader,运行后我们无法改变它的搜索路径,所以使用setJarByClass是无法设置作业Jar包的。我们必须使用JobConf里的setJar来直接设置作业Jar包,像下面一样:

 
  
((JobConf)job.getConfiguration()).setJar(jarFile);

  好,我们对上面的例子再做下修改,加上上面这条语句。

 
  
Job job = new Job(conf, " word count " );
// And add this statement. XXX
((JobConf) job.getConfiguration()).setJar(jarFile.toString());

  再Run as Java Application,终于OK了~~

  该种方法的Run on Hadoop使用简单,兼容性好,推荐一试。:)

  本例子由于时间关系,只在Ubuntu上做了伪分布式测试,但理论上是可以用到真实分布式上去的。

     >>点我下载<<

 

  The end.

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/218328.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 上午7:31
下一篇 2026年3月18日 上午7:31


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号