以下是我的情况:
我们有一个Spring cloud Stream 3 Kafka服务连接到同一个代理中的多个主题,但我想基于属性控制连接到特定主题。
每个主题都有自己的活页夹和绑定,但代理对所有人来说都是一样的。
我尝试使用下面的属性禁用绑定(这是我到目前为止找到的唯一解决方案),这适用于StreamListener不接收消息,但与主题的连接和重新平衡仍在发生。
spring:
cloud:
stream:
bindings:
...
anotherBinding:
consumer:
...
autostartup: false
我想知道活页夹级别是否有任何设置阻止它启动。其中一个主题消费者应该只在其中一个环境中可用。
谢谢
通过将AutoStartup
设置为false
来禁用绑定应该可以工作,我不确定问题是什么。
看起来您使用的是新功能模型,而是 StreamListener
。如果您使用的是功能模型,则可以尝试以下另一件事。可以通过在运行时不包含相应的函数来禁用绑定。例如,假设您有以下两个使用者。
@Bean
public Consumer<String> one() {}
@Bean
public Consumer<String> two() {}
运行此应用程序时,您可以提供属性<code>spring.cloud.function。definition</code>以包含/排除函数。例如,当您使用spring.cloud.function运行它时。definition=一个
,那么消费者两个
将根本不会被激活。使用spring.cloud.function运行时。definition=two
,则消费者one
将不会被激活。
上述方法的缺点是,如果您决定在应用程序启动后启动另一个函数(在另一个功能上,给定<code>autoStartup</code>为<code>false</code〕),则它将无法工作,因为它不是通过<code>spring.cloud.function.definition</code<的原始绑定的一部分。但是,根据您的需求,这可能不是一个问题,因为您知道相应主题的目标环境。换句话说,如果您知道消费者<code>one</code>需要始终从主题<code>one
本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费
问题内容: 通过Rabbitmq中的示例,消费者可以一次从队列中获取所有消息。如何使用一条消息并退出? 问题答案: 您必须声明basicQos设置,才能一次从ACK到NACK状态获取一条消息,并禁用自动ACK以便显式给出确认。 希望能帮助到你!
我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。 我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为,希望消费者从头开始阅读。但它没有。 应用程序. yml 在我的Java课上 我尝试了一些东西。 我将值设置为。不出所料,它抛出了一个异常,抱怨
我正在构建一个使用来自Kafka主题的消息并执行数据库更新任务的Kafka消费者应用程序。消息是每天一次大批量生产的--所以该主题在10分钟内加载了大约100万条消息。主题有8个分区。 Spring Kafka消费者(使用@KafKalistener注释并使用ConcurrentKafkaListenerContainerFactory)在非常短的批处理中被触发。 批处理大小有时仅为1或2条消息。
我在Kafka做数据复制。但是,kafka日志文件的大小增长很快。一天内大小达到5 gb。作为这个问题解决方案,我想立即删除处理过的数据。我正在使用AdminClient中的delete record方法删除偏移量。但当我查看日志文件时,对应于该偏移量的数据不会被删除。 我不想要类似(log.retention.hours,log.retention.bytes,log.segment.bytes
谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但