FastDfs分布式文件系统集群搭建及Java客户端使用

范俊逸
2023-12-01

背景

最近的一个新闻类采集项目,客户要求,采集的新闻内容中的相关图片也要下载并将新闻内容中图片的“src”替换为下载后的路径。

为了方便图片的统一管理,客户建议将下载的图片存储至他们搭建的FastDfs。本文由此而来。

FastDfs介绍

FastDFS是一个开源高性能分布式文件系统。它的主要功能包括:文件存储、文件同步和文件访问(文件上传和文件下载),它可以解决高容量和负载平衡问题。FastDfs应满足基于照片共享网站和视频共享网站等文件的网站的要求。

FastDfs有两个角色:跟踪器和存储。

跟踪器负责文件访问的调度和负载平衡。

存储存储文件及其功能是文件管理,包括:文件存储、文件同步、提供文件访问接口。它还管理元数据,这些元数据是表示为文件的键值对的属性。例如:宽度=1024,键为"宽度",值为"1024"。

跟踪器和存储包含一个或多个服务器。跟踪器或存储群集中的服务器可以随时添加到群集或从群集中删除,而不会影响联机服务。跟踪器群集中的服务器是对等服务器。

按文件卷组织以获取高容量的存储服务器。存储系统包含一个或多个卷,其文件在这些卷中是独立的。整个存储系统的容量等于所有卷容量的总和。文件卷包含一个或多个存储服务器,这些服务器中的文件相同。文件卷中的服务器相互备份,所有这些服务器都是负载平衡。将存储服务器添加到卷时,此卷中已有的文件将自动复制到此新服务器,并且此复制完成后,系统将联机切换到此服务器以提供存储服务。

当整个存储容量不足时,可以添加一个或多个卷来扩展存储容量。为此,您需要添加一个或多个存储服务器。

文件的标识由两部分组成:卷名和文件名。

基于Docker搭建Tracker和Storage集群

主机用途
XXX.XXX.XXX.101Tracker1:22122,Storage1:22000
XXX.XXX.XXX.102Tracker2:22122,Storage2:22000
XXX.XXX.XXX.103Nginx负载均衡代理服务器22000

先在101和102服务器上拉取FastDfs镜像。

docker pull morunchang/fastdfs

101和102服务器上都需求做以下操作。

1. 配置Tracker

启动Tracker

docker run -d --name tracker --net=host \
-p 21000:21000 \
-p 22122:22122 \
morunchang/fastdfs sh tracker.sh

进入Tracker容器

docker exec -it tracker /bin/bash

配置 /etc/nginx/conf/nginx.conf

vim /etc/nginx/conf/nginx.conf

修改监听端口

listen 21000;

配置 /etc/fdfs/client.conf

vim /etc/fdfs/client.conf

检查tracker_server地址和Nginx端口

tracker_server=XXX.XXX.XXX.101:22122
tracker_server=XXX.XXX.XXX.102:22122
http.tracker_server_port=21000

配置 /etc/fdfs/tracker.conf

vim /etc/fdfs/tracker.conf

检查Tracker服务端口和Nginx端口

port=22122
http.server_port=21000

重启Tracker容器

docker restart tracker

2. 配置Storage

创建挂载目录

mkdir /home/fastdfs

启动Storage容器

docker run -d --name storage \
--net=host \
-p 22000:22000 \
-p 23000:23000 \
-e TRACKER_IP=XXX.XXX.XXX.101:22122,XXX.XXX.XXX.102:22122 \
-e GROUP_NAME=group1 \
-v /home/fastdfs:/data/fast_data \
morunchang/fastdfs sh storage.sh

进入容器

docker exec -it storage /bin/bash

配置 /etc/nginx/conf/nginx.conf文件

vim /etc/nginx/conf/nginx.conf

修改监听端口

listen 22000;

配置 /etc/fdfs/client.conf

vim /etc/fdfs/client.conf

配置tracker_server地址及端口

tracker_server=XXX.XXX.XXX.101:22122
tracker_server=XXX.XXX.XXX.102:22122
http.tracker_server_port=21000

配置 /etc/fdfs/storage.conf

vim /etc/fdfs/storage.conf

检查Storage的端口和tracker_server地址以及Nginx端口。

port=23000
tracker_server=XXX.XXX.XXX.101:22122
tracker_server=XXX.XXX.XXX.102:22122
http.server_port=22000

配置 /etc/fdfs/mod_fastdfs.conf

vim /etc/fdfs/mod_fastdfs.conf

检查tracker_server地址

tracker_server=XXX.XXX.XXX.101:22122
tracker_server=XXX.XXX.XXX.102:22122

重启Storage容器

docker restart storage

释放端口

firewall-cmd --zone=public --add-port=21000/tcp --permanent
firewall-cmd --zone=public --add-port=22000/tcp --permanent
firewall-cmd --zone=public --add-port=22122/tcp --permanent
firewall-cmd --zone=public --add-port=23000/tcp --permanent
firewall-cmd --reload

配置反向代理服务器

上面搭建了两个节点的Tracker,在下载文件时,两个IP都可以下载文件,在此最好借助Nginx,反向代理,进行内容分发。

下面在103服务器上安装Nginx。

配置 nginx.conf

 upstream backserver { 
     server XXX.XXX.XXX.101:22000; 
     server XXX.XXX.XXX.102:22000; 
    } 
    
     server {
        listen 22000;
        server_name XXX.XXX.XXX.103;
        location {
           proxy_pass http://backserver;
           index index.html index.htm;
        }
    }

重启Nginx即可生效。

搭建Java客户端

1.下载Java客户端源码

github:https://github.com/happyfish100/fastdfs-client-java

2.进入项目将项目Install进Maven中

mvn clean install

3.或者打包为Jar,再打到Maven中

mvn clean package
mvn install:install-file-Dfile= E:/Optimize/FastDfs/fastdfs-client-java-master

SpringBoot项目测试

1.pom

<dependency>
    <groupId>org.csource</groupId>
    <artifactId>fastdfs-client-java</artifactId>
    <version>1.29-SNAPSHOT</version>
</dependency>

2.在resources下,新建fdfs_client.conf

connect_timeout = 60
network_timeout = 60
charset = UTF-8
http.tracker_http_port = 22000
http.anti_steal_token = no
http.secret_key = 123456
tracker_server = XXX.XXX.XXX.101:22122
tracker_server = XXX.XXX.XXX.102:22122
connection_pool.enabled = true
connection_pool.max_count_per_entry = 500
connection_pool.max_idle_time = 3600
connection_pool.max_wait_time_in_ms = 1000

3.application.yml

server:
  port: 80

spring:
  servlet:
    multipart:
      max-file-size: 900MB
      max-request-size: 900MB

4.FastDFSFile.java

public class FastDFSFile {
    private String name;
    private byte[] content;
    private String ext;
    private String md5;
    private String author;

    public FastDFSFile() {
    }

    public FastDFSFile(String name, byte[] content, String ext) {
        this.name = name;
        this.content = content;
        this.ext = ext;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

    public String getExt() {
        return ext;
    }

    public void setExt(String ext) {
        this.ext = ext;
    }

    public String getMd5() {
        return md5;
    }

    public void setMd5(String md5) {
        this.md5 = md5;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }
}

5.FastDFSClient.java

@Slf4j
@Component
public class FastDFSClient implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        try {
            String filePath = new ClassPathResource("fdfs_client.conf").getFile().getAbsolutePath();
            ClientGlobal.init(filePath);
        } catch (Exception e) {
            log.error("fastdfs初始化失败!" + e.toString());
        }
    }

    public String saveFile(MultipartFile multipartFile) throws IOException {
        String[] fileAbsolutePath = {};
        String fileName = multipartFile.getOriginalFilename();
        String ext = fileName.substring(fileName.lastIndexOf(".") + 1);
        byte[] file_buff = null;
        InputStream inputStream = multipartFile.getInputStream();
        if (inputStream != null) {
            int len1 = inputStream.available();
            file_buff = new byte[len1];
            inputStream.read(file_buff);
        }
        inputStream.close();
        FastDFSFile file = new FastDFSFile(fileName, file_buff, ext);
        try {
            fileAbsolutePath = this.upload(file);  //upload to fastdfs
        } catch (Exception e) {
            log.error("上传错误:"+ e.toString());
        }
        if (fileAbsolutePath == null) {
            log.error("上传错误!");
        }

        return fileAbsolutePath[0] + "/" + fileAbsolutePath[1];
    }
    
    /*
    文件上传
     */
    public String[] upload(FastDFSFile file) {
        log.info("File Name: " + file.getName() + "File Length:" + file.getContent().length);

        NameValuePair[] meta_list = new NameValuePair[1];
        meta_list[0] = new NameValuePair("author", file.getAuthor());

        long startTime = System.currentTimeMillis();
        String[] uploadResults = null;
        StorageClient storageClient = null;
        try {
            storageClient = getTrackerClient();
            //upload_file()三个参数:@param fileContent ①:文件的内容,字节数组 ②:文件扩展名 ③文件扩展信息 数组
            uploadResults = storageClient.upload_file(file.getContent(), file.getExt(), meta_list);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("文件上传错误:" + e.toString());
        }
        log.info("上传文件耗时: " + (System.currentTimeMillis() - startTime) + " ms");
        if (uploadResults == null && storageClient != null) {
            log.error("文件上传错误, error code:" + storageClient.getErrorCode());
        }
        String groupName = uploadResults[0];
        String remoteFileName = uploadResults[1];

        log.info("upload file successfully!!!" + "group_name:" + groupName + ", remoteFileName:" + " " + remoteFileName);
        return uploadResults;
    }

    /*
    查询文件信息
     */
    public FileInfo getFileInfo(String groupName, String remoteFileName) {
        try {
            StorageClient storageClient = getTrackerClient();
            return storageClient.get_file_info(groupName, remoteFileName);
        } catch (Exception e) {
            log.error("查询文件信息错误" + e.toString());
        }
        return null;
    }

    /*
    下载文件
     */
    public byte[] downFile(String groupName, String remoteFileName) {
        try {
            StorageClient storageClient = getTrackerClient();
            byte[] fileByte = storageClient.download_file(groupName, remoteFileName);
            return fileByte;
        } catch (Exception e) {
            log.error("下载文件错误:" + e.toString());
        }
        return null;
    }
    
    /*
    删除文件
     */
    public int deleteFile(String groupName, String remoteFileName)
            throws Exception {
        StorageClient storageClient = getTrackerClient();
        return storageClient.delete_file(groupName, remoteFileName);
    }

    /**
     * 获取storage
     */
    public StorageServer[] getStoreStorages(String groupName) throws IOException, MyException {
        TrackerClient trackerClient = new TrackerClient();
        TrackerServer trackerServer = trackerClient.getTrackerServer();
        return trackerClient.getStoreStorages(trackerServer, groupName);
    }

    public ServerInfo[] getFetchStorages(String groupName, String remoteFileName) throws IOException, MyException {
        TrackerClient trackerClient = new TrackerClient();
        TrackerServer trackerServer = trackerClient.getTrackerServer();
        return trackerClient.getFetchStorages(trackerServer, groupName, remoteFileName);
    }
    
    /**
     * 获取reacker地址
     */
    public String getTrackerUrl() throws IOException {
        return "http://" + getTrackerServer().getInetSocketAddress().getHostString() + ":" + ClientGlobal.getG_tracker_http_port() + "/";
    }
    
    /**
     * 获取tracker连接
     */
    private StorageClient getTrackerClient() throws IOException {
        TrackerServer trackerServer = getTrackerServer();
        StorageClient storageClient = new StorageClient(trackerServer, null);
        return storageClient;
    }
    
    /**
     * 获取tracker服务
     */
    private TrackerServer getTrackerServer() throws IOException {
        TrackerClient trackerClient = new TrackerClient();
        TrackerServer trackerServer = trackerClient.getTrackerServer();
        return trackerServer;
    }
}

6.UploadController.java

@Slf4j
@RestController
public class UploadController {
    @Autowired
    FastDFSClient fastDFSClient;

    private String group = "group1";

    @PostMapping("/upload")
    public String upload(@RequestParam("file") MultipartFile file) throws IOException {
        if (file.isEmpty()) {
            return "文件不能为空!";
        }
        return fastDFSClient.saveFile(file);
    }

    @GetMapping("/getFileInfo")
    public String getFile(String path) throws IOException {
        FileInfo fileInfo = fastDFSClient.getFileInfo(group, path);
        return fileInfo.toString();
    }

    @GetMapping("/downLoad")
    public ResponseEntity<byte[]> downLoad(String path) throws IOException {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentDispositionFormData("attachment", path);
        headers.setContentType(MediaType.APPLICATION_OCTET_STREAM);
        return new ResponseEntity<byte[]>(fastDFSClient.downFile(group,path),
                headers, HttpStatus.OK);
    }

    @GetMapping("/deleteFile")
    public String deleteFile(String path) throws Exception {
        return (fastDFSClient.deleteFile(group, path) > 0)+"";
    }
}

测试

运行下面h5页面

<!DOCTYPE html>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>文件上传</title>
</head>
<body>
<form action="http://localhost/upload"
      method="post" enctype="multipart/form-data" onsubmit="return check()">
    请选择文件:<input id="file" type="file" name="file"
                 multiple="multiple"/><br/>
    <input type="submit" value="上传"/>
</form>
</body>
</html>
 类似资料: