当前位置: 首页 > 知识库问答 >
问题:

在Apache Flink中处理不生成警报的事件

贝自怡
2023-03-14

我正在使用Flink CEP,我需要处理甚至不生成警报的事件。请问我该怎么做?

我正在使用rabbitMq中的事件,并定义了一些模式。现在我需要做的是将另一个队列中接收到的所有事件发送到一个远程API。我是Flink的新手,所以我遵循了文档中的示例。当我在将接收到的事件与定义的模式进行匹配后尝试发送它们时,我只会得到与模式匹配的结果。例如,我想做的就是在我的事件中将一个属性设置为true,然后将它们全部发送到输出队列。

    public static void cep() throws Exception {
    /**
     * RabbitMQ connection
     */
    final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost(HOST)
            .setPort(PORTS[RD.getValue()])
            .setUserName("guest")
            .setPassword("guest")
            .setVirtualHost("/")
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    /**
     * Retrieve data inputEventstream from rabbitMQ
     */
    final DataStream<String> inputEventstream = env
            .addSource(new RMQSource<>(
                    connectionConfig, // config for the RabbitMQ connection
                    "input", // name of the RabbitMQ queue to consume
                    true, // use correlation ids; can be false if only at-least-once is required
                    new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
            .setParallelism(1);
    /**
     * Change DataStream<String> to DataStream<MonitoringEvent> where
     * MonitoringEvent refer to a class which modelize our event.
     */
    DataStream<MonitoringEvent> inputEventStreamClean = inputEventstream.flatMap(new Tokenizer());

    Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("start")
            .subtype(MonitoringEvent.class)
            .where(new SimpleCondition<MonitoringEvent>() {
                @Override
                public boolean filter(MonitoringEvent value) {
                        return Integer.parseInt(value.getAncienneChute())>=CHUTE_GRAVE;
                }
            }).or(new SimpleCondition<MonitoringEvent>() {
                @Override
                public boolean filter(MonitoringEvent value) {
                        return value.isChaiseRoulante();
                }
            }).or(new SimpleCondition<MonitoringEvent>() {
                    @Override
                    public boolean filter(MonitoringEvent value) {
                        return value.isDeambulateur();
                }
            }).or(new SimpleCondition<MonitoringEvent>() {
                @Override
                public boolean filter(MonitoringEvent value) {
                    return value.isDeambulateur();
            }
            })
            .or(new SimpleCondition<MonitoringEvent>() {
                @Override
                public boolean filter(MonitoringEvent value) {
                    return EntityManager.getInstance().hasCurrentYearFallTwice(value.getIdClient());
            }
            });

    //PatternStream<MonitoringEvent> fallPatternStream = CEP.pattern(inputEventStreamClean.keyBy("idClient"), warningPattern);
    inputEventStreamClean.print();

    // Create a pattern stream from our warning pattern
    PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
            inputEventStreamClean.keyBy("idClient"),
            warningPattern);

    DataStream<FallWarning> warnings = tempPatternStream.select(
            (Map<String, List<MonitoringEvent>> pattern) -> {
                MonitoringEvent first = (MonitoringEvent) pattern.get("start").get(0);
                return new FallWarning(first.getIdClient(), Integer.valueOf(first.getAncienneChute()));
            }
    );

    // Alert pattern: Two consecutive temperature warnings appearing within a time interval of 20 seconds
    Pattern<FallWarning, ?> alertPattern = Pattern.<FallWarning>begin("start");

    // Create a pattern stream from our alert pattern
    PatternStream<FallWarning> alertPatternStream = CEP.pattern(
            //warnings.keyBy("idClient"),
                warnings,
            alertPattern);

    // Generate alert 
    DataStream<Alert> alerts = alertPatternStream.flatSelect(
            (Map<String, List<FallWarning>> pattern, Collector<Alert> out) -> {
                FallWarning first = pattern.get("start").get(0);

                if (first.idNiveauUrgence>=CHUTE_GRAVE && (first.isChaiseRoulante() || first.isDeambulateur() || first.isFracture())) {
                    out.collect(new Alert(first.idClient));
                }
            });

    // Print the warning and alert events to stdout
    warnings.print();

    alerts.print(); // here I can send them to RabbitMq

    env.execute();
}

共有1个答案

公西翊歌
2023-03-14

您只需要将Sink添加到您的“警报”DataStream中,例如

alert.addSink(new RMQSink<String>(
connectionConfig,            // config for the RabbitMQ connection
"queueName",                 // name of the RabbitMQ queue to send messages to
new SimpleStringSchema()));  // serialization schema to turn Java objects to messages

每个例子在

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/rabbitmq.html

 类似资料:
  • 我们正在构建一个flink用例,在这个用例中,我们从一个kafka主题进行消费,并根据平均、最大和最小阈值执行聚合和生成警报。我们还需要在翻滚事件时间窗口中有0个事件时通知每个键。我们很难想出一个同样的解决方案。我们考虑的选项如下,如果有其他想法我们还没有考虑,请告诉我们。 < li >可查询状态:保存每个过程窗口功能中的键。从外部应用程序查询状态,并在20分钟时间间隔到期后某个密钥丢失时发出警报

  • 本文向大家介绍如何在Selenium中处理基于Web的警报?,包括了如何在Selenium中处理基于Web的警报?的使用技巧和注意事项,需要的朋友参考一下 Selenium WebDriver借助Alert界面提供了多个API来处理弹出窗口或警报。 解雇() 这将取消警报按钮。 接受() 这将接受警报按钮。 getText() 这将提取警报文本。 sendKeys() 这将在警报框中输入文本。 示

  • 我正在编写一个简单的java注释处理器,它使用JavaPoet生成java类,然后将其写入文件管理器。 这个注释处理器正在将文件保存到中,而不是 我尝试将maven编译器插件中目录设置为生成的sources目录,但它仍然在class文件夹中生成它。 如何将生成的类保存在生成的源文件夹中?

  • 我试图在。ics文件中实现警报()。其思想是,每当在系统中创建指定的记录时,就会发送带有邀请的自动电子邮件。该事件工作正常,它正在发送,我能够添加到日历(谷歌日历和iPhone/Mac日历)。 所以问题是:是谷歌和苹果忽视了这些VALARM组件,还是我做错了什么? 我是否正确理解这个带有动作的警报应该只是在浏览器(谷歌日历)和日历应用程序中向我显示一个弹出窗口? 我的文件正文: null

  • 问题内容: 我想检测是否弹出警报。目前,我正在使用以下代码: 问题是,如果网页的当前状态没有警报,它将等待特定的时间,直到达到超时,然后引发异常,因此性能真的很差。 有没有更好的方法,也许可以将警报事件处理程序用于动态发生的警报? 问题答案: 这是使用“从此处进行显式等待”对我有用的WebDriver:高级用法

  • 我正在使用debezium SQL Server跟踪生产基地上的更改。创建了主题,CDC的工作非常出色,但是当试图使用jdbcSinkConnector将数据转储到另一个Sql Server DB中时,我遇到了以下错误。 在源数据库上,sql数据类型为。Kafka事件为1549461754650000000。架构类型为Int64。架构名io.debezium.time.nanotimestamp。