当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流式Kafka偏移管理

万俟均
2023-03-14

我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreamsStream.AsInstanceOf[CanCommitOffsets].CommitAsync(offsetRanges)工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现?

我知道使用.option(“checkpointLocation”,checkpointLocation)进行hdfs检查点,但我对内置的偏移量管理感兴趣。

我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

共有1个答案

米树
2023-03-14

我正在使用在某处找到的这段代码。

public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {

        try {

            FileWriter writer = new FileWriter(storageName(topic, partition), false);

            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return he last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {

        try {

            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));

            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;

        } catch (Exception e) {
            e.printStackTrace();
        }

        return 0;
    }

    private String storageName(String topic, int partition) {
        return "Offsets\\" + storagePrefix + "-" + topic + "-" + partition;
    }

}

SaveOffset...在记录处理成功后调用,否则不存储偏移量。我使用Kafka主题作为源,所以我指定startingoffsets作为从readoffsets中检索到的偏移量...

 类似资料:
  • 背景:我写了一个简单的spark结构化蒸app,把数据从Kafka搬到S3。我发现,为了支持一次准确的保证,spark创建了_spark_metadata文件夹,但该文件夹最终变得太大,当流应用程序运行很长时间时,元数据文件夹变得太大,以至于我们开始出现OOM错误。我想摆脱Spark结构化流的元数据和检查点文件夹,自己管理偏移量。 我们如何管理Spark Streaming中的偏移量:我使用了va

  • 我正在使用spark structured streaming(2.2.1)来消费来自Kafka(0.10)的主题。 我的检查点位置设置在外部HDFS目录上。在某些情况下,我希望重新启动流式应用程序,从一开始就消费数据。然而,即使我从HDFS目录中删除所有检查点数据并重新提交jar,Spark仍然能够找到我上次使用的偏移量并从那里恢复。偏移量还在哪里?我怀疑与Kafka消费者ID有关。但是,我无法

  • 我正在使用Spark 2.2上的Spark结构化流媒体将文件从HDFS目录流式传输到Kafka主题。我想为我写的主题数据捕捉Kafka偏移量。 我正在使用 给Kafka写信。 当我利用 为了捕获流的进度信息,检索到的信息与Kafka中创建的偏移量不相关。 我假设这是因为流提供的信息实际上是关于我正在使用的文件流的,而与Kafka中编写的内容无关。 有没有一种Spark Structure流式处理方

  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • Spark(v2.4)程序功能: 在spark中以结构化流模式从Kafka队列读取JSON数据 按原样在控制台上打印读取的数据 问题获取: -获取

  • 批处理查询中似乎不支持“最新”。我想知道是否有可能用另一种方法做类似的事情(不直接处理偏移)