当前位置: 首页 > 知识库问答 >
问题:

如何在Flink Java API中获取keyBy()后的数据流键

黄凌龙
2023-03-14
    DataStream<Event> stream = environment.addSource(mySource)

    stream.keyBy(new KeySelector<Event,Integer>() {
    public Integer getKey(Event event) { return event.getClientId(); })
.window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(new MyAggregateFunction)

我如何获得我之前指定的密钥?我没有在累加器中注入输入事件的键,因为我觉得我不会很好。

共有1个答案

钱峻
2023-03-14

而不是

.aggregate(new MyAggregateFunction)

您可以使用

.aggregate(new MyAggregateFunction, new MyProcessWindowFunction)

在这种情况下,将向ProcessWindowFunction的process方法传递密钥,同时传递AggregateFunction的预聚合结果和具有其他潜在相关信息的上下文对象。有关更多详细信息,请参阅文档中关于ProcessWindowFunction with增量聚合的部分。

 类似资料:
  • 问题内容: 如何使用PHP从JSON对象获取数据? 这是我拥有的PHP代码: 问题答案: foreach($json as $i){ echo $i[‘name’]; }

  • 我以以下形式向服务器发送请求: 请求已正确解析为以下方法: 但是,我通过请求内容传递附加数据。如何检索这些数据? 为了举例,让我们假设,请求是从表单发送的:

  • 问题内容: 这是我的类,用于从数据库中获取数据 这是我的文件: 当我运行该程序时,出现异常后,请帮助我如何解决它。我是Hibernate的新手,尝试学习但被卡住了。 虽然我能够将数据存储在数据库中,但我有2个用于第一和第二类的数据获取数据,但在获取数据时遇到了问题PLZ帮助。 问题答案: 让我引述一下: 据我所知,您正在使用表名。 所以应该是这样的:

  • 问题内容: 我需要从表单获取数据。 我使用JavaScript创建表单: 那么我需要从输入字段中获取数据。 这是我的视图函数: 但我得到一个错误: 帮我从表格中获取数据。 问题答案: 从Flask的请求对象获取表单数据: 你还可以设置默认值以避免400错误,如下所示:

  • Jenkins有一个$CAUSE变量可用于freestyle构建作业。 我如何在工作流程中访问这个或类似的东西? 我的团队在现有临时构建的电子邮件输出中使用它。我们希望在基于工作流的新作业中继续这样做。

  • 在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?