当前位置: 首页 > 知识库问答 >
问题:

如何使用Apache Camel读取文件,使用Spring batch处理文件,读取每一行并通过Apache Camel返回

何松
2023-03-14

想知道如何将JAXBElement传递给Camel路由,它从通过Camel路由加载的Spring批处理读取的批处理文件的每一行进行处理。

下面给出的代码段使用customerWriter方法调用JMSTemplate将消息写入队列。相反,我需要将消息路由到另一条骆驼路由。

当前:CamelRoute->ReadFile->Spring批处理->处理每一行->队列

应为:CamelRoute->ReadFile->Spring批处理->处理每一行->Camel Route

@Override
public void configure()  {

    String fromUri = batchLoadPath + "?" + batchFileOptions;
    from(fromUri).process(new Processor() {
        public void process(Exchange msg)  {
            File file = msg.getIn().getBody(File.class);
            String fileName = file.getAbsolutePath();

            try {
                JobParameters jobParameters = new JobParametersBuilder().addString("input.file.name", fileName).addDate("dateTime", new Date()).toJobParameters();
                jobLauncher.run(importCustomerJob, jobParameters);
            } catch (Exception e) {
                log.error(Process file encountered error:" + e.getMessage(), e);
            }
        }

    })
    .to("log:EndBatch");

批处理配置:

@Bean
public JmsItemWriter<String> customerWriter() {
    JmsItemWriter<String> writer = new JmsItemWriter<String>();
    writer.setJmsTemplate(jmsTemplate);
    return writer;
}

public Job importCustomerJob(JobCompletionNotificationListener listener, JobBuilderFactory jobBuilderFactory, Step step1) {
    JobBuilder builder = jobBuilderFactory.get("importCustomerJob");
    builder.incrementer(new RunIdIncrementer());
    builder.listener(listener);
    JobFlowBuilder jfb = builder.flow(step1);
    jfb.end();
    Job job = jfb.build().build();
    return job;
}

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory) {
    // Read chunk of 10 records and writing those as messages to queue
    return stepBuilderFactory.get("step1")
            .<Customer, String>chunk(10)
            .reader(customerReader())
            .faultTolerant()
            .skipPolicy(fileVerificationSkipper())
            .processor(customerItemProcessor())
            .writer(customerWriter())
            .listener(customerReader())
            .build();
}

批处理程序:

public class CustomerItemProcessor implements ItemProcessor<Customer, String> {
    @Autowired
    JaxbUtil jaxbUtil;

    public String process(Customer item) throws Exception {
        // Mapping code goes here
        JAXBElement<CustomerX> mobj = customerFactory.createCustomerX(cp);
        return jaxbUtil.objectToXml(mobj);
    }
}

共有1个答案

何志业
2023-03-14

从骆驼的角度来看,要调用另一条骆驼路由,只需添加一个.to()语句。例如,要同步调用内存中的路由,可以使用direction:

from(fromUri).process(new Processor() {
    public void process(Exchange msg)  {
        ... your processor impl
    }
})
.to("direct:yourOtherRoute")
.to("log:EndBatch"); 

from("direct:yourOtherRoute")
...

要将处理器的结果传递到下一条路由,处理器必须将此结果设置到Exchange正文中。

 类似资料:
  • 我使用OpenCSV进行CSV文件的读写。在I hsd安装早期版本的Java(即Java 6)之前,它工作得很好。安装完之后,我的代码就停止工作了: 线程“main”java.lang.UnsupportedClassVersionError:COM/OpenCSV/CSVReader:不支持Major.Minor版本51.0在java.lang.ClassLoader.DefineClass1(

  • 问题内容: 如何使用fseek逐行读取文件? 代码可能会有所帮助。必须是跨平台和纯PHP。 提前谢谢了 问题答案: 问题是使用fseek进行询问,因此只能假定性能是一个问题,而file()不是解决方案。这是使用fseek的一种简单方法: 我的file.txt 和代码: 输出: 您不必像我一样附加到$ lines数组,如果这是脚本的目的,则可以立即打印输出。如果要限制行数,也很容易引入计数器。

  • 问题内容: EMF = Eclipse建模框架 我必须在一个课堂项目中使用EMF。我正在尝试了解如何使用EMF执行以下操作: 读取XML, 将值放入对象。 使用ORM将对象中的值持久保存到数据库中。-完成 使用ORM从数据库获取数据并生成XML。 我需要使用EMF(不知道是什么)和JPA(完成)来完成所有这些操作。 我使用过JAXB,我知道,可以使用JAXB完成,但是(EMF == JAXB)怎么

  • 问题内容: 我正在建立一个供人们上传PHP中的.tar(和.tar.gz,.tar.bz2,.zip等)文件的系统。上载文件是可以的,但是在上载档案之后,我想列出档案中包含的文件。 有人可以推荐一个可以读取文件存档的优秀PHP库吗? 我在Pear上找到了File_Archive,但是几年来没有更新。ZipArchive非常适合.zip文件,但是我需要一些可以处理更多文件类型的文件。 更新 我在RH

  • 问题内容: 我需要使用Java读取XML文件。它的内容就像 是否有特殊的阅读器/ JAR,还是应该使用 FileInputStream进行 阅读? 问题答案: 另一个建议:尝试使用Commons消化器。这使您可以使用基于规则的方法非常快速地开发解析代码。有一个教程在这里和图书馆可在这里 我也同意Brian和Alzoid的观点,因为JAXB非常适合快速启动和运行。您可以使用JDK附带的xjc绑定编译

  • 我试图从XML文件中读入一些数据,但遇到了一些问题,我的XML如下所示: 我试图将这些值作为字符串读入Java程序,到目前为止,我已经编写了以下代码: 我正在努力读取和打印id、用户名等的值。