我是Apache Kafka的新手,能够从发送方发送消息(以JSON格式),但不能在消费者服务中消费。
@Service
public class SenderService {
private static final Logger LOG = LoggerFactory.getLogger(SenderService.class);
@Autowired
private KafkaTemplate<String, IdName> kafkaTemplate;
@Value("${app.topic.email}")
private String topic;
public void send(IdName idName) {
LOG.info("Sending Data='{}' to topic='{}' ", idName, topic);
Message<IdName> message = MessageBuilder.withPayload(idName).setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.MESSAGE_KEY, "TestMessage")
.build();
kafkaTemplate.send(message);
}
}
###Consumer Service
@Service
public class ConsumerService {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerService.class);
@Autowired
MailSender mailSender;
@KafkaListener(topics = "${app.topic.email}")
public void receive(@Payload IdName data,
@Header MessageHeaders headers) throws Exception{
LOG.info("Received data='{}'", data);
}
}
2018-09-21 11:01:41.738 ERROR 63487 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = emailclient, partition = 0, offset = 4, CreateTime = 1537507901660, serialized key size = 11, serialized value size = 122, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 97, 98, 99, 112, 108, 117, 115, 100, 46, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 46, 100, 111, 109, 97, 105, 110, 46, 73, 100, 78, 97, 109, 101])], isReadOnly = false), key = TestMessage, value = com.*****.****.domain.IdName@4a2d3006)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.*****.****.service.ConsumerService.receive(com.*****.****.domain.IdName,org.springframework.messaging.MessageHeaders) throws java.lang.Exception]
Bean [com.*****.****.service.ConsumerService@4649d70a]; nested exception is org.springframework.messaging.MessageHandlingException: Missing header 'headers' for method parameter type [class org.springframework.messaging.MessageHeaders], failedMessage=GenericMessage [payload=com.*****.****.domain.IdName@4a2d3006, headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@53fdf4bb, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=TestMessage, kafka_receivedPartitionId=0, kafka_receivedTopic=emailclient, kafka_receivedTimestamp=1537507901660, __TypeId__=[B@707343c2}]
有人能帮我吗?
如果您想要访问侦听器中的所有头,您使用了错误的注释,它应该是@headers
而不是@headers
。结果代码如下所示:
@Service
public class ConsumerService {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerService.class);
@Autowired
MailSender mailSender;
@KafkaListener(topics = "${app.topic.email}")
public void receive(@Payload IdName data,
@Headers MessageHeaders headers) throws Exception{
LOG.info("Received data='{}'", data);
}
}
当然,您也可以只注入消息键,如下所示:
@Service
public class ConsumerService {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerService.class);
@Autowired
MailSender mailSender;
@KafkaListener(topics = "${app.topic.email}")
public void receive(@Payload IdName data,
@Header(KafkaHeaders.MESSAGE_KEY) String messageKey) throws Exception{
LOG.info("Received data='{}'", data);
}
}
我有课: 配置类:公共类RabbitConfiguration{ 听众: a仅启动应用程序有错误 2017-08-08 12:58:02.128警告5024---[cTaskExecutor-1]S.A.R.L.ConditionalRejectingErrorHandler:Rabbit消息侦听器执行失败。 原因:org.SpringFramework.Messaging.Handler.Ann
我在学Kafka春靴。我想在我的consumer类中添加第二个consumer,它订阅了与第一个主题相同的主题,但具有不同的groupID。这些类不是很复杂,当我只有第一个消费Json的消费者(或者至少输出是Json?)时,它们就可以工作。还有一点需要说明的是,我从一个生产者和消费者开始,他们使用@EnableBindings方法,但这种方法不受欢迎,所以我正在学习正确/新的方法。 任何提示!请让
下面是我对kafka侦听器的方法定义,如果接收空或空的有效负载字符串,我想我会得到下面的错误...你能帮帮我吗。
这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。
我正在开发一个基于通知的应用程序,为此我需要监听传入的通知。我已经能够监听来电、短信、邮件等。我不知道如何通过代码监听Whatsapp上朋友的ping或消息。这实际上能做到吗?如果是,怎么做?可访问性服务是否可以用于此,使用包名为“com.whatsapp”?