我正在尝试实现一个Spring Integration类,它获取一个. xml文件并对其进行解析,如果有效,就将其移动到一个“存档”目录,如果无效,就将其移动到一个错误目录。
import com.nagarro.studentapi.integration.queue.StudentSender;
import com.nagarro.studentapi.util.XmlParser;
import org.aopalliance.aop.Advice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.io.File;
@Configuration
@EnableIntegration
public class IntegrationConfiguration {
private static final String XML = "*.xml";
private static final String STUDENT = "\\student.xml";
@Value("${student-api.xmlPath}")
private String inputPath;
@Value("${student-api.archivedDestination}")
private String successPath;
@Value("${student-api.errorDestination}")
private String errorPath;
@Bean
public MessageChannel messageChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "messageChannel")
public MessageSource<File> messageProducer() {
FileReadingMessageSource messageSource = new FileReadingMessageSource();
messageSource.setDirectory(new File(inputPath));
messageSource.setFilter(new SimplePatternFileListFilter(XML));
return messageSource;
}
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public MessageHandler handler() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(successPath));
handler.setFileExistsMode(FileExistsMode.REPLACE);
handler.setExpectReply(false);
return handler;
}
@Bean
public IntegrationFlow integrationFlow(XmlParser xmlParser) {
return IntegrationFlows.from(messageProducer(), spec -> spec.poller(Pollers.fixedDelay(1000)))
.enrichHeaders(h -> h.headerExpression(FileHeaders.ORIGINAL_FILE, "payload"))
.convert(String.class)
.transform((String path) -> xmlParser.parsePath(path))
.handle("xmlParser", "parsePath", e -> e.advice(errorAdvice()))
.get();
}
@Bean
public AbstractRequestHandlerAdvice errorAdvice() {
return new AbstractRequestHandlerAdvice() {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
File file = message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class);
try {
Object result = callback.execute();
file.renameTo(new File(successPath, STUDENT));
System.out.println("File renamed after success");
return result;
}
catch (Exception e) {
file.renameTo(new File(errorPath, STUDENT));
System.out.println("File renamed after failure");
throw e;
}
}
};
}
}
然而,每当调用< code>calback.execute()时,我都会得到这个错误,我不太明白为什么。
2022-09-06 18:20:07.971 ERROR 32152 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@1135e3d6]; nested exception is java.lang.IllegalArgumentException: No candidate methods found for messages., failedMessage=GenericMessage [payload=Student(firstname=John, lastname=Dose, cnp=123, birthDate=2000-12-12, address=Address(street=a, number=1, city=Craiova, country=Romania), grades=[Grade(discipline=a, date=2021-12-12, grade=10), Grade(discipline=b, date=2021-12-12, grade=9)]), headers={....
虽然我有一个消息处理程序,但我怀疑这个问题的原因是我没有重写handle方法。但我不知道该怎么做。
你有几个问题:
@InboundChannelAdapter
和集成流.from(消息生产者()
.这样,即可为同一源创建两个独立的轮询终结点。@ServiceActivator
- 要写入的终结点刚刚从其中一个源读取文件。@InboundChannelAdapter,你的@ServiceActivator
期望和这种流动之间没有
联系。.转换((字符串路径) -
请仔细修改您的逻辑:现在您的某些配置具有误导性,并且确实容易出错。我相信你得到的错误是因为你的
parsePath()
需要一个字符串
,而不是学生
,因为我们在该句柄()
的有效负载中看到。
我在Windows中使用Java8,我试图实现一件简单的事情。假设我有一个目录和一个目录。我每天都在尝试将文件从源移动到目的地。下面是一个简单的java代码,使用NIO; null
我正在使用Spring集成流入站通道适配器,从远程SFTP获取流并解析内容进程的每一行。 我使用: 它现在可以工作了。但是我只能从目录中获取文件,但是我需要递归地从这个目录和子目录中获取文件,并解析每一行。 我注意到入站通道适配器是Sftp。内置适配器(sftpSessionFactory)。扫描仪(…) 。它可以扫描子目录。但我没有看到任何关于流式入站通道适配器的内容。 那么,如何在流入站通道适
我有一个文件夹,其中有一个。dat文件和一个是。zip文件,我必须移动。将文件压缩到另一个目录 我有两个文件夹,一个是 请建议如何实现这一点,我现在所做的是。。。
Java是否提供了基于文件夹目录的API来完成此任务,或者我们是否需要递归地将每个文件复制到此路径?
我需要将所有文件从Camel监控的导入目录移动到另一个目录。为此,我在路由中使用了Camel的移动和移动失败选项。 我在属性文件中有一个MaxFilesAllowedInArchive参数,如果超过文件限制,则必须删除最旧的文件。 如何进行自定义移动和移动无法控制移动的文件数量并将最新文件保存在存档中?
我正在为我公司的 POC 实施一个小型 hadoop 集群。我正在尝试使用Flume将文件导入HDFS。每个文件都包含如下 JSON 对象(每个文件 1 个“长”行): “objectType”是数组中对象的类型(例如:事件、用户…)。 这些文件稍后将由多个任务根据“对象类型”进行处理。 我正在使用spoolDir源和HDFS接收器。 我的问题是: > 当flume写入HDFS时,是否可以保留源文