使用confluent-oss-5.0.0-2.11,我的Kafka生产者代码是
public class AvroProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
//props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
Schema.Parser parser = new Schema.Parser();
// I will get below schema string from SCHEMA REGISTRY
Schema schema = parser.parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"userName\",\"type\":\"string\"},{\"name\":\"uID\",\"type\":\"string\"},{\"name\":\"company\",\"type\":\"string\",\"default\":\"ABC\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0},{\"name\":\"location\",\"type\":\"string\",\"default\":\"Noida\"}]}");
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
GenericRecord record = new GenericData.Record(schema);
record.put("uID", "06080000");
record.put("userName", "User data10");
record.put("company", "User data10");
record.put("age", 12);
record.put("location", "User data10");
ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic, "ip", record);
producer.send(recordData);
System.out.println("Message Sent");
}
}
Kafka消费者代码为:
public class AvroConsumer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
props.put("acks", "all");
props.put("retries", 0);
props.put("group.id", "consumer1");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));
while(true){
ConsumerRecords<String, GenericRecord> recs = consumer.poll(10000);
for (ConsumerRecord<String, GenericRecord> rec : recs) {
System.out.printf("{AvroUtilsConsumerUser}: Recieved [key= %s, value= %s]\n", rec.key(), rec.value());
}
}
}
}
我无法看到Kafka消费者端的消息(数据)。此外,我还检查了confluent_new主题的偏移量计数/状态及其未更新。好像生产者代码出了点问题。任何指针都会有帮助。
同时,下面的生产者代码正在工作,这里的POJO,即用户是avro-tools生成的POJO。
public class AvroProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
kafkaParams.put("auto.offset.reset", "smallest");
kafkaParams.put("ZOOKEEPER_HOST", "bihdp01");*/
props.put("bootstrap.servers", "localhost:9092");
props.put("ZOOKEEPER_HOST", "localhost");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
String topic = "confluent-new";
Producer<String, User> producer = new KafkaProducer<String, User>(props);
User user = new User();
user.setUID("0908");
user.setUserName("User data10");
user.setCompany("HCL");
user.setAge(20);
user.setLocation("Noida");
ProducerRecord<String, User> record = new ProducerRecord<String, User>(topic, (String) user.getUID(), user);
producer.send(record).get();
System.out.println("Sent");
}
}
附注。我的要求是将接收到的JSON数据以AVRO格式从源KAFKA主题发送到目的地KAFKA主题。首先,我使用AVRO4S从接收到的JSON数据推断出AVRO模式,并将该模式注册到html" target="_blank">模式注册中心。接下来是从接收到的JSON中提取数据,填充GenericRecord实例,并使用KafKaavroSerializer将此GenericRecord实例发送到Kafka主题。在消费者端,我将使用KafkaAvroDeserializer来反序列化接收到的AVRO数据。
请尝试在第一个生成器中添加get()
producer.send(recordData).get();
我对Kafka是完全陌生的,我在使用Kafka制作人时遇到了一些麻烦。生成器的send方法恰好阻塞1min,然后应用程序无一例外地继续。这显然是一些超时,但没有抛出异常。 我在原木上也看不出什么真正的东西。 kafka.properties文件中的属性: 因此,producer.send阻塞1分钟,然后继续。在结尾,Kafka没有储存任何东西,但新的话题被创造出来了。谢谢你的帮助!
有没有办法将 GenericRecord(我刚刚从 Kafka 消息中得到的)反序列化为嵌套 POJO?我实际上正在将其反序列化为 Scala 的案例类,但我意识到这更难。我通过互联网搜索,似乎每个人都在手动进行。您知道任何能够做到这一点的库吗?
问题内容: 给定GenericRecord,与对象相比,推荐的检索类型化值的方法是什么?我们是否应该强制转换值,如果是这样,从Avro类型到Java类型的映射是什么?例如,Avro Array == Java Collection ; 和Avro String == Java Utf8 。 由于每个GenericRecord都包含其架构,因此我希望找到一种类型安全的方法来检索值。 问题答案: 阿夫
给定GenericRecord,与对象相比,检索类型化值的推荐方法是什么?我们需要强制转换值吗?如果需要,从Avro类型到Java类型的映射是什么?例如,Avro数组==Java集合;和Avro String==Java UTF8。 由于每个GenericRecord都包含它的模式,所以我希望有一种类型安全的方法来检索值。
我正在尽可能地简化我的消费者。问题是,当我看到Kafka监听器中的记录时: <代码>列表 我在使用时注意到: SpringKafka。消费者值反序列化器=io。汇合的。Kafka。序列化程序。KafkaavroderializerSpring。Kafka。消费者键反序列化器=组织。阿帕奇。Kafka。常见的序列化。字符串反序列化器 我得到以下错误: 2020-12-02 17:04:42.745调
我正在向我的远程服务器发送以将其存储在文件系统中。为此,我使用Java协议。为了避免网络带宽和TCP输入/输出缓冲内存,我以压缩格式发送数据。但是,我无法解压缩从客户端收到的数据。我得到了ZLIB输入流的异常。这是因为服务器正在接收。 Java代码 客户 服务器 有没有办法在Socket中以GZIP压缩格式发送数据?