当前位置: 首页 > 知识库问答 >
问题:

如何在Spring与Java DSL集成流中更改默认通道

季华茂
2023-03-14

我不是很理解,因为我可以更改所有集成流的错误通道。我需要处理像InvalidAccessTokenException这样的异常,它们可以在路由器内部的子流中抛出。

我所尝试的是通过以下方式处理来自默认通道“ErrorChannel”的异常:

@Bean
public IntegrationFlow errorFlow() {
   return IntegrationFlows.from("errorChannel")
       .handle("errorService", "handleError")
       .get();
}

此错误由具有以下签名的方法处理:

void handleError(Message<Exception> exception)

集成流的配置我解释如下:

@Configuration
@IntegrationComponentScan
public class InfrastructureConfiguration {

    private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);

    @Autowired
    private IFacebookService facebookService;

    @Autowired
    private IInstagramService instagramService;

    @Autowired
    private IYoutubeService youtubeService;

    /**
     * The Pollers builder factory can be used to configure common bean definitions or 
     * those created from IntegrationFlowBuilder EIP-methods
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    /**
     * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload 
     * which is the result of execution of a Query
     */
    @Bean
    @Autowired
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
        MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
        messageSource.setExpectSingleResult(false);
        messageSource.setEntityClass(UserEntity.class);
        messageSource.setCollectionNameExpression(new LiteralExpression("users"));
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "storeChannel")
    public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
        MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
        adapter.setCollectionNameExpression(new LiteralExpression("comments"));
        return adapter;
    }

    @Bean
    public IntegrationFlow errorFlow() {
        return IntegrationFlows.from("errorChannel")
                .handle("errorService", "handleError")
                .get();
    }


    @Bean
    @Autowired
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
        return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
                        -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
                )
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
                    }
                })
                .channel("directChannel_1")
                .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
                    }
                })
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
                        m
                        -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, 
                                sf -> sf.handle(SocialMediaEntity.class, (p, h) -> facebookService.getComments(p.getAccessToken())))
                            .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, 
                                sf -> sf.handle(SocialMediaEntity.class, (p, h) -> youtubeService.getComments(p.getAccessToken())))
                            .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, 
                                sf -> sf.handle(SocialMediaEntity.class, (p, h) -> instagramService.getComments(p.getAccessToken())))
                )
                .channel("directChannel_2")
                .aggregate()
                .channel("directChannel_3")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .aggregate()
                .channel("directChannel_4")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .channel("storeChannel")
                .get();
    }


    @PostConstruct
    protected void init(){
        Assert.notNull(facebookService, "The Facebook Service can not be null");
        Assert.notNull(instagramService, "The Instagram Service can not be null");
        Assert.notNull(youtubeService, "The Youtube Service can not be null");
    }

}

可以在任何社交网络服务中启动的异常示例如下:

public class InvalidAccessTokenException extends RuntimeException {

    private SocialMediaTypeEnum socialMediaType;
    private String accessToken;

    public InvalidAccessTokenException(SocialMediaTypeEnum socialMediaType, String accessToken) {
        this.socialMediaType = socialMediaType;
        this.accessToken = accessToken;
    }

    public SocialMediaTypeEnum getSocialMediaType() {
        return socialMediaType;
    }

    public String getAccessToken() {
        return accessToken;
    }
}

是否可能将此异常绑定到特定的错误通道?。

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedDelay(10, TimeUnit.SECONDS)
          .errorChannel("customErrorChannel")
         .get();
}

以下是日志消息的节选:

2017-07-25 20:20:51.922 DEBUG 3268 --- [ taskExecutor-4] o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e20089, accessToken=maite_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20094, correlationId=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}]
2017-07-25 20:20:51.923 DEBUG 3268 --- [ taskExecutor-5] o.s.integration.handler.LoggingHandler   : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008c, accessToken=david_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 4, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20095, correlationId=254f918f-52d2-1cff-7ba5-7343a39a8941, id=3a5989df-2af7-56d0-2697-5fcaf75a2891, timestamp=1501006851527}], headers={id=c270d1ef-68ac-cb6b-245f-11e567b5b7e8, timestamp=1501006851922}]
2017-07-25 20:20:51.922 DEBUG 3268 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler   : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008f, accessToken=elena_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20096, correlationId=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}], headers={id=30a965d3-f829-d6cf-c971-7b4ed2f15f88, timestamp=1501006851922}]
2017-07-25 20:20:51.923 DEBUG 3268 --- [ taskExecutor-4] o.s.integration.handler.LoggingHandler   : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e20089, accessToken=maite_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20094, correlationId=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}]
2017-07-25 20:20:51.927 ERROR 3268 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008f, accessToken=elena_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20096, correlationId=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}]
    at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:130)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)

我尝试了两种新的方法来更改默认错误通道:

>

  • 创建带有错误通道的自定义ErrorHandler并在PoolSpec中声明它:

    错误消息仍将转到默认通道'error channel'。

    >

  • 添加MessageHeaders.error_channel标头以显式指示错误通道:

    .ENRICHEADERS(S->s.HeaderExpressions(H->h.PUT(“user-id”,“payload.key”)).Header(MessageHeaders.error_channel,“CustomErrorChannel”))

    如果这种方法有效,错误消息将被定向到“CustomErrorChannel”。

  • 共有1个答案

    鞠鸿雪
    2023-03-14

    是的,你能做到的。类似任务有ErrorMessageExceptionTyperOuter。因此,您将能够将InvalidAccessTokenException路由到特定通道。

    还要注意,pollerspec可以随errorchannel()一起提供,因此您不需要担心所有异常都转到默认的errorchannel

    更新

    .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
    

    ExecutorChannel具有以下逻辑:

        if (!(this.executor instanceof ErrorHandlingTaskExecutor)) {
            ErrorHandler errorHandler = new MessagePublishingErrorHandler(
                    new BeanFactoryChannelResolver(this.getBeanFactory()));
            this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler);
        }
    

    其中MessagePublishingErrorHandler默认情况下实际上基于ErrorChannel。这里可以做的是为TaskExecutor()bean声明类似的bean,并将CustomErrorChannel注入到MessagePublishingErrorHandler中。

    在此应与MessagePublishingErrorHandler一起工作的另一个选项是ErrorChannel定义上游的ErrorChannel头填充。

     类似资料:
    • 在spring integration (Java DSL)中,如何定义一个完整流程的事务? 通过Spring集成,我们可以定义一个示例流程: 我需要一个跨度整个流程的交易。目前,当我使用“aMessage转换器”访问数据库时,事务将在处理完此消息转换器后关闭。但是我需要一个在处理“另一个消息转换器”时仍未提交的事务? 我希望只需添加一个“@Transactional”(或@Transaction

    • 我有一个已经使用Spring集成(5.1.6最新)的应用程序。配置如下流: 和 一切正常。 如果我试图在初始化的上下文中找到bean,我会看到下一个转换器: 之后,我添加到我的pom依赖项Spring Cloud Stream 2.1.2依赖项和Kinesis Binder 1.2.0。默认情况下配置绑定。 应用程序启动,但当我试图处理现有流时,它失败了,原因如下: EL1004E:方法调用:在p

    • 问题内容: 有一个MySQL 具有以下定义: 我想将此表的更改为。怎么做 ? 问题答案: 如果要将表和所有字符列更改为新字符集,请使用如下语句: 因此查询将是:

    • 问题内容: 尝试更改列的数据类型并设置新的默认值时遇到以下错误: 错误1064(42000):您的SQL语法有错误;检查与您的MySQL服务器版本相对应的手册,以在第1行的’VARCHAR(255)NOT NULL SET DEFAULT’{}’‘附近使用正确的语法 问题答案: 同样的第二种可能性(感谢juergen_d):

    • 在尝试更改列的数据类型并设置新的默认值时,我遇到以下错误: 错误1064(42000):您的SQL语法中有错误;查看与您的MySQL server版本相对应的手册,以了解第1行“varchar(255)NOT NULL SET DEFAULT”{}“附近使用的正确语法

    • 自从我在电脑上运行了windows修复程序后,我现在遇到了一个非常烦人的问题。我的wsl远程vscode由于一些我在网上找不到的原因,无法在终端中打开当前文件夹。而是打开vscode appdata windows文件夹(/mnt/c/Users/Jonathan/AppData/Local/Programs/Microsoft VS Code),如下图所示: 我当前的工作区 在集成终端结果中打开