当前位置: 首页 > 知识库问答 >
问题:

沉没到目的地后的工艺元件

秦城
2023-03-14

我正在设置一个从 Kafka 读取并汇到 HDFS 的 flink 管道。我想在addSink()步骤之后处理元素。这是因为我想设置触发文件,指示某个分区/小时的数据写入(到接收器)已完成。如何实现这一点?目前我正在使用铲斗水槽。

>

  • DataStream消息流=env. addSource(flinkKafka消费者011);

    //将消息流转换为keyedStream的一些聚合

    keyedStream.addSink(水槽);

    //如何处理3之后的元素。?

  • 共有1个答案

    莫承运
    2023-03-14

    Flink API不支持将作业图扩展到接收器之外。(但是,您可以在向接收器写入数据的同时分叉流并执行其他处理。)

    使用流式文件接收器,您可以观察到零件文件在完成时转换到完成状态。有关更多信息,请参见JavaDoc。

    状态存在于单个操作符中——只有该操作符(例如ProcessFunction)可以修改它。如果你想在接收完成后修改键值状态,没有简单的方法。一种想法是在ProcessFunction中添加一个处理时间计时器,它具有定期唤醒并检查新完成的零件文件的键控状态,并根据它们的存在修改状态。或者,如果粒度不合适,则编写一个自定义源来做类似的事情,并将信息流式传输或广播到ProcessFunction(这将必须是一个协ProcessFunction或KeyedBroadcastProcessFunction)中,它可以用它来进行必要的状态更新。

     类似资料:
    • 我正在运行一个Django服务,它为selenium启动chromedriver,并从网站上获取数据。Django服务由另一个Java服务通过HTTP调用。 这是代码: views.py 刮刀py公司 问题是,在java服务完成所有调用并完成整个过程后,有25-50个chrome进程孤立在RAM中,占用超过1 GB的内存。我在这里有什么不对吗?

    • 我试图在Flink的数据流上应用每窗口功能。以下是我的代码 下面是我的实现MyProcessWindow函数 然而,当我试图通过maven编译上述代码时,我得到了以下错误 知道我做错了什么吗?仅供参考,我正在使用ApacheFlink 1.5.1版,并在Mac上使用maven3编译Java代码。

    • 我有一个 spring-boot 项目的 gradle 构建作为 Jenkins 的工作。我正在使用 Jenkins 的 artifactory 插件将生成的 JAR 发布到我们的 artifactory 服务器。 构建成功完成并发布工件,但 Jenkins 控制台报告与 Artifactory 通信时出错(摘自下面列出的控制台)。 我使用的是Jenkins 1.597、artifactory插件

    • 我正在尝试进入沉浸式模式,使用 工具栏部分隐藏在状态栏后面。那么我如何使工具栏再次正确地出现呢?

    • 我正试图使用MacBook上的scp将本地文件复制到远程服务器。 当我知道文件存在(我已经检查并重新检查了路径)时,我不断地得到错误“没有这样的文件或目录”。该文件具有u、g和O的文件rwx权限。文件不是符号链接。 文件a2.pdf位于本地计算机的根目录中。我还复制了路径,就像我使用pwd时所显示的那样,它包含在目录中,如下所示: 我正在登录到远程服务器时启动此命令。给出了本地路径的错误。