Question:

Spring Batch Integration timeout in RemoteChunking

皮承基
2023-03-14

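I am trying to configure a RemoteChunking job using Spring Boot, Spring Batch, and Spring Integration.

I have set up an ActiveMQ server and started configuring Spring Batch by following the official documentation: https://docs.Spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-chunking

My master configuration: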

import com.arrobaautowired.payment.Payment;
import com.arrobaautowired.record.Record;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@Configuration
@Slf4j
@EnableBatchProcessing
public class MasterBatchConfiguration {

    private final static String MASTER_JOB_TEST = "JOB_MASTER";
    private final static String MASTER_JOB_STEP = "STEP-1";
    private final static int CHUNK_SIZE = 50;

    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;
    private MultiResourceItemReader<Record> filesReader;
    private StepListener stepListener;

    @Autowired
    public MasterBatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, MultiResourceItemReader<Record> filesReader, StepListener stepListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.filesReader = filesReader;
        this.stepListener = stepListener;
    }

    @Bean
    public Job processRecordsJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory
                .get(MASTER_JOB_TEST)
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public TaskletStep step1() {
        return stepBuilderFactory.get(MASTER_JOB_STEP)
                .<Record, Payment>chunk(CHUNK_SIZE)
                .reader(filesReader)
                .writer(itemWriter())
                .listener(stepListener)
                .build();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        factory.setTrustAllPackages(Boolean.TRUE);
        return factory;
    }

    /*
     * Configure outbound flow (requests going to workers)
     */
    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow jmsOutboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from("requests")
                .handle(Jms
                        .outboundAdapter(connectionFactory)
                        .destination("requests"))
                .get();
    }

    /*
     * Configure inbound flow (replies coming from workers)
     */
    @Bean
    public QueueChannel replies() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
                .channel(replies())
                .get();
    }

    /*
     * Configure the ChunkMessageChannelItemWriter.
     * This is a special ItemWriter, {@link ChunkMessageChannelItemWriter}, which sends the data to the pooler (external middleware) and collects the replies.
     */
    @Bean
    @StepScope
    public ChunkMessageChannelItemWriter<Payment> itemWriter() {

        ChunkMessageChannelItemWriter<Payment> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
        chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate());
        chunkMessageChannelItemWriter.setReplyChannel(replies());

        return chunkMessageChannelItemWriter;
    }

    @Bean
    public MessagingTemplate messagingTemplate() {

        MessagingTemplate messagingTemplate = new MessagingTemplate();
        messagingTemplate.setDefaultChannel(requests());
        messagingTemplate.setReceiveTimeout(2000);
        return messagingTemplate;
    }
}

My worker configuration:

import com.arrobaautowired.processor.PaymentWriter;
import com.arrobaautowired.processor.ComplexRecordProcessor;
import com.arrobaautowired.processor.SimpleRecordProcessor;
import com.arrobaautowired.record.Record;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;


@Configuration
@IntegrationComponentScan
@EnableIntegration
public class WorkerBatchConfiguration {

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        factory.setTrustAllPackages(Boolean.TRUE);
        return factory;
    }

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow jmsIn() {
        return IntegrationFlows
                .from(Jms
                        .messageDrivenChannelAdapter(connectionFactory())
                        .configureListenerContainer(c -> c.subscriptionDurable(false))
                        .destination("requests"))
                .channel(requests())
                .get();
    }

    @Bean
    public IntegrationFlow outgoingReplies() {
        return IntegrationFlows
                .from("replies")
                .handle(Jms
                        .outboundGateway(connectionFactory())
                        .requestDestination("replies"))
                .get();
    }

    @Bean
    @ServiceActivator(inputChannel = "requests", outputChannel = "replies", sendTimeout = "10000")
    public ChunkProcessorChunkHandler<Record> chunkProcessorChunkHandler() {
        ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler();
        chunkProcessorChunkHandler.setChunkProcessor(new SimpleChunkProcessor(recordProcessor(), paymentWriter()));
        return chunkProcessorChunkHandler;
    }

    @Bean
    public SimpleRecordProcessor recordProcessor() {
        return new SimpleRecordProcessor();
    }

    @Bean
    public PaymentWriter paymentWriter() {
        return new PaymentWriter();
    }

}

Log output on the worker, ending in the exception:

2018-09-17 13:15:21.509 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              : 

FINALIZADO CHUNK  ============================


2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-4] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Culver Gapper, bic=ES08 9240 0446 6617 7749 9525, amount=75189, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Burgess Feldbau, bic=ES62 6361 1904 4990 0753 3877, amount=null, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-0] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Brenda Waddell, bic=ES23 4535 5585 5095 5691 1491, amount=28353, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-6] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Brittaney Bliben, bic=ES88 3076 6115 5504 4561 1796, amount=86995, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-0] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Hortensia Willshee, bic=ES62 7020 0819 9813 3352 2742, amount=null, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-4] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Roselin Maccrie, bic=ES34 5876 6541 1999 9568 8714, amount=29865, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Jonathan Parlet, bic=ES74 5605 5066 6941 1376 6204, amount=null, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Ilise Semiras, bic=ES59 4689 9344 4052 2235 5296, amount=10698, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-5] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Audrey Lempenny, bic=ES45 2456 6470 0023 3823 3629, amount=56543, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-2] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Hayyim Fetter, bic=ES76 5134 4202 2267 7072 2547, amount=14662, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              : 

==============================================


2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] o.s.integration.channel.DirectChannel    : preSend on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=0, stepContribution=[StepContribution: read=0, written=10, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={jms_redelivered=true, JMSXDeliveryCount=2, jms_destination=queue://requests, id=16120698-22b4-c615-502b-7c6d050d82c6, priority=4, jms_timestamp=1537182878228, jms_messageId=ID:mbp-de-jose.neoris.cxnetworks.net-59228-1537182877872-1:2:1:1:1, timestamp=1537182921510}]
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] o.s.integration.jms.JmsOutboundGateway   : outgoingReplies.org.springframework.integration.jms.JmsOutboundGateway#0 received message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=0, stepContribution=[StepContribution: read=0, written=10, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={jms_redelivered=true, JMSXDeliveryCount=2, jms_destination=queue://requests, id=16120698-22b4-c615-502b-7c6d050d82c6, priority=4, jms_timestamp=1537182878228, jms_messageId=ID:mbp-de-jose.neoris.cxnetworks.net-59228-1537182877872-1:2:1:1:1, timestamp=1537182921510}]
2018-09-17 13:15:21.514 DEBUG 75729 --- [erContainer#0-1] o.s.integration.jms.JmsOutboundGateway   : ReplyTo: temp-queue://ID:mbp-de-jose.neoris.cxnetworks.net-58907-1537181494749-3:24:1
2018-09-17 13:15:26.536  WARN 75729 --- [erContainer#0-1] o.s.j.l.DefaultMessageListenerContainer  : Execution of JMS message listener failed, and no ErrorHandler has been set.

org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
    at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:762) ~[spring-integration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:150) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:142) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:415) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:511) ~[spring-integration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:341) ~[spring-integration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

1 answer

禄和宜
2023-03-14

I think your problem is on the worker side:

@Bean
public IntegrationFlow outgoingReplies() {
    return IntegrationFlows
            .from("replies")
            .handle(Jms
                    .outboundGateway(connectionFactory())
                    .requestDestination("replies"))
            .get();
}

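Here you only send replies; you do not expect anything back from the master. A JMS outbound gateway, however, is request/reply: after sending, it waits for a response on the temporary ReplyTo queue shown in your log, and since the master never answers, it fails after the 5000 ms timeout.

So this must be one-way: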

Jms
  .outboundAdapter(connectionFactory())
  .destination("replies")
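
To make the difference concrete, here is a minimal plain-Java sketch. It is only an analogy and does not use Spring Integration or JMS: the class `GatewayVsAdapterDemo`, its two methods, and the in-memory queues are hypothetical stand-ins I made up for illustration. The "gateway" method sends and then blocks waiting for a response that nobody produces; the "adapter" method just sends.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class GatewayVsAdapterDemo {

    /** Hypothetical stand-in for the "replies" destination that the master consumes. */
    public static final BlockingQueue<String> repliesQueue = new LinkedBlockingQueue<>();

    /**
     * Request/reply send, loosely modelled on an outbound gateway:
     * publish the message, then block on a private reply queue.
     * Returns false when no response arrives within the timeout.
     */
    public static boolean sendViaGateway(String chunkResponse, long timeoutMillis) {
        BlockingQueue<String> temporaryReplyQueue = new LinkedBlockingQueue<>();
        repliesQueue.add(chunkResponse);
        try {
            // The master only consumes chunk responses and never answers them,
            // so nothing is ever put on temporaryReplyQueue and poll() times out.
            String response = temporaryReplyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return response != null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** One-way send, loosely modelled on an outbound channel adapter: no waiting. */
    public static boolean sendViaAdapter(String chunkResponse) {
        return repliesQueue.add(chunkResponse);
    }

    public static void main(String[] args) {
        System.out.println("gateway got a response: "
                + sendViaGateway("ChunkResponse sequence=0", 200));
        System.out.println("adapter sent: "
                + sendViaAdapter("ChunkResponse sequence=1"));
    }
}
```

In both cases the message reaches the queue; the only difference is that the request/reply variant then waits for an answer and reports a failure when none comes, which matches the MessageTimeoutException in your stack trace.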