我对Spark Streaming是新手,从Spark Streaming我使用Kafkautils创建了一个直接到Kafka的流。如下所示
jssc = new JavaStreamingContext(sparkConf, Durations.seconds(KafkaConfig.getInstance().getBatchDuration()));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put(KafkaConstants.ZOOKEEPER_CONNECTION_STRING, zookeeperHost + ":" + zookeeperPort);
kafkaParams.put(KafkaConstants.METADATA_BROKER_LIST_STRING, bootstrapHost + ":" + bootstrapPort);
kafkaParams.put(KafkaConstants.GROUP_ID_STRING, groupId);
HashSet<String> topicSet = new HashSet<String>();
topicSet.add(topic);
JavaPairInputDStream<String, String> topicStream = KafkaUtils.createDirectStream(jssc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet);
JavaDStream<String> topicMessages = topicStream.map(Tuple2::_2);
topicMessages.print()
当我试图运行该作业时,它正抛出以下错误
下面是我的pom.xml
<!-- Scala version -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.3</version>
</dependency>
<!--Spark Core -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<!--Spark Streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<!--Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
请让我知道如何解决这个问题。
您已经定义了Scala version->2.10.3
,并且编写了Artifactid和2.11
。对于每个依赖项,您应该将artifactID更改为2.10
,然后它就可以工作了。
我得到以下错误: https://spark.apache.org/docs/1.4.1/streaming-flume-integration.html
我刚刚在Intellij中设置了Scala(以及SDK和JDK) 文件- 创建了一个项目。 项目名称src下-- 我做错了什么。请帮忙
我正在尝试使用spark df读取spark中的CSV文件。文件没有标题列,但我想有标题列。如何做到这一点?我不知道我是否正确,我写了这个命令- 并将列名作为列的_c0和_c1。然后我尝试使用:val df1=df.with列重命名("_c0","系列")将列名更改为所需的名称,但我得到"with列重命名"不是单元上的成员。 PS:我已经导入了spark.implicits._和spark.sql
我使用卡珊德拉2.1.5(dsc),火花1.2.1与火花卡珊德拉连接器1.2.1。 运行Spark作业(scala脚本)时,出现以下错误: 16/03/08 10:22:03INFO DAGScheduler:作业0失败:减少在JsonRDD. scala: 57,采取了15.051150的异常线程"main"org.apache.spark.SparkExc0019:作业中止由于阶段故障:阶段1
我正在android模拟器中运行我的flutter项目。我正面临以下错误。附屏幕截图。 但是,我可以在相应的文件路径中看到< code>AndroidManifest.xml文件。
使用https://stackoverflow.com/a/32407543/5379015中提供的解决方案,我尝试重新创建相同的查询,但使用编程语法代替API,如下所示: 第一个工作正常,但是结果是