我正在尝试设置一个动态IntegrationFlow
,从本地目录读取文件。因此,我创建了以下可参数化类:
@Configuration
public class FileConsoleFlow {
@Bean("flowBuilderFactory")
Function<Properties, IntegrationFlow> flowBuilderFactory() {
return (properties) -> new SimpleFlowBuilder(properties).build();
}
static class SimpleFlowBuilder {
private Properties properties;
public FlowBuilder(Properties properties) {
this.properties = properties;
}
public IntegrationFlow build() {
return IntegrationFlows.from(localSource(), c -> c.poller(pollerMetadata()))
.handle(message -> System.out.println(message))
.get();
}
public FileReadingMessageSource localSource() {
CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
filters.addFilter(new SimplePatternFileListFilter(properties.getProperty("local-file-pattern")));
filters.addFilter(new AcceptOnceFileListFilter<>(10000));
RecursiveDirectoryScanner scanner = new RecursiveDirectoryScanner();
scanner.setFilter(filters);
FileReadingMessageSource fileSource = new FileReadingMessageSource();
fileSource.setDirectory(new File(properties.getProperty("local-directory")));
fileSource.setScanner(scanner);
return fileSource;
}
public PollerMetadata pollerMetadata() {
return Pollers.fixedDelay(1000)
.maxMessagesPerPoll(100)
.advice(transactionInterceptor())
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.get();
}
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder()
.transactionManager(txManager())
.build();
}
public TransactionManager txManager() {
return new PseudoTransactionManager();
}
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionEvaluatingTransactionSynchronizationProcessor processor =
new ExpressionEvaluatingTransactionSynchronizationProcessor();
SpelExpressionParser spelParser = new SpelExpressionParser();
processor.setAfterCommitExpression(spelParser.parseExpression("payload.delete()"));
processor.setAfterRollbackExpression(spelParser.parseExpression("payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));
return new DefaultTransactionSynchronizationFactory(processor);
}
}
}
参数化流在IntegrationFlowContext
的帮助下注册,如下所示:
...
integrationFlowContext.registration(flow)
.autoStartup(false)
.id("flow-" + id)
.register();
...
integrationFlowContext.getRegistry().forEach((name, flow) -> flow.start());
启动应用程序后,我得到以下警告:
2022-02-08 14:11:41.951 WARN [,1e3c4776ef54251e,1e3c4776ef54251e] 1 --- [ scheduling-1] o.s.i.expression.ExpressionUtils : Creating EvaluationContext with no beanFactory
java.lang.RuntimeException: No beanFactory
at org.springframework.integration.expression.ExpressionUtils.createStandardEvaluationContext(ExpressionUtils.java:90) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor.createEvaluationContext(ExpressionEvaluatingTransactionSynchronizationProcessor.java:218) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor.prepareEvaluationContextToUse(ExpressionEvaluatingTransactionSynchronizationProcessor.java:202) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor.doProcess(ExpressionEvaluatingTransactionSynchronizationProcessor.java:141) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor.processAfterCommit(ExpressionEvaluatingTransactionSynchronizationProcessor.java:125) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory$DefaultTransactionalResourceSynchronization.processResourceAfterCommit(DefaultTransactionSynchronizationFactory.java:79) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory$DefaultTransactionalResourceSynchronization.processResourceAfterCommit(DefaultTransactionSynchronizationFactory.java:53) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.transaction.support.ResourceHolderSynchronization.afterCommit(ResourceHolderSynchronization.java:87) ~[spring-tx-5.3.15.jar:5.3.15]
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:135) ~[spring-tx-5.3.15.jar:5.3.15]
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:123) ~[spring-tx-5.3.15.jar:5.3.15]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:936) ~[spring-tx-5.3.15.jar:5.3.15]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:782) ~[spring-tx-5.3.15.jar:5.3.15]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711) ~[spring-tx-5.3.15.jar:5.3.15]
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:654) ~[spring-tx-5.3.15.jar:5.3.15]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:407) ~[spring-tx-5.3.15.jar:5.3.15]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.15.jar:5.3.15]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.15.jar:5.3.15]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.15.jar:5.3.15]
at jdk.proxy2/jdk.proxy2.$Proxy101.call(Unknown Source) ~[na:na]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.3.15.jar:5.3.15]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341) ~[spring-integration-core-5.5.8.jar:5.5.8]
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:64) ~[spring-cloud-sleuth-instrumentation-3.1.0.jar:3.1.0]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.15.jar:5.3.15]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95) ~[spring-context-5.3.15.jar:5.3.15]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
如果我理解正确的话,IntegrationFlowContext
负责创建集成bean(从而设置beanFactory)。但是为什么我会收到这个错误信息呢?
该错误意味着必须将TransactionSynchronizationFactory
声明为bean,从而满足其BeanFactory
要求。
请修改您所有的挥发性对象,并考虑将它们作为全球单体bean。
我相信所有这些都可以进入主配置,而不是动态配置:PollerMetadata
、TransactionInterceptor
、TransactionManager
和TransactionSynChronizationFactory
。解决方案中的动态部分实际上就是FileReadingMessageSource。
最好使用尽可能少的动态性,以获得更好的性能和适当的可重用性。在不需要的时候,你也应该避免重复。
我甚至会移动。句柄(消息-
我正在遵循Oracle的教程在JavaFX中创建TableView。在这个截图中,我复制并粘贴了代码,发现了几个错误,教程中没有提到这些错误。 TableView上的警告显示: TableView是原始类型。对泛型类型TableView的引用 表列上的警告说: 表列是一个原始类型。对泛型类型TableCol列的引用 addAll方法上的警告是: 类型安全:方法addAll(Object…)属于原始
我试着去看其他的代码例子,但是它们和我的代码比较相似,但是我的应用程序仍然会因为同样的错误而崩溃。 这是我使用firebase文档的指导方针编写的代码: 上面的代码正在将otp发送到给定的号码,但是它崩溃了,并且cat-log显示了上面提到的错误。
问题内容: 我需要一种从try / catch块的中间中断而又不会引发异常的方法。类似于中断并继续for循环的操作。这可能吗? 我对于抛出一个自定义异常(将其命名为“ BreakContinueException”)变得很奇怪,该异常在其catch处理程序中什么都不做。我敢肯定这很扭曲。 因此,我不知道任何直接的解决方案吗? 问题答案: 正确的方法可能是通过将try-catch块放在单独的方法中来
这里提出的问题是:https://vaadin.com/forum/thread/18095407/how-to-create-a-grid-without-binder 然而,瓦丁的论坛关闭了,所以我想在这里继续。 关于Vaadin 14,任何关于实现动态变化列数网格的最佳方法的建议。使用列索引(1,2,3...)对我来说不是一个好选择。假设我有一个简单的Json文件(只有1个级别:key-va
每次我在eclipse中创建一个android项目时,除了作为项目添加的appcompat_v7之外,它都以错误“没有找到与给定名称匹配的资源:attr xxx”开始。 我找到了一种方法来解决此问题,方法是完全删除操作栏,避免使用少于API 11,修改所有XML文件并删除创建的appcompat_v7项目。但是,这个解决方案很繁琐,每次我需要创建一个简单的项目时都必须完成。 我的问题不是“什么是a
“laravel/框架”:“5.7.*” “tymon/jwt认证”:“开发人员开发” 我正在尝试创建一个添加了自定义声明的JWT令牌,而不使用auth(这意味着我不希望从凭据创建令牌)。这是为了创建不需要登录的令牌,例如忘记/重置密码等。 使用Tymon/JWTAuth(https://github.com/tymondesigns/jwt-auth)由于最新的Laravel存在问题,因此建议加