当前位置: 首页 > 知识库问答 >
问题:

如何扇出AWS kinesis流?

空枫涟
2023-03-14

共有1个答案

司徒宇
2023-03-14

有两种方法可以实现Amazon Kinesis流的扇出:

  • 使用Amazon Kinesis Analytics将记录复制到其他流
  • 触发AWS Lambda函数将记录复制到另一个流

选项1:使用Amazon Kinesis Analytics进行扇出

我设法实现了这一点,如下所示:

  • 创建了三个流:input、output1、output2
  • 创建了两个Amazon Kinesis Analytics应用程序:copy1、copy2

Amazon Kinesis Analytics SQL应用程序如下所示:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";
#!/usr/bin/env python

import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
i = 0

while True:
  data={}
  data['log'] =  'Record ' + str(i)
  i += 1
  print data
  kinesis.put_record("input", json.dumps(data), "key")
  time.sleep(2)
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]
[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']

选项2:使用AWS Lambda

如果要向多个流展开,一个更有效的方法可能是创建一个AWS Lambda函数:

  • 由Amazon Kinesis流记录触发
  • 将记录写入多个Amazon Kinesis“输出”流
 类似资料:
  • 本文向大家介绍如何找出Linux硬盘上的坏扇区,包括了如何找出Linux硬盘上的坏扇区的使用技巧和注意事项,需要的朋友参考一下 Badblocks是一个Linux实用程序,用于评估磁盘电源上不正常的扇区。它创建了可以与其他应用程序(例如mkfs)一起使用的那些扇区的列表,因此它们在某些时候不会被使用,因此不会破坏数据。本文介绍-–如何找出不良部门。Linux硬盘上的坏块。 要获取有关计算机上的块设

  • 我用鼠标光标制作了一个游戏,我想用绿色版本的图像覆盖光标来表示健康状况,但只有与健康百分比相对应的几何部分。来自以下帖子的解决方案:在java中绘制圆的切片? 简而言之,给定任何角度,如何绘制BuffereImage的扇区?

  • 我能够使用Publish/SubscribeRabbitMQ Java教程创建扇出交换,任何连接的使用者都将收到一个消息的副本。我想在连接任何使用者之前创建交换和绑定,而不是动态/编程地声明交换和绑定。我已经通过RabbitMQ管理控制台完成了这一点。然而,由于某种原因,我的消费者以循环方式接收消息,而不是全部接收消息的副本。我错过了什么?下面是一些代码片段: 发布者: 消费者: ...在Rabb

  • 我使用的是Flink 0.10.0数据流。这是我的要求。 我的源系统是广播消息的自定义系统。在我的自定义SourceFunction实现中,我实现了回调来侦听消息。 每个回调都会得到不同类型的消息。 我想解码/转换在回调中收到的对象发送到我的SinkFunction。我相信我可以用FlatMapFunction或类似的软件来完成。 因为我有各种回调,所以我听每个回调的解码逻辑是不同的。我想不能有一

  • 我最近一直在用芹菜,我不喜欢它。它的配置是混乱的,过于复杂的,并且没有很好的文档记录。 我想从一个生产者向多个消费者发送带有芹菜的广播消息。使我困惑的是芹菜术语和底层传输rabbitMQ的术语之间的差异。 在RabbitMQ中,您可以使用单个扇出交换和多个队列来广播消息: 但在Celery中,术语都搞乱了:这里可以有一个广播队列,它向多个消费者发送消息: 我甚至不明白Celery广播队列应该如何工

  • 在我的应用程序中,我使用spring cloud stream集成Rabbit MQ。默认情况下,spring cloud streams将目标创建为Rabbit MQ中的交换类型主题。如何配置spring cloud stream以创建fanout类型的交换?