Kafka Streams引擎将一个分区映射到一个工作者(即Java应用程序),以便该分区中的所有消息都由该工作者处理。我有下面的场景,并试图了解它是否仍然可行。
我有一个主题A(有3个分区)。发送到它的消息由Kafka随机分区(即没有密钥)。我发送给它的消息有一个如下的模式
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
因为我有3个分区,消息在它们之间随机分区,所以相同型号的汽车可以写入不同的分区。举个例子
P1
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Toyota", color: "Blue", timeStampEpoch: 14334343342}
P2
{carModel: "Toyota", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Nissan", color: "Blue", timeStampEpoch: 14334343342}
P3
{carModel: "Nissan", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Honda", color: "Red", timeStampEpoch: 14334343342}
{carModel: "Nissan", color: "Blue", timeStampEpoch: 14334343342}
现在假设我想计算carModel看到的汽车总数。我写了一个Kafka Streams应用程序,用于监听主题A,通过carModel映射消息,即。
carStream.map((key, value) -> KeyValue.pair(value["carModel"], value))
并将总数写入另一个主题B,表单的消息
{carModel: "Nissan", totalCount: 5}
然后我启动了3个实例,它们都属于同一个消费者群体。然后,Kafka将有效地将每个分区映射到其中一个工作区。实例
P1 --> Worker A
P2 --> Worker B
P3 --> Worker C
但是,由于每个工作人员只看到一个分区,因此它只会看到每个车型的部分信息。它将丢失来自其他分区的同一车型的数据。
问题:我的理解正确吗?
如果是的话,我可以想象我可以通过carModel重新划分(即重新洗牌)我的数据,以使这个用例工作。
但我只是想确保我没有误解它是如何工作的,事实上,Kafka在我的应用程序中的内部映射之后神奇地处理了重新分区。
Kafka Streams将自动对数据进行重新分区。您的程序将类似于:
stream.map(...).groupByKey().count();
对于此模式,Kafka Streams检测到您在map
中设置了一个新键,因此将在后台自动创建一个主题,以重新划分groupByKey()的数据。count()
步骤(从v0.10.1开始,通过KAFKA-3561)。
注意,map()
标记需要重新分区的流,并且. group pByKey(). count()
将创建重新分区的主题。在这方面,重新分区是“懒惰的”,即只有在需要时才进行。如果没有. group pByKey(). count()
,就不会引入重新分区。
基本上,上面的程序的执行方式与
stream.map(...).through("some-topic").groupByKey().count();
Kafka Streams通过()步骤自动插入,从而计算正确的结果。
如果您使用的是Kafka Streams 0.10.0,则需要使用所需数量的分区手动创建重新分区主题,并且还需要通过()将对
的调用添加到代码中。
问题内容: 我从我的教授那里得知,使用UDP套接字发送的数据报包在较低层中被分段,并且 可能 以多个包的形式到达接收器端。例如,如果我以数据报包的形式发送1000字节的数据,则在接收端 可能会 到达2字节,500字节,12字节,依此类推。因此,他建议执行多次receive(…)以接收发送方发送的整个1000字节数据包。 稍后,当我浏览Java文档中有关数据报套接字的receive(…)时,一行的内
我正在使用Spring云流Kafka流编写Java应用程序。下面是我正在使用的函数方法片段: fetch_data_from_database()可以抛出异常。 如果fetch\u from\u database()发生异常,如何停止对入站KStream的处理(不应提交偏移量),并使其使用相同的偏移量数据重试处理?
假设有Kafka主题顺序。数据以JSON格式存储: 定义订单的状态(待定-1,已完成-2)。 完成后如何在“已完成”上进行更改? 正如我所知,Kafka主题是不可变的,我不能更改消息JSON,只需创建一个带有更改值的新消息,对吗?
我有一个连续发送数据的UDP服务器。我要接收服务器发送的所有数据包。 在服务器端,我有两个线程。一个线程从文件中连续读取数据并放入Deque。另一个线程从deque读取数据并不断发送到UDP客户端。客户端代码不断地从服务器接收数据。
问题内容: 在基于nodejs / express的应用程序中,我需要处理GET请求,其中可能包含使用iso-8859-1字符集编码的变音符号。 不幸的是,它的querystring解析器似乎只能处理纯ASCII和UTF8: 是否有隐藏的选项或其他干净的方法也可以与其他字符集一起使用?默认行为的主要问题是,我没有办法知道是否存在解码错误-毕竟,输入 本来可以 只是简单地解码为仍然看起来像urlen
我正在寻找解决这个问题的方法,已经快三天了。我有一个带有占位符的Word文档。对于如何解析word文档模板并用内容替换占位符,我们有自己的实现。但我遇到了一个问题,段落列表并没有包含word文档中的所有内容。我得到了OpenXmlPartRootElement的所有段落类型的后代。 然后我检查特定的占位符。 文档中有3个相同的占位符,但此语句只找到其中两个。这会破坏所有文档,因此会对其进行半解析。