我是Flink的新手,今天我遇到了一个奇怪的情况。
我运行Kafka服务器,然后使用confluent producer发送消息。使用consumer,我得到了正确的信息,但在应用程序中,我不能。我使用此图像设置message brokerconfluentinc/cp kafka:5.4.1
我用这个向Kafka服务器发送消息
./kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic test \
--property
value.schema='{"type":"record","namespace":"com.example.kafka","name":"User","fields":
[{"name":"name","type":"string"},{"name":"age","type":"int"}]}'
我发送的消息是{"name":"Huy","age": 12}
我用这个来听Kafka的留言
./kafka-avro-console-consumer --topic test \
--bootstrap-server localhost:9092
这是我的密码
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("test")
.setGroupId("console-consumer-29300")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").print();
env.execute();
当我将kafka源代码更改为KafkaSource时
user class是由avro-maven-plugin
从avro模式生成的类,我在命令中将消息发送到kafka。
你们对此有什么想法吗?一个有效的示例代码确实有很大帮助。我找不到任何最新的样品。谢谢你。
SimpleStringSchema
无法读取Avro,因此您的终端正在输出解码的UTF-8。
AvroDeserializationSchema
可能会解析字节,但不知道注册表中的模式是否正确。
ConfluentRegistryAvroDeserializationSchema
是您想要的。例如,看看测试代码
最后,我需要做的将是像这样使用ConFluentCORstryAvroDeseriazationSchema
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
KafkaSource<User> source = KafkaSource.<User>builder()
.setBootstrapServers("localhost:9092")
.setTopics("test")
.setGroupId("console-consumer-29300")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forSpecific(User.class, "http://localhost:8081"))
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").print();
env.execute();
我需要指定指向模式注册表的链接以及从avro模式生成的类。如果没有指向模式注册表的url,它将无法工作
谢谢@OneCricketeer的指导。
这是我的第二个版本,我试图从Firebase检索代码,并用它做一些事情。这是我的第二种方式: 这将崩溃,并出现错误代码: 未能将类型“__NSCFString”(0x10A77F4A0)的值强制转换为“NSDictionary”(0x10A780288)。在“更新”行。这是我的第一次尝试: 打印更多数据: -路径通道引用:可选(https://x.com/channels/-kegkajavh6u
我通过在方法之外添加一个简单的Ride实例来测试它,它工作得很好。 我还尝试将监听器更改为,结果相同。 Edit2:当我试图从数据库中检索整数时,我会得到一条错误消息。
我有一个名为User的实体类,其中包含数据库的OneToMany列: 当我将用户插入数据库时,一切正常,他的汽车也被添加到user_cars表中。当检索汽车时,我得到这个异常: 我已经搜索了其他答案,但没有找到如何解决它。这就是我试图检索用户的方式。 问题是什么?我如何解决?我不明白后台发生了什么。
我使用Symfony HttpClient调用外部api。当stastusCode为200时,我可以使用方法来检索API响应。如果API响应为400,则抛出ClientExc0019,并且无法获取外部API消息。
我已经建立了一个由3个节点组成的AWS集群。我修改了节点的/etc/hosts文件,看起来像这样 当我从其中一个节点运行命令时 bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic first_topic --from-start 它可以工作,但是当我用ip替换主机名并用下面的命令运行它时 bin/kafka-co
我指的是Kafka源代码连接器的Flink 1.14版本,代码如下。 我期待以下要求。 在最新的应用程序开始时,必须阅读Kafka主题的最新偏移量 在检查点上,它必须将消耗的偏移量提交给Kafka 重新启动后(当应用程序手动终止/系统错误时),它必须从最后提交的偏移量中选取,并且必须消耗使用者延迟,然后再使用新的事件提要 有了Flink新的KafkaConsumer API(KafkaSource