Solving the problem of submitting Hadoop jobs to the cluster


Problem description:

When you submit a job from Eclipse the way the book *Hadoop in Action* describes, the job actually runs inside a local environment that Eclipse simulates, rather than being submitted to the real Hadoop cluster, and no record of the job shows up in the JobTracker web UI on port 50030. The reason is that the code neither packages the classes into a job jar nor tells Hadoop where to find one, so the job falls back to the LocalJobRunner in the local JVM. The code at this stage looks like this:

package com.spork.hadoop.jobutil.test;

import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.spork.hadoop.jobutil.EJob;

public class WordCountTest {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] inAndOut = { "hdfs://localhost:9000/bin/in/1",
                "hdfs://localhost:9000/out" };
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountTest.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inAndOut[0]));
        FileOutputFormat.setOutputPath(job, new Path(inAndOut[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Here is a smarter approach. The main idea is to move the jar-packaging work into the Java code itself, with the help of a utility class, EJob.java.
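The core of that trick, packing a directory of compiled classes into a temporary jar at run time, needs nothing beyond the JDK's java.util.jar API. A minimal standalone sketch of the idea (the class name JarPacker is mine, not part of EJob):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class JarPacker {

    // Pack every file under `root` into a temp jar, the way EJob.createTempJar
    // packs the Eclipse "bin" directory before job submission.
    public static File createTempJar(File root) throws IOException {
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        File jarFile = File.createTempFile("EJob-", ".jar");
        jarFile.deleteOnExit();
        try (JarOutputStream out =
                new JarOutputStream(new FileOutputStream(jarFile), manifest)) {
            addEntries(out, root, "");
        }
        return jarFile;
    }

    // Recurse through the directory tree, writing each regular file as a jar
    // entry whose name is its path relative to `root`.
    private static void addEntries(JarOutputStream out, File f, String base)
            throws IOException {
        if (f.isDirectory()) {
            File[] children = f.listFiles();
            if (children == null) {
                return;
            }
            if (base.length() > 0) {
                base = base + "/";
            }
            for (File child : children) {
                addEntries(out, child, base + child.getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            Files.copy(f.toPath(), out);
            out.closeEntry();
        }
    }
}
```

The jar produced this way is what gets handed to `setJar(...)` so that Hadoop has something to ship to the TaskTrackers.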

The modified sample code is as follows:

package com.spork.hadoop.jobutil.test;

import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.spork.hadoop.jobutil.EJob;

public class WordCountTest {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        // Add these statements. XXX
        File jarFile = EJob.createTempJar("bin");
        EJob.addClasspath("/usr/hadoop-1.2.1/conf");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);

        Configuration conf = new Configuration();
        String[] inAndOut = { "hdfs://localhost:9000/bin/in/1",
                "hdfs://localhost:9000/out" };
        Job job = new Job(conf, "word count");
        // And add this statement. XXX
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

        job.setJarByClass(WordCountTest.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inAndOut[0]));
        FileOutputFormat.setOutputPath(job, new Path(inAndOut[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

 

The code of the EJob class is as follows:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.spork.hadoop.jobutil;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class EJob {

    private static ArrayList<URL> classPath = new ArrayList<URL>();

    /** Unpack a jar file into a directory. */
    public static void unJar(File jarFile, File toDir) throws IOException {
        JarFile jar = new JarFile(jarFile);
        try {
            Enumeration entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = (JarEntry) entries.nextElement();
                if (!entry.isDirectory()) {
                    InputStream in = jar.getInputStream(entry);
                    try {
                        File file = new File(toDir, entry.getName());
                        if (!file.getParentFile().mkdirs()) {
                            if (!file.getParentFile().isDirectory()) {
                                throw new IOException("Mkdirs failed to create "
                                        + file.getParentFile().toString());
                            }
                        }
                        OutputStream out = new FileOutputStream(file);
                        try {
                            byte[] buffer = new byte[8192];
                            int i;
                            while ((i = in.read(buffer)) != -1) {
                                out.write(buffer, 0, i);
                            }
                        } finally {
                            out.close();
                        }
                    } finally {
                        in.close();
                    }
                }
            }
        } finally {
            jar.close();
        }
    }

    /**
     * Run a Hadoop job jar. If the main class is not in the jar's manifest,
     * then it must be provided on the command line.
     */
    public static void runJar(String[] args) throws Throwable {
        String usage = "jarFile [mainClass] args...";
        if (args.length < 1) {
            System.err.println(usage);
            System.exit(-1);
        }

        int firstArg = 0;
        String fileName = args[firstArg++];
        File file = new File(fileName);
        String mainClassName = null;

        JarFile jarFile;
        try {
            jarFile = new JarFile(fileName);
        } catch (IOException io) {
            throw new IOException("Error opening job jar: " + fileName)
                    .initCause(io);
        }

        Manifest manifest = jarFile.getManifest();
        if (manifest != null) {
            mainClassName = manifest.getMainAttributes().getValue("Main-Class");
        }
        jarFile.close();

        if (mainClassName == null) {
            if (args.length < 2) {
                System.err.println(usage);
                System.exit(-1);
            }
            mainClassName = args[firstArg++];
        }
        mainClassName = mainClassName.replaceAll("/", ".");

        File tmpDir = new File(System.getProperty("java.io.tmpdir"));
        tmpDir.mkdirs();
        if (!tmpDir.isDirectory()) {
            System.err.println("Mkdirs failed to create " + tmpDir);
            System.exit(-1);
        }
        final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
        workDir.delete();
        workDir.mkdirs();
        if (!workDir.isDirectory()) {
            System.err.println("Mkdirs failed to create " + workDir);
            System.exit(-1);
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                try {
                    fullyDelete(workDir);
                } catch (IOException e) {
                }
            }
        });

        unJar(file, workDir);

        classPath.add(new File(workDir + "/").toURL());
        classPath.add(file.toURL());
        classPath.add(new File(workDir, "classes/").toURL());
        File[] libs = new File(workDir, "lib").listFiles();
        if (libs != null) {
            for (int i = 0; i < libs.length; i++) {
                classPath.add(libs[i].toURL());
            }
        }

        ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0]));
        Thread.currentThread().setContextClassLoader(loader);
        Class<?> mainClass = Class.forName(mainClassName, true, loader);
        Method main = mainClass.getMethod("main", new Class[] {
                Array.newInstance(String.class, 0).getClass() });
        String[] newArgs = Arrays.asList(args).subList(firstArg, args.length)
                .toArray(new String[0]);
        try {
            main.invoke(null, new Object[] { newArgs });
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }

    /**
     * Delete a directory and all its contents. If we return false, the
     * directory may be partially-deleted.
     */
    public static boolean fullyDelete(File dir) throws IOException {
        File contents[] = dir.listFiles();
        if (contents != null) {
            for (int i = 0; i < contents.length; i++) {
                if (contents[i].isFile()) {
                    if (!contents[i].delete()) {
                        return false;
                    }
                } else {
                    // try deleting the directory
                    // this might be a symlink
                    boolean b = false;
                    b = contents[i].delete();
                    if (b) {
                        // this was indeed a symlink or an empty directory
                        continue;
                    }
                    // if not an empty directory or symlink let
                    // fullyDelete handle it.
                    if (!fullyDelete(contents[i])) {
                        return false;
                    }
                }
            }
        }
        return dir.delete();
    }

    /**
     * Add a directory or file to classpath.
     *
     * @param component
     */
    public static void addClasspath(String component) {
        if ((component != null) && (component.length() > 0)) {
            try {
                File f = new File(component);
                if (f.exists()) {
                    URL key = f.getCanonicalFile().toURL();
                    if (!classPath.contains(key)) {
                        classPath.add(key);
                    }
                }
            } catch (IOException e) {
            }
        }
    }

    /**
     * Add default classpath listed in bin/hadoop bash.
     *
     * @param hadoopHome
     */
    public static void addDefaultClasspath(String hadoopHome) {
        // Classpath initially contains conf dir.
        addClasspath(hadoopHome + "/conf");

        // For developers, add Hadoop classes to classpath.
        addClasspath(hadoopHome + "/build/classes");
        if (new File(hadoopHome + "/build/webapps").exists()) {
            addClasspath(hadoopHome + "/build");
        }
        addClasspath(hadoopHome + "/build/test/classes");
        addClasspath(hadoopHome + "/build/tools");

        // For releases, add core hadoop jar & webapps to classpath.
        if (new File(hadoopHome + "/webapps").exists()) {
            addClasspath(hadoopHome);
        }
        addJarsInDir(hadoopHome);
        addJarsInDir(hadoopHome + "/build");

        // Add libs to classpath.
        addJarsInDir(hadoopHome + "/lib");
        addJarsInDir(hadoopHome + "/lib/jsp-2.1");
        addJarsInDir(hadoopHome + "/build/ivy/lib/Hadoop/common");
    }

    /**
     * Add all jars in directory to classpath, sub-directory is excluded.
     *
     * @param dirPath
     */
    public static void addJarsInDir(String dirPath) {
        File dir = new File(dirPath);
        if (!dir.exists()) {
            return;
        }
        File[] files = dir.listFiles();
        if (files == null) {
            return;
        }
        for (int i = 0; i < files.length; i++) {
            if (files[i].isDirectory()) {
                continue;
            } else {
                addClasspath(files[i].getAbsolutePath());
            }
        }
    }

    /**
     * Create a temp jar file in "java.io.tmpdir".
     *
     * @param root
     * @return
     * @throws IOException
     */
    public static File createTempJar(String root) throws IOException {
        if (!new File(root).exists()) {
            return null;
        }
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        final File jarFile = File.createTempFile("EJob-", ".jar", new File(
                System.getProperty("java.io.tmpdir")));

        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                jarFile.delete();
            }
        });

        JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile),
                manifest);
        createTempJarInner(out, new File(root), "");
        out.flush();
        out.close();
        return jarFile;
    }

    private static void createTempJarInner(JarOutputStream out, File f,
            String base) throws IOException {
        if (f.isDirectory()) {
            File[] fl = f.listFiles();
            if (base.length() > 0) {
                base = base + "/";
            }
            for (int i = 0; i < fl.length; i++) {
                createTempJarInner(out, fl[i], base + fl[i].getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            FileInputStream in = new FileInputStream(f);
            byte[] buffer = new byte[1024];
            int n = in.read(buffer);
            while (n != -1) {
                out.write(buffer, 0, n);
                n = in.read(buffer);
            }
            in.close();
        }
    }

    /**
     * Return a classloader based on user-specified classpath and parent
     * classloader.
     *
     * @return
     */
    public static ClassLoader getClassLoader() {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = EJob.class.getClassLoader();
        }
        if (parent == null) {
            parent = ClassLoader.getSystemClassLoader();
        }
        return new URLClassLoader(classPath.toArray(new URL[0]), parent);
    }
}

 

After these changes, the job's progress shows up on port 50030, and judging by CPU usage, utilization on the individual cores goes up as well.

[Screenshot: the job now appears in the JobTracker web UI on port 50030]

 

For a more detailed look at how EJob works, see the blog post below.

http://www.cnblogs.com/spork/archive/2010/04/21/1717592.html

 

Reposted from: https://www.cnblogs.com/lovecreatemylife/p/4370333.html
