当前位置: 首页 > 知识库问答 >
问题:

具有数百万输出的Apache Flink flatMap

陶山
2023-03-14

每当我收到消息时,我都想从数据库中读取,可能会返回数百万行,然后我想将这些行传递到流中。这在Flink被认为是良好的做法吗?

public static class StatsReader implements FlatMapFunction<Msg, Json> {

    Transactor txor = 
        ...;

    @Override
    public void flatMap(Msg msg, Collector<Json> out) {

        //Possibly lazy and async stream
        java.util.Stream<Json> results = 
            txor.exec(Stats.read(msg)); 

        results.foreach(stat->out.collect(stat));

    }
}

编辑:

背景:我想动态运行报告。db基本上是一个巨大的窗口。该报告基于该窗口实时数据。该报告具有高度的可定制性,因此很难预先处理结果或定义管道。

我今天使用的是vanilla java,管道大致是这样的:报告定义-

共有1个答案

吴欣然
2023-03-14

原则上这应该是可能的。但是,我建议使用异步函数,而不是平面映射函数。

请注意,这样的设置可能需要调整检查点参数,例如检查点间隔。

 类似资料:
  • 问题内容: 我正在使用Python进行一些数据分析。我有两个表,第一个(叫它“ A”)有1000万行和10列,第二个(“ B”)有7300万行和2列。他们有1个具有共同ID的列,我想根据该列将两个表相交。特别是我想要表的内部联接。 我无法将表B作为pandas数据框加载到内存中,以在pandas上使用常规合并功能。我尝试通过读取表B上的文件的块,将每个块与A相交,并将这些交集连接起来(内部联接的输

  • 我有数百万个不同标题的csv文件,我想把它们合并到一个大数据框中。 我的问题是我尝试过的解决方案有效,但太慢了!顺便说一句,我可以访问Sparklyr在我的实验室中处理多节点集群,这个大数据工具会有帮助吗? 文件如下所示: 文件1 校长1,校长3,校长5 a、 b,c 文件2 校长4,校长2 e、 f 文件3 校长2,校长6 a, c 我想把它们合并成: 校长1,校长2,校长3,校长4,校长5,校

  • 问题内容: 我进行了很多搜索,但找不到任何东西。.我只是想问一下是否有任何方法可以创建和调用不带参数的过程( Informix )。我知道如何返回一个或多个值(用于过程和函数),但这不是我想要的。如果Informix不允许输出参数,那将真的很奇怪。 提前致谢! 编辑 :是的,我看到了可能,但是我仍然无法执行这样的过程。例如: 我收到的是: 常规mytest无法解决 并且仅在执行具有输出参数的功能时

  • 问题内容: 有没有一种方法可以使用DecimalFormat(或其他一些标准格式化程序)来格式化数字,如下所示: 1,000,000 => 100万 1,234,567 => 1.23M 1,234,567,890 => 1234.57M 基本上是将某个数字除以100万,保留小数点后两位,并在最后打一个’M’。我曾考虑过创建NumberFormat的新子类,但它看起来比我想象的要复杂。 我正在编写

  • 问题内容: 我下面有以下代码示例。你可以在其中输入的命令,即回显结果。但是,先读后。其他输出流不起作用? 为什么会这样或我做错了什么?我的最终目标是创建一个线程计划任务,该任务定期执行对/ bash的命令,因此必须一前一后工作,而不能停止工作。我也一直在经历错误的任何想法? 谢谢。 问题答案: 首先,我建议更换生产线 与线 ProcessBuilder是Java 5中的新增功能,它使运行外部进程更

  • 问题内容: 嗨,我有一个表测试,其结构如下: 现在,我需要查询该表(测试),以便获得以下输出。 Oracle 11g中的sql查询是否可以实现?11g中的PIVOT功能是否可以实现? 问题答案: 不,它不能用来完成,但是 可以 用来完成: 当不可用时,我通常会像这样取消透视: