当前位置: 首页 > 知识库问答 >
问题:

DynamoDB并行扫描-Java同步

罗智刚
2023-03-14

我正在尝试使用DynamoDB并行扫描示例:

<代码>http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/LowLevelJavaScanning.html

我有200,000个项目,我已经进行了顺序代码扫描,并根据我的使用对其进行了稍微修改:

Map<String, AttributeValue> lastKeyEvaluated = null;
do
{
    ScanRequest scanRequest = new ScanRequest()
    .withTableName(tableName)
    .withExclusiveStartKey(lastKeyEvaluated);

    ScanResult result = client.scan(scanRequest);


    double counter = 0;
    for(Map<String, AttributeValue> item : result.getItems())
    {
        itemSerialize.add("Set:"+counter);
        for (Map.Entry<String, AttributeValue> getItem : item.entrySet()) 
        {
            String attributeName = getItem.getKey();
            AttributeValue value = getItem.getValue();

            itemSerialize.add(attributeName
                    + (value.getS() == null ? "" : ":" + value.getS())
                    + (value.getN() == null ? "" : ":" + value.getN())
                    + (value.getB() == null ? "" : ":" + value.getB())
                    + (value.getSS() == null ? "" : ":" + value.getSS())
                    + (value.getNS() == null ? "" : ":" + value.getNS())
                    + (value.getBS() == null ? "" : ":" + value.getBS()));
        }
        counter += 1;
    }

    lastKeyEvaluated = result.getLastEvaluatedKey();
}
while(lastKeyEvaluated != null);

当这个代码完成时,计数器给出的正好是200000,然而,我还想尝试并行扫描。

函数调用:

ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();
try
{
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    int totalSegments = numberOfThreads;

    for (int segment = 0; segment < totalSegments; segment++) 
    {
        // Runnable task that will only scan one segment
        task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment, list);

        // Execute the task
        executor.execute(task);
    }
    shutDownExecutorService(executor);
}
.......Catches something if error
return list;

类:

我有一个静态列表,数据与所有线程共享。我能够检索列表并输出数据量。

// Runnable task for scanning a single segment of a DynamoDB table
private static class ScanSegmentTask implements Runnable 
{

    // DynamoDB table to scan
    private String tableName;

    // number of items each scan request should return
    private int itemLimit;

    // Total number of segments
    // Equals to total number of threads scanning the table in parallel
    private int totalSegments;

    // Segment that will be scanned with by this task
    private int segment;

    static ArrayList<String> list_2;

    Object lock = new Object();

    public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment, ArrayList<String> list) 
    {
        this.tableName = tableName;
        this.itemLimit = itemLimit;
        this.totalSegments = totalSegments;
        this.segment = segment;
        list_2 = list;
    }

    public void run() 
    {
        System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
        Map<String, AttributeValue> exclusiveStartKey = null;
        int totalScannedItemCount = 0;
        int totalScanRequestCount = 0;
        int counter = 0;

        try 
        {
            while(true) 
            {
                ScanRequest scanRequest = new ScanRequest()
                    .withTableName(tableName)
                    .withLimit(itemLimit)
                    .withExclusiveStartKey(exclusiveStartKey)
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);

                ScanResult result = client.scan(scanRequest);

                totalScanRequestCount++;
                totalScannedItemCount += result.getScannedCount();

                synchronized(lock)
                {
                    for(Map<String, AttributeValue> item : result.getItems())
                    {
                        list_2.add("Set:"+counter);
                        for (Map.Entry<String, AttributeValue> getItem : item.entrySet()) 
                        {
                            String attributeName = getItem.getKey();
                            AttributeValue value = getItem.getValue();

                            list_2.add(attributeName
                                    + (value.getS() == null ? "" : ":" + value.getS())
                                    + (value.getN() == null ? "" : ":" + value.getN())
                                    + (value.getB() == null ? "" : ":" + value.getB())
                                    + (value.getSS() == null ? "" : ":" + value.getSS())
                                    + (value.getNS() == null ? "" : ":" + value.getNS())
                                    + (value.getBS() == null ? "" : ":" + value.getBS()));
                        }
                        counter += 1;
                    }
                }

                exclusiveStartKey = result.getLastEvaluatedKey();
                if (exclusiveStartKey == null) 
                {
                    break;
                }
            }
        } 
        catch (AmazonServiceException ase) 
        {
            System.err.println(ase.getMessage());
        } 
        finally 
        {
            System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment + " out of " + totalSegments + " of " + tableName + " with " + totalScanRequestCount + " scan requests");
        }
    }
}

Executor服务关闭:

public static void shutDownExecutorService(ExecutorService executor) 
{
    executor.shutdown();
    try 
    {
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) 
        {
            executor.shutdownNow();
        }
    } 
    catch (InterruptedException e) 
    {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

但是,每次运行这段代码时,项目的数量都会发生变化(总共变化60000个左右,每个线程6000个,创建了10个线程)。删除同步也不会更改结果。

同步或Amazon AWS API是否存在错误?

感谢所有

编辑:

新函数调用:

ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();

try
{
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    int totalSegments = numberOfThreads;

    for (int segment = 0; segment < totalSegments; segment++) 
    {
        // Runnable task that will only scan one segment
        task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment);

        // Execute the task
        Future<ArrayList<String>> future = executor.submit(task);

        list.addAll(future.get());
    }
    shutDownExecutorService(executor);
}

新类别:

// Runnable task for scanning a single segment of a DynamoDB table
private static class ScanSegmentTask implements Callable<ArrayList<String>>
{

    // DynamoDB table to scan
    private String tableName;

    // number of items each scan request should return
    private int itemLimit;

    // Total number of segments
    // Equals to total number of threads scanning the table in parallel
    private int totalSegments;

    // Segment that will be scanned with by this task
    private int segment;

    ArrayList<String> list_2 = new ArrayList<String>();

    static int counter = 0;

    public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment)
    {
        this.tableName = tableName;
        this.itemLimit = itemLimit;
        this.totalSegments = totalSegments;
        this.segment = segment;
    }

    @SuppressWarnings("finally")
    public ArrayList<String> call() 
    {
        System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
        Map<String, AttributeValue> exclusiveStartKey = null;

        try 
        {
            while(true) 
            {
                ScanRequest scanRequest = new ScanRequest()
                    .withTableName(tableName)
                    .withLimit(itemLimit)
                    .withExclusiveStartKey(exclusiveStartKey)
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);

                ScanResult result = client.scan(scanRequest);

                for(Map<String, AttributeValue> item : result.getItems())
                {
                    list_2.add("Set:"+counter);
                    for (Map.Entry<String, AttributeValue> getItem : item.entrySet()) 
                    {
                        String attributeName = getItem.getKey();
                        AttributeValue value = getItem.getValue();

                        list_2.add(attributeName
                                + (value.getS() == null ? "" : ":" + value.getS())
                                + (value.getN() == null ? "" : ":" + value.getN())
                                + (value.getB() == null ? "" : ":" + value.getB())
                                + (value.getSS() == null ? "" : ":" + value.getSS())
                                + (value.getNS() == null ? "" : ":" + value.getNS())
                                + (value.getBS() == null ? "" : ":" + value.getBS()));
                    }
                    counter += 1;
                }

                exclusiveStartKey = result.getLastEvaluatedKey();
                if (exclusiveStartKey == null) 
                {
                    break;
                }
            }
        } 
        catch (AmazonServiceException ase) 
        {
            System.err.println(ase.getMessage());
        } 
        finally 
        {
            return list_2;
        }
    }
}

最终编辑:

函数调用:

ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();
ArrayList<Future<ArrayList<String>>> holdFuture = new ArrayList<Future<ArrayList<String>>>();

try
{
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    int totalSegments = numberOfThreads;

    for (int segment = 0; segment < totalSegments; segment++) 
    {
        // Runnable task that will only scan one segment
        task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment);

        // Execute the task
        Future<ArrayList<String>> future = executor.submit(task);
        holdFuture.add(future);
    }

    for (int i = 0 ; i < holdFuture.size(); i++)
    {
        boolean flag = false;
        while(flag == false)
        {
            Thread.sleep(1000);
            if(holdFuture.get(i).isDone())
            {
                list.addAll(holdFuture.get(i).get());
                flag = true;
            }
        }
    }
    shutDownExecutorService(executor);
}

类:私有静态类ScanSegmentTask实现可调用

    // DynamoDB table to scan
    private String tableName;

    // number of items each scan request should return
    private int itemLimit;

    // Total number of segments
    // Equals to total number of threads scanning the table in parallel
    private int totalSegments;

    // Segment that will be scanned with by this task
    private int segment;

    ArrayList<String> list_2 = new ArrayList<String>();

    static AtomicInteger counter = new AtomicInteger(0);

    public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment)
    {
        this.tableName = tableName;
        this.itemLimit = itemLimit;
        this.totalSegments = totalSegments;
        this.segment = segment;
    }

    @SuppressWarnings("finally")
    public ArrayList<String> call() 
    {
        System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
        Map<String, AttributeValue> exclusiveStartKey = null;

        try 
        {
            while(true) 
            {
                ScanRequest scanRequest = new ScanRequest()
                    .withTableName(tableName)
                    .withLimit(itemLimit)
                    .withExclusiveStartKey(exclusiveStartKey)
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);

                ScanResult result = client.scan(scanRequest);

                for(Map<String, AttributeValue> item : result.getItems())
                {
                    list_2.add("Set:"+counter);
                    for (Map.Entry<String, AttributeValue> getItem : item.entrySet()) 
                    {
                        String attributeName = getItem.getKey();
                        AttributeValue value = getItem.getValue();

                        list_2.add(attributeName
                                + (value.getS() == null ? "" : ":" + value.getS())
                                + (value.getN() == null ? "" : ":" + value.getN())
                                + (value.getB() == null ? "" : ":" + value.getB())
                                + (value.getSS() == null ? "" : ":" + value.getSS())
                                + (value.getNS() == null ? "" : ":" + value.getNS())
                                + (value.getBS() == null ? "" : ":" + value.getBS()));
                    }
                    counter.addAndGet(1);
                }

                exclusiveStartKey = result.getLastEvaluatedKey();
                if (exclusiveStartKey == null) 
                {
                    break;
                }
            }
        } 
        catch (AmazonServiceException ase) 
        {
            System.err.println(ase.getMessage());
        } 
        finally 
        {
            return list_2;
        }
    }
}

共有1个答案

东方涛
2023-03-14

好的,我相信问题在于你同步的方式。

在您的情况下,您的锁几乎毫无意义,因为每个线程都有自己的锁,因此同步从来不会阻止一个线程运行同一段代码。我认为,这就是删除同步不会改变结果的原因,因为它从一开始就不会产生影响。

我相信您的问题实际上是由于静态ArrayList

正如我之前所说,虽然您确实有一个同步的块,但它实际上什么都没有做。您可以在list\u 2上进行同步,但所要做的就是有效地使所有线程按顺序运行,因为只有在其中一个线程完成后才会释放ArrayList上的锁。

有几种解决方案。您可以使用集合。synchronizedList(list\u 2)为您的数组列表创建一个同步包装。这样,添加到列表中肯定会成功。然而,这会导致每个操作的同步成本,因此并不理想。

我要做的实际上是让ScanSegmentTask实现可调用的

为什么这很重要?我认为对你来说最好的结果是:

  1. 将实例变量初始化为空列表
  2. 让每个线程都像您所做的那样添加到此列表中
  3. 完成后返回列表2
  4. 连接每个结果的数组列表

这样,您就没有需要处理的同步开销了!

这将需要对执行器代码进行一些更改。您需要调用submit()而不是调用execute()。这将返回一个未来对象(Future

要检索结果,只需在未来对象的集合中循环并调用get()(我想)。此调用将一直阻止,直到与未来对象对应的线程完成。

我想就这样了。虽然这更复杂,但我认为这是您将获得的最佳性能,因为如果线程足够多,CPU争用或您的网络链接将成为瓶颈。如果您有任何问题,请询问,我会根据需要更新。

 类似资料:
  • 这是我的用例: 我有一个带有200k对象的JSON Api。数据集看起来有点像这样:日期、自行车型号、以分钟为单位的生产时间。我使用Lambda从JSON Api中读取并通过超文本传输协议请求在DynamoDB中写入。Lambda函数每天运行并使用最新数据更新DynamoDB。 然后,我按日期检索数据,因为我想计算每天的平均生产时间,并将其放在第二个表中。Alexa技能连接到第二个表,并读取每天的

  • 我正在尝试使用Java aws sdk版本1.11.140使用限制为1的DynamoDBScanExpression 即使我使用. with Limit(1)即。 返回所有条目的列表,即7。我做错什么了吗? P. S.我尝试使用cli进行查询 返回我只有1个结果。

  • 我们有一个设置,其中各种工作节点执行计算并更新DynamoDB表中的相对状态。该表充当工作节点活动的一种历史记录。看门狗节点需要定期扫描表,并构建一个表示工作节点及其作业的当前状态的对象。因此,我们的应用程序能够扫描表并按时间顺序检索数据(即按时间戳排序)是很重要的。表最终会太大,无法扫描到本地内存进行后期排序,所以我们扫描后无法排序。 从AWS留档读取主键: DynamoDB使用分区键值作为内部

  • 我对DynamoDB的性能有问题,我想澄清一下我有点困惑的地方。 执行扫描表中的100条记录,条件是使用(例如)。如果在表中找到20条记录,DynamoDB是否还扫描其他80条记录? 扫描时分页是如何工作的? 消耗超过分配的RCU和WCU的后果是什么?

  • 我有一个用例,我必须在Dynamo DB中返回表的所有元素。 假设我的表有一个分区键(列X),所有行中的值都相同,比如“monitor”和排序键(列Y),元素不同。 以下方法的执行时间是否会有任何差异,还是相同? 扫描整张桌子。 基于具有“监视器”的分区键查询数据。

  • 我正在使用DynamoDB,并通过将JSON传递给它来存储文档,所有这些都在Java中使用DynamoDBMapper类。 将数据放入表中已经足够简单了。还可以查询表中是否有可用的Hash或Range值。 但是我想用JSON文档对一个值进行扫描(我猜是这样)。我一直在四处寻找例子,但我找不到任何例子,或者至少在使用DynamoDBMapper方法做事时找不到。 那么我认为这是可以做到的,对吗?如果