我对闪身是个新手。我正在尝试使用Flink1.3.2从我们的Kinesis流中读取并将输出写入一个Cassandra表。该程序能够从Kinesis流式传输数据。
提前道谢!
val env = StreamExecutionEnvironment.getExecutionEnvironment
val mapper = new ObjectMapper
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
"kinesis-stream", new SimpleStringSchema, ConsumerConfig))
//DataStream[(String, Long)]
val countsStreaming: DataStream[(String, Long)] = kinesis.map(x => mapper.readValue(x,classOf[java.util.Map[String,String]]))
.map(x => x.get("game_name"))
.map({x => (x,1L) })
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
countsStreaming.print()
CassandraSink.addSink(countsStreaming)
.setQuery("INSERT INTO example.values (id, counter) values (?, ?);")
.setClusterBuilder(new ClusterBuilder() {
override def buildCluster(builder: Cluster.Builder): Cluster = {
builder.addContactPoint("0.0.0.0").build()
}
}).build()
env.execute("StreamingExample")
问题是cassandrasink.addsink
只接受Java数据流。
您需要在scala数据流之后添加.javastream
,然后类型不匹配就会消失。
失败:ParseException行1:161输入“>”不匹配,在结构类型HIVE>创建外部表user1(id BIGINT,created_at STRING,source STRING,favorited BOOLEAN,retweet_count INT,retweeted_status struct sreen_name:STRING,name:STRING>>,entities stru
我是ANTLR的新手。我想写一个语法来解析下面的输入: 语法如下:: 当我尝试使用语法解析上述输入时,它会引发以下异常:: 第1行:0不匹配的输入'commit a1b2c3d4',应为'commit' 我已经引用了ANTLR4:不匹配的输入链接,但仍然不清楚发生了什么。
我在这里复制代码;https://developer.android.com/codelabs/kotlin-android-training-view-model#5 但我从DataBindingUtil中得到了一个类型不匹配。充气方法。正在返回ViewDataBinding!,当需要FragmentPlayBinding时。 我https://github.com/google-develop
问题内容: 编写内部API时遇到以下错误。我正在尝试以以下方式读取值(SQL Server 2012): 现在,虽然看起来有些奇怪,但我以这种方式阅读的原因是因为它是我编写的包装程序的一部分,我们使用它来加快sql的读写速度。它接受一个匿名对象,并根据属性名称或属性名称将所有sql值读入其中。 这适用于除以外的所有内容。我进行了类型比较,它也同样失败,因此很hacky,我什至无法检查该列是否为类型
问题内容: Whenver我有一个角表达式作为值的,它不会工作: 数字值将转换为字符串,并且复选框根本无法使用。 http://jsfiddle.net/punund/NRRj7/3/ 问题答案: 您不能动态更改输入的类型,因为IE不允许这样做,并且AngularJS需要跨浏览器兼容。因此,即使您可能会看到更改后的类型显示在源代码中,AngularJS也不会接受它- 类型仅被评估一次,并且无法更改
所以我在学习java,两天来我一直在寻找这个问题的解决方案。我尝试了所有的十进制分隔符,并试图设置语言环境,结果发现它不起作用。 代码: 输出和错误