当前位置: 首页 > 知识库问答 >
问题:

在Kafka中生产和消费JSON

艾雪风
2023-03-14

我们将在项目中部署Apache Kafka2.10,并通过JSON对象在生产者和消费者之间进行通信。

到目前为止,我想我需要:

  1. 实现自定义序列化程序以将JSON转换为字节数组
  2. 实现自定义反序列化器,将字节数组转换为JSON对象
  3. 生成消息
  4. 读取消费者类中的消息
@Override
public byte[] serialize(String topic, T data) {
    if (data == null)
        return null;
    try {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsBytes(data);
    } catch (Exception e) {
        throw new SerializationException("Error serializing JSON message", e);
    }
}

然而,到目前为止,有问题的2-4分。我在自定义反序列化程序中尝试了这样的操作:

@Override
public JsonNode deserialize(String topic, byte[] bytes) {
    if (bytes == null)
        return null;

    JsonNode data;
    try {
        objectMapper = new ObjectMapper();
        data = objectMapper.readTree(bytes);
    } catch (Exception e) {
        throw new SerializationException(e);
    }
    return data;
}

在我的消费者身上,我试着:

    KafkaConsumer<String, TextNode> consumer = new KafkaConsumer<String, TextNode>(messageConsumer.properties);
    consumer.subscribe(Arrays.asList(messageConsumer.topicName));
    int i = 0;
    while (true) {
        ConsumerRecords<String, TextNode> records = consumer.poll(100);
        for (ConsumerRecord<String, TextNode> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value().asText());
        }
    }

我以为这将生成一个正确的原始json字符串,但我调用record.value().aStext()得到的只是一些散列字符串“IntCIMTLEVWiIDOGXCJ2YWX1ZVWiIH0I”

在Kafka中通过JSON进行通信的任何建议或示例都将非常感谢。

共有1个答案

步致远
2023-03-14

我建议您使用UTF-8编码作为字符串JSON序列化器:
1。Producer以JSON字符串(“{\”key\“:\”value\“}”)
2的形式获取数据。生产者使用UTF-8(jsonString.getBytes(StandardCharSets.utf_8);)
3将JSON字符串序列化为字节。制作人将这些字节发送给Kafka4。消费者从Kafka5读取字节。使用者使用UTF-8将字节反序列化为JSON字符串(新字符串(consumedByteArray,StandardCharsets.utf_8);)
6。使用者对JSON字符串执行任何需要的操作

我故意不使用您的代码,这样流程就可以理解了,我认为您可以很容易地将这个示例应用到您的项目中:)

 类似资料:
  • 在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?

  • 我正在创建一个系统,其中前端服务将消息推送到Kafka请求主题,并为一些下游后端消费者(实际上是一个最终推送回Kafka的复杂系统)监听另一个响应主题,以处理请求消息并最终推进到“回应”话题。 我试图找出最优雅的方法来确保消费者监听适当的分区并收到响应,并且后端推送到前端消费者正在监听的分区。我们总是需要确保响应到达产生初始消息的同一个消费者。 到目前为止,我有两种解决方案,但都不是特别令人满意的

  • 阅读主题中的所有分区: ~bin/kafka-console-consumer.sh--zookeeper localhost:2181--topic myTopic--从头开始 如何使用主题的特定分区?(例如使用分区键13) 以及如何使用特定分区键在分区中生成消息?有可能吗?

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我有一个用例,希望在Spring云流应用程序中获得底层的Kafka生产者(KafkaTemplate)。在浏览代码时,我偶然发现了,它有一个方法。然而,它无法自动接线。 此外,如果我直接自动连接,模板将使用默认属性初始化,它将忽略SCSt配置的