1. AWS DynamoDB in Action: Creating Tables

司徒元明
2023-12-01

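Introduction to DynamoDB

DynamoDB is a fully managed NoSQL database service from Amazon. Amazon manages and maintains the database's hardware and software, so compared with a traditional database, users are relieved of the administrative burden of operating and scaling a distributed database and need not worry about hardware provisioning, setup and configuration, replication, software patching, or cluster scaling. DynamoDB has the following features:

  • High, predictable performance with seamless scaling: table data and traffic are automatically spread across enough servers to meet throughput and storage requirements while keeping performance consistent and fast
  • High availability: data is automatically replicated across multiple Availability Zones within the same Region
  • Data encryption
  • Backups: on-demand backups and point-in-time recovery
  • Automatic deletion of expired data (TTL)
  • Storage and retrieval of any amount of data, at any level of request traffic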

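Creating a DynamoDB Table

A DynamoDB table can be created in any of the following ways:

  • AWS CloudFormation
  • AWS Command Line Interface
  • AWS SDK
  • AWS Console

In actual development, the development, test, and production environments are all deployed automatically with AWS CloudFormation. In the development environment, tables are also created with the AWS Command Line Interface or the AWS Console. The AWS SDK is almost never used to create tables; it is used mainly for CRUD operations on the data.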

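DynamoDB Table Properties

  • TableName: required. The table name must be unique within a Region (per AWS account)
  • AttributeDefinitions: required. Include only the attributes used as a partition key (Partition/Hash Key) or sort key (Range Key) by the table or its indexes; other attributes do not need to be defined here
    • AttributeName: the attribute name, a string between 1 and 255 characters long
    • AttributeType: the attribute type. Possible values:
      • S - String type
      • N - Number type
      • B - Binary type
  • KeySchema: required. Specifies the table's partition key (Partition/Hash Key) and sort key (Range Key)
    • AttributeName: the name of the key attribute; it must be defined in AttributeDefinitions
    • KeyType: the type of the key. Possible values:
      • HASH - partition key
      • RANGE - sort key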
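  • GlobalSecondaryIndexes: optional. List of global secondary indexes. They can be added after the table is created; up to 20 can be created per table
    • IndexName: the index name
    • KeySchema: the index's partition key (Partition/Hash Key) and sort key (Range Key)
    • Projection: specifies which attributes are copied from the table into the index
      • ProjectionType: the projection type. Possible values:
        • KEYS_ONLY - only the partition keys and sort keys of the table and the index
        • INCLUDE - the partition keys and sort keys of the table and the index, plus the non-key attributes listed in NonKeyAttributes
        • ALL - all table attributes
      • NonKeyAttributes: the non-key attributes to project when ProjectionType is INCLUDE; at most 20 attributes
    • ProvisionedThroughput: specifies ReadCapacityUnits and WriteCapacityUnits
    • ContributorInsightsSpecification: whether to enable CloudWatch Contributor Insights
  • LocalSecondaryIndexes: optional. List of local secondary indexes. They cannot be added after the table is created, so decide whether you need them when you create the table
    • IndexName: the index name
    • KeySchema: the index's partition key (Partition/Hash Key), which must be the same as the table's, and sort key (Range Key)
    • Projection: specifies which attributes are copied from the table into the index
      • ProjectionType: the projection type. Possible values:
        • KEYS_ONLY - only the partition keys and sort keys of the table and the index
        • INCLUDE - the partition keys and sort keys of the table and the index, plus the non-key attributes listed in NonKeyAttributes
        • ALL - all table attributes
      • NonKeyAttributes: the non-key attributes to project when ProjectionType is INCLUDE; at most 20 attributes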
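  • TimeToLiveSpecification: optional. Sets whether TTL is enabled and names the TTL attribute
    • AttributeName: the name of the TTL attribute
    • Enabled: true/false
  • BillingMode: the billing mode. Possible values:
    • PROVISIONED - provisioned capacity. ProvisionedThroughput must also be set in this mode. Suitable for predictable workloads
    • PAY_PER_REQUEST - on-demand billing. Suitable for unpredictable workloads
  • ProvisionedThroughput: specifies ReadCapacityUnits and WriteCapacityUnits. Required when BillingMode is PROVISIONED
  • PointInTimeRecoverySpecification: whether to enable point-in-time recovery
  • SSESpecification: server-side encryption settings. When enabled, an AWS KMS key is used; otherwise the table is encrypted with an AWS owned key
  • KinesisStreamSpecification: the ARN of the Kinesis data stream
  • StreamSpecification: specifies what information is written to the stream when data is modified
    • StreamViewType: possible values:
      • KEYS_ONLY - only the key attributes are written
      • NEW_IMAGE - the whole item as it appears after the modification
      • OLD_IMAGE - the whole item as it appeared before the modification
      • NEW_AND_OLD_IMAGES - both the old and the new item
  • ContributorInsightsSpecification: whether to enable CloudWatch Contributor Insights
  • Tags: list of tags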
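
The relationship between AttributeDefinitions and the key schemas is easy to get wrong, so here is a minimal sketch of a local pre-flight check in plain Java. The class KeySchemaCheck and its methods are hypothetical helpers written for this illustration; they are not part of any AWS SDK. The check on unused definitions rests on the assumption that DynamoDB rejects attribute definitions that no table or index key uses, which matches the rule above that only key attributes belong in AttributeDefinitions.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the AWS SDK.
class KeySchemaCheck {

    // Key attributes (from the table and all indexes) with no AttributeDefinitions entry.
    static Set<String> undefinedKeys(Set<String> definedAttributes, List<List<String>> keySchemas) {
        Set<String> missing = new LinkedHashSet<>();
        for (List<String> schema : keySchemas) {
            for (String name : schema) {
                if (!definedAttributes.contains(name)) {
                    missing.add(name);
                }
            }
        }
        return missing;
    }

    // Defined attributes that no key schema refers to.
    static Set<String> unusedDefinitions(Set<String> definedAttributes, List<List<String>> keySchemas) {
        Set<String> unused = new LinkedHashSet<>(definedAttributes);
        for (List<String> schema : keySchemas) {
            unused.removeAll(schema);
        }
        return unused;
    }

    public static void main(String[] args) {
        // "ttl" is deliberately defined although it is not a key; "lsiOneSk" is deliberately left out.
        Set<String> defined = new LinkedHashSet<>(List.of("pk", "sk", "gsiOnePk", "gsiOneSk", "ttl"));
        List<List<String>> schemas = List.of(
                List.of("pk", "sk"),              // table key schema
                List.of("gsiOnePk", "gsiOneSk"),  // a global secondary index
                List.of("pk", "lsiOneSk"));       // a local secondary index
        System.out.println("Undefined key attributes: " + undefinedKeys(defined, schemas));
        System.out.println("Unused definitions: " + unusedDefinitions(defined, schemas));
    }
}
```

With the sample data in main, the check reports lsiOneSk as an undefined key attribute and ttl as an unused definition. A TTL attribute is an ordinary item attribute, so it does not need an AttributeDefinitions entry.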

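Creating a Table with AWS CloudFormation

Create a serverless.yml file (a Serverless Framework configuration whose resources section is a CloudFormation template):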

service: jessica-dto
                 
provider:
  name: aws
  stage: develop
  region: ap-southeast-1
  stackName: ${self:provider.stage}-${self:service}

resources:
  Resources:
    TestTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:provider.stage}.TestTableCloudFormation
        AttributeDefinitions:
          - AttributeName: pk
            AttributeType: S
          - AttributeName: sk
            AttributeType: S
          - AttributeName: gsiOnePk
            AttributeType: S
          - AttributeName: gsiOneSk
            AttributeType: S
          - AttributeName: gsiTwoPk
            AttributeType: S
          - AttributeName: gsiTwoSk
            AttributeType: S
          - AttributeName: lsiOneSk
            AttributeType: S
          - AttributeName: lsiTwoSk
            AttributeType: S
        KeySchema:
          - AttributeName: pk
            KeyType: HASH
          - AttributeName: sk
            KeyType: RANGE
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        BillingMode: PROVISIONED
        LocalSecondaryIndexes:
          - IndexName: test.lsiOne
            KeySchema:
              - AttributeName: pk
                KeyType: HASH
              - AttributeName: lsiOneSk
                KeyType: RANGE
            Projection:
              ProjectionType: ALL
          - IndexName: test.lsiTwo
            KeySchema:
              - AttributeName: pk
                KeyType: HASH
              - AttributeName: lsiTwoSk
                KeyType: RANGE
            Projection:
              ProjectionType: ALL
        GlobalSecondaryIndexes:
          - IndexName: test.gsiOne
            KeySchema:
              - AttributeName: gsiOnePk
                KeyType: HASH
              - AttributeName: gsiOneSk
                KeyType: RANGE
            Projection:
              ProjectionType: KEYS_ONLY
            ProvisionedThroughput:
              ReadCapacityUnits: 1
              WriteCapacityUnits: 1
          - IndexName: test.gsiTwo
            KeySchema:
              - AttributeName: gsiTwoPk
                KeyType: HASH
              - AttributeName: gsiTwoSk
                KeyType: RANGE
            Projection:
              ProjectionType: KEYS_ONLY
            ProvisionedThroughput:
              ReadCapacityUnits: 1
              WriteCapacityUnits: 1
        TimeToLiveSpecification: 
          AttributeName: ttl
          Enabled: true
        PointInTimeRecoverySpecification:
          PointInTimeRecoveryEnabled: true
        SSESpecification:
          SSEEnabled: false
        ContributorInsightsSpecification:
          Enabled: true
        Tags:
          - Key: product
            Value: ${self:service}

Deploy with the sls deploy command.
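
When BillingMode is PROVISIONED, the ReadCapacityUnits and WriteCapacityUnits values have to be chosen up front. The sketch below estimates them in plain Java, assuming the standard DynamoDB definitions: one read capacity unit covers one strongly consistent read per second of an item up to 4 KB (an eventually consistent read costs half), and one write capacity unit covers one write per second of an item up to 1 KB. The class CapacityEstimate and its method names are hypothetical, and the result is a rough starting point rather than a sizing recommendation.

```java
// Hypothetical helper for illustration only; not part of the AWS SDK.
class CapacityEstimate {
    static final int READ_UNIT_BYTES = 4 * 1024;  // one read unit covers up to 4 KB
    static final int WRITE_UNIT_BYTES = 1024;     // one write unit covers up to 1 KB

    // Read capacity units needed for the given item size and read rate.
    static long readUnits(long itemBytes, long readsPerSecond, boolean stronglyConsistent) {
        long unitsPerRead = ceilDiv(itemBytes, READ_UNIT_BYTES);
        long total = unitsPerRead * readsPerSecond;
        // Eventually consistent reads cost half as much; round up to a whole unit.
        return stronglyConsistent ? total : ceilDiv(total, 2);
    }

    // Write capacity units needed for the given item size and write rate.
    static long writeUnits(long itemBytes, long writesPerSecond) {
        return ceilDiv(itemBytes, WRITE_UNIT_BYTES) * writesPerSecond;
    }

    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        // Example workload: 6000-byte items, 10 reads and 5 writes per second.
        System.out.println("RCU (strongly consistent): " + readUnits(6000, 10, true));
        System.out.println("RCU (eventually consistent): " + readUnits(6000, 10, false));
        System.out.println("WCU: " + writeUnits(6000, 5));
    }
}
```

The template above provisions 1 read and 1 write unit for the table and for each global secondary index, which is only suitable for a lightly used development table.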

Creating a Table with the AWS Command Line Interface

The aws dynamodb create-table command accepts the following parameters:

create-table
--attribute-definitions <value>
--table-name <value>
--key-schema <value>
[--local-secondary-indexes <value>]
[--global-secondary-indexes <value>]
[--billing-mode <value>]
[--provisioned-throughput <value>]
[--stream-specification <value>]
[--sse-specification <value>]
[--tags <value>]
[--cli-input-json <value>]
[--generate-cli-skeleton <value>]

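The AWS CLI create-table command does not support setting TimeToLiveSpecification, PointInTimeRecoverySpecification, or ContributorInsightsSpecification when the table is created; they can only be set by updating the table after it has been created.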

aws dynamodb create-table \
    --table-name develop.TestTableCli \
    --attribute-definitions \
        AttributeName=pk,AttributeType=S \
        AttributeName=sk,AttributeType=S \
        AttributeName=gsiPk,AttributeType=S \
        AttributeName=gsiSk,AttributeType=S \
        AttributeName=gsiTwoPk,AttributeType=S \
        AttributeName=gsiTwoSk,AttributeType=S \
        AttributeName=lsiOneSk,AttributeType=S \
        AttributeName=lsiTwoSk,AttributeType=S \
    --key-schema \
        AttributeName=pk,KeyType=HASH \
        AttributeName=sk,KeyType=RANGE \
    --billing-mode \
        PROVISIONED \
    --provisioned-throughput \
        "{
            \"ReadCapacityUnits\": 5,
            \"WriteCapacityUnits\": 5
        }" \
    --local-secondary-indexes \
        "[
            {
                \"IndexName\": \"test.lsiOne\",
                \"KeySchema\": [{\"AttributeName\":\"pk\",\"KeyType\":\"HASH\"},
                                {\"AttributeName\":\"lsiOneSk\",\"KeyType\":\"RANGE\"}],
                \"Projection\":{
                    \"ProjectionType\":\"ALL\"
                }
            },
            {
                \"IndexName\": \"test.lsiTwo\",
                \"KeySchema\": [{\"AttributeName\":\"pk\",\"KeyType\":\"HASH\"},
                                {\"AttributeName\":\"lsiTwoSk\",\"KeyType\":\"RANGE\"}],
                \"Projection\":{
                    \"ProjectionType\":\"ALL\"
                }
            }
        ]" \
    --global-secondary-indexes \
        "[
            {
                \"IndexName\": \"test.gsi\",
                \"KeySchema\": [{\"AttributeName\":\"gsiPk\",\"KeyType\":\"HASH\"},
                                {\"AttributeName\":\"gsiSk\",\"KeyType\":\"RANGE\"}],
                \"Projection\":{
                    \"ProjectionType\":\"ALL\"
                },
                \"ProvisionedThroughput\": {
                    \"ReadCapacityUnits\": 5,
                    \"WriteCapacityUnits\": 5
                }
            },
            {
                \"IndexName\": \"test.gsiTwo\",
                \"KeySchema\": [{\"AttributeName\":\"gsiTwoPk\",\"KeyType\":\"HASH\"},
                                {\"AttributeName\":\"gsiTwoSk\",\"KeyType\":\"RANGE\"}],
                \"Projection\":{
                    \"ProjectionType\":\"ALL\"
                },
                \"ProvisionedThroughput\": {
                    \"ReadCapacityUnits\": 5,
                    \"WriteCapacityUnits\": 5
                }
            }
        ]" \
    --sse-specification \
        Enabled=false \
    --tags \
        "[
            {
                \"Key\": \"product\",
                \"Value\": \"social\"
            },
            {
                \"Key\": \"Name\",
                \"Value\": \"test.develop\"
            }  
        ]"

Setting TimeToLiveSpecification

aws dynamodb update-time-to-live \
    --table-name \
        develop.TestTableCli \
    --time-to-live-specification \
        Enabled=true,AttributeName=ttl
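
Enabling TTL only tells DynamoDB which attribute to look at; each item still has to carry an expiry value in that attribute. The sketch below shows one way to compute it in plain Java, assuming the usual TTL convention that the attribute is a Number holding a Unix epoch timestamp in seconds. The class TtlValue and the method expiresAt are hypothetical names used for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of the AWS SDK.
class TtlValue {

    // Epoch-seconds value to store in the TTL attribute ("ttl" in the command above).
    static long expiresAt(Instant now, Duration lifetime) {
        return now.plus(lifetime).getEpochSecond();
    }

    public static void main(String[] args) {
        // An item written now that should expire in seven days.
        long ttl = expiresAt(Instant.now(), Duration.ofDays(7));
        System.out.println("ttl attribute value (N): " + ttl);
    }
}
```

Expired items are removed by a background process rather than at the exact expiry time, so queries may still return them for a while after the timestamp has passed.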

Setting PointInTimeRecoverySpecification

aws dynamodb update-continuous-backups \
    --table-name \
        develop.TestTableCli \
    --point-in-time-recovery-specification \
        PointInTimeRecoveryEnabled=true

Setting ContributorInsightsSpecification

aws dynamodb update-contributor-insights \
    --table-name \
        develop.TestTableCli \
    --contributor-insights-action ENABLE

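Creating a Table with the AWS Java SDK

The AWS DynamoDB Java SDK does not support setting TimeToLiveSpecification, PointInTimeRecoverySpecification, or ContributorInsightsSpecification in the create-table request; they can only be set by updating the table after it has been created.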

package com.jessica.dynamodb.table.create;

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.ContributorInsightsAction;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndex;
import com.amazonaws.services.dynamodbv2.model.PointInTimeRecoverySpecification;
import com.amazonaws.services.dynamodbv2.model.Projection;
import com.amazonaws.services.dynamodbv2.model.ProjectionType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.SSESpecification;
import com.amazonaws.services.dynamodbv2.model.Tag;
import com.amazonaws.services.dynamodbv2.model.TimeToLiveSpecification;
import com.amazonaws.services.dynamodbv2.model.UpdateContinuousBackupsRequest;
import com.amazonaws.services.dynamodbv2.model.UpdateContributorInsightsRequest;
import com.amazonaws.services.dynamodbv2.model.UpdateTimeToLiveRequest;
import com.jessica.dynamodb.constant.DynamoDBConstant;

public class CreateTable {
	static AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.standard().build();

	static String tableName = "develop.TestTableJava";
	
	static DynamoDB dynamoDB = new DynamoDB(dynamoDBClient);

	public static void main(String[] args) throws Exception {
		createTable();
		updateTableTTL();
		updateTableBackeUpPolicy();
		updateTableContributorInsights();
	}

	static void createTable() {
		try {
			// attributes
			List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
			attributeDefinitions
					.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.HASH_KEY).withAttributeType("S"));
			attributeDefinitions.add(
					new AttributeDefinition().withAttributeName(DynamoDBConstant.RANGE_KEY).withAttributeType("S"));
			attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.GSI_ONE_HASH_KEY)
					.withAttributeType("S"));
			attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.GSI_ONE_RANGE_KEY)
					.withAttributeType("S"));
			attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.GSI_TWO_HASH_KEY)
					.withAttributeType("S"));
			attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.GSI_TWO_RANGE_KEY)
					.withAttributeType("S"));
			attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.LSI_ONE_RANGE_KEY)
					.withAttributeType("S"));
			attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.LSI_TWO_RANGE_KEY)
					.withAttributeType("S"));

			// table key schema
			List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
			// partition key
			keySchema
					.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.HASH_KEY).withKeyType(KeyType.HASH));
			// range (sort) key
			keySchema.add(
					new KeySchemaElement().withAttributeName(DynamoDBConstant.RANGE_KEY).withKeyType(KeyType.RANGE));

			// provisioned Throughput
			ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(1L)
					.withWriteCapacityUnits(1L);

			// local secondary index
			List<LocalSecondaryIndex> localSecondaryIndexs = new ArrayList<>();
			List<KeySchemaElement> lsiOneKeySchema = new ArrayList<KeySchemaElement>();
			lsiOneKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.HASH_KEY)
					.withKeyType(KeyType.HASH));
			lsiOneKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.LSI_ONE_RANGE_KEY)
					.withKeyType(KeyType.RANGE));
			List<KeySchemaElement> lsiTwoKeySchema = new ArrayList<KeySchemaElement>();
			lsiTwoKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.HASH_KEY)
					.withKeyType(KeyType.HASH));
			lsiTwoKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.LSI_TWO_RANGE_KEY)
					.withKeyType(KeyType.RANGE));
			localSecondaryIndexs.add(new LocalSecondaryIndex().withIndexName(DynamoDBConstant.LSI_ONE_NAME)
					.withKeySchema(lsiOneKeySchema)
					.withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY)));
			localSecondaryIndexs.add(new LocalSecondaryIndex().withIndexName(DynamoDBConstant.LSI_TWO_NAME)
					.withKeySchema(lsiTwoKeySchema)
					.withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY)));

			// global secondary index
			List<GlobalSecondaryIndex> globalSecondaryIndexs = new ArrayList<>();
			List<KeySchemaElement> gsiOneKeySchema = new ArrayList<KeySchemaElement>();
			gsiOneKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.GSI_ONE_HASH_KEY)
					.withKeyType(KeyType.HASH));
			gsiOneKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.GSI_ONE_RANGE_KEY)
					.withKeyType(KeyType.RANGE)); 
			List<KeySchemaElement> gsiTwoKeySchema = new ArrayList<KeySchemaElement>();
			gsiTwoKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.GSI_TWO_HASH_KEY)
					.withKeyType(KeyType.HASH)); 
			gsiTwoKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.GSI_TWO_RANGE_KEY)
					.withKeyType(KeyType.RANGE)); 
			globalSecondaryIndexs.add(new GlobalSecondaryIndex().withIndexName(DynamoDBConstant.GSI_ONE_NAME)
					.withKeySchema(gsiOneKeySchema)
					.withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY))
					.withProvisionedThroughput(provisionedThroughput));
			globalSecondaryIndexs.add(new GlobalSecondaryIndex().withIndexName(DynamoDBConstant.GSI_TWO_NAME)
					.withKeySchema(gsiTwoKeySchema)
					.withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY))
					.withProvisionedThroughput(provisionedThroughput));

			SSESpecification sseSpecification = new SSESpecification().withEnabled(false);

			Tag tag = new Tag().withKey("product").withValue("create-table");

			CreateTableRequest request = new CreateTableRequest()
					.withTableName(tableName)
					.withAttributeDefinitions(attributeDefinitions)
					.withKeySchema(keySchema)
					.withBillingMode(BillingMode.PROVISIONED)
					.withProvisionedThroughput(provisionedThroughput)
					.withLocalSecondaryIndexes(localSecondaryIndexs)
					.withGlobalSecondaryIndexes(globalSecondaryIndexs)
					.withSSESpecification(sseSpecification)
					.withTags(tag);

			System.out.println("Issuing CreateTable request for " + tableName);
			Table table = dynamoDB.createTable(request);
			System.out.println("Waiting for " + tableName + " to be created...this may take a while...");
			table.waitForActive();
			System.out.println("Create " + tableName + " success");
		} catch (Exception e) {
			System.err.println("CreateTable request failed for " + tableName);
			System.err.println(e.getMessage());
		}

	}

	static void updateTableTTL() {
		TimeToLiveSpecification timeToLiveSpecification = new TimeToLiveSpecification().withAttributeName("ttl")
				.withEnabled(true);

		UpdateTimeToLiveRequest updateTimeToLiveRequest = new UpdateTimeToLiveRequest().withTableName(tableName)
				.withTimeToLiveSpecification(timeToLiveSpecification);
		System.out.println("Update time to live for " + tableName);
		try {
			dynamoDBClient.updateTimeToLive(updateTimeToLiveRequest);
			System.out.println("Waiting for " + tableName + " to update TimeToLive");
			dynamoDB.getTable(tableName).waitForActive();
			System.out.println("Update " + tableName + " TimeToLive success");	
		} catch (Exception e) {
			System.err.println("Update time to live request failed for " + tableName);
			System.err.println(e.getMessage());
		}
	}

	static void updateTableBackeUpPolicy() {
		PointInTimeRecoverySpecification pointInTimeRecoverySpecification = new PointInTimeRecoverySpecification()
				.withPointInTimeRecoveryEnabled(true);
		UpdateContinuousBackupsRequest updateContinuousBackupsRequest = new UpdateContinuousBackupsRequest()
				.withTableName(tableName).withPointInTimeRecoverySpecification(pointInTimeRecoverySpecification);
		try {
			System.out.println("Update table back up policy for " + tableName);
			dynamoDBClient.updateContinuousBackups(updateContinuousBackupsRequest);
			System.out.println("Waiting for " + tableName + " to update ContinuousBackups");
			dynamoDB.getTable(tableName).waitForActive();
			System.out.println("Update " + tableName + " ContinuousBackups success");	
		} catch (Exception e) {
			System.err.println("Update table back up policy failed for " + tableName);
			System.err.println(e.getMessage());
		}
	}

	static void updateTableContributorInsights() {
		UpdateContributorInsightsRequest updateContributorInsightsRequest = new UpdateContributorInsightsRequest()
				.withTableName(tableName).withContributorInsightsAction(ContributorInsightsAction.ENABLE);
		try {
			System.out.println("Update table contributor insights for " + tableName);
			dynamoDBClient.updateContributorInsights(updateContributorInsightsRequest);
			System.out.println("Waiting for " + tableName + " to update ContributorInsights");
			dynamoDB.getTable(tableName).waitForActive();
			System.out.println("Update " + tableName + " ContributorInsights success");	
		} catch (Exception e) {
			System.err.println("Update table contributor insights failed for " + tableName);
			System.err.println(e.getMessage());
		}
	}

}
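
The calls to waitForActive() above block until the table reports the ACTIVE status. Conceptually this is a polling loop, which the following sketch imitates in plain Java against a simulated status source. StatusPoller and waitUntil are hypothetical names, and this is not the SDK's actual implementation; in real code the supplier would wrap a describe-table call.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of a polling loop; not the AWS SDK's implementation.
class StatusPoller {

    // Polls the supplier until it returns the target status or the attempts run out.
    static boolean waitUntil(Supplier<String> statusSupplier, String target, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (target.equals(statusSupplier.get())) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and give up waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated sequence of table statuses, standing in for repeated describe-table calls.
        Iterator<String> simulated = List.of("CREATING", "CREATING", "ACTIVE").iterator();
        boolean active = waitUntil(simulated::next, "ACTIVE", 5, 100L);
        System.out.println("Table became ACTIVE: " + active);
    }
}
```

Bounding the number of attempts keeps a failed or stuck table creation from blocking the program forever.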

Complete code

https://github.com/JessicaWin/dynamodb-in-action/tree/create-table
