当前位置: 首页 > 知识库问答 >
问题:

在Kafka Streams应用程序中,第二个输出流不再被写入

鞠征
2023-03-14

我目前正在实现一个Kafka Streams应用程序,其中我正在阅读一个主题并进行一些处理。在处理过程中,我将其分为两个流,一个写入一个主题(Avro模式),另一个是计数聚合(字数计数),将键/值对(字符串/长)写入另一个主题。代码之前运行良好,但最近第二个流不再编写。

在此代码示例中:

KStream<String, ProcessedSentence> sentenceKStream = stream
        .map((k,v) -> {
                [...]
        });

// configure serializers for publishing to topic
final Serde<ProcessedSentence> valueProcessedSentence = new SpecificAvroSerde<>();
valueProcessedSentence.configure(serdeConfig, false);
stringSerde.configure(serdeConfig, true);

// write to Specific Avro Record
sentenceKStream
        .to(EnvReader.KAFKA_SENTENCES_TOPIC_NAME_OUT,
                Produced.with(
                        Serdes.String(),
                        valueProcessedSentence));

句子流(sentenceKStream)编写正确,但单词计数分组出现了问题:

KStream<String, Long> wordCountKStream =
        sentenceKStream.flatMap((key, processedSentence) -> {
            List<KeyValue<String, Long>> result = new LinkedList<>();
            Map<CharSequence, Long> words = processedSentence.getWords();
            for (CharSequence word: words.keySet() ) {
                result.add(KeyValue.pair(word.toString(), words.get(word)));
            }
            return result;
        })
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum)
        .toStream();

// write to Specific Avro Record
wordCountKStream
        .to(EnvReader.KAFKA_WORDS_COUNT_TOPIC_NAME_OUT,
                Produced.with(
                        Serdes.String(),
                        Serdes.Long()));

我真的不明白为什么不再写wordCountKStream了。

也许有人能提供一些帮助?我很乐意提供更多细节!

很多谢谢

更新:我发现两个新输出流中的数据都丢失了。实际上,一切都写得很正确,但是在写入数据后几分钟,两个主题都被删除了(还剩0字节)。

共有1个答案

狄河
2023-03-14

这与实现本身无关。我只是用

kafka-consumer-groups.sh --bootstrap-server [broker:port] --delete-offsets --group [group_name] --topic [topic_name]

解决了问题。在调试过程中,存储的偏移量出现了问题,并且与streams应用程序的多次重启相冲突。

对于那些想要列出组以查找存储的主题位置的人,请拨打电话

kafka-consumer-groups.sh --bootstrap-server node1:9092 --list

更新:不幸的是,删除组偏移量也没有正常工作。实际的问题是,输出主题中的新条目所用的时间戳是来自原始主题(已消费)的时间戳,它根本没有改变。因此,新条目的时间戳比默认保留时间早。

因为这个话题有一个主题。ms of-1(永久保留数据)和新主题(我认为是6天的标准),消耗的主题中的条目仍然存在,但生成的主题中的条目总是被删除,因为这些条目已超过6天。

最终的解决方案是改变保留率。输出主题从ms到-1。

提示:对于Streams应用程序,建议使用应用程序重置工具,而不是如上所示手动重置/删除偏移。

 类似资料:
  • 我的应用程序实现了VpnService来拦截流量并提供量身定制的响应。目标是处理到特定地址的流量,并丢弃其他请求。 目前,我成功地解析了传入的请求,构建并发送了响应。然而,问题是这些响应并不是对原始请求的实际响应;使用套接字连接进行测试只是超时。 为了进行这一区分,我目前正在解析VPN服务输入流中的原始IP数据包,如下所示: IpDatagram是一个类,通过它可以将字节数组解析为IP数据包的表示

  • 问题内容: 我正在尝试从JavaFx应用程序内部启动JavaFx应用程序,但是看起来Application.launch()只能被调用一次。这是否意味着我必须像exec(“ java …”中那样启动一个单独的JVM …还是还有另一种方法? 更多背景信息。我希望我的JavaFx应用程序能够构建和运行JavaFx应用程序。现在,它可以在内存中编译类,装入类…真的很不​​幸,不得不诉诸一切将文件写入文件

  • 我试图从JavaFx应用程序中启动JavaFx应用程序,但看起来application.launch()只能调用一次。这是否意味着我必须启动一个单独的JVM...就像在exec(“java.......或者还有其他方法吗? 作为次要问题...是否有方法打开另一个JavaFx窗口,获取stage并将其传递给我新编译和加载的应用程序子类?

  • 我有一个集群,我可以成功启动,至少这是出现在web UI上,我在其中看到这些信息 我收到这个消息 使用REST应用程序提交协议运行Spark。使用Spark默认的log4j配置文件:org/apache/spark/log4j-defaults.properties 16/08/31 16:59:06 INFO restsubmissionclient:提交请求以在Spark://name25:6

  • 我对使用KStream组件的Spring云流应用程序有问题。它正在监听一个输入并在处理后将消息定向到一个输出。 它期待一个JSON字符串进来,并在到达时尝试将其转换为Spring Tuple。将消息发送出去时会发生相反的情况。 问题是,当Sysadmin想要使用…并打印字符串 “哈哈” 在其中,整个Spring云流应用程序将立即消亡,只有以下例外: 我希望这个框架对这种行为至少有一定的容错能力。你