当前位置: 首页 > 知识库问答 >
问题:

SpringKafka定制解串器

叶谦
2023-03-14

我按照这个链接中列出的步骤来创建一个客户反序列化器。我从Kafka收到的消息在json字符串前有纯文本“log message -”。我希望反序列化程序忽略这个字符串并解析json数据。有办法做到吗?

应用

@SpringBootApplication
public class TransactionauditServiceApplication {

    public static void main(String[] args) throws InterruptedException {
        new SpringApplicationBuilder(TransactionauditServiceApplication.class).web(false).run(args);
    }

    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }

    public static class MessageListener {

        @KafkaListener(topics = "ctp_verbose", containerFactory = "kafkaListenerContainerFactory")
        public void listen(@Payload ConciseMessage message, 
                  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println("Received Messasge in group foo: " + message.getStringValue("traceId") + " partion " + partition);
        }
    }
}

消费者配置

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress:localhost:9092}")
    private String bootstrapAddress;

    @Value(value = "${groupId:audit}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, ConciseMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(ConciseMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

共有1个答案

诸腾
2023-03-14

通过编写行new JsonDeserializer

 类似资料:
  • 使用者配置 Kafka听众 生产者配置

  • 我已经创建了一个消费者来接收来自Kafka主题的消息,但是我正在手动解析消息,有没有一种方法可以使用自定义的反序列化器来自动解析这个主题消费者? 预期结果是: 如果出现解析错误,我可以设置回调来处理此错误吗?

  • 我正在使用SpringKafka模板来生成消息。而且它生成消息的速度太慢了。生成 8 条消息大约需要 15000 分钟。 以下是我如何创建Kafka模板: 以下是我如何使用模板发送消息: 生产者属性: 下面是显示调用kakpatemplate send方法需要几毫秒的日志: 关于如何提高发件人性能的任何建议将不胜感激 Spring Kakfa版本:1.2.3 .发布Kafka客户端:0.10.2.

  • 前面编辑器的例子使用CString类的字符串来保存文本行,由于它是MFC类,因此可以串行化自己,将自己写入磁盘或从磁盘文件中读取二进制数据来建立对象。那么,如果不是标准的MFC类,比如用户自己定义的类,如何让它支持串行化呢?下面,我们结合前面第五章提到的就业调查表的例子来演示如何让用户定义的类支持串行化功能。 要让用户定义的类支持串行化,一般分为五步: 1.从CObject或其派生类派生出用户的类

  • 我想在事务中使用SpringKafka,但我真的不明白应该如何配置它以及它是如何工作的。 这是我的配置 此配置用于事务id前缀为的DefaultKafkaProducerFactory: 问题一: 我应该如何选择这个交易ID前缀?如果我理解正确,这个前缀被Spring用来为创建的每个生产者生成一个事务性id。 为什么我们不能只使用"UUID。随机UUID()? 问题二: 如果生产者被销毁,它将生成

  • 我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执