当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:所有窗口流上的窗口函数-组合Kafka主题

赫连方伟
2023-03-14

我正在尝试使用主题列表中的单个kafka使用者组合两个kafka主题,进一步将流中的json字符串转换为POJO。然后,通过keyBy(On事件时间字段)将它们加入,并将它们合并为单个胖json,我计划使用窗口流并在窗口流上应用窗口函数。假设主题A

我有几个问题。

  1. 这种方法适合合并主题并创建单个JSON吗
  2. 所有窗口流上的窗口函数似乎工作不正常;任何指点都将不胜感激

代码片段:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

logger.info("Flink Stream Window Charger has started");

Properties properties = new Properties();

properties.setProperty("bootstrap.servers", "127.0.0.1:1030");

properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");

properties.setProperty("group.id", "group-0011");

properties.setProperty("auto.offset.reset", "smallest");



List < String > names = new ArrayList < > ();



names.add("Topic-A");

names.add("Topic-B");



DataStream < String > stream = env.addSource(new FlinkKafkaConsumer08 < > (names, new SimpleStringSchema(), properties));

DataStream < TopicPojo > pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());

List < String > where = new ArrayList < String > ();

AllWindowedStream < String, GlobalWindow > data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);

DataStream < String > data_charging = data_window.apply(new MyWindowFunction());

data_charging.addSink(new SinkFunction < String > () {



public void invoke(String value) throws Exception {



  // Yet to be implemented - Merge two POJO into one 

 }

});



try

{

 env.execute();

} catch (Exception e)

{

 return;

}

}

}

class Tokenizer implements FlatMapFunction < TopicPojo, String > {

 private static final long serialVersionUID = 1 L;

 @Override

 public void flatMap(TopicPojo value, Collector < String > out) throws Exception {

  ObjectMapper mapper = new ObjectMapper();

  out.collect(mapper.writeValueAsString(value));

 }

}

class MyWindowFunction implements WindowFunction < TopicPojo, String, String, GlobalWindow > {

 @Override

 public void apply(String key, GlobalWindow window, Iterable < TopicPojo > arg2, Collector < String > out)

 throws Exception {

  int count = 0;

  for (TopicPojo in : arg2) {

   count++;

  }

  // Test Result - TO be modified

  out.collect("Window: " + window + "count: " + count);



 }

}

class Deserializer implements MapFunction < String, TopicPojo > {

 private static final long serialVersionUID = 1 L;

 @Override

 public TopicPojo map(String value) throws IOException {

  // TODO Auto-generated method stub

  ObjectMapper mapper = new ObjectMapper();

  TopicPojo obj = null;

  try {



   System.out.println(value);



   obj = mapper.readValue(value, TopicPojo.class);



  } catch (JsonParseException e) {



   // TODO Auto-generated catch block



   throw new IOException("Failed to deserialize JSON object.");



  } catch (JsonMappingException e) {



   // TODO Auto-generated catch block



   throw new IOException("Failed to deserialize JSON object.");

  } catch (IOException e) {



   // TODO Auto-generated catch block



   throw new IOException("Failed to deserialize JSON object.");

  }

  return obj;

 }

} 

我得到了-

AllWindowedStream类型中的方法Application(AllWindowFunction)不适用于参数(MyWindowFunction)错误。

共有1个答案

阎鸿煊
2023-03-14

AllWindowedStream是无键流,因此AllWindowedStreams的apply方法没有键参数。由于您正在对密钥流进行窗口化,因此data_window应该是密钥流。

 类似资料:
  • 我有以下代码来计算socketTextStream中的单词。累积字数和时间窗字数都是必需的。该程序存在累积计数始终与窗口计数相同的问题。为什么会出现这个问题?根据加窗计数计算累积计数的正确方法是什么?

  • 在中,元素被分配给一个或多个实例。在滑动事件时间窗口的情况下,这发生在1中。 如果窗口的和,则将时间戳为0的元素分配到以下窗口: 窗口(开始=0,结束=5) 窗口(开始=-1,结束=4) 窗口(开始=-2,结束=3) 窗口(开始=-3,结束=2) 窗口(开始=-4,结束=1) 在一幅图片中: 有没有办法告诉Flink时间有开始,而在那之前,没有窗户?如果没有,从哪里开始寻求改变?在上述情况下,Fl

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 我写了一个小GUI程序与python在tkinter的窗口。我的窗口必须在全屏游戏窗口的前面。 此刻,我用这句话: 它适用于普通窗口(浏览器,浏览器,...),但如果我启动游戏到全屏模式,我的窗口隐藏在游戏后面。 为什么会发生这种情况?调用游戏可能类似于覆盖我的属性的? 我的问题还有别的解决办法吗?也许可以告诉windows,我的窗口应该在特定窗口(游戏窗口)的前面?

  • 我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?