这是我第一次使用Kafka。我遵循了本教程。启动Zookeper后,我启动了kafka服务器。接下来创建了一个主题,然后启动了该主题的消费者。这是当Zookeper日志说 导致会话0x0关闭的异常:null 我正在使用Windows 10。 kafka_2.11-2.1.0 zookeeper-3.4.12
我需要帮助消费者在Spring启动。当断开连接时,我需要停止应用程序,例如10分钟。当断开连接时 或者当无法连接时 我使用ConsumerFactory和ConcurrentKafkaListenerContainerFactory进行消费者的所有配置
我试图有多个消费者的Kafka主题的多个分区与相同的groupId,这将帮助我扩大消费的消息。 根据Kafka的文件,它说: 如何让多个消费者拥有相同的消费者groupId,以实现负载平衡?
在我的应用程序中,有多个企业。每个企业登录并做一些动作,如上传数据,然后Kafka生产者采取的数据,并发送到主题。另一方面,Kafka消费者使用来自主题的数据并执行业务逻辑。并保存到数据库中。在这种情况下,当单个企业登录时,一切都很完美。但当多个企业登录时,则Kafka依次消费。也就是说,我如何使过程并行?在多个客户端请求上。提前谢了。
我正在使用spark结构化流媒体、合流开源Kafka集群开发spark流媒体应用程序,并在AWS EMR中运行spark job。我们至少有20个Kafka主题,以AVRO格式将数据生成单个Kafka主题,每个主题在3到4个分区之间进行了分区。我正在使用Spark阅读所有20个主题(逗号分隔的主题值)。然后从生成的数据帧中过滤每个消息行,使用正确的Avro模式应用每个消息,并将生成的写入S3和Ca
关于斯普林斯-Kafka在某些场景中的行为,我有几个问题。任何答案或指针都将是伟大的。 背景:我正在构建一个与外部API对话并返回确认的kafka消费者。我的配置如下所示: 让我们来讨论一个场景,其中调用了消费者的kafkalistener,它调用但是服务关闭,然后根据,它重试3次然后失败,然后调用errorhandler,从而停止KafkalistenerEndpointRegistry。这将关
Spring Boot版本:1.5.2.发行版 下面是异常跟踪:
在以下两个示例中,处理通量流的行为似乎不同。 示例1: 从下面的日志中,我们了解到Flux消费者正在不同的线程中运行。(在 *** ). 我在主线程中引入了睡眠,以便可以在控制台中捕获消费者日志。 示例2 从下面的日志中,我们观察到消费者在同一个主线程中运行。(在***中突出显示) 澄清: 为什么第一个示例中的Flux消费者在不同的线程中运行,因为基于R2DBC的存储库(第二个示例)返回的Flux
我需要在一个代码块中使用流中的'n'项,然后完成,本质上是: 在我的情况下,我不能将签名更改为返回而简单地;实际上,我必须抛开流中的一些元素(而不是简单的逻辑)--以便为下游消费者做好准备,而下游消费者不需要知道这是如何发生的,甚至不需要知道这是如何发生的。 这个问题是关于“不做任何事”lambda的。 JDK中是否存在“Do Nothing”使用者,如“Do Nothing”函数?
我用的是Kafka:2.11-1.0.1。应用程序包含主题“X”的并发性为5的使用者,分区为5。 重新启动应用程序并在分区分配之前在主题“X”上发布消息时,主题“X”的5个使用者会找到组协调器,并将加入组请求发送给组协调器。预计会收到小组协调员的回复,但未收到回复。 我检查了Kafka服务器日志,但在调试日志级别找不到相关日志。 当我运行描述消费者组的命令时,作出如下观察: 消费群体正在重新平衡
Kafka 消费者在每个投票中轮询 500 条消息。我们禁用了, 假设我们已成功处理 100 条消息,偏移量也为 100 现在在第101条消息中,我们遇到了一个错误,我们没有提交偏移量 但是因为我们已经有了500条消息,所以我们处理了第102条消息,我们成功地处理了它,并且我们还提交了第102条消息的偏移量。 雀: 第 101 条消息会发生什么。 如何克服这个问题。
我正在使用Kafka和Spring Listener。下面是一段代码。过去,我们发布了超过10万条消息来测试主题,系统似乎运行良好。但几天前,我更改了消费者的groupId。之后,这个新消费者尝试从一开始就处理所有消息,这需要花费大量时间。但过了一段时间(10秒),经纪人会启动消费者。所以结果没有kafka寄存器来侦听消息。 Kafka消费者配置: 然后,我使用cli通过以下命令读取消息并观察到相
我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?
我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2
我有两个Kafka集群说A和B,B是A的复制品。仅当 A 关闭且相反,我才希望使用来自集群 B 的消息。然而,使用来自两个集群的消息会导致重复的消息。那么,有什么办法可以将我的 kafka 使用者配置为仅从一个集群接收消息。 谢谢-