当前位置: 首页 > 知识库问答 >
问题:

如何移除Kafka(Spring靴)中的标题?

谭池暝
2023-03-14

我需要向Kafka的特定主题发送消息。我使用以下KafkaTemplate来实现这一点:KafkaTemplate

以下参数放在Kafka生产者中:

private ProducerFactory<String, RequestDto> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

创建生产者:

    public KafkaTemplate<String, RequestDto> kafkaTemplate() {
        KafkaTemplate<String, RequestDto> template = new KafkaTemplate<>(producerConfigs());
        template.setMessageConverter(new StringJsonMessageConverter());
        return template;
    }

当我执行“发送”方法时,我有一条消息发送到Kafka主题,但同时我发送了标头,其中包含请求的DTO文件的路径。

ListenableFuture<SendResult<String, RequestDto>> result = kafkaTemplate.send(topic, requestDto);

偏移资源管理器中的示例标头

由于这个标题,应用程序中出现了一些问题,这是一个消费者,我对此无能为力。有没有办法从查询中删除此标题?

共有1个答案

宋昕
2023-03-14

您可以通过添加带有ADD_TYPE_INFO_HEADERSfalse的JsonSerializer来创建DefaultKafkaProducerFactory

JsonSerializer jsonSerializer = new JsonSerializer();
jsonSerializer.setAddTypeInfo​(false);  //  

设置为false以禁用添加类型信息标头。

然后使用键和值序列化器创建DefaultKafkaProducerFactory

return new DefaultKafkaProducerFactory<>(props,new StringSerializer(),jsonSerializer);

您还可以通过producer-side-by-Configuration属性禁用此属性

ADD_TYPE_INFO_HEADERS(默认为true):您可以将其设置为false以在JsonSerializer上禁用此功能(设置addTypeInfo属性)。

在消费者方面忽略

USE_TYPE_INFO_HEADERS(默认为true):您可以将其设置为false以忽略序列化程序设置的标头。

 类似资料:
  • 我有哪些选项可以将Kafka与Spring靴骆驼连接? 我正在运行ActiveMQ Artemis和Camel,以建立进出客户端的JMS/MQTT和REST路由。我想把Kafka添加到这个二重唱中,以流式传输/交换数据(视频音频、文件/文本)。 到目前为止,我下载了Kafka汇合平台(免费试用),我正在测试他们提供什么。在融合平台中,我看到有可能将连接器作为“插件”添加。我假设我可以添加Camel

  • 我有几个Kafka的题目作为测试。现在我想通过清理我的Kafka主题列表来把它们全部除掉。我设置了变量,然后停止并重新启动zookeeper和kafka服务器。但什么也帮不了我。主题仍然存在,“标记为删除”。我读了这个问题,但没有找到任何答案。否则,这里建议手动移除任何主题。但我该怎么做呢?在故事的结尾,手动或通过命令行,我如何永久删除Kafka主题?

  • 使用spring集成Kafka dsl,我想知道为什么监听器不能接收消息?但是同样的应用程序,如果我用KafkaListener注释的方法替换spring integration DSL,就能够很好地使用消息。DSL让我错过了什么? 不消耗的DSL代码:

  • 我有spring boot 2.0.2中内置的服务。我用的是redis和solrj。 现在如果我想得到redis和solr的指标。它不显示在 是否有任何方法,例如制作自定义endpoint以获取redis和solr指标? 任何帮助都将不胜感激。。

  • 我不知道如何才能得到消息的密钥,这是发送在监听器。