我试图捕获DynamoDB表更改使用DynamoDB流和AWS提供JavaDynamoDB流Kinesis适配器。我正在Scala应用程序中使用AWSJavaSDK。
我从遵循AWS指南和通过AWS发布的代码示例开始。然而,我在让亚马逊自己发布的代码在我的环境中工作方面遇到了问题。我的问题在于Kinesis ClientLibConfiguration
对象。
在示例代码中,使用DynamoDB提供的流ARN配置了Kinesis ClientLibConfiguration
。
new KinesisClientLibConfiguration("streams-adapter-demo",
streamArn,
streamsCredentials,
"streams-demo-worker")
我在Scala应用程序中遵循了类似的模式,首先从Dynamo表中查找当前ARN:
lazy val streamArn = dynamoClient.describeTable(config.tableName)
.getTable.getLatestStreamArn
然后使用提供的ARN创建Kinesis ClientLibConfiguration
:
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
"testProcess",
streamArn,
defaultProviderChain,
"testWorker"
).withMaxRecords(1000)
.withRegionName("eu-west-1")
.withMetricsLevel(MetricsLevel.NONE)
.withIdleTimeBetweenReadsInMillis(500)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
我已经验证了提供的流ARN,所有内容都与我在AWS控制台中看到的内容相匹配。
在运行时,我最终得到一个异常,说明提供的ARN不是一个有效的流名称:
com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask call
SEVERE: Caught exception while sync'ing Kinesis shards and leases
com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation
error detected: Value 'arn:aws:dynamodb:eu-west-1:STREAM ARN' at
'streamName' failed to satisfy constraint: Member must satisfy regular
expression pattern: [a-zA-Z0-9_.-]+ (Service: AmazonKinesis; Status Code:
400; Error Code: ValidationException; Request ID: )
查看KinesisClientLibConfiguration
上提供的文档,这是有意义的,因为第二个参数列为streamName,没有提到ARN。
我似乎在kinisclientlibconfiguration
上找不到任何与ARN相关的内容。因为我使用的是DynamoDB流,而不是Kinesis流,所以我也不确定如何找到我的流名称。
在这一点上,我不确定我在发布的AWS示例中遗漏了什么,看起来他们可能使用了更旧版本的KCL。我使用的是amazon kinesis客户端的1.7.0版。
只是想回答问题所在-您提供ARN时,它只需要流名称。
或者,您可以使用com。亚马逊。服务。发电机BV2。流线捕捉器。StreamsWorker
而不是com。亚马逊。服务。运动。客户图书馆。自由党。工人Worker
内部使用AmazondynamodStreamAdapterClient
。
即。
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
getClass.getName, //DynamoDB shard lease table name
streamArn, //pulled from the dynamo table at runtime
dynamoCredentials, //DefaultAWSCredentialsProviderChain
KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
val worker = new com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker(recordProcessorFactory, kinesisConfig)
这个问题实际上超出了我的kinisclientlibconfiguration
。
我能够通过使用相同的配置并提供DynamoDB流适配器库中包含的流适配器以及DynamoDB和CloudWatch的客户端来解决这个问题。
我的工作解决方案现在看起来是这样的。
定义Kinesis客户端配置。
//Kinesis config for DynamoDB streams
lazy val kinesisConfig :KinesisClientLibConfiguration =
new KinesisClientLibConfiguration(
getClass.getName, //DynamoDB shard lease table name
streamArn, //pulled from the dynamo table at runtime
dynamoCredentials, //DefaultAWSCredentialsProviderChain
KeywordTrackingActor.NAME //Lease owner name
).withMaxRecords(1000) //using AWS recommended value
.withIdleTimeBetweenReadsInMillis(500) //using AWS recommended value
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
定义流适配器和CloudWatch客户端
val streamAdapterClient :AmazonDynamoDBStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoCredentials)
streamAdapterClient.setRegion(region)
val cloudWatchClient :AmazonCloudWatchClient = new AmazonCloudWatchClient(dynamoCredentials)
cloudWatchClient.setRegion(region)
创建一个RecordProcessorFactory
的实例,由您来定义一个类来实现KCL提供的IRecordProcessorFactory
和返回的IRecord处理器
。
val recordProcessorFactory :RecordProcessorFactory = new RecordProcessorFactory(context, keywordActor, config.keywordColumnName)
而我丢失的部分,所有这些都需要提供给你的工人。
val worker :Worker =
new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(kinesisConfig)
.kinesisClient(streamAdapterClient)
.dynamoDBClient(dynamoClient)
.cloudWatchClient(cloudWatchClient)
.build()
//this will start record processing
streamExecutorService.submit(worker)
从这个问题开始,AWS DynamoDB流进入红移 DynamoDB-- 如何配置我的Kinesis函数以获取Lambda函数源? 我创建了一个DynamoDB表(购销),并添加了DynamoDB流。然后,我配置Lambda函数来拾取DynamoDB流。我的问题是如何配置Kinesis以获取Lambda函数源?我知道如何配置Lambda转换,不过我想把它作为源代码。不确定如何配置下面的直接输入源。
我目前正在使用DynamoDB流,并期待着转向Kinesis流,因为我想控制我喜欢从流中处理的记录数量。 我一直在读有关Kinesis流和lambda的文章。有很多关于Kinesis流和EC2的多用户和KCL等的文章。 null
我们希望将数据从DynamoDB非关系型数据库作为流连续移动到红移数据库。我很难理解AWS中的所有新术语/技术。有 1) DynamoDB流 2)AWS Lambda 3) AWS Kinesis消防水带 有人能提供每一个的简要总结吗。什么是DynamoDB流?这与亚马逊运动有何不同?在阅读了所有的资源后,这是我对假设的理解,请在下面进行验证。 (a)我假设DynamoDB Streams,创建非
我是AWS的新手。我已经使用Java在aws中实现了一些功能。我的要求是一次向RDS PostgreSQL实例插入50MB的csv。 我尝试了aws lmabda服务。但是5分钟后,lambda会停止,所以我就这样放弃了。(lambda函数的限制) 第二步,我编写了s3事件的java lambda代码,它将使用putrecord命令将s3上的csv文件读到kinesis流中。根据我的理解,kine
希望创建一个DynamoDB全局表来存储客户信息。我的问题是,我目前的模式是监听此表上的更改,并使用Lambda触发器发送电子邮件更新。 i、 e.您的个人资料信息已更改。如果不是你。。 我现在是否需要在每个区域中使用该Lambda?数据复制是否意味着每个区域都会触发该Lambda?