我正在尝试从Spark官方网站运行Spark Streaming示例
这些是我在pom文件中使用的依赖项:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
这是我的Java代码:
package com.myproject.spark;
java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import com.myproject.spark.serialization.JsonDeserializer;
import scala.Tuple2;
public class MainEntryPoint {
public static void main(String[] args) {
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer",JsonDeserializer.class.getName());
kafkaParams.put("group.id", "ttk-event-listener");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topic1", "topic2");
SparkConf conf = new SparkConf()
.setMaster("local[*]")
.setAppName("EMSStreamingApp");
JavaStreamingContext streamingContext =
new JavaStreamingContext(conf, Durations.seconds(1));
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
当我尝试从Eclipse运行它时,我遇到以下异常:
18/07/16 13:35:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.106, 51604, None)
18/07/16 13:35:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.106, 51604, None)
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:168)
at org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.myproject.spark.MainEntryPoint.main(MainEntryPoint.java:47)
18/07/16 13:35:28 INFO SparkContext: Invoking stop() from shutdown hook
我从我的IDE(eclipse)运行它。我是否必须创建并将JAR部署到火花中以使其运行。如果有人知道这个异常,请分享您的经验。提前谢谢
尝试将2.3.1也用于spark streaming kafka依赖项。
还可以查看有关java的其他相关问题及其答案。lang.AbstractMethodError。
这通常意味着使用的库与其接口/实现之间的不匹配。
我正试图在一些维基百科转储(以压缩的bz2形式)上运行一个带有java Mapper/Reducer的hadoop流作业。我正在尝试使用WikiHadoop,这是Wikimedia最近发布的一个界面。 WikiReader_Mapper。JAVA 维基阅读器。JAVA 我正在运行的命令是 我收到的错误信息是 我更熟悉新的hadoop API和旧的。由于我的mapper和reducer代码位于两个不
我正在用Kafka设计一个spark流媒体应用程序。我有以下几个问题:我正在将数据从RDBMS表流式传输到kafka,并使用Spark consumer来使用消息,并使用Spark-SQL进行处理 问题:1。我将数据从表中流式传输到kafka as(键作为表名,值作为JSON记录形式的表数据)——这是正确的体系结构吗? 这种数据库流的架构和设计是否正常,我如何解决转换问题中的转换? 你好Piyus
我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里
你好,我正在开发一个应用程序,我可以在其中使用widevine drm保护来播放dash stream。我已经阅读了exoplayer的示例,但我的需求不同,我会在我的网站上,当单击dash url时,它将开始在exoplayer中播放流。我已经成功地完成了打开exoplayer活动的第一部分,但是我不知道如何在exoplayer上运行受drm保护的流。 我知道流和drm许可证url。 我正在编写
我想使用gstreamer进行网络传输。目的是启动视频内容(从发射机)并在接收机端播放。我编写了一个用于对网络内容进行流式传输的示例测试代码。 在发射机侧:GST\u DEBUG=“*:2”GST-launch-1.0 videotestsrc!视频/x-raw!jpegenc!rtpjpegpay!udpsink主机=127.0.0.1端口=5001 在接收器端:GST\u DEBUG=“*:2
我正在使用Google的YouTube API Explorer(备用)来查找属于其他人的任意流媒体广播的信息。 无论我在字段中输入了什么,我都会返回 这似乎很荒谬,考虑到视频显然是流媒体。 我突然想到,我可能误解了字段的说明,所以我尝试了几种不同的可能性。这些包括。。。 频道ID() 用户ID() 视频ID() ...每个都无济于事。 我如何询问一个频道有关其直播流视频的信息?这个问题在过去可以