我有3个分区:0、1、2。因此,这些消息可以分为0、1、2。
eg:
1消息在分区0:0
分区1中的3条消息:111
分区2中的2条消息:22
如何使消费者以012x12x1x的顺序使用消息(x表示当时没有消息)。已消费消息的顺序如下所示:012121。我想在C和Python中都这样做。从现有的客户端来看,消息可以以循环方式生成,但不能以循环方式消费。
任何想法?
Kafka消费者配置(http://kafka.apache.org/documentation.html#consumerconfigs)中partition.assignment.strategy。我正在寻找一些工具,实现这个配置(如水槽,火花,Storm)从Kafka读取数据,重新排序,并再次写入Kafka。继续上面的例子。重新排序的消息看起来像:012121 (012x12x1x)
使现代化
现在,我可以在C Kafka客户端(https://github.com/edenhill/librdkafka)中执行此操作。
for(int i = 0; i < 2; i++)
{
RdKafka::Message *msg = m_consumer->consume(m_topic, i, 1000);
// Do something about msg here...
}
输出:
Reading from 1=>4953---1---
Reading from 0=>46164---0---
Reading from 1=>4954---1---
Reading from 0=>46165---0---
Reading from 1=>4955---1---
Reading from 0=>46166---0---
Reading from 1=>4956---1---
Reading from 0=>46167---0---
Reading from 1=>4957---1---
Reading from 0=>46168---0---
在Java中,您可以使用高级使用者。
如果您使用相同的group.id
在同一个进程中使用所有3个分区,您可以创建3个充当消费者的工作线程,并以循环方式在它们之间循环。
我知道您明确提到Python和C是您的目标语言
根据我自己在Python中的经验,Java中提供的高级使用者是无与伦比的。
因此,您可以以某种方式包装它,例如创建一个新的Java进程作为服务器,并通过该进程消耗消息,或者您可以尝试将高级消费者移植到Python和/或C。
另一种选择是使用每个分区,将数据写入其他介质,如MySQL(或任何其他RDBMS),并使用SQL使用消息执行循环,最后从相关表中删除它们。
无论如何,我建议你重新考虑Kafka作为你的传输层。您的需求(以循环方式消费)与Kafka的核心设计/架构不一致。原因如下:
只要分区的数量小于消费机器中可用的内核,那么就可以使用单个(!)上的消息机器以循环的方式运行,就像我上面描述的那样。
然而,Kafka也是为分布式负载(消费者)而设计的。这就是为单个主题创建分区的动机。可伸缩性是Kafka的一个关键概念。
因此,如果用户计算机中的分区多于可用的内核,则可能无法正确地迭代分区并以循环方式使用消息。
示例:假设在一个特定主题中有20个分区。本主题始终会产生大量消息。现在让我们假设您有一台具有4个CPU核心的机器来使用该主题。根据设计,您一次最多可以使用4个分区。将此转换为循环消耗可以通过在html" target="_blank">内存中缓冲有意义的消息量或某种其他机制(这可能导致延迟等其他问题)来实现。这时我们假设所有的分区都是随时可用的,没有影响Kafka的网络问题或磁盘问题。这就是为什么我认为,大规模地将分区“连接回”到其他消息流中并不是一件小事。
我有一个关于这个连接器的问题。如果我的Spark集群和Cassandra集群不在同一个集群上,读取如何工作?Spark是否将整个Cassandra表带入自己的集群并将其重新排列到Spark分区中?
本文向大家介绍Paypal实现循环扣款(订阅)功能,包括了Paypal实现循环扣款(订阅)功能的使用技巧和注意事项,需要的朋友参考一下 起因 业务需求要集成Paypal,实现循环扣款功能,然而百度和GOOGLE了一圈,除官网外,没找到相关开发教程,只好在Paypal上看,花了两天后集成成功,这里对如何使用Paypal的支付接口做下总结。 Paypal现在有多套接口: 通过Braintree(后面会
我想我问这个问题,因此使用问题主题是因为我仍然不知道如何正确阅读API,或者我只是错过了一些甚至是基本的火花概念?!
一、背景 先说一下,为什么要使用 Flume + Kafka? 以实时流处理项目为例,由于采集的数据量可能存在峰值和峰谷,假设是一个电商项目,那么峰值通常出现在秒杀时,这时如果直接将 Flume 聚合后的数据输入到 Storm 等分布式计算框架中,可能就会超过集群的处理能力,这时采用 Kafka 就可以起到削峰的作用。Kafka 天生为大数据场景而设计,具有高吞吐的特性,能很好地抗住峰值数据的冲击
问题内容: 我在一起有多个循环,而在最内部的循环中有一个睡眠。例如: 如果您运行该代码,则可能需要等待1秒钟然后再次休眠直到结束,才能获得价值。 但是结果是不同的,它等待10秒钟并打印整行,然后再次等待打印下一行。 我发现打印末尾的逗号导致了此问题。我该如何解决? 问题答案: 由于存在逗号,因此输出缓冲到a为止。 您应在每次打印或使用后冲洗并冲洗缓冲区。 定义您的打印方法: 在行的末尾打印一个
一、简介 Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中。Spark Straming 提供了以下两种方式用于 Flume 的整合。 二、推送式方法 在推送式方法 (Flume-style Push-based Approach) 中,Spark Streaming 程序需要对某台服务器的某个端口进行监听,Fl