当前位置: 首页 > 知识库问答 >
问题:

Apache Beam Kinesio Java-使用kinesis流中的数据

鲁光霁
2023-03-14

首先,我想说这对Beam世界来说是全新的。我正在处理一个以Apache Beam为中心的任务,我的主要数据源是Kinesis流。在那里,当我使用流数据时,我注意到当我重新启动程序(我的消费者应用程序)时,相同的数据集也会出现。这是我的代码,

    String awsStreamName = KinesisStream.getProperty("stream.name");
    String awsAccessKey = KinesisStream.getProperty("access.key");
    String awsSecretKey = KinesisStream.getProperty("secret.key");
    String awsRegion = KinesisStream.getProperty("aws.region");
    Regions region = Regions.fromName(awsRegion);

    return KinesisIO.read()
            .withStreamName(awsStreamName)
            .withInitialPositionInStream(InitialPositionInStream.LATEST)
            .withAWSClientsProvider(awsAccessKey, awsSecretKey, region);

简单地说,我想要的是,我需要从我之前阅读的地方开始阅读数据。如果有人能提供一些资源,我真的很感激。

我还发现了一个类似的问题,但它对我没有帮助-Apache Beam Kinesis IOJava处理管道-应用程序状态,错误处理

共有1个答案

顾昊穹
2023-03-14

波束中的无界震源,如Kinesio。read()支持在重新启动应用程序后使用检查点标记从最新的检查点恢复检查点。

这些检查点必须持久化到持久存储。但是,具体如何完成取决于您正在使用的Beam运行程序,例如Dataflow、Apache Flink或Apache Spark。

我建议在检查点上阅读相应运行时的留档,并检查相应Beam运行程序的管道选项。

例如,在Apache Flink的情况下,您必须通过check point ingInterval(Flink PipelineOptions)启用检查点,并在Flink中配置检查点。

 类似资料:
  • 我正在测试Apache Flink(使用v1.8.2)从Kinesis Data Stream读取消息的速度。Kinesis Data Streams仅包含一个分片,它包含40,000条消息。每个消息大小小于5 KB。 尝试使用TRIM\u HORIZON从最旧的消息中读取流,我希望该应用程序能够快速读取所有消息,因为通过GetRecords,每个碎片可以支持高达每秒2 MB的最大总数据读取速率。

  • 我试图使用类似于https://github.com/aws-sample/amazon-kinesis-learning的Kinesis客户端库来使用Kinesis数据流。但在这个例子中,他们计划了这个过程。我想消费没有调度器传入的记录。 我不想使用DynamoDB,CloudWatch。期望一个简单的使用者使用流中的记录 有没有什么方法可以在没有调度程序的情况下使用java处理记录

  • 我想制作下面的数据发送架构。 生产商-- 消费者服务器可以关闭,因此我认为应该至少有两个消费者。是这样吗? 当一个数据流有两个使用者时,是否有任何方法可以处理每个使用者一半的数据?正如我所知,这是不可能的。如果每个消费者都使用相同的数据,那就是浪费时间和成本。因为我只为高可用性提供了两个消费者。(用于故障切换) 在web was体系结构中,ELB或L4可以通过负载平衡将一半数据发送到每个was服务

  • 我试图从覆盆子派流视频使用aws kinesis视频流。我们在Aws站点上使用了C++sdk(https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-cpp) [错误][19-04-2020 19:20:33:859.598 GMT]createKinesisVideoStreamSync():未能创建Kinesis

  • Serverless 适合用于事件驱动型应用,以及定时任务。今天,让我们来看看一个事件驱动的例子。 在之前的那篇《Serverless 应用开发指南:CRON 定时执行 Lambda 任务》中,我们介绍了如何调度的示例。 最初我想的是通过 Lambda + DynamoDB 来自定义数据格式,后来发现使用 Kinesis Streams 是一种更简单的方案。 Amazon Kinesis Stre

  • 我正试图成为一个动觉消费者客户。为了解决这个问题,我阅读了《Kinesis开发人员指南》和AWS文档http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app-java.html. 我想知道是否有可能从两个不同的流中获取数据并进行相应的处理。 假设我有两个不同的流,分别是流1和流