我希望将Spring Batch与RabbitMQ集成。我已经开发了如下代码,但是没有数据通过通道。代码中有什么问题?
hannel.java
public interface CustomerMessageChannel {
@Output("customerMessageChannel")
MessageChannel customerMessageChannel();
}
顾客JAVA
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
private static final long serialVersionUID = 1L;
private Long id;
private String firstName;
private String lastName;
private String birthdate;
}
CustomerFieldSetMapper。JAVA
public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
return Customer.builder()
.id(fieldSet.readLong("id"))
.firstName(fieldSet.readRawString("firstName"))
.lastName(fieldSet.readRawString("lastName"))
.birthdate(fieldSet.readRawString("birthdate"))
.build();
}
}
JobConfig。JAVA
@Configuration
public class JobConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private AmqpTemplate amqpTemplate;
@Bean
public FlatFileItemReader<Customer> customerItemReader() {
FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new ClassPathResource("/data/customer.csv"));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });
DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
customerLineMapper.setLineTokenizer(tokenizer);
customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
customerLineMapper.afterPropertiesSet();
reader.setLineMapper(customerLineMapper);
return reader;
}
@Bean
public AmqpItemWriter<Customer> amqpItemWriter() throws Exception{
return new AmqpItemWriter<>(amqpTemplate);
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(customerItemReader())
.writer(amqpItemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
}
应用属性
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.customerMessageChannel.destination=customerMessageChannel
spring.cloud.stream.default.contentType=application/json
顾客csv
id,firstName,lastName,
1, John, Doe,10-10-1952 10:10:10
2, Amy, Eugene,05-07-1985 17:10:00
3, Laverne, Mann,11-12-1988 10:10:10
4, Janice, Preston,19-02-1960 10:10:10
5, Pauline, Rios,29-08-1977 10:10:10
6, Perry, Burnside,10-03-1981 10:10:10
7, Todd, Kinsey,14-12-1998 10:10:10
8, Jacqueline, Hyde,20-03-1983 10:10:10
9, Rico, Hale,10-10-2000 10:10:10
10, Samuel, Lamm,11-11-1999 10:10:10
11, Robert, Coster,10-10-1972 10:10:10
12, Tamara, Soler,02-01-1978 10:10:10
13, Justin, Kramer,19-11-1951 10:10:10
14, Andrea, Law,14-10-1959 10:10:10
15, Laura, Porter,12-12-2010 10:10:10
16, Michael, Cantu,11-04-1999 10:10:10
17, Andrew, Thomas,04-05-1967 10:10:10
18, Jose, Hannah,16-09-1950 10:10:10
19, Valerie, Hilbert,13-06-1966 10:10:10
20, Patrick, Durham,12-10-1978 10:10:10
SpringBatchAmqpApplication.java
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchAmqpApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchAmqpApplication.class, args);
}
}
您需要为模板配置交换和路由密钥——默认情况下,RabbitMQ将丢弃不可路由的消息。
https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#spring.rabbitmq.template.exchangehttps://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html#spring.rabbitmq.template.routing-key
您似乎也没有在任何地方引用customerMessageChannel
,因此不清楚您期望的是什么。
要从spring cloud stream目的地消费,需要@EnableBinding(CustomerMessageChannel.class)
和@StreamListener
方法。然而,注释模型现在被弃用,取而代之的是函数式编程模型。
https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-producing-consuming-messages
我有一个JSF支持bean和spring+RabbitMQ应用程序。我打电话给那里 我不能在那里使用convertSendAndReceive,因为复杂的逻辑、队列链等等。问题是我有一个输出队列和它的侦听器。这个侦听器充当路由器,在队列、调用函数等之间调度请求 但我需要返回到客户“OK”字符串,以防它已经在第一个队列中被处理。该消息将进一步进入下一个队列,但在第一个队列之后,我需要用“OK”消息通
问题内容: 我正在尝试将数据从此链接插入到我的SQL Server https://www.ian.com/affiliatecenter/include/V2/CityCoordinatesList.zip 我创建了表 我正在运行以下脚本来进行批量插入 但是批量插入失败,并出现以下错误 当我使用google时,我发现了几篇文章,指出问题可能出在RowTerminator上,但我尝试了诸如/ n
我使用RabbitMQ作为不同消息的队列。当我使用来自一个队列的两个不同消费者的消息时,我会处理它们并将处理结果插入数据库: 我想大量使用队列中的消息,这将减少数据库负载。由于RabbitMQ不支持消费者批量读取消息,我将这样做smth: 消息在全部完全处理之前处于队列中 如果消费者跌倒或断开连接 - 消息保持安全 你认为这个解决方案怎么样?如果可以的话,如果消费者摔倒了,我怎样才能重新得到所有未
使用CommandLineRunner处理工作
当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。 我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该
清除Spring表的最佳方法是什么? Spring是否提供任何用于清除的API?或者,我们是否需要对所有Spring批处理表执行delete语句?