当前位置: 首页 > 知识库问答 >
问题:

Spring批量RabbitMQ问题

常甫
2023-03-14

我希望将Spring Batch与RabbitMQ集成。我已经开发了如下代码,但是没有数据通过通道。代码中有什么问题?

hannel.java

public interface CustomerMessageChannel {
    @Output("customerMessageChannel")
    MessageChannel customerMessageChannel();
}

顾客JAVA

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

CustomerFieldSetMapper。JAVA

public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
    
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        return Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .birthdate(fieldSet.readRawString("birthdate"))
                .build();
    }

}

JobConfig。JAVA

@Configuration
public class JobConfig {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/customer.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });

        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        customerLineMapper.afterPropertiesSet();

        reader.setLineMapper(customerLineMapper);

        return reader;
    }
    
    @Bean
    public AmqpItemWriter<Customer> amqpItemWriter() throws Exception{
        return new AmqpItemWriter<>(amqpTemplate);
    }
        
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(amqpItemWriter())
                .build();
    }
    
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}

应用属性

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.customerMessageChannel.destination=customerMessageChannel
spring.cloud.stream.default.contentType=application/json

顾客csv

id,firstName,lastName,
1, John, Doe,10-10-1952 10:10:10
2, Amy, Eugene,05-07-1985 17:10:00
3, Laverne, Mann,11-12-1988 10:10:10
4, Janice, Preston,19-02-1960 10:10:10
5, Pauline, Rios,29-08-1977 10:10:10
6, Perry, Burnside,10-03-1981 10:10:10
7, Todd, Kinsey,14-12-1998 10:10:10
8, Jacqueline, Hyde,20-03-1983 10:10:10
9, Rico, Hale,10-10-2000 10:10:10
10, Samuel, Lamm,11-11-1999 10:10:10
11, Robert, Coster,10-10-1972 10:10:10
12, Tamara, Soler,02-01-1978 10:10:10
13, Justin, Kramer,19-11-1951 10:10:10
14, Andrea, Law,14-10-1959 10:10:10
15, Laura, Porter,12-12-2010 10:10:10
16, Michael, Cantu,11-04-1999 10:10:10
17, Andrew, Thomas,04-05-1967 10:10:10
18, Jose, Hannah,16-09-1950 10:10:10
19, Valerie, Hilbert,13-06-1966 10:10:10
20, Patrick, Durham,12-10-1978 10:10:10

SpringBatchAmqpApplication.java

@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchAmqpApplication.class, args);
    }

}

共有1个答案

逑彬炳
2023-03-14

您需要为模板配置交换和路由密钥——默认情况下,RabbitMQ将丢弃不可路由的消息。

https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#spring.rabbitmq.template.exchangehttps://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#spring.rabbitmq.template.routing-key

您似乎也没有在任何地方引用customerMessageChannel,因此不清楚您期望的是什么。

要从spring cloud stream目的地消费,需要@EnableBinding(CustomerMessageChannel.class)@StreamListener方法。然而,注释模型现在被弃用,取而代之的是函数式编程模型。

https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-producing-consuming-messages

 类似资料:
  • 我有一个JSF支持bean和spring+RabbitMQ应用程序。我打电话给那里 我不能在那里使用convertSendAndReceive,因为复杂的逻辑、队列链等等。问题是我有一个输出队列和它的侦听器。这个侦听器充当路由器,在队列、调用函数等之间调度请求 但我需要返回到客户“OK”字符串,以防它已经在第一个队列中被处理。该消息将进一步进入下一个队列,但在第一个队列之后,我需要用“OK”消息通

  • 问题内容: 我正在尝试将数据从此链接插入到我的SQL Server https://www.ian.com/affiliatecenter/include/V2/CityCoordinatesList.zip 我创建了表 我正在运行以下脚本来进行批量插入 但是批量插入失败,并出现以下错误 当我使用google时,我发现了几篇文章,指出问题可能出在RowTerminator上,但我尝试了诸如/ n

  • 我使用RabbitMQ作为不同消息的队列。当我使用来自一个队列的两个不同消费者的消息时,我会处理它们并将处理结果插入数据库: 我想大量使用队列中的消息,这将减少数据库负载。由于RabbitMQ不支持消费者批量读取消息,我将这样做smth: 消息在全部完全处理之前处于队列中 如果消费者跌倒或断开连接 - 消息保持安全 你认为这个解决方案怎么样?如果可以的话,如果消费者摔倒了,我怎样才能重新得到所有未

  • 使用CommandLineRunner处理工作

  • 当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该

  • 清除Spring表的最佳方法是什么? Spring是否提供任何用于清除的API?或者,我们是否需要对所有Spring批处理表执行delete语句?