当前位置: 首页 > 知识库问答 >
问题:

用Kafka溪流计数

吕亮
2023-03-14

我对流媒体有一个普遍的问题,但对于问题的范围,让我们限制自己使用Kafka Streams。让我们进一步缩小范围,将我们的问题局限于单词计数,或者可能是一般的计数。假设我有一个由某个键和一个值组成的流,键可以是一个字符串(假设我们可以有很多字符串,除了空字符串,由世界上的任何字符组成),值是一个整数,现在我们正在构建一个单词计数应用程序,如果词汇表中的单词总数是一万亿,我们不能将它们存储在本地缓存中。如果看到一个单词w的值x,我需要将w现有的计数更新为x,假设x是以前的计数,我将如何构建此应用程序。我无法在KTable或Kafka本地的任何其他本地存储中存储万亿个单词,我将如何构建这个应用程序?我对Streams或其工作方式的理解是否错误。

共有2个答案

轩辕翰
2023-03-14

(马蒂亚斯·萨克斯在回答中说的话。)

另一种方法是使用概率计数,它具有显着降低的存储和内存占用;即,使用概率数据结构,如Count-min Sketch(CMS),而不是线性数据结构,如Kafka Streams'Ktable或JavaHashMap

有一个名为ProbabilisticCounting的示例演示了如何在Kafka Streams中使用CMS执行概率计数:https://github.com/confluentinc/kafka-streams-examples(Confluent Platform 5.2.1/Apache Kafka 2.2.1版的直接链接)

我已经成功地将概率计数用于密钥空间非常大的类似用例(在您的例子中:数万亿个密钥)。

冷俊健
2023-03-14

由于 Kafka Streams 是水平扩展的,因此您可以根据需要部署任意数量的应用程序实例。因此,实际上应该可以构建此应用程序。请注意,KTable 状态将在所有计算机上分片。

如果您假设一万亿个密钥,每个密钥大约100个字节,您将需要大约100 TB的存储空间。为了给一些头部空间,实际上您可能希望提供200 TB。因此,每个2 TB的100个实例应该可以完成这项工作。

为此,您的输入主题需要有100个分区,尽管这对Kafka来说不是问题。

 类似资料:
  • 我写了一个kafka流代码,使用kafka 2.4 kafka客户端版本和kafka 2.2服务器版本。我的主题有50个分区 我的Kafka流代码有选择键()DSL操作,我有200万条记录使用相同的KEY。在流配置中,我已经完成了 因此,我能够使用完全相同的键使用不同的分区。如果我没有按预期使用轮循机制,我的所有消息都会转到同一分区。 直到现在一切都很好,但我意识到;当我使用RoundRobin分

  • 在Spring Boot应用程序中,我试图配置Kafka流。用简单的Kafka主题,一切都很好,但我无法得到工作SpringKafka流。 这是我的配置: 我想创建一个基于主题的流。应用一个简单的转换并将此流中的消息发送到test主题。 我向发送以下消息,其中是我自己的复杂类型,但是我现在不知道如何将它转换为中的,以便能够在中使用它。 请建议如何使其工作。

  • 我正在使用Kafka Streams,我注意到它使我的kafka日志记录了很多日志消息,例如: 这真的很令人不安,因为我发现它会淹没日志,所以我看不到任何其他内容(也会消耗资源)。 为什么它发生在(一些)Kafka Streams内部主题上,而不是其他主题上? 我怎样才能禁用它?

  • 我有一个通用的Streams API问题,我想“高效地”解决。假设我有一个(可能非常大,可能无限)流。我想以某种方式对其进行预处理,例如,过滤掉一些项目,并对一些项目进行变异。让我们假设这个预处理是复杂的,时间和计算密集型的,所以我不想做两次。 接下来,我想对项序列执行两组不同的操作,并使用不同的流类型构造处理每个不同序列的远端。对于无限流,这将是一个forEach,对于有限流,它可能是一个收集器

  • 我想知道我是否能做这样的事情。假设我有一个数字流1-20。我想利用一个特性,比如drop 3(我想用Java术语来说是限制还是跳过?)并产生一个流,即数字流: 1-20、4-20、7-20等 然后可能平坦地将这些全部映射到一条溪流中。我尝试了使用Stream.iterate的各种组合,主要是从流生成流,但我一直收到一个IllegalStateExcema,说流已经操作或关闭。 例如,人们可能期望这