hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系

hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系

大家好,又见面了,我是你们的朋友全栈君。

1,在介绍hadoop写文件的时候我们经常会说首先分割文件为多个块;那么是怎么分割的呢?

这里其实不要有过的纠结,这里的块是block,是hdfs中切块的大小,属于物理划分,默认64M,在hadoop-default.xml配置中有体现:

<property>  
  <name>dfs.block.size</name>  
  <value>67108864</value>  
  <description>The default block size for new files.</description>  
</property>  

当然如果文件没有64M也不会占据整块空间。

将文件分割成多个块后,形成一个数据队列,然后依次写入datanode列表。

再者,如果写入的是个文件夹,而且每个文件的都不大,这样在hdfs中是默认每个文件一个块的,即使没有64m,当然也可做优化处理,不过hbase更便利于处理把小文件合并到一个块中,这个我会在其他博文中介绍。

2,下面我们说说split,并与block的关系

首先,split是mapreduce中的概念,而block是hdfs中切块的大小。

如下:

//设置要处理的文本数据所存放的路径  
        FileInputFormat.setInputPaths(wordCountJob, "hdfs://ubuntu:9000/input/aa.txt");  
        FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://ubuntu:9000/output/"));  

我们设置要处理的文件路径时都会用到fileInputFormat类, 不过我们更多看到的是inputFormat,其实fileInputFormat这个类的也是实现inputFomat接口,

下面我们接着看源码,说明为什么需要分片?

以hadoop自带的wordCount的源码为例:

for (int i = 0; i < otherArgs.length - 1; ++i) {  
  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));  
}  
FileOutputFormat.setOutputPath(job,  
  new Path(otherArgs[otherArgs.length - 1]));  
System.exit(job.waitForCompletion(true) ? 0 : 1);  

我们看到使用的InputFormat是FileOutputFormat,任务执行调用了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的代码如下:

public boolean waitForCompletion(boolean verbose  
                                 ) throws IOException, InterruptedException,  
                                          ClassNotFoundException {  
  if (state == JobState.DEFINE) {  
    submit();  
  }  
  // 省略本文不关心的代码  
  return isSuccessful();  
}  

这里的submit方法的实现如下:

public void submit()   
         throws IOException, InterruptedException, ClassNotFoundException {  
    // 省略本文不关心的代码</span>  
    final JobSubmitter submitter =   
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());  
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {  
      public JobStatus run() throws IOException, InterruptedException,   
      ClassNotFoundException {  
        return submitter.submitJobInternal(Job.this, cluster);  
      }  
    });  
    state = JobState.RUNNING;  
    LOG.info("The url to track the job: " + getTrackingURL());  
   }  

submit方法首先创建了JobSubmitter实例,然后异步调用了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有关划分任务的代码如下:

// Create the splits for the job  
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));  
int maps = writeSplits(job, submitJobDir);  
conf.setInt(MRJobConfig.NUM_MAPS, maps);  
LOG.info("number of splits:" + maps); 

writeSplits方法的实现如下:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,  
    Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  JobConf jConf = (JobConf)job.getConfiguration();  
  int maps;  
  if (jConf.getUseNewMapper()) {  
    maps = writeNewSplits(job, jobSubmitDir);  
  } else {  
    maps = writeOldSplits(jConf, jobSubmitDir);  
  }  
  return maps;  
}  

由于WordCount使用的是新的mapreduce API,所以最终会调用writeNewSplits方法。writeNewSplits的实现如下:

private <T extends InputSplit>  
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  Configuration conf = job.getConfiguration();  
  InputFormat<?, ?> input =  
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  
  
  List<InputSplit> splits = input.getSplits(job);  
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);  
  
  // sort the splits into order based on size, so that the biggest  
  // go first  
  Arrays.sort(array, new SplitComparator());  
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,   
      jobSubmitDir.getFileSystem(conf), array);  
  return array.length;  
}  

writeNewSplits方法中,划分任务数量最关键的代码即为InputFormat的getSplits方法(提示:大家可以直接通过此处的调用,查看不同InputFormat的划分任务实现)。根据前面的分析我们知道此时的InputFormat即为FileOutputFormat,其getSplits方法的实现如下:

public List<InputSplit> getSplits(JobContext job) throws IOException {  
  Stopwatch sw = new Stopwatch().start();  
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  long maxSize = getMaxSplitSize(job);  
  
  // generate splits  
  List<InputSplit> splits = new ArrayList<InputSplit>();  
  List<FileStatus> files = listStatus(job);  
  for (FileStatus file: files) {  
    Path path = file.getPath();  
    long length = file.getLen();  
    if (length != 0) {  
      BlockLocation[] blkLocations;  
      if (file instanceof LocatedFileStatus) {  
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();  
      } else {  
        FileSystem fs = path.getFileSystem(job.getConfiguration());  
        blkLocations = fs.getFileBlockLocations(file, 0, length);  
      }  
      if (isSplitable(job, path)) {  
        long blockSize = file.getBlockSize();  
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);  
  
        long bytesRemaining = length;  
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,  
                      blkLocations[blkIndex].getHosts(),  
                      blkLocations[blkIndex].getCachedHosts()));  
          bytesRemaining -= splitSize;  
        }  
  
        if (bytesRemaining != 0) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  
                     blkLocations[blkIndex].getHosts(),  
                     blkLocations[blkIndex].getCachedHosts()));  
        }  
      } else { // not splitable  
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),  
                    blkLocations[0].getCachedHosts()));  
      }  
    } else {   
      //Create empty hosts array for zero length files  
      splits.add(makeSplit(path, 0, length, new String[0]));  
    }  
  }  
  // Save the number of input files for metrics/loadgen  
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  
  sw.stop();  
  if (LOG.isDebugEnabled()) {  
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()  
        + ", TimeTaken: " + sw.elapsedMillis());  
  }  
  return splits;  
}  

totalSize:是整个Map-Reduce job所有输入的总大小。

numSplits:来自job.getNumMapTasks(),即在job启动时用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,给M-R框架的Map数量的提示。

goalSize:是输入总大小与提示Map task数量的比值,即期望每个Mapper处理多少的数据,仅仅是期望,具体处理的数据数由下面的computeSplitSize决定。

minSplitSize:默认为1,可由子类复写函数protected void setMinSplitSize(long minSplitSize) 重新设置。一般情况下,都为1,特殊情况除外

minSize:取的1和mapred.min.split.size中较大的一个。

blockSize:HDFS的块大小,默认为64M,一般大的HDFS都设置成128M。

splitSize:就是最终每个Split的大小,那么Map的数量基本上就是totalSize/splitSize。

接下来看看computeSplitSize的逻辑:首先在goalSize(期望每个Mapper处理的数据量)和HDFS的block size中取较小的,然后与mapred.min.split.size相比取较大的

 

  一个片为一个splits,即一个map,只要搞清楚片的大小,就能计算出运行时的map数。而一个split的大小是由goalSize, minSize, blockSize这三个值决定的。computeSplitSize的逻辑是,先从goalSize和blockSize两个值中选出最小的那个(比如一般不设置map数,这时blockSize为当前文件的块size,而goalSize是文件大小除以用户设置的map数得到的,如果没设置的话,默认是1),在默认的大多数情况下,blockSize比较小。然后再取blockSize和minSize中最大的那个。而minSize如果不通过”mapred.min.split.size”设置的话(”mapred.min.split.size”默认为0),minSize为1,可理解为一个block块,这样得出的一个splits的size就是blockSize,即一个块一个map,有多少块就有多少map。

split的大小时默认和hdfs的block块大小一致,但是可以通过配置文件自己设置: 
其中有俩个配置文件(如下):

--minsize   默认大小为1
mapreduce.input.fileinputformat.split.minsize  

--maxsize   默认大小为Long.MAXValue 
mapreduce.input.fileinputformat.split.maxsize

举例:

比如说我问写入一个文件夹,里面有10个只有几k的文件,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小为10。在遍历files列表的过程中,会获取每个文件的blockSize,最终调用computeSplitSize方法计算每个输入文件应当划分的任务数。computeSplitSize方法的实现如下:

protected long computeSplitSize(long blockSize, long minSize,  
                                long maxSize) {  
  return Math.max(minSize, Math.min(maxSize, blockSize));  
}  

然后根据

  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  

很明显取split默认值,也就是一个块,那么10个就要分为10块,这也说明为什么处理小文件时,block的大小小于split的 大小。同时我们看到了一个split分配一个map任务。

这里我们可以总结下split大小与block的关系:

(1)block块的小于split分片的最小值,那split的值就是split分片的大小

(2)block块的小大介于split分片配置的最小值和最大值之间,block的大小就是split的大小。

(3)block块的大小大于split分片的最大值,split的大小就是split配置的最大值。但会增加map执行的并发度,但是会造成在节点之间拉取数据

也有公式可以计算split也就是map任务数,这里就不做讨论了。

一个map对应一个split分片吗?

经过上面的讨论,答案是显而易见的:

map个数:由任务切片spilt决定的,默认情况下一个split的大小就是block
由参与任务的文件个数决定的 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/106130.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • qca wlan wifi modules 解析四

    qca wlan wifi modules 解析四WiFi驱动架构的一般层次为:应用层BSDsocket层TCP/IP协议层IP层网络设备层net/coremac8011层/ieee80211设备驱动层具体实例如下图:上层应用程序简历socket,对网络接口进行ioctl操作,正是通过触发,网络设备和80211层,调用底层驱动函数来实现的。qcawlanmodules中,通过创建虚拟AP来实现WiFi功能,即VAP…

    2022年7月11日
    17
  • 你的账户被停用,请向系统管理员咨询_win10退出管理员账户

    你的账户被停用,请向系统管理员咨询_win10退出管理员账户当你的电脑误操作了以下步骤,或者被篡改了设置了这里那恭喜你,重启后就登不上Administrator账户了首先看一下网上的两种无效方式无效方式一:带命令符的安全模式一般两种方式进入安全模式:方式一:F8进入方式二:按住shift重启cmd中输入netuseradministrator/active:yes亲测无效,依然登录不进去无效方式二:PE进入设置用户和组…

    2025年7月18日
    4
  • 广域网技术

    广域网技术1.以太网直连不同网段互通arpstatic10.1.11.200e0-fc8e-6612(绑定静态arp表项,不使用arp广播解析mac,直接使用绑定mac封装)iproute-s10

    2022年7月2日
    28
  • WebForm的Grid页面页脚DropDownList事件[通俗易懂]

    WebForm的Grid页面页脚DropDownList事件[通俗易懂]帖子http://community.csdn.net/Expert/topic/3506/3506615.xml?temp=.5394251中,提到了一个很有意思的问题。【我的datagrid的页脚有一个dropdownload控件,如果是一个BUTTON按钮的话,可以在datagrid的DG_Card_ItemCommand事件里来响应BUTTON按钮的事件,e.CommandName==

    2025年10月28日
    3
  • Python负数取余总结

    Python负数取余总结Python负数取余总结余数存在正余数和负余数,要了解负余数,需要先了解取整原理17//5=3-17//5=-417//-5=-4-17//-5=3根据上述的4个公式,可以看出python的编译器是的取整符号位由被除数和除数同时决定,整数的数值是由向下取整的,即如果整数的符号位正,则取靠近0的数,如果整数是负数,则取远离0的数或者也可以这样理解:被除数和除数处于0的一边就往靠0的方向取整,如果是处于0的两边就往远离0的方向取整。了解了取整原理后,再理解取余就比较简单了17

    2022年5月8日
    39
  • 智能菜品量推荐——RapidMiner(一)

    智能菜品量推荐——RapidMiner(一)前言 本文是一篇阅读RapidMiner手册,结合当下目标产品做出的文字概述总结。RapidMiner与本产品需求非常贴切,对其进行理解与整理,贴出作为记录与项目书素材。 餐饮业盈利核心在于菜品与顾客。什么样的菜系和种类会吸引更多的顾客,每个客户的就餐喜好是什么,不同时期什么样的菜品最为畅销,能否通过几种不同菜品的组合达到更好的效果,是否可预测未来一段时间内菜品原材料的采购份量?一、无技术支撑下已有数据可以干什么? ① 点餐前,由有经验的服务员对顾客进行菜品…

    2022年8月18日
    7

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号