当前位置: 首页 > 知识库问答 >
问题:

使用Apache Flink创建CEP

燕和裕
2023-03-14

我正在尝试为Kafka输入流实现一个非常简单的Apache Flink CEP。Kafka生产者生成一个简单的Double值,并通过Kafka主题将它们作为字符串发送给消费者。目前,我正在用Flink编码一个CEP消费者。到目前为止,这是我编写的代码:

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); 
        env.setParallelism(3);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink_consumer");

        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer09<>("temp", new SimpleStringSchema(), properties));

        Pattern<String, ?> warning= Pattern.<String>begin("first")
                .where(new IterativeCondition<String>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public boolean filter(String value, Context<String> ctx) throws Exception {
                        return Double.parseDouble(value) >= 89.0;
                    }
                })
                .next("second")
                .where(new IterativeCondition<String>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public boolean filter(String value, Context<String> ctx) throws Exception {
                        return Double.parseDouble(value) >= 89.0;
                    }
                })
                .within(Time.seconds(10));  
        DataStream<String> temp = CEP.pattern(stream, warning).select(new PatternSelectFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String select(Map<String, List<String>> pattern) throws Exception {
                List warnung1 = pattern.get("first");
                String first = (String) warnung1.get(1);
                return first;
            }   

        });

        temp.print();
        env.execute();

    }

如果我正在尝试执行这段代码,这是一条错误消息:

编辑:我尝试了另一个例子,每次执行我都得到相同的错误。所以我觉得我的包裹有问题?

共有1个答案

令狐献
2023-03-14

在Flink1.6.0中,我的代码工作得非常好。

 类似资料:
  • 安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?

  • 问题内容: 我决定使用Java重新创建Snake,但是我有些困惑。目前,我有一个正方形,用户可以使用箭头键在屏幕上移动。当您按一次LEFT时,方型会开始使用计时器向左移动。当您按任何其他已设置的键(向右,向上,向下)时,它会改变方向。我的目标是使用ArrayList容纳组成蛇的正方形。目前,我已经创建了一个ArrayList,其中仅包含一个Snake对象,如果我将第二个Snake对象添加到列表中并

  • 问题内容: 我有在Objective-C中创建和NSAlert的代码,但是现在我想在Swift中创建它。 该警报是为了确认用户要删除文档。 我希望“删除”按钮可以运行删除功能,而“取消”按钮只是为了消除警报。 如何在Swift中编写此代码? 问题答案: 在OS X 10.10 Yosemite中已弃用。 迅捷2 返回或根据用户的选择。 表示添加到对话框的第一个按钮,此处为“确定”。 迅捷3 斯威夫

  • 问题内容: 我尝试使用下面的代码设置Cookie: 我已将角度cookie更新到1.3.14版本,我知道有一个重大更改,但是现在应该如何编写上面的代码? 运行上面的代码,我得到这个错误: 更新:我必须在2个文件中执行此操作: 问题答案: 通过设置变量来实现: 您的版本: 资源 注意: 请记住要包含在您的html中。

  • 我想创建一个Spring Bean Factory后处理器,将Bean添加到当前的ApplicationContext中。 我的中有很多Web-Services定义,我想尽可能地减少。 配置如下所示: 因此,我用以下bean定义创建了一个@Configuration类: 这好多了!,但是我想把它减少得更多,所以我想创建一个名为@WebService定义的注释,并添加一个BeanFactoryPos

  • null 有人能解释一下,当我使用PdfReader阅读模板后,我如何制作模板的副本吗?有没有办法把表格写到模板副本上,而不是一个新文档上? 为了将来的参考,我做了以下工作: