当前位置: 首页 > 知识库问答 >
问题:

使用spring-kafka从KTable读取OnlyKeyValueStore

梁丘伟
2023-03-14

我正在迁移一个Kafka Streams实现,它使用纯Kafka apis来使用sping-kafka,因为它被合并在sping-引导应用程序中。

一切都很好Stream,GlobalKtable,分支,我所有的工作都非常好,但我很难合并ReadOnlyKeyValueStore。基于这里的sping-kafka留档:https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#streams-spring

它说:

如果需要直接执行一些KafkaStreams操作,可以使用StreamsBuilderFactoryBean.getKafkaStream()访问该内部KafkaStreams实例。您可以按类型自动连接StreamsBuilderFactoryBean bean,但应确保在bean定义中使用完整类型。

基于此,我试图将其合并到我的示例中,如下所示:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = defaultStreamsConfigs();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "quote-stream");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-quotes-stream-group");
        return new KafkaStreamsConfiguration(props);
    }

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration defaultKafkaStreamsConfig) {
        return new StreamsBuilderFactoryBean(defaultKafkaStreamsConfig);
    }

final GlobalKTable<String, LeveragePrice> leverageBySymbolGKTable = streamsBuilder
            .globalTable(KafkaConfiguration.LEVERAGE_PRICE_TOPIC,
                    Materialized.<String, LeveragePrice, KeyValueStore<Bytes, byte[]>>as("leverage-by-symbol-table")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(leveragePriceSerde));

leveragePriceView = myKStreamsBuilder.getKafkaStreams().store("leverage-by-symbol-table", QueryableStoreTypes.keyValueStore());

但是添加StreamsBuilderFactoryBean(似乎需要它来获取对KafkaStreams的引用)定义会导致错误:

The bean 'defaultKafkaStreamsBuilder', defined in class path resource [com/resona/springkafkastream/repository/KafkaConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.class] and overriding is disabled.

问题是我不想控制流的生命周期,这是我用普通Kafka APIs得到的,所以我想得到一个对默认托管流的引用,因为我希望spring来管理它,但每当我试图公开bean时,它都会给出错误。有什么想法是正确的方法来使用Spring-Kafka吗?

P. S-我不感兴趣的解决方案使用sping-Cloud-流我正在寻找sping-kafka的实现。

共有1个答案

詹弘毅
2023-03-14

您不需要定义任何新的bean;像这样的东西应该可以工作…

spring.application.name=quote-stream
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
@SpringBootApplication
@EnableKafkaStreams
public class So69669791Application {

    public static void main(String[] args) {
        SpringApplication.run(So69669791Application.class, args);
    }

    @Bean
    GlobalKTable<String, String> leverageBySymbolGKTable(StreamsBuilder sb) {
        return sb.globalTable("gkTopic",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>> as("leverage-by-symbol-table"));
    }

    private ReadOnlyKeyValueStore<String, String> leveragePriceView;

    @Bean
    StreamsBuilderFactoryBean.Listener afterStart(StreamsBuilderFactoryBean sbfb,
            GlobalKTable<String, String> leverageBySymbolGKTable) {

        StreamsBuilderFactoryBean.Listener listener = new StreamsBuilderFactoryBean.Listener() {

            @Override
            public void streamsAdded(String id, KafkaStreams streams) {
                leveragePriceView  = streams.store("leverage-by-symbol-table", QueryableStoreTypes.keyValueStore());
            }

        };
        sbfb.addListener(listener);
        return listener;
    }

    @Bean
    KStream<String, String> stream(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("someTopic");
        stream.to("otherTopic");
        return stream;
    }

}
 类似资料:
  • 我在读取KTABLE时得到了LongDeserializer异常。 我正在使用 该项目的github链接可在https://github.com/jaysara/kstreamanalytics获得

  • 问题内容: 我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。 spark-avro外部模块可以为读取avro文件提供以下解决方案: 但是,我需要阅读流式Avro消息。库文档建议使用 from_avro() 函数,该函数仅适用于Scala和Java。 是否有其他模块支持读取从Kafka流式传输的Avro消息? 问题答案: 您可以包括spark-avro软件包,例如使用(调整版本

  • 目前我们正在使用:Kafka Streams API(版本1.1.0)来处理来自Kafka集群的消息(3个代理,每个主题3个分区,复制因子为2)。安装的Kafka版本为1.1.1。 最终用户向我们报告数据消失的问题。他们报告说,突然之间他们看不到任何数据(例如,昨天他们可以在UI中看到n条记录,而第二天的morning table是空的)。我们检查了这个特定用户的changelog主题,看起来很奇

  • 这是一个场景:我知道,使用与Spring kafka相关的最新API(如Spring集成kafka 2.10),我们可以执行以下操作: 以及来自与相同kafka主题相关的不同分区的读取。 我想知道我们是否可以使用同样的方法,例如spsping-集成-Kafka1.3.1 我没有找到任何关于如何做到这一点的提示(我对xml版本很感兴趣)。

  • 我正在运行Strom集群,其中2个主管和1个灵气正在运行。我在哪里读Kafka与主题ID"topic1"。但在UI上我得到以下错误 JAVARuntimeException:java。lang.RuntimeException:org。阿帕奇。动物园管理员。KeeperException$NoNodeException:KeeperErrorCode=storm中/brokers/topic/to

  • 我处理问题正确吗? (mapValues->只保留“before”/“after”字段。groupBy->将ID作为消息的键。Aggregate->?)