当前位置: 首页 > 知识库问答 >
问题:

在第一批处理后关闭Spark流式处理上下文(尝试检索kafka偏移量)

东郭自珍
2023-03-14

我正在尝试为我的Spark Batch工作检索Kafka补偿。在检索偏移量之后,我想关闭流上下文。

我尝试将streamlistener添加到流上下文,并在作业完成后实现onBatchCompleted方法关闭流,但收到异常“无法停止侦听器总线线程内的StreamingContext”。

有解决办法吗?我正在尝试检索偏移量以调用KafkaUtils。createRDD(sparkContext、kafkaProperties、OffsetRange[],LocationStrateg)

private OffsetRange[] getOffsets(SparkConf sparkConf) throws InterruptedException {
    final AtomicReference<OffsetRange[]> atomicReference = new AtomicReference<>();

    JavaStreamingContext sc = new JavaStreamingContext(sparkConf, Duration.apply(50));
    JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(sc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(Arrays.asList("test"), getKafkaParam()));
    stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
                atomicReference.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
                // sc.stop(false); //this would throw exception saying consumer is already closed
            }
    );
    sc.addStreamingListener(new TopicListener(sc)); //Throws exception saying "Cannot stop StreamingContext within listener bus thread."
    sc.start();
    sc.awaitTermination();
    return atomicReference.get();
}



public class TopicListener implements StreamingListener {
private JavaStreamingContext sc;

public TopicListener(JavaStreamingContext sc){
    this.sc = sc;
}
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted streamingListenerBatchCompleted) {
    sc.stop(false);
}

非常感谢Stackers:)我曾尝试寻找可能的解决方案,但迄今为止没有成功

编辑:我使用KafkaConsumer来获取分区信息。获得分区信息后,我创建一个TopicPartition pojo列表,并调用position和endOffsets方法分别获取我的groupId的当前位置和end位置。

final List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor("theTopicName");
final List<TopicPartition> topicPartitions = new ArrayList<>();
partitionInfos.forEach(partitionInfo -> topicPartitions.add(new TopicPartition("theTopicName", partitionInfo.partition())));
final List<OffsetRange> offsetRanges = new ArrayList<>();
kafkaConsumer.assign(topicPartitions);
topicPartitions.foreach(topicPartition -> {
    long fromOffset = kafkaConsumer.position(topicPartition);
    kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
    long untilOffset = kafkaConsumer.position(topicPartition);
    offsetRanges.add(new OffsetRange(topicPartition.topic(), topicPartition.partition(), fromOffset, untilOffset));
});
return offsetRanges.toArray(new OffsetRange[offsetRanges.size()]);

共有1个答案

缪志新
2023-03-14

如果你想控制流,你可以考虑使用轮询而不是流api。这样,一旦你的目标达到,你就可以清楚地停止投票。

另外看看这个...

https://github.com/dibbhatt/kafka-spark-consumer

 类似资料:
  • 我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087

  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 在 Spark 流式处理中,如何检测空批次? 让我们以有状态流式处理字数为例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java。是否可以仅在将新单词添加到流中时才打印字数RDD

  • 对于传入记录,我需要验证值,并且基于结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用context.forward()转发相同的错误。可以使用本链接中提供的DSL来完成 现在,调用者再次需要检查并根据键来区分接收器主题。我使用processorAPI是因为我需要use头。 编辑: 当条件为false时,如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记

  • 我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。

  • 我正在使用Spring Cloud Stream和Kafka Binder批量消费来自一个Kafka主题的消息。我正在尝试实现一个错误处理机制。根据我的理解,我不能在批处理模式下使用Spring Cloud Stream的< code>enableDLQ属性。 我找到了和,以重试并从spring-kafka文档发送失败消息。但我无法理解如何按照功能编程标准将记录发送到自定义DLQ主题。我看到的所有