当前位置: 首页 > 知识库问答 >
问题:

如何使用不同的KafKalistener阅读不同的主题

益思博
2023-03-14

我有一个Kafka的话题,我正在听。然后将消息内容写入websocket通道,在该通道中我有一个订阅了该通道的SockJS客户机。这很管用。然后我创建了一个新的主题,然后添加了第二个KafKalistener。但是,当调用secong侦听器时,我看到它正在尝试处理/读取与第一个KafkaListener和主题相对应的有效负载,由于它没有被配置为这样做,因此会引发一个MessageConversionException错误。

Models
------
@JsonPropertyOrder({ "ackdate", "ack_count" })
public class DailyTransfer {
    private String ackdate;
    private Long ack_count;

    public DailyTransfer() {}

    public DailyTransfer(String ackdate, Long ack_count) {
        this.ackdate = ackdate;
        this.ack_count = ack_count;
    }

    ... Getters and Setters omitted for brevity

    @Override
    public String toString() {
        return "DailyTransfer{" +
                "ackdate='" + ackdate + '\'' +
                ", ack_count=" + ack_count +
                '}';
    }
}

@JsonPropertyOrder({ "rgdno", "bizname", "tin", "incordate", "commencedate", "biz_pk", "ack_at", "ack_at_ms", "ack_message" })
public class BizAck {

    private String rgdno;
    private String ack_message;
    private String bizname;
    private String tin;
    private String incordate;
    private String commencedate;
    private Long biz_pk;
    private String ack_at;
    private Long ack_at_ms;

    public BizAck() {}

    public BizAck(String rgdno, String ack_message, String bizname, String tin, String incordate, String commencedate, Long biz_pk, String ack_at,
                    Long ack_at_ms) {
        this.rgdno = rgdno;
        this.ack_message = ack_message;
        this.bizname = bizname;
        this.tin = tin;
        this.incordate = incordate;
        this.commencedate = commencedate;
        this.biz_pk = biz_pk;
        this.ack_at = ack_at;
        this.ack_at_ms = ack_at_ms;
    }

    ... Getters and Setters omitted for brevity

    @Override
    public String toString() {
        return "BizAck{" +
                "rgdno='" + rgdno + '\'' +
                ", ack_message='" + ack_message + '\'' +
                ", bizname='" + bizname + '\'' +
                ", tin='" + tin + '\'' +
                ", incordate='" + incordate + '\'' +
                ", commencedate='" + commencedate + '\'' +
                ", biz_pk=" + biz_pk +
                ", ack_at='" + ack_at + '\'' +
                ", ack_at_ms=" + ack_at_ms +
                '}';
    }
}

Configuration
-------------
@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> cprops = new HashMap<>();
        cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
        cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        cprops.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
        cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        cprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return cprops;
    }

@Bean
    public ConsumerFactory<String, BizAck> bizAckConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(BizAck.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, BizAck> bizAckKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, BizAck> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(bizAckConsumerFactory());
        return factory;
    }

@Bean
    public ConsumerFactory<String , DailyTransfer> consumerFactoryDailyTransfer(){
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("daily.transfer.consumer.group-id"));
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(DailyTransfer.class));
    }

@Bean(name="kafkaListenerContainerFactoryDailyTransfer")
    public ConcurrentKafkaListenerContainerFactory<String, DailyTransfer> kafkaListenerContainerFactoryDailyTransfer() {
        ConcurrentKafkaListenerContainerFactory<String, DailyTransfer> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryDailyTransfer());
        return factory;
    }

Listeners
---------
// listener to consume BizAck messages
    @KafkaListener( topics = "${spring.kafka.json.topic}", containerFactory = "bizAckKafkaListenerContainerFactory",
            groupId="${spring.kafka.consumer.group-id}")
    public void ssnitAckListener(BizAck bizAck) {
        logger.info("Received message='{}' from Kafka Topic", bizAck.toString());
        this.simpMessagingTemplate.convertAndSend("/bizTransfers/pushNotification", bizAck);
    }

// listener to consume DailyTransfer messages
@KafkaListener( topics="${spring.kafka.json.topic2}", containerFactory="kafkaListenerContainerFactoryDailyTransfer",
            groupId="${daily.transfer.consumer.group-id}" )
    public void dailyTransferListener(DailyTransfer dailyTransfer) {
        logger.info("Received message='{}' from transfer summary count Kafka Topic", dailyTransfer.toString());
        this.simpMessagingTemplate.convertAndSend("/summaryCounts/pushNotification", dailyTransfer);
    }

第一个监听器--消费古怪消息的监听器--工作得很好。请参阅以下日志

信息9708---[NTainer#1-0-C-1]G.G.G.S.N.Kafka.BizackTopicListener:从Kafka主题接收消息='Bizack{rgdno='CS006192018',ack_message='收到的商业注册:CS006192018',bizname='Dasel工程有限公司‘,tin='C0010143181',incordate='09-jan-2018',communenceDate='09-jan-2018',biz_pk=3667,ack_at='2019-04-23T08:51:02.684z',

但是第二个侦听器(使用DailyTransfer消息的侦听器)会抛出错误。

错误9708---[NTainer#0-0-C-1]O.s.Kafka.Listener.LoggingErrorHandler:处理时出错:ConsumerRecord(主题=DAILY_TRANSFER_COUNTS,partition=3,offset=173,CreateTime=1556009462652,serialized key size=10,serialized value size=51,headers=RecordHeaders(headers=[],isReadOnly=false),key=2019-04-23,value=BizAck{rgdno=“null

为什么第二个监听器会选择奇怪的消息并尝试转换/处理它们?

@Autowired
    private KafkaTemplate<String, BizAck> bizAckKafkaTemplate;
    public void sendAcknowledgementMessage(String rcvdMessage) {
        BizAck bizAck = utils.JsonStr2BizAck(rcvdMessage);
        logger.info("Sending acknowledgement message to Kafka for : \n"+ "Biz Regn: "+ bizAck.getRgdno() +", TIN : " + bizAck.getTin()+", Name: " + bizAck.getBizname());
        // the KafkaTemplate provides asynchronous send methods returning a Future
        ListenableFuture<SendResult<String, BizAck>> future = bizAckKafkaTemplate.send(Objects.requireNonNull(env.getProperty("spring.kafka.json.topic")), bizAck);
        // register a callback with the listener to receive the result of the send asynchronously
        future.addCallback(new ListenableFutureCallback<SendResult<String, BizAck>>() {
            @Override
            public void onSuccess(SendResult<String, BizAck> result) {
                logger.info("Successfully sent message=[ " + rcvdMessage + " ] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                logger.info("Unable to send message=[ " + rcvdMessage + " ] due to : " + ex.getMessage());
            }
        });
    }

2019-04-23 11:06:02.999信息9708---[enerContainer-1]G.G.G.S.N.Kafka.AcknowdgementSender:正在向Kafka发送接收到的确认消息:Biz regn:CG094562018,TIN:C0910331870,Name:COMMUNITIES for DEVELOPMENT 2019-04-23 11:06:02.999信息9708---[ad producer-1]G.G.G.S.N.Kafka.AcknowdgementSender:成功发送消息=[{“RGDNO”:“CG094562018”,“BizName”DAILY_TRANSFER_COUNTS主题派生自在KSQL中对此主题执行的查询。**

共有1个答案

宋翔
2023-03-14

很明显,第二个话题有一个奇怪的...

ConsumerRecord(主题=DAILY_TRANSFER_COUNTS,...value=BizAck{...

所以问题似乎出在发送方。

 类似资料:
  • 我有一个websocket服务器和一个websocket客户端,都是Java的。websocket服务器具有以下功能: 在 Java 网页滑板客户端中,我在我的踩踏会话处理程序中提供了以下内容: 然后,我能够通过客户端向服务器路径“hello”发送消息来在两者之间进行通信,然后由于客户端订阅了“topic/greetings”,所以我也要用我的stompFrameHandler来处理响应。 但是我

  • 我们的拓扑使用从kafka主题获取消息。我们有约150个主题,包含约12个分区、8个storm执行器和2个storm节点上的任务。Storm版本1.0.5,Kafka经纪人版本10.0.2,Kafka客户端版本0.9.0.1。我们不会删除Kafka主题。 在某个时刻,我在worker中观察到大量重复的警告消息。日志 2018-05-29 14:36:57.928 o.a.s.k.KafkaUtil

  • 所以我很想知道。这两个有什么不同?2.在哪种场景下选择哪一种?

  • 在log4j2文档log4j2 java配置中,默认配置为: 根本就不在那里。它仍将只打印出错误日志。 问题 是否有其他人遇到此问题? 有人能复制它吗? 有人知道怎么修吗?我几乎要给别人一块钱。不过,我肯定会把他们的答案标成绿色。 我的最终目标是让特定的记录器发送SMTP请求。我很确定我知道怎么做,我只是在和这个小问题作斗争

  • 对于登录页面自动化,用户需要输入用户id、安全答案和密码。不同的环境有不同的用户。例如,qa和dev环境的用户具有不同的用户id、安全答案和密码。那么在自动化测试中如何妥善处理用户信息呢?我们可以将所有的属性存储在属性文件中,还有其他更好的解决方案吗?