使用spring-integration-ftp,在指定目录下每天动态生成一个日期目录,将文件上传至该目录下
由于本人从事信贷系统开发工作,在工作中经常要和核心系统、支付系统、用户中心系统有文件交互(日终批量系统使用spring batch),所以需要使用ftp文件上传下载。以前文件的上传下载都是使用apache的ftp开发框架解决,代码量虽然不大,但是要写的代码还是不少。本次决定使用spring-integration解决方案。
- 导入ftp的依赖
<!-- 项目使用的spring boot 1.5.4、spring batch -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>4.3.9.RELEASE</version>
</dependency>
- 配置ftp服务器信息
uc.ftp.host=localhost
uc.ftp.port=2121
uc.ftp.username=xulong
uc.ftp.password=123456
remote.directory=/share
- 配置ftp配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/ftp
http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd">
<!-- ftp配置 -->
<context:property-placeholder location="classpath:ftp/ftp-dev.properties" ignore-unresolvable="true"/>
<int:channel id="channel">
<int:queue/>
</int:channel>
<bean id="ftpSessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="${uc.ftp.host}"/>
<property name="port" value="${uc.ftp.port}"/>
<property name="username" value="${uc.ftp.username}"/>
<property name="password" value="${uc.ftp.password}"/>
</bean>
<bean id="cachingSessionFactory" class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="ftpSessionFactory"/>
</bean>
<int-ftp:outbound-channel-adapter id="ftpOutboundChannel" channel="channel" charset="UTF-8"
session-factory="ftpSessionFactory" auto-create-directory="true"
remote-directory-expression="@targetDir.get()" remote-file-separator="/">
<!--远程目录可变:remote-directory-expression="@targetDir.get()"-->
<int:poller fixed-rate="1000"/>
<int-ftp:request-handler-advice-chain>
<int:retry-advice/>
</int-ftp:request-handler-advice-chain>
</int-ftp:outbound-channel-adapter>
<!-- 重点在这,每次上传文件都会读取该对象的get方法返回值 -->
<bean id="targetDir" class="java.util.concurrent.atomic.AtomicReference">
<constructor-arg value="${remote.directory}"/>
</bean>
</beans>
- 实现上传代码
package com.csii.loan.ftp;
// 有没有很眼熟,spring的ftp也是对apache的ftp进行了一次包装,但是代码减少了不少
import org.apache.commons.net.ftp.FTPClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.ftp.session.FtpSession;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
/**
* Created by xulong on 2017/9/16.
* xulong1@126.com
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:ftp/ftp.xml"})
public class FtpOutboundTestCase {
@Resource(name = "channel")
private MessageChannel channel;
// @Resource
// private Environment environment;
@Resource
private DefaultFtpSessionFactory ftpSessionFactory;
private Logger logger = LoggerFactory.getLogger(FtpOutboundTestCase.class);
@Resource(name = "targetDir")
private AtomicReference targetDir;
private void sendFile(String path, String filePath) throws IOException, InterruptedException {
FtpSession session = ftpSessionFactory.getSession();
if (logger.isDebugEnabled()) {
logger.debug("current session is:[{}]", session.hashCode());
}
FTPClient ftpClient = session.getClientInstance();
boolean success = ftpClient.changeWorkingDirectory(path);
if (logger.isDebugEnabled()) {
logger.debug("切换工作目录是否成功:【{}】", success);
}
if (!success) {
session.mkdir(path);
}
// TODO session.exists有严重的bug,跟踪源码发现的
// if (!session.exists(path)) {
// session.mkdir(path);
// }
// 此处要修改值,否则上传的目录地址还是默认的目录地址
targetDir.set(path);
File localFile = new File(filePath);
Message<File> message = MessageBuilder.withPayload(localFile).build();
channel.send(message);
Thread.sleep(2000L);
session.close();
}
@Test
public void testUploadFile() throws InterruptedException, IOException {
String path = "/share/20170916";// 模拟每天动态生成一个yyyyMMdd的目录,当天的所有文件都上传到该目录下
String localFilePath = "/Users/xulong/idea_wrk_spc/partition.log";
sendFile(path, localFilePath);
path = "/share/20170917";
sendFile(path, localFilePath);
}
}