当前位置: 首页 > 知识库问答 >
问题:

在python中消耗运动流

苏麒
2023-03-14

我似乎找不到一个像样的例子来说明如何通过Python使用AWS动态流。有人能给我举几个例子吗?

最好的

共有2个答案

佘缪文
2023-03-14

虽然这个问题已经得到了回答,但对于未来的读者来说,考虑使用Python的Kinesis客户端库(KCL)而不是直接使用boto可能是个好主意。当您有多个消费者实例和/或更改分片配置时,它简化了从流中的消费。

https://aws.amazon.com/blogs/aws/speak-to-kinesis-in-python/

更完整地列举KCL提供的内容

  • 连接到流
  • 枚举碎片
  • 与其他工作人员(如果有)协调碎片关联
  • 为其管理的每个碎片实例化一个记录处理器
  • 从流中提取数据记录
  • 将记录推送到相应的记录处理器
  • 检查点处理记录(它使用DynamoDB,因此您的代码不必手动保留检查点值)
  • 当工作者实例计数更改时,平衡碎片工作者关联
  • 分割或合并碎片时平衡碎片工作者关联

粗体的项目是我认为KCL在boto上真正提供了非平凡价值的项目。但是,根据您的用例,boto可能要简单得多。

奚修伟
2023-03-14

你应该使用boto。运动:

from boto import kinesis

创建流后:

第1步:连接到aws kinesis:

auth = {"aws_access_key_id":"id", "aws_secret_access_key":"key"}
connection = kinesis.connect_to_region('us-east-1',**auth)

第2步:获取流信息(例如有多少分片,如果它是活动的...)

tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)
    try:
        response = connection.describe_stream('stream_name')   
        if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
            break 
    except :
        logger.error('error while trying to describe kinesis stream : %s')
else:
    raise TimeoutError('Stream is still not active, aborting...')

步骤3:获取所有碎片id,并为每个共享id获取碎片迭代器:

shard_ids = []
stream_name = None 
if response and 'StreamDescription' in response:
    stream_name = response['StreamDescription']['StreamName']                   
    for shard_id in response['StreamDescription']['Shards']:
         shard_id = shard_id['ShardId']
         shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
         shard_ids.append({'shard_id' : shard_id ,'shard_iterator' : shard_iterator['ShardIterator'] })

步骤4:读取每个碎片的数据

limit是您希望接收的记录的限制。(最多可以接收10 MB)shard_迭代器是上一步的共享html" target="_blank">迭代器。

tries = 0
result = []
while tries < 100:
     tries += 1
     response = connection.get_records(shard_iterator = shard_iterator , limit = limit)
     shard_iterator = response['NextShardIterator']
     if len(response['Records'])> 0:
          for res in response['Records']: 
               result.append(res['Data'])                  
          return result , shard_iterator

在下一次打电话给get_records时,您应该使用上一次get_records结果中收到的shard_iterator。

注意:在一次获取_记录的调用中,(limit=None),您可以收到空记录。如果调用get_记录时有限制,您将获得相同分区键中的记录(当您将数据放入流时,您必须使用分区键:

connection.put_record(stream_name, data, partition_key)
 类似资料:
  • 问题内容: 我想使用win32com扩展来实现python com服务器。然后从.NET内部使用服务器。我使用以下示例来实现com服务器,并且该服务器可以正常运行,但是当我尝试使用C#对其进行使用时,出现了FileNotFoundException并显示以下消息“为具有CLSID {676E38A6-7FA7-4BFF-9179的组件检索COM类工厂” -AE959734DEBB}由于以下错误而失

  • 我使用KCL编写了一个Amazon Kinesis消费者,KCL管理我的记录处理任务。它当前正在处理记录而不进行筛选。我正在寻找一种方法来处理样本记录,同时跳过其中的一些记录。 例如,如果总共有100条记录,我只想处理其中的1/10(10个样本记录)。 谢谢

  • 我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,Kafka listenser使用来自main topic、Retry topic和DLT topic的消息,我们希望侦听器仅使用来自main和Retry topic的消息。 有没有简单的方法来进行设置? 因为我们不希望同一个消费者处理DLT消息。DLT还将被另一个进程使用,以发送请求通知。

  • 我在spring boot中创建了一些服务,我有11个fat jars,我将它们部署在docker容器中,我怀疑每个jar在没有任何使用的情况下消耗了1到1.5 GB的RAM,我通过运行以下命令来检查RAM: 起初我以为是java容器,我试图改成一个使用alpine的容器,但没有任何变化,所以我认为唯一的问题是我的罐子。有没有办法更改罐子正在使用的 RAM?或者这种行为是正常的,因为每个罐子都有一

  • 我正在开发一个使用Spring Integration 5.0.1和Spring Boot 2.0.0的应用程序。RC1 目前,应用程序响应并运行一些可能需要一段时间才能完成的初始化代码。这不使用任何Spring集成组件。 我还有一些非常基本的集成流,使用JavaDSL编写,并在配置中声明为bean。 有什么方法可以推迟流何时开始消耗消息吗?我希望能够在初始化完成时手动启动它们。 配置似乎是解决方

  • 我有一个Java pc应用程序,一旦加载相应的配置数据,CPU性能就会逐步提高。 基本有2个线程,一个主线程,一个副线程。在主线程中生成一个登录和一个数据加载,而在第二个线程中每10秒生成一次查询。如果会话启动但未加载数据,则第二个线程继续查询但性能最小。CPU消耗的增加是在加载数据后产生的,并且逐渐增加。 此外,如果再次加载数据,则消耗将降至最低,一旦加载,消耗将再次增加。 由于保密问题,我不能