@KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
public void process( @Payload MyMessage myMessage){
MyExtended myExtendedKey= new MyExtendedKey(myKey.getX(), myKey.getY());
....
....
kafkaTemplate.send(TOPIC, myExtendedKey, message);
}
我不知道如何才能得到消息的密钥,这是发送在监听器。
请阅读文档。
...
最后,关于消息的元数据可以从消息头获得。您可以使用以下标头名称检索邮件的标头:
KafKaheaders.timestamp_type
下面的示例显示如何使用标头:
java prettyprint-override">@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
偏移也是可用的。
在kafka收听消息之前/之后是否有任何类型的钩子可用? 用例:必须为设置MDC关联id以执行日志跟踪 我在找什么?前/后回调方法,以便可以在进入时设置MDC关联id,并最终在退出时清除MDC。 编辑的场景:我正在获取作为Kafka Headers一部分的相关ID,我想在Kafka Listener中收到消息后立即在MDC中设置相同的ID 感谢您的帮助
假设我有50个Kafka Topics,每个有3个分区,总共有150个分区。如果我为这150个分区中的每一个配置了一个KafkaListener/消费者(由于每个分区的容量很大),这意味着我有150个侦听器在运行。据我所知,每个侦听器都有自己的线程。那么这是否意味着在这种情况下会有150个活动线程?这似乎很多。有什么方法可以将其限制为一次最多线程数(比如20个)?
我明白,要使一个方法成为Kafka消息侦听器的目标,我必须用@KafkaListener注释标记这个方法。此注释允许通过containerFactory元素指定KafkaListenerContainerFactory。 下面是巴尔东SpringKafka教程的一些片段。 Kafka消费者配置。JAVA MessageListener。JAVA 我不明白的是为什么我们需要一个监听器容器工厂。什么是
使用spring集成Kafka dsl,我想知道为什么监听器不能接收消息?但是同样的应用程序,如果我用KafkaListener注释的方法替换spring integration DSL,就能够很好地使用消息。DSL让我错过了什么? 不消耗的DSL代码:
我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连
我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka