当前位置: 首页 > 知识库问答 >
问题:

使用AWS处理DynamoDB流JavaDynamoDB流Kinesis适配器

易波涛
2023-03-14

我试图捕获DynamoDB表更改使用DynamoDB流和AWS提供JavaDynamoDB流Kinesis适配器。我正在Scala应用程序中使用AWSJavaSDK。

我从遵循AWS指南和通过AWS发布的代码示例开始。然而,我在让亚马逊自己发布的代码在我的环境中工作方面遇到了问题。我的问题在于Kinesis ClientLibConfiguration对象。

在示例代码中,使用DynamoDB提供的流ARN配置了Kinesis ClientLibConfiguration

new KinesisClientLibConfiguration("streams-adapter-demo",
    streamArn, 
    streamsCredentials, 
    "streams-demo-worker")

我在Scala应用程序中遵循了类似的模式,首先从Dynamo表中查找当前ARN:

lazy val streamArn = dynamoClient.describeTable(config.tableName)
.getTable.getLatestStreamArn

然后使用提供的ARN创建Kinesis ClientLibConfiguration

lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
  "testProcess",
  streamArn,
  defaultProviderChain,
  "testWorker"
).withMaxRecords(1000)
   .withRegionName("eu-west-1")
   .withMetricsLevel(MetricsLevel.NONE)
  .withIdleTimeBetweenReadsInMillis(500)
  .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

我已经验证了提供的流ARN,所有内容都与我在AWS控制台中看到的内容相匹配。

在运行时,我最终得到一个异常,说明提供的ARN不是一个有效的流名称:

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation     
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at 
'streamName'    failed to satisfy constraint: Member must satisfy regular 
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code: 
400; Error Code: ValidationException; Request ID: )

查看KinesisClientLibConfiguration上提供的文档,这是有意义的,因为第二个参数列为streamName,没有提到ARN。

我似乎在kinisclientlibconfiguration上找不到任何与ARN相关的内容。因为我使用的是DynamoDB流,而不是Kinesis流,所以我也不确定如何找到我的流名称。

在这一点上,我不确定我在发布的AWS示例中遗漏了什么,看起来他们可能使用了更旧版本的KCL。我使用的是amazon kinesis客户端的1.7.0版。

共有3个答案

柏高丽
2023-03-14

只是想回答问题所在-您提供ARN时,它只需要流名称。

翟丰茂
2023-03-14

或者,您可以使用com。亚马逊。服务。发电机BV2。流线捕捉器。StreamsWorker而不是com。亚马逊。服务。运动。客户图书馆。自由党。工人Worker内部使用AmazondynamodStreamAdapterClient

即。

lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
    getClass.getName, //DynamoDB shard lease table name
    streamArn, //pulled from the dynamo table at runtime
    dynamoCredentials, //DefaultAWSCredentialsProviderChain 
    KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
 .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig)
池阳伯
2023-03-14

这个问题实际上超出了我的kinisclientlibconfiguration

我能够通过使用相同的配置并提供DynamoDB流适配器库中包含的流适配器以及DynamoDB和CloudWatch的客户端来解决这个问题。

我的工作解决方案现在看起来是这样的。

定义Kinesis客户端配置。

//Kinesis config for DynamoDB streams
lazy val kinesisConfig :KinesisClientLibConfiguration =
    new KinesisClientLibConfiguration(
        getClass.getName, //DynamoDB shard lease table name
        streamArn, //pulled from the dynamo table at runtime
        dynamoCredentials, //DefaultAWSCredentialsProviderChain 
        KeywordTrackingActor.NAME //Lease owner name
    ).withMaxRecords(1000) //using AWS recommended value
     .withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

定义流适配器和CloudWatch客户端

val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
streamAdapterClient.setRegion(region)

val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
cloudWatchClient.setRegion(region)

创建一个RecordProcessorFactory的实例,由您来定义一个类来实现KCL提供的IRecordProcessorFactory和返回的IRecord处理器

val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)

而我丢失的部分,所有这些都需要提供给你的工人。

val worker :Worker =
  new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(kinesisConfig)
    .kinesisClient(streamAdapterClient)
    .dynamoDBClient(dynamoClient)
    .cloudWatchClient(cloudWatchClient)
    .build()

//this will start record processing
streamExecutorService.submit(worker)
 类似资料:
  • 从这个问题开始,AWS DynamoDB流进入红移 DynamoDB-- 如何配置我的Kinesis函数以获取Lambda函数源? 我创建了一个DynamoDB表(购销),并添加了DynamoDB流。然后,我配置Lambda函数来拾取DynamoDB流。我的问题是如何配置Kinesis以获取Lambda函数源?我知道如何配置Lambda转换,不过我想把它作为源代码。不确定如何配置下面的直接输入源。

  • 我目前正在使用DynamoDB流,并期待着转向Kinesis流,因为我想控制我喜欢从流中处理的记录数量。 我一直在读有关Kinesis流和lambda的文章。有很多关于Kinesis流和EC2的多用户和KCL等的文章。 null

  • 我们希望将数据从DynamoDB非关系型数据库作为流连续移动到红移数据库。我很难理解AWS中的所有新术语/技术。有 1) DynamoDB流 2)AWS Lambda 3) AWS Kinesis消防水带 有人能提供每一个的简要总结吗。什么是DynamoDB流?这与亚马逊运动有何不同?在阅读了所有的资源后,这是我对假设的理解,请在下面进行验证。 (a)我假设DynamoDB Streams,创建非

  • 我是AWS的新手。我已经使用Java在aws中实现了一些功能。我的要求是一次向RDS PostgreSQL实例插入50MB的csv。 我尝试了aws lmabda服务。但是5分钟后,lambda会停止,所以我就这样放弃了。(lambda函数的限制) 第二步,我编写了s3事件的java lambda代码,它将使用putrecord命令将s3上的csv文件读到kinesis流中。根据我的理解,kine

  • 希望创建一个DynamoDB全局表来存储客户信息。我的问题是,我目前的模式是监听此表上的更改,并使用Lambda触发器发送电子邮件更新。 i、 e.您的个人资料信息已更改。如果不是你。。 我现在是否需要在每个区域中使用该Lambda?数据复制是否意味着每个区域都会触发该Lambda?