Spring Boot + Hadoop: implementing HDFS create, read, update and delete
First, add the Hadoop dependencies to the project's pom.xml:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-streaming</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<!-- Chinese word segmenter (IK Analyzer) -->
<dependency>
<groupId>cn.bestwu</groupId>
<artifactId>ik-analyzers</artifactId>
<version>5.1.0</version>
</dependency>
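The dependencies above reference a hadoop.version property that is not shown in the snippet; it would normally be declared in the pom's properties section, for example as follows (the version number here is only an illustration and should match the Hadoop version of your cluster):
<properties>
<hadoop.version>3.3.0</hadoop.version>
</properties>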
HDFS configuration, e.g. in application.yml:
hdfs:
hdfsPath: hdfs://bigdata-master:8020
hdfsName: bigdata-master
Configure the FileSystem and register it as a bean in the Spring container:
@Slf4j
@Configuration
public class HadoopHDFSConfiguration {
@Value("${hdfs.hdfsPath}")
private String hdfsPath;
@Value("${hdfs.hdfsName}")
private String hdfsName;
@Bean
public org.apache.hadoop.conf.Configuration getConfiguration(){
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
configuration.set("fs.defaultFS", hdfsPath);
return configuration;
}
@Bean
public FileSystem getFileSystem() {
FileSystem fileSystem = null;
try {
// The third argument is the user name that HDFS operations are performed as
fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
} catch (IOException | InterruptedException | URISyntaxException e) {
log.error(e.getMessage(), e);
}
return fileSystem;
}
}
public interface HDFSService {
// Create a directory
boolean makeFolder(String path);
// Check whether a file or directory exists
boolean existFile(String path);
List<Map<String, Object>> readCatalog(String path);
boolean createFile(String path, MultipartFile file);
String readFileContent(String path);
List<Map<String, Object>> listFile(String path);
boolean renameFile(String oldName, String newName);
boolean deleteFile(String path);
boolean uploadFile(String path, String uploadPath);
boolean downloadFile(String path, String downloadPath);
boolean copyFile(String sourcePath, String targetPath);
byte[] openFileToBytes(String path);
BlockLocation[] getFileBlockLocations(String path);
}
@Slf4j
@Service
public class HDFSServiceImpl implements HDFSService {
private static final int bufferSize = 1024 * 1024 * 64;
@Autowired
private FileSystem fileSystem;
@Override
public boolean makeFolder(String path) {
boolean target = false;
if (StringUtils.isEmpty(path)) {
return false;
}
if (existFile(path)) {
return true;
}
Path src = new Path(path);
try {
target = fileSystem.mkdirs(src);
} catch (IOException e) {
log.error(e.getMessage());
}
return target;
}
@Override
public boolean existFile(String path) {
if (StringUtils.isEmpty(path)){
return false;
}
Path src = new Path(path);
try {
return fileSystem.exists(src);
} catch (IOException e) {
log.error(e.getMessage());
}
return false;
}
@Override
public List<Map<String, Object>> readCatalog(String path) {
if (StringUtils.isEmpty(path)){
return Collections.emptyList();
}
if (!existFile(path)){
log.error("catalog is not exist!!");
return Collections.emptyList();
}
Path src = new Path(path);
FileStatus[] fileStatuses = null;
try {
fileStatuses = fileSystem.listStatus(src);
} catch (IOException e) {
log.error(e.getMessage());
}
// Guard against a failed listing before sizing the result list; otherwise this would throw a NullPointerException
if (null == fileStatuses || 0 == fileStatuses.length) {
return Collections.emptyList();
}
List<Map<String, Object>> result = new ArrayList<>(fileStatuses.length);
for (FileStatus fileStatus : fileStatuses) {
Map<String, Object> cataLogMap = new HashMap<>();
cataLogMap.put("filePath", fileStatus.getPath());
cataLogMap.put("fileStatus", fileStatus);
result.add(cataLogMap);
}
return result;
}
@Override
public boolean createFile(String path, MultipartFile file) {
boolean target = false;
if (StringUtils.isEmpty(path)) {
return false;
}
// getOriginalFilename() returns the uploaded file's name; getName() would only return the form field name
String fileName = file.getOriginalFilename();
Path newPath = new Path(path + "/" + fileName);
FSDataOutputStream outputStream = null;
try {
outputStream = fileSystem.create(newPath);
outputStream.write(file.getBytes());
target = true;
} catch (IOException e) {
log.error(e.getMessage());
} finally {
if (null != outputStream) {
try {
outputStream.close();
} catch (IOException e) {
log.error(e.getMessage());
}
}
}
return target;
}
@Override
public String readFileContent(String path) {
if (StringUtils.isEmpty(path)){
return null;
}
if (!existFile(path)) {
return null;
}
Path src = new Path(path);
StringBuilder sb = new StringBuilder();
// FSDataInputStream.readLine() is deprecated and mishandles multi-byte characters,
// so read the stream line by line through a BufferedReader instead
try (FSDataInputStream inputStream = fileSystem.open(src);
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String lineText;
while ((lineText = reader.readLine()) != null) {
sb.append(lineText);
}
} catch (IOException e) {
log.error(e.getMessage());
}
return sb.toString();
}
@Override
public List<Map<String, Object>> listFile(String path) {
if (StringUtils.isEmpty(path)) {
return Collections.emptyList();
}
if (!existFile(path)) {
return Collections.emptyList();
}
List<Map<String,Object>> resultList = new ArrayList<>();
Path src = new Path(path);
try {
RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(src, true);
while (fileIterator.hasNext()) {
LocatedFileStatus next = fileIterator.next();
Path filePath = next.getPath();
String fileName = filePath.getName();
Map<String, Object> map = new HashMap<>();
map.put("fileName", fileName);
map.put("filePath", filePath.toString());
resultList.add(map);
}
} catch (IOException e) {
log.error(e.getMessage());
}
return resultList;
}
@Override
public boolean renameFile(String oldName, String newName) {
boolean target = false;
if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
return false;
}
Path oldPath = new Path(oldName);
Path newPath = new Path(newName);
try {
target = fileSystem.rename(oldPath, newPath);
} catch (IOException e) {
log.error(e.getMessage());
}
return target;
}
@Override
public boolean deleteFile(String path) {
boolean target = false;
if (StringUtils.isEmpty(path)) {
return false;
}
if (!existFile(path)) {
return false;
}
Path src = new Path(path);
try {
// delete(path, recursive) removes the path immediately; deleteOnExit() would only mark it for deletion when the FileSystem is closed
target = fileSystem.delete(src, true);
} catch (IOException e) {
log.error(e.getMessage());
}
return target;
}
@Override
public boolean uploadFile(String path, String uploadPath) {
if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
return false;
}
Path clientPath = new Path(path);
Path serverPath = new Path(uploadPath);
try {
fileSystem.copyFromLocalFile(false,clientPath,serverPath);
return true;
} catch (IOException e) {
log.error(e.getMessage(), e);
}
return false;
}
@Override
public boolean downloadFile(String path, String downloadPath) {
if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
return false;
}
Path clientPath = new Path(path);
Path targetPath = new Path(downloadPath);
try {
fileSystem.copyToLocalFile(false, clientPath, targetPath);
return true;
} catch (IOException e) {
log.error(e.getMessage());
}
return false;
}
@Override
public boolean copyFile(String sourcePath, String targetPath) {
if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
return false;
}
Path oldPath = new Path(sourcePath);
Path newPath = new Path(targetPath);
FSDataInputStream inputStream = null;
FSDataOutputStream outputStream = null;
try {
inputStream = fileSystem.open(oldPath);
outputStream = fileSystem.create(newPath);
IOUtils.copyBytes(inputStream,outputStream,bufferSize,false);
return true;
} catch (IOException e) {
log.error(e.getMessage());
} finally {
if (null != inputStream) {
try {
inputStream.close();
} catch (IOException e) {
log.error(e.getMessage());
}
}
if (null != outputStream) {
try {
outputStream.close();
} catch (IOException e) {
log.error(e.getMessage());
}
}
}
return false;
}
@Override
public byte[] openFileToBytes(String path) {
if (StringUtils.isEmpty(path)) {
return null;
}
if (!existFile(path)) {
return null;
}
Path src = new Path(path);
byte[] result = null;
FSDataInputStream inputStream = null;
try {
inputStream = fileSystem.open(src);
result = IOUtils.readFullyToByteArray(inputStream);
} catch (IOException e) {
log.error(e.getMessage());
} finally {
if (null != inputStream){
try {
inputStream.close();
} catch (IOException e) {
log.error(e.getMessage());
}
}
}
return result;
}
@Override
public BlockLocation[] getFileBlockLocations(String path) {
if (StringUtils.isEmpty(path)) {
return null;
}
if (!existFile(path)) {
return null;
}
BlockLocation[] blocks = null;
Path src = new Path(path);
try{
FileStatus fileStatus = fileSystem.getFileStatus(src);
blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
}catch(Exception e){
log.error(e.getMessage());
}
return blocks;
}
}
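The original post does not show a REST controller for the HDFS service. As a rough sketch of how it might be exposed, here is a hypothetical controller written in the same style as the MapReduceController further below; the endpoint paths, method names and the exact APIResponse usage are assumptions based on that controller:

@Slf4j
@Api(tags = "hdfs api")
@RestController
@RequestMapping("/api/v1/hdfs")
public class HDFSController {

    @Autowired
    private HDFSService hdfsService;

    // Create a directory on HDFS
    @ApiOperation("make folder")
    @PostMapping("/folder")
    public APIResponse makeFolder(@RequestParam("path") String path) {
        return hdfsService.makeFolder(path)
                ? APIResponse.success()
                : APIResponse.fail("make folder failed: " + path);
    }

    // Upload a multipart file into an HDFS directory
    @ApiOperation("create file")
    @PostMapping("/file")
    public APIResponse createFile(@RequestParam("path") String path,
                                  @RequestParam("file") MultipartFile file) {
        return hdfsService.createFile(path, file)
                ? APIResponse.success()
                : APIResponse.fail("create file failed: " + path);
    }
}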
package com.winterchen.hadoopdemo.reduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/*
 * A Reducer subclass declares four generic type parameters describing its input and output:
 * KeyIn    the key type of the Reducer's input, here a single word such as "hello"
 * ValueIn  the value type of the Reducer's input, here the count emitted for each occurrence
 * KeyOut   the key type of the Reducer's output, here the same word
 * ValueOut the value type of the Reducer's output, here the total number of occurrences of that word
 */
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
// Words to highlight in the logs; note that this list is not populated anywhere in the snippet shown here
private List<String> textList = new ArrayList<>();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
String keyStr = key.toString();
// By this point the counts have already been aggregated; if the word is one we track, log it
if (textList.contains(keyStr)) {
System.out.println("============ " + keyStr + " count: " + sum + " ============");
}
}
}
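The job configuration below also wires in a WordMapper class that is not shown in the post (its package, com.winterchen.hadoopdemo.mapper, appears in the imports of the configuration class). A minimal sketch of what such a mapper could look like is given here; the body is an assumption, and the real project may tokenize with the IK Analyzer dependency declared earlier rather than simple whitespace splitting:

package com.winterchen.hadoopdemo.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    // Split each input line into tokens and emit (word, 1) for every token
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}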
package com.winterchen.hadoopdemo.configuration;
import com.winterchen.hadoopdemo.HadoopDemoApplication;
import com.winterchen.hadoopdemo.mapper.WordMapper;
import com.winterchen.hadoopdemo.reduce.WordReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ReduceJobsConfiguration {
@Value("${hdfs.hdfsPath}")
private String hdfsPath;
/**
* Build the Hadoop configuration used by the MapReduce job
*
* @return the Configuration pointing at the configured HDFS
*/
public Configuration getConfiguration() {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", hdfsPath);
configuration.set("mapred.job.tracker", hdfsPath);
return configuration;
}
/**
* Configure and run the word-count job
*
* @param jobName name of the MapReduce job
* @param inputPath HDFS path of the input data
* @param outputPath HDFS path the results are written to
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
public void getWordCountJobsConf(String jobName, String inputPath, String outputPath)
throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = getConfiguration();
Job job = Job.getInstance(conf, jobName);
job.setMapperClass(WordMapper.class);
job.setCombinerClass(WordReduce.class);
job.setJarByClass(HadoopDemoApplication.class);
job.setReducerClass(WordReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
}
public String getHdfsPath() {
return hdfsPath;
}
}
public interface MapReduceService {
void wordCount(String jobName, String inputPath, String outputPath) throws Exception;
}
package com.winterchen.hadoopdemo.service.impl;
import com.winterchen.hadoopdemo.configuration.ReduceJobsConfiguration;
import com.winterchen.hadoopdemo.service.HDFSService;
import com.winterchen.hadoopdemo.service.MapReduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
@Service
public class MapReduceServiceImpl implements MapReduceService {
@Autowired
private HDFSService hdfsService;
@Autowired
private ReduceJobsConfiguration reduceJobsConfiguration;
@Override
public void wordCount(String jobName, String inputPath, String outputPath) throws Exception {
if (StringUtils.isEmpty(jobName) || StringUtils.isEmpty(inputPath) || StringUtils.isEmpty(outputPath)) {
return;
}
// If the output path already exists, delete it first so that every run starts from a clean output directory
if (hdfsService.existFile(outputPath)) {
hdfsService.deleteFile(outputPath);
}
reduceJobsConfiguration.getWordCountJobsConf(jobName, inputPath, outputPath);
}
}
@Slf4j
@Api(tags = "map reduce api")
@RestController
@RequestMapping("/api/v1/map-reduce")
public class MapReduceController {
@Autowired
private MapReduceService mapReduceService;
@ApiOperation("count word")
@PostMapping("/word/count")
public APIResponse wordCount(
@ApiParam(name = "jobName", required = true)
@RequestParam(name = "jobName", required = true)
String jobName,
@ApiParam(name = "inputPath", required = true)
@RequestParam(name = "inputPath", required = true)
String inputPath,
@ApiParam(name = "outputPath", required = true)
@RequestParam(name = "outputPath", required = true)
String outputPath
){
try {
mapReduceService.wordCount(jobName, inputPath, outputPath);
return APIResponse.success();
} catch (Exception e) {
log.error(e.getMessage());
return APIResponse.fail(e.getMessage());
}
}
}
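Assuming the application runs locally on port 8080 and an input file has already been uploaded to HDFS, the job can be triggered with a request such as the one below (port, paths and job name are only examples); the result can then be read back from the output directory, for example via readFileContent on the part-r-00000 file:

curl -X POST "http://localhost:8080/api/v1/map-reduce/word/count?jobName=wordCount&inputPath=/input/words.txt&outputPath=/output/wordCount"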
That covers the basic functionality needed in day-to-day development: HDFS create, read, update and delete operations, plus a simple MapReduce word count.
Source code: