我正在使用来探讨一个Kafka主题。 断断续续地,我会收到以下错误消息,然后是2个警告: 它在警告日志中建议: 这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减小poll()中返回的批的最大大小来解决这一问题。 因此,我需要增加或减少。
我有一个批处理作业,它将一天触发一次。要求是 使用该时间点上关于Kafka主题的所有可用消息 处理消息 如果进程已成功完成,则提交偏移量。 当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafk
为了使消费者保持活动(非常长的可变长度处理),我在后台线程中实现了一个空的poll()调用,如果我在poll()之间花费了太多时间,它将防止代理重新平衡。我已经将我的轮询间隔设置得很长,但我不想为了越来越长的处理而一直增加它。 什么是正确的投票没有记录?当前我正在调用poll(),然后重新查找poll call()中返回的每个分区的最早偏移量,这样主线程在处理完前面的消息后就可以正确地读取这些偏移
我当前运行的是Kafka0.10.0.1,两个值的对应文档如下: heartbeat.interval.ms-在使用Kafka的组管理设施时,向消费者协调器发送心跳的预期间隔时间。heartbeat用于确保消费者的会话保持活动,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于session.timeout.ms,但通常不应设置为高于该值的1/3。可以调得更低,以控制正常再平衡的预期时间。
我们有一个问题,有时调用‘轮询’方法的新KafkaConsumer挂起长达20到30分钟后,三个kafka经纪人中的一个得到重新启动! 我们使用的是3 broker kafka设置(0.9.0.1)。我们的消费者进程使用新的Java KafkaConsumer-API,并且我们将分配给特定的TopicPartition。 由于不同的原因我不能在这里展示真正的代码,但基本上我们的代码是这样工作的:
https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html提到,“只要使用者定期发送心跳,它就被认为是活动的、良好的,并且正在处理来自其分区的消息。事实上,轮询消息的行为是导致使用者发送这些心跳的原因。如果使用者停止发送心跳的时间足够长,它的会话将超时,组协调器将认为它已死亡
在火花批处理作业中,我通常将JSON数据源写入文件,并可以使用DataFrame阅读器的损坏列功能将损坏的数据写入单独的位置,并使用另一个阅读器从同一作业中写入有效数据。(数据写成拼花) 但是在火花结构流中,我首先通过Kafka作为字符串读取流,然后使用from_json来获取我的数据帧。然后from_json使用JsonToSTRts,它在解析器中使用FailFast模式,并且不会将未解析的字符
我正在尝试为我的Spark Batch工作检索Kafka补偿。在检索偏移量之后,我想关闭流上下文。 我尝试将streamlistener添加到流上下文,并在作业完成后实现onBatchCompleted方法关闭流,但收到异常“无法停止侦听器总线线程内的StreamingContext”。 有解决办法吗?我正在尝试检索偏移量以调用KafkaUtils。createRDD(sparkContext、k
spring . cloud . stream . Kafka . binder . zknodes是必须的吗?如果价值缺失会发生什么?
我有一个简单的Spring云kafka流应用程序。每次有异常时,应用程序都会终止,我无法覆盖此行为。当有某些类型的异常时,所需的结果是增量退回,或者继续其他类型的异常。我使用和 application.yaml 豆子 事件处理器 继续错误处理程序
我刚开始学Kafka,Kafka-蟒蛇。在下面的代码中,我试图在消息到达时读取它们。但出于某种原因,消费者似乎要等到一定数量的消息积累后才能获取它们。 我最初以为是因为正在批量出版的制片人。当我运行“kafka-console-consumer--bootstrap-servers--topic”时,我可以看到发布后收到的每一条消息(就像在consumer控制台上看到的那样) 有人能指出用KafK
java 我正在使用控制中心来检查这个主题的消费者,并跟踪正在消费的数据。在运行这个应用程序时,它与Kafka和所有分区都连接得很好,我可以在控制中心看到所有的数据都被提取了,但在我的java控制台中没有打印任何数据。但是我注意到,在向Kafka发送一些新数据时,它会在java控制台中打印出来(即,在运行我的消费者后将新数据发送给Kafka)。它应该是这样的吗?还是我做错了什么?根据我的理解,Ka
我有一个应用程序,是基于Spring启动,SpringKafka和Kafka流。当应用程序启动时,它会创建带有默认主题列表的kafka流拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时,有新的主题名称出现,我想将此主题添加到我的拓扑结构中。目前,我正在考虑以某种方式删除现有的拓扑,关闭并清理KafkaStreams,在创建拓扑但使用新主题名称的地方运行逻辑,并再次启动Kaf
我想启动一个反序列化动态创建的类的流。这个Bean是使用反射和URLCLassLOader创建的,参数为给定的字符串类,但是KafkaStreams API不识别我的新类。 流与预创建的Beans完美配合,但使用动态Beans时会自动关闭。反倾销者是和杰克逊一起创造的,也是单独工作的。 下面是类解析器代码 首先,我实例化接收类作为参数的serde 然后启动使用此Serdes的流拓扑 流拓扑应该正常