当前位置: 首页 > 知识库问答 >
问题:

Kafka笔下的消费者

贝浩歌
2023-03-14

我正在与Kafka和阿帕奇·Flink合作。我正在尝试使用apache Flink中的一个kafka主题中的记录(这些记录是avro格式的)。下面是我正在尝试的一段代码。

使用自定义反序列化器来反序列化主题中的avro记录。

我发送到主题“test-topic”的数据的Avro模式如下所示。

{
  "namespace": "com.example.flink.avro",
  "type": "record",
  "name": "UserInfo",
  "fields": [
    {"name": "name", "type": "string"}
  ]
}

我正在使用的自定义反序列化器如下所示。

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> avroType;

    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }


    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            T t = reader.read(null, decoder);
            return t;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }


    public boolean isEndOfStream(T nextElement) {
        return false;
    }


    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}

我的flink应用程序就是这样写的。

public class FlinkKafkaApp {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", "localhost:9092");
        kafkaProperties.put("group.id", "test");

        AvroDeserializationSchema<UserInfo> schema = new AvroDeserializationSchema<UserInfo>(UserInfo.class);

        FlinkKafkaConsumer011<UserInfo> consumer = new FlinkKafkaConsumer011<UserInfo>("test-topic", schema, kafkaProperties);

        DataStreamSource<UserInfo> userStream = env.addSource(consumer);

        userStream.map(new MapFunction<UserInfo, UserInfo>() {

            @Override
            public UserInfo map(UserInfo userInfo) {
                return userInfo;
            }
        }).print();

        env.execute("Test Kafka");

    }

我得到的输出是{“name”:“”}

谁能帮我弄清楚这里的问题是什么,以及为什么我没有得到{“name”:“sumit”}作为输出。

共有1个答案

宇文智敏
2023-03-14

Flink文档说:Flink的Kafka消费者称为FlinkKafkaConsumer08(或Kafka0.9.0.x版本的09,等等,或Kafka>=1.0.0版本的FlinkKafkaConsumer)。它提供对一个或多个Kafka主题的访问。

我们不必编写自定义的反序列化程序来使用来自Kafka的Avro消息。

-阅读特定记录:

DataStreamSource<UserInfo> stream = streamExecutionEnvironment.addSource(new FlinkKafkaConsumer<>("test_topic", AvroDeserializationSchema.forSpecific(UserInfo.class), properties).setStartFromEarliest());
Schema schema = Schema.parse("{"namespace": "com.example.flink.avro","type": "record","name": "UserInfo","fields": [{"name": "name", "type": "string"}]}");
DataStreamSource<GenericRecord> stream = streamExecutionEnvironment.addSource(new FlinkKafkaConsumer<>("test_topic", AvroDeserializationSchema.forGeneric(schema), properties).setStartFromEarliest());

更多详情:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumer

 类似资料:
  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。