当前位置: 首页 > 知识库问答 >
问题:

Apache Flink:Wierd FlatMap行为

萧嘉茂
2023-03-14

我正在向Flink接收数据流。对于这些数据的每个“实例”,我都有一个时间戳。我可以检测从中获取数据的机器是否正在“生产”或“不生产”,这是通过位于其自身静态类中的自定义平面映射函数完成的。

我想计算机器生产/不生产的时间。我目前的方法是在两个简单列表中收集生产和非生产时间戳。对于数据的每个“实例”,我通过从最早的时间戳中减去最新的时间戳来计算当前的生产/非生产持续时间。但是,这给了我不正确的结果。当生产状态从生产状态变为非生产状态时,我清除生产时间戳列表,反之亦然,这样如果生产再次开始,持续时间从零开始。

我已经查看了收集各自时间戳的两个列表,发现了一些我不理解的地方。我的假设是,只要机器“生产”,生产时间戳列表中的第一个时间戳保持不变,而每个新的数据实例都会向列表中添加新的时间戳。显然,这种假设是错误的,因为我在列表中得到的时间戳似乎是随机的。不过,它们的顺序仍然正确。

以下是我的flatmap函数代码:

public static class ImaginePaperDataConverterRich extends RichFlatMapFunction<ImaginePaperData, String> {
    private static final long serialVersionUID = 4736981447434827392L;
    private transient ValueState<ProductionState> stateOfProduction;
    SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss.SS");
    DateFormat timeDiffFormat = new SimpleDateFormat("dd HH:mm:ss.SS");
    String timeDiffString = "00 00:00:00.000";
    List<String> productionTimestamps = new ArrayList<>();
    List<String> nonProductionTimestamps = new ArrayList<>();

    public String calcProductionTime(List<String> timestamps) {
        if (!timestamps.isEmpty()) {
            try {
                Date firstDate = dateFormat.parse(timestamps.get(0));
                Date lastDate = dateFormat.parse(timestamps.get(timestamps.size()-1));
                long timeDiff = lastDate.getTime() - firstDate.getTime();

                if (timeDiff < 0) {
                    System.out.println("Something weird happened. Maybe EOF.");
                    return timeDiffString;
                }

                timeDiffString = String.format("%02d %02d:%02d:%02d.%02d",
                    TimeUnit.MILLISECONDS.toDays(timeDiff),
                    TimeUnit.MILLISECONDS.toHours(timeDiff)   % TimeUnit.HOURS.toHours(1),
                    TimeUnit.MILLISECONDS.toMinutes(timeDiff) % TimeUnit.HOURS.toMinutes(1),
                    TimeUnit.MILLISECONDS.toSeconds(timeDiff) % TimeUnit.MINUTES.toSeconds(1),
                    TimeUnit.MILLISECONDS.toMillis(timeDiff)  % TimeUnit.SECONDS.toMillis(1));

            } catch (ParseException e) {
                e.printStackTrace();
            }
            System.out.println("State duration: " + timeDiffString);
        }
        return timeDiffString;
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<ProductionState> descriptor = new ValueStateDescriptor<>(
            "stateOfProduction",
            TypeInformation.of(new TypeHint<ProductionState>() {}),
            ProductionState.NOT_PRODUCING);
            stateOfProduction = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(ImaginePaperData ImaginePaperData, Collector<String> output) throws Exception {
        List<String> warnings = new ArrayList<>();
        JSONObject jObject = new JSONObject();
        String productionTime = "0";
        String nonProductionTime = "0";

        // Data analysis
        if (stateOfProduction == null || stateOfProduction.value() == ProductionState.NOT_PRODUCING && ImaginePaperData.actSpeedCl > 60.0) {
            stateOfProduction.update(ProductionState.PRODUCING);
        } else if (stateOfProduction.value() == ProductionState.PRODUCING && ImaginePaperData.actSpeedCl < 60.0) {
            stateOfProduction.update(ProductionState.NOT_PRODUCING);
        }

        if(stateOfProduction.value() == ProductionState.PRODUCING) {
            if (!nonProductionTimestamps.isEmpty()) {
                System.out.println("Production has started again, non production timestamps cleared");
                nonProductionTimestamps.clear();
            }
            productionTimestamps.add(ImaginePaperData.timestamp);

            System.out.println(productionTimestamps);
            productionTime = calcProductionTime(productionTimestamps);
        } else {
            if(!productionTimestamps.isEmpty()) {
                System.out.println("Production has stopped, production timestamps cleared");
                productionTimestamps.clear();
            }
            nonProductionTimestamps.add(ImaginePaperData.timestamp);
            warnings.add("Production has stopped.");

            System.out.println(nonProductionTimestamps);
            //System.out.println("Production stopped");
            nonProductionTime = calcProductionTime(nonProductionTimestamps);
        }
// The rest is just JSON stuff

我是否必须将这两个时间戳列表保存在ListState中?

编辑:因为另一个用户询问,这里是我得到的数据。

{'szenario': 'machine01', 'timestamp': '31.10.2018 09:18:39.432069', 'data': {1: 100.0, 2: 100.0, 101: 94.0, 102: 120.0, 103: 65.0}}

我期望的行为是,我的flink程序在两个列表中收集时间戳productionTimestamps和nonProductionTimestamps。然后,我希望我的calcProductionTime方法从第一个时间戳中减去列表中的最后一个时间戳,以获得我第一次检测到机器正在“生产”/“不生产”和它停止“生产”/“不生产”之间的持续时间。

共有1个答案

巫马阳飙
2023-03-14

我发现“看似随机”的时间戳的原因是Apache Flink的并行执行。当平行度设置为

我的快速修复方法是将程序的并行性设置为1,据我所知,这保证了事件的顺序。

 类似资料:
  • 问题内容: 说我有身份证。我将如何获得下一行或上一行? 问题答案: 这就是我用来查找上一个/下一个记录的方法。表格中的任何列都可以用作排序列,并且不需要联接或讨厌的技巧: 下一条记录(日期大于当前记录): 上一个记录(日期小于当前记录): 例:

  • 问题内容: 下面的小提琴有三个方块。 块1 包含三列。中间的列中有两行,每行设置为flex:1。 块2 包含三列。中间的列中有两行,每行设置为flex:1。第二行包含一条狗的图像。图像将不会缩小到包含图像的行的高度。 块3 仅包含中间的列,中间有两行,每行设置为flex:1。第二行包含一条狗的图像。图像确实缩小到包含图像的行的高度。 问题是,为什么块2中间列的第二行中的图像不缩小到包含该行的行的高

  • 我已经简化了下面的代码在更新的表视图。 由于某些原因,我没有像人们所期望的那样使用JavaFx的属性,也许这种行为与此有关。来自JTable和observer模式,我想尝试如何在javafx中实现这一点。然而,我的表格数据都很好,但当我试图改变背景颜色时,更多的行出现在表格范围之外。我已经检查了可观察列表的大小,结果与预期一致。 我无法真正看到或理解datas.set是如何产生这种行为的,这是我在

  • 行为是 yii\base\Behavior 或其子类的实例。 行为,也称为 mixins, 可以无须改变类继承关系即可增强一个已有的 组件 类功能。 当行为附加到组件后,它将“注入”它的方法和属性到组件, 然后可以像访问组件内定义的方法和属性一样访问它们。 此外,行为通过组件能响应被触发的事件,从而自定义或调整组件正常执行的代码。 定义行为 要定义行为,通过继承 yii\base\Behavior

  • 问题内容: 我正在尝试不同的JOIN查询,但没有得到想要的结果。 我有2张桌子: 我找不到想要的结果。 我想得到以下结果: 问题答案: 您不能具有这样的动态列数,但是可以 将数据连接 成字符串: 或者您可以使用或手动 旋转行( 我更喜欢后一种方法,对我来说似乎更灵活,但是在某些情况下可以大大减少代码量): 您还可以将前面的语句转换为 动态SQL, 如下所示:

  • 问题内容: 我有一个问题,如果我有一排像这样 我如何将其分成三行,如下所示: / J 问题答案: 您可以使用递归CTE: SQLFiddleDEMO 编辑: 基于Marek Grzenkowicz的回答和MatBailie的评论,全新的想法: 生成从1到max(qty)的数字,并在其上加入表。 SQLFiddle演示

  • 问题内容: 我想选择数据库中的所有行,但希望它们以倒序排列。意思是,我想将第一列数据用作新实体,将当前实体用作第一列。我想你明白我的意思 这是一个例子 至 问题答案: 使用固定的已知列,这里是做这的方法(我将表命名为“ grades”是自由的): 大概的概念: 创建并执行不同查询的并集。 由于您需要实际数据作为列标题,因此联合的第一部分如下所示: 仅该查询将复制结果,因此我们需要通过添加告诉MyS

  • 问题内容: 我遇到了一个非常奇怪的问题。我需要在Jenkins中配置代理,以便 能够访问其中一项作业的SVN存储库。我这样做有两种方式: 从命令行使用必需的参数启动Jenkins 在jenkins.xml文件中定义参数时,将Jenkins作为Windows服务启动。 Starting from command line : -Dhudson.model.DirectoryBrowserSuppor