当前位置: 首页 > 知识库问答 >
问题:

重新启动Spark Structured Streaming Job会消耗数百万条Kafka消息并死亡

居琛
2023-03-14

我们有一个运行在Spark2.3.3上的Spark流应用程序

基本上,它开启了一条Kafka流:

  kafka_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "mykafka:9092") \
  .option("subscribe", "mytopic") \
  .load()
   stream = apply_operations(kafka_stream)
   stream.writeStream \
   .format("our.java.sink.Class") \
   .option("some-option", "value") \
   .trigger(processingTime='15 seconds') \
   .start()
   spark.streams.awaitAnyTermination()

我们尝试:

>

  • spark.streaming.backpressure.enabled=true以及spark.streaming.backpressure.initialrate=2000和spark.streaming.kafka.maxratePerpartition=1000和spark.streaming.receiver.maxrate=2000

    将spark.streaming.backpressure.pid.minrate设置为较低的值也没有效果

    设置选项(“maxoffsetspertrigger”,10000)也没有效果

    现在,在我们重新启动管道之后,迟早整个Spark作业会再次崩溃。我们不能简单地扩大内存或内核来用于spark工作。

    在控制一个流批处理中处理的事件的数量方面,我们是否遗漏了什么?

  • 共有1个答案

    程英资
    2023-03-14

    您在注释中写道,您使用的是spark-streaming-kafka-0-82.11,并且该api版本不能处理maxOffsetPerTrigger(或据我所知的任何其他减少已消耗消息数量的机制),因为它只是为较新的api spark-streaming-kafka-0-102.11实现的。根据文档,这个较新的api也适用于您的kafka版本0.10.2.2。

     类似资料:
    • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:

    • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

    • 我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,Kafka listenser使用来自main topic、Retry topic和DLT topic的消息,我们希望侦听器仅使用来自main和Retry topic的消息。 有没有简单的方法来进行设置? 因为我们不希望同一个消费者处理DLT消息。DLT还将被另一个进程使用,以发送请求通知。

    • 出身背景 我们有一个简单的生产者/消费者风格的应用程序,Kafka作为消息代理,消费者流程作为Kubernetes pods运行。我们定义了两个话题,即话题内话题和话题外话题。属于同一个消费者组的一组消费者pod从主题内读取消息,执行一些工作,并最终在工作完成后将相同的消息(密钥)写出到主题外。 问题描述 我们注意到,在 Kubernetes pod 中运行的消费者向外主题写出了重复的消息。换个说

    • 主要内容:1 并发消费重试,1.1 失败重试,1.2 超时重试,2 顺序消费重试,2.1 失败重试,2.2 超时重试,3 broker处理回退请求,3.1 asyncConsumerSendMsgBack处理回退请求,3.2 handleRetryAndDLQ处理重试和死信消息基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer消费者重试消息和死信消息源码。 消费重试:并发消费和顺序消费对于消费失败的消息均会有消息重试机制。 1 并发消费重试

    • ActiveMQ中的持久主题(这似乎是JMS本身的一个障碍)似乎是一个订阅服务器上只能有一个使用者活动。 也就是说,在ActiveMQ文档中: 使用唯一的JMS clientID和持久订阅服务器名称创建JMS持久订阅服务器MessageConsumer。为了符合JMS,对于一个JMS clientID,在任何时间点只能有一个JMS连接处于活动状态,对于一个clientID和订阅者名称,只能有一个使