当前位置: 首页 > 知识库问答 >
问题:

如何实现Kafka消费者使用Spring-cloud-stream按需处理事件?

祁修诚
2023-03-14

我正在尝试实现一个PollableConsumer,当我在SpringBoot应用程序中遇到endpoint时,它会在特定条件下开始轮询来自Kafka的消息。

在特定条件下,我尝试了多种触发民意调查的方法,但显然,只有当它不断地从Kafka主题进行民意调查时,它才会起作用。(就像spring cloud stream文档中的所有示例一样)

我正在寻找这样的东西:

public interface CustomProcessor {
    @Input
    PollableMessageSource input();
}
 public void run() {
        boolean result = true;
        while (result) {
            result = input.poll(m -> {
                Event event = (Event) m.getPayload();
                GenericMessage<Event> genericMessage = new GenericMessage<>(event, m.getHeaders());
                eventMessageConsumer.consume(genericMessage);
            }, new ParameterizedTypeReference<Event>() {
            });

            try {
                Thread.sleep(1_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (result) {
                System.out.println("Success");
            }
        }
    }

当我碰到这样的endpoint时可能会触发:

@GetMapping("/process")
public void process() {
   SomeClass.run();
}

共有2个答案

朱锐
2023-03-14

在您的代码示例中,我不确定输入了什么。poll返回,因为您将结果分配给布尔值,而这不是KafkaConsumer的结果。poll方法返回

Kafka消费者可以根据需要暂停和重启,这不应该是问题。

来自最新的KafkaConsumer文档:

消耗流量控制

...

Kafka支持使用pause(Collection)和resume(Collection)对消费流进行动态控制,在将来的轮询(Duration)调用中分别暂停指定分配分区上的消费,并恢复指定暂停分区上的消费。

然后这来自暂停方法描述:

请注意,此方法不影响分区订阅。特别是,使用自动分配时,不会导致组重新平衡。

https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

因此,在您的情况下,您可以简单地拥有一个易失性布尔值isPause标志,如果更改了该标志,您将根据需要发出暂停恢复命令。Kafka消费者将完成剩下的工作。

另外,请注意,KafkaConsumer不是线程安全的,因此您应该在轮询数据的同一线程上发出这些命令。

黄扬
2023-03-14

显然,目前使用spring cloud stream不可能暂停PollableConsumer,所以我回到了基于事件的消息消费,并使用执行器来控制绑定的状态。遵循此线程和spring cloud stream文档,我注入了BindingsEndpoint并更改了绑定的状态,如下所示:

@RestController
public class EventController {
    @Autowired
    public EventController(BindingsEndpoint bindingsEndpoint) {
        this.bindingsEndpoint = bindingsEndpoint;
    }
    @GetMapping("/changeState")
    public void sendMessage(@RequestParam("state") String state) {
        if (state.equals("paused")) {
            bindingsEndpoint.changeState("MY_BINDING", 
                           BindingsEndpoint.State.PAUSED);
        }
        if (state.equals("resumed")) {
            bindingsEndpoint.changeState("MY_BINDING", 
                           BindingsEndpoint.State.RESUMED);
        }
    }

这并不是我想要实现的,但已经足够接近了。

 类似资料:
  • 以下是我的情况: 我们有一个Spring cloud Stream 3 Kafka服务连接到同一个代理中的多个主题,但我想基于属性控制连接到特定主题。 每个主题都有自己的活页夹和绑定,但代理对所有人来说都是一样的。 我尝试使用下面的属性禁用绑定(这是我到目前为止找到的唯一解决方案),这适用于StreamListener不接收消息,但与主题的连接和重新平衡仍在发生。 我想知道活页夹级别是否有任何设置

  • 我用的是Spring boot 1.5.9.RELEASE和Spring cloud Edgware。跨微服务发布。 我使用注释绑定了一个消费者。注释将完成我使用事件的其余部分。 出现了一些手动配置主题名称和其他一些配置属性的需求,我希望在应用程序启动时覆盖application.properties中定义的一些消费者属性。 有什么直接的方法吗?

  • 使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到循环分区。 s-c-s文档根本没有提到sp

  • 在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre

  • Spring cloud stream starter kafka在连接消费者时没有加载配置。以下是我在调试模式下运行控制台时在控制台中看到的配置: 我有以下引导yml文件的配置部分

  • 我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentType。 我知道avro消息可以像下面这样反序列化。 但不管用。可能是因为两件事。 > 消息是avro对象的列表。但我正在尝试使用Avro模式创