MapReduce 编程不可怕,一篇文章搞定它

MapReduce 编程不可怕,一篇文章搞定它前言本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系正文需求:WordCount,大数据领域的HelloWorld。Mapperpackagecom.shockang.study.bigdata.mapreduce;importjava.io.IOException;importorg.apache.hadoop.io.IntWr

大家好,又见面了,我是你们的朋友全栈君。

前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

正文

需求: Word Count,大数据领域的 Hello World。

Mapper

package com.shockang.study.bigdata.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { 
   
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException { 
   
        String[] words = value.toString().split(" ");
        for (String word : words) { 
   
            // 每个单词出现1次,作为中间结果输出
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

Reducer

package com.shockang.study.bigdata.mapreduce;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
   
    /* key: hello value: List(1, 1, ...) */
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException { 
   
        int sum = 0;

        for (IntWritable count : values) { 
   
            sum = sum + count.get();
        }
        context.write(key, new IntWritable(sum));
    };
}

Main

package com.shockang.study.bigdata.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountMain { 
   
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException { 
   
        if (args.length != 2) { 
   
            System.out.println("please input Path!");
            System.exit(0);
        }

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());

        // 打jar包
        job.setJarByClass(WordCountMain.class);

        // 通过job设置输入/输出格式,默认的就是 TextInputFormat/TextOutputFormat
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 设置输入/输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 设置处理Map/Reduce阶段的类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行
        //如果不一样,需要分别设置map, reduce的输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 设置最终输出key/value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 提交作业
        job.waitForCompletion(true);

    }

}

Combiner

package com.shockang.study.bigdata.mapreduce;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class WordCountMainWithCombiner { 
   
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException { 
   
        if (args.length != 2) { 
   
            System.out.println("please input Path!");
            System.exit(0);
        }

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, WordCountMainWithCombiner.class.getSimpleName());

        // 打jar包
        job.setJarByClass(WordCountMainWithCombiner.class);

        // 通过job设置输入/输出格式
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 设置输入/输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 设置处理Map/Reduce阶段的类
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        //如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行
        // 如果不一样,需要分别设置map, reduce的输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 设置最终输出key/value的类型m
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 提交作业
        job.waitForCompletion(true);

    }
}

二次排序

package com.shockang.study.bigdata.mapreduce;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/** * 现有需求,需要自定义key类型,并自定义key的排序规则,如按照人的salary降序排序,若相同,则再按age升序排序 */
public class Person implements WritableComparable<Person> { 
   
    private String name;
    private int age;
    private int salary;

    public Person() { 
   
    }

    public Person(String name, int age, int salary) { 
   
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public String getName() { 
   
        return name;
    }

    public void setName(String name) { 
   
        this.name = name;
    }

    public int getAge() { 
   
        return age;
    }

    public void setAge(int age) { 
   
        this.age = age;
    }

    public int getSalary() { 
   
        return salary;
    }

    public void setSalary(int salary) { 
   
        this.salary = salary;
    }

    @Override
    public String toString() { 
   
        return this.salary + " " + this.age + " " + this.name;
    }

    public int compareTo(Person o) { 
   
        //先比较salary,高的排序在前;若相同,age小的在前
        int compareResult1 = this.salary - o.salary;
        if (compareResult1 != 0) { 
   
            return -compareResult1;
        } else { 
   
            return this.age - o.age;
        }
    }

    public void write(DataOutput dataOutput) throws IOException { 
   
        //序列化,将NewKey转化成使用流传输的二进制
        dataOutput.writeUTF(name);
        dataOutput.writeInt(age);
        dataOutput.writeInt(salary);
    }

    public void readFields(DataInput dataInput) throws IOException { 
   
        //使用in读字段的顺序,要与write方法中写的顺序保持一致
        this.name = dataInput.readUTF();
        this.age = dataInput.readInt();
        this.salary = dataInput.readInt();
    }
}

自定义分区

package com.shockang.study.bigdata.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class CustomPartitioner extends Partitioner<Text, IntWritable> { 
   
    public static HashMap<String, Integer> dict = new HashMap<>();

    static { 
   
        dict.put("Dear", 0);
        dict.put("Bear", 1);
        dict.put("River", 2);
        dict.put("Car", 3);
    }

    public int getPartition(Text text, IntWritable intWritable, int i) { 
   
        return dict.get(text.toString());
    }
}

数据倾斜处理

当遇到数据倾斜的时候,我们可以在 Reducer 中日志记录哪些超过阈值的 key

package com.shockang.study.bigdata.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class WordCountReducerWithDataSkew extends Reducer<Text, IntWritable, Text, IntWritable> { 
   

    public static final String MAX_VALUES = "skew.maxvalues";
    private int maxValueThreshold;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException { 
   
        Configuration conf = context.getConfiguration();
        maxValueThreshold = Integer.parseInt(conf.get(MAX_VALUES));
    }

    /* key: hello value: List(1, 1, ...) */
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException { 
   
        int i = 0;
        for (IntWritable value : values) { 
   
            System.out.println(value);
            i++;
        }

        if (++i > maxValueThreshold) { 
   
            System.out.println("Received " + i + " values for key " + key);
        }
    }
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/147546.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Java|JavaScript 模拟钓鱼网站实例一[通俗易懂]

    Java|JavaScript 模拟钓鱼网站实例一[通俗易懂]本次只是用最简单的方法模拟钓鱼网站。前端的代码是从网上下载的,我只是做了稍微的修改。整个项目的源码如下(2018年2月10日在审核估计2月11后可以下载):http://download.csdn.net/download/qq78442761/10247969最后的效果是如下图:输入用户名和密码后,会提醒服务器繁忙然后,我们登录到另外一个页面进行查看:这里涉及如下技术:1.修改网上下载的html…

    2022年8月24日
    4
  • idea正则替换小技巧「建议收藏」

    idea正则替换小技巧「建议收藏」相信很多idea开发的人都遇到过要替换某些拷贝的内容然后转换成自己的注释,但是一个一个替换又太麻烦,正则替换这时就是快速解决的办法。如下面图所示这个java类里面的属性就是mybatis的自动生成,带了很多注释,但如果想换成swagger来展示给前端看的时候就需要替换成下面的样子。想要做到这一步只需要执行下面的正则就行。首先点击打开替换窗口,idea原生快捷键按CTRL+R就行,全…

    2022年9月27日
    0
  • 【quorum源码】quorum tessera源码剖析

    【quorum源码】quorum tessera源码剖析tessera是quorum的一种隐私管理器实现,使用Java语言编写,用于对quorum隐私交易的加密、解密和分发。

    2022年5月30日
    39
  • 倒立摆起摆控制_旋转倒立摆原理

    倒立摆起摆控制_旋转倒立摆原理**基于STM32控制的旋转倒立摆**文章目录基于STM32控制的旋转倒立摆前言一、旋转倒立摆的结构1.相对编码器与绝对编码器2.相对编码器与绝对编码器的信号采集3.STM32编码器模式4.使用STM32CubeMx配置过程二、倒立摆模型建立三、实验方案与实验现象1.整体方案2.实验现象与上位机数据反思与总结前言近期在学习简易旋转倒立摆装置,倒立摆其实是一个十分经典的自动控制模型,不过开始学习了解结构和原理还是花了很多时间,在思路以及调试过程中遇到了很多困难。我认为倒立摆有两个难点,一个是自动

    2022年8月18日
    12
  • MySql数据库增删改查常用语句命令「建议收藏」

    MySql数据库增删改查常用语句命令「建议收藏」文章目录增删改查语句库操作表操作增删改查实例准备表插入数据修改表数据删除表数据查询表数据常见的MySQL语句命令常见MySQL字段含义增删改查语句增删改查的语句命令为:操作命令增insert删delete改update查select或者show库操作操作代码创建数据库createdatabaseshujuku;…

    2022年5月30日
    32
  • vue文件下载功能_vue实现下载功能

    vue文件下载功能_vue实现下载功能vue下载文件常用的几种方式一、直接打开直接打开是指我们直接使用window.open(URL)的方法优点:简单操作缺点:没办法携带token二、我们可以自己封装一个方法,比如如下:importaxiosfrom”axios”import*asauthfrom’@/utils/auth.js’letajax=axios.create({baseURL:process.env.VUE_APP_BASE_API,timeout:100000}

    2022年10月24日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号