当前位置: 首页 > 知识库问答 >
问题:

Spring-Cloud-Stream 仅在应用程序启动时实现的状态存储完全填充并准备就绪后处理 kafka 消息

查宜修
2023-03-14

参考这个解决方案,我的Spring云流应用程序.yml文件具有以下配置:

#application.yml

spring.cloud.stream.bindings.input:
  destination: my-topic-name
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    materializedAs: my-store

在我的主应用程序中,类注释为@EnableB

ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");


                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");

问题是,在应用程序第一次启动时,状态存储尚未完全填充,也尚未准备就绪(例如空状态存储),但消息仍然更早到达,并且正在由Kafka流拓扑处理,结果出乎意料。

我们如何确保在第一次启动应用程序时,在@StreamListener中定义的任何流处理拓扑开始处理传入消息之前,所有或特定(如果可能,用户可以定义)已被指示使用application.yml文件中的materializedAs进行具体化的状态存储已完全填充并准备就绪。我们可以强制消息的流处理一直等到应用程序第一次启动时状态存储被完全填充吗?

我试图通过修改其中一个 spring-cloud-stream 示例来复制该问题,并将修改后的版本推送到此处。有关此的更多详细信息讨论也可以在这里找到

共有1个答案

艾原
2023-03-14

您似乎需要使用GlobalKTable(状态存储)加入流,而不是使用进程接口。这里是Kstream join GlobalKTable。请尝试一下。

 类似资料:
  • 我在玩春云流和RabbitMQ。 我有一个生成消息的RESTendpoint。 通过另一个应用程序,我正在消费这些消息。 当两个应用程序都启动并运行时。我可以发布消息并在消费者处使用它。 我在以下场景中面临的问题: 我故意让消费者失望,并通过制作人发布消息。现在,当消费者启动时,它没有收到任何消息 我想RabbitMQ保证消息传递 Github链接https://github.com/govi20

  • 我正在尝试在应用程序启动时使用spring cloud stream向rabbitmq发送消息。使用下面的示例代码。 启用绑定的SpringBoot应用程序 应在启动时发送消息的应用程序运行程序 除非我取消注释上面注释掉的代码,否则上面的代码会产生下面的异常。 我正在使用Spring Boot 2.0.2和Sprig Cloud Stream 2.0.0,如下面的pom所示 根据这个公认的答案ht

  • 我正在尝试使用启用批处理模式的spring cloud stream实现DLQ 但有一些疑问: > 如何使用属性配置键/值序列化程序-我的消息是String类型,但KafkaOperations使用的是ByteArraySerializer 在批处理中,有多个消息,但如果第一条消息失败,它会转到DLQ,但看不到下一条消息的处理。 要求-如果批处理失败,我只需要将该消息发送到DLQ,并且应该再次处理

  • 我将为消息实现断路器模式。基本要求是,如果微服务无法将消息发布到发布主题,则应停止接受来自其他 Kafka 主题的消息。当发布主题可用时,微服务应开始接受来自其他 Kafka 主题的消息。 有没有一种方法可以在Spring BootKafka Streams中实现这一点?

  • 使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到循环分区。 s-c-s文档根本没有提到sp

  • 我正在尝试实现一个PollableConsumer,当我在SpringBoot应用程序中遇到endpoint时,它会在特定条件下开始轮询来自Kafka的消息。 在特定条件下,我尝试了多种触发民意调查的方法,但显然,只有当它不断地从Kafka主题进行民意调查时,它才会起作用。(就像spring cloud stream文档中的所有示例一样) 我正在寻找这样的东西: 当我碰到这样的endpoint时可