在某些情况下,我使用Kafka流对主题的小内存(hashmap)投影进行建模。K,V缓存确实需要一些操作,因此它不是GlobalKTable的好例子。在这种“缓存”场景中,我希望我的所有兄弟实例都具有相同的缓存,因此我需要绕过消费者组机制。 要实现这一点,我通常只需使用随机生成的应用程序ID启动我的应用程序,因此每个应用程序每次重新启动都会重新加载主题。唯一的警告是,我最终会有一些消费者群体在Ka
我有一个用例,其中消息需要广播到水平可扩展、无状态的应用程序集群中的所有节点,我正在考虑Kafka。由于集群的每个节点都需要接收主题中的所有消息,因此集群的每个节点都需要有自己的消费者组。 这里可以假设消息量不是很高,以至于每个节点都无法处理所有消息。 为了用Kafka实现这一点,当从主题消费时,我最终会使用消费者流程的instanceId(或某个唯一标识符)作为消费者组id。这将推高消费群体的数
我是Kafka的新手。我看了一眼Kafka文档。似乎分派给订阅消费者组的消息是通过将分区与消费者实例绑定来实现的。 在使用Apache Kafka时,我们应该记住一件重要的事情,即同一消费者组中的消费者数量应该小于或等于所使用主题中的分区数量。否则,将不会收到来自主题的任何消息。 在非prod环境中,我没有配置主题分区。在这种情况下,Kafka是否只有一个分区。如果我启动共享同一组的多个消费者并向
因此,我有一个AWS动态流,在这里我为多个消费者发布事件。对于大多数人来说,接收热门数据很重要,这意味着他们中的许多人可能会同时轮询和读取最新数据。根据AWS文档,增加分片的数量将提高并行度,而每秒读取的数量最多为5次/秒。我的问题是是否(以及如何?)添加更多的碎片是否有助于解决我的所有消费者都是最新的并试图从同一个碎片读取新的传入数据的情况?似乎这种每秒读取数限制自动对您可以拥有的消费者数量进行
我有一个相当原始的流用例:多个生产者和一个消费者周期性地批量处理消息。有时会有多个使用者,这就是为什么我想从“传统”的不可靠队列(rpush/lrange/ltrim)切换到流。 实际上有两个相关的问题: > 必须先通过创建消费者组,然后才能从中创建。每次一个新的消费者开始(使用随机名称)时,它都会被添加到消费者列表中,但即使在流程结束后,它仍会保留在那里。我的理解是,在这种情况下,我需要将所有挂
相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步
提前谢谢你。
我正在使用Spring Cloud Stream 3.0.6(Cloud:hoxton.sr6,Boot 2.3.0.release)和Solace PubSub+。我不能让并发消费者工作。无论我配置什么,总是有一个线程依次执行每个传入消息。 以下是我的代码: 这里会有什么问题? 安慰pubsub+活页夹 本地运行的Solace PubSub+实例的Docker组合文件:
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
假设我的使用者从一个代理轮询,该代理有多个主题,每个主题有多个分区。我在同一个消费群体中总共有5个消费者。如果我的每个消费者都进行投票,将返回的数据顺序是什么? topicD-分区5 我的问题是,在这个单一的1轮询中,在按顺序移动到下一个主题/分区之前,我会收到来自该主题/分区的所有可用消息吗?意思例如: 在一次投票循环中,我收到了这个... 或者在那个单一的1轮询循环中,有可能接收到这个消息顺序
null 编辑:好的,所以我取得了一些进步(如果我错了请纠正我): 每个消费者都将获得所有消息。 租约被分配了一个EventProcessorHost,所以它需要一个唯一的名称,所以这里的使用者组名称实际上并不相关。 仍然不能百分之百确定context.checkpointasync,但我相信它只适用于ConsumerGroup?
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。
我有一个Kafka流应用程序,它从几个主题中获取数据,并将数据加入另一个主题。 Kafka配置: 注意:我在运行Kafka Brokers的机器上运行Kafka Streams应用程序。 每小时消耗/产生数百万条记录。每当我让Kafka经纪人倒下时,都会进入再平衡阶段,再平衡大约需要30分钟,有时甚至更长时间。 有人知道如何解决Kafka消费者的再平衡问题吗?而且,很多时候,它在重新平衡时抛出异常
我的应用程序由一个带有POST方法的REST控制器组成,用于提交我必须使用生产者发送到主题的数据。 这是控制器 使用Spring-Cloud-Stream版本 从3.1版开始,和注释被弃用,所以我尝试切换到新的方式来设置生产者,我就是这样工作的 最后在应用程序中。yaml我有这个 现在的问题是,当我启动应用程序时,方法被无限调用(我在主题中看到消息)。然后使用供应商似乎我被迫在供应商内部定义消息数
我们正在运行活动 MQ 5.6.0。在我们的测试环境中,我们有 3 个代理在静态网络中运行。下面是当前方案。我们有6个消费者随机连接到3个经纪人。一个经纪人有3个消费者,第二个有2个,第三个有1个。当我们向队列堆积消息时,我们看到消息积压在第三个代理上,有 1 个使用者,另外两个代理没有获得任何积压,其余 5 个使用者处于空闲状态。 在下面,您将找到我们所有一个代理(dev.queue01)的配置