当前位置: 首页 > 知识库问答 >
问题:

如何在Flink中打印聚合数据流?

冯敏达
2023-03-14

我有一个表示为set 的自定义状态计算,当我的数据流 > 看到来自Kafka的新事件时,它将不断更新。现在,每次更新状态时,我都希望将更新后的状态打印到stdout。想知道怎么在Flink中做到这一点吗?与所有的窗口和触发器操作很少混淆,我一直得到以下错误。

原因:java.lang.RuntimeException:记录具有long.min_value时间戳(=没有时间戳标记)。时间特性设置为“ProcessingTime”,还是忘记调用“DataStream.AssignTimeStampsandWatermarks(...)”?

我只想知道如何将我的聚合流Datastream > 打印到stdout或写回另一个kafka主题?

下面是引发错误的代码片段。

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

DataStream<Set<Long>> stream = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
   stream
      .aggregate(new MyCustomAggregation(100))
      .process(new ProcessFunction<Set<Long>, Object>() {
       @Override
         public void processElement(Set<Long> value, Context ctx, Collector<Object> out) throws Exception {
           System.out.println(value.toString());
         }
       });

共有1个答案

公良俊楚
2023-03-14

使用Flink将集合保持在状态可能非常昂贵,因为在某些情况下,集合将频繁地序列化和反序列化。如果可能,则首选使用Flink的内置ListState和MapState类型。

下面是一个示例,说明了以下几点:

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1L, 2L, 3L, 4L, 3L, 2L, 1L, 0L)
        .keyBy(x -> 1)
        .process(new KeyedProcessFunction<Integer, Long, List<Long>> () {
            private transient MapState<Long, Boolean> set;

            @Override
            public void open(Configuration parameters) throws Exception {
                set = getRuntimeContext().getMapState(new MapStateDescriptor<>("set", Long.class, Boolean.class));
            }

            @Override
            public void processElement(Long x, Context context, Collector<List<Long>> out) throws Exception {
                if (set.contains(x)) {
                    System.out.println("set contains " + x);
                } else {
                    set.put(x, true);
                    List<Long> list = new ArrayList<>();
                    Iterator<Long> iter = set.keys().iterator();
                    iter.forEachRemaining(list::add);
                    out.collect(list);
                }
            }
        })
        .print();

    env.execute();

}

请注意,我希望使用键控状态,但事件中没有任何可用作键的内容,所以我只是通过一个常量对流进行键控。这通常不是一个好主意,因为它阻止了并行处理--但是由于您是作为一个集合进行聚合,所以不能并行处理,所以没有坏处。

[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
set contains 3
set contains 2
set contains 1
[0, 1, 2, 3, 4]
 类似资料:
  • 我通过以下说明创建了一个主题: 然后,我测试了这个主题是否有正确的数据。之后,我想在Flink程序中打印这个主题。我的计划是: 但是我得到了这个信息(因为信息太长了,我不得不写一些): [main]INFOorg.apache.flink.streaming.api.environment.LocalStream环境-在本地嵌入式Flink迷你集群上运行作业[main]INFOorg.apache

  • 我正在探索一种方法来实现这一点,就像下面的SQL一样。 是一个将聚合到

  • 我有一个窗口化的每小时聚合的数据流。 Datastreamds=.....

  • 我目前正在使用Flink 1.0编写一个聚合用例,作为该用例的一部分,我需要获得过去10分钟内登录的api数量。 这我可以很容易地使用keyBy("api"),然后应用10分钟的窗口和doe sum(count)操作。 但问题是我的数据可能会出现混乱,所以我需要一些方法来获取10分钟窗口内的api计数。。 例如:如果相同的api日志出现在两个不同的窗口中,我应该得到一个全局计数,即2,而不是两个单

  • 问题内容: 我正在使用以下聚合: 这有效,但是我只会得到无用的聚合-我也必须与… 相处 问题答案: 如果您想获取该ID的类别名称, 因此,子聚合(嵌套在上面)为您提供了id的categoryName。

  • 我正在尝试按照此处的步骤创建一个基本的 Flink 聚合 UDF。我已经添加了依赖项()并实现了 我已经实现了强制方法和其他一些方法:< code>accumulate,merge等。所有这些构建都没有错误。根据文件,我应该可以注册为 但是,似乎只需要作为输入。我收到一个不兼容的类型错误: 任何帮助都会很好。