我们正在考虑在我们的for消息传递中使用Kafka,我们的应用程序是使用Spring开发的。所以,我们已经计划用Spring-Kafka。
生产者将消息作为HashMap对象放入队列。我们有JSON序列化器,并且假设映射将被序列化并放入队列。这是生产者配置。
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring:
kafka:
consumer:
group-id: xyz
key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
public void listener(SomeClass abx)
我们看到的文章很少,建议是这样做:
@Bean
public ConsumerFactory<String, Car> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Car.class));
}
我们不想为创建反序列化程序编写一些代码。有没有我们缺少的样板?任何帮助都将不胜感激!!
请参阅引导文档。特别是:
您还可以按照如下方式配置Spring Kafka JsonDeserializer:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JSONDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.invoide
spring.kafka.consumer.properties.spring.json.trusted.packages=com。示例,org.acme
我创建了一个简单的生产者-消费者应用程序,使用自定义序列化器和反序列化器。 在我生成的Message类中添加了一个新方法之后,使用者开始在反序列化时被堆栈。我的生产者使用的是新类(有新方法),消费者使用的是旧类(没有方法)。 JSONSerializer/Deserializer可以处理这些类型的修复吗?如果我要使用JSONSerialzier,它应该只关心类的模式,对吗?
我目前正在与Kafka和Flink合作,我有kafka在我的本地PC上运行,我创建了一个正在消费的主题。 桌面\kafka\bin\windows 有没有办法进一步了解这条消息的细节?比如说时间?钥匙我查看了Kafka的文档,但没有找到关于这个主题的内容
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
我试图使用flink从kafka中读取数据,执行一些函数,并将结果返回到不同的kafka主题,但出现以下错误`组织。阿帕奇。Flink。应用程序编程接口。常见的InvalidProgrameException:MapFunction的实现不可序列化。对象可能包含或引用不可序列化的字段。 我收到了来自kafka的消息-对其进行了一些操作,并返回了一个对象列表,我想发送到不同的主题。 内部类也实现了可