当前位置: 首页 > 知识库问答 >
问题:

将BroadcastConnectedStream连接到AsyncIO

锺离飞鸣
2023-03-14

我需要让一个AsyncIO富函数基于最新的一组规则执行调用。对于像map这样的操作,我能够通过以下博客文章处理一个具有丰富功能的BroadcastConnectedStream:https://flink.apache.org/2019/06/26/broadcast-state.html

但是,创建异步IO函数需要数据流作为输入,而BroadcastConnectedStream不是(https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.html)

有人对我如何克服这个限制有什么想法吗?这种情况是,我想要一个异步函数在外部世界异步调用出现非暂时性错误时将传入消息隐藏在状态中,并在kafka上接收到“继续”消息后恢复操作(我想我可以使用广播流)

共有2个答案

殳阳飙
2023-03-14

因此,首先,AsyncFunction不支持键控状态,因此您也必须解决这一问题,并通过CheckpointedFunction自己实现这一点。

一般来说,我不认为有任何开箱即用的东西可以用于这个案例。如果要使用broadcast,我能想到的最好办法是使用KeyedBroadcastProcessFunction沿流发送结果,然后使用AsyncIO函数。如果您实现了自己的状态处理,那么您可以保留所有失败的结果,只需重试即可。

然而,仅仅将所有请求作为列表获取并通过它们循环重试可能不是最好的主意,因为这可能会导致性能下降(您可以设置同时进行的请求数量,但此请求实际上会比其他请求持续更长的时间)。

徐昕
2023-03-14

我认为可以将BroadcastProcessFunction(不是键控函数)放在异步I/o操作符前面,但您必须在正在处理的其他流中进行并集,因为异步I/o只有一个输入。考虑到这是多么丑陋,寻找其他方式传达“前进”信号可能更可取。

或者您可能想看看有状态函数,它在这方面具有更大的灵活性。

 类似资料:
  • 问题内容: 我需要使用angular + bootstrap创建一个带有日期和时间的输入字段。我发现这个日期时间选择器看起来确实很需要我- 日期和时间在一个字段中,并且阻止了用户错误的版本。我写了一条指令,启动了datepickers,但是它改变了视图,并且模型没有改变……我也尝试了onSelect,但是也没有任何反应 js 如何解决?建立联系? 问题答案: 因此,问题是: 改变每在元件; 从元素

  • 问题内容: 如何使用SQLAlchemy连接到MS Access?在他们的网站上,它说连接字符串是access + pyodbc。这是否意味着我需要连接pyodbc?由于我是新手,请保持温柔。 问题答案: 从理论上讲,这将通过create_engine(“ access:/// some_odbc_dsn”)进行,但是自从SQLAlchemy 0.5以来,Access后端就一直没有使用过,并且尚不

  • 我正在调试我的网络应用程序,以找出我的一个请求中的网络错误。由于错误发生在库中,我宁愿使用Charles代理,而不是使用命令行。 我的问题是连接是由https完成的,它是不可见的,因为REPL没有通过代理。我试图将-Dhttp.proxy主机和-Dhttp.proxy端口作为参数添加到运行/调试配置中,但没有成功。 其他应用程序正在通过代理正确运行。 我错过了什么选择?

  • 我是nifi的新手,我想将SQL server数据库连接到nifi,并用处理器创建数据流。我怎样才能做到这一点,有没有人能帮我弄清楚这一点。 事先谢谢山姆

  • 我正在尝试将Azure与Android Studio连接起来。我已经按照关于Azure的教程,创建了一个移动服务。然后它给了我一些代码添加到我的应用程序,以便他们连接。但是,代码抛出了一个我无法理解的异常。我从头到脚在网上搜了一遍,也找不到答案。我认为这与依赖项或库的导入方式有关(通过将jar文件粘贴到libs文件夹中并添加依赖项)。 引发异常的代码(由Azure提供):

  • 我正在尝试将 kafka 与 windows 上的 mysql 连接起来。我没有使用汇合。我的 kafka 版本是 2.12 我已经启动了动物园管理员、Kafka、生产者和消费者,这一切都很好用。 我的MysQL版本是8.0.15 我已经在libs文件夹中复制了这3个jar文件 我的源代码quickstart mysql。属性文件代码为 当我运行命令时 我在控制台上收到此错误 请帮助我。 我也试过