需要一个解决方案来使用AmqpWriter
在RabbitMQ上写入数据,并使用AmqpReader
使用RabbitMQ读取数据。我们不是在寻找阿帕奇·Kafka,我们只是想简单地发送程序细节并使用它。
作家代码
JobConfig。JAVA
@Configuration
public class JobConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@Bean
public FlatFileItemReader<Customer> customerItemReader() {
FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new ClassPathResource("/data/customer.csv"));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });
DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
customerLineMapper.setLineTokenizer(tokenizer);
customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
customerLineMapper.afterPropertiesSet();
reader.setLineMapper(customerLineMapper);
return reader;
}
@Bean
public AmqpItemWriter<Customer> amqpWriter(){
AmqpItemWriter<Customer> amqpItemWriter = new AmqpItemWriter<>(this.rabbitTemplate());
return amqpItemWriter;
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(customerItemReader())
.writer(amqpWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
}
CustomelFieldSetMapper.java
public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
return Customer.builder()
.id(fieldSet.readLong("id"))
.firstName(fieldSet.readRawString("firstName"))
.lastName(fieldSet.readRawString("lastName"))
.birthdate(fieldSet.readRawString("birthdate"))
.build();
}
}
顾客JAVA
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
private static final long serialVersionUID = 1L;
private Long id;
private String firstName;
private String lastName;
private String birthdate;
}
SpringBatchAMQP应用程序。JAVA
@EnableBatchProcessing
@SpringBootApplication
@EnableBinding(Source.class)
public class SpringBatchAmqpApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchAmqpApplication.class, args);
}
}
读码器
作业配置。JAVA
@Configuration
public class JobConfiguration {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setDefaultReceiveQueue("myqueue");
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@Bean
public ItemReader<Customer> customerReader(){
return new AmqpItemReader<>(this.rabbitTemplate());
}
@Bean
public ItemWriter<Customer> customerItemWriter(){
return items -> {
for(Customer c : items) {
System.out.println(c.toString());
}
};
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Customer, Customer> chunk(10)
.reader(customerReader())
.writer(customerItemWriter())
.listener(customerStepListener())
.build();
}
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1())
.build();
}
@Bean
public CustomerStepListener customerStepListener() {
return new CustomerStepListener();
}
}
CustomerStepListener。JAVA
public class CustomerStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("==");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("READ COUNT = "+stepExecution);
return ExitStatus.COMPLETED;
}
}
日志
2021-01-18 18:41:05.023信息25532---[main]o.s.批次。果心工作SimpleStreamHandler:执行步骤:[step1]==2021-01-18:41:05.031信息25532---[main]o.s.a.r.c.CachingConnectionFactory:尝试连接到:localhost:5672 2021-01-18:41:05.072信息25532---[main]o.s.a.r.c.CachingConnectionFactory:创建的新连接:connectionFactory#20a14b55:0/SimpleConnection@4650a407[代表=amqp://guest@127.0.0.1:5672/,localPort=55797]READ COUNT=step执行:id=1,version=2,name=step1,status=COMPLETED,exitStatus=COMPLETED,readCount=0,filterCount=0,writeCount=0 readSkipCount=0,writeSkipCount=0,processSkipCount=0,commitCount=1,rollbackCount=0,exitDescription=2021-01-18:41:05.097信息25532-[main]o.s.batch。果心步抽象步骤:步骤:[step1]在73ms 2021-01-18 18:41:05.099信息25532中执行---[main]o.s.b.c.l.支持。SimpleJob:[SimpleJob:[name=Job]]已使用以下参数完成:[{-spring.output.ansi.enabled=always}],并在87毫秒内处于以下状态:[completed]
在“编写器代码”端,您使用的是配置有rabbitmplate
的AmqpItemWriter
。默认情况下,消息将被发送到无名交易所,这里是Javadoc的摘录:
Messages will be sent to the nameless exchange if not specified on the provided AmqpTemplate.
在writer配置中,rabbit模板和队列之间没有“连接”。因此,您需要配置rabbit模板以向队列发送消息:
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setRoutingKey(myQueue().getName());
return rabbitTemplate;
}
这与您在读者端使用rabbitmplate所做的类似。setDefaultReceiveQueue(“myqueue”)
。
我有一个Spring批处理作业,它通过SFTP从远程Linux服务器检索文件。远程服务器上的目录是一个包含七天文件(约400个文件)的存档。文件的大小相对较小。 Spring批处理知道哪些文件已经被处理。 当我启动应用程序时。第一次,Spring Batch tasklet检索文件时,Spring Batch会为它已经处理的每个文件生成一个异常: > 在Transformer类中,是否应该检查文件
我正在尝试在Spring批处理中并行运行多个作业。在谷歌上搜索了很多之后,我遇到了JobStep。有没有人使用过JobStep可以解释如何使用它来并行运行作业,或者有没有其他方法可以并行运行2个独立的作业,即当我启动批处理时,2个作业应该开始并行运行。我的要求就像 当我的应用程序启动时,两个作业都应该开始运行。使用spring batch是否可以这样做 编辑:我甚至试过这种方法 我面临着例外。sp
我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。
在Spring批处理作业中,我将项目写入目标文件(使用FlatFileItemWriter),并将输入记录“process indicator”字段更新为“processed”/“failed”(使用JdbcBatchItemWriter)。在“物品交易”中实现这一点的最佳方式是什么? 使用CompositeItemWriter(委托FlatFileItemWriter写入文件,委托JdbcBat
我有一个spring批处理应用程序(spring boot 2.3.5版),它在spring批处理时使用一个JpaRepository将一些自定义日志消息插入数据库。这与开箱即用的spring批处理表是分开的。似乎当我从ItemProcessorAdapter抛出异常时,它会被ItemProcessListener onProcessError()方法捕获。在这个方法中,我执行一个JpaRepos
我想分散加工大批量。这个想法是使用Spring Batch在云中激发一堆AMQP消费者,然后加载廉价的任务(如项目ID)并将它们提交给AMQP交换。结果的书写将由消费者自己完成。 null