当前位置: 首页 > 知识库问答 >
问题:

在Spring启动中暂停/启动Kafka Stream处理器

岳彬炳
2023-03-14

我将为消息实现断路器模式。基本要求是,如果微服务无法将消息发布到发布主题,则应停止接受来自其他 Kafka 主题的消息。当发布主题可用时,微服务应开始接受来自其他 Kafka 主题的消息。

有没有一种方法可以在Spring BootKafka Streams中实现这一点?

共有1个答案

太叔坚
2023-03-14

我能够通过使用BindingsEndpoint实现这一点。

private final BindingsEndpoint binding;

@Override
public void stop() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Stopping Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::stop);
        log.info("Stopped Kafka topics ");
    }
}

@Override
public void start() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Starting Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::start);
        log.info("Started Kafka topics ");
    }
}

protected List<Binding> getBindings(List<?> objects) {
    return objects.stream().filter(object -> object instanceof Binding)
            .map(object -> (Binding) object).collect(Collectors.toList());
}
 类似资料:
  • 下面的spring批处理作业带有一个分区步骤,它为一个分区步骤创建3600个分区。我使用的ThreadPoolTaskExecutor的最大池大小为100,队列容量为100(尽管这似乎对速度没有什么影响)。Im使用Visual VM监视线程,我注意到taskExecutor线程在启动作业后超过5分钟才启动。 奇怪的是,如果我将分区的数量限制为100,那么线程启动得相当快,大约在一分钟内完成。 我注

  • 几个月前,我用Spring Batch制作了一个项目。 该项目工作正常,包括JobExecution决策器的实现 这使fini仅与Spring Batch配合使用。 现在我必须将其用于Spring Boot批处理。在决策步骤之前,所有流程都运行良好。其中,我返回了良好的FlowExecutionStatus,但我不知道为什么,作业以“失败”状态完成。 有人知道为什么不工作了? 谢谢

  • 在WPF应用程序中,我有一个线程池(每个线程都是对返回base 64映像的REST webservice的调用),我想精确地“控制”这些线程。 这些线程是异步的,我希望能够暂停/恢复它们中的每一个。 > < li> 我想使用CancellationToken之类的东西不是一个选项,因为每个线程都只是进行一次调用并等待响应。 我只是在课堂线程中看到的= 如果有人有什么建议呢? 提前谢谢。

  • 我有一个包含两个功能的处理应用程序。第一个被称为加载屏幕。第二个叫做主菜单。当应用程序初始化时,它调用函数“loadScreen();”。我在这个函数中设置了一个计时器,这样5秒钟后,它就会跳转到“主菜单”。问题是,如何停止我的函数并调用另一个函数?有没有“Rest”或者我可以使用的“停止”功能?谢谢

  • Triathlon程序执行一个长时间运行的任务,如果该任务已完全执行,则有可能重新启动该任务。我想添加停止执行以重置UI的可能性。为了达到这个目的,我增加了一个新的按钮,停止。代码如下: 如果任务已经完成,程序很好地重新启动,但是如果我在停止它之后调用start,程序就会崩溃。我该纠正什么?

  • 我在启动spring boot应用程序时遇到以下错误。这是我的第一个spring boot项目。因此,我不确定错误以及如何修复它。 申请启动失败 描述: 配置为侦听端口8080的Tomcat连接器无法启动。端口可能已在使用中,或者连接器可能配置错误。 行动: 验证连接器的配置,识别并停止在端口8080上侦听的任何进程,或者将此应用程序配置为在另一个端口上侦听。