我必须记录消费者在SpringKafka中花费的时间。由于kafkaListener方法对每条消息都执行,因此在那里放置一个记录器是行不通的。此外,有时一些信息会丢失,而不是被消费者消费掉。我应该把记录器放在哪里,以找出消费者启动后的弹性时间。使用者不会退出或关闭,其轮询将无限期进行
我试图从单个分区主题安排我的消费过程。我可以使用endpointlistenerregistry启动它。start()但我想在消耗完当前分区中的所有消息后,即当我到达当前分区中的最后一个偏移量时停止它。制作成主题是在我完成消费并关闭它之后完成的。我应该如何确保在启动scheduler并停止我的消费者之前,我已经阅读了所有消息?我正在为消费者使用Kafkalistener。
在我的应用程序中(它正在使用Topic1、Topic2和Topic3),我还不断看到与其他主题相关的警告。 该消息通过以下Kafka代码打印:https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/networkclient.java 这是意料之中的吗?为什么这些警告消息会发
在Kafka消费者中,自动偏移重置:最新和启用自动提交:false的主要目的是什么。 在Kafka consumer中,我是否可以在KafkaListner期间检查是否能够接收重复记录或处理以前的记录 在spring boot中创建消费者端时,需要记住的所有必要事项以及日志记录的所有重要事项是什么,以便日志记录具有一定的意义
我使用API编写了一个最小的Kafka消费者代码。 我很好奇它为什么会那样保存。我是不是漏掉了什么?
如何提高Kafka消费者的绩效?我有(并且需要)至少一次Kafka消费语义学 我有以下配置。processInDB()需要2分钟才能完成。因此,仅处理10条消息(全部在单个分区中)就需要20分钟(假设每条消息2分钟)。我可以在不同的线程中调用processInDB,但我可能会丢失消息!。如何在2到4分钟的时间窗口内处理所有10条消息? 下面是我的Kafka消费者代码。
我创建了3个具有相同组id的使用者线程,订阅了相同的主题。已启用自动提交。由于所有3个使用者线程都订阅了相同的主题,我假设每个使用者都将获得一个要使用的分区,并将提交每个分区的偏移量日志。 但我在这里面临着一个奇怪的问题。我所有的留言都是重复的。我从我的每个线程在消费者端获得x倍多的记录。由于我的每个使用者线程都进行无限循环来轮询主题,所以我必须终止该进程。 我甚至尝试了单线程,但我仍然得到重复记
我正在使用Spring Kafka消费者。我已将并发设置为10,并创建了5个消费者(用于5个主题)。所以有50个Spring Kafka消费者线程。 Kafka消费者可以使用的最大线程数是多少?如何增加此线程池的大小?我查阅了spring文档,但没有发现任何相关内容。
我读过许多文章,其中有许多不同的配置来实现一次处理。 下面是我的生产者配置: 以下是我的使用者配置: 我试图跟随,但我遇到了一些问题: 下面是我的生产者代码: 我的消费代码:
在我的Spring Boot Kafka应用程序中,我有以下使用者配置: 消费者: 如果我理解正确的话,现在我有一个消费者的实例。我想增加post消费者的数量,假设有5个消费者将消费来自${kafka.topic.post.send}的不同(不同)消息,以加快消息消费。 它是否像添加工厂一样简单。setConcurrency(5) 至我的PostKafkAlisterContainerFactor
在我们的一个基于spring boot的服务中,我们打算同时连接到两个不同的kafka集群。这些集群都有自己的引导服务器集、主题配置等。它们之间没有任何关联,就像这个问题中的情况一样。 我将有不同类型的消息从不同主题名称的每个集群中读取。可能有或可能没有多个生产者通过此服务连接到两个集群,但我们肯定每个集群至少有一个消费者。 我想知道如何在application.yml中定义属性以满足此设置,以便
我有一个简单的Kafka 2.4.1(Confluent 5.4.1)安装程序在本地Docker中运行。并且我使用了用Java编写的测试生产者和测试使用者。该代码可在GitHub中获得。 单元测试做: 生成器向单个分区主题生成一条消息 用户订阅该主题并在Kafka中查询可用消息 问题是:使用者的第一次运行将跳过主题中已经生成的可用消息。真正的问题是,那些错过的消息会丢失(从使用者的角度来看:偏移量
假设,我有多个Kafka制作者同时为单个Kafka主题生成数据。 有可能得到哪个是给定生产者生产的最后一个偏移吗? 例如: 生产者: 我想找出分别由P1和P2发布的最后一条记录的偏移量。 请注意,我不是在要求全局主题分区偏移量。
给定:我在Kafka中有两个主题,假设主题A和主题B。Kafka流从主题A中读取一条记录,对其进行处理,并产生与所消耗记录相对应的多条记录(假设recordA和recordB)。现在的问题是我如何使用Kafka流来实现这一点。 在这里,读取的记录是消息;处理之后,它返回一个消息列表。如何将此列表划分为两个生产者流?任何帮助都将不胜感激。
我想在kafka主题中搜索特定的消息,我找到的唯一解决方案是使用 有没有一个有效的方法做这件事? 有没有一种方法可以用一个特定的偏移量来限制使用者,即从一开始读到in达到特定的偏移量?