当前位置: 首页 > 知识库问答 >
问题:

Flink 1.10.1在平行度最大值大于1时表现不同

薛文斌
2023-03-14

首先,我已经在这里发现了这个问题:flink程序在并行性方面的行为不同,它看起来和我现在面临的问题一样,但是我认为我在我的场景中确实需要CEP,因为我每小时有超过1百万条属于不同用户密钥的记录需要分析。

所以当我用并行性1运行cep时,一切都运行得很好,即使是不同的用户键,但有点慢,因为flink需要在单个线程中逐个用户地分析用户,而这个操作需要足够快,以识别某种模式,然后在不到1分钟的时间内发送通知例如,这就是为什么我需要一个以上的并行线程。

在我的例子中,我使用RichFlatMapFunction来保持前一个模式以识别下一个模式,然后发送通知,下面是我的代码:

final DataStream<EventPush> eventsStream = RabbitMQConnector.eventStreamObject(env)
                .flatMap(new RabbitMQPushConsumer())
                .keyBy(k -> k.id);

private static SingleOutputStreamOperator<String> getPushToSend(KeyedStream<EventPush, String> stream) {
        return stream.flatMap(new WebPushFlatMapFunction())
                .map(x -> new ObjectMapper().writeValueAsString(x));
    }

/*the code below belongs to WebPushFlatMapFunction class, which is the RichFlatMapFunction using ValueState*/

 private boolean inTime(long start, long end) {
        final long difference = (end > start) ? (end - start) : (start - end);
        long time_frame = 120000L;
        return difference > 0 && time_frame >= difference;
    }

    @Override
    public void flatMap(EventPush value, Collector<EventPush> out) {
        final String pageName= value.pageName.trim();
        Tuple4<String, String, Long, Timestamp> prev;
        try {
            prev = previous.value();
            if (b_pageName.equalsIgnoreCase(pattern)) {
                LOG.info("umid " + value.idsUmid + " match (" + pattern + ") at: " + value.timestamp);
                previous.update(new Tuple4<>(value.idsUmid, pageName, value.timestamp.getTime(), value.timestamp));
            }
            if (prev != null) {
                if (inTime(value.timestamp.getTime(), prev.f2)) {
                    if ((prev.f1 != null && !prev.f1.equals("")) && prev.f1.equals(full_pattern) && pageName.equals(home) && prev.f3.before(value.timestamp)) {
                        if (PropertyFileReader.isWebPushLoggerActivated())
                            LOG.info("umid " + value.idsUmid + " match (" + home + ")" + "triggered at: " + value.timestamp);
                        prev.f1 = "";
                        out.collect(value);
                    }
                    if ((prev.f1 != null && !prev.f1.equals("")) && prev.f1.equals(pattern) && pageName.equals(full_pattern) && prev.f3.before(value.timestamp)) {
                        LOG.info("umid " + value.idsUmid + " match (" + full_pattern + ") at: " + value.timestamp);
                        prev.f3 = value.timestamp;
                        prev.f1 = pageName;
                        previous.update(prev);
                    }
                }
            }
        } catch (IOException e) {
            CatchHandler.generalCatchHandler(e);
        }
    }

通过并行度1,我得到正确的顺序:1,2,3。更多的是,我可以在一个线程中接收1,从另一个线程中接收3,因为所有的都属于同一个用户键,这3个状态将在不同的线程中进行分区。我的问题是:有没有什么方法可以用更多的并行性来做到这一点?亲切的问候。

共有1个答案

岳俊雅
2023-03-14

这听起来像是您希望将针对每个用户的所有分析放在一起,但同时执行针对不同用户的分析。这样做的方法是通过用户ID对流进行密钥。这确实意味着对于单个用户,他们的事件正在由单个(非并行)管道处理。

如果这太慢了,你可以做一些事情来加速它。通常最有帮助的事情包括:更有效的序列化、执行预聚合或增量聚合、删除密钥或重新平衡以及启用对象重用。

 类似资料:
  • 我正在 python 上做一个棋盘游戏,我需要在其中实现算法最小值。当我尝试增加搜索深度时,我的程序停止工作。我也尝试实施 alpha beta 削减,但它似乎无法正常工作。当我尝试其他深度值时,它开始进行无效播放,并且还出现此错误: 以下是我的代码: 阿尔法测试版修剪: 辅助功能: 启发式功能:

  • 如何让伪元素的宽度适应文字内容的同时受到最大宽度的限制,且在小于最大宽度时不自动换行,大于最大宽度时才换行? 这是用React封装Tooltip组件时遇到的一个问题,我的思路是给tooltip下的第一个子元素添加::before伪元素,当hover时tooltip出现。tooltip组件的定义如下: tooltip.css代码如下: 组件使用方式如下: 我想要的效果是,类似与ant design的

  • 还有其他关于datatable上的行运算符的帖子。它们要么太简单,要么解决了特定的场景 我这里的问题更一般。有一个使用dplyr的解决方案。我已经尝试过了,但没有找到一个使用数据的等效解决方案。表语法。你能推荐一个优雅的数据吗。与dplyr版本复制相同结果的表解决方案? 编辑1:真实数据集上建议解决方案的基准总结(10MB,73000行,24个数字列上的统计数据)。基准结果是主观的。然而,经过的时

  • 有没有一种方法可以获得小于的浮点类型所代表的最大值。 我看到了以下定义: 但我们真的应该这样定义这些价值观吗? 根据标准,d::numeric_limits

  • 如果数据库行的长度达到了大于max java.util.List length的值,并且我尝试获取所有数据,会发生什么?

  • 在GCP上部署Spring启动应用程序时,显示了以下错误。 错误:(gcloud.app.deploy)无法上载文件[/home/info/Project1/target/appengine staging/myproject-0.0.1-SNAPSHOT.jar],该文件的大小为[42865605](大于允许的最大大小[33554432])。请删除该文件或添加到应用程序中的skip\u file