我正在给Kafka写一个msg,然后在另一端消费。在里面做一些过程,并把它写回另一个Kafka主题。 我想知道哪个消息响应是哪个请求... 当前决定捕获来自消费者侧的偏移id然后在响应中写入和读取响应有效载荷并决定相同。 对于这种方法,我们需要阅读每条消息。根据消费者配置条件,我们还有其他方法可以使用吗?
我将 Kafka 提交策略设置为最新且缺少前几条消息。如果我在开始将消息发送到输入主题之前先睡20秒,那么一切都按预期工作。我不确定问题是否与消费者需要很长时间进行分区重新平衡有关。有没有办法在开始轮询之前知道消费者是否准备好了?
我在一个线程中创建了一个Kafka consumer实例,作为构造函数的一部分,在thread inside run方法中,我确实调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenApply方法并传递Kafka consumer实例来发出commit,因为这会给我一个错误,即Kafka consumer不是线程安全的。
我正在开发一个spring boot kafka消费者应用程序。它将有不同的消费者在不同的主题上工作。使用者的所有信息都来自application.yml文件。 我无法将应用程序属性中的主题列表设置到KafKalistener。 在这两种情况下,我都得到以下错误: java.lang.IllegalArgumentException:无法解析占位符 从应用程序属性获取主题并将其设置在KafkaLi
我有一个由第三方发布的JMS队列。我想在不同的机器上设置多个使用者,只有一台特定机器的使用者确认该队列上的消息。简而言之,如果特定机器的使用者没有接收到消息,那么该消息不应从队列中删除。这是可以实现的吗?
我有一个kafka主题,3个分区,只有一个带批处理的消费者。我在消费者方面使用的是spring kafka和以下消费者道具: 即使队列中有数千条消息(GBs数据)在等待,kafka consumer在每次轮询中也会收到大约10条消息(总大小约为1MB)。使用者应该获取(在我的示例中为15MB)或(在我的示例中为10000)的批处理。有什么问题?
我有一个话题是两个消费群体消费的。题目中有10条留言。 现在我开始应用程序2(消费者组2),它正在消费相同的主题。它不在处理消息。当我描述kafka-consumer-groups(带有--group consumerGroup2)时,它令人惊讶地显示CURRENT-OFFSET=10和LOG-END-OFFSET=10。 理想情况下,这种情况不应该发生,并且kafka应该能够识别对于消费者组2没
问题内容: 现在,Golang Kafka库(sarama)提供了使用者组功能,而kafka 10没有任何外部库帮助。如何在任何给定时间获得使用者组正在处理的当前消息偏移量? 以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo- go )来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在,我使用sarama- cluster(ht
问题内容: 我想做的是拥有一组生产者goroutine(其中一些可能完成或可能不完成)和一个消费者例程。问题在于括号中的警告-我们不知道将返回答案的总数。 所以我想做的是: 所以问题是,如果我关闭它是错误的,如果我没有关闭-它仍然是错误的(请参见代码中的注释)。 现在,解决方案将是一个带外信号通道,所有生产者都将其写入: 这完全可以满足我的需求!但是在我看来,这似乎是满口的。我的问题是:是否有任何
我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。
请帮忙。我在为中的Kinesis数据流设置消费者时遇到问题。NET控制台应用程序。 我已经按照留档做了所有的事情,但是每当我运行消费者时,我仍然会得到一个空白的控制台屏幕。到目前为止,生产者工作正常,AWS凭据也在工作。 我的系统上的JDK配置良好(对Java开发来说并不新鲜) 我有所有必要的政策附加到我的IAM用户 我可以看到生产者可以使用相同的AWS凭据以编程方式创建流、desc流等 我可以在
我正在尝试使用Kafka-Python编写一个消费者,以确保精确的once语义。分区中的消息是使用事务感知生成器生成的。我从Kafka文档中了解到,我应该将指定为,这样它将只读取提交的消息。问题是,我在Python客户机的文档中没有看到关于如何指定的任何地方。关于如何让我的消费者只阅读提交的消息,有什么想法吗? 预期结果:只需获取transation committed消息实际结果:使用者甚至读取
我们正在kubernetes上运行一个5节点flink集群(1.6.3),具有5个分区Kafka主题源。从该主题读取5个作业(具有不同的消费组),每个作业的并行度=5。 每个任务管理器都使用10Gb的ram运行,任务管理器堆大小限制为2Gb。摄取负载相当小(每秒100-200 msgs),平均消息大小约为4-8kb。所有作业都可以正常运行几个小时。经过一段时间,我们突然看到一个或多个作业失败: f
当有多个消费者时,我无法听到kafka主题(我的案例2主题)。在下面的例子中,我有2个消费者工厂,它将接受2个不同的JSON消息(一个是用户类型,另一个是事件类型)。这两条消息都发布到不同的主题。在这里,当我试图从topic1访问事件消息时,我无法访问,但我可以访问用户主题消息。 例如: 我的主要应用如下: 我的需要是首先收听事件主题并对消息进行一些按摩,然后向其发送用户主题,我有另一种方法将收听
当我点击/v2/kafka/local/consumer时得到以下响应: 当我运行bin/kafka-consumer-groups.sh--list--zookeeper localhost:2181时,我会得到以下使用者组。 Console-Consumer-98785 Console-Consumer-1054 Console-Consumer-70637 Console-Consumer-