最近这两天一直在弄大文件的分片上传和下载,前端选择文件,然后使用百度的webuploader切的文件(1M一份),然后调用ftp的api上传到了服务器上,然后再调用api对文件就行追加,其中遇到了文件损坏,输入流为null的问题,都一一解决掉了,现在记录下。 一、工具类如下:(jar包 commons-net 3.6) @Data public class IotFtpService { private Logger log = LoggerFactory.getLogger(this.getClass()); private static String directory; /** * ftpClient连接池初始化标志 */ private boolean hasInit = false; /** * ftpClient连接池 */ private ObjectPool<FTPClient> ftpClientPool; /** * 上传文件 * * @param pathname ftp服务保存地址 * @param fileName 上传到ftp的文件名 * @param originFilename 待上传文件的名称(绝对地址) * * @return */ public boolean uploadFile(String pathname, String fileName, String originFilename) { FTPClient ftpClient = getFtpClient(); boolean flag = false; InputStream inputStream = null; try { ftpClient.setFileType(FTP.BINARY_FILE_TYPE); log.info("开始上传文件"); inputStream = new FileInputStream(new File(originFilename)); CreateDirecroty(pathname, ftpClient, fileName); ftpClient.makeDirectory(pathname); ftpClient.changeWorkingDirectory(pathname); //ftpClient.storeFile(fileName, inputStream);//上传文件 ftpClient.appendFile(fileName, inputStream);//追加文件 inputStream.close(); flag = true; log.info("上传文件成功"); } catch (Exception e) { log.error("上传文件失败"); e.printStackTrace(); } finally { releaseFtpClient(ftpClient); } return flag; } /** * 上传文件 * * @param pathname ftp服务保存地址 * @param fileName 上传到ftp的文件名 * @param inputStream 输入文件流 * @return */ public boolean uploadFile(String pathname, String fileName, InputStream inputStream) { boolean flag = false; FTPClient ftpClient = getFtpClient(); try { log.info("开始上传文件"); ftpClient.setFileType(FTP.BINARY_FILE_TYPE); CreateDirecroty(pathname, ftpClient, fileName); ftpClient.makeDirectory(pathname); ftpClient.changeWorkingDirectory(pathname); ftpClient.storeFile(fileName, inputStream); inputStream.close(); flag = true; log.info("上传文件成功"); } catch (Exception e) { log.error("上传文件失败"); e.printStackTrace(); } finally { releaseFtpClient(ftpClient); } return flag; } /** * 合并分片文件 */ public void mergeChunk(String fileName) { } /** * 显示路径下的分片文件 * * @param dir */ public void showDirFiles(String dir) { List<byte[]> listByte = new ArrayList<>(); FTPClient ftpClient = getFtpClient(); try { log.info("select"); ftpClient.changeWorkingDirectory(dir); ftpClient.setFileType(FTP.BINARY_FILE_TYPE); FTPFile[] list = ftpClient.listFiles(); //排序目标 for (int i = 0; i < list.length; i++) { for (int j = i + 1; j < list.length; j++) { if (Integer.parseInt(list[i].getName()) > Integer.parseInt(list[j].getName())) { FTPFile o =list[i]; list[i]=list[j]; list[j]=o; } } System.out.println("查看排序"+list[i].getName()); } InputStream in = null; for (int i = 0; i < list.length; i++) { try { log.info(">>>>>>>>" + list[i].getName()); //读取小文件的输入流 in = ftpClient.retrieveFileStream(encodingPath("/data/eanupload/xx.tar.gz.tmp/" + list[i].getName())); byte[] bytes = IOUtils.toByteArray(in); //关闭输入流 in.close(); //ftp传输结束 ftpClient.completePendingCommand(); listByte.add(bytes); } catch (Exception e) { e.printStackTrace(); } /*file.delete();*/ } //开始合并文件 for (byte[] bytes : listByte) { in = new ByteArrayInputStream(bytes); boolean flag = ftpClient.appendFile("/data/eanupload/test2.gz", in); in.close(); } } catch (Exception e) { e.printStackTrace(); } finally { releaseFtpClient(ftpClient); } } /** * 下载文件 * * * @param pathname FTP服务器文件目录 * * @param filename 文件名称 * * @param localpath 下载后的文件路径 * * @return */ public boolean downloadFile(String pathname, String filename, String localpath) { boolean flag = false; String remote = pathname + "/" + filename; OutputStream os = null; // 获取本地文件大小 long checkLocalFile = checkLocalFile(localpath + "/" + filename); FTPClient ftpClient = getFtpClient(); try { ftpClient.setFileType(FTP.BINARY_FILE_TYPE); log.info("开始下载文件"); //切换FTP目录 ftpClient.changeWorkingDirectory(pathname); FTPFile[] ftpFiles = ftpClient.listFiles(); // 判断本地文件是否存在 if (checkLocalFile != 0) { System.out.println("本地文件已存在"); // 存在,则进行断点下载 for (FTPFile ff : ftpFiles) { if (ff.getName().equals(filename)) { checkLocalFile = downBreakPoint(localpath, checkLocalFile, ff, remote); } flag = true; } } else { // 直接下载 for (FTPFile file : ftpFiles) { if (filename.equalsIgnoreCase(file.getName())) { File localFile = new File(localpath + "/" + file.getName()); os = new FileOutputStream(localFile); ftpClient.retrieveFile(file.getName(), os); os.close(); } } flag = true; log.info("下载文件成功"); } } catch (Exception e) { log.error("下载文件失败"); e.printStackTrace(); } finally { releaseFtpClient(ftpClient); if (null != os) { try { os.close(); } catch (IOException e) { e.printStackTrace(); } } } return flag; } /** * 断点下载代码段 * * @param localPath * @param checkLocalFile * @param ff * @param remote 远程服务器路径+文件名 * @return checkLocalFile 返回long值 * @throws FileNotFoundException * @throws IOException * @throws UnsupportedEncodingException */ public long downBreakPoint(String localPath, long checkLocalFile, FTPFile ff, String remote) throws IOException { FTPClient ftpClient = getFtpClient(); // 获得服务器文件大小 long RemoteSize = ff.getSize(); if (RemoteSize <= checkLocalFile) { System.out.println("文件已下载"); } else { // 断点下载 File localFile = new File(localPath + "/" + ff.getName()); OutputStream out = new FileOutputStream(localFile); InputStream in = ftpClient.retrieveFileStream(remote);//服务器路径+文件名 ftpClient.setRestartOffset(checkLocalFile); byte[] bytes = new byte[1024]; long step = RemoteSize / 100; long process = checkLocalFile / step; int c; while ((c = in.read(bytes)) != -1) { out.write(bytes, 0, c); checkLocalFile += c; long nowProcess = RemoteSize / step; if (nowProcess > process) { process = nowProcess; if (process % 10 == 0) System.out.println("下载进度:" + process); //TODO 更新文件下载进度,值存放在process变量中 } } in.close(); out.close(); ftpClient.completePendingCommand(); releaseFtpClient(ftpClient);//释放ftp客户端 } return checkLocalFile; } /** * 断点续传 * * @param remotePath 远程路径 * @param filename 本地上传的文件名 * @param file 绝对路径 * @return * @throws IOException */ public boolean uploadFileBreakPoint(String filename, File file, String remotePath) { boolean status = false; FTPClient ftpClient = getFtpClient(); try { // 转移目录到远端服务器目录 ftpClient.changeWorkingDirectory(remotePath); String f = new String(filename.getBytes("GBK"), "iso-8859-1"); // 检查ftp服务器中目录否已存在 boolean result = CreateDirecroty(remotePath, ftpClient, filename); if (result) { // 文件存在,判断文件大小进行选择上传 System.out.println("文件已存在"); // 1.服务器文件大于将要上传文件,不需要上传,或者重新上传 FTPFile[] listFiles = ftpClient.listFiles(remotePath + "/" + f); System.out.println(file.length()); if (listFiles[0].getSize() >= file.length()) { System.out.println("不需要上传"); return true; } else { //开始执行断点续传 status = resumeBreakPoint(filename, file, listFiles, remotePath); } } else { //文件不存在直接上传 uploadFile(remotePath, filename, new FileInputStream(file)); } } catch (IOException e) { e.printStackTrace(); } finally { releaseFtpClient(ftpClient);//释放sftp } return status; } /** * 断点上传代码段 * * @param filename * @param file * @param listFiles * @return * @throws FileNotFoundException * @throws IOException * @throws UnsupportedEncodingException */ private boolean resumeBreakPoint(String filename, File file, FTPFile[] listFiles, String remotePath) throws IOException { FTPClient ftpClient = getFtpClient(); boolean status; // 2.服务器文件小于将要上传文件,进行断点续传 // 显示上传的进度 // long step = file.length() / 10; // long process = 0; long localreadbytes = 0L; RandomAccessFile raf = new RandomAccessFile(file, "rw"); OutputStream out = ftpClient.appendFileStream(remotePath + "/" + filename); // 断点续传 ftpClient.setRestartOffset(listFiles[0].getSize()); // process = listFiles[0].getSize() % step; raf.seek(listFiles[0].getSize()); localreadbytes = listFiles[0].getSize(); byte[] bytes = new byte[1024]; int c; while ((c = raf.read(bytes)) != -1) { out.write(bytes, 0, c); localreadbytes += c; // if (localreadbytes / step != process) { // process = localreadbytes / step; // System.out.println("上传进度:" + process); // } } out.flush(); raf.close(); out.close(); boolean result = ftpClient.completePendingCommand(); if (listFiles[0].getSize() > 0) { status = result ? true : false; } else { status = result ? true : false; } releaseFtpClient(ftpClient);//释放ftp客户端 return status; } /** * 删除文件 * * * @param pathname FTP服务器保存目录 * * @param filename 要删除的文件名称 * * @return */ public boolean deleteFile(String pathname, String filename) { boolean flag = false; FTPClient ftpClient = getFtpClient(); try { log.info("开始删除文件"); //切换FTP目录 ftpClient.changeWorkingDirectory(pathname); ftpClient.dele(filename); flag = true; log.info("删除文件成功"); } catch (Exception e) { log.error("删除文件失败"); e.printStackTrace(); } finally { releaseFtpClient(ftpClient); } return flag; } /** * 按行读取FTP文件 * * @param remoteFilePath 文件路径(path+fileName) * @return * @throws IOException */ public List<String> readFileByLine(String remoteFilePath) throws IOException { FTPClient ftpClient = getFtpClient(); try (InputStream in = ftpClient.retrieveFileStream(encodingPath(remoteFilePath)); BufferedReader br = new BufferedReader(new InputStreamReader(in))) { return br.lines().map(line -> StrUtil.trimToEmpty(line)) .filter(line -> StrUtil.isNotEmpty(line)).collect(Collectors.toList()); } finally { ftpClient.completePendingCommand(); releaseFtpClient(ftpClient); } } /** * 获取指定路径下FTP文件 * * @param remotePath 路径 * @return FTPFile数组 * @throws IOException */ public FTPFile[] retrieveFTPFiles(String remotePath) throws IOException { FTPClient ftpClient = getFtpClient(); try { return ftpClient.listFiles(encodingPath(remotePath + "/"), file -> file != null && file.getSize() > 0); } finally { releaseFtpClient(ftpClient); } } /** * 获取指定路径下FTP文件名称 * * @param remotePath 路径 * @return ftp文件名称列表 * @throws IOException */ public List<String> retrieveFileNames(String remotePath) throws IOException { FTPFile[] ftpFiles = retrieveFTPFiles(remotePath); if (ArrayUtil.isEmpty(ftpFiles)) { return new ArrayList<>(); } return Arrays.stream(ftpFiles).filter(Objects::nonNull) .map(FTPFile::getName).collect(Collectors.toList()); } /** * 编码文件路径 */ private static String encodingPath(String path) throws UnsupportedEncodingException { // FTP协议里面,规定文件名编码为iso-8859-1,所以目录名或文件名需要转码 return new String(path.replaceAll("//", "/").getBytes("GBK"), "iso-8859-1"); } /** * 获取ftpClient * * @return */ private FTPClient getFtpClient() { checkFtpClientPoolAvailable(); FTPClient ftpClient = null; Exception ex = null; // 获取连接最多尝试3次 for (int i = 0; i < 3; i++) { try { ftpClient = ftpClientPool.borrowObject(); ftpClient.enterLocalPassiveMode();//被动模式 ftpClient.changeWorkingDirectory("/"); break; } catch (Exception e) { ex = e; } } if (ftpClient == null) { throw new RuntimeException("Could not get a ftpClient from the pool", ex); } return ftpClient; } /** * 释放ftpClient */ private void releaseFtpClient(FTPClient ftpClient) { if (ftpClient == null) { return; } try { ftpClientPool.returnObject(ftpClient); } catch (Exception e) { log.error("Could not return the ftpClient to the pool", e); // destoryFtpClient if (ftpClient.isAvailable()) { try { ftpClient.logout();//尝试解决文件损坏问题 ftpClient.disconnect(); } catch (IOException io) { } } } } /** * 检查ftpClientPool是否可用 */ private void checkFtpClientPoolAvailable() { Assert.state(hasInit, "FTP未启用或连接失败!"); } /** * 创建多层目录文件,如果有ftp服务器已存在该文件,则不创建,如果无,则创建 * * @param remote * @param ftpClient * @return * @throws IOException true文件存在 false文件不存在 */ public boolean CreateDirecroty(String remote, FTPClient ftpClient, String filename) throws IOException { boolean success = false; //检查远程目录是否存在情况 checkRemote(remote); // 如果远程目录不存在,则递归创建远程服务器目录 if (!directory.equalsIgnoreCase("/") && !changeWorkingDirectory(new String(directory), ftpClient)) { int start = 0; int end = 0; if (directory.startsWith("/")) { start = 1; } else { start = 0; } end = directory.indexOf("/", start); String path = ""; String paths = ""; while (true) { // 对目录进行转码 String subDirectory = new String(remote.substring(start, end).getBytes("GBK"), "iso-8859-1"); path = path + "/" + subDirectory; if (!checkRemoteFile(path, ftpClient)) { if (makeDirectory(subDirectory, ftpClient)) { changeWorkingDirectory(subDirectory, ftpClient); } else { System.out.println("创建目录[" + subDirectory + "]失败"); changeWorkingDirectory(subDirectory, ftpClient); } } else { changeWorkingDirectory(subDirectory, ftpClient); } paths = paths + "/" + subDirectory; start = end + 1; end = directory.indexOf("/", start); // 检查所有目录是否创建完毕 if (end <= start) { break; } } } else { //目录存在则校验服务器文件是否存在 success = checkRemoteFile(filename, ftpClient); } return success; } /** * 改变目录路径 * * @param directory * @param ftpClient * @return */ public boolean changeWorkingDirectory(String directory, FTPClient ftpClient) { boolean flag = true; try { flag = ftpClient.changeWorkingDirectory(directory); if (flag) { System.out.println("进入文件夹" + directory + " 成功!"); } else { System.out.println("进入文件夹" + directory + " 失败!开始创建文件夹"); } } catch (IOException ioe) { ioe.printStackTrace(); } return flag; } /** * /创建目录 * * @param dir * @param ftpClient * @return */ public boolean makeDirectory(String dir, FTPClient ftpClient) { boolean flag = true; try { flag = ftpClient.makeDirectory(dir); if (flag) { log.info("创建文件夹" + dir + " 成功!"); } else { log.info("创建文件夹" + dir + " 失败!"); } } catch (Exception e) { e.printStackTrace(); } return flag; } /** * 判断ftp服务器文件是否存在 * * @param filename * @param ftp * @return * @throws IOException */ public boolean checkRemoteFile(String filename, FTPClient ftp) throws IOException { boolean flag = false; FTPFile[] ftpFileArr = ftp.listFiles(filename); for (FTPFile ftpFile : ftpFileArr) { if (ftpFileArr.length > 0) { flag = true;//存在 } } return flag; } /** * 检查本地文件是否存在 * * @param localPath 本地文件路径 * @return */ public long checkLocalFile(String localPath) { File file = new File(localPath); if (file.exists()) { return file.length(); } return 0; } /** * 检查远程目录 * * @param remote 远程服务器目录 */ private static void checkRemote(String remote) { char c = remote.charAt(remote.length() - 1); if ("/".charAt(0) == c) { directory = remote; } else { directory = remote + "/"; } } }
二、注意事项
在分片文件进行合并的时候,我使用的是 ftpClient.appendFile(fileName, inputStream);
1.该方法需要设置ftpClient的传输方式,详情见代码
2.分片的文件需要排序后再合并,否则会把文件损坏切记。