我正在迁移一个Kafka Streams实现,它使用纯Kafka apis来使用sping-kafka,因为它被合并在sping-引导应用程序中。
一切都很好Stream,GlobalKtable,分支,我所有的工作都非常好,但我很难合并ReadOnlyKeyValueStore。基于这里的sping-kafka留档:https://docs.spring.io/spring-kafka/docs/2.6.10/reference/html/#streams-spring
它说:
如果需要直接执行一些KafkaStreams操作,可以使用StreamsBuilderFactoryBean.getKafkaStream()访问该内部KafkaStreams实例。您可以按类型自动连接StreamsBuilderFactoryBean bean,但应确保在bean定义中使用完整类型。
基于此,我试图将其合并到我的示例中,如下所示:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = defaultStreamsConfigs();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "quote-stream");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-quotes-stream-group");
return new KafkaStreamsConfiguration(props);
}
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration defaultKafkaStreamsConfig) {
return new StreamsBuilderFactoryBean(defaultKafkaStreamsConfig);
}
…
final GlobalKTable<String, LeveragePrice> leverageBySymbolGKTable = streamsBuilder
.globalTable(KafkaConfiguration.LEVERAGE_PRICE_TOPIC,
Materialized.<String, LeveragePrice, KeyValueStore<Bytes, byte[]>>as("leverage-by-symbol-table")
.withKeySerde(Serdes.String())
.withValueSerde(leveragePriceSerde));
leveragePriceView = myKStreamsBuilder.getKafkaStreams().store("leverage-by-symbol-table", QueryableStoreTypes.keyValueStore());
但是添加StreamsBuilderFactoryBean(似乎需要它来获取对KafkaStreams的引用)定义会导致错误:
The bean 'defaultKafkaStreamsBuilder', defined in class path resource [com/resona/springkafkastream/repository/KafkaConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.class] and overriding is disabled.
问题是我不想控制流的生命周期,这是我用普通Kafka APIs得到的,所以我想得到一个对默认托管流的引用,因为我希望spring来管理它,但每当我试图公开bean时,它都会给出错误。有什么想法是正确的方法来使用Spring-Kafka吗?
P. S-我不感兴趣的解决方案使用sping-Cloud-流我正在寻找sping-kafka的实现。
您不需要定义任何新的bean;像这样的东西应该可以工作…
spring.application.name=quote-stream
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
@SpringBootApplication
@EnableKafkaStreams
public class So69669791Application {
public static void main(String[] args) {
SpringApplication.run(So69669791Application.class, args);
}
@Bean
GlobalKTable<String, String> leverageBySymbolGKTable(StreamsBuilder sb) {
return sb.globalTable("gkTopic",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>> as("leverage-by-symbol-table"));
}
private ReadOnlyKeyValueStore<String, String> leveragePriceView;
@Bean
StreamsBuilderFactoryBean.Listener afterStart(StreamsBuilderFactoryBean sbfb,
GlobalKTable<String, String> leverageBySymbolGKTable) {
StreamsBuilderFactoryBean.Listener listener = new StreamsBuilderFactoryBean.Listener() {
@Override
public void streamsAdded(String id, KafkaStreams streams) {
leveragePriceView = streams.store("leverage-by-symbol-table", QueryableStoreTypes.keyValueStore());
}
};
sbfb.addListener(listener);
return listener;
}
@Bean
KStream<String, String> stream(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream("someTopic");
stream.to("otherTopic");
return stream;
}
}
我在读取KTABLE时得到了LongDeserializer异常。 我正在使用 该项目的github链接可在https://github.com/jaysara/kstreamanalytics获得
问题内容: 我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。 spark-avro外部模块可以为读取avro文件提供以下解决方案: 但是,我需要阅读流式Avro消息。库文档建议使用 from_avro() 函数,该函数仅适用于Scala和Java。 是否有其他模块支持读取从Kafka流式传输的Avro消息? 问题答案: 您可以包括spark-avro软件包,例如使用(调整版本
目前我们正在使用:Kafka Streams API(版本1.1.0)来处理来自Kafka集群的消息(3个代理,每个主题3个分区,复制因子为2)。安装的Kafka版本为1.1.1。 最终用户向我们报告数据消失的问题。他们报告说,突然之间他们看不到任何数据(例如,昨天他们可以在UI中看到n条记录,而第二天的morning table是空的)。我们检查了这个特定用户的changelog主题,看起来很奇
这是一个场景:我知道,使用与Spring kafka相关的最新API(如Spring集成kafka 2.10),我们可以执行以下操作: 以及来自与相同kafka主题相关的不同分区的读取。 我想知道我们是否可以使用同样的方法,例如spsping-集成-Kafka1.3.1 我没有找到任何关于如何做到这一点的提示(我对xml版本很感兴趣)。
我正在运行Strom集群,其中2个主管和1个灵气正在运行。我在哪里读Kafka与主题ID"topic1"。但在UI上我得到以下错误 JAVARuntimeException:java。lang.RuntimeException:org。阿帕奇。动物园管理员。KeeperException$NoNodeException:KeeperErrorCode=storm中/brokers/topic/to
我处理问题正确吗? (mapValues->只保留“before”/“after”字段。groupBy->将ID作为消息的键。Aggregate->?)