当前位置: 首页 > 知识库问答 >
问题:

flink CassandrasInk.AddSink(输入)类型不匹配

蔡理
2023-03-14

我对闪身是个新手。我正在尝试使用Flink1.3.2从我们的Kinesis流中读取并将输出写入一个Cassandra表。该程序能够从Kinesis流式传输数据。

提前道谢!

val env = StreamExecutionEnvironment.getExecutionEnvironment
val mapper = new ObjectMapper
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
  "kinesis-stream", new SimpleStringSchema, ConsumerConfig))

//DataStream[(String, Long)]
val countsStreaming: DataStream[(String, Long)] = kinesis.map(x => mapper.readValue(x,classOf[java.util.Map[String,String]]))
  .map(x => x.get("game_name"))
  .map({x => (x,1L) })
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)

countsStreaming.print()

CassandraSink.addSink(countsStreaming)
  .setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.setClusterBuilder(new ClusterBuilder() {
  override def buildCluster(builder: Cluster.Builder): Cluster = {
    builder.addContactPoint("0.0.0.0").build()
  }
}).build()

env.execute("StreamingExample")

共有1个答案

越嘉茂
2023-03-14

问题是cassandrasink.addsink只接受Java数据流。

您需要在scala数据流之后添加.javastream,然后类型不匹配就会消失。

 类似资料:
  • 失败:ParseException行1:161输入“>”不匹配,在结构类型HIVE>创建外部表user1(id BIGINT,created_at STRING,source STRING,favorited BOOLEAN,retweet_count INT,retweeted_status struct sreen_name:STRING,name:STRING>>,entities stru

  • 我是ANTLR的新手。我想写一个语法来解析下面的输入: 语法如下:: 当我尝试使用语法解析上述输入时,它会引发以下异常:: 第1行:0不匹配的输入'commit a1b2c3d4',应为'commit' 我已经引用了ANTLR4:不匹配的输入链接,但仍然不清楚发生了什么。

  • 我在这里复制代码;https://developer.android.com/codelabs/kotlin-android-training-view-model#5 但我从DataBindingUtil中得到了一个类型不匹配。充气方法。正在返回ViewDataBinding!,当需要FragmentPlayBinding时。 我https://github.com/google-develop

  • 问题内容: 编写内部API时遇到以下错误。我正在尝试以以下方式读取值(SQL Server 2012): 现在,虽然看起来有些奇怪,但我以这种方式阅读的原因是因为它是我编写的包装程序的一部分,我们使用它来加快sql的读写速度。它接受一个匿名对象,并根据属性名称或属性名称将所有sql值读入其中。 这适用于除以外的所有内容。我进行了类型比较,它也同样失败,因此很hacky,我什至无法检查该列是否为类型

  • 问题内容: Whenver我有一个角表达式作为值的,它不会工作: 数字值将转换为字符串,并且复选框根本无法使用。 http://jsfiddle.net/punund/NRRj7/3/ 问题答案: 您不能动态更改输入的类型,因为IE不允许这样做,并且AngularJS需要跨浏览器兼容。因此,即使您可能会看到更改后的类型显示在源代码中,AngularJS也不会接受它- 类型仅被评估一次,并且无法更改

  • 所以我在学习java,两天来我一直在寻找这个问题的解决方案。我尝试了所有的十进制分隔符,并试图设置语言环境,结果发现它不起作用。 代码: 输出和错误