以下是我的情况: 我们有一个Spring cloud Stream 3 Kafka服务连接到同一个代理中的多个主题,但我想基于属性控制连接到特定主题。 每个主题都有自己的活页夹和绑定,但代理对所有人来说都是一样的。 我尝试使用下面的属性禁用绑定(这是我到目前为止找到的唯一解决方案),这适用于StreamListener不接收消息,但与主题的连接和重新平衡仍在发生。 我想知道活页夹级别是否有任何设置
我对Kafka相对来说是新的,我试图在主题上发送消息后产生消费者。 单个生产者在不同的分区上发送200个msg。 我多次运行消费者脚本。
当我向主题“Test19”发送任何消息时,配置的ServiceActivator“ProcessMessage”方法将两条消息显示为配置的两个客户,但这里的问题是,在添加到消费者上下文之前,我需要为每个客户加载入站配置文件…否则,我只能在控制台中得到一条消息…是正确的方式还是我需要在这里改变什么? 谢了。
我的问题是,我无法足够快地轮询我的队列,以保持我的队列为空或接近空。我最初的想法是,我可以让使用者以x/s的速率通过Camel从SQS接收消息。从那里,我可以简单地创建更多的消费者,以达到我需要的消息处理速度。 我的消费者: 如图所示,我设置了和以提高消息的速率,但是我无法生成具有相同endpoint的多个使用者。 我在文档中读到,我相信SQSendpoint也是如此,因为生成多个使用者将只给我一
在某些情况下,我使用Kafka流对主题的小内存(hashmap)投影进行建模。K,V缓存确实需要一些操作,因此它不是GlobalKTable的好例子。在这种“缓存”场景中,我希望我的所有兄弟实例都具有相同的缓存,因此我需要绕过消费者组机制。 要实现这一点,我通常只需使用随机生成的应用程序ID启动我的应用程序,因此每个应用程序每次重新启动都会重新加载主题。唯一的警告是,我最终会有一些消费者群体在Ka
我有一个用例,其中消息需要广播到水平可扩展、无状态的应用程序集群中的所有节点,我正在考虑Kafka。由于集群的每个节点都需要接收主题中的所有消息,因此集群的每个节点都需要有自己的消费者组。 这里可以假设消息量不是很高,以至于每个节点都无法处理所有消息。 为了用Kafka实现这一点,当从主题消费时,我最终会使用消费者流程的instanceId(或某个唯一标识符)作为消费者组id。这将推高消费群体的数
我是Kafka的新手。我看了一眼Kafka文档。似乎分派给订阅消费者组的消息是通过将分区与消费者实例绑定来实现的。 在使用Apache Kafka时,我们应该记住一件重要的事情,即同一消费者组中的消费者数量应该小于或等于所使用主题中的分区数量。否则,将不会收到来自主题的任何消息。 在非prod环境中,我没有配置主题分区。在这种情况下,Kafka是否只有一个分区。如果我启动共享同一组的多个消费者并向
因此,我有一个AWS动态流,在这里我为多个消费者发布事件。对于大多数人来说,接收热门数据很重要,这意味着他们中的许多人可能会同时轮询和读取最新数据。根据AWS文档,增加分片的数量将提高并行度,而每秒读取的数量最多为5次/秒。我的问题是是否(以及如何?)添加更多的碎片是否有助于解决我的所有消费者都是最新的并试图从同一个碎片读取新的传入数据的情况?似乎这种每秒读取数限制自动对您可以拥有的消费者数量进行
我有一个相当原始的流用例:多个生产者和一个消费者周期性地批量处理消息。有时会有多个使用者,这就是为什么我想从“传统”的不可靠队列(rpush/lrange/ltrim)切换到流。 实际上有两个相关的问题: > 必须先通过创建消费者组,然后才能从中创建。每次一个新的消费者开始(使用随机名称)时,它都会被添加到消费者列表中,但即使在流程结束后,它仍会保留在那里。我的理解是,在这种情况下,我需要将所有挂
相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步
提前谢谢你。
我正在使用Spring Cloud Stream 3.0.6(Cloud:hoxton.sr6,Boot 2.3.0.release)和Solace PubSub+。我不能让并发消费者工作。无论我配置什么,总是有一个线程依次执行每个传入消息。 以下是我的代码: 这里会有什么问题? 安慰pubsub+活页夹 本地运行的Solace PubSub+实例的Docker组合文件:
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
假设我的使用者从一个代理轮询,该代理有多个主题,每个主题有多个分区。我在同一个消费群体中总共有5个消费者。如果我的每个消费者都进行投票,将返回的数据顺序是什么? topicD-分区5 我的问题是,在这个单一的1轮询中,在按顺序移动到下一个主题/分区之前,我会收到来自该主题/分区的所有可用消息吗?意思例如: 在一次投票循环中,我收到了这个... 或者在那个单一的1轮询循环中,有可能接收到这个消息顺序
null 编辑:好的,所以我取得了一些进步(如果我错了请纠正我): 每个消费者都将获得所有消息。 租约被分配了一个EventProcessorHost,所以它需要一个唯一的名称,所以这里的使用者组名称实际上并不相关。 仍然不能百分之百确定context.checkpointasync,但我相信它只适用于ConsumerGroup?