当前位置: 首页 > 知识库问答 >
问题:

Spark streaming和kafka缺少没有默认值的必需配置“partition.assignment.strategy”

周马鲁
2023-03-14

我正在尝试使用纱线与Kafka一起运行spark streaming应用程序。我得到以下堆栈跟踪错误-

造成原因:org.apache.kafka.common.config.配置异常:缺少所需的配置partition.assignment.strategy没有默认值。在org.apache.kafka.common.config.配置ef.parse(配置ef.java:124)在org.apache.kafka.common.config.AbstractConfig。(AbstractConfig.java:48)org.apache.kafka.clients.consumer.消费者配置。消费者onfig.java:194)在org.apache.kafka.clients.consumer.Kafka消费者。Kafkaonsumer.java:380)在org.apache.kafka.clients.consumer.Kafka消费者。Kafkaonsumer.java:363)org.apache.kafka.clients.consumer.Kafka消费者。KafkaConsumer.java:350)org.apache.spark.streaming.kafka010。CachedKafka消费者。CachedKafkaConsumer.scala:45)org.apache.spark.streaming.kafka010。消费者$. get(CachedKafkaConsumer.scala:194)在org.apache.spark.streaming.kafka010。KafkaRDDIterator。kafkaRDD. scala: 252)在org. apache. Spark. stream. kafka010。计算(KafkaRDD. scala: 212)在org. apache. spark. rdd。RDD. computeOrReadCheckpoint(RDD. scala: 324)at org. apache. spark. rdd。rdd. iterator(rdd. scala: 288)at org. apache. spark. rdd。scala: 49)在org. apache. spark. rdd。RDD. computeOrReadCheckpoint(RDD. scala: 324)at org. apache. spark. rdd。rdd. iterator(rdd. scala: 288)at org. apache. spark.调度器。运行任务(ResultTasks. scala: 87)在org. apache. Spark.调度器。在org. apache. spak. exitor上运行(Task. scala: 109)。执行器$TaskRunner. run(Executor. scala: 345)

下面是我如何使用spark stream创建KafkaStream的代码片段-

        val ssc = new StreamingContext(sc, Seconds(60))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "*boorstrap_url:port*",
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.kerberos.service.name" -> "kafka",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "annotathtml" target="_blank">ion-test",
  //Tried commenting and uncommenting this property      
  //"partition.assignment.strategy"->"org.apache.kafka.clients.consumer.RangeAssignor",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val topics = Array("*topic-name*")

val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))
val valueKafka = kafkaStream.map(record => record.value())

我已经看过下面的帖子了-

  1. https://issues.apache.org/jira/browse/KAFKA-4547

根据这一点,我已经将fat jar中的kafka util jar从spark stream kafka jar默认打包的0.10.1.0版本更新为0.10.2.0版本,作为临时依赖项。当我在单个节点上运行它时,通过将master设置为local,我的工作也可以正常工作。我正在运行spark 2.3.1版本。

共有1个答案

温举
2023-03-14

添加kafka客户端-*。jar到您的spark jar文件夹<代码>Kafka客户端-*。jar位于kafka-*/lib目录中。

 类似资料:
  • 我使用的是Json。net将对象序列化到数据库。 我向类中添加了一个新属性(该属性在数据库的json中缺失),我希望新属性在json中缺失时具有默认值。 我尝试了DefaultValue属性,但它不起作用。我正在使用私有setter和构造函数来反序列化json,因此在构造函数中设置属性的值将不起作用,因为有一个带有该值的参数。 以下是一个例子: 我预计年龄是5岁,但现在是零岁。 有什么建议吗?

  • 问题内容: 我有一个重写版本discord.py。如果消息中包含内容,则不会发生错误。如果消息中内容不存在,则我希望该错误不会发生。 我的代码: 完整回溯: 问题答案: 命令解析参数的方式意味着定义 表示需要单词作为命令调用的一部分。如果您想捕获消息的其余部分,则可以使用仅关键字参数语法: 此功能在此处记录。

  • 云功能部署失败:缺少资源项目/ourcafe-mucqxq上service-1044193269753@gcf-admin-robot.iam.gserviceAccount.com的必需权限resourceanager.projects.getiampolicy。请授予service-1044193269753@gcf-admin-robot.iam.gserviceAccount.com云功能

  • 问题内容: 我正在开发一个游戏作为附带项目,很有趣,但是遇到了这个错误,我真的不知道为什么会发生… 这是代码: 我这样称呼它: 我得到的错误是: 有任何想法吗? 问题答案: 您不应直接调用类方法,而应创建该类的实例: 要详细说明该错误,您将得到: TypeError:turn()缺少1个必需的位置参数:“ playerImages” 这是因为需要第一个参数()的实例。类方法总是将实例作为第一个参数

  • 可能有一个非常明显的答案,但如何做到以下几点: 基本上是从一个类型中检索所有键,以循环通过?

  • 这是我的用户。JAVA 这是我的地址。JAVA 但是当我试图持久化对象时,我得到了这个异常 错误不会来,如果我删除"NOTNULL"约束从FORIGN_KEY"USER_ID",但我需要使它作为一个NOTNULL列我应该怎么做。 这是我的注册方法 这是我的SQL脚本