当前位置: 首页 > 知识库问答 >
问题:

Kafka&Spring批处理--如何只读取来自同一主题的未提交消息?

邹时铭
2023-03-14
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value
@SpringBootApplication
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchStudent {
    public static void main(String[] args) {
        SpringApplication.run(BatchStudent.class, args);
    }

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaTemplate<Integer, Student> template;
    private final KafkaProperties properties;

    @Value("${kafka.topic.consumer}")
    private String topic;

    @Bean
    public ItemProcessor<Student, Student> customItemProcessor() {
        return new CustomProcessor();
    }

    @Bean
    Job job() {
        return this.jobBuilderFactory.get("job")
                .start(start())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    KafkaItemWriter<Integer, Student> writer() {
        return new KafkaItemWriterBuilder<Integer, Student>()
                .kafkaTemplate(template)
                .itemKeyMapper(Student::getId)
                .build();
    }

    @Bean
    public KafkaItemReader<Integer, Student> reader() {
        Properties props = new Properties();
        props.putAll(this.properties.buildConsumerProperties());

        return new KafkaItemReaderBuilder<Integer, Student>()
                .partitions(0)
                .consumerProperties(props)
                .name("students-consumer-reader")
                .saveState(true)
                .topic(topic)
                .build();
    }

    @Bean
    Step start() {
        return this.stepBuilderFactory
                .get("step")
                .<Student, Student>chunk(10)
                .writer(writer())
                .processor(customItemProcessor())
                .reader(reader())
                .build();
    }
}

spring.batch.initialize-schema: always

#Conf Kafka Consumer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
#spring.kafka.consumer.group-id: student-group
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.consumer.properties.spring.json.value.default.type: com.org.model.Student

#Conf Kafka Producer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.bootstrap-servers: localhost:9092

#Conf topics
spring.kafka.template.default-topic: producer.student
kafka.topic.consumer: consumer.student
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    Integer id;
    Integer count;
}

customProcessor.java

@NoArgsConstructor
public class CustomProcessor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student studentRecieved) {
        final Student studentSent = new Student();
        studentSent.setId(studentRecieved.getId());
        studentSent.setCount(200);
        return studentSent;
    }
}

谢谢你的帮助!

共有1个答案

郭彬郁
2023-03-14

一切都很好,但我唯一的问题是我的消费者总是从话题的乞求中阅读。我需要它从最后一个未消费的消息中读取。

Spring Batch4.3引入了一种从Kafka中存储的偏移量消费记录的方法。我在去年Spring One的演讲中谈到了这个特性:Spring Batch 4.3中有什么新特性?。您可以使用SetPartitionOffsets在每个分区中为kafka阅读器配置自定义起始偏移量:

Setter for partition offsets. This mapping tells the reader the offset to start reading
from in each partition. This is optional, defaults to starting from offset 0 in each
partition. Passing an empty map makes the reader start from the offset stored in Kafka
for the consumer group ID.

您可以在这个测试用例中找到一个完整的示例。

 类似资料:
  • 我有一个Kafka集群(版本:0.10.1.0),有9个代理和10个分区。 我尝试使用camel kafka从java应用程序中获取消息。这是我的pom。xml 这只是我使用的与骆驼Kafka相关的依赖项。下面是骆驼Kafka消费者代码。 我正在使用文档中指定的KafkaURIhttps://camel.apache.org/components/latest/kafka-component.ht

  • 我在java中有一个函数,在这个函数中我试图获取未读的消息。例如,如果我在broker中有偏移量为0、1、2的消息,这些消息已经被使用者读取,并且如果我关闭我的使用者一个小时。那时我产生的信息偏移量为3,4,5。之后,当我的消费者启动时,它应该从偏移量3读取消息,而不是从0读取消息。但是,它要么读取所有的消息,要么读取启动Kafka Consumer后产生的消息。我想读那些未读或未提交的消息 我尝

  • 我们使用Akka流Kafka来生成和消费消息和Strimzi Kafka集群。以下是相关版本: 重构消息发出后,消费者停止工作。我们在主题中确实有一些信息,但消费者只是在无休止地等待。 以下是日志片段: 还有一些要点: 架构注册表配置正确且良好(否则生产者将无法工作)。 主题(和组协调器)很好,我可以通过这样的普通消费者消费消息: 这就是代码卡住的地方——我使用阻塞调用获取2条消息(甚至无法获取1

  • 有一系列关于Kafka交易和一次性交付的优秀文章 在其中一篇文章中,作者谈到了消费者: 因此,在消费者端,您有两个选项来读取事务性消息,通过“isolation.level”消费者配置来表示: read_committed:除了读取不属于事务的消息外,还可以在事务提交后读取属于事务的消息。 read_uncommitted(读取未提交):按偏移顺序读取所有消息,而无需等待事务提交。此选项类似于Ka

  • 有人能帮我弄清楚这件事吗。 谢了!

  • 我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml