我正在使用Spring Cloud Stream 3.0.6(Cloud:hoxton.sr6,Boot 2.3.0.release)和Solace PubSub+。我不能让并发消费者工作。无论我配置什么,总是有一个线程依次执行每个传入消息。 以下是我的代码: 这里会有什么问题? 安慰pubsub+活页夹 本地运行的Solace PubSub+实例的Docker组合文件:
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
假设我的使用者从一个代理轮询,该代理有多个主题,每个主题有多个分区。我在同一个消费群体中总共有5个消费者。如果我的每个消费者都进行投票,将返回的数据顺序是什么? topicD-分区5 我的问题是,在这个单一的1轮询中,在按顺序移动到下一个主题/分区之前,我会收到来自该主题/分区的所有可用消息吗?意思例如: 在一次投票循环中,我收到了这个... 或者在那个单一的1轮询循环中,有可能接收到这个消息顺序
null 编辑:好的,所以我取得了一些进步(如果我错了请纠正我): 每个消费者都将获得所有消息。 租约被分配了一个EventProcessorHost,所以它需要一个唯一的名称,所以这里的使用者组名称实际上并不相关。 仍然不能百分之百确定context.checkpointasync,但我相信它只适用于ConsumerGroup?
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。
我有一个Kafka流应用程序,它从几个主题中获取数据,并将数据加入另一个主题。 Kafka配置: 注意:我在运行Kafka Brokers的机器上运行Kafka Streams应用程序。 每小时消耗/产生数百万条记录。每当我让Kafka经纪人倒下时,都会进入再平衡阶段,再平衡大约需要30分钟,有时甚至更长时间。 有人知道如何解决Kafka消费者的再平衡问题吗?而且,很多时候,它在重新平衡时抛出异常
我的应用程序由一个带有POST方法的REST控制器组成,用于提交我必须使用生产者发送到主题的数据。 这是控制器 使用Spring-Cloud-Stream版本 从3.1版开始,和注释被弃用,所以我尝试切换到新的方式来设置生产者,我就是这样工作的 最后在应用程序中。yaml我有这个 现在的问题是,当我启动应用程序时,方法被无限调用(我在主题中看到消息)。然后使用供应商似乎我被迫在供应商内部定义消息数
我是的新手,正在尝试使用组件。 我在这里简化了代码,但基本上我有一个参与者,它从endpoint使用。 因此,我尝试这种“直接”方法,但在endpoint:endpoint[direct://mycamelendpoint]上没有可用的消费者。Exchange[message:My test message]异常。 我在这里缺少什么配置?
我们正在运行活动 MQ 5.6.0。在我们的测试环境中,我们有 3 个代理在静态网络中运行。下面是当前方案。我们有6个消费者随机连接到3个经纪人。一个经纪人有3个消费者,第二个有2个,第三个有1个。当我们向队列堆积消息时,我们看到消息积压在第三个代理上,有 1 个使用者,另外两个代理没有获得任何积压,其余 5 个使用者处于空闲状态。 在下面,您将找到我们所有一个代理(dev.queue01)的配置
我的问题是关于Kafka在爪哇的消费者 > 已启动Kafka服务器 创建的主题 创作者 创建的消费者 我在终端中做的所有这些事情,工作正常,能够在消费者处正确接收日志。运行下面的消费者(在Java),但没有收到任何记录。它继续汇集在 也没有收到任何记录。 下面给出的我的 java 消费者代码 请告诉我在java消费者类中接收消息,我在配置中做错了什么吗?属性中的“group.id”是怎么回事? 下
我正在尝试使用NodeJS从远程机器连接到远程Apache Kafka服务器。我无法从nodejs代码中生成所需的kafka主题的消息。我也无法消费任何数据从主题以及。 我使用的是Apache-kafka版本2.122.2.1和Java8。我也在使用节点版本8.11.0。我还启动了zookeeper服务器和kafka服务器。我在ubuntu机器上本地创建了一个主题和一个生产者和消费者,以检查apa
最近,我们与Kafka消费者和生产商之间出现了一些性能问题。我们在scala中使用Kafka Java API。打开和关闭消费者和生产者对象的良好做法是什么?我认为这是一个非常开放的问题,正确的答案总是,但我正在尝试对此进行推理。 消费者可以长时间运行连接并保持开放吗? 当我们完成信息生成时,生产者是否应该关闭?
我有一个处于RPC模式的消费者(RabbitListner),我想知道是否有可能引发发布者可以处理的异常。 为了更清楚地说明我的情况如下: 发布者以RPC模式发送消息 消费者收到消息,检查消息的有效性,如果由于缺少参数,消息无法计数,那么我想抛出异常。异常可以是特定的业务异常,也可以是特定的AmqpException,但我希望发布者可以在未进入超时状态时处理该异常 我尝试使用AmqpRejectA
目前,我每5分钟调用一次消费api来消费数据。在这种情况下,不必要地调用了消费者api,但我希望在Kafka中有新数据可用时调用该api。
那么我如何确保我的队列中只有一个消费者呢?