当前位置: 首页 > 知识库问答 >
问题:

Flink:使用kafka流连接文件

唐伟
2023-03-14

我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据:

{"adId":"9001", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}

我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。

这是我的映射csv文件:

9001;8
9002;10

所以我的输出最好是这样的

{"bookingId":"8", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}

该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。

我目前有一个不适合我的代码:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // create a checkpoint every 30 seconds
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

DataStream<String> adToBookingMapping = env.readTextFile(parameters.get("adToBookingMapping"));

DataStream<Tuple2<Integer,Integer>> input = adToBookingMapping.flatMap(new Tokenizer());

//Kafka Consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", parameters.get("bootstrap.servers"));
properties.setProperty("group.id", parameters.get("group.id"));

FlinkKafkaConsumer010<ObjectNode> consumer = new FlinkKafkaConsumer010<>(parameters.get("inbound_topic"), new JSONDeserializationSchema(), properties);

consumer.setStartFromGroupOffsets();

consumer.setCommitOffsetsOnCheckpoints(true);

DataStream<ObjectNode> logs = env.addSource(consumer);

DataStream<Tuple4<Integer,String,Integer,Float>> parsed = logs.flatMap(new Parser());

// output -> bookingId, action, impressions, sum
DataStream<Tuple4<Integer, String,Integer,Float>> joined = runWindowJoin(parsed, input, 3);


public static DataStream<Tuple4<Integer, String, Integer, Float>> runWindowJoin(DataStream<Tuple4<Integer, String, Integer, Float>> parsed,
      DataStream<Tuple2<Integer, Integer>> input,long windowSize) {

  return parsed.join(input)
          .where(new ParsedKey())
          .equalTo(new InputKey())
          .window(TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)))
          //.window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
          .apply(new JoinFunction<Tuple4<Integer, String, Integer, Float>, Tuple2<Integer, Integer>, Tuple4<Integer, String, Integer, Float>>() {

              private static final long serialVersionUID = 4874139139788915879L;

              @Override
              public Tuple4<Integer, String, Integer, Float> join(
                              Tuple4<Integer, String, Integer, Float> first,
                              Tuple2<Integer, Integer> second) {
                  return new Tuple4<Integer, String, Integer, Float>(second.f1, first.f1, first.f2, first.f3);
              }
          });
}

代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中的新条目。关于如何使用csv文件中的最新值处理来自Kafka的流的任何想法?

亲切的问候,

暗代

共有1个答案

贡斌
2023-03-14

您的目标似乎是将蒸汽数据与变化缓慢的目录(即侧面输入)连接起来。我不认为联接操作在这里有用,因为它不跨窗口存储目录条目。此外,文本文件是一个有界输入,其行读取一次。

考虑使用connect创建连接流,并将曲库数据存储为托管状态以执行查找。运算符的并行度需要为1。

通过研究“侧面输入”,看看人们今天使用的解决方案,你可能会找到更好的解决方案。参见FLIP-17和Dean Wampler在Flink Forward的演讲。

 类似资料:
  • 我使用的是和连接器jar版本为0.10.2,kafka版本为0.9.1,flink版本为1.0.0。 当我在IDE中作为独立的主程序运行Java消费者时,它工作得很好。但是当我从运行它时,我不会看到正在使用的消息,也不会看到中JobManager的stdout中的任何日志。请告诉我可能有什么问题。

  • 我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天

  • 我正在使用Apache Flink,并尝试通过使用Apache Kafka协议从它接收消息来连接到Azure eventhub。我设法连接到Azure eventhub并接收消息,但我不能使用这里(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-star

  • 我正在导入一个DB,其中包含一些表示多对多和一对多关系的链接表。 1-到目前为止,根据我对Kafka流的理解,我似乎需要为每个链接表提供一个流,以便执行聚合。KTable将不可用,因为记录是按键更新的。但是,聚合的结果可能是Ktable中的一个。 2-然后是外键上的连接问题。似乎唯一的方法是通过GlobalKtable。link-table-topic->link-table-stream->li

  • 我想使用Flink流媒体以低延迟处理市场数据( 我有一组计算,每个都订阅三个流:缓慢移动的参数数据、股票价格和汇率。 例如。 Params(缓慢滴答:每天一次或两次): 资源(每秒多次滴答声): fx(每秒多次滴答声): 每当任何股票、外汇汇率或参数数据发生变化时,我都想立即计算结果并将其输出为新流。这在逻辑上可以表示为连接: 例如选择价格=(params.strike-asset.spot)*f

  • 我正在使用Flink处理Kafka的流数据。流程非常基本,从Kafka开始消耗,数据丰富,然后汇到FS。 在我的例子中,分区的数量大于Flink并行级别。我注意到Flink并没有均匀地消耗所有分区。 有时,在一些Kafka分区中会创建滞后。重新启动该应用程序有助于Flink“重新平衡”消费,并快速关闭滞后。然而,过了一段时间,我看到其他分区出现了滞后等现象。 看到这种行为,我试图通过使用flink