当前位置: 首页 > 知识库问答 >
问题:

Spring批处理-AmqpWriter和AmqpReader示例

邢小云
2023-03-14

需要一个解决方案来使用AmqpWriter在RabbitMQ上写入数据,并使用AmqpReader使用RabbitMQ读取数据。我们不是在寻找阿帕奇·Kafka,我们只是想简单地发送程序细节并使用它。

作家代码

JobConfig。JAVA

@Configuration
public class JobConfig {
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }
    
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue myQueue() {
       return new Queue("myqueue");
    }
    

    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/customer.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });

        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        customerLineMapper.afterPropertiesSet();

        reader.setLineMapper(customerLineMapper);

        return reader;
    }
        
    @Bean
    public AmqpItemWriter<Customer> amqpWriter(){
        AmqpItemWriter<Customer> amqpItemWriter = new AmqpItemWriter<>(this.rabbitTemplate());
        return amqpItemWriter;
    }
        
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(amqpWriter())
                .build();
    }
    
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}

CustomelFieldSetMapper.java

public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
    
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        return Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .birthdate(fieldSet.readRawString("birthdate"))
                .build();
    }
}

顾客JAVA

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

SpringBatchAMQP应用程序。JAVA

@EnableBatchProcessing
@SpringBootApplication
@EnableBinding(Source.class)
public class SpringBatchAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchAmqpApplication.class, args);
    }
}

读码器

作业配置。JAVA

@Configuration
public class JobConfiguration {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jsonMessageConverter());
        return factory; 
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setDefaultReceiveQueue("myqueue");
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    @Bean
    public ItemReader<Customer> customerReader(){
        return new AmqpItemReader<>(this.rabbitTemplate());
    }

    @Bean
    public ItemWriter<Customer> customerItemWriter(){
        return items -> {
            for(Customer c : items) {
                System.out.println(c.toString());
            }
        };
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(10)
                .reader(customerReader())
                .writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }

    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}

CustomerStepListener。JAVA

public class CustomerStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("==");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("READ COUNT = "+stepExecution);
        return ExitStatus.COMPLETED;
    }
}

日志

2021-01-18 18:41:05.023信息25532---[main]o.s.批次。果心工作SimpleStreamHandler:执行步骤:[step1]==2021-01-18:41:05.031信息25532---[main]o.s.a.r.c.CachingConnectionFactory:尝试连接到:localhost:5672 2021-01-18:41:05.072信息25532---[main]o.s.a.r.c.CachingConnectionFactory:创建的新连接:connectionFactory#20a14b55:0/SimpleConnection@4650a407[代表=amqp://guest@127.0.0.1:5672/,localPort=55797]READ COUNT=step执行:id=1,version=2,name=step1,status=COMPLETED,exitStatus=COMPLETED,readCount=0,filterCount=0,writeCount=0 readSkipCount=0,writeSkipCount=0,processSkipCount=0,commitCount=1,rollbackCount=0,exitDescription=2021-01-18:41:05.097信息25532-[main]o.s.batch。果心步抽象步骤:步骤:[step1]在73ms 2021-01-18 18:41:05.099信息25532中执行---[main]o.s.b.c.l.支持。SimpleJob:[SimpleJob:[name=Job]]已使用以下参数完成:[{-spring.output.ansi.enabled=always}],并在87毫秒内处于以下状态:[completed]

共有1个答案

东方嘉佑
2023-03-14

在“编写器代码”端,您使用的是配置有rabbitmplateAmqpItemWriter。默认情况下,消息将被发送到无名交易所,这里是Javadoc的摘录:

Messages will be sent to the nameless exchange if not specified on the provided AmqpTemplate.

在writer配置中,rabbit模板和队列之间没有“连接”。因此,您需要配置rabbit模板以向队列发送消息:

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setRoutingKey(myQueue().getName());
    return rabbitTemplate;
}

这与您在读者端使用rabbitmplate所做的类似。setDefaultReceiveQueue(“myqueue”)

 类似资料:
  • 我有一个Spring批处理作业,它通过SFTP从远程Linux服务器检索文件。远程服务器上的目录是一个包含七天文件(约400个文件)的存档。文件的大小相对较小。 Spring批处理知道哪些文件已经被处理。 当我启动应用程序时。第一次,Spring Batch tasklet检索文件时,Spring Batch会为它已经处理的每个文件生成一个异常: > 在Transformer类中,是否应该检查文件

  • 我正在尝试在Spring批处理中并行运行多个作业。在谷歌上搜索了很多之后,我遇到了JobStep。有没有人使用过JobStep可以解释如何使用它来并行运行作业,或者有没有其他方法可以并行运行2个独立的作业,即当我启动批处理时,2个作业应该开始并行运行。我的要求就像 当我的应用程序启动时,两个作业都应该开始运行。使用spring batch是否可以这样做 编辑:我甚至试过这种方法 我面临着例外。sp

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 在Spring批处理作业中,我将项目写入目标文件(使用FlatFileItemWriter),并将输入记录“process indicator”字段更新为“processed”/“failed”(使用JdbcBatchItemWriter)。在“物品交易”中实现这一点的最佳方式是什么? 使用CompositeItemWriter(委托FlatFileItemWriter写入文件,委托JdbcBat

  • 我有一个spring批处理应用程序(spring boot 2.3.5版),它在spring批处理时使用一个JpaRepository将一些自定义日志消息插入数据库。这与开箱即用的spring批处理表是分开的。似乎当我从ItemProcessorAdapter抛出异常时,它会被ItemProcessListener onProcessError()方法捕获。在这个方法中,我执行一个JpaRepos

  • 我想分散加工大批量。这个想法是使用Spring Batch在云中激发一堆AMQP消费者,然后加载廉价的任务(如项目ID)并将它们提交给AMQP交换。结果的书写将由消费者自己完成。 null