我们正在使用与Kafka消费者和生产者Spring。我们正在生成大小为905字节的消息。我们正在序列化消息,并试图为下一个使用者反序列化它。
消息有效负载类示例:
{
"FILE_LIST":[
{
"KEY1": "Large String Value",
"KEY2": "Large String Value",
"Key3": "Large String Value",
"Key4": "Large String Value"
}
]
}
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@NoArgsConstructor
@AllArgsConstructor
public class InnerModel {
@JsonProperty("KEY1")
@Getter
@Setter
private String key1;
@JsonProperty("KEY2")
@Getter
@Setter
private String key2;
@JsonProperty("KEY3")
@Getter
@Setter
private String key3;
@JsonProperty("KEY4")
@Getter
@Setter
private String key4;
}
package com.consumer.model;
public class CustomModel {
public CustomModel(List<InnerModel> filesList) {
super();
this.filesList = filesList;
}
@JsonProperty("FILE_LIST")
@NonNull
@Getter
@Setter
List<InnerModel> filesList;
}
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@KafkaListener(topics = "customtopic", groupId = "customgroup")
public class CustomModelConsumer {
@KafkaHandler(isDefault = true)
private void consumeCustomModel(CustomModel model) {
System.out.println("Model Consumer");
System.out.println(model);
}
}
Application.Properties
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.compression-type=gzip
spring.kafka.consumer.properties.spring.json.trusted.packages=com.consumer.model
当我们接受字符串格式的消息负载时,Consumer工作得很好,但当我们将Consumer中的负载反序列化为对象时,我们面临着问题。下面的错误被抛出相同
正如@Naveen Kumar提到的,我缺少默认或all-args构造函数。更改了如下所示的代码,以使其工作
package com.consumer.model;
@NoArgsConstructor
@AllArgsConstructor
public class CustomModel {
@JsonProperty("FILE_LIST")
@NonNull
@Getter
@Setter
List<InnerModel> filesList;
}
我正在编写一个REST代理,就像合流REST代理一样。它接受JSON负载、模式主题和id,然后将JSON负载作为Avro对象写入流中。当我使用kafka avro控制台消费者阅读消息时,我收到了“未知魔法字节”错误。 这是我的Kafka制作人配置: 这就是REST控制器如何将传入的JSON转换为Avro 这是toAvro方法的实现: 然后将此对象传递给我使用上面给出的属性配置的SchemaVali
因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:
null 产生一些消息: 使用者无法使用消息,如果我给: 当给出服务器而不是时,能够使用消息的使用者:
我已经开发了一个使用apache storm使用kafka消息的应用程序,当我在eclipse中运行topology using in LocalCluster时,它可以正常工作,消息也可以正常使用,但是当我使用storm命令(bin\storm jar..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.topology storm kaf
我已经建立了一个由3个节点组成的AWS集群。我修改了节点的/etc/hosts文件,看起来像这样 当我从其中一个节点运行命令时 bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic first_topic --from-start 它可以工作,但是当我用ip替换主机名并用下面的命令运行它时 bin/kafka-co