当前位置: 首页 > 工具软件 > Spring Hadoop > 使用案例 >

springboot集成hadoop实战

法弘亮
2023-12-01

springboot集成hadoop实现hdfs增删改查

maven坐标

				<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-streaming</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-distcp</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>

				<!-- 中文分词器 -->
        <dependency>
            <groupId>cn.bestwu</groupId>
            <artifactId>ik-analyzers</artifactId>
            <version>5.1.0</version>
        </dependency>

配置

hdfs的配置

hdfs:
  hdfsPath: hdfs://bigdata-master:8020
  hdfsName: bigdata-master

将fileSystem配置并注册到spring容器

@Slf4j
@Configuration
public class HadoopHDFSConfiguration {

    @Value("${hdfs.hdfsPath}")
    private String hdfsPath;
    @Value("${hdfs.hdfsName}")
    private String hdfsName;

    @Bean
    public org.apache.hadoop.conf.Configuration  getConfiguration(){
        org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    @Bean
    public FileSystem getFileSystem(){
        FileSystem fileSystem = null;
        try {
            fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            log.error(e.getMessage());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            log.error(e.getMessage());
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            log.error(e.getMessage());
        }
        return fileSystem;
    }

}

增删改查

public interface HDFSService {

		// 创建文件夹
    boolean makeFolder(String path);
		// 是否存在文件
    boolean existFile(String path);
		
    List<Map<String, Object>> readCatalog(String path);

    boolean createFile(String path, MultipartFile file);

    String readFileContent(String path);

    List<Map<String, Object>> listFile(String path);

    boolean renameFile(String oldName, String newName);

    boolean deleteFile(String path);

    boolean uploadFile(String path, String uploadPath);

    boolean downloadFile(String path, String downloadPath);

    boolean copyFile(String sourcePath, String targetPath);

    byte[] openFileToBytes(String path);

    BlockLocation[] getFileBlockLocations(String path);

}
@Slf4j
@Service
public class HDFSServiceImpl implements HDFSService {

    private static final int bufferSize = 1024 * 1024 * 64;

    @Autowired
    private FileSystem fileSystem;

    @Override
    public boolean makeFolder(String path) {
        boolean target = false;
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (existFile(path)) {
            return true;
        }
        Path src = new Path(path);
        try {
            target = fileSystem.mkdirs(src);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return target;
    }

    @Override
    public boolean existFile(String path) {
        if (StringUtils.isEmpty(path)){
            return false;
        }
        Path src = new Path(path);
        try {
            return fileSystem.exists(src);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return false;
    }

    @Override
    public List<Map<String, Object>> readCatalog(String path) {
        if (StringUtils.isEmpty(path)){
            return Collections.emptyList();
        }
        if (!existFile(path)){
            log.error("catalog is not exist!!");
            return Collections.emptyList();
        }

        Path src = new Path(path);
        FileStatus[] fileStatuses = null;
        try {
            fileStatuses = fileSystem.listStatus(src);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        List<Map<String, Object>> result = new ArrayList<>(fileStatuses.length);

        if (null != fileStatuses && 0 < fileStatuses.length) {
            for (FileStatus fileStatus : fileStatuses) {
                Map<String, Object> cataLogMap = new HashMap<>();
                cataLogMap.put("filePath", fileStatus.getPath());
                cataLogMap.put("fileStatus", fileStatus);
                result.add(cataLogMap);
            }
        }
        return result;
    }

    @Override
    public boolean createFile(String path, MultipartFile file) {
        boolean target = false;
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        String fileName = file.getName();
        Path newPath = new Path(path + "/" + fileName);

        FSDataOutputStream outputStream = null;
        try {
            outputStream = fileSystem.create(newPath);
            outputStream.write(file.getBytes());
            target = true;
        } catch (IOException e) {
            log.error(e.getMessage());
        } finally {
            if (null != outputStream) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    log.error(e.getMessage());
                }
            }
        }
        return target;
    }

    @Override
    public String readFileContent(String path) {
        if (StringUtils.isEmpty(path)){
            return null;
        }

        if (!existFile(path)) {
            return null;
        }

        Path src = new Path(path);

        FSDataInputStream inputStream = null;
        StringBuilder sb = new StringBuilder();
        try {
            inputStream = fileSystem.open(src);
            String lineText = "";
            while ((lineText = inputStream.readLine()) != null) {
                sb.append(lineText);
            }
        } catch (IOException e) {
            log.error(e.getMessage());
        } finally {
            if (null != inputStream) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    log.error(e.getMessage());
                }
            }
        }
        return sb.toString();
    }

    @Override
    public List<Map<String, Object>> listFile(String path) {
        if (StringUtils.isEmpty(path)) {
            return Collections.emptyList();
        }
        if (!existFile(path)) {
            return Collections.emptyList();
        }
        List<Map<String,Object>> resultList = new ArrayList<>();

        Path src = new Path(path);
        try {
            RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(src, true);
            while (fileIterator.hasNext()) {
                LocatedFileStatus next = fileIterator.next();
                Path filePath = next.getPath();
                String fileName = filePath.getName();
                Map<String, Object> map = new HashMap<>();
                map.put("fileName", fileName);
                map.put("filePath", filePath.toString());
                resultList.add(map);
            }
        } catch (IOException e) {
            log.error(e.getMessage());
        }

        return resultList;
    }

    @Override
    public boolean renameFile(String oldName, String newName) {
        boolean target = false;
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return false;
        }
        Path oldPath = new Path(oldName);
        Path newPath = new Path(newName);
        try {
            target = fileSystem.rename(oldPath, newPath);
        } catch (IOException e) {
            log.error(e.getMessage());
        }

        return target;
    }

    @Override
    public boolean deleteFile(String path) {
        boolean target = false;
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (!existFile(path)) {
            return false;
        }
        Path src = new Path(path);
        try {
            target = fileSystem.deleteOnExit(src);
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return target;
    }

    @Override
    public boolean uploadFile(String path, String uploadPath) {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
            return false;
        }

        Path clientPath = new Path(path);

        Path serverPath = new Path(uploadPath);

        try {
            fileSystem.copyFromLocalFile(false,clientPath,serverPath);
            return true;
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
        return false;
    }

    @Override
    public boolean downloadFile(String path, String downloadPath) {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
            return false;
        }

        Path clienPath = new Path(path);

        Path targetPath = new Path(downloadPath);

        try {
            fileSystem.copyToLocalFile(false,clienPath, targetPath);
            return true;
        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return false;
    }

    @Override
    public boolean copyFile(String sourcePath, String targetPath) {
        if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
            return false;
        }

        Path oldPath = new Path(sourcePath);

        Path newPath = new Path(targetPath);

        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;

        try {
            inputStream = fileSystem.open(oldPath);
            outputStream = fileSystem.create(newPath);

            IOUtils.copyBytes(inputStream,outputStream,bufferSize,false);
            return true;
        } catch (IOException e) {
            log.error(e.getMessage());
        } finally {
            if (null != inputStream) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    log.error(e.getMessage());
                }
            }
            if (null != outputStream) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    log.error(e.getMessage());
                }
            }
        }
        return false;
    }

    @Override
    public byte[] openFileToBytes(String path) {

        if (StringUtils.isEmpty(path)) {
            return null;
        }

        if (!existFile(path)) {
            return null;
        }

        Path src = new Path(path);
        byte[] result = null;
        FSDataInputStream inputStream = null;
        try {
            inputStream = fileSystem.open(src);
            result = IOUtils.readFullyToByteArray(inputStream);
        } catch (IOException e) {
            log.error(e.getMessage());
        } finally {
            if (null != inputStream){
                try {
                    inputStream.close();
                } catch (IOException e) {
                    log.error(e.getMessage());
                }
            }
        }

        return result;
    }

    @Override
    public BlockLocation[] getFileBlockLocations(String path) {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        BlockLocation[] blocks = null;
        Path src = new Path(path);
        try{
            FileStatus fileStatus = fileSystem.getFileStatus(src);
            blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        }catch(Exception e){
            log.error(e.getMessage());
        }
        return blocks;
    }
}

mapReduce

package com.winterchen.hadoopdemo.reduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/*
 * 继承Reducer类需要定义四个输出、输出类型泛型:
 * 四个泛型类型分别代表:
 * KeyIn        Reducer的输入数据的Key,这里是每行文字中的单词"hello"
 * ValueIn      Reducer的输入数据的Value,这里是每行文字中的次数
 * KeyOut       Reducer的输出数据的Key,这里是每行文字中的单词"hello"
 * ValueOut     Reducer的输出数据的Value,这里是每行文字中的出现的总次数
 */
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();
    private List<String> textList = new ArrayList<>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);

        String keyStr = key.toString();

        // 使用分词器,内容已经被统计好了,直接输出即可
        if (textList.contains(keyStr)) {
            System.out.println("============ " + keyStr + " 统计分词为: " + sum + " ============");
        }
    }
}
package com.winterchen.hadoopdemo.configuration;

import com.winterchen.hadoopdemo.HadoopDemoApplication;
import com.winterchen.hadoopdemo.mapper.WordMapper;
import com.winterchen.hadoopdemo.reduce.WordReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;

@Component
public class ReduceJobsConfiguration {

    @Value("${hdfs.hdfsPath}")
    private String hdfsPath;

    /**
     * 获取HDFS配置信息
     *
     * @return
     */
    public Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        configuration.set("mapred.job.tracker", hdfsPath);
        return configuration;
    }

    /**
     * 获取单词统计的配置信息
     *
     * @param jobName
     * @param inputPath
     * @param outputPath
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public void getWordCountJobsConf(String jobName, String inputPath, String outputPath)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = getConfiguration();
        Job job = Job.getInstance(conf, jobName);

        job.setMapperClass(WordMapper.class);
        job.setCombinerClass(WordReduce.class);
        job.setJarByClass(HadoopDemoApplication.class);
        job.setReducerClass(WordReduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    @PostConstruct
    public void getPath() {
        hdfsPath = this.hdfsPath;
    }

    public String getHdfsPath() {
        return hdfsPath;
    }
}
public interface MapReduceService {

    void wordCount(String jobName, String inputPath, String outputPath) throws Exception;

}
package com.winterchen.hadoopdemo.service.impl;

import com.winterchen.hadoopdemo.configuration.ReduceJobsConfiguration;
import com.winterchen.hadoopdemo.service.HDFSService;
import com.winterchen.hadoopdemo.service.MapReduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
public class MapReduceServiceImpl implements MapReduceService {

    @Autowired
    private HDFSService hdfsService;

    @Autowired
    private ReduceJobsConfiguration reduceJobsConfiguration;

    @Override
    public void wordCount(String jobName, String inputPath, String outputPath) throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return;
        }
        // 输出目录 = output/当前Job,如果输出路径存在则删除,保证每次都是最新的
        if (hdfsService.existFile(outputPath)) {
            hdfsService.deleteFile(outputPath);
        }
        reduceJobsConfiguration.getWordCountJobsConf(jobName, inputPath, outputPath);
    }
}
package com.winterchen.hadoopdemo.service.impl;

import com.winterchen.hadoopdemo.configuration.ReduceJobsConfiguration;
import com.winterchen.hadoopdemo.service.HDFSService;
import com.winterchen.hadoopdemo.service.MapReduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
public class MapReduceServiceImpl implements MapReduceService {

    @Autowired
    private HDFSService hdfsService;

    @Autowired
    private ReduceJobsConfiguration reduceJobsConfiguration;

    @Override
    public void wordCount(String jobName, String inputPath, String outputPath) throws Exception {
        if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath)) {
            return;
        }
        // 输出目录 = output/当前Job,如果输出路径存在则删除,保证每次都是最新的
        if (hdfsService.existFile(outputPath)) {
            hdfsService.deleteFile(outputPath);
        }
        reduceJobsConfiguration.getWordCountJobsConf(jobName, inputPath, outputPath);
    }
}
@Slf4j
@Api(tags = "map reduce api")
@RestController
@RequestMapping("/api/v1/map-reduce")
public class MapReduceController {

    @Autowired
    private MapReduceService mapReduceService;

    @ApiOperation("count word")
    @PostMapping("/word/count")
    public APIResponse wordCount(
            @ApiParam(name = "jobName", required = true)
            @RequestParam(name = "jobName", required = true)
            String jobName,
            @ApiParam(name = "inputPath", required = true)
            @RequestParam(name = "inputPath", required = true)
            String inputPath,
            @ApiParam(name = "outputPath", required = true)
            @RequestParam(name = "outputPath", required = true)
            String outputPath
    ){
        try {
            mapReduceService.wordCount(jobName, inputPath, outputPath);
            return APIResponse.success();
        } catch (Exception e) {
            log.error(e.getMessage());
            return APIResponse.fail(e.getMessage());
        }
    }
}

以上就是日常开发中能使用到的基本的功能:hdfs的增删改查,以及MapReduce;

源码地址:

WinterChenS/springboot-learning-experience

 类似资料: