I am using Apache Flink in Kinesis Data Analytics.
Flink version: 1.13.2, Java: 11
I am consuming JSON messages from Kafka. Sample input records are shown below:
null {"plateNumber":"506b9910-74a7-4c3e-a885-b5e9717efe3a","vignetteStickerId":"9e69df3f-d728-4fc8-9b09-42104588f772","currentTimestamp":"2022/04/07 16:19:55","timestamp":1649362795.444459000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null {"plateNumber":"5ffe0326-571e-4b97-8f7b-4f49aebb6993","vignetteStickerId":"6c2e1342-b096-4cc9-a92c-df61571c2c7d","currentTimestamp":"2022/04/07 16:20:00","timestamp":1649362800.638060000,"vehicleType":"CAR","vehicleModelType":"HONDA"}
null {"plateNumber":"d15f49f9-5550-4780-b260-83f3116ba64a","vignetteStickerId":"1366fbfe-7d0a-475f-9249-261ef1dd6de2","currentTimestamp":"2022/04/07 16:20:05","timestamp":1649362805.643749000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null {"plateNumber":"803508fb-9701-438e-9028-01bb8d96a804","vignetteStickerId":"b534369f-533e-4c15-ac3f-fc28cf0f3aba","currentTimestamp":"2022/04/07 16:20:10","timestamp":1649362810.648813000,"vehicleType":"CAR","vehicleModelType":"FORD"}
I want to aggregate these records into 20-second windows by vehicleType (CAR or TRUCK) and vehicleModelType (TOYOTA, HONDA, or FORD). The SQL analogy would be sum() with GROUP BY vehicleType, vehicleModelType.
I am using an AggregateFunction to implement this:
import static java.util.Objects.isNull;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.springframework.stereotype.Component;

import com.helecloud.streams.demo.model.Vehicle;
import com.helecloud.streams.demo.model.VehicleStatistics;

@Component
public class VehicleStatisticsAggregator implements AggregateFunction<Vehicle, VehicleStatistics, VehicleStatistics> {

    private static final long serialVersionUID = 1L;

    @Override
    public VehicleStatistics createAccumulator() {
        System.out.println("Creating Accumulator!!");
        return new VehicleStatistics();
    }

    @Override
    public VehicleStatistics add(Vehicle vehicle, VehicleStatistics vehicleStatistics) {
        System.out.println("vehicle in add method : " + vehicle);
        if (isNull(vehicleStatistics.getVehicleType())) {
            vehicleStatistics.setVehicleType(vehicle.getVehicleType());
        }
        if (isNull(vehicleStatistics.getVehicleModelType())) {
            vehicleStatistics.setVehicleModelType(vehicle.getVehicleModelType());
        }
        // if (isNull(vehicleStatistics.getStart())) {
        //     vehicleStatistics.setStart(vehicle.getTimestamp());
        // }
        // if (isNull(vehicleStatistics.getCurrentTimestamp())) {
        //     vehicleStatistics.setCurrentTimestamp(vehicle.getCurrentTimestamp());
        // }
        if (isNull(vehicleStatistics.getCount())) {
            vehicleStatistics.setCount(1);
        } else {
            System.out.println("incrementing count for : vehicleStatistics : " + vehicleStatistics);
            vehicleStatistics.setCount(vehicleStatistics.getCount() + 1);
        }
        vehicleStatistics.setEnd(vehicle.getTimestamp());
        System.out.println("vehicleStatistics in add : " + vehicleStatistics);
        return vehicleStatistics;
    }

    @Override
    public VehicleStatistics getResult(VehicleStatistics vehicleStatistics) {
        System.out.println("vehicleStatistics in getResult : " + vehicleStatistics);
        return vehicleStatistics;
    }

    @Override
    public VehicleStatistics merge(VehicleStatistics vehicleStatistics, VehicleStatistics accumulator) {
        System.out.println("Coming to merge!!");
        VehicleStatistics vs = new VehicleStatistics(
                // vehicleStatistics.getStart(),
                accumulator.getEnd(),
                // vehicleStatistics.getCurrentTimestamp(),
                vehicleStatistics.getVehicleType(), vehicleStatistics.getVehicleModelType(),
                vehicleStatistics.getCount() + accumulator.getCount());
        System.out.println("VehicleStatistics in Merge :" + vs);
        return vs;
    }
}
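The VehicleStatistics model referenced above is not shown; for reference, here is a minimal sketch consistent with the getters, setters, and constructor the aggregator uses. All field types are my assumptions (the fractional epoch seconds in the data suggest BigDecimal for the timestamps):

import java.math.BigDecimal;

// Sketch of the accumulator/output POJO, reconstructed from the calls in
// the aggregator above. Field types are assumptions, not the real model.
public class VehicleStatistics {
    private BigDecimal end;                    // event timestamp of the last element seen
    private VehicleType vehicleType;           // assumed enum: CAR, TRUCK
    private VehicleModelType vehicleModelType; // assumed enum: TOYOTA, HONDA, FORD
    private Integer count;                     // boxed Integer so the isNull() checks work

    public VehicleStatistics() {
    }

    public VehicleStatistics(BigDecimal end, VehicleType vehicleType,
                             VehicleModelType vehicleModelType, Integer count) {
        this.end = end;
        this.vehicleType = vehicleType;
        this.vehicleModelType = vehicleModelType;
        this.count = count;
    }

    // getters and setters for each field omitted for brevity
}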
In the aggregator above, I also never see the merge() method being called. Below is the main processing code:
import java.util.Properties;

import javax.annotation.PostConstruct;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.helecloud.streams.demo.model.Vehicle;
import com.helecloud.streams.demo.model.VehicleStatistics;

@Service
public class ProcessingService {

    @Value("${kafka.bootstrap-servers}")
    private String kafkaAddress;

    @Value("${kafka.group-id}")
    private String kafkaGroupId;

    public static final String TOPIC = "flink_input";
    public static final String VEHICLE_STATISTICS_TOPIC = "flink_output";

    @Autowired
    private VehicleDeserializationSchema vehicleDeserializationSchema;

    @Autowired
    private VehicleStatisticsSerializationSchema vehicleStatisticsSerializationSchema;

    @PostConstruct
    public void startFlinkStreamProcessing() {
        try {
            processVehicleStatistic();
        } catch (Exception e) {
            // log.error("Cannot process", e);
            e.printStackTrace();
        }
    }

    public void processVehicleStatistic() {
        try {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            FlinkKafkaConsumer<Vehicle> consumer = createVehicleConsumerForTopic(TOPIC, kafkaAddress, kafkaGroupId);
            consumer.setStartFromLatest();
            System.out.println("Starting to consume!!");
            consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

            FlinkKafkaProducer<VehicleStatistics> producer =
                    createVehicleStatisticsProducer(VEHICLE_STATISTICS_TOPIC, kafkaAddress);

            DataStream<Vehicle> inputMessagesStream = environment.addSource(consumer);

            inputMessagesStream
                    .keyBy(vehicle -> vehicle.getVehicleType().ordinal())
                    // .keyBy(vehicle -> vehicle.getVehicleModelType().ordinal())
                    // .keyBy(new KeySelector<Vehicle, Tuple2<VehicleType, VehicleModelType>>() {
                    //     private static final long serialVersionUID = 1L;
                    //
                    //     @Override
                    //     public Tuple2<VehicleType, VehicleModelType> getKey(Vehicle vehicle) throws Exception {
                    //         return Tuple2.of(vehicle.getVehicleType(), vehicle.getVehicleModelType());
                    //     }
                    // })
                    // .filter(v -> CAR.equals(v.getVehicleType()))
                    .window(TumblingEventTimeWindows.of(Time.seconds(20)))
                    // .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .aggregate(new VehicleStatisticsAggregator())
                    .addSink(producer);

            System.out.println("Adding to Sink!!");
            environment.execute("Car Truck Counts By Model");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private FlinkKafkaConsumer<Vehicle> createVehicleConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", kafkaGroup);
        return new FlinkKafkaConsumer<>(topic, vehicleDeserializationSchema, properties);
    }

    private FlinkKafkaProducer<VehicleStatistics> createVehicleStatisticsProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer<>(kafkaAddress, topic, vehicleStatisticsSerializationSchema);
    }
}
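For completeness, the Vehicle model and its enums are not shown either; based on the JSON input and the getters used above, I assume a shape along these lines (all types are guesses):

import java.math.BigDecimal;

// Assumed input POJO mirroring the JSON fields; the enum types back the
// keyBy(... .ordinal()) call above. Everything here is an assumption.
public class Vehicle {
    private String plateNumber;
    private String vignetteStickerId;
    private String currentTimestamp;           // e.g. "2022/04/07 16:19:55"
    private BigDecimal timestamp;              // fractional epoch seconds
    private VehicleType vehicleType;           // assumed enum, see below
    private VehicleModelType vehicleModelType; // assumed enum, see below

    // getters and setters omitted for brevity
}

enum VehicleType { CAR, TRUCK }
enum VehicleModelType { TOYOTA, HONDA, FORD }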
The results I am getting are shown below.
null {"end":1649362835.665466000,"vehicleType":"TRUCK","vehicleModelType":"HONDA","count":3}
null {"end":1649362825.656024000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":1}
null {"end":1649362850.675786000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":3}
null {"end":1649362855.677596000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA","count":1}
But is there any way to verify that these windows and counts are correct?
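One idea I had for verification (an untested sketch, not part of my current code): pair the AggregateFunction with a ProcessWindowFunction so each emitted record is logged together with the event-time window start and end. This assumes imports of ProcessWindowFunction, TimeWindow (org.apache.flink.streaming.api.windowing.windows), and Collector (org.apache.flink.util):

// Untested sketch: wrap the aggregator so the window metadata is visible.
// The key type is Integer because of keyBy(v -> v.getVehicleType().ordinal()).
inputMessagesStream
        .keyBy(vehicle -> vehicle.getVehicleType().ordinal())
        .window(TumblingEventTimeWindows.of(Time.seconds(20)))
        .aggregate(new VehicleStatisticsAggregator(),
                new ProcessWindowFunction<VehicleStatistics, VehicleStatistics, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context,
                                        Iterable<VehicleStatistics> aggregated,
                                        Collector<VehicleStatistics> out) {
                        VehicleStatistics stats = aggregated.iterator().next();
                        // Log the actual window boundaries this result belongs to.
                        System.out.println("window [" + context.window().getStart()
                                + ", " + context.window().getEnd() + ") -> " + stats);
                        out.collect(stats);
                    }
                })
        .addSink(producer);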
Another question: I am trying to aggregate results based on multiple keys. Is an AggregateFunction the right way to do this?
I am asking because I came across this question: How to sum multiple fields in Flink?
So if I have to aggregate a sum across multiple fields, can an AggregateFunction (written the way I have above) accomplish the same task?
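To make the multi-key part concrete, what I have in mind is essentially the commented-out KeySelector in my pipeline above. An untested sketch of the two variants I am considering (Types is org.apache.flink.api.common.typeinfo.Types, Tuple2 is org.apache.flink.api.java.tuple.Tuple2):

// Variant 1: simple composite key built as a String.
inputMessagesStream
        .keyBy(vehicle -> vehicle.getVehicleType() + "_" + vehicle.getVehicleModelType())
        .window(TumblingEventTimeWindows.of(Time.seconds(20)))
        .aggregate(new VehicleStatisticsAggregator())
        .addSink(producer);

// Variant 2: Tuple2 key with explicit type information, since a lambda
// key selector loses the generic key type to erasure.
inputMessagesStream
        .keyBy(vehicle -> Tuple2.of(vehicle.getVehicleType(), vehicle.getVehicleModelType()),
               Types.TUPLE(Types.ENUM(VehicleType.class), Types.ENUM(VehicleModelType.class)))
        .window(TumblingEventTimeWindows.of(Time.seconds(20)))
        .aggregate(new VehicleStatisticsAggregator())
        .addSink(producer);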
Please let me know. Thanks in advance.