当前位置: 首页 > 知识库问答 >
问题:

将保存点添加到数据流

麻华辉
2023-03-14

我已经添加了保存点到kafka源,它恢复kafka流,但相反,我想恢复我处理过的数据流

 DataStream<String> streamOfStrings = env.addSource(new FlinkKafkaConsumer010<>(topicname));

我的字符串流被进一步传递给另一个进程,该进程被转换为对象的数据流。我希望处理流的状态应该只被保持,而不是kafka流。有没有办法将保存点附加到数据流

 DataStream<Object> streamOfObject = App.convertToObject(streamOfStrings);

共有1个答案

狄雅珺
2023-03-14

一些澄清...

  1. 即使保持下游状态,Kafka源也必须保持某种状态,以了解它消耗了多少主题;从检查点或保存点重新启动时,它必须重播使其进入保存状态的上一个数据的时间内的所有数据,以及当前可用的数据。
  2. 保存的任何状态都需要有一定的持续时间(通常是时间限制),这样它就不会无限制地累积,对吗?

我能想到的最简单的方法是将app.ConvertToObject()方法转换为实现[ListCheckpoint][1]接口的适当函数。您可以将每个转换后的对象保存到一个列表中,但要达到一定的限制(请参见同一页上的BufferingSink示例)。

 类似资料:
  • 我想用自动递增的id(1,2,3…等)将多个数据保存到数据库中,而不是保存在同一列中。用户可以动态添加输入字段,最后单击submit按钮,以不同的id(自动递增的id)将数据保存到数据库中。 我的js生成用户想要的输入字段 形式: 路线: 控制器: 但它只在数据库中保存一个输入字段值。任何能帮助我的人。

  • 问题内容: 我试图弄清楚如何将以下数据添加到我的json对象。有人可以告诉我该怎么做。 网站范例 我的JSONObject需要合并以上示例。 问题答案: 为了得到这个结果: 拥有与以下相同的数据: 您可以使用以下代码:

  • 这是使用电子邮件密码注册帐户的所有代码,保存验证电子邮件,将用户数据保存到Firestore数据库中。只有Firestore数据库无法运行。 else { Toast.makeText(register.this, “Error ! ” task.getException().getMessage(), Toast.LENGTH_SHORT).show();progressBar.setVisib

  • 正如标题所说,当我将spring数据rest存储库添加到现有的spring数据rest webmvc项目中时,我开始在项目运行中出错。 我试图使用@ConvertBy(MyConverter.class)在实体中遵循此链接进行嵌入ID转换。然而,仅仅添加jar就会导致以下错误和大量o. s. c. i. s. PathMatchingResourcePatterResolver日志。

  • 问题内容: 我有一个具有表的应用程序,当您单击表中的项目时,它会使用其数据(FieldGroup)填充一组文本字段,然后您可以选择保存更改, 我想知道如何保存更改用户对我的postgres数据库进行的更改 。我正在为此应用程序使用vaadin和hibernate模式。到目前为止,我已经尝试做 我努力了 而且我也尝试过 最后两个给我以下错误 问题答案: 我已经弄清楚了如何对数据库进行更改,下面是一些

  • 通过SQL Developer,我向Oracle数据库添加了一个新的存储过程。它不仅列在存储过程列表中(与其他每个SP一起),而且我可以在SQL Developer上轻松地执行它。 问题是我似乎无法从外部应用程序执行它。数据库链接到它,我可以调用所有其他存储过程,但不能调用我刚刚创建的存储过程。我收到的错误消息是: ORA-06550:第1行,第7列:PLS-00201:标识符'SETDATE'必