当前位置: 首页 > 知识库问答 >
问题:

apache spark streaming kafka集成错误

贡烨烁
2023-03-14

我正在使用火花流和Kafka,我得到了这个错误。

线程“streaming-start”中的异常java.lang.NosuchMethoderror:scala.predef$.arrowassoc(ljava/lang/object;)ljava/lang/object;在org.apache.spark.streaming.kafka010.directkafkainputdstream$$anonfun$start$1.在org.apache.spark.streaming.kafka010.directkafkainputdstream.scala:246)dorg.apache.spark.streaming.kafka010.directkafkainputdstream$$anonfun$start$1.在scala.collection.traversablelike$$anonfun$map$1.在collection.iterator$class.foreach(iterator.scala:727)在Scala.collection.abstractiterator.foreach(iterator.scala:1157)在Scala.collection.iterablelike$class.foreach(iterablelike.scala:72)在Scala.collection.abstractiterable.foreach(iterable.scala:54)在Scala.collection.traversablelike$class.map(traversablelike.scala:244)在.mutable.abstractSet.map(Set.Scala:45)位于org.apache.spark.streaming.kafka010.DirectKafka在org.apache.spark.streaming.dstreamgraph$$anonfun$start$5处Inputdstream.start(drectkafkainputdstream.scala:245)。在org.apache.spark.streaming.dstreamgraph$$anonfun$start$5处应用(dstreamgraph.scala:49)在org.apache.spark.streaming.dstreamgraph$$anonfun$start$5处应用(dstreamgraph.scala:49)在$foreach.leaf(Pariterablelike.scala:975)在Scala.collection.parallel.task$$anonfun$trylefa$1.在Scala.collection.parallel.task$$anonfun$trylefa$1.在Scala.collection.parallel.task$$$anonfun$trylefa$1应用$mcv$sp(tasks.scala:54)在Scala.collection.parallel.task$$anonfun$trylefa$1应用(tasks.scala:53)ection.parallel.AdaptiveWorkStealingTasks$WrappedTask$Class.Compute(tasks.scala:165)at scala.coll在Scala.concurrent.forkjoin.recursiveAction.exec(recursiveAction.java:160)在Scala.concurrent.forkjoin.forkjoinask.doexec(forkjoinask.java:260)在Scala.concurrent.forkjoin.forkjoinpool$workqueue.runtask(forkjoinpool.java:1339)在Scala.concurrent.forkjoinpool.runworker(forkjoinpool.java:1979)在.run(ForkJoinWorkerThread.java:107)17/08/02 16:24:58 INFO StreamingContext:StreamingContext已启动

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>2.1.1</version>
        </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>

我的代码:

     SparkConf conf = new SparkConf().setAppName("Streaming").setMaster("local");


       JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(1000));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "exastax");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("loglar");
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );
stream.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.foreachPartition(consumerRecords -> {
            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
            System.out.println(
                    o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
        });
    });
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

我使用KAFKA2.11-0.11.0.0,我尝试搜索这个问题,但我找不到相关的JAR,请帮助我解决这个问题。

共有1个答案

陶鹏
2023-03-14

您正在混合Scala2.10和Scala2.11代码。在Scala2.10中使用Kafka依赖项,或者在Scala2.11中使用Spark。

 类似资料:
  • 我已经使用最新的可用版本建立了一个新的Spring Boot Spring Integration Spring Integration Java DSL项目。项目构建正常,但当我运行应用程序时,我得到: 当前使用的依赖项如下: 错误可能是由于jar版本的错误组合吗?我不确定如何调试此错误。

  • 登录后,它生成了一个哈希值,但仍然给出错误“Some problem currened!try tain”。

  • 场景可能是:我的期望可能是批量10个数据点,我想对{failed 5,pass 5}或其他什么给出响应。 我的逻辑是将批处理拆分为数据元素并进行验证 成功的验证将发送给aggreagtor, 失败的验证将抛出错误并通过错误通道拾取。 收件人列表路由器将错误通道作为输入通道,并连接2个过滤器,目的是过滤某些类型的错误直接发送响应(与用户输入无关的信息-服务器错误等),某些类型的客户端错误将转到聚合器

  • 目前,我正在尝试使用Jenkins来部署我的项目代码。在这个项目中,我们使用Junit来实现TDD方法。 最后一个错误是在执行这行时发生了以下错误:也是在build.xml中,当构建一个模块(SDK)时 在成功构建之后,我想为各种测试套件生成Junit-report。Junit测试报告xmls现在正在为每个模块生成。

  • 问题内容: 我正在实施一个非常简单的易感性感染恢复模型,该模型具有稳定的工作量,可以用于闲置的副项目- 通常是一项非常琐碎的任务。但是我遇到了使用PysCeS或SciPy的求解器错误,它们都使用lsoda作为其基础求解器。这仅在参数的特定值时发生,我为之困惑。我使用的代码如下: 这将产生以下错误: 通常,当我遇到这样的问题时,我设置的方程组最终有问题,但是我都看不到任何问题。奇怪的是,如果将mu更

  • 我有三种不同的系统。我使用Spring integration来同步所有这些系统中的数据。 系统2将调用服务方法来持久化数据,如果请求有效,则返回响应,否则抛出异常 我需要发送服务方法响应到系统1和系统3,只有当操作成功。调用服务方法后,根据服务方法响应,使用Transformer生成对系统3的请求。在transformer之后,我将请求放入mq队列。 更新的JMS出站代码 如果服务类失败,我需要