当前位置: 首页 > 知识库问答 >
问题:

2个具有相同消费者组id的spark stream作业

慕宏峻
2023-03-14

我正在尝试对消费者群体进行实验

这是我的代码片段

public final class App {

private static final int INTERVAL = 5000;

public static void main(String[] args) throws Exception {

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "xxx:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("auto.commit.interval.ms","1000");
    kafkaParams.put("security.protocol","SASL_PLAINTEXT");
    kafkaParams.put("sasl.kerberos.service.name","kafka");
    kafkaParams.put("retries","3");
    kafkaParams.put(GROUP_ID_CONFIG,"mygroup");
    kafkaParams.put("request.timeout.ms","210000");
    kafkaParams.put("session.timeout.ms","180000");
    kafkaParams.put("heartbeat.interval.ms","3000");
    Collection<String> topics = Arrays.asList("venkat4");

    SparkConf conf = new SparkConf();
    JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));


    final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    ssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                    return new Tuple2<>(record.key(), record.value());
                }
            }).print();


    ssc.start();
    ssc.awaitTermination();


}

}

当我同时运行两个spark流媒体作业时,它会出错

线程“main”java中出现异常。lang.IllegalStateException:当前没有分配给组织上的分区venkat4-1。阿帕奇。Kafka。客户。消费者内部。订阅状态。组织上的assignedState(SubscriptionState.java:251)。阿帕奇。Kafka。客户。消费者内部。订阅状态。needOffsetReset(SubscriptionState.java:315)位于org。阿帕奇。Kafka。客户。消费者Kafka苏美尔。参见org上的kToEnd(KafkaConsumer.java:1170)。阿帕奇。火花流动。Kafka010。导演Kafka因普特流。LatestOffset(DirectKafkaInputDStream.scala:197)位于org。阿帕奇。火花流动。Kafka010。导演Kafka因普特流。计算(DirectKafkaInputDStream.scala:214)位于org。阿帕奇。火花流动。数据流。数据流$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7。在org上申请(DStream.scala:341)。阿帕奇。火花流动。数据流。数据流$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7。在scala上应用(DStream.scala:341)。util。动态变量。org上的withValue(DynamicVariable.scala:58)。阿帕奇。火花流动。数据流。数据流$$anonfun$getOrCompute$1$$anonfun$1。在org上申请(DStream.scala:340)。阿帕奇。火花流动。数据流。数据流$$anonfun$getOrCompute$1$$anonfun$1。在org上申请(DStream.scala:340)。阿帕奇。火花流动。数据流。数据流。在org上创建rddwithLocalProperties(DStream.scala:415)。阿帕奇。火花流动。数据流。数据流$$anonfun$getOrCompute$1。在org上申请(DStream.scala:335)。阿帕奇。火花流动。数据流。数据流$$anonfun$getOrCompute$1。在scala上应用(DStream.scala:333)。选项orElse(Option.scala:289)

https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html创建具有相同组的kafka消费者的单独实例将创建分区的重新平衡。我相信消费者不会容忍这种重新平衡。我应该如何解决这个问题

下面是使用的命令

SPARK_KAFKA_VERSION=0.10 spark2 submit--num executors 2--master thread--deploy mode client--files jaas。conf#jaas。形态,蜂巢。键盘#蜂巢。keytab--驱动程序java选项“-Djava.security.auth.login.config=./jaas.conf”--类流。App--conf“spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf”--conf spark。流动。Kafka。消费者隐藏物enabled=false 1-1.0-SNAPSHOT。罐子

共有2个答案

霍建章
2023-03-14

@Ravikumar为延迟道歉。

我的测试是这样做的

a、 我的主题有3个分区b。spark streaming作业由2个执行器启动,运行良好。c、 后来,我决定用另一个实例来扩展它,运行另一个spark streaming作业,其中有一个执行器,以匹配失败的第三个分区。

关于你的说法:当你开始第二个火花流作业时,另一个消费者试图从同一个消费者Groupid消费同一个分区。所以它抛出了错误。是的,这是完全正确的。但是为什么它不能容忍是这里的问题。

引用您突出显示的文档:

Kafka将一个主题的分区分配给一个组中的消费者,因此每个分区只由该组中的一个消费者使用。Kafka保证,一条消息只能由组中的单个消费者阅读。每当任何代理出现故障或向现有主题添加新分区时,Kafka都会重新平衡分区存储。这是kafka特有的如何在代理中跨分区平衡数据的方法。如果添加更多进程/线程,Kafka将重新平衡。如果任何消费者或代理未能向ZooKeeper发送心跳信号,则Kafka群集可以重新配置ZooKeeper。

这也是我对火花流工作的期望。我尝试了能够忍受再平衡的普通Kafka客户。

你在文档中的观点“缓存是由topic分区和group.id键控的,所以使用

除了公关部https://github.com/apache/spark/pull/21038--下面是

“当新消费者加入消费者组以重新平衡分区时,Kafka分区可以被撤销。但当前的Spark Kafka connector代码确保没有分区撤销场景,因此尝试从撤销的分区获取最新偏移量将引发JIRA提到的异常。”

很高兴结束这个话题非常感谢你的回应

卓瀚
2023-03-14

https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html创建具有相同组的kafka消费者的单独实例将创建分区的重新平衡。我相信消费者不会容忍这种重新平衡。我应该如何解决这个问题

现在,所有分区都只由一个使用者使用。如果数据摄取率很高,消费者可能会以摄取的速度缓慢地消耗数据。

向同一个consumergroup添加更多消费者,以使用某个主题中的数据并提高消费率。Spark streaming使用这种方法在Kafka分区和Spark分区之间实现1:1并行。Spark将在内部处理它。

如果你有比主题分区更多的消费者,那么它将处于空闲状态,资源利用率也会降低。始终建议使用者应小于或等于分区计数。

如果添加了更多的进程/线程,Kafka将重新平衡。如果任何消费者或代理未能向ZooKeeper发送心跳,Kafka集群可以重新配置ZooKeeper。

当任何代理失败或向现有主题添加新分区时,kafka都会重新平衡分区存储。这是kafka特定的如何平衡代理中跨分区的数据。

Spark streaming在Kafka分区和Spark分区之间提供了简单的1:1并行性。如果您没有使用ConsumerStragies提供任何分区详细信息。分配、使用给定主题的所有分区。

Kafka将一个主题的分区分配给组中的消费者,这样每个分区都被组中的一个消费者使用。Kafka保证消息只被组中的单个消费者读取。

当您启动第二个spark流媒体作业时,另一个使用者尝试使用来自同一使用者groupid的同一分区。所以它抛出了错误。

val alertTopics = Array("testtopic")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> sparkJobConfig.kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> sparkJobConfig.kafkaConsumerGroup,
  "auto.offset.reset" -> "latest"
)

val streamContext = new StreamingContext(sparkContext, Seconds(sparkJobConfig.streamBatchInterval.toLong))

val streamData = KafkaUtils.createDirectStream(streamContext, PreferConsistent, Subscribe[String, String](alertTopics, kafkaParams))

如果您想使用特定于分区的spark作业,请使用以下代码。

val topicPartitionsList =  List(new TopicPartition("topic",1))

val alertReqStream1 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies

消费者可以通过使用samegroup.id.加入一个组

val topicPartitionsList =  List(new TopicPartition("topic",3), new TopicPartition("topic",4))

    val alertReqStream2 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))

再添加两个消费者就是添加到同一个groupid中。

请阅读Spark-Kafka集成指南。https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

希望这有帮助。

 类似资料:
  • 我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。

  • 我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用程序中,我创建了如下的kafka消费者。 我的配置如下 所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多

  • 有以下消费者代码: 然后我用脚本生成消息: 问题是,当我将消费者作为两个不同的进程启动时,我会在每个进程中收到新消息。但是,我希望它只发送给一个消费者,而不是广播。 在Kafka的文献中(https://kafka.apache.org/documentation.html)其中写道: 如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。 我发现这些消费者的

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我试图有多个消费者的Kafka主题的多个分区与相同的groupId,这将帮助我扩大消费的消息。 根据Kafka的文件,它说: 如何让多个消费者拥有相同的消费者groupId,以实现负载平衡?

  • 我以前认为设置我的消费者将始终收到他们尚未收到的消息,但最近我发现情况并非如此。这只在使用者尚未提交抵消时才起作用。在任何其他情况下,使用者将继续接收偏移大于其提交的最后偏移的消息。 由于我总是使用随机的组ID创建新的使用者,我意识到我的使用者“没有内存”,他们是新的使用者,并且他们永远不会提交偏移,因此策略将始终适用。我的疑虑就从这里开始了。假设以下场景: 我有两个客户端应用程序,A和B,每个客