当前位置: 首页 > 知识库问答 >
问题:

将StateRestoreListener与Spring Cloud Kafka Streams绑定器一起使用

公冶嘉
2023-03-14

我打算将StateRestoreListener与Spring Cloud Kafka Streams绑定器一起使用。我需要监视应用程序的容错状态存储的恢复进度。汇流中有一个例子https://docs.confluent.io/current/streams/monitoring.html#streams-监控运行时状态。

为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.kafka.streams.processor.StateRestoreListener 接口的实例。您可以通过调用 KafkaStreams#setGlobalStateRestoreListener 方法来设置 org.apache.kafka.streams.processor.StateRestoreListener。

第一个问题是从应用程序获取Kafka Streams。我用

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();

第二个问题是将StateRestoreListener设置为KafkaStreams,因为我收到错误

Java . lang . illegalstateexception:只能在创建状态下设置GlobalStateRestoreListener。当前状态为:正在运行

是否可以在Spring Cloud Kafka Streams绑定器中使用StateRestoreListener?谢谢

共有1个答案

澹台臻
2023-03-14

您可以通过使用< code > streamsbuilderfactorybean customizer 来实现这一点,该工具允许您访问基础< code > kafk streams 对象。如果您使用的是binder或更高版本,这是推荐的方法。例如,您可以在应用程序中提供以下< code>bean,并使用< code > GlobalStateRestoreListener 对其进行自定义。

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setGlobalStateRestoreListener(...);
            }
        });
    };
}

此博客提供有关此策略的更多详细信息。

 类似资料:
  • 我想通过代理服务器连接到Azure Service Bus消息队列。我在Spring应用程序中使用流绑定库 波姆。xml: 应用yml 我试图通过命令行参数提供HTTP和SOCKS代理设置,但这似乎不起作用。是否可以为“examplehost.servicebus.windows.net”的连接提供SOCKS或HTTP代理?

  • 我有一个名为LibraryConfig的带有注释的配置属性类。它使用内部类作为属性/配置结构的类型定义。当类是内部类而不是独立类时,我得到“元素[…]“未绑定”错误/异常。为什么会这样?我如何修复它? application.yml 库配置。Java语言

  • 问题内容: 因此,我一直在为这个(应该是)简单的练习而绞尽脑汁,以使该程序将日期字符串转换为对象,对其进行格式化,并在完成后将其作为字符串再次返回。 这是程序的最后一点,它从文件中获取一小段文本,将其分解为单独的记录,然后将记录分解为单独的数据并将它们分配给个人对象。 我已经在多个位置检查了该代码,并且该代码完全执行了应该执行的操作,直到调用了format函数(该函数抛出)为止。为对象分配了应该分

  • 问题内容: 我想在目录中获取具有特定扩展名的文件列表。在中,我看到了可以做到这一点的方法。 由于我需要特定的扩展名,因此我创建了一个。但是,当我与此一起使用时,出现编译错误。我以为自以来,我应该能够做到这一点。代码如下: 最后一行显示编译错误: 类型的方法不适用于类型的参数 我正在尝试使用,不是。为何编译器无法识别这一点? 如果我编写自己的扩展筛选器,则此方法有效。我宁愿使用而不愿自己写。我究竟做

  • 问题内容: 我正在尝试在我的watchKit应用中使用firebase数据库。我已经在我的iPhone应用程序上开发了此功能,但是发现在我的Watch应用程序上很难做到这一点。当我尝试将firebase导入watch应用程序的VC类中时,它正在创建error 。 可以在Watch app中使用Firebase吗? 问题答案: 可悲的是,没有支持,并由于这样的事实,有没有支持在这些版本中,并高度依赖

  • 问题内容: 当请求来自Ajax.ActionLink(使用Http方法发布)时,是否可以在控制器操作上使用ValidateAntiForgeryToken属性。替代方法似乎是手动滚动JQuery Ajax请求,但我很好奇MVC Ajax框架中是否有办法。 问题答案: 我还没看过。您必须将令牌放入POST中记录的数据中。每次都使用相同的防伪令牌ID(或名称,我不记得了),但是您必须非常小心,并确保您