Spring Boot and Hadoop Integration in Practice


This post shows how to integrate Hadoop with Spring Boot to implement HDFS create, read, update, and delete (CRUD) operations.

Maven coordinates

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-streaming</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-common</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-distcp</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>cn.bestwu</groupId>
    <artifactId>ik-analyzers</artifactId>
    <version>5.1.0</version>
</dependency>
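The ${hadoop.version} placeholder used above must be declared in the <properties> section of your pom.xml. A minimal sketch; 3.3.0 is only an example, pick the version that matches your cluster:

<properties>
    <!-- Assumption: set this to the Hadoop version your cluster actually runs -->
    <hadoop.version>3.3.0</hadoop.version>
</properties>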

Configuration

HDFS configuration

hdfs:
  hdfsPath: hdfs://bigdata-master:8020
  hdfsName: bigdata-master

Configure the FileSystem and register it as a bean in the Spring container:

@Slf4j
@Configuration
public class HadoopHDFSConfiguration {

    @Value("${hdfs.hdfsPath}")
    private String hdfsPath;

    @Value("${hdfs.hdfsName}")
    private String hdfsName;

    @Bean
    public org.apache.hadoop.conf.Configuration getConfiguration() {
        org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    @Bean
    public FileSystem getFileSystem() {
        FileSystem fileSystem = null;
        try {
            // The third argument is the user name to connect to HDFS as
            fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
        } catch (IOException | InterruptedException | URISyntaxException e) {
            log.error(e.getMessage(), e);
        }
        return fileSystem;
    }
}
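To confirm the FileSystem bean actually connects at startup, a throwaway check can help. The following CommandLineRunner is a hypothetical addition, not part of the original project:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class HdfsSmokeTest implements CommandLineRunner {

    private final FileSystem fileSystem;

    public HdfsSmokeTest(FileSystem fileSystem) {
        this.fileSystem = fileSystem;
    }

    @Override
    public void run(String... args) throws Exception {
        // List the HDFS root directory to confirm connectivity to the NameNode
        for (FileStatus status : fileSystem.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
    }
}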

CRUD operations

public interface HDFSService {

    // Create a directory
    boolean makeFolder(String path);

    // Check whether a file exists
    boolean existFile(String path);

    List<Map<String, Object>> readCatalog(String path);

    boolean createFile(String path, MultipartFile file);

    String readFileContent(String path);

    List<Map<String, Object>> listFile(String path);

    boolean renameFile(String oldName, String newName);

    boolean deleteFile(String path);

    boolean uploadFile(String path, String uploadPath);

    boolean downloadFile(String path, String downloadPath);

    boolean copyFile(String sourcePath, String targetPath);

    byte[] openFileToBytes(String path);

    BlockLocation[] getFileBlockLocations(String path);
}
@Slf4j
@Service
public class HDFSServiceImpl implements HDFSService {

    private static final int BUFFER_SIZE = 1024 * 1024 * 64;

    @Autowired
    private FileSystem fileSystem;

    @Override
    public boolean makeFolder(String path) {
        boolean target = false;
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (existFile(path)) {
            return true;
        }
        Path src = new Path(path);
        try {
            target = fileSystem.mkdirs(src);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return target;
    }

    @Override
    public boolean existFile(String path) {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        Path src = new Path(path);
        try {
            return fileSystem.exists(src);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return false;
    }

    @Override
    public List<Map<String, Object>> readCatalog(String path) {
        if (StringUtils.isEmpty(path)) {
            return Collections.emptyList();
        }
        if (!existFile(path)) {
            log.error("catalog does not exist!");
            return Collections.emptyList();
        }
        Path src = new Path(path);
        FileStatus[] fileStatuses = null;
        try {
            fileStatuses = fileSystem.listStatus(src);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        // Check for null before sizing the result list: listStatus may have failed above
        if (null == fileStatuses || 0 == fileStatuses.length) {
            return Collections.emptyList();
        }
        List<Map<String, Object>> result = new ArrayList<>(fileStatuses.length);
        for (FileStatus fileStatus : fileStatuses) {
            Map<String, Object> cataLogMap = new HashMap<>();
            cataLogMap.put("filePath", fileStatus.getPath());
            cataLogMap.put("fileStatus", fileStatus);
            result.add(cataLogMap);
        }
        return result;
    }

    @Override
    public boolean createFile(String path, MultipartFile file) {
        boolean target = false;
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        // getOriginalFilename() is the uploaded file's name; getName() would return the form field name
        String fileName = file.getOriginalFilename();
        Path newPath = new Path(path + "/" + fileName);
        try (FSDataOutputStream outputStream = fileSystem.create(newPath)) {
            outputStream.write(file.getBytes());
            target = true;
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return target;
    }

    @Override
    public String readFileContent(String path) {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        Path src = new Path(path);
        StringBuilder sb = new StringBuilder();
        // BufferedReader replaces the deprecated DataInputStream.readLine(),
        // which does not decode multi-byte characters correctly
        try (FSDataInputStream inputStream = fileSystem.open(src);
             BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
            String lineText;
            while ((lineText = reader.readLine()) != null) {
                sb.append(lineText);
            }
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return sb.toString();
    }

    @Override
    public List<Map<String, Object>> listFile(String path) {
        if (StringUtils.isEmpty(path)) {
            return Collections.emptyList();
        }
        if (!existFile(path)) {
            return Collections.emptyList();
        }
        List<Map<String, Object>> resultList = new ArrayList<>();
        Path src = new Path(path);
        try {
            // The second argument makes the listing recursive
            RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(src, true);
            while (fileIterator.hasNext()) {
                LocatedFileStatus next = fileIterator.next();
                Path filePath = next.getPath();
                Map<String, Object> map = new HashMap<>();
                map.put("fileName", filePath.getName());
                map.put("filePath", filePath.toString());
                resultList.add(map);
            }
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return resultList;
    }

    @Override
    public boolean renameFile(String oldName, String newName) {
        boolean target = false;
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return false;
        }
        Path oldPath = new Path(oldName);
        Path newPath = new Path(newName);
        try {
            target = fileSystem.rename(oldPath, newPath);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return target;
    }

    @Override
    public boolean deleteFile(String path) {
        boolean target = false;
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (!existFile(path)) {
            return false;
        }
        Path src = new Path(path);
        try {
            // delete() removes the path immediately; the original deleteOnExit()
            // only marks it for deletion when the FileSystem is closed
            target = fileSystem.delete(src, true);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return target;
    }

    @Override
    public boolean uploadFile(String path, String uploadPath) {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
            return false;
        }
        Path clientPath = new Path(path);
        Path serverPath = new Path(uploadPath);
        try {
            // The first argument controls whether the local source file is deleted after the copy
            fileSystem.copyFromLocalFile(false, clientPath, serverPath);
            return true;
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
        return false;
    }

    @Override
    public boolean downloadFile(String path, String downloadPath) {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
            return false;
        }
        Path clientPath = new Path(path);
        Path targetPath = new Path(downloadPath);
        try {
            fileSystem.copyToLocalFile(false, clientPath, targetPath);
            return true;
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return false;
    }

    @Override
    public boolean copyFile(String sourcePath, String targetPath) {
        if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
            return false;
        }
        Path oldPath = new Path(sourcePath);
        Path newPath = new Path(targetPath);
        try (FSDataInputStream inputStream = fileSystem.open(oldPath);
             FSDataOutputStream outputStream = fileSystem.create(newPath)) {
            IOUtils.copyBytes(inputStream, outputStream, BUFFER_SIZE, false);
            return true;
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return false;
    }

    @Override
    public byte[] openFileToBytes(String path) {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        Path src = new Path(path);
        byte[] result = null;
        try (FSDataInputStream inputStream = fileSystem.open(src)) {
            result = IOUtils.readFullyToByteArray(inputStream);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return result;
    }

    @Override
    public BlockLocation[] getFileBlockLocations(String path) {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        BlockLocation[] blocks = null;
        Path src = new Path(path);
        try {
            FileStatus fileStatus = fileSystem.getFileStatus(src);
            blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        return blocks;
    }
}
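The original post stops at the service layer for HDFS. If you want to expose these operations over REST, a minimal controller sketch in the same style as the MapReduce controller shown later might look like the following; the endpoint paths and method choices are assumptions, not code from the source project:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/hdfs")
public class HDFSController {

    @Autowired
    private HDFSService hdfsService;

    // Hypothetical endpoint: create a directory, e.g. POST /api/v1/hdfs/folder?path=/tmp/demo
    @PostMapping("/folder")
    public boolean makeFolder(@RequestParam("path") String path) {
        return hdfsService.makeFolder(path);
    }

    // Hypothetical endpoint: read a text file's content as a string
    @GetMapping("/file/content")
    public String readFileContent(@RequestParam("path") String path) {
        return hdfsService.readFileContent(path);
    }
}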

MapReduce

package com.winterchen.hadoopdemo.reduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/*
 * A Reducer subclass declares four generic type parameters for its input and output:
 * KeyIn    the type of the Reducer's input key, here a single word such as "hello"
 * ValueIn  the type of the Reducer's input value, here the count for one occurrence
 * KeyOut   the type of the Reducer's output key, here the same word
 * ValueOut the type of the Reducer's output value, here the total number of occurrences
 */
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();
    private List<String> textList = new ArrayList<>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
        String keyStr = key.toString();
        // When using the tokenizer, the counts are already aggregated at this point;
        // just print the words of interest
        if (textList.contains(keyStr)) {
            System.out.println("============ " + keyStr + " word count: " + sum + " ============");
        }
    }
}
package com.winterchen.hadoopdemo.configuration;

import com.winterchen.hadoopdemo.HadoopDemoApplication;
import com.winterchen.hadoopdemo.mapper.WordMapper;
import com.winterchen.hadoopdemo.reduce.WordReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class ReduceJobsConfiguration {

    @Value("${hdfs.hdfsPath}")
    private String hdfsPath;

    /**
     * Get the HDFS configuration
     *
     * @return the Hadoop configuration
     */
    public Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        configuration.set("mapred.job.tracker", hdfsPath);
        return configuration;
    }

    /**
     * Build and run the word-count job
     *
     * @param jobName
     * @param inputPath
     * @param outputPath
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public void getWordCountJobsConf(String jobName, String inputPath, String outputPath)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConfiguration();
        Job job = Job.getInstance(conf, jobName);
        job.setMapperClass(WordMapper.class);
        job.setCombinerClass(WordReduce.class);
        job.setJarByClass(HadoopDemoApplication.class);
        job.setReducerClass(WordReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    public String getHdfsPath() {
        return hdfsPath;
    }
}
public interface MapReduceService {

    void wordCount(String jobName, String inputPath, String outputPath) throws Exception;
}
package com.winterchen.hadoopdemo.service.impl;

import com.winterchen.hadoopdemo.configuration.ReduceJobsConfiguration;
import com.winterchen.hadoopdemo.service.HDFSService;
import com.winterchen.hadoopdemo.service.MapReduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
public class MapReduceServiceImpl implements MapReduceService {

    @Autowired
    private HDFSService hdfsService;

    @Autowired
    private ReduceJobsConfiguration reduceJobsConfiguration;

    @Override
    public void wordCount(String jobName, String inputPath, String outputPath) throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return;
        }
        // If the output path already exists, delete it first so every run produces fresh results
        if (hdfsService.existFile(outputPath)) {
            hdfsService.deleteFile(outputPath);
        }
        reduceJobsConfiguration.getWordCountJobsConf(jobName, inputPath, outputPath);
    }
}
@Slf4j @Api(tags = "map reduce api") @RestController @RequestMapping("/api/v1/map-reduce") public class MapReduceController { 
    @Autowired private MapReduceService mapReduceService; @ApiOperation("count word") @PostMapping("/word/count") public APIResponse wordCount( @ApiParam(name = "jobName", required = true) @RequestParam(name = "jobName", required = true) String jobName, @ApiParam(name = "inputPath", required = true) @RequestParam(name = "inputPath", required = true) String inputPath, @ApiParam(name = "outputPath", required = true) @RequestParam(name = "outputPath", required = true) String outputPath ){ 
    try { 
    mapReduceService.wordCount(jobName, inputPath, outputPath); return APIResponse.success(); } catch (Exception e) { 
    log.error(e.getMessage()); return APIResponse.fail(e.getMessage()); } } } 
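Once the application is running, the job can be triggered over HTTP; the host, port, and HDFS paths below are placeholders for illustration:

curl -X POST "http://localhost:8080/api/v1/map-reduce/word/count?jobName=wordCount&inputPath=/tmp/input/words.txt&outputPath=/tmp/output/wordcount"

Hadoop writes the results as part-r-* files under outputPath; they can be read back with HDFSService.readFileContent.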

The above covers the basic functionality you will use in day-to-day development: HDFS CRUD operations, plus MapReduce.

Source code:

WinterChenS/springboot-learning-experience
