我不是很理解,因为我可以更改所有集成流的错误通道。我需要处理像InvalidAccessTokenException这样的异常,它们可以在路由器内部的子流中抛出。
我所尝试的是通过以下方式处理来自默认通道“ErrorChannel”的异常:
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from("errorChannel")
.handle("errorService", "handleError")
.get();
}
此错误由具有以下签名的方法处理:
void handleError(Message<Exception> exception)
集成流的配置我解释如下:
@Configuration
@IntegrationComponentScan
public class InfrastructureConfiguration {
private static Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);
@Autowired
private IFacebookService facebookService;
@Autowired
private IInstagramService instagramService;
@Autowired
private IYoutubeService youtubeService;
/**
* The Pollers builder factory can be used to configure common bean definitions or
* those created from IntegrationFlowBuilder EIP-methods
*/
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
/**
* MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload
* which is the result of execution of a Query
*/
@Bean
@Autowired
public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
messageSource.setExpectSingleResult(false);
messageSource.setEntityClass(UserEntity.class);
messageSource.setCollectionNameExpression(new LiteralExpression("users"));
return messageSource;
}
@Bean
@ServiceActivator(inputChannel = "storeChannel")
public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
adapter.setCollectionNameExpression(new LiteralExpression("comments"));
return adapter;
}
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from("errorChannel")
.handle("errorService", "handleError")
.get();
}
@Bean
@Autowired
public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
.<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
-> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
)
.split(new AbstractMessageSplitter() {
@Override
protected Object splitMessage(Message<?> msg) {
return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
}
})
.channel("directChannel_1")
.enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
.split(new AbstractMessageSplitter() {
@Override
protected Object splitMessage(Message<?> msg) {
return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
}
})
.channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
.<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
m
-> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK,
sf -> sf.handle(SocialMediaEntity.class, (p, h) -> facebookService.getComments(p.getAccessToken())))
.subFlowMapping(SocialMediaTypeEnum.YOUTUBE,
sf -> sf.handle(SocialMediaEntity.class, (p, h) -> youtubeService.getComments(p.getAccessToken())))
.subFlowMapping(SocialMediaTypeEnum.INSTAGRAM,
sf -> sf.handle(SocialMediaEntity.class, (p, h) -> instagramService.getComments(p.getAccessToken())))
)
.channel("directChannel_2")
.aggregate()
.channel("directChannel_3")
.<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
comments.stream().flatMap(List::stream).collect(Collectors.toList()))
.aggregate()
.channel("directChannel_4")
.<List<List<CommentEntity>>, List<CommentEntity>>transform(comments ->
comments.stream().flatMap(List::stream).collect(Collectors.toList()))
.channel("storeChannel")
.get();
}
@PostConstruct
protected void init(){
Assert.notNull(facebookService, "The Facebook Service can not be null");
Assert.notNull(instagramService, "The Instagram Service can not be null");
Assert.notNull(youtubeService, "The Youtube Service can not be null");
}
}
可以在任何社交网络服务中启动的异常示例如下:
public class InvalidAccessTokenException extends RuntimeException {
private SocialMediaTypeEnum socialMediaType;
private String accessToken;
public InvalidAccessTokenException(SocialMediaTypeEnum socialMediaType, String accessToken) {
this.socialMediaType = socialMediaType;
this.accessToken = accessToken;
}
public SocialMediaTypeEnum getSocialMediaType() {
return socialMediaType;
}
public String getAccessToken() {
return accessToken;
}
}
是否可能将此异常绑定到特定的错误通道?。
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(10, TimeUnit.SECONDS)
.errorChannel("customErrorChannel")
.get();
}
以下是日志消息的节选:
2017-07-25 20:20:51.922 DEBUG 3268 --- [ taskExecutor-4] o.s.i.channel.PublishSubscribeChannel : preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e20089, accessToken=maite_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20094, correlationId=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}]
2017-07-25 20:20:51.923 DEBUG 3268 --- [ taskExecutor-5] o.s.integration.handler.LoggingHandler : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008c, accessToken=david_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 4, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20095, correlationId=254f918f-52d2-1cff-7ba5-7343a39a8941, id=3a5989df-2af7-56d0-2697-5fcaf75a2891, timestamp=1501006851527}], headers={id=c270d1ef-68ac-cb6b-245f-11e567b5b7e8, timestamp=1501006851922}]
2017-07-25 20:20:51.922 DEBUG 3268 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008f, accessToken=elena_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20096, correlationId=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}], headers={id=30a965d3-f829-d6cf-c971-7b4ed2f15f88, timestamp=1501006851922}]
2017-07-25 20:20:51.923 DEBUG 3268 --- [ taskExecutor-4] o.s.integration.handler.LoggingHandler : _org.springframework.integration.errorLogger.handler received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e20089, accessToken=maite_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 2, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20094, correlationId=6665f8ee-2bc9-e6c0-17de-35d63a0afaaa, id=6925b3ea-0b30-3304-b7f0-d235959d7db1, timestamp=1501006851466}], headers={id=6eb1789c-21d7-1814-48ee-77553abd99b9, timestamp=1501006851922}]
2017-07-25 20:20:51.927 ERROR 3268 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: nested exception is sanchez.sanchez.sergio.exception.InvalidAccessTokenException, failedMessage=GenericMessage [payload=SocialMediaEntity{id=59778bf93681ac0cc4e2008f, accessToken=elena_access_token_facebook, type=FACEBOOK, invalidToken=false}, headers={sequenceNumber=1, sequenceDetails=[[35dd7519-59cd-f0ea-69b2-c5fbb7c1c57f, 5, 5]], mongo_collectionName=users, sequenceSize=3, user-id=59778bf93681ac0cc4e20096, correlationId=1aba58bf-733e-f1ed-a82e-67f482ff3531, id=5eb9c517-f451-e2d6-938d-e436bb362aef, timestamp=1501006851549}]
at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:130)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
我尝试了两种新的方法来更改默认错误通道:
>
创建带有错误通道的自定义ErrorHandler并在PoolSpec中声明它:
错误消息仍将转到默认通道'error channel'。
>
添加MessageHeaders.error_channel标头以显式指示错误通道:
.ENRICHEADERS(S->s.HeaderExpressions(H->h.PUT(“user-id”,“payload.key”)).Header(MessageHeaders.error_channel,“CustomErrorChannel”))
如果这种方法有效,错误消息将被定向到“CustomErrorChannel”。
是的,你能做到的。类似任务有ErrorMessageExceptionTyperOuter
。因此,您将能够将InvalidAccessTokenException
路由到特定通道。
还要注意,pollerspec
可以随errorchannel()
一起提供,因此您不需要担心所有异常都转到默认的errorchannel
。
更新
.channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
ExecutorChannel
具有以下逻辑:
if (!(this.executor instanceof ErrorHandlingTaskExecutor)) {
ErrorHandler errorHandler = new MessagePublishingErrorHandler(
new BeanFactoryChannelResolver(this.getBeanFactory()));
this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler);
}
其中MessagePublishingErrorHandler
默认情况下实际上基于ErrorChannel
。这里可以做的是为TaskExecutor()
bean声明类似的bean,并将CustomErrorChannel
注入到MessagePublishingErrorHandler
中。
在此应与MessagePublishingErrorHandler
一起工作的另一个选项是ErrorChannel
定义上游的ErrorChannel
头填充。
在spring integration (Java DSL)中,如何定义一个完整流程的事务? 通过Spring集成,我们可以定义一个示例流程: 我需要一个跨度整个流程的交易。目前,当我使用“aMessage转换器”访问数据库时,事务将在处理完此消息转换器后关闭。但是我需要一个在处理“另一个消息转换器”时仍未提交的事务? 我希望只需添加一个“@Transactional”(或@Transaction
我有一个已经使用Spring集成(5.1.6最新)的应用程序。配置如下流: 和 一切正常。 如果我试图在初始化的上下文中找到bean,我会看到下一个转换器: 之后,我添加到我的pom依赖项Spring Cloud Stream 2.1.2依赖项和Kinesis Binder 1.2.0。默认情况下配置绑定。 应用程序启动,但当我试图处理现有流时,它失败了,原因如下: EL1004E:方法调用:在p
问题内容: 有一个MySQL 具有以下定义: 我想将此表的更改为。怎么做 ? 问题答案: 如果要将表和所有字符列更改为新字符集,请使用如下语句: 因此查询将是:
问题内容: 尝试更改列的数据类型并设置新的默认值时遇到以下错误: 错误1064(42000):您的SQL语法有错误;检查与您的MySQL服务器版本相对应的手册,以在第1行的’VARCHAR(255)NOT NULL SET DEFAULT’{}’‘附近使用正确的语法 问题答案: 同样的第二种可能性(感谢juergen_d):
在尝试更改列的数据类型并设置新的默认值时,我遇到以下错误: 错误1064(42000):您的SQL语法中有错误;查看与您的MySQL server版本相对应的手册,以了解第1行“varchar(255)NOT NULL SET DEFAULT”{}“附近使用的正确语法
自从我在电脑上运行了windows修复程序后,我现在遇到了一个非常烦人的问题。我的wsl远程vscode由于一些我在网上找不到的原因,无法在终端中打开当前文件夹。而是打开vscode appdata windows文件夹(/mnt/c/Users/Jonathan/AppData/Local/Programs/Microsoft VS Code),如下图所示: 我当前的工作区 在集成终端结果中打开