当前位置: 首页 > 知识库问答 >
问题:

为什么我的Flink独立群集没有收到我的作业?

刁星渊
2023-03-14

我在Flink(Java)中创建了一个程序来计算3个不同房间的9个假传感器的平均值。如果我启动jar文件,该程序运行良好。所以我决定启动flink独立集群来检查运行我的作业和相应任务的TaskManager,如这里(https://ci.apache.org/projects/flink/flink-docs-stable/tutorials/local_setup.html)。我正在我的机器上运行所有内容。为什么我看不到在仪表板(http://localhost:8081/#/overview)上运行的作业,但如果我查看日志文件(尾-f log/flink--client--*-T430.log),我可以看到正在处理的东西?此外,print()方法正在将输出溢出到控制台。

我使用此命令启动应用程序/bin/flink跑步示例/探索flink。jar-c

但可能在配置文件中有一些参数需要配置。这是我的代码:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sense.flink.mqtt.MqttTemperature;
import org.sense.flink.mqtt.TemperatureMqttConsumer;

public class SensorsMultipleReadingMqttEdgentQEP {

    private boolean checkpointEnable = true;
    private long checkpointInterval = 1000;
    private CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;

    public SensorsMultipleReadingMqttEdgentQEP() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        if (checkpointEnable)
            env.enableCheckpointing(checkpointInterval, checkpointMode);

        DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
        DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
        DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));
        DataStream<MqttTemperature> temperatureStreams = temperatureStream01.union(temperatureStream02)
                .union(temperatureStream03);

        DataStream<Tuple2<String, Double>> average = temperatureStreams.keyBy(new TemperatureKeySelector())
                .map(new AverageTempMapper());

        average.print();

        String executionPlan = env.getExecutionPlan();
        System.out.println("ExecutionPlan ........................ ");
        System.out.println(executionPlan);
        System.out.println("........................ ");

        // env.execute("SensorsMultipleReadingMqttEdgentQEP");
        env.execute();
    }

    public static class TemperatureKeySelector implements KeySelector<MqttTemperature, Integer> {

        private static final long serialVersionUID = 5905504239899133953L;

        @Override
        public Integer getKey(MqttTemperature value) throws Exception {
            return value.getId();
        }
    }

    public static class AverageTempMapper extends RichMapFunction<MqttTemperature, Tuple2<String, Double>> {

        private static final long serialVersionUID = -5489672634096634902L;
        private MapState<String, Double> averageTemp;

        @Override
        public void open(Configuration parameters) throws Exception {
            averageTemp = getRuntimeContext()
                    .getMapState(new MapStateDescriptor<>("average-temperature", String.class, Double.class));
        }

        @Override
        public Tuple2<String, Double> map(MqttTemperature value) throws Exception {
            String key = "no-room";
            Double temp = value.getTemp();

            if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
                key = "room-A";
            } else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
                key = "room-B";
            } else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
                key = "room-C";
            } else {
                System.err.println("Sensor not defined in any room.");
            }
            if (averageTemp.contains(key)) {
                temp = (averageTemp.get(key) + value.getTemp()) / 2;
            } else {
                averageTemp.put(key, temp);
            }
            return new Tuple2<String, Double>(key, temp);
        }
    }
}

谢谢,费利佩

共有1个答案

卞嘉许
2023-03-14

在我选择选项“将所需库提取到生成的JAR中”后,它起作用了。奇怪的是,我正在使用选项“将所需库打包到生成的JAR中”生成JAR,但它不起作用。

 类似资料:
  • 问题内容: 我在android虚拟机中使用以下代码 我收到HttpHostConnectException。不知道为什么?我已将网址中的地址从127.0.0.1更改为10.0.2.2,但仍然收到该异常。我的电脑中安装了wamp服务器,文件“ ReadingFromServer.php”位于“ www”文件夹中。 这是完整的堆栈跟踪 谢谢。 问题答案: 您是否在AndroidManifest.xml

  • 我正在尝试使用JUnit错误收集器报告错误。虽然我的断言失败了,但JUnit中并没有报告错误。但我在控制台中收到了“错误”信息。

  • 工人出现在图片上。为了运行我的代码,我使用了以下命令:

  • 问题内容: 我知道静态方法在类级别。因此,我知道我不需要创建实例来调用静态方法。但我也知道我可以将静态方法(如LIKE)称为实例方法。这是我感到困惑的地方,因为我期望从null对象调用静态方法(就像在调用实例方法中一样)。我真的很感谢一些解释,为什么我错了一个期望。 这是示例代码: 问题答案: 通过实例调用静态方法不需要实例存在。只要编译器能够确定变量的类型,它就可以在评估表达式并丢弃结果后静态进

  • 我想根据id加入Customer和Address对象。这些是我对kafka stream for Customer主题的输入 和以下fro地址 我使用了间隔连接以及使用TumblingEventTimeWindows和滑动窗口的JoinFunction,但它没有连接客户和地址流。我不明白我在代码中遗漏了什么。

  • 我使用surefire和failsafe分别执行单元测试和集成测试。所有测试都位于文件夹中。到目前为止,我有一个集成测试类,其测试方法(用@test注释)在所有单元测试运行时从不执行。这是我的pom的摘录。xml: 我使用maven目标来运行测试。