当前位置: 首页 > 知识库问答 >
问题:

如何在Flink kafka消费者中动态获取处理kafka主题名称?

元修然
2023-03-14

目前,我有一个Flink集群,它想通过一个模式消费Kafka主题,通过使用这种方式,我们不需要维护一个硬代码Kafka主题列表。

import java.util.regex.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
...
private static final Pattern topicPattern = Pattern.compile("(DC_TEST_([A-Z0-9_]+)");
...
FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);

--update--我需要知道主题信息的原因是我们需要这个主题名称作为参数,在即将到来的Flink sink部分中使用。

共有1个答案

陆子石
2023-03-14

您可以实现自己的自定义KafkaDeserializationSchema,如下所示:

  public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
  }

使用自定义的KafkaDeserializationSchema,可以创建元素包含主题信息的DataStream。在我的演示案例中,元素类型是tuple2 ,因此可以通过tuple2#f0访问主题名称。

FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, new CustomKafkaDeserializationSchema, kafkaConsumerProps);
DataStream<Tuple2<String, String>> input = env.addSource(kafkaConsumer);

input.process(new ProcessFunction<Tuple2<String,String>, String>() {
            @Override
            public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                String topicName = value.f0;
                // your processing logic here.
                out.collect(value.f1);
            }
        });
 类似资料:
  • 我在Scala中设置了Spark Kafka Consumer,它接收来自多个主题的消息: 我需要为每个主题的消息(将采用JSON格式)开发相应的操作代码。 我提到了以下问题,但其中的答案对我没有帮助: 从spark中的Kafka消息获取主题 那么,在接收到的DStream上是否有任何方法可用于获取主题名称以及消息以确定应该采取什么行动? 对此任何帮助都将不胜感激。谢谢你。

  • 大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 我正在开发一个spring boot kafka消费者应用程序。它将有不同的消费者在不同的主题上工作。使用者的所有信息都来自application.yml文件。 我无法将应用程序属性中的主题列表设置到KafKalistener。 在这两种情况下,我都得到以下错误: java.lang.IllegalArgumentException:无法解析占位符 从应用程序属性获取主题并将其设置在KafkaLi

  • 我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?