I'm trying out the DynamoDB parallel scan example:
http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/LowLevelJavaScanning.html
I have 200,000 items, and I've run the sequential scan code, modified slightly for my use:
Map<String, AttributeValue> lastKeyEvaluated = null;
do
{
    ScanRequest scanRequest = new ScanRequest()
        .withTableName(tableName)
        .withExclusiveStartKey(lastKeyEvaluated);

    ScanResult result = client.scan(scanRequest);
    double counter = 0;

    for (Map<String, AttributeValue> item : result.getItems())
    {
        itemSerialize.add("Set:" + counter);
        for (Map.Entry<String, AttributeValue> getItem : item.entrySet())
        {
            String attributeName = getItem.getKey();
            AttributeValue value = getItem.getValue();
            itemSerialize.add(attributeName
                + (value.getS() == null ? "" : ":" + value.getS())
                + (value.getN() == null ? "" : ":" + value.getN())
                + (value.getB() == null ? "" : ":" + value.getB())
                + (value.getSS() == null ? "" : ":" + value.getSS())
                + (value.getNS() == null ? "" : ":" + value.getNS())
                + (value.getBS() == null ? "" : ":" + value.getBS()));
        }
        counter += 1;
    }

    lastKeyEvaluated = result.getLastEvaluatedKey();
}
while (lastKeyEvaluated != null);
When this code finishes, the counter gives exactly 200,000. However, I also wanted to try a parallel scan.
Function call:
ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();

try
{
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    int totalSegments = numberOfThreads;

    for (int segment = 0; segment < totalSegments; segment++)
    {
        // Runnable task that will only scan one segment
        task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment, list);

        // Execute the task
        executor.execute(task);
    }

    shutDownExecutorService(executor);
}
// ... catch block omitted (handles any errors) ...

return list;
Class:
I have a static list whose data is shared by all the threads. I am able to retrieve the list and print out how much data it contains.
// Runnable task for scanning a single segment of a DynamoDB table
private static class ScanSegmentTask implements Runnable
{
    // DynamoDB table to scan
    private String tableName;

    // Number of items each scan request should return
    private int itemLimit;

    // Total number of segments
    // Equal to the total number of threads scanning the table in parallel
    private int totalSegments;

    // Segment that will be scanned by this task
    private int segment;

    static ArrayList<String> list_2;

    Object lock = new Object();

    public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment, ArrayList<String> list)
    {
        this.tableName = tableName;
        this.itemLimit = itemLimit;
        this.totalSegments = totalSegments;
        this.segment = segment;
        list_2 = list;
    }

    public void run()
    {
        System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
        Map<String, AttributeValue> exclusiveStartKey = null;
        int totalScannedItemCount = 0;
        int totalScanRequestCount = 0;
        int counter = 0;
        try
        {
            while (true)
            {
                ScanRequest scanRequest = new ScanRequest()
                    .withTableName(tableName)
                    .withLimit(itemLimit)
                    .withExclusiveStartKey(exclusiveStartKey)
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);

                ScanResult result = client.scan(scanRequest);
                totalScanRequestCount++;
                totalScannedItemCount += result.getScannedCount();

                synchronized (lock)
                {
                    for (Map<String, AttributeValue> item : result.getItems())
                    {
                        list_2.add("Set:" + counter);
                        for (Map.Entry<String, AttributeValue> getItem : item.entrySet())
                        {
                            String attributeName = getItem.getKey();
                            AttributeValue value = getItem.getValue();
                            list_2.add(attributeName
                                + (value.getS() == null ? "" : ":" + value.getS())
                                + (value.getN() == null ? "" : ":" + value.getN())
                                + (value.getB() == null ? "" : ":" + value.getB())
                                + (value.getSS() == null ? "" : ":" + value.getSS())
                                + (value.getNS() == null ? "" : ":" + value.getNS())
                                + (value.getBS() == null ? "" : ":" + value.getBS()));
                        }
                        counter += 1;
                    }
                }

                exclusiveStartKey = result.getLastEvaluatedKey();
                if (exclusiveStartKey == null)
                {
                    break;
                }
            }
        }
        catch (AmazonServiceException ase)
        {
            System.err.println(ase.getMessage());
        }
        finally
        {
            System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment + " out of " + totalSegments + " of " + tableName + " with " + totalScanRequestCount + " scan requests");
        }
    }
}
Executor service shutdown:
public static void shutDownExecutorService(ExecutorService executor)
{
    executor.shutdown();
    try
    {
        if (!executor.awaitTermination(10, TimeUnit.SECONDS))
        {
            executor.shutdownNow();
        }
    }
    catch (InterruptedException e)
    {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}
However, every time I run this code the number of items changes (it varies by around 60,000 in total, roughly 6,000 per thread, with 10 threads created). Removing the synchronization doesn't change the result either.
Is there a bug in my synchronization, or in the Amazon AWS API?
Thanks, all.
EDIT:
New function call:
ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();

try
{
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    int totalSegments = numberOfThreads;

    for (int segment = 0; segment < totalSegments; segment++)
    {
        // Callable task that will only scan one segment
        task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment);

        // Execute the task
        Future<ArrayList<String>> future = executor.submit(task);
        list.addAll(future.get());
    }

    shutDownExecutorService(executor);
}
New class:
// Callable task for scanning a single segment of a DynamoDB table
private static class ScanSegmentTask implements Callable<ArrayList<String>>
{
    // DynamoDB table to scan
    private String tableName;

    // Number of items each scan request should return
    private int itemLimit;

    // Total number of segments
    // Equal to the total number of threads scanning the table in parallel
    private int totalSegments;

    // Segment that will be scanned by this task
    private int segment;

    ArrayList<String> list_2 = new ArrayList<String>();

    static int counter = 0;

    public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment)
    {
        this.tableName = tableName;
        this.itemLimit = itemLimit;
        this.totalSegments = totalSegments;
        this.segment = segment;
    }

    @SuppressWarnings("finally")
    public ArrayList<String> call()
    {
        System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
        Map<String, AttributeValue> exclusiveStartKey = null;
        try
        {
            while (true)
            {
                ScanRequest scanRequest = new ScanRequest()
                    .withTableName(tableName)
                    .withLimit(itemLimit)
                    .withExclusiveStartKey(exclusiveStartKey)
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);

                ScanResult result = client.scan(scanRequest);

                for (Map<String, AttributeValue> item : result.getItems())
                {
                    list_2.add("Set:" + counter);
                    for (Map.Entry<String, AttributeValue> getItem : item.entrySet())
                    {
                        String attributeName = getItem.getKey();
                        AttributeValue value = getItem.getValue();
                        list_2.add(attributeName
                            + (value.getS() == null ? "" : ":" + value.getS())
                            + (value.getN() == null ? "" : ":" + value.getN())
                            + (value.getB() == null ? "" : ":" + value.getB())
                            + (value.getSS() == null ? "" : ":" + value.getSS())
                            + (value.getNS() == null ? "" : ":" + value.getNS())
                            + (value.getBS() == null ? "" : ":" + value.getBS()));
                    }
                    counter += 1;
                }

                exclusiveStartKey = result.getLastEvaluatedKey();
                if (exclusiveStartKey == null)
                {
                    break;
                }
            }
        }
        catch (AmazonServiceException ase)
        {
            System.err.println(ase.getMessage());
        }
        finally
        {
            return list_2;
        }
    }
}
FINAL EDIT:
Function call:
ScanSegmentTask task = null;
ArrayList<String> list = new ArrayList<String>();
ArrayList<Future<ArrayList<String>>> holdFuture = new ArrayList<Future<ArrayList<String>>>();

try
{
    ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
    int totalSegments = numberOfThreads;

    for (int segment = 0; segment < totalSegments; segment++)
    {
        // Callable task that will only scan one segment
        task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment);

        // Execute the task
        Future<ArrayList<String>> future = executor.submit(task);
        holdFuture.add(future);
    }

    for (int i = 0; i < holdFuture.size(); i++)
    {
        boolean flag = false;
        while (flag == false)
        {
            Thread.sleep(1000);
            if (holdFuture.get(i).isDone())
            {
                list.addAll(holdFuture.get(i).get());
                flag = true;
            }
        }
    }

    shutDownExecutorService(executor);
}
Class:

private static class ScanSegmentTask implements Callable<ArrayList<String>>
{
    // DynamoDB table to scan
    private String tableName;

    // Number of items each scan request should return
    private int itemLimit;

    // Total number of segments
    // Equal to the total number of threads scanning the table in parallel
    private int totalSegments;

    // Segment that will be scanned by this task
    private int segment;

    ArrayList<String> list_2 = new ArrayList<String>();

    static AtomicInteger counter = new AtomicInteger(0);

    public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment)
    {
        this.tableName = tableName;
        this.itemLimit = itemLimit;
        this.totalSegments = totalSegments;
        this.segment = segment;
    }

    @SuppressWarnings("finally")
    public ArrayList<String> call()
    {
        System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time...");
        Map<String, AttributeValue> exclusiveStartKey = null;
        try
        {
            while (true)
            {
                ScanRequest scanRequest = new ScanRequest()
                    .withTableName(tableName)
                    .withLimit(itemLimit)
                    .withExclusiveStartKey(exclusiveStartKey)
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);

                ScanResult result = client.scan(scanRequest);

                for (Map<String, AttributeValue> item : result.getItems())
                {
                    list_2.add("Set:" + counter);
                    for (Map.Entry<String, AttributeValue> getItem : item.entrySet())
                    {
                        String attributeName = getItem.getKey();
                        AttributeValue value = getItem.getValue();
                        list_2.add(attributeName
                            + (value.getS() == null ? "" : ":" + value.getS())
                            + (value.getN() == null ? "" : ":" + value.getN())
                            + (value.getB() == null ? "" : ":" + value.getB())
                            + (value.getSS() == null ? "" : ":" + value.getSS())
                            + (value.getNS() == null ? "" : ":" + value.getNS())
                            + (value.getBS() == null ? "" : ":" + value.getBS()));
                    }
                    counter.addAndGet(1);
                }

                exclusiveStartKey = result.getLastEvaluatedKey();
                if (exclusiveStartKey == null)
                {
                    break;
                }
            }
        }
        catch (AmazonServiceException ase)
        {
            System.err.println(ase.getMessage());
        }
        finally
        {
            return list_2;
        }
    }
}
OK, I believe the problem is in the way you synchronize.
In your case, the lock is pretty much meaningless, because each thread has its own lock object, so the synchronization never prevents one thread from running the same block of code as another. I think that's why removing the synchronization didn't change your results: it never had any effect to begin with.
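To make that point concrete, here is a minimal sketch (my own toy example, not your actual task) where every instance shares one static lock, so the synchronized block really does guard the shared static list:

import java.util.ArrayList;
import java.util.List;

// Toy example: every instance synchronizes on the SAME lock object,
// so only one thread at a time can touch the shared static list.
public class SharedLockSketch implements Runnable
{
    private static final Object SHARED_LOCK = new Object();
    private static final List<String> SHARED_LIST = new ArrayList<String>();

    private final int segment;

    SharedLockSketch(int segment) { this.segment = segment; }

    public void run()
    {
        synchronized (SHARED_LOCK)   // same monitor for every thread
        {
            SHARED_LIST.add("segment " + segment);
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        Thread a = new Thread(new SharedLockSketch(0));
        Thread b = new Thread(new SharedLockSketch(1));
        a.start(); b.start();
        a.join(); b.join();
        System.out.println(SHARED_LIST.size()); // always 2
    }
}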
I believe your problem is actually the static ArrayList<String> that is shared by your threads.
As I said before, although you do have a synchronized block, it isn't actually doing anything. You could synchronize on list_2, but all that would do is effectively make your threads run sequentially, since the lock on the ArrayList wouldn't be released until one of your threads had finished its work.
There are a couple of solutions. You can use Collections.synchronizedList(list_2) to create a synchronized wrapper around your ArrayList. That way, adds to the list are guaranteed to work correctly. However, this puts a synchronization cost on every operation, so it isn't ideal.
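For reference, here is a minimal sketch of that synchronizedList idea in isolation (the class and variable names are mine, not from your code):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedListSketch
{
    public static void main(String[] args)
    {
        // The wrapper synchronizes internally on every call, so concurrent
        // add() calls are safe, at the cost of a lock per operation.
        List<String> results = Collections.synchronizedList(new ArrayList<String>());

        results.add("item");                 // thread-safe add
        System.out.println(results.size());  // prints 1

        // You would pass `results` to each ScanSegmentTask instead of the raw ArrayList.
    }
}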
What I would actually do is have ScanSegmentTask implement Callable (specifically Callable<ArrayList<String>>) and, instead of sharing a static list, have each task collect its results into its own local ArrayList and return it from call().
Why does this matter? I think the best outcome for you is that each thread scans its own segment, accumulates its results locally, and hands them back when it's done. That way, you have no synchronization overhead to deal with at all!
This will require some changes to your executor code. You'll need to call submit() instead of execute(), which returns a Future object (a Future<ArrayList<String>>, matching the Callable's type parameter).
To retrieve the results, simply loop over your collection of Future objects and call get() on each one (I think). Each get() call blocks until the thread behind that Future has finished.
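Here is a rough, self-contained sketch of that submit()/Future pattern; the Callable body is just a stand-in for your actual segment scan:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureCollectSketch
{
    public static void main(String[] args) throws Exception
    {
        int totalSegments = 4;
        ExecutorService executor = Executors.newFixedThreadPool(totalSegments);

        // Submit one Callable per segment and remember its Future.
        List<Future<ArrayList<String>>> futures = new ArrayList<Future<ArrayList<String>>>();
        for (int segment = 0; segment < totalSegments; segment++)
        {
            final int seg = segment;
            futures.add(executor.submit(new Callable<ArrayList<String>>()
            {
                public ArrayList<String> call()
                {
                    ArrayList<String> local = new ArrayList<String>();
                    local.add("result from segment " + seg); // stand-in for the scan
                    return local;
                }
            }));
        }

        // get() blocks until the corresponding task has finished, so all
        // results are merged only after every segment is done.
        List<String> all = new ArrayList<String>();
        for (Future<ArrayList<String>> future : futures)
        {
            all.addAll(future.get());
        }

        executor.shutdown();
        System.out.println(all.size() + " results collected");
    }
}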
I think that's about it. Although this is more involved, I believe it's the best performance you're going to get, since with enough threads either CPU contention or your network link will become the bottleneck. If you have any questions, just ask and I'll update as needed.