我们将在项目中部署Apache Kafka2.10,并通过JSON对象在生产者和消费者之间进行通信。
到目前为止,我想我需要:
@Override
public byte[] serialize(String topic, T data) {
if (data == null)
return null;
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
然而,到目前为止,有问题的2-4分。我在自定义反序列化程序中尝试了这样的操作:
@Override
public JsonNode deserialize(String topic, byte[] bytes) {
if (bytes == null)
return null;
JsonNode data;
try {
objectMapper = new ObjectMapper();
data = objectMapper.readTree(bytes);
} catch (Exception e) {
throw new SerializationException(e);
}
return data;
}
在我的消费者身上,我试着:
KafkaConsumer<String, TextNode> consumer = new KafkaConsumer<String, TextNode>(messageConsumer.properties);
consumer.subscribe(Arrays.asList(messageConsumer.topicName));
int i = 0;
while (true) {
ConsumerRecords<String, TextNode> records = consumer.poll(100);
for (ConsumerRecord<String, TextNode> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value().asText());
}
}
我以为这将生成一个正确的原始json字符串,但我调用record.value().aStext()
得到的只是一些散列字符串“IntCIMTLEVWiIDOGXCJ2YWX1ZVWiIH0I”
。
在Kafka中通过JSON进行通信的任何建议或示例都将非常感谢。
我建议您使用UTF-8编码作为字符串JSON序列化器:
1。Producer以JSON字符串(“{\”key\“:\”value\“}”
)
2的形式获取数据。生产者使用UTF-8(jsonString.getBytes(StandardCharSets.utf_8);
)
3将JSON字符串序列化为字节。制作人将这些字节发送给Kafka4。消费者从Kafka5读取字节。使用者使用UTF-8将字节反序列化为JSON字符串(新字符串(consumedByteArray,StandardCharsets.utf_8);
)
6。使用者对JSON字符串执行任何需要的操作
我故意不使用您的代码,这样流程就可以理解了,我认为您可以很容易地将这个示例应用到您的项目中:)
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的
阅读主题中的所有分区: ~bin/kafka-console-consumer.sh--zookeeper localhost:2181--topic myTopic--从头开始 如何使用主题的特定分区?(例如使用分区键13) 以及如何使用特定分区键在分区中生成消息?有可能吗?
我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前
我有一个用例,希望在Spring云流应用程序中获得底层的Kafka生产者(KafkaTemplate)。在浏览代码时,我偶然发现了,它有一个方法。然而,它无法自动接线。 此外,如果我直接自动连接,模板将使用默认属性初始化,它将忽略SCSt配置的