当前位置: 首页 > 知识库问答 >
问题:

使用Kinesis客户端库(KCL 2.x)将多个使用者连接到Kinesis流

汝志
2023-03-14

我有一个使用KCL 2. x从Kinesis消费记录的应用程序,不同流分片中存在的数据格式不同,我想使用具有不同配置的不同KCL消费者单独处理它们。例如,如果我在Kinesis流中有3个分片,我将为不同的分片生成3个具有不同配置的不同消费者,即每个分片1个消费者。当我创建多个消费者时,我面临的问题是,如果我通过java Code配置3个不同的消费者,那么他们中的任何一个都在所有分片上出现租赁锁,而其他消费者无法获得该租约。例如总分片:3,总消费者配置:3,

Application logs :

[2020-07-13 18:55:50,549] (LeaseCoordinator-0000) INFO Worker application-test-stream saw 3 total leases, 3 available leases, 1 workers. Target is 3 leases, I have 0 leases, I will take 3 leases (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:397)
[2020-07-13 18:55:50,549] (LeaseCoordinator-0002) INFO Worker application-test-stream saw 3 total leases, 3 available leases, 1 workers. Target is 3 leases, I have 0 leases, I will take 3 leases (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:397)
[2020-07-13 18:55:50,554] (Thread-22) INFO Initialization complete. Starting worker loop. (software.amazon.kinesis.coordinator.Scheduler:238)
[2020-07-13 18:55:50,842] (LeaseCoordinator-0004) INFO Worker application-test-stream saw 3 total leases, 3 available leases, 1 workers. Target is 3 leases, I have 0 leases, I will take 3 leases (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:397)
[2020-07-13 18:55:51,452] (LeaseCoordinator-0000) INFO Worker application-test-stream successfully took 3 leases: shardId-000000000002, shardId-000000000001, shardId-000000000000 (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:203)
[2020-07-13 18:55:51,457] (LeaseCoordinator-0002) INFO Worker application-test-stream failed to take 3 leases: shardId-000000000002, shardId-000000000001, shardId-000000000000 (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:208)
[2020-07-13 18:55:51,757] (LeaseCoordinator-0004) INFO Worker application-test-stream failed to take 3 leases: shardId-000000000002, shardId-000000000001, shardId-000000000000 (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:208)

我如何配置我的KCL消费者,以便消费者只能租用分配给他的碎片。

共有1个答案

漆雕奇逸
2023-03-14

通常,Kinesis消费者应该是流级别的消费者——例如,每个消费者都使用流中的所有分片。在您的情况下,您可能会忽略/跳过记录处理器中您不关心的记录,具体取决于分片。没有开箱即用的配置选项来仅使用特定的分片。

 类似资料:
  • 我想了解何时从worker调用IRecordProcessor的processRecords方法。如果我之前对processRecords的调用尚未完成,那么worker会调用下一个processRecords吗?worker将开始从kinesis获取新记录,还是等待当前记录完成执行。 基本上,如果processRecords在外部db中保存记录时出现异常,我会等待很长时间,因为db关闭或出现其他

  • 为什么我发现KCL与AWS Lambda一起使用的例子如此之少。https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html 它确实提供了一个很好的实现来跟踪您在流中的位置(检查点)。 我想使用KCL作为消费者。我的设置是一个具有多个碎片的流。每个碎片上都有一个Lambda在消耗。我想使用Lamb

  • 根据doc Dynamodb streams和Kinesis data streams,低级API是相似的,但它们并非完全相同。 我注意到Dynamodb流的GetShardIterator有点不同,即它不支持AT_TIMESTAMP作为分片迭代器类型。 所以,我 我的推理正确吗?我还没有实现它。如果这似乎是一个阻碍点,我宁愿寻找另一个解决方案。

  • 我可以从我的机器发送一个RTSP视频流到亚马逊Kinesis视频流。我想知道是否有可能从一个边缘设备发送多个RTSP视频流(多个生产者)?目前我关注的文档是https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/examples-gstreamer-plugin.html#examples-gstreamer-plugin-docker。

  • 问题内容: 是否可以使用X- Pack通过HTTPS连接到ElasticSearch?切换为使用证书后,自己的连接方式不再起作用。我使用证书没有问题,但是我需要知道从哪里获得证书或将密钥上传到云实例,但是我无法在任何地方找到信息。我也没有收到论坛或IRC中任何人的答复。 有人成功做到了吗?启动5.x实例时不再有警告,因此我认为这是可能的,但是我只是不知道该怎么做。还告诉我,我无法再启动2.4.1实

  • 根据AWS文件: worker使用Java ExecutorService任务调用记录处理器方法。如果任务失败,工作进程将保留对记录处理器正在处理的碎片的控制。工作进程启动一个新的记录处理器任务来处理该碎片。有关详细信息,请参阅阅读节流。 根据AWS文件的另一页: Kinesis客户端库(KCL)依靠您的进程记录代码来处理处理数据记录时出现的任何异常。从进程记录抛出的任何异常都被KCL吸收。为了避