当前位置: 首页 > 知识库问答 >
问题:

使用Java的Apache Spark流媒体

柯宜年
2023-03-14

我正在尝试从Spark官方网站运行Spark Streaming示例

这些是我在pom文件中使用的依赖项:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

这是我的Java代码:

package com.myproject.spark;

java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.myproject.spark.serialization.JsonDeserializer;

import scala.Tuple2;

public class MainEntryPoint {
  public static void main(String[] args) {
    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer",JsonDeserializer.class.getName());
    kafkaParams.put("group.id", "ttk-event-listener");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("topic1", "topic2");

    SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("EMSStreamingApp");
    JavaStreamingContext streamingContext =
        new JavaStreamingContext(conf, Durations.seconds(1));

    JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );

    stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));


    streamingContext.start();
    try {
      streamingContext.awaitTermination();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }
}

当我尝试从Eclipse运行它时,我遇到以下异常:

18/07/16 13:35:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.106, 51604, None)
18/07/16 13:35:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.106, 51604, None)
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:168)
at org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.myproject.spark.MainEntryPoint.main(MainEntryPoint.java:47)
18/07/16 13:35:28 INFO SparkContext: Invoking stop() from shutdown hook

我从我的IDE(eclipse)运行它。我是否必须创建并将JAR部署到火花中以使其运行。如果有人知道这个异常,请分享您的经验。提前谢谢

共有1个答案

戚兴邦
2023-03-14

尝试将2.3.1也用于spark streaming kafka依赖项。

还可以查看有关java的其他相关问题及其答案。lang.AbstractMethodError。

这通常意味着使用的库与其接口/实现之间的不匹配。

 类似资料:
  • 我正试图在一些维基百科转储(以压缩的bz2形式)上运行一个带有java Mapper/Reducer的hadoop流作业。我正在尝试使用WikiHadoop,这是Wikimedia最近发布的一个界面。 WikiReader_Mapper。JAVA 维基阅读器。JAVA 我正在运行的命令是 我收到的错误信息是 我更熟悉新的hadoop API和旧的。由于我的mapper和reducer代码位于两个不

  • 我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus

  • 我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里

  • 你好,我正在开发一个应用程序,我可以在其中使用widevine drm保护来播放dash stream。我已经阅读了exoplayer的示例,但我的需求不同,我会在我的网站上,当单击dash url时,它将开始在exoplayer中播放流。我已经成功地完成了打开exoplayer活动的第一部分,但是我不知道如何在exoplayer上运行受drm保护的流。 我知道流和drm许可证url。 我正在编写

  • 我想使用gstreamer进行网络传输。目的是启动视频内容(从发射机)并在接收机端播放。我编写了一个用于对网络内容进行流式传输的示例测试代码。 在发射机侧:GST\u DEBUG=“*:2”GST-launch-1.0 videotestsrc!视频/x-raw!jpegenc!rtpjpegpay!udpsink主机=127.0.0.1端口=5001 在接收器端:GST\u DEBUG=“*:2

  • 我正在使用Google的YouTube API Explorer(备用)来查找属于其他人的任意流媒体广播的信息。 无论我在字段中输入了什么,我都会返回 这似乎很荒谬,考虑到视频显然是流媒体。 我突然想到,我可能误解了字段的说明,所以我尝试了几种不同的可能性。这些包括。。。 频道ID() 用户ID() 视频ID() ...每个都无济于事。 我如何询问一个频道有关其直播流视频的信息?这个问题在过去可以