我有一个表示为set
的自定义状态计算,当我的数据流
看到来自Kafka的新事件时,它将不断更新。现在,每次更新状态时,我都希望将更新后的状态打印到stdout。想知道怎么在Flink中做到这一点吗?与所有的窗口和触发器操作很少混淆,我一直得到以下错误。
原因:java.lang.RuntimeException:记录具有long.min_value时间戳(=没有时间戳标记)。时间特性设置为“ProcessingTime”,还是忘记调用“DataStream.AssignTimeStampsandWatermarks(...)”?
我只想知道如何将我的聚合流Datastream
打印到stdout或写回另一个kafka主题?
下面是引发错误的代码片段。
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
DataStream<Set<Long>> stream = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
stream
.aggregate(new MyCustomAggregation(100))
.process(new ProcessFunction<Set<Long>, Object>() {
@Override
public void processElement(Set<Long> value, Context ctx, Collector<Object> out) throws Exception {
System.out.println(value.toString());
}
});
使用Flink将集合保持在状态可能非常昂贵,因为在某些情况下,集合将频繁地序列化和反序列化。如果可能,则首选使用Flink的内置ListState和MapState类型。
下面是一个示例,说明了以下几点:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1L, 2L, 3L, 4L, 3L, 2L, 1L, 0L)
.keyBy(x -> 1)
.process(new KeyedProcessFunction<Integer, Long, List<Long>> () {
private transient MapState<Long, Boolean> set;
@Override
public void open(Configuration parameters) throws Exception {
set = getRuntimeContext().getMapState(new MapStateDescriptor<>("set", Long.class, Boolean.class));
}
@Override
public void processElement(Long x, Context context, Collector<List<Long>> out) throws Exception {
if (set.contains(x)) {
System.out.println("set contains " + x);
} else {
set.put(x, true);
List<Long> list = new ArrayList<>();
Iterator<Long> iter = set.keys().iterator();
iter.forEachRemaining(list::add);
out.collect(list);
}
}
})
.print();
env.execute();
}
请注意,我希望使用键控状态,但事件中没有任何可用作键的内容,所以我只是通过一个常量对流进行键控。这通常不是一个好主意,因为它阻止了并行处理--但是由于您是作为一个集合进行聚合,所以不能并行处理,所以没有坏处。
[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
set contains 3
set contains 2
set contains 1
[0, 1, 2, 3, 4]
我通过以下说明创建了一个主题: 然后,我测试了这个主题是否有正确的数据。之后,我想在Flink程序中打印这个主题。我的计划是: 但是我得到了这个信息(因为信息太长了,我不得不写一些): [main]INFOorg.apache.flink.streaming.api.environment.LocalStream环境-在本地嵌入式Flink迷你集群上运行作业[main]INFOorg.apache
我正在探索一种方法来实现这一点,就像下面的SQL一样。 是一个将聚合到
我有一个窗口化的每小时聚合的数据流。 Datastreamds=.....
我目前正在使用Flink 1.0编写一个聚合用例,作为该用例的一部分,我需要获得过去10分钟内登录的api数量。 这我可以很容易地使用keyBy("api"),然后应用10分钟的窗口和doe sum(count)操作。 但问题是我的数据可能会出现混乱,所以我需要一些方法来获取10分钟窗口内的api计数。。 例如:如果相同的api日志出现在两个不同的窗口中,我应该得到一个全局计数,即2,而不是两个单
问题内容: 我正在使用以下聚合: 这有效,但是我只会得到无用的聚合-我也必须与… 相处 问题答案: 如果您想获取该ID的类别名称, 因此,子聚合(嵌套在上面)为您提供了id的categoryName。
我正在尝试按照此处的步骤创建一个基本的 Flink 聚合 UDF。我已经添加了依赖项()并实现了 我已经实现了强制方法和其他一些方法:< code>accumulate,merge等。所有这些构建都没有错误。根据文件,我应该可以注册为 但是,似乎只需要作为输入。我收到一个不兼容的类型错误: 任何帮助都会很好。