当前位置: 首页 > 知识库问答 >
问题:

Flink CEP模式检测不会实时发生

廉学潞
2023-03-14

我对Flink CEP库还是个新手,但我不了解模式检测行为。考虑到下面的示例,我有一个Flink应用程序,它使用来自kafka主题的数据,数据是定期生成的,我想使用Flink CEP模式来检测值何时大于给定阈值。代码如下:

public class CEPJob{
    
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(),
                properties);

        consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        DataStream<String> stream = env.addSource(consumer);

        // Process incoming data.
        DataStream<Stock> inputEventStream = stream.map(new MapFunction<String, Stock>() {

            private static final long serialVersionUID = -491668877013085114L;

            @Override
            public Stock map(String value) {
                String[] data = value.split(":");

                System.out.println("Date: " + data[0] + ", Adj Close: " + data[1]);

                Stock stock = new Stock(data[0], Double.parseDouble(data[1]));

                return stock;
            }
        });

        // Create the pattern
        Pattern<Stock, ?> myPattern = Pattern.<Stock>begin("first").where(new SimpleCondition<Stock>() {
            private static final long serialVersionUID = -6301755149429716724L;

            @Override
            public boolean filter(Stock value) throws Exception {
                return (value.getAdj_Close() > 140.0);
            }

        });

        // Create a pattern stream from our warning pattern
        PatternStream<Stock> myPatternStream = CEP.pattern(inputEventStream, myPattern);

        // Generate alert for each matched pattern
        DataStream<Stock> warnings = myPatternStream .select((Map<String, List<Stock>> pattern) -> {
            Stock first = pattern.get("first").get(0);

            return first;
        });

        warnings.print();

        env.execute("CEP job");
    }
}

当我运行作业时会发生什么,模式检测不会实时发生,它只在生成第二条记录后才输出当前记录检测到的模式的警告,似乎延迟了将警告打印到日志中,我真的不知道如何让它在检测到模式时输出警告,而不等待下一条记录,谢谢:)。

来自Kafka的数据是字符串格式:“日期:值”,它每5秒生成一次数据。

Java版本:1.8,Scala版本:2.11.12,Flink版本:1.12.2,Kafka版本:2.3.0

共有1个答案

张星洲
2023-03-14

我发现的解决方案是,每次我给Kafka主题生成一个值时,都在该主题中发送一个假记录(例如空对象),在Flink端(在模式声明中),我测试接收到的记录是否是假的。FlinkCEP似乎总是在输出警告之前等待即将到来的事件。

 类似资料:
  • 问题内容: IE 10的现代桌面版本始终是全屏的。 W3上有一个针对伪类的有效规范 但是,当我尝试使用jQuery 1.9.x和2.x版检测全屏时: 它引发了这个错误: 语法错误,无法识别的表达式:全屏 是因为jQuery尚未识别此标准还是IE10? 检查全屏模式的 传统 方式是什么?我期待以下结果: } 我们可以在没有浏览器嗅探的情况下做到吗? 问题答案: 您已经发现,浏览器兼容性是一个很大的缺

  • 问题内容: 如果将persistence.xml放在src / test / META-INF文件夹中,则自动检测实体不适用于maven- verify。当persistence.xml位于src / main / META-INF文件夹中时,它将起作用。 在两种情况下都可以在Eclipse中运行测试。 当persistence.xml位于src / test文件夹中时,有没有办法使自动检测适用于

  • 并行度=1(成功检测到模式) 并行度=4(无法检测到模式)

  • 问题内容: 我有一个javax.servlet.http.HttpSessionListener的实现,该实现应该检测Struts项目中的用户会话无效/超时。 sessionDestroyed()似乎从未被调用过,我可以通过删除JSESSIONID并刷新页面来重现此内容。我还发现,保持浏览器打开直到会话超时也有相同的效果。 该站点正在Java 1.5的JBoss 4.2.3.GA中运行。 我开始怀

  • 我需要知道flinkcep的实现背后是否存在文件。如果有,那它们是什么?

  • 我在试验Eclipselink。我正在尝试更新数据库中的一个现有实体。JPA实体只有公共字段,字段使用JPA注释进行注释。代码使用加载实体。 代码创建JPA实体的实例,为实体的公共字段赋值,并调用EntityManager.merge(entity)方法。Eclipselink不更新数据库记录。我启用了log来查看Eclipselink是否发出SQL语句。 Eclipselink不发出任何upda