我有一个带有Kafka使用者的spring应用程序,它使用@KafKalisterner注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的情况。以编程方式实现这一目标的最佳方法是什么?我们不控制Kafka主题配置。
@KafkaListener(...)
public void listen(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
if (this.resetNeeded) {
consumer.seekToBeginning(consumer.assignment());
this.resetNeeded = false;
}
}
如果要在侦听器空闲(没有记录)时重置,可以启用空闲事件,并通过在ApplicationListener
或@eventListener
方法中侦听ListenerContainerIdleEvent
来执行查找。
该事件具有对使用者的引用。
编辑
@SpringBootApplication
public class So58769796Application {
public static void main(String[] args) {
SpringApplication.run(So58769796Application.class, args);
}
@KafkaListener(id = "so58769796", topics = "so58769796")
public void listen1(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("One:" + key + ":" + value);
}
@KafkaListener(id = "so58769796a", topics = "so58769796")
public void listen2(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println("Two:" + key + ":" + value);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so58769796")
.compact()
.partitions(1)
.replicas(1)
.build();
}
boolean reset;
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so58769796", "foo", "bar");
System.out.println("Hit enter to rewind");
System.in.read();
this.reset = true;
};
}
@EventListener
public void listen(ListenerContainerIdleEvent event) {
System.out.println(event);
if (this.reset && event.getListenerId().startsWith("so58769796-")) {
event.getConsumer().seekToBeginning(event.getConsumer().assignment());
}
}
}
spring.kafka.listener.idle-event-interval=5000
这里还有另一个技巧--在这种情况下,我们每次启动应用程序时都要倒带(并按需)...
@SpringBootApplication
public class So58769796Application implements ConsumerSeekAware {
public static void main(String[] args) {
SpringApplication.run(So58769796Application.class, args);
}
@KafkaListener(id = "so58769796", topics = "so58769796")
public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
System.out.println(key + ":" + value);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so58769796")
.compact()
.partitions(1)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
KafkaListenerEndpointRegistry registry) {
return args -> {
template.send("so58769796", "foo", "bar");
System.out.println("Hit enter to rewind");
System.in.read();
registry.getListenerContainer("so58769796").stop();
registry.getListenerContainer("so58769796").start();
};
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
我在《掌握Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是“压缩主题”和“未压缩主题” 他们对“日志压缩”有什么看法吗? 表可以被认为是对数据库的更新。在日志的这种视图中,只保留每个键的当前状态(给定键的最新记录或某种聚合)。表通常是从压缩的主题构建的。 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从
我在一个输入主题上构建KTable,并且在两个Kafka Stream应用程序实例上加入KStream。 KTable的输入主题已经是一个日志压缩主题。因此,当我的一个应用程序实例关闭时,通过读取input log compacted主题,另一个实例状态存储似乎会用整个状态刷新。 所以不需要为我的KTable存储启用日志记录(更改日志)? 我的源输入日志压缩主题可能有数百万条记录,所以如果我在KT
一个与主题压缩有关的问题。在压缩主题中,当日志清理器在清理特定键的以前偏移量(3,4,5)时出现延迟(假设5是最新的偏移量),而作为使用者使用这些偏移量时,即使3和4还没有压缩,我会只看到该键的最新偏移量(5)吗?还是使用者将按照该顺序获得(3,4,5)?
我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。
我收到了一个数据库更改流,这些更改最终形成了一个压缩的主题。流基本上是键/值对,并且键空间很大(~4 GB)。 这个主题由一个kafka流进程使用,该进程将数据存储在RockDB中(每个消费者/碎片单独使用)。处理器做两件不同的事情: 将数据连接到另一个流中。 检查来自主题的邮件是新密钥还是对现有密钥的更新。如果是更新,则将旧的键/值和新的键/值对发送到不同的主题(更新很少)。 null
2016-07-05 03:59:25.042 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Tick,ID:{},[30] 2016-07-05 03:59:25.946 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Metrics_Tick,ID:{},[60] 我的测试