当前位置: 首页 > 知识库问答 >
问题:

使用Spark流处理Kafka消息时的挑战

山高峰
2023-03-14
  1. 如果每个Kafka消息属于一个特定的会话,如何管理会话关联,以便同一个Spark执行器看到链接到一个会话的所有消息?
  2. 如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们能以某种方式实现这一点而不对线程计数施加限制并导致处理开销(如按消息时间戳排序)吗?
  3. 何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何从最后一个检查点恢复状态?
  4. 如果节点(执行器/驱动程序)在检查其状态之前崩溃,如何恢复状态?如果Spark通过重放消息重新创建状态RDD,那么它从哪里开始重放Kafka消息:wards上的最后一个检查点,还是处理重新创建分区所需的所有消息?Spark streaming是否可以在多个Spark streaming批处理中恢复状态,或者只对当前批处理恢复状态?例如,如果在上一批处理期间没有执行检查点,是否可以恢复状态?

共有1个答案

墨星鹏
2023-03-14

如果每个Kafka消息属于一个特定的会话,如何管理会话亲和力,以便同一个Spark执行器看到链接到一个会话的所有消息?

Kafka将主题划分为分区,每个分区一次只能由一个使用者读取,因此您需要确保属于一个会话的所有消息都进入同一个分区。分区分配是通过分配给每个消息的密钥来控制的,因此实现这一点的最简单方法可能是在发送数据时使用会话id作为密钥。这样,同一个使用者将获得一个会话的所有消息。但是有一个警告:当一个消费者加入或离开ConsumerGroup时,Kafka将重新平衡分区分配给消费者。如果这种情况发生在会话中期,那么它可能(也将)发生,即在重新平衡之后,该会话的一半消息将传递给一个消费者,另一半将传递给另一个消费者。为了避免这种情况,您需要手动订阅代码中的特定分区,以便每个处理器都有其特定的分区集,并且不更改这些分区。查看ConsumerStrategies.SparkKafka组件代码中的指定代码。

如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们是否可以在不对线程计数施加限制和不增加处理开销的情况下实现这一点(比如按消息时间戳排序)?

我建议阅读Spark Streaming+Kafka集成指南中的偏移存储部分,它应该已经回答了很多问题。

简短的版本是,您可以将上次读取的偏移量保存到Kafka中,并且无论何时检查执行器都应该这样做。这样,每当一个新的执行者开始处理时,不管它是否是从检查点恢复的,它都知道在Kafka中从哪里读取。

如果节点(执行器/驱动程序)在检查其状态之前崩溃,如何恢复状态?如果Spark通过重放消息重新创建状态RDD,那么它从哪里开始重放Kafka消息:wards上的最后一个检查点,还是处理重新创建分区所需的所有消息?Spark streaming可以/是否可以在多个Spark streaming批中恢复状态,或者只对当前批恢复状态,即如果在最后一个批中没有执行检查点,是否可以恢复状态?

 类似资料:
  • 我最近看到了这篇关于Apache Kafka文档的文章,内容涉及如何处理Kafka流中的无序消息 https://kafka.apache.org/21/documentation/streams/core-concepts#streams_out_of_ordering 有人能给我解释一下下面这句话背后的原因吗: 在主题分区中,记录的时间戳可能不会随着它们的偏移量单调地增加。由于Kafka流总是

  • 我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。

  • 我们正在应用程序中使用apache kafka streams 0.10.2.0。我们利用kafka streams拓扑将处理后的数据传递到下一个主题,直到处理结束。 此外,我们使用AWS ECS容器来部署消费者应用程序。我们观察到消费者正在拾取非常旧的消息进行处理,尽管它们已经在更早的时候处理过。这个问题在服务扩展/缩减或新部署时随机发生。我知道在消费者重新平衡时,有些消息可以重新处理。但在这种

  • 我注意到Kafka记录有一个CRC字段。如果日志文件中的记录损坏(例如,消息中间的一个比特被翻转),那么在流应用程序中,我希望看到的是: 该主题被复制 由于我们使用的是Avro,我可以想象以下情况之一: 底层基础设施检测到CRC错误,并从另一个代理获取该错误

  • 我正在使用来使用来自spring-boot应用程序中某个主题的消息,我需要定期运行该应用程序。spring-kafka版本是2.2.4.发行版。

  • 在本公司的最后一个项目中:客户提出身份验证等请求,应用程序第一层得到客户请求并在Kafka上生成消息,核心服务消费该消息后向银行服务提出rest请求,得到响应后在Kafka上生成响应消息,应用程序第一层将消息传递给客户。是真的Kafka用例,还是去掉第一层和Kafka,在客户端和核心之间使用rest服务更好。谢谢