当前位置: 首页 > 知识库问答 >
问题:

无法使用Flink从Kafka检索正确的消息

申屠昌胤
2023-03-14

我是Flink的新手,今天我遇到了一个奇怪的情况。

我运行Kafka服务器,然后使用confluent producer发送消息。使用consumer,我得到了正确的信息,但在应用程序中,我不能。我使用此图像设置message brokerconfluentinc/cp kafka:5.4.1

我用这个向Kafka服务器发送消息

./kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic test \
--property 
value.schema='{"type":"record","namespace":"com.example.kafka","name":"User","fields": 
[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'

我发送的消息是{"name":"Huy","age": 12}

我用这个来听Kafka的留言

./kafka-avro-console-consumer --topic test \
--bootstrap-server localhost:9092

这是我的密码

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
KafkaSource<String> source  = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("test")
            .setGroupId("console-consumer-29300")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").print();
env.execute();

当我将kafka源代码更改为KafkaSource时

user class是由avro-maven-plugin从avro模式生成的类,我在命令中将消息发送到kafka。

你们对此有什么想法吗?一个有效的示例代码确实有很大帮助。我找不到任何最新的样品。谢谢你。

共有2个答案

蔚琦
2023-03-14

SimpleStringSchema无法读取Avro,因此您的终端正在输出解码的UTF-8。

AvroDeserializationSchema可能会解析字节,但不知道注册表中的模式是否正确。

ConfluentRegistryAvroDeserializationSchema是您想要的。例如,看看测试代码

唐昊焜
2023-03-14

最后,我需要做的将是像这样使用ConFluentCORstryAvroDeseriazationSchema

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    KafkaSource<User> source  = KafkaSource.<User>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("test")
            .setGroupId("console-consumer-29300")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, "http://localhost:8081"))
            .build();
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").print();
    env.execute();

我需要指定指向模式注册表的链接以及从avro模式生成的类。如果没有指向模式注册表的url,它将无法工作

谢谢@OneCricketeer的指导。

 类似资料:
  • 这是我的第二个版本,我试图从Firebase检索代码,并用它做一些事情。这是我的第二种方式: 这将崩溃,并出现错误代码: 未能将类型“__NSCFString”(0x10A77F4A0)的值强制转换为“NSDictionary”(0x10A780288)。在“更新”行。这是我的第一次尝试: 打印更多数据: -路径通道引用:可选(https://x.com/channels/-kegkajavh6u

  • 我通过在方法之外添加一个简单的Ride实例来测试它,它工作得很好。 我还尝试将监听器更改为,结果相同。 Edit2:当我试图从数据库中检索整数时,我会得到一条错误消息。

  • 我有一个名为User的实体类,其中包含数据库的OneToMany列: 当我将用户插入数据库时,一切正常,他的汽车也被添加到user_cars表中。当检索汽车时,我得到这个异常: 我已经搜索了其他答案,但没有找到如何解决它。这就是我试图检索用户的方式。 问题是什么?我如何解决?我不明白后台发生了什么。

  • 我使用Symfony HttpClient调用外部api。当stastusCode为200时,我可以使用方法来检索API响应。如果API响应为400,则抛出ClientExc0019,并且无法获取外部API消息。

  • 我已经建立了一个由3个节点组成的AWS集群。我修改了节点的/etc/hosts文件,看起来像这样 当我从其中一个节点运行命令时 bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic first_topic --from-start 它可以工作,但是当我用ip替换主机名并用下面的命令运行它时 bin/kafka-co

  • 我正在使用< code > librdkafka C API consumer(特别是使用< code > rd _ Kafka _ consumer _ poll 来读取,在此之前我确实调用了< code > rd _ Kafka _ poll _ set _ consumer ) 我看到的问题是,在我的谷歌测试中,我做了以下操作 > 给Kafka写3条信息 初始化/启动kafka消费者(< c