我正在尝试实现一个PollableConsumer,当我在SpringBoot应用程序中遇到endpoint时,它会在特定条件下开始轮询来自Kafka的消息。
在特定条件下,我尝试了多种触发民意调查的方法,但显然,只有当它不断地从Kafka主题进行民意调查时,它才会起作用。(就像spring cloud stream文档中的所有示例一样)
我正在寻找这样的东西:
public interface CustomProcessor {
@Input
PollableMessageSource input();
}
public void run() {
boolean result = true;
while (result) {
result = input.poll(m -> {
Event event = (Event) m.getPayload();
GenericMessage<Event> genericMessage = new GenericMessage<>(event, m.getHeaders());
eventMessageConsumer.consume(genericMessage);
}, new ParameterizedTypeReference<Event>() {
});
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (result) {
System.out.println("Success");
}
}
}
当我碰到这样的endpoint时可能会触发:
@GetMapping("/process")
public void process() {
SomeClass.run();
}
在您的代码示例中,我不确定输入了什么。poll返回,因为您将结果分配给布尔值,而这不是KafkaConsumer的结果。poll方法返回。
Kafka消费者可以根据需要暂停和重启,这不应该是问题。
来自最新的KafkaConsumer文档:
消耗流量控制
...
Kafka支持使用pause(Collection)和resume(Collection)对消费流进行动态控制,在将来的轮询(Duration)调用中分别暂停指定分配分区上的消费,并恢复指定暂停分区上的消费。
然后这来自暂停
方法描述:
请注意,此方法不影响分区订阅。特别是,使用自动分配时,不会导致组重新平衡。
https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
因此,在您的情况下,您可以简单地拥有一个易失性布尔值isPause
标志,如果更改了该标志,您将根据需要发出暂停
或恢复
命令。Kafka消费者将完成剩下的工作。
另外,请注意,KafkaConsumer不是线程安全的,因此您应该在轮询数据的同一线程上发出这些命令。
显然,目前使用spring cloud stream不可能暂停PollableConsumer,所以我回到了基于事件的消息消费,并使用执行器来控制绑定的状态。遵循此线程和spring cloud stream文档,我注入了BindingsEndpoint并更改了绑定的状态,如下所示:
@RestController
public class EventController {
@Autowired
public EventController(BindingsEndpoint bindingsEndpoint) {
this.bindingsEndpoint = bindingsEndpoint;
}
@GetMapping("/changeState")
public void sendMessage(@RequestParam("state") String state) {
if (state.equals("paused")) {
bindingsEndpoint.changeState("MY_BINDING",
BindingsEndpoint.State.PAUSED);
}
if (state.equals("resumed")) {
bindingsEndpoint.changeState("MY_BINDING",
BindingsEndpoint.State.RESUMED);
}
}
这并不是我想要实现的,但已经足够接近了。
以下是我的情况: 我们有一个Spring cloud Stream 3 Kafka服务连接到同一个代理中的多个主题,但我想基于属性控制连接到特定主题。 每个主题都有自己的活页夹和绑定,但代理对所有人来说都是一样的。 我尝试使用下面的属性禁用绑定(这是我到目前为止找到的唯一解决方案),这适用于StreamListener不接收消息,但与主题的连接和重新平衡仍在发生。 我想知道活页夹级别是否有任何设置
我用的是Spring boot 1.5.9.RELEASE和Spring cloud Edgware。跨微服务发布。 我使用注释绑定了一个消费者。注释将完成我使用事件的其余部分。 出现了一些手动配置主题名称和其他一些配置属性的需求,我希望在应用程序启动时覆盖application.properties中定义的一些消费者属性。 有什么直接的方法吗?
使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到循环分区。 s-c-s文档根本没有提到sp
在我们的spring boot应用程序中,我们注意到Kafka消费者偶尔会在prod env中随机消费两次消息。我们在PCF中部署了6个实例和6个分区。我们发现在同一主题中收到两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产环境中很难复制。我们最近转向Kafka,但我们无法找到根本问题。 我们使用的是spring cloud stre
Spring cloud stream starter kafka在连接消费者时没有加载配置。以下是我在调试模式下运行控制台时在控制台中看到的配置: 我有以下引导yml文件的配置部分
我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentType。 我知道avro消息可以像下面这样反序列化。 但不管用。可能是因为两件事。 > 消息是avro对象的列表。但我正在尝试使用Avro模式创