我试图将数据从kafka主题读入DataStream并注册DataStream,然后使用tableEnvironment.sqlQuery(“sql”)查询数据,当tableEnvironment.execute()没有错误也没有输出时。
public static void main(String[] args){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharateristic.EventTime);
env.enableCheckpointing(5000);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
FlinkKafkaConsumer<Person> consumer = new FlinkKafkaConsumer(
"topic",
new JSONDeserializer(),
Job.getKafkaProperties
);
consumer.setStartFromEarliest();
DataStream<Person> stream = env.addSource(consumer).fliter(x -> x.status != -1).assignTimestampAndWatermarks(new AssignerWithPeriodicWatermarks<Person>(){
long current = 0L;
final long expire = 1000L;
@Override
public Watermakr getCurrentWatermark(){
return new Watermark(current - expire);
}
@Override
public long extractTimestamp(Person person){
long timestamp = person.createTime;
current = Math.max(timestamp,current);
return timestamp;
}
});
//set createTime as rowtime
tableEnvironment.registerDataStream("Table_Person",stream,"name,age,sex,createTime.rowtime");
Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
tableEnvironment.toAppendStream(t,Types.Row(new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.LONG})).print();
tableEnvironment.execute("person-query");
}
依赖关系:
在将SQL查询结果转换回DataStream的代码中,需要将res
而不是t
传递给ToAppendStream
。(我不知道您发布的代码将如何编译--t
在哪里声明?)我想你应该能做到
Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
tableEnvironment.toAppendStream(res,Row.class).print();
而不是麻烦于typeinformation
。
我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据: 我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。 这是我的映射csv文件: 所以我的输出最好是这样的 该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。 我目前有一个不适合我的代码: 代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中
我有一份使用python表api的Flink工作。现在,我的应用程序将使用额外的源流。我很好奇,使用表API消费多个源流的推荐方法是什么。 其他信息: 这两个输入流只是两个事件源。我想通过窗口操作将它们聚合在一起。这就像DataStream中的联合操作 谢谢
我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述) 我有4个基本部分: 命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数生成结果事件,并将它们发布到主题 问题是--有没有办法确保我在州存储中有聚
我正在试用Kafka和Flink: 我使用flink制作人向Kafka发送推特流 如果我创建一个基本的RESTWebServices,我想我会失去流媒体的兴趣,对吗? 我应该向我的网络应用程序提供flink数据,还是应该将其发送到另一个Kafka主题,以便将其提供给网络应用程序? 非常感谢。 安托万
我使用flink和Kafka创建了一个流媒体程序,用于流媒体mongodb oplog。根据与Flink支持团队的讨论,流的顺序不能通过kafka分区来保证。我已经创建了N个kafka分区,并希望每个分区创建N个flink kafka消费者,所以流的顺序应该至少在特定的分区中保持。请建议我是否可以创建分区特定的flink kafka消费者? 我正在使用env.setParallelism(N)进行
我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach