我的流服务执行的操作很少:
stream
.map(doSomething)
.filter(filterSomething)
.groupBy(groupMyStuffs)
.aggregate(Map.empty[String, Object])(aggregation)
.mapValues((k, v) => parseAggResults(k, v))
.toStream
.flatMap((_, v) => v)
.to(outputTopic)
在进行测试时,我发现我的服务在mapvalues
调用函数toStream
后中断了,该函数将把我的数据写入由Kafka Streams将KTable转换为Kafka Streams创建的新主题。
我检查了KStreams创建的主题,主题就在那里:
myconsumergroup-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog
myconsumergroup-KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition
我发现response
有三个输入,即topicmetadata
,我不知道第三个输入是什么:
(type=TopicMetadata, error=NONE, topic=myconsumergroup-KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition, isInternal=false, partitionMetadata=[(type=PartitionMetadata, error=NONE, partition=0, leader=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), replicas=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), isr=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), offlineReplicas=)])
(type=TopicMetadata, error=NONE, topic=myconsumergroup-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog, isInternal=false, partitionMetadata=[(type=PartitionMetadata, error=NONE, partition=0, leader=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), replicas=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), isr=xxx.xxx.xxx.xxx:9092 (id: 0 rack: null), offlineReplicas=)])
(type=TopicMetadata, error=INVALID_TOPIC_EXCEPTION, topic=, isInternal=false, partitionMetadata=[])
为了确保所有内容都被覆盖,这里是我的配置:
logger.info(s"Loading Kafka configurations")
logger.info(s"Kafka Connection with: ${getEnvVar("KAFKA_PROTOBUF_CONN")}")
logger.info(s"Consumer Name: ${getEnvVar("CONSUMER_STREAM_NAME")}")
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, getEnvVar("CONSUMER_STREAM_NAME"))
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, getEnvVar("KAFKA_PROTOBUF_CONN"))
settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getEnvVar("AUTO_OFFSET_RESET_CONFIG"))
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass.getName)
settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10.seconds)
settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
//Added to avoid messages created by old producers
settings.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.WallclockTimestampExtractor")
settings.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "1000")
settings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all")
settings.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), 20.mb)
if (!isLocalRun)
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3")
我的问题是,我们的部署正在工作,突然所有的东西都开始出现这个错误:
[kafka-producer-network-thread | myconsumergroup-1d0237ae-6caa-4cbd-aeaa-2154d2303b32-StreamThread-1-producer] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=myconsumergroup-1d0237ae-6caa-4cbd-aeaa-2154d2303b32-StreamThread-1-producer] Error while fetching metadata with correlation id 9 : {=INVALID_TOPIC_EXCEPTION}
我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我
如何识别主题的KTable物化何时完成? 例如,假设KTable只有几百万行。下面的伪代码: 在某个时间点,我想安排一个线程来调用以下内容,该内容写入主题:kt.toStream().to(“output_topic_name”); 跟进问题: 约束 1)好的,我看到kstream和ktable在kafkastream启动后是无界/无限的。但是,ktable物化(压缩主题)不会在指定的时间段内为同
假设我将一个KStream聚合到一个KTable,将一个KStream聚合到一个KTable。和都不传递空值(删除事件被聚合为快照的状态属性)。此时,我们可以假设对于和聚合都有一个持久化的kafka changelog主题和一个rocksDB本地存储。然后,我的拓扑将与连接起来,生成一个连接的。也就是说,我的问题是和物化生命周期(包括changelog主题和本地rocksdb存储)。假设主题和主题
使用Kafka流DSL是否可行?所有正在使用的主题都是,因此我希望模拟一个表,并且永远不要摆脱旧的值。 TL;DR;如何将一条消息转换成多条消息?
目前我们正在使用:Kafka Streams API(版本1.1.0)来处理来自Kafka集群的消息(3个代理,每个主题3个分区,复制因子为2)。安装的Kafka版本为1.1.1。 最终用户向我们报告数据消失的问题。他们报告说,突然之间他们看不到任何数据(例如,昨天他们可以在UI中看到n条记录,而第二天的morning table是空的)。我们检查了这个特定用户的changelog主题,看起来很奇
我正在使用Apache Kafka streaming对从Kafka主题中消耗的数据进行聚合。然后,聚合被序列化到另一个主题,它本身被使用,结果存储在一个DB中。我想是很经典的用例吧。 聚合调用的结果是创建一个由Kafka变更日志“主题”备份的KTable。 这实际上是很好的/必要的,因为这避免了当将来的事件带有相同的键时丢失我的聚合状态。 然而,从长远来看,这意味着这个变更日志将永远增长(随着更