Hadoop1.2.0开发笔记(八)

Hadoop1.2.0开发笔记(八)本人一贯的风格是先了解系统的基础部分 然后在深入到高级部分 如果违背这种循序渐进的次序 也超出了本人的接受能力 古人说 学有本末 事有终始 知所先后 则尽道矣 我们还是从基础开始吧 本人上文提到的开发图片服务器还是放到后面吧 本人在第一篇文章中描述的 WordCount 单词统计程序是在单机环境运行的 现在我们改造一下 改造成在单机伪分布环境中运行新建 WordCount 类 继承 Configur

本人一贯的风格是先了解系统的基础部分,然后在深入到高级部分;如果违背这种循序渐进的次序,也超出了本人的接受能力。古人说,学有本末,事有终始,知所先后,则尽道矣。我们还是从基础开始吧(本人上文提到的开发图片服务器还是放到后面吧)

本人在第一篇文章中描述的WordCount单词统计程序是在单机环境运行的,现在我们改造一下,改造成在单机伪分布环境中运行

新建WordCount类,继承Configured,实现Tool接口

public class WordCount extends Configured implements Tool{ public static class Map extends Mapper 
    
     {  
    private 
    final 
    static IntWritable one = 
    new IntWritable(1 
    );  
    private Text word = 
    new 
     Text();  
    public 
    void 
     map(Object key, Text value, Context context )  
    throws 
     IOException, InterruptedException { StringTokenizer itr = 
    new 
     StringTokenizer(value.toString()); String str= 
    null 
    ;  
    while 
     (itr.hasMoreTokens()) { str= 
    itr.nextToken(); word.set(str); context.write(word, one); } } }  
    public 
    static 
    class Reduce 
    extends Reducer 
     
      {  
     private IntWritable result = 
     new 
      IntWritable();  
     public 
     void reduce(Text key, Iterable 
      
       values, Context context)  
      throws 
       IOException, InterruptedException {  
      int sum = 0 
      ;  
      for 
       (IntWritable val : values) { sum += 
       val.get(); } result.set(sum); context.write(key, result); } } @Override  
      public 
      int run(String[] args) 
      throws 
       Exception {  
      // 
       TODO Auto-generated method stub 
       File jarFile = EJob.createTempJar("bin" 
      ); EJob.addClasspath("/usr/hadoop/conf" 
      ); ClassLoader classLoader = 
       EJob.getClassLoader(); Thread.currentThread().setContextClassLoader(classLoader);  
      / 
       创建一个job,起个名字以便跟踪查看任务执行情况 * 
      */ 
       Job job = 
      new 
       Job(getConf()); ((JobConf) job.getConfiguration()).setJar(jarFile.toString());  
      / 
       * 当在hadoop集群上运行作业时,需要把代码打包成一个jar文件(hadoop会在集群分发这个文件), * 通过job的setJarByClass设置一个类,hadoop根据这个类找到所在的jar文件 * 
      */ 
       job.setJarByClass(WordCount. 
      class 
      ); job.setJobName("wordcount" 
      );  
      / 
       * 设置map和reduce函数的输入类型,这里没有代码是因为我们使用默认的TextInputFormat,针对文本文件,按行将文本文件切割成 * InputSplits, 并用 LineRecordReader 将 InputSplit 解析成 
        
        */ 
        / 
         设置map和reduce函数的输出键和输出值类型 * 
        */ 
         job.setOutputKeyClass(Text. 
        class 
        ); job.setOutputValueClass(IntWritable. 
        class 
        );  
        / 
         设置要使用的map、combiner、reduce类型 * 
        */ 
         job.setMapperClass(Map. 
        class 
        ); job.setCombinerClass(Reduce. 
        class 
        ); job.setReducerClass(Reduce. 
        class 
        );  
        / 
         设置输入和输出路径 * 
        */ 
         FileInputFormat.addInputPath(job,  
        new Path(args[0 
        ])); FileOutputFormat.setOutputPath(job,  
        new Path(args[1 
        ]));  
        / 
         提交作业并等待它完成 * 
        */ 
        // 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
        return job.waitForCompletion( 
        true) ? 0 : 1 
        ; }  
        / 
         *  
        @param 
         args *  
        @throws 
         Exception  
        */ 
        public 
        static 
        void main(String[] args) 
        throws 
         Exception {  
        // 
         TODO Auto-generated method stub  
        // 
        hdfs/localhost:9000 String[] arg={"/test/input","/test/output" 
        };  
        int ret=ToolRunner.run( 
        new 
         WordCount(), arg);  
        // 
        int ret2=ToolRunner.run(conf, tool, args); 
         System.exit(ret); } } 
        
      
     
   

因为本人是在伪分布环境测试上面的单词统计程序,需要将该类打包成jar文件,本人这里采用程序中生成临时jar文件的方式

 public class EJob { // To declare global field private static List 
   
     classPath = 
    new ArrayList 
     
     ();  
     // 
      To declare method 
     public 
     static File createTempJar(String root) 
     throws 
      IOException {  
     if (! 
     new 
      File(root).exists()) {  
     return 
     null 
     ; } Manifest manifest = 
     new 
      Manifest(); manifest.getMainAttributes().putValue("Manifest-Version", "1.0" 
     );  
     final File jarFile = File.createTempFile("EJob-", ".jar", 
     new 
      File( System.getProperty("java.io.tmpdir" 
     ))); Runtime.getRuntime().addShutdownHook( 
     new 
      Thread() {  
     public 
     void 
      run() { jarFile.delete(); } }); JarOutputStream out = 
     new 
      JarOutputStream(  
     new 
      FileOutputStream(jarFile), manifest); createTempJarInner(out,  
     new File(root), "" 
     ); out.flush(); out.close();  
     return 
      jarFile; }  
     private 
     static 
     void 
      createTempJarInner(JarOutputStream out, File f, String base)  
     throws 
      IOException {  
     if 
      (f.isDirectory()) { File[] fl = 
      f.listFiles();  
     if (base.length() > 0 
     ) { base = base + "/" 
     ; }  
     for ( 
     int i = 0; i < fl.length; i++ 
     ) { createTempJarInner(out, fl[i], base + 
      fl[i].getName()); } }  
     else 
      { out.putNextEntry( 
     new 
      JarEntry(base)); FileInputStream in = 
     new 
      FileInputStream(f);  
     byte[] buffer = 
     new 
     byte[1024 
     ];  
     int n = 
      in.read(buffer);  
     while (n != -1 
     ) { out.write(buffer, 0 
     , n); n = 
      in.read(buffer); } in.close(); } }  
     public 
     static 
      ClassLoader getClassLoader() { ClassLoader parent = 
      Thread.currentThread().getContextClassLoader();  
     if (parent == 
     null 
     ) { parent = EJob. 
     class 
     .getClassLoader(); }  
     if (parent == 
     null 
     ) { parent = 
      ClassLoader.getSystemClassLoader(); }  
     return 
     new URLClassLoader(classPath.toArray( 
     new URL[0 
     ]), parent); }  
     public 
     static 
     void 
      addClasspath(String component) {  
     if ((component != 
     null) && (component.length() > 0 
     )) {  
     try 
      { File f = 
     new 
      File(component);  
     if 
      (f.exists()) { URL key = 
      f.getCanonicalFile().toURL();  
     if (! 
     classPath.contains(key)) { classPath.add(key); } } }  
     catch 
      (IOException e) { } } } } 
     
   

最后我们运行上面的WordCount类的main方法,记住先要将待统计的文件上传到HDFS文件系统的/test/input目录里面(可以采用本人上文中的编程方式上传或者在eclipse的UI界面上传) 

—————————————————————————  

本系列Hadoop1.2.0开发笔记系本人原创  

转载请注明出处 博客园 刺猬的温驯 

本文链接 http://www.cnblogs.com/chenying99/archive/2013/06/02/3113474.html

转载于:https://www.cnblogs.com/chenying99/archive/2013/06/02/3113474.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/214216.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午4:43
下一篇 2026年3月18日 下午4:44


相关推荐

  • J2SE框架

    J2SE框架尚学堂马老师主讲的 J2SE 分十个模块 每个模块所讲的主要内容见下图 nbsp nbsp nbsp nbsp nbsp 每一模块的重点内容都已明确 细节的知识还需要总结 学习首先要掌握整体框架 然后在消化细节 这是短时间内学习最快最有效的方法 学会学习真的不是一件简单的事情

    2026年3月16日
    2
  • 网页在线视频下载教程(m3u8格式介绍及下载教程)「建议收藏」

    简介:m3u8文件是苹果公司使用的HTTPLiveStreaming(HLS)协议格式的基础。HLS是新一代流媒体传输协议,其基本实现原理为将一个大的媒体文件进行分片,将该分片文件资源路径记录与m3u8文件(即playlist)内,其中附带一些额外描述(比如该资源的多带宽信息等…)用于提供给客户端。客户端依据该m3u8文件可获取对应的媒体资源,进行播放。因此,客户端获取HLS流文件,主…

    2022年4月4日
    121
  • inception v3网络_Netmarble

    inception v3网络_Netmarble本文叙述了CNN历史以来的发展模型,介绍了发展流程以及发展路线,并对比了各种Inception模型区别,详细介绍了Inceptionv3模型,读者可以轻松了解到Inception模型的发展情况。

    2022年8月14日
    8
  • linux使用ps命令查看和控制进程_linux查看进程grep

    linux使用ps命令查看和控制进程_linux查看进程grepps命令Linuxps(英文全拼:processstatus)命令用于显示当前进程的状态,类似于windows的任务管理器查看所有进程ps-A显示所有进程信息,连同命令行ps-

    2022年7月31日
    8
  • VUE中diff比较

    VUE中diff比较

    2021年6月28日
    116
  • .net core开发工具_github 爬虫

    .net core开发工具_github 爬虫没有爬虫就没有互联网!爬虫的意义在于采集大批量数据,然后基于此进行加工/分析,做更有意义的事情。谷歌,百度,今日头条,天眼查都离不开爬虫。去开源中国和Github查询C#的爬虫项目,仅有…

    2025年11月20日
    4

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号