当前位置: 首页 > 知识库问答 >
问题:

使用Hadoop配置对象进行Spark Streaming

高祺
2023-03-14

StreamingContext-fileStream也被重载以获取Hadoop配置对象,但它似乎不起作用。

来自Spark源代码的代码片段:

https://github . com/Apache/spark/blob/master/streaming/src/main/Scala/org/Apache/spark/串流/StreamingContext.scala

def fileStream[K: ClassTag,V: ClassTag,F <: NewInputFormat[K, V]: ClassTag] (directory: String): InputDStream[(K, V)] = 
{ new FileInputDStream[K, V, F](this, directory) }

def fileStream[K: ClassTag,V: ClassTag,F <: NewInputFormat[K, V]: ClassTag] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = 
{ new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) }

def fileStream[K: ClassTag,V: ClassTag,F <: NewInputFormat[K, V]: ClassTag] (directory: String,filter: Path => Boolean, newFilesOnly: Boolean, conf: Configuration): InputDStream[(K, V)] = 
{ new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf)) }

代码片段:工作正常

val windowDStream = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), (x: Path) => true, true);

编译错误:

val conf = sc.hadoopConfiguration;
    val windowDStream = ssc.fileStream[LongWritable, Text, TextInputFormat](args(0), (x: Path) => true, true,conf);

错误:

overloaded method value fileStream with alternatives: (directory: String,filter: org.apache.hadoop.fs.Path ⇒ Boolean,newFilesOnly: Boolean)(implicit evidence$9: scala.reflect.ClassTag[org.apache.hadoop.io.LongWritable], implicit evidence$10: scala.reflect.ClassTag[org.apache.hadoop.io.Text], implicit evidence$11: scala.reflect.ClassTag[org.apache.hadoop.mapreduce.lib.input.TextInputFormat])org.apache.spark.streaming.dstream.InputDStream[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] <and> (directory: String)(implicit evidence$6: scala.reflect.ClassTag[org.apache.hadoop.io.LongWritable], implicit evidence$7: scala.reflect.ClassTag[org.apache.hadoop.io.Text], implicit evidence$8: scala.reflect.ClassTag[org.apache.hadoop.mapreduce.lib.input.TextInputFormat])org.apache.spark.streaming.dstream.InputDStream[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] cannot be applied to (String, org.apache.hadoop.fs.Path ⇒ Boolean, Boolean, org.apache.hadoop.conf.Configuration)

共有1个答案

颛孙沈义
2023-03-14

我假设您使用的是 Spark 1.2 或更早版本。如果从 master 分支更改为 1.2 分支,您将看到此重载不存在。事实上,文件输入DStream本身直到1.3才接受这个作为构造函数参数。

 类似资料:
  • ...行号不在文件中。每一行都是一个5维向量,并表示k-means算法的初始聚类质心。这里我们有5个初始集群。 接下来,我们有一个必须分配给集群的数据点文件,称为data.txt,如下所示: 这里的...表示我们有很多行数据点(对于这个问题,~10000)。同样,每一行都是一个5维向量。 这都有点让人摸不着头脑。我在main()或run()函数中创建hadoop配置对象吗?我在main函数中set

  • 主要内容:下载Hadoop,从命令提示符下载Hadoop,安装Hadoop,验证Hadoop安装,在Hadoop上安装SolrSolr可以和Hadoop一起使用。 由于Hadoop是用于处理大量数据,Solr帮助我们从这么大数据源中找到所需的信息。在本节中,我们将了解如何在系统上安装Hadoop。 下载Hadoop 下面给出了如何将Hadoop下载到系统中的步骤。 第1步 - 打开Hadoop主页 - www.hadoop.apache.org/。 单击链接版本,如下面的屏幕截图中突出显示。 它

  • 问题内容: 作为QA流程的一部分(在Jenkins中),目标是自动化VM的配置和配置以运行QA测试。 Jenkins管道可以触发Terraform代码来自动执行VM的配置和用于配置VM的ansible代码,但是,除非我们使用某些特定于供应商的模板(如AzureResourceManager模板),否则 回滚 , 错误处理之 类的问题并不容易。 因此,借助Jenkins管道,在Azure云中置备和配

  • info-jmssender JMS发件人已启动[2013-07-23 17:02:18,752]info-jmssender JMS传输发件人已初始化...在此之后,我创建了JMS消息存储< 并添加调度消息转发处理器 我的代理服务配置如下所示 将消息存储在消息存储区中,但如果endpoint不工作,则转发是错误的,尽管消息在WSO2esb中处理其丢失的消息

  • 问题内容: 我正在阅读有关MapReduce的内容,以下内容使我感到困惑。 假设我们有一个包含一百万个条目(整数)的文件,并且我们想使用MapReduce对它们进行排序。我了解的处理方式如下: 编写一个对整数排序的映射器函数。因此,框架会将输入文件分为多个块,并将它们分配给不同的映射器。每个映射器将彼此独立地对数据块进行排序。完成所有映射器后,我们会将其每个结果传递给Reducer,它将合并结果并

  • 问题内容: 我对Spring Boot配置有问题。 我已经使用https://start.spring.io/创建了基本的Spring Boot项目 我有一个问题,配置仅适用于子目录中的类: 我尝试了批注@ComponentScan,但没有帮助。 您知道我该怎么办吗? 问题答案: 在spring启动文档@SpringBootApplication状态 许多spring引导开发者总是有其主类注解为和