我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。
我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。
这能做到吗??
不推荐使用@StreamListener;您应该转换为函数式编程模型。
下面是一个使用该模型的示例(但同样的技术也适用于不推荐使用的侦听器)。
spring.cloud.function.definition=input1;input2
spring.cloud.stream.bindings.input1-in-0.group=grp1
spring.cloud.stream.bindings.input2-in-0.consumer.auto-startup=false
spring.cloud.stream.bindings.input2-in-0.group=grp2
spring.cloud.stream.kafka.bindings.input2-in-0.consumer.idle-event-interval=5000
@SpringBootApplication
public class So69726610Application {
public static void main(String[] args) {
SpringApplication.run(So69726610Application.class, args);
}
boolean dbIsDown = true;
@Autowired
BindingsLifecycleController controller;
TaskExecutor exec = new SimpleAsyncTaskExecutor();
@Bean
public Consumer<String> input1() {
return str -> {
System.out.println(str);
if (this.dbIsDown) {
this.controller.changeState("input1-in-0", State.PAUSED);
this.controller.changeState("input2-in-0", State.STARTED);
throw new RuntimeException("Paused");
}
};
}
@Bean
public Consumer<String> input2() {
return System.out::println;
}
@EventListener
public void idle(ListenerContainerIdleEvent event) {
System.out.println(event);
// assumes concurrency = 1 (default)
if (event.getListenerId().contains("input2-in-0")) {
this.controller.changeState("input1-in-0", State.RESUMED);
this.exec.execute(() -> this.controller.changeState("input2-in-0", State.STOPPED));
}
}
}
我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决
我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。
如何在apache/kafka中使用regex消费所有主题?我尝试了上面的代码,但不起作用。
我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
我们有一个传入的kafka主题,多个基于Avro模式的消息序列化到其中。 我们需要将Avro格式的消息拆分为多个其他kafka主题,基于某个公共模式属性的值。 想了解如何实现它,同时避免在汇流平台上构建中间客户端来进行这种拆分/路由。