我需要创建一个消费者,能够从多个主题拉和订单消息相对于时间戳(Kafka消息时间戳)
在本例中,我订阅了“主题A”和“主题B”,并按照时间戳的顺序对消息进行排队
现在,只要所有主题只有一个分区,这很容易用这个伪代码来解决:
kafka.subscribe(['topicA', 'topicB'])
messagesByTopic = {}
finalMessageQueue = []
while true:
records = html" target="_blank">kafka.poll()
for record in records:
messagesByTopic[record.topic()].enqueue(record)
while messagesByTopic.any(queue => !queue.notEmpty()):
minQueue = messagesByTopic.min(queue => queue.peek().timestamp)
finalMessageQueue.enqueue(minQueue.pop())
当我为每个主题引入多个分区时,问题就出现了。显然,不可能将多个主题按时间顺序排序到一个流中,因为在一个主题中,顺序不能保证,只能在一个分区中,所以新的问题是将多个主题排序到具有相同密钥的流中。
想象2个主题,订单和撤销。主题中消息的关键是交易所属的客户id。
目标是将所有主题流到队列中(每个客户一个),按时间戳排序。
从理论上讲,这应该是可能的,因为order和Retraction主题中的消息是按每个客户的时间戳排序的,事实上,当处理每个主题的单个分区时,这个问题很容易解决。
现在,考虑2个订单分区和1个撤回分区的情况,如果我有两个进程同时运行会发生什么?一个进程将有所有客户的撤回,但只有一半的客户的订单,第二个进程将只有一半的客户的订单,它会分裂。
唯一的办法就是告诉Kafka,确保相同的密钥(即使来自不同的主题)总是被路由到同一个进程,但据我所知,没有办法做到这一点。
我卡住了。我需要一个办法。
为了达到预期效果,您应该确保两个主题之间的分区具有一定的对应性,方法是更改消息生成器对消息的分区方式,或者在排序逻辑之前将原始主题中的数据重新分区为新的中间主题。理想情况下,两个主题的分区之间应该有1对1的对应关系。一般来说,并行性(线程数)受两个主题的分区计数之间的最高公因数限制,例如,如果orders主题有12个分区,Drawing主题有9个,那么您可以将分区分配给HCF(12,9)=3个线程,如下所示:线程1:orders partithtml" target="_blank">ions(0,1,2,3),取款分区(0,1,2)线程2:订单分区(4,5,6,7),取款分区(3,4,5)线程3:订单分区(8,9,10,11),取款分区(6,7,8)要实现此功能,您需要为这两个主题实现自定义分区,而不是默认分区。
然而,如果一个主题有一个分区,另一个有两个,那么HCF(1,2)是1,这意味着你只能用单线程的方式来实现。
我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测
我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?
我们有一个传入的kafka主题,多个基于Avro模式的消息序列化到其中。 我们需要将Avro格式的消息拆分为多个其他kafka主题,基于某个公共模式属性的值。 想了解如何实现它,同时避免在汇流平台上构建中间客户端来进行这种拆分/路由。
注:使用kafka_2.11-0.9.0.1 我创建了一个Kafka主题,名为:
向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?
我们正在尝试使用托管在Windows独立环境中的Kafka中的代理消息。消费者正在Kubernetes中运行。 Server.Properties: 请帮助我解决这个问题。