当前位置: 首页 > 知识库问答 >
问题:

在Apache Flink中使用DynamoDB流

司寇苗宣
2023-03-14

有人尝试在Apache Flink中使用DynamoDB流吗?

Flink有一个Kinesis消费者。但是我正在寻找如何直接使用Dynamo流。

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));

我试了很多次,但什么也没找到。然而,Flink Jira董事会发现一个未决请求。所以我想这个选项还不可用?我有什么选择?

允许FlinkKinesisConsumer适应AWS DynamoDB流

共有1个答案

韩自怡
2023-03-14

更新答案-2019年

FlinkKinesisConsumer连接器现在可以在实现这个JIRA票据后处理DynamoDB流。

更新的答案

Apache Flink似乎没有使用DynamoDB流连接器适配器,因此它可以从Kinesis读取数据,但无法从DynamoDB读取数据。

我认为一个选择是实现一个应用程序,将DynamoDB流中的数据写入Kinesis,然后在Apache Flink中从Kinesis中读取数据并进行处理。

另一种选择是为Apache Flink实现自定义DynamoDB连接器。可以使用现有连接器作为起点。

您还可以查看Apache Spark Kinesis连接器。但它似乎也有同样的问题。

原答案

DynamoDB有一个Kinesis适配器,允许您使用Kinesis客户端库使用DynamoDB更新流。使用Kinesis适配器是使用DynamoDB更新的推荐方法(根据AWS)。这将为您提供与直接使用DynamoDB流(也称为DynamoDB低级API)相同的数据。

 类似资料:
  • 我已经通读了AWS关于分页的文档: 根据他们的文件规定: 在响应中,DynamoDB返回限制值范围内的所有匹配结果。例如,如果发出限制值为6且没有筛选器表达式的查询或扫描请求,DynamoDB将返回表中与请求中指定的键条件匹配的前六项(或者在没有筛选器的扫描情况下仅返回前六项) 这意味着,给定我有一个名为的表,其属性称为(可以接受从到的任何数值),我可能会遇到以下难题: 客户提出请求,思考

  • 我正在使用AWS控制台和NodeJS。 我有一个带有分区键(user\u id)和排序键(company\u id)以及其他属性的dynamodb用户表。 我的一个属性是用户的电子邮件。电子邮件是唯一属性。 我需要通过电子邮件user_id,但我没有他的user_id和company_id。 我认为我应该使用全球二级指数。 我点击了用户表,打开索引选项卡并为该表创建了GSI。(名称:电子邮件,类型

  • 安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?

  • 我有一个用例可以将begins_with应用于AWS Dynamodb表的主排序键, 我可以从AWS控制台使用begins_with键条件查询表, 我希望使用AWS Javascript SDK实现同样的功能。 我的表中有以下字段- 1。user_id(主分区键) 2。user_relation(主排序键) 我尝试了下面的代码- 错误- 引用- https://docs.aws.amazon.co

  • 我试图在嵌套对象中使用DynamoDB注释,如下所示: 我没有看到上面的属性是在UserAction类中自动生成的。我想知道嵌套对象中是否支持这些注释用法。请建议。

  • 我正在本地环境中使用和。当我尝试使用,其中存储库类型扩展了接口,我得到以下异常: 通用域名格式。亚马逊。服务。dynamodbv2。数据建模。DynamoDBMappingException:类java。util。ArrayList必须使用接口com进行注释。亚马逊。服务。dynamodbv2。数据建模。发电机电缆 是否无法使用CrudePository保存对象列表,如在MongorPositor