最经典的大数据案例解析(附代码)

最经典的大数据案例解析(附代码)首先我们来说说需求假设以上就是我们需要处理的数据,我们需要计算出每个月天气最热的两天。首先我们对自己提出几个问题1.怎么划分数据,怎么定义一组???2.考虑reduce的计算复杂度???3.能不能多个reduce???4.如何避免数据倾斜???5.如何自定义数据类型???—-记录特点每年每个月温度最高2天1天多条记录怎么处理?—-进一步思考年月分组温度升序…

大家好,又见面了,我是你们的朋友全栈君。

首先我们来说说需求在这里插入图片描述
假设以上就是我们需要处理的数据,我们需要计算出每个月天气最热的两天。
这个案例用到的东西很多,如果你能静下心来好好看完,你一定会受益匪浅的
首先我们对自己提出几个问题
1.怎么划分数据,怎么定义一组???
2.考虑reduce的计算复杂度???
3.能不能多个reduce???
4.如何避免数据倾斜???
5.如何自定义数据类型???
—-记录特点
每年
每个月
温度最高
2天
1天多条记录怎么处理?
—-进一步思考
年月分组
温度升序
key中要包含时间和温度!
—-MR原语:相同的key分到一组
通过GroupCompartor设置分组规则
—-自定义数据类型Weather
包含时间
包含温度
自定义排序比较规则
—-自定义分组比较
年月相同被视为相同的key
那么reduce迭代时,相同年月的记录有可能是同一天的,reduce中需要判断是否同一天
注意OOM
—-数据量很大
全量数据可以切分成最少按一个月份的数据量进行判断
这种业务场景可以设置多个reduce
通过实现partition

一>>>MainClass的实现

package com.huawei.mr.weather;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Lpf.
 * @version 创建时间:2019年4月13日 下午7:43:40
 */
public class MainClass {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		// 输入错误返回提示
		if (args == null || args.length != 2) {
			System.out.println("输入格式有误");
			System.out.println("正确格式为:yarn jar weather.jar com.huawei.mr.weather.MainClass args[0] args[1]");
		}

		// 初始化hadoop默认配置文件,如果有指定的配置,则覆盖默认配置
		Configuration conf = new Configuration(true);
		// 创建Job对象,用到系统配置信息
		Job job = Job.getInstance(conf);
		// 指定job入口程序
		job.setJarByClass(MainClass.class);
		// 设置job名称
		job.setJobName("weather");
		// 指定文件从哪里读取,从hdfs加载一个输入文件给job
		FileInputFormat.addInputPath(job, new Path(args[0]));
		// 指定hdfs上一个不存在的路径作为job的输出路径
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		// 自主设置reduce的数量
		job.setNumReduceTasks(2);
		// 指定map输出中key的类型
		job.setMapOutputKeyClass(Weather.class);
		// 指定map输出中value的类型
		job.setMapOutputValueClass(Text.class);

		// 设置map中的比较器,如果不设置默认采用key类型自带的比较器
		/**
		 * 由于map里面的排序和这儿的排序不一样,称之为二次排序
		 */
		job.setSortComparatorClass(WetherComparator.class);

		// 设置分区器类型 避免数据倾斜
		job.setPartitionerClass(WeatherPartitioner.class);
		
		job.setMapperClass(WeatherMapper.class);
		job.setReducerClass(WeatherReduce.class);

		job.waitForCompletion(true);
	}
}

二 >>>Weather 自定义key的实现

    package com.huawei.mr.weather;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * @author Lpf.
     * @version 创建时间:2019年4月13日 下午8:15:26
     * map中输出key的自定义
     */
    public class Weather implements WritableComparable<Weather> {
    
    	private String year;
    	private String month;
    	private String day;
    	private Integer weather;
    
    	public String getYear() {
    		return year;
    	}
    
    	public void setYear(String year) {
    		this.year = year;
    	}
    
    	public String getMonth() {
    		return month;
    	}
    
    	public void setMonth(String month) {
    		this.month = month;
    	}
    
    	public String getDay() {
    		return day;
    	}
    
    	public void setDay(String day) {
    		this.day = day;
    	}
    
    	public Integer getWeather() {
    		return weather;
    	}
    
    	public void setWeather(Integer weather) {
    		this.weather = weather;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		// 把封装的数据序列化之后写出去
    		out.writeUTF(year);
    		out.writeUTF(month);
    		out.writeUTF(day);
    		out.writeInt(weather);
    	}
    	/*
    	 * 读写的顺序要一致
    	 */
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		// 把封装的数据序列化之后读进来
    		setYear(in.readUTF());
    		setMonth(in.readUTF());
    		setDay(in.readUTF());
    		setWeather(in.readInt());
    	}
    
    	@Override
    	public int compareTo(Weather that) {
    		int result = 0;
    		result = this.getYear().compareTo(that.getYear());
    		if (result == 0) {
    			result = this.getMonth().compareTo(that.getMonth());
    			if (result == 0) {
    				result = this.getDay().compareTo(that.getDay());
    				if (result == 0) {
    					// 如果年月日都相同,把温度按照高到低倒序排列
    					result = that.getWeather().compareTo(this.getWeather());
    				}
    			}
    		}
    
    		return result;
    	}
    }
    三 >>>自定义map中key的比较器用于排序
    package com.huawei.mr.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author Lpf.
 * @version 创建时间:2019年4月13日 下午8:29:41
 * map中的比较器设置
 */
public class WetherComparator extends WritableComparator {

	public WetherComparator() {
		super(Weather.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		int result = 0;
		Weather wa = (Weather) a;
		Weather wb = (Weather) b;

		// 分组比较器要保证同年同月为一组 和Weather里面的排序规则不一样
		result = wa.getYear().compareTo(wb.getYear());
		if (result == 0) {
			result = wa.getMonth().compareTo(wb.getMonth());
			if (result == 0) {
				result = wb.getWeather().compareTo(wa.getWeather());
			}
		}
		return result;
	}
}

四>>>设置分区器避免数据倾斜

package com.huawei.mr.weather;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author Lpf.
 * @version 创建时间:2019年4月13日 下午8:47:46 
 * 分区器,避免数据倾斜
 */
public class WeatherPartitioner extends Partitioner<Weather, Text> {

	@Override
	public int getPartition(Weather key, Text value, int numPartitions) {

		String month = key.getMonth();
		int partitionNum = (month.hashCode() & Integer.MAX_VALUE) % numPartitions;
		return partitionNum;
	}
}

五>>>map里面对每一行的处理

    package com.huawei.mr.weather;
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * @author Lpf.
     * @version 创建时间:2019年4月13日 下午8:55:29 map里面的处理
     */
    public class WeatherMapper extends Mapper<LongWritable, Text, Weather, Text> {
    
    	private SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-mm-dd");
    
    	private Weather wea = new Weather();
    
    	@Override
    	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
    		// 每一行的数据格式为 1949-10-01 14:21:02 34c
    		String linStr = value.toString();
    		// {"1949-10-01 14:21:02","34c"}
    		String[] linStrs = linStr.split("\t");
    		// 得到温度
    		int weather = Integer.parseInt(linStrs[1].substring(0, linStrs[1].length() - 1));
    
    		// 获取时间
    		try {
    			Date date = DATE_FORMAT.parse(linStrs[0]);
    			Calendar calendar = Calendar.getInstance();
    			calendar.setTime(date);
    			int year = calendar.get(Calendar.YEAR);
    			int month = calendar.get(Calendar.MONTH);
    			int day = calendar.get(Calendar.DAY_OF_MONTH);
    			wea.setYear(year + "");
    			wea.setMonth(month + "");
    			wea.setDay(day + "");
    			wea.setWeather(weather);
    
    			// 把map中的值输出
    			context.write(wea, value);
    		} catch (ParseException e) {
    			e.printStackTrace();
    		}
    	}
    }
六>>>reduce里面的输出
package com.huawei.mr.weather;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author Lpf.
 * @version 创建时间:2019年4月13日 下午8:55:35 
 * reduce 里面的处理
 */
public class WeatherReduce extends Reducer<Weather, Text, Text, NullWritable> {

	@Override
	protected void reduce(Weather key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		Iterator<Text> iterator = values.iterator();
		Text text = null;
		String day = null;
		while (iterator.hasNext()) {
			text = iterator.next();
			if (day != null) {
				if (!day.equals(key.getDay())) {
					// 输出本月温度最高的第二天
					context.write(text, NullWritable.get());
					break;
				}
			} else {
				// 输出本月温度最高的第一天
				context.write(text, NullWritable.get());
				day = key.getDay();
			}
		}
	}
}

年纪上来了 坐一下腰就酸的要死注释补充的不是很完整,有不明白的留言,乐意解答

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/133993.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 常量指针,指针常量的区别是什么_指针常量与常量指针

    常量指针,指针常量的区别是什么_指针常量与常量指针**要有具备扎实指针知识……了解引用、指针的一些注意事项:引用并非对象引用必须初始化引用只能绑定在对象上,而不能与字面值或某个表达式的计算结果绑定在一起类型要严格匹配一、常量指针定义:又叫常指针,可以理解为常量的指针,也即这个是指针,但指向的是个常量,这个常量是指针的值(地址),而不是地址指向的值。关键点:常量指针指向的对象不能通过这个指针来修改,可是仍然可以通过原来的声明修改;常量指针可以被赋值为变量的地址,之所以叫常量指针,是限制了通过这个指针修改变量的值;指针还可以指向别

    2022年10月7日
    3
  • CentOS下yum的安装及配置

    CentOS下yum的安装及配置一般公司都用Linux来搭建服务器,Linux安装软件时能够用yum安装依赖包是一件非常简单而幸福的事情,因为你只需一个简单的安装命令yuminstall[]即可安装相应的软件,yum工具会自动的从网上yum源中下载相应的依赖包,并以正确的依赖关系一个个安装依赖包。下面简单介绍一下CentOS下安装yum源的流程和操作。一、查看、卸载已安装的yum包1、查看已安装的yum包

    2022年6月3日
    91
  • 训练集准确率很高,验证集准确率低问题

    训练集准确率很高,验证集准确率低问题训练集在训练过程中,loss稳步下降,准确率上升,最后能达到97%验证集准确率没有升高,一直维持在50%左右(二分类问题,随机概率)测试集准确率57%在网上搜索可能打的原因:1.learningrate太小,陷入局部最优2.训练集和测试集数据没有规律3.数据噪声太大4.数据量太小(总共1440个样本,80%为训练集)5.训练集和测试集数据分布不同:如训练集正样本太少(训练集和测试集每次运行随机选择,故排除)6.数据集存在问题,如标注有问题(采用公开数据集,排除)7.学习率过大8.模型

    2025年11月4日
    4
  • rinetd小记「建议收藏」

    官网:http://www.boutell.com/rinetd/下载地址:http://www.boutell.com/rinetd/http/rinetd.tar.gz编译安装:对于Windows,包rinetd.tar.gz已包含了编译好的可执行程序文件rinetd.exe,也可以使用VC(如VC6.0)去重新编译。对于Linux,只需要将包rin…

    2022年4月8日
    33
  • ubuntu16.04安装cuda9.0(ubuntu18安装nvidia驱动)

    (安装:NVIDIA-384+CUDA9.0+cuDNN7.1)Ubuntu下安装CUDA需要装NVIDIA驱动,首先进入NVIDIA官网,然后查询对应NVIDIA驱动是否支持你电脑的型号。第一步、安装NVIDIAGPU驱动去NVIDIA官网查询是否支持我电脑的GPU如下&amp;nbsp;可以看出:GeForce700MSeries(Notebooks):GeForceGTX…

    2022年4月14日
    63
  • 逆向 Framework.jar

    逆向 Framework.jar

    2021年12月15日
    45

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号