我需要向Kafka的特定主题发送消息。我使用以下KafkaTemplate来实现这一点:KafkaTemplate
以下参数放在Kafka生产者中:
private ProducerFactory<String, RequestDto> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
创建生产者:
public KafkaTemplate<String, RequestDto> kafkaTemplate() {
KafkaTemplate<String, RequestDto> template = new KafkaTemplate<>(producerConfigs());
template.setMessageConverter(new StringJsonMessageConverter());
return template;
}
当我执行“发送”方法时,我有一条消息发送到Kafka主题,但同时我发送了标头,其中包含请求的DTO文件的路径。
ListenableFuture<SendResult<String, RequestDto>> result = kafkaTemplate.send(topic, requestDto);
偏移资源管理器中的示例标头
由于这个标题,应用程序中出现了一些问题,这是一个消费者,我对此无能为力。有没有办法从查询中删除此标题?
您可以通过添加带有ADD_TYPE_INFO_HEADERS
false的JsonSerializer来创建DefaultKafkaProducerFactory
JsonSerializer jsonSerializer = new JsonSerializer();
jsonSerializer.setAddTypeInfo(false); //
设置为false以禁用添加类型信息标头。
然后使用键和值序列化器创建DefaultKafkaProducerFactory
return new DefaultKafkaProducerFactory<>(props,new StringSerializer(),jsonSerializer);
您还可以通过producer-side-by-Configuration属性禁用此属性
ADD_TYPE_INFO_HEADERS(默认为true):您可以将其设置为false以在JsonSerializer上禁用此功能(设置addTypeInfo属性)。
在消费者方面忽略
USE_TYPE_INFO_HEADERS(默认为true):您可以将其设置为false以忽略序列化程序设置的标头。
我有哪些选项可以将Kafka与Spring靴骆驼连接? 我正在运行ActiveMQ Artemis和Camel,以建立进出客户端的JMS/MQTT和REST路由。我想把Kafka添加到这个二重唱中,以流式传输/交换数据(视频音频、文件/文本)。 到目前为止,我下载了Kafka汇合平台(免费试用),我正在测试他们提供什么。在融合平台中,我看到有可能将连接器作为“插件”添加。我假设我可以添加Camel
使用spring集成Kafka dsl,我想知道为什么监听器不能接收消息?但是同样的应用程序,如果我用KafkaListener注释的方法替换spring integration DSL,就能够很好地使用消息。DSL让我错过了什么? 不消耗的DSL代码:
我有几个Kafka的题目作为测试。现在我想通过清理我的Kafka主题列表来把它们全部除掉。我设置了变量,然后停止并重新启动zookeeper和kafka服务器。但什么也帮不了我。主题仍然存在,“标记为删除”。我读了这个问题,但没有找到任何答案。否则,这里建议手动移除任何主题。但我该怎么做呢?在故事的结尾,手动或通过命令行,我如何永久删除Kafka主题?
我有spring boot 2.0.2中内置的服务。我用的是redis和solrj。 现在如果我想得到redis和solr的指标。它不显示在 是否有任何方法,例如制作自定义endpoint以获取redis和solr指标? 任何帮助都将不胜感激。。
我不知道如何才能得到消息的密钥,这是发送在监听器。