Question:

What is the correct way to advise a failing transform with the Spring Integration Java DSL?

秦景同
2023-03-14

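I have a "happy path" working (below).

How do I advise the .transform calls so that a failure invokes an error flow (via errorChannel) without interrupting mainFlow?

Currently, mainFlow terminates at the first failure in the second .transform (when the payload cannot be deserialized into the type). The behavior I want is to log the failure and continue processing.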
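Independent of Spring Integration, the "log and continue" behavior asked for here can be illustrated in plain Java: each record is transformed inside a try/catch, a failure is handed to an error handler, and the loop moves on to the next record. This is only a sketch of the desired semantics; `LogAndContinue` and `transformAll` are hypothetical names, not Spring Integration APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class LogAndContinue {

    /**
     * Applies the transformer to every input. A failing input is reported to
     * errorHandler and skipped; the remaining inputs are still processed.
     */
    static <I, O> List<O> transformAll(List<I> inputs,
                                       Function<I, O> transformer,
                                       Consumer<Exception> errorHandler) {
        List<O> results = new ArrayList<>();
        for (I input : inputs) {
            try {
                results.add(transformer.apply(input));
            } catch (RuntimeException e) {
                // Side channel for failures: the main loop is not interrupted.
                errorHandler.accept(e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Exception> errors = new ArrayList<>();
        List<Integer> parsed =
                transformAll(Arrays.asList("1", "x", "3"), Integer::parseInt, errors::add);
        // prints "[1, 3] with 1 failure(s)"
        System.out.println(parsed + " with " + errors.size() + " failure(s)");
    }
}
```

In the flow itself, the error handler role would be played by whatever is subscribed to the error channel, and the try/catch role by the advice attached to the transformer endpoint.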

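I have read about ExpressionEvaluatingRequestHandlerAdvice. Would I add a second argument to each .transform call, along the lines of e -> e.advice(...)?

Following some direction in the comments, I updated the original code sample. But I am still struggling to get this "all the way home".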


2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG handler 'ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
 at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
 at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'mainFlow.channel#3', message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.aggregator.AggregatingMessageHandler DEBUG org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] com.xxx.DataMigrationModule$ErrorService ERROR org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:48)
    at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Null correlation not allowed.  Maybe the CorrelationStrategy is failing?
    at org.springframework.util.Assert.state(Assert.java:385)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:369)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    ... 22 more

Update (09-08-2015)

Code sample

@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(
                    amazonS3InboundSynchronizationMessageSource(),
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime))
            )
            .transform(unzipTransformer())
            .split(f -> new FileSplitter())
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .transform(Transformers.fromJson(persistentType()), e -> e.advice(handlingAdvice()))
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> 
                            a.releaseStrategy(g -> g.size() == persistenceBatchSize)
                            .expireGroupsUponCompletion(true)
                            .sendPartialResultOnExpiry(true)
                            .groupTimeoutExpression("size() ge 2 ? 10000 : -1")
                            , null
            )
            .handle(jdbcRepositoryHandler())
            // TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
            .get();
    // @formatter:on
}

@Bean
public ErrorService errorService() {
    return new ErrorService();
}

@Bean
public MessageChannel customErrorChannel() {
    return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow customErrorFlow() {
    // @formatter:off
    return IntegrationFlows
            .from(customErrorChannel())
            .handle("errorService", "handleError")
            .get();
    // @formatter:on
}

@Bean
ExpressionEvaluatingRequestHandlerAdvice handlingAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnFailureExpression("payload");
    advice.setFailureChannel(customErrorChannel());
    advice.setReturnFailureExpressionResult(true);
    advice.setTrapException(true);
    return advice;
}

protected class ErrorService implements ErrorHandler {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Override
    public void handleError(Throwable t) {
        stopEndpoints(t);
    }

    private void stopEndpoints(Throwable t) {
        log.error(ExceptionUtils.getStackTrace(t));
    }

}

2 answers

华哲茂
2023-03-14

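Yes, you are right. To advise the handle() method of the Transformer's MessageHandler, you should use the e.advice method on the second parameter of the .transform() EIP method. And yes: you should define an ExpressionEvaluatingRequestHandlerAdvice bean for this purpose.

You can reuse that advice bean for different targets to handle successes and failures in the same way.

UPDATE

Although it is not clear to me how you would like to continue the flow with a bad message, you can use the onFailureExpression and returnFailureExpressionResult=true of the ExpressionEvaluatingRequestHandlerAdvice to return something after the unzipErrorChannel().

By the way, the failureChannel logic does not work without an onFailureExpression: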

if (this.onFailureExpression != null) {
    Object evalResult = this.evaluateFailureExpression(message, actualException);
    if (this.returnFailureExpressionResult) {
        return evalResult;
    }
}
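To make the interplay of these settings concrete, here is a plain-Java model of the failure branch. It is a simplified illustration built around the snippet above, not the framework's actual class: `AdviceModel`, its fields, and `invoke` are hypothetical, a list stands in for the failure channel, and the handling of `trapException` is an assumption about how that flag behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class AdviceModel {

    Function<Object, Object> onFailureExpression; // null means "not configured"
    boolean returnFailureExpressionResult;
    boolean trapException;
    final List<Object> failureChannel = new ArrayList<>(); // stand-in for a MessageChannel

    /** Invokes the handler; on failure, applies the modeled advice logic. */
    Object invoke(Function<Object, Object> handler, Object payload) {
        try {
            return handler.apply(payload);
        } catch (RuntimeException actualException) {
            if (onFailureExpression != null) {
                // Only in this branch does anything reach the failure channel.
                Object evalResult = onFailureExpression.apply(payload);
                failureChannel.add(actualException);
                if (returnFailureExpressionResult) {
                    return evalResult; // becomes the handler's "reply"
                }
            }
            if (!trapException) {
                throw actualException; // assumption: untrapped failures propagate
            }
            return null; // trapped and no result: nothing continues downstream
        }
    }

    public static void main(String[] args) {
        AdviceModel advice = new AdviceModel();
        advice.trapException = true;
        Function<Object, Object> failing = p -> { throw new IllegalStateException("boom"); };

        advice.invoke(failing, "record");
        System.out.println(advice.failureChannel.size()); // prints 0: no expression, no failure message

        advice.onFailureExpression = p -> p; // models the "payload" expression
        advice.returnFailureExpressionResult = true;
        Object result = advice.invoke(failing, "record");
        System.out.println(result + " " + advice.failureChannel.size()); // prints "record 1"
    }
}
```

With the failure expression set to payload and returnFailureExpressionResult set to true, as in handlingAdvice() from the question, the untransformed payload is what continues downstream. That appears consistent with the log in the question, where the GZIPInputStream payload arrives at the aggregator.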

方坚壁
2023-03-14

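It turns out I had things wrong in a few places:

  • I had to autowire a Jackson 2 ObjectMapper (which I get from Spring Boot auto-config) and construct a JsonObjectMapper instance to pass as the second argument to Transformers.fromJson. This makes unmarshalling to the persistent type more lenient (it stops the UnrecognizedPropertyException), so there is no need for ExpressionEvaluatingRequestHandlerAdvice.

  • To employ the FileSplitter, I had to choose the appropriate variant of .split; the flow below uses .split(new FileSplitter(), null) on the InputStream payload.

  • I moved the transform, aggregate and handle steps into their own flow, subscribed to a pub-sub channel that uses an async task executor.

Still not 100% of what I need, but much further along.

See below for what I ended up with...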

    @Configuration
    @EnableIntegration
    @IntegrationComponentScan
    public class DataMigrationModule {
    
    private final Logger log = LoggerFactory.getLogger(getClass());
    
    @Value("${cloud.aws.credentials.accessKey}")
    private String accessKey;
    
    @Value("${cloud.aws.credentials.secretKey}")
    private String secretKey;
    
    @Value("${cloud.aws.s3.bucket}")
    private String bucket;
    
    @Value("${cloud.aws.s3.max-objects-per-batch:1024}")
    private int maxObjectsPerBatch;
    
    @Value("${cloud.aws.s3.accept-subfolders:false}")
    private String acceptSubFolders;
    
    @Value("${cloud.aws.s3.remote-directory}")
    private String remoteDirectory;
    
    @Value("${cloud.aws.s3.local-directory-ref:java.io.tmpdir}")
    private String localDirectoryRef;
    
    @Value("${cloud.aws.s3.local-subdirectory:target/s3-dump}")
    private String localSubdirectory;
    
    @Value("${cloud.aws.s3.filename-wildcard:}")
    private String fileNameWildcard;
    
    @Value("${app.persistent-type:}")
    private String persistentType;
    
    @Value("${app.repository-type:}")
    private String repositoryType;
    
    @Value("${app.persistence-batch-size:2500}")
    private int persistenceBatchSize;
    
    @Value("${app.persistence-batch-release-timeout-in-milliseconds:5000}")
    private int persistenceBatchReleaseTimeoutMillis;
    
    @Autowired
    private ListableBeanFactory beanFactory;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    private final AtomicBoolean invoked = new AtomicBoolean();
    
    private Class<?> repositoryType() {
        try {
            return Class.forName(repositoryType);
        } catch (ClassNotFoundException cnfe) {
            log.error("Unknown repository implementation!", cnfe);
            System.exit(0);
        }
        return null;
    }
    
    private Class<?> persistentType() {
        try {
            return Class.forName(persistentType);
        } catch (ClassNotFoundException cnfe) {
            log.error("Unsupported type!", cnfe);
            System.exit(0);
        }
        return null;
    }
    
    public Date nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : new Date();
    }
    
    @Bean
    public FileToInputStreamTransformer unzipTransformer() {
        FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
        transformer.setDeleteFiles(true);
        return transformer;
    }
    
    @Bean
    public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
        AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
        AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
        messageSource.setCredentials(credentials);
        messageSource.setBucket(bucket);
        messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
        messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
        messageSource.setRemoteDirectory(remoteDirectory);
        if (!fileNameWildcard.isEmpty()) {
            messageSource.setFileNameWildcard(fileNameWildcard);
        }
        String directory = System.getProperty(localDirectoryRef);
        if (!localSubdirectory.startsWith("/")) {
            localSubdirectory = "/" + localSubdirectory;
        }
        if (!localSubdirectory.endsWith("/")) {
            localSubdirectory = localSubdirectory + "/";
        }
        directory = directory + localSubdirectory;
        FileUtils.mkdir(directory);
        messageSource.setDirectory(new LiteralExpression(directory));
        return messageSource;
    }
    
    @Bean
    public IntegrationFlow mainFlow() {
        // @formatter:off
        return IntegrationFlows
                .from(
                        amazonS3InboundSynchronizationMessageSource(),
                        e -> e.poller(p -> p.trigger(this::nextExecutionTime))
                )
                .transform(unzipTransformer())
                .split(new FileSplitter(), null)
                .publishSubscribeChannel(new SimpleAsyncTaskExecutor(), p -> p.subscribe(persistenceSubFlow()))
                .get();
        // @formatter:on
    }
    
    @Bean
    public IntegrationFlow persistenceSubFlow() {
        JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
        ReleaseStrategy releaseStrategy = new TimeoutCountSequenceSizeReleaseStrategy(persistenceBatchSize,
                persistenceBatchReleaseTimeoutMillis);
        // @formatter:off
        return f -> f
                .transform(Transformers.fromJson(persistentType(), jsonObjectMapper))
                // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
                .aggregate(
                        a -> a
                            .releaseStrategy(releaseStrategy)
                            .correlationStrategy(m -> m.getHeaders().get("id"))
                            .expireGroupsUponCompletion(true)
                            .sendPartialResultOnExpiry(true)
                            , null
                )
                .handle(jdbcRepositoryHandler());
        // @formatter:on
    }
    
    @Bean
    public JdbcRepositoryHandler jdbcRepositoryHandler() {
        return new JdbcRepositoryHandler(repositoryType(), beanFactory);
    }
    
    protected class JdbcRepositoryHandler extends AbstractMessageHandler {
    
        @SuppressWarnings("rawtypes")
        private Insertable repository;
    
        public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
            repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
        }
    
        @Override
        protected void handleMessageInternal(Message<?> message) {
            repository.insert((List<?>) message.getPayload());
        }
    
    }
    
    protected class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
    
        @Override
        protected InputStream transformFile(File payload) throws Exception {
            return new GZIPInputStream(new FileInputStream(payload));
        }
    }
    
    }
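The aggregate step in the configuration above batches records before they are handed to jdbcRepositoryHandler(). As a rough, framework-free illustration of count-based batching with a final flush of the partial batch (loosely analogous to a size threshold combined with sendPartialResultOnExpiry), consider the sketch below. `BatchingSketch`, `add` and `flush` are hypothetical names, and the timeout-based release used above is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchingSketch<T> {

    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> pending = new ArrayList<>();

    BatchingSketch(int batchSize, Consumer<List<T>> sink) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one item and releases a batch once the threshold is reached. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Releases whatever is buffered, even if it is a partial batch. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending)); // hand over a copy, then reset
        pending.clear();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        BatchingSketch<Integer> batcher = new BatchingSketch<>(2, batches::add);
        for (int i = 1; i <= 5; i++) {
            batcher.add(i);
        }
        batcher.flush(); // emit the trailing partial batch
        System.out.println(batches); // prints [[1, 2], [3, 4], [5]]
    }
}
```

The sink plays the role of the handler that inserts a whole list at once, as JdbcRepositoryHandler does with the List payload.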
    
