我是Storm世界的新手。在我的拓扑中,我使用Kafka的数据,并使用SpoutConfig
。
通过一些测试,我得到了以下警告消息:
2015-10-01 23:31:51.753 s.k.KafkaUtils[警告]获取了偏移量超出范围的获取请求:[85970]2015-10-01 23:31:51.755 s.k.PartitionManager[警告]使用新偏移量:0
我的\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
但是,当我删除旧拓扑并提交新拓扑时,会显示警告消息,新拓扑无法使用偏移量。如果我忘了什么,你能给我一个好主意吗?
最后,我发现我错过了什么。由于我的拓扑使用两个不同的Kafka源,因此我使用不同的spoutConnifg
实例创建了两个KafkaSpout
,每个实例都分配给每个KafkaSpout
。
但是当我实例化每个SpoutConfig
实例时,我向每个实例传递了相同的id。这导致只有一个id为的znode,即使我希望从两个不同的来源消费。
因此,两个PartitionManager
尝试使用具有偏移信息的同一个znode。如果PartitionManager
中的一个尝试使用偏移量为另一端的新元组,则会显示警告消息。
如何从动物园管理员那里获得最后一次偏移时间?当使用Storm喷口阅读来自Kafka的消息时。上下文:Kafka 不断获取消息,使用者读取一段时间,然后由于任何原因关闭,然后使用者仅读取最新消息,但不读取上次偏移量读取
我创建了以批处理方式接收消息的ConsumerConfig: Spring启动配置: 侦听器类 : 我在处理消息后使用手动确认。 我找到了一些调试日志: 在上面的调试日志中,***获取偏移量发生在偏移量提交之前,该偏移量未提交,因此它返回offset_OUT_OF_RANGE,之后使用者无法接收任何消息。是否有任何方法处理使用者代码中的此错误,或如何仅在提交后获取偏移量****
我设置了3个节点汇流/Kafka都指向同一个动物园管理员 所有3台服务器都已播发。Listener=带有明文的公共ipv4 当我运行消费者py客户机时,它只是保持打开状态,没有得到任何消息来澄清上面我在网上找到的测试代码,我没有编写它,因为我还在学习Kafka API
我从Spring Boot应用程序向Kafka发送消息 application.properties 配置 在日志中,我可以看到如下消息: SUCCESS: SendResult[producerRecords=产品记录(主题=uniqTopic123,分区=null,标头=RecordHeaders(标头 = [], isReadOnly=true),key=testKey,value=Test
我对非常陌生,我们正在使用。 我需要做的是使用来自主题的消息。为此,我必须用Java编写一个消费者,它将消费来自主题的消息,然后将该消息保存到数据库。保存消息后,将向Java消费者发送一些确认。如果确认为true,则应使用主题中的下一条消息。如果AcknowlDement为false(这意味着由于某些错误消息,从主题读取的信息无法保存到数据库中),则应再次读取该消息。 我认为我需要使用<code>