当前位置: 首页 > 知识库问答 >
问题:

如何暂停(打开/关闭)流处理W/Spring Cloud Streams&Kafka Streams绑定器?

吕永嘉
2023-03-14

到目前为止我试过什么?-使用/Actuator/Bindingsendpoint启动/停止kafka-streams绑定。这似乎不适用于Kafka-Streams活页夹,只适用于Kafka活页夹:(。

任何帮助都将不胜感激!谢了!

共有1个答案

亢正德
2023-03-14

Kafka Streams绑定器不支持开箱即用的用于控制流处理的执行器绑定endpoint。这个用例以前出现过。

如果您可以在Kafka streams处理器前面添加额外的输入/输出主题(以及取决于许多因素的潜在延迟),那么有一种方法可以解决这个问题。请参见此处添加的注释。

其基本思想是,第一个处理器是一个简单的passthrough处理器,它不使用Kafka流,而是Spring Cloud Stream中基于消息传递的标准绑定器。在那里,您可以使用执行器绑定endpoint来控制事件流。该处理器的输出成为Kafkfa流处理器的输入。

同样,实现此模式并不需要太多代码(可能是3或4行),但根据应用程序的需求和吞吐量,可能会有性能影响。然而,如果这不是一个问题,这是一个模式,您可以尝试。

 类似资料:
  • 问题内容: 当我打开自定义JDialog以及关闭对话框以再次继续时,如何使我的应用程序暂停。 问题答案: 只需使用: 我通常从的构造函数中调用它。 请参阅中的Javadocs 。 http://java.sun.com/javase/6/docs/api/java/awt/Dialog.html#setModal(boolean) 这将导致执行在当前线程上阻塞,直到对话框关闭。 或者,您可以使用:

  • 通过使用会话窗口与相当高级的组一起运行流数据流管道,在运行几个小时后,我遇到了问题。工作在workers中扩展,但后来开始获得日志负载,其内容如下 记录此代码的转换位于“group by”-块之后,并执行对外部服务的异步HTTP调用(使用)。 你知道为什么会这样吗?与异步、伸缩或按策略分组有关? 作业ID:2018-01-29_03_13_40-12789475517328084866 SDK:A

  • 我们正在使用spring cloude stream 2.0 现在在Spring Cloud stream 2.0中,有一种使用执行器管理绑定器生命周期的方法:绑定可视化和控制 是否可以从代码控制绑定器的生命周期,这意味着在目标服务器关闭的情况下,绑定器,当它启动时,?

  • 我有一个Flink流应用程序,需要能够“暂停”和“取消暂停”对特定键控流的处理。“处理”意味着只是在流上执行一些简单的异常检测。 我们正在考虑的flow是这样工作的: 命令流,可以是ProcessCommand、PauseCommand或ResumeCommand,每个命令都有一个用于按键的id。 处理命令将检查按键在处理前是否暂停,如果没有暂停,则检查缓冲区。 暂停命令(PauseCommand

  • 我复制了一个模板从coDepen。这段代码非常过时,有一些旧的cdn链接。这意味着我必须找到这些链接的更新版本。我是如此接近使应用程序完全功能,我需要打开后关闭模式。我相信这和自举有关。链接和模态代码如下。

  • 问题内容: 我可以轻松进行“作业”,但是我发现输入流的关闭存在一些问题。简单地说,我必须使用Java创建一个联系人“列表”应用程序,才能以正确的方式使用多态。所以我有一个Contact类和一个Private类(contact)。在这两个类中,都有一个Modify方法来更改变量的值。 这是不会产生问题的Contact方法 相反,这是Private中方法的替代。首先,我创建一个Private对象,然后