我在Flink(Java)中创建了一个程序来计算3个不同房间的9个假传感器的平均值。如果我启动jar文件,该程序运行良好。所以我决定启动flink独立集群来检查运行我的作业和相应任务的TaskManager,如这里(https://ci.apache.org/projects/flink/flink-docs-stable/tutorials/local_setup.html)。我正在我的机器上运行所有内容。为什么我看不到在仪表板(http://localhost:8081/#/overview)上运行的作业,但如果我查看日志文件(尾-f log/flink--client--*-T430.log),我可以看到正在处理的东西?此外,print()
方法正在将输出溢出到控制台。
我使用此命令启动应用程序/bin/flink跑步示例/探索flink。jar-c
但可能在配置文件中有一些参数需要配置。这是我的代码:
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sense.flink.mqtt.MqttTemperature;
import org.sense.flink.mqtt.TemperatureMqttConsumer;
public class SensorsMultipleReadingMqttEdgentQEP {
private boolean checkpointEnable = true;
private long checkpointInterval = 1000;
private CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;
public SensorsMultipleReadingMqttEdgentQEP() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
if (checkpointEnable)
env.enableCheckpointing(checkpointInterval, checkpointMode);
DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));
DataStream<MqttTemperature> temperatureStreams = temperatureStream01.union(temperatureStream02)
.union(temperatureStream03);
DataStream<Tuple2<String, Double>> average = temperatureStreams.keyBy(new TemperatureKeySelector())
.map(new AverageTempMapper());
average.print();
String executionPlan = env.getExecutionPlan();
System.out.println("ExecutionPlan ........................ ");
System.out.println(executionPlan);
System.out.println("........................ ");
// env.execute("SensorsMultipleReadingMqttEdgentQEP");
env.execute();
}
public static class TemperatureKeySelector implements KeySelector<MqttTemperature, Integer> {
private static final long serialVersionUID = 5905504239899133953L;
@Override
public Integer getKey(MqttTemperature value) throws Exception {
return value.getId();
}
}
public static class AverageTempMapper extends RichMapFunction<MqttTemperature, Tuple2<String, Double>> {
private static final long serialVersionUID = -5489672634096634902L;
private MapState<String, Double> averageTemp;
@Override
public void open(Configuration parameters) throws Exception {
averageTemp = getRuntimeContext()
.getMapState(new MapStateDescriptor<>("average-temperature", String.class, Double.class));
}
@Override
public Tuple2<String, Double> map(MqttTemperature value) throws Exception {
String key = "no-room";
Double temp = value.getTemp();
if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
key = "room-A";
} else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
key = "room-B";
} else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
key = "room-C";
} else {
System.err.println("Sensor not defined in any room.");
}
if (averageTemp.contains(key)) {
temp = (averageTemp.get(key) + value.getTemp()) / 2;
} else {
averageTemp.put(key, temp);
}
return new Tuple2<String, Double>(key, temp);
}
}
}
谢谢,费利佩
在我选择选项“将所需库提取到生成的JAR中”
后,它起作用了。奇怪的是,我正在使用选项“将所需库打包到生成的JAR中”
生成JAR,但它不起作用。
问题内容: 我在android虚拟机中使用以下代码 我收到HttpHostConnectException。不知道为什么?我已将网址中的地址从127.0.0.1更改为10.0.2.2,但仍然收到该异常。我的电脑中安装了wamp服务器,文件“ ReadingFromServer.php”位于“ www”文件夹中。 这是完整的堆栈跟踪 谢谢。 问题答案: 您是否在AndroidManifest.xml
我正在尝试使用JUnit错误收集器报告错误。虽然我的断言失败了,但JUnit中并没有报告错误。但我在控制台中收到了“错误”信息。
工人出现在图片上。为了运行我的代码,我使用了以下命令:
问题内容: 我知道静态方法在类级别。因此,我知道我不需要创建实例来调用静态方法。但我也知道我可以将静态方法(如LIKE)称为实例方法。这是我感到困惑的地方,因为我期望从null对象调用静态方法(就像在调用实例方法中一样)。我真的很感谢一些解释,为什么我错了一个期望。 这是示例代码: 问题答案: 通过实例调用静态方法不需要实例存在。只要编译器能够确定变量的类型,它就可以在评估表达式并丢弃结果后静态进
我想根据id加入Customer和Address对象。这些是我对kafka stream for Customer主题的输入 和以下fro地址 我使用了间隔连接以及使用TumblingEventTimeWindows和滑动窗口的JoinFunction,但它没有连接客户和地址流。我不明白我在代码中遗漏了什么。
我使用surefire和failsafe分别执行单元测试和集成测试。所有测试都位于文件夹中。到目前为止,我有一个集成测试类,其测试方法(用@test注释)在所有单元测试运行时从不执行。这是我的pom的摘录。xml: 我使用maven目标来运行测试。