这是我第一次使用Kafka。我遵循了本教程。启动Zookeper后,我启动了kafka服务器。接下来创建了一个主题,然后启动了该主题的消费者。这是当Zookeper日志说 导致会话0x0关闭的异常:null 我正在使用Windows 10。 kafka_2.11-2.1.0 zookeeper-3.4.12
我需要帮助消费者在Spring启动。当断开连接时,我需要停止应用程序,例如10分钟。当断开连接时 或者当无法连接时 我使用ConsumerFactory和ConcurrentKafkaListenerContainerFactory进行消费者的所有配置
我试图有多个消费者的Kafka主题的多个分区与相同的groupId,这将帮助我扩大消费的消息。 根据Kafka的文件,它说: 如何让多个消费者拥有相同的消费者groupId,以实现负载平衡?
在我的应用程序中,有多个企业。每个企业登录并做一些动作,如上传数据,然后Kafka生产者采取的数据,并发送到主题。另一方面,Kafka消费者使用来自主题的数据并执行业务逻辑。并保存到数据库中。在这种情况下,当单个企业登录时,一切都很完美。但当多个企业登录时,则Kafka依次消费。也就是说,我如何使过程并行?在多个客户端请求上。提前谢了。
我正在使用spark结构化流媒体、合流开源Kafka集群开发spark流媒体应用程序,并在AWS EMR中运行spark job。我们至少有20个Kafka主题,以AVRO格式将数据生成单个Kafka主题,每个主题在3到4个分区之间进行了分区。我正在使用Spark阅读所有20个主题(逗号分隔的主题值)。然后从生成的数据帧中过滤每个消息行,使用正确的Avro模式应用每个消息,并将生成的写入S3和Ca
Spring Boot版本:1.5.2.发行版 下面是异常跟踪:
在以下两个示例中,处理通量流的行为似乎不同。 示例1: 从下面的日志中,我们了解到Flux消费者正在不同的线程中运行。(在 *** ). 我在主线程中引入了睡眠,以便可以在控制台中捕获消费者日志。 示例2 从下面的日志中,我们观察到消费者在同一个主线程中运行。(在***中突出显示) 澄清: 为什么第一个示例中的Flux消费者在不同的线程中运行,因为基于R2DBC的存储库(第二个示例)返回的Flux
我需要在一个代码块中使用流中的'n'项,然后完成,本质上是: 在我的情况下,我不能将签名更改为返回而简单地;实际上,我必须抛开流中的一些元素(而不是简单的逻辑)--以便为下游消费者做好准备,而下游消费者不需要知道这是如何发生的,甚至不需要知道这是如何发生的。 这个问题是关于“不做任何事”lambda的。 JDK中是否存在“Do Nothing”使用者,如“Do Nothing”函数?
我用的是Kafka:2.11-1.0.1。应用程序包含主题“X”的并发性为5的使用者,分区为5。 重新启动应用程序并在分区分配之前在主题“X”上发布消息时,主题“X”的5个使用者会找到组协调器,并将加入组请求发送给组协调器。预计会收到小组协调员的回复,但未收到回复。 我检查了Kafka服务器日志,但在调试日志级别找不到相关日志。 当我运行描述消费者组的命令时,作出如下观察: 消费群体正在重新平衡
Kafka 消费者在每个投票中轮询 500 条消息。我们禁用了, 假设我们已成功处理 100 条消息,偏移量也为 100 现在在第101条消息中,我们遇到了一个错误,我们没有提交偏移量 但是因为我们已经有了500条消息,所以我们处理了第102条消息,我们成功地处理了它,并且我们还提交了第102条消息的偏移量。 雀: 第 101 条消息会发生什么。 如何克服这个问题。
我正在使用Kafka和Spring Listener。下面是一段代码。过去,我们发布了超过10万条消息来测试主题,系统似乎运行良好。但几天前,我更改了消费者的groupId。之后,这个新消费者尝试从一开始就处理所有消息,这需要花费大量时间。但过了一段时间(10秒),经纪人会启动消费者。所以结果没有kafka寄存器来侦听消息。 Kafka消费者配置: 然后,我使用cli通过以下命令读取消息并观察到相
我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?
我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2
我有两个Kafka集群说A和B,B是A的复制品。仅当 A 关闭且相反,我才希望使用来自集群 B 的消息。然而,使用来自两个集群的消息会导致重复的消息。那么,有什么办法可以将我的 kafka 使用者配置为仅从一个集群接收消息。 谢谢-
我想知道什么是相对于最大水平扩展实例数配置分区数量的好方法。 假设我有一个有6个分区的主题。 我有一个应用程序,它使用的与的6.这意味着我将有6个KafkaMessageListenerContainer,每个都使用一个线程,并且均匀地消耗来自所有分区的消息。 如果以上是正确的,那么我想知道如果我通过添加另一个实例水平缩放应用程序会发生什么?如果新实例具有相同的配置,并发为6,当然也具有相同的消费