我对将大型集合插入cassandra数据库的最快方法有点困惑。我了解到我不应该使用批量插入,因为它是为原子性而创建的。甚至Cassandra也给了我一个信息,让我使用异步写来提高性能。我使用了没有“batch”关键字的最快插入代码:
var cluster = Cluster.Builder()
.AddContactPoint(“127.0.0.1")
.Build();
var session = cluster.Connect();
//Save off the prepared statement you’re going to use
var statement = session.Prepare (“INSERT INTO tester.users (userID, firstName, lastName) VALUES (?,?,?)”);
var tasks = new List<Task>();
for (int i = 0; i < 1000; i++)
{
//please bind with whatever actually useful data you’re importing
var bind = statement.Bind (i, “John”, “Tester”);
var resultSetFuture = session.ExecuteAsync (bind);
tasks.Add (resultSetFuture);
}
Task.WaitAll(tasks.ToArray());
cluster.Shutdown();
出发地:https://medium.com/@fondev/cassandra-batch-loading-non-the-batch-keyword-40f00e35e23e
但它仍然比我使用的批处理选项慢得多。我当前的代码如下所示:
IList<Movie> moviesList = Movie.CreateMoviesCollectionForCassandra(collectionEntriesNumber);
var preparedStatements = new List<PreparedStatement>();
foreach (var statement in preparedStatements)
{
statement.SetConsistencyLevel(ConsistencyLevel.One);
}
var statementBinding = new BatchStatement();
statementBinding.SetBatchType(BatchType.Unlogged);
for (int i = 0; i < collectionEntriesNumber; i++)
{
preparedStatements.Add(Session.Prepare("INSERT INTO Movies (id, title, description, year, genres, rating, originallanguage, productioncountry, votingsnumber, director) VALUES (?,?,?,?,?,?,?,?,?,?)"));
}
for (int i = 0; i < collectionEntriesNumber; i++)
{
statementBinding.Add(preparedStatements[i].Bind(moviesList[i].Id, moviesList[i].Title,
moviesList[i].Description, moviesList[i].Year, moviesList[i].Genres, moviesList[i].Rating,
moviesList[i].OriginalLanguage, moviesList[i].ProductionCountry, moviesList[i].VotingsNumber,
new Director(moviesList[0].Director.Id, moviesList[i].Director.Firstname,
moviesList[i].Director.Lastname, moviesList[i].Director.Age)));
}
watch.Start();
Session.ExecuteAsync(statementBinding);
watch.Stop();
它确实工作得更快,但我只能插入大约2500个准备好的语句,仅此而已,我想测量大约100000个对象插入的时间。
我的代码正确吗?也许我应该增加插入门槛?请解释我如何正确地做这件事。
请记住,您应该准备一次,然后重用相同的准备语句
来绑定到不同的参数。
如果您的目标是同一个分区,您可以使用小批量,如果不是,您应该使用单个请求。
使用单个请求时,您可以并行计划执行,并使用信号量限制未完成的请求量。
像这样的东西:
public async Task<long> Execute(
IStatement[] statements, int parallelism, int maxOutstandingRequests)
{
var semaphore = new SemaphoreSlim(maxOutstandingRequests);
var tasks = new Task<RowSet>[statements.Length];
var chunkSize = statements.Length / parallelism;
if (chunkSize == 0)
{
chunkSize = 1;
}
var statementLength = statements.Length;
var launchTasks = new Task[parallelism + 1];
var watch = new Stopwatch();
watch.Start();
for (var i = 0; i < parallelism + 1; i++)
{
var startIndex = i * chunkSize;
//start to launch in parallel
launchTasks[i] = Task.Run(async () =>
{
for (var j = 0; j < chunkSize; j++)
{
var index = startIndex + j;
if (index >= statementLength)
{
break;
}
await semaphore.WaitAsync();
var t = _session.ExecuteAsync(statements[index]);
tasks[index] = t;
var rs = await t;
semaphore.Release();
}
});
}
await Task.WhenAll(launchTasks);
await Task.WhenAll(tasks);
watch.Stop();
return watch.ElapsedMilliseconds;
}
有人可以帮助我了解如何使用POJO类插入卡桑德拉UDT数据吗? 我创建了一个POJO类来映射Cassandra的表,并为Cassandra UDT创建了另一个类,但是当我插入映射Cassandra表的主POJO类时,它无法识别另一个POJO类(映射Cassandra的UDT)。我还在每个类和每个类对象上编写了注释。 这是我的一个POJO类:- 另一个POJO类:-
我使用的是spring数据cassandra,需要使用jpa映射一个字段,在cassandra中,该字段的类型为
我目前在cassandra中有一个名为macrecord的表,类似于以下内容: 在这种情况下,我想不出其他解决方案,只有在macadd值重复的情况下删除整行,然后插入具有更新时间戳的新行。 是否有更好的解决方案在macadd值重复时更新时间戳,或者在我的原始表中只有macadd是主键的范围内查询时间戳值的替代方法。
我用Spring Data Cassandra 2.2.1开发了一个新的应用程序,想在Cassandra 2.1.9服务器上运行它(旧的,我知道)。但是我们得到了错误 Spring数据卡桑德拉手册声称Spring数据2.2.1至少需要卡桑德拉2.1,所以这应该有效,但它没有。我们包含的唯一特定于卡桑德拉的依赖项是 我怎样才能让这个工作?
我有一个必须添加到采购订单的产品列表。采购订单具有序列号,添加产品后,应更改其状态以指示这些产品已出货。 1 个采购订单中处理的典型产品数为 500。 在DB上-我有2张桌子- 各方面的建议告诉我应该使用多个异步查询。然而,我关心的是整个操作的原子性。鉴于我的要求,请建议什么是最好的前进方式。 先谢谢你。
我是卡桑德拉的新手。我的流程工作者正试图插入到卡桑德拉数据库中。几个小时后,我看不到插入的任何进展。 我的调试日志在卡桑德拉的节点上说了以下内容: WARN [SharedPool-Worker-48] 2016-10-07 00:59:04,025 批次语句.java:287 - [my_database.my_table] 的预准备语句批次大小为 14264,超出指定的阈值 5120 x 91