当前位置: 首页 > 知识库问答 >
问题:

使用kafka源和使用Flink sql查询的Flink流表

公良英资
2023-03-14

我试图将数据从kafka主题读入DataStream并注册DataStream,然后使用tableEnvironment.sqlQuery(“sql”)查询数据,当tableEnvironment.execute()没有错误也没有输出时。

public static void main(String[] args){
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setStreamTimeCharacteristic(TimeCharateristic.EventTime);
   env.enableCheckpointing(5000);
   StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
   FlinkKafkaConsumer<Person> consumer = new FlinkKafkaConsumer(
                                                                "topic",
                                                                 new JSONDeserializer(),
                                                                 Job.getKafkaProperties
                                                               );

  consumer.setStartFromEarliest();
  DataStream<Person> stream = env.addSource(consumer).fliter(x -> x.status != -1).assignTimestampAndWatermarks(new AssignerWithPeriodicWatermarks<Person>(){
      long current = 0L;
      final long expire = 1000L;

      @Override
      public Watermakr getCurrentWatermark(){
         return new Watermark(current - expire);
      }

      @Override
      public long extractTimestamp(Person person){
         long timestamp = person.createTime;
         current = Math.max(timestamp,current);
         return timestamp;
      }
  });
  //set createTime as rowtime
  tableEnvironment.registerDataStream("Table_Person",stream,"name,age,sex,createTime.rowtime");
  Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
  tableEnvironment.toAppendStream(t,Types.Row(new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.LONG})).print();
tableEnvironment.execute("person-query");
}

依赖关系:

  1. flink-streaming-java2.11版本:1.9.0-csa1.0.0.0;
  2. Flink-Streaming-Scala2.11版本:1.9.0-CSA1.0.0.0;
  3. flink-connector-kafka2.11版本:1.9.0-csa1.0.0.0;
  4. flink-table-api-java-bridge2.11版本:1.9.0-csa1.0.0.0;
  5. flink-table-planner2.11版本:1.9.0-csa1.0.0.0;

共有1个答案

许淳
2023-03-14

在将SQL查询结果转换回DataStream的代码中,需要将res而不是t传递给ToAppendStream。(我不知道您发布的代码将如何编译--t在哪里声明?)我想你应该能做到

Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
tableEnvironment.toAppendStream(res,Row.class).print();

而不是麻烦于typeinformation

 类似资料:
  • 我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据: 我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。 这是我的映射csv文件: 所以我的输出最好是这样的 该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。 我目前有一个不适合我的代码: 代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中

  • 我有一份使用python表api的Flink工作。现在,我的应用程序将使用额外的源流。我很好奇,使用表API消费多个源流的推荐方法是什么。 其他信息: 这两个输入流只是两个事件源。我想通过窗口操作将它们聚合在一起。这就像DataStream中的联合操作 谢谢

  • 我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述) 我有4个基本部分: 命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数生成结果事件,并将它们发布到主题 问题是--有没有办法确保我在州存储中有聚

  • 我正在试用Kafka和Flink: 我使用flink制作人向Kafka发送推特流 如果我创建一个基本的RESTWebServices,我想我会失去流媒体的兴趣,对吗? 我应该向我的网络应用程序提供flink数据,还是应该将其发送到另一个Kafka主题,以便将其提供给网络应用程序? 非常感谢。 安托万

  • 我使用flink和Kafka创建了一个流媒体程序,用于流媒体mongodb oplog。根据与Flink支持团队的讨论,流的顺序不能通过kafka分区来保证。我已经创建了N个kafka分区,并希望每个分区创建N个flink kafka消费者,所以流的顺序应该至少在特定的分区中保持。请建议我是否可以创建分区特定的flink kafka消费者? 我正在使用env.setParallelism(N)进行

  • 我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach