我有一个Kafka的话题,我正在听。然后将消息内容写入websocket通道,在该通道中我有一个订阅了该通道的SockJS客户机。这很管用。然后我创建了一个新的主题,然后添加了第二个KafKalistener。但是,当调用secong侦听器时,我看到它正在尝试处理/读取与第一个KafkaListener和主题相对应的有效负载,由于它没有被配置为这样做,因此会引发一个MessageConversionException错误。
Models
------
@JsonPropertyOrder({ "ackdate", "ack_count" })
public class DailyTransfer {
private String ackdate;
private Long ack_count;
public DailyTransfer() {}
public DailyTransfer(String ackdate, Long ack_count) {
this.ackdate = ackdate;
this.ack_count = ack_count;
}
... Getters and Setters omitted for brevity
@Override
public String toString() {
return "DailyTransfer{" +
"ackdate='" + ackdate + '\'' +
", ack_count=" + ack_count +
'}';
}
}
@JsonPropertyOrder({ "rgdno", "bizname", "tin", "incordate", "commencedate", "biz_pk", "ack_at", "ack_at_ms", "ack_message" })
public class BizAck {
private String rgdno;
private String ack_message;
private String bizname;
private String tin;
private String incordate;
private String commencedate;
private Long biz_pk;
private String ack_at;
private Long ack_at_ms;
public BizAck() {}
public BizAck(String rgdno, String ack_message, String bizname, String tin, String incordate, String commencedate, Long biz_pk, String ack_at,
Long ack_at_ms) {
this.rgdno = rgdno;
this.ack_message = ack_message;
this.bizname = bizname;
this.tin = tin;
this.incordate = incordate;
this.commencedate = commencedate;
this.biz_pk = biz_pk;
this.ack_at = ack_at;
this.ack_at_ms = ack_at_ms;
}
... Getters and Setters omitted for brevity
@Override
public String toString() {
return "BizAck{" +
"rgdno='" + rgdno + '\'' +
", ack_message='" + ack_message + '\'' +
", bizname='" + bizname + '\'' +
", tin='" + tin + '\'' +
", incordate='" + incordate + '\'' +
", commencedate='" + commencedate + '\'' +
", biz_pk=" + biz_pk +
", ack_at='" + ack_at + '\'' +
", ack_at_ms=" + ack_at_ms +
'}';
}
}
Configuration
-------------
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> cprops = new HashMap<>();
cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
cprops.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
cprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return cprops;
}
@Bean
public ConsumerFactory<String, BizAck> bizAckConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(BizAck.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, BizAck> bizAckKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, BizAck> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(bizAckConsumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String , DailyTransfer> consumerFactoryDailyTransfer(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
config.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("daily.transfer.consumer.group-id"));
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(DailyTransfer.class));
}
@Bean(name="kafkaListenerContainerFactoryDailyTransfer")
public ConcurrentKafkaListenerContainerFactory<String, DailyTransfer> kafkaListenerContainerFactoryDailyTransfer() {
ConcurrentKafkaListenerContainerFactory<String, DailyTransfer> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryDailyTransfer());
return factory;
}
Listeners
---------
// listener to consume BizAck messages
@KafkaListener( topics = "${spring.kafka.json.topic}", containerFactory = "bizAckKafkaListenerContainerFactory",
groupId="${spring.kafka.consumer.group-id}")
public void ssnitAckListener(BizAck bizAck) {
logger.info("Received message='{}' from Kafka Topic", bizAck.toString());
this.simpMessagingTemplate.convertAndSend("/bizTransfers/pushNotification", bizAck);
}
// listener to consume DailyTransfer messages
@KafkaListener( topics="${spring.kafka.json.topic2}", containerFactory="kafkaListenerContainerFactoryDailyTransfer",
groupId="${daily.transfer.consumer.group-id}" )
public void dailyTransferListener(DailyTransfer dailyTransfer) {
logger.info("Received message='{}' from transfer summary count Kafka Topic", dailyTransfer.toString());
this.simpMessagingTemplate.convertAndSend("/summaryCounts/pushNotification", dailyTransfer);
}
第一个监听器--消费古怪消息的监听器--工作得很好。请参阅以下日志
信息9708---[NTainer#1-0-C-1]G.G.G.S.N.Kafka.BizackTopicListener:从Kafka主题接收消息='Bizack{rgdno='CS006192018',ack_message='收到的商业注册:CS006192018',bizname='Dasel工程有限公司‘,tin='C0010143181',incordate='09-jan-2018',communenceDate='09-jan-2018',biz_pk=3667,ack_at='2019-04-23T08:51:02.684z',
但是第二个侦听器(使用DailyTransfer消息的侦听器)会抛出错误。
错误9708---[NTainer#0-0-C-1]O.s.Kafka.Listener.LoggingErrorHandler:处理时出错:ConsumerRecord(主题=DAILY_TRANSFER_COUNTS,partition=3,offset=173,CreateTime=1556009462652,serialized key size=10,serialized value size=51,headers=RecordHeaders(headers=[],isReadOnly=false),key=2019-04-23,value=BizAck{rgdno=“null
为什么第二个监听器会选择奇怪的消息并尝试转换/处理它们?
@Autowired
private KafkaTemplate<String, BizAck> bizAckKafkaTemplate;
public void sendAcknowledgementMessage(String rcvdMessage) {
BizAck bizAck = utils.JsonStr2BizAck(rcvdMessage);
logger.info("Sending acknowledgement message to Kafka for : \n"+ "Biz Regn: "+ bizAck.getRgdno() +", TIN : " + bizAck.getTin()+", Name: " + bizAck.getBizname());
// the KafkaTemplate provides asynchronous send methods returning a Future
ListenableFuture<SendResult<String, BizAck>> future = bizAckKafkaTemplate.send(Objects.requireNonNull(env.getProperty("spring.kafka.json.topic")), bizAck);
// register a callback with the listener to receive the result of the send asynchronously
future.addCallback(new ListenableFutureCallback<SendResult<String, BizAck>>() {
@Override
public void onSuccess(SendResult<String, BizAck> result) {
logger.info("Successfully sent message=[ " + rcvdMessage + " ] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
logger.info("Unable to send message=[ " + rcvdMessage + " ] due to : " + ex.getMessage());
}
});
}
2019-04-23 11:06:02.999信息9708---[enerContainer-1]G.G.G.S.N.Kafka.AcknowdgementSender:正在向Kafka发送接收到的确认消息:Biz regn:CG094562018,TIN:C0910331870,Name:COMMUNITIES for DEVELOPMENT 2019-04-23 11:06:02.999信息9708---[ad producer-1]G.G.G.S.N.Kafka.AcknowdgementSender:成功发送消息=[{“RGDNO”:“CG094562018”,“BizName”DAILY_TRANSFER_COUNTS主题派生自在KSQL中对此主题执行的查询。**
很明显,第二个话题有一个奇怪的...
ConsumerRecord(主题=DAILY_TRANSFER_COUNTS,...value=BizAck{...
所以问题似乎出在发送方。
我有一个websocket服务器和一个websocket客户端,都是Java的。websocket服务器具有以下功能: 在 Java 网页滑板客户端中,我在我的踩踏会话处理程序中提供了以下内容: 然后,我能够通过客户端向服务器路径“hello”发送消息来在两者之间进行通信,然后由于客户端订阅了“topic/greetings”,所以我也要用我的stompFrameHandler来处理响应。 但是我
我们的拓扑使用从kafka主题获取消息。我们有约150个主题,包含约12个分区、8个storm执行器和2个storm节点上的任务。Storm版本1.0.5,Kafka经纪人版本10.0.2,Kafka客户端版本0.9.0.1。我们不会删除Kafka主题。 在某个时刻,我在worker中观察到大量重复的警告消息。日志 2018-05-29 14:36:57.928 o.a.s.k.KafkaUtil
所以我很想知道。这两个有什么不同?2.在哪种场景下选择哪一种?
在log4j2文档log4j2 java配置中,默认配置为: 根本就不在那里。它仍将只打印出错误日志。 问题 是否有其他人遇到此问题? 有人能复制它吗? 有人知道怎么修吗?我几乎要给别人一块钱。不过,我肯定会把他们的答案标成绿色。 我的最终目标是让特定的记录器发送SMTP请求。我很确定我知道怎么做,我只是在和这个小问题作斗争
null null