当前位置: 首页 > 知识库问答 >
问题:

从Kinesis中的两个不同流获取数据?

周枫涟
2023-03-14

我正试图成为一个动觉消费者客户。为了解决这个问题,我阅读了《Kinesis开发人员指南》和AWS文档http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app-java.html.

我想知道是否有可能从两个不同的流中获取数据并进行相应的处理。

假设我有两个不同的流,分别是流1和流2。

是否可以分别从流和进程中获取数据?

共有1个答案

葛驰
2023-03-14

为什么不呢?两个溪流都get_records。

如果您的流每个只有一个分片,您还将看到所有事件,因为建议使用单个辅助角色处理每个分片,但如果您的逻辑是以某种方式连接来自不同源/流的事件,您可以使用从两个流读取的单个辅助角色来实现它。

请注意,如果您有包含多个碎片的流,那么您的每个工作人员将只看到事件的一部分。您可以有以下选项:

>

一个流(stream1)具有一个碎片,第二个流(stream2)具有多个碎片-在这种情况下,您可以从stream1中读取所有Worker的数据,这也将分别处理stream2中的单个碎片。您的每个工作人员都将看到stream1的所有事件及其在stream2事件中的份额。请注意,您有一个速度限制,可以使用单个碎片(2MB/秒或5次读取/秒)从stream1读取事件,如果stream2中有许多碎片,这可能是一个真正的限制。

这两个流都可以有多个碎片-在这种情况下,由于需要将写入和读取同步到这些流,因此要确保能够“加入”这些事件将更加复杂。您还可以使用一个worker读取两个流的所有碎片,但这不是一个好的做法,因为它限制了您的扩展能力,因为您不再有分布式系统。另一个选项是在两个流中使用相同的partition\u键,并且两个流的碎片数和分区定义都相同,并验证您正在从每个worker中的每个流的“右”碎片中读取数据,并且每次您的一个worker出现故障并重新启动时,您都在正确执行该操作,这可能有点复杂。

您可以考虑的另一个选项是在单个流中写入这两种类型的事件,再次使用相同的partition\u键,然后如果需要以不同的方式处理它们(例如,将它们写入S3中的不同日志文件),则在读卡器端对其进行过滤。

 类似资料:
  • 我试图使用类似于https://github.com/aws-sample/amazon-kinesis-learning的Kinesis客户端库来使用Kinesis数据流。但在这个例子中,他们计划了这个过程。我想消费没有调度器传入的记录。 我不想使用DynamoDB,CloudWatch。期望一个简单的使用者使用流中的记录 有没有什么方法可以在没有调度程序的情况下使用java处理记录

  • 我有一个Kinesis生产者,它将单一类型的消息写入流。我想在多个完全不同的消费者应用程序中处理这个流。因此,给定主题/流的具有单个发布者的发布/订阅。我还想利用检查点来确保每个消费者处理写入流的每条消息。 最初,我对所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动多个消费者,我就开始收到以下错误: 通用域名格式。amazonaws。服务。运动。模型InvalidArgumentExce

  • 我试图在lambda函数中的特定时间戳之后从动觉流中读取记录。我得到碎片,碎片迭代器,然后是数据<当我得到第一个迭代器时,我得到数据,并使用NextShardIterator(返回的数据中存在)递归地调用同一个函数。根据文档,当没有更多数据可读取且达到$latest时,NextShardIterator将返回null<但它从不返回null,函数不断被调用,最终我得到了配置吞吐量超过异常<我还尝试使

  • 我想从Amazon Kinesis流中获取最新记录。我打算从中提取时间戳,并将其与消费者应用程序检查指向的最后一条记录的时间戳进行比较,以检查消费者是否落后。 我不能使用最新的shard迭代器类型。这是因为LATEST指向最近的记录之后,因此它不能用于访问最近的记录。 有没有简单的方法可以获得最新记录? 我正在考虑的一种方法是获取消费者最近处理的记录序列号的碎片迭代器,使用该碎片迭代器发出GetR

  • 我有一个登录表单,用户在其中输入用户名和密码。每个用户都有自己的级别分配和分区。 我主要担心的是,我的代码无法确定当前登录的用户是谁,因此无法获取该代码的gradeassign和sectionassign,因此将显示来自tbl_学生的所有数据。 希望你能帮我。谢谢 这是桌子的结构 tbl_用户 清华大学学生 如果用户XXX登录,则获取的结果和日期必须为: 这是用户登录会话的代码。 和Pass='“

  • 现在,我正在Amazon Kinesis视频流的客户端工作,使用Video.js和HTTP Streaming来显示视频。 然而,在stream server上,每个片段都有一些元数据(仅限文本)(如以下链接:https://aws.amazon.com/about-aws/whats-new/2018/10/kinesis-video-streams-fragment-level-metadat