hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系

hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系

大家好,又见面了,我是你们的朋友全栈君。

1,在介绍hadoop写文件的时候我们经常会说首先分割文件为多个块;那么是怎么分割的呢?

这里其实不要有过的纠结,这里的块是block,是hdfs中切块的大小,属于物理划分,默认64M,在hadoop-default.xml配置中有体现:

<property>  
  <name>dfs.block.size</name>  
  <value>67108864</value>  
  <description>The default block size for new files.</description>  
</property>  

当然如果文件没有64M也不会占据整块空间。

将文件分割成多个块后,形成一个数据队列,然后依次写入datanode列表。

再者,如果写入的是个文件夹,而且每个文件的都不大,这样在hdfs中是默认每个文件一个块的,即使没有64m,当然也可做优化处理,不过hbase更便利于处理把小文件合并到一个块中,这个我会在其他博文中介绍。

2,下面我们说说split,并与block的关系

首先,split是mapreduce中的概念,而block是hdfs中切块的大小。

如下:

//设置要处理的文本数据所存放的路径  
        FileInputFormat.setInputPaths(wordCountJob, "hdfs://ubuntu:9000/input/aa.txt");  
        FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://ubuntu:9000/output/"));  

我们设置要处理的文件路径时都会用到fileInputFormat类, 不过我们更多看到的是inputFormat,其实fileInputFormat这个类的也是实现inputFomat接口,

下面我们接着看源码,说明为什么需要分片?

以hadoop自带的wordCount的源码为例:

for (int i = 0; i < otherArgs.length - 1; ++i) {  
  FileInputFormat.addInputPath(job, new Path(otherArgs[i]));  
}  
FileOutputFormat.setOutputPath(job,  
  new Path(otherArgs[otherArgs.length - 1]));  
System.exit(job.waitForCompletion(true) ? 0 : 1);  

我们看到使用的InputFormat是FileOutputFormat,任务执行调用了Job的waitForCompletion方法。waitForCompletion方法中真正提交job的代码如下:

public boolean waitForCompletion(boolean verbose  
                                 ) throws IOException, InterruptedException,  
                                          ClassNotFoundException {  
  if (state == JobState.DEFINE) {  
    submit();  
  }  
  // 省略本文不关心的代码  
  return isSuccessful();  
}  

这里的submit方法的实现如下:

public void submit()   
         throws IOException, InterruptedException, ClassNotFoundException {  
    // 省略本文不关心的代码</span>  
    final JobSubmitter submitter =   
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());  
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {  
      public JobStatus run() throws IOException, InterruptedException,   
      ClassNotFoundException {  
        return submitter.submitJobInternal(Job.this, cluster);  
      }  
    });  
    state = JobState.RUNNING;  
    LOG.info("The url to track the job: " + getTrackingURL());  
   }  

submit方法首先创建了JobSubmitter实例,然后异步调用了JobSubmitter的submitJobInternal方法。JobSubmitter的submitJobInternal方法有关划分任务的代码如下:

// Create the splits for the job  
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));  
int maps = writeSplits(job, submitJobDir);  
conf.setInt(MRJobConfig.NUM_MAPS, maps);  
LOG.info("number of splits:" + maps); 

writeSplits方法的实现如下:

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,  
    Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  JobConf jConf = (JobConf)job.getConfiguration();  
  int maps;  
  if (jConf.getUseNewMapper()) {  
    maps = writeNewSplits(job, jobSubmitDir);  
  } else {  
    maps = writeOldSplits(jConf, jobSubmitDir);  
  }  
  return maps;  
}  

由于WordCount使用的是新的mapreduce API,所以最终会调用writeNewSplits方法。writeNewSplits的实现如下:

private <T extends InputSplit>  
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,  
    InterruptedException, ClassNotFoundException {  
  Configuration conf = job.getConfiguration();  
  InputFormat<?, ?> input =  
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);  
  
  List<InputSplit> splits = input.getSplits(job);  
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);  
  
  // sort the splits into order based on size, so that the biggest  
  // go first  
  Arrays.sort(array, new SplitComparator());  
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,   
      jobSubmitDir.getFileSystem(conf), array);  
  return array.length;  
}  

writeNewSplits方法中,划分任务数量最关键的代码即为InputFormat的getSplits方法(提示:大家可以直接通过此处的调用,查看不同InputFormat的划分任务实现)。根据前面的分析我们知道此时的InputFormat即为FileOutputFormat,其getSplits方法的实现如下:

public List<InputSplit> getSplits(JobContext job) throws IOException {  
  Stopwatch sw = new Stopwatch().start();  
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  
  long maxSize = getMaxSplitSize(job);  
  
  // generate splits  
  List<InputSplit> splits = new ArrayList<InputSplit>();  
  List<FileStatus> files = listStatus(job);  
  for (FileStatus file: files) {  
    Path path = file.getPath();  
    long length = file.getLen();  
    if (length != 0) {  
      BlockLocation[] blkLocations;  
      if (file instanceof LocatedFileStatus) {  
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();  
      } else {  
        FileSystem fs = path.getFileSystem(job.getConfiguration());  
        blkLocations = fs.getFileBlockLocations(file, 0, length);  
      }  
      if (isSplitable(job, path)) {  
        long blockSize = file.getBlockSize();  
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);  
  
        long bytesRemaining = length;  
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,  
                      blkLocations[blkIndex].getHosts(),  
                      blkLocations[blkIndex].getCachedHosts()));  
          bytesRemaining -= splitSize;  
        }  
  
        if (bytesRemaining != 0) {  
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);  
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,  
                     blkLocations[blkIndex].getHosts(),  
                     blkLocations[blkIndex].getCachedHosts()));  
        }  
      } else { // not splitable  
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),  
                    blkLocations[0].getCachedHosts()));  
      }  
    } else {   
      //Create empty hosts array for zero length files  
      splits.add(makeSplit(path, 0, length, new String[0]));  
    }  
  }  
  // Save the number of input files for metrics/loadgen  
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());  
  sw.stop();  
  if (LOG.isDebugEnabled()) {  
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()  
        + ", TimeTaken: " + sw.elapsedMillis());  
  }  
  return splits;  
}  

totalSize:是整个Map-Reduce job所有输入的总大小。

numSplits:来自job.getNumMapTasks(),即在job启动时用org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)设置的值,给M-R框架的Map数量的提示。

goalSize:是输入总大小与提示Map task数量的比值,即期望每个Mapper处理多少的数据,仅仅是期望,具体处理的数据数由下面的computeSplitSize决定。

minSplitSize:默认为1,可由子类复写函数protected void setMinSplitSize(long minSplitSize) 重新设置。一般情况下,都为1,特殊情况除外

minSize:取的1和mapred.min.split.size中较大的一个。

blockSize:HDFS的块大小,默认为64M,一般大的HDFS都设置成128M。

splitSize:就是最终每个Split的大小,那么Map的数量基本上就是totalSize/splitSize。

接下来看看computeSplitSize的逻辑:首先在goalSize(期望每个Mapper处理的数据量)和HDFS的block size中取较小的,然后与mapred.min.split.size相比取较大的

 

  一个片为一个splits,即一个map,只要搞清楚片的大小,就能计算出运行时的map数。而一个split的大小是由goalSize, minSize, blockSize这三个值决定的。computeSplitSize的逻辑是,先从goalSize和blockSize两个值中选出最小的那个(比如一般不设置map数,这时blockSize为当前文件的块size,而goalSize是文件大小除以用户设置的map数得到的,如果没设置的话,默认是1),在默认的大多数情况下,blockSize比较小。然后再取blockSize和minSize中最大的那个。而minSize如果不通过”mapred.min.split.size”设置的话(”mapred.min.split.size”默认为0),minSize为1,可理解为一个block块,这样得出的一个splits的size就是blockSize,即一个块一个map,有多少块就有多少map。

split的大小时默认和hdfs的block块大小一致,但是可以通过配置文件自己设置: 
其中有俩个配置文件(如下):

--minsize   默认大小为1
mapreduce.input.fileinputformat.split.minsize  

--maxsize   默认大小为Long.MAXValue 
mapreduce.input.fileinputformat.split.maxsize

举例:

比如说我问写入一个文件夹,里面有10个只有几k的文件,所以List<FileStatus> files = listStatus(job);方法返回的files列表的大小为10。在遍历files列表的过程中,会获取每个文件的blockSize,最终调用computeSplitSize方法计算每个输入文件应当划分的任务数。computeSplitSize方法的实现如下:

protected long computeSplitSize(long blockSize, long minSize,  
                                long maxSize) {  
  return Math.max(minSize, Math.min(maxSize, blockSize));  
}  

然后根据

  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));  

很明显取split默认值,也就是一个块,那么10个就要分为10块,这也说明为什么处理小文件时,block的大小小于split的 大小。同时我们看到了一个split分配一个map任务。

这里我们可以总结下split大小与block的关系:

(1)block块的小于split分片的最小值,那split的值就是split分片的大小

(2)block块的小大介于split分片配置的最小值和最大值之间,block的大小就是split的大小。

(3)block块的大小大于split分片的最大值,split的大小就是split配置的最大值。但会增加map执行的并发度,但是会造成在节点之间拉取数据

也有公式可以计算split也就是map任务数,这里就不做讨论了。

一个map对应一个split分片吗?

经过上面的讨论,答案是显而易见的:

map个数:由任务切片spilt决定的,默认情况下一个split的大小就是block
由参与任务的文件个数决定的 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/106130.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • jsonObject [通俗易懂]

    jsonObject [通俗易懂]JSON就是一串字符串只不过元素会使用特定的符号标注。{"age":14;"name":“lisi”}这就是一个对象了json数组含有多个json对

    2022年7月4日
    21
  • javascript activexobject_activex控件

    javascript activexobject_activex控件一、什么是ActiveX控件?    ActiveX控件广泛用于Internet。它们可以通过提供视频、动画内容等来增加浏览的乐趣。不过,这些程序可能出问题或者向您提供不需要的内容。在某些情况下,这些程序可被用来以您不允许的方式从计算机收集信息、破坏您的计算机上的数据、在未经您同意的情况下在您的计算机上安装软件或者允许他人远程控制您的计算机。一般软件需要用户单独下载然后执行

    2022年10月14日
    0
  • Quartz中时间表达式的设置—–corn表达式

    Quartz中时间表达式的设置—–corn表达式

    2021年12月9日
    121
  • 测试用例附实例[通俗易懂]

    一、测试用例的概念测试用例是测试过程中很重要的一类文档,它是测试工作的核心,是一组在测试时输入和输出的标准,是软件需求的具体对照。二、测试用例的作用检验软件是否满足客户需求 测试人员的工作量的一种体现 展示测试用例的设计思路三、测试用例的内容测试用例八个基本项是:测试用例编号、测试项目、测试标题、重要级别、预置条件、输入、操作步骤、预期输出(不同公司的测试用例内容不尽相同…

    2022年4月13日
    52
  • 除了we tool还有哪些免费安全好用的微信群发软件?这两个软件比we tool好用!

    除了we tool还有哪些免费安全好用的微信群发软件?这两个软件比we tool好用!除了wetool还有哪些安全好用的微信群发软件?群发工具是社群运营使用频率最高的工具,无论是给群内推送消息,还是给个人推送消息。按键精灵:点击左侧链接下载按键精灵是一款模拟鼠标键盘动作的软件。通过制作脚本,可以让按键精灵代替您的双手,自动执行一系列鼠标键盘动作。按键精灵免费版简单易用,不需要任何编程知识就可以作出功能强大的脚本。只要您在电脑前用双手可以完成的动作,按键精灵都可以替您完成。按键精灵用途广泛,具有大量脚本资源。简单百宝箱:点击左侧链接下载简单百宝箱是一个绿色和安全的游戏

    2022年6月4日
    92
  • 方格子服务器系统,方格子无盘服务器配置推荐方案[通俗易懂]

    方格子服务器系统,方格子无盘服务器配置推荐方案[通俗易懂]方格子无盘服务器配置推荐方案内容精选换一换虚拟私有云使用限制如表1所示。以上配额说明针对单租户情况。一个网络ACL单方向拥有的规则数量最好不超过20条,否则可能引起网络ACL性能下降。二层网关连接在公测期间默认只能创建1个二层连接网关。默认情况下,一个用户可以创建100个安全组。默认情况下,一个安全组最多只允许拥有50条安全组规则。默认情况下,一个云服务器或扩展网卡建议选择安全组华为云最佳实践,…

    2022年10月4日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号