我已经完成了一条“快乐之路”(如下)。
如何向提供建议。转换
调用以使其调用错误流(通过errorChannel),而不中断主流
?
当前,mainFlow
在第二个中出现第一个故障时终止。转换(当有效负载无法反序列化为类型时)。我想要的行为是我想记录并继续处理。
我已经读过关于
表达式评估RequestHandlerAdvice
。我是否可以为每个添加第二个参数。转换
call likee-
在一些注释方向上,我更新了原始代码示例。但我还是很难“一路回家”。
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG handler 'ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0)' produced no reply for request Message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException: Handler Failed; nested exception is org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "hasDoaCostPriceChanged" (class com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog), not marked as ignorable (18 known properties: "supplierUpdateDate", "fPLOSMaskArrival", "createDate", "endAllowed", "sellStateId", "ratePlanLevel", "ratePlanId", "startAllowed", "stayDate", "doaCostPriceChanged", "hotelId", "logActionTypeId" [truncated]])
at [Source: java.util.zip.GZIPInputStream@242017b8; line: 1, column: 32] (through reference chain: com.xxx.changehistory.jdbc.data.RatePlanLevelRestrictionLog["hasDoaCostPriceChanged"]), headers={id=c054d976-5750-827f-8894-51aba9655c77, timestamp=1441738159660}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'mainFlow.channel#3', message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,664 [pool-3-thread-1] org.springframework.integration.aggregator.AggregatingMessageHandler DEBUG org.springframework.integration.aggregator.AggregatingMessageHandler#0 received message: GenericMessage [payload=java.util.zip.GZIPInputStream@242017b8, headers={id=b80106f9-7f4c-1b92-6aca-6e73d3bf8792, timestamp=1441738159664}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.channel.DirectChannel DEBUG preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] org.springframework.integration.handler.ServiceActivatingHandler DEBUG ServiceActivator for [org.springframework.integration.dsl.support.BeanNameMessageProcessor@5f3839ad] (org.springframework.integration.handler.ServiceActivatingHandler#0) received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?, headers={id=24e3a1c7-af6b-032c-6a29-b55031fba0d7, timestamp=1441738159665}]
2015-09-08 11:49:19,665 [pool-3-thread-1] com.xxx.DataMigrationModule$ErrorService ERROR org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.aggregator.AggregatingMessageHandler#0]; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:48)
at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?
at org.springframework.util.Assert.state(Assert.java:385)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:369)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 22 more
更新(09-08-2015)
代码示例
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(
amazonS3InboundSynchronizationMessageSource(),
e -> e.poller(p -> p.trigger(this::nextExecutionTime))
)
.transform(unzipTransformer())
.split(f -> new FileSplitter())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.transform(Transformers.fromJson(persistentType()), , e -> e.advice(handlingAdvice()))
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(a ->
a.releaseStrategy(g -> g.size() == persistenceBatchSize)
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
.groupTimeoutExpression("size() ge 2 ? 10000 : -1")
, null
)
.handle(jdbcRepositoryHandler())
// TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
.get();
// @formatter:on
}
@Bean
public ErrorService errorService() {
return new ErrorService();
}
@Bean
public MessageChannel customErrorChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow customErrorFlow() {
// @formatter:off
return IntegrationFlows
.from(customErrorChannel())
.handle("errorService", "handleError")
.get();
// @formatter:on
}
@Bean
ExpressionEvaluatingRequestHandlerAdvice handlingAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnFailureExpression("payload");
advice.setFailureChannel(customErrorChannel());
advice.setReturnFailureExpressionResult(true);
advice.setTrapException(true);
return advice;
}
protected class ErrorService implements ErrorHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handleError(Throwable t) {
stopEndpoints(t);
}
private void stopEndpoints(Throwable t) {
log.error(ExceptionUtils.getStackTrace(t));
}
}
是的,你说得对。对于advice
Transformer的MessageHandler的
handle()
方法,您应该使用的第二个参数的
EIP方法。是的:您应该为自己的目的定义e.advice
方法。transform()expressionevaluationrequesthandleradvice
bean。
您可以为不同的目标重用
Advice
bean,以相同的方式处理成功和失败。
使现代化
虽然我不清楚您希望如何使用错误的消息继续流,但您可以使用
expressionevaluationrequesthandleradvice的
onFailureExpression
和returnFailureExpressionResult=true
在unzipErrorChannel()
之后返回一些内容。
顺便说一句,
failureChannel
逻辑在没有onFailureExpression
的情况下无法工作:
if (this.onFailureExpression != null) {
Object evalResult = this.evaluateFailureExpression(message, actualException);
if (this.returnFailureExpressionResult) {
return evalResult;
}
}
结果发现我在一些地方做错了,比如:
>
我必须自动连接一个Jackson2ObjectMapper
(我从Sprint Boot auto-config获得),并构造一个JsonObjectMapper
的实例,作为Transformers中的第二个参数添加。fromJson
;用于更轻松地对持久类型进行解组(停止无法识别的属性异常
);因此,无需使用
表达式评估RequestHandlerAdvice
为了使用FileSplitter,您可以在[code]中选择[code]的适当变体。>InputStream
移动
转换
,聚合
,处理
到自己的pubsub通道,使用异步任务执行器
仍然不是我需要的100%,但还远着呢。
看看下面我的结局。。。
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class DataMigrationModule {
private final Logger log = LoggerFactory.getLogger(getClass());
@Value("${cloud.aws.credentials.accessKey}")
private String accessKey;
@Value("${cloud.aws.credentials.secretKey}")
private String secretKey;
@Value("${cloud.aws.s3.bucket}")
private String bucket;
@Value("${cloud.aws.s3.max-objects-per-batch:1024}")
private int maxObjectsPerBatch;
@Value("${cloud.aws.s3.accept-subfolders:false}")
private String acceptSubFolders;
@Value("${cloud.aws.s3.remote-directory}")
private String remoteDirectory;
@Value("${cloud.aws.s3.local-directory-ref:java.io.tmpdir}")
private String localDirectoryRef;
@Value("${cloud.aws.s3.local-subdirectory:target/s3-dump}")
private String localSubdirectory;
@Value("${cloud.aws.s3.filename-wildcard:}")
private String fileNameWildcard;
@Value("${app.persistent-type:}")
private String persistentType;
@Value("${app.repository-type:}")
private String repositoryType;
@Value("${app.persistence-batch-size:2500}")
private int persistenceBatchSize;
@Value("${app.persistence-batch-release-timeout-in-milliseconds:5000}")
private int persistenceBatchReleaseTimeoutMillis;
@Autowired
private ListableBeanFactory beanFactory;
@Autowired
private ObjectMapper objectMapper;
private final AtomicBoolean invoked = new AtomicBoolean();
private Class<?> repositoryType() {
try {
return Class.forName(repositoryType);
} catch (ClassNotFoundException cnfe) {
log.error("Unknown repository implementation!", cnfe);
System.exit(0);
}
return null;
}
private Class<?> persistentType() {
try {
return Class.forName(persistentType);
} catch (ClassNotFoundException cnfe) {
log.error("Unsupported type!", cnfe);
System.exit(0);
}
return null;
}
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Bean
public FileToInputStreamTransformer unzipTransformer() {
FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
transformer.setDeleteFiles(true);
return transformer;
}
@Bean
public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
messageSource.setCredentials(credentials);
messageSource.setBucket(bucket);
messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
messageSource.setRemoteDirectory(remoteDirectory);
if (!fileNameWildcard.isEmpty()) {
messageSource.setFileNameWildcard(fileNameWildcard);
}
String directory = System.getProperty(localDirectoryRef);
if (!localSubdirectory.startsWith("/")) {
localSubdirectory = "/" + localSubdirectory;
}
if (!localSubdirectory.endsWith("/")) {
localSubdirectory = localSubdirectory + "/";
}
directory = directory + localSubdirectory;
FileUtils.mkdir(directory);
messageSource.setDirectory(new LiteralExpression(directory));
return messageSource;
}
@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(
amazonS3InboundSynchronizationMessageSource(),
e -> e.poller(p -> p.trigger(this::nextExecutionTime))
)
.transform(unzipTransformer())
.split(new FileSplitter(), null)
.publishSubscribeChannel(new SimpleAsyncTaskExecutor(), p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
@Bean
public IntegrationFlow persistenceSubFlow() {
JsonObjectMapper<?, ?> jsonObjectMapper = new Jackson2JsonObjectMapper(objectMapper);
ReleaseStrategy releaseStrategy = new TimeoutCountSequenceSizeReleaseStrategy(persistenceBatchSize,
persistenceBatchReleaseTimeoutMillis);
// @formatter:off
return f -> f
.transform(Transformers.fromJson(persistentType(), jsonObjectMapper))
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(
a -> a
.releaseStrategy(releaseStrategy)
.correlationStrategy(m -> m.getHeaders().get("id"))
.expireGroupsUponCompletion(true)
.sendPartialResultOnExpiry(true)
, null
)
.handle(jdbcRepositoryHandler());
// @formatter:on
}
@Bean
public JdbcRepositoryHandler jdbcRepositoryHandler() {
return new JdbcRepositoryHandler(repositoryType(), beanFactory);
}
protected class JdbcRepositoryHandler extends AbstractMessageHandler {
@SuppressWarnings("rawtypes")
private Insertable repository;
public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
}
@Override
protected void handleMessageInternal(Message<?> message) {
repository.insert((List<?>) message.getPayload());
}
}
protected class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
@Override
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload));
}
}
}
问题内容: 即使在使用Java Swing一年以上之后,对我来说,它仍然像魔术一样。如何正确使用BufferStrategy,尤其是方法? 我想添加一个JFrame和一个Canvas,然后进行绘制。我还希望能够调整()画布的大小。每次我调整Canvas的大小时,似乎都会被浪费掉,或者变得毫无用处,因为在上使用并没有真正做任何事情。另外,它具有怪异的不确定性行为,我不知道如何正确同步它。 这就是我的
问题内容: 我想在Linux上使用该机制。我希望我的应用程序知道何时更改了文件。能否请您提供给我一个示例,该怎么做? 问题答案: 文档(来自具有inotify的Monitor文件系统活动) 在C API 提供了三个系统调用来构建各种文件系统监视器: 在内核中创建子系统的实例,并在成功和失败时返回文件描述符。与其他系统调用一样,如果失败,请检查诊断。 顾名思义,它增加了一块 手表 。每个监视都必须提
问题内容: 需要帮助,以了解如何在UIKit中使用prepareForReuse()。该文件说 您只应重置与内容无关的单元格属性,例如Alpha,编辑和选择状态 但是如何重置单个属性属性(例如isHidden)呢? 假设我的单元格有2个标签,我应该在哪里重置: 标签文本 label.numberOfLines label.isHidden 我的tableView(_:cellForRowAt :)
一段时间以来,我一直试图将我的tableview工作作为一种电子表格,通过背景线程进行更新,当单元格更新时,它会亮起几秒钟(更改样式),然后返回到原始样式。我已经知道,我不能直接在表格单元格中存储和设置样式,我需要某种支持类来保存这些数据。但是tableview“重用”单元格(使用相同的单元格处理不同的数据)的行为真的很奇怪。当所有单元格都适合屏幕时,它对我来说完美无瑕,但一旦我放置大约100个单
问题内容: 我是Hibernate的新手,并且正在编写一个简单的方法来返回与特定过滤器匹配的对象列表。似乎是自然的回报类型。 不管我做什么,除非雇用了ugly,否则我似乎都无法使编译器满意。 我想摆脱它。但是如果我这样做,我会得到警告 (我可以忽略它,但是我不想一开始就得到它),如果我删除泛型以符合返回类型,则会收到警告 我注意到 确实 声明了;但它是完全不同的类型- 返回a 作为原始类型。我发现
当msg处理抛出异常时,如何有效地支持JMS重新交付? 我有一个使用JMS(ActiveMQ)的流,它具有配置为允许n次重新传递尝试的连接工厂。 我希望在处理msg时出现任何错误,导致msg在connectionFactory配置允许的情况下被放回重新交付,然后在最大重新交付尝试用尽时,交付给DLQ。与AMQ保持一致。 对一个相关SO问题的回答意味着我可能会有一个重新抛出的错误通道,它应该触发重新