当前位置: 首页 > 知识库问答 >
问题:

Apache Flink Streaming类型不匹配的平面地图功能

胡飞舟
2023-03-14

正在尝试在scala 2.10.4中使用0.10.0 flink版本的流式api。尝试编译此第一个版本时:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._

object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val words : DataStream[String] = text.flatMap[String](
      new Function[String,TraversableOnce[String]] { 
        def apply(line:String):TraversableOnce[String] = line.split(" ")
      })

    env.execute("Window Stream wordcount")
  }
}

我遇到编译时错误:

[error]  found   : String => TraversableOnce[String]
[error]  required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error]       new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error]       ^

在数据流的反编译版本中。类,我已将其包括在项目中。有接受此类类型的函数(最后一个):

public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
        if (flatMapper == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        TypeInformation outType = (TypeInformation)Predef..MODULE$.implicitly(evidence$12);
        return package..MODULE$.javaToScalaStream((org.apache.flink.streaming.api.datastream.DataStream)this.javaStream.flatMap(flatMapper).returns(outType));
    }

    public <R> DataStream<R> flatMap(Function2<T, Collector<R>, BoxedUnit> fun, TypeInformation<R> evidence$14, ClassTag<R> evidence$15) {
        if (fun == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        Function2<T, Collector<R>, BoxedUnit> cleanFun = this.clean((F)fun);
        .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
        return this.flatMap((FlatMapFunction<T, R>)flatMapper, evidence$14, evidence$15);
    }

    public <R> DataStream<R> flatMap(Function1<T, TraversableOnce<R>> fun, TypeInformation<R> evidence$16, ClassTag<R> evidence$17) {
        if (fun == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }
        Function1<T, TraversableOnce<R>> cleanFun = this.clean((F)fun);
        .anon flatMapper = new /* Unavailable Anonymous Inner Class!! */;
        return this.flatMap((FlatMapFunction<T, R>)flatMapper, evidence$16, evidence$17);
    }

这里会出什么问题?如果你能提供一些见解,我将不胜感激。提前谢谢你。

共有1个答案

高墨一
2023-03-14

问题是您正在导入Flink:org的JavaStreamExecutionEnvironment。阿帕奇。Flink。流动。应用程序编程接口。环境StreamExecutionEnvironment

您必须使用Scala变体的流执行环境,如下所示:导入org.apache.flink.streaming.api.scala.流执行环境。有了这样的改变,一切都在成功建造!

原始答案:问题是您正在将函数传递给flatMap()方法。但是flatMap()需要一个flatMap函数

 val words : DataStream[String] = text.flatMap[String](
      new FlatMapFunction[String,String] {
        override def flatMap(t: String, collector: Collector[String]): Unit = t.split(" ")
      })
 类似资料:
  • 我在这里复制代码;https://developer.android.com/codelabs/kotlin-android-training-view-model#5 但我从DataBindingUtil中得到了一个类型不匹配。充气方法。正在返回ViewDataBinding!,当需要FragmentPlayBinding时。 我https://github.com/google-develop

  • 问题内容: 编写内部API时遇到以下错误。我正在尝试以以下方式读取值(SQL Server 2012): 现在,虽然看起来有些奇怪,但我以这种方式阅读的原因是因为它是我编写的包装程序的一部分,我们使用它来加快sql的读写速度。它接受一个匿名对象,并根据属性名称或属性名称将所有sql值读入其中。 这适用于除以外的所有内容。我进行了类型比较,它也同样失败,因此很hacky,我什至无法检查该列是否为类型

  • 问题内容: 链接是 http://iipacademy.in/askpoll/ten_feed.php 异常在onPostExecute()方法(第4行)中: LOGCAT: 消息是一个数组,所以它的代码应该是什么或如何解决? 提前致谢 。。。 问题答案: 看起来响应是一个字符串而不是json数组 结果是一个json对象而不是json数组 应该

  • 问题内容: 我试图在Swift中使用JSONDecoder将JSON转换为Structs,所以我编写了所有Structs,将它们修改了几个小时,但它仍然给我这个错误。我不知道是否有办法查看给出此信息的行。我将在下面发布我的struct,并在其后发布Json File链接。 完整的错误描述是: typeMismatch(Swift.Dictionary ,Swift.DecodingError.Co

  • 使用任意数量的属性定义任何类型的简单react函数组件时出现类型脚本错误,包括使用FC、ComponentWithChildren等... 这是一个最小的复制品 给了我错误 类型“({ className }: { className?:未定义;}) = 我不明白这就是我总是定义组件的方式。打字稿中有什么我不知道的新东西吗?

  • 我对闪身是个新手。我正在尝试使用Flink1.3.2从我们的Kinesis流中读取并将输出写入一个Cassandra表。该程序能够从Kinesis流式传输数据。 提前道谢!