火花2.1+Kafka0.10+火花流。 批处理持续时间为30s。 我有13个节点,2个代理,每个主题/分区每个执行器使用1个核心。 LocationStrategy更好。 当使用1个主题时,没有问题的执行器总是处理相同的主题/分区(测试到24个分区)。 当我添加另一个主题时,一些用于处理主题/分区的执行器从一个批切换到另一个批。 当一个执行器再次处理相同的主题/分区时(例如,在前一次处理之后的3
我有一些关于Kafka主题分区->spark流媒体资源利用的用例,我想更清楚地说明这些用例。 我使用spark独立模式,所以我只有“执行者总数”和“执行者内存”的设置。据我所知并根据文档,将并行性引入Spark streaming的方法是使用分区的Kafka主题->RDD将具有与Kafka相同数量的分区,当我使用spark-kafka直接流集成时。 因此,如果我在主题中有一个分区和一个执行器核心,
https://cwiki.apache.org/confluence/display/kafka/0.8.0+SimpleConsumer+example 使用SimpleConsumer来使用消息,但是在使用它时,我发现了一些突然的行为,如下所示: 使用者正在使用来自特定分区的消息。但问题是,当我的使用者运行并且我使用生产者将消息推送到主题时,它将使用来自该分区的消息。但是,如果我的消费者目前
我的问题是:我应该如何构造我的kafka应用程序,使其能够从一个主题的单个分区中使用另一个具有多个分区的主题所发生的“全局”事件中做出反应? 如果我有以下场景:我有一个主题,它每次宣布一个新的演出时都会得到一条新消息,键是艺术家的名字,值是一个带有日期、位置等的avro对象。主题有一个分区。 示例键:示例值: 我有另一个紧凑的主题,名为,其中包含订阅了artist的用户,他们希望在他们感兴趣的艺术
对于Java Kafka消费者函数,它要求传入和。但是,我认为这个seek方法将为我的使用者获取订阅的TopicPartitions的集合。 这是我试图处理的例子。 使用者A订阅主题“test-topic”分区1和2。当调用时,我从每个分区读取消息。我处理了一些消息,但我的应用程序得到了一个异常。我不调用。现在,我要倒回到在上次中检索到的偏移量,并尝试重新处理它们。那么我该怎么做呢?我是否需要检查
我有一个Kafka Streams应用程序,每当我重新启动它时,它所消耗的主题的偏移量就会被重置。因此,对于所有分区,延迟增加,应用程序需要重新处理所有数据。 更新:输出主题接收到一系列事件,这些事件在应用程序重新启动后已经被处理,而不是像我在上一段中所说的那样,输入主题的偏移量被重置。但是,内部主题(KTABLE-SUPPRESS-STATE-STORE)偏移量正在重置,请参见下面的注释。 在重
我有一个场景做一些管理任务与Kafka。 现在管理员想, 例1:将app2的偏移量从当前偏移量位置(X)向前移动5个位置。对于此任务,是否应该停止app2并创建一个KafkaConsumer(不是spring),其组id与app2相同,即“Group2”,并提交新的偏移量x+5并启动app2? 案例2:将主题A的所有应用程序的偏移量从它们当前的偏移量位置(X)向前移动5个位置,假设所有应用程序都处
我想有一个主题与10个分区。我使用的是Kafka的默认配置。我用帮助器脚本创建了一个有10个解析的主题,现在我将为它生成消息。 问题是,消费者似乎只有5个分区可以从中获取数据。 让我们更详细地描述一下。 我知道每个分区需要一个使用者线程。我希望能够提交每个分区的偏移量,这是可能的,只有当我有一个线程每个消费连接器每个分区(我是使用高级消费)。 当我这样做10次时,我有10个消费者,每个分区每个线程
我从分区中读取,并对我读取的每个ConsumerRecord执行consumer.commitasync(在理解行为之前,在这一点上故意不对提交进行批处理)。 我在commit async回调中放置了一个每个主题的计数器来度量它被调用的次数,总数为100万次。 在应用程序稳定下来并停止之后,我使用Kafka CLI工具查看我的偏移量,我得到如下内容: 为什么我还会有这种滞后?什么可能导致这一切?
一段时间后,Kubernetes controller将重新创建/重新启动失败/死亡的消费者实例,并再次执行新的重新平衡。 是否有任何方法来控制第一个重新平衡过程(当消费者死亡时)?例如,等待几秒钟而不重新平衡,直到失败的消费者返回,或者直到触发超时。如果消费者返回,是否继续基于旧的再平衡分配(即,没有新的再平衡)进行消费?
当使用者组a的一个Kafka使用者连接到Kafka代理时,我希望搜索到所有分区的末尾,即使在代理端存储了一个偏移量。如果更多的其他消费者为同一个消费者组连接,他们应该提取最新存储的偏移量。我正在做以下工作: 问题是,当我连接消费者组A的第一个消费者c1时,一切都按预期工作,如果我连接消费者组A的另一个消费者c2,该组将重新平衡,c1将消耗跳过的抵消。 有什么想法吗?
我正处于探索Kafka0.8.1.1版本的初始阶段。 使用API进行触发器再平衡 将kafka配置为等待消费者活动一段时间,并假设它被不优雅地关闭,自动重新平衡。 这里的问题是,分配给死亡使用者的分区中的所有消息都保留在队列中,并且在重新平衡发生之前不会被处理。
在消费者之间重新平衡分区的代价有多大。我期待着每隔几秒钟就有一个新的消费者结束或加入同一个消费者群体。所以我只想知道一个再平衡操作的开销和延迟。 假设使用者C1具有分配给它的分区P1、P2、P3,并且它正在处理来自分区P1的消息M1。现在消费者C2加入了这个群体。C1和C2之间的分区是如何划分的。是否有可能拒绝C1的(可能需要一些时间将其消息提交给Kafka)对M1的提交,而M1将被视为一个新的消
我使用@KafkaListener,我需要一个动态的主题名,所以我使用SpEL'\uu listener'来实现这一点 它工作得非常好。 主要问题是当我想添加另一个注释时,它会触发某些方面的编程 @MyCustomAnnotationToRecordPerformance@KafkaListener(主题 = "#{__ listener.my道具}")公共无效监听器Kafka(@Payload
我有一个@KafkaListener方法来获取主题中的所有消息,但对于@Scheduled方法工作的每个间隔时间,我只获取一条消息。如何一次从topic获取所有消息? 这是我的课; 这是我在应用程序中的Kafka属性。yml; 还有我的KafkaConfiguration课程;