<bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="${host}"/>
<property name="port" value="${port}"/>
<property name="user" value="${user}"/>
<property name="password" value="${password}"/>
</bean>
<int:channel id="inboundFileChannel"><int:queue/></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:/chofac/data/mex/registry/outbox"
remote-directory="/chofac/SFTP/MEX/outbox"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern="*.txt">
<int:poller max-messages-per-poll="-1" fixed-rate="1000" />
</int-sftp:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="com.chofac.mint.integration.FileMessageToJobRequest">
<property name="job" ref="responseFileReaderJob"/>
<!-- <property name="fileParameterName" value="input.file.name"/> -->
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000"/>
</batch-int:job-launching-gateway>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
<batch:job id="responseFileReaderJob">
<batch:step id="dailyReaderJob">
<batch:tasklet>
<batch:chunk reader="dailyRRReader" writer="dailyRRDBWriter" processor="itemProcessor" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>
我正在使用下面的测试用例运行这个程序:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml"})
public class AAASftpInboundMsgJobTriggerTest {
@Resource(name="inboundFileChannel")
PollableChannel localFileChannel;
@Test
public void runDemo(){
System.out.println("Received first file message: " + localFileChannel.receive());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
我得到这个错误:
当我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了不同的其他错误,我似乎从主要问题上转移了注意力。
最常见的建议是添加一个全局轮询器,但这会导致以下错误:
<int:poller default="true" fixed-delay="50"/>
原因:java.lang.IllegalArgumentException:不应为终结点“org.SpringFramework.Integration.config.ConsumerEndpointFactoryBean#1”指定轮询器,因为“outbound JobRequestChannel”是SubscribableChannel(不可轮询)。
(我是所有这些方面的新手,Spring、Spring批处理和Spring集成)任何帮助都将非常感谢。提前道谢。
更新1
我删除了#2中的投票器,如下所示
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>
并删除了全局轮询器
<int:poller default="true" fixed-delay="50"/>
<int:poller default="true" fixed-delay="50"/>
如果删除以下异常
Exception in thread "main" org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'inboundFileChannel' must be of type [org.springframework.messaging.PollableChannel], but was actually of type [org.springframework.integration.channel.DirectChannel] at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:376) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:979) at com.chofac.mint.batchintegration.SftpInboundMsgJobTriggerMain.main(SftpInboundMsgJobTriggerMain.java:16)
下面是我的配置:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern="*.*">
<int:poller max-messages-per-poll="-1" fixed-rate="45000" />
<!-- <int:poller max-messages-per-poll="-1" cron="${MEX.CRON.PATTERN}"/> -->
<!-- 0 15 10 ? * MON-FRI -->
</int-sftp:inbound-channel-adapter>
<int:channel id="inboundFileChannel"></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<!-- <int:poller default="true" cron="${MEX.CRON.PATTERN}"/> -->
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="com.chofac.mint.integration.FileMessageToJobRequest">
<property name="job" ref="responseFileReaderJob"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
我从主程序触发此操作,如下所示:
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/batchintegration/inboundSFTPJob.xml");
PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern ="*.*"
local-filter="acceptAllFileListFilter">
<int:poller max-messages-per-poll="-1" fixed-rate="45000" />
<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
程序运行到一个有异常的连续循环中。我觉得轮询发生得非常频繁(每秒),尽管我在这里为轮询器的完整配置和日志指定了45秒。
OutboundJobRequestChannel
是一个SubscribableChannel
,因此在batch-int:job-launking-gateway上不能有
。InboundFileChannel
是一个QueueChannel
,因此它的使用者需要一个轮询器(转换器)。
请注意bean名称中的#0
、#1
。int-sftp:inbound-channel-adapter
(#0
)正确地具有轮询器;把另一个拿掉就行了。
对于这种情况,您并不真正需要全局(默认)轮询器(但它正在用于您的transformer)。
编辑:
(答复对你问题的评论)。
这是默认行为。默认情况下,local-filter
是AcceptonCEFilelistFilter
。如果在下一轮投票之前删除该文件,则可以将其更改为AcceptAllFileListFilter
。
持久筛选器使用元数据存储并使用文件名和lastmodified
时间戳来确定筛选器是否应该接受(传递)该文件。
编辑2:
该示例只是将文件转储到队列通道中,并在主
中接收;您已经订购了变压器。
PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());
我阅读了Spring集成指南和这里的示例,获得了Spring集成SFTP程序的工作示例。我已经有了一个可以工作的Spring批处理程序,它可以读取大量文件并转储到数据库中。 我现在正试图通过查看Spring文档来集成Spring批处理和Spring集成程序,我创建了以下配置。 我使用下面的测试用例运行这个程序: 我得到了这个错误: 在我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和St
我们有一个作业,它使用cron表达式在下午1点到5点之间每隔M-F轮询一次文件和数据库。在此期间,如果文件到达,它将下载文件并调用作业。这很好,我们使用了spring集成和批处理。 现在,我们需要一些定制,其中我们有多个作业,其中job1应该像上面一样轮询。一旦文件处理成功,它应该停止轮询。 第二个要求是,若在轮询期间并没有收到文件,我们希望向ops团队发送一些通知,以便他们可以采取一些行动。
如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。
我正在将Spring Boot项目与Spring批处理和数据jpa项目集成。所有与作业和数据配置相关的东西都是正确的,除了将我的作业编写器结果保存在数据库中。在我读取文件并对其进行处理后,我无法将其写入mysql数据库。没有错误,但也没有插入。有趣的是我的数据源已配置。因为在插入之前,我可以从数据库中获取示例记录。请协助我解决这个问题。 我的申请。属性: 批次配置: 道类: 作家类: temPer
如果spring集成webflux流中发生异常,则异常本身(带有stacktrace)通过MessagePublishingErrorHandler作为有效负载发送回调用方,该处理器使用来自“errorChannel”头的错误通道,而不是默认错误通道。 如何设置类似于WebExceptionHandler的错误处理程序?我想生成一个Http状态代码,并可能生成一个DefaultErrorAttri
我希望能够使用在作业配置中定义的以相同的参数(基本上是相同的文件)重新启动作业(可能是因为应用程序已经重新启动,或者由于某些原因我们再次收到了相同的文件)。 不幸的是,run.id=1没有增加,我得到一个 作业配置 多谢