当前位置: 首页 > 知识库问答 >
问题:

Flink CEP 未打印结果

苏凯
2023-03-14

我正在尝试打印出一个字符串,如果Hello和世界是使用Flink CEP库找到的。我的源是 Kafka,并使用控制台生产者来输入数据。这部分正在起作用。我可以打印出我在主题中输入的内容。但是,它不会打印出我的最后一条消息“世界真好!它甚至不会打印出它进入了lambda。下面是类

package kafka;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Map;
import java.util.Properties;

/**
 * Created by crackerman on 9/16/16.
*/
public class WordCount {

public static void main(String[] args) throws Exception {

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("zookeeper.connect", "localhost:2181");
    properties.put("group.id", "test");
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<String> src = see.addSource(new FlinkKafkaConsumer08<>("complexString",
                                                                      new SimpleStringSchema(),
                                                                      properties));

    src.print();


    Pattern<String, String> pattern = Pattern.<String>begin("first")
            .where(evt -> evt.contains("Hello"))
            .followedBy("second")
            .where(evt -> evt.contains("World"));

    PatternStream<String> patternStream = CEP.pattern(src, pattern);

    DataStream<String> alerts = patternStream.flatSelect(
            (Map<String, String> in, Collector<String> out) -> {
                System.out.println("Made it to the lambda");
                String first = in.get("first");
                String second = in.get("second");
                System.out.println("First: " + first);
                System.out.println("Second: " + second);

                if (first.equals("Hello") && second.equals("World")) {

                    out.collect("The world is so nice!");
                }


            });

    alerts.print();

    see.execute();
}

}

任何帮助都将不胜感激。

谢谢!

共有1个答案

于嘉许
2023-03-14

问题是下面这一行

 see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

如果这是删除,它的工作方式,我期望它。

 类似资料:
  • 问题内容: 我试图从我的postgres数据库中检索一些数据,并将它们打印为json。我成功地在没有json的情况下打印了它们,但我在json中需要它们。 main.go: 这是我访问localhost:1337 / db时得到的 这是终端上的输出: 有人知道是什么问题吗? 问题答案: 该包使用反射(结构的包),以存取字段。您需要导出结构的字段以使其起作用(以大写字母开头): 扫描时: 引用自:

  • 我已经从postman手动运行了url,我已经成功连接并打印了结果。 但是当我试图从一个方法调用它时,没有结果是打印。 下面是我使用的curl代码: 下面是curl_log的内容。文本 > 主机名localhost/codeignitertest在DNS缓存中找到 尝试127.0.0.1... TCP_NODELAY集 连接到localhost/codeignitertest(127.0.0.1)

  • 我正在使用Pycharm并试图将文本打印到控制台 最终打印命令('

  • 嗨,我有这样的东西: 如何打印排序结果?

  • 我有一个flask应用程序,只有一条路线,没有复杂的事情发生,运行在docker容器中。我一辈子都不能让print语句出现在日志中(

  • 如果图像无法加载,下面是代码