我目前正在实现一个Kafka Streams应用程序,其中我正在阅读一个主题并进行一些处理。在处理过程中,我将其分为两个流,一个写入一个主题(Avro模式),另一个是计数聚合(字数计数),将键/值对(字符串/长)写入另一个主题。代码之前运行良好,但最近第二个流不再编写。
在此代码示例中:
KStream<String, ProcessedSentence> sentenceKStream = stream
.map((k,v) -> {
[...]
});
// configure serializers for publishing to topic
final Serde<ProcessedSentence> valueProcessedSentence = new SpecificAvroSerde<>();
valueProcessedSentence.configure(serdeConfig, false);
stringSerde.configure(serdeConfig, true);
// write to Specific Avro Record
sentenceKStream
.to(EnvReader.KAFKA_SENTENCES_TOPIC_NAME_OUT,
Produced.with(
Serdes.String(),
valueProcessedSentence));
句子流(sentenceKStream
)编写正确,但单词计数分组出现了问题:
KStream<String, Long> wordCountKStream =
sentenceKStream.flatMap((key, processedSentence) -> {
List<KeyValue<String, Long>> result = new LinkedList<>();
Map<CharSequence, Long> words = processedSentence.getWords();
for (CharSequence word: words.keySet() ) {
result.add(KeyValue.pair(word.toString(), words.get(word)));
}
return result;
})
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream();
// write to Specific Avro Record
wordCountKStream
.to(EnvReader.KAFKA_WORDS_COUNT_TOPIC_NAME_OUT,
Produced.with(
Serdes.String(),
Serdes.Long()));
我真的不明白为什么不再写wordCountKStream
了。
也许有人能提供一些帮助?我很乐意提供更多细节!
很多谢谢
更新:我发现两个新输出流中的数据都丢失了。实际上,一切都写得很正确,但是在写入数据后几分钟,两个主题都被删除了(还剩0字节)。
这与实现本身无关。我只是用
kafka-consumer-groups.sh --bootstrap-server [broker:port] --delete-offsets --group [group_name] --topic [topic_name]
解决了问题。在调试过程中,存储的偏移量出现了问题,并且与streams应用程序的多次重启相冲突。
对于那些想要列出组以查找存储的主题位置的人,请拨打电话
kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
更新:不幸的是,删除组偏移量也没有正常工作。实际的问题是,输出主题中的新条目所用的时间戳是来自原始主题(已消费)的时间戳,它根本没有改变。因此,新条目的时间戳比默认保留时间早。
因为这个话题有一个主题。ms of-1(永久保留数据)和新主题(我认为是6天的标准),消耗的主题中的条目仍然存在,但生成的主题中的条目总是被删除,因为这些条目已超过6天。
最终的解决方案是改变保留率。输出主题从ms到-1。
提示:对于Streams应用程序,建议使用应用程序重置工具,而不是如上所示手动重置/删除偏移。
我的应用程序实现了VpnService来拦截流量并提供量身定制的响应。目标是处理到特定地址的流量,并丢弃其他请求。 目前,我成功地解析了传入的请求,构建并发送了响应。然而,问题是这些响应并不是对原始请求的实际响应;使用套接字连接进行测试只是超时。 为了进行这一区分,我目前正在解析VPN服务输入流中的原始IP数据包,如下所示: IpDatagram是一个类,通过它可以将字节数组解析为IP数据包的表示
提前谢了。
问题内容: 我正在尝试从JavaFx应用程序内部启动JavaFx应用程序,但是看起来Application.launch()只能被调用一次。这是否意味着我必须像exec(“ java …”中那样启动一个单独的JVM …还是还有另一种方法? 更多背景信息。我希望我的JavaFx应用程序能够构建和运行JavaFx应用程序。现在,它可以在内存中编译类,装入类…真的很不幸,不得不诉诸一切将文件写入文件
我试图从JavaFx应用程序中启动JavaFx应用程序,但看起来application.launch()只能调用一次。这是否意味着我必须启动一个单独的JVM...就像在exec(“java.......或者还有其他方法吗? 作为次要问题...是否有方法打开另一个JavaFx窗口,获取stage并将其传递给我新编译和加载的应用程序子类?
我有一个集群,我可以成功启动,至少这是出现在web UI上,我在其中看到这些信息 我收到这个消息 使用REST应用程序提交协议运行Spark。使用Spark默认的log4j配置文件:org/apache/spark/log4j-defaults.properties 16/08/31 16:59:06 INFO restsubmissionclient:提交请求以在Spark://name25:6
我对使用KStream组件的Spring云流应用程序有问题。它正在监听一个输入并在处理后将消息定向到一个输出。 它期待一个JSON字符串进来,并在到达时尝试将其转换为Spring Tuple。将消息发送出去时会发生相反的情况。 问题是,当Sysadmin想要使用…并打印字符串 “哈哈” 在其中,整个Spring云流应用程序将立即消亡,只有以下例外: 我希望这个框架对这种行为至少有一定的容错能力。你