当前位置: 首页 > 知识库问答 >
问题:

如何在apache flink中加入两个流?

龚勇锐
2023-03-14

我开始使用flink,看看官方教程之一。

据我所知,这个练习的目标是在时间属性上加入两个流。

任务:

此练习的结果是一个Tuple2记录的数据流,每个记录对应一个不同的rideId。您应该忽略结束事件,只在每次骑乘开始时加入事件,并提供相应的票价数据。

生成的流应打印到标准输出。

问:EnrichmentFunction如何连接这两个流aka。它如何知道参加哪个集市和哪个骑行?我希望它能够缓冲多个展会/游乐设施,直到一个即将到来的展会/游乐设施有一个匹配的合作伙伴。

据我所知,它只是保存了它看到的每一个骑乘/游乐设施,并将其与下一个最好的骑乘/游乐设施相结合。为什么这是一个正确的连接?

提供解决方案:

/*
 * Copyright 2017 data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dataartisans.flinktraining.solutions.datastream_java.state;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training
 * (http://training.data-artisans.com).
 *
 * The html" target="_blank">goal for this exercise is to enrich TaxiRides with fare information.
 *
 * Parameters:
 * -rides path-to-input-file
 * -fares path-to-input-file
 *
 */
public class RidesAndFaresSolution extends ExerciseBase {
    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final String ridesFile = params.get("rides", pathToRideData);
        final String faresFile = params.get("fares", pathToFareData);

        final int delay = 60;                   // at most 60 seconds of delay
        final int servingSpeedFactor = 1800;    // 30 minutes worth of events are served every second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        DataStream<TaxiRide> rides = env
                .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy("rideId");

        DataStream<TaxiFare> fares = env
                .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
                .keyBy("rideId");

        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction());

        printOrTest(enrichedRides);

        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }

    public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
        // keyed, managed state
        private ValueState<TaxiRide> rideState;
        private ValueState<TaxiFare> fareState;

        @Override
        public void open(Configuration config) {
            rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
            fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
        }

        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare fare = fareState.value();
            if (fare != null) {
                fareState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                rideState.update(ride);
            }
        }

        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiRide ride = rideState.value();
            if (ride != null) {
                rideState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                fareState.update(fare);
            }
        }
    }
}

共有1个答案

凌恩
2023-03-14

在这个关于状态丰富的特定培训练习中,rideId的每个值都有三个事件——滑行开始事件、滑行结束事件和滑行票价。本练习的目的是将每个滑行开始事件与具有相同rideId的一个滑行票价事件连接起来,或者换句话说,在rideId上加入骑乘流和票价流,同时知道每个骑乘流和票价流只有一个。

此练习演示了键控状态在Flink中的工作原理。键控状态实际上是一个分片的键值存储。当我们有ValueState项时,例如ValueState

每次调用flatMap1和flatMap2时,上下文中都隐含着一个键(rideId),当我们调用rideState时。更新(骑乘)或骑乘状态。value()我们不是在访问单个变量,而是在键值存储中设置和获取一个条目,使用rideId作为键。

在此练习中,两个流都由rideId键控,因此对于每个不同的rideId可能有一个rideState元素和一个fareState元素。因此,提供的解决方案是缓冲大量游乐设施和票价,但每个rideId只有一个(这就足够了,因为游乐设施和票价在此数据集中完美配对)。

所以,你问:

EnrichmentFunction如何连接两个流aka。它如何知道该乘坐哪种车费?

而答案是

它加入了具有相同rideId的票价。

您所询问的这个特定练习展示了如何实现一个简单的扩展连接,以了解键控状态和连接流的概念。但使用Flink肯定可以实现更复杂的连接。查看有关使用DataStream API连接、与Flink的表API连接以及与Flink SQL连接的文档。

 类似资料:
  • 问题内容: 条件:请勿修改原始清单;仅JDK,无外部库。一线或JDK 1.3版本的加分点。 有没有比以下更简单的方法: 问题答案: 你可以使用Apache commons-collections库:

  • 问题内容: 我的数据库中有三个表,一个学生,一个班级和一个作业。POJO如下: 基本上,一个学生有一个当前班级,以及一份作业清单(他被分配到的所有班级)。 我的问题是,在hibernate状态下,我想知道所有没有班级() 或 分配给属于“ 1”学年班级但已设置为 我设法制定了两个单独的标准来获得我想要的东西: 和 但是我不知道如何用or来“统一”这两个条件。 问题答案: 如果要创建包含许多对象的

  • 问题内容: 如何在Seaborn中叠加两个图形?我的数据中有两列,我希望它们在同一图中。如何保存两个图形的标签。 问题答案: 在单个轴上运行的seaborn函数可以作为一个参数。 例如,文档包括: 因此,如果您这样做: 然后,您可以执行以下操作:

  • 问题内容: 我正在使用codeigniter框架开发一个音乐cms。我在mysql数据库中有3个表,目前我正在“相册”表和“模型,控制器”中工作。我想选择“专辑”表1并用“类别”->“ cat_id”联接“专辑”->“ cat_id”,并获取所有类别记录。 然后,我想在“ Soundtrack”->“ album_id”上加入“ Album”->“ album_id”,然后获取所有音轨记录A至Z。

  • 我想把两个音频文件合并成一个新文件。我尝试使用MP4parser库,它会导致一些问题,使用SquenceInputStream组合两个音频文件,它会增加音频文件的大小,但只播放初始歌曲。 参考:https://stackoverflow.com/a/43650758/8775993 与StackOverflow答案核对无任何效果。 请提出解决方案。提前谢谢。