@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
partitions = "#{@finder.partitions('compacted')}"),
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
// process record
}
由于我有几个主题,结果代码非常冗长:
@KafkaListener(topicPartitions = {
@TopicPartition(
topic = "${topic1}",
partitions = "#{@finder.partitions('${topic1}')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
),
@TopicPartition(
topic = "${topic2}",
partitions = "#{@finder.partitions('${topic2}')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
),
// and many more @TopicPartitions...
@TopicPartition(
topic = "${topicN}",
partitions = "#{@finder.partitions('${topicN}')}",
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
)
})
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
// process record
}
如何通过配置@kafkalistener
注释的topicpartitions
属性,使这种重复配置更加简洁?
@kafkalistener
目前不可能-请在GitHub上打开一个新的特性问题。
我能想到的唯一工作是从容器工厂以编程方式创建一个侦听器容器,并创建一个侦听器适配器。如果你需要,我可以提供一个例子。
编辑
@SpringBootApplication
public class So64022266Application {
public static void main(String[] args) {
SpringApplication.run(So64022266Application.class, args);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("so64022266-1").partitions(10).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("so64022266-2").partitions(10).replicas(1).build();
}
@Bean
ConcurrentMessageListenerContainer<String, String> container(@Value("${topics}") String[] topics,
PartitionFinder finder,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
MyListener listener) throws Exception {
MethodKafkaListenerEndpoint<String, String> endpoint = endpoint(topics, finder, listener);
ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(endpoint);
container.getContainerProperties().setGroupId("someGroup");
return container;
}
@Bean
MethodKafkaListenerEndpoint<String, String> endpoint(String[] topics, PartitionFinder finder,
MyListener listener) throws NoSuchMethodException {
MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setBean(listener);
endpoint.setMethod(MyListener.class.getDeclaredMethod("listen", String.class, String.class));
endpoint.setTopicPartitions(Arrays.stream(topics)
.flatMap(topic -> finder.partitions(topic))
.toArray(TopicPartitionOffset[]::new));
endpoint.setMessageHandlerMethodFactory(methodFactory());
return endpoint;
}
@Bean
DefaultMessageHandlerMethodFactory methodFactory() {
return new DefaultMessageHandlerMethodFactory();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
ConcurrentMessageListenerContainer<String, String> container) {
return args -> {
System.out.println(container.getAssignedPartitions());
template.send("so64022266-1", "key1", "foo");
template.send("so64022266-2", "key2", "bar");
};
}
}
@Component
class MyListener {
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
System.out.println(key + ":" + payload);
}
}
@Component
class PartitionFinder {
private final ConsumerFactory<String, String> consumerFactory;
public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
this.consumerFactory = consumerFactory;
}
public Stream<TopicPartitionOffset> partitions(String topic) {
System.out.println("+" + topic + "+");
try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
return consumer.partitionsFor(topic).stream()
.map(part -> new TopicPartitionOffset(topic, part.partition(), 0L));
}
}
}
topics=so64022266-1, so64022266-2
我有@KafkaListener使用topicPattern与正则表达式,工作正常(foo。*),但现在我想将侦听器分配给所有匹配主题的所有分区。 https://docs.spring.io/spring-kafka/docs/2.6.1/reference/html/#tip-assign-all-parts并没有真正帮助我,因为我不知道主题名称。
问题内容: 我的故事: 我想做的事情像将行记录到文件的最简单的log4j记录器一样简单。我发现了几个具有某些功能的示例,但没有一个真正有效的基本通用示例,而没有一个解释每一行如何工作的示例。 题: 有人可以提供吗? 先决条件: 我已经知道将文件放在哪里,并且已经配置了log4j并可以用于控制台日志记录。 现在,我想登录到文件,并在程序运行后从文件系统中查找文件。 需要添加到现有文件的行是所需的输出
我有一个Kafka主题,通过设置“清理”启用了压缩。政策=紧凑。我的部分。字节属性设置为稍大一点的值(100Mb),这样我的代理运行良好。 如果我有一个Kafka流应用程序,它使用的主题是GlobalKTable,并且在主题中的单个分区键有多个记录,该应用程序将在GlobalKTable中只接收1个记录,还是在压缩开始之前有两个记录?
问题内容: 我有一个ZIP文件目录(在Windows计算机上创建)。我可以使用手动解压缩它们,但是如何通过外壳解压缩当前文件夹中的所有ZIP文件呢? 使用Ubuntu Linux服务器。 问题答案: 根据以下链接,这可以在bash中运行: 解压缩\ *。zip
我在《掌握Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是“压缩主题”和“未压缩主题” 他们对“日志压缩”有什么看法吗? 表可以被认为是对数据库的更新。在日志的这种视图中,只保留每个键的当前状态(给定键的最新记录或某种聚合)。表通常是从压缩的主题构建的。 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从
我正在使用kafka:Kafka2.12-2.1.0,在客户端使用spring kafka,但遇到了一个问题。 我需要通过阅读Kafka主题中的所有现有消息来加载内存中的映射。为此,我启动了一个新的使用者(具有唯一的使用者组id,并将偏移量设置为)。然后我迭代使用者(poll方法)以获取所有消息,并在使用者记录变为空时停止。 我尝试了很少的其他方法(比如使用偏移量数),但还没有找到任何解决方法,除