当前位置: 首页 > 知识库问答 >
问题:

Flink AggregateFunction通过多个键查找总和(验证过程和测试)

戚飞雨
2023-03-14

我在Kinesis数据分析中使用Apache flink。

Flink版本:1.13.2 Jave:1.11

我正在使用来自Kafka的json消息。输入记录示例如下所示

null    {"plateNumber":"506b9910-74a7-4c3e-a885-b5e9717efe3a","vignetteStickerId":"9e69df3f-d728-4fc8-9b09-42104588f772","currentTimestamp":"2022/04/07 16:19:55","timestamp":1649362795.444459000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null    {"plateNumber":"5ffe0326-571e-4b97-8f7b-4f49aebb6993","vignetteStickerId":"6c2e1342-b096-4cc9-a92c-df61571c2c7d","currentTimestamp":"2022/04/07 16:20:00","timestamp":1649362800.638060000,"vehicleType":"CAR","vehicleModelType":"HONDA"}
null    {"plateNumber":"d15f49f9-5550-4780-b260-83f3116ba64a","vignetteStickerId":"1366fbfe-7d0a-475f-9249-261ef1dd6de2","currentTimestamp":"2022/04/07 16:20:05","timestamp":1649362805.643749000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA"}
null    {"plateNumber":"803508fb-9701-438e-9028-01bb8d96a804","vignetteStickerId":"b534369f-533e-4c15-ac3f-fc28cf0f3aba","currentTimestamp":"2022/04/07 16:20:10","timestamp":1649362810.648813000,"vehicleType":"CAR","vehicleModelType":"FORD"}

我想使用车辆类型(CAR OR TRUCK)和车辆模型类型(丰田、本田或福特)将这些记录汇总到20秒窗口中。SQL类比(sum(),按车辆类型分组,车辆模型类型)

我使用聚合函数来实现这一点。

import static java.util.Objects.isNull;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.springframework.stereotype.Component;

import com.helecloud.streams.demo.model.Vehicle;
import com.helecloud.streams.demo.model.VehicleStatistics;

@Component
public class VehicleStatisticsAggregator implements AggregateFunction<Vehicle, VehicleStatistics, VehicleStatistics> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public VehicleStatistics createAccumulator() {
        System.out.println("Creating Accumulator!!");
        return new VehicleStatistics();
    }

    @Override
    public VehicleStatistics add(Vehicle vehicle, VehicleStatistics vehicleStatistics) {

        System.out.println("vehicle in add method : " + vehicle);

        if (isNull(vehicleStatistics.getVehicleType())) {
            vehicleStatistics.setVehicleType(vehicle.getVehicleType());
        }

        if (isNull(vehicleStatistics.getVehicleModelType())) {
            vehicleStatistics.setVehicleModelType(vehicle.getVehicleModelType());
        }

//    if(isNull(vehicleStatistics.getStart())) {
//
//      vehicleStatistics.setStart(vehicle.getTimestamp());
//    }

//    if(isNull(vehicleStatistics.getCurrentTimestamp())) {
//
//        vehicleStatistics.setCurrentTimestamp(vehicle.getCurrentTimestamp());
//      }

        if (isNull(vehicleStatistics.getCount())) {

            vehicleStatistics.setCount(1);
        } else {

            System.out.println("incrementing count for : vehicleStatistics : " + vehicleStatistics);
            vehicleStatistics.setCount(vehicleStatistics.getCount() + 1);
        }

        vehicleStatistics.setEnd(vehicle.getTimestamp());

        System.out.println("vehicleStatistics in add : " + vehicleStatistics);

        return vehicleStatistics;
    }

    @Override
    public VehicleStatistics getResult(VehicleStatistics vehicleStatistics) {
        System.out.println("vehicleStatistics in getResult : " + vehicleStatistics);
        return vehicleStatistics;
    }

    @Override
    public VehicleStatistics merge(VehicleStatistics vehicleStatistics, VehicleStatistics accumulator) {

        System.out.println("Coming to merge!!");

        VehicleStatistics vs = new VehicleStatistics(
                // vehicleStatistics.getStart(),
                accumulator.getEnd(),
                // vehicleStatistics.getCurrentTimestamp(),
                vehicleStatistics.getVehicleType(), vehicleStatistics.getVehicleModelType(),
                vehicleStatistics.getCount() + accumulator.getCount());

        System.out.println("VehicleStatistics in Merge :" + vs);

        return vs;

    }
}

在上面的代码中,我也没有看到合并代码被调用。下面是主要的处理代码

@Service
public class ProcessingService {

  @Value("${kafka.bootstrap-servers}")
  private String kafkaAddress;

  @Value("${kafka.group-id}")
  private String kafkaGroupId;

  public static final String TOPIC = "flink_input";

  public static final String VEHICLE_STATISTICS_TOPIC = "flink_output";

  @Autowired
  private  VehicleDeserializationSchema vehicleDeserializationSchema;

  @Autowired
  private  VehicleStatisticsSerializationSchema vehicleStatisticsSerializationSchema;

  @PostConstruct
  public void startFlinkStreamProcessing() {
    try {

      processVehicleStatistic();
    } catch (Exception e) {

     // log.error("Cannot process", e);
        e.printStackTrace();
    }
  }

  public void processVehicleStatistic()  {
      
      
    try {
        
         StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

            FlinkKafkaConsumer<Vehicle> consumer = createVehicleConsumerForTopic(TOPIC, kafkaAddress, kafkaGroupId);

            consumer.setStartFromLatest();
            
            System.out.println("Starting to consume!!");

            consumer.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

            FlinkKafkaProducer<VehicleStatistics> producer = createVehicleStatisticsProducer(VEHICLE_STATISTICS_TOPIC, kafkaAddress);

            DataStream<Vehicle> inputMessagesStream = environment.addSource(consumer);
            
            

            inputMessagesStream
            .keyBy((vehicle -> vehicle.getVehicleType().ordinal()))
          //  .keyBy(vehicle -> vehicle.getVehicleModelType().ordinal())
//              .keyBy(new KeySelector<Vehicle, Tuple2<VehicleType, VehicleModelType>>() {
//      
//                  /**
//                   * 
//                   */
//                  private static final long serialVersionUID = 1L;
//      
//                  @Override
//                  public Tuple2<VehicleType, VehicleModelType> getKey(Vehicle vehicle) throws Exception {
//                    return Tuple2.of(vehicle.getVehicleType(), vehicle.getVehicleModelType());
//                  }
//                })
               
//              .filter(v -> CAR.equals(v.getVehicleType()))
                .window(TumblingEventTimeWindows.of(Time.seconds(20)))
//              .windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new VehicleStatisticsAggregator())
             
            .addSink(producer);
            
            System.out.println("Adding to Sink!!");

            environment.execute("Car Truck Counts By Model");

        
    } catch(Exception e) {
        e.printStackTrace();;
    }

  }

  private FlinkKafkaConsumer<Vehicle> createVehicleConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup ) {

    Properties properties = new Properties();

    properties.setProperty("bootstrap.servers", kafkaAddress);

    properties.setProperty("group.id", kafkaGroup);

    return new FlinkKafkaConsumer<>(topic, vehicleDeserializationSchema, properties);

  }

  private FlinkKafkaProducer<VehicleStatistics> createVehicleStatisticsProducer(String topic, String kafkaAddress){

    return new FlinkKafkaProducer<>(kafkaAddress, topic, vehicleStatisticsSerializationSchema);
  }

}

我得到的结果如下。

null    {"end":1649362835.665466000,"vehicleType":"TRUCK","vehicleModelType":"HONDA","count":3}
null    {"end":1649362825.656024000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":1}
null    {"end":1649362850.675786000,"vehicleType":"CAR","vehicleModelType":"TOYOTA","count":3}
null    {"end":1649362855.677596000,"vehicleType":"TRUCK","vehicleModelType":"TOYOTA","count":1}

但是有什么方法可以验证这一点吗?

另一个问题是,我正在尝试基于多个键聚合结果,这是AglogIveFunction执行此操作的正确方法。

我在问这个,因为我看到了这个如何在Flink中求和多个字段?

因此,如果我必须在多个字段上聚合总和,是否可以聚合函数完成相同的任务?(我写代码的方式)

请告诉我。提前感谢。

共有1个答案

丁阳炎
2023-03-14

只有在使用合并窗口时,才会调用Merge,换句话说,就是使用会话窗口或自定义合并窗口。

基于多个键进行聚合的正确方法是将keyBy与复合类型一起使用,例如Tuple2

 类似资料:
  • 通常情况下选择器可以直接定位到我们想要的元素,但是,当我们拿到一个jQuery对象后,还可以以这个对象为基准,进行查找和过滤。 最常见的查找是在某个节点的所有子节点中查找,使用find()方法,它本身又接收一个任意的选择器。例如如下的HTML结构: JavaScript Python Swift Scheme Haskell <!-- HTML结构 --> <ul class="lang">

  • 问题内容: 我有两个域,是一对多关系中的一部分。我想知道如何查询孩​​子的父母FK吗?贝娄是父母/孩子的伪代码 上级: 儿童: 尽管我没有明确创建FK,但是grails会自行创建MySQL数据库。但是,当我想像这样通过FK查询孩子时: 我收到一个错误:找不到类[class mgr.AlumLanguage]的名称[alumProfileId]的属性 关于如何做到这一点的任何建议? 谢谢杰森 问题答

  • 问题内容: 我在表中有以下几列: SCORE_PERSON_ID是一个变量。我需要对每个SCORE_PERSON_ID的SCORE_VOTE求和。 您能建议一个好的方法吗? 问题答案: 您需要一个和聚合函数,例如或

  • 问题内容: 我有一个模特 我试图这样做来计算此查询集中的总和: 此查询有什么问题?还是有其他方法可以计算列总和? 我知道这可以通过在queryset上使用for循环来完成,但是我需要一个优雅的解决方案。 问题答案: 你可能正在寻找

  • 我正在设置一个验证器,可以检查签名的有效性。 我所做的签名基于DSS级别LT,因此文档中内置了撤销检查。 我现在遇到的问题是在我在iText中开发的验证器级别上。它允许验证签名的有效性,但允许验证撤销的信息。根据我的研究,ITexts允许基于pkcs7.getCrl()验证签名本身中的信息。 然而,DSS签名将撤销信息纳入字典。 下面是我用来验证签名的代码:

  • 问题内容: 我想识别该网络元素。它仅定义了这两个类。我不能执行以下操作,因为不使用空格分隔的值。什么是替代品? 问题答案: 我认为巴拉克·马诺斯的答案不能完全解释这一点。 想象一下,我们只有以下几个元素: XPath如何匹配 只匹配1个(完全匹配),巴拉克的答案 比赛1,比赛2和比赛3(比赛类别包含,课程顺序很重要) 匹配1、2、3和4(只要元素具有class 和) 同样,在这种情况下,Css S