我阅读了Spring集成指南和这里的示例,获得了Spring集成SFTP程序的工作示例。我已经有了一个可以工作的Spring批处理程序,它可以读取大量文件并转储到数据库中。
我现在正试图通过查看Spring文档来集成Spring批处理和Spring集成程序,我创建了以下配置。
<bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="${host}"/>
<property name="port" value="${port}"/>
<property name="user" value="${user}"/>
<property name="password" value="${password}"/>
</bean>
<int:channel id="inboundFileChannel"><int:queue/></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:/chofac/data/mex/registry/outbox"
remote-directory="/chofac/SFTP/MEX/outbox"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern="*.txt">
<int:poller max-messages-per-poll="-1" fixed-rate="1000" />
</int-sftp:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="com.chofac.mint.integration.FileMessageToJobRequest">
<property name="job" ref="responseFileReaderJob"/>
<!-- <property name="fileParameterName" value="input.file.name"/> -->
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000"/>
</batch-int:job-launching-gateway>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
<batch:job id="responseFileReaderJob">
<batch:step id="dailyReaderJob">
<batch:tasklet>
<batch:chunk reader="dailyRRReader" writer="dailyRRDBWriter" processor="itemProcessor" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>
我使用下面的测试用例运行这个程序:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml"})
public class AAASftpInboundMsgJobTriggerTest {
@Resource(name="inboundFileChannel")
PollableChannel localFileChannel;
@Test
public void runDemo(){
System.out.println("Received first file message: " + localFileChannel.receive());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
我得到了这个错误:
在我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了其他不同的错误,我似乎偏离了主要问题。
最常见的建议是添加一个全球民意调查,但这导致了以下错误:
<int:poller default="true" fixed-delay="50"/>
原因:java。lang.IllegalArgumentException:不应为endpoint的组织指定轮询器。springframework。整合。配置。ConsumerEndpointFactoryBean#1’,因为“outboundJobRequestChannel”是SubscribableChannel(不可轮询)。
(我在所有这些方面都是新手,Spring,Spring批处理和Spring集成)任何帮助都将不胜感激。事先谢谢。
更新1
我删除了#2中的轮询器,如下所示
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>
并删除了全球民意调查
<int:poller default="true" fixed-delay="50"/>
我得到以下例外:
线程“main”组织中出现异常。springframework。豆。工厂BeanCreationException:创建名为“org”的bean时出错。springframework。整合。配置。ConsumerEndpointFactoryBean#0”:调用init方法失败;嵌套的例外是java。lang.IllegalArgumentException:尚未为endpoint的组织定义轮询器。springframework。整合。配置。ConsumerEndpointFactoryBean#0',上下文中没有可用的默认轮询器。在org。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。org上的initializeBean(AbstractAutowireCapableBeanFactory.java:1553)。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。org上的doCreateBean(AbstractAutowireCapableBeanFactory.java:539)。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。在org上创建Bean(AbstractAutowireCapableBeanFactory.java:475)。springframework。豆。工厂支持AbstractBeanFactory 1美元。org上的getObject(AbstractBeanFactory.java:304)。springframework。豆。工厂支持DefaultSingletonBeanRegistry。getSingleton(DefaultSingletonBeanRegistry.java:228)位于org。springframework。豆。工厂支持抽象工厂。org上的doGetBean(AbstractBeanFactory.java:300)。springframework。豆。工厂支持抽象工厂。org上的getBean(AbstractBeanFactory.java:195)。springframework。豆。工厂支持DefaultListableBeanFactory。org上的预实例化单例(DefaultListableBeanFactory.java:681)。springframework。上下文支持AbstractApplicationContext。在org上完成BeanFactoryInitialization(AbstractApplicationContext.java:760)。springframework。上下文支持AbstractApplicationContext。在org上刷新(AbstractApplicationContext.java:482)。springframework。上下文支持ClassPathXmlApplicationContext。(ClassPathXmlApplicationContext.java:139)网址:org。springframework。上下文支持ClassPathXmlApplicationContext。(ClassPathXmlApplicationContext.java:93)在com上。乔法克。造币厂整合。下载文件runbatch。main(DownloadFileRunBatch.java:15)由:java引起。lang.IllegalArgumentException:尚未为endpoint的组织定义轮询器。springframework。整合。配置。ConsumerEndpointFactoryBean#0',上下文中没有可用的默认轮询器。在org。springframework。util。明确肯定org上的notNull(Assert.java:112)。springframework。整合。配置。ConsumerdPointFactoryBean。在org上初始化EndPoint(ConsumerEndpointFactoryBean.java:238)。springframework。整合。配置。ConsumerdPointFactoryBean。AfterPropertieSet(ConsumerEndpointFactoryBean.java:187)位于org。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。org上的invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1612)。springframework。豆。工厂支持AbstractAutoWireAbleBeanFactory。initializeBean(AbstractAutowireCapableBeanFactory.java:1549)。。。还有12个
然而,如果我离开全局轮询器,SFTP传输将发生,作业将被触发
<int:poller default="true" fixed-delay="50"/>
更新二
如果我删除
Exception in thread "main" org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'inboundFileChannel' must be of type [org.springframework.messaging.PollableChannel], but was actually of type [org.springframework.integration.channel.DirectChannel] at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:376) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:979) at com.chofac.mint.batchintegration.SftpInboundMsgJobTriggerMain.main(SftpInboundMsgJobTriggerMain.java:16)
下面是我的配置:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern="*.*">
<int:poller max-messages-per-poll="-1" fixed-rate="45000" />
<!-- <int:poller max-messages-per-poll="-1" cron="${MEX.CRON.PATTERN}"/> -->
<!-- 0 15 10 ? * MON-FRI -->
</int-sftp:inbound-channel-adapter>
<int:channel id="inboundFileChannel"></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<!-- <int:poller default="true" cron="${MEX.CRON.PATTERN}"/> -->
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="com.chofac.mint.integration.FileMessageToJobRequest">
<property name="job" ref="responseFileReaderJob"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
我从一个主程序触发如下:
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/batchintegration/inboundSFTPJob.xml");
PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());
更新三
web上的示例配置、JUnit和Spring示例。
更新4
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern ="*.*"
local-filter="acceptAllFileListFilter">
<int:poller max-messages-per-poll="-1" fixed-rate="45000" />
<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
程序运行到一个有异常的连续循环中。我觉得轮询非常频繁(每秒),尽管我在这里为轮询器完整配置和日志指定了45秒。
outboundJobRequestChannel
是一个SubscribableChannel
,因此您不能拥有
注意bean名称中的
#0
,#1
。int-sftp:inbound channel adapter
(#0
)正确地具有轮询器;只需移除另一个。
在这种情况下,您并不真正需要全局(默认)轮询器(但它正在用于变压器)。
编辑:
(回复对您问题的评论)。
这是默认行为。默认情况下,
local过滤器
是一个AcceptOnceFileListFilter
。如果在下次轮询之前删除该文件,则可以将其更改为AcceptAllFileListFilter
。
如果要将文件保留在磁盘上,但检测到它已被更改,请使用
filesystemtempersistanceptoncefileListFilter
;由于不删除远程文件,还应将过滤器设置为
sftpersistentAcceptonFileListFilter
(实际上是包装此过滤器和其中一个模式过滤器的CompositeFileListFilter
)。
否则,您将继续获取相同的文件,并且,如果没有
preseve-time戳
,本地过滤器每次都会认为这是一个新文件。
持久过滤器使用元数据存储,并使用文件名和
lastModified
时间戳来确定过滤器是否应该接受(传递)该文件。
编辑2:
该示例只是将文件转储到队列通道中,并在
main
中接收;您订阅了一个变压器。
是这个密码。。。
PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());
这会给你带来当前的悲伤——只需从你的主屏幕上删除这些线条。
我正在使用下面的测试用例运行这个程序: 我得到这个错误: 当我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了不同的其他错误,我似乎从主要问题上转移了注意力。 最常见的建议是添加一个全局轮询器,但这会导致以下错误: 原因:java.lang.IllegalArgumentException:不应为终结点“org.SpringFramewo
我们有一个作业,它使用cron表达式在下午1点到5点之间每隔M-F轮询一次文件和数据库。在此期间,如果文件到达,它将下载文件并调用作业。这很好,我们使用了spring集成和批处理。 现在,我们需要一些定制,其中我们有多个作业,其中job1应该像上面一样轮询。一旦文件处理成功,它应该停止轮询。 第二个要求是,若在轮询期间并没有收到文件,我们希望向ops团队发送一些通知,以便他们可以采取一些行动。
如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。
我正在将Spring Boot项目与Spring批处理和数据jpa项目集成。所有与作业和数据配置相关的东西都是正确的,除了将我的作业编写器结果保存在数据库中。在我读取文件并对其进行处理后,我无法将其写入mysql数据库。没有错误,但也没有插入。有趣的是我的数据源已配置。因为在插入之前,我可以从数据库中获取示例记录。请协助我解决这个问题。 我的申请。属性: 批次配置: 道类: 作家类: temPer
如果spring集成webflux流中发生异常,则异常本身(带有stacktrace)通过MessagePublishingErrorHandler作为有效负载发送回调用方,该处理器使用来自“errorChannel”头的错误通道,而不是默认错误通道。 如何设置类似于WebExceptionHandler的错误处理程序?我想生成一个Http状态代码,并可能生成一个DefaultErrorAttri
我希望能够使用在作业配置中定义的以相同的参数(基本上是相同的文件)重新启动作业(可能是因为应用程序已经重新启动,或者由于某些原因我们再次收到了相同的文件)。 不幸的是,run.id=1没有增加,我得到一个 作业配置 多谢